วิธีสร้างไปป์ไลน์ ETL ใน Python

เผยแพร่แล้ว: 2022-01-11

ETL ย่อมาจาก E xtract, T ransform, L oad ในฐานะที่เป็นส่วนหนึ่งของกระบวนการ ETL ข้อมูลจะถูกแยก แปลง และโหลดลงในคลังข้อมูล เพื่อให้องค์กรสามารถวิเคราะห์ข้อมูลเพื่อทำการตัดสินใจเชิงกลยุทธ์ได้

ต่อไปนี้คือขั้นตอนสำคัญที่ดำเนินการในไปป์ไลน์ ETL:

  • แยกข้อมูล: กระบวนการนี้จะรวบรวมและรวมข้อมูลจากแหล่งต่างๆ รวมถึงฐานข้อมูล Data Lakes CRM และอื่นๆ
  • การเปลี่ยนแปลง: นี่เป็นขั้นตอนที่สำคัญที่สุดในท่อส่ง ETL ในการจัดทำข้อมูล พร้อมสำหรับการวิเคราะห์ จะต้องรวบรวม จัดเรียง ทำความสะอาด และปรับเปลี่ยนข้อมูลอย่างเหมาะสมในขั้นตอนนี้
  • โหลด: กระบวนการนี้เกี่ยวข้องกับการนำเข้าข้อมูลที่มีโครงสร้างหรือไม่มีโครงสร้างจาก Data Lakes ฐานข้อมูล และแหล่งข้อมูลอื่นๆ ไปยัง Data Warehouses เพื่อให้ Data Analyst หรือผู้ใช้รายอื่นสามารถได้รับข้อมูลเชิงลึกอย่างลึกซึ้งได้อย่างง่ายดาย

การทำความเข้าใจความสำคัญของ Python ETL

Python เป็นหนึ่งในภาษาการเขียนโปรแกรมที่ได้รับความนิยมและมีการใช้ประโยชน์มากที่สุดในโลกสมัยใหม่ โดยมีแอพพลิเคชั่นที่ไม่มีที่สิ้นสุดในหลากหลายสาขา ได้รับรางวัล TIOBE Programming Language of the Year 2021 อันทรงเกียรติ

ลักษณะความยืดหยุ่นและไดนามิกของ Python ทำให้เหมาะสำหรับงานการปรับใช้ การวิเคราะห์ และการบำรุงรักษา Python ETL เป็นหนึ่งในทักษะที่สำคัญที่จำเป็นใน Data Engineering เพื่อสร้าง Data Pipelines พัฒนาแบบจำลองทางสถิติ และทำการวิเคราะห์อย่างละเอียด

ได้กลายเป็นเครื่องมือยอดนิยมสำหรับดำเนินการกระบวนการ ETL เนื่องจากใช้งานง่ายและไลบรารีที่มีประสิทธิภาพสำหรับการเข้าถึงฐานข้อมูลและระบบจัดเก็บข้อมูล หลายทีมใช้ Python สำหรับ ETL & Data Engineering มากกว่าเครื่องมือ ETL เนื่องจากมีความอเนกประสงค์และมีประสิทธิภาพมากกว่าสำหรับงานเหล่านี้

ประโยชน์สูงสุดของ Python เหนือภาษาการเขียนโปรแกรมอื่นๆ คือความเรียบง่ายในการใช้งานใน Data Mining, Data Science, Big Data, ปัญญาประดิษฐ์ และ Machine Learning

บริษัทต่างๆ ทั่วโลกใช้ Python สำหรับข้อมูลเพื่อรับข้อมูลเชิงลึก จัดการการดำเนินงาน และทำให้ทุกอย่างทำงานได้อย่างราบรื่น

2 ขั้นตอนง่ายๆ ในการสร้าง Python ETL Pipeline

