Når du opretter en brugerdefineret Airflow-udbyder, er det vigtigt at forstå den grundlæggende struktur og de best practices, der er blevet etableret i fællesskabet. Disse konventioner er designet for at sikre, at udbyderen fungerer korrekt i Airflow og er nem at forstå og bruge for andre udviklere. Strukturen, der er beskrevet her, gør det muligt at integrere tredjepartsservices på en enkel måde, samtidig med at den opretholder Airflows modulære og fleksible natur.

Først og fremmest skal udbyderen oprettes som et Python-pakke, der følger specifikke navngivnings- og strukturkrav. Dit udbydernavn bør begynde med "airflow_provider", efterfulgt af teknologien, du integrerer med. I dette tilfælde er det "airflow_provider_tea_pot", hvilket indikerer, at udbyderen integrerer med en "teapot"-service. Strukturen af pakken bør omfatte flere mapper og filer, der hjælper med at organisere koden og funktionaliteten.

En typisk mappeopbygning kan inkludere følgende:

  • Hooks: Indeholder de nødvendige hooks for at kommunikere med den eksterne service.

  • Operators: Indeholder operators, der bruges i Airflows Directed Acyclic Graphs (DAGs).

  • Sensors: Specialiserede operators, der venter på eller overvåger en ekstern service.

  • Triggers: Triggers, der bruges af specialiserede, udsættelige operators.

  • Dev: Indeholder filer og konfigurationer til lokal testning og demo.

  • Tests: Indeholder enheder, funktionelle eller ende-til-ende tests, som sikrer, at din kode fungerer korrekt.

  • Example DAGs: Eksempler på DAGs, der kan køres i udviklingsmiljøet for at demonstrere funktionaliteten af udbyderen.

Ud over selve koden er der nødvendige filer for pakkehåndtering og distribution, såsom setup.py og setup.cfg, der definerer metadata og konfigurationer for pakken.

Det er værd at understrege, at mens de nævnte konventioner ikke er absolut nødvendige for, at udbyderen fungerer, er de blevet standardiseret af fællesskabet og anbefales kraftigt for at gøre det lettere for andre udviklere at forstå og bruge din kode.

Når du har oprettet strukturen, er det tid til at begynde at kode selve udbyderen. Der er dog nogle designovervejelser, som er vigtige at tage højde for. For eksempel bør koden kunne køre uden internetadgang. Dette er en vigtig overvejelse, da Airflow Scheduler regelmæssigt parser DAGs, og alt i __init__ metoden for en klasse bliver kørt ved denne parsing. Hvis du laver netværksanmodninger eller afhængigheder i denne fase, kan det føre til fejl. Det er bedre at bruge Jinja-templating og makroer i stedet for at kalde funktioner, der kun er gyldige under opgaveudførelse.

En anden vigtig note er, at alle operators kræver en execute metode. Dette er en obligatorisk grænseflade for alle operators, og den bør implementeres for at sikre, at din udbyder fungerer korrekt indenfor Airflow.

At registrere din udbyder

Før du begynder at skrive selve funktionaliteten, skal du sørge for, at din udbyder bliver korrekt registreret hos Airflow ved opstart. Dette gøres via en bestemt metadata-entry i din pakke, som fortæller Airflow, at den skal registrere din udbyder.

I din provider.py fil skal du oprette en funktion, der returnerer information om din udbyder. Denne funktion, get_provider_info, bør returnere en ordbog med nøgler som package-name, name, description og versions, som Airflow bruger til at vise oplysninger om udbyderen i brugergrænsefladen, CLI og API.

Desuden skal du fortælle Python, at denne entry findes ved at tilføje følgende til din setup.cfg:

ini
[options.entry_points] apache_airflow_provider= provider_info=airflow_provider_tea_pot.provider:get_provider_info

Med dette på plads vil din udbyder automatisk blive registreret, når Airflow starter, og du vil kunne interagere med den via Airflows UI og CLI.

