Az Apache Airflow egy elterjedt eszköz, amelyet az adatfeldolgozási munkafolyamatok automatizálására és kezelésére használnak. A CI/CD (Continuous Integration/Continuous Deployment) gyakorlatok integrálása az Airflow munkafolyamatokba lehetővé teszi a csapatok számára, hogy felgyorsítsák a fejlesztési ciklust, javítsák a megbízhatóságot és csökkentsék a piacra jutási időt. A megfelelő tesztelési stratégia kialakítása, a fejlesztési környezetek pontos meghatározása és az Airflow alapvető tesztelési elveinek betartása mind elengedhetetlenek ahhoz, hogy a rendszer stabilan működjön.

A tesztelési folyamatoknak minden esetben az adott teszt típusának és a vállalat operatív irányelveinek megfelelően kell zajlania. A legegyszerűbb tesztelési típusok, mint a smoke és unit tesztek, gyakran teljes mértékben az CI/CD rendszerben futtathatók, míg a funkcionális tesztek gyakran QA környezetben történnek, ahol az adatok szabadon módosíthatók az egyes tesztesetekhez. A teljesítményteszteket pedig olyan staging környezetben célszerű végezni, amely szinte teljesen megegyezik a produkciós környezettel.

A tesztelési folyamatok célja, hogy minimalizálják a hibák előfordulásának esélyét, miközben biztosítják, hogy a fejlesztett kód megfeleljen az elvárásoknak. Az alábbiakban a legfontosabb tesztelési típusok kerülnek ismertetésre:

Smoke tesztelés: A smoke tesztek célja, hogy biztosítsák, hogy a Python-kód működőképes. Az ilyen tesztek általában egyszerűek és arra összpontosítanak, hogy a csomag telepíthető legyen, és az alapvető funkciók működnek.

Unit tesztelés: Az unit tesztek az egyes kódegységek helyes működését ellenőrzik. Az ilyen tesztek nem igényelnek külső szolgáltatásokat, és a tesztelés során a kapcsolatok gyakran hamisítva (mocking) vannak, így nem kell valódi külső rendszerekhez kapcsolódni.

Funkcionális és integrációs tesztek: Ezek a tesztek valós szolgáltatásokkal és rendszerekkel történnek, hogy biztosítsák a kód megfelelő működését a tényleges környezetben. A tesztelés során a rendszerek, például adatbázisok, létrehozása és megsemmisítése, valamint szükség esetén a teszteléshez szükséges adatok előkészítése is szerepelhet.

Az Airflow rendszer tesztelését különös figyelemmel kell kezelni. A legfontosabb tanács ebben az esetben az, hogy alapvetően ne próbáljuk meg tesztelni a core Airflow rendszert, ha nem történt lényeges módosítás. Az Airflow kódjának alapvető tesztelését a projekt karbantartói végzik, és ha valamilyen probléma merül fel a rendszerben, célszerű inkább a közösséghez fordulni segítségért.

A rendszer hatékonyságának fenntartása érdekében elengedhetetlen, hogy a telepítés során kiemelt figyelmet fordítsunk a monitorozásra és az észlelésre. Az operatív csapatok számára a megfelelő eszközök és eljárások kiválasztása kulcsfontosságú. Mivel a monitorozás aktívan és passzívan is történhet, a rendszerek egészségi állapotának folyamatos ellenőrzésére szükség van. Az aktív monitorozás során egy folyamat folyamatosan ellenőrzi a szolgáltatás állapotát, míg a passzív monitorozás esetében az állapotváltozás hiánya is figyelmeztethet egy potenciális problémára.

A következő szempontokat mindenképpen érdemes figyelemmel kísérni a rendszer működésének biztosítása érdekében:

Airflow komponensek monitorozása: Az Airflow rendszer komponenseinek állapotát folyamatosan figyelemmel kell kísérni. Legalább egy „Működik-e?” típusú ellenőrzést végezni minden komponensnél, és ha egy komponens nem működik, azonnali értesítést kell küldeni a csapatnak. A legegyszerűbb módja annak, hogy ezt ellenőrizzük, a REST API-n keresztüli /health/ végpont lekérdezése, amely JSON formátumban ad vissza adatokat a komponensek egészségi állapotáról.

Scheduler monitorozása: Az Airflow scheduler szolgáltatásának működése alapvetően fontos, mivel ez kezeli a feladatok ütemezését. A scheduler szintén egy /health végpontot indít el, amelyet külső folyamatok folyamatosan ellenőrizhetnek. Azonban fontos megjegyezni, hogy a scheduler működése önmagában nem garantálja, hogy a feladatok megfelelően ütemeződnek. A magas értékek esetén érdemes ellenőrizni a konfigurációt és a rendelkezésre álló erőforrásokat.

Metadata adatbázis monitorozása: A metadata adatbázis tárolja az Airflow feladatvégrehajtások előzményeit és a rendszer szerepköreit. Mivel a rendszer szíve, az adatbázis esetleges meghibásodása súlyos következményekkel járhat, és komoly adatvesztést okozhat, elengedhetetlen a megfelelő felügyelet. Az adatbázis kapcsolatainak méretét és kihasználtságát folyamatosan figyelemmel kell kísérni, hogy az esetleges teljesítménybeli problémák időben észlelhetők legyenek.

Ahhoz, hogy a csapatok teljes mértékben kiaknázhassák az Airflow erejét, fontos, hogy figyelmet fordítsanak a tesztelési és monitorozási gyakorlatok folyamatos fejlesztésére. A jól megtervezett és implementált CI/CD gyakorlatok nemcsak a fejlesztési folyamatokat gyorsítják fel, hanem hosszú távon is stabil és megbízható rendszert eredményeznek, amely képes kezelni a komplex adatfeldolgozási munkafolyamatokat a legkülönbözőbb környezetekben.

Hogyan működnek a DAG-ok és a legfontosabb komponensek az Apache Airflow-ban?

A web szerverhez való navigáláshoz nyissa meg a böngészőt, és adja meg a következő URL-t: http://localhost:8080. Miután a web szerver elindult, a böngészőben be kell jelentkeznie, használva az admin felhasználónevet és a terminálban megadott jelszót. A bejelentkezés után a főoldal jelenik meg, amely hasonlóképpen néz ki a következőhöz: a képernyőn megjelenő DAG-ok, amelyek az Apache Airflow rendszerén belül az adatfolyamatok kezelésére szolgálnak.

Az Apache Airflow egy olyan nyílt forráskódú platform, amelyet az adatfolyamatok és munkafolyamatok automatizálására használnak. Az Airflow egy DAG-ot (Directed Acyclic Graph - irányított ciklikus gráf) használ az adatfeldolgozási folyamatok irányításához. Egy DAG különböző feladatok sorozataként van definiálva, amelyek párhuzamosan vagy sorban hajthatók végre.

A következő lépésekben bemutatjuk, hogyan használhatjuk az Airflow-t egy egyszerű példán keresztül, amely tartalmazza a legfontosabb alapfogalmakat, például a DAG-okat, operátorokat és szenzorokat. A bemutatott példában az example_dag_basic nevű alapértelmezett DAG-ot fogjuk elemezni, amely bemutatja az adatkinyerés, átalakítás és betöltés (ETL) alapvető funkcióit.

DAG-ok és feladataik

A DAG-ok (Directed Acyclic Graphs) az Apache Airflow alapvető építőkövei, és ezek segítségével lehet létrehozni adatfolyamatokat. Minden DAG különböző feladatokból (task) áll, amelyeket operátorok segítségével hajtanak végre. A feladatok az Airflow-ban a végrehajtandó logikai egységeket képviselik. Az operátorok határozzák meg a feladatok típusát és azok működését. A példánkban három alapvető feladat található, amelyek sorozatosan hajtódnak végre.

A DAG-ok fő célja, hogy az adatokat különböző lépésekben feldolgozzák, így lehetővé teszik az adatok áramlásának kezelését a rendszeren belül. Az egyszerű példákban a feladatok gyakran lineárisan követik egymást, de összetettebb DAG-okban a feladatok párhuzamosan is végrehajthatók, vagy várakozniuk kell bizonyos eseményekre.

