Como construir pipelines ETL em Python

Publicados: 2022-01-11

ETL significa E xtract, T ransform, L oad. Como parte do processo de ETL, os dados são extraídos, transformados e carregados em data warehouses para que as organizações possam analisá-los para tomar decisões estratégicas.

A seguir estão as principais etapas executadas no pipeline de ETL:

  • Extrair: esse processo coleta e integra dados de várias fontes, incluindo Bancos de Dados, Data Lakes, CRMs e outros.
  • Transformar: Esta é a fase mais crucial em um pipeline de ETL. Para tornar os dados prontos para análise, eles devem ser coletados, classificados, limpos e dinamizados adequadamente nesta etapa.
  • Carga: esse processo envolve a importação de dados estruturados ou não estruturados de Data Lakes, Bancos de Dados e outras fontes para Data Warehouses para que os Analistas de Dados ou outros usuários possam obter insights profundos com facilidade.

Entendendo a importância do Python ETL

Python é uma das linguagens de programação mais populares e comumente alavancadas do mundo moderno, com inúmeras aplicações em uma variedade de campos. Ele ganhou o prestigioso prêmio TIOBE Programming Language of the Year 2021.

A natureza flexível e dinâmica do Python o torna ideal para tarefas de implantação, análise e manutenção. O Python ETL é uma das habilidades cruciais necessárias na Engenharia de Dados para construir pipelines de dados, desenvolver modelos estatísticos e realizar uma análise completa sobre eles.

Tornou-se uma ferramenta popular para executar processos ETL devido à sua facilidade de uso e bibliotecas robustas para acessar bancos de dados e sistemas de armazenamento. Muitas equipes usam o Python para ETL e Engenharia de Dados em vez de uma ferramenta ETL, pois é mais versátil e poderoso para essas tarefas.

O maior benefício do Python sobre outras linguagens de programação é a simplicidade de uso em Data Mining, Data Science, Big Data, Inteligência Artificial e Machine Learning.

Empresas de todo o mundo usam Python para seus dados para obter insights, gerenciar suas operações e manter tudo funcionando sem problemas.

2 etapas fáceis para construir o pipeline ETL Python

Nesta parte, você aprenderá as etapas essenciais para criar um pipeline ETL usando Python . Você criará um pipeline de dados básico que alimentará dados em um banco de dados Microsoft SQL Server a partir de bancos de dados MySQL e Microsoft SQL Server.

Para configurar o script Python ETL, siga as etapas abaixo:

Etapa 1: instalar os módulos necessários

Para configurar o Python ETL Pipeline, você precisará instalar os seguintes módulos:

  • Python para MySQL Connector: mysql-connector-python (Use o comando pip install mysql-connector-python para instalar)
  • Python para Microsoft SQL Server Connector: pyodbc (Use o comando pip install pyodbc para instalar)

Etapa 2: configurar o diretório ETL

Após instalar os pacotes acima, você precisa criar 4 arquivos Python, mencionados abaixo no diretório do seu projeto:

  • db_credentials.py: Este arquivo inclui código para estabelecer conexões com todos os Bancos de Dados.
  • sql_queries.py: Este arquivo inclui as consultas de banco de dados comumente usadas para extrair e carregar dados em formato de string.
  • etl.py: Este arquivo possui as operações necessárias para conectar-se ao Banco de Dados e executar as consultas necessárias.
  • main.py: Este é o arquivo principal que regula o fluxo e a execução do Python ETL Pipeline.

A) db_credentials.py

Todas as cadeias de conexão de banco de dados de origem e destino devem ser incluídas neste arquivo. Ele deve conter todas as informações necessárias para acessar o banco de dados relevante em um formato de lista para que possa ser iterado rapidamente quando necessário. Veja a seguir um exemplo de script Python para estabelecer a conexão com o banco de dados:

 datawarehouse_name = 'your_dwh_name'
# sql-server (db de destino, datawarehouse)
datawarehouse_db_config = {
  'Trusted_Connection': 'sim',
  'driver': '{SQL Server}',
  'servidor': 'datawarehouse_sql_server',
  'database': '{}'.format(datawarehouse_name),
  'usuário': 'seu_db_uname',
  'password': 'your_db_pword',
  'autocommit': Verdadeiro,
}
# fonte db > sql-server
sqlserver_db_config = [
  {
    'Trusted_Connection': 'sim',
    'driver': '{SQL Server}',
    'servidor': 'seu_db_sql_server',
    'banco de dados': 'db_1st',
    'usuário': 'seu_db_uname',
    'password': 'your_db_pword',
    'autocommit': Verdadeiro,
  }
]
# fonte db > mysql
mysql_db_config = [
  {
    'usuário': 'seu_1_usuário',
    'password': 'your_1_pword',
    'host': 'db_connection_string_1',
    'banco de dados': 'db_1st',
  },
  {
    'usuário': 'seu_2_usuário,
    'senha': 'sua_2_senha',
    'host': 'db_connection_string_2',
    'banco de dados': 'db_2nd',
  },
]

B) sql_queries.py

Esse arquivo inclui consultas para extrair dados dos bancos de dados de origem e carregá-los no banco de dados de destino. O script a seguir irá ajudá-lo a realizar esta tarefa:

 # consultas de exemplo, serão exclusivas para diferentes plataformas de banco de dados

sqlserver_extract = ('''
  SELECT sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
  DE sqlserver_1_table
''')
sqlserver_insert = ('''
  INSERT INTO table_demo (coluna_1, coluna_2, coluna_3)
  VALORES (?, ?,?)  
''')
mysql_extract = ('''
  SELECT mysql_col_1, mysql_col_2, mysql_col_3
  DE mysql_demo_table
''')
mysql_insert = ('''
  INSERT INTO table_demo (coluna_1, coluna_2, coluna_3)
  VALORES (?, ?,?)  
''')

# Consultas sendo exportadas
classe Sql_Query:
  def __init__(self, extract_query, load_query):
    self.extract_query = extract_query
    self.load_query = load_query   
# cria instâncias para a classe Sql_Query
sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery(extrato_mysql, inserção_mysql)
# criando uma lista para iterar pelos valores
mysql_queries = [mysql_query]
sqlserver_queries = [sqlserver_query]

C) etl.py

Este arquivo deve incluir o código necessário para acessar os Bancos de Dados relevantes e executar as consultas necessárias. O script a seguir irá ajudá-lo a realizar esta tarefa:

 # módulos baseados em python
importar pyodbc
importar mysql.connector

def etl(consulta, source_cnx, target_cnx):
  # extrair dados do banco de dados de origem de demonstração
  source_cursor = source_cnx.cursor()
  source_cursor.execute(query.extract_query)
  dados = source_cursor.fetchall()
  source_cursor.close()

  # carrega dados no banco de dados do Data Warehouse de demonstração
  
se dados:
    target_cursor = target_cnx.cursor()
    target_cursor.execute("USE {}".format(name_for_datawarehouse))
    target_cursor.executemany(query.load_query, dados)
    print('dados carregados no demo Data Warehouse db')
    target_cursor.close()
  outro:
    print('dados vazios')

def etl_process(consultas, target_cnx, source_db_config, db_platform):

  # configurando a conexão de banco de dados de origem de demonstração
  if db_platform == 'mysql':
    source_cnx = mysql.connector.connect(**source_db_config)
  elif db_platform == 'sqlserver':
    source_cnx = pyodbc.connect(**source_db_config)
  outro:
    return 'Erro! plataforma de banco de dados de origem não reconhecida'
  # faz um loop através de consultas sql
  para consulta em consultas:
    etl (consulta, source_cnx, target_cnx)    
  # fecha a conexão do banco de dados de origem
  source_cnx.close()

D) main.py

Esse arquivo inclui código para iterar por meio de credenciais fornecidas para se conectar ao banco de dados e executar as operações ETL Python necessárias. O script a seguir irá ajudá-lo a realizar esta tarefa:

 # variáveis
de db_credentials importe datawarehouse_db_config, sqlserver_db_config, mysql_db_config
de sql_queries importar sqlserver_queries, mysql_queries

# métodos
de etl import etl_process
def main():
  print('iniciando o processo de dados etl')
	
  # estabelece conexão para SQL Server, armazenamento de destino desejado
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # percorrendo as credenciais
  #Banco de dados > mysql
  para configuração em mysql_db_config: 
    experimentar:
      print("carregando db: " + config['database'])
      etl_process(mysql_queries, target_cnx, config, 'mysql')
    exceto Exceção como erro:
      print("etl para {} tem erro".format(config['database'])))
      print('mensagem de erro: {}'.format(erro))
      Prosseguir
	
  # Banco de dados > sql-server
  para configuração em sqlserver_db_config: 
    experimentar:
      print("carregando db: " + config['database'])
      etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
    exceto Exceção como erro:
      print("etl para {} tem erro".format(config['database'])))
      print('mensagem de erro: {}'.format(erro))
      Prosseguir

  target_cnx.close()
if __name__ == "__main__":
  a Principal()

Conclusão

Ótimo trabalho! Você obteve com sucesso uma compreensão básica da construção do Python ETL Pipeline. Agora você pode implementar seu script Python ETL personalizado com base em seus requisitos, fazendo alterações nos bancos de dados que estão sendo usados ​​e consultas de acordo.

Para explorar as ferramentas Python ETL amplamente usadas no setor, leia o blog Best Python ETL Tools.

A maioria das organizações hoje em dia trabalha com Big Data. Portanto, criar um pipeline de ETL do zero para esses dados pode ser demorado e desafiador.

Além disso, as empresas precisarão investir uma quantidade significativa de recursos para construí-lo e garantir que possam acompanhar o alto volume de dados e as flutuações de esquema.

Portanto, em vez de criar scripts ETL do zero, você pode aproveitar pipelines de dados automatizados, como o Hevo.

Tem alguma opinião sobre isso? Deixe-nos saber abaixo nos comentários ou leve a discussão para o nosso Twitter ou Facebook.

Recomendações dos editores: