Cum să construiți conducte ETL în Python

Publicat: 2022-01-11

ETL înseamnă E xtract , T ransform, Load. Ca parte a procesului ETL, datele sunt extrase, transformate și încărcate în depozite de date, astfel încât organizațiile să le poată analiza pentru a lua decizii strategice.

Următorii sunt pașii cheie efectuați în conducta ETL:

  • Extras: Acest proces colectează și integrează date dintr-o varietate de surse, inclusiv baze de date, lacuri de date, CRM-uri și altele.
  • Transformare: Aceasta este cea mai importantă fază dintr-o conductă ETL. Pentru ca datele să fie pregătite pentru analiză, acestea trebuie adunate, sortate, curățate și pivotate în mod corespunzător în acest pas.
  • Încărcare: acest proces implică importarea datelor structurate sau nestructurate din Lacuri de date, baze de date și alte surse în depozite de date, astfel încât analiștii de date sau alți utilizatori să poată obține informații profunde cu ușurință.

Înțelegerea importanței Python ETL

Python este unul dintre cele mai populare și utilizate limbaje de programare din lumea modernă, cu aplicații nesfârșite într-o varietate de domenii. A câștigat prestigiosul premiu TIOBE Programming Language of the Year 2021.

Natura flexibilă și dinamică a lui Python îl face ideal pentru sarcini de implementare, analiză și întreținere. Python ETL este una dintre abilitățile cruciale necesare în ingineria datelor pentru a construi conducte de date, a dezvolta modele statistice și a efectua o analiză amănunțită a acestora.

A devenit un instrument popular pentru executarea proceselor ETL datorită ușurinței sale de utilizare și bibliotecilor solide pentru accesarea bazelor de date și sistemelor de stocare. Multe echipe folosesc Python pentru ETL și Data Engineering, mai degrabă decât un instrument ETL, deoarece este mai versatil și mai puternic pentru aceste sarcini.

Cel mai mare beneficiu al Python față de alte limbaje de programare este simplitatea utilizării în Data Mining, Data Science, Big Data, Inteligență artificială și Machine Learning.

Companiile din întreaga lume folosesc Python pentru datele lor pentru a obține informații, pentru a-și gestiona operațiunile și pentru a menține totul să funcționeze fără probleme.

2 pași simpli pentru a construi Python ETL Pipeline

În această parte, veți învăța pașii esențiali pentru construirea unei conducte ETL folosind Python . Veți crea o conductă de date de bază care introduce date într-o bază de date Microsoft SQL Server din bazele de date MySQL și Microsoft SQL Server.

Pentru a configura scriptul Python ETL, urmați pașii de mai jos:

Pasul 1: Instalați modulele necesare

Pentru a configura conducta Python ETL, va trebui să instalați următoarele module:

  • Conector Python la MySQL: mysql-connector-python (Utilizați comanda pip install mysql-connector-python pentru a instala)
  • Conector Python la Microsoft SQL Server: pyodbc (Utilizați comanda pip install pyodbc pentru a instala)

Pasul 2: Configurați directorul ETL

După instalarea pachetelor de mai sus, trebuie să creați 4 fișiere Python, menționate mai jos în directorul de proiect:

  • db_credentials.py: Acest fișier include cod pentru a stabili conexiuni cu toate bazele de date.
  • sql_queries.py: acest fișier cuprinde interogările de bază de date utilizate în mod obișnuit pentru extragerea și încărcarea datelor în format șir.
  • etl.py: Acest fișier are operațiunile necesare pentru a se conecta la baza de date și a rula interogările necesare.
  • main.py: Acesta este fișierul principal care reglementează fluxul și execuția conductei ETL Python.

A) db_credentials.py

Toate șirurile de conexiune la baza de date sursă și țintă ar trebui incluse în acest fișier. Ar trebui să conțină toate informațiile necesare pentru accesarea bazei de date relevante într-un format de listă, astfel încât să poată fi repetat rapid atunci când este necesar. Următorul este un exemplu de script Python pentru a stabili conexiunea la baza de date:

 datawarehouse_name = 'numele_dwh_dwh'
# sql-server (db țintă, depozit de date)
datawarehouse_db_config = {
  „Trusted_Connection”: „da”,
  „driver”: „{SQL Server}”,
  'server': 'datawarehouse_sql_server',
  'database': '{}'.format(datawarehouse_name),
  „user”: „your_db_uname”,
  „parolă”: „parola_db_db”,
  'autocommit': adevărat,
}
# sursă db > sql-server
sqlserver_db_config = [
  {
    „Trusted_Connection”: „da”,
    „driver”: „{SQL Server}”,
    'server': 'serverul_db_sql_dvs',
    „bază de date”: „db_1st”,
    „user”: „your_db_uname”,
    „parolă”: „parola_db_db”,
    'autocommit': adevărat,
  }
]
# sursă db > mysql
mysql_db_config = [
  {
    'user': 'your_1_user',
    „parolă”: „parola_1_voastră”,
    „gazdă”: „db_connection_string_1”,
    „bază de date”: „db_1st”,
  },
  {
    „utilizator”: „utilizatorul_2_dvs,
    „parola”: „parola_2_dvs”,
    „gazdă”: „db_connection_string_2”,
    'database': 'db_2nd',
  },
]

