Come costruire pipeline ETL in Python

Pubblicato: 2022-01-11

ETL sta per Estrarre , Trasformare , Caricare . Come parte del processo ETL, i dati vengono estratti, trasformati e caricati in data warehouse in modo che le organizzazioni possano analizzarli per prendere decisioni strategiche.

Di seguito sono riportati i passaggi chiave eseguiti nella pipeline ETL:

  • Estratto: questo processo raccoglie e integra i dati da una varietà di fonti, inclusi database, Data Lake, CRM e altri.
  • Trasforma: questa è la fase più cruciale in una pipeline ETL. Per rendere i dati pronti per l'analisi, in questo passaggio devono essere raccolti, ordinati, puliti e ruotati correttamente.
  • Carico: questo processo prevede l'importazione di dati strutturati o non strutturati da Data Lakes, database e altre origini in Data Warehouse in modo che gli analisti di dati o altri utenti possano ottenere facilmente informazioni approfondite.

Comprendere l'importanza di Python ETL

Python è uno dei linguaggi di programmazione più popolari e comunemente utilizzati al mondo moderno, con infinite applicazioni in una varietà di campi. Ha vinto il prestigioso premio TIOBE Programming Language of the Year 2021.

La natura flessibile e dinamica di Python lo rende ideale per attività di distribuzione, analisi e manutenzione. Python ETL è una delle competenze cruciali richieste nell'ingegneria dei dati per costruire pipeline di dati, sviluppare modelli statistici ed eseguire un'analisi approfondita su di essi.

È diventato uno strumento popolare per l'esecuzione di processi ETL grazie alla sua facilità d'uso e alle solide librerie per l'accesso a database e sistemi di archiviazione. Molti team utilizzano Python per ETL e ingegneria dei dati piuttosto che uno strumento ETL in quanto è più versatile e potente per queste attività.

Il più grande vantaggio di Python rispetto ad altri linguaggi di programmazione è la semplicità d'uso in Data Mining, Data Science, Big Data, Intelligenza Artificiale e Machine Learning.

Le aziende di tutto il mondo utilizzano Python per i propri dati per ottenere informazioni dettagliate, gestire le proprie operazioni e mantenere tutto in ordine.

2 semplici passaggi per creare una pipeline ETL Python

In questa parte imparerai i passaggi essenziali per creare una pipeline ETL usando Python . Creerai una pipeline di dati di base che alimenta i dati in un database di Microsoft SQL Server da database MySQL e Microsoft SQL Server.

Per configurare lo script ETL Python, attenersi alla seguente procedura:

Passaggio 1: installare i moduli richiesti

Per configurare Python ETL Pipeline, dovrai installare i seguenti moduli:

  • Connettore da Python a MySQL: mysql-connector-python (usa il comando pip install mysql-connector-python per installare)
  • Connettore da Python a Microsoft SQL Server: pyodbc (usa il comando pip install pyodbc per installare)

Passaggio 2: configurare la directory ETL

Dopo aver installato i pacchetti di cui sopra, devi creare 4 file Python, menzionati di seguito nella directory del tuo progetto:

  • db_credentials.py: questo file include il codice per stabilire connessioni con tutti i database.
  • sql_queries.py: questo file comprende le query di database comunemente utilizzate per estrarre e caricare dati in formato stringa.
  • etl.py: Questo file possiede le operazioni necessarie per connettersi al Database ed eseguire le query richieste.
  • main.py: questo è il file principale che regola il flusso e l'esecuzione della pipeline ETL Python.

A) db_credentials.py

Tutte le stringhe di connessione al database di origine e di destinazione devono essere incluse in questo file. Dovrebbe contenere tutte le informazioni necessarie per accedere al database pertinente in un formato elenco in modo che possa essere ripetuto rapidamente quando necessario. Quello che segue è uno script Python di esempio per stabilire la connessione al database:

 datawarehouse_name = 'tuo_dwh_name'
# sql-server (db di destinazione, datawarehouse)
datawarehouse_db_config = {
  'Trusd_Connection': 'sì',
  'driver': '{SQL Server}',
  'server': 'datawarehouse_sql_server',
  'database': '{}'.format(datawarehouse_name),
  'user': 'your_db_uname',
  'password': 'tua_db_pword',
  'autocommit': Vero,
}
# sorgente db > sql-server
sqlserver_db_config = [
  {
    'Trusd_Connection': 'sì',
    'driver': '{SQL Server}',
    'server': 'your_db_sql_server',
    'database': 'db_1st',
    'user': 'your_db_uname',
    'password': 'tua_db_pword',
    'autocommit': Vero,
  }
]
# sorgente db > mysql
mysql_db_config = [
  {
    'utente': 'tuo_1_utente',
    'password': 'tua_1_pword',
    'host': 'db_connection_string_1',
    'database': 'db_1st',
  },
  {
    'utente': 'tuo_2_utente,
    'password': 'your_2_password',
    'host': 'db_connection_string_2',
    'database': 'db_2nd',
  },
]

