Cloud Intelligence™Cloud Intelligence™

Cloud Intelligence™

Spark Jobs largos en GCP con Dataproc e instancias preemptible

By Sayle MatthewsSep 15, 202010 min read

Esta página también está disponible en English, Deutsch, Français, Italiano, 日本語 y Português.

**Hadoop/Spark con Dataproc en Google Cloud**

Dataproc es la opción ideal para correr un cluster de Hadoop sobre Google Cloud Platform y resulta MUCHO más fácil que administrar un cluster a mano. Como Hadoop forma parte del ecosistema de big data, suele venir con un precio acorde a esa magnitud. Para reducir ese costo, muchos clientes usan instancias preemptible como nodos worker en el cluster.

Para quienes no están familiarizados, las instancias preemptible son máquinas virtuales que aprovechan los recursos de cómputo excedentes que tiene un proveedor de nube en un momento dado y que pueden recuperarse cuando esos recursos se necesitan en otro lugar; conviene pensarlas como máquinas virtuales temporales. Pueden estar disponibles o no en un momento determinado según la disponibilidad de cómputo, y pueden ser reclamadas en cualquier instante con poca o nula notificación. Para compensar esa desventaja, su precio tiene un descuento enorme —de hasta el 80% según Google— frente a las instancias tradicionales.

Estas instancias suelen sumarse a los clusters de Dataproc para reducir costos de forma significativa o para aportar capacidad de procesamiento adicional cuando hace falta.

Un escenario que nos plantean seguido en DoiT International es el de un cliente que ya tiene o necesita un nuevo cluster de Hadoop para correr Spark Jobs durante períodos largos (horas o incluso días), pero que debe poder escalar según la carga o costar lo menos posible. La mayoría de las veces, Dataproc con instancias preemptible es nuestra recomendación —y la de Google— para este caso.

Una pregunta que nos han hecho algunos clientes más cautelosos es: ¿cómo maneja Dataproc el escenario en que Google reclama de vuelta las instancias preemptible, sobre todo en jobs muy largos que procesan datos críticos para el negocio?

Para responder esa pregunta puntual, armé un experimento que simulara esta situación en un entorno de carga batch en producción y así determinar cómo reaccionaría el servicio gestionado de Hadoop de GCP.

**Checkpointing en Spark**

Edición: Poco después de publicar el artículo, Google se contactó conmigo para contarme sobre un modo nativo de Dataproc lanzado en versión beta (al momento de escribir esto) que cumple esta misma función de forma nativa. Lo documenté al final del artículo, en la sección de post-conclusión.

Primero, un poco de contexto sobre cómo Spark mueve los workloads entre distintas máquinas virtuales o nodos que pueden existir para una operación y dejar de existir para la siguiente.

Spark tiene un concepto llamado checkpointing que, a muy alto nivel, consiste en escribir en disco el estado actual de un RDD o DataFrame (piensa en un dataset dentro de Spark). Es útil porque crea un "marcador" en tu job: si una máquina virtual deja de estar saludable (muere o queda inaccesible), otra instancia puede retomar desde el último marcador y seguir desde ahí.

En este caso, si un cluster usa instancias preemptible y una de ellas es reclamada, la existencia de un checkpoint permitirá que el procesamiento continúe casi sin interrupciones desde ese punto en otro nodo worker.

Aquí va un ejemplo rápido de cómo configurar un checkpoint con un poco de código PySpark. Se define el directorio del checkpoint, se selecciona una columna de un dataframe y se hace checkpoint del resultado antes de escribirlo en un archivo parquet en HDFS:

spark.sparkContext.setCheckpointDir('gs://bucket/checkpoints')events_df = df.select('event_type')
events_df.checkpoint()
events_df.write.format("parquet").save("/results/1234/")

Ahora supongamos que tienes un cluster de Dataproc con un único master, 2 nodos worker y 2 nodos worker preemptible corriendo el código anterior. Si Google reclama uno de los nodos worker preemptible justo durante la última línea del ejemplo —la que escribe los resultados—, Spark detectará la caída del nodo y replanificará la tarea en otro. En lugar de empezar desde el principio, retomará en la última línea, porque tiene un checkpoint guardado justo antes.

Es un ejemplo muy simple, pero cuando tienes un Spark Job con más de 100 operaciones que pueden tardar 5 minutos cada una, esto se vuelve un salvavidas, sobre todo con una flota grande de instancias preemptible donde varias pueden desaparecer entre operaciones.

**El experimento de un Spark Job largo en Dataproc**

Aunque en teoría suena increíble, hay muy poca validación documentada en línea de cómo se comporta esto sobre Dataproc, y por eso los clientes que evalúan Dataproc para sus workloads de big data tienen tantas dudas.

Versión TL;DR: sí, funciona y el job corre hasta completarse. Edición: revisa la post-conclusión para ver una forma aún mejor de hacerlo de manera nativa.

Así fue como armé el entorno para que cualquiera pueda recrearlo y validarlo si lo necesita:

Para probarlo y no dejar pasar ninguna posible acción de preemption, creé un entorno de prueba que en teoría correría un Spark Job 24/7 para ver qué pasaba cuando una VM preemptible era reclamada o reemplazada.

Este entorno de prueba consiste en un cluster de Dataproc con un nodo master, 2 nodos worker y 2 nodos worker preemptible, un Spark Job batch (no streaming) que corre en poco más de 30 minutos, y un job de Cloud Scheduler que dispara el Spark Job cada 30 minutos para acercarse lo más posible a 24/7. Elegí vCPUs N1 en los nodos worker porque son más viejas y tenían más probabilidades de ser reclamadas con frecuencia; resulta que las instancias E2 se reclaman bastante menos que las N1. El Spark Job es muy básico: tomé un dataset abierto desde BigQuery y le apliqué un montón de operaciones costosas y aleatorias —joins, cross joins, agregaciones de muestras aleatorias, etc.— para simular un job real de procesamiento de datos que distribuyera el trabajo entre todos los nodos del cluster.

Para monitorear cuándo una instancia preemptible era reclamada o reemplazada, creé una métrica personalizada sobre el managed instance group que Dataproc crea para el cluster (el nombre suele ser dataproc-cluster-) y la puse en una gráfica del dashboard. Las caídas y subidas en esa gráfica mostraban cuándo se reclamaban o reemplazaban instancias preemptible, lo que me daba los tiempos para filtrar los logs.

Ejemplo de métrica en un dashboard mostrando el tamaño del managed instance group

Una vez validada la métrica con los logs de la instancia y del nodo master del job, arranqué la corrida durante el largo fin de semana de Labor Day en Estados Unidos y el martes siguiente, para incluir un fin de semana completo y un día laboral cargado en el análisis. Vale la pena notar que las instancias preemptible corren un máximo de 24 horas seguidas y luego son reclamadas, intentando reiniciar al cumplirse ese plazo.

Durante este período hubo varias caídas y subidas en la gráfica cuando las instancias se reclamaron y reemplazaron (ver gráfico arriba como ejemplo). Hubo algunos "falsos positivos" en los que los jobs aún estaban arrancando o haciendo lecturas justo cuando ocurrieron las operaciones de reclamo y, aunque mostraban el comportamiento esperado, no eran ejemplos fáciles de presentar para este artículo. Pero sí ocurrió un ejemplo de manual que permite visualizar el comportamiento con claridad: justo después de una operación de checkpoint y en medio de la escritura de un dataframe a HDFS, la instancia preemptible que estaba escribiendo fue reclamada y reiniciada, dejando logs muy claros de su comportamiento.

La excepción que se lanzó, desde el log del master de Dataproc, fue esta:

20/09/08 20:05:36 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 18.0 in stage 21.0 (TID 1490, cluster-4b46-sw-41l5.c.project-id.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:288)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Era exactamente lo que estaba buscando: mostraba la máquina siendo reclamada durante una operación de escritura y, oportunamente, justo después de un checkpoint. Tras esta excepción aparecieron varias tandas de excepciones que generaron advertencias como las siguientes, evidenciando la falla de comunicación y de planificación de trabajo en el nodo reclamado (executor 2) y confirmando el reclamo:

20/09/08 20:06:41 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 2 on cluster-rand-sw-4c7x.c.project-id.internal: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 73.0 in stage 63.0 (TID 9855, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 74.0 in stage 63.0 (TID 9860, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.ExecutorAllocationManager: Attempted to mark unknown executor 2 idle

Tras unos segundos y varias tandas de esas excepciones, el job siguió con normalidad hasta devolverle a Dataproc un valor de éxito. Verifiqué que los datos se escribieron correctamente en la carpeta de destino en HDFS, lo que confirma que la operación se completó como estaba previsto.

Cuando pasé a revisar los logs de la nueva instancia preemptible que reemplazó a la reclamada, vi que había arrancado exactamente donde se había detenido el procesamiento en la anterior. Cabe aclarar que fue un caso aleatorio, pero excelente: la tarea volvió a la instancia de reemplazo en lugar de pasar a otro nodo worker, algo que no ocurrirá la mayoría de las veces (1 de cada 9 en mi experimento). Estas son las entradas de log de la nueva instancia:

{
  "insertId": "j96wpu5rh8p09edb5",
  "jsonPayload": {
    "message": "src: /10.128.0.9:55928, dest: /10.128.0.8:9866, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_-1208291363_17, offset: 0, srvID: 3b9b065f-15f4-49d7-a9ad-a5a2136e4ce1, blockid: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, duration(ns): 556814753645",
    "class": "org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace",
    "filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log"
  },
  "resource": {
    "type": "cloud_dataproc_cluster",
    "labels": {
      "project_id": "project-id",
      "cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
      "region": "us-central1",
      "cluster_name": "cluster-rand"
    }
  },
  "timestamp": "2020-09-08T19:06:15.035Z",
  "severity": "INFO",
  "labels": {
    "compute.googleapis.com/resource_id": "5331347012694516446",
    "compute.googleapis.com/resource_name": "cluster-rand-w-1",
    "compute.googleapis.com/zone": "us-central1-a"
  },
  "logName": "projects/project-id/logs/hadoop-hdfs-datanode",
  "receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}
{
  "insertId": "j96wpu5rh8p09edb6",
  "jsonPayload": {
    "class": "org.apache.hadoop.hdfs.server.datanode.DataNode",
    "filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log",
    "message": "PacketResponder: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, type=LAST_IN_PIPELINE terminating"
  },
  "resource": {
    "type": "cloud_dataproc_cluster",
    "labels": {
      "project_id": "project-id",
      "cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
      "region": "us-central1",
      "cluster_name": "cluster-rand"
    }
  },
  "timestamp": "2020-09-08T19:06:15.035Z",
  "severity": "INFO",
  "labels": {
    "compute.googleapis.com/resource_id": "5331347012694516446",
    "compute.googleapis.com/zone": "us-central1-a",
    "compute.googleapis.com/resource_name": "cluster-rand-w-1"
  },
  "logName": "projects/project-id/logs/hadoop-hdfs-datanode",
  "receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}

Lamentablemente, no escribe entradas de log que muestren la lectura desde el directorio del checkpoint —al menos no encontré ninguna en los logs disponibles—, pero sí retomó exactamente donde se había detenido y completó las operaciones que quedaban del job.

**Conclusión**

Para cerrar este experimento: Dataproc gestionó el reclamo y reemplazo de un nodo de instancia preemptible tal como lo prevé el diseño de Hadoop y Spark. Los Engineers de Google han hecho un trabajo excelente para que Dataproc maneje sin sobresaltos las "fallas" de los nodos worker, es decir, cuando una instancia preemptible se reclama sin previo aviso.

**Post-conclusión: Dataproc Enhanced Flexibility Mode**

Después de publicar este artículo, un Googler me compartió detalles sobre un proyecto en beta de Google que se relaciona directamente con lo que conté aquí y del que no estaba al tanto al momento de escribirlo.

Google tiene un modo de Dataproc pensado específicamente para clusters que pueden perder sus nodos worker, es decir, instancias worker preemptible. Ese modo distribuye los datos en el filesystem de una forma optimizada para tolerar la caída de nodos worker.

Recomiendo mucho explorarlo, porque refuerza la conclusión anterior y muestra que Google también ha estado optimizando este proceso en silencio.

Los detalles del producto están aquí: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/flex