Cloud Intelligence™Cloud Intelligence™

Cloud Intelligence™

Pub/Subで実現する重複排除・遅延配信・FIFO

By Evgeny Varela-SavelaSep 23, 20205 min read

このページはEnglishDeutschEspañolFrançaisItalianoPortuguêsでもご覧いただけます。

イベント駆動型アーキテクチャの普及が進むなか、Google Pub/Subに不足している機能を補うための工夫が求められる場面があります。本記事では、Google Cloudのネイティブサービスを活用した重複排除・遅延配信・FIFOの実装アプローチをご紹介します。

本記事は Moshe Ohaion との共著です。

はじめに

イベント駆動型アーキテクチャは、いまや誰もが口にするほど広く浸透しています。多くのシステムが、メッセージ指向ミドルウェアとして、あるいはストリーミング分析パイプラインへのイベント取り込み・配信基盤として、メッセージングサービスを採用しています。GCP(Google Cloud Platform)上で開発するなら、マネージドサービスのGoogle Pub/Subをイベント取り込み・配信システムとして利用できます。

Google Pub/Subは、永続的なメッセージストレージ、高可用性を備えたリアルタイム配信、スケール時の安定したパフォーマンス、at-least-once配信といった重要な機能を提供します。しかし他のメッセージングサービスと同様に、選定の際にネックとなり得るいくつかの機能が不足しているのも事実です。

本記事では、こうした「決定的な不足」を簡単に克服しつつ、GCP Pub/Subというネイティブな選択肢を維持するための方法をいくつかご紹介します。

重複排除

Pub/Subはat-least-once配信を保証する一方で、exactly-once配信は保証しません。つまり一定の重複発生は前提となっており、次のような問題を引き起こす可能性があります。

  • データ整合性 — 同じデータを二重にカウントしてしまう恐れがあります。
  • 不要な処理が起動され、パフォーマンスやコストに影響が出る可能性があります。

重複が発生する原因は、主に次の2つです。

  1. アプリが期限内にメッセージをackできなかった場合。
  2. メッセージがackされたにもかかわらず、Pub/Subが複数回送信してしまう場合。

1つ目のケースについて、コード内でのack処理を忘れていないという前提で対処法を挙げます。

  • 各メッセージの処理時間が常にサブスクリプションのACK期限を超えているなら、サブスクリプション全体の期限を延長してください。
  • 特定のメッセージだけ処理に時間がかかる場合は、そのメッセージに対してmodify_ack_deadlineメソッドを使い、サブスクリプション側のack期限を個別に延長できます。

Pub/Subの内部実装に起因する重複(こちらはより稀なケース)については、まず重複排除を行うコンポーネントを経由させる必要があります。Pub/Subは各メッセージに一意の`message_id`を付与しているため、これを利用すればサブスクライバー側で重複メッセージを検出できます。

Firestoreによる実装

Redis(Memorystore)による実装

サブスクライバーのコード

遅延配信

Google Pub/Subは遅延配信のロジックを実装していません。メッセージは可能な限り速やかに配信されます。

メッセージ駆動型アーキテクチャを使えば、アプリ内でwait()を呼び出すサスペンドロジックを実装することなく、コードの遅延実行を実現できます。

では、コード内でwait()を使わずに遅延配信を実装するにはどうすればよいのでしょうか。

本課題に対する我々のソリューションを見ていきましょう。

Cloud Tasks APIとCloud Functionを組み合わせ、後からApp Engineアプリにメッセージをプッシュするスケジュール済みタスクを作成しました(Cloud TaskのHTTPターゲットにはApp Engineを採用しています)。

Cloud SDKでキューを作成するには、以下のgcloudコマンドを実行します。

gcloud tasks queues create delay-queue

遅延配信の構成図

Pub/Sub側では、2つのトピックを管理します。

  1. Publisherからすべてのメッセージを受け取るオリジントピック
  2. 遅延されたメッセージをすべて受け取るトピック

このCloud Functionはオリジントピックをトリガーに起動し、将来実行されるCloud Taskをキューに登録します。

App Engine側のコードは、遅延後にCloud Taskから呼び出され、最終トピックへメッセージを発行します。

FIFOキュー

Pub/Subは現時点で、順序付き配信(FIFOや優先度付き)のロジックを実装していません。

メッセージは可能な限り高速に配信され、古いメッセージが優先される傾向はあるものの、それは保証されているわけではありません — 困ったものです!この設計のため、ステップの順序が重要となる複雑なETLのようなworkloadsのトリガーとしてPub/Subを使うのは難があります。

そこで我々は、Cloud Firestoreにキューの状態を保持させることで、FIFOの挙動を実現することに成功しました。ただし、順序保証はパフォーマンスとのトレードオフであるため、こうした仕組みは大量のストリーム処理には向かない点にご注意ください。

FIFOフロー図

Cloud Firestoreでは、2つのカウンターを保持します。

  1. メッセージ番号 — 次に発行すべきメッセージ番号を示します。Publisherはメッセージを送信するたびに、この番号を取得し、1だけ加算したうえで、メッセージの属性として送信します。初期値は1です。
  2. 処理済みの最終メッセージ番号 — 直近に処理されたメッセージ番号を示します。初期値は0です。

Publisherは各メッセージに対して、以下を実行します。

補足:複数のPublisherが並列に大量処理を行うと、Publisherが「Failed to commit transaction in 5 attempts」例外をスローすることがあります。client.transaction()の呼び出し時にmax_attempts引数を渡すことで、リトライ回数を調整できます。

Subscriberは以下を実行します。

到着した各メッセージは、次の3つのいずれかに分類されます。

  1. 過去のメッセージがすべて処理済みのため、現在のメッセージをそのまま処理できる(delta=1)。
  2. 過去のメッセージのうち少なくとも1件が未処理のため、後で処理できるよう現在のメッセージをキューに戻す(delta>1)。
  3. このメッセージはすでに処理済みのため、ackするだけでよい(delta<1)。前述のとおり、過去のメッセージはさまざまな理由で再送される可能性があります。

免責事項: 現在の実装は、各メッセージが「直前のメッセージが少なくとも1回処理されるまで処理されない」ことを保証するものです。そのため極端なケースでは、現在のメッセージと並行して重複メッセージが処理される状況が起こり得ます。この状態を完全に防ぐには、先ほどご紹介した重複排除ソリューションと組み合わせる必要があります。

Firestoreのコード:

なぜFirestoreなのか?

Publisher側では、Publisherが1つだけの場合、どの永続化層を選んでも大きな違いはありません。一方で複数のPublisherが存在する場合は、Cloud FirestoreのようなACID対応ストレージを永続化層として用いる必要があります。各Publisherは1つのトランザクション内でカウンターの取得と更新を行わなければならず、こうすることで複数のPublisherが同じメッセージ番号で送信してしまう事態を防げます。

Subscriber側では、新しい値を書き込む前に、他のSubscriberがLast Message Processedカウンターを変更していないことを確認する必要があります。

本記事が皆さまのお役に立てば幸いです。コメントやご質問をお待ちしております。