Cómo construir canalizaciones ETL en Python
Publicado: 2022-01-11ETL significa Extraer , Transformar , Cargar. Como parte del proceso ETL, los datos se extraen, transforman y cargan en almacenes de datos para que las organizaciones puedan analizarlos para tomar decisiones estratégicas.
Los siguientes son los pasos clave realizados en la canalización de ETL:
- Extraer: este proceso recopila e integra datos de una variedad de fuentes, incluidas bases de datos, lagos de datos, CRM y otros.
- Transformar: Esta es la fase más crucial en un Pipeline ETL. Para que los datos estén listos para el análisis, deben recopilarse, clasificarse, limpiarse y girarse correctamente en este paso.
- Carga: este proceso implica la importación de datos estructurados o no estructurados de lagos de datos, bases de datos y otras fuentes a almacenes de datos para que los analistas de datos u otros usuarios puedan obtener información profunda fácilmente.
Comprender la importancia de Python ETL
Python es uno de los lenguajes de programación más populares y comúnmente aprovechados del mundo moderno, con infinitas aplicaciones en una variedad de campos. Ha ganado el prestigioso premio TIOBE Programación Lenguaje del Año 2021.
La naturaleza flexible y dinámica de Python lo hace ideal para tareas de implementación, análisis y mantenimiento. Python ETL es una de las habilidades cruciales requeridas en la ingeniería de datos para construir canalizaciones de datos, desarrollar modelos estadísticos y realizar un análisis exhaustivo de ellos.
Se ha convertido en una herramienta popular para ejecutar procesos ETL debido a su facilidad de uso y bibliotecas robustas para acceder a bases de datos y sistemas de almacenamiento. Muchos equipos usan Python para ETL e ingeniería de datos en lugar de una herramienta ETL, ya que es más versátil y potente para estas tareas.
El mayor beneficio de Python sobre otros lenguajes de programación es la simplicidad de uso en minería de datos, ciencia de datos, big data, inteligencia artificial y aprendizaje automático.
Las empresas de todo el mundo utilizan Python para sus datos a fin de obtener información, administrar sus operaciones y mantener todo funcionando sin problemas.
2 sencillos pasos para construir una canalización ETL de Python
En esta parte, aprenderá los pasos esenciales para crear una canalización de ETL con Python . Creará una canalización de datos básica que alimenta datos a una base de datos de Microsoft SQL Server desde bases de datos MySQL y Microsoft SQL Server.
Para configurar el script Python ETL, siga los pasos a continuación:
Paso 1: instale los módulos necesarios
Para configurar Python ETL Pipeline, deberá instalar los siguientes módulos:
- Conector de Python a MySQL: mysql-connector-python (Use el comando pip install mysql-connector-python para instalar)
- Python a Microsoft SQL Server Connector: pyodbc (Use el comando pip install pyodbc para instalar)
Paso 2: configurar el directorio ETL
Después de instalar los paquetes anteriores, debe crear 4 archivos de Python, que se mencionan a continuación en el directorio de su proyecto:
- db_credentials.py: Este archivo incluye código para establecer conexiones con todas las Bases de Datos.
- sql_queries.py: este archivo comprende las consultas de base de datos comúnmente utilizadas para extraer y cargar datos en formato de cadena.
- etl.py: Este archivo posee las operaciones necesarias para conectarse a la Base de Datos y ejecutar las consultas requeridas.
- main.py: este es el archivo principal que regula el flujo y la ejecución de la tubería ETL de Python.
A) db_credenciales.py
Todas las cadenas de conexión de la base de datos de origen y de destino deben incluirse en este archivo. Debe contener toda la información necesaria para acceder a la base de datos relevante en un formato de lista para que pueda iterarse rápidamente cuando sea necesario. El siguiente es un script de Python de muestra para establecer la conexión de la base de datos:
datawarehouse_name = 'tu_dwh_name' # sql-server (bd de destino, almacén de datos) datawarehouse_db_config = { 'Conexión_de_Confianza': 'sí', 'controlador': '{servidor SQL}', 'servidor': 'datawarehouse_sql_server', 'base de datos': '{}'.format(nombre_del_almacén_de_datos), 'usuario': 'tu_db_uname', 'contraseña': 'tu_palabra_bd', 'compromiso automático': Verdadero, } # fuente db > sql-server sqlserver_db_config = [ { 'Conexión_de_Confianza': 'sí', 'controlador': '{servidor SQL}', 'servidor': 'su_db_sql_server', 'base de datos': 'db_1st', 'usuario': 'tu_db_uname', 'contraseña': 'tu_palabra_bd', 'compromiso automático': Verdadero, } ] # base de datos fuente > mysql mysql_db_config = [ { 'usuario': 'tu_1_usuario', 'contraseña': 'tu_1_contraseña', 'host': 'db_connection_string_1', 'base de datos': 'db_1st', }, { 'usuario': 'tu_2_usuario, 'contraseña': 'tu_2_contraseña', 'host': 'db_connection_string_2', 'base de datos': 'db_2nd', }, ]
B) sql_consultas.py
Este archivo incluye consultas para extraer datos de las bases de datos de origen y cargarlos en la base de datos de destino. El siguiente script le ayudará a realizar esta tarea:
# consultas de ejemplo, serán únicas para diferentes plataformas de bases de datos sqlserver_extract = (''' SELECCIONE sqlserver_col_1, sqlserver_col_2, sqlserver_col_3 DESDE sqlserver_1_table ''') sqlserver_insert = (''' INSERTAR EN table_demo (col_1, col_2, col_3) VALORES (?, ?, ?) ''') extracto_mysql = (''' SELECCIONE mysql_col_1, mysql_col_2, mysql_col_3 DESDE mysql_demo_table ''') mysql_insert = (''' INSERTAR EN table_demo (col_1, col_2, col_3) VALORES (?, ?, ?) ''') # Consultas siendo exportadas clase Sql_Query: def __init__(self, extraer_consulta, cargar_consulta): self.extraer_consulta = extraer_consulta self.load_query = load_query # crear instancias para la clase Sql_Query sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert) mysql_query = SqlQuery(mysql_extract, mysql_insert) # creando una lista para iterar a través de los valores consultas_mysql = [consultas_mysql] sqlserver_consultas = [sqlserver_consulta]
C) etl.py
Este archivo debe incluir el código requerido para acceder a las Bases de datos relevantes y ejecutar las consultas requeridas. El siguiente script le ayudará a realizar esta tarea:
# módulos basados en python importar pyodbc importar mysql.conector def etl(consulta, source_cnx, target_cnx): # extraer datos de la base de datos fuente de demostración fuente_cursor = fuente_cnx.cursor() source_cursor.execute(consulta.extraer_consulta) datos = source_cursor.fetchall() fuente_cursor.cerrar() # cargar datos en la base de datos de demostración Data Warehouse si datos: objetivo_cursor = objetivo_cnx.cursor() target_cursor.execute("USE {}".format(name_for_datawarehouse)) target_cursor.executemany(query.load_query, data) print('datos cargados en la base de datos de demostración Data Warehouse') target_cursor.cerrar() demás: imprimir ('los datos están vacíos') def etl_process (consultas, target_cnx, source_db_config, db_platform): # configurar la conexión de la base de datos fuente de demostración si db_plataforma == 'mysql': source_cnx = mysql.conector.connect(**source_db_config) elif db_platform == 'sqlserver': source_cnx = pyodbc.connect(**source_db_config) demás: devuelve '¡Error! plataforma de base de datos de origen no reconocida' # recorrer las consultas sql para consulta en consultas: etl (consulta, source_cnx, target_cnx) # cerrar la conexión de base de datos de origen fuente_cnx.cerrar()
D) principal.py
Este archivo incluye código para iterar a través de las credenciales dadas para conectarse a la base de datos y ejecutar las operaciones ETL Python necesarias. El siguiente script le ayudará a realizar esta tarea:
# variables desde db_credentials importar datawarehouse_db_config, sqlserver_db_config, mysql_db_config desde sql_queries importar sqlserver_queries, mysql_queries # métodos desde etl importar etl_process def principal(): print('comenzando el proceso de datos etl') # establecer conexión para SQL Server, almacenamiento de destino deseado target_cnx = pyodbc.connect(**datawarehouse_db_config) # recorriendo las credenciales # Base de datos > mysql para la configuración en mysql_db_config: tratar: print("cargando db: " + config['base de datos']) etl_process(mysql_queries, target_cnx, config, 'mysql') excepto Excepción como error: print("etl para {} tiene error".format(config['database'])) imprimir ('mensaje de error: {}'. formato (error)) Seguir # Base de datos > servidor sql para la configuración en sqlserver_db_config: tratar: print("cargando db: " + config['base de datos']) etl_process(sqlserver_queries, target_cnx, config, 'sqlserver') excepto Excepción como error: print("etl para {} tiene error".format(config['database'])) imprimir ('mensaje de error: {}'. formato (error)) Seguir objetivo_cnx.cerrar() si __nombre__ == "__principal__": principal()
Conclusión
¡Buen trabajo! Obtuvo con éxito una comprensión básica de la construcción de Python ETL Pipeline. Ahora puede implementar su secuencia de comandos ETL de Python personalizada en función de sus requisitos realizando cambios en las bases de datos que se utilizan y consultando en consecuencia.
Para explorar las herramientas Python ETL ampliamente utilizadas en la industria, lea el blog Best Python ETL Tools.
La mayoría de las organizaciones hoy en día trabajan con Big Data. Por lo tanto, crear una tubería ETL desde cero para dichos datos puede llevar mucho tiempo y ser un desafío.
Además, las empresas deberán invertir una cantidad significativa de recursos para construirlo y luego garantizar que pueden mantenerse al día con el alto volumen de datos y las fluctuaciones del esquema.
Entonces, en lugar de crear secuencias de comandos ETL desde cero, puede aprovechar las canalizaciones de datos automatizadas como Hevo.
¿Tiene alguna idea sobre esto? Háganos saber a continuación en los comentarios o lleve la discusión a nuestro Twitter o Facebook.