B) sql_queries.py

Questo file include query per estrarre i dati dai database di origine e caricarli nel database di destinazione. Il seguente script ti aiuterà a eseguire questa attività:

 # query di esempio, saranno univoche per diverse piattaforme di database

sqlserver_extract = ('''
  SELEZIONA sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
  DA sqlserver_1_table
''')
sqlserver_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  I VALORI (?, ?, ?)  
''')
mysql_extract = ('''
  SELEZIONA mysql_col_1, mysql_col_2, mysql_col_3
  DA mysql_demo_table
''')
mysql_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  I VALORI (?, ?, ?)  
''')

# Le query vengono esportate
classe SQL_Query:
  def __init__(self, extract_query, load_query):
    self.extract_query = estrarre_query
    self.load_query = carico_query   
# crea istanze per la classe Sql_Query
sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery(mysql_extract, mysql_insert)
# creazione di un elenco per l'iterazione dei valori
query_mysql = [query_mysql]
query_sqlserver = [query_sqlserver]

C) etl.py

Questo file dovrebbe includere il codice necessario per accedere ai Database pertinenti ed eseguire le query richieste. Il seguente script ti aiuterà a eseguire questa attività:

 # moduli basati su Python
importa pyodbc
importa mysql.connector

def etl(query, source_cnx, target_cnx):
  # estrarre i dati dal database di origine demo
  source_cursor = source_cnx.cursor()
  source_cursor.execute(query.extract_query)
  dati = source_cursor.fetchall()
  source_cursor.close()

  # carica i dati nella demo Data Warehouse db
  
se i dati:
    target_cursor = target_cnx.cursor()
    target_cursor.execute("USE {}".format(name_for_datawarehouse))
    target_cursor.executemany(query.load_query, dati)
    print('dati caricati nella demo Data Warehouse db')
    target_cursor.close()
  altro:
    print('i dati sono vuoti')

def etl_process(query, target_cnx, source_db_config, db_platform):

  # configurazione della connessione al database di origine demo
  se db_platform == 'mysql':
    source_cnx = mysql.connector.connect(**source_db_config)
  elif db_platform == 'sqlserver':
    source_cnx = pyodbc.connect(**source_db_config)
  altro:
    restituisce 'Errore! piattaforma di database di origine non riconosciuta'
  # scorrere le query sql
  per query nelle query:
    etl (query, source_cnx, target_cnx)    
  # chiudere la connessione db di origine
  source_cnx.close()

D) main.py

Questo file include il codice per scorrere le credenziali fornite per connettersi al database ed eseguire le necessarie operazioni ETL Python. Il seguente script ti aiuterà a eseguire questa attività:

 # variabili
da db_credentials import datawarehouse_db_config, sqlserver_db_config, mysql_db_config
da sql_queries import sqlserver_queries, mysql_queries

# metodi
da etl import etl_process
def main():
  print('avvio del processo dati etl')
	
  # stabilire una connessione per SQL Server, archiviazione di destinazione desiderata
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # scorrendo le credenziali
  # Database > MySQL
  per la configurazione in mysql_db_config: 
    Tentativo:
      print("caricamento db: " + config['database'])
      etl_process(mysql_queries, target_cnx, config, 'mysql')
    tranne Eccezione come errore:
      print("etl per {} ha un errore".format(config['database'])))
      print('messaggio di errore: {}'.format(errore))
      Continua
	
  # Database > sql-server
  per la configurazione in sqlserver_db_config: 
    Tentativo:
      print("caricamento db: " + config['database'])
      etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
    tranne Eccezione come errore:
      print("etl per {} ha un errore".format(config['database'])))
      print('messaggio di errore: {}'.format(errore))
      Continua

  target_cnx.close()
if __name__ == "__main__":
  principale()

Conclusione

Ottimo lavoro! Hai acquisito con successo una conoscenza di base della creazione di Python ETL Pipeline. Ora puoi implementare il tuo script ETL Python personalizzato in base ai tuoi requisiti apportando modifiche ai database utilizzati e interrogando di conseguenza.

Per esplorare gli strumenti Python ETL ampiamente utilizzati nel settore, dai una lettura al blog Best Python ETL Tools.

La maggior parte delle organizzazioni oggigiorno lavora con i Big Data. Pertanto, la creazione di una pipeline ETL da zero per tali dati può richiedere molto tempo e risultare impegnativa.

Inoltre, le aziende dovranno investire una quantità significativa di risorse per costruirlo e quindi garantire di poter stare al passo con l'elevato volume di dati e le fluttuazioni dello schema.

Quindi, invece di creare script ETL da zero, puoi sfruttare pipeline di dati automatizzate come Hevo.

Hai qualche idea su questo? Fatecelo sapere in basso nei commenti o trasferite la discussione sul nostro Twitter o Facebook.

Raccomandazioni della redazione: