V Airflow je běžnou praxí pravidelně kontrolovat vlastnictví DAG (Directed Acyclic Graph), zejména pokud dochází k častým změnám v týmu. Je doporučeno každých několik měsíců přehodnotit odpovědnost za různé DAGy a úkoly, včetně vyřešení problémů s jejich selháním. Správná správa DAG a úkolů je nezbytná pro udržení efektivity a spolehlivosti automatizačních procesů.

Nastavení depends_on_past je důležitým faktorem, který, pokud je nastaven na hodnotu True, zabrání spuštění úkolu, pokud neuspěl jeho předchozí naplánovaný běh. V našem případě, pokud se DAG nezdaří v jeden den, ale nemá mít vliv na následující den, nastavíme depends_on_past na False. Tato volba je užitečná zejména pro API, která nejsou zcela spolehlivá a mohou selhávat, což zajistí, že DAG bude pokračovat i přes dočasné problémy s API.

Dalšími důležitými parametry jsou retries a retry_delay. Nastavení retries určuje, kolikrát se úkol pokusí znovu, pokud selže. V tomto případě je nastaveno na 2, což znamená, že Airflow se pokusí vykonat úkol dvakrát, než se vzdá. V základním nastavení Airflow je počet pokusů stanoven na 0, což znamená žádné opakování. Důvodem, proč je dobré nastavit tento parametr, je především nestabilita API nebo konektivita, což může vést k dočasným chybám. Parametr retry_delay, nastavený na 5 minut, udává časovou prodlevu mezi jednotlivými pokusy o opětovné vykonání úkolu. Tato prodleva pomáhá v případě, že je API přetíženo a potřebuje čas na zpracování předchozích požadavků.

Přistupme nyní k definici prvního úkolu, který nazýváme get_pictures. Tento úkol splňuje téměř všechny kroky, které byly identifikovány v Python kódu Jupyter Notebooku. Existuje více způsobů, jak rozdělit kód na jednotlivé úkoly, ale pro zjednodušení jsme se rozhodli sloučit všechny kroky do jednoho úkolu. Tento úkol zahrnuje připojení k API, ověření spojení, uložení metadat, uložení obrázku a odeslání notifikace.

Ve funkci get_pictures používáme operátor PythonOperator, který nám umožňuje vykonat Python funkci. Tento operátor je klíčovým stavebním kamenem Airflow, protože zajišťuje spouštění různých úkolů v rámci workflow. Operátory v Airflow jsou opakovaně použitelné a přizpůsobitelné, což umožňuje snadno vytvářet složité pracovní toky bez nutnosti psát mnoho vlastního kódu. Například operátor PythonOperator umožňuje spustit libovolnou Python funkci, což je ideální pro naše potřeby.

Pokud se podíváme na kód funkce _get_pictures(), zjistíme, že začínáme tím, že se ujistíme, že složka pro uložení obrázků existuje. Používáme metodu mkdir z modulu pathlib, která vytvoří složku, pokud ještě neexistuje. Poté je nastaven klíč API, který je potřeba pro přístup k NASA API. URL pro přístup k obrázku dne je sestavena pomocí tohoto klíče. Funkce requests.get() se používá pro odeslání HTTP požadavku na API a načtení odpovědi ve formátu JSON. Pomocí JSON odpovědi získáme URL pro obrázek dne, který je následně stažen a uložen do místní složky.

Jakmile je obrázek úspěšně stažen a uložen, přichází čas na druhý úkol – notifikaci uživatelů o úspěšném dokončení úkolu. K tomu využíváme operátor BashOperator, který umožňuje spouštění příkazů v shellu. V tomto případě příkaz echo vypíše zprávu, že obrázek byl přidán. Tato zpráva bude zobrazená v logu Airflow, což je užitečné pro sledování průběhu vykonávání úkolu.

Celý proces vykonávání úkolů v Airflow se tedy sestává z několika operátorů, které se skládají do DAGu, což je základní nástroj pro definici pracovních toků v Airflow. Kromě PythonOperator a BashOperator existují i další operátory, které pokrývají širokou škálu úkolů, od práce s databázemi přes zasílání e-mailů až po interakci s externími API.

Je důležité si uvědomit, že Airflow umožňuje velkou flexibilitu při práci s těmito operátory. Například operátor SimpleHttpOperator může být využit pro integraci s externími API, SnowflakeOperator pro práci se Snowflake databázemi, nebo KubernetesPodOperator pro běh úkolů v Kubernetes podů, což přináší výhody v oblasti škálovatelnosti a správy zdrojů.

