PythonでETLパイプラインを構築する方法
公開: 2022-01-11ETLは、 E xtract、 T ransform 、Loadの略です。 ETLプロセスの一部として、データは抽出され、変換され、データウェアハウスにロードされるため、組織はデータを分析して戦略的な意思決定を行うことができます。
ETLパイプラインで実行される主な手順は次のとおりです。
- 抽出:このプロセスは、データベース、データレイク、CRMなどのさまざまなソースからデータを収集して統合します。
- 変換:これは、ETLパイプラインで最も重要なフェーズです。 データを分析に対応させるには、このステップでデータを適切に収集、並べ替え、クリーンアップ、およびピボットする必要があります。
- ロード:このプロセスでは、データレイク、データベース、その他のソースから構造化データまたは非構造化データをデータウェアハウスにインポートして、データアナリストやその他のユーザーが簡単に深い洞察を得ることができるようにします。
PythonETLの重要性を理解する
Pythonは、現代の世界で最も人気があり、一般的に利用されているプログラミング言語の1つであり、さまざまな分野で無限のアプリケーションがあります。 名誉あるTIOBEプログラミング言語オブザイヤー2021賞を受賞しています。
Pythonは柔軟で動的な性質を備えているため、デプロイ、分析、およびメンテナンスのタスクに最適です。 Python ETLは、データパイプラインを構築し、統計モデルを開発し、それらを徹底的に分析するためにデータエンジニアリングに必要な重要なスキルの1つです。
使いやすさとデータベースやストレージシステムにアクセスするための堅牢なライブラリにより、ETLプロセスを実行するための一般的なツールになりました。 多くのチームは、ETLツールではなくPythonをETLおよびデータエンジニアリングに使用しています。これは、これらのタスクに対してより用途が広く強力であるためです。
他のプログラミング言語に対するPythonの最大の利点は、データマイニング、データサイエンス、ビッグデータ、人工知能、機械学習での使用が簡単なことです。
世界中の企業がデータにPythonを使用して、洞察を得て運用を管理し、すべてをスムーズに実行し続けています。
PythonETLパイプラインを構築するための2つの簡単なステップ
このパートでは、 Pythonを使用してETLパイプラインを構築するための基本的な手順を学習します。 MySQLおよびMicrosoftSQLServerデータベースからMicrosoftSQLServerデータベースにデータをフィードする基本的なデータパイプラインを作成します。
Python ETLスクリプトを設定するには、次の手順に従います。
ステップ1:必要なモジュールをインストールする
Python ETLパイプラインを設定するには、次のモジュールをインストールする必要があります。
- PythonからMySQLへのコネクタ:mysql-connector-python( pip install mysql-connector-pythonコマンドを使用してインストールします)
- PythonからMicrosoftSQL Serverへのコネクタ:pyodbc( pip install pyodbcコマンドを使用してインストールします)
手順2:ETLディレクトリを設定する
上記のパッケージをインストールした後、プロジェクトディレクトリに以下の4つのPythonファイルを作成する必要があります。
- db_credentials.py:このファイルには、すべてのデータベースとの接続を確立するためのコードが含まれています。
- sql_queries.py:このファイルは、文字列形式でデータを抽出およびロードするために一般的に使用されるデータベースクエリで構成されています。
- etl.py:このファイルには、データベースに接続して必要なクエリを実行するために必要な操作が含まれています。
- main.py:これは、PythonETLパイプラインのフローと実行を規制するプライマリファイルです。
A)db_credentials.py
すべてのソースおよびターゲットデータベース接続文字列をこのファイルに含める必要があります。 関連するデータベースにアクセスするために必要なすべての情報がリスト形式で含まれている必要があります。これにより、必要なときにすばやく繰り返すことができます。 以下は、データベース接続を確立するためのサンプルPythonスクリプトです。
datawarehouse_name = 'your_dwh_name' #sql-server(ターゲットデータベース、データウェアハウス) datawarehouse_db_config = { 'Trusted_Connection': 'はい'、 'ドライバー': '{SQL Server}'、 'サーバー': 'datawarehouse_sql_server'、 'データベース': '{}'。format(datawarehouse_name)、 'ユーザー': 'your_db_uname'、 'パスワード': 'your_db_pword'、 'autocommit':True、 } #ソースデータベース> sql-server sqlserver_db_config = [ {{ 'Trusted_Connection': 'はい'、 'ドライバー': '{SQL Server}'、 'サーバー': 'your_db_sql_server'、 'データベース': 'db_1st'、 'ユーザー': 'your_db_uname'、 'パスワード': 'your_db_pword'、 'autocommit':True、 } ] #ソースデータベース> mysql mysql_db_config = [ {{ 'user': 'your_1_user'、 'パスワード': 'your_1_pword'、 'ホスト': 'db_connection_string_1'、 'データベース': 'db_1st'、 }、 {{ 'user': 'your_2_user、 'パスワード': 'your_2_password'、 'ホスト': 'db_connection_string_2'、 'データベース': 'db_2nd'、 }、 ]
B)sql_queries.py
このファイルには、ソースデータベースからデータを抽出してターゲットデータベースにロードするためのクエリが含まれています。 次のスクリプトは、このタスクの実行に役立ちます。
#クエリの例、データベースプラットフォームごとに一意になります sqlserver_extract =( '' ' SELECT sqlserver_col_1、sqlserver_col_2、sqlserver_col_3 FROM sqlserver_1_table '' ') sqlserver_insert =( '' ' INSERT INTO table_demo(col_1、col_2、col_3) 値(?、?、?) '' ') mysql_extract =( '' ' SELECT mysql_col_1、mysql_col_2、mysql_col_3 FROMmysql_demo_table '' ') mysql_insert =( '' ' INSERT INTO table_demo(col_1、col_2、col_3) 値(?、?、?) '' ') #エクスポートされるクエリ クラスSql_Query: def __init __(self、extract_query、load_query): self.extract_query = extract_query self.load_query = load_query #Sql_Queryクラスのインスタンスを作成します sqlserver_query = SqlQuery(sqlserver_extract、sqlserver_insert) mysql_query = SqlQuery(mysql_extract、mysql_insert) #値を反復処理するためのリストを作成する mysql_queries = [mysql_query] sqlserver_queries = [sqlserver_query]
C)etl.py
このファイルには、関連するデータベースにアクセスして必要なクエリを実行するために必要なコードが含まれている必要があります。 次のスクリプトは、このタスクの実行に役立ちます。
#Pythonベースのモジュール pyodbcをインポートする mysql.connectorをインポートします def etl(query、source_cnx、target_cnx): #デモソースデータベースからデータを抽出 source_cursor = source_cnx.cursor() source_cursor.execute(query.extract_query) data = source_cursor.fetchall() source_cursor.close() #データをデモにロードするデータウェアハウスデータベース データの場合: target_cursor = target_cnx.cursor() target_cursor.execute( "USE {}"。format(name_for_datawarehouse)) target_cursor.executemany(query.load_query、data) print( 'デモデータウェアハウスデータベースにロードされたデータ') target_cursor.close() そうしないと: print( 'データは空です') def etl_process(queries、target_cnx、source_db_config、db_platform): #デモソースデータベース接続の構成 db_platform == 'mysql'の場合: source_cnx = mysql.connector.connect(** source_db_config) elif db_platform == 'sqlserver': source_cnx = pyodbc.connect(** source_db_config) そうしないと: 'エラーを返します! 認識されないソースデータベースプラットフォーム ' #SQLクエリをループする クエリ内のクエリの場合: etl(query、source_cnx、target_cnx) #ソースデータベース接続を閉じます source_cnx.close()
D)main.py
このファイルには、データベースに接続して必要なETL Python操作を実行するために、指定された資格情報を反復処理するコードが含まれています。 次のスクリプトは、このタスクの実行に役立ちます。
#変数 db_credentialsからdatawarehouse_db_config、sqlserver_db_config、mysql_db_configをインポートします sql_queriesからimportsqlserver_queries、mysql_queries #メソッド etlからインポートetl_process def main(): print( 'etlデータプロセスの開始') #SQL Server、目的の宛先ストレージへの接続を確立する target_cnx = pyodbc.connect(** datawarehouse_db_config) #クレデンシャルをループする #データベース> mysql mysql_db_configのconfigの場合: 試す: print( "loading db:" + config ['database']) etl_process(mysql_queries、target_cnx、config、 'mysql') エラーとしての例外を除く: print( "etl for {} has error" .format(config ['database'])) print( 'エラーメッセージ:{}'。format(error)) 継続する #データベース> sql-server sqlserver_db_configの構成の場合: 試す: print( "loading db:" + config ['database']) etl_process(sqlserver_queries、target_cnx、config、 'sqlserver') エラーとしての例外を除く: print( "etl for {} has error" .format(config ['database'])) print( 'エラーメッセージ:{}'。format(error)) 継続する target_cnx.close() __name__ == "__main__"の場合: 主要()
結論
すごい仕事! PythonETLパイプラインの構築に関する基本的な理解を得ることができました。 これで、使用しているデータベースに変更を加え、それに応じてクエリを実行することで、要件に基づいてカスタムPythonETLスクリプトを実装できます。
業界で広く使用されているPythonETLツールを調べるには、Best Python ETLToolsブログを読んでください。
現在、ほとんどの組織はビッグデータを使用しています。 したがって、このようなデータのETLパイプラインを最初から作成することは、時間がかかり、困難な場合があります。
さらに、企業はそれを構築するためにかなりの量のリソースを投資し、それから彼らが大量のデータ量とスキーマの変動に追いつくことができることを保証する必要があります。
したがって、ETLスクリプトを最初から作成する代わりに、Hevoなどの自動化されたデータパイプラインを活用できます。
これについて何か考えがありますか? コメントで下に知らせてください、または私たちのツイッターまたはフェイスブックに議論を持ち越してください。