Streaming Integrations: Kafka, WebSockets, and Real-Time Q
kdb+/q was built for real-time data. The tick infrastructure — the tickerplant, real-time database, and historical database — is one of Q’s strongest features and the reason it dominates in high-frequency finance. What it wasn’t originally designed for is plugging into the broader streaming ecosystem: Kafka topics, WebSocket-hungry frontends, and the event-driven microservices architecture that everyone built in the 2020s.
This chapter is about making Q a first-class participant in that ecosystem — both as a consumer of external streams and as a producer of real-time data.
Q’s Native Tick Architecture (Brief Review)
Before plugging Q into Kafka, understand what Q already does natively:
Feed Handler → Tickerplant → RDB (real-time DB)
→ Subscribers (other q processes)
→ Log file → HDB (historical DB)
The tickerplant receives data, timestamps it, writes to a log, and publishes to subscribers via IPC. Each subscriber calls .u.sub to register and receives updates via .u.upd.
This is already a streaming architecture — it’s just Q-native. When you need to integrate with non-Q systems, you extend this pattern rather than replacing it.
Kafka Integration
Architecture
The two Kafka-Q integration patterns:
Q as consumer: A Q process subscribes to Kafka topics and ingests data into kdb+ tables. Useful for bringing external event streams into your time-series database.
Q as producer: A Q process (typically a tickerplant subscriber) publishes processed data to Kafka topics for consumption by other services.
KafkaQ
The KafkaQ library (from KX) provides Kafka integration for q:
# Install KafkaQ (requires librdkafka)
# macOS
brew install librdkafka
# Ubuntu/Debian
apt-get install librdkafka-dev
# Download KafkaQ
# https://github.com/KxSystems/kafka
# Place kafkaq.so/.dylib in your q path
/ Load KafkaQ
\l kafkaq.q
/ ─── Consumer ───────────────────────────────────────────────────────────────
/ Create a consumer with configuration
consumer: .kafka.newConsumer[
`bootstrap.servers`group.id`auto.offset.reset!
("localhost:9092"; "my-q-consumer"; "earliest")
]
/ Subscribe to topics
.kafka.subscribe[consumer; `trade_data`order_data]
/ Message handler — called for each incoming Kafka message
.kafka.recvMsg: {[consumer; msg]
/ msg is a dictionary: `topic`partition`offset`key`data`timestamp
topic: msg`topic;
payload: .j.k msg`data; / deserialize JSON payload
/ Route to appropriate handler
$[topic ~ "trade_data"; ingestTrade payload;
topic ~ "order_data"; ingestOrder payload;
/ default
-1 "Unknown topic: ", string topic]
}
/ Start polling (call in a timer or loop)
.kafka.poll[consumer; 1000] / poll with 1000ms timeout
/ ─── Trade ingestion ─────────────────────────────────────────────────────────
/ In-memory trade table (replace with actual schema)
trade: ([] time:`timestamp$(); sym:`symbol$(); price:`float$(); size:`long$())
ingestTrade: {[payload]
/ Convert Kafka message to Q row
row: (
`timestamp$payload`timestamp;
`$payload`sym;
`float$payload`price;
`long$payload`size
);
`trade insert row;
/ Optional: forward to tickerplant
if[0 < count tp_handle; neg[tp_handle] (`.u.upd; `trade; row)]
}
Q as Kafka Producer
Publishing Q data to Kafka:
\l kafkaq.q
/ Create a producer
producer: .kafka.newProducer[
`bootstrap.servers`acks!
("localhost:9092"; "all")
]
/ Publish tickerplant updates to Kafka
/ Add this to your RDB subscriber
.u.upd: {[t; data]
/ Standard RDB upsert
t insert data;
/ Also publish to Kafka
publishToKafka[t; data]
}
publishToKafka: {[table; data]
topic: "kdb-", string table;
/ Convert each row to JSON and publish
rows: flip data;
do[count rows 0;
row: rows[; i];
msg: .j.j `table`data!(table; row);
.kafka.publish[producer; `$topic; ""; msg]
]
}
Python Kafka Bridge
If installing KafkaQ feels like more than you want to manage, a Python bridge is a clean alternative:
# kafka_bridge.py
# Consumes from Kafka, inserts into kdb+ via IPC
from kafka import KafkaConsumer
import pykx as kx
import json
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
def run_bridge(
kafka_brokers: str,
kafka_topic: str,
kdb_host: str,
kdb_port: int,
kdb_table: str
):
consumer = KafkaConsumer(
kafka_topic,
bootstrap_servers=kafka_brokers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='latest',
group_id='kdb-bridge'
)
q = kx.SyncQConnection(host=kdb_host, port=kdb_port)
logger.info(f"Bridge running: {kafka_topic} → kdb+:{kdb_port}:{kdb_table}")
batch = []
batch_size = 100
try:
for message in consumer:
payload = message.value
batch.append(payload)
if len(batch) >= batch_size:
flush_batch(q, kdb_table, batch)
batch = []
except KeyboardInterrupt:
if batch:
flush_batch(q, kdb_table, batch)
finally:
consumer.close()
q.close()
def flush_batch(q: kx.SyncQConnection, table: str, batch: list):
"""Insert a batch of records into kdb+."""
import pandas as pd
df = pd.DataFrame(batch)
# Type coercions
if 'timestamp' in df.columns:
df['timestamp'] = pd.to_datetime(df['timestamp'])
if 'sym' in df.columns:
df['sym'] = df['sym'].astype(str)
q(f'`{table} insert', df)
logger.info(f"Flushed {len(batch)} records to {table}")
if __name__ == '__main__':
run_bridge(
kafka_brokers='localhost:9092',
kafka_topic='market-trades',
kdb_host='localhost',
kdb_port=5001,
kdb_table='trade'
)
WebSockets: Streaming Q Data to Browsers
The use case: a dashboard that shows live market data, updated as trades arrive in kdb+. WebSockets are the right transport for this — persistent connection, low overhead, bidirectional.
Q’s Built-In WebSocket Server
kdb+ 3.5+ has native WebSocket support via .z.wo, .z.wc, .z.ws:
\p 5001 / start on port 5001 (same port handles HTTP, IPC, and WebSockets)
/ WebSocket connection tracking
ws_clients: `long$() / list of connected WebSocket handles
.z.wo: {[h]
/ h = new WebSocket handle
ws_clients,: h;
-1 "WS connected: ", string h;
}
.z.wc: {[h]
/ h = closing WebSocket handle
ws_clients: ws_clients except h;
-1 "WS disconnected: ", string h;
}
.z.ws: {[x]
/ x = incoming WebSocket message (string or bytes)
/ Parse request and send back data
request: .j.k x;
response: handleWsRequest request;
neg[.z.w] .j.j response
}
handleWsRequest: {[req]
action: req`action;
$[action ~ "subscribe";
handleSubscribe req;
action ~ "query";
handleQuery req;
`error`message!("unknown_action"; action)]
}
handleSubscribe: {[req]
syms: `$req`syms;
/ Store subscription: handle -> syms
`subscriptions upsert (.z.w; syms);
`status`message!("subscribed"; "Subscribed to ", " " sv string syms)
}
handleQuery: {[req]
q_code: req`query;
result: @[value; q_code; {[e] `error`message!(1b; string e)}];
`status`data!("ok"; result)
}
/ Broadcast trade updates to subscribed WebSocket clients
subscriptions: ([handle:`long$()] syms:())
broadcastTrade: {[trade_data]
/ For each connected WS client with matching subscription
{[h; trade_data]
client_syms: subscriptions[h; `syms];
matching: trade_data where trade_data[`sym] in client_syms;
if[count matching;
neg[h] .j.j `type`data!("trade_update"; matching)]
}[; trade_data] each ws_clients;
}
/ Hook into tickerplant update function
.u.upd: {[t; data]
$[t ~ `trade;
broadcastTrade data;
()];
}
JavaScript Client
// dashboard.js
const ws = new WebSocket('ws://localhost:5001');
ws.onopen = () => {
console.log('Connected to kdb+');
// Subscribe to specific symbols
ws.send(JSON.stringify({
action: 'subscribe',
syms: ['AAPL', 'MSFT', 'GOOG']
}));
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'trade_update') {
updateDashboard(msg.data);
}
};
ws.onerror = (error) => console.error('WebSocket error:', error);
ws.onclose = () => {
console.log('Disconnected');
setTimeout(() => reconnect(), 5000); // auto-reconnect
};
function updateDashboard(trades) {
trades.forEach(trade => {
const el = document.getElementById(`price-${trade.sym}`);
if (el) {
el.textContent = trade.price.toFixed(2);
el.classList.add('flash');
setTimeout(() => el.classList.remove('flash'), 300);
}
});
}
// Request historical data via query
function fetchOHLC(sym, date) {
ws.send(JSON.stringify({
action: 'query',
query: `select from ohlc where sym=\`${sym}, date=${date}`
}));
}
FastAPI WebSocket Proxy
For production, a FastAPI proxy handles authentication, connection management, and adds observability:
# websocket_proxy.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.websockets import WebSocketState
import pykx as kx
import asyncio
import json
import logging
logger = logging.getLogger(__name__)
app = FastAPI()
# kdb+ connection for WebSocket data
q = kx.SyncQConnection(host='localhost', port=5001)
class ConnectionManager:
def __init__(self):
self.active: dict[str, list[WebSocket]] = {} # sym -> [websocket]
async def connect(self, websocket: WebSocket, sym: str):
await websocket.accept()
if sym not in self.active:
self.active[sym] = []
self.active[sym].append(websocket)
logger.info(f"WS connected for {sym}, total: {sum(len(v) for v in self.active.values())}")
def disconnect(self, websocket: WebSocket, sym: str):
if sym in self.active:
self.active[sym] = [ws for ws in self.active[sym] if ws != websocket]
async def broadcast(self, sym: str, data: dict):
if sym not in self.active:
return
dead = []
for ws in self.active[sym]:
try:
if ws.client_state == WebSocketState.CONNECTED:
await ws.send_json(data)
except Exception:
dead.append(ws)
for ws in dead:
self.active[sym].remove(ws)
manager = ConnectionManager()
@app.websocket("/stream/{sym}")
async def stream_symbol(websocket: WebSocket, sym: str):
"""Stream real-time trades for a symbol."""
await manager.connect(websocket, sym.upper())
sym = sym.upper()
try:
# Send initial snapshot
snapshot = q(
"{[s] select[-50] time, price, size from trade where date=.z.d, sym=s}",
kx.SymbolAtom(sym)
).pd()
await websocket.send_json({
"type": "snapshot",
"sym": sym,
"data": snapshot.to_dict(orient="records")
})
# Keep connection alive, receive control messages
while True:
try:
msg = await asyncio.wait_for(websocket.receive_json(), timeout=30)
if msg.get("action") == "ping":
await websocket.send_json({"type": "pong"})
except asyncio.TimeoutError:
# Send keepalive
await websocket.send_json({"type": "heartbeat"})
except WebSocketDisconnect:
manager.disconnect(websocket, sym)
logger.info(f"WS disconnected: {sym}")
# Background task: poll kdb+ and broadcast updates
async def poll_and_broadcast():
"""Poll kdb+ for new trades and broadcast to subscribers."""
last_time = {} # sym -> last seen time
while True:
await asyncio.sleep(0.5) # 500ms polling interval
for sym in list(manager.active.keys()):
if not manager.active.get(sym):
continue
try:
cutoff = last_time.get(sym, "00:00:00.000")
new_trades = q(
"{[s;t] select time, price, size from trade where date=.z.d, sym=s, time>t}",
kx.SymbolAtom(sym),
kx.TimeAtom(cutoff)
).pd()
if len(new_trades) > 0:
last_time[sym] = str(new_trades['time'].max())
await manager.broadcast(sym, {
"type": "update",
"sym": sym,
"trades": new_trades.to_dict(orient="records")
})
except Exception as e:
logger.error(f"Poll error for {sym}: {e}")
@app.on_event("startup")
async def startup():
asyncio.create_task(poll_and_broadcast())
Handling Back-Pressure
When Q is producing data faster than consumers can handle it, you need a back-pressure strategy:
/ Simple back-pressure in Q: check send buffer before sending
broadcastWithBackPressure: {[h; data]
/ .z.W[h] = bytes in send buffer for handle h
buffer_size: .z.W[h];
$[buffer_size > 1000000; / 1MB threshold
/ Buffer full: drop or queue
(-1 "Dropping update for slow consumer: ", string h);
/ Buffer OK: send
neg[h] .j.j data]
}
For Kafka as a buffer (the right architectural answer for high-throughput):
kdb+ tickerplant
→ publishes to Kafka topic (Q→Kafka producer)
→ Kafka topic acts as buffer
→ slow consumers read from Kafka at their own pace
This decouples the producer (kdb+) from the consumer’s throughput.
The Complete Real-Time Pipeline
Putting it together: Kafka → Q → WebSocket → Browser:
[Market Data Feed]
↓
[Kafka Topic: raw-trades]
↓
[kdb+ Feed Handler] → [Tickerplant] → [RDB]
↓
[WebSocket Server]
↓
[Browser Dashboard]
↓
[REST API]
↓
[External Consumers]
Each layer has a clear responsibility:
- Kafka: durable message queue, decouples feed from processing
- kdb+ tickerplant: time-stamping, logging, fan-out
- RDB: in-memory storage for today’s data
- WebSocket server: real-time push to browsers
- REST API: request-response for historical data
Q sits in the middle of this, doing what it does best: storing and querying time-series data at speed.