Как построить конвейеры ETL в Python

Опубликовано: 2022-01-11

ETL расшифровывается как Extract , Transform , Load . В рамках процесса ETL данные извлекаются, преобразуются и загружаются в хранилища данных, чтобы организации могли анализировать их для принятия стратегических решений.

Ниже приведены ключевые шаги, выполняемые в конвейере ETL:

  • Извлечение: этот процесс собирает и интегрирует данные из различных источников, включая базы данных, озера данных, CRM и другие.
  • Преобразование: это самый важный этап в конвейере ETL. Чтобы данные были готовы к аналитике, на этом этапе их необходимо правильно собрать, отсортировать, очистить и свести.
  • Загрузка: этот процесс включает в себя импорт структурированных или неструктурированных данных из озер данных, баз данных и других источников в хранилища данных, чтобы аналитики данных или другие пользователи могли легко получить глубокое понимание.

Понимание важности Python ETL

Python — один из самых популярных и часто используемых языков программирования в современном мире с бесконечным количеством приложений в самых разных областях. Он получил престижную награду TIOBE Programming Language of the Year 2021.

Гибкий и динамичный характер Python делает его идеальным для задач развертывания, анализа и обслуживания. Python ETL — один из важнейших навыков, необходимых в Data Engineering для создания конвейеров данных, разработки статистических моделей и их тщательного анализа.

Он стал популярным инструментом для выполнения процессов ETL благодаря простоте использования и надежным библиотекам для доступа к базам данных и системам хранения. Многие команды используют Python для ETL и обработки данных, а не инструмент ETL, поскольку он более универсален и мощен для этих задач.

Самым большим преимуществом Python по сравнению с другими языками программирования является простота использования в интеллектуальном анализе данных, науке о данных, больших данных, искусственном интеллекте и машинном обучении.

Компании по всему миру используют Python для своих данных, чтобы получать информацию, управлять своими операциями и обеспечивать бесперебойную работу.

2 простых шага для создания конвейера Python ETL

В этой части вы узнаете об основных шагах по созданию конвейера ETL с использованием Python . Вы создадите базовый конвейер данных, который передает данные в базу данных Microsoft SQL Server из баз данных MySQL и Microsoft SQL Server.

Чтобы настроить скрипт Python ETL, выполните следующие действия:

Шаг 1. Установите необходимые модули

Чтобы настроить Python ETL Pipeline, вам необходимо установить следующие модули:

  • Коннектор Python для MySQL: mysql-connector-python (для установки используйте команду pip install mysql-connector-python )
  • Коннектор Python для Microsoft SQL Server: pyodbc (для установки используйте команду pip install pyodbc )

Шаг 2. Настройте каталог ETL

После установки вышеуказанных пакетов вам необходимо создать 4 файла Python, упомянутых ниже, в каталоге вашего проекта:

  • db_credentials.py: этот файл содержит код для установления соединений со всеми базами данных.
  • sql_queries.py: этот файл содержит часто используемые запросы к базе данных для извлечения и загрузки данных в строковом формате.
  • etl.py: этот файл содержит необходимые операции для подключения к базе данных и выполнения необходимых запросов.
  • main.py: это основной файл, который регулирует поток и выполнение конвейера Python ETL.

А) db_credentials.py

Все строки подключения к исходной и целевой базе данных должны быть включены в этот файл. Он должен содержать всю необходимую информацию для доступа к соответствующей базе данных в формате списка, чтобы его можно было быстро повторить при необходимости. Ниже приведен пример скрипта Python для установки подключения к базе данных:

 datawarehouse_name = 'your_dwh_name'
# sql-сервер (целевая БД, хранилище данных)
datawarehouse_db_config = {
  «Доверенное_соединение»: «да»,
  «драйвер»: «{SQL Server}»,
  'сервер': 'datawarehouse_sql_server',
  'база данных': '{}'.format(datawarehouse_name),
  'пользователь': 'your_db_uname',
  'пароль': 'your_db_pword',
  «автокоммит»: правда,
}
# исходная база данных > sql-сервер
sqlserver_db_config = [
  {
    «Доверенное_соединение»: «да»,
    «драйвер»: «{SQL Server}»,
    'сервер': 'ваш_db_sql_server',
    «база данных»: «db_1st»,
    'пользователь': 'your_db_uname',
    'пароль': 'your_db_pword',
    «автокоммит»: правда,
  }
]
# исходная база данных > mysql
mysql_db_config = [
  {
    'пользователь': 'ваш_1_пользователь',
    'пароль': 'ваш_1_пароль',
    «хост»: «db_connection_string_1»,
    «база данных»: «db_1st»,
  },
  {
    'пользователь': 'ваш_2_пользователь,
    'пароль': 'ваш_2_пароль',
    «хост»: «db_connection_string_2»,
    «база данных»: «db_2nd»,
  },
]

Б) sql_queries.py

Этот файл включает запросы для извлечения данных из исходных баз данных и загрузки их в целевую базу данных. Следующий скрипт поможет вам выполнить эту задачу:

 # примеры запросов, будут уникальными для разных платформ баз данных

sqlserver_extract = ('''
  ВЫБЕРИТЕ sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
  ИЗ sqlserver_1_table
''')
sqlserver_insert = ('''
  ВСТАВИТЬ В table_demo (col_1, col_2, col_3)
  ЗНАЧЕНИЯ (?, ?, ?)  
''')
mysql_extract = ('''
  ВЫБЕРИТЕ mysql_col_1, mysql_col_2, mysql_col_3
  ОТ mysql_demo_table
''')
mysql_insert = ('''
  ВСТАВИТЬ В table_demo (col_1, col_2, col_3)
  ЗНАЧЕНИЯ (?, ?, ?)  
''')

# Запросы экспортируются
класс Sql_Query:
  def __init__(я, extract_query, load_query):
    self.extract_query = экстракт_запроса
    self.load_query = загрузить_запрос   
# создаем экземпляры для класса Sql_Query
sqlserver_query = SqlQuery (sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery (mysql_extract, mysql_insert)
# создание списка для перебора значений
mysql_queries = [mysql_query]
sqlserver_query = [sqlserver_query]

С) этл.py

Этот файл должен включать код, необходимый для доступа к соответствующим базам данных и выполнения необходимых запросов. Следующий скрипт поможет вам выполнить эту задачу:

 # модули на основе Python
импортировать pyodbc
импортировать mysql.connector

def etl (запрос, source_cnx, target_cnx):
  # извлечь данные из демо исходной базы данных
  source_cursor = source_cnx.cursor()
  source_cursor.execute(query.extract_query)
  данные = source_cursor.fetchall()
  source_cursor.close()

  # загрузить данные в демонстрационную базу данных хранилища данных
  
если данные:
    target_cursor = target_cnx.cursor()
    target_cursor.execute("ИСПОЛЬЗОВАТЬ {}".format(name_for_datawarehouse))
    target_cursor.executemany(query.load_query, данные)
    print('данные загружены в демонстрационную базу данных хранилища данных')
    target_cursor.close()
  еще:
    print('данные пусты')

def etl_process(запросы, target_cnx, source_db_config, db_platform):

  # настройка подключения к исходной демо-базе данных
  если db_platform == 'mysql':
    source_cnx = mysql.connector.connect(**source_db_config)
  Элиф db_platform == 'sqlserver':
    source_cnx = pyodbc.connect(**source_db_config)
  еще:
    вернуть 'Ошибка! неизвестная исходная платформа базы данных"
  # цикл по sql запросам
  для запроса в запросах:
    etl (запрос, source_cnx, target_cnx)    
  # закрыть соединение с исходной базой данных
  source_cnx.close()

Г) main.py

Этот файл включает код для повторения заданных учетных данных для подключения к базе данных и выполнения необходимых операций ETL Python. Следующий скрипт поможет вам выполнить эту задачу:

 # переменные
из db_credentials импортировать datawarehouse_db_config, sqlserver_db_config, mysql_db_config
из sql_queries импортировать sqlserver_queries, mysql_queries

# методы
из etl импортировать etl_process
деф основной():
  print('запуск обработки данных etl')
	
  # установить соединение с SQL Server, желаемое целевое хранилище
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # цикл по учетным данным
  # База данных > mysql
  для конфигурации в mysql_db_config: 
    пытаться:
      print("загрузка БД: " + config['база данных'])
      etl_process(mysql_queries, target_cnx, конфигурация, 'mysql')
    кроме исключения как ошибки:
      print("etl для {} имеет ошибку".format(config['database']))
      print('сообщение об ошибке: {}'.format(ошибка))
      Продолжать
	
  # База данных > sql-сервер
  для конфигурации в sqlserver_db_config: 
    пытаться:
      print("загрузка БД: " + config['база данных'])
      etl_process(sqlserver_queries, target_cnx, конфигурация, 'sqlserver')
    кроме исключения как ошибки:
      print("etl для {} имеет ошибку".format(config['database']))
      print('сообщение об ошибке: {}'.format(ошибка))
      Продолжать

  target_cnx.close()
если __name__ == "__main__":
  основной()

Заключение

Отличная работа! Вы успешно получили базовые знания о построении Python ETL Pipeline. Теперь вы можете реализовать свой собственный скрипт Python ETL на основе ваших требований, внеся соответствующие изменения в используемые базы данных и запросы.

Чтобы изучить широко используемые инструменты Python ETL в отрасли, прочитайте блог Best Python ETL Tools.

Большинство организаций в настоящее время работают с большими данными. Следовательно, создание конвейера ETL с нуля для таких данных может занять много времени и быть сложной задачей.

Более того, предприятиям потребуется вложить значительный объем ресурсов, чтобы построить его, а затем гарантировать, что они смогут справиться с большими объемами данных и колебаниями схемы.

Таким образом, вместо того, чтобы создавать сценарии ETL с нуля, вы можете использовать автоматизированные конвейеры данных, такие как Hevo.

Есть какие-нибудь мысли по этому поводу? Дайте нам знать внизу в комментариях или перенесите обсуждение в наш Twitter или Facebook.

Рекомендации редакции: