Az Airflow által vezérelt automatizált munkafolyamatok és rendszerek fejlesztése során egyre inkább szükség van arra, hogy testreszabott szolgáltatókat (providers) hozzunk létre, amelyek egyedi operátorokat és szenzorokat tartalmaznak. A testreszabott szolgáltatók lehetővé teszik, hogy bármilyen külső rendszert, például adatbázisokat, API-kat vagy szenzorokat integráljunk az Airflow környezetébe. Az alábbiakban bemutatjuk, hogyan lehet létrehozni egy egyszerű testreszabott szolgáltatót, amely a teáskanna vízszintjét figyeli, és ennek alapján különböző műveleteket hajt végre, például teát főz vagy kávét készít.

A szolgáltató egyik legfontosabb része a szenzor, amely a valós világ eseményeit figyeli és válaszokat ad arra, hogy mikor kell egy bizonyos műveletet végrehajtani. A következő kódrészletben egy WaterLevelSensor nevű szenzort hozunk létre, amely az adott teáskanna vízszintjét figyeli.

python
class WaterLevelSensor(BaseOperator): def __init__(self, tea_pot_conn_id, minimum_level, **kwargs): super().__init__(**kwargs) self.tea_pot_conn_id = tea_pot_conn_id self.minimum_level = minimum_level def execute(self, context): self.defer( trigger=WaterLevelTrigger( tea_pot_conn_id=self.tea_pot_conn_id, minimum_level=self.minimum_level ), method_name="execute_complete" ) def execute_complete(self, context, event=None): return event

A fenti kód egy egyszerű operátor osztályt hoz létre, amelynek két fontos része van: az __init__ és az execute metódusok. Az __init__ metódusban inicializáljuk a szenzort, beállítjuk a csatlakozási adatokat és a minimális vízszintet, míg az execute metódus a defer metódust hívja meg, amely elindítja a vízszintet figyelő eseményt.

Az esemény sikeres feldolgozása után a execute_complete metódus hívódik meg. Ez az a pont, ahol további üzleti logikát és ellenőrzéseket végezhetünk, amelyek szükségesek az operátor végrehajtásához.

A fejlesztés során mindig fontos, hogy teszteljük kódunk működését. A tesztelés során győződjünk meg róla, hogy a szenzor és az operátor megfelelően működnek, és a külső szolgáltatásokat megfelelően integráljuk. Az alábbiakban bemutatunk egy egyszerű tesztelési környezetet, amely segít abban, hogy a kódunk helyesen működjön.

Tesztelési környezet beállítása

A tesztelés egyik alapvető eleme, hogy egy tesztkörnyezetet hozzunk létre, amely lehetővé teszi számunkra, hogy valós adatokat és kapcsolatokat használjunk anélkül, hogy külső rendszerekhez kellene kapcsolódnunk. Ehhez a pytest keretrendszert használhatjuk, amely segít abban, hogy automatizáljuk a teszteket és a szolgáltatások szimulálását. Az alábbiakban bemutatjuk, hogyan hozhatunk létre egy tesztkörnyezetet:

python
import os
import pytest import shutil os.environ["AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS"] = "False" os.environ['AIRFLOW__CORE__UNIT_TEST_MODE'] = 'True' os.environ['AIRFLOW__CORE__LOAD_EXAMPLES'] = 'False' os.environ['AIRFLOW_HOME'] = os.path.join(os.path.dirname(__file__), 'airflow') @pytest.mark.filterwarnings("ignore:DeprecationWarning") @pytest.fixture(autouse=True, scope='session') def initdb(): """Létrehozza az adatbázist minden tesztelési munkamenethez és hozzáadja a kapcsolatokat.""" from airflow.models import Connection from airflow.utils import db db.initdb(load_connections=False) db.merge_conn( Connection(...) # Kapcsolati információk hozzáadása ) yield # Tisztítja az adatbázist a tesztelés után shutil.rmtree(os.environ["AIRFLOW_HOME"])

Ez a kód az Airflow tesztkörnyezetet konfigurálja, beleértve a környezeti változókat, az adatbázis inicializálását és a kapcsolatok betöltését. A tesztkörnyezetet minden egyes tesztelési munkamenet előtt létrehozzuk, majd a tesztek után töröljük azt, így biztosítva a tiszta munkakörnyezetet.

Működési példák

A tesztelés és fejlesztés mellett érdemes példákat adni arra, hogyan használhatják a fejlesztők a szolgáltatót valós környezetben. Egy egyszerű DAG (Directed Acyclic Graph) példát adunk, amely a vízszint mérésére épít, és ha a vízszint megfelelő, akkor teát és kávét készít párhuzamosan:

python
from datetime import datetime, timedelta from airflow import DAG from airflow_provider_tea_pot.operators import MakeTeaOperator, BrewCoffeeOperator from airflow_provider_tea_pot.sensors import WaterLevelSensor with DAG(...) as dag: t1 = WaterLevelSensor( task_id="check_water_level", tea_pot_conn_id="tea_pot_example", minimum_level=0.2 ) t2 = MakeTeaOperator( task_id="make_tea", tea_pot_conn_id="tea_pot_example" ) t3 = BrewCoffeeOperator( task_id="brew_coffee", tea_pot_conn_id="tea_pot_example" ) t1 >> [t2, t3]

Ez a DAG minden nap ellenőrzi a vízszintet a teáskannában, és ha az eléri a minimális szintet, két párhuzamos műveletet hajt végre: elkészíti a teát és a kávét. A kód tartalmazza a szükséges operátorokat és a szenzort, amely meghatározza a vízszintet, valamint a teáskanna csatlakozási adatokat.

Docker-környezet

A fejlesztés és tesztelés egyszerűsítése érdekében érdemes Docker környezetet használni. A Dockerfile és a docker-compose.yaml fájl segítségével létrehozhatunk egy olyan környezetet, amely lehetővé teszi a szolgáltatók és DAG-ok egyszerű futtatását. Az alábbiakban egy egyszerű Dockerfile példát mutatunk be, amely az Airflow alapú képet használja:

Dockerfile
ARG IMAGE_NAME="apache/airflow:2.5.0"
FROM ${IMAGE_NAME} USER airflow COPY --chown=airflow:airflow . ${AIRFLOW_HOME}/airflow-provider-tea-pot COPY --chown=airflow:root example_dags/ /opt/airflow/dags RUN pip install --no-cache-dir --user ${AIRFLOW_HOME}/airflow-provider-tea-pot/.

Ez a Dockerfile az alap Airflow képet használja, majd hozzáadja a szükséges kódokat és DAG-okat a konténerhez, és telepíti a testreszabott szolgáltatót.

Fontos szempontok

A testreszabott szolgáltató fejlesztése során fontos, hogy figyelmet fordítsunk a kód modularitására és a tiszta kódra. A jól megtervezett és tesztelt szolgáltatók könnyen karbantarthatók és bővíthetők. A tesztelési környezet megfelelő konfigurálása lehetővé teszi, hogy a fejlesztők valódi környezethez hasonló körülmények között validálják a kód működését, miközben minimalizálják a külső rendszerekhez való kapcsolódás szükségességét. Az ilyen fejlesztési gyakorlatok segítenek abban, hogy az Airflow-alapú automatizált munkafolyamatok megbízhatóbbak és hatékonyabbak legyenek.

Miért fontos a szolgáltatók kiválasztása az Airflow telepítésekor?

A legjobb döntést kell hoznunk, ha saját magunk kívánunk Airflow-t telepíteni és üzemeltetni, vagy szolgáltatót választunk. A szolgáltatók választása kulcsfontosságú tényező lehet költségek, stabilitás és operatív terhek szempontjából. Különösen akkor, ha még nem rendelkezünk jelentős előzetes beruházással a számítástechnikai infrastruktúrában, a szolgáltatók gyakran olcsóbbak és stabilabbak lehetnek, mint a saját fizikai infrastruktúra kiépítése.

A szolgáltatók lehetőséget adnak arra, hogy csökkentsük a tulajdonlás és a támogatás terheit. Az Airflow használata során ők felelősek az összes mögöttes infrastruktúra biztosításáért és működtetéséért, így biztosítva minket az infrastruktúra és a szoftverek működtetési terhei alól. Azok számára, akik nem biztosak abban, hogy képesek saját infrastruktúrájukat üzemeltetni, alapvető, hogy alaposan megvizsgálják a rendelkezésre álló szolgáltatókat. Miközben ezt teszik, fontos, hogy szem előtt tartsák az alábbiakat: a szolgáltatók gyakran saját preferenciáik szerint alakítják ki az Airflow használatát, ami bizonyos fejlesztési technikákra és telepítési mintákra kényszeríthet. A szolgáltatók döntései hatással lehetnek a csapat munkájára, mivel nehezíthetik (vagy akár lehetetlenné is tehetik) új Airflow funkciók bevezetését, bizonyos Python csomagok frissítését, vagy közösségi bővítmények használatát. Az ajánlott legjobb gyakorlatok és a szolgáltató által biztosított eszközök alapos megismerése elengedhetetlen a sikeres integráció érdekében.

Miután eldöntöttük, hogy hogyan kívánjuk lokalizálni fejlesztési környezetünket, érdemes a tesztelési gyakorlatokat is alaposan átgondolni. A tesztelés az Airflow egyik legvitatottabb témája, mivel sokan dogmatikusan közelítik meg, miközben a legfontosabb szempont az, hogy a tesztelés bizalmat építsen abban, hogy a telepítésünk kiszámítható és stabil módon fog működni.

A fejlesztés során, legyen szó egy lokális környezetről vagy felhőalapú fejlesztési platformról, az Airflow telepítése előtt célszerű tesztelni a rendszert és az egyes DAG-eket. A különböző fejlesztési környezetek – mint a Python virtuális környezetek vagy a Docker Compose – különböző előnyöket és hátrányokat kínálnak. A virtuális környezetek gyors, könnyű beállítást tesznek lehetővé, de esetenként ütközhetnek más rendszerek és szolgáltatások igényeivel. Ezzel szemben a Docker Compose egy komplexebb, de nagyobb rugalmasságot biztosító megoldás, amely jobban tükrözi a végleges telepítési környezetet. A felhőalapú fejlesztési környezetek szintén magas szintű rugalmasságot kínálnak, de az ilyen megoldások beállítása általában több időt és erőforrást igényel.

A tesztelés során nem csak a rendszerszintű működést érdemes ellenőrizni, hanem a DAG-ek szintjén is érdemes különböző típusú teszteket végezni. A legáltalánosabb tesztelési gyakorlatok közé tartozik a füstteszt, amely egyszerűen ellenőrzi, hogy a DAG-ok jól definiáltak-e és érvényes Python kódot tartalmaznak-e. Emellett a unit tesztek azokat a speciális metódusokat és függvényeket tesztelik, amelyeket kifejezetten a DAG-okhoz írtak. A funkcionális tesztek a DAG-ok specifikus kontextusában mérik a kívánt eredményeket, míg a teljesítménytesztek azt vizsgálják, hogy a rendszer megfelel-e a teljesítményi követelményeknek, mint például a futási idő, memóriahasználat vagy CPU terhelés.

A tesztelés nem csak egyszeri folyamat: ahogy a rendszer éretté válik és tapasztalatokat szerzünk, úgy a tesztelési eljárások is fejlődnek. Amennyiben hibák lépnek fel, a tesztelési folyamatot ki kell egészíteni olyan új tesztekkel, amelyek biztosítják, hogy a probléma a jövőben ne ismétlődhessen meg.

Fontos, hogy mindig legyen legalább egy alacsonyabb szintű tesztkörnyezet, amely könnyen törölhető, újratelepíthető és konfigurálható. Ez lehetővé teszi, hogy a rendszer működését validáljuk, mielőtt az éles munkaterhelés alá kerülne. A tesztelés mellett a környezetek, ahol a teszteket futtatjuk, ugyanolyan fontosak. A legjobb gyakorlatok szerint a DAG-ok tesztelését az Airflow rendszer teljes tesztelése előtt kell elvégezni, hogy elkerüljük az esetleges hibákat a végleges telepítés előtt.

Hogyan építsünk egyszerű adatfeldolgozó rendszert Apache Airflow segítségével?

Az Apache Airflow egy népszerű eszköz, amely lehetővé teszi a munkafolyamatok és adatfeldolgozási folyamatok automatizálását és orkestrálását. Az ETL (Extract, Transform, Load) folyamatok megértése alapvető, mivel ezek a munkafolyamatok az adatkezelés alapjai, és elengedhetetlenek az adatmodellezésben. Az alábbiakban bemutatott példa a három lépésből álló folyamatot (adatkinyerés, adattranszformáció, adatbetöltés) valósítja meg az Airflow segítségével.

Az adatkinyerés (Extract) során először egy egyszerű data_string változót hozunk létre, amelyet betöltünk egy adat szótárba. Ez a szótár tartalmazza az adatokat, amelyeket a további lépésekben felhasználunk. A transform funkció az adatokat átalakítja, hogy kiszámoljuk a teljes rendelési értéket. Ehhez a transform funkció egy ciklust használ, hogy összeadja az értékeket, majd visszaadja a végső eredményt, amelyet a load funkció használ fel.

A load funkció a végső lépés, amely az adatokat betölti a kívánt célrendszerbe. Az egyszerű példában ezt a lépést csak egy print utasítással jelenítjük meg, de a valós életben gyakran adatbázisokba vagy adatwarehousokba töltjük be az adatokat.

A feladatok végrehajtása között kapcsolatokat kell kialakítani, és ezek a kapcsolatok határozzák meg a feladatok sorrendjét. Az Airflow lehetőséget ad arra, hogy explicit módon meghatározzuk a feladatok egymás utáni végrehajtását. Ezt a >> szimbólummal tehetjük meg, amely biztosítja, hogy az egyik feladat a másik előtt fusson le.

A XCom (Cross-Communication) egy kulcsfontosságú funkció az Airflow-ban, amely lehetővé teszi az adatok megosztását a feladatok között. Az XCom-ok segítségével egy feladat adatokat küldhet más feladatoknak, amelyek később felhasználhatják ezeket. Az XComok ideálisak kisebb adatcsomagok átvitelére, és segítenek a feladatok közötti kommunikációban. Azonban fontos megjegyezni, hogy nagy adatállományokat nem érdemes XCom-okkal kezelni, mivel az Airflow nem ideális nagy mennyiségű adat mozgatására.

A feladatok végrehajtásának sorrendje meghatározható a kódban szereplő változók segítségével, de az Airflow UI-ban is elérhetők azok a lehetőségek, amelyekkel manuálisan indíthatjuk el a munkafolyamatokat. Az Airflow grafikus felületén könnyedén nyomon követhetjük a feladatok állapotát és az adatokat, amit a rendszer feldolgoz.

A bonyolultabb munkafolyamatok esetén az Airflow 2.0-ban bevezetett feladatcsoportok (task groups) segítenek a DAG-ok (Directed Acyclic Graph) vizuális egyszerűsítésében és logikai felosztásában. A feladatcsoportok használata különösen hasznos, amikor több adatforrást dolgozunk fel, és szeretnénk egyértelműen elválasztani a különböző lépéseket.

Egy feladatcsoport például az @task_group dekorátorral hozható létre, amely lehetővé teszi a feladatok hierarchikus csoportosítását. A feladatcsoportok lehetővé teszik a DAG-ok vizuális egyszerűsítését, ami különösen fontos, ha a munkafolyamatok bonyolulttá válnak. A legbonyolultabb rendszerekben akár egymásba ágyazott feladatcsoportokat is használhatunk a még részletesebb struktúrák kialakításához.

A triggerek fontos szerepet játszanak az Airflow-ban, mivel meghatározzák, hogy egy feladat mikor fusson le. A triggerek lehetnek időalapúak (pl. naponta vagy óránként), külső események, vagy más feladatok befejezése. A triggerek megértése és helyes beállítása kulcsfontosságú az automatizált munkafolyamatokban, mivel biztosítják, hogy a feladatok a megfelelő időpontban fussanak le, összhangban a rendszer igényeivel.

A fent bemutatott példák alapján láthatjuk, hogy az Airflow segítségével egy egyszerű ETL folyamat is könnyedén megvalósítható. A rendszer modularitása és rugalmassága lehetővé teszi, hogy bármilyen komplexitású adatfeldolgozó munkafolyamatot kialakítsunk, és biztosítja, hogy az adatok megfelelő módon áramoljanak a különböző rendszerek között. Az Apache Airflow tehát ideális eszköz a modern adatkezelési és feldolgozási folyamatok automatizálására.