I arbejdet med Airflow er det vigtigt at forstå, hvordan man organiserer og administrerer sine DAGs (Directed Acyclic Graphs). Et centralt aspekt af Airflow er, hvordan man konfigurerer opgaver (tasks) i en DAG, og hvordan man håndterer fejl og forsøg på at køre opgaverne igen. Dette giver mulighed for pålidelighed og automatisering i arbejdet med eksterne systemer, som for eksempel API'er, der ikke altid er stabile.
En nyttig funktion i Airflow er depends_on_past, som forhindrer, at en opgave køres, hvis den forrige kørsel af opgaven ikke er lykkedes. Når depends_on_past sættes til True, vil en opgave ikke blive udført, hvis den forrige opgave ikke er blevet gennemført succesfuldt. I et scenario, hvor der kan være problemer med pålideligheden af en ekstern API, kan det dog være hensigtsmæssigt at sætte denne værdi til False. På den måde sikres det, at opgaven fortsætter, selvom den forrige kørsel mislykkedes. Dette er særlig nyttigt i situationer, hvor systemet, der bruges til at hente data, ikke altid er stabilt, da det gør det muligt for DAG'en at fortsætte uden forsinkelser.
En anden vigtig parameter er retries, som angiver, hvor mange gange Airflow skal forsøge at køre en opgave, hvis den fejler. I dette eksempel er retries sat til 2, hvilket betyder, at Airflow vil forsøge at køre opgaven to gange ekstra, før den giver op. Som standard er retries sat til 0 i Airflow, hvilket betyder, at der ikke vil blive forsøgt igen, hvis opgaven fejler første gang. Denne funktion er essentiel, når der arbejdes med opgaver, der kan være afhængige af eksterne systemer, som kan have midlertidige problemer.
Derudover kan retry_delay bruges til at specificere, hvor lang tid Airflow skal vente mellem de forskellige forsøg på at køre en opgave. I dette eksempel er retry_delay sat til 5 minutter, hvilket betyder, at Airflow vil vente 5 minutter, før det forsøger at køre opgaven igen. Dette kan være nyttigt, hvis opgaven interagerer med et system, der kan være overbelastet, og som måske kun har midlertidige problemer, der løses med et kortere interval mellem forsøgene.
Når det kommer til at definere de første opgaver i en DAG, er det vigtigt at forstå, hvordan man bryder koden op i individuelle opgaver. I et simpelt eksempel kan én Python-funktion som get_pictures indeholde flere trin, som for eksempel at oprette forbindelse til en API, hente data, gemme metadata og gemme billeder. Dette kunne også opdeles i flere individuelle opgaver, men i dette tilfælde har vi valgt at holde det simpelt og sammensætte flere funktioner i én opgave.
Funktionen get_pictures defineres som en PythonOperator, som er en af de grundlæggende operatører i Airflow. Denne operatør bruges til at køre Python-funktioner, og det er her, den faktiske logik til at hente billeder fra en ekstern API og gemme dem lokalt implementeres. For at gøre dette opretter vi først en mappe på systemet, hvis den ikke allerede eksisterer, og derefter hentes et billede fra en ekstern API. For at hente billedet bruges en simpel HTTP-anmodning, og derefter gemmes det i en fil med et dato-stempel for at sikre, at hvert billede gemmes med en unik filnavn.
Når billedet er hentet og gemt, er det en god idé at benytte en anden opgave til at notificere brugerne om, at billedet er blevet tilføjet. Dette kan gøres ved at bruge en BashOperator, som giver mulighed for at køre bash-kommandoer. I dette tilfælde bruges echo til at sende en simpel besked til systemet, der angiver, at billedet er blevet tilføjet. Denne besked kan derefter ses i loggene for opgaven i Airflow's grænseflade, hvor man kan følge med i status for DAG'en og alle dens opgaver.
Airflow gør det muligt at bygge komplekse arbejdsflows ved at bruge en række forskellige operatører, der kan udføre en lang række opgaver. Nogle af de mest anvendte operatører er:
-
PythonOperator: Bruges til at køre Python-funktioner. Denne operatør er meget fleksibel og giver mulighed for at inkorporere kompleks logik og databehandling i workflowet.
-
BashOperator: Bruges til at køre Bash-scripts. Denne operatør er nyttig, hvis man har behov for at bruge shell-kommandoer i workflowet, som for eksempel filhåndtering eller systemadministration.
-
EmailOperator: Sender e-mails fra Airflow, hvilket kan bruges til at notificere interessenter eller sende rapporter.
-
SimpleHttpOperator: Gør det muligt at sende HTTP-anmodninger til eksterne API'er, hvilket er nyttigt, når man skal integrere med tredjepartssystemer.
-
KubernetesPodOperator: Kører opgaver i Kubernetes-pods, hvilket giver mulighed for at udnytte Kubernetes' skalerbarhed og ressourcehåndtering.
Det er vigtigt at forstå, at Airflow giver en stor fleksibilitet og skalerbarhed, men det kræver også en vis forståelse af, hvordan opgaver oprettes, organiseres og fejlhåndteres. For eksempel, når man arbejder med eksterne systemer som API'er, kan det være nødvendigt at konfigurere fejlhåndtering for at sikre, at workflowet ikke stopper, hvis en enkelt opgave fejler. Desuden kan man med Airflows operatører effektivt skabe komplekse workflows uden at skulle skrive meget specialiseret kode.
Det er også værd at nævne, at det er god praksis at revidere ejerskabet af forskellige DAG'er med jævne mellemrum. Efterhånden som teammedlemmer ændrer sig, kan ansvaret for at fejlfinde på specifikke opgaver og DAG'er også ændre sig. Derfor er det en god idé at holde sig opdateret på, hvem der har ansvar for hvilke opgaver, og hvordan eventuelle fejl håndteres.
Hvordan bygger man et metrics dashboard plugin i Apache Airflow?
For at bygge et metrics dashboard plugin i Apache Airflow skal vi følge nogle grundlæggende trin, der involverer både backend- og frontend-udvikling. Først skal vi forstå strukturen af et Airflow-plugin og hvordan vi kan udvide Airflows funktionalitet med brugerdefinerede værktøjer.
I første omgang integrerer vi Chart.js, et populært JavaScript-bibliotek, i vores dashboard via et open source CDN. Dette bibliotek giver os mulighed for at visualisere data i canvas-elementer, som vi definerer i vores HTML-template. Dataene, der skal bruges til grafikken, kommer fra en JSON-fil (dag_run_stats), som vi har genereret i backend-koden af dashboardet. Dernæst bruger vi den indbyggede Chart() metode til at rendre grafikken i de relevante canvas-elementer. Denne metode kræver både canvas-elementet som det første argument og konfigurationen af diagrammet som det andet.
Den næste nødvendige komponent er implementeringen af Airflow-pluginet. I dette trin opretter vi en Flask Blueprint og en MetricsDashboardView, der definerer hvordan dashboardet skal vises i Airflow UI. Flask Blueprint er en central komponent, der gør det muligt at registrere ruter og håndtere HTTP-anmodninger, mens MetricsDashboardView håndterer backend-logikken, såsom at hente data fra databasen og sende dem til frontend til visualisering.
For at registrere pluginet i Airflow skal vi inkludere vores Blueprint og View i Airflows plugin-struktur. I init.py-filen i vores metrics_plugin-mappe registrerer vi flask_blueprint og MetricsDashboardView, som derefter vil blive inkluderet i Airflow’s brugerflade. Det betyder, at når vi placerer vores plugin i Airflow’s plugin-mappe og genstarter webserveren, vil vi kunne se en ny kategori i Airflow-menuen – nemlig "Metrics" – med en undermenu "Dashboard", der fører til vores metrics dashboard.
Når pluginet er implementeret, giver dashboardet os værdifulde indsigter i Airflows arbejdsforløb. For eksempel viser grafen for "Successful Dag Runs" antallet af pipeline-kørsler, der er afsluttet succesfuldt inden for de sidste 1, 7 og 30 dage. På samme måde viser grafen "Failed Dag Runs" de fejlbehæftede DAG-kørsler i de samme tidsrammer. Denne type visualisering gør det muligt for Airflow-brugere hurtigt at få et overblik over systemets præstationer og eventuelle problemer, som kræver opmærksomhed.
De nødvendige skridt for at bygge et metrics dashboard plugin er som følger:
-
Oprettelse af en projektstruktur i Airflow plugins-mappen, som kan genbruges til andre brugerdefinerede plugins.
-
Implementering af backend-funktionaliteten i MetricsDashboardView, som henter de nødvendige data fra databasen.
-
Oprettelse af en HTML-template, der definerer layoutet og visualiseringerne på dashboardet.
-
Registrering af pluginet i Airflows system ved at tilføje flask_blueprint og MetricsDashboardView i den passende Airflow-pluginklasse.
Det vigtigste at forstå, når man arbejder med Airflow plugins, er, at de tilbyder en stor fleksibilitet til at tilpasse brugerfladen og backend-funktionerne. Dette kan være uundværligt, når man har specifikke krav til, hvordan Airflow skal visualisere og håndtere data. Selvom den indbyggede Airflow UI allerede tilbyder grundlæggende funktionalitet, gør evnen til at oprette tilpassede dashboards og rapporter det muligt at skræddersy overvågningserfaringen til ens specifikke behov.
Der er dog flere andre aspekter, som er væsentlige at tage i betragtning. For det første er det vigtigt at forstå, at opbygning af et plugin ikke kun handler om at få data til at blive vist korrekt; det handler også om at sikre, at dashboardet er robust og skalerbart. Det betyder, at backend-koden bør optimeres til effektiv databehandling, og frontend-visualiseringerne skal være lette at forstå for brugerne. Desuden kan man eksperimentere med at tilføje flere graf- og datafiltreringsmuligheder for at give brugerne endnu flere indsigter i deres workflows.
At bygge et metrics dashboard plugin i Airflow er derfor en fremragende mulighed for at udvide Airflows funktionalitet og skabe skræddersyede løsninger, der imødekommer de specifikke behov i en given organisation eller projekt.
Hvordan designe og implementere en DAG til maskinlæring i Airflow
I maskinlæringens livscyklus er forskellige faser afgørende for systemets succes, fra dataindsamling og behandling til modellens træning og distribution. Et vigtigt værktøj til at orkestrere disse faser er Airflow, som muliggør automatisering og koordination af de processer, der udgør "trænings" fasen i maskinlæringens cyklus. Der er dog også situationer, hvor Airflow kan spille en rolle under "prediktions" fasen, især i batchsystemer, hvor forudsigelser skal genereres for store datamængder på en regelmæssig basis. For online-prediktioner er der dog andre systemer, der er bedre egnede.
For at illustrere brugen af Airflow i en maskinlæringsarbejdsgang vil vi bruge et konkret eksempel: en anbefalingssystem for film, baseret på MovieLens-databasen. Dette eksempel viser, hvordan man designer en Directed Acyclic Graph (DAG) i Airflow, der automatisk kan hente data, opdatere modeller og gøre forudsigelser til brugere.
Når vi ser på designet af vores DAG, opstår der hurtigt nogle vigtige overvejelser. For det første ved vi ikke præcist, hvornår data vil være tilgængelige, så vi kan ikke bruge en ren tidsplanlagt tilgang. I stedet skal vi sikre, at første trin i vores arbejdsgang kan afgøre, om data er blevet ændret, før vi fortsætter med at downloade og køre modellerne. Dette betyder, at vi først tjekker dataens md5-hash for at vurdere, om der er sket ændringer siden sidste kørsel. Hvis dataene ikke er ændret, afsluttes processen hurtigt, hvilket sparer tid og ressourcer.
En anden vigtig opdagelse under designfasen er, at de notebook-koder, som vores datavidenskabsven bruger, indeholder noget gentagen kode til både dataindsamling og træning af modeller. Dette kan forenkles ved at samle koden i én central del af DAG'en, hvilket gør processen både mere effektiv og lettere at vedligeholde.
I vores eksempel arbejder vi med et forholdsvis lille dataset under de indledende udviklingsstadier, men det fulde dataset kræver betydelige beregningsressourcer. Dette betyder, at vi skal være opmærksomme på de ressourcer, der er tilgængelige, og sikre, at systemet kan håndtere de nødvendige beregninger, når data vokser.
Når nye data modtages, vil både film- og brugervektorer ændre sig betydeligt i struktur og indhold. Når vi uploader disse vektorer til webstedets database, er det vigtigt, at vi kan "bytte" data uden at forårsage lange nedetider eller forstyrrelser i tjenesten. Dette kræver en præcis og omhyggelig håndtering af dataudveksling.
For at implementere systemet starter vi med at designe et trin, der sikrer, at vi kun behandler data, når der er ændringer. Vi bruger en BranchPythonOperator i Airflow til at kontrollere md5-hashen af datasettene, og hvis den er ændret, fortsætter vi med at hente dataene og bearbejde dem. Denne proces gør det muligt at undgå unødvendige nedetider, da vi kun downloader dataene, når det er nødvendigt.
Når dataene er hentet, skal vi pakke dem ud og forberede dem til videre behandling. Vi adskiller download- og behandlingsstadierne, så fejl i senere faser ikke medfører unødvendige downloads. Når dataene er pakket ud, sørger vi for, at de nødvendige filer er tilgængelige til den videre databehandling, hvilket betyder, at vi kun gemmer de relevante oplysninger for vores modeller.
Det er også vigtigt at forstå, at datakilder i maskinlæring ofte kan ændre sig, og at vi må designe systemer, der kan håndtere disse ændringer fleksibelt. I eksemplet med MovieLens skal vi være opmærksomme på, at dataene bliver opdateret med jævne mellemrum, og vi skal sikre, at disse opdateringer kan integreres i systemet uden at forstyrre den eksisterende drift.
I betragtning af den tekniske kompleksitet og de ressourcer, der kræves, er det essentielt, at alle processer er veldokumenterede og lette at vedligeholde. Når systemet først er i drift, skal det være i stand til at hente data automatisk, opdatere modeller og levere anbefalinger til brugere uden behov for manuel indgriben.
Det er nødvendigt at være opmærksom på de praktiske udfordringer, der opstår under implementeringen, især når det gælder om at sikre systemets robusthed, fleksibilitet og skalerbarhed. Fejl i tidlig datahåndtering eller modelopdatering kan føre til alvorlige problemer senere i systemet. Det er derfor vigtigt at teste og validere hele workflowet grundigt før det implementeres i produktion.
Hvordan Apache Airflow Triggers og Planlægning Arbejder i Dynamiske Arbejdsgange
Apache Airflow er et kraftfuldt værktøj til at orkestrere og automatisere arbejdsprocesser. En af de vigtigste funktioner, der gør Airflow fleksibelt og effektivt, er dens brug af triggere og planlægningsmekanismer, der giver udviklere mulighed for at styre, hvornår og hvordan opgaver bliver udført i komplekse arbejdsflows.
En af de centrale triggere i Airflow er TimeDeltaTrigger, som giver mulighed for at planlægge en opgave til at køre efter en specifik tidsforsinkelse i forhold til, hvornår en anden opgave er blevet afsluttet. Denne funktion er en del af Airflows dynamiske opgavekortlægning og udsatte operatorfunktioner, som gør det muligt for arbejdsflows at justere sig dynamisk baseret på de betingelser, der er til stede ved køretid.
For eksempel, i en simpel DAG, kunne det være nødvendigt at vente 30 minutter efter, at en opgave (som en dataudtrækning) er afsluttet, før en efterfølgende opgave (som datatransformering) kan udføres. Dette kan opnås med en TimeDeltaTrigger ved at tilføje en forsinkelse på 30 minutter til den relevante opgavedefinition, som dette:
Denne form for tidsstyring er ikke den eneste type trigger, som Airflow tilbyder. Der findes flere typer triggere, som alle har specifikke anvendelsesområder. De mest almindelige typer inkluderer tidsbaserede, afhængighedsbaserede og hændelsesbaserede triggere.
Tidsbaserede triggere omfatter både CRON-udtryk og intervalbaserede planlægningsmekanismer. CRON-udtryk er meget brugte, da de giver mulighed for at planlægge opgaver til at køre på faste tidspunkter. For eksempel, udtrykket 0 0 * * * planlægger en opgave til at køre dagligt ved midnat. Intervalbaseret planlægning gør det muligt at køre opgaver med faste tidsintervaller, som f.eks. hver time eller hver dag.
Afhængighedsbaserede triggere er essentielle i arbejdsflows, hvor opgaver er afhængige af resultaterne af andre opgaver. En sådan trigger kan aktiveres, når en forudgående opgave er afsluttet succesfuldt. Dette er særligt nyttigt i situationer, hvor dataudtrækninger eller transformationer skal ske i en bestemt rækkefølge. Desuden kan den eksterne opgavesensor trigger bruges til at vente på, at en opgave i en anden DAG bliver afsluttet, før en bestemt opgave kan udføres. Dette muliggør koordinering mellem flere arbejdsflows i Airflow.
Hændelsesbaserede triggere som Webhooks og sensors gør det muligt at starte opgaver baseret på eksterne hændelser. En Webhook-trigger kan aktivere en opgave, når et eksternt system sender et signal. Dette er nyttigt, når arbejdsflowet skal reagere på hændelser uden for Airflow. En e-mail-sensor kan bruges til at udløse en opgave, når en e-mail, der opfylder specifikke betingelser, modtages, hvilket gør det muligt at starte processer som reaktion på bestemte notifikationer.
Der er også triggere, der relaterer sig til dataens tilgængelighed, og manuelle triggere, som gør det muligt for brugeren at starte opgaver med et klik. Hver af disse triggere har deres egne specifikke anvendelser, og valget af trigger afhænger af den konkrete opgave, arbejdsflowet og de ønskede resultater.
For at forstå, hvordan disse triggere arbejder sammen med Airflows grundlæggende koncepter som DAG’er, opgaver, operatorer og Xcoms, er det vigtigt at få en solid forståelse af systemets arkitektur. Apache Airflow er bygget omkring en central metadata-database, der lagrer oplysninger om opgavernes status, DAG-udførelser og andre vigtige data. Scheduler og Executor spiller også en central rolle i, hvordan og hvornår opgaver bliver eksekveret, mens webserveren giver en grafisk brugergrænseflade, hvor brugerne kan overvåge status på deres DAG’er og manuelt udløse opgaver.
For dem, der arbejder med større ETL-pipelines eller komplekse dataarbejdsgange, er det afgørende at forstå, hvordan man korrekt anvender og konfigurerer triggere og planlægning. Den rette brug af triggere kan være en game-changer i håndteringen af store og komplekse dataflow, hvor timing og afhængigheder mellem opgaver er kritiske.
Det er ikke kun nødvendigt at mestre de grundlæggende triggere, men det er også vigtigt at kunne forstå de forskellige optimeringsmuligheder, som Airflow giver. For eksempel, hvordan man håndterer fejlhåndtering, opgavestyring og effektiv ressourceudnyttelse, når arbejdsflowet vokser i størrelse og kompleksitet.
Det er også vigtigt at bemærke, at Airflow ikke kun anvendes til dataudtræk, transformation og indlæsning (ETL) – det er også en vigtig komponent i maskinlæring og AI-arbejdsgange. Ved at bruge triggere effektivt, kan man optimere automatiseringen af disse arbejdsgange og sikre, at modeller trænes og valideres på de rette tidspunkter.

Deutsch
Francais
Nederlands
Svenska
Norsk
Dansk
Suomi
Espanol
Italiano
Portugues
Magyar
Polski
Cestina
Русский