Jak budować potoki ETL w Pythonie

Opublikowany: 2022-01-11

ETL to skrót od Extract, T ransform , Load . W ramach procesu ETL dane są wyodrębniane, przekształcane i ładowane do hurtowni danych, dzięki czemu organizacje mogą je analizować w celu podejmowania strategicznych decyzji.

Poniżej przedstawiono kluczowe kroki wykonywane w potoku ETL:

  • Wyciąg: Ten proces gromadzi i integruje dane z różnych źródeł, w tym baz danych, jezior danych, systemów CRM i innych.
  • Transformacja: jest to najważniejsza faza w potoku ETL. Aby dane były gotowe do analizy, muszą być odpowiednio zebrane, posortowane, wyczyszczone i przestawione na tym etapie.
  • Ładowanie: ten proces obejmuje importowanie ustrukturyzowanych lub nieustrukturyzowanych danych z jezior danych, baz danych i innych źródeł do hurtowni danych, dzięki czemu analitycy danych lub inni użytkownicy mogą łatwo uzyskać szczegółowe informacje.

Zrozumienie znaczenia Pythona ETL

Python to jeden z najpopularniejszych i powszechnie wykorzystywanych języków programowania współczesnego świata, z nieskończonymi aplikacjami w różnych dziedzinach. Zdobył prestiżową nagrodę TIOBE Language Programming of the Year 2021.

Elastyczny i dynamiczny charakter Pythona czyni go idealnym do zadań wdrażania, analizy i konserwacji. Python ETL to jedna z kluczowych umiejętności wymaganych w inżynierii danych do budowania potoków danych, opracowywania modeli statystycznych i przeprowadzania ich dokładnej analizy.

Stało się popularnym narzędziem do wykonywania procesów ETL ze względu na łatwość obsługi i solidne biblioteki dostępu do baz danych i systemów pamięci masowej. Wiele zespołów używa Pythona do ETL i inżynierii danych zamiast narzędzia ETL, ponieważ jest ono bardziej wszechstronne i wydajne do tych zadań.

Największą zaletą Pythona w porównaniu z innymi językami programowania jest prostota użycia w eksploracji danych, nauce o danych, Big Data, sztucznej inteligencji i uczeniu maszynowym.

Firmy na całym świecie używają Pythona do swoich danych, aby uzyskać wgląd, zarządzać swoimi operacjami i zapewnić płynne działanie.

2 proste kroki do zbudowania potoku ETL w Pythonie

W tej części poznasz podstawowe kroki tworzenia potoku ETL za pomocą Pythona . Utworzysz podstawowy potok danych, który przesyła dane do bazy danych Microsoft SQL Server z baz danych MySQL i Microsoft SQL Server.

Aby skonfigurować skrypt Python ETL, wykonaj poniższe czynności:

Krok 1: Zainstaluj wymagane moduły

Aby skonfigurować Python ETL Pipeline, musisz zainstalować następujące moduły:

  • Złącze Python do MySQL: mysql-connector-python (użyj polecenia pip install mysql-connector-python, aby zainstalować)
  • Złącze Python do Microsoft SQL Server: pyodbc (użyj polecenia pip install pyodbc , aby zainstalować)

Krok 2: Skonfiguruj katalog ETL

Po zainstalowaniu powyższych pakietów, musisz utworzyć 4 pliki Pythona, wymienione poniżej w katalogu twojego projektu:

  • db_credentials.py: Ten plik zawiera kod do nawiązywania połączeń ze wszystkimi bazami danych.
  • sql_queries.py: Ten plik zawiera często używane zapytania bazy danych do wyodrębniania i ładowania danych w formacie ciągu.
  • etl.py: Ten plik zawiera operacje niezbędne do połączenia z bazą danych i uruchomienia wymaganych zapytań.
  • main.py: Jest to podstawowy plik, który reguluje przepływ i wykonanie potoku ETL Pythona.

A) db_credentials.py

Wszystkie ciągi połączeń źródłowej i docelowej bazy danych powinny być zawarte w tym pliku. Powinna ona zawierać wszystkie informacje niezbędne do uzyskania dostępu do odpowiedniej bazy danych w formie listy, aby w razie potrzeby można było ją szybko iterować. Poniżej znajduje się przykładowy skrypt Pythona do nawiązania połączenia z bazą danych:

 datawarehouse_name = 'twoja_nazwa_dwh'
# sql-server (docelowa baza danych, hurtownia danych)
datawarehouse_db_config = {
  'Trusted_Connection': 'tak',
  'sterownik': '{Serwer SQL}',
  'serwer': 'datawarehouse_sql_server',
  'baza danych': '{}'.format(datawarehouse_name),
  'użytkownik': 'twoja_nazwa_bazy_danych',
  'hasło': 'twoje_db_hasło',
  „automatyczne zatwierdzanie”: Prawda,
}
# źródłowa baza danych > serwer sql
sqlserver_db_config = [
  {
    'Trusted_Connection': 'tak',
    'sterownik': '{Serwer SQL}',
    'serwer': 'twój_db_sql_serwer',
    'baza danych': 'db_1st',
    'użytkownik': 'twoja_nazwa_bazy_danych',
    'hasło': 'twoje_db_hasło',
    „automatyczne zatwierdzanie”: Prawda,
  }
]
# baza danych źródłowych > mysql
mysql_db_config = [
  {
    'użytkownik': 'twój_1_użytkownik',
    'hasło': 'twoje_1_hasło',
    'host': 'db_connection_string_1',
    'baza danych': 'db_1st',
  },
  {
    'użytkownik': 'twój_2_użytkownik,
    'hasło': 'twoje_2_hasło',
    'host': 'db_connection_string_2',
    'baza danych': 'db_2nd',
  },
]

B) sql_queries.py

Plik ten zawiera zapytania do wydobycia danych ze źródłowych baz danych i załadowania ich do docelowej bazy danych. Poniższy skrypt pomoże ci wykonać to zadanie:

 # przykładowe zapytania, będą unikalne dla różnych platform bazodanowych

sqlserver_extract = ('''
  WYBIERZ sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
  Z sqlserver_1_table
'''')
sqlserver_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  WARTOŚCI (?, ?, ?)  
'''')
mysql_extract = ('''
  WYBIERZ mysql_col_1, mysql_col_2, mysql_col_3
  Z mysql_demo_table
'''')
mysql_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  WARTOŚCI (?, ?, ?)  
'''')

# Kwerendy są eksportowane
klasa Sql_Query:
  def __init__(self, extract_query, load_query):
    self.extract_query = ekstrakt_zapytanie
    self.load_query = load_query   
# utwórz instancje dla klasy Sql_Query
sqlserver_query = SqlQuery (sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery (mysql_extract, mysql_insert)
# tworzenie listy do iteracji przez wartości
mysql_query = [mysql_query]
sqlserver_queries = [sqlserver_query]

C) etl.py

Plik ten powinien zawierać kod wymagany do uzyskania dostępu do odpowiednich baz danych i wykonania wymaganych zapytań. Poniższy skrypt pomoże ci wykonać to zadanie:

 # modułów opartych na Pythonie
importuj pyodbc
import mysql.connector

def etl(zapytanie, source_cnx, target_cnx):
  # wyodrębnij dane z demo źródłowej bazy danych
  kursor_źródłowy = kursor_źródłowy.kursor()
  source_cursor.execute(query.extract_query)
  dane = kursor_źródłowy.fetchall()
  kursor_źródłowy.zamknij()

  # załaduj dane do demo hurtowni danych db
  
jeśli dane:
    cel_kursor = cel_cnx.kursor()
    target_cursor.execute("USE {}".format(name_for_datawarehouse))
    target_cursor.executemany(query.load_query, dane)
    print('dane załadowane do demo Hurtowni Danych db')
    cel_kursor.zamknij()
  w przeciwnym razie:
    print('dane są puste')

def etl_process(zapytania, target_cnx, source_db_config, db_platform):

  # konfigurowanie połączenia z bazą danych źródła demo
  if db_platform == 'mysql':
    source_cnx = mysql.connector.connect(**source_db_config)
  elif db_platform == 'sqlserver':
    source_cnx = pyodbc.connect(**source_db_config)
  w przeciwnym razie:
    return 'Błąd! nierozpoznana platforma źródłowej bazy danych”
  # pętla przez zapytania sql
  dla zapytania w zapytaniach:
    etl (zapytanie, source_cnx, target_cnx)    
  # zamknij źródłowe połączenie z bazą danych
  source_cnx.zamknij()

D) główna.py

Ten plik zawiera kod do iteracji przez podane poświadczenia, aby połączyć się z bazą danych i wykonać niezbędne operacje ETL Python. Poniższy skrypt pomoże ci wykonać to zadanie:

 # zmienne
z db_credentials importuj datawarehouse_db_config, sqlserver_db_config, mysql_db_config
z sql_queries import sqlserver_queries, mysql_queries

# metody
z etl importuje etl_process
def główna():
  print('rozpoczęcie procesu danych etl')
	
  # nawiąż połączenie dla SQL Server, żądane miejsce docelowe
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # Pętla przez poświadczenia
  # Baza danych > mysql
  dla konfiguracji w mysql_db_config: 
    próbować:
      print("ładowanie bazy danych: " + config['baza danych'])
      etl_process(mysql_queries, target_cnx, config, 'mysql')
    z wyjątkiem Wyjątku jako błędu:
      print("etl for {} ma błąd".format(config['baza danych']))
      print('komunikat o błędzie: {}'.format(błąd))
      kontyntynuj
	
  # Baza danych > serwer sql
  dla konfiguracji w sqlserver_db_config: 
    próbować:
      print("ładowanie bazy danych: " + config['baza danych'])
      etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
    z wyjątkiem Wyjątku jako błędu:
      print("etl for {} ma błąd".format(config['baza danych']))
      print('komunikat o błędzie: {}'.format(błąd))
      kontyntynuj

  cel_cnx.zamknij()
if __name__ == "__main__":
  Główny()

Wniosek

Świetna robota! Pomyślnie zdobyłeś podstawową wiedzę na temat budowania Python ETL Pipeline. Teraz możesz zaimplementować swój niestandardowy skrypt ETL języka Python w oparciu o swoje wymagania, wprowadzając zmiany w używanych bazach danych i odpowiednio wysyłając zapytania.

Aby zapoznać się z szeroko stosowanymi narzędziami Python ETL w branży, przeczytaj blog Best Python ETL Tools.

Większość organizacji pracuje obecnie z Big Data. Dlatego tworzenie potoku ETL od podstaw dla takich danych może być czasochłonne i trudne.

Co więcej, przedsiębiorstwa będą musiały zainwestować znaczne środki, aby go zbudować, a następnie zagwarantować, że nadążą za dużym wolumenem danych i fluktuacjami schematów.

Tak więc zamiast tworzyć skrypty ETL od podstaw, możesz wykorzystać zautomatyzowane potoki danych, takie jak Hevo.

Masz jakieś przemyślenia na ten temat? Daj nam znać poniżej w komentarzach lub przenieś dyskusję na naszego Twittera lub Facebooka.

Rekomendacje redaktorów: