如何在 Python 中構建 ETL 管道
已發表: 2022-01-11ETL代表提取、轉換、負載。 作為 ETL 流程的一部分,數據被提取、轉換並加載到數據倉庫中,以便組織可以對其進行分析以製定戰略決策。
以下是 ETL 管道中執行的關鍵步驟:
- 提取:此過程收集和集成來自各種來源的數據,包括數據庫、數據湖、CRM 等。
- 轉換:這是 ETL 管道中最關鍵的階段。 為了使數據做好分析準備,必須在此步驟中正確收集、排序、清理和旋轉數據。
- 加載:此過程涉及將結構化或非結構化數據從數據湖、數據庫和其他來源導入數據倉庫,以便數據分析師或其他用戶可以輕鬆獲得深入的見解。
了解 Python ETL 的重要性
Python 是現代世界上最流行和最常用的編程語言之一,在各個領域都有無窮無盡的應用。 它贏得了著名的 TIOBE 2021 年度編程語言獎。
Python 的靈活和動態特性使其成為部署、分析和維護任務的理想選擇。 Python ETL 是數據工程構建數據管道、開發統計模型並對它們進行全面分析所需的關鍵技能之一。
由於其易用性和用於訪問數據庫和存儲系統的強大庫,它已成為執行 ETL 流程的流行工具。 許多團隊將 Python 用於 ETL 和數據工程,而不是 ETL 工具,因為它對於這些任務更加通用和強大。
與其他編程語言相比,Python 的最大好處是在數據挖掘、數據科學、大數據、人工智能和機器學習中的使用簡單。
世界各地的公司都使用 Python 獲取數據,以獲取洞察、管理運營並保持一切順利運行。
構建 Python ETL 管道的 2 個簡單步驟
在這一部分中,您將學習使用 Python 構建 ETL 管道的基本步驟。 您將創建一個基本的數據管道,將數據從 MySQL 和 Microsoft SQL Server 數據庫饋送到 Microsoft SQL Server 數據庫。
為了設置 Python ETL 腳本,請按照以下步驟操作:
第 1 步:安裝所需模塊
要設置 Python ETL 管道,您需要安裝以下模塊:
- Python to MySQL Connector:mysql-connector-python(使用pip install mysql-connector-python命令安裝)
- Python to Microsoft SQL Server Connector:pyodbc(使用pip install pyodbc命令安裝)
第 2 步:設置 ETL 目錄
安裝上述包後,您需要在項目目錄中創建 4 個 Python 文件,如下所述:
- db_credentials.py:該文件包含與所有數據庫建立連接的代碼。
- sql_queries.py:該文件包含常用的數據庫查詢,用於以字符串格式提取和加載數據。
- etl.py:該文件擁有連接數據庫和運行所需查詢的必要操作。
- main.py:這是規範 Python ETL 管道的流程和執行的主要文件。
A) db_credentials.py
所有源和目標數據庫連接字符串都應包含在此文件中。 它應該包含以列表格式訪問相關數據庫的所有必要信息,以便在需要時可以快速迭代。 以下是用於建立數據庫連接的示例 Python 腳本:
datawarehouse_name = 'your_dwh_name' # sql-server(目標數據庫,數據倉庫) datawarehouse_db_config = { 'Trusted_Connection':'是', '驅動程序': '{SQL Server}', '服務器': 'datawarehouse_sql_server', '數據庫': '{}'.format(datawarehouse_name), '用戶': 'your_db_uname', '密碼': 'your_db_pword', “自動提交”:是的, } # 源數據庫 > sql-server sqlserver_db_config = [ { 'Trusted_Connection':'是', '驅動程序': '{SQL Server}', '服務器': 'your_db_sql_server', '數據庫':'db_1st', '用戶': 'your_db_uname', '密碼': 'your_db_pword', “自動提交”:是的, } ] # 源數據庫 > mysql mysql_db_config = [ { “用戶”:“您的_1_用戶”, “密碼”:“你的_1_密碼”, '主機':'db_connection_string_1', '數據庫':'db_1st', }, { '用戶':'你的_2_用戶, “密碼”:“您的_2_密碼”, '主機':'db_connection_string_2', '數據庫':'db_2nd', }, ]
B) sql_queries.py
該文件包括用於從源數據庫中提取數據並將其加載到目標數據庫中的查詢。 以下腳本將幫助您執行此任務:
# 示例查詢,對於不同的數據庫平台將是唯一的 sqlserver_extract = (''' 選擇 sqlserver_col_1、sqlserver_col_2、sqlserver_col_3 FROM sqlserver_1_table ''') sqlserver_insert = (''' 插入到 table_demo (col_1, col_2, col_3) 值(?,?,?) ''') mysql_extract = (''' 選擇 mysql_col_1、mysql_col_2、mysql_col_3 來自 mysql_demo_table ''') mysql_insert = (''' 插入到 table_demo (col_1, col_2, col_3) 值(?,?,?) ''') # 查詢被導出 類 Sql_Query: def __init__(self, extract_query, load_query): self.extract_query = extract_query self.load_query = load_query # 為 Sql_Query 類創建實例 sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert) mysql_query = SqlQuery(mysql_extract, mysql_insert) # 創建一個用於遍歷值的列表 mysql_queries = [mysql_query] sqlserver_queries = [sqlserver_query]
C) etl.py
該文件應包含訪問相關數據庫和執行所需查詢所需的代碼。 以下腳本將幫助您執行此任務:
# 基於python的模塊 導入pyodbc 導入 mysql.connector def etl(查詢,source_cnx,target_cnx): # 從演示源數據庫中提取數據 source_cursor = source_cnx.cursor() source_cursor.execute(query.extract_query) 數據 = source_cursor.fetchall() source_cursor.close() # 將數據加載到演示數據倉庫數據庫中 如果數據: target_cursor = target_cnx.cursor() target_cursor.execute("USE {}".format(name_for_datawarehouse)) target_cursor.executemany(query.load_query,數據) print('數據加載到演示數據倉庫數據庫') target_cursor.close() 別的: print('數據為空') def etl_process(查詢,target_cnx,source_db_config,db_platform): # 配置演示源數據庫連接 如果 db_platform == 'mysql': source_cnx = mysql.connector.connect(**source_db_config) elif db_platform == 'sqlserver': source_cnx = pyodbc.connect(**source_db_config) 別的: 返回'錯誤! 無法識別的源數據庫平台' # 循環遍歷 sql 查詢 對於查詢中的查詢: etl(查詢,source_cnx,target_cnx) # 關閉源數據庫連接 source_cnx.close()
D) main.py
此文件包含用於迭代給定憑據以連接到數據庫並執行必要的 ETL Python 操作的代碼。 以下腳本將幫助您執行此任務:
# 變量 從 db_credentials 導入 datawarehouse_db_config、sqlserver_db_config、mysql_db_config 從 sql_queries 導入 sqlserver_queries、mysql_queries # 方法 從 etl 導入 etl_process 定義主(): print('啟動 etl 數據處理') # 為 SQL Server 建立連接,所需的目標存儲 target_cnx = pyodbc.connect(**datawarehouse_db_config) # 遍歷憑證 # 數據庫 > mysql 對於 mysql_db_config 中的配置: 嘗試: print("加載數據庫:" + config['數據庫']) etl_process(mysql_queries,target_cnx,配置,'mysql') 除了異常作為錯誤: print("etl for {} 有錯誤".format(config['database'])) print('錯誤信息:{}'.format(error)) 繼續 # 數據庫 > sql-server 對於 sqlserver_db_config 中的配置: 嘗試: print("加載數據庫:" + config['數據庫']) etl_process(sqlserver_queries, target_cnx, config, 'sqlserver') 除了異常作為錯誤: print("etl for {} 有錯誤".format(config['database'])) print('錯誤信息:{}'.format(error)) 繼續 target_cnx.close() 如果 __name__ == "__main__": 主要的()
結論
做得好! 您已經成功地獲得了構建 Python ETL 管道的基本知識。 現在,您可以通過更改正在使用的數據庫和相應的查詢來根據您的要求實現自定義 Python ETL 腳本。
要探索業界廣泛使用的 Python ETL 工具,請閱讀 Best Python ETL Tools 博客。
如今,大多數組織都使用大數據。 因此,從頭開始為此類數據創建 ETL 管道可能既耗時又具有挑戰性。
此外,企業將需要投入大量資源來構建它,然後保證他們能夠跟上高數據量和架構波動的步伐。
因此,您可以利用 Hevo 等自動化數據管道,而不是從頭開始創建 ETL 腳本。
對此有什麼想法嗎? 在下面的評論中讓我們知道,或者將討論帶到我們的 Twitter 或 Facebook。