如何在 Python 中构建 ETL 管道

已发表: 2022-01-11

ETL代表提取、转换负载。 作为 ETL 流程的一部分,数据被提取、转换并加载到数据仓库中,以便组织可以对其进行分析以制定战略决策。

以下是 ETL 管道中执行的关键步骤:

  • 提取:此过程收集和集成来自各种来源的数据,包括数据库、数据湖、CRM 等。
  • 转换:这是 ETL 管道中最关键的阶段。 为了使数据做好分析准备,必须在此步骤中正确收集、排序、清理和旋转数据。
  • 加载:此过程涉及将结构化或非结构化数据从数据湖、数据库和其他来源导入数据仓库,以便数据分析师或其他用户可以轻松获得深入的见解。

了解 Python ETL 的重要性

Python 是现代世界上最流行和最常用的编程语言之一,在各个领域都有无穷无尽的应用。 它赢得了著名的 TIOBE 2021 年度编程语言奖。

Python 的灵活和动态特性使其成为部署、分析和维护任务的理想选择。 Python ETL 是数据工程构建数据管道、开发统计模型并对它们进行全面分析所需的关键技能之一。

由于其易用性和用于访问数据库和存储系统的强大库,它已成为执行 ETL 流程的流行工具。 许多团队将 Python 用于 ETL 和数据工程,而不是 ETL 工具,因为它对于这些任务更加通用和强大。

与其他编程语言相比,Python 的最大好处是在数据挖掘、数据科学、大数据、人工智能和机器学习中的使用简单。

世界各地的公司都使用 Python 获取数据,以获取洞察、管理运营并保持一切顺利运行。

构建 Python ETL 管道的 2 个简单步骤

在这一部分中,您将学习使用 Python 构建 ETL 管道的基本步骤。 您将创建一个基本的数据管道,将数据从 MySQL 和 Microsoft SQL Server 数据库馈送到 Microsoft SQL Server 数据库。

为了设置 Python ETL 脚本,请按照以下步骤操作:

第 1 步:安装所需模块

要设置 Python ETL 管道,您需要安装以下模块:

  • Python to MySQL Connector:mysql-connector-python(使用pip install mysql-connector-python命令安装)
  • Python to Microsoft SQL Server Connector:pyodbc(使用pip install pyodbc命令安装)

第 2 步:设置 ETL 目录

安装上述包后,您需要在项目目录中创建 4 个 Python 文件,如下所述:

  • db_credentials.py:该文件包含与所有数据库建立连接的代码。
  • sql_queries.py:该文件包含常用的数据库查询,用于以字符串格式提取和加载数据。
  • etl.py:该文件拥有连接数据库和运行所需查询的必要操作。
  • main.py:这是规范 Python ETL 管道的流程和执行的主要文件。

A) db_credentials.py

所有源和目标数据库连接字符串都应包含在此文件中。 它应该包含以列表格式访问相关数据库的所有必要信息,以便在需要时可以快速迭代。 以下是用于建立数据库连接的示例 Python 脚本:

 datawarehouse_name = 'your_dwh_name'
# sql-server(目标数据库,数据仓库)
datawarehouse_db_config = {
  'Trusted_Connection':'是',
  '驱动程序': '{SQL Server}',
  '服务器': 'datawarehouse_sql_server',
  '数据库': '{}'.format(datawarehouse_name),
  '用户': 'your_db_uname',
  '密码': 'your_db_pword',
  “自动提交”:是的,
}
# 源数据库 > sql-server
sqlserver_db_config = [
  {
    'Trusted_Connection':'是',
    '驱动程序': '{SQL Server}',
    '服务器': 'your_db_sql_server',
    '数据库':'db_1st',
    '用户': 'your_db_uname',
    '密码': 'your_db_pword',
    “自动提交”:是的,
  }
]
# 源数据库 > mysql
mysql_db_config = [
  {
    “用户”:“您的_1_用户”,
    “密码”:“你的_1_密码”,
    '主机':'db_connection_string_1',
    '数据库':'db_1st',
  },
  {
    '用户':'你的_2_用户,
    “密码”:“您的_2_密码”,
    '主机':'db_connection_string_2',
    '数据库':'db_2nd',
  },
]

B) sql_queries.py

该文件包括用于从源数据库中提取数据并将其加载到目标数据库中的查询。 以下脚本将帮助您执行此任务:

 # 示例查询,对于不同的数据库平台将是唯一的

sqlserver_extract = ('''
  选择 sqlserver_col_1、sqlserver_col_2、sqlserver_col_3
  FROM sqlserver_1_table
''')
sqlserver_insert = ('''
  插入到 table_demo (col_1, col_2, col_3)
  值(?,?,?)  
''')
mysql_extract = ('''
  选择 mysql_col_1、mysql_col_2、mysql_col_3
  来自 mysql_demo_table
''')
mysql_insert = ('''
  插入到 table_demo (col_1, col_2, col_3)
  值(?,?,?)  
''')

# 查询被导出
类 Sql_Query:
  def __init__(self, extract_query, load_query):
    self.extract_query = extract_query
    self.load_query = load_query   
# 为 Sql_Query 类创建实例
sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery(mysql_extract, mysql_insert)
# 创建一个用于遍历值的列表
mysql_queries = [mysql_query]
sqlserver_queries = [sqlserver_query]

C) etl.py

该文件应包含访问相关数据库和执行所需查询所需的代码。 以下脚本将帮助您执行此任务:

 # 基于python的模块
导入pyodbc
导入 mysql.connector

def etl(查询,source_cnx,target_cnx):
  # 从演示源数据库中提取数据
  source_cursor = source_cnx.cursor()
  source_cursor.execute(query.extract_query)
  数据 = source_cursor.fetchall()
  source_cursor.close()

  # 将数据加载到演示数据仓库数据库中
  
如果数据:
    target_cursor = target_cnx.cursor()
    target_cursor.execute("USE {}".format(name_for_datawarehouse))
    target_cursor.executemany(query.load_query,数据)
    print('数据加载到演示数据仓库数据库')
    target_cursor.close()
  别的:
    print('数据为空')

def etl_process(查询,target_cnx,source_db_config,db_platform):

  # 配置演示源数据库连接
  如果 db_platform == 'mysql':
    source_cnx = mysql.connector.connect(**source_db_config)
  elif db_platform == 'sqlserver':
    source_cnx = pyodbc.connect(**source_db_config)
  别的:
    返回'错误! 无法识别的源数据库平台'
  # 循环遍历 sql 查询
  对于查询中的查询:
    etl(查询,source_cnx,target_cnx)    
  # 关闭源数据库连接
  source_cnx.close()

D) main.py

此文件包含用于迭代给定凭据以连接到数据库并执行必要的 ETL Python 操作的代码。 以下脚本将帮助您执行此任务:

 # 变量
从 db_credentials 导入 datawarehouse_db_config、sqlserver_db_config、mysql_db_config
从 sql_queries 导入 sqlserver_queries、mysql_queries

# 方法
从 etl 导入 etl_process
定义主():
  print('启动 etl 数据处理')
	
  # 为 SQL Server 建立连接,所需的目标存储
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # 遍历凭证
  # 数据库 > mysql
  对于 mysql_db_config 中的配置: 
    尝试:
      print("加载数据库:" + config['数据库'])
      etl_process(mysql_queries,target_cnx,配置,'mysql')
    除了异常作为错误:
      print("etl for {} 有错误".format(config['database']))
      print('错误信息:{}'.format(error))
      继续
	
  # 数据库 > sql-server
  对于 sqlserver_db_config 中的配置: 
    尝试:
      print("加载数据库:" + config['数据库'])
      etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
    除了异常作为错误:
      print("etl for {} 有错误".format(config['database']))
      print('错误信息:{}'.format(error))
      继续

  target_cnx.close()
如果 __name__ == "__main__":
  主要的()

结论

做得好! 您已经成功地获得了构建 Python ETL 管道的基本知识。 现在,您可以通过更改正在使用的数据库和相应的查询来根据您的要求实现自定义 Python ETL 脚本。

要探索业界广泛使用的 Python ETL 工具,请阅读 Best Python ETL Tools 博客。

如今,大多数组织都使用大数据。 因此,从头开始为此类数据创建 ETL 管道可能既耗时又具有挑战性。

此外,企业将需要投入大量资源来构建它,然后保证他们能够跟上高数据量和架构波动的步伐。

因此,您可以利用 Hevo 等自动化数据管道,而不是从头开始创建 ETL 脚本。

对此有什么想法吗? 在下面的评论中让我们知道,或者将讨论带到我们的 Twitter 或 Facebook。

编辑推荐: