Estado Actual con ClickHouse® ReplacingMergeTree
Un exchange produce un flujo continuo de eventos de actualización de ticks. Cada cambio de liquidez, cada ajuste de reservas. Los batches pesan 25MB. Llegan con menos de un segundo entre ellos. Los datos tienen que actualizarse en tiempo real para los traders.
De todo este torrente solo necesitamos una cosa: el estado actual expuesto vía API. Qué ticks están activos ahora mismo y cuáles son sus reservas.
Lo montamos con ReplacingMergeTree e is_deleted, servido a través de Cloudflare Durable Objects. Este post asume que sabes cómo funciona el engine. Si no, el deep dive de ReplacingMergeTree lo cubre de cabo a rabo. También puedes empezar por el inicio de la serie.
El problema
Si no vienes del mundo del trading, un tick es un punto de precio en un par de trading. Contiene reservas: los fondos disponibles para operar a ese precio. Los traders miran los ticks para saber qué precios existen y cuánta liquidez hay en cada uno. Todos los ticks activos juntos forman el libro de órdenes. Investopedia tiene una buena intro si el concepto te suena nuevo.
Los ticks cambian constantemente. Se crea uno cuando alguien aporta liquidez. Sus reservas se actualizan cada vez que un trade pasa por él. Las reservas caen a cero y el tick está muerto.
El volumen no para. Miles de actualizaciones por batch, batches llegando cada menos de un segundo, datos acumulándose. Se emiten millones de actualizaciones de ticks al día, pero solo unos miles de ticks por par están activos a la vez. Sin forma de limpiar los ticks muertos, las queries escanean demasiados datos. Y no te puedes permitir perder ni un solo batch, porque un hueco significa que ciertas cancelaciones de ticks nunca llegan y el estado queda mal de ahí en adelante.
A los traders todo esto les da igual. Necesitan el libro de órdenes en tiempo real para tomar decisiones informadas.
Todo es append-only. No hay un solo UPDATE en el stack. En Postgres, harías un upsert: INSERT ... ON CONFLICT DO UPDATE. Sencillo a bajo volumen. Pero con este throughput, Postgres se convierte en cuello de botella. Cada UPDATE crea una nueva versión de la tupla, y la antigua se queda hasta que VACUUM la limpia. Con miles de actualizaciones de ticks por segundo, el autovacuum no da abasto. El write amplification llega a 3-5x. La tabla se hincha. Los endpoints revientan.
Necesitábamos algo capaz de tragarse un batch de 25MB en menos de un segundo y responder "cuál es el estado actual" sin ahogarse. ClickHouse fue la respuesta.
Pipeline de ingesta
ClickHouse no está hecho para gestionar reintentos, y perder datos aquí no es opción. Un solo evento perdido se traduce en cancelaciones de ticks que nunca llegan, lo que significa servir estado obsoleto a los traders. Así que pusimos Google Cloud Pub/Sub delante. Los eventos brutos van ahí primero, un mensaje por batch. Si el worker falla al procesar un batch, cae en una dead-letter queue y se reintenta de inmediato.
El worker recoge cada mensaje, extrae los eventos de actualización de ticks y los escribe primero en una tabla fuente MergeTree. Los datos brutos quedan siempre consultables. Cuando algo no cuadra en el estado de los ticks, volvemos a los eventos fuente y debuggeamos desde ahí.
A partir de aquí, las materialized views toman el control. Se disparan en cada INSERT a la tabla fuente, transforman y normalizan los datos hacia la tabla ReplacingMergeTree tick_state. Los inserts duplicados no son problema: el engine se encarga de la dedup.
El schema
ReplacingMergeTree te deja insertar nuevas versiones de una fila y que ClickHouse se quede solo con la última. Sin UPDATE, sin locking: solo append y que los merges de fondo dedupliquen. Tiene dos parámetros relevantes. El primero, version, identifica la columna de versión. El segundo, is_deleted, permite eliminar físicamente las filas que marques como muertas. Sin él, la tabla crece demasiado y las queries tardan varios segundos.
CREATE TABLE tick_state (
asset_0 String,
asset_1 String,
asset_in String,
tick_index Int64,
fee UInt64,
updated_at DateTime64(6),
reserves UInt256,
reserves_zero UInt8,
ingestion_timestamp DateTime64(3),
INDEX idx_ingestion_ts ingestion_timestamp TYPE minmax GRANULARITY 1
) ENGINE = ReplacingMergeTree(updated_at, reserves_zero)
ORDER BY (asset_0, asset_1, asset_in, tick_index, fee)
SETTINGS
index_granularity = 1024,
min_age_to_force_merge_seconds = 21600;
index_granularity = 1024 da granularidad más fina que el valor por defecto (8192). El dataset activo es pequeño, y gránulos más pequeños implican menos datos escaneados por point query.
min_age_to_force_merge_seconds = 21600 fuerza merges en partes de más de 6 horas, que es lo que dispara la limpieza física de las filas is_deleted. La ventana de 6 horas no es arbitraria. Los marcadores de borrado tienen que mantenerse el tiempo suficiente para que lleguen datos tardíos. Si un batch se ingesta fuera de orden y el marcador de borrado de ese tick ya desapareció, FINAL no tiene nada contra lo que resolver y la fila obsoleta se hace visible. Seis horas dan margen para reintentos y reprocesado sin arriesgar ticks fantasma. Sin este setting, los ticks muertos se acumulan hasta que ClickHouse decide fusionar por su cuenta. La limpieza también requiere activar allow_experimental_replacing_merge_with_cleanup a nivel de servidor. La feature sigue marcada como experimental, pero funciona.
ORDER BY hace doble función aquí. Esas cinco columnas identifican un tick de forma única y son a la vez la clave de dedup. Cuando ReplacingMergeTree detecta dos filas con los mismos valores de ORDER BY, se queda con la de updated_at más alto y elimina la otra.
reserves_zero es la columna is_deleted (segundo argumento de ReplacingMergeTree). Cuando las reservas de un tick caen a cero, ponemos reserves_zero = 1. ClickHouse lo trata como señal de borrado y elimina físicamente la fila durante los merges de limpieza.
Millones de cambios diarios en el estado de los ticks. Unos miles de ticks activos. Misma tabla, mismas queries. Eso es lo que te da is_deleted.
Cálculo de estado
FINAL deduplica en query time. Para cada clave ORDER BY, se queda con la fila de updated_at más alto y descarta el resto. Desde la v22.6, FINAL es multi-threaded. La penalización de rendimiento que hacía que la gente lo evitase ya casi no existe. El post anterior explica cómo funciona por dentro.
SELECT *
FROM tick_state FINAL
WHERE asset_0 = {asset0:String}
AND asset_1 = {asset1:String}
AND reserves_zero = 0
FINAL deduplica, WHERE reserves_zero = 0 filtra los ticks muertos. Ya está. Sin GROUP BY, sin funciones de agregación. Funciona incluso antes de que los merges de fondo hayan limpiado las filas is_deleted, porque FINAL resuelve duplicados en query time sin importar el estado de los merges.
Queries delta
Las queries de estado completo escanean todos los ticks de un par de trading. Para la carga inicial, perfecto. Un desperdicio cuando haces polling cada segundo y la mayoría de ticks no han cambiado.
Añadimos un filtro de ingestion_timestamp:
SELECT *
FROM tick_state FINAL
WHERE asset_0 = {asset0:String}
AND asset_1 = {asset1:String}
AND ingestion_timestamp > parseDateTimeBestEffort('{lastTimestamp}')
Por qué ingestion_timestamp y no updated_at: porque ingestion_timestamp refleja cuándo llegaron los datos a nuestro sistema, no cuándo pasó el evento en el origen. A la capa de polling le importa eso: dame todo lo que hayas ingestado desde la última vez que pregunté.
Sin índice en ingestion_timestamp, esta query era inusable. La columna no forma parte del ORDER BY, así que ClickHouse no podía saltar gránulos y escaneaba todo. Añadimos un índice minmax y las queries delta bajaron a menos de 200ms.
Los clientes llaman al endpoint, obtienen el estado completo, y se suscriben a un stream WebSocket para los deltas. Se hace polling cada segundo con la query de arriba. Cada 3 minutos se lanza un refresco completo como red de seguridad. Cinturón y tirantes.
Capa de servicio: Cloudflare Durable Objects
ClickHouse se encarga del almacenamiento y la dedup. Pero no es un sistema de entrega en tiempo real. Los traders necesitan actualizaciones en vivo por WebSocket, y no quieres cientos de clientes conectados haciendo polling a ClickHouse directamente.
Pusimos Cloudflare Durable Objects delante. Es una instancia serverless con estado, single-threaded, globalmente única, cuya memoria persiste durante el ciclo de vida de la instancia. Una instancia por par de trading.
Cada DO mantiene el estado actual de los ticks en memoria. Cuando se conecta el primer usuario, obtiene el historial completo. A partir de ahí, hace polling a ClickHouse con queries delta cada segundo y reenvía los cambios por todas las conexiones abiertas. Como red de seguridad, ejecuta un refresco completo cada 3 minutos, calcula el diff entre estado antiguo y nuevo, y difunde solo los cambios a los clientes conectados.
Lo que más me gusta de este setup: la dedup de fetches en vuelo. Si 50 clientes se conectan al mismo par de trading en el mismo instante, solo se dispara una query a ClickHouse. Todos los demás esperan la misma promise:
if (this.cached) return this.cached.clone()
if (!this.inflight) {
this.inflight = this.fetchFromClickHouse().then((response) => {
this.cached = response
this.inflight = undefined
return response
})
}
return await this.inflight
El routing singleton por clave (integrado en los DOs) asegura que todas las peticiones para el mismo par lleguen a la misma instancia. Sin thundering herd.
Evaluamos Redis Pub/Sub al principio. Los DOs ganaron porque agrupan estado y soporte WebSocket en un solo primitivo, con dedup de peticiones incluida. Con Redis habríamos tenido que montar gestión de estado, pub/sub y manejo de conexiones por separado. Además, los DOs no cuestan nada en reposo, y eso importa porque muchos pares de trading están en silencio casi todo el tiempo.
Ojo: los deploys desconectan todos los WebSockets. Los clientes necesitan lógica de reconexión. Lo sabíamos de antemano y lo construimos así.
La pipeline completa
PubSub recibe los batches en bruto; un worker parsea los eventos de actualización de ticks y los escribe en una tabla MergeTree. Los inserts que fallan van a DLQ para reintento inmediato. Las materialized views transforman los datos hacia la tabla ReplacingMergeTree tick_state en cada insert.
Los merges de fondo colapsan duplicados y se quedan con la fila de updated_at más reciente. Las filas is_deleted se limpian físicamente al cumplir 6 horas mediante merges forzados. OPTIMIZE TABLE ... FINAL CLEANUP está disponible para eliminación inmediata, pero nunca lo hemos usado en producción.
Las queries FINAL sirven el estado actual; las queries delta filtradas por ingestion_timestamp alimentan la capa de streaming. Los Durable Objects mantienen estado en memoria por par de trading, hacen polling de deltas cada segundo y difunden cambios por WebSocket.
Batches de 25MB a datos en vivo para traders en menos de un segundo. Millones de actualizaciones de ticks, unos pocos miles de ticks activos. Funciona sin que nadie lo toque.
Esto no es una arquitectura novedosa. Cada pieza es simple por separado. El trabajo estuvo en hacer que encajaran bajo la carga de producción. El test de las 2am se cumple: cualquier ingeniero del equipo puede debuggear cualquier capa sin molestar a nadie. De eso se trata.
Seguir Leyendo
Publicado originalmente en obsessionDB. Lee el artículo original aquí.
ClickHouse is a registered trademark of ClickHouse, Inc. https://clickhouse.com