Cloud Intelligence™Cloud Intelligence™

Cloud Intelligence™

DataprocとPreemptibleインスタンスでGCP上の長時間Sparkジョブを動かす

By Sayle MatthewsSep 15, 202010 min read

このページはEnglishDeutschEspañolFrançaisItalianoPortuguêsでもご覧いただけます。

**Google Cloud上のDataprocで動かすHadoop/Spark**

Google Cloud Platform上でHadoopクラスタを動かすなら、Dataprocが定番の選択肢です。クラスタを手作業で管理するのに比べれば、圧倒的にラクになります。とはいえ、Hadoopはビッグデータ領域のソフトウェアだけに、それに見合った大きなコストがつきまといがちです。このコストを抑えるため、ワーカーノードにpreemptibleインスタンスを使うお客様は少なくありません。

preemptibleインスタンスをご存じない方のために補足すると、これはクラウド事業者がその時点で抱える余剰のコンピュートリソースを使って動く仮想マシンで、リソースが他で必要になれば回収されます。要するに「一時的な仮想マシン」と捉えるのが分かりやすいでしょう。リソースの空き状況によって使えたり使えなかったりしますし、ほぼ通知なしにいつでも回収される可能性があります。その代わり価格は大幅に割引されており、Googleによれば従来の仮想マシンと比べて最大80%も安く利用できます。

こうした特性から、コスト削減のため、あるいは必要に応じて処理能力を上積みするために、preemptibleインスタンスはDataprocクラスタにアタッチされることがよくあります。

DoiT Internationalでよくご相談を受けるケースに、長時間(数時間、場合によっては数日)動き続けるSparkジョブをHadoopクラスタで実行したい、しかも負荷に応じてスケールさせたい、または可能な限り安く動かしたい、というものがあります。多くの場合、私たちもGoogleもpreemptibleインスタンスを組み合わせたDataprocをおすすめしています。

ここで、リスクに敏感なお客様からよくいただく質問があります。「特にミッションクリティカルなデータを扱う長時間ジョブで、Googleにpreemptibleインスタンスが回収されたとき、Dataprocはどのように振る舞うのか?」というものです。

この問いに答えるため、本番のバッチロード環境で同様の状況をシミュレートし、GCPのマネージドHadoopサービスがどう反応するかを検証する実験を組みました。

**Sparkのチェックポイント**

追記: 本記事の公開直後にGoogleからご連絡をいただき、執筆時点ではベータ版として提供されている、同等の機能をネイティブに実現するDataprocのモードがあることを教えていただきました。詳細は記事末尾の「結論のあとに」セクションにまとめています。

まずは、ある処理では存在していたが次の処理では消えているかもしれない仮想マシン(ノード)間で、Sparkがどのようにワークロードを移すのかを簡単におさらいしておきます。

Sparkにはチェックポイントという仕組みがあります。ざっくり言えば、RDDやDataFrame(Spark内部のデータセットと考えてください)の現在の状態をディスクに書き出すものです。ジョブの途中に「しおり」を挟むイメージで、ある仮想マシンが落ちたり利用不能になったりしても、別のインスタンスが直近のしおりからジョブを引き継いで再開できるため、非常に役立ちます。

つまり、preemptibleインスタンスを使っているクラスタでインスタンスが回収されても、チェックポイントさえあれば、別のワーカーノードがそこから処理をほぼ中断なく引き継げるということです。

以下は、PySparkでチェックポイントを設定する簡単な例です。チェックポイントディレクトリを指定し、データフレームから列を取り出してチェックポイント化したうえで、HDFS上のparquetファイルへ書き出しています。

spark.sparkContext.setCheckpointDir('gs://bucket/checkpoints')events_df = df.select('event_type')
events_df.checkpoint()
events_df.write.format("parquet").save("/results/1234/")

では、マスター1台、ワーカー2台、preemptibleワーカー2台で構成されたDataprocクラスタで、上記コードを実行している状況を考えてみてください。最終行で結果を書き出している最中にpreemptibleワーカーの1台がGoogleに回収されると、Sparkはノード障害を検知して別のノードへタスクを再スケジュールします。このとき、コードの先頭からやり直すのではなく、直前にチェックポイントが保存されているため、最終行から再開されます。

