Python'da ETL boru hatları nasıl oluşturulur

Yayınlanan: 2022-01-11

ETL , Çıkar , Dönüştür , Yük anlamına gelir. ETL sürecinin bir parçası olarak veriler, kuruluşların stratejik kararlar almak üzere analiz edebilmeleri için Çıkarılır, Dönüştürülür ve Veri Ambarlarına Yüklenir.

ETL ardışık düzeninde gerçekleştirilen temel adımlar şunlardır:

  • Ayıkla: Bu süreç, Veritabanları, Veri Gölleri, CRM'ler ve diğerleri dahil olmak üzere çeşitli kaynaklardan verileri toplar ve bütünleştirir.
  • Dönüşüm: Bu, bir ETL Pipeline'daki en önemli aşamadır. Verileri analize hazır hale getirmek için bu adımda düzgün bir şekilde toplanmalı, sıralanmalı, temizlenmeli ve döndürülmelidir.
  • Yükleme: Bu süreç, Veri Analistlerinin veya diğer kullanıcıların kolayca derinlemesine içgörüler elde edebilmesi için Veri Göllerinden, Veritabanlarından ve diğer kaynaklardan yapılandırılmış veya yapılandırılmamış verilerin Veri Ambarı'na aktarılmasını içerir.

Python ETL'nin Önemini Anlamak

Python, çeşitli alanlarda sonsuz uygulamalarla modern dünyanın en popüler ve yaygın olarak kullanılan programlama dillerinden biridir. Prestijli TIOBE Yılın Programlama Dili 2021 ödülünü kazandı.

Python'un esnek ve dinamik yapısı, onu Dağıtım, Analiz ve Bakım görevleri için ideal kılar. Python ETL, Veri Boru Hatları oluşturmak, İstatistiksel Modeller geliştirmek ve bunlar üzerinde kapsamlı bir analiz gerçekleştirmek için Veri Mühendisliğinde gerekli olan çok önemli becerilerden biridir.

Kullanım kolaylığı ve veritabanlarına ve depolama sistemlerine erişim için sağlam kitaplıklar nedeniyle ETL işlemlerini yürütmek için popüler bir araç haline geldi. Birçok ekip, bu görevler için daha çok yönlü ve güçlü olduğundan ETL aracı yerine ETL ve Veri Mühendisliği için Python kullanır.

Python'un diğer programlama dillerine göre en büyük yararı, Veri Madenciliği, Veri Bilimi, Büyük Veri, Yapay Zeka ve Makine Öğreniminde kullanım kolaylığıdır.

Dünyanın dört bir yanındaki şirketler, içgörüler elde etmek, operasyonlarını yönetmek ve her şeyin sorunsuz çalışmasını sağlamak için verileri için Python kullanıyor.

Python ETL Pipeline Oluşturmak için 2 Kolay Adım

Bu bölümde, Python kullanarak bir ETL ardışık düzeni oluşturmak için gerekli adımları öğreneceksiniz. MySQL ve Microsoft SQL Server Veritabanlarından bir Microsoft SQL Server veritabanına veri besleyen temel bir Veri Hattı oluşturacaksınız.

Python ETL betiğini kurmak için aşağıdaki adımları izleyin:

Adım 1: Gerekli Modülleri Kurun

Python ETL Pipeline'ı kurmak için aşağıdaki modülleri yüklemeniz gerekir:

  • Python'dan MySQL Connector'a: mysql-connector-python ( Yüklemek için pip install mysql-connector-python komutunu kullanın)
  • Python'dan Microsoft SQL Server Connector'a: pyodbc (Yüklemek için pip install pyodbc komutunu kullanın)

2. Adım: ETL Dizinini Kurun

Yukarıdaki paketleri kurduktan sonra, proje dizininizde aşağıda belirtilen 4 Python dosyası oluşturmanız gerekir:

  • db_credentials.py: Bu dosya, tüm Veritabanları ile bağlantı kurmak için kod içerir.
  • sql_queries.py: Bu dosya, verileri dize biçiminde çıkarmak ve yüklemek için yaygın olarak kullanılan Veritabanı sorgularını içerir.
  • etl.py: Bu dosya, Veritabanına bağlanmak ve gerekli sorguları çalıştırmak için gerekli işlemleri içerir.
  • main.py: Bu, Python ETL Pipeline'ın akışını ve yürütülmesini düzenleyen birincil dosyadır.

A) db_credentials.py

Tüm Kaynak ve Hedef Veritabanı Bağlantı Dizeleri bu dosyaya dahil edilmelidir. Gerektiğinde hızlı bir şekilde tekrarlanabilmesi için ilgili veritabanına erişim için gerekli tüm bilgileri bir liste formatında içermelidir. Aşağıdaki, Veritabanı bağlantısını kurmak için örnek bir Python betiğidir:

 datawarehouse_name = 'your_dwh_name'
# sql-server (hedef db, veri ambarı)
datawarehouse_db_config = {
  'Güvenilir_Bağlantı': 'evet',
  'sürücü': '{SQL Sunucusu}',
  'sunucu': 'datawarehouse_sql_server',
  'veritabanı': '{}'.format(datawarehouse_name),
  'kullanıcı': 'your_db_uname',
  'parola': 'your_db_pword',
  'otomatik taahhüt': Doğru,
}
# kaynak db > sql sunucusu
sqlserver_db_config = [
  {
    'Güvenilir_Bağlantı': 'evet',
    'sürücü': '{SQL Sunucusu}',
    'sunucu': 'your_db_sql_server',
    'veritabanı': 'db_1st',
    'kullanıcı': 'your_db_uname',
    'parola': 'your_db_pword',
    'otomatik taahhüt': Doğru,
  }
]
# kaynak db > mysql
mysql_db_config = [
  {
    'user': 'your_1_user',
    'şifre': 'your_1_pword',
    'ana bilgisayar': 'db_connection_string_1',
    'veritabanı': 'db_1st',
  },
  {
    'user': 'your_2_user,
    'parola': 'your_2_password',
    'ana bilgisayar': 'db_connection_string_2',
    'veritabanı': 'db_2nd',
  },
]

B) sql_queries.py

Bu dosya, Kaynak Veritabanlarından veri çıkarmak ve Hedef Veritabanına yüklemek için sorgular içerir. Aşağıdaki komut dosyası bu görevi gerçekleştirmenize yardımcı olacaktır:

 # örnek sorgu, farklı veritabanı platformları için benzersiz olacak

sqlserver_extract = ('''
  SEÇ sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
  sqlserver_1_table'DAN
'')
sqlserver_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  DEĞERLER (?, ?, ?)  
'')
mysql_extract = ('''
  mysql_col_1, mysql_col_2, mysql_col_3 SEÇ
  mysql_demo_table'DAN
'')
mysql_insert = ('''
  INSERT INTO table_demo (col_1, col_2, col_3)
  DEĞERLER (?, ?, ?)  
'')

# Sorgular dışa aktarılıyor
sınıf Sql_Query:
  def __init__(self, özü_sorgu, yük_sorgusu):
    self.extract_query = Extract_query
    self.load_query = load_query   
# Sql_Query sınıfı için örnekler oluşturun
sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery(mysql_extract, mysql_insert)
# değerler arasında yineleme yapmak için bir liste oluşturma
mysql_queries = [mysql_query]
sqlserver_queries = [sqlserver_query]

C) etl.py

Bu dosya, ilgili Veritabanlarına erişmek ve gerekli sorguları yürütmek için gereken kodu içermelidir. Aşağıdaki komut dosyası bu görevi gerçekleştirmenize yardımcı olacaktır:

 # python tabanlı modüller
pyodbc'yi içe aktar
mysql.connector'ı içe aktar

def etl(sorgu, kaynak_cnx, hedef_cnx):
  # demo kaynak veritabanından veri ayıklayın
  source_cursor = source_cnx.cursor()
  source_cursor.execute(query.extract_query)
  veri = source_cursor.fetchall()
  kaynak_cursor.close()

  # verileri demo Data Warehouse db'ye yükleyin
  
eğer veri:
    target_cursor = target_cnx.cursor()
    target_cursor.execute("{} KULLAN".format(name_for_datawarehouse))
    target_cursor.executemany(query.load_query, veri)
    print('veriler demo Data Warehouse db'ye yüklendi')
    target_cursor.close()
  Başka:
    print('veriler boş')

def etl_process(sorgular, target_cnx, source_db_config, db_platform):

  # demo kaynak veritabanı bağlantısını yapılandırma
  if db_platform == 'mysql':
    source_cnx = mysql.connector.connect(**source_db_config)
  elif db_platform == 'sqlserver':
    source_cnx = pyodbc.connect(**source_db_config)
  Başka:
    dönüş 'Hata! tanınmayan kaynak veritabanı platformu'
  # sql sorguları arasında döngü
  sorgularda sorgu için:
    etl (sorgu, kaynak_cnx, hedef_cnx)    
  # kaynak db bağlantısını kapat
  kaynak_cnx.close()

D) ana.py

Bu dosya, veritabanına bağlanmak ve gerekli ETL Python işlemlerini yürütmek için verilen kimlik bilgileri aracılığıyla yinelenecek kodu içerir. Aşağıdaki komut dosyası bu görevi gerçekleştirmenize yardımcı olacaktır:

 # değişkenler
db_credentials'tan içe aktarma datawarehouse_db_config, sqlserver_db_config, mysql_db_config
sql_queries'den içe aktar sqlserver_queries, mysql_queries

# yöntem
etl import etl_process'den
tanım ana():
  print('etl veri sürecini başlatma')
	
  # SQL Server için bağlantı kurun, istenen hedef depolama
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # kimlik bilgileri arasında döngü
  # Veritabanı > mysql
  mysql_db_config içindeki yapılandırma için: 
    denemek:
      print("db yükleniyor: " + config['veritabanı'])
      etl_process(mysql_queries, target_cnx, config, 'mysql')
    hata olarak İstisna dışında:
      print("etl for {} hatası var".format(config['database']))
      print('hata mesajı: {}'.format(hata))
      devam et
	
  # Veritabanı > sql-sunucu
  sqlserver_db_config içindeki yapılandırma için: 
    denemek:
      print("db yükleniyor: " + config['veritabanı'])
      etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
    hata olarak İstisna dışında:
      print("etl for {} hatası var".format(config['database']))
      print('hata mesajı: {}'.format(hata))
      devam et

  target_cnx.close()
eğer __name__ == "__main__":
  ana()

Çözüm

Harika iş! Python ETL Pipeline oluşturma konusunda başarılı bir şekilde temel bir anlayış kazandınız. Artık, kullanılan veritabanlarında değişiklikler yaparak ve buna göre sorgular yaparak özel Python ETL betiğinizi gereksinimlerinize göre uygulayabilirsiniz.

Sektörde yaygın olarak kullanılan Python ETL Araçlarını keşfetmek için En İyi Python ETL Araçları blogunu okuyun.

Günümüzde çoğu kuruluş Büyük Veri ile çalışıyor. Bu nedenle, bu tür veriler için sıfırdan bir ETL işlem hattı oluşturmak zaman alıcı ve zorlayıcı olabilir.

Ayrıca, işletmelerin bunu inşa etmek ve ardından yüksek veri hacmine ve şema dalgalanmalarına ayak uydurabileceklerini garanti etmek için önemli miktarda kaynak yatırımı yapmaları gerekecektir.

Bu nedenle, sıfırdan ETL komut dosyaları oluşturmak yerine Hevo gibi otomatik Veri Boru Hatlarından yararlanabilirsiniz.

Bu konuda herhangi bir fikriniz var mı? Aşağıdaki yorumlarda bize bildirin veya tartışmayı Twitter veya Facebook'a taşıyın.

Editörün Önerileri: