วิธีสร้างไปป์ไลน์ ETL ใน Python
เผยแพร่แล้ว: 2022-01-11ETL ย่อมาจาก E xtract, T ransform, L oad ในฐานะที่เป็นส่วนหนึ่งของกระบวนการ ETL ข้อมูลจะถูกแยก แปลง และโหลดลงในคลังข้อมูล เพื่อให้องค์กรสามารถวิเคราะห์ข้อมูลเพื่อทำการตัดสินใจเชิงกลยุทธ์ได้
ต่อไปนี้คือขั้นตอนสำคัญที่ดำเนินการในไปป์ไลน์ ETL:
- แยกข้อมูล: กระบวนการนี้จะรวบรวมและรวมข้อมูลจากแหล่งต่างๆ รวมถึงฐานข้อมูล Data Lakes CRM และอื่นๆ
- การเปลี่ยนแปลง: นี่เป็นขั้นตอนที่สำคัญที่สุดในท่อส่ง ETL ในการจัดทำข้อมูล พร้อมสำหรับการวิเคราะห์ จะต้องรวบรวม จัดเรียง ทำความสะอาด และปรับเปลี่ยนข้อมูลอย่างเหมาะสมในขั้นตอนนี้
- โหลด: กระบวนการนี้เกี่ยวข้องกับการนำเข้าข้อมูลที่มีโครงสร้างหรือไม่มีโครงสร้างจาก Data Lakes ฐานข้อมูล และแหล่งข้อมูลอื่นๆ ไปยัง Data Warehouses เพื่อให้ Data Analyst หรือผู้ใช้รายอื่นสามารถได้รับข้อมูลเชิงลึกอย่างลึกซึ้งได้อย่างง่ายดาย
การทำความเข้าใจความสำคัญของ Python ETL
Python เป็นหนึ่งในภาษาการเขียนโปรแกรมที่ได้รับความนิยมและมีการใช้ประโยชน์มากที่สุดในโลกสมัยใหม่ โดยมีแอพพลิเคชั่นที่ไม่มีที่สิ้นสุดในหลากหลายสาขา ได้รับรางวัล TIOBE Programming Language of the Year 2021 อันทรงเกียรติ
ลักษณะความยืดหยุ่นและไดนามิกของ Python ทำให้เหมาะสำหรับงานการปรับใช้ การวิเคราะห์ และการบำรุงรักษา Python ETL เป็นหนึ่งในทักษะที่สำคัญที่จำเป็นใน Data Engineering เพื่อสร้าง Data Pipelines พัฒนาแบบจำลองทางสถิติ และทำการวิเคราะห์อย่างละเอียด
ได้กลายเป็นเครื่องมือยอดนิยมสำหรับดำเนินการกระบวนการ ETL เนื่องจากใช้งานง่ายและไลบรารีที่มีประสิทธิภาพสำหรับการเข้าถึงฐานข้อมูลและระบบจัดเก็บข้อมูล หลายทีมใช้ Python สำหรับ ETL & Data Engineering มากกว่าเครื่องมือ ETL เนื่องจากมีความอเนกประสงค์และมีประสิทธิภาพมากกว่าสำหรับงานเหล่านี้
ประโยชน์สูงสุดของ Python เหนือภาษาการเขียนโปรแกรมอื่นๆ คือความเรียบง่ายในการใช้งานใน Data Mining, Data Science, Big Data, ปัญญาประดิษฐ์ และ Machine Learning
บริษัทต่างๆ ทั่วโลกใช้ Python สำหรับข้อมูลเพื่อรับข้อมูลเชิงลึก จัดการการดำเนินงาน และทำให้ทุกอย่างทำงานได้อย่างราบรื่น
2 ขั้นตอนง่ายๆ ในการสร้าง Python ETL Pipeline
ในส่วนนี้ คุณจะได้เรียนรู้ขั้นตอนที่จำเป็นสำหรับการสร้าง ไปป์ไลน์ ETL โดยใช้ Python คุณจะต้องสร้าง Data Pipeline พื้นฐานที่ดึงข้อมูลเข้าสู่ฐานข้อมูล Microsoft SQL Server จากฐานข้อมูล MySQL และ Microsoft SQL Server
ในการตั้งค่าสคริปต์ Python ETL ให้ทำตามขั้นตอนด้านล่าง:
ขั้นตอนที่ 1: ติดตั้งโมดูลที่จำเป็น
ในการตั้งค่า Python ETL Pipeline คุณจะต้องติดตั้งโมดูลต่อไปนี้:
- ตัวเชื่อมต่อ Python to MySQL: mysql-connector-python (ใช้คำสั่ง pip install mysql-connector-python เพื่อติดตั้ง)
- Python กับ Microsoft SQL Server Connector: pyodbc (ใช้คำสั่ง pip install pyodbc เพื่อติดตั้ง)
ขั้นตอนที่ 2: ตั้งค่าไดเรกทอรี ETL
หลังจากติดตั้งแพ็คเกจข้างต้นแล้ว คุณต้องสร้างไฟล์ Python 4 ไฟล์ ดังที่กล่าวถึงด้านล่างในไดเร็กทอรีโครงการของคุณ:
- db_credentials.py: ไฟล์นี้มีโค้ดสำหรับสร้างการเชื่อมต่อกับฐานข้อมูลทั้งหมด
- sql_queries.py: ไฟล์นี้ประกอบด้วยการสืบค้นฐานข้อมูลที่ใช้กันทั่วไปเพื่อแยกและโหลดข้อมูลในรูปแบบสตริง
- etl.py: ไฟล์นี้มีการดำเนินการที่จำเป็นในการเชื่อมต่อกับฐานข้อมูลและเรียกใช้การสืบค้นข้อมูลที่จำเป็น
- main.py: นี่คือไฟล์หลักที่ควบคุมการไหลและการทำงานของ Python ETL Pipeline
ก) db_credentials.py
สตริงการเชื่อมต่อฐานข้อมูลต้นทางและเป้าหมายทั้งหมดควรรวมอยู่ในไฟล์นี้ ควรมีข้อมูลที่จำเป็นทั้งหมดสำหรับการเข้าถึงฐานข้อมูลที่เกี่ยวข้องในรูปแบบรายการเพื่อให้สามารถทำซ้ำได้อย่างรวดเร็วเมื่อจำเป็น ต่อไปนี้คือตัวอย่างสคริปต์ Python เพื่อสร้างการเชื่อมต่อฐานข้อมูล:
datawarehouse_name = 'your_dwh_name' # sql-เซิร์ฟเวอร์ (ฐานข้อมูลเป้าหมาย, คลังข้อมูล) datawarehouse_db_config = { 'Trusted_Connection': 'ใช่', 'ไดรเวอร์': '{SQL Server}', 'เซิร์ฟเวอร์': 'datawarehouse_sql_server', 'ฐานข้อมูล': '{}'.format(datawarehouse_name), 'ผู้ใช้': 'your_db_uname', 'รหัสผ่าน': 'your_db_pword', 'ส่งอัตโนมัติ': จริง } # source db > sql-server sqlserver_db_config = [ { 'Trusted_Connection': 'ใช่', 'ไดรเวอร์': '{SQL Server}', 'เซิร์ฟเวอร์': 'เซิร์ฟเวอร์ของคุณ_db_sql_server', 'ฐานข้อมูล': 'db_1st', 'ผู้ใช้': 'your_db_uname', 'รหัสผ่าน': 'your_db_pword', 'ส่งอัตโนมัติ': จริง } ] # source db > mysql mysql_db_config = [ { 'ผู้ใช้': 'your_1_user', 'รหัสผ่าน': 'your_1_pword', 'โฮสต์': 'db_connection_string_1', 'ฐานข้อมูล': 'db_1st', }, { 'ผู้ใช้': 'your_2_user, 'รหัสผ่าน': 'รหัสผ่านของคุณ_2_รหัสผ่าน', 'โฮสต์': 'db_connection_string_2', 'ฐานข้อมูล': 'db_2nd', }, ]
ข) sql_queries.py
ไฟล์นี้มีคิวรีสำหรับดึงข้อมูลจากฐานข้อมูลต้นทางและโหลดลงในฐานข้อมูลเป้าหมาย สคริปต์ต่อไปนี้จะช่วยคุณทำงานนี้:
# ตัวอย่างการสืบค้นจะไม่ซ้ำกันสำหรับแพลตฟอร์มฐานข้อมูลที่แตกต่างกัน sqlserver_extract = (''' เลือก sqlserver_col_1, sqlserver_col_2, sqlserver_col_3 จาก sqlserver_1_table ''') sqlserver_insert = (''' INSERT INTO table_demo (col_1, col_2, col_3) ค่านิยม (?, ?, ?) ''') mysql_extract = (''' เลือก mysql_col_1, mysql_col_2, mysql_col_3 จาก mysql_demo_table ''') mysql_insert = (''' INSERT INTO table_demo (col_1, col_2, col_3) ค่านิยม (?, ?, ?) ''') # แบบสอบถามกำลังส่งออก คลาส SQL_Query: def __init__(ตัวเอง, extract_query, load_query): self.extract_query = extract_query self.load_query = load_query # สร้างอินสแตนซ์สำหรับคลาส SQL_Query sqlserver_query = SqlQuery (sqlserver_extract, sqlserver_insert) mysql_query = SqlQuery (mysql_extract, mysql_insert) #สร้างรายการวนซ้ำค่า mysql_queries = [mysql_query] sqlserver_queries = [sqlserver_query]
ค) etl.py
ไฟล์นี้ควรมีรหัสที่จำเป็นในการเข้าถึงฐานข้อมูลที่เกี่ยวข้องและดำเนินการค้นหาที่จำเป็น สคริปต์ต่อไปนี้จะช่วยคุณทำงานนี้:
# โมดูลที่ใช้หลาม นำเข้า pyodbc นำเข้า mysql.connector def etl (แบบสอบถาม, source_cnx, target_cnx): # ดึงข้อมูลจากฐานข้อมูลแหล่งสาธิต source_cursor = source_cnx.cursor () source_cursor.execute(query.extract_query) ข้อมูล = source_cursor.fetchall() source_cursor.close() # โหลดข้อมูลลงในตัวอย่าง Data Warehouse db ถ้าข้อมูล: target_cursor = target_cnx.cursor () target_cursor.execute("ใช้ {}".format(name_for_datawarehouse)) target_cursor.executemany (query.load_query ข้อมูล) พิมพ์ ('ข้อมูลที่โหลดไปยังฐานข้อมูลเดโมคลังข้อมูล') target_cursor.close() อื่น: พิมพ์ ('ข้อมูลว่างเปล่า') def etl_process (แบบสอบถาม, target_cnx, source_db_config, db_platform): # การกำหนดค่าการเชื่อมต่อฐานข้อมูลแหล่งสาธิต ถ้า db_platform == 'mysql': source_cnx = mysql.connector.connect (**source_db_config) elif db_platform == 'sqlserver': source_cnx = pyodbc.connect (**source_db_config) อื่น: กลับ 'ข้อผิดพลาด! แพลตฟอร์มฐานข้อมูลต้นทางที่ไม่รู้จัก' # วนซ้ำคำสั่ง sql สำหรับแบบสอบถามในแบบสอบถาม: etl (แบบสอบถาม, source_cnx, target_cnx) # ปิดการเชื่อมต่อฐานข้อมูลต้นทาง source_cnx.close()
ง) main.py
ไฟล์นี้มีโค้ดที่จะวนซ้ำผ่านข้อมูลประจำตัวที่กำหนดเพื่อเชื่อมต่อกับฐานข้อมูลและดำเนินการ ETL Python ที่จำเป็น สคริปต์ต่อไปนี้จะช่วยคุณทำงานนี้:
# ตัวแปร จาก db_credentials นำเข้า datawarehouse_db_config, sqlserver_db_config, mysql_db_config จาก sql_queries นำเข้า sqlserver_queries, mysql_queries #วิธีการ จาก etl นำเข้า etl_process def หลัก (): พิมพ์ ('การเริ่มต้นกระบวนการข้อมูล etl') # สร้างการเชื่อมต่อสำหรับ SQL Server ที่เก็บข้อมูลปลายทางที่ต้องการ target_cnx = pyodbc.connect(**datawarehouse_db_config) # วนซ้ำผ่านข้อมูลประจำตัว # ฐานข้อมูล > mysql สำหรับการกำหนดค่าใน mysql_db_config: พยายาม: พิมพ์ ("กำลังโหลด db: " + config['database']) etl_process (mysql_queries, target_cnx, config, 'mysql') ยกเว้นข้อยกเว้นเป็นข้อผิดพลาด: พิมพ์("etl สำหรับ {} มีข้อผิดพลาด".format(config['database'])) print('ข้อความแสดงข้อผิดพลาด: {}'.format(ข้อผิดพลาด)) ดำเนินต่อ # ฐานข้อมูล > sql-เซิร์ฟเวอร์ สำหรับการกำหนดค่าใน sqlserver_db_config: พยายาม: พิมพ์ ("กำลังโหลด db: " + config['database']) etl_process (sqlserver_queries, target_cnx, config, 'sqlserver') ยกเว้นข้อยกเว้นเป็นข้อผิดพลาด: พิมพ์("etl สำหรับ {} มีข้อผิดพลาด".format(config['database'])) print('ข้อความแสดงข้อผิดพลาด: {}'.format(ข้อผิดพลาด)) ดำเนินต่อ target_cnx.close() ถ้า __name__ == "__main__": หลัก()
บทสรุป
การทำงานที่ดี! คุณได้รับความเข้าใจพื้นฐานเกี่ยวกับการสร้าง Python ETL Pipeline เรียบร้อยแล้ว ตอนนี้คุณสามารถปรับใช้สคริปต์ Python ETL แบบกำหนดเองได้ตามความต้องการของคุณโดยทำการเปลี่ยนแปลงฐานข้อมูลที่ใช้และสืบค้นตามนั้น
หากต้องการสำรวจเครื่องมือ Python ETL ที่ใช้กันอย่างแพร่หลายในอุตสาหกรรม โปรดอ่านบล็อก Best Python ETL Tools
องค์กรส่วนใหญ่ในปัจจุบันทำงานกับ Big Data ดังนั้น การสร้างไปป์ไลน์ ETL ตั้งแต่เริ่มต้นสำหรับข้อมูลดังกล่าวอาจใช้เวลานานและท้าทาย
นอกจากนี้ องค์กรต่างๆ จะต้องลงทุนทรัพยากรจำนวนมากเพื่อสร้างมันขึ้นมา จากนั้นจึงรับประกันว่าจะสามารถติดตามปริมาณข้อมูลที่สูงและความผันผวนของสคีมาได้
ดังนั้น แทนที่จะสร้างสคริปต์ ETL ตั้งแต่เริ่มต้น คุณสามารถใช้ประโยชน์จาก Data Pipelines อัตโนมัติ เช่น Hevo
มีความคิดเกี่ยวกับเรื่องนี้หรือไม่? แจ้งให้เราทราบด้านล่างในความคิดเห็นหรือดำเนินการสนทนาไปที่ Twitter หรือ Facebook ของเรา