ごく単純な例ですが、1ステップ5分かかる処理が100以上連なるようなSparkジョブでは、これがまさに命綱になります。とりわけ大量のpreemptibleインスタンスを抱え、処理の合間にいくつもインスタンスが消える可能性がある環境では、その効果は絶大です。

**Dataproc上で長時間Sparkジョブを動かす実験**

理屈の上では素晴らしい話ですが、Dataproc上での実挙動を実際に検証してオンラインに公開している例はほとんど見当たりません。これが、ビッグデータワークロードでDataprocの採用を検討するお客様の間に不安が残っている理由のように感じます。

結論を先に言えば、ちゃんと動作してジョブも最後まで完走します。追記: より優れたネイティブの方法については、本文末の「結論のあとに」をご覧ください。

同じ検証を再現できるよう、私が用意した環境を以下にご紹介します。

preemptibleの動きを取りこぼさないため、24時間365日Sparkジョブが動き続ける状態に近い環境を用意し、preemptible VMが回収・置換されたときに何が起きるかを観測できるようにしました。

具体的には、マスター1台、ワーカー2台、preemptibleワーカー2台で構成されたDataprocクラスタに、30分強で完了するバッチ(非ストリーミング)のSparkジョブを用意し、それを30分ごとに起動するCloud Schedulerジョブを組み合わせて、できるだけ24/7に近い形で動かしました。ワーカーノードにはN1のvCPUを選びました。世代が古く回収頻度も高そうだと考えたためですが、実際にはE2インスタンスの方がN1よりずっと回収されにくいことが分かりました。Sparkジョブはごく基本的な内容で、BigQueryから公開データセットを取得し、joinやcross join、ランダムサンプリングの集計など、コストの高い処理をランダムに大量に実行することで、クラスタ内の全ノードに処理を分散させる実データ処理ジョブを模擬しました

preemptibleインスタンスがいつ回収・置換されたかを把握するため、Dataprocがクラスタ用に作成するマネージドインスタンスグループ(名前は通常dataproc-cluster-<クラスタ名>)に対してカスタムメトリクスを作成し、ダッシュボードのグラフに載せました。グラフの上下を観察することで回収・置換のタイミングが分かり、ログを絞り込む時刻として活用できました。

マネージドインスタンスグループのサイズを示すダッシュボードのメトリクス例

このメトリクスがインスタンスとジョブのマスターノードのログと整合していることを確認したうえで、米国のレイバーデー連休と、その翌週火曜日(多忙な平日)にまたがってジョブを動かしました。週末と平日の両方を分析対象に含めるためです。なおpreemptibleインスタンスは1回の起動で最長24時間しか動作せず、24時間に達した時点で回収されたうえで再起動が試みられます。

この期間中、インスタンスの回収・置換に合わせてグラフには何度も上下動が現れました(例は上の図を参照)。回収のタイミングがジョブの起動中や読み取り処理中に重なり、想定どおりの挙動だったものの本記事で示すには分かりづらい「ぎりぎり」の例もいくつかありました。一方で、教科書のように分かりやすい例も発生しました。チェックポイント直後、データフレームをHDFSへ書き出している最中に、その書き込みを担当していたpreemptibleインスタンスが回収・再起動され、挙動を示す非常に明瞭なログが残されたのです。

Dataprocのマスターログには、次の例外がスローされていました。

20/09/08 20:05:36 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 18.0 in stage 21.0 (TID 1490, cluster-4b46-sw-41l5.c.project-id.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:288)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

これはまさに探していた状況でした。書き込み処理の最中、しかもチェックポイント直後にマシンが回収されたことを示しています。この例外のあと、回収されたノード(executor 2)への通信やタスク割り当ての失敗を示す警告がいくつか出力され、ノードが回収されたことが裏付けられました。