B) sql_queries.py

Acest fișier include interogări pentru extragerea datelor din bazele de date sursă și încărcarea acestora în baza de date țintă. Următorul script vă va ajuta să efectuați această sarcină:

 # exemple de interogări, vor fi unice pentru diferite platforme de baze de date

sqlserver_extract = ('''
  SELECTAȚI sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
  DIN sqlserver_1_table
''')
sqlserver_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  VALORI (?, ?, ?)  
''')
mysql_extract = ('''
  SELECTAȚI mysql_col_1, mysql_col_2, mysql_col_3
  DIN mysql_demo_table
''')
mysql_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  VALORI (?, ?, ?)  
''')

# Interogările sunt exportate
clasa Sql_Query:
  def __init__(self, extract_query, load_query):
    self.extract_query = extragere_interogare
    self.load_query = load_query   
# creați instanțe pentru clasa Sql_Query
sqlserver_query = SqlQuery (sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery(mysql_extract, mysql_insert)
# crearea unei liste pentru iterare prin valori
mysql_queries = [interogare_mysql]
sqlserver_queries = [sqlserver_query]

C) etl.py

Acest fișier ar trebui să includă codul necesar pentru a accesa bazele de date relevante și pentru a executa interogările necesare. Următorul script vă va ajuta să efectuați această sarcină:

 # module bazate pe python
import pyodbc
import mysql.connector

def etl(interogare, source_cnx, target_cnx):
  # extrageți date din baza de date sursă demonstrativă
  source_cursor = source_cnx.cursor()
  source_cursor.execute(query.extract_query)
  date = source_cursor.fetchall()
  source_cursor.close()

  # încărcați datele în demo Data Warehouse db
  
daca date:
    target_cursor = target_cnx.cursor()
    target_cursor.execute("USE {}".format(name_for_datawarehouse))
    target_cursor.executemany(query.load_query, data)
    print('datele încărcate în demo Data Warehouse db')
    target_cursor.close()
  altceva:
    print('datele sunt goale')

def etl_process(interogări, target_cnx, source_db_config, db_platform):

  # configurarea conexiunii la baza de date sursă demo
  dacă db_platform == 'mysql':
    source_cnx = mysql.connector.connect(**source_db_config)
  elif db_platform == 'sqlserver':
    source_cnx = pyodbc.connect(**source_db_config)
  altceva:
    returnează „Eroare! platformă de baze de date sursă nerecunoscută”
  # parcurge interogări sql
  pentru interogări în interogări:
    etl (interogare, source_cnx, target_cnx)    
  # închideți conexiunea db sursă
  source_cnx.close()

D) principal.py

Acest fișier include cod pentru a repeta prin acreditările date pentru a se conecta la baza de date și a executa operațiunile ETL Python necesare. Următorul script vă va ajuta să efectuați această sarcină:

 # variabile
din db_credentials import datawarehouse_db_config, sqlserver_db_config, mysql_db_config
din sql_queries import sqlserver_queries, mysql_queries

# metode
din etl import etl_process
def main():
  print('pornirea procesului de date etl')
	
  # stabiliți conexiunea pentru SQL Server, stocarea destinației dorite
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # trecerea în buclă prin acreditări
  # Baza de date > mysql
  pentru config în mysql_db_config: 
    încerca:
      print("se incarca db: " + config['baza de date'])
      etl_process(mysql_queries, target_cnx, config, 'mysql')
    cu excepția excepției ca eroare:
      print("etl pentru {} are eroare".format(config['baza de date']))
      print('mesaj de eroare: {}'.format(eroare))
      continua
	
  # Bază de date > sql-server
  pentru config în sqlserver_db_config: 
    încerca:
      print("se incarca db: " + config['baza de date'])
      etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
    cu excepția excepției ca eroare:
      print("etl pentru {} are eroare".format(config['baza de date']))
      print('mesaj de eroare: {}'.format(eroare))
      continua

  target_cnx.close()
if __name__ == "__main__":
  principal()

Concluzie

Buna treaba! Ați dobândit cu succes o înțelegere de bază a construirii Python ETL Pipeline. Acum puteți implementa scriptul dvs. personalizat ETL Python pe baza cerințelor dvs. făcând modificări bazelor de date utilizate și interogând în consecință.

Pentru a explora instrumentele Python ETL utilizate pe scară largă în industrie, citiți blogul Cele mai bune instrumente Python ETL.

Majoritatea organizațiilor lucrează în prezent cu Big Data. Prin urmare, crearea unei conducte ETL de la zero pentru astfel de date poate fi consumatoare de timp și o provocare.

În plus, întreprinderile vor trebui să investească o cantitate semnificativă de resurse pentru a-l construi și apoi să garanteze că pot ține pasul cu volumul mare de date și fluctuațiile schemei.

Așadar, în loc să creați scripturi ETL de la zero, puteți utiliza pipeline de date automate, cum ar fi Hevo.

Ai vreo părere despre asta? Anunțați-ne mai jos în comentarii sau transmiteți discuția pe Twitter sau Facebook.

Recomandările editorilor: