Prefacio
En este blog te mostramos una forma de resolver un reto conocido en los sistemas de data warehouse: mantener los datos frescos y actualizados sin recurrir a mutaciones a gran escala.
En DoiT trabajamos con muchos clientes para diseñar sistemas bien arquitectados y aprovechar los servicios de la nube de manera eficiente. Lo que sigue surge de la experiencia con uno de ellos.
Contexto
Una empresa SaaS ofrece a sus clientes una plataforma analítica sobre BigQuery. En el caso de algunos clientes, los datos llegan como un snapshot completo, con todo el histórico y no solo los cambios incrementales desde la última actualización.
Uno de estos clientes entrega un snapshot de 15 TB, aunque los datos realmente actualizados representan apenas el 0.1%. Como no había forma de recibir solo el incremental, el equipo se enfrentó al reto de encontrar la manera más confiable y eficiente —en rendimiento y costo— de mantener la tabla existente al día con cada nuevo snapshot.
Requisitos del data pipeline
La empresa y el cliente definieron un contrato de datos para que el pipeline cumpliera con los requisitos de negocio y técnicos:
- Los datos entrantes son un snapshot completo de una ventana deslizante de dos años, con una cadencia de seis llegadas al día.
- Los datos están particionados por día (cada partición pesa cerca de 20 GiB) y se marcan con un snapshot ID (un valor incremental).
- Los datos contienen registros nuevos (claves nuevas) y registros modificados (actualizaciones de datos existentes).
- La expiración de particiones está configurada a 2 años.
- Los registros que están en el snapshot actual pero no aparecen en el nuevo se conservan hasta que expire la partición.
El equipo de producto de la empresa SaaS definió los siguientes requisitos:
- Modelado: dos tablas contendrán los datos existentes y los nuevos.
Los datos nuevos se alojan en una tabla llamada "staging", y los datos existentes (los que ve el cliente) en una tabla llamada "target".
- Unicidad: los datos de la tabla "target" son únicos (sin claves duplicadas).
- Frescura: la tabla "target" se actualiza dentro de la hora siguiente a la llegada del snapshot.
- Disponibilidad: el usuario final siempre obtiene respuesta a su consulta.
El reto de la mutación
Como explica el siguiente blog de Google, BigQuery no está diseñado como las bases de datos OLTP transaccionales, que son nativamente eficientes en mutaciones de gran tamaño.
"BigQuery no es la única entre las bases de datos OLAP con restricciones en la frecuencia de mutación, ya sea de forma explícita (mediante cuotas) o implícita (con una degradación significativa del rendimiento), porque este tipo de bases de datos están optimizadas para la ingesta a gran escala y las consultas analíticas, no para el procesamiento transaccional.
Además, BigQuery te permite leer el estado de una tabla en cualquier instante de los siete días anteriores. Esa ventana de retroceso obliga a conservar datos que el usuario ya borró. Para mantener consultas eficientes y rentables a escala, BigQuery limita la frecuencia de las mutaciones mediante cuotas". (Performing large-scale mutations in BigQuery | Google Cloud Blog)
Un reemplazo total —que borre el snapshot anterior y lo sustituya por el nuevo— no alcanza, porque hay que conservar los registros que existen en el snapshot actual pero no aparecen en el último.
Es decir, hay que comparar los registros entre el snapshot existente y el nuevo, algo que el reemplazo simple no hace.
Para mostrar las opciones de mutación y sus diferencias, definamos el problema con precisión, con su modelado de datos y su lógica de mutación.
Modelado de datos
Esquema de la tabla:
Un registro de una tabla ("staging" o "target") contiene un attribute_id y un valor para un producto específico (product_id) en una tienda (store_id) en una fecha determinada, además de la referencia al snapshot en el que existió (update_id).

Ejemplo de registro:
Según los datos recibidos en el snapshot ID 1 (update_id), el valor de cantidad total (attribute_id = 1) de un producto (product_id = 301865) en una tienda (store_id = 2072) era 20 el 2017-04-21.
Ejemplo de la lógica de mutación:
A continuación se muestra una muestra de una partición (fecha específica) tomada de dos snapshots distintos.

Tabla "target" tras la ingesta del snapshot #1
El snapshot #2 traía los siguientes cambios:
1. Azul: registros actualizados (aparecen en ambos snapshots), el valor cambió.
2. Rojo: registro histórico (aparece solo en el snapshot #1).
3. Verde: registro nuevo (aparece solo en el snapshot #2).

Tabla "target" tras la ingesta del snapshot #2
Como BigQuery soporta MERGE, fusionar la tabla staging con la tabla target es el enfoque intuitivo, aplicando la siguiente lógica.
Lógica del merge
Esta lógica de MERGE define las mutaciones que se aplican según los distintos casos, a partir de una clave compuesta por los siguientes campos: product_id, store_id, attribute_id.

Esta es la implementación en código de la lógica de MERGE:
MERGE
nw-playground.demo.merge_target T
USING
nw-playground.demo.merge_staging S
ON
T.product_Id = S.product_Id
AND T.store_Id = S.store_Id
AND T.attribute_Id = S.attribute_Id
AND T.date = S.date
WHEN MATCHED and T.date >= '2017-04-14' and T.date <= '2017-04-26'
THEN UPDATE SET T.attr_value = S.attr_value, T.update_id = S.update_id
WHEN NOT MATCHED BY TARGET and S.date >= '2017-04-14' and S.date <= '2017-04-26'
THEN
INSERT
(date,
product_Id,
store_Id,
attribute_Id,
attr_value,
update_id)
VALUES
(S.date, S.Product_id, S.store_Id, S.attribute_Id, S.attr_value, S.update_id);
El problema:
Como dijimos antes, la operación MERGE es muy costosa en BQ y tiene limitaciones, ya que requiere shuffling y mutación de los datos existentes.
Por eso necesitamos una forma de evitar el MERGE y la mutación de datos.
Enfoque de deduplicación y clonado
¿Qué vamos a hacer? En lugar de fusionar los registros de las tablas "staging" y "target", seguimos estos pasos:
- Anexar el nuevo snapshot a la tabla "staging".
- Deduplicar los registros similares y conservar el más reciente.
La deduplicación es un proceso para eliminar registros duplicados y dejar uno solo. Se logra definiendo una clave que identifica los registros similares y una lógica que decide cuál conservar entre los duplicados (por ejemplo, el más reciente).
3. Reemplazar la tabla "staging" con el resultado de la deduplicación.
4. Reemplazar la tabla "target" con un CLONE de la tabla "staging".
"CLONE Un clon de tabla es una copia ligera y modificable de otra tabla (llamada tabla base). Solo se cobra por el almacenamiento de los datos del clon que difieren de la tabla base, así que al inicio no hay costo de almacenamiento por el clon . Salvo por el modelo de facturación del almacenamiento y algunos metadatos adicionales para la tabla base, un clon de tabla se comporta como una tabla estándar: lo puedes consultar, copiar, eliminar, etc. Una vez creado, el clon es independiente de la tabla base. Los cambios hechos en la tabla base o en el clon no se reflejan en la otra. " table-clones-intro
¿Cómo lo hacemos?
- Anexado de datos: con el comando
LOADen modo "append", para que la tabla staging contenga el snapshot actual y el nuevo. - Deduplicación: deduplicar los registros similares (misma clave) y conservar el más reciente. La deduplicación no afecta a los registros completamente nuevos ni a los que existen solo en el snapshot actual. Se implementa con funciones de ventana, que calculan valores sobre un grupo de filas, junto con la operación
QUALIFY. La cláusulaQUALIFYpermite filtrar los resultados de una función de ventana (grupo de filas). - Reemplazar "staging": con
CREATE OR REPLACEa partir de los resultados de la consulta de deduplicación. - Clonar "staging" en "target": con
CREATE OR REPLACE TABLE ... CLONE.
¿Por qué lo hacemos así?
Este enfoque ofrece varias ventajas frente al de MERGE:
1. Menos shuffling de datos: anexar el nuevo snapshot al existente evita el shuffling entre dos tablas.
2. Sin operación MERGE/JOIN: la deduplicación se implementa con una función de ventana que usa la cláusula PARTITION BY, la cual divide las filas de entrada en particiones independientes, sobre las que la función de ventana se evalúa por separado.
3. Sin mutación: se crean o clonan tablas nuevas. Así se evitan los límites de cuota de mutación de BigQuery.
Ejemplo de la lógica de anexado y deduplicación:

Tabla staging tras anexar el nuevo snapshot (#2)
Igual que con el resultado del MERGE, la tabla tras la deduplicación contiene los siguientes cambios:
1. Azul: registros actualizados (aparecen en ambos snapshots), el valor cambió.
2. Rojo: registros antiguos que se deben conservar (aparecen solo en el snapshot #1).
3. Verde: registro nuevo (aparece solo en el snapshot #2).

Tabla staging tras la deduplicación
Cuando llegue el siguiente snapshot, se repite el mismo proceso.
Ejemplos de código:
El siguiente código crea una nueva tabla staging a partir de los resultados de la deduplicación, que reemplaza la existente con la cláusula QUALIFY y agrega un número de fila a cada registro con clave similar; el primer registro (rownum=1) es el más reciente.
CREATE OR REPLACE TABLE
`nw-playground.demo.dedup_staging`
(date DATE, product_id INT64,
store_id INT64, attribute_id INT64,
attr_value INT64, update_id INT64)
PARTITION BY
date
CLUSTER BY
product_id,
store_id AS
SELECT
date, product_id, store_id, attribute_id, attr_value, update_id
FROM
`nw-playground.demo.dedup_staging`
WHERE
date >= '2017-04-14' AND date <= '2017-04-26'
QUALIFY ROW_NUMBER()
OVER(PARTITION BY date, product_id, store_id, attribute_id
ORDER BY update_id DESC ) = 1;
CREATE OR REPLACE TABLE
`nw-playground.nw_demo.dedup_target`
CLONE `nw-playground.nw_demo.dedup_staging`;
Datos de prueba y resultados
Las pruebas se hicieron sobre 7,800 millones de registros (350 GiB) distribuidos en 13 particiones, con cerca de 3 millones de diferencias (de todos los tipos de cambio) entre las tablas "staging" y "target". Se ejecutaron con el modelo de facturación on-demand.
En el caso del MERGE, la tabla "staging" contenía solo el nuevo snapshot y la "target" tenía el snapshot existente. En el caso de la deduplicación, en cambio, la tabla staging tenía el snapshot existente y el nuevo, y la "target" era un clon del primer snapshot de staging.
Comparación de resultados
La siguiente tabla muestra la comparación de resultados:

Resumen comparativo de los detalles de ejecución
Al comparar los planes de ejecución, queda claro que la duración y la utilización de slots de la prueba con MERGE son el doble que en la de DEDUPLICATE:

Duración de la operación principal y utilización de slots
Comparación de costos
Modelo de facturación on-demand:
El costo es similar entre las pruebas porque se escanea el mismo volumen de datos. En una carga completa rondaría los 187 USD (6.25 USD por TiB), considerando 30 TB y 2 snapshots.
BigQuery Editions:
El nuevo modelo de facturación que Google introdujo en abril de 2023 se basa en precios por capacidad (pago por slot/hora) y tiene tres ediciones. Para estimar el costo de una carga en producción, podemos calcular una carga completa de snapshot con un factor de 56 frente a la prueba que hicimos (730 días-particiones en 2 años, 730/13=56).
La estimación no es exacta, ya que depende también de las distintas configuraciones y commitments que se apliquen. Sirve sobre todo para mostrar la diferencia entre las dos opciones.

Comparación de costos estimados entre ediciones de BigQuery (precios de EE. UU.)
Apéndice
Existen varias formas de implementar la deduplicación (con ROWNUM o GROUP BY). Probarla con ROWNUM dio un rendimiento similar al de la cláusula QUALIFY, aunque QUALIFY es un comando más limpio. Usar "GROUP BY" arrojó un peor rendimiento, idéntico al de los resultados con Merge.
Hay otra alternativa eficiente para implementarlo: procedimientos almacenados para Apache Spark, que abordaremos en el próximo blog…
Como vimos en el caso de uso, hay una mutación a gran escala que depende de una lógica de join con los datos existentes. La mejor manera de abordarla es evitar las modificaciones sobre los datos.
Apoyarse en la deduplicación y en CLONE evita las limitaciones de DML y mejora de forma notable ambas dimensiones —tiempo transcurrido y utilización de slots—, con una reducción cercana al 50% en rendimiento y costo.