Jak budować potoki ETL w Pythonie
Opublikowany: 2022-01-11ETL to skrót od Extract, T ransform , Load . W ramach procesu ETL dane są wyodrębniane, przekształcane i ładowane do hurtowni danych, dzięki czemu organizacje mogą je analizować w celu podejmowania strategicznych decyzji.
Poniżej przedstawiono kluczowe kroki wykonywane w potoku ETL:
- Wyciąg: Ten proces gromadzi i integruje dane z różnych źródeł, w tym baz danych, jezior danych, systemów CRM i innych.
- Transformacja: jest to najważniejsza faza w potoku ETL. Aby dane były gotowe do analizy, muszą być odpowiednio zebrane, posortowane, wyczyszczone i przestawione na tym etapie.
- Ładowanie: ten proces obejmuje importowanie ustrukturyzowanych lub nieustrukturyzowanych danych z jezior danych, baz danych i innych źródeł do hurtowni danych, dzięki czemu analitycy danych lub inni użytkownicy mogą łatwo uzyskać szczegółowe informacje.
Zrozumienie znaczenia Pythona ETL
Python to jeden z najpopularniejszych i powszechnie wykorzystywanych języków programowania współczesnego świata, z nieskończonymi aplikacjami w różnych dziedzinach. Zdobył prestiżową nagrodę TIOBE Language Programming of the Year 2021.
Elastyczny i dynamiczny charakter Pythona czyni go idealnym do zadań wdrażania, analizy i konserwacji. Python ETL to jedna z kluczowych umiejętności wymaganych w inżynierii danych do budowania potoków danych, opracowywania modeli statystycznych i przeprowadzania ich dokładnej analizy.
Stało się popularnym narzędziem do wykonywania procesów ETL ze względu na łatwość obsługi i solidne biblioteki dostępu do baz danych i systemów pamięci masowej. Wiele zespołów używa Pythona do ETL i inżynierii danych zamiast narzędzia ETL, ponieważ jest ono bardziej wszechstronne i wydajne do tych zadań.
Największą zaletą Pythona w porównaniu z innymi językami programowania jest prostota użycia w eksploracji danych, nauce o danych, Big Data, sztucznej inteligencji i uczeniu maszynowym.
Firmy na całym świecie używają Pythona do swoich danych, aby uzyskać wgląd, zarządzać swoimi operacjami i zapewnić płynne działanie.
2 proste kroki do zbudowania potoku ETL w Pythonie
W tej części poznasz podstawowe kroki tworzenia potoku ETL za pomocą Pythona . Utworzysz podstawowy potok danych, który przesyła dane do bazy danych Microsoft SQL Server z baz danych MySQL i Microsoft SQL Server.
Aby skonfigurować skrypt Python ETL, wykonaj poniższe czynności:
Krok 1: Zainstaluj wymagane moduły
Aby skonfigurować Python ETL Pipeline, musisz zainstalować następujące moduły:
- Złącze Python do MySQL: mysql-connector-python (użyj polecenia pip install mysql-connector-python, aby zainstalować)
- Złącze Python do Microsoft SQL Server: pyodbc (użyj polecenia pip install pyodbc , aby zainstalować)
Krok 2: Skonfiguruj katalog ETL
Po zainstalowaniu powyższych pakietów, musisz utworzyć 4 pliki Pythona, wymienione poniżej w katalogu twojego projektu:
- db_credentials.py: Ten plik zawiera kod do nawiązywania połączeń ze wszystkimi bazami danych.
- sql_queries.py: Ten plik zawiera często używane zapytania bazy danych do wyodrębniania i ładowania danych w formacie ciągu.
- etl.py: Ten plik zawiera operacje niezbędne do połączenia z bazą danych i uruchomienia wymaganych zapytań.
- main.py: Jest to podstawowy plik, który reguluje przepływ i wykonanie potoku ETL Pythona.
A) db_credentials.py
Wszystkie ciągi połączeń źródłowej i docelowej bazy danych powinny być zawarte w tym pliku. Powinna ona zawierać wszystkie informacje niezbędne do uzyskania dostępu do odpowiedniej bazy danych w formie listy, aby w razie potrzeby można było ją szybko iterować. Poniżej znajduje się przykładowy skrypt Pythona do nawiązania połączenia z bazą danych:
datawarehouse_name = 'twoja_nazwa_dwh' # sql-server (docelowa baza danych, hurtownia danych) datawarehouse_db_config = { 'Trusted_Connection': 'tak', 'sterownik': '{Serwer SQL}', 'serwer': 'datawarehouse_sql_server', 'baza danych': '{}'.format(datawarehouse_name), 'użytkownik': 'twoja_nazwa_bazy_danych', 'hasło': 'twoje_db_hasło', „automatyczne zatwierdzanie”: Prawda, } # źródłowa baza danych > serwer sql sqlserver_db_config = [ { 'Trusted_Connection': 'tak', 'sterownik': '{Serwer SQL}', 'serwer': 'twój_db_sql_serwer', 'baza danych': 'db_1st', 'użytkownik': 'twoja_nazwa_bazy_danych', 'hasło': 'twoje_db_hasło', „automatyczne zatwierdzanie”: Prawda, } ] # baza danych źródłowych > mysql mysql_db_config = [ { 'użytkownik': 'twój_1_użytkownik', 'hasło': 'twoje_1_hasło', 'host': 'db_connection_string_1', 'baza danych': 'db_1st', }, { 'użytkownik': 'twój_2_użytkownik, 'hasło': 'twoje_2_hasło', 'host': 'db_connection_string_2', 'baza danych': 'db_2nd', }, ]
B) sql_queries.py
Plik ten zawiera zapytania do wydobycia danych ze źródłowych baz danych i załadowania ich do docelowej bazy danych. Poniższy skrypt pomoże ci wykonać to zadanie:
# przykładowe zapytania, będą unikalne dla różnych platform bazodanowych sqlserver_extract = (''' WYBIERZ sqlserver_col_1, sqlserver_col_2, sqlserver_col_3 Z sqlserver_1_table '''') sqlserver_insert = (''' INSERT INTO table_demo (col_1, col_2, col_3) WARTOŚCI (?, ?, ?) '''') mysql_extract = (''' WYBIERZ mysql_col_1, mysql_col_2, mysql_col_3 Z mysql_demo_table '''') mysql_insert = (''' INSERT INTO table_demo (col_1, col_2, col_3) WARTOŚCI (?, ?, ?) '''') # Kwerendy są eksportowane klasa Sql_Query: def __init__(self, extract_query, load_query): self.extract_query = ekstrakt_zapytanie self.load_query = load_query # utwórz instancje dla klasy Sql_Query sqlserver_query = SqlQuery (sqlserver_extract, sqlserver_insert) mysql_query = SqlQuery (mysql_extract, mysql_insert) # tworzenie listy do iteracji przez wartości mysql_query = [mysql_query] sqlserver_queries = [sqlserver_query]
C) etl.py
Plik ten powinien zawierać kod wymagany do uzyskania dostępu do odpowiednich baz danych i wykonania wymaganych zapytań. Poniższy skrypt pomoże ci wykonać to zadanie:
# modułów opartych na Pythonie importuj pyodbc import mysql.connector def etl(zapytanie, source_cnx, target_cnx): # wyodrębnij dane z demo źródłowej bazy danych kursor_źródłowy = kursor_źródłowy.kursor() source_cursor.execute(query.extract_query) dane = kursor_źródłowy.fetchall() kursor_źródłowy.zamknij() # załaduj dane do demo hurtowni danych db jeśli dane: cel_kursor = cel_cnx.kursor() target_cursor.execute("USE {}".format(name_for_datawarehouse)) target_cursor.executemany(query.load_query, dane) print('dane załadowane do demo Hurtowni Danych db') cel_kursor.zamknij() w przeciwnym razie: print('dane są puste') def etl_process(zapytania, target_cnx, source_db_config, db_platform): # konfigurowanie połączenia z bazą danych źródła demo if db_platform == 'mysql': source_cnx = mysql.connector.connect(**source_db_config) elif db_platform == 'sqlserver': source_cnx = pyodbc.connect(**source_db_config) w przeciwnym razie: return 'Błąd! nierozpoznana platforma źródłowej bazy danych” # pętla przez zapytania sql dla zapytania w zapytaniach: etl (zapytanie, source_cnx, target_cnx) # zamknij źródłowe połączenie z bazą danych source_cnx.zamknij()
D) główna.py
Ten plik zawiera kod do iteracji przez podane poświadczenia, aby połączyć się z bazą danych i wykonać niezbędne operacje ETL Python. Poniższy skrypt pomoże ci wykonać to zadanie:
# zmienne z db_credentials importuj datawarehouse_db_config, sqlserver_db_config, mysql_db_config z sql_queries import sqlserver_queries, mysql_queries # metody z etl importuje etl_process def główna(): print('rozpoczęcie procesu danych etl') # nawiąż połączenie dla SQL Server, żądane miejsce docelowe target_cnx = pyodbc.connect(**datawarehouse_db_config) # Pętla przez poświadczenia # Baza danych > mysql dla konfiguracji w mysql_db_config: próbować: print("ładowanie bazy danych: " + config['baza danych']) etl_process(mysql_queries, target_cnx, config, 'mysql') z wyjątkiem Wyjątku jako błędu: print("etl for {} ma błąd".format(config['baza danych'])) print('komunikat o błędzie: {}'.format(błąd)) kontyntynuj # Baza danych > serwer sql dla konfiguracji w sqlserver_db_config: próbować: print("ładowanie bazy danych: " + config['baza danych']) etl_process(sqlserver_queries, target_cnx, config, 'sqlserver') z wyjątkiem Wyjątku jako błędu: print("etl for {} ma błąd".format(config['baza danych'])) print('komunikat o błędzie: {}'.format(błąd)) kontyntynuj cel_cnx.zamknij() if __name__ == "__main__": Główny()
Wniosek
Świetna robota! Pomyślnie zdobyłeś podstawową wiedzę na temat budowania Python ETL Pipeline. Teraz możesz zaimplementować swój niestandardowy skrypt ETL języka Python w oparciu o swoje wymagania, wprowadzając zmiany w używanych bazach danych i odpowiednio wysyłając zapytania.
Aby zapoznać się z szeroko stosowanymi narzędziami Python ETL w branży, przeczytaj blog Best Python ETL Tools.
Większość organizacji pracuje obecnie z Big Data. Dlatego tworzenie potoku ETL od podstaw dla takich danych może być czasochłonne i trudne.
Co więcej, przedsiębiorstwa będą musiały zainwestować znaczne środki, aby go zbudować, a następnie zagwarantować, że nadążą za dużym wolumenem danych i fluktuacjami schematów.
Tak więc zamiast tworzyć skrypty ETL od podstaw, możesz wykorzystać zautomatyzowane potoki danych, takie jak Hevo.
Masz jakieś przemyślenia na ten temat? Daj nam znać poniżej w komentarzach lub przenieś dyskusję na naszego Twittera lub Facebooka.