20/09/08 20:06:41 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 2 on cluster-rand-sw-4c7x.c.project-id.internal: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 73.0 in stage 63.0 (TID 9855, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 74.0 in stage 63.0 (TID 9860, cluster-rand-sw-4c7x.c.project-id.internal, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1599249516460_0228_01_000003 on host: cluster-rand-sw-4c7x.c.project-id.internal. Exit status: -100. Diagnostics: Container released on a *lost* node20/09/08 20:06:41 WARN org.apache.spark.ExecutorAllocationManager: Attempted to mark unknown executor 2 idle

数秒のあいだに同種の例外が何度か出力されたあと、ジョブはそのまま通常どおり進行し、Dataprocへ成功ステータスを返しました。HDFS上の出力先フォルダにもデータが正しく書き込まれており、想定どおりに処理が完了したことを確認できました。

続いて、回収されたインスタンスの代わりに起動した新しいpreemptibleインスタンスのログに切り替えて確認したところ、回収されたインスタンスで処理が中断したまさにその位置から再開していました。なお、これは偶然ながら好例で、タスクが他のワーカーノードではなく、置き換えとして立ち上がった同じpreemptibleインスタンス上に再投入されたケースです。多くの場合はこうはなりません(私の今回の実験では9回中1回でした)。新インスタンスのログエントリは以下のとおりです。

{
  "insertId": "j96wpu5rh8p09edb5",
  "jsonPayload": {
    "message": "src: /10.128.0.9:55928, dest: /10.128.0.8:9866, bytes: 134217728, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_-1208291363_17, offset: 0, srvID: 3b9b065f-15f4-49d7-a9ad-a5a2136e4ce1, blockid: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, duration(ns): 556814753645",
    "class": "org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace",
    "filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log"
  },
  "resource": {
    "type": "cloud_dataproc_cluster",
    "labels": {
      "project_id": "project-id",
      "cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
      "region": "us-central1",
      "cluster_name": "cluster-rand"
    }
  },
  "timestamp": "2020-09-08T19:06:15.035Z",
  "severity": "INFO",
  "labels": {
    "compute.googleapis.com/resource_id": "5331347012694516446",
    "compute.googleapis.com/resource_name": "cluster-rand-w-1",
    "compute.googleapis.com/zone": "us-central1-a"
  },
  "logName": "projects/project-id/logs/hadoop-hdfs-datanode",
  "receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}
{
  "insertId": "j96wpu5rh8p09edb6",
  "jsonPayload": {
    "class": "org.apache.hadoop.hdfs.server.datanode.DataNode",
    "filename": "hadoop-hdfs-datanode-cluster-rand-w-1.log",
    "message": "PacketResponder: BP-2070054281-10.128.0.10-1599249511859:blk_1073816330_75506, type=LAST_IN_PIPELINE terminating"
  },
  "resource": {
    "type": "cloud_dataproc_cluster",
    "labels": {
      "project_id": "project-id",
      "cluster_uuid": "3de29175-f051-4aa5-9dee-e9925bfabec2",
      "region": "us-central1",
      "cluster_name": "cluster-rand"
    }
  },
  "timestamp": "2020-09-08T19:06:15.035Z",
  "severity": "INFO",
  "labels": {
    "compute.googleapis.com/resource_id": "5331347012694516446",
    "compute.googleapis.com/zone": "us-central1-a",
    "compute.googleapis.com/resource_name": "cluster-rand-w-1"
  },
  "logName": "projects/project-id/logs/hadoop-hdfs-datanode",
  "receiveTimestamp": "2020-09-08T19:06:21.477492444Z"
}

残念ながら、確認できたログの中に、チェックポイントディレクトリから読み戻していることを示すエントリは見つかりませんでした。それでも処理は中断したまさにその地点から再開され、ジョブに残っていた処理を最後までやり切りました。

**結論**

本実験の結論として、Dataprocはpreemptibleインスタンスノードの回収と置換を、HadoopとSparkの設計どおりに処理しました。preemptibleインスタンスが予告なく回収されたときのようなワーカーノードの「障害」を、Dataprocがシームレスに扱えるようにしているGoogleのエンジニアリングは、本当に見事だと感じました。

**結論のあとに: Dataproc Enhanced Flexibility Mode**

本記事を公開したあと、Googleの方から、ここで取り上げた内容に直接関係するベータプロジェクトの存在を教えていただきました。執筆時には把握できていなかった機能です。

Googleは、ワーカーノード(つまりpreemptibleワーカーインスタンス)を失う可能性があるDataprocクラスタ向けに専用設計されたモードを提供しています。このモードでは、ワーカーノード障害に最適化された方法でデータをファイルシステムへシャッフルします。

このモードは、上記の私の結論をさらに裏付けるものであり、Googleが裏側でこのプロセスを着実に最適化してきていることを示しています。ぜひ調査・活用されることをおすすめします。

本機能の詳細はこちらをご覧ください: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/flex