**Hadoop/Spark con Dataproc su Google Cloud**
Dataproc è la scelta di riferimento per eseguire un cluster Hadoop su Google Cloud Platform ed è decisamente MOLTO più semplice della gestione manuale di un cluster. Essendo parte dell'ecosistema big data, Hadoop si porta spesso dietro un costo all'altezza del nome. Per contenere questa spesa, molti clienti utilizzano istanze preemptible come worker node del cluster.
Per chi non le conoscesse, le istanze preemptible sono macchine virtuali che sfruttano le risorse di calcolo in eccesso di cui un cloud provider dispone in un dato momento e che possono essere recuperate quando tali risorse servono altrove: in pratica, conviene considerarle macchine virtuali temporanee. La loro disponibilità non è garantita in qualsiasi istante, dipende dalle risorse di calcolo libere, e possono essere recuperate in qualunque momento con un preavviso minimo o nullo. Per compensare questo limite, il prezzo è scontato in modo notevole, fino all'80% secondo Google, rispetto alle istanze tradizionali.
Queste istanze vengono spesso collegate ai cluster Dataproc per ridurre sensibilmente i costi o per aggiungere capacità di elaborazione quando serve.
Uno scenario che ci viene proposto molto di frequente in DoiT International è quello di un cliente che ha già o deve creare un nuovo cluster Hadoop su cui far girare job Spark di lunga durata (ore o addirittura giorni), ma che ha bisogno di scalare in base al carico richiesto o di contenere il più possibile i costi. Nella maggior parte dei casi Dataproc con istanze preemptible è la soluzione consigliata sia da noi sia da Google.
Una domanda che ci pongono i clienti più avversi al rischio è: come gestisce Dataproc il caso in cui le istanze preemptible vengono recuperate da Google, soprattutto durante job molto lunghi che elaborano dati mission-critical?
Per rispondere proprio a questa domanda ho costruito uno scenario, o esperimento, che simula quanto avviene in un ambiente di batch-load in produzione, così da capire come reagisce il servizio Hadoop gestito di GCP.
**Spark Checkpointing**
Aggiornamento: poco dopo la pubblicazione, Google mi ha contattato per segnalarmi una modalità nativa di Dataproc, introdotta in versione beta (al momento della stesura), che svolge la stessa funzione in modo nativo. L'ho documentata in fondo all'articolo, nella sezione post-conclusione.
Innanzitutto, qualche premessa su come Spark gestisce lo spostamento dei workloads tra macchine virtuali o nodi diversi, che possono esserci per un'operazione e non essere più disponibili per quella successiva.
Spark si basa su un concetto chiamato checkpointing che, in estrema sintesi, consiste nello scrivere su disco lo stato corrente di un RDD o di un DataFrame (in pratica un dataset all'interno di Spark). È utile perché crea una sorta di \"segnalibro\" all'interno del job: se una macchina virtuale diventa unhealthy (si arresta o non è più raggiungibile), un'altra istanza può riprendere dall'ultimo segnalibro e proseguire da lì.
In questo caso, se un cluster usa istanze preemptible e una di esse viene recuperata, la presenza di un checkpoint consente all'elaborazione di proseguire pressoché senza interruzioni a partire da quel checkpoint su un altro worker node.
Ecco un rapido esempio di come configurare un checkpoint con un po' di codice PySpark. Imposta la directory dei checkpoint, seleziona una colonna da un dataframe e applica il checkpoint al risultato prima di scriverlo come file parquet su HDFS:
spark.sparkContext.setCheckpointDir('gs://bucket/checkpoints')events_df = df.select('event_type')
events_df.checkpoint()
events_df.write.format("parquet").save("/results/1234/")
Supponiamo ora di avere un cluster Dataproc con un singolo master, 2 worker node e 2 worker node preemptible che eseguono il codice qui sopra. Se durante l'esecuzione uno dei worker node preemptible viene recuperato da Google proprio mentre viene eseguita l'ultima riga dell'esempio, ovvero la scrittura dei risultati, Spark rileverà il guasto del nodo e riassegnerà l'attività a un altro nodo. A quel punto, anziché ripartire dall'inizio dell'esempio, l'elaborazione riprenderà dall'ultima riga, perché il checkpoint è stato salvato proprio prima di essa.
È un esempio molto semplice, ma con un job Spark che prevede oltre 100 operazioni da 5 minuti l'una, può rivelarsi un vero salvavita, soprattutto con un'ampia flotta di istanze preemptible che possono sparire a piacere tra un'operazione e l'altra.
**L'esperimento di un job Spark di lunga durata su Dataproc**
Per quanto in teoria sia tutto molto promettente, online si trovano pochissime verifiche concrete e documentate su Dataproc, ed è probabilmente questo il motivo per cui molti clienti che valutano Dataproc per i propri workloads big data nutrono ancora dei dubbi.
Versione TL;DR: sì, funziona e porta il job a termine. Aggiornamento: rimando alla sezione post-conclusione per indicazioni su un modo ancora migliore di farlo in modo nativo.
Ecco come ho impostato l'ambiente, in modo che chiunque possa replicarlo per le proprie verifiche:
Per testare lo scenario senza rischiare di farmi sfuggire eventuali eventi di prelazione, ho creato un ambiente di test che, in teoria, eseguiva un job Spark 24/7, così da osservare cosa succedeva quando una VM preemptible veniva recuperata e/o sostituita.
L'ambiente è composto da un cluster Dataproc con un master node, 2 worker node e 2 worker node preemptible, da un job Spark batch (non in streaming) della durata di poco più di 30 minuti e da un job Cloud Scheduler che avvia il job Spark ogni 30 minuti, in modo da farlo girare il più vicino possibile al 24/7. Sui worker node ho scelto vCPU N1, perché più datate e quindi con maggiore probabilità di essere recuperate spesso: in effetti è emerso che le istanze E2 vengono recuperate molto meno delle N1. Il job Spark è molto basilare: ho preso un dataset open da BigQuery e vi ho applicato una serie di operazioni casuali e onerose — join, cross join, aggregazioni su campioni casuali e così via — per simulare un vero job di elaborazione dati che distribuisse il carico su tutti i nodi del cluster.
Per individuare quando un'istanza preemptible veniva recuperata e/o sostituita, ho creato una metrica personalizzata di monitoraggio sul managed instance group che Dataproc crea per il cluster (il cui nome è di solito dataproc-cluster-
Esempio di metrica su un dashboard che mostra le dimensioni del managed instance group
Una volta validata la metrica con i log dell'istanza e del master node del job, ho avviato l'esecuzione del job durante il lungo weekend del Labor Day qui negli Stati Uniti e fino al martedì successivo, così da includere nell'analisi sia un weekend completo sia una giornata lavorativa intensa. Va precisato che le istanze preemptible girano per un massimo di 24 ore consecutive: al raggiungimento di quella soglia vengono recuperate e si tenta poi un riavvio.
In quell'arco di tempo si sono verificati diversi cali e risalite nel grafico, in corrispondenza di istanze recuperate e sostituite (vedi grafico precedente come esempio). Si sono presentati anche alcuni \"falsi positivi\": job ancora in fase di avvio o impegnati in operazioni di lettura nel momento in cui si è verificata la prelazione, che mostravano comunque il comportamento atteso ma non erano esempi facili da illustrare ai fini di questo articolo. È capitato però un caso da manuale che ha permesso di visualizzare il comportamento in modo molto chiaro: subito dopo un'operazione di checkpoint e nel pieno della scrittura di un dataframe su HDFS, l'istanza preemptible che stava effettuando la scrittura è stata recuperata e riavviata, lasciando log molto eloquenti.
L'eccezione sollevata, dal log del master Dataproc, è stata la seguente:
20/09/08 20:05:36 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 18.0 in stage 21.0 (TID 1490, cluster-4b46-sw-41l5.c.project-id.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:288)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Era esattamente ciò che cercavo: la prova che la macchina veniva recuperata durante un'operazione di scrittura e, guarda caso, subito dopo un checkpoint. Dopo questa eccezione si sono susseguiti altri gruppi di eccezioni con avvisi come quelli qui sotto, che segnalavano l'impossibilità di comunicare e schedulare attività sul nodo recuperato (executor 2) e confermavano la prelazione del nodo:
20/09/08 20:06:41 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 2 on cluster-rand-sw-4c7x.c.project-id.internal: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 73.0 in stage 63.0 (TID 9855, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 74.0 in stage 63.0 (TID 9860, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.ExecutorAllocationManager: Attempted to mark unknown executor 2 idle
Dopo qualche secondo e qualche serie di eccezioni di questo tipo, il job è proseguito normalmente fino a restituire a Dataproc un esito positivo. Ho verificato che i dati fossero stati scritti correttamente nella cartella di destinazione su HDFS, a conferma che l'operazione si era conclusa con successo come previsto.
Passando ai log della nuova istanza preemptible che ha sostituito quella recuperata, ho notato che l'elaborazione era ripresa esattamente dal punto in cui si era interrotta sull'istanza precedente. Va detto che si è trattato di un caso fortuito ma particolarmente eloquente, perché il task è stato riassegnato proprio all'istanza sostitutiva e non a un altro worker node: un'eventualità che nella maggior parte dei casi non si verifica (1 volta su 9, in questo esperimento). Ecco le voci di log della nuova istanza:
{
"insertId": "j96wpu5rh8p09edb5",
"jsonPayload": {
"message": "src: /10.128.0.9:55928, dest: /10.128.0.8:9866, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_-1208291363_17, offset: 0, srvID: 3b9b065f-15f4-49d7-a9ad-a5a2136e4ce1, blockid: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, duration(ns): 556814753645",
"class": "org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace",
"filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log"
},
"resource": {
"type": "cloud_dataproc_cluster",
"labels": {
"project_id": "project-id",
"cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
"region": "us-central1",
"cluster_name": "cluster-rand"
}
},
"timestamp": "2020-09-08T19:06:15.035Z",
"severity": "INFO",
"labels": {
"compute.googleapis.com/resource_id": "5331347012694516446",
"compute.googleapis.com/resource_name": "cluster-rand-w-1",
"compute.googleapis.com/zone": "us-central1-a"
},
"logName": "projects/project-id/logs/hadoop-hdfs-datanode",
"receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}
{
"insertId": "j96wpu5rh8p09edb6",
"jsonPayload": {
"class": "org.apache.hadoop.hdfs.server.datanode.DataNode",
"filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log",
"message": "PacketResponder: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, type=LAST_IN_PIPELINE terminating"
},
"resource": {
"type": "cloud_dataproc_cluster",
"labels": {
"project_id": "project-id",
"cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
"region": "us-central1",
"cluster_name": "cluster-rand"
}
},
"timestamp": "2020-09-08T19:06:15.035Z",
"severity": "INFO",
"labels": {
"compute.googleapis.com/resource_id": "5331347012694516446",
"compute.googleapis.com/zone": "us-central1-a",
"compute.googleapis.com/resource_name": "cluster-rand-w-1"
},
"logName": "projects/project-id/logs/hadoop-hdfs-datanode",
"receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}
Purtroppo, in nessuno dei log disponibili sono riuscito a trovare voci che mostrassero esplicitamente la rilettura dalla directory dei checkpoint, ma l'elaborazione è effettivamente ripresa proprio dal punto in cui si era interrotta, portando poi a termine le operazioni rimanenti del job.
**Conclusione**
In conclusione, in questo esperimento Dataproc ha gestito il recupero e la sostituzione di un nodo con istanza preemptible esattamente come previsto dal design di Hadoop e Spark. Gli ingegneri di Google hanno fatto un lavoro eccellente: Dataproc gestisce in modo trasparente i \"failure\" dei worker node, ovvero i casi in cui un'istanza preemptible viene recuperata senza preavviso.
**Post-conclusione: Dataproc Enhanced Flexibility Mode**
Dopo la pubblicazione dell'articolo, un Googler mi ha condiviso i dettagli di un progetto beta di Google strettamente collegato a quanto descritto qui, di cui non ero al corrente al momento della stesura.
Google offre una modalità di Dataproc pensata appositamente per i cluster che possono perdere i propri worker node, ovvero le istanze worker preemptible. Questa modalità organizza lo shuffle dei dati sul filesystem in modo da ottimizzarlo proprio per gli scenari in cui i worker node vengono meno.
Consiglio vivamente di approfondirla: rafforza ulteriormente la conclusione qui sopra e dimostra che anche Google sta ottimizzando in silenzio questo aspetto.
I dettagli del prodotto sono disponibili qui: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/flex