At skrive en hook

Nu er det tid til at oprette en hook, som giver mulighed for at interagere med den eksterne service. I vores eksempel har vi en teapot-service, som understøtter HTTP-anmodninger og en minimal implementering af HTCPCP (Hyper Text Coffee Pot Control Protocol).

For at oprette en hook skal du oprette en klasse, der nedarver fra Airflows BaseHook. Denne hook håndterer forbindelsen til teapot-tjenesten og giver funktionalitet til at sende og modtage HTTP-anmodninger. Hooken skal have en initieringsmetode, der gemmer forbindelsesoplysninger, og eventuelle nødvendige UI-elementer for at præsentere forbindelsesvariablerne korrekt i Airflow-webgrænsefladen.

I dette eksempel vil hooken indeholde funktionalitet til at sende anmodninger som GET /ready, POST /make_tea og GET /water_level, som gør det muligt at kontrollere tepotens status og interagere med den. Du kan også tilføje UI-elementer til forbindelsessiden i Airflow-webgrænsefladen, hvilket gør det lettere for brugere at konfigurere forbindelsen.

Det er en god praksis at bruge statiske metoder til at definere formularfelter, så de nemt kan præsenteres i Airflows UI. I dette tilfælde kan du bruge Flask- og WTForms-biblioteker til at definere felterne for brugerens input, såsom pot-designator og eventuelle ekstra ingredienser (tilføjelser) i tepotens brygning.

Yderligere betragtninger

Når du arbejder med Airflow og opretter en udbyder, er det vigtigt at forstå, at både design og kode bør tage højde for genbrug og udvidelsesmuligheder. Tænk på din kode som et modul, der ikke kun fungerer i dit eget miljø, men som også kan integreres i andres arbejdsgange.

At implementere grundlæggende testning er en anden kritisk del af udviklingsprocessen. Testning sikrer, at din udbyder fungerer som forventet og kan lette vedligeholdelse og videreudvikling. Det er en god idé at oprette en række enhedstests, funktionelle tests og ende-til-ende tests, så du kan være sikker på, at udbyderen fungerer under forskellige scenarier.

Testene kan inkludere kontrol af, at de rette HTTP-anmodninger bliver sendt til den eksterne service, samt at Airflow korrekt håndterer operationerne i dine DAGs. At implementere testning fra begyndelsen af projektet hjælper med at undgå problemer og giver en nemmere måde at validere funktionaliteten på, mens du udvikler.

Hvordan tilpassede Airflow-forbindelsestyper og -operationer kan forbedre integrationen

Når vi arbejder med Airflow og tilpassede udbydere, kræver det, at vi har en grundlæggende forståelse af, hvordan vi kan designe forbindelsestyper og operatører for at udvide funktionaliteten i vores arbejdsflows. Et praktisk eksempel på dette kan ses i integrationen af en tepotte, som kan anvendes til at demonstrere de teknikker, der anvendes til at oprette forbindelse og interagere med eksterne systemer. For at få den ønskede funktionalitet i Airflow, skal vi først definere de nødvendige forbindelsestyper, hooks og operatører.

En vigtig del af integrationen er at definere feltet "Pot Designator" og "Additions" for tepotteforbindelsen. Dette gør det muligt at gemme disse felter som nøgler i et ekstra ordbog, der er knyttet til en forbindelse. Når vi arbejder med UI-felter, kan vi definere, hvordan disse felter skal vises, og hvilke pladsholdere der skal anvendes. For eksempel vil pladsholderen for "pot_designator" blive sat til "1", og "additions" kan være "sugar". Dette giver en enkel visuel guide til den person, der interagerer med Airflow-grænsefladen, så de nemt kan forstå, hvilke data de skal indtaste.

