Python에서 ETL 파이프라인을 빌드하는 방법
게시 됨: 2022-01-11ETL 은 추출, T 변환, 로드를 나타 냅니다. ETL 프로세스의 일부로 데이터가 추출, 변환 및 데이터 웨어하우스로 로드되어 조직에서 이를 분석하여 전략적 결정을 내릴 수 있습니다.
다음은 ETL 파이프라인에서 수행되는 주요 단계입니다.
- 추출: 이 프로세스는 데이터베이스, 데이터 레이크, CRM 등을 포함한 다양한 소스에서 데이터를 수집하고 통합합니다.
- 변환: ETL 파이프라인에서 가장 중요한 단계입니다. 데이터를 분석 준비로 만들려면 이 단계에서 적절하게 수집, 정렬, 정리 및 피벗해야 합니다.
- 로드: 이 프로세스에는 데이터 분석가나 다른 사용자가 쉽게 깊은 통찰력을 얻을 수 있도록 데이터 레이크, 데이터베이스 및 기타 소스에서 데이터 웨어하우스로 구조화되거나 구조화되지 않은 데이터를 가져오는 작업이 포함됩니다.
Python ETL의 중요성 이해
Python은 현대 세계에서 가장 인기 있고 일반적으로 활용되는 프로그래밍 언어 중 하나이며 다양한 분야에서 끝없는 응용 프로그램을 제공합니다. 2021년 올해의 TIOBE 프로그래밍 언어 상을 수상했습니다.
Python의 유연하고 동적인 특성은 배포, 분석 및 유지 관리 작업에 이상적입니다. Python ETL은 데이터 파이프라인을 구축하고, 통계 모델을 개발하고, 이에 대한 철저한 분석을 수행하기 위해 데이터 엔지니어링에 필요한 중요한 기술 중 하나입니다.
사용 용이성과 데이터베이스 및 스토리지 시스템 액세스를 위한 강력한 라이브러리로 인해 ETL 프로세스를 실행하는 데 널리 사용되는 도구가 되었습니다. 많은 팀에서 ETL 도구보다 ETL 및 데이터 엔지니어링용 Python을 사용합니다. Python은 이러한 작업에 더 다재다능하고 강력하기 때문입니다.
다른 프로그래밍 언어에 비해 Python의 가장 큰 이점은 데이터 마이닝, 데이터 과학, 빅 데이터, 인공 지능 및 기계 학습에서 사용이 간편하다는 것입니다.
전 세계의 기업은 데이터에 Python을 사용하여 통찰력을 얻고 운영을 관리하며 모든 것이 원활하게 실행되도록 합니다.
Python ETL 파이프라인을 구축하는 쉬운 2단계
이 부분에서는 Python을 사용하여 ETL 파이프라인 을 구축하기 위한 필수 단계를 학습합니다. MySQL 및 Microsoft SQL Server 데이터베이스에서 Microsoft SQL Server 데이터베이스로 데이터를 공급하는 기본 데이터 파이프라인을 생성합니다.
Python ETL 스크립트를 설정하려면 다음 단계를 따르세요.
1단계: 필수 모듈 설치
Python ETL 파이프라인을 설정하려면 다음 모듈을 설치해야 합니다.
- Python-MySQL 커넥터: mysql-connector-python( pip install mysql-connector-python 명령을 사용하여 설치)
- Python-Microsoft SQL Server 커넥터: pyodbc( pip install pyodbc 명령을 사용하여 설치)
2단계: ETL 디렉토리 설정
위 패키지를 설치한 후 프로젝트 디렉토리에 아래에 언급된 4개의 Python 파일을 생성해야 합니다.
- db_credentials.py: 이 파일에는 모든 데이터베이스와의 연결을 설정하는 코드가 포함되어 있습니다.
- sql_queries.py: 이 파일은 문자열 형식으로 데이터를 추출하고 로드하기 위해 일반적으로 사용되는 데이터베이스 쿼리로 구성됩니다.
- etl.py: 이 파일에는 데이터베이스에 연결하고 필요한 쿼리를 실행하는 데 필요한 작업이 있습니다.
- main.py: 이것은 Python ETL 파이프라인의 흐름과 실행을 규제하는 기본 파일입니다.
A) db_credentials.py
모든 소스 및 대상 데이터베이스 연결 문자열이 이 파일에 포함되어야 합니다. 필요할 때 빠르게 반복할 수 있도록 관련 데이터베이스에 액세스하는 데 필요한 모든 정보를 목록 형식으로 포함해야 합니다. 다음은 데이터베이스 연결을 설정하는 샘플 Python 스크립트입니다.
datawarehouse_name = 'your_dwh_name' # sql-server(대상 db, 데이터웨어하우스) 데이터웨어하우스_db_config = { 'Trusted_Connection': '예', '드라이버': '{SQL 서버}', '서버': '데이터웨어하우스_sql_server', '데이터베이스': '{}'.format(datawarehouse_name), '사용자': 'your_db_uname', '비밀번호': 'your_db_pword', '자동 커밋': 참, } # 소스 DB > SQL 서버 sqlserver_db_config = [ { 'Trusted_Connection': '예', '드라이버': '{SQL 서버}', '서버': 'your_db_sql_server', '데이터베이스': 'db_1st', '사용자': 'your_db_uname', '비밀번호': 'your_db_pword', '자동 커밋': 참, } ] # 소스 DB > mysql mysql_db_config = [ { '사용자': 'your_1_user', '비밀번호': 'your_1_pword', '호스트': 'db_connection_string_1', '데이터베이스': 'db_1st', }, { '사용자': 'your_2_user, '비밀번호': 'your_2_password', '호스트': 'db_connection_string_2', '데이터베이스': 'db_2nd', }, ]
나) sql_queries.py
이 파일에는 원본 데이터베이스에서 데이터를 추출하고 대상 데이터베이스로 로드하기 위한 쿼리가 포함되어 있습니다. 다음 스크립트는 이 작업을 수행하는 데 도움이 됩니다.
# 예제 쿼리는 서로 다른 데이터베이스 플랫폼에 대해 고유합니다. sqlserver_extract = (''' sqlserver_col_1, sqlserver_col_2, sqlserver_col_3 선택 sqlserver_1_table에서 ''') sqlserver_insert = (''' INSERT INTO table_demo(col_1, col_2, col_3) 값(?, ?, ?) ''') mysql_extract = (''' mysql_col_1, mysql_col_2, mysql_col_3 선택 mysql_demo_table에서 ''') mysql_insert = (''' INSERT INTO table_demo(col_1, col_2, col_3) 값(?, ?, ?) ''') # 쿼리 내보내기 클래스 SQL_Query: def __init__(self, extract_query, load_query): self.extract_query = extract_query self.load_query = load_query # Sql_Query 클래스에 대한 인스턴스 생성 sqlserver_query = SQLQuery(sqlserver_extract, sqlserver_insert) mysql_query = SqlQuery(mysql_extract, mysql_insert) # 값을 순회하기 위한 목록 생성 mysql_queries = [mysql_query] sqlserver_queries = [sqlserver_query]
다) etl.py
이 파일에는 관련 데이터베이스에 액세스하고 필요한 쿼리를 실행하는 데 필요한 코드가 포함되어야 합니다. 다음 스크립트는 이 작업을 수행하는 데 도움이 됩니다.
# 파이썬 기반 모듈 pyodbc 가져오기 mysql.connector 가져오기 def etl(쿼리, source_cnx, target_cnx): # 데모 소스 데이터베이스에서 데이터 추출 source_cursor = source_cnx.cursor() source_cursor.execute(query.extract_query) 데이터 = source_cursor.fetchall() source_cursor.close() # 데모 Data Warehouse db에 데이터 로드 데이터: target_cursor = target_cnx.cursor() target_cursor.execute("{} 사용".format(name_for_datawarehouse)) target_cursor.executemany(query.load_query, 데이터) print('데모 데이터 웨어하우스 db에 로드된 데이터') target_cursor.close() 또 다른: print('데이터가 비어 있습니다') def etl_process(쿼리, target_cnx, source_db_config, db_platform): # 데모 소스 데이터베이스 연결 구성 db_platform == 'mysql'인 경우: source_cnx = mysql.connector.connect(**source_db_config) elif db_platform == 'sqlserver': source_cnx = pyodbc.connect(**source_db_config) 또 다른: 반환 '오류! 인식할 수 없는 소스 데이터베이스 플랫폼' # SQL 쿼리를 통해 루프 쿼리의 쿼리: etl(쿼리, source_cnx, target_cnx) # 소스 db 연결을 닫습니다. source_cnx.close()
라) 메인.파이
이 파일에는 데이터베이스에 연결하고 필요한 ETL Python 작업을 실행하기 위해 주어진 자격 증명을 반복하는 코드가 포함되어 있습니다. 다음 스크립트는 이 작업을 수행하는 데 도움이 됩니다.
# 변수 db_credentials에서 datawarehouse_db_config, sqlserver_db_config, mysql_db_config 가져오기 sql_queries에서 sqlserver_queries, mysql_queries 가져오기 # 메서드 etl import etl_process에서 def 메인(): print('etl 데이터 프로세스 시작') # 원하는 대상 저장소인 SQL Server에 대한 연결을 설정합니다. target_cnx = pyodbc.connect(**데이터웨어하우스_db_config) # 자격 증명을 통해 반복 # 데이터베이스 > mysql mysql_db_config의 구성: 노력하다: print("DB 로드 중: " + config['데이터베이스']) etl_process(mysql_queries, target_cnx, 구성, 'mysql') 예외를 오류로 제외: print("{}에 대한 etl에 오류가 있습니다.".format(config['database'])) print('오류 메시지: {}'.format(오류)) 계속하다 # 데이터베이스 > sql-server sqlserver_db_config의 구성: 노력하다: print("DB 로드 중: " + config['데이터베이스']) etl_process(sqlserver_queries, target_cnx, 구성, 'sqlserver') 예외를 오류로 제외: print("{}에 대한 etl에 오류가 있습니다.".format(config['database'])) print('오류 메시지: {}'.format(오류)) 계속하다 target_cnx.close() __name__ == "__main__"인 경우: 기본()
결론
훌륭한 일! Python ETL 파이프라인 구축에 대한 기본 이해를 성공적으로 얻었습니다. 이제 사용 중인 데이터베이스를 변경하고 그에 따라 쿼리하여 요구 사항에 따라 사용자 지정 Python ETL 스크립트를 구현할 수 있습니다.
업계에서 널리 사용되는 Python ETL 도구를 살펴보려면 최고의 Python ETL 도구 블로그를 읽어보세요.
오늘날 대부분의 조직은 빅 데이터를 사용합니다. 따라서 이러한 데이터에 대해 처음부터 ETL 파이프라인을 만드는 것은 시간이 많이 걸리고 어려울 수 있습니다.
또한 기업은 이를 구축하고 높은 데이터 볼륨과 스키마 변동을 따라잡을 수 있도록 보장하기 위해 상당한 양의 리소스를 투자해야 합니다.
따라서 ETL 스크립트를 처음부터 생성하는 대신 Hevo와 같은 자동화된 데이터 파이프라인을 활용할 수 있습니다.
이에 대한 생각이 있습니까? 의견에 아래로 알려주거나 Twitter 또는 Facebook으로 토론을 진행하십시오.