Cómo construir canalizaciones ETL en Python

Publicado: 2022-01-11

ETL significa Extraer , Transformar , Cargar. Como parte del proceso ETL, los datos se extraen, transforman y cargan en almacenes de datos para que las organizaciones puedan analizarlos para tomar decisiones estratégicas.

Los siguientes son los pasos clave realizados en la canalización de ETL:

  • Extraer: este proceso recopila e integra datos de una variedad de fuentes, incluidas bases de datos, lagos de datos, CRM y otros.
  • Transformar: Esta es la fase más crucial en un Pipeline ETL. Para que los datos estén listos para el análisis, deben recopilarse, clasificarse, limpiarse y girarse correctamente en este paso.
  • Carga: este proceso implica la importación de datos estructurados o no estructurados de lagos de datos, bases de datos y otras fuentes a almacenes de datos para que los analistas de datos u otros usuarios puedan obtener información profunda fácilmente.

Comprender la importancia de Python ETL

Python es uno de los lenguajes de programación más populares y comúnmente aprovechados del mundo moderno, con infinitas aplicaciones en una variedad de campos. Ha ganado el prestigioso premio TIOBE Programación Lenguaje del Año 2021.

La naturaleza flexible y dinámica de Python lo hace ideal para tareas de implementación, análisis y mantenimiento. Python ETL es una de las habilidades cruciales requeridas en la ingeniería de datos para construir canalizaciones de datos, desarrollar modelos estadísticos y realizar un análisis exhaustivo de ellos.

Se ha convertido en una herramienta popular para ejecutar procesos ETL debido a su facilidad de uso y bibliotecas robustas para acceder a bases de datos y sistemas de almacenamiento. Muchos equipos usan Python para ETL e ingeniería de datos en lugar de una herramienta ETL, ya que es más versátil y potente para estas tareas.

El mayor beneficio de Python sobre otros lenguajes de programación es la simplicidad de uso en minería de datos, ciencia de datos, big data, inteligencia artificial y aprendizaje automático.

Las empresas de todo el mundo utilizan Python para sus datos a fin de obtener información, administrar sus operaciones y mantener todo funcionando sin problemas.

2 sencillos pasos para construir una canalización ETL de Python

En esta parte, aprenderá los pasos esenciales para crear una canalización de ETL con Python . Creará una canalización de datos básica que alimenta datos a una base de datos de Microsoft SQL Server desde bases de datos MySQL y Microsoft SQL Server.

Para configurar el script Python ETL, siga los pasos a continuación:

Paso 1: instale los módulos necesarios

Para configurar Python ETL Pipeline, deberá instalar los siguientes módulos:

  • Conector de Python a MySQL: mysql-connector-python (Use el comando pip install mysql-connector-python para instalar)
  • Python a Microsoft SQL Server Connector: pyodbc (Use el comando pip install pyodbc para instalar)

Paso 2: configurar el directorio ETL

Después de instalar los paquetes anteriores, debe crear 4 archivos de Python, que se mencionan a continuación en el directorio de su proyecto:

  • db_credentials.py: Este archivo incluye código para establecer conexiones con todas las Bases de Datos.
  • sql_queries.py: este archivo comprende las consultas de base de datos comúnmente utilizadas para extraer y cargar datos en formato de cadena.
  • etl.py: Este archivo posee las operaciones necesarias para conectarse a la Base de Datos y ejecutar las consultas requeridas.
  • main.py: este es el archivo principal que regula el flujo y la ejecución de la tubería ETL de Python.

A) db_credenciales.py

Todas las cadenas de conexión de la base de datos de origen y de destino deben incluirse en este archivo. Debe contener toda la información necesaria para acceder a la base de datos relevante en un formato de lista para que pueda iterarse rápidamente cuando sea necesario. El siguiente es un script de Python de muestra para establecer la conexión de la base de datos:

 datawarehouse_name = 'tu_dwh_name'
# sql-server (bd de destino, almacén de datos)
datawarehouse_db_config = {
  'Conexión_de_Confianza': 'sí',
  'controlador': '{servidor SQL}',
  'servidor': 'datawarehouse_sql_server',
  'base de datos': '{}'.format(nombre_del_almacén_de_datos),
  'usuario': 'tu_db_uname',
  'contraseña': 'tu_palabra_bd',
  'compromiso automático': Verdadero,
}
# fuente db > sql-server
sqlserver_db_config = [
  {
    'Conexión_de_Confianza': 'sí',
    'controlador': '{servidor SQL}',
    'servidor': 'su_db_sql_server',
    'base de datos': 'db_1st',
    'usuario': 'tu_db_uname',
    'contraseña': 'tu_palabra_bd',
    'compromiso automático': Verdadero,
  }
]
# base de datos fuente > mysql
mysql_db_config = [
  {
    'usuario': 'tu_1_usuario',
    'contraseña': 'tu_1_contraseña',
    'host': 'db_connection_string_1',
    'base de datos': 'db_1st',
  },
  {
    'usuario': 'tu_2_usuario,
    'contraseña': 'tu_2_contraseña',
    'host': 'db_connection_string_2',
    'base de datos': 'db_2nd',
  },
]

B) sql_consultas.py