Při vytváření DAGu je důležité mít na paměti nejen výběr správných operátorů, ale i správnou strukturu workflow. Pokud je to možné, je vhodné co nejvíce rozdělovat jednotlivé úkoly na menší a přehlednější kroky. To usnadní ladění a údržbu systému, protože jednotlivé úkoly budou mít jasně definované odpovědnosti. Každý úkol, který je součástí DAGu, by měl vykonávat pouze jednu konkrétní akci. Pokud máte složitější kód, doporučuje se ho rozdělit na více menších úkolů, které mohou být nezávisle vykonávány, což zvyšuje flexibilitu a usnadňuje diagnostiku případných problémů.

Jak vytvořit plugin pro metriky v Airflow: Implementace backendu a frontendového dashboardu

Pro vytvoření pluginu pro metriky v Apache Airflow je potřeba definovat několik klíčových komponent. Začneme vytvořením adresáře pro náš plugin, jeho registrací a nastavením Flask Blueprint ve souboru __init__.py. Tento plugin bude obsahovat šablony pro HTML kód dashboardu a implementaci backendového kódu v souboru views/dashboard.py, který se postará o zobrazování metrik v uživatelském rozhraní.

Implementace pohledu (view)

Začneme implementací backendového pohledu v souboru views/dashboard.py. Flask aplikace spustí tento kód vždy, když je navštívena registrovaná cesta v uživatelském rozhraní webového serveru. To nám umožní provádět dotazy na databázi Airflow a získat metriky, které budou zobrazeny na dashboardu.

  1. Nejprve je nutné nastavit potřebné importy pro náš pohled na dashboard:

python
from __future__ import annotations from typing import TYPE_CHECKING from airflow.auth.managers.models.resource_details import AccessView from airflow.utils.session import NEW_SESSION, provide_session from airflow.www.auth import has_access_view from flask_appbuilder import BaseView, expose from sqlalchemy import text if TYPE_CHECKING: from sqlalchemy.orm import Session

Dekorátor has_access_view zajišťuje, že uživatel má oprávnění k přístupu na tuto stránku, čímž se zabrání jejímu zobrazení neautentikovaným uživatelům. Dekorátor provide_session poskytne funkci pro práci s databázovou relací, která bude použita pro dotazy. Třída BaseView je základem pro naši vlastní třídu pohledu MetricsDashboardView.

  1. Následně vytvoříme třídu MetricsDashboardView, která definuje cestu pro náš webový pohled:

python
class MetricsDashboardView(BaseView): """Flask-AppBuilder View pro metriky dashboardu""" default_view = "index" route_base = "/metrics_dashboard"

V této třídě definujeme základní nastavení pro náš dashboard. default_view určuje, jaká funkce se použije pro základní cestu, v tomto případě funkci index. Pro přívětivost URL je route_base nastaven na /metrics_dashboard.

  1. Funkce index provede dotazy na databázi Airflow, aby získala požadované metriky a vrátila je do šablony pro zobrazení:

python
@provide_session
@expose("/") @has_access_view(AccessView.PLUGINS) def index(self, session: Session = NEW_SESSION): """Vytvoření pohledu na dashboard""" def interval(n: int): return f"now() - interval '{n} days'" dag_run_query = text(f""" SELECT dr.dag_id, SUM(CASE WHEN dr.state = 'success' AND dr.start_date > {interval(1)} THEN 1 ELSE 0 END) AS "1_day_success", SUM(CASE WHEN dr.state = 'failed' AND dr.start_date > {interval(1)} THEN 1 ELSE 0 END) AS "1_day_failed", SUM(CASE WHEN dr.state = 'success' AND dr.start_date > {interval(7)} THEN 1 ELSE 0 END) AS "7_days_success", SUM(CASE WHEN dr.state = 'failed' AND dr.start_date > {interval(7)} THEN 1 ELSE 0 END) AS "7_days_failed", SUM(CASE WHEN dr.state = 'success' AND dr.start_date > {interval(30)} THEN 1 ELSE 0 END) AS "30_days_success", SUM(CASE WHEN dr.state = 'failed' AND dr.start_date > {interval(30)} THEN 1 ELSE 0 END) AS "30_days_failed" FROM dag_run AS dr JOIN dag AS d ON dr.dag_id = d.dag_id WHERE d.is_paused != true GROUP BY dr.dag_id """) dag_run_stats = [dict(result) for result in session.execute(dag_run_query)] return self.render_template( "dashboard.html", title="Metrics Dashboard", dag_run_stats=dag_run_stats, )

V této funkci spustíme SQL dotaz na tabulky dag_run a dag, který nám vrátí statistiky o bězích DAG (Directed Acyclic Graph) za poslední den, týden a měsíc. Výsledky dotazu se uloží do proměnné dag_run_stats, která bude předána do šablony pro zobrazení.

HTML šablona pro dashboard

V dalším kroku vytvoříme HTML šablonu pro náš dashboard ve složce templates/dashboard.html. Airflow používá Bootstrap CSS pro responzivní design a Jinja templating pro dynamické vykreslování webového obsahu.

Na začátku šablony přidáme základní strukturu:

html
{% extends base_template %}
{% block title %} {{ title }} {% endblock %} {% block head_meta %} {{ super() }} {% endblock %}

V bloku title definujeme název stránky, který bude zobrazen v záhlaví. Blok head_meta slouží pro přidání dalších metadat a propojení se základním šablonovým souborem.

Dále přidáme blok content, který bude obsahovat vizualizace metrik:

html
{% block content %} <h1>{{ title }}</h1> <div class="row"> <div class="col-md-6">
<canvas id="successChart"></canvas>
</div> <div class="col-md-6"> <canvas id="failedChart"></canvas> </div> </div> {% endblock %}

Tato část šablony obsahuje dvě plátna (canvas), na kterých budou zobrazeny grafy pro úspěšné a neúspěšné běhy DAG.

Na závěr přidáme blok tail, který se postará o načtení JavaScriptu pro vykreslení grafů:

html
{% block tail %}
{{ super() }} {% endblock %}

Co je potřeba mít na paměti

Při vytváření pluginu pro metriky v Airflow je kladeno důraz na správné zabezpečení přístupu k dashboardu. Uživatelé by měli mít příslušná oprávnění pro zobrazení metrik. K tomu slouží dekorátor has_access_view, který zajistí, že přístup bude pouze pro autorizované osoby.

Další důležitý aspekt je výkonnost při práci s velkými objemy dat. Airflow může být nasazeno na velkých instalacích, kde běží stovky nebo tisíce DAGů, a vyžaduje efektivní dotazy, které nezpomalí výkon webového rozhraní. Při definování dotazů je dobré se zaměřit na jejich optimalizaci, například použitím indexů v databázi.

Celkově je důležité udržet dashboard přehledný a přístupný pro uživatele s různými úrovněmi zkušeností s Airflow. Grafy by měly být jednoduché na interpretaci a měly by zobrazovat klíčové metriky týkající se běhu DAGů.

Jak efektivně nasadit a spravovat DAGy v Apache Airflow

Vytváření a správné nasazení pracovních toků (DAGs) v Apache Airflow představuje klíčový aspekt pro efektivní automatizaci a orchestraci procesů. Jakmile si osvojíte základní principy práce s Airflow, následující kroky se zaměřují na optimalizaci nasazení a správu DAGů v reálných prostředích. Tato část se podrobněji věnuje metodám nasazení DAGů a jejich správě s důrazem na optimalizaci pracovních toků pro produktivní nasazení.

Začněme s tím, jak lze efektivně automatizovat proces generování a nasazování DAGů. Příklad kódu, který generuje DAGy a následně čistí ty, které již byly úspěšně dokončeny, ilustruje jednoduchý, ale výkonný způsob správy těchto pracovníků v Airflow. Tento přístup umožňuje i méně technickým uživatelům snadno vytvářet a vykonávat pracovní toky bez hlubokého porozumění Airflow nebo Pythonu. Tento princip je důležitý pro firmy, které chtějí zpřístupnit sílu Airflow širší skupině uživatelů.

Ve své podstatě jde o vzorec abstrakce, který poskytuje stabilitu Airflow bez nutnosti, aby uživatelé detailně rozuměli samotné implementaci systému. Je však důležité mít na paměti, že takové implementace jsou většinou pouze ukázkové a neměly by být považovány za plně připravené pro produkční nasazení. Doporučuje se, abyste experimentovali s tímto příkladem a vyzkoušeli různé metody pro vytváření komplexnějších pracovních toků, přidávání funkcionality, notifikací pro QA inženýry a dalších Airflow operátorů.

Dalším důležitým krokem je porozumění metodám nasazení DAGů v produkčním prostředí. Tyto metody zahrnují různé techniky jako bundlování, push a pull. Tyto vzory se od sebe liší v tom, jakým způsobem se DAGy dostanou do prostředí Airflow. Každý z těchto přístupů má své výhody a nevýhody, a výběr správného způsobu závisí na konkrétních potřebách nasazení.

Například bundlování DAGů s celkovým nasazením Airflow může být ideální pro malé systémy nebo systémy, kde se často mění nejen DAGy, ale i samotné nastavení Airflow. Tento přístup zaručuje, že všechny komponenty systému budou používat identické verze DAGů, pluginů a poskytovatelů. Na druhou stranu tento přístup může mít vyšší nároky na čas potřebný pro sestavení a nasazení kontejnerů, což vede k delší době nečinnosti během aktualizace.

Pro složitější prostředí, kde se požaduje flexibilita a nezávislost nasazení DAGů a samotného Airflow, je vhodné oddělit nasazení DAGů od nasazení samotného Airflow. Tento přístup je často efektivní v systémech, kde je potřeba, aby byly DAGy pravidelně aktualizovány bez nutnosti měnit celý systém Airflow. Tento vzorec je zvláště užitečný v dynamických prostředích, kde se pracovní toky neustále mění a vyžadují nezávislé nasazení. Nicméně to také znamená, že budete muset implementovat a spravovat další infrastrukturu, což může být časově náročné a složité.

Zvolení mezi push a pull metodami pro doručování DAGů je dalším krokem, který ovlivňuje efektivitu nasazení. Push metoda je poměrně snadná na implementaci a vyžaduje minimální infrastrukturu. Ovšem při selhání systému nebo při obnově souborového systému Airflow budete muset znovu vyvolat kopírování souborů, což může být problém. Naopak pull metoda, kdy Airflow systém pravidelně stahuje DAGy z externího repozitáře, je flexibilnější a lépe se hodí pro automatizované scénáře. Tato metoda obvykle vyžaduje více konfigurace a orchestrace, ale v dlouhodobém horizontu nabízí větší robustnost a udržitelnost.

Zajištění správného testování kódu a verze DAGů je rovněž klíčové pro správné fungování celé infrastruktury. Při výběru metod nasazení byste měli vzít v úvahu, jaký způsob testování bude nejlepší pro vaši organizaci. V praxi se testování často zaměřuje na ověřování správného chování samotného kódu DAGů a jejich provádění v rámci Airflow. Pro firmy je důležité, aby měli robustní CI/CD procesy, které umožní nejen automatizované testování, ale i sledování a správu změn v reálném čase.

Nasazení a správa DAGů v Airflow je komplexní úkol, který vyžaduje dobře promyšlené a efektivní postupy. Vyvážení mezi flexibilitou a stabilitou nasazení, správná volba metod pro doručování a správu DAGů a efektivní testování jsou klíčovými faktory pro úspěch v produktivním prostředí. Jakmile si osvojíte základní principy nasazení, budete schopni přizpůsobit svou strategii podle konkrétních potřeb vaší organizace a vytvářet efektivní a škálovatelné pracovní toky.

Jak pracovat s DAG v Apache Airflow: Základní kroky a principy

Vytvoření a správa DAG v Apache Airflow je proces, který vyžaduje pochopení základních principů tohoto nástroje a jeho architektury. Airflow je silný nástroj pro orchestraci datových pipeline, který umožňuje automatizované spouštění a správu pracovních procesů ve formě DAG (Directed Acyclic Graph). Tato kapitola se zaměřuje na základní kroky při práci s Airflow, včetně tvorby DAG, definování úkolů a práce s operátory.

Pro navigaci do webového rozhraní Airflow serveru stačí otevřít webový prohlížeč a zadat adresu http://localhost:8080. Po přihlášení do systému Airflow s použitím jména uživatele „admin“ a hesla, které získáte při spuštění Airflow v terminálu, se dostanete na domovskou stránku. Zde budete mít přístup k přehledu běžících DAG a jejich stavu. Tento přehled vám pomůže lépe porozumět fungování jednotlivých úkolů a celkovému chodu vašeho datového workflow.

V následujících kapitolách se zaměříme na konkrétní detaily Airflow rozhraní a ukážeme příklady běžných scénářů správy DAG. Ale nyní se podívejme na základní komponenty, které tvoří jádro Apache Airflow, konkrétně na příklad jednoduchého DAG, který bude sloužit jako úvod do práce s tímto nástrojem.

