Dans le cadre du développement de plugins pour Airflow, il est essentiel de comprendre comment créer et configurer un tableau de bord de métriques personnalisé. Ce tableau de bord permet de visualiser les statistiques de succès et d'échec des exécutions des DAGs (Directed Acyclic Graphs) sur différentes périodes de temps. Nous allons passer par plusieurs étapes pour mettre en place ce tableau de bord en utilisant Flask, SQLAlchemy et la syntaxe de Jinja dans le cadre d'une application web basée sur Flask-AppBuilder.

Le répertoire du module plugin, nommé metrics_plugin, contient les éléments essentiels pour la création et l'affichage du tableau de bord. L'élément clé de cette structure est le fichier __init__.py, qui enregistre le plugin ainsi que la vue Flask associée. Le répertoire des templates héberge le code HTML nécessaire à l'affichage du tableau de bord, tandis que le fichier views/dashboard.py implémente le code backend de cette vue.

Mise en place de la vue backend

La première étape pour la création de notre vue backend consiste à configurer les routes et les fonctions qui interagiront avec la base de données d'Airflow. Pour cela, nous devons importer les modules nécessaires à l'exécution de notre tableau de bord dans views/dashboard.py. Parmi ces modules, on trouve BaseView et expose de Flask-AppBuilder, utilisés pour rendre les pages accessibles via une URL spécifiée, ainsi que SQLAlchemy pour effectuer les requêtes vers la base de données.

Un élément clé du développement est l'utilisation du décorateur has_access_view qui vérifie que l'utilisateur possède les autorisations nécessaires pour accéder à cette vue. De plus, le décorateur provide_session permet de fournir une session de base de données à la fonction qui gère la vue, pour exécuter des requêtes sur la base de données d'Airflow.

Voici un exemple de la configuration de la classe MetricsDashboardView, qui définit la route du tableau de bord dans l'application Flask :

python
class MetricsDashboardView(BaseView): """Une vue Flask-AppBuilder pour le tableau de bord des métriques""" default_view = "index" route_base = "/metrics_dashboard"

Dans ce code, nous définissons une classe MetricsDashboardView qui hérite de BaseView, une classe de Flask-AppBuilder. Le default_view désigne la fonction à exécuter lorsque l'utilisateur visite la route de base, et ici elle pointe vers la fonction index. Quant au route_base, il détermine l'URL que l'utilisateur devra visiter pour accéder au tableau de bord : /metrics_dashboard.

Exécution des requêtes et présentation des données

La fonction index est la principale fonction qui génère la vue du tableau de bord. Elle s'exécute lorsqu'un utilisateur visite la route /metrics_dashboard. Dans cette fonction, nous définissons les requêtes SQL nécessaires pour obtenir les métriques sur les exécutions des DAGs dans les dernières 24 heures, 7 jours et 30 jours.

Voici un exemple de requête SQL pour extraire ces informations :

python
dag_run_query = text(""" SELECT dr.dag_id, SUM(CASE WHEN dr.state = 'success' AND dr.start_date > {interval(1)} THEN 1 ELSE 0 END) AS "1_day_success", SUM(CASE WHEN dr.state = 'failed' AND dr.start_date > {interval(1)} THEN 1 ELSE 0 END) AS "1_day_failed", ... FROM dag_run AS dr JOIN dag AS d ON dr.dag_id = d.dag_id WHERE d.is_paused != true GROUP BY dr.dag_id """)

Cette requête récupère les informations sur les DAGs actifs en filtrant les exécutions réussies et échouées sur trois périodes (1 jour, 7 jours, 30 jours). Les résultats sont ensuite convertis en une liste de dictionnaires appelée dag_run_stats, qui sera utilisée pour rendre les données dans le modèle HTML via render_template.

Création du template HTML

Une fois que les données sont extraites de la base de données, il est nécessaire de les rendre visuellement accessibles à l'utilisateur via un tableau de bord interactif. Pour cela, nous créons un template HTML qui utilise la syntaxe de Jinja pour afficher les graphiques. Le fichier HTML de notre tableau de bord doit être placé dans le répertoire templates/dashboard.html.

Voici un extrait du code HTML de base pour afficher les métriques :

html
{% extends base_template %}
{% block title %} {{ title }} {% endblock %} {% block head_meta %} {{ super() }} {% endblock %} {% block content %} <div class="container"> <canvas id="successChart"></canvas>
<canvas id="failedChart"></canvas>
</div> {% endblock %}

Dans ce template, nous utilisons des éléments canvas pour afficher les graphiques des DAGs réussis et échoués. Ces graphiques seront rendus avec JavaScript, utilisant des bibliothèques comme Chart.js pour la visualisation.

Le code HTML est aussi responsable de l'intégration des éléments de style de Bootstrap CSS qui rendent l'interface réactive, afin que les graphiques s'ajustent en fonction des différentes tailles d'écran.

JavaScript pour la visualisation des graphiques

Afin de rendre les graphiques interactifs et dynamiques, il est nécessaire d'ajouter un bloc tail dans le template, où nous insérerons le JavaScript nécessaire pour générer les graphiques à partir des données passées par le backend :

html
{% block tail %} <script> const successData = {{ dag_run_stats['1_day_success'] }}; const failedData = {{ dag_run_stats['1_day_failed'] }}; const successChart = new Chart(document.getElementById("successChart"), { type: "bar", data: {
labels: ["1 Day Success", "7 Days Success", "30 Days Success"],
datasets: [{ label: "Success", data: successData, backgroundColor: "green" }] } }); const failedChart = new Chart(document.getElementById("failedChart"), { type: "bar", data: { labels: ["1 Day Failed", "7 Days Failed", "30 Days Failed"], datasets: [{ label: "Failed", data: failedData, backgroundColor: "red" }] } }); </script> {% endblock %}

Ce JavaScript utilise Chart.js pour afficher les statistiques sous forme de graphiques à barres, avec des données dynamiques insérées directement depuis le backend.

Autres considérations importantes

L'intégration de ce tableau de bord nécessite non seulement la gestion des données mais aussi la sécurité et la performance de l'application. Il est important de s'assurer que l'accès aux métriques soit réservé aux utilisateurs autorisés et que les requêtes SQL soient optimisées pour éviter toute surcharge du serveur. Par ailleurs, l'utilisation de la pagination et de la mise en cache des résultats peut être nécessaire pour améliorer la performance si le volume de données devient trop important.

Comment migrer vos flux de travail dans Airflow et gérer les environnements de production

Lorsqu'un projet Airflow se développe, il devient nécessaire de migrer des flux de travail d'un environnement à un autre, notamment lors de la transition d’un environnement de test (UAT) vers la production, ou lorsqu'il faut transférer des charges de travail vers une nouvelle instance Airflow pour des raisons de performance, de sécurité ou d'optimisation des ressources. Ce processus de migration est essentiel pour garantir la continuité des opérations tout en minimisant les risques d'erreur.

Après avoir validé vos flux de travail dans l'environnement UAT, il devient primordial de les promouvoir vers la production. À ce moment-là, il est conseillé de désactiver les anciens flux de travail, souvent hérités de versions précédentes, avant de déployer les nouveaux DAGs Airflow dans votre environnement de production. Ce processus garantit une transition fluide, avec une gestion soignée des versions des flux de travail.

Planification de la migration entre environnements Airflow

Au fur et à mesure de la croissance des instances Airflow, il devient fréquent de devoir migrer des flux de travail vers de nouveaux environnements. Cette migration peut être motivée par un changement de fournisseur de services, ou par le besoin d'isoler les équipes et les charges de travail qui étaient précédemment regroupées dans une même instance. Contrairement à d'autres migrations de systèmes, passer d’une instance Airflow à une autre est une tâche relativement directe, mais cela demande tout de même une attention particulière pour garantir la réussite du processus.

L'identification des DAGs à migrer est la première étape cruciale. Une fois cette étape franchie, il est essentiel de repérer les objets associés nécessaires à leur bon fonctionnement, tels que les connexions, les variables et l'historique des exécutions précédentes. Une migration réussie repose sur la prise en compte de ces éléments.

Connexions et variables

Lors de la migration, les connexions et les variables d'un environnement à l'autre doivent être soigneusement examinées. Si vous utilisez un backend de secrets pour gérer ces objets, il sera nécessaire de consulter les documents spécifiques à ce service pour connaître la procédure de migration. Si des variables d’environnement sont utilisées, il est important de se référer à la documentation de votre gestionnaire de variables d'environnement, pour assurer la bonne migration des données.

Airflow propose également un moyen pratique d'exporter et d'importer des objets tels que les connexions et les variables via sa bibliothèque principale. L’utilisation du modèle suivant, qui consiste à exporter les objets d’une instance pour les importer dans une autre, permet de migrer ces données de manière relativement simple :

python
from airflow.models import Variable
from airflow.models import Connection required_connections = [] required_variables = [] all_variables = session.query(Variable).all() all_connections = session.query(Connection).all() for v in all_variables: if v.key in required_variables: session.add(v) session.commit() for c in all_connections: if c.conn_id in required_connections: session.add(c) session.commit()

Cette méthode peut être étendue à d’autres objets représentant des modèles de données au sein d'Airflow. Pour un petit nombre de connexions ou de variables, la migration manuelle via l'interface utilisateur d'Airflow reste également une option viable.

Migration des DAGs

Une fois les objets essentiels migrés et le code mis à jour dans un nouveau référentiel, il convient de déterminer la méthode la plus appropriée pour finaliser la migration et transférer les charges de travail. Pour les environnements où le nombre de DAGs est limité, la solution la plus simple consiste à désactiver le DAG dans l'environnement source, à modifier son code pour fixer la dernière exécution comme nouvelle date de début, puis à activer ce DAG dans la nouvelle instance Airflow après avoir poussé le code.

Dans le cas où le DAG ne possède pas de date de début ou ne peut pas en être attribuée, et si l'historique des exécutions précédentes ne peut pas être perdu, il devient nécessaire de migrer l'historique des exécutions via la base de données de métadonnées, en utilisant la requête suivante pour récupérer l’état actuel des exécutions d'un DAG et le réinsérer dans la nouvelle instance :

python
from airflow.models import DagRun
dag_runs = session.query(DagRun).filter(DagRun.dag_id == "dag_id")

En réintégrant ces données dans la nouvelle instance, vous garantissez la continuité des états opérationnels des deux instances.

Les défis de la migration

Migrer des flux de travail entre différents outils et environnements fait partie des tâches régulières d’un ingénieur en données. Bien que la migration entre deux instances Airflow puisse sembler simple à première vue, elle demande une planification minutieuse pour éviter toute perte de données et garantir que le système continue de fonctionner sans interruptions majeures.

Un des aspects souvent négligés lors de la migration est la gestion des dépendances entre les différents composants du DAG et la cohérence des données. Assurez-vous que toutes les variables, connexions et autres objets partagés sont correctement transférés et configurés dans le nouvel environnement. Si des services externes sont utilisés, comme des bases de données ou des systèmes de gestion de secrets, il est crucial de bien comprendre comment ces services interagissent avec Airflow et d'anticiper tout ajustement nécessaire lors de la migration.

Enfin, gardez à l'esprit que la migration n'est pas une activité ponctuelle, mais un processus continu qui doit être suivi de près. Il est toujours plus sûr de passer un peu plus de temps à planifier avant de procéder à l'exécution, plutôt que de devoir réparer des erreurs une fois la migration effectuée.