**Hadoop/Spark com Dataproc no Google Cloud**
O Dataproc é a opção número um para rodar um cluster Hadoop no Google Cloud Platform e é, sem dúvida, MUITO mais simples do que gerenciar um cluster na mão. Como o Hadoop faz parte do ecossistema de big data, ele costuma vir com um preço à altura desse universo. Para reduzir esse custo, muitos clientes usam instâncias preemptivas como worker nodes nesse cluster.
Para quem ainda não conhece, instâncias preemptivas são máquinas virtuais que rodam em cima dos recursos de computação ociosos que o provedor de nuvem tem em determinado momento e que podem ser retomadas quando esses recursos forem necessários em outro lugar — ou seja, vale pensar nelas como máquinas virtuais temporárias. Elas podem ou não estar disponíveis a qualquer instante, conforme a oferta de recursos de computação, e podem ser recuperadas a qualquer momento, com pouca ou nenhuma notificação prévia. Para compensar essa desvantagem, o preço dessas instâncias tem um desconto enorme — de até 80%, segundo o Google — em relação às máquinas virtuais tradicionais.
Por isso, é comum acoplar essas instâncias a clusters do Dataproc para reduzir custos de forma significativa ou para adicionar capacidade extra de processamento quando necessário.
Um cenário que aparece com frequência aqui na DoiT International é o do cliente que tem (ou precisa de) um cluster Hadoop para rodar Spark jobs por longos períodos (horas ou até dias), mas que precisa escalar conforme a carga ou pagar o mínimo possível. Na maioria das vezes, o Dataproc com instâncias preemptivas é a opção que nós e o Google recomendamos para esse caso.
Uma dúvida que alguns clientes mais avessos a riscos costumam trazer é: como o Dataproc lida com o cenário em que instâncias preemptivas são retomadas pelo Google, ainda mais em jobs muito longos que processam dados de missão crítica?
Para responder a essa pergunta, montei um cenário (ou experimento) para simular essa situação em um ambiente de carga em batch de produção e ver como o serviço Hadoop gerenciado do GCP reagiria.
**Checkpointing no Spark**
Edit: pouco depois da publicação, o Google entrou em contato comigo e me avisou sobre um modo nativo do Dataproc lançado em beta (na época em que escrevi) que entrega exatamente essa funcionalidade de forma nativa. Documentei tudo no fim deste artigo, na seção pós-conclusão.
Antes de tudo, um pouco de contexto sobre como o Spark lida com a movimentação de workloads entre máquinas virtuais ou nodes que podem existir em uma operação e desaparecer na seguinte.
O Spark tem um conceito chamado checkpointing que, em alto nível, consiste em gravar em disco o estado atual de um RDD ou DataFrame (pense em um conjunto de dados dentro do Spark). Isso é útil porque cria um "marcador" no seu job: se uma máquina virtual ficar comprometida (cair ou ficar indisponível), outra instância consegue retomar a partir do último marcador e seguir dali.
Nesse caso, se o cluster usa instâncias preemptivas e uma delas é retomada, a existência de um checkpoint permite que o processamento continue praticamente sem interrupção a partir desse ponto, em outro worker node.
Veja um exemplo rápido de como configurar um checkpoint com um trecho de código PySpark. Ele define o diretório do checkpoint, seleciona uma coluna de um dataframe e faz checkpoint do resultado antes de gravá-lo em um arquivo parquet no HDFS:
spark.sparkContext.setCheckpointDir('gs://bucket/checkpoints')events_df = df.select('event_type')
events_df.checkpoint()
events_df.write.format("parquet").save("/results/1234/")
Agora imagine que você tem um cluster Dataproc com um único master, 2 worker nodes e 2 worker nodes em instância preemptiva rodando o código acima. Se, durante a última linha do exemplo (a gravação dos resultados), um dos worker nodes preemptivos for retomado pelo Google, o Spark detecta a falha do node e reagenda a tarefa em outro node. Em vez de começar do zero, o job recomeça pela última linha, porque há um checkpoint salvo logo antes dela.
Esse é um exemplo bem simples, mas, quando você tem um Spark job com mais de 100 operações que podem levar 5 minutos cada, isso vira um salva-vidas — principalmente em uma frota grande de instâncias preemptivas, em que várias podem sumir entre uma operação e outra.
**O experimento de um Spark Job longo no Dataproc**
Embora isso pareça incrível na teoria, há pouquíssima validação prática documentada online rodando no Dataproc, e é justamente por isso que existe tanta incerteza entre clientes que cogitam o Dataproc para seus workloads de big data.
Versão TL;DR: sim, funciona e o job roda até o fim. Edit: confira a pós-conclusão para saber de uma forma ainda melhor de fazer isso de modo nativo.
Veja como configurei o ambiente para que ele possa ser recriado por qualquer pessoa que queira validar:
Para testar e não deixar passar nenhuma ação preemptiva, criei um ambiente de testes que, em teoria, rodaria um Spark job 24/7 para observar o que acontecia quando uma VM preemptiva era retomada e/ou substituída.
Esse ambiente é formado por um cluster Dataproc com um master node, 2 worker nodes e 2 worker nodes em instância preemptiva, um Spark job em batch (não streaming) que roda em pouco mais de 30 minutos e um job do Cloud Scheduler que dispara o Spark job a cada 30 minutos, para chegar o mais perto possível de 24/7. Optei por vCPUs N1 nos worker nodes porque são mais antigas e teriam maior chance de serem retomadas com frequência — descobri que instâncias E2 são retomadas bem menos do que as N1. O Spark job é bem básico: peguei um conjunto de dados público do BigQuery e executei várias operações aleatórias e custosas, como joins, cross joins, agregações de amostras aleatórias etc., para simular um job real de processamento de dados que distribuísse a carga por todos os nodes do cluster.
Para monitorar quando uma instância preemptiva era retomada e/ou substituída, criei uma métrica customizada de monitoramento sobre o managed instance group que o Dataproc cria para o cluster (o nome costuma ser dataproc-cluster-
Exemplo de métrica em um dashboard mostrando o tamanho do managed instance group
Depois que validei essa métrica com os logs da instância e do master node do job, dei início ao processo para rodar o job durante o longo feriado do Labor Day aqui nos EUA e na terça-feira seguinte, para incluir um fim de semana inteiro e um dia útil movimentado na análise. Vale lembrar que instâncias preemptivas rodam por no máximo 24 horas seguidas e são retomadas — com uma tentativa de reinício — ao bater nessa marca de 24 horas.
Ao longo desse período, houve várias quedas e subidas no gráfico quando as instâncias foram retomadas e substituídas (veja o gráfico acima como exemplo). Houve alguns "falsos positivos" em que os jobs ainda estavam iniciando ou estavam fazendo leituras no momento das retomadas e que mostravam o comportamento esperado — só não eram exemplos fáceis de ilustrar para os fins deste artigo. Mas um exemplo de manual, ótimo para visualizar o comportamento, ocorreu quando, logo após uma operação de checkpoint e no meio da gravação de um dataframe no HDFS, a instância preemptiva responsável pela escrita foi retomada e reiniciada, deixando logs bem claros do que aconteceu.
A exceção lançada, vinda do log do master do Dataproc, foi esta:
20/09/08 20:05:36 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 18.0 in stage 21.0 (TID 1490, cluster-4b46-sw-41l5.c.project-id.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:288)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Era exatamente o que eu procurava: a máquina sendo retomada durante uma operação de escrita e, convenientemente, logo após uma operação de checkpoint. Em seguida, vieram alguns conjuntos de exceções com warnings como estes, mostrando a falha em se comunicar e agendar trabalho no node retomado (executor 2) e confirmando a retomada do node:
20/09/08 20:06:41 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 2 on cluster-rand-sw-4c7x.c.project-id.internal: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 73.0 in stage 63.0 (TID 9855, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 74.0 in stage 63.0 (TID 9860, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.ExecutorAllocationManager: Attempted to mark unknown executor 2 idle
Após alguns segundos e algumas séries dessas exceções, o job seguiu normalmente e devolveu um status de sucesso ao Dataproc. Confirmei que os dados foram gravados corretamente na pasta de destino no HDFS, mostrando que a operação foi concluída com sucesso, conforme o esperado.
Ao olhar os logs da nova instância preemptiva que substituiu a retomada, vi que ela tinha começado exatamente de onde a anterior havia parado. Vale observar que esse foi um exemplo aleatório, mas excelente, porque a tarefa foi reagendada justamente na instância de substituição, e não em outro worker node — algo que não acontece na maioria das vezes (1 vez em 9 no meu experimento). Aqui estão as entradas de log da nova instância:
{
"insertId": "j96wpu5rh8p09edb5",
"jsonPayload": {
"message": "src: /10.128.0.9:55928, dest: /10.128.0.8:9866, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_-1208291363_17, offset: 0, srvID: 3b9b065f-15f4-49d7-a9ad-a5a2136e4ce1, blockid: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, duration(ns): 556814753645",
"class": "org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace",
"filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log"
},
"resource": {
"type": "cloud_dataproc_cluster",
"labels": {
"project_id": "project-id",
"cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
"region": "us-central1",
"cluster_name": "cluster-rand"
}
},
"timestamp": "2020-09-08T19:06:15.035Z",
"severity": "INFO",
"labels": {
"compute.googleapis.com/resource_id": "5331347012694516446",
"compute.googleapis.com/resource_name": "cluster-rand-w-1",
"compute.googleapis.com/zone": "us-central1-a"
},
"logName": "projects/project-id/logs/hadoop-hdfs-datanode",
"receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}
{
"insertId": "j96wpu5rh8p09edb6",
"jsonPayload": {
"class": "org.apache.hadoop.hdfs.server.datanode.DataNode",
"filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log",
"message": "PacketResponder: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, type=LAST_IN_PIPELINE terminating"
},
"resource": {
"type": "cloud_dataproc_cluster",
"labels": {
"project_id": "project-id",
"cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
"region": "us-central1",
"cluster_name": "cluster-rand"
}
},
"timestamp": "2020-09-08T19:06:15.035Z",
"severity": "INFO",
"labels": {
"compute.googleapis.com/resource_id": "5331347012694516446",
"compute.googleapis.com/zone": "us-central1-a",
"compute.googleapis.com/resource_name": "cluster-rand-w-1"
},
"logName": "projects/project-id/logs/hadoop-hdfs-datanode",
"receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}
Infelizmente, não encontrei nenhuma entrada de log mostrando a leitura do diretório de checkpoint nos logs disponíveis, mas a instância retomou exatamente do ponto em que tinha parado e finalizou as operações restantes do job.
**Conclusão**
Para encerrar este experimento, o Dataproc lidou com a retomada e a substituição de um node de instância preemptiva exatamente como esperado pelo design do Hadoop e do Spark. Os engineers do Google fizeram um trabalho excelente para garantir que o Dataproc lide de forma transparente com "falhas" de worker nodes — ou seja, quando uma instância preemptiva é retomada sem aviso prévio.
**Pós-conclusão: Dataproc Enhanced Flexibility Mode**
Depois de publicar este artigo, um Googler me passou detalhes sobre um projeto em beta do Google que tem tudo a ver com o que descrevi aqui e que eu desconhecia na hora de escrever.
O Google tem um modo do Dataproc criado especificamente para clusters que podem perder seus worker nodes — ou seja, instâncias preemptivas como worker. Esse modo organiza o shuffle dos dados no filesystem de forma otimizada para o cenário de falha de worker nodes.
Recomendo muito conhecer esse modo, já que ele reforça ainda mais a conclusão acima e mostra que o Google também vem otimizando esse processo nos bastidores.
Os detalhes do produto estão aqui: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/flex