Comment créer des pipelines ETL en Python
Publié: 2022-01-11ETL signifie Extraire , Transformer , Charger. Dans le cadre du processus ETL, les données sont extraites, transformées et chargées dans des entrepôts de données afin que les organisations puissent les analyser pour prendre des décisions stratégiques.
Voici les étapes clés effectuées dans le pipeline ETL :
- Extraire : ce processus collecte et intègre des données provenant de diverses sources, notamment des bases de données, des lacs de données, des CRM et autres.
- Transformer : il s'agit de la phase la plus cruciale d'un pipeline ETL. Pour que les données soient prêtes pour l'analyse, elles doivent être correctement collectées, triées, nettoyées et pivotées à cette étape.
- Chargement : ce processus implique l'importation de données structurées ou non structurées à partir de lacs de données, de bases de données et d'autres sources dans des entrepôts de données afin que les analystes de données ou d'autres utilisateurs puissent facilement obtenir des informations approfondies.
Comprendre l'importance de Python ETL
Python est l'un des langages de programmation les plus populaires et les plus utilisés au monde moderne, avec des applications infinies dans une variété de domaines. Il a remporté le prestigieux prix TIOBE du langage de programmation de l'année 2021.
La nature flexible et dynamique de Python le rend idéal pour les tâches de déploiement, d'analyse et de maintenance. Python ETL est l'une des compétences cruciales requises en ingénierie des données pour créer des pipelines de données, développer des modèles statistiques et effectuer une analyse approfondie de ceux-ci.
Il est devenu un outil populaire pour l'exécution de processus ETL en raison de sa facilité d'utilisation et de ses bibliothèques robustes pour accéder aux bases de données et aux systèmes de stockage. De nombreuses équipes utilisent Python pour ETL & Data Engineering plutôt qu'un outil ETL car il est plus polyvalent et puissant pour ces tâches.
Le plus grand avantage de Python par rapport aux autres langages de programmation est la simplicité d'utilisation dans l'exploration de données, la science des données, le Big Data, l'intelligence artificielle et l'apprentissage automatique.
Les entreprises du monde entier utilisent Python pour leurs données afin d'obtenir des informations, de gérer leurs opérations et de garantir le bon fonctionnement de tout.
2 étapes faciles pour créer un pipeline ETL Python
Dans cette partie, vous apprendrez les étapes essentielles pour créer un pipeline ETL à l'aide de Python . Vous allez créer un pipeline de données de base qui alimente en données une base de données Microsoft SQL Server à partir des bases de données MySQL et Microsoft SQL Server.
Pour configurer le script Python ETL, suivez les étapes ci-dessous :
Étape 1 : Installez les modules requis
Pour configurer le pipeline ETL Python, vous devez installer les modules suivants :
- Connecteur Python vers MySQL : mysql-connector-python (utilisez la commande pip install mysql-connector-python pour l'installation)
- Connecteur Python vers Microsoft SQL Server : pyodbc (utilisez la commande pip install pyodbc pour l'installation)
Étape 2 : Configurer le répertoire ETL
Après avoir installé les packages ci-dessus, vous devez créer 4 fichiers Python, mentionnés ci-dessous dans votre répertoire de projet :
- db_credentials.py : ce fichier inclut le code permettant d'établir des connexions avec toutes les bases de données.
- sql_queries.py : ce fichier comprend les requêtes de base de données couramment utilisées pour extraire et charger des données au format chaîne.
- etl.py : Ce fichier possède les opérations nécessaires pour se connecter à la base de données et exécuter les requêtes requises.
- main.py : il s'agit du fichier principal qui régule le flux et l'exécution du pipeline ETL Python.
A) db_credentials.py
Toutes les chaînes de connexion à la base de données source et cible doivent être incluses dans ce fichier. Il doit contenir toutes les informations nécessaires pour accéder à la base de données pertinente sous forme de liste afin qu'elle puisse être rapidement itérée en cas de besoin. Voici un exemple de script Python pour établir la connexion à la base de données :
datawarehouse_name = 'votre_nom_dwh' # sql-server (base de données cible, entrepôt de données) datawarehouse_db_config = { 'Trusted_Connection' : 'oui', 'pilote' : '{SQL Server}', 'serveur' : 'datawarehouse_sql_server', 'database' : '{}'.format(datawarehouse_name), 'utilisateur' : 'votre_nom_db', 'password': 'votre_db_pword', 'autocommit' : vrai, } # source db > sql-server sqlserver_db_config = [ { 'Trusted_Connection' : 'oui', 'pilote' : '{SQL Server}', 'serveur' : 'votre_serveur_db_sql', 'base de données' : 'db_1st', 'utilisateur' : 'votre_nom_db', 'password': 'votre_db_pword', 'autocommit' : vrai, } ] # base de données source > mysql mysql_db_config = [ { 'utilisateur' : 'votre_1_utilisateur', 'password' : 'votre_1_pmot', 'hôte' : 'db_connection_string_1', 'base de données' : 'db_1st', }, { 'utilisateur' : 'votre_2_utilisateur, 'mot_de_passe' : 'votre_2_mot_de_passe', 'hôte' : 'db_connection_string_2', 'base de données' : 'db_2nd', }, ]
B) sql_queries.py
Ce fichier comprend des requêtes pour extraire des données des bases de données source et les charger dans la base de données cible. Le script suivant vous aidera à effectuer cette tâche :
# requêtes d'exemple, seront uniques pour différentes plates-formes de base de données sqlserver_extract = (''' SELECT sqlserver_col_1, sqlserver_col_2, sqlserver_col_3 DE sqlserver_1_table ''') sqlserver_insert = (''' INSERT INTO table_demo (col_1, col_2, col_3) VALEURS (?, ?, ?) ''') mysql_extract = (''' SELECT mysql_col_1, mysql_col_2, mysql_col_3 DE mysql_demo_table ''') mysql_insert = (''' INSERT INTO table_demo (col_1, col_2, col_3) VALEURS (?, ?, ?) ''') # Requêtes exportées classe Sql_Query : def __init__(self, extract_query, load_query): self.extract_query = extract_query self.load_query = load_query # créer des instances pour la classe Sql_Query sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert) mysql_query = SqlQuery(mysql_extract, mysql_insert) # créer une liste pour parcourir les valeurs mysql_queries = [mysql_query] sqlserver_queries = [sqlserver_query]
C) etl.py
Ce fichier doit inclure le code requis pour accéder aux bases de données pertinentes et exécuter les requêtes requises. Le script suivant vous aidera à effectuer cette tâche :
# modules basés sur python importer pyodbc importer mysql.connector def etl (requête, source_cnx, cible_cnx): # extraire les données de la base de données source de démonstration source_cursor = source_cnx.cursor() source_cursor.execute(query.extract_query) données = source_cursor.fetchall() curseur_source.close() # charger les données dans la démo Data Warehouse db si données : target_cursor = target_cnx.cursor() target_cursor.execute("USE {}".format(name_for_datawarehouse)) target_cursor.executemany(query.load_query, données) print('données chargées dans la démo Data Warehouse db') curseur_cible.close() autre: print('les données sont vides') def etl_process (requêtes, target_cnx, source_db_config, db_platform): # configuration de la connexion à la base de données source de démonstration si db_platform == 'mysql' : source_cnx = mysql.connector.connect(**source_db_config) elif db_platform == 'sqlserver' : source_cnx = pyodbc.connect(**source_db_config) autre: renvoie 'Erreur ! plate-forme de base de données source non reconnue' # boucle dans les requêtes sql pour la requête dans les requêtes : etl (requête, source_cnx, cible_cnx) # fermer la connexion à la base de données source source_cnx.close()
D) main.py
Ce fichier inclut du code pour parcourir les informations d'identification données pour se connecter à la base de données et exécuter les opérations ETL Python nécessaires. Le script suivant vous aidera à effectuer cette tâche :
# variables à partir de db_credentials importer datawarehouse_db_config, sqlserver_db_config, mysql_db_config à partir de sql_queries importer sqlserver_queries, mysql_queries # méthodes depuis etl importer etl_process def main() : print('démarrage du processus de données etl') # établir une connexion pour SQL Server, stockage de destination souhaité target_cnx = pyodbc.connect(**datawarehouse_db_config) # boucle sur les informations d'identification # Base de données > mysql pour la configuration dans mysql_db_config : essayer: print("loading db: " + config['database']) etl_process(mysql_queries, target_cnx, config, 'mysql') sauf Exception comme erreur : print("etl pour {} a une erreur".format(config['database'])) print('message d'erreur : {}'.format(error)) Continuez # Base de données > sql-server pour la configuration dans sqlserver_db_config : essayer: print("loading db: " + config['database']) etl_process(sqlserver_queries, target_cnx, config, 'sqlserver') sauf Exception comme erreur : print("etl pour {} a une erreur".format(config['database'])) print('message d'erreur : {}'.format(error)) Continuez target_cnx.close() si __nom__ == "__main__": principale()
Conclusion
Bon travail! Vous avez acquis avec succès une compréhension de base de la construction de Python ETL Pipeline. Vous pouvez désormais implémenter votre script Python ETL personnalisé en fonction de vos besoins en apportant des modifications aux bases de données utilisées et aux requêtes en conséquence.
Pour explorer les outils Python ETL largement utilisés dans l'industrie, lisez le blog Best Python ETL Tools.
La plupart des organisations travaillent aujourd'hui avec le Big Data. Par conséquent, la création d'un pipeline ETL à partir de zéro pour de telles données peut être longue et difficile.
De plus, les entreprises devront investir une quantité importante de ressources pour le construire et garantir ensuite qu'elles peuvent suivre le rythme du volume élevé de données et des fluctuations de schéma.
Ainsi, au lieu de créer des scripts ETL à partir de zéro, vous pouvez exploiter des pipelines de données automatisés tels que Hevo.
Avez-vous des idées à ce sujet? Faites-le nous savoir ci-dessous dans les commentaires ou transférez la discussion sur notre Twitter ou Facebook.