En central funktion i Airflow-forbindelsen er at hente og håndtere data fra eksterne kilder. I eksemplet anvender vi et @cached_property for at hente forbindelsesoplysninger, såsom host og port, og derefter konstruere en URL, der kan anvendes til at kommunikere med den eksterne tjeneste. Hvis de nødvendige felter, som f.eks. pot_designator og additions, findes i forbindelsen, vil de blive indlæst og gemt som attributter i klassen.

Når vi tester forbindelsen, er det vigtigt at bruge metoden test_connection, som returnerer en simpel status for forbindelsen. Hvis forbindelsen er aktiv, returneres en succesmeddelelse; ellers returneres en fejlinformation. Denne metode anvendes, når en bruger interagerer med grænsefladen og trykker på "Test" knappen for at kontrollere forbindelsen.

For at registrere en forbindelse af typen "teapot" i Airflow, skal vi også opdatere udbyderens informationsfunktion med en "connection-type"-nøgle, der beskriver typen af forbindelse og hvilken hook-klasse der skal bruges. Når udbyderen er registreret, vil Airflow UI automatisk være opmærksom på den nye forbindelse og vise den korrekt i brugergrænsefladen.

Når forbindelsen er etableret, kan vi begynde at definere funktionerne til at interagere med den eksterne tjeneste. I eksemplet defineres flere metoder, som f.eks. make_tea, brew_coffee og get_water_level. Hver af disse metoder sender en HTTP-anmodning til tepotten via dens URL og modtager svar, der bekræfter, om operationen er gennemført korrekt. Hvis et uventet svar opstår, kastes en fejl. Selvom dette eksempel er meget simpelt, vil de faktiske brugsscenarier ofte være mere komplekse og kræve passende fejlhåndtering og defensive foranstaltninger.

Næste skridt er at oprette en operatør, der interagerer med denne tepotte. I eksemplet er en operatør til at lave te oprettet, som bruger TeaPotHook til at hente forbindelse og udføre den ønskede handling, f.eks. at lave te. En operatør kræver to hovedmetoder: en initialiseringsmetode og en execute metode. Initialiseringsmetoden gemmer bare konfigurationsinformation, mens execute metoden håndterer arbejdet ved at kalde de relevante hooks og få arbejdet gjort.

I Airflow kan vi også oprette sensorer, som er en speciel type operatør, der kontinuerligt overvåger en ekstern ressource, indtil den modtager et signal om, at den kan fortsætte. Et eksempel kunne være en sensor, der overvåger vandstanden i tepotten og sikrer, at niveauet er tilstrækkeligt, før den begynder at lave te. I dette tilfælde anvendes en asynkron tilgang til at kontrollere vandstanden, hvilket er nødvendigt for at sikre, at systemet reagerer hurtigt på ændringer i vandstanden.

For at arbejde med asynkrone opgaver i Airflow skal man være opmærksom på de designmæssige overvejelser, der følger med det. Når man skriver en trigger, som f.eks. WaterLevelTrigger, skal man sikre sig, at den kører effektivt og ikke blokerer eksekveringen af andre opgaver. Dette kan opnås ved at bruge asynkrone funktioner og ved at sørge for, at triggeren kan køre flere gange uden at forårsage uønskede bivirkninger.

Når man arbejder med deferrable operatører, som gør det muligt at udskyde arbejdet, bør man være opmærksom på, at disse operatører er i en suspenderet tilstand, hvilket betyder, at de ikke bruger systemressourcer, mens de venter. Dette skaber mulighed for effektiv brug af ressourcer i et travlt miljø som Airflow.

Det er væsentligt at forstå, at når man udvikler tilpassede udbydere og operatører, skal man tage højde for de praktiske udfordringer, der kan opstå, især når man arbejder med eksterne systemer. Fejlhåndtering, asynkrone kald og effektiv ressourcestyring er nøgleaspekter, der kan sikre, at integrationen fungerer effektivt og pålideligt i et produktionsmiljø. En god forståelse af disse elementer vil give dig mulighed for at designe mere robuste og skalerbare løsninger.