KubernetesExecutor ja DaskExecutor ovat kaksi tehokasta vaihtoehtoa, jotka tarjoavat joustavia ja skaalautuvia ympäristöjä Apache Airflowin tehtävien suorittamiseen. Molemmat tarjoavat etuja, mutta ne myös tuovat mukanaan omia haasteitaan ja monimutkaisuuksia. On tärkeää ymmärtää näiden kahden executorin vahvuudet ja rajoitukset, jotta voit valita oikean ratkaisun tarpeidesi mukaan.

KubernetesExecutor hyödyntää Kubernetesin tarjoamia dynaamisia skaalausmahdollisuuksia ja tarjoaa Airflowlle erittäin joustavan ympäristön, jossa tehtäviä voidaan suorittaa konttiteknologian avulla. Tämä mahdollistaa skaalautuvien, hajautettujen työnkulkujen toteuttamisen ja tarjoaa resurssien optimointimahdollisuuksia. Kubernetes on erityisen hyödyllinen, kun Airflowin tehtävät vaativat erikoistuneita resursseja tai jos halutaan hyödyntää pilviympäristön joustavuutta. Kuitenkin, Kubernetesin käyttäminen Airflowin kanssa voi olla monimutkaista, erityisesti tiimeille, jotka eivät ole perehtyneet Kubernetesin käsitteisiin.

Vaikka Kubernetes tarjoaa erinomaisen skaalautuvuuden, sen käyttöönotto ja ylläpito voivat olla vaativia. Ensinnäkin, Kubernetes klusterin ylläpito tuo mukanaan kustannuksia ja operatiivista monimutkaisuutta. Podien luominen ja tuhoaminen jatkuvasti voi lisätä kustannuksia, erityisesti jos ne eivät ole optimaalisesti hallittuja. Lisäksi, koska podit ovat kertakäyttöisiä, pysyvän datan tallentaminen voi olla haasteellista, vaikka pysyvien volyymien käyttö voi tarjota ratkaisun – mutta tämä taas tuo lisäkompleksisuutta. Verkko- ja käynnistysaikaviiveet ovat myös asioita, jotka voivat hidastaa suorituskykyä erityisesti, jos kuvia on ladattava rekisteristä.

DaskExecutor, joka käyttää Daskia, on erinomainen vaihtoehto erityisesti datatieteelle ja koneoppimiselle. Daskin joustavat ja skaalautuvat laskentamaahdollisuudet tekevät siitä ihanteellisen työkalun, kun tarvitaan suuria laskentatehoja. DaskExecutor voi jakaa laskentatehtäviä useille työntekijöille, mikä voi merkittävästi nopeuttaa koneoppimismallien kouluttamista tai datan esikäsittelyä. Daskin tiivis integrointi Python-ekosysteemiin tekee siitä erityisen houkuttelevan Python-pohjaisille työkuormille. Kuitenkin, Daskin käyttöönotto voi olla monimutkaista, erityisesti jos tiimillä ei ole aiempaa kokemusta Daskin hallinnasta tai jos infrastruktuuria ei ole vielä määritelty.

Kuten KubernetesExecutor, DaskExecutor tuo mukanaan myös omat haasteensa. Yksi suurimmista haasteista on ympäristön ja riippuvuuksien hallinta: Daskin työntekijöiden on oltava yhteensopivia, jotta laskentatehtävien jakaminen sujuu ongelmitta. Jos Dask-klusteria käytetään myös muihin laskentatehtäviin, voi syntyä resurssikilpailua, mikä voi heikentää suorituskykyä. Verkkoyhteyksien latenssi ja I/O-keskeisten tehtävien suorittaminen voivat myös tuoda lisähaasteita, erityisesti jos Daskin ja Airflow’n välinen tiedonsiirto ei ole optimoitu.

Kubernetes Local Executor on suhteellisen uusi vaihtoehto, joka tarjoaa yhdistelmän paikallista ja Kubernetes-pohjaista suoritusta. Tämä mahdollistaa Airflowin tehtävien suorittamisen joko paikallisesti tai Kubernetes-ympäristössä, riippuen tehtävän tarpeista ja resursseista. Tämä on erityisen kätevää, jos halutaan kehittää ja testata Airflow DAG:eja paikallisesti ilman, että niitä tarvitsee jatkuvasti käyttää Kubernetes-klusterissa. Kubernetes Local Executor voi myös olla hyödyllinen tuotantoympäristössä, kun tarvitaan skaalautuvuutta ja vikasietoisuutta. Kuitenkin, sen käyttöön liittyy riskejä, kuten suurentunut kuormitus tietokannan suhteen, erityisesti jos Kubernetes Local Executor luo tehtäviä samanaikaisesti. Tämä voi johtaa kisaehtoihin, jolloin synkronointimekanismit voivat olla tarpeen.

Yhteenvetona voidaan todeta, että valinta KubernetesExecutorin, DaskExecutorin ja Kubernetes Local Executorin välillä riippuu pitkälti siitä, millaisia tehtäviä suoritetaan ja millaiset resurssit ovat käytettävissä. Kubernetes tarjoaa erinomaisen skaalautuvuuden ja joustavuuden, mutta sen käyttöön liittyy merkittäviä operatiivisia haasteita. Dask taas on erityisen hyvä vaihtoehto dataintensiivisiin työkuormiin, mutta sen käyttöönotto ja hallinta voivat olla vaativia. Kubernetes Local Executor puolestaan tuo joustavuutta paikalliseen kehitykseen ja tuotantoon, mutta sen käyttöön liittyy omat riskinsä. On tärkeää tehdä huolellinen arviointi siitä, mikä executor vastaa parhaiten organisaation tarpeita, resursseja ja käytettävissä olevaa infrastruktuuria.

Miten automatisoida koodi Airflow DAG:lla ja mitä huomioida

Airflow on yksi tehokkaimmista työkaluista, kun halutaan automatisoida toistuvia tehtäviä ja orkestroituja työnkulkuja. Sen avulla voidaan luoda ja ajaa DAG:ja (Directed Acyclic Graphs), jotka jakavat monimutkaiset prosessit useisiin hallittaviin osiin. Tässä käsittelemme, kuinka voit luoda yksinkertaisen DAG:n, joka lataa päivittäisiä kuvia NASA:n API:sta ja ilmoittaa, kun kuva on ladattu onnistuneesti. Tämä esimerkki on suunniteltu auttamaan sinua ymmärtämään Airflow:n perusperiaatteet ja sen tarjoamat mahdollisuudet.

Aloitetaan ensin importoimalla tarvittavat kirjastot Pythonissa ja Airflow:ssa, jotka mahdollistavat DAG:n luomisen. Tässä tapauksessa käytämme samoja kirjastoja, joita hyödynsimme Jupyter Notebookissa, kuten requests, pathlib, ja datetime, sekä Airflow:n operaatioita, kuten PythonOperator ja BashOperator. Muista määritellä myös muuttuja, joka kertoo DAG:n omistajan:

python
import json
import pathlib import airflow import requests import requests.exceptions as request_exceptions from datetime import date from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from airflow.decorators import task from datetime import datetime, timedelta dag_owner = 'Kendrick'

Seuraavaksi luomme Python-funktion, joka kutsuu NASA:n Planetary API:a ja lataa päivän kuvan paikalliselle levylle. Kuvan nimi sisältää nykyisen päivän päivämäärän:

python
def _get_pictures(): pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True) api_key = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxx' url = f'https://api.nasa.gov/planetary/apod?api_key={api_key}' response = requests.get(url).json() today_image = response['hdurl']
with open(f'todays_image_{date.today()}.png', 'wb') as f:
f.write(requests.get(today_image).content)