DAGy, nebo-li směrové acyklické grafy, jsou klíčovým nástrojem pro orchestraci datových pipeline. Jsou postaveny na jazyce Python a využívají širokou škálu podpůrných knihoven. Při vytvoření místního vývojového prostředí se automaticky načte ukázkový DAG example_dag_basic, který budeme analyzovat v této části. DAGy se skládají ze tří hlavních prvků: úkolů, operátorů a senzorů. V průběhu této kapitoly se budeme věnovat i novým technikám, jako jsou task groupy a odložené operátory, které byly zavedeny v novějších verzích Airflow.

Pokud chcete vizualizovat tento DAG, stačí na webovém rozhraní Airflow kliknout na název DAG a vybrat možnost „Graph“. Tento krok vám ukáže grafické znázornění úkolů a jejich závislostí. V tomto konkrétním příkladu jsou všechny úkoly prováděny sekvenčně, ale v komplexnějších DAG budou úkoly probíhat paralelně nebo čekat na specifický spouštěcí trigger. V takových případech se může vizualizace stát nejasnou a příliš složitou. Doporučuje se tedy v případě složitých DAG rozdělit procesy do menších, snadněji spravovatelných celků.

Každý DAG začíná definováním jeho parametrů. V příkladu DAG se nejprve importují potřebné knihovny, jako je JSON a dekorátory Airflow, které usnadňují zápis a správu DAG. Poté deklarujeme DAG s parametry, jako je schedule, start_date, catchup, a default_args. Tento proces zjednodušuje předchozí způsob zápisu, kde bylo nutné používat funkci „with DAG as“.

Ve specifikovaném příkladu DAG bude úloha spuštěna denně od 1. ledna 2023, s parametrem catchup nastaveným na False, což znamená, že Airflow nebude spouštět žádné dávky úkolů, které by měly probíhat před tímto datem. Pokud bychom použili catchup=True, systém by se pokusil vykonat všechny úkoly za předchozí dny.

Dalším důležitým parametrem je default_args, který nastavuje výchozí hodnoty pro parametry, jako je počet pokusů o opakování úkolů při neúspěchu. V příkladu je tento parametr nastaven na hodnotu 2, což znamená, že Airflow se pokusí úkol zopakovat maximálně dvakrát, pokud dojde k jeho selhání. Tento mechanismus je užitečný především při práci s externími databázemi nebo API, které mohou mít výpadky nebo časové prodlevy.

TAGS jsou také důležitým nástrojem pro organizaci a seskupování DAG. Pomocí tagů můžeme například rozdělit DAG podle jednotlivých systémů, týmů nebo uživatelů, což usnadňuje správu větších systémů.

Úkoly (tasks) jsou základní jednotkou vykonávání v Airflow. Každý úkol reprezentuje jednu operaci nebo akci, která má být vykonána v rámci pipeline. Úkoly jsou vykonávány pomocí operátorů, které určují typ úkolu. Mezi běžné úkoly patří spuštění Python funkcí, provádění SQL dotazů nebo transformace dat.

Operátory v Airflow jsou klíčové pro definování a provádění úkolů. Každý operátor specifikuje konkrétní úkol, který má být vykonán, a kapsuluje logiku potřebnou k jeho provedení. Airflow nabízí širokou škálu operátorů, jako jsou:

  • BashOperator: Spouští bash příkazy

  • PythonOperator: Spouští Python funkce

  • SqlOperator: Spouští SQL dotazy

  • DockerOperator: Spouští Docker kontejnery

  • HttpOperator: Posílá HTTP požadavky

Tyto operátory mohou být také přizpůsobeny a rozšířeny pro specifické potřeby uživatelů, což umožňuje vytvářet komplexní datové pipeline, které jsou zároveň přehledné, udržovatelné a škálovatelné.

Prvním úkolem v příkladu je úkol „extract“, který se stará o načítání a extrakci dat. Tento úkol používá Python k načtení datového řetězce a jeho konverzi do slovníku. Tato data pak mohou být předána do dalších úkolů pro transformaci a načítání.

Dalšími klíčovými aspekty jsou transformace a načítání dat (ETL), kde Airflow umožňuje provádět různé operace podle definovaných úkolů. V praxi se tedy jedná o vysoce flexibilní a rozšiřitelný systém pro orchestraci a správu datových procesů.

Pro efektivní práci s Airflow je také důležité pochopit principy plánování úloh a jejich správy. Airflow využívá cron-like rozvrhy, které umožňují spouštět úkoly na základě definovaných intervalů, ať už je to denně, měsíčně, nebo na základě specifických časových oken. Výběr správného intervalu pro spuštění úkolů je klíčový pro optimalizaci běhu pipeline a minimalizaci zpoždění při zpracování dat.