Este archivo incluye consultas para extraer datos de las bases de datos de origen y cargarlos en la base de datos de destino. El siguiente script le ayudará a realizar esta tarea:

 # consultas de ejemplo, serán únicas para diferentes plataformas de bases de datos

sqlserver_extract = ('''
  SELECCIONE sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
  DESDE sqlserver_1_table
''')
sqlserver_insert = ('''
  INSERTAR EN table_demo (col_1, col_2, col_3)
  VALORES (?, ?, ?)  
''')
extracto_mysql = ('''
  SELECCIONE mysql_col_1, mysql_col_2, mysql_col_3
  DESDE mysql_demo_table
''')
mysql_insert = ('''
  INSERTAR EN table_demo (col_1, col_2, col_3)
  VALORES (?, ?, ?)  
''')

# Consultas siendo exportadas
clase Sql_Query:
  def __init__(self, extraer_consulta, cargar_consulta):
    self.extraer_consulta = extraer_consulta
    self.load_query = load_query   
# crear instancias para la clase Sql_Query
sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery(mysql_extract, mysql_insert)
# creando una lista para iterar a través de los valores
consultas_mysql = [consultas_mysql]
sqlserver_consultas = [sqlserver_consulta]

C) etl.py

Este archivo debe incluir el código requerido para acceder a las Bases de datos relevantes y ejecutar las consultas requeridas. El siguiente script le ayudará a realizar esta tarea:

 # módulos basados ​​en python
importar pyodbc
importar mysql.conector

def etl(consulta, source_cnx, target_cnx):
  # extraer datos de la base de datos fuente de demostración
  fuente_cursor = fuente_cnx.cursor()
  source_cursor.execute(consulta.extraer_consulta)
  datos = source_cursor.fetchall()
  fuente_cursor.cerrar()

  # cargar datos en la base de datos de demostración Data Warehouse
  
si datos:
    objetivo_cursor = objetivo_cnx.cursor()
    target_cursor.execute("USE {}".format(name_for_datawarehouse))
    target_cursor.executemany(query.load_query, data)
    print('datos cargados en la base de datos de demostración Data Warehouse')
    target_cursor.cerrar()
  demás:
    imprimir ('los datos están vacíos')

def etl_process (consultas, target_cnx, source_db_config, db_platform):

  # configurar la conexión de la base de datos fuente de demostración
  si db_plataforma == 'mysql':
    source_cnx = mysql.conector.connect(**source_db_config)
  elif db_platform == 'sqlserver':
    source_cnx = pyodbc.connect(**source_db_config)
  demás:
    devuelve '¡Error! plataforma de base de datos de origen no reconocida'
  # recorrer las consultas sql
  para consulta en consultas:
    etl (consulta, source_cnx, target_cnx)    
  # cerrar la conexión de base de datos de origen
  fuente_cnx.cerrar()

D) principal.py

Este archivo incluye código para iterar a través de las credenciales dadas para conectarse a la base de datos y ejecutar las operaciones ETL Python necesarias. El siguiente script le ayudará a realizar esta tarea:

 # variables
desde db_credentials importar datawarehouse_db_config, sqlserver_db_config, mysql_db_config
desde sql_queries importar sqlserver_queries, mysql_queries

# métodos
desde etl importar etl_process
def principal():
  print('comenzando el proceso de datos etl')
	
  # establecer conexión para SQL Server, almacenamiento de destino deseado
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # recorriendo las credenciales
  # Base de datos > mysql
  para la configuración en mysql_db_config: 
    tratar:
      print("cargando db: " + config['base de datos'])
      etl_process(mysql_queries, target_cnx, config, 'mysql')
    excepto Excepción como error:
      print("etl para {} tiene error".format(config['database']))
      imprimir ('mensaje de error: {}'. formato (error))
      Seguir
	
  # Base de datos > servidor sql
  para la configuración en sqlserver_db_config: 
    tratar:
      print("cargando db: " + config['base de datos'])
      etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
    excepto Excepción como error:
      print("etl para {} tiene error".format(config['database']))
      imprimir ('mensaje de error: {}'. formato (error))
      Seguir

  objetivo_cnx.cerrar()
si __nombre__ == "__principal__":
  principal()

Conclusión

¡Buen trabajo! Obtuvo con éxito una comprensión básica de la construcción de Python ETL Pipeline. Ahora puede implementar su secuencia de comandos ETL de Python personalizada en función de sus requisitos realizando cambios en las bases de datos que se utilizan y consultando en consecuencia.

Para explorar las herramientas Python ETL ampliamente utilizadas en la industria, lea el blog Best Python ETL Tools.

La mayoría de las organizaciones hoy en día trabajan con Big Data. Por lo tanto, crear una tubería ETL desde cero para dichos datos puede llevar mucho tiempo y ser un desafío.

Además, las empresas deberán invertir una cantidad significativa de recursos para construirlo y luego garantizar que pueden mantenerse al día con el alto volumen de datos y las fluctuaciones del esquema.

Entonces, en lugar de crear secuencias de comandos ETL desde cero, puede aprovechar las canalizaciones de datos automatizadas como Hevo.

¿Tiene alguna idea sobre esto? Háganos saber a continuación en los comentarios o lleve la discusión a nuestro Twitter o Facebook.

Recomendaciones de los editores: