Cloud Intelligence™Cloud Intelligence™

Cloud Intelligence™

BigQuery: Frische Daten ohne aufwendige Massen-Mutationen

By Nadav WeissmanMay 7, 20239 min read

Diese Seite ist auch in English, Español, Français, Italiano, 日本語 und Português verfügbar.

BigQuery — Daten aktuell halten und Massen-Mutationen vermeiden

Foto von eMotion Tech auf Unsplash

Vorwort

Dieser Blogbeitrag stellt einen Ansatz für eine bekannte Herausforderung in Data-Warehouse-Systemen vor: Daten aktuell halten und gleichzeitig große Mutationen vermeiden.

Bei DoiT begleiten wir viele Kunden beim Aufbau gut architektierter Systeme und beim effizienten Einsatz von Cloud-Diensten. Der folgende Beitrag basiert auf einer dieser Kundenstorys.

Hintergrund

Ein SaaS-Unternehmen bietet seinen Kunden eine Analytics-Plattform auf Basis von BigQuery. Bei manchen Kunden kommen die Daten als vollständiger Snapshot an – mit der gesamten Historie und nicht nur mit den inkrementellen Änderungen seit dem letzten Update.

Einer dieser Kunden liefert einen 15 TB großen Snapshot, wobei die tatsächlich aktualisierten Daten nur 0,1 % ausmachen. Da sich die inkrementellen Änderungen nicht isoliert beziehen ließen, stand das Team vor der Aufgabe, einen möglichst zuverlässigen sowie performance- und kostenseitig effizienten Weg zu finden, um die bestehende Tabelle mit den neuen Snapshot-Daten aktuell zu halten.

Anforderungen an die Datenpipeline

Unternehmen und Kunde definierten einen Datenvertrag für die Pipeline, um die fachlichen und technischen Anforderungen zu erfüllen:

  • Eingehende Daten sind ein vollständiger Snapshot eines rollierenden Zwei-Jahres-Fensters und treffen sechsmal täglich ein.
  • Die Daten sind nach Tag partitioniert (Partitionsgröße ca. 20 GiB) und mit einer Snapshot-ID (inkrementeller Wert) versehen.
  • Die Daten enthalten neue Datensätze (neue Schlüssel) und geänderte Datensätze (Updates bestehender Daten).
  • Partition Expiration ist auf 2 Jahre konfiguriert.
  • Datensätze, die im aktuellen Snapshot vorhanden sind, im neuen Snapshot jedoch fehlen, bleiben bis zur Partition Expiration erhalten.

Das Produktteam des SaaS-Unternehmens definierte folgende Anforderungen:

  • Modellierung: Zwei Tabellen halten die bestehenden und die neuen Daten.

Neue Daten liegen in einer Tabelle namens "staging", bestehende Daten (die dem Kunden bereitgestellten Daten) in einer Tabelle namens "target".

  • Eindeutigkeit: Die Daten in der Tabelle "target" sind eindeutig (keine doppelten Schlüssel).
  • Aktualität: Die Tabelle "target" wird innerhalb einer Stunde nach Eintreffen des Snapshots aktualisiert.
  • Verfügbarkeit: Endnutzer erhalten stets eine Antwort auf ihre Abfragen.

Die Mutations-Herausforderung

Wie der folgende Google-Blog beschreibt, ist BigQuery anders aufgebaut als transaktionale OLTP-Datenbanken, die von Haus aus effizient mit umfangreichen Mutationen umgehen.

"BigQuery is not unique among OLAP databases in being constrained on mutation frequency, either explicitly (through quotas) or implicitly (resulting in significant performance degradation), since these types of databases are optimized for large-scale ingestion and analytical queries as opposed to transaction processing. Furthermore, BigQuery allows you to read the state of a table at any instant in time during the preceding seven days. This look-back window requires retaining data that the user has already deleted. To support cost-effective and efficient queries at scale, BigQuery limits the frequency of mutations using quotas." ( Performing large-scale mutations in BigQuery | Google Cloud Blog )

Den alten Snapshot einfach vollständig durch den neuen zu ersetzen, reicht nicht aus: Datensätze, die zwar im aktuellen, aber nicht mehr im neuesten Snapshot enthalten sind, müssen erhalten bleiben.

Es ist also ein Abgleich zwischen den Datensätzen des bestehenden und des neuen Snapshots nötig – genau das leistet ein klassischer Replace-Ansatz nicht.

Um die Mutations-Optionen und ihre Unterschiede zu zeigen, definieren wir das Problem präzise mit konkreter Datenmodellierung und Mutationslogik.

Datenmodellierung

Tabellenschema:

Ein Datensatz in einer Tabelle ("staging" oder "target") enthält eine attribute_id und einen Wert für ein bestimmtes Produkt (product_id) in einer Filiale (store_id) zu einem bestimmten Datum sowie die Referenz auf den Snapshot, in dem er existierte (update_id).

Datenmodell

Beispiel für einen Datensatz:

Auf Basis der Daten aus Snapshot-ID 1 (update_id) lag die Gesamtmenge (attribute_id = 1) eines Produkts (product_id = 301865) in einer Filiale (store_id = 2072) am 21.04.2017 bei 20.

Beispiel für die Mutationslogik:

Im Folgenden ein Auszug aus einer Partition (bestimmtes Datum) zweier verschiedener Snapshots.

Tabelle nach Snapshot 1

Tabelle "target" nach dem Laden von Snapshot #1

Snapshot #2 enthielt folgende Änderungen:

1. Blau: aktualisierte Datensätze (in beiden Snapshots vorhanden), Wert geändert.

2. Rot: historischer Datensatz (nur in Snapshot #1 vorhanden).

3. Grün: neuer Datensatz (nur in Snapshot #2 vorhanden).

DoiT

Tabelle "target" nach dem Laden von Snapshot #2

Da BigQuery MERGE unterstützt, liegt es nahe, die Staging-Tabelle mit der Target-Tabelle zu mergen – nach folgender Logik.

Merge-Logik

Diese MERGE-Logik definiert die Mutationen je nach Fall, basierend auf einem Joined Key aus den Feldern: product_id, store_id, attribute_id.

Merge-Logik

Hier die Code-Implementierung der MERGE-Logik:

MERGE
nw-playground.demo.merge_target T
USING
nw-playground.demo.merge_staging S
ON
T.product_Id = S.product_Id
AND T.store_Id = S.store_Id
AND T.attribute_Id = S.attribute_Id
AND T.date = S.date
WHEN MATCHED and T.date >= '2017-04-14' and T.date <= '2017-04-26'
THEN UPDATE SET T.attr_value = S.attr_value, T.update_id = S.update_id
WHEN NOT MATCHED BY TARGET and S.date >= '2017-04-14' and S.date <= '2017-04-26'
THEN
INSERT
(date,
product_Id,
store_Id,
attribute_Id,
attr_value,
update_id)
VALUES
(S.date, S.Product_id, S.store_Id, S.attribute_Id, S.attr_value, S.update_id);

Problem:

Wie eingangs erwähnt, ist die MERGE-Operation in BigQuery sehr teuer und unterliegt Limits, da sie ein Shuffling und eine Mutation der bestehenden Daten erfordert.

Wir brauchen also einen Weg, der ohne MERGE und ohne Datenmutationen auskommt.

Ansatz: Deduplizierung und Clone

Was machen wir?

Statt die Datensätze der Tabellen "staging" und "target" zu mergen, gehen wir wie folgt vor:

  1. Den neuen Snapshot an die Tabelle "staging" anhängen.
  2. Ähnliche Datensätze deduplizieren und nur den neuesten behalten.

Deduplizierung ist ein Verfahren, das doppelte Datensätze entfernt und nur einen einzigen behält. Sie wird umgesetzt, indem ein Schlüssel zur Identifikation ähnlicher Datensätze definiert wird – plus eine Logik, die festlegt, welcher der Duplikate erhalten bleibt (z. B. der neueste). 3. Die Tabelle "staging" durch das Deduplizierungs-Ergebnis ersetzen. 4. Die Tabelle "target" durch einen CLONE der Tabelle "staging" ersetzen.

"CLONE A table clone is a lightweight, writeable copy of another table (called the base table). You are only charged for data storage in the table clone that differs from the base table, so initially, there is no storage cost for a table clone. Other than the billing model for storage and some additional metadata for the base table, a table clone is similar to a standard table — you can query it, make a copy of it, delete it, and so on. After you create a table clone, it is independent of the base table. Any changes made to the base table or table clone aren't reflected in the other." table-clones-intro

Wie machen wir das?

  1. Daten anhängen: über den LOAD-Befehl im Modus "append", sodass die Staging-Tabelle den aktuellen und den neuen Snapshot enthält.
  2. Deduplizierung: Ähnliche Datensätze (gleicher Schlüssel) deduplizieren und davon den neuesten behalten. Datensätze, die komplett neu sind oder nur im aktuellen Snapshot existieren, bleiben unangetastet. Umgesetzt wird die Deduplizierung über Window Functions, die Werte über eine Gruppe von Zeilen berechnen, sowie über die QUALIFY-Operation. Die QUALIFY-Klausel kann die Ergebnisse einer Window Function (Gruppe von Zeilen) filtern.
  3. "staging" ersetzen: per CREATE OR REPLACE auf Basis der Ergebnisse der Deduplizierungs-Abfrage.
  4. "staging" als "target" klonen: per CREATE OR REPLACE TABLE ... CLONE

Warum machen wir das?

Gegenüber dem MERGE-Ansatz ergeben sich gleich mehrere Vorteile:

1. Weniger Daten-Shuffling: Das Anhängen des neuen Snapshots an den bestehenden vermeidet das Shuffling zwischen zwei Tabellen.

2. Keine MERGE/JOIN-Operation: Die Deduplizierung erfolgt über eine Window Function mit der PARTITION BY-Klausel, die die Eingabezeilen in einzelne Partitionen aufteilt, über die die Window Function unabhängig ausgewertet wird.

3. Keine Mutation: Es werden neue Tabellen erstellt oder geklont. Damit greifen die Mutations-Quotenlimits von BigQuery nicht.

Beispiel für die Append- und Deduplizierungslogik:

Anhängen

Staging-Tabelle nach dem Anhängen des neuen Snapshots (#2)

Analog zum MERGE-Ergebnis enthält die Tabelle nach der Deduplizierung folgende Änderungen:

1. Blau – aktualisierte Datensätze (in beiden Snapshots vorhanden), Wert geändert.

2. Rot – alte Datensätze, die erhalten bleiben sollen (nur in Snapshot #1 vorhanden).

3. Grün – neuer Datensatz (nur in Snapshot #2 vorhanden).

nach Deduplizierung

Staging-Tabelle nach der Deduplizierung

Trifft der nächste Snapshot ein, läuft derselbe Prozess erneut ab.

Code-Beispiele:

Der folgende Code erzeugt aus den Deduplizierungs-Ergebnissen eine neue Staging-Tabelle, die die bestehende ersetzt. Dazu wird die QUALIFY-Klausel verwendet und jedem Datensatz mit gleichem Schlüssel eine Zeilennummer zugewiesen, wobei der erste Datensatz (rownum=1) der neueste ist.

CREATE OR REPLACE TABLE
`nw-playground.demo.dedup_staging`
(date DATE, product_id INT64,
store_id INT64, attribute_id INT64,
attr_value INT64, update_id INT64)
PARTITION BY
date
CLUSTER BY
product_id,
store_id AS
SELECT
date, product_id, store_id, attribute_id, attr_value, update_id
FROM
`nw-playground.demo.dedup_staging`
WHERE
date >= '2017-04-14' AND date <= '2017-04-26'
QUALIFY ROW_NUMBER()
OVER(PARTITION BY date, product_id, store_id, attribute_id
ORDER BY update_id DESC ) = 1;
CREATE OR REPLACE TABLE
`nw-playground.nw_demo.dedup_target`
CLONE `nw-playground.nw_demo.dedup_staging`;

Testdaten und Ergebnisse

Getestet wurde mit 7,8 Milliarden Datensätzen (350 GiB), verteilt auf 13 Partitionen, mit ca. 3 Mio. Unterschieden (alle Änderungstypen) zwischen "staging" und "target". Die Tests liefen im On-Demand-Abrechnungsmodell.

Im MERGE-Fall enthielt die Tabelle "staging" nur den neuen Snapshot und die Tabelle "target" den bestehenden Snapshot. Im Deduplizierungs-Fall enthielt die Staging-Tabelle dagegen den bestehenden und den neuen Snapshot, und "target" war ein Klon des ersten Staging-Snapshots.

Vergleich der Ergebnisse

Die folgende Tabelle zeigt den Vergleich der Ergebnisse:

Tabellenvergleich

Zusammenfassender Vergleich der Ausführungsdetails

Im Vergleich der Execution Plans wird deutlich, dass Laufzeit und Slot-Auslastung beim MERGE-Test doppelt so hoch sind wie bei der DEDUPLICATE-Variante:

Ausführung

Laufzeit und Slot-Auslastung der Hauptoperation

Kostenvergleich

On-Demand-Abrechnungsmodell:

Die Kosten sind in beiden Tests vergleichbar, da dieselbe Datenmenge gescannt wird. Bei einem vollständigen Load liegen sie auf Basis von 30 TB / 2 Snapshots bei rund 187 $ (6,25 $ pro 1 TiB).

BigQuery Editions:

Das von Google im April 2023 eingeführte neue Abrechnungsmodell basiert auf Capacity Pricing (Bezahlung pro Slot/Stunde) und umfasst drei Editions. Für die Schätzung der Produktionskosten lässt sich ein vollständiger Snapshot-Load mit dem Faktor 56 gegenüber unserem Test hochrechnen (730 Tagespartitionen in 2 Jahren, 730/13=56).

Die Schätzung ist nicht exakt, da je nach Konfiguration und Commitments noch weitere Faktoren hinzukommen. Sie verdeutlicht aber den Unterschied zwischen den beiden Optionen.

BigQuery

Kostenvergleich der BigQuery Editions (US-Pricing)

Anhang

Es gibt mehrere Wege, eine Deduplizierung umzusetzen (etwa mit ROWNUM oder GROUP BY). Die Variante mit ROWNUM lieferte ähnliche Ergebnisse wie die QUALIFY-Klausel; QUALIFY ist jedoch der elegantere Befehl. "GROUP BY" schnitt schlechter ab und entsprach den Ergebnissen von MERGE.

Eine weitere effiziente Umsetzung ist über Stored Procedures für Apache Spark möglich – mehr dazu im nächsten Blogbeitrag.

Wie der Use Case zeigt, geht es um eine Massen-Mutation, die auf einer Join-Logik mit den bestehenden Daten beruht. Der bevorzugte Weg dorthin: Änderungen an den Daten möglichst vermeiden.

Die Kombination aus "Deduplizierung" und CLONE umgeht DML-Limitierungen und verbessert beide Dimensionen – Elapsed Time und Slot-Auslastung – deutlich, mit einer Performance- und Kostenreduktion von rund 50 %.