ในส่วนนี้ คุณจะได้เรียนรู้ขั้นตอนที่จำเป็นสำหรับการสร้าง ไปป์ไลน์ ETL โดยใช้ Python คุณจะต้องสร้าง Data Pipeline พื้นฐานที่ดึงข้อมูลเข้าสู่ฐานข้อมูล Microsoft SQL Server จากฐานข้อมูล MySQL และ Microsoft SQL Server

ในการตั้งค่าสคริปต์ Python ETL ให้ทำตามขั้นตอนด้านล่าง:

ขั้นตอนที่ 1: ติดตั้งโมดูลที่จำเป็น

ในการตั้งค่า Python ETL Pipeline คุณจะต้องติดตั้งโมดูลต่อไปนี้:

  • ตัวเชื่อมต่อ Python to MySQL: mysql-connector-python (ใช้คำสั่ง pip install mysql-connector-python เพื่อติดตั้ง)
  • Python กับ Microsoft SQL Server Connector: pyodbc (ใช้คำสั่ง pip install pyodbc เพื่อติดตั้ง)

ขั้นตอนที่ 2: ตั้งค่าไดเรกทอรี ETL

หลังจากติดตั้งแพ็คเกจข้างต้นแล้ว คุณต้องสร้างไฟล์ Python 4 ไฟล์ ดังที่กล่าวถึงด้านล่างในไดเร็กทอรีโครงการของคุณ:

  • db_credentials.py: ไฟล์นี้มีโค้ดสำหรับสร้างการเชื่อมต่อกับฐานข้อมูลทั้งหมด
  • sql_queries.py: ไฟล์นี้ประกอบด้วยการสืบค้นฐานข้อมูลที่ใช้กันทั่วไปเพื่อแยกและโหลดข้อมูลในรูปแบบสตริง
  • etl.py: ไฟล์นี้มีการดำเนินการที่จำเป็นในการเชื่อมต่อกับฐานข้อมูลและเรียกใช้การสืบค้นข้อมูลที่จำเป็น
  • main.py: นี่คือไฟล์หลักที่ควบคุมการไหลและการทำงานของ Python ETL Pipeline

ก) db_credentials.py

สตริงการเชื่อมต่อฐานข้อมูลต้นทางและเป้าหมายทั้งหมดควรรวมอยู่ในไฟล์นี้ ควรมีข้อมูลที่จำเป็นทั้งหมดสำหรับการเข้าถึงฐานข้อมูลที่เกี่ยวข้องในรูปแบบรายการเพื่อให้สามารถทำซ้ำได้อย่างรวดเร็วเมื่อจำเป็น ต่อไปนี้คือตัวอย่างสคริปต์ Python เพื่อสร้างการเชื่อมต่อฐานข้อมูล:

 datawarehouse_name = 'your_dwh_name'
# sql-เซิร์ฟเวอร์ (ฐานข้อมูลเป้าหมาย, คลังข้อมูล)
datawarehouse_db_config = {
  'Trusted_Connection': 'ใช่',
  'ไดรเวอร์': '{SQL Server}',
  'เซิร์ฟเวอร์': 'datawarehouse_sql_server',
  'ฐานข้อมูล': '{}'.format(datawarehouse_name),
  'ผู้ใช้': 'your_db_uname',
  'รหัสผ่าน': 'your_db_pword',
  'ส่งอัตโนมัติ': จริง
}
# source db > sql-server
sqlserver_db_config = [
  {
    'Trusted_Connection': 'ใช่',
    'ไดรเวอร์': '{SQL Server}',
    'เซิร์ฟเวอร์': 'เซิร์ฟเวอร์ของคุณ_db_sql_server',
    'ฐานข้อมูล': 'db_1st',
    'ผู้ใช้': 'your_db_uname',
    'รหัสผ่าน': 'your_db_pword',
    'ส่งอัตโนมัติ': จริง
  }
]
# source db > mysql
mysql_db_config = [
  {
    'ผู้ใช้': 'your_1_user',
    'รหัสผ่าน': 'your_1_pword',
    'โฮสต์': 'db_connection_string_1',
    'ฐานข้อมูล': 'db_1st',
  },
  {
    'ผู้ใช้': 'your_2_user,
    'รหัสผ่าน': 'รหัสผ่านของคุณ_2_รหัสผ่าน',
    'โฮสต์': 'db_connection_string_2',
    'ฐานข้อมูล': 'db_2nd',
  },
]

ข) sql_queries.py

ไฟล์นี้มีคิวรีสำหรับดึงข้อมูลจากฐานข้อมูลต้นทางและโหลดลงในฐานข้อมูลเป้าหมาย สคริปต์ต่อไปนี้จะช่วยคุณทำงานนี้:

 # ตัวอย่างการสืบค้นจะไม่ซ้ำกันสำหรับแพลตฟอร์มฐานข้อมูลที่แตกต่างกัน

sqlserver_extract = ('''
  เลือก sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
  จาก sqlserver_1_table
''')
sqlserver_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  ค่านิยม (?, ?, ?)  
''')
mysql_extract = ('''
  เลือก mysql_col_1, mysql_col_2, mysql_col_3
  จาก mysql_demo_table
''')
mysql_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  ค่านิยม (?, ?, ?)  
''')

# แบบสอบถามกำลังส่งออก
คลาส SQL_Query:
  def __init__(ตัวเอง, extract_query, load_query):
    self.extract_query = extract_query
    self.load_query = load_query   
# สร้างอินสแตนซ์สำหรับคลาส SQL_Query
sqlserver_query = SqlQuery (sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery (mysql_extract, mysql_insert)
#สร้างรายการวนซ้ำค่า
mysql_queries = [mysql_query]
sqlserver_queries = [sqlserver_query]

ค) etl.py

ไฟล์นี้ควรมีรหัสที่จำเป็นในการเข้าถึงฐานข้อมูลที่เกี่ยวข้องและดำเนินการค้นหาที่จำเป็น สคริปต์ต่อไปนี้จะช่วยคุณทำงานนี้:

 # โมดูลที่ใช้หลาม
นำเข้า pyodbc
นำเข้า mysql.connector

def etl (แบบสอบถาม, source_cnx, target_cnx):
  # ดึงข้อมูลจากฐานข้อมูลแหล่งสาธิต
  source_cursor = source_cnx.cursor ()
  source_cursor.execute(query.extract_query)
  ข้อมูล = source_cursor.fetchall()
  source_cursor.close()

  # โหลดข้อมูลลงในตัวอย่าง Data Warehouse db
  
ถ้าข้อมูล:
    target_cursor = target_cnx.cursor ()
    target_cursor.execute("ใช้ {}".format(name_for_datawarehouse))
    target_cursor.executemany (query.load_query ข้อมูล)
    พิมพ์ ('ข้อมูลที่โหลดไปยังฐานข้อมูลเดโมคลังข้อมูล')
    target_cursor.close()
  อื่น:
    พิมพ์ ('ข้อมูลว่างเปล่า')

def etl_process (แบบสอบถาม, target_cnx, source_db_config, db_platform):

  # การกำหนดค่าการเชื่อมต่อฐานข้อมูลแหล่งสาธิต
  ถ้า db_platform == 'mysql':
    source_cnx = mysql.connector.connect (**source_db_config)
  elif db_platform == 'sqlserver':
    source_cnx = pyodbc.connect (**source_db_config)
  อื่น:
    กลับ 'ข้อผิดพลาด! แพลตฟอร์มฐานข้อมูลต้นทางที่ไม่รู้จัก'
  # วนซ้ำคำสั่ง sql
  สำหรับแบบสอบถามในแบบสอบถาม:
    etl (แบบสอบถาม, source_cnx, target_cnx)    
  # ปิดการเชื่อมต่อฐานข้อมูลต้นทาง
  source_cnx.close()

ง) main.py

ไฟล์นี้มีโค้ดที่จะวนซ้ำผ่านข้อมูลประจำตัวที่กำหนดเพื่อเชื่อมต่อกับฐานข้อมูลและดำเนินการ ETL Python ที่จำเป็น สคริปต์ต่อไปนี้จะช่วยคุณทำงานนี้:

 # ตัวแปร
จาก db_credentials นำเข้า datawarehouse_db_config, sqlserver_db_config, mysql_db_config
จาก sql_queries นำเข้า sqlserver_queries, mysql_queries

#วิธีการ
จาก etl นำเข้า etl_process
def หลัก ():
  พิมพ์ ('การเริ่มต้นกระบวนการข้อมูล etl')
	
  # สร้างการเชื่อมต่อสำหรับ SQL Server ที่เก็บข้อมูลปลายทางที่ต้องการ
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # วนซ้ำผ่านข้อมูลประจำตัว
  # ฐานข้อมูล > mysql
  สำหรับการกำหนดค่าใน mysql_db_config: 
    พยายาม:
      พิมพ์ ("กำลังโหลด db: " + config['database'])
      etl_process (mysql_queries, target_cnx, config, 'mysql')
    ยกเว้นข้อยกเว้นเป็นข้อผิดพลาด:
      พิมพ์("etl สำหรับ {} มีข้อผิดพลาด".format(config['database']))
      print('ข้อความแสดงข้อผิดพลาด: {}'.format(ข้อผิดพลาด))
      ดำเนินต่อ
	
  # ฐานข้อมูล > sql-เซิร์ฟเวอร์
  สำหรับการกำหนดค่าใน sqlserver_db_config: 
    พยายาม:
      พิมพ์ ("กำลังโหลด db: " + config['database'])
      etl_process (sqlserver_queries, target_cnx, config, 'sqlserver')
    ยกเว้นข้อยกเว้นเป็นข้อผิดพลาด:
      พิมพ์("etl สำหรับ {} มีข้อผิดพลาด".format(config['database']))
      print('ข้อความแสดงข้อผิดพลาด: {}'.format(ข้อผิดพลาด))
      ดำเนินต่อ

  target_cnx.close()
ถ้า __name__ == "__main__":
  หลัก()

บทสรุป

การทำงานที่ดี! คุณได้รับความเข้าใจพื้นฐานเกี่ยวกับการสร้าง Python ETL Pipeline เรียบร้อยแล้ว ตอนนี้คุณสามารถปรับใช้สคริปต์ Python ETL แบบกำหนดเองได้ตามความต้องการของคุณโดยทำการเปลี่ยนแปลงฐานข้อมูลที่ใช้และสืบค้นตามนั้น

หากต้องการสำรวจเครื่องมือ Python ETL ที่ใช้กันอย่างแพร่หลายในอุตสาหกรรม โปรดอ่านบล็อก Best Python ETL Tools

องค์กรส่วนใหญ่ในปัจจุบันทำงานกับ Big Data ดังนั้น การสร้างไปป์ไลน์ ETL ตั้งแต่เริ่มต้นสำหรับข้อมูลดังกล่าวอาจใช้เวลานานและท้าทาย

นอกจากนี้ องค์กรต่างๆ จะต้องลงทุนทรัพยากรจำนวนมากเพื่อสร้างมันขึ้นมา จากนั้นจึงรับประกันว่าจะสามารถติดตามปริมาณข้อมูลที่สูงและความผันผวนของสคีมาได้

ดังนั้น แทนที่จะสร้างสคริปต์ ETL ตั้งแต่เริ่มต้น คุณสามารถใช้ประโยชน์จาก Data Pipelines อัตโนมัติ เช่น Hevo

มีความคิดเกี่ยวกับเรื่องนี้หรือไม่? แจ้งให้เราทราบด้านล่างในความคิดเห็นหรือดำเนินการสนทนาไปที่ Twitter หรือ Facebook ของเรา

คำแนะนำของบรรณาธิการ: