**Hadoop/Spark mit Dataproc auf der Google Cloud**
Dataproc ist die erste Wahl für den Betrieb eines Hadoop-Clusters auf der Google Cloud Platform – und definitiv deutlich einfacher, als ein Cluster manuell zu verwalten. Da Hadoop zum Big-Data-Ökosystem gehört, hat es oft auch ein entsprechend hohes Preisschild. Um diese Kosten zu senken, setzen viele Kunden Preemptible Instances als Worker Nodes im Cluster ein.
Für alle, die damit noch nicht vertraut sind: Preemptible Instances sind virtuelle Maschinen, die auf den überschüssigen Rechenressourcen eines Cloud-Anbieters laufen und zurückgefordert werden können, sobald diese Ressourcen anderswo benötigt werden. Man kann sie also als temporäre VMs betrachten. Sie sind nicht garantiert verfügbar und können jederzeit mit kaum oder gar keiner Vorwarnung zurückgenommen werden. Als Ausgleich dafür sind sie laut Google bis zu 80 % günstiger als reguläre VM-Instanzen.
Solche Instanzen werden häufig an Dataproc-Cluster angehängt, um Kosten deutlich zu senken oder bei Bedarf zusätzliche Verarbeitungskapazität bereitzustellen.
Ein Szenario, das uns bei DoiT International häufig begegnet: Ein Kunde hat ein bestehendes Hadoop-Cluster oder benötigt ein neues, auf dem Spark-Jobs über lange Zeiträume laufen (Stunden oder sogar Tage), das aber gleichzeitig mit der Last skalieren oder so kostengünstig wie möglich betrieben werden soll. In den meisten Fällen sind Dataproc mit Preemptible Instances unsere und Googles empfohlene Lösung.
Eine Frage, die uns risikoaverse Kunden immer wieder stellen, lautet: Wie geht Dataproc damit um, wenn Google Preemptible Instances zurückfordert – insbesondere bei sehr lang laufenden Jobs, die geschäftskritische Daten verarbeiten?
Um genau das zu beantworten, habe ich ein Szenario aufgesetzt, das diese Situation in einer produktiven Batch-Load-Umgebung simuliert und zeigt, wie sich der Managed-Hadoop-Service von GCP verhält.
**Spark Checkpointing**
Nachtrag: Kurz nach Veröffentlichung dieses Artikels hat mich Google darauf hingewiesen, dass es einen nativen Dataproc-Modus gibt, der (zum Zeitpunkt des Schreibens) als Beta verfügbar ist und genau diese Funktionalität nativ abdeckt. Ich habe ihn am Ende des Artikels im Abschnitt nach dem Fazit dokumentiert.
Vorab etwas Hintergrund dazu, wie Spark workloads zwischen verschiedenen virtuellen Maschinen oder Nodes verschiebt, die für eine Operation verfügbar sind, für die nächste aber möglicherweise nicht mehr.
Spark kennt das Konzept des Checkpointing. Vereinfacht gesagt wird dabei der aktuelle Zustand eines RDD oder DataFrame (also eines Datensatzes innerhalb von Spark) auf die Festplatte geschrieben. Das ist nützlich, weil so ein "Lesezeichen" im Job entsteht: Wird eine VM instabil (sie stirbt oder ist anderweitig nicht mehr erreichbar), kann eine andere Instanz ab dem letzten Lesezeichen weiterarbeiten.
Wird in einem Cluster mit Preemptible Instances eine Instanz zurückgefordert, sorgt ein vorhandener Checkpoint dafür, dass die Verarbeitung weitgehend unterbrechungsfrei auf einem anderen Worker Node ab diesem Checkpoint fortgesetzt wird.
Hier ein kurzes Beispiel, wie sich ein Checkpoint mit etwas PySpark-Code einrichten lässt. Der Code setzt das Checkpoint-Verzeichnis, wählt eine Spalte aus einem DataFrame aus und legt einen Checkpoint an, bevor das Ergebnis als Parquet-Datei in HDFS geschrieben wird:
spark.sparkContext.setCheckpointDir('gs://bucket/checkpoints')events_df = df.select('event_type')
events_df.checkpoint()
events_df.write.format("parquet").save("/results/1234/")
Angenommen, Sie haben ein Dataproc-Cluster mit einem Master, 2 Worker Nodes und 2 Preemptible-Worker-Nodes, auf dem dieser Code läuft. Wird während der letzten Zeile, die die Ergebnisse schreibt, einer der Preemptible-Worker-Nodes von Google zurückgefordert, erkennt Spark den Ausfall und plant die Aufgabe auf einem anderen Node neu ein. Statt von vorn zu beginnen, setzt Spark dank des kurz zuvor gesetzten Checkpoints direkt bei der letzten Zeile wieder ein.
Das ist ein sehr einfaches Beispiel. Bei einem Spark-Job mit über 100 Operationen, die jeweils 5 Minuten dauern können, ist Checkpointing ein echter Lebensretter – besonders bei einer großen Flotte von Preemptible Instances, von denen jederzeit welche zwischen den Operationen verschwinden können.
**Das Experiment: Long-Running Spark Job auf Dataproc**
So überzeugend das in der Theorie klingt: Online findet sich erstaunlich wenig praktische Validierung dieses Verhaltens auf Dataproc. Genau deshalb herrscht bei Kunden, die Dataproc für ihre Big-Data-workloads in Betracht ziehen, oft große Unsicherheit.
TL;DR: Ja, es funktioniert, und der Job läuft sauber bis zum Ende durch. Nachtrag: Bitte beachten Sie den Abschnitt nach dem Fazit für Hinweise auf eine noch bessere, native Methode.
So habe ich die Umgebung aufgebaut, damit andere sie zur Validierung nachstellen können:
Damit mir keine möglichen Preemptible-Aktionen entgehen, habe ich eine Testumgebung eingerichtet, die theoretisch rund um die Uhr einen Spark-Job ausführt, um zu beobachten, was passiert, wenn eine Preemptible-VM zurückgefordert und/oder ersetzt wird.
Diese Testumgebung besteht aus einem Dataproc-Cluster mit einem Master Node, 2 Worker Nodes und 2 Preemptible-Worker-Nodes, einem Batch-Spark-Job (kein Streaming), der etwas über 30 Minuten läuft, sowie einem Cloud-Scheduler-Job, der den Spark-Job alle 30 Minuten startet, sodass er möglichst nahe an 24/7 läuft. Für die Worker Nodes habe ich N1-vCPUs gewählt, weil sie älter sind und mit höherer Wahrscheinlichkeit häufiger zurückgefordert werden – tatsächlich werden E2-Instanzen deutlich seltener zurückgefordert als N1. Der Spark-Job ist sehr einfach gehalten: Ich habe einen offenen Datensatz aus BigQuery geladen und darauf zahlreiche zufällige, rechenintensive Operationen ausgeführt – Joins, Cross Joins, zufällige Sample-Aggregationen usw. – um eine echte Datenverarbeitung zu simulieren, die die Last über alle Nodes des Clusters verteilt.
Um zu erfassen, wann eine Preemptible Instance zurückgefordert und/oder ersetzt wurde, habe ich eine benutzerdefinierte Metrik für die von Dataproc erzeugte Managed Instance Group des Clusters angelegt (der Name lautet typischerweise dataproc-cluster-
Beispielmetrik im Dashboard mit der Größe der Managed Instance Group
Sobald die Metrik anhand der Instanz- und Master-Node-Logs validiert war, habe ich den Job über das lange Labor-Day-Wochenende in den USA und den darauffolgenden Dienstag laufen lassen, um sowohl ein komplettes Wochenende als auch einen geschäftigen Werktag in die Auswertung einzubeziehen. Wichtig zu wissen: Preemptible Instances laufen maximal 24 Stunden am Stück und werden nach Ablauf dieser 24 Stunden zurückgefordert, bevor ein Neustart versucht wird.
Im Verlauf dieses Zeitraums gab es mehrere Einbrüche und Anstiege im Diagramm, wenn Instanzen zurückgefordert und ersetzt wurden (siehe Grafik oben). Es traten einige "False Positives" auf, bei denen die Jobs gerade hochfuhren oder Lesevorgänge ausführten, während die Reclaim-Operationen stattfanden – das beabsichtigte Verhalten zeigte sich zwar, eignete sich aber nicht gut zur Veranschaulichung in diesem Artikel. Ein Lehrbuchbeispiel ergab sich allerdings, als unmittelbar nach einer Checkpoint-Operation und mitten beim Schreiben eines DataFrames nach HDFS genau die schreibende Preemptible Instance zurückgefordert und neu gestartet wurde – mit sehr aussagekräftigen Logs, die das Verhalten dokumentieren.
Die Exception, die im Master-Log von Dataproc auftauchte, sah so aus:
20/09/08 20:05:36 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 18.0 in stage 21.0 (TID 1490, cluster-4b46-sw-41l5.c.project-id.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:288)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Genau danach hatte ich gesucht: Die Maschine wurde während eines Schreibvorgangs zurückgefordert – und das praktischerweise unmittelbar nach einer Checkpoint-Operation. Im Anschluss an diese Exception folgten mehrere Warnungen, die das Scheitern der Kommunikation und Aufgabenverteilung mit dem zurückgeforderten Node (Executor 2) zeigten und damit den Reclaim bestätigten:
20/09/08 20:06:41 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 2 on cluster-rand-sw-4c7x.c.project-id.internal: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 73.0 in stage 63.0 (TID 9855, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 74.0 in stage 63.0 (TID 9860, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.ExecutorAllocationManager: Attempted to mark unknown executor 2 idle
Nach wenigen Sekunden und einigen weiteren dieser Exceptions lief der Job ganz normal weiter und meldete am Ende einen erfolgreichen Job-Status an Dataproc zurück. Ich habe geprüft, dass die Daten korrekt in den Zielordner auf HDFS geschrieben wurden – die Operation wurde also wie beabsichtigt erfolgreich abgeschlossen.
Im Log der neuen Preemptible Instance, die die zurückgeforderte ersetzte, sah ich, dass sie genau dort weitermachte, wo die Verarbeitung auf der zurückgeforderten Instanz unterbrochen worden war. Hinweis: Das war ein zufälliges, aber besonders schönes Beispiel, da die Aufgabe auf die Ersatzinstanz statt auf einen anderen Worker Node verschoben wurde. In den meisten Fällen passiert das nicht (in diesem Experiment war es 1 von 9 Mal). Hier die Log-Einträge der neuen Instanz:
{
"insertId": "j96wpu5rh8p09edb5",
"jsonPayload": {
"message": "src: /10.128.0.9:55928, dest: /10.128.0.8:9866, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_-1208291363_17, offset: 0, srvID: 3b9b065f-15f4-49d7-a9ad-a5a2136e4ce1, blockid: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, duration(ns): 556814753645",
"class": "org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace",
"filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log"
},
"resource": {
"type": "cloud_dataproc_cluster",
"labels": {
"project_id": "project-id",
"cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
"region": "us-central1",
"cluster_name": "cluster-rand"
}
},
"timestamp": "2020-09-08T19:06:15.035Z",
"severity": "INFO",
"labels": {
"compute.googleapis.com/resource_id": "5331347012694516446",
"compute.googleapis.com/resource_name": "cluster-rand-w-1",
"compute.googleapis.com/zone": "us-central1-a"
},
"logName": "projects/project-id/logs/hadoop-hdfs-datanode",
"receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}
{
"insertId": "j96wpu5rh8p09edb6",
"jsonPayload": {
"class": "org.apache.hadoop.hdfs.server.datanode.DataNode",
"filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log",
"message": "PacketResponder: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, type=LAST_IN_PIPELINE terminating"
},
"resource": {
"type": "cloud_dataproc_cluster",
"labels": {
"project_id": "project-id",
"cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
"region": "us-central1",
"cluster_name": "cluster-rand"
}
},
"timestamp": "2020-09-08T19:06:15.035Z",
"severity": "INFO",
"labels": {
"compute.googleapis.com/resource_id": "5331347012694516446",
"compute.googleapis.com/zone": "us-central1-a",
"compute.googleapis.com/resource_name": "cluster-rand-w-1"
},
"logName": "projects/project-id/logs/hadoop-hdfs-datanode",
"receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}
Leider habe ich in keinem der verfügbaren Logs Einträge gefunden, die das Zurücklesen aus dem Checkpoint-Verzeichnis explizit dokumentieren – die Verarbeitung lief jedoch nahtlos genau dort weiter, wo sie zuvor unterbrochen worden war, und schloss die verbleibenden Operationen des Jobs ab.
**Fazit**
Zusammengefasst: Dataproc hat das Zurückfordern und Ersetzen einer Preemptible-Instance-Node genau so gehandhabt, wie es das Design von Hadoop und Spark vorsieht. Die Engineers von Google haben hervorragende Arbeit geleistet, damit Dataproc "Ausfälle" von Worker Nodes – etwa wenn eine Preemptible Instance ohne Vorwarnung zurückgefordert wird – nahtlos abfängt.
**Nachtrag: Dataproc Enhanced Flexibility Mode**
Nach Veröffentlichung dieses Artikels hat mir ein Googler Details zu einem Beta-Projekt von Google geteilt, das direkt zum hier beschriebenen Thema passt und mir beim Schreiben des Artikels nicht bekannt war.
Google bietet einen Dataproc-Modus, der speziell für Cluster konzipiert ist, die ihre Worker Nodes verlieren können – also etwa für Preemptible-Worker-Instances. Dieser Modus legt die Shuffle-Daten so im Dateisystem ab, dass das System auf den Ausfall von Worker Nodes optimiert ist.
Ich kann nur empfehlen, sich diesen Modus näher anzusehen. Er untermauert mein obiges Fazit zusätzlich und zeigt, dass Google diesen Prozess auch im Hintergrund kontinuierlich optimiert.
Details zum Produkt finden Sie hier: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/flex