**Hadoop/Spark avec Dataproc sur Google Cloud**
Dataproc est l'option de référence pour exécuter un cluster Hadoop sur Google Cloud Platform, et c'est BIEN plus simple que de gérer un cluster à la main. Hadoop appartenant à l'écosystème big data, son coût est souvent à la hauteur de ses ambitions. Pour alléger la facture, beaucoup de clients utilisent des instances préemptibles comme nœuds worker dans leur cluster.
Pour les non-initiés, les instances préemptibles sont des machines virtuelles qui exploitent les ressources de calcul excédentaires dont dispose un fournisseur cloud à un instant donné, et qui peuvent être récupérées dès que ces ressources sont sollicitées ailleurs. Il faut donc les considérer comme des machines virtuelles temporaires. Leur disponibilité fluctue selon les ressources de calcul disponibles, et elles peuvent être réclamées à tout moment, avec peu ou pas de préavis. Pour compenser cet inconvénient, leur tarif est très fortement réduit, jusqu'à 80 % selon Google, par rapport à des instances de machines virtuelles classiques.
Ces instances sont fréquemment rattachées à des clusters Dataproc pour réduire significativement les coûts ou ajouter de la capacité de traitement supplémentaire en cas de besoin.
Un scénario qui revient régulièrement chez DoiT International : un client dispose ou souhaite mettre en place un cluster Hadoop pour exécuter des jobs Spark sur de longues durées (heures, voire jours), tout en pouvant être dimensionné selon la charge ou facturé au plus juste. Dans la majorité des cas, Dataproc avec instances préemptibles est l'option recommandée, par nous comme par Google.
Une question revient chez les clients les plus prudents : comment Dataproc se comporte-t-il lorsque des instances préemptibles sont récupérées par Google, en particulier sur des jobs très longs traitant des données critiques ?
Pour y répondre précisément, j'ai conçu un scénario, ou plutôt une expérience, simulant ce comportement dans un environnement de batch en production afin d'observer la réaction du service Hadoop managé de GCP.
**Le checkpointing dans Spark**
Édit : peu après la publication, Google m'a contacté pour m'informer de l'existence d'un mode natif Dataproc, en bêta au moment de la rédaction, qui assure nativement la même fonctionnalité. Je l'ai documenté en fin d'article, dans la section post-conclusion.
Commençons par un peu de contexte sur la manière dont Spark gère le déplacement des workloads entre différentes machines virtuelles ou nœuds, susceptibles d'exister pour une opération mais plus pour la suivante.
Spark s'appuie sur un concept appelé checkpointing qui consiste, à très haut niveau, à écrire l'état actuel d'un RDD ou d'une DataFrame (un dataset au sein de Spark) sur disque. C'est utile car cela pose un marque-page dans votre job : si une machine virtuelle devient défaillante (plantage ou indisponibilité), une autre instance peut reprendre le travail à partir du dernier marque-page.
Concrètement, si un cluster utilise des instances préemptibles et que l'une d'elles est récupérée, l'existence d'un checkpoint permet au traitement de reprendre quasiment sans interruption depuis ce checkpoint, sur un autre nœud worker.
Voici un exemple rapide pour mettre en place un checkpoint avec quelques lignes de PySpark. On définit le répertoire de checkpoint, on sélectionne une colonne d'une DataFrame, puis on checkpointe le résultat avant de l'écrire dans un fichier parquet sur HDFS :
spark.sparkContext.setCheckpointDir('gs://bucket/checkpoints')events_df = df.select('event_type')
events_df.checkpoint()
events_df.write.format("parquet").save("/results/1234/")
Imaginons maintenant un cluster Dataproc avec un master, 2 nœuds worker et 2 nœuds worker en instances préemptibles, exécutant le code ci-dessus. Pendant l'exécution, si l'un des nœuds worker préemptibles est récupéré par Google au moment où la dernière ligne de l'exemple écrit les résultats, Spark détecte la défaillance du nœud et replanifie la tâche sur un autre nœud. Au lieu de tout recommencer depuis le début, l'exécution reprend à la dernière ligne, car un checkpoint a été enregistré juste avant.
L'exemple est volontairement simple, mais lorsque vous avez un job Spark comportant plus de 100 opérations, dont chacune peut prendre 5 minutes, ce mécanisme peut sauver la mise, surtout avec un parc important d'instances préemptibles dont plusieurs peuvent disparaître entre deux opérations.
**L'expérience d'un job Spark longue durée sur Dataproc**
Si tout cela paraît extraordinaire en théorie, on trouve très peu de validations concrètes documentées en ligne sur Dataproc, ce qui explique en grande partie l'incertitude des clients qui envisagent Dataproc pour leurs workloads big data.
Version TL;DR : oui, ça fonctionne et le job va jusqu'au bout. Édit : voir la post-conclusion pour une approche encore meilleure, en mode natif.
Voici comment j'ai configuré l'environnement, afin que d'autres puissent reproduire le test et le valider :
Pour tester ce comportement sans laisser passer la moindre action de préemption, j'ai monté un environnement de test exécutant en théorie un job Spark 24h/24 et 7j/7, afin d'observer ce qui se passe lorsqu'une VM préemptible est récupérée et/ou remplacée.
Cet environnement de test se compose d'un cluster Dataproc avec un nœud master, 2 nœuds worker et 2 nœuds worker en instances préemptibles, d'un job Spark batch (non streaming) qui dure un peu plus de 30 minutes, et d'un job Cloud Scheduler qui lance le job Spark toutes les 30 minutes pour rester aussi proche que possible d'une exécution 24/7. J'ai opté pour des vCPU N1 sur les nœuds worker car ils sont plus anciens et avaient plus de chances d'être récupérés ; il s'avère que les instances E2 sont bien moins préemptées que les N1. Ce job Spark est très basique : j'extrais un dataset ouvert depuis BigQuery et j'enchaîne quantité d'opérations coûteuses aléatoires (jointures, jointures croisées, agrégations sur échantillons aléatoires, etc.) pour simuler un véritable traitement de données qui répartit la charge sur l'ensemble des nœuds du cluster.
Pour repérer les moments où une instance préemptible était récupérée et/ou remplacée, j'ai créé une métrique personnalisée sur le managed instance group que Dataproc crée pour le cluster (le nom est généralement dataproc-cluster-
Exemple de métrique sur un dashboard montrant la taille du managed instance group
Une fois cette métrique validée à l'aide des logs de l'instance et du nœud master du job, j'ai lancé l'exécution sur le long week-end de Labor Day aux États-Unis, prolongé jusqu'au mardi suivant, pour intégrer un week-end complet et une journée de travail chargée à l'analyse. À noter : les instances préemptibles fonctionnent au maximum 24 heures d'affilée et sont récupérées au bout de 24 heures, avec une tentative de redémarrage à ce moment-là.
Sur cette période, le graphique a affiché plusieurs creux et hausses correspondant aux récupérations et aux remplacements d'instances (voir le visuel ci-dessus). Quelques faux positifs sont apparus lorsque les jobs étaient encore en phase de démarrage ou en pleine lecture au moment des préemptions, illustrant bien le comportement attendu mais peu adaptés à une présentation dans cet article. En revanche, un cas d'école parfaitement lisible s'est produit : juste après une opération de checkpoint, et en plein milieu de l'écriture d'une DataFrame sur HDFS, l'instance préemptible chargée de l'écriture a été récupérée puis redémarrée, laissant derrière elle des logs très clairs sur son comportement.
L'exception levée, dans le log du master Dataproc, était la suivante :
20/09/08 20:05:36 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 18.0 in stage 21.0 (TID 1490, cluster-4b46-sw-41l5.c.project-id.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:288)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
C'était exactement ce que je cherchais : une machine récupérée pendant une opération d'écriture, et fort à propos juste après une opération de checkpoint. Après cette exception, plusieurs séries d'avertissements ont signalé l'impossibilité de communiquer avec le nœud récupéré (executor 2) et d'y planifier du travail, confirmant la préemption :
20/09/08 20:06:41 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 2 on cluster-rand-sw-4c7x.c.project-id.internal: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 73.0 in stage 63.0 (TID 9855, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 74.0 in stage 63.0 (TID 9860, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.ExecutorAllocationManager: Attempted to mark unknown executor 2 idle
Après quelques secondes et plusieurs séries de ces exceptions, le job a poursuivi son exécution normalement et a renvoyé un statut de réussite à Dataproc. J'ai vérifié que les données avaient bien été écrites dans le dossier de destination sur HDFS, confirmant que l'opération s'était terminée correctement, comme prévu.
En consultant les logs de la nouvelle instance préemptible qui avait remplacé celle récupérée, j'ai constaté qu'elle avait repris exactement là où le traitement s'était arrêté. À noter : il s'agit d'un cas aléatoire mais idéal, car la tâche a été replanifiée sur l'instance de remplacement plutôt que sur un autre nœud worker, ce qui ne se produit pas la plupart du temps (1 cas sur 9 dans cette expérience). Voici les entrées de log de la nouvelle instance :
{
"insertId": "j96wpu5rh8p09edb5",
"jsonPayload": {
"message": "src: /10.128.0.9:55928, dest: /10.128.0.8:9866, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_-1208291363_17, offset: 0, srvID: 3b9b065f-15f4-49d7-a9ad-a5a2136e4ce1, blockid: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, duration(ns): 556814753645",
"class": "org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace",
"filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log"
},
"resource": {
"type": "cloud_dataproc_cluster",
"labels": {
"project_id": "project-id",
"cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
"region": "us-central1",
"cluster_name": "cluster-rand"
}
},
"timestamp": "2020-09-08T19:06:15.035Z",
"severity": "INFO",
"labels": {
"compute.googleapis.com/resource_id": "5331347012694516446",
"compute.googleapis.com/resource_name": "cluster-rand-w-1",
"compute.googleapis.com/zone": "us-central1-a"
},
"logName": "projects/project-id/logs/hadoop-hdfs-datanode",
"receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}
{
"insertId": "j96wpu5rh8p09edb6",
"jsonPayload": {
"class": "org.apache.hadoop.hdfs.server.datanode.DataNode",
"filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log",
"message": "PacketResponder: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, type=LAST_IN_PIPELINE terminating"
},
"resource": {
"type": "cloud_dataproc_cluster",
"labels": {
"project_id": "project-id",
"cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
"region": "us-central1",
"cluster_name": "cluster-rand"
}
},
"timestamp": "2020-09-08T19:06:15.035Z",
"severity": "INFO",
"labels": {
"compute.googleapis.com/resource_id": "5331347012694516446",
"compute.googleapis.com/zone": "us-central1-a",
"compute.googleapis.com/resource_name": "cluster-rand-w-1"
},
"logName": "projects/project-id/logs/hadoop-hdfs-datanode",
"receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}
Aucune entrée, parmi tous les logs disponibles, ne montrait explicitement la relecture depuis le répertoire de checkpoint. L'exécution a néanmoins repris exactement au point où elle s'était arrêtée, puis a mené à bien les opérations restantes du job.
**Conclusion**
Pour conclure cette expérience, Dataproc a géré la récupération et le remplacement d'un nœud d'instance préemptible exactement comme attendu, dans la lignée de la conception de Hadoop et de Spark. Les Engineers de Google ont accompli un travail remarquable pour que Dataproc absorbe sans friction les défaillances des nœuds worker, autrement dit la récupération sans préavis d'une instance préemptible.
**Post-conclusion : Dataproc Enhanced Flexibility Mode**
Après la publication de cet article, un Googler m'a transmis les détails d'un projet Google en bêta directement lié au sujet abordé ici, et dont je n'avais pas connaissance au moment de la rédaction.
Google propose un mode pour Dataproc spécifiquement conçu pour les clusters susceptibles de perdre leurs nœuds worker, autrement dit les instances worker préemptibles. Ce mode redistribue les données dans le système de fichiers de manière à optimiser le comportement face aux défaillances des nœuds worker.
Je recommande vivement de s'y intéresser : il vient renforcer ma conclusion ci-dessus et montre que Google optimise discrètement ce processus.
Les détails du produit sont disponibles ici : https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/flex