Как построить конвейеры ETL в Python
Опубликовано: 2022-01-11ETL расшифровывается как Extract , Transform , Load . В рамках процесса ETL данные извлекаются, преобразуются и загружаются в хранилища данных, чтобы организации могли анализировать их для принятия стратегических решений.
Ниже приведены ключевые шаги, выполняемые в конвейере ETL:
- Извлечение: этот процесс собирает и интегрирует данные из различных источников, включая базы данных, озера данных, CRM и другие.
- Преобразование: это самый важный этап в конвейере ETL. Чтобы данные были готовы к аналитике, на этом этапе их необходимо правильно собрать, отсортировать, очистить и свести.
- Загрузка: этот процесс включает в себя импорт структурированных или неструктурированных данных из озер данных, баз данных и других источников в хранилища данных, чтобы аналитики данных или другие пользователи могли легко получить глубокое понимание.
Понимание важности Python ETL
Python — один из самых популярных и часто используемых языков программирования в современном мире с бесконечным количеством приложений в самых разных областях. Он получил престижную награду TIOBE Programming Language of the Year 2021.
Гибкий и динамичный характер Python делает его идеальным для задач развертывания, анализа и обслуживания. Python ETL — один из важнейших навыков, необходимых в Data Engineering для создания конвейеров данных, разработки статистических моделей и их тщательного анализа.
Он стал популярным инструментом для выполнения процессов ETL благодаря простоте использования и надежным библиотекам для доступа к базам данных и системам хранения. Многие команды используют Python для ETL и обработки данных, а не инструмент ETL, поскольку он более универсален и мощен для этих задач.
Самым большим преимуществом Python по сравнению с другими языками программирования является простота использования в интеллектуальном анализе данных, науке о данных, больших данных, искусственном интеллекте и машинном обучении.
Компании по всему миру используют Python для своих данных, чтобы получать информацию, управлять своими операциями и обеспечивать бесперебойную работу.
2 простых шага для создания конвейера Python ETL
В этой части вы узнаете об основных шагах по созданию конвейера ETL с использованием Python . Вы создадите базовый конвейер данных, который передает данные в базу данных Microsoft SQL Server из баз данных MySQL и Microsoft SQL Server.
Чтобы настроить скрипт Python ETL, выполните следующие действия:
Шаг 1. Установите необходимые модули
Чтобы настроить Python ETL Pipeline, вам необходимо установить следующие модули:
- Коннектор Python для MySQL: mysql-connector-python (для установки используйте команду pip install mysql-connector-python )
- Коннектор Python для Microsoft SQL Server: pyodbc (для установки используйте команду pip install pyodbc )
Шаг 2. Настройте каталог ETL
После установки вышеуказанных пакетов вам необходимо создать 4 файла Python, упомянутых ниже, в каталоге вашего проекта:
- db_credentials.py: этот файл содержит код для установления соединений со всеми базами данных.
- sql_queries.py: этот файл содержит часто используемые запросы к базе данных для извлечения и загрузки данных в строковом формате.
- etl.py: этот файл содержит необходимые операции для подключения к базе данных и выполнения необходимых запросов.
- main.py: это основной файл, который регулирует поток и выполнение конвейера Python ETL.
А) db_credentials.py
Все строки подключения к исходной и целевой базе данных должны быть включены в этот файл. Он должен содержать всю необходимую информацию для доступа к соответствующей базе данных в формате списка, чтобы его можно было быстро повторить при необходимости. Ниже приведен пример скрипта Python для установки подключения к базе данных:
datawarehouse_name = 'your_dwh_name' # sql-сервер (целевая БД, хранилище данных) datawarehouse_db_config = { «Доверенное_соединение»: «да», «драйвер»: «{SQL Server}», 'сервер': 'datawarehouse_sql_server', 'база данных': '{}'.format(datawarehouse_name), 'пользователь': 'your_db_uname', 'пароль': 'your_db_pword', «автокоммит»: правда, } # исходная база данных > sql-сервер sqlserver_db_config = [ { «Доверенное_соединение»: «да», «драйвер»: «{SQL Server}», 'сервер': 'ваш_db_sql_server', «база данных»: «db_1st», 'пользователь': 'your_db_uname', 'пароль': 'your_db_pword', «автокоммит»: правда, } ] # исходная база данных > mysql mysql_db_config = [ { 'пользователь': 'ваш_1_пользователь', 'пароль': 'ваш_1_пароль', «хост»: «db_connection_string_1», «база данных»: «db_1st», }, { 'пользователь': 'ваш_2_пользователь, 'пароль': 'ваш_2_пароль', «хост»: «db_connection_string_2», «база данных»: «db_2nd», }, ]
Б) sql_queries.py
Этот файл включает запросы для извлечения данных из исходных баз данных и загрузки их в целевую базу данных. Следующий скрипт поможет вам выполнить эту задачу:
# примеры запросов, будут уникальными для разных платформ баз данных sqlserver_extract = (''' ВЫБЕРИТЕ sqlserver_col_1, sqlserver_col_2, sqlserver_col_3 ИЗ sqlserver_1_table ''') sqlserver_insert = (''' ВСТАВИТЬ В table_demo (col_1, col_2, col_3) ЗНАЧЕНИЯ (?, ?, ?) ''') mysql_extract = (''' ВЫБЕРИТЕ mysql_col_1, mysql_col_2, mysql_col_3 ОТ mysql_demo_table ''') mysql_insert = (''' ВСТАВИТЬ В table_demo (col_1, col_2, col_3) ЗНАЧЕНИЯ (?, ?, ?) ''') # Запросы экспортируются класс Sql_Query: def __init__(я, extract_query, load_query): self.extract_query = экстракт_запроса self.load_query = загрузить_запрос # создаем экземпляры для класса Sql_Query sqlserver_query = SqlQuery (sqlserver_extract, sqlserver_insert) mysql_query = SqlQuery (mysql_extract, mysql_insert) # создание списка для перебора значений mysql_queries = [mysql_query] sqlserver_query = [sqlserver_query]
С) этл.py
Этот файл должен включать код, необходимый для доступа к соответствующим базам данных и выполнения необходимых запросов. Следующий скрипт поможет вам выполнить эту задачу:
# модули на основе Python импортировать pyodbc импортировать mysql.connector def etl (запрос, source_cnx, target_cnx): # извлечь данные из демо исходной базы данных source_cursor = source_cnx.cursor() source_cursor.execute(query.extract_query) данные = source_cursor.fetchall() source_cursor.close() # загрузить данные в демонстрационную базу данных хранилища данных если данные: target_cursor = target_cnx.cursor() target_cursor.execute("ИСПОЛЬЗОВАТЬ {}".format(name_for_datawarehouse)) target_cursor.executemany(query.load_query, данные) print('данные загружены в демонстрационную базу данных хранилища данных') target_cursor.close() еще: print('данные пусты') def etl_process(запросы, target_cnx, source_db_config, db_platform): # настройка подключения к исходной демо-базе данных если db_platform == 'mysql': source_cnx = mysql.connector.connect(**source_db_config) Элиф db_platform == 'sqlserver': source_cnx = pyodbc.connect(**source_db_config) еще: вернуть 'Ошибка! неизвестная исходная платформа базы данных" # цикл по sql запросам для запроса в запросах: etl (запрос, source_cnx, target_cnx) # закрыть соединение с исходной базой данных source_cnx.close()
Г) main.py
Этот файл включает код для повторения заданных учетных данных для подключения к базе данных и выполнения необходимых операций ETL Python. Следующий скрипт поможет вам выполнить эту задачу:
# переменные из db_credentials импортировать datawarehouse_db_config, sqlserver_db_config, mysql_db_config из sql_queries импортировать sqlserver_queries, mysql_queries # методы из etl импортировать etl_process деф основной(): print('запуск обработки данных etl') # установить соединение с SQL Server, желаемое целевое хранилище target_cnx = pyodbc.connect(**datawarehouse_db_config) # цикл по учетным данным # База данных > mysql для конфигурации в mysql_db_config: пытаться: print("загрузка БД: " + config['база данных']) etl_process(mysql_queries, target_cnx, конфигурация, 'mysql') кроме исключения как ошибки: print("etl для {} имеет ошибку".format(config['database'])) print('сообщение об ошибке: {}'.format(ошибка)) Продолжать # База данных > sql-сервер для конфигурации в sqlserver_db_config: пытаться: print("загрузка БД: " + config['база данных']) etl_process(sqlserver_queries, target_cnx, конфигурация, 'sqlserver') кроме исключения как ошибки: print("etl для {} имеет ошибку".format(config['database'])) print('сообщение об ошибке: {}'.format(ошибка)) Продолжать target_cnx.close() если __name__ == "__main__": основной()
Заключение
Отличная работа! Вы успешно получили базовые знания о построении Python ETL Pipeline. Теперь вы можете реализовать свой собственный скрипт Python ETL на основе ваших требований, внеся соответствующие изменения в используемые базы данных и запросы.
Чтобы изучить широко используемые инструменты Python ETL в отрасли, прочитайте блог Best Python ETL Tools.
Большинство организаций в настоящее время работают с большими данными. Следовательно, создание конвейера ETL с нуля для таких данных может занять много времени и быть сложной задачей.
Более того, предприятиям потребуется вложить значительный объем ресурсов, чтобы построить его, а затем гарантировать, что они смогут справиться с большими объемами данных и колебаниями схемы.
Таким образом, вместо того, чтобы создавать сценарии ETL с нуля, вы можете использовать автоматизированные конвейеры данных, такие как Hevo.
Есть какие-нибудь мысли по этому поводу? Дайте нам знать внизу в комментариях или перенесите обсуждение в наш Twitter или Facebook.