Az Airflow DAG-jának definiálása

A DAG-ok egyszerűsített definíciója érdekében az Apache Airflow a Python dekorátorokat használ. Az Airflow 2.x verziójától kezdve a DAG definícióját már nem szükséges with DAG as formában készíteni, amit az előző verziók használtak. A következő kódrészletben bemutatjuk, hogyan hozhatjuk létre az alap DAG-ot:

python
@dag(
schedule="@daily", start_date=datetime(2023,1,1), catchup=False, default_args={ "retries": 2, }, tags=["example"], )

Ebben a kódban a schedule határozza meg a végrehajtás gyakoriságát. Az @daily érték például azt jelzi, hogy a DAG naponta egyszer fut le. A start_date az a kezdő időpont, amelytől a DAG futni kezd. A catchup opció meghatározza, hogy az Airflow próbálja-e újra végrehajtani azokat a feladatokat, amelyek a start_date előtt nem futottak le.

Az default_args paraméterrel a DAG alapértelmezett beállításait is definiálhatjuk, például a feladatok újrapróbálkozási számát (retries), ha azok hibásan futnak.

Operátorok és feladatok

A DAG-ok feladatai operátorokkal vannak definiálva. Az operátorok egy-egy feladat végrehajtásáért felelnek, és minden operátor egy speciális feladatot hajt végre, mint például Python függvények futtatása, SQL lekérdezések végrehajtása, vagy éppen adatátvitel egy másik rendszerbe.

Néhány gyakran használt operátor:

  • BashOperator: Bash parancs végrehajtása.

  • PythonOperator: Python függvény futtatása.

  • SqlOperator: SQL lekérdezés futtatása.

  • DockerOperator: Docker konténer futtatása.

  • HttpOperator: HTTP kérés küldése.

Az operátorok lehetővé teszik, hogy különböző típusú feladatokat végezzünk el, miközben az Airflow által biztosított egyszerűsített API segítségével az összes feladatot könnyedén kezelhetjük.

A DAG vizualizálása

Miután a DAG-ot definiáltuk és sikeresen lefuttattuk, az Airflow webes felületén vizualizálhatjuk a folyamatot. A DAG grafikus ábrázolása segít megérteni a feladatok közötti kapcsolatokat és azok végrehajtásának sorrendjét. Egy egyszerű DAG példában a feladatok sorban hajtódnak végre, de összetettebb DAG-ok esetén előfordulhat, hogy a feladatok párhuzamosan futnak, és az ábrázolás segít nyomon követni a pontos végrehajtási állapotot.

Dekorátorok használata a feladatok definiálásakor

Az Apache Airflow 2.x verziójától kezdve a dekorátorok használata jelentősen leegyszerűsíti a feladatok definiálását. A @task dekorátorral jelölhetjük meg, hogy egy adott függvény feladatot hajt végre az Airflow rendszerében. A következő példában az extract feladatot egy egyszerű Python függvénnyel definiáljuk:

python
@task()
def extract(): data_string = '{"1001": 301.27, "1002": 433.21, "1003":502.22}' order_data_dict = json.loads(data_string) return order_data_dict

Ez a feladat a JSON formátumban kapott adatot alakítja át Python szótárrá, és visszaadja az átalakított adatokat.

Mit fontos még tudni?

Az Apache Airflow és DAG-ok hatékony használatához elengedhetetlen a rendszer alapos ismerete és a megfelelő konfigurálás. Fontos megérteni, hogy bár az Airflow egy rendkívül rugalmas és skálázható platform, az összetett rendszerekben való alkalmazása megköveteli a jól tervezett architektúrát. A rendszer teljesítményének optimalizálásához és a hibaelhárításhoz megfelelő monitoring és logolás szükséges, különben a komplex adatfolyamatok hamar átláthatatlanná válhatnak. Az operátorok és feladatok megfelelő kiválasztása és konfigurálása kulcsfontosságú a sikeres futtatásban.