Automatizace kódu pomocí DAG je výkonným nástrojem pro efektivní správu úloh v rámci datových pracovních toků. V tomto případě se zaměříme na vytvoření DAG v Apache Airflow, který umožní automatické spuštění kódu každý den. Tento proces zahrnuje několik důležitých kroků, které je třeba splnit, aby byl váš kód správně integrován do Airflow prostředí. V rámci tohoto procesu se seznámíme s některými nástroji, které jsme dosud nezmiňovali, jako jsou operátory a další techniky pro vytváření DAG.
Začneme importováním potřebných knihoven v Pythonu a Airflow, které nám umožní definovat DAG a nastavit základní parametry pro jeho použití. Stejně jako v předchozích příkladech budeme používat některé knihovny, které nám umožní pracovat s daty a komunikovat s API, například pro získání obrázku z NASA Planetary API.
Následně si definujeme Python funkci pro získání nejnovějšího obrázku z NASA Planetary API. Funkce volá API s naším API klíčem a ukládá obrázek na disk, přičemž název souboru obsahuje aktuální datum.
Dalším krokem je nastavení výchozích argumentů pro DAG. Tato nastavení určují, jaké parametry bude Airflow používat při volání každého úkolu v DAG. Definujeme parametry jako majitele DAG, zda úkoly závisí na minulosti, počet pokusů o opakování a zpoždění mezi pokusy.
Dále inicializujeme DAG objekt a definujeme jeho parametry. Použití klíčového slova with nám umožňuje definovat kontext DAG a zamezit opětovnému inicializování objektu při každém analýze DAG kódu. Takto definujeme DAG, který se spustí každý den:
Následně vytvoříme úkoly, které budou součástí tohoto DAG. Prvním úkolem je stažení obrázku, což realizujeme pomocí PythonOperator, který zavolá naši funkci _get_pictures. Druhý úkol je notifikační úkol, který nám po stažení obrázku pošle zprávu pomocí BashOperator.
Nakonec definujeme pořadí vykonávání úkolů v DAG. Určíme, že úkol get_pictures se má vykonat jako první, a úkol notify se vykoná po něm.
Když se podíváme na celý DAG, může to vypadat na první pohled složitěji než kód v Jupyter Notebooku, který jsme používali dříve. Většina kódu však bude opakujícími se částmi, které pomáhají Apache Airflow pochopit, jak má workflow fungovat. Jako odborníci na data a vývojáři si musíme být vědomi toho, že existuje spousta různých možností pro nastavení a správu úloh v Airflow. Airflow umožňuje nejen rozdělit úkoly na menší části, ale také je paralelně vykonávat, což značně zvyšuje efektivitu.
Instanciování DAG objektu je základem každého pracovního toku, a je tedy klíčové správně nastavit parametry DAGu tak, aby odpovídaly požadavkům konkrétního úkolu. Například výchozí argumenty DAG jako dag_owner, depends_on_past, retries, a retry_delay mohou mít velký vliv na to, jak Airflow provádí úkoly v případě chyb nebo opakovaných pokusů.
Doporučení pro popis a komentáře v kódu: Každý DAG by měl mít svůj popis. Tímto způsobem nejen že vy sami budete mít jasnost o jeho účelu, ale i vaši kolegové a další členové týmu budou schopni snadno pochopit, co daný DAG provádí. Komentáře a popisy jsou dobrým zvykem, který zjednodušuje spolupráci na projektu a údržbu kódu.
Airflow také umožňuje definovat interval, jak často se má DAG spouštět. Tento interval může být nastaven na různé hodnoty, jako je @daily, @weekly nebo i na základě cron výrazu. Pro tento konkrétní případ používáme denní interval, což je ideální pro náš scénář. Ale v jiných případech může být výhodné spouštět DAG pouze v pracovních dnech nebo pouze o víkendech.
Nastavení catchup je také důležité. Pokud je catchup nastaveno na True, Airflow se pokusí spustit DAG pro všechny časové intervaly, které byly vynechány od doby, kdy začal běh DAGu. V našem případě není potřeba, aby Airflow znovu spouštěl úkoly pro minulost, proto je nastaveno na False.
Použití tagů pro DAGy je novinkou v Airflow a pomáhá s organizováním a kategorizováním DAGů podle týmů nebo specifických funkcí. To může výrazně usnadnit práci, zejména při správě velkých projektů s mnoha DAGy.
Definování výchozích argumentů pro DAG je nezbytné pro správné fungování pracovního toku. dag_owner označuje, kdo je zodpovědný za správu DAGu, což je důležité pro správu a aktualizaci úkolů v rámci týmu. Tento parametr by měl být vždy aktuální a v souladu s tím, kdo má daný DAG na starosti.
Jak vytvořit vlastní plugin pro metrics dashboard v Apache Airflow?
Pro vytvoření vlastního pluginu pro metrics dashboard v Apache Airflow je nutné projít několika klíčovými kroky, které vám umožní zobrazit důležité metriky a vizualizace týkající se úspěšných a neúspěšných běhů DAGs (Directed Acyclic Graphs). Tento proces je zaměřen na využití Airflow pluginů a umožňuje přizpůsobení webového rozhraní pro specifické potřeby sledování výkonu vašich pracovních postupů.
Prvním krokem je integrace knihovny Chart.js, kterou načítáme z open-source CDN. Tato knihovna nám umožňuje vykreslovat grafy do elementů canvas na webové stránce. Na backendu definujeme proměnnou pro data ve formátu JSON, která obsahuje informace o statistice běhů DAGs. Posledním krokem je volání metody new Chart(), která jako první argument přijímá elementy canvas a jako druhý konfiguraci grafu.
Po nastavení potřebných prvků na straně frontendové aplikace přichází krok implementace pluginu do samotného Airflow. Tento krok zahrnuje registraci Flask Blueprint a třídy MetricsDashboardView v Airflow. K tomu je potřeba upravit soubor metrics_plugin/__init__.py, kde je uveden základní kód pro registraci pluginu.
V tomto kódu definujeme Flask Blueprint pro plugin, který slouží jako rozhraní pro zobrazení dashboardu, a třídu MetricsPlugin, která se odvozuje od základní třídy AirflowPlugin. Tato třída zajišťuje integraci pluginu do prostředí Airflow a připojuje potřebné webové stránky do menu Airflow pod kategorií "Metrics" a položkou "Dashboard".
Pro úspěšnou implementaci je nutné zkopírovat složku metrics_plugin do adresáře pluginů v Airflow. Po restartování webového serveru by měl být plugin funkční, a vy tak uvidíte v menu Airflow položku pro "Metrics" a "Dashboard", která vás přesměruje na vaši metriku.
Po implementaci tohoto pluginu se zobrazí grafy, které ukazují statistiky úspěšných a neúspěšných běhů DAGs. Například, graf úspěšných běhů zobrazuje počet běhů, které byly úspěšně dokončeny za poslední den, týden a měsíc. Další graf ukazuje neúspěšné běhy za stejné časové období.
Nyní máme kompletní metrics dashboard. Tento dashboard poskytuje užitečné vizualizace výkonu vašeho prostředí Airflow a umožňuje rychlou identifikaci problémových oblastí, jako jsou neúspěšné DAG běhy. Představte si, že každá z těchto vizualizací může být dále přizpůsobena dle potřeb konkrétního pracovního postupu. Možnosti pro experimentování s různými metrikami a vizualizacemi jsou rozsáhlé a lze je snadno implementovat přidáním nových grafů a statistik.
Při vytváření pluginu je důležité mít na paměti nejen implementaci základního grafu, ale také to, jak budete zobrazovat data z různých časových rámců (například za poslední den, týden nebo měsíc), a jak budete tento dashboard přizpůsobovat pro různé potřeby vašich týmů nebo organizace. Vytvoření vlastního metrics dashboardu v Airflow vám umožní přizpůsobit způsob monitorování výkonu a operací vašeho systému podle specifických požadavků.
Co byste měli vědět při práci s těmito dashboardy
Je nezbytné mít na paměti, že správná implementace pluginu závisí na tom, jak efektivně dokážete pracovat s databází, která poskytuje potřebná data pro vaše grafy. V tomto případě, kdy se zobrazují informace o bězích DAGs, budete potřebovat efektivní způsob dotazování na tuto databázi, aby byly grafy vždy aktuální. Zároveň si dejte pozor na optimalizaci výkonu, zejména pokud máte velké množství dat, protože vizualizace může zpomalit vaše webové rozhraní, pokud není správně nakonfigurováno.
Pluginy jsou mocným nástrojem pro přizpůsobení Airflow podle vašich potřeb, ale mohou také vést k složitosti, pokud nejsou dobře spravovány. Ačkoli základní plugin pro metrics dashboard je relativně jednoduchý, jakýkoli budoucí vývoj a úpravy by měly být pečlivě promyšleny, aby se zajistila kompatibilita s novými verzemi Airflow a správná integrace s ostatními moduly.

Deutsch
Francais
Nederlands
Svenska
Norsk
Dansk
Suomi
Espanol
Italiano
Portugues
Magyar
Polski
Cestina
Русский