Cloud Intelligence™Cloud Intelligence™

Cloud Intelligence™

Deduplication, Delayed Messaging und FIFO mit Pub/Sub

By Evgeny Varela-SavelaSep 23, 20205 min read

Diese Seite ist auch in English, Español, Français, Italiano, 日本語 und Português verfügbar.

Event-getriebene Architekturen sind auf dem Vormarsch – doch für einige fehlende Funktionen von Google Pub/Sub braucht es Workarounds. Wir zeigen unseren Ansatz für Deduplication, Delayed Messaging und FIFO mit cloud-nativen Google-Diensten.

Dieser Beitrag ist gemeinsam mit Moshe Ohaion entstanden.

Einführung

Event-getriebene Architekturen sind heute so allgegenwärtig, dass selbst meine Oma sie neulich beim Abendessen erwähnt hat. Unzählige Systemarchitekturen setzen auf Messaging-Dienste – sei es als nachrichtenorientierte Middleware oder zur Event-Aufnahme und -Auslieferung in Streaming-Analytics-Pipelines. Auf GCP (Google Cloud Platform) bietet sich dafür der Managed Service Google Pub/Sub an.

Google Pub/Sub punktet mit zentralen Features wie persistenter Nachrichtenspeicherung, Echtzeit-Auslieferung mit hoher Verfügbarkeit, gleichbleibender Performance bei Skalierung und At-least-once Delivery. Wie bei jedem Messaging-Dienst gibt es aber auch hier Lücken, die je nach Anwendungsfall zum Ausschlusskriterium werden können.

In diesem Artikel zeigen wir Ihnen einfache Wege, drei dieser "Dealbreaker" zu umgehen – ohne die native GCP-Pub/Sub-Lösung verlassen zu müssen.

Deduplication

Pub/Sub garantiert At-least-once Delivery (aber kein Exactly-once). Heißt: Gelegentliche Duplikate sind eingeplant – und können zu mehreren Problemen führen:

  • Datenintegrität – Daten könnten doppelt gezählt werden.
  • Unnötige Prozessaufrufe, die Performance und Kosten in die Höhe treiben.

Duplikate haben in der Regel zwei Ursachen:

  1. Die App hat die Nachricht nicht innerhalb der Frist bestätigt (ack).
  2. Auch nach erfolgter Bestätigung kann Pub/Sub die Nachricht erneut zustellen.

Zum ersten Fall – vorausgesetzt, das Bestätigen der Nachrichten ist im Code nicht vergessen worden:

  • Liegt die Verarbeitungszeit pro Nachricht durchgängig über der ACK-Frist der Subscription, erhöhen Sie die Frist für die gesamte Subscription.
  • Braucht eine einzelne Nachricht länger, lässt sich mit der Methode modify_ack_deadline gezielt deren ACK-Frist auf der Subscriber-Seite verlängern.

Stammen die Duplikate aus der internen Implementierung von Pub/Sub (seltener), schalten Sie am besten eine Komponente vor, die sie zuvor dedupliziert. Pub/Sub vergibt für jede Nachricht eine eindeutige `message_id`, anhand der Subscriber Duplikate erkennen können.

Firestore-Implementierung

Redis-(Memorystore-)Implementierung

Mein Subscriber-Code

Delayed Messaging

Google Pub/Sub bringt keinerlei Logik für eine verzögerte Auslieferung mit. Nachrichten werden so schnell wie möglich zugestellt.

Mit einer messagebasierten Architektur lässt sich verzögerte Codeausführung umsetzen, ohne eine eigene Suspendierungslogik zu bauen oder wait() aus der App heraus aufzurufen.

Wie also lässt sich Delayed Messaging realisieren, ohne wait() im Code einzusetzen?

Werfen wir einen Blick auf unsere Lösung:

Wir kombinieren die Cloud Tasks API mit einer Cloud Function, um geplante Tasks anzulegen, die später Nachrichten an die App-Engine-App pushen (App Engine fungiert dabei als HTTP-Target für Cloud Task).

Eine Queue legen Sie über das Cloud SDK mit folgendem gcloud-Befehl an:

gcloud tasks queues create delay-queue

Diagramm zu Delayed Messaging

In Pub/Sub verwalte ich zwei Topics:

  1. Das Ursprungs-Topic, das alle Nachrichten vom Publisher entgegennimmt.
  2. Das Topic, in dem alle verzögerten Nachrichten landen.

Diese Cloud Function wird vom Ursprungs-Topic getriggert und legt einen zukünftigen Cloud Task in der Queue an:

Der App-Engine-Code wird nach Ablauf der Verzögerung von Cloud Task ausgelöst und veröffentlicht die Nachricht im finalen Topic:

FIFO-Queue

Pub/Sub bietet bislang keinerlei Logik für eine geordnete Auslieferung (FIFO, Priorität).

Nachrichten werden so schnell wie möglich zugestellt, ältere Nachrichten werden bevorzugt – garantiert ist das aber nicht. Ein ziemliches Durcheinander! Damit eignet sich Pub/Sub nur bedingt als Trigger für workloads wie komplexe ETLs, bei denen die Reihenfolge der Schritte entscheidend ist.

Wir haben FIFO-Verhalten erfolgreich umgesetzt, indem wir den Queue-Status in Cloud Firestore ablegen. Beachten Sie: Solche Konstruktionen sind nicht für sehr hohe Durchsätze gedacht – Ordnung kostet bekanntlich Performance.

FIFO-Flussdiagramm

Cloud Firestore hält zwei Zähler vor:

  1. Die Nachrichtennummer – gibt an, welche Nummer die nächste Nachricht erhält. Jedes Mal, wenn ein Publisher eine Nachricht sendet, liest er die Nachrichtennummer, erhöht sie um 1 und übergibt sie als Attribut der Nachricht. Der Startwert ist 1.
  2. Die zuletzt verarbeitete Nachricht – gibt die Nummer der zuletzt verarbeiteten Nachricht an. Der Startwert ist 0.

Für jede Nachricht führt der Publisher Folgendes aus:

P.S. Wenn mehrere Publisher intensiv parallel arbeiten, kann es zur Exception "Failed to commit transaction in 5 attempts" kommen. Den Retry-Parameter steuern Sie, indem Sie dem Aufruf von client.transaction() das Argument max_attempts mitgeben.

Der Subscriber führt Folgendes aus:

Jede eingehende Nachricht fällt in eine von drei Kategorien:

  1. Alle vorherigen Nachrichten wurden verarbeitet – die aktuelle Nachricht kann also verarbeitet werden (delta=1).
  2. Mindestens eine vorherige Nachricht wurde noch nicht verarbeitet – die Nachricht geht zurück in die Queue zur späteren Verarbeitung (delta>1).
  3. Die Nachricht wurde bereits verarbeitet – wir müssen sie nur bestätigen (delta<1). Wie erwähnt, können frühere Nachrichten aus unterschiedlichen Gründen erneut zugestellt werden.

Disclaimer: Die aktuelle Implementierung stellt sicher, dass eine Nachricht erst dann verarbeitet wird, wenn die vorherige Nachricht mindestens einmal verarbeitet wurde. In Extremfällen kann es deshalb passieren, dass eine doppelte Nachricht parallel zur aktuellen Nachricht verarbeitet wird. Um das vollständig auszuschließen, integrieren Sie zusätzlich die zuvor vorgestellte Dedupe-Lösung.

Firestore-Code:

Warum Firestore?

Auf der Publisher-Seite ist die Wahl der Persistenzschicht bei nur einem Publisher zweitrangig. Sobald aber mehrere Publisher im Spiel sind, muss es ein ACID-fähiges System wie Cloud Firestore sein. Jeder Publisher muss den Zähler in einer einzigen Transaktion lesen und setzen – sonst senden zwei Publisher womöglich Nachrichten mit derselben Nachrichtennummer.

Auf der Subscriber-Seite muss sichergestellt sein, dass kein anderer Subscriber den Zähler "Last Message Processed" verändert hat, bevor der neue Wert geschrieben wird.

Ich hoffe, dieser Beitrag ist hilfreich für Sie. Über Kommentare und Fragen freue ich mich.