Seuraava vaihe on määrittää DAG:n oletusparametrit. Oletusparametreja käytetään automaattisesti kaikissa DAG:ssa määritellyissä tehtävissä, ellei niitä erikseen muuteta. Tässä määritämme, että tehtävä voidaan suorittaa uudelleen, jos se epäonnistuu, ja että uudelleenyritykselle annetaan 5 minuutin viive:

python
default_args = {
'owner': dag_owner, 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=5) }

DAG:n luominen tapahtuu käyttämällä with-avainsanaa, joka varmistaa, että Airflow ymmärtää, milloin DAG luodaan ja mitä parametreja käytetään. Tässä määritämme, että DAG:n suoritus aloitetaan tänään ja että se suoritetaan päivittäin:

python
with DAG(dag_id='download_ASOD_image', default_args=default_args, description='download and notify', start_date=airflow.utils.dates.days_ago(0), schedule_interval='@daily', catchup=False, tags=['None']): get_pictures = PythonOperator( task_id="get_pictures", python_callable=_get_pictures ) notify = BashOperator( task_id="notify", bash_command='echo "Image for today has been added!"' ) get_pictures >> notify

Tässä vaiheessa olemme määrittäneet DAG:n, joka suorittaa kaksi tehtävää peräkkäin: lataa kuvan NASA:n API:sta ja lähettää ilmoituksen, kun kuva on ladattu. DAG:in suoritus on yksinkertainen, mutta se voi helposti monimutkaistua, jos tarvitaan rinnakkaisia tehtäviä.

DAG:n luominen voi aluksi näyttää pelottavalta, mutta suurin osa koodista toistuu. Monet kentät, kuten dag_id, schedule_interval, ja start_date, auttavat Airflow:ta ymmärtämään, miten DAG:n tulisi toimia. Kun luot omaa työnkulkua, on tärkeää miettiä, mitkä tehtävät voidaan suorittaa rinnakkain ja mitkä vaativat sekvenssia.

On myös tärkeää ymmärtää, että Airflow tarjoaa monia mahdollisuuksia tehostaa työnkulkuja, kuten eri aikaväleillä ajoa, riippuvuuksia, ja tehtävien ajamista useilla koneilla samanaikaisesti. DAG:n avulla voit jakaa monimutkaisimmat prosessit hallittavampiin, erillisiin tehtäviin, joita voidaan suorittaa ja seurata itsenäisesti.

Muista, että on suositeltavaa dokumentoida DAG:it huolellisesti. Määritä aina kuvaus ja lisää kommentteja koodiin, jotta muut kehittäjät voivat ymmärtää, mitä kukin osa tekee. Lisäksi on tärkeää määrittää aloituspäivämäärä ja suunnitella, kuinka usein DAG suoritetaan.

Yksi tärkeä osa Airflow:ta on catchup-parametri. Tämä määrittää, suorittaako Airflow aiempia väliin jääneitä tehtäviä, jos DAG:ia ei ole suoritettu tietyn ajan. Jos catchup on asetettu arvoon True, Airflow suorittaa kaikki jääneet tehtävät aloituspäivämäärän jälkeen. Jos se on asetettu False, vain nykyinen tehtävä suoritetaan. Tämä parametri on tärkeä, sillä sen avulla voit estää ylimääräisten tehtävien ajamisen, jotka voivat hidastaa järjestelmää.

Endtext.

Kuinka luoda ja hallita koneoppimismallia orkestrointiputkessa Airflow'n avulla?

Pre-prosessointivaiheessa käsittelemme CSV-tiedostot, jotka sisältävät elokuvien ja käyttäjien vektoriesityksiä. Tämä vaihe voi olla monivaiheinen ja sisältää useita välituotteita, jotka varmistavat, ettei putken keskeytyksessä tarvitse käyttää liikaa aikaa toimenpiteiden jälkikäsittelyyn. Esimerkiksi, tiedon lataaminen S3:sta ja sen käsittely tehdään seuraavasti:

python
generate_data_frames = PythonOperator( task_id="generate_data_frames", python_callable=_generate_data_frames ) def _generate_data_frames(ti, **kwargs): s3 = __get_s3_hook() bucket = __get_recsys_bucket() ratings_object = ti.xcom_pull(key="ratings.csv", task_ids="fetch_dataset") movies_object = ti.xcom_pull(key="movies.csv", task_ids="fetch_dataset") ratings_csv = s3.download_file(key=ratings_object, bucket_name=bucket) movies_csv = s3.download_file(key=movies_object, bucket_name=bucket) files_to_upload = _process_csv(ratings_csv=ratings_csv, movies_csv=movies_csv) for f, shape in files_to_upload: hash_id = ti.xcom_pull(key='hash_id', task_ids="data_is_new") s3_dst = f"{hash_id}/{Path(f).name}" s3.load_file(filename=f, key=s3_dst, bucket_name=bucket, replace=True) ti.xcom_push(key=Path(f).name, value=s3_dst)
ti.xcom_push(key=f"{Path(f).name}.vector_length", value=shape[1]-1)

Tässä vaiheessa tiedot ladataan S3:sta ja käsitellään niin, että myöhemmissä vaiheissa tiedot voidaan ladata ilman lisämuokkauksia. Lisäksi XCom-arvoja käytetään objektin sijaintien ja metatietojen välittämiseen, jotta myöhemmät vaiheet voivat hyödyntää tätä tietoa ilman erillisiä laskentatehtäviä.

Tiedon esikäsittelyn jälkeen voimme aloittaa KNN (K-lähimmän naapurin) ominaisuuksien luomisen. Tässä vaiheessa data siirretään erikoistuneeseen vektoritietokantaan, joka mahdollistaa suorat kyselyt samanlaisten kohteiden löytämiseksi. Tämä saavutetaan seuraavasti:

python
enable_vector_extension = PostgresOperator(
task_id="enable_vector_extension", postgres_conn_id=PG_VECTOR_BACKEND, sql="CREATE EXTENSION IF NOT EXISTS vector;", ) load_movie_vectors = PythonOperator( task_id="load_movie_vectors", python_callable=_load_movie_vectors, op_kwargs={'pg_connection_id': PG_VECTOR_BACKEND}, ) def _load_movie_vectors(ti): s3 = __get_s3_hook() bucket = __get_recsys_bucket() hash_id = __get_current_run_hash(ti) vector_length = ti.xcom_pull(key='movie_watcher_df.parquet.vector_length', task_ids='generate_data_frames') movies_ratings_object = f"{hash_id}/movie_watcher_df.parquet" movie_ratings_file = s3.download_file(key=movies_ratings_object, bucket_name=bucket) pg_hook = __get_pgvector_hook() def row_generator(df): for r in movie_ratings_df.rows(): yield (r[0], f"{list(r[1:])}") movie_ratings_df = pl.read_parquet(movie_ratings_file) pg_hook.insert_rows(table=f'"{hash_id}"', rows=(r for r in row_generator(movie_ratings_df)), target_fields=['movieId','vector'])

Tässä osassa pipelinea huomioimme sen, että vektorilaajennus on aktivoitava PostgreSQL-tietokannassa, minkä jälkeen data ladataan vektoritietokantaan, ja luomme väliaikaisen taulukon, jonka avulla mallin koulutusta varten tarvittavat tiedot voidaan nopeasti siirtää tuotantoon.

Viimeinen vaihe, joka liittyy syväoppimismallin koulutukseen, vaatii erityisiä laskentaresursseja, joita voidaan hallita Kubernetesin avulla. KubernetesPodOperator mahdollistaa työn kuormituksen jakamisen Kubernetes-klusterille, mikä tekee järjestelmästä joustavan ja tehokkaan.

python
run_hash = os.environ.get('RUN_HASH', None)
def main(): # lataa data konttiin # valmistele datasetti # kouluta malli # tallenna paikallisesti # lataa etävarastoon # puske XCom-arvot # Kubernetes-tyylinen suoritus with open("/airflow/xcom/return.json", "w") as f: f.write(f'"{model_destination}"') if __name__ == '__main__': main()

Tässä kohtaa Docker-kuva luodaan, joka sisältää kaikki tarvittavat Python-riippuvuudet ja suorittaa mallin koulutuksen. Kuvassa on myös määritelty kohtaa, johon XCom-arvot tallennetaan, ja se mahdollistaa mallin tulosten palauttamisen Airflow'n kautta.

Kun tämä on valmis, käytämme KubernetesPodOperatoria Airflow'ssa, joka mahdollistaa kuormien jakamisen ja suorituksen Kubernetes-klusterissa. Tämä mahdollistaa laskentatehtävien joustavan hallinnan ja erikoistuneiden resurssien hyödyntämisen, kuten erityisesti varustettuja laitteistoja, esimerkiksi GPU:ita.

Tämän kaltaisen koneoppimismallin orkestroinnin haasteita ovat erityisesti järjestelmän skaalautuvuus ja tehokkuus. Siksi on tärkeää varmistaa, että jokaista vaihetta voidaan seurata ja virheiden sattuessa järjestelmä pystyy toipumaan nopeasti ilman suuria viivästyksiä. Erityisesti suuriin datamääriin ja laskentatehtäviin liittyy monimutkaisia riippuvuuksia, joita hallitaan tehokkaasti XComien ja muiden Airflow:n mekanismien avulla.

Miten valita oikea repositoriostruktuuri ja hallita yhteyksiä Airflow-ympäristössä?

Repositoriostruktuuri on tärkeä, mutta usein aliarvioitu päätös kaikissa projekteissa. Sen merkitys korostuu erityisesti suurissa ja monimutkaisissa ympäristöissä, kuten Airflowin kaltaisissa työnkulkujen hallintajärjestelmissä. Repositoriostruktuuria valittaessa on tärkeää ottaa huomioon tiimisi toimintatavat, projektin laajuus sekä tavoite organisoida koodin jakaminen ja hallinta mahdollisimman tehokkaasti. Valinta ei ole lopullinen, sillä voit aina muuttaa rakennevalintaasi projektin kehittyessä.

Mono-repo on repositorioiden hallintamalli, jossa kaikki koodipohjat tallennetaan yhteen suureen repositorioon ja hallitaan yhtäaikaisesti versionhallinnan avulla. Tällöin Airflow-rakenteet, kuten käyttöliittymäosat, omat operoinnit ja tiimien julkaisemien DAG-tiedostojen kansiot, voivat olla kaikki samassa repositoriossa. Mono-repon etuna on, että kaikki tiimit voivat työskennellä saman koodikannan kanssa ilman tarpeettomia erillisiä rajapintoja. Tämä malli tukee hyvin "head of the history" -strategiaa, jossa tiimit työskentelevät aina koodin viimeisimmän version kanssa.

Kuitenkin, mono-repo ei ole ilman haasteita. Suuren repositorion lataaminen vie aikaa ja hallintakustannukset voivat kasvaa, erityisesti kun käytetään monimutkaisempia CI/CD-putkia mikropalveluarkkitehtuurissa. Näiden ongelmien ratkaiseminen vaatii huolellista suunnittelua ja tehokasta versionhallintaa.

Toisaalta multi-repo-malli jakaa Airflowin eri osat useisiin eri repositorioihin. Tässä mallissa voi olla erilliset repositoriot ydinkomponenteille, käytettäville lisäosille ja tiimikohtaisille DAG-tiedostoille. Multi-repo mahdollistaa tiimien itsenäisen kehityksen ja deploymisen, mutta samalla tuo tullessaan synkronointitarpeen, sillä eri repositorioiden välillä tarvitaan tiukkaa integraatiota ja testauksia ennen julkaisua.

Valinta mono-repon ja multi-repon välillä ei ole yksiselitteinen. Usein päädytään kompromissiin, jossa käytetään useita pieniä mono-repoja, jotta saadaan hyödynnettyä molempien mallien edut. Esimerkiksi yksi mono-repo voi hallita Airflowin perusasetuksia ja erillisiä toiminnallisuuksia, kun taas tiimikohtaiset repositoriot voivat vastata DAGien hallinnasta ja jakelusta. Tämä mahdollistaa eriytetyn työnkulun ilman liian suurta koodin yhteenliittämistä.

Kun repositoriostruktuuri on valittu, on aika keskittyä Airflowin tärkeisiin komponentteihin, kuten yhteys- ja muuttujahallintaan. Yhteys- ja muuttujaobjektit ovat keskeisessä roolissa, sillä ne määrittävät, miten Airflow kommunikoi muiden järjestelmien kanssa ja miten työnkulut käyttäytyvät ympäristökohtaisesti. Näitä objekteja voi hallita Airflowin WebUI:n kautta, mutta tämä menetelmä ei ole suositeltava tuotantokäytössä, koska se ei tarjoa skaalautuvaa ja turvallista tapaa hallita ympäristön konfiguraatioita.

Airflow tarjoaa mahdollisuuden määritellä yhteys- ja muuttujaobjektit ympäristömuuttujina. Näin ollen voit määrittää esimerkiksi tietokannan yhteysarvot suoraan ympäristömuuttujista, mikä helpottaa konfiguraation hallintaa ja automatisointia. Tämä kuitenkin vaatii huolellisuutta, sillä salaisten tietojen tallentaminen tavallisina tekstijonoina ei ole turvallista. Tämän vuoksi on suositeltavaa käyttää ulkoisia salaisuudenhallintajärjestelmiä, kuten AWS Secrets Manager, Google Cloud Secret Manager tai Hashicorp Vault, jotka tarjoavat turvallisemman tavan hallita salaisuuksia.

Yhteys- ja muuttujahallintaan liittyvät ratkaisut tulee valita tiiviissä yhteistyössä tietoturvatiimien kanssa, sillä väärin hallitut konfiguraatiot voivat vaarantaa koko järjestelmän turvallisuuden. Airflow tukee salaisuudenhallintajärjestelmiä, ja tämä voidaan konfiguroida suoraan Airflowin asetustiedostossa.

Kun yhteys- ja muuttujahallinta on suunniteltu ja turvallisuusnäkökohdat otettu huomioon, on aika valita, miten Airflow otetaan käyttöön organisaatiossa. Yksi suosituimmista menetelmistä on Kubernetes, joka yksinkertaistaa monimutkaisten sovellusten, kuten Airflowin, käyttöönottoa ja hallintaa. Kubernetes tarjoaa erinomaisen ympäristön Airflowin kaltaisten järjestelmien skaalaamiseen ja orkestrointiin. Jos organisaatiossa on jo kokemusta Kubernetesista tai jos se on käytettävissä palveluna, tämä on yleensä suositeltavin tapa.

Toinen vaihtoehto on käyttää virtuaalikoneita tai jopa fyysisiä palvelimia Airflowin asennukseen. Tämä lähestymistapa on haasteellisempi ja vaatii infrastruktuurin ja ohjelmiston hallintaan erillisten työkalujen, kuten Terraformin tai Ansibleen, käyttöä. Jos organisaatiossa ei ole Kubernetesin osaamista, tämä voi olla hyödyllinen vaihtoehto, mutta se vaatii enemmän työtä infrastruktuurin ja ohjelmistojen hallinnan osalta.

Yhteenvetona voidaan todeta, että oikean repositoriostruktureihin ja hallintamalleihin liittyvät päätökset ovat tärkeitä, mutta myös jatkuvasti kehittyviä valintoja. Tiimien ja projektin tarpeet voivat muuttua ajan myötä, ja on tärkeää valita sellainen malli, joka tukee joustavuutta ja helpottaa kehitystyötä pitkällä aikavälillä.