Q in the Wild: Breaking Out of the Silo
Q is a strange language to love. Its syntax looks like someone bet a Lisp programmer they couldn’t make a language with fewer vowels. Its documentation assumes a specific kind of patience. And yet — once it clicks — you find yourself genuinely irritated when you have to do time-series work in anything else.
The problem is that Q exists in a silo. Not because it has to, but because the path of least resistance in a kdb+ shop is to write everything in Q. Tick infrastructure in Q. Analytics in Q. The internal tooling script that one person wrote in 2011 that nobody touches? Q. The REST endpoint that technically works but falls over if you look at it wrong? Wrapped Q.
This book is about breaking out of that pattern — not by replacing Q, but by making it a first-class participant in a modern polyglot stack.
The Silo Problem, Concretely
Here’s what a typical kdb+ shop looks like in practice:
[Feed Handler (C/Q)] → [Tickerplant (Q)] → [RDB (Q)] → [HDB (Q)]
↑
[Analytics (Q)]
↑
[Reports (Q? Excel?)]
Everything in that diagram is either Q or Excel. The Excel is because someone gave up on Q GUIs and the quants had to get their data somehow. The integration surface is basically non-existent.
Now here’s what a modern stack looks like when Q is doing what it’s good at and letting other tools do what they’re good at:
[Feed Handler] → [Kafka] → [kdb+ Feed Handler] → [Tickerplant (Q)]
|
┌─────────────────────────────────────┘
↓
[RDB (Q)] ←→ [HDB (Q)]
|
┌──────────┴──────────────────────┐
↓ ↓
[FastAPI gateway] [pykx (Python analytics)]
| |
[REST clients] [Jupyter / ML pipelines]
[Web dashboards] [R research notebooks]
[Compliance systems] [Spark / dbt pipelines]
Q is still doing what it does best: ingesting, storing, and querying time-series data at speed. But now it’s connected. The Python team doesn’t need a Q license to get trade data. The compliance system doesn’t need to speak IPC. The dashboard doesn’t need to run inside a Q process.
What This Book Covers
Language bridges — Python (via pykx and qPython), Rust (via FFI and IPC), and R (via rkdb). Real patterns for passing typed data across language boundaries without your timestamps becoming floats or your symbols becoming bytes. Each chapter compares the available libraries honestly and tells you which one to use.
APIs and protocols — Q’s IPC protocol in depth, REST API wrappers, and how to build something external consumers can actually call without needing a Q license or understanding what hopen means.
Streaming — Kafka integration, WebSocket servers, and the patterns that let Q participate in real-time data pipelines as both producer and consumer, not just as the thing at the end of the pipe.
Tooling — IDEs that won’t make you feel like a hermit, linters, formatters, and the state of Q developer experience in 2026 (better than 2020, worse than Python, genuinely not bad).
How to Use This Book
The chapters are mostly independent. If you need Python integration right now, go to the Python chapter. If you’re building a REST API, go to REST. If you’ve never thought much about what happens on the wire when Q processes talk to each other, the IPC chapter is worth reading before anything that involves network communication.
If you’re starting fresh and don’t know which integration you need, the quick guide:
| You want to… | Go to… |
|---|---|
| Query kdb+ from Python | Python — use pykx |
| Build a Rust service that reads kdb+ data | Rust — use kdbplus IPC |
| Do statistical research in R on kdb+ data | R — use rkdb |
| Expose kdb+ data as a REST API | REST — FastAPI gateway |
| Set up a real IDE for Q development | IDEs — VS Code + KX extension |
| Understand Q’s wire protocol | IPC |
| Serve live data to a browser dashboard | Streaming — WebSockets |
| Integrate with Kafka | Streaming — Kafka section |
| Add linting and testing to a Q codebase | Dev Tooling |
What This Book Assumes
You know Q. Not necessarily at guru level, but you can write a functional update, you know why select beats a loop, and you’ve debugged a type error at least once by adding backtick casts until it stopped complaining.
You know at least one of: Python, Rust, or R. Ideally more. You’ve worked with at least one web framework. You understand what a TCP socket is.
You’ve wondered at least once why Q can’t just have a decent package manager. (It can’t. This book doesn’t fix that.)
The IPC Foundation
One concept runs through every chapter: Q’s IPC protocol. Every library covered here — pykx, qPython, kdbplus, rkdb — is ultimately speaking this protocol. Understanding it once, at a conceptual level, makes everything else clearer.
Q communicates over TCP. A kdb+ process is always one of:
- A server: listening on a port, handling incoming connections
- A client: connecting to a server and sending queries
- A gateway: doing both (and ideally doing it without being a single point of failure)
The port is set with the -p flag at startup:
q -p 5001
From another Q process:
/ Open a handle to a remote process
h: hopen `::5001
/ or with auth: h: hopen `::5001:user:pass
/ Synchronous call — blocks until remote returns
result: h "exec count i from trade where date=.z.d"
/ Safer: function + arguments rather than string eval
result: h (`.myns.myFunc; arg1; arg2)
/ Asynchronous — returns immediately, no response
neg[h] ".u.upd[`trade; data]"
/ Close when done
hclose h
The synchronous call blocks the calling process. The asynchronous (negative handle) variant does not wait for a response — used by tickerplants to publish to subscribers. The IPC chapter covers both in detail, along with message handlers, gateway patterns, and the situations where a synchronous call will hang your process and ruin your day.
Every integration library in this book provides an abstraction over this mechanism. When something breaks, stripping the abstraction and thinking about what’s happening at the handle level is usually how you find the bug.
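To make "the handle level" concrete, here is what those messages look like on the wire. This is a hand-built sketch of the version-3 framing in Python; the function names are mine, a real client should use a library rather than hand-rolled framing:

```python
import struct

def q_handshake(user: str = '', password: str = '', version: int = 3) -> bytes:
    """Opening bytes a client sends after connecting: credentials,
    a capability byte (max protocol version), and a null terminator.
    The server replies with a single byte: the agreed version."""
    cred = f'{user}:{password}' if user else ''
    return cred.encode('ascii') + bytes([version, 0])

def q_sync_message(query: str) -> bytes:
    """Frame a q expression as a little-endian sync IPC message.
    Queries travel as char vectors: type byte 10, attribute byte,
    4-byte length, then the characters."""
    body = struct.pack('<bBI', 10, 0, len(query)) + query.encode('ascii')
    # header: endianness (1 = little), msg type (0 async, 1 sync, 2 response),
    # two padding bytes, then total length including this 8-byte header
    header = struct.pack('<BBHI', 1, 1, 0, 8 + len(body))
    return header + body
```

Sending `q_sync_message('til 10')` over a socket after a successful handshake is, stripped of every abstraction, what `h "til 10"` does.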
The Type System Across Boundaries
Q’s type system is the source of most integration friction. A brief orientation:
| Q type | Bytes | Python (pykx) | R (rkdb) | JSON |
|---|---|---|---|---|
| boolean | 1 | bool | logical | true/false |
| short | 2 | np.int16 | integer | number |
| int | 4 | np.int32 | integer | number |
| long | 8 | np.int64 | numeric* | number |
| float | 8 | np.float64 | numeric | number |
| symbol | var | str | character | string |
| timestamp | 8 | np.datetime64[ns] | POSIXct | string (ISO 8601) |
| date | 4 | datetime.date | Date | string |
| char list | var | str | character | string |
| table | — | pd.DataFrame | data.frame | array of objects |
*Base R has no 64-bit integer type; Q longs arrive as doubles, so values larger than 2⁵³ lose precision silently when received in R.
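R’s numeric type is an IEEE 754 double, and Python floats are the same representation, so the cliff is easy to demonstrate: integers stay exact only up to 2**53.

```python
# A double has a 53-bit significand, so every integer up to 2**53 is exact...
assert float(2 ** 53) == 2 ** 53
# ...but 2**53 + 1 rounds back down to 2**53 with no warning
assert float(2 ** 53 + 1) == float(2 ** 53)
```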
The timestamp row deserves emphasis: Q timestamps are nanoseconds since 2000.01.01, while Unix time counts from 1970-01-01 and Python’s datetime stores microseconds at most. Every library converts for you, but the conversion can drop nanoseconds silently (Python’s datetime has no nanosecond field) or produce unexpected values if you’re not using the right output format. Each language chapter calls out the specific gotcha.
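The arithmetic is small enough to sketch by hand, assuming you have a raw nanosecond count (the helper names here are mine, not any library’s API):

```python
from datetime import datetime, timezone

# q counts nanoseconds from 2000.01.01; Unix time counts from 1970-01-01.
# The gap between the two epochs is a fixed 946684800 seconds.
EPOCH_OFFSET_NS = int((datetime(2000, 1, 1, tzinfo=timezone.utc)
                       - datetime(1970, 1, 1, tzinfo=timezone.utc)).total_seconds()) * 10**9

def q_timestamp_to_datetime(q_ns: int) -> datetime:
    """Convert a raw q timestamp (ns since 2000) to a Python datetime.

    datetime holds microseconds only, so the last three digits of the
    nanosecond count are truncated: exactly the silent loss described above."""
    unix_ns = q_ns + EPOCH_OFFSET_NS
    secs, ns = divmod(unix_ns, 10**9)
    return datetime.fromtimestamp(secs, tz=timezone.utc).replace(microsecond=ns // 1000)
```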
A Note on Versions
This book targets kdb+ 4.1 and the 2025–2026 versions of the libraries discussed. Where a library has changed significantly or has a version-specific gotcha, it’s called out inline.
The free 64-bit on-demand license from KX works for everything in this book. Get it at kx.com/developers/download-licenses/. The license requires internet connectivity for validation — relevant if you’re planning air-gapped deployments, which need a commercial license.
Setting Up a Base Environment
If you’re starting fresh:
# macOS (Apple Silicon)
mkdir -p ~/q
# Download from kx.com: macosx.zip for Intel, macosarm.zip for Apple Silicon
unzip macosarm.zip -d ~/q
echo 'export PATH="$HOME/q/m64:$PATH"' >> ~/.zshrc
source ~/.zshrc
# Linux x86-64
mkdir -p ~/q
# Download: linuxx86.zip
unzip linuxx86.zip -d ~/q
echo 'export PATH="$HOME/q/l64:$PATH"' >> ~/.bashrc
source ~/.bashrc
# Verify: start a q process on port 5001
q -p 5001
# You should see the q prompt: q)
# Test: q) 2 + 2
# 4
# Exit: q) \\
Each subsequent chapter has its own setup section for the specific integration. We won’t front-load dependencies — install things when you need them.
A minimal test that your q process is accepting IPC connections, from a second terminal:
# Quick connectivity test using bash and /dev/tcp
# (This is just to verify the port is open, not a real client)
echo "" > /dev/tcp/localhost/5001 && echo "Port 5001 is open"
# Or with nc:
nc -zv localhost 5001
And from a second q process:
h: hopen `::5001
h "1b" / should return 1b
hclose h
The Philosophy
The goal is not to make Q look like Python. Q developers who want Python should write Python. The goal is to make Q work with Python — and Rust, and R, and Kafka, and REST clients — without losing what makes Q worth using: its speed on columnar time-series data, its expressive query syntax, its tick infrastructure.
Q as a database. Q as a compute engine. Q as a streaming backbone. Everything else as a consumer, a producer, or an orchestrator.
That’s the architecture this book is building toward. Each chapter adds one more connection point. By the end, you’ll have a Q process that’s no longer a silo — it’s the center of something.
Let’s build it.
Python and Q: PyQ, qPython, and Getting Along
There are three serious Python-Q integration libraries, and they have different philosophies, different trade-offs, and different ways of making you feel like you chose wrong. Let’s save you the afternoon of discovery.
PyQ: KX’s official library. Deep integration — runs Python inside the q process, shares memory, no serialization overhead for in-process calls. Requires a real KX installation. Heavyweight but serious.
qPython: Pure Python, pure IPC. Connects to a running kdb+ process over the network. No special installation, no KX runtime dependency. The right choice for most external consumers of kdb+ data.
pykx: KX’s newer, actively-developed library. Wraps the KX C API, can run embedded or in IPC mode, has a pandas-compatible API, and is where KX is putting its energy. If you’re starting fresh, this is the one.
We’ll cover all three, but spend the most time on qPython (because you probably already have a kdb+ process and just want to talk to it) and pykx (because it’s the future).
qPython: The Pragmatic Choice
Installation
pip install qpython
That’s it. No KX runtime. Works against any kdb+ 3.x or 4.x process.
Connecting and Basic Queries
from qpython import qconnection
from qpython.qtype import QException
# Connect to a running kdb+ process
q = qconnection.QConnection(host='localhost', port=5001)
q.open()
# Execute a simple expression
result = q('2 + 2')
print(result) # 4
# Execute a function call
result = q('til 10')
print(result) # [0 1 2 3 4 5 6 7 8 9]
q.close()
Use it as a context manager to avoid forgetting to close:
from qpython import qconnection
with qconnection.QConnection(host='localhost', port=5001) as q:
result = q('select from trade where date=2024.01.15, sym=`AAPL')
print(result)
Type Mapping: Where Things Get Interesting
Q’s type system is richer than Python’s. When qPython receives data, it maps Q types to numpy arrays and pandas objects. Understanding this mapping saves debugging sessions.
import numpy as np
from qpython import qconnection
with qconnection.QConnection(host='localhost', port=5001, pandas=True) as q:
# With pandas=True, tables come back as DataFrames
df = q('select time, sym, price, size from trade where date=2024.01.15')
print(df.dtypes)
# time datetime64[ns]
# sym object <- Q symbols become Python strings
# price float64
# size int64
print(df.head())
The pandas=True flag is almost always what you want for tabular data. Without it, you get a QTable object that you then have to convert anyway.
Sending Data to Q
Pushing data from Python to Q is where type handling gets tricky. Q is strict about types; Python is not.
import numpy as np
import pandas as pd
from qpython import qconnection
with qconnection.QConnection(host='localhost', port=5001, pandas=True) as q:
# Create a DataFrame to push
df = pd.DataFrame({
'time': pd.to_datetime(['2024-01-15 09:30:00', '2024-01-15 09:30:01']),
'sym': ['AAPL', 'MSFT'],
'price': [185.50, 375.20],
'size': [100, 200]
})
# Push a variable into Q's namespace
q('set', np.string_('mydata'), df)
# Verify it's there
result = q('count mydata')
print(result) # 2
# Now query it
result = q('select from mydata where sym=`AAPL')
print(result)
The Symbol Problem
Q symbols and Python strings are conceptually similar but mechanically different. When you query Q and get back symbol data, qPython gives you Python bytes objects by default (without pandas=True) or strings (with it). When you send symbols to Q, you need to be explicit:
from qpython import qconnection
from qpython.qcollection import qlist
from qpython.qtype import QSYMBOL_LIST
import numpy as np
with qconnection.QConnection(host='localhost', port=5001) as q:
    # Wrong: a plain Python list of strings arrives in Q as char vectors, not symbols
    # q('insert', np.string_('mytable'), [['AAPL', 'MSFT'], [100, 200]])
    # Right: build an explicit symbol list for symbol columns
    syms = qlist(np.array(['AAPL', 'MSFT']), qtype=QSYMBOL_LIST)
    sizes = np.array([100, 200], dtype=np.int64)
    q('insert', np.string_('mytable'), (syms, sizes))
The difference between bytes and symbols matters in Q. If sym in your trade table is a symbol and you send bytes, you’ll get a type error on join operations that will be deeply confusing for ten minutes.
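The mismatch is reproducible in plain Python, and a small normalizing shim at the boundary is a cheap defense. A sketch (`as_str` is a hypothetical helper name, not part of qPython):

```python
def as_str(x):
    """Normalize symbol data that may arrive as bytes (qPython default)
    or as str (with pandas=True)."""
    return x.decode('ascii') if isinstance(x, (bytes, bytearray)) else x

# bytes and str never compare equal in Python 3, which is the whole problem
assert b'AAPL' != 'AAPL'
assert as_str(b'AAPL') == as_str('AAPL') == 'AAPL'
```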
Asynchronous Calls
For fire-and-forget operations (publishing to a tickerplant, for instance):
import numpy as np
from qpython import qconnection
with qconnection.QConnection(host='localhost', port=5001) as q:
    # Synchronous — waits for response ('data' is your prepared update payload)
    result = q('.u.upd', np.string_('trade'), data)
    # Asynchronous — returns immediately, no response
    q.sendAsync('.u.upd', np.string_('trade'), data)
Be careful with async calls in tight loops — you can flood a Q process faster than it can handle messages. Add a small sleep or batch your updates if you’re publishing high-frequency data.
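One way to batch is to chunk the rows and send one async message per chunk. This sketch shows only the chunking; the publish loop in the comment assumes a qPython connection `q` and a `to_columns` helper that are not defined here:

```python
from itertools import islice

def batches(rows, size):
    """Yield rows in fixed-size chunks so each IPC message carries many updates."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk

# Hypothetical publisher loop:
# for chunk in batches(updates, 500):
#     q.sendAsync('.u.upd', np.string_('trade'), to_columns(chunk))
```

Batching trades a little latency for far fewer messages, which is usually the right exchange when a tight loop is flooding the q process.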
Error Handling
Q errors propagate back as QException:
from qpython.qtype import QException
with qconnection.QConnection(host='localhost', port=5001) as q:
try:
result = q('1 + `sym') # type error
except QException as e:
print(f"Q error: {e}") # type
except Exception as e:
print(f"Connection error: {e}")
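Connection errors, unlike query errors, are often transient and worth wrapping in a retry. A generic sketch, not qPython-specific; `with_retry` is a name I’m introducing, and production code should catch QException and socket errors narrowly rather than bare Exception:

```python
import time

def with_retry(fn, attempts=3, delay=0.5):
    """Call fn(), retrying on failure with a fixed delay; re-raise the last error."""
    last = None
    for i in range(attempts):
        try:
            return fn()
        except Exception as exc:  # narrow this to the errors you expect to be transient
            last = exc
            if i < attempts - 1:
                time.sleep(delay)
    raise last
```

Usage would look like `with_retry(lambda: q('select from trade'))`, reopening the connection inside the lambda if the handle itself may have died.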
pykx: The Modern Choice
pykx is KX’s actively-maintained library that has both an embedded mode (runs kdb+ in-process) and an IPC mode (connects to a running process). The API is cleaner than qPython and it has better pandas and numpy integration.
Installation
pip install pykx
For IPC-only mode (no KX license needed):
# Set this before importing pykx to run in unlicensed (IPC-only) mode
export PYKX_UNLICENSED="true"
# Or in Python, before the import:
import os
os.environ['PYKX_UNLICENSED'] = 'true'
IPC Mode
import pykx as kx
# Connect to a running kdb+ process
with kx.SyncQConnection(host='localhost', port=5001) as q:
# Execute q code
result = q('select from trade where date=2024.01.15')
# Convert to pandas
df = result.pd()
print(df)
# Or arrow
table = result.pa()
pykx Type System
pykx has a clean type hierarchy that mirrors Q’s:
import pykx as kx
with kx.SyncQConnection(host='localhost', port=5001) as q:
# Atoms
x = q('42')
print(type(x)) # pykx.LongAtom
print(x.py()) # 42 (Python int)
# Lists
xs = q('1 2 3f')
print(type(xs)) # pykx.FloatVector
print(xs.np()) # numpy array([1., 2., 3.])
# Tables come back as pykx.Table
tbl = q('([] a:1 2 3; b:`x`y`z)')
print(tbl.pd()) # pandas DataFrame
Sending Typed Data
import pykx as kx
import pandas as pd
import numpy as np
with kx.SyncQConnection(host='localhost', port=5001) as q:
# pykx can convert pandas DataFrames directly
df = pd.DataFrame({
'sym': ['AAPL', 'MSFT', 'GOOG'],
'price': [185.5, 375.2, 140.8],
'size': [100, 200, 150]
})
# Set a variable in Q
q['mydata'] = df
# Call a Q function with a Python argument
result = q('{[t] select from t where price > 200}', df)
print(result.pd())
Async Connections
import pykx as kx
import asyncio

async def publish_data():
    async with kx.AsyncQConnection(host='localhost', port=5001) as q:
        # Async query
        result = await q('select from trade')
        df = result.pd()
        # Fire-and-forget publish
        await q('.u.upd', 'trade', df, wait=False)

asyncio.run(publish_data())
PyQ: The Deep Integration Option
PyQ is for when you want Python to run inside the q process — sharing memory, zero serialization overhead, using Q data structures directly from Python. It’s powerful and it requires a full KX installation.
pip install pyq
# Run with q's Python interpreter:
pyq my_script.py
Inside a PyQ session:
from pyq import q, K
# Q functions are accessible as attributes
q.til(10) # calls q's til function
q(r'\l mylib.q') # loads a Q script
# K objects are Q values
x = K(3.14)
y = q.sqrt(x)
# Create Q tables from Python
import numpy as np
data = {
'price': np.array([100.5, 101.2, 99.8]),
'size': np.array([100, 200, 150], dtype=np.int64)
}
q['mytrade'] = data
PyQ’s strength is in embedded analytics — Python ML libraries operating on Q data without copying it. If you’re running TensorFlow or scikit-learn against live kdb+ data and latency matters, PyQ is the right architecture.
Which One Should You Use?
| Scenario | Library |
|---|---|
| External service consuming kdb+ data | qPython or pykx IPC |
| New project, greenfield setup | pykx |
| High-frequency publisher to tickerplant | qPython (async) or pykx |
| ML/analytics running inside q process | PyQ |
| Don’t have KX license, just want data | qPython |
| Need pandas/arrow integration | pykx |
The honest answer: start with pykx. Its API is cleaner, it’s actively maintained, and it handles the type system better than qPython. Use qPython if you need a lighter dependency or you’re working with legacy code that already uses it. Use PyQ only if you genuinely need in-process Python execution.
A Complete Example: Time-Series Analysis Pipeline
This is the pattern that actually comes up in production — Python consuming kdb+ data for analysis, then writing results back.
import pykx as kx
import pandas as pd
import numpy as np
from scipy import stats
def compute_vwap(df: pd.DataFrame) -> pd.DataFrame:
"""Compute VWAP from trade data."""
df['notional'] = df['price'] * df['size']
vwap = df.groupby('sym').apply(
lambda x: x['notional'].sum() / x['size'].sum()
).reset_index()
vwap.columns = ['sym', 'vwap']
return vwap
def compute_volatility(df: pd.DataFrame, window: int = 20) -> pd.DataFrame:
"""Rolling annualized volatility."""
df = df.sort_values(['sym', 'time'])
df['log_ret'] = df.groupby('sym')['price'].transform(
lambda x: np.log(x / x.shift(1))
)
df['vol'] = df.groupby('sym')['log_ret'].transform(
lambda x: x.rolling(window).std() * np.sqrt(252)
)
return df[['time', 'sym', 'vol']].dropna()
def main():
with kx.SyncQConnection(host='localhost', port=5001) as q:
# Pull today's trade data
trades = q('''
select time, sym, price, size
from trade
where date=.z.d,
sym in `AAPL`MSFT`GOOG`AMZN`META
''').pd()
print(f"Loaded {len(trades):,} trades")
# Compute analytics in Python
vwap = compute_vwap(trades)
vol = compute_volatility(trades)
# Write results back to Q
q['analytics_vwap'] = vwap
q['analytics_vol'] = vol
# Or insert into a Q table
q('{`results_vwap upsert x}', vwap)
print("Analytics written to Q")
print(vwap)
if __name__ == '__main__':
main()
Timestamp Handling (The Thing That Will Burn You Once)
Q timestamps are nanoseconds since 2000.01.01; Unix time counts from 1970-01-01, and Python’s datetime holds microseconds at most. Every library does the conversion for you, but sometimes it goes wrong.
import pykx as kx
import pandas as pd
with kx.SyncQConnection(host='localhost', port=5001) as q:
# Q timestamp
ts = q('2024.01.15D09:30:00.123456789')
# pykx converts correctly
print(ts.py()) # datetime.datetime(2024, 1, 15, 9, 30, 0, 123456)
# Note: microseconds, nanoseconds truncated
# For nanosecond precision, use numpy
print(ts.np()) # numpy.datetime64('2024-01-15T09:30:00.123456789')
# pandas preserves nanoseconds
df = q('([] t:enlist 2024.01.15D09:30:00.123456789)').pd()
print(df['t'].dtype) # datetime64[ns]
print(df['t'].iloc[0]) # 2024-01-15 09:30:00.123456789
The gotcha: if you use ts.py() on a timestamp column, you lose the nanoseconds silently. Use .np() or work through pandas for nanosecond-resolution data.
Authentication
Both libraries support authenticated connections:
# qPython
q = qconnection.QConnection(
host='localhost',
port=5001,
username='myuser',
password='mypass'
)
# pykx
q = kx.SyncQConnection(
host='localhost',
port=5001,
username='myuser',
password='mypass'
)
Q’s authentication is username:password in the connection handshake. If the server has .z.pw defined, it uses that for validation; otherwise it checks the password file supplied with -u or -U at startup, if any. For production, use .z.pw with proper credential management — don’t hardcode passwords in code that’s going into version control.
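A minimal alternative to hardcoding, assuming nothing beyond the standard library (the variable names KDB_USER and KDB_PASS are illustrative, not a convention of either library):

```python
import os

def q_credentials(user_var='KDB_USER', pass_var='KDB_PASS'):
    """Pull kdb+ connection credentials from the environment at startup,
    so they never appear in source control."""
    return os.environ.get(user_var, ''), os.environ.get(pass_var, '')
```

The tuple plugs straight into either library’s `username=`/`password=` arguments.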
Rust and Q: FFI, IPC, and High-Performance Integration
Rust and Q are natural partners in the sense that both communities believe in doing things the right way even when doing it wrong would be faster. They differ in that the Rust compiler will tell you exactly what you did wrong, while Q will silently coerce your type and let you discover the problem three joins later.
There are two ways to connect Rust to kdb+:
- IPC over TCP: Connect to a running kdb+ process, speak the wire protocol. This is the same protocol everything else uses.
- FFI via the C API: Load a shared library into Rust and call kdb+ functions directly. Faster, more complex, requires the C API headers.
For 95% of use cases, IPC is the right choice. For the other 5% — where you’re embedding kdb+ in a Rust application, or writing a C extension for kdb+ in Rust — FFI is the path.
IPC: The kdbplus Crate
The kdbplus crate provides both IPC client functionality and FFI bindings. Start with IPC.
# Cargo.toml
[dependencies]
kdbplus = { version = "0.5", features = ["ipc"] }
tokio = { version = "1", features = ["full"] }
Basic Connection
use kdbplus::ipc::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to a running kdb+ process
let mut client = QStream::connect(ConnectionMethod::TCP, "localhost", 5001, "").await?;
// Execute a q expression
let result = client.send_sync_message(&"til 10").await?;
println!("{}", result); // 0 1 2 3 4 5 6 7 8 9
Ok(())
}
The empty string in connect is the auth credentials ("username:password" if needed). The send_sync_message call blocks until the remote q process responds — same as a synchronous IPC call in Q itself.
Type System
Type handling is where kdbplus earns its keep. It represents Q values with a K type that mirrors Q’s type hierarchy:
use kdbplus::ipc::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = QStream::connect(ConnectionMethod::TCP, "localhost", 5001, "").await?;
// Integer atom -> K with type QLONG
let x: K = client.send_sync_message(&"42").await?;
println!("Type: {:?}", x.get_type()); // QLONG
println!("Value: {}", x.get_long()?); // 42
// Float list
let xs: K = client.send_sync_message(&"1.0 2.5 3.7").await?;
let floats: Vec<f64> = xs.get_float_list()?;
println!("{:?}", floats); // [1.0, 2.5, 3.7]
// Symbol list
let syms: K = client.send_sync_message(&"`AAPL`MSFT`GOOG").await?;
let symbols: Vec<&str> = syms.get_symbol_list()?;
println!("{:?}", symbols); // ["AAPL", "MSFT", "GOOG"]
// Table
let table: K = client.send_sync_message(&"([] sym:`AAPL`MSFT; price:185.5 375.2)").await?;
// Tables are dictionaries of lists in Q; iterate columns
let col_names = table.get_keys()?;
for name in col_names {
println!("Column: {}", name);
}
Ok(())
}
Querying a Table
The pattern for real data retrieval — execute a Q select, get a table back, process it:
use kdbplus::ipc::*;
use std::collections::HashMap;
#[derive(Debug)]
struct Trade {
sym: String,
price: f64,
size: i64,
}
async fn get_trades(
client: &mut QStream,
date: &str,
syms: &[&str],
) -> Result<Vec<Trade>, Box<dyn std::error::Error>> {
let sym_list = syms.iter()
.map(|s| format!("`{}", s))
.collect::<Vec<_>>()
.join("");
let query = format!(
"select sym, price, size from trade where date={}, sym in {}",
date, sym_list
);
let result = client.send_sync_message(&query.as_str()).await?;
// Extract columns from the table
let sym_col: Vec<&str> = result.get_column("sym")?.get_symbol_list()?;
let price_col: Vec<f64> = result.get_column("price")?.get_float_list()?;
let size_col: Vec<i64> = result.get_column("size")?.get_long_list()?;
let trades = sym_col.into_iter()
.zip(price_col.into_iter())
.zip(size_col.into_iter())
.map(|((sym, price), size)| Trade {
sym: sym.to_string(),
price,
size,
})
.collect();
Ok(trades)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = QStream::connect(ConnectionMethod::TCP, "localhost", 5001, "").await?;
let trades = get_trades(&mut client, "2024.01.15", &["AAPL", "MSFT"]).await?;
for trade in &trades {
println!("{:?}", trade);
}
println!("Total trades: {}", trades.len());
Ok(())
}
Sending Data to Q
Pushing Rust data into a Q process requires constructing K objects:
use kdbplus::ipc::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = QStream::connect(ConnectionMethod::TCP, "localhost", 5001, "").await?;
// Build a Q table from Rust data
let syms = K::new_symbol_list(vec!["AAPL".to_string(), "MSFT".to_string()], qattribute::NONE);
let prices = K::new_float_list(vec![185.5_f64, 375.2_f64], qattribute::NONE);
let sizes = K::new_long_list(vec![100_i64, 200_i64], qattribute::NONE);
let table = K::new_table(
&["sym", "price", "size"],
&[syms, prices, sizes]
)?;
// Set it as a variable in the Q process
let set_query = K::new_mixed_list(vec![
K::new_string("set", qattribute::NONE),
K::new_symbol(String::from("rust_data")),
table,
]);
client.send_sync_message(&set_query).await?;
// Verify
let count = client.send_sync_message(&"count rust_data").await?;
println!("Rows inserted: {}", count.get_long()?); // 2
Ok(())
}
Async Streaming
For continuous data feeds — subscribing to a tickerplant, for example:
use kdbplus::ipc::*;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = QStream::connect(ConnectionMethod::TCP, "localhost", 5001, "").await?;
// Subscribe to a tickerplant
// Standard tick subscriber pattern: .u.sub[`trade; `]
client.send_sync_message(&".u.sub[`trade; `]").await?;
println!("Subscribed to trade feed");
// Listen for incoming messages
loop {
// receive_message blocks until a message arrives
let msg = client.receive_message().await?;
// Tickerplant sends: (`upd; `trade; table_data)
if let Ok(msg_list) = msg.get_mixed_list() {
if msg_list.len() == 3 {
let func_name = msg_list[0].get_symbol()?;
let table_name = msg_list[1].get_symbol()?;
let data = &msg_list[2];
if func_name == "upd" && table_name == "trade" {
let price_col: Vec<f64> = data.get_column("price")?.get_float_list()?;
let sym_col: Vec<&str> = data.get_column("sym")?.get_symbol_list()?;
for (sym, price) in sym_col.iter().zip(price_col.iter()) {
println!("UPDATE: {} @ {:.2}", sym, price);
}
}
}
}
}
}
This is the real-time subscriber pattern. The kdb+ tickerplant calls .u.upd on all subscribers when new data arrives; the Rust side receives these as incoming messages without polling.
FFI: The C API
When you need to go deeper — writing kdb+ extensions in Rust, embedding kdb+ in a Rust process, or implementing a custom feed handler — you need the C API.
The kdbplus crate includes FFI bindings in its api feature:
[dependencies]
kdbplus = { version = "0.5", features = ["api"] }
Writing a kdb+ Extension in Rust
kdb+ extensions are shared libraries loaded with 2:. The extension exports C-compatible functions that take and return K values.
// src/lib.rs
use kdbplus::api::*;
use kdbplus::api::native::*;
// Function signatures must match the kdb+ C API: K func(K x, K y, ...), up to 8 arguments
#[no_mangle]
pub extern "C" fn rust_add(x: K, y: K) -> K {
// Extract values
let a = unsafe { x.get_long() };
let b = unsafe { y.get_long() };
// Compute result
let result = a + b;
// Return a new K long atom
unsafe { kj(result) }
}
// More complex: process a list
#[no_mangle]
pub extern "C" fn rust_squares(x: K) -> K {
unsafe {
let n = x.get_count();
let result = ktn(KJ as i32, n as i64); // allocate long list
for i in 0..n {
let val = kJ(x).add(i).read();
kJ(result).add(i).write(val * val);
}
result
}
}
Build as a dynamic library:
# Cargo.toml
[lib]
crate-type = ["cdylib"]
cargo build --release
# Output: target/release/libmyextension.dylib (macOS) or .so (Linux)
Load from Q:
/ Load the shared library
mylib: `:/path/to/libmyextension 2: (`rust_add; 2)
squares: `:/path/to/libmyextension 2: (`rust_squares; 1)
/ Call the Rust functions
mylib[3; 4] / 7
squares til 10 / 0 1 4 9 16 25 36 49 64 81
Memory Management in the C API
This is the part where the Rust compiler cannot save you. Q manages memory with reference counting. When a Rust function returns a K value to Q, Q owns it. The rules:
- Arguments passed into your function have their reference count incremented by Q before the call. You don’t need to increment them again.
- The return value from your function becomes Q’s property. Don’t free it.
- If you create a K value inside your function and it becomes an argument to another function (not the return value), you may need to call r0 to decrement its reference count.
#[no_mangle]
pub extern "C" fn rust_filter_positive(x: K) -> K {
unsafe {
let n = x.get_count();
let mut positives: Vec<i64> = Vec::new();
for i in 0..n {
let val = kJ(x).add(i).read();
if val > 0 {
positives.push(val);
}
}
let result = ktn(KJ as i32, positives.len() as i64);
for (i, &val) in positives.iter().enumerate() {
kJ(result).add(i).write(val);
}
// x is owned by Q, we don't free it
// result will be returned to Q, which takes ownership
result
}
}
Memory bugs here won’t show up as Rust compilation errors — they’ll show up as kdb+ crashes or memory leaks. Write tests, use valgrind on Linux, and check your reference counts carefully.
Connection Pooling
For high-throughput Rust services consuming kdb+ data, managing connections efficiently matters:
use kdbplus::ipc::*;
use std::sync::Arc;
use tokio::sync::Mutex;
struct QConnectionPool {
connections: Vec<Arc<Mutex<QStream>>>,
size: usize,
next: std::sync::atomic::AtomicUsize,
}
impl QConnectionPool {
async fn new(host: &str, port: u16, auth: &str, size: usize)
-> Result<Self, Box<dyn std::error::Error>>
{
let mut connections = Vec::with_capacity(size);
for _ in 0..size {
let conn = QStream::connect(
ConnectionMethod::TCP, host, port, auth
).await?;
connections.push(Arc::new(Mutex::new(conn)));
}
Ok(Self {
connections,
size,
next: std::sync::atomic::AtomicUsize::new(0),
})
}
fn get(&self) -> Arc<Mutex<QStream>> {
let idx = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.size;
self.connections[idx].clone()
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = Arc::new(
QConnectionPool::new("localhost", 5001, "", 4).await?
);
// Spawn concurrent queries
let mut handles = Vec::new();
for sym in &["AAPL", "MSFT", "GOOG", "AMZN"] {
let pool = pool.clone();
let sym = sym.to_string();
let handle = tokio::spawn(async move {
let conn = pool.get();
let mut guard = conn.lock().await;
let query = format!("exec last price from trade where sym=`{}", sym);
guard.send_sync_message(&query.as_str()).await
});
handles.push(handle);
}
for handle in handles {
let result = handle.await??;
println!("{}", result);
}
Ok(())
}
TLS Connections
If your kdb+ process is behind a TLS-terminating proxy or configured with TLS directly:
use kdbplus::ipc::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TLS connection
let mut client = QStream::connect(
ConnectionMethod::TLS,
"myserver.example.com",
5001,
"user:pass"
).await?;
let result = client.send_sync_message(&"til 5").await?;
println!("{}", result);
Ok(())
}
When to Use IPC vs FFI
Use IPC when:
- You have a running kdb+ process you want to query
- You’re building a service that consumes or publishes kdb+ data
- You want the kdb+ process to remain independently manageable
- You want Rust compilation errors instead of runtime crashes
Use FFI when:
- You’re writing a kdb+ extension (a .so/.dylib loaded with 2:)
- You need zero-copy access to Q data structures
- You’re embedding a compute kernel that Q will call directly
- You genuinely enjoy reading about reference counting
The IPC path is the more Rust-idiomatic one — clean separation, async-friendly, proper error types. The FFI path gives you more power and more ways to segfault. Start with IPC.
R and Q: Statistical Power Meets Time-Series Performance
The R-Q relationship is one of complementary strengths that coexist awkwardly. R is genuinely excellent at statistics, visualization, and the kind of exploratory analysis where you want twelve different regression models and a scatterplot matrix in thirty lines. Q is genuinely excellent at storing and querying large time-series datasets efficiently. Together, they cover a lot of ground. Separately, they each have a large blind spot.
The practical question is: how do you get R talking to Q without it being painful?
The Options
rkdb: The most direct path. R package that connects to kdb+ via IPC. Wraps the C API. The one you want.
rqpython / qPython bridge: Use Python as a middleman. Q → Python → R. Occasionally done in organizations that already have Python infrastructure. More moving parts than necessary if you just want R talking to Q.
Flat files / HDF5: Export from Q, import in R. Works. Terrible for anything real-time. Mentioned only to be dismissed.
We’ll focus on rkdb. It’s the direct, performant option and the one that’s actually maintained.
rkdb: Setup and Connection
# Install from CRAN (or GitHub for latest)
install.packages("rkdb")
# or
devtools::install_github("KxSystems/rkdb")
Connecting to a kdb+ process:
library(rkdb)
# Open a connection
h <- open_connection("localhost", 5001)
# Execute a q expression
result <- execute(h, "til 10")
print(result)
# [1] 0 1 2 3 4 5 6 7 8 9
# Close when done
close_connection(h)
A small wrapper with on.exit ensures the connection is closed even when the expression errors:
library(rkdb)
with_qconnection <- function(host, port, expr) {
h <- open_connection(host, port)
on.exit(close_connection(h))
expr(h)
}
# Usage
result <- with_qconnection("localhost", 5001, function(h) {
execute(h, "select from trade where date=2024.01.15")
})
Type Mapping
Q types map to R types reasonably cleanly, with a few exceptions that matter:
| Q type | R type |
|---|---|
| boolean | logical |
| byte | raw |
| short | integer |
| int | integer |
| long | numeric (R has no 64-bit int) |
| real | numeric |
| float | numeric |
| char | character |
| symbol | character |
| timestamp | POSIXct |
| date | Date |
| time | difftime |
| list | list |
| table | data.frame |
| dictionary | named list |
The important one: Q longs become R numerics. R’s integer type is 32-bit, so rkdb maps Q’s 64-bit longs to doubles. Integer values up to 2^53 — the double-precision exactness limit — survive the conversion without issue; larger values lose precision silently. If you’re working with large integer IDs or nanosecond timestamps as integers, be aware of this.
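Python floats are the same IEEE 754 doubles as R numerics, so the boundary is easy to check directly:

```python
# Doubles represent every integer exactly up to 2**53, then start skipping.
assert float(2**31 + 1) == 2**31 + 1        # past R's 32-bit int limit, still exact as a double
assert float(2**53) == 2**53                # the last power of two with full integer precision...
assert float(2**53 + 1) == float(2**53)     # ...one past it silently rounds back down
```

This is exactly the failure mode for nanosecond-precision epoch values stored as plain longs: the conversion succeeds, the value is just quietly wrong in the low bits.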
h <- open_connection("localhost", 5001)
# Q table → R data.frame (this is the good news)
df <- execute(h, "select date, sym, price, size from trade where date=2024.01.15")
class(df) # "data.frame"
str(df)
# 'data.frame': 1000 obs. of 4 variables:
# $ date : Date, format: "2024-01-15" ...
# $ sym : chr "AAPL" "AAPL" "MSFT" ...
# $ price: num 185.5 185.6 375.2 ...
# $ size : num 100 200 150 ...
# Symbol columns come back as character vectors
class(df$sym) # "character"
# Timestamp handling
ts_data <- execute(h, "select time from trade where date=2024.01.15, i<5")
class(ts_data$time) # "POSIXct" "POSIXt"
format(ts_data$time[1], "%H:%M:%OS6") # "09:30:00.123456"
close_connection(h)
The Moment: Doing Real Analysis
Here’s the pattern that makes the R-Q combination click — pull structured data from Q, do something R is uniquely good at, and optionally push results back.
library(rkdb)
library(dplyr)
library(ggplot2)
library(lubridate)
h <- open_connection("localhost", 5001)
on.exit(close_connection(h))  # only effective inside a function; in a plain script, call close_connection(h) yourself
# Pull a month of OHLC data from Q
ohlc <- execute(h, "
select
date,
open: first price,
high: max price,
low: min price,
close: last price,
volume: sum size
from trade
where
date within 2024.01.01 2024.01.31,
sym=`AAPL
by date
")
# That Q query runs in milliseconds on a properly splayed table.
# Now R takes over.
# Compute technical indicators in R
ohlc <- ohlc %>%
arrange(date) %>%
mutate(
# Simple moving averages
sma_5 = zoo::rollmean(close, 5, fill = NA, align = "right"),
sma_20 = zoo::rollmean(close, 20, fill = NA, align = "right"),
# Daily return
ret = (close / lag(close)) - 1,
# Bollinger bands (20-day)
bb_mid = sma_20,
bb_sd = zoo::rollapply(close, 20, sd, fill = NA, align = "right"),
bb_upper = bb_mid + 2 * bb_sd,
bb_lower = bb_mid - 2 * bb_sd,
# RSI (14-day)
gain = pmax(ret, 0),
loss = pmax(-ret, 0),
avg_gain = zoo::rollmean(gain, 14, fill = NA, align = "right"),
avg_loss = zoo::rollmean(loss, 14, fill = NA, align = "right"),
rs = avg_gain / avg_loss,
rsi = 100 - (100 / (1 + rs))
)
print(tail(ohlc %>% select(date, close, sma_5, sma_20, rsi), 5))
# Visualize
ggplot(ohlc %>% filter(!is.na(sma_20)), aes(x = date)) +
geom_ribbon(aes(ymin = bb_lower, ymax = bb_upper), fill = "steelblue", alpha = 0.2) +
geom_line(aes(y = close), color = "black") +
geom_line(aes(y = sma_20), color = "red", linetype = "dashed") +
labs(title = "AAPL: Price with Bollinger Bands (Jan 2024)",
y = "Price", x = NULL) +
theme_minimal()
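The rolling pieces above are ordinary trailing-window aggregates. A dependency-free Python sketch of the 20-period mean and SD that drive the bands (synthetic closes and a hypothetical `rolling` helper, just to show the arithmetic):

```python
import statistics

def rolling(vals, n, fn):
    # Apply fn over a trailing window of n values; None until the window fills
    return [None] * (n - 1) + [fn(vals[i - n + 1:i + 1]) for i in range(n - 1, len(vals))]

closes = [100 + 0.5 * i for i in range(30)]        # synthetic ramp of closes
bb_mid = rolling(closes, 20, statistics.mean)       # 20-period SMA
bb_sd  = rolling(closes, 20, statistics.stdev)      # sample SD, matching R's sd()
bb_up  = [m + 2 * s for m, s in zip(bb_mid[19:], bb_sd[19:])]
```

zoo::rollmean and zoo::rollapply with `align = "right"` compute exactly these trailing windows; `fill = NA` corresponds to the leading `None` padding.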
Pushing Results Back to Q
Results computed in R can be inserted back into Q:
library(rkdb)
h <- open_connection("localhost", 5001)
on.exit(close_connection(h))  # only effective inside a function; in a plain script, call close_connection(h) yourself
# Compute something in R
set.seed(42)
simulated_paths <- data.frame(
t = 0:252,
path1 = cumsum(c(0, rnorm(252, 0.0003, 0.015))),
path2 = cumsum(c(0, rnorm(252, 0.0003, 0.015))),
path3 = cumsum(c(0, rnorm(252, 0.0003, 0.015)))
)
# Push back to Q
# Note: set takes a name and a value; as.name() sends the name as a Q symbol
execute(h, "set", as.name("sim_paths"), simulated_paths)
# Verify
n <- execute(h, "count sim_paths")
cat("Rows in Q:", n, "\n") # 253
The type mapping on the way in can be fiddly. Data frames map to Q tables; R numerics map to Q floats; R integers map to Q ints. Factors will cause you grief — convert them to character first.
Statistical Patterns That Work Well
Cross-Sectional Factor Analysis
library(rkdb)
library(dplyr)
h <- open_connection("localhost", 5001)
on.exit(close_connection(h))  # only effective inside a function; in a plain script, call close_connection(h) yourself
# Pull end-of-day prices for a universe of symbols
eod <- execute(h, "
select
date,
sym,
close: last price,
volume: sum size
from trade
where
date within 2023.01.01 2024.01.01,
sym in `AAPL`MSFT`GOOG`AMZN`META`NVDA`TSLA`JPM`BAC`GS
by date, sym
")
# Compute factor exposures
eod <- eod %>%
group_by(sym) %>%
arrange(date) %>%
mutate(
ret_1d = close / lag(close) - 1,
ret_5d = close / lag(close, 5) - 1,
ret_21d = close / lag(close, 21) - 1,
vol_21d = zoo::rollapply(ret_1d, 21, sd, fill = NA, align = "right"),
vol_log = log(volume),
momentum = ret_21d / vol_21d # risk-adjusted momentum
) %>%
ungroup()
# Cross-sectional z-score each factor
eod <- eod %>%
group_by(date) %>%
mutate(
z_momentum = scale(momentum)[,1],
z_vol_log = scale(vol_log)[,1]
) %>%
ungroup() %>%
filter(!is.na(z_momentum))
# IC (Information Coefficient) of momentum factor
# The forward return must be computed per symbol *before* grouping by date —
# lead() inside a date group would shift across symbols, not through time
ic_by_date <- eod %>%
group_by(sym) %>%
arrange(date) %>%
mutate(fwd_ret_5d = lead(close, 5) / close - 1) %>%
ungroup() %>%
group_by(date) %>%
summarise(
ic = cor(z_momentum, fwd_ret_5d, use = "complete.obs", method = "spearman"),
.groups = "drop"
) %>%
filter(!is.na(ic))
cat("Mean IC:", round(mean(ic_by_date$ic, na.rm = TRUE), 4), "\n")
cat("IC IR:", round(mean(ic_by_date$ic, na.rm = TRUE) /
sd(ic_by_date$ic, na.rm = TRUE), 4), "\n")
Cointegration Analysis
Q’s tick data is excellent for cointegration testing — pairs trading, spread analysis, that sort of thing:
library(rkdb)
library(urca) # for Johansen cointegration test
h <- open_connection("localhost", 5001)
on.exit(close_connection(h))  # only effective inside a function; in a plain script, call close_connection(h) yourself
# Pull time-synchronized mid prices for two instruments
# (asof-join on time only — joining on sym too would never match across symbols)
prices <- execute(h, "
aj[`time;
select time, spy_mid: 0.5*(bid+ask) from quotes where date=2024.01.15, sym=`SPY;
select time, ivv_mid: 0.5*(bid+ask) from quotes where date=2024.01.15, sym=`IVV
]
")
spy <- prices$spy_mid
ivv <- prices$ivv_mid
# Engle-Granger cointegration test
coint_reg <- lm(spy ~ ivv)
cat("Cointegration regression R²:", summary(coint_reg)$r.squared, "\n")
# ADF test on residuals
adf_result <- ur.df(residuals(coint_reg), type = "drift", lags = 5)
cat("ADF test statistic:", adf_result@teststat[1], "\n")
cat("5% critical value:", adf_result@cval[1, 2], "\n")
# Spread (basis)
spread <- spy - coef(coint_reg)[2] * ivv - coef(coint_reg)[1]
cat("Spread mean:", round(mean(spread), 6), "\n")
cat("Spread SD:", round(sd(spread), 6), "\n")
cat("Current z-score:", round(tail(spread, 1) / sd(spread), 4), "\n")
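The Engle-Granger spread logic reduces to an OLS fit plus a z-score; a minimal Python sketch of that arithmetic (hypothetical `hedge_ratio`/`spread_z` helpers — the ADF step genuinely needs a stats package and is omitted):

```python
def hedge_ratio(y, x):
    """OLS fit y ≈ alpha + beta*x — the cointegrating regression."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    beta = (sum((xi - mx) * (yi - my) for xi, yi in zip(x, y))
            / sum((xi - mx) ** 2 for xi in x))
    return beta, my - beta * mx

def spread_z(y, x, beta, alpha):
    """Z-score of the latest basis observation against the spread's history."""
    spread = [yi - beta * xi - alpha for xi, yi in zip(x, y)]
    m = sum(spread) / len(spread)
    sd = (sum((s - m) ** 2 for s in spread) / (len(spread) - 1)) ** 0.5
    return (spread[-1] - m) / sd

beta, alpha = hedge_ratio([3.1, 4.9, 7.1, 8.9], [1.0, 2.0, 3.0, 4.0])
# beta ≈ 1.96, alpha ≈ 1.1
```

`lm(spy ~ ivv)` is doing the `hedge_ratio` step; the z-score at the end of the R snippet is the trading signal — how stretched the current basis is relative to its own history.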
Parameterized Queries
String interpolation for Q queries is fragile. Use the list-form execute when you need parameters:
# Fragile: SQL injection-ish problems, quoting issues
sym <- "AAPL"
bad_query <- paste0("select from trade where sym=`", sym)
execute(h, bad_query) # works but ugly
# Better: use execute with arguments
# rkdb supports passing arguments after the function call
result <- execute(h, "{[s;d] select from trade where sym=s, date=d}",
as.name("AAPL"), # Q symbol
as.Date("2024-01-15")) # Q date
# The as.name() trick converts R character to Q symbol
# as.Date() converts R Date to Q date
Scheduling and Automation
For regular R → Q data pulls (end-of-day analytics, overnight batch jobs):
library(rkdb)
run_eod_analytics <- function(date_str) {
h <- open_connection("localhost", 5001)
on.exit(close_connection(h))
tryCatch({
# Pull data
data <- execute(h, paste0(
"select from eod_summary where date=", date_str
))
if (nrow(data) == 0) {
cat("No data for", date_str, "\n")
return(invisible(NULL))
}
# Run analytics
results <- compute_factor_scores(data)
# Write back (as.name sends the table name as a Q symbol)
execute(h, "set", as.name("r_factor_scores"), results)
execute(h, paste0("r_factor_scores: update date:", date_str,
" from r_factor_scores"))
# Append to persistent table
execute(h, "`factor_scores upsert r_factor_scores")
cat("EOD analytics complete for", date_str, "\n")
}, error = function(e) {
cat("Error in EOD analytics:", conditionMessage(e), "\n")
})
}
# Run for today
run_eod_analytics(format(Sys.Date(), "%Y.%m.%d"))
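The whole date_str plumbing hinges on Q’s yyyy.mm.dd date-literal format, which is what `format(Sys.Date(), "%Y.%m.%d")` produces. The same conversion in Python, for any gateway or batch job that builds Q queries (hypothetical `q_date` helper):

```python
from datetime import date

def q_date(d: date) -> str:
    """Render a Python date as a Q date literal (yyyy.mm.dd, zero-padded)."""
    return d.strftime("%Y.%m.%d")

q_date(date(2024, 1, 15))  # '2024.01.15'
```

Zero-padding matters: Q parses `2024.1.5` and `2024.01.05` differently enough across contexts that you want the canonical padded form everywhere.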
Performance Notes
Q will outperform R on any query over large datasets. R’s data.frame operations get slow past a few million rows; Q’s columnar storage and query engine handle billions of rows comfortably. The right division of labor is:
- Filter and aggregate in Q (date ranges, symbol selection, OHLC, VWAP)
- Analyze in R (regressions, factor models, rolling stats, visualization)
- Write results back to Q (persist computed signals for the trading system)
Don’t pull raw tick data into R and filter it there. Pull the Q-computed aggregates. Your laptop will thank you.
A query like this runs in milliseconds in Q; the equivalent in R against a data.frame with 100M rows does not:
/ Q: ~10ms on 100M row trade table
select vwap: size wavg price by sym from trade where date=2024.01.15
# R equivalent: slow
# trade_df %>% group_by(sym) %>% summarise(vwap = weighted.mean(price, size))
# Much better: let Q do it, just receive the result in R
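As a sanity check, `size wavg price` is nothing more exotic than a size-weighted mean — trivially small in any language, which is exactly why the expensive part is the scan, not the math:

```python
def vwap(prices, sizes):
    """Size-weighted average price — what Q's `size wavg price` computes."""
    return sum(p * s for p, s in zip(prices, sizes)) / sum(sizes)

vwap([10.0, 20.0], [100, 300])  # 17.5
```

Q wins because it scans 100M rows in a columnar, memory-mapped layout, not because wavg is clever. Ship the scan to Q; keep the arithmetic wherever it’s convenient.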
IDE Support: What Actually Works in 2026
For most of Q’s history, the IDE situation was: use a text editor, run your code in the q REPL, repeat. This was not considered a problem because the alternative — the q IDE that shipped with older KX products — was not obviously better.
The landscape has genuinely improved. There are now real options with real features: syntax highlighting, autocomplete, code navigation, debugging integration, and linters that tell you things you didn’t know you were doing wrong. This chapter covers what actually works, what sounds good in a README but doesn’t in practice, and the one setup that’s worth the configuration time.
VS Code with the q Extension
The kdb extension from KX (marketplace ID: KxSystems.kdb) is the current best-in-class option. It’s actively maintained, it’s free, and it works.
Installation
Install from the VS Code marketplace:
ext install KxSystems.kdb
Or from the command palette: Extensions: Install Extensions → search “kdb”.
What You Get
Syntax highlighting: Accurate Q syntax highlighting including functional forms, system commands, iterators. Not the “we colored the keywords” approach — it correctly handles Q’s context-dependent syntax.
Code completion: Autocomplete for built-in Q functions and operators. Less impressive for user-defined namespaces unless you’re connected to a running process (see below).
Live connection: The extension can connect to a running kdb+ process. With a live connection, you get:
- Execute selected code against the connected process (Cmd/Ctrl+Return)
- Q console output in VS Code’s output panel
- Variable completion from the connected process’s namespace
q scratch pad: Open a .q file, connect to a process, and evaluate expressions interactively. This is the workflow that replaces the REPL loop.
Configuration
In your VS Code settings.json:
{
"kdb.servers": [
{
"serverName": "dev",
"host": "localhost",
"port": 5001,
"auth": false,
"tls": false
},
{
"serverName": "prod",
"host": "prod-kdb.internal",
"port": 5001,
"auth": true,
"tls": true
}
],
"kdb.defaultServer": "dev",
"kdb.hideSubscriptionWarning": false
}
The server list appears in the KDB panel in the activity bar. Click to connect, right-click to set as default.
The Workflow That Actually Works
The productive VS Code + Q workflow:
- Keep your .q source files open in the editor
- Connect to a local development q process
- Highlight a block and hit Cmd+Return to execute it
- Check results in the output panel or the Q console
- Iterate
For namespace navigation: Go to Definition works for functions defined in the connected process. For functions defined in files you haven’t loaded yet, it falls back to text search.
Debugging
The extension doesn’t yet support the Q debugger interactively (breakpoints, step-through). For debugging, you’re still using:
/ Q's built-in debug mode when a function errors
\e 1 / enable debug mode
/ Then when a function errors, you're dropped into its context
/ .z.ex contains the failing expression
/ .z.ey contains the argument
Or the increasingly popular approach of extracting functions to test them in isolation rather than debugging in-place.
IntelliJ / JetBrains with the Q Plugin
The q4intellij plugin provides Q support for IntelliJ IDEA and the JetBrains family. If your team is already JetBrains-based, this is worth considering.
Install via JetBrains Marketplace: search “q language support” or “kdb+”.
What you get:
- Syntax highlighting and formatting
- Structure view (namespace/function tree)
- Go to Definition for local code
- Remote REPL integration
What you don’t get: the same depth of kdb+ integration as the KX VS Code extension. The KX extension benefits from being written by the language vendor.
Emacs and Vim: The Long Game
Emacs has q-mode, maintained with the same level of enthusiasm you’d expect from Emacs maintainers (which is to say: it works, has been working for years, and the documentation is a README in a GitHub repo from 2014 that still applies).
;; q-mode for Emacs
;; Install via MELPA:
(use-package q-mode
:ensure t
:mode "\\.q\\'"
:config
(setq q-host "localhost")
(setq q-port 5001))
With q-mode:
- C-c C-l: load the current buffer into Q
- C-c C-r: send the region to Q
- C-c C-z: open the Q REPL
Vim users have vim-q, which provides syntax highlighting and some REPL integration. If you’re already fluent in Vim, this keeps you in Vim. If you’re not, the VS Code extension is less friction.
Notepad++ and Sublime Text
Both have syntax highlighting plugins that make Q code readable. Neither has REPL integration or anything beyond colorization. Useful for reading Q code on a machine where you haven’t installed a proper IDE; not a development workflow.
qStudio: The Dedicated Option
qStudio (from timestored.com) is a dedicated kdb+/q IDE. It’s been around for a long time and has features the VS Code extension doesn’t:
- Table viewer: Paginated, sortable display of Q tables — much better than reading raw console output
- SQL-style query editor: Write Q queries with a GUI for table results
- Charts: Built-in charting of Q result sets
- Schema browser: Tree view of tables, functions, variables across connected processes
Free version available with most features. Pro version adds more chart types and advanced features.
The honest comparison: qStudio is better for querying and exploring kdb+ data interactively. VS Code with the KX extension is better for developing and maintaining Q code. Many teams use both — qStudio for ad hoc analysis, VS Code for the actual codebase.
Jupyter with Q
If your team uses Jupyter heavily, there’s a kernel for Q:
pip install jupyterq
# or via conda
conda install -c kx jupyterq
This gives you Q code cells in Jupyter notebooks. Useful for:
- Exploratory analysis with Q + Python mixed in the same notebook
- Sharing Q analyses with colleagues who want a notebook interface
- Teaching Q in an interactive environment
# In a Jupyter notebook cell — magic syntax after installing jupyterq
%%q
/ This is a Q cell
select vwap: size wavg price by sym from trade where date=.z.d
The limitation: Jupyter’s model of discrete cell execution doesn’t map perfectly onto Q’s stateful, incremental development style. It works, but Q developers tend to find the Q REPL more natural for iteration.
The Recommended Setup
For a developer who writes Q professionally:
- VS Code + KX extension: Primary development environment. Connected to a local q process. Use it for all code editing, namespace navigation, and interactive execution.
- qStudio: For data exploration and ad hoc queries, especially when looking at large table results you want to sort/filter interactively.
- Terminal Q REPL: Still useful for quick experiments and for running long-lived processes. The VS Code extension doesn’t replace the terminal q session; it complements it.
- Git: Not an IDE feature, but: version control your Q code. .q and .k files belong in version control. The Q shop that doesn’t do this is the Q shop that loses code.
Linting and Formatting
The VS Code KX extension includes a linter (via kdb-lsp). It catches:
- Type errors it can detect statically
- Unused variables
- Shadowed names
- Style issues (inconsistent spacing, missing semicolons)
For CI integration, qls (Q Language Server) can be run headlessly:
# Run linter in CI
qls --check src/*.q
We’ll cover linting and tooling in depth in the Developer Tooling chapter.
A Note on Color Themes
Q’s syntax is unusual enough that generic dark themes don’t highlight it well. The KX extension comes with Q-specific token colors; use them, or use a theme that supports custom token colors and add:
// VS Code settings.json
{
"editor.tokenColorCustomizations": {
"[One Dark Pro]": {
"textMateRules": [
{
"scope": "keyword.operator.q",
"settings": { "foreground": "#56B6C2" }
},
{
"scope": "entity.name.function.q",
"settings": { "foreground": "#61AFEF" }
}
]
}
}
}
This is more optional than the preceding advice, but staring at poorly highlighted Q code for eight hours is an unforced error.
Keybindings for Q Development
The VS Code KX extension’s most useful keybindings:
| Action | macOS | Linux/Windows |
|---|---|---|
| Execute selected code | Cmd+Return | Ctrl+Return |
| Execute current line | Cmd+Return (no selection) | Ctrl+Return |
| Execute entire file | Cmd+Shift+Return | Ctrl+Shift+Return |
| Connect to server | via KDB panel | via KDB panel |
| Toggle Q terminal | Cmd+J | Ctrl+J |
The execute-selection shortcut is the one you’ll use constantly. The workflow is: write a function definition, select it, hit Cmd+Return, see it load into the connected q process. Adjust, re-select, repeat. Much faster than copy-pasting into a terminal REPL.
For loading whole files, a useful VS Code task (bind it to a key via keybindings.json — tasks.json has no keybinding field):
// .vscode/tasks.json
{
"version": "2.0.0",
"tasks": [
{
"label": "Load Q file",
"type": "shell",
"command": "echo '\\l ${file}' | nc localhost 5001",
"group": "build",
"presentation": { "reveal": "silent" }
}
]
}
Managing Multiple kdb+ Connections
Real Q development involves multiple environments — at minimum a local dev process and a shared test environment. The KX extension handles this cleanly.
In settings.json, define all your environments:
{
"kdb.servers": [
{
"serverName": "local-dev",
"serverAlias": "dev",
"host": "localhost",
"port": 5001,
"auth": false
},
{
"serverName": "shared-test",
"serverAlias": "test",
"host": "test-kdb.internal",
"port": 5001,
"auth": true,
"username": "dev"
},
{
"serverName": "hdb-prod-readonly",
"serverAlias": "hdb",
"host": "hdb.internal",
"port": 5010,
"auth": true,
"username": "readonly"
}
]
}
The extension shows these in the KDB panel. Active connection is shown in the status bar. Switching connections is one click. Never accidentally execute against prod by always checking the status bar before running anything that writes.
For the test and prod connections that require passwords, the extension prompts on connect rather than storing passwords in settings files, which means your settings.json can safely be in version control.
Code Snippets for Common Q Patterns
VS Code snippets save significant typing for boilerplate-heavy Q patterns. Add these to your Q snippets file (Cmd+Shift+P → “Configure User Snippets” → “q”):
{
"Q function with named args": {
"prefix": "fn",
"body": [
"${1:funcName}: {[${2:args}]",
" ${0}",
" }"
],
"description": "Q function with named parameters"
},
"Q namespace block": {
"prefix": "ns",
"body": [
"\\d .${1:namespace}",
"",
"${0}",
"",
"\\d ."
],
"description": "Q namespace block"
},
},
"Q select statement": {
"prefix": "sel",
"body": [
"select ${2:cols}",
"from ${1:table}",
"where ${0}"
],
"description": "Q select"
},
"Q select by": {
"prefix": "selby",
"body": [
"select ${3:aggregates}",
"from ${1:table}",
"where ${2:conditions}",
"by ${0:groups}"
],
"description": "Q select with by"
},
"Q protected eval": {
"prefix": "trp",
"body": [
"@[${1:func}; ${2:args}; {[e] ${0}}]"
],
"description": "Protected evaluation"
},
"Q table definition": {
"prefix": "tbl",
"body": [
"${1:name}: ([]",
" ${2:col1}: `${3:type}\\$();",
" ${0}",
" )"
],
"description": "Q table schema"
},
"Q timer": {
"prefix": "timer",
"body": [
".z.ts: {[]",
" ${0}",
" }",
"\\t ${1:1000}"
],
"description": "Q timer setup"
}
}
The kdb+ REPL in VS Code Terminal
The terminal REPL and the VS Code extension aren’t mutually exclusive. A useful setup: split pane with the source file on the left and a terminal running q on the right. The extension sends code to the connected process; the terminal shows output and lets you run ad hoc queries.
# Start a q session in VS Code's integrated terminal
# Terminal → New Terminal, then:
q -p 5001
# Now the extension can connect to this same process
# Code you execute via Cmd+Return appears in the terminal output
The advantage: you can see exactly what’s happening in the q process — error messages, timer output, debug prints — while editing in the left pane.
IDE Comparison Summary
| Feature | VS Code + KX | qStudio | IntelliJ + plugin | Emacs/Vim |
|---|---|---|---|---|
| Syntax highlighting | ✓ | ✓ | ✓ | ✓ |
| Live connection | ✓ | ✓ | ✓ | ✓ |
| Autocomplete (builtins) | ✓ | limited | ✓ | limited |
| Autocomplete (connected) | ✓ | ✓ | partial | — |
| Table result viewer | basic | ✓✓ | basic | — |
| Linting (qls) | ✓ | — | — | — |
| Charts | — | ✓ | — | — |
| Schema browser | ✓ | ✓ | — | — |
| Git integration | ✓✓ | — | ✓✓ | ✓ |
| Actively maintained | ✓ | ✓ | partial | community |
| Cost | free | free/paid | free/paid | free |
The recommendation hasn’t changed: VS Code + KX extension for development, qStudio for data exploration. But the right answer is whatever you’ll actually use — a well-configured Emacs setup beats a poorly-configured VS Code setup every time.
Web Frameworks and Q: Serving Data to the Outside World
Q has a built-in HTTP server. This is both better and worse than you expect.
It’s better because it works, it’s fast, and it’s already running alongside your q process with a one-line configuration change. It’s worse because it’s not a web framework — it’s a raw HTTP server with a callback, and if you want routing, middleware, authentication, JSON serialization, and all the other things a real web framework provides, you’re either building them or wrapping q with something external.
The two viable approaches:
- q’s built-in HTTP server: Minimal, fast, good for simple APIs and dashboards. Surprisingly capable when you understand how it works.
- External web framework wrapping q via IPC: Python (FastAPI), Node.js, Go — your choice — sits in front of kdb+ and translates HTTP requests to IPC calls. More operational overhead, much more flexibility.
Q’s Built-In HTTP Server
Start q with a port and it also accepts HTTP:
q -p 5001
That’s it. The process now accepts both Q IPC connections and HTTP requests on the same port. An HTTP GET to localhost:5001/?2%2B2 returns… something. By default q evaluates whatever follows the ? and returns an HTML rendering of the result, which is approximately useful.
The .z.ph Handler
.z.ph is the HTTP GET handler. It receives the request — the query text plus a dictionary of headers — and must return the raw HTTP response as a string:
/ Default .z.ph strips the query and evaluates it
/ (this is the built-in behavior, not something you write)
/ Override with your own handler
.z.ph: {[x]
/ x is a 2-list: (query text after the '?'; dict of HTTP headers)
/ Parse the query parameters
qry: .h.mp x; / parse into a dict of params
/ Execute if a 'q' parameter is present
result: $["q" in key qry;
.Q.s value qry`q; / evaluate and format
"No query parameter"];
/ Return HTTP response
"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n", result
}
Serving JSON
The moment where this becomes actually useful — returning JSON for real API consumers:
/ JSON serialization/deserialization (built-in since kdb+ 3.x)
.j.j / serialize to JSON
.j.k / deserialize from JSON
/ Simple JSON API endpoint
.z.ph: {[x]
/ Parse query parameters
params: .h.mp x;
/ Route based on 'endpoint' parameter
response: $[
"endpoint" in key params;
handleRequest params;
"404 Not Found"
];
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nAccess-Control-Allow-Origin: *\r\n\r\n",
.j.j response
}
handleRequest: {[params]
ep: params`endpoint;
$[ep ~ "trades"; getTrades params;
ep ~ "quotes"; getQuotes params;
ep ~ "summary"; getSummary params;
(`error; "Unknown endpoint")]
}
getTrades: {[params]
/ use s, not sym: inside the query a bare sym resolves to the column,
/ so a constraint sym=sym would be vacuously true
s: `$params`sym;
dt: "D"$params`date;
select time, sym, price, size
from trade
where date=dt, sym=s
}
Test with curl:
curl "http://localhost:5001?endpoint=trades&sym=AAPL&date=2024.01.15"
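Building those query URLs by hand gets error-prone once parameters multiply; a small Python sketch using only the standard library (hypothetical `q_url` helper):

```python
from urllib.parse import urlencode

def q_url(base, **params):
    """Build a query-string URL for the q HTTP endpoint."""
    return base + "?" + urlencode(params)

q_url("http://localhost:5001", endpoint="trades", sym="AAPL", date="2024.01.15")
```

urlencode also percent-escapes anything awkward in the values, which matters the moment a symbol or date arrives from user input rather than a hardcoded string.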
POST Handler and .z.pp
For POST requests with JSON bodies:
.z.pp: {[x]
/ x contains the HTTP headers and body
/ Parse the JSON body
body: .j.k last "\r\n\r\n" vs x;
/ Handle the request
result: processPost body;
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n",
.j.j result
}
processPost: {[body]
/ body is now a Q dictionary
$[`action in key body;
executeAction body;
`error`message!("missing_field"; "action is required")]
}
executeAction: {[body]
action: body`action;
$[action ~ "insert"; insertRecord body;
action ~ "delete"; deleteRecord body;
`error`message!("unknown_action"; action)]
}
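The handleRequest/executeAction pattern above is a plain dispatch table. For comparison with the gateway approach later in the chapter, the same shape in Python (hypothetical handler names):

```python
def handle_request(params, handlers):
    """Dispatch-table routing, mirroring the Q cond above."""
    handler = handlers.get(params.get("endpoint"))
    if handler is None:
        return {"error": "Unknown endpoint"}
    return handler(params)

handlers = {
    "trades": lambda p: {"kind": "trades", "sym": p.get("sym")},
    "quotes": lambda p: {"kind": "quotes", "sym": p.get("sym")},
}
handle_request({"endpoint": "trades", "sym": "AAPL"}, handlers)
```

A dict lookup scales better than a growing cond chain in either language: adding an endpoint is one entry, not another branch.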
A Real Example: OHLC API
/ OHLC data API — the kind of thing a dashboard or charting library calls
.z.ph: {[x]
params: .h.mp x;
result: @[handleApi; params; {[e] `status`error!("error"; string e)}];
headers: "HTTP/1.1 200 OK\r\n";
headers,: "Content-Type: application/json\r\n";
headers,: "Access-Control-Allow-Origin: *\r\n\r\n";
headers, .j.j result
}
handleApi: {[params]
/ Validate required parameters
required: `sym`date;
missing: required where not required in key params;
if[count missing; '"missing parameters: ", " " sv string missing];
/ use s, not sym: inside the query a bare sym resolves to the column
s: `$params`sym;
dt: "D"$params`date;
/ Optional bar size in minutes, default 78 (five-minute bars over a 6.5h session)
bins: $[`bins in key params; "J"$params`bins; 78];
/ OHLC in time buckets
ohlc: select
open: first price,
high: max price,
low: min price,
close: last price,
volume: sum size,
vwap: size wavg price
from trade
where date=dt, sym=s
by time: bins xbar time.minute;
/ Unkey, then flip to a columnar dict for JSON
`sym`date`bars!(s; dt; flip 0!ohlc)
}
Calling this from JavaScript:
const response = await fetch(
'http://localhost:5001?sym=AAPL&date=2024.01.15&bins=30'
);
const data = await response.json();
// data.bars contains {time: [...], open: [...], high: [...], ...}
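Columnar JSON is compact on the wire, but many client libraries want rows. A small Python reshaping sketch (hypothetical `columns_to_rows` helper) for consumers that prefer record orientation:

```python
def columns_to_rows(cols):
    """Convert a columnar payload {name: [values, ...]} into a list of row dicts."""
    names = list(cols)
    return [dict(zip(names, row)) for row in zip(*cols.values())]

columns_to_rows({"time": ["09:30", "09:31"], "open": [185.5, 185.8]})
# [{'time': '09:30', 'open': 185.5}, {'time': '09:31', 'open': 185.8}]
```

Charting libraries that take parallel arrays (most of them) can consume the columnar form directly; this conversion is only for row-oriented consumers.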
Embedded HTML Dashboard
q can serve HTML directly. The .h namespace has utilities for generating HTML tables:
.z.ph: {[x]
params: .h.mp x;
$["page" in key params;
servePage params`page;
serveIndex[]
]}
serveIndex: {
html: "<html><body>";
html,: "<h1>kdb+ Dashboard</h1>";
html,: "<p><a href='?page=trades'>Trades</a> | ";
html,: "<a href='?page=summary'>Summary</a></p>";
html,: "</body></html>";
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n", html
}
servePage: {[page]
data: $[page ~ "trades";
select[-100] from trade where date=.z.d;
page ~ "summary";
select last price, sum size by sym from trade where date=.z.d;
([] message: enlist "Unknown page")];
html: "<html><body>", .h.hb[`p; page], .h.htc[`table; .h.ht data], "</body></html>";
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n", html
}
.h.ht renders a Q table as an HTML table. It’s not beautiful but it works and it’s one function call.
FastAPI as a Gateway
For production APIs with real requirements (authentication, rate limiting, request validation, proper error handling), put a Python FastAPI server in front of kdb+:
Client → FastAPI → kdb+ (IPC)
The FastAPI layer handles HTTP concerns; kdb+ handles data concerns.
# gateway.py
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from typing import Optional
import pykx as kx
from datetime import date
import os
# Connection pool (simplified)
q_connections = []
Q_HOST = os.getenv("KDB_HOST", "localhost")
Q_PORT = int(os.getenv("KDB_PORT", "5001"))
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: create connections
for _ in range(4):
conn = kx.SyncQConnection(host=Q_HOST, port=Q_PORT)
q_connections.append(conn)
yield
# Shutdown: close connections
for conn in q_connections:
conn.close()
app = FastAPI(
title="kdb+ Gateway API",
description="REST API for kdb+ time-series data",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET"],
allow_headers=["*"],
)
import itertools
_rr = itertools.count()
def get_q():
    """Round-robin connection from the pool."""
    return q_connections[next(_rr) % len(q_connections)]
@app.get("/trades")
async def get_trades(
sym: str = Query(..., description="Symbol, e.g. AAPL"),
date: date = Query(..., description="Date in YYYY-MM-DD format"),
limit: int = Query(1000, ge=1, le=10000)
):
"""Get trades for a symbol on a date."""
q = get_q()
try:
result = q(
"{[s;d;n] select[-n] time, sym, price, size from trade where date=d, sym=s}",
kx.SymbolAtom(sym),
kx.DateAtom(date),
kx.LongAtom(limit)
)
return result.pd().to_dict(orient="records")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/ohlc")
async def get_ohlc(
sym: str = Query(..., description="Symbol"),
date: date = Query(..., description="Date"),
bins: int = Query(30, ge=1, le=390, description="Minutes per bar")
):
"""Get OHLC bars for a symbol."""
q = get_q()
try:
result = q(
"""{[s;d;b]
select
open: first price,
high: max price,
low: min price,
close: last price,
volume: sum size,
vwap: size wavg price
from trade
where date=d, sym=s
by time: b xbar time.minute
}""",
kx.SymbolAtom(sym),
kx.DateAtom(date),
kx.LongAtom(bins)
)
df = result.pd()
df.index = df.index.astype(str) # serialize time index
return df.reset_index().to_dict(orient="records")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/symbols")
async def get_symbols():
"""List available symbols."""
q = get_q()
try:
syms = q("exec distinct sym from trade where date=.z.d")
return {"symbols": [str(s) for s in syms.py()]}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
"""Health check — also tests kdb+ connectivity."""
q = get_q()
try:
result = q("1b")
return {"status": "ok", "kdb_connected": bool(result.py())}
except Exception as e:
return {"status": "degraded", "error": str(e)}
Run it:
pip install fastapi uvicorn pykx
uvicorn gateway:app --host 0.0.0.0 --port 8000 --workers 4
Test:
curl "http://localhost:8000/trades?sym=AAPL&date=2024-01-15"
curl "http://localhost:8000/ohlc?sym=AAPL&date=2024-01-15&bins=30"
curl "http://localhost:8000/symbols"
And you get automatic OpenAPI docs at http://localhost:8000/docs.
Node.js: The Web Frontend’s Native Gateway
If your consumers are JavaScript applications, Node.js as the gateway makes the data pipeline feel native:
// gateway.js
import express from 'express';
import nodeq from 'node-q'; // node-q is CommonJS; import the default export
const app = express();
// Create a kdb+ connection (node-q's connect takes a node-style callback)
function createConnection() {
return new Promise((resolve, reject) => {
nodeq.connect({host: 'localhost', port: 5001}, (err, conn) => {
if (err) reject(err);
else resolve(conn);
});
});
}
let qConn;
(async () => {
qConn = await createConnection();
console.log('Connected to kdb+');
})();
function executeQ(query, ...args) {
return new Promise((resolve, reject) => {
qConn.k(query, ...args, (err, result) => {
if (err) reject(err);
else resolve(result);
});
});
}
app.get('/trades', async (req, res) => {
const { sym, date } = req.query;
if (!sym || !date) {
return res.status(400).json({error: 'sym and date are required'});
}
try {
const result = await executeQ(
'{[s;d] select time, sym, price, size from trade where date=d, sym=s}',
sym, new Date(date)
);
res.json(result);
} catch (err) {
res.status(500).json({error: err.message});
}
});
app.listen(3000, () => console.log('Gateway on port 3000'));
Choosing Your Approach
| Approach | Use when |
|---|---|
| q built-in HTTP | Simple dashboards, internal tooling, prototypes |
| FastAPI gateway | External APIs, need authentication/validation, Python team |
| Node.js gateway | Primary consumers are web apps, team is JS-native |
| Go gateway | High-throughput proxy, multiple downstream kdb+ processes |
The q built-in HTTP server is underrated for internal tooling — the zero-infrastructure story is real. For anything customer-facing or requiring proper auth, put a real web framework in front.
Q IPC Deep Dive: The Protocol, the Patterns, and the Pitfalls
The IPC protocol is the foundation everything in this book rests on. Every library we’ve discussed — qPython, pykx, kdbplus for Rust, rkdb for R — is ultimately speaking this protocol. Understanding it once means you can debug integration issues at the wire level, write your own client in any language, and understand why certain patterns are faster or safer than others.
The Wire Protocol
Q’s IPC uses a simple binary protocol over TCP. A message has a fixed 8-byte header followed by a serialized Q value.
Header Structure
Byte 0: Endianness (0x01 = little-endian, 0x00 = big-endian)
Byte 1: Message type
0x00 = async (fire and forget)
0x01 = sync (request, expects response)
0x02 = response (reply to sync request)
Byte 2: Compression flag (0x01 if body is compressed)
Byte 3: Reserved (0x00)
Bytes 4-7: Total message length (little-endian int32, includes header)
After the header: serialized Q data in KDB’s internal binary format.
You can observe this with a raw TCP dump:
# Start q on port 5001, connect with nc, and observe the handshake
# (The connection handshake sends/receives capability strings)
Or from Q itself, capture the raw bytes:
/ Serialize a Q value to bytes
-8! 42i / int 42 as byte sequence
/ Result: 0x010000000d000000fa2a000000
/ Deserialize
-9! 0x010000000d000000fa2a000000
/ Result: 42i
The -8! (serialize) and -9! (deserialize) system functions are your wire-format debugging tools. Every Q value has a deterministic binary representation.
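Because the header layout is fixed, it is easy to decode in any language. A minimal Python sketch (the function and dict names are mine, not from any library) that parses the 8-byte header described above:

```python
import struct

MSG_TYPES = {0: "async", 1: "sync", 2: "response"}

def parse_ipc_header(data: bytes) -> dict:
    """Parse the fixed 8-byte kdb+ IPC header from a raw message."""
    if len(data) < 8:
        raise ValueError("need at least 8 bytes")
    endian, msg_type, compressed = data[0], data[1], data[2]
    # Bytes 4-7 hold the total message length, in the sender's byte order
    fmt = "<i" if endian == 1 else ">i"
    (length,) = struct.unpack(fmt, data[4:8])
    return {
        "little_endian": endian == 1,
        "msg_type": MSG_TYPES.get(msg_type, "unknown"),
        "compressed": compressed == 1,
        "total_length": length,  # includes the 8-byte header
    }
```

Feed it the output of `-8!` (or bytes captured off the wire) to sanity-check what a client actually sent.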
Serialization Format
/ See the raw bytes of various types
-8! 3.14 / float atom: 0x0100000011000000f71f85eb51b81e0940
-8! `hello / symbol atom: 0x010000000f000000f568656c6c6f00
-8! "hello" / char list: 0x01000000130000000a000500000068656c6c6f
-8! 1 2 3 / long list: 0x0100000026000000070003000000010000000000000002000000000000000300000000000000
-8! ([] a:1 2; b:`x`y) / table
-8! ([] a:1 2; b:`x`y) / table
Type bytes for lists: 0x01 = boolean, 0x04 = byte, 0x05 = short, 0x06 = int, 0x07 = long, 0x08 = real, 0x09 = float, 0x0a = char, 0x0b = symbol. Atoms use the negated type number as a signed byte: 0xf9 = long atom, 0xf7 = float atom, 0xf5 = symbol atom, and so on.
You don’t need to implement this by hand (the libraries do it), but knowing it exists means you can debug with Wireshark or tcpdump when something goes wrong.
Handles and Connection Lifecycle
/ Opening a handle
h: hopen `::5001 / localhost:5001, no auth
h: hopen `::5001:user:pass / with auth
h: hopen (`:myserver:5001:user:pass; 10000) / with timeout (ms)
/ Check handle type
type h / -6h = int handle
/ Send synchronous message (blocking)
result: h "select count i from trade where date=.z.d"
/ Send asynchronous message (non-blocking, no response)
neg[h] "slow_background_task[]"
/ Close
hclose h
/ Check if handle is open
.z.W / dictionary of open handles -> their send buffer size
h in key .z.W / true if h is open
Timeouts
Q’s default IPC has no timeout — a hung remote process will block your q process indefinitely. For production code, always use timeout:
/ Connection timeout on hopen
h: hopen (`:server:5001; 5000) / 5 second connect timeout
/ Trap query errors (with backtrace) using .Q.trp
.Q.trp[{h "slow_query[]"}; (); {[err; bt] 2 .Q.sbt bt; 'err}]
/ Server-side query cap
system "T 30" / abort client sync queries after 30 seconds (set on the server)
The timeout on hopen is a connection timeout, not a query timeout, and .Q.trp only traps errors after they happen. To actually bound query time you need the server-side \T limit, a gateway that enforces limits, or the async pattern described below.
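A gateway written in Python can impose its own per-query deadline on a blocking call. A hedged sketch using a worker thread, where `query_fn` is any blocking callable (for instance a pykx sync query wrapped in a lambda); note the caveat in the docstring:

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

_executor = ThreadPoolExecutor(max_workers=4)

def query_with_timeout(query_fn, timeout_s: float):
    """Run a blocking query callable, giving up after timeout_s seconds.

    Caveat: the underlying call keeps running in its worker thread;
    this bounds how long the caller waits, not the server-side work.
    """
    future = _executor.submit(query_fn)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        raise TimeoutError(f"query exceeded {timeout_s}s") from None
```

This pairs well with the connection-pool pattern: a timed-out connection should be closed and replaced, since its eventual response would otherwise desynchronize the handle.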
Synchronous vs Asynchronous: The Key Trade-off
Synchronous (Default)
/ Client sends query, blocks until response
result: h "count trade"
/ h is blocked during query execution — can't send another query on this handle
Simple, safe, familiar. The bottleneck: one outstanding request per handle. For high-throughput scenarios, you need either multiple handles or the async pattern.
Asynchronous (Fire-and-Forget)
/ Client sends message, returns immediately, no response
neg[h] ".u.upd[`trade; data]"
Used by tickerplants to publish updates to subscribers. Very fast — no round-trip. But: if the message errors on the server, you’ll never know. Use for pub/sub and logging, not for queries that need results.
Async with Callback (The Advanced Pattern)
A long-standing q pattern, formalized by deferred responses (-30!) in kdb+ 3.6, gives you async queries that still receive responses:
/ Server-side setup: handle incoming async messages
.z.ps: {[x]
/ String messages: plain async eval
if[10h = type x;
@[value; x; {[e] -1 "async eval error: ", e}];
:()
];
/ Deferred pattern: client sends (function name; argument list)
if[0h = type x;
fn: value x 0; / resolve the function name (a symbol) to the function
args: x 1; / argument list
result: .[fn; args; {[e] `error, e}]; / protected apply
neg[.z.w] result; / send the result back on the caller's handle, async
:()
]
}
/ Client-side: send async, receive the reply via the client's own .z.ps
.z.ps: {[x] show x} / client's async handler
/ Send deferred request:
neg[h] (`myFunction; (`AAPL; 2024.01.15))
/ The server calls myFunction, sends the result back to the client async
This is how you build non-blocking, high-throughput query patterns. The complication is that your client now needs a message loop.
Gateway Patterns
A gateway is a q process that sits in front of other q processes — routing queries, merging results, managing permissions. The canonical gateway patterns:
Simple Synchronous Gateway
/ gateway.q
\p 5000
/ Backend connections (hopen accepts a bare port for localhost)
backends: `hdb`rdb!5001 5002;
handles: hopen each backends;
/ Route queries to the appropriate backend
route: {[query]
/ Simple routing: queries mentioning historical tables → hdb
h: $[query like "*hdb*"; handles`hdb; handles`rdb];
h query / forward and return the result
}
/ Query handler
.z.pg: route
Scatter-Gather: Fan-Out Across Partitions
The pattern for querying across multiple kdb+ processes and merging results:
/ Query multiple processes, merge results
scatterQuery: {[handles; query]
/ Send async to all, collect responses
results: handles @\: query;
/ Merge (appropriate for tables)
raze results
}
/ Example with multiple date-partitioned HDB processes
hdb_addrs: `::5001`::5002`::5003; / each has a different date range
handles: hopen each hdb_addrs;
merged: scatterQuery[handles; "select from trade where date=.z.d"]
For true parallel execution, use the async pattern:
/ Fan-out asynchronously
fanOut: {[handles; query]
/ Ask each remote to evaluate the query and reply asynchronously
neg[handles] @\: ({neg[.z.w] value x}; query);
/ h[] blocks until the next message arrives on handle h
raze {x[]} each handles
}
Protected Evaluation
Always wrap remote execution in protected eval to prevent server crashes from propagating:
/ Server-side: protect query execution
.z.pg: {[x]
/ Protected execution; the error handler receives the error string
result: @[value; x; {[err] (`error; err)}];
/ Check if error
$[(0h = type result) and 2 = count result and `error ~ first result;
'"remote error: ", result 1;
result]
}
The @[f; args; err_handler] pattern is Q’s try-catch. Use it on any query that comes from an external source.
Message Handlers: The Full Set
These are the callbacks Q calls when IPC events occur:
.z.pg / synchronous GET handler (client sent sync message)
.z.ps / asynchronous message handler (client sent async message)
.z.ph / HTTP GET handler
.z.pp / HTTP POST handler
.z.pw / authentication handler — return 1b to allow, 0b to deny
.z.po / open handler — called when a handle is opened
.z.pc / close handler — called when a handle is closed
.z.wo / websocket open handler
.z.wc / websocket close handler
.z.ws / websocket message handler
Connection Tracking
/ Track active connections
conns: ([handle: `long$()] host: `(); user: `(); opened: `timestamp$())
.z.po: {[h]
/ h is the new handle
`conns upsert (h; .z.a; .z.u; .z.p)
}
.z.pc: {[h]
/ h is the closing handle
delete from `conns where handle=h
}
/ See who's connected
conns
/ handle | host user opened
/ -------+-----------------------------------------
/ 8 | 127.0.0.1 alice 2024.01.15T09:30:01
/ 9 | 10.0.0.1 bob 2024.01.15T09:30:15
Authentication
/ .z.pw: called on connection attempt
/ Arguments: username (symbol), password (string/symbol)
/ Return: 1b to allow, 0b to deny
/ Simple user/password table
users: ([ user:`alice`bob`readonly]
pass:("secret1"; "secret2"; "readpass");
perms:`admin`admin`read)
.z.pw: {[user; pass]
$[user in exec user from users;
pass ~ users[user; `pass];
0b]
}
/ Role-based: store role in connection handle metadata
.z.pg: {[x]
/ Check permission for this user's role
user: .z.u;
role: $[user in exec user from users; users[user; `perms]; `none];
/ Block write operations for read-only users
if[role ~ `read;
if[any x like/: ("insert*"; "delete*"; "update*"; "*upsert*");
'"permission denied"]
];
value x
}
Compression
For large messages, q compresses IPC traffic automatically:
/ IPC compression is automatic, not per-handle: q compresses a message
/ when its serialized form exceeds 2000 bytes, both ends support it,
/ and the connection is not localhost. There is no per-connection switch.
/ Inspect the effect with the serialization primitives:
count -8! til 10000 / uncompressed IPC byte form
count -18! til 10000 / compressed IPC byte form (-18! = compress bytes)
Compression makes sense for large tables sent over WAN. For intranet queries, the CPU cost often outweighs the bandwidth saving.
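A gateway that serves HTTP can apply the same size-based reasoning to its own responses. A sketch with zlib, where the 2000-byte cutoff mirrors q's own threshold and everything else is my choice:

```python
import zlib

def maybe_compress(payload: bytes, threshold: int = 2000) -> tuple[bytes, bool]:
    """Compress a response body only when it is large enough to plausibly
    benefit, and only if compression actually shrinks it.

    Returns (body, compressed_flag) so the caller can set headers.
    """
    if len(payload) <= threshold:
        return payload, False
    packed = zlib.compress(payload, 6)
    if len(packed) < len(payload):
        return packed, True
    return payload, False  # incompressible data: send as-is
```

The "only if it actually shrinks" check matters for already-compressed or high-entropy payloads, where compression wastes CPU and can even grow the message.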
Common Pitfalls
The Blocking Handle Problem
/ Wrong: using one handle for high-frequency queries
/ This serializes all requests
h: hopen `::5001
do[1000; result: h "expensive_query[]"]
/ Right: connection pool or async pattern
/ Multiple handles, round-robin
handles: hopen each 4 # `::5001 / small pool of handles
idx: 0
getHandle: {idx:: (idx + 1) mod count handles; handles idx}
Message Size Limits
Older kdb+ versions (pre-3.4) cap a single IPC message at 2GB, and even where larger messages are supported, shipping a huge result set in one response is a bad idea. For very large result sets, either:
- Paginate in q: select[-1000] from trade gets the last 1000 rows; select[offset count] from trade takes count rows starting at row offset
- Stream results in chunks
- Compress the message
Handle Leaks
Forgetting to close handles is a slow resource leak:
/ Dangerous: handle opened but may not be closed on error
result: (hopen `::5001) "query"
/ If query errors, handle stays open
/ Safe pattern
safeQuery: {[host; port; query]
h: hopen (host; port; ""; 5000);
result: @[h; query; {[h; e] hclose h; 'e}[h]];
hclose h;
result
}
Endianness
Q handles endianness negotiation automatically during the connection handshake. You don’t need to worry about it unless you’re implementing the protocol from scratch — at which point you need to read the KX documentation on the wire format carefully. Most people are not doing this.
Debugging IPC Issues
/ See all open handles
.z.W / dict: handle -> buffer size
/ Handle information
h / just the integer
neg h / negated handle: apply this to send async messages
type h / should be -6h (int)
/ Test connectivity without a real query
h "1b" / returns 1b on success, errors on failure
/ Check what's waiting in the send buffer
.z.W h / bytes waiting to be sent on handle h
/ Connection details about the *current* caller
.z.a / caller's IP address (within .z.pg / .z.ph)
.z.u / caller's username
.z.w / caller's handle (use neg[.z.w] to respond async)
For deep debugging, capture q's stderr to a log file:
q -p 5001 2>&1 | tee q.log
There is no verbose IPC logging switch; for a running record, add a timer heartbeat (start with -t 1000 and set .z.ts to log .z.W) so connection state appears in the log alongside any errors.
REST and Q: Building APIs Around kdb+/q
Most systems that need kdb+ data don’t speak kdb+ IPC. They speak HTTP. Your data science team wants a REST endpoint they can call from pandas. Your mobile app team wants JSON. The compliance system wants a webhook. None of these are going to implement the Q wire protocol.
This chapter is about building REST APIs that sit in front of kdb+ — properly, with authentication, error handling, sensible data types, and the kind of response format that doesn’t make your consumers write adapter code.
The Architecture Decision
Three viable approaches, each with a distinct trade-off:
1. Q’s built-in HTTP (.z.ph/.z.pp)
- Zero infrastructure: it’s already running
- Good for internal tooling and simple cases
- Manual JSON serialization, no framework features
- Covered in the Web Frameworks chapter
2. External gateway (FastAPI, Flask, Go, Node.js)
- Full web framework features: routing, middleware, validation, auth
- Proper OpenAPI/Swagger docs
- Separation of concerns: q does data, web framework does HTTP
- The right choice for external-facing APIs
3. KX Insights / KX Dashboard (commercial)
- KX’s commercial API management layer
- Not covered here; see KX documentation if you have a license
We’ll build a production-grade FastAPI gateway with authentication, OpenAPI docs, and proper error handling.
Designing the API
Before writing code, decide on your conventions. These decisions are hard to change later:
Date format: ISO 8601 (2024-01-15), not Q’s 2024.01.15. REST APIs should be format-agnostic; Q dates are an implementation detail.
Timestamps: ISO 8601 with UTC timezone (2024-01-15T09:30:00.123Z). Not Q timestamps, not Unix milliseconds.
Numbers: JSON doesn’t distinguish int from float. Document your precision guarantees. Prices are floats. Sizes are integers. Don’t mix them up.
Pagination: Use limit and offset (or cursor-based for time-series). Don’t return unbounded result sets.
Errors: Consistent error format: {"error": "description", "code": "machine_readable_code"}.
Symbol naming: Decide early whether API consumers pass "AAPL" or "AAPL.O" and stick with it. Your Q sym and the API sym may differ; the gateway is where you translate.
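These conventions reduce to a handful of pure translation helpers at the gateway boundary, worth writing once and unit-testing. A sketch (the function names are mine):

```python
from datetime import date, datetime, timezone

def to_q_date(d: date) -> str:
    """ISO date in, q date literal out: 2024-01-15 -> 2024.01.15."""
    return d.strftime("%Y.%m.%d")

def to_iso_timestamp(ts: datetime) -> str:
    """UTC ISO 8601 with millisecond precision, per the convention above.

    Naive datetimes are assumed to already be UTC.
    """
    if ts.tzinfo is not None:
        ts = ts.astimezone(timezone.utc)
    # strftime %f gives microseconds; trim to milliseconds and tag as Zulu
    return ts.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
```

Keeping the translation in one module means a convention change (say, nanosecond precision) is a one-file edit instead of a hunt through every endpoint.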
A Production FastAPI Gateway
# kdb_gateway/main.py
from fastapi import FastAPI, HTTPException, Depends, Query, Security
from fastapi.security.api_key import APIKeyHeader
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
from pydantic import BaseModel, Field
from typing import Optional
from datetime import date, datetime
import pykx as kx
import os
import logging
logger = logging.getLogger(__name__)
# ─── Configuration ──────────────────────────────────────────────────────────
KDB_HOST = os.getenv("KDB_HOST", "localhost")
KDB_PORT = int(os.getenv("KDB_PORT", "5001"))
KDB_USER = os.getenv("KDB_USER", "")
KDB_PASS = os.getenv("KDB_PASS", "")
API_KEYS = set(os.getenv("API_KEYS", "dev-key-123").split(","))
# ─── Connection Pool ──────────────────────────────────────────────────────────
class QPool:
def __init__(self, host: str, port: int, user: str, password: str, size: int = 4):
self._conns = []
self._idx = 0
self._host = host
self._port = port
self._user = user
self._password = password
self._size = size
def start(self):
for _ in range(self._size):
auth = f"{self._user}:{self._password}" if self._user else ""
conn = kx.SyncQConnection(
host=self._host,
port=self._port,
username=self._user or None,
password=self._password or None
)
self._conns.append(conn)
logger.info(f"Connected to kdb+ at {self._host}:{self._port} ({self._size} connections)")
def stop(self):
for conn in self._conns:
try:
conn.close()
except Exception:
pass
logger.info("kdb+ connections closed")
def get(self) -> kx.SyncQConnection:
conn = self._conns[self._idx % self._size]
self._idx += 1
return conn
def query(self, q_code: str, *args):
conn = self.get()
try:
if args:
return conn(q_code, *args)
return conn(q_code)
except Exception as e:
logger.error(f"kdb+ query error: {e}")
raise
pool = QPool(KDB_HOST, KDB_PORT, KDB_USER, KDB_PASS)
# ─── App Lifecycle ──────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
pool.start()
yield
pool.stop()
app = FastAPI(
title="kdb+ Market Data API",
description="REST API for accessing kdb+/q time-series market data",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
# ─── Authentication ──────────────────────────────────────────────────────────
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def require_api_key(api_key: str = Security(api_key_header)):
if api_key not in API_KEYS:
raise HTTPException(status_code=401, detail="Invalid or missing API key")
return api_key
# ─── Response Models ──────────────────────────────────────────────────────────
class Trade(BaseModel):
time: str
sym: str
price: float
size: int
class OHLCBar(BaseModel):
time: str
open: float
high: float
low: float
close: float
volume: int
vwap: float
class ErrorResponse(BaseModel):
error: str
code: str
# ─── Helpers ──────────────────────────────────────────────────────────────────
def serialize_result(result) -> list[dict]:
"""Convert pykx result to JSON-serializable list of dicts."""
import pandas as pd
import numpy as np
df = result.pd()
# Convert timestamps to ISO strings
for col in df.columns:
if df[col].dtype == 'datetime64[ns]':
df[col] = df[col].dt.strftime('%Y-%m-%dT%H:%M:%S.%f').str[:-3] + 'Z'
elif df[col].dtype == 'object':
df[col] = df[col].astype(str)
return df.to_dict(orient="records")
def q_date(d: date) -> str:
"""Convert Python date to Q date literal."""
return d.strftime("%Y.%m.%d")
# ─── Endpoints ───────────────────────────────────────────────────────────────
@app.get("/health")
async def health():
"""Health check. Tests kdb+ connectivity."""
try:
result = pool.query("1b")
return {"status": "ok", "kdb_connected": True}
except Exception as e:
return JSONResponse(
status_code=503,
content={"status": "degraded", "kdb_connected": False, "error": str(e)}
)
@app.get(
"/trades",
response_model=list[Trade],
summary="Get trades for a symbol",
responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}}
)
async def get_trades(
sym: str = Query(..., description="Ticker symbol, e.g. AAPL", example="AAPL"),
date: date = Query(..., description="Trading date"),
limit: int = Query(1000, ge=1, le=10000, description="Maximum rows to return"),
offset: int = Query(0, ge=0, description="Row offset for pagination"),
_: str = Depends(require_api_key)
):
"""
Retrieve trades for a symbol on a given date.
Returns up to `limit` trades starting from `offset`, ordered by time.
"""
try:
result = pool.query(
"{[s;d;n;o] select[n;o] time, sym, price, size from trade where date=d, sym=s}",
kx.SymbolAtom(sym.upper()),
kx.DateAtom(date),
kx.LongAtom(limit),
kx.LongAtom(offset)
)
return serialize_result(result)
except kx.QError as e:
raise HTTPException(status_code=400, detail={"error": str(e), "code": "q_error"})
except Exception as e:
logger.exception("Failed to query trades")
raise HTTPException(status_code=500, detail={"error": "Internal error", "code": "internal"})
@app.get(
"/ohlc",
response_model=list[OHLCBar],
summary="Get OHLC bars",
)
async def get_ohlc(
sym: str = Query(..., description="Ticker symbol", example="AAPL"),
date: date = Query(..., description="Trading date"),
bars: int = Query(30, ge=1, le=390, description="Bar size in minutes"),
_: str = Depends(require_api_key)
):
"""
Retrieve OHLC (open/high/low/close) bars for a symbol.
`bars` is the bar duration in minutes. 30 gives 30-minute bars,
1 gives 1-minute bars (up to 390 for a full trading day).
"""
try:
result = pool.query(
"""{[s;d;b]
select
open: first price,
high: max price,
low: min price,
close: last price,
volume: sum `long$size,
vwap: size wavg price
from trade
where date=d, sym=s
by time: b xbar time.minute
}""",
kx.SymbolAtom(sym.upper()),
kx.DateAtom(date),
kx.LongAtom(bars)
)
return serialize_result(result)
except kx.QError as e:
raise HTTPException(status_code=400, detail={"error": str(e), "code": "q_error"})
except Exception as e:
logger.exception("Failed to query OHLC")
raise HTTPException(status_code=500, detail={"error": "Internal error", "code": "internal"})
@app.get("/symbols")
async def get_symbols(
date: Optional[date] = Query(None, description="Date to check symbols for (default: today)"),
_: str = Depends(require_api_key)
):
"""List available symbols, optionally filtered by date."""
q_date_arg = date or datetime.now().date()  # the date parameter shadows datetime.date
try:
result = pool.query(
"{[d] exec distinct sym from trade where date=d}",
kx.DateAtom(q_date_arg)
)
return {"symbols": sorted([str(s) for s in result.py()])}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/vwap")
async def get_vwap(
date: date = Query(..., description="Trading date"),
syms: Optional[str] = Query(None, description="Comma-separated symbols, e.g. AAPL,MSFT"),
_: str = Depends(require_api_key)
):
"""
Compute VWAP for all symbols (or a subset) on a date.
Returns volume-weighted average price and total volume per symbol.
"""
try:
if syms:
sym_list = [s.strip().upper() for s in syms.split(",")]
result = pool.query(
"{[d;ss] select vwap: size wavg price, volume: sum size by sym from trade where date=d, sym in ss}",
kx.DateAtom(date),
kx.SymbolVector(sym_list)
)
else:
result = pool.query(
"{[d] select vwap: size wavg price, volume: sum size by sym from trade where date=d}",
kx.DateAtom(date)
)
return serialize_result(result)
except kx.QError as e:
raise HTTPException(status_code=400, detail={"error": str(e), "code": "q_error"})
# ─── POST: Execute Q (admin only) ────────────────────────────────────────────
class QQueryRequest(BaseModel):
query: str = Field(..., description="Q expression to evaluate")
ADMIN_KEYS = set(os.getenv("ADMIN_API_KEYS", "admin-key-secret").split(","))
async def require_admin_key(api_key: str = Security(api_key_header)):
if api_key not in ADMIN_KEYS:
raise HTTPException(status_code=403, detail="Admin access required")
return api_key
@app.post("/query")
async def execute_query(
request: QQueryRequest,
_: str = Depends(require_admin_key)
):
"""
Execute an arbitrary Q query. Admin access only.
Use this sparingly — prefer the typed endpoints above.
"""
try:
result = pool.query(request.query)
df = result.pd()
return {"result": df.to_dict(orient="records") if hasattr(df, 'to_dict') else str(result)}
except kx.QError as e:
raise HTTPException(status_code=400, detail={"error": str(e), "code": "q_error"})
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
Running It
pip install fastapi uvicorn pykx
# Development
uvicorn kdb_gateway.main:app --reload --port 8000
# Production
uvicorn kdb_gateway.main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--log-level info
Environment Variables
export KDB_HOST=kdb.internal
export KDB_PORT=5001
export KDB_USER=apiuser
export KDB_PASS=secure_password
export API_KEYS="key1,key2,key3"
export ADMIN_API_KEYS="admin-secret"
Testing
# Health check
curl http://localhost:8000/health
# Get trades (with API key)
curl -H "X-API-Key: dev-key-123" \
"http://localhost:8000/trades?sym=AAPL&date=2024-01-15&limit=100"
# OHLC bars
curl -H "X-API-Key: dev-key-123" \
"http://localhost:8000/ohlc?sym=AAPL&date=2024-01-15&bars=30"
# VWAP for multiple symbols
curl -H "X-API-Key: dev-key-123" \
"http://localhost:8000/vwap?date=2024-01-15&syms=AAPL,MSFT,GOOG"
# OpenAPI docs (no auth needed)
open http://localhost:8000/docs
Rate Limiting
Add rate limiting with slowapi:
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/trades")
@limiter.limit("100/minute")
async def get_trades(request: Request, ...):
...
Caching
For expensive queries or endpoints where millisecond-fresh data isn’t needed:
from functools import lru_cache
import time
class TimedCache:
def __init__(self, ttl_seconds: int = 60):
self._cache = {}
self._ttl = ttl_seconds
def get(self, key: str):
if key in self._cache:
value, expires = self._cache[key]
if time.time() < expires:
return value
del self._cache[key]
return None
def set(self, key: str, value):
self._cache[key] = (value, time.time() + self._ttl)
symbol_cache = TimedCache(ttl_seconds=300) # 5-minute cache for symbol list
@app.get("/symbols")
async def get_symbols(...):
cache_key = f"symbols:{date}"
cached = symbol_cache.get(cache_key)
if cached:
return cached
result = pool.query(...)
response = {"symbols": sorted([str(s) for s in result.py()])}
symbol_cache.set(cache_key, response)
return response
Docker Deployment
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "kdb_gateway.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
services:
kdb-gateway:
build: .
ports:
- "8000:8000"
environment:
KDB_HOST: kdb
KDB_PORT: 5001
API_KEYS: "${API_KEYS}"
depends_on:
- kdb
restart: unless-stopped
kdb:
image: kxsys/kdb:latest
ports:
- "5001:5001"
command: ["-p", "5001"]
OpenAPI Integration
FastAPI generates OpenAPI (Swagger) docs automatically. At http://localhost:8000/docs you get an interactive API explorer, and at http://localhost:8000/openapi.json you get the schema for generating client libraries:
# Generate a Python client from the OpenAPI spec
pip install openapi-generator-cli
openapi-generator-cli generate \
-i http://localhost:8000/openapi.json \
-g python \
-o ./generated-client \
--package-name kdb_client
Your consumers get a typed client without writing any integration code. That’s the kind of thing that makes the Python team happy with you.
Streaming Integrations: Kafka, WebSockets, and Real-Time Q
kdb+/q was built for real-time data. The tick infrastructure — the tickerplant, real-time database, and historical database — is one of Q’s strongest features and the reason it dominates in high-frequency finance. What it wasn’t originally designed for is plugging into the broader streaming ecosystem: Kafka topics, WebSocket-hungry frontends, and the event-driven microservices architecture that everyone built in the 2020s.
This chapter is about making Q a first-class participant in that ecosystem — both as a consumer of external streams and as a producer of real-time data.
Q’s Native Tick Architecture (Brief Review)
Before plugging Q into Kafka, understand what Q already does natively:
Feed Handler → Tickerplant → RDB (real-time DB)
→ Subscribers (other q processes)
→ Log file → HDB (historical DB)
The tickerplant receives data, timestamps it, writes to a log, and publishes to subscribers via IPC. Each subscriber calls .u.sub to register and receives updates via .u.upd.
This is already a streaming architecture — it’s just Q-native. When you need to integrate with non-Q systems, you extend this pattern rather than replacing it.
Kafka Integration
Architecture
The two Kafka-Q integration patterns:
Q as consumer: A Q process subscribes to Kafka topics and ingests data into kdb+ tables. Useful for bringing external event streams into your time-series database.
Q as producer: A Q process (typically a tickerplant subscriber) publishes processed data to Kafka topics for consumption by other services.
The KX Kafka Interface
KX's Kafka library for q (github.com/KxSystems/kafka, built on librdkafka) provides consumer and producer bindings. Its raw API lives in the .kfk namespace and has changed across releases, so the snippets below use an illustrative .kafka wrapper to show the shape of the integration rather than exact signatures:
# Install the native dependency (librdkafka)
# macOS
brew install librdkafka
# Ubuntu/Debian
apt-get install librdkafka-dev
# Download the KX Kafka library
# https://github.com/KxSystems/kafka
# Build it and place the shared object in your q path
/ Load the interface (the .kafka.* names below are an illustrative
/ wrapper; the shipped library uses the .kfk namespace, see its README)
\l kafkaq.q
/ ─── Consumer ───────────────────────────────────────────────────────────────
/ Create a consumer with configuration
consumer: .kafka.newConsumer[
`bootstrap.servers`group.id`auto.offset.reset!
("localhost:9092"; "my-q-consumer"; "earliest")
]
/ Subscribe to topics
.kafka.subscribe[consumer; `trade_data`order_data]
/ Message handler — called for each incoming Kafka message
.kafka.recvMsg: {[consumer; msg]
/ msg is a dictionary: `topic`partition`offset`key`data`timestamp
topic: msg`topic;
payload: .j.k msg`data; / deserialize JSON payload
/ Route to appropriate handler
$[topic ~ "trade_data"; ingestTrade payload;
topic ~ "order_data"; ingestOrder payload;
/ default
-1 "Unknown topic: ", string topic]
}
/ Start polling (call in a timer or loop)
.kafka.poll[consumer; 1000] / poll with 1000ms timeout
/ ─── Trade ingestion ─────────────────────────────────────────────────────────
/ In-memory trade table (replace with actual schema)
trade: ([] time:`timestamp$(); sym:`symbol$(); price:`float$(); size:`long$())
ingestTrade: {[payload]
/ Convert Kafka message to a Q row ("P"$ parses a timestamp string)
row: (
"P"$payload`timestamp;
`$payload`sym;
`float$payload`price;
`long$payload`size
);
`trade insert row;
/ Optional: forward to tickerplant (assumes tp_handle: hopen `::5010)
if[not null tp_handle; neg[tp_handle] (`.u.upd; `trade; row)]
}
Q as Kafka Producer
Publishing Q data to Kafka:
\l kafkaq.q
/ Create a producer
producer: .kafka.newProducer[
`bootstrap.servers`acks!
("localhost:9092"; "all")
]
/ Publish tickerplant updates to Kafka
/ Add this to your RDB subscriber
.u.upd: {[t; data]
/ Standard RDB upsert
t insert data;
/ Also publish to Kafka
publishToKafka[t; data]
}
publishToKafka: {[table; data]
/ data arrives as a list of columns; flip it into a list of rows
rows: flip data;
/ q lambdas don't close over outer locals, so project table in
{[t; r] .kafka.publish[producer; `$"kdb-", string t; ""; .j.j `table`data!(t; r)]}[table;] each rows
}
Python Kafka Bridge
If installing KafkaQ feels like more than you want to manage, a Python bridge is a clean alternative:
# kafka_bridge.py
# Consumes from Kafka, inserts into kdb+ via IPC
from kafka import KafkaConsumer
import pykx as kx
import json
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
def run_bridge(
kafka_brokers: str,
kafka_topic: str,
kdb_host: str,
kdb_port: int,
kdb_table: str
):
consumer = KafkaConsumer(
kafka_topic,
bootstrap_servers=kafka_brokers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='latest',
group_id='kdb-bridge'
)
q = kx.SyncQConnection(host=kdb_host, port=kdb_port)
logger.info(f"Bridge running: {kafka_topic} → kdb+:{kdb_port}:{kdb_table}")
batch = []
batch_size = 100
try:
for message in consumer:
payload = message.value
batch.append(payload)
if len(batch) >= batch_size:
flush_batch(q, kdb_table, batch)
batch = []
except KeyboardInterrupt:
if batch:
flush_batch(q, kdb_table, batch)
finally:
consumer.close()
q.close()
def flush_batch(q: kx.SyncQConnection, table: str, batch: list):
"""Insert a batch of records into kdb+."""
import pandas as pd
df = pd.DataFrame(batch)
# Type coercions
if 'timestamp' in df.columns:
df['timestamp'] = pd.to_datetime(df['timestamp'])
if 'sym' in df.columns:
df['sym'] = df['sym'].astype(str)
    q(f'{{`{table} insert x}}', df)
logger.info(f"Flushed {len(batch)} records to {table}")
if __name__ == '__main__':
run_bridge(
kafka_brokers='localhost:9092',
kafka_topic='market-trades',
kdb_host='localhost',
kdb_port=5001,
kdb_table='trade'
)
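The batch-and-flush logic inside run_bridge recurs in every bridge you write, so it pays to isolate it. A standalone version, with a time-based flush added (my addition; the bridge above only flushes on size and on shutdown):

```python
import time

class Batcher:
    """Accumulate records and flush when the batch is full or stale."""

    def __init__(self, flush_fn, batch_size=100, max_age_s=1.0):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.max_age_s = max_age_s
        self.batch = []
        self.started = None

    def add(self, record):
        if not self.batch:
            self.started = time.monotonic()  # start the staleness clock
        self.batch.append(record)
        if (len(self.batch) >= self.batch_size
                or time.monotonic() - self.started >= self.max_age_s):
            self.flush()

    def flush(self):
        if self.batch:
            self.flush_fn(self.batch)
            self.batch = []
```

The time-based flush matters for low-volume topics: without it, a batch of 99 records can sit unflushed indefinitely.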
WebSockets: Streaming Q Data to Browsers
The use case: a dashboard that shows live market data, updated as trades arrive in kdb+. WebSockets are the right transport for this — persistent connection, low overhead, bidirectional.
Q’s Built-In WebSocket Server
kdb+ 3.5+ has native WebSocket support via .z.wo, .z.wc, .z.ws:
\p 5001 / start on port 5001 (same port handles HTTP, IPC, and WebSockets)
/ WebSocket connection tracking
ws_clients: `long$() / list of connected WebSocket handles
.z.wo: {[h]
/ h = new WebSocket handle
    ws_clients,:: h; / :: forms amend the global (plain ,: would create a local)
-1 "WS connected: ", string h;
}
.z.wc: {[h]
/ h = closing WebSocket handle
    ws_clients:: ws_clients except h; / :: assigns the global
-1 "WS disconnected: ", string h;
}
.z.ws: {[x]
/ x = incoming WebSocket message (string or bytes)
/ Parse request and send back data
request: .j.k x;
response: handleWsRequest request;
neg[.z.w] .j.j response
}
handleWsRequest: {[req]
action: req`action;
$[action ~ "subscribe";
handleSubscribe req;
action ~ "query";
handleQuery req;
`error`message!("unknown_action"; action)]
}
handleSubscribe: {[req]
syms: `$req`syms;
/ Store subscription: handle -> syms
`subscriptions upsert (.z.w; syms);
`status`message!("subscribed"; "Subscribed to ", " " sv string syms)
}
handleQuery: {[req]
q_code: req`query;
    result: @[value; q_code; {[e] `error`message!(1b; e)}]; / e is already a string
`status`data!("ok"; result)
}
/ Broadcast trade updates to subscribed WebSocket clients
subscriptions: ([handle:`long$()] syms:())
broadcastTrade: {[trade_data]
/ For each connected WS client with matching subscription
{[h; trade_data]
client_syms: subscriptions[h; `syms];
matching: trade_data where trade_data[`sym] in client_syms;
if[count matching;
neg[h] .j.j `type`data!("trade_update"; matching)]
}[; trade_data] each ws_clients;
}
/ Hook into tickerplant update function
.u.upd: {[t; data]
    t insert data; / keep the standard RDB insert
    if[t ~ `trade; broadcastTrade data];
 }
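The per-client filtering in broadcastTrade is just "intersect each client's symbol set with the update", and factoring that into a pure function makes it testable without sockets. A sketch (structure assumed: trades as a list of dicts with a `sym` key, subscriptions as handle-to-symbols):

```python
def matching_trades(trades, subscribed_syms):
    """The filter broadcastTrade applies per WebSocket handle."""
    return [t for t in trades if t["sym"] in subscribed_syms]

def route_updates(trades, subscriptions):
    """Map each client handle to the trades it should receive;
    clients with no matches are omitted entirely (no empty sends)."""
    out = {}
    for handle, syms in subscriptions.items():
        hits = matching_trades(trades, syms)
        if hits:
            out[handle] = hits
    return out
```

Skipping clients with empty matches is the same `if[count matching; ...]` guard as the Q version: sending empty frames wastes bandwidth and wakes up every client on every tick.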
JavaScript Client
// dashboard.js
const ws = new WebSocket('ws://localhost:5001');
ws.onopen = () => {
console.log('Connected to kdb+');
// Subscribe to specific symbols
ws.send(JSON.stringify({
action: 'subscribe',
syms: ['AAPL', 'MSFT', 'GOOG']
}));
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'trade_update') {
updateDashboard(msg.data);
}
};
ws.onerror = (error) => console.error('WebSocket error:', error);
ws.onclose = () => {
console.log('Disconnected');
    setTimeout(() => reconnect(), 5000); // auto-reconnect; implement reconnect() to open a new socket and re-subscribe
};
function updateDashboard(trades) {
trades.forEach(trade => {
const el = document.getElementById(`price-${trade.sym}`);
if (el) {
el.textContent = trade.price.toFixed(2);
el.classList.add('flash');
setTimeout(() => el.classList.remove('flash'), 300);
}
});
}
// Request historical data via query
function fetchOHLC(sym, date) {
ws.send(JSON.stringify({
action: 'query',
query: `select from ohlc where sym=\`${sym}, date=${date}`
}));
}
FastAPI WebSocket Proxy
For production, a FastAPI proxy handles authentication, connection management, and adds observability:
# websocket_proxy.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.websockets import WebSocketState
import pykx as kx
import asyncio
import json
import logging
logger = logging.getLogger(__name__)
app = FastAPI()
# kdb+ connection for WebSocket data
q = kx.SyncQConnection(host='localhost', port=5001)
class ConnectionManager:
def __init__(self):
self.active: dict[str, list[WebSocket]] = {} # sym -> [websocket]
async def connect(self, websocket: WebSocket, sym: str):
await websocket.accept()
if sym not in self.active:
self.active[sym] = []
self.active[sym].append(websocket)
logger.info(f"WS connected for {sym}, total: {sum(len(v) for v in self.active.values())}")
def disconnect(self, websocket: WebSocket, sym: str):
if sym in self.active:
self.active[sym] = [ws for ws in self.active[sym] if ws != websocket]
async def broadcast(self, sym: str, data: dict):
if sym not in self.active:
return
dead = []
for ws in self.active[sym]:
try:
if ws.client_state == WebSocketState.CONNECTED:
await ws.send_json(data)
except Exception:
dead.append(ws)
for ws in dead:
self.active[sym].remove(ws)
manager = ConnectionManager()
@app.websocket("/stream/{sym}")
async def stream_symbol(websocket: WebSocket, sym: str):
"""Stream real-time trades for a symbol."""
    sym = sym.upper()
    await manager.connect(websocket, sym)
try:
# Send initial snapshot
snapshot = q(
"{[s] select[-50] time, price, size from trade where date=.z.d, sym=s}",
kx.SymbolAtom(sym)
).pd()
await websocket.send_json({
"type": "snapshot",
"sym": sym,
            "data": snapshot.astype({"time": str}).to_dict(orient="records")  # stringify timestamps for JSON
})
# Keep connection alive, receive control messages
while True:
try:
msg = await asyncio.wait_for(websocket.receive_json(), timeout=30)
if msg.get("action") == "ping":
await websocket.send_json({"type": "pong"})
except asyncio.TimeoutError:
# Send keepalive
await websocket.send_json({"type": "heartbeat"})
except WebSocketDisconnect:
manager.disconnect(websocket, sym)
logger.info(f"WS disconnected: {sym}")
# Background task: poll kdb+ and broadcast updates
async def poll_and_broadcast():
"""Poll kdb+ for new trades and broadcast to subscribers."""
last_time = {} # sym -> last seen time
while True:
await asyncio.sleep(0.5) # 500ms polling interval
for sym in list(manager.active.keys()):
if not manager.active.get(sym):
continue
try:
cutoff = last_time.get(sym, "00:00:00.000")
new_trades = q(
"{[s;t] select time, price, size from trade where date=.z.d, sym=s, time>t}",
kx.SymbolAtom(sym),
kx.TimeAtom(cutoff)
).pd()
if len(new_trades) > 0:
last_time[sym] = str(new_trades['time'].max())
await manager.broadcast(sym, {
"type": "update",
"sym": sym,
                        "trades": new_trades.astype({"time": str}).to_dict(orient="records")
})
except Exception as e:
logger.error(f"Poll error for {sym}: {e}")
@app.on_event("startup")
async def startup():
asyncio.create_task(poll_and_broadcast())
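The high-water-mark logic in poll_and_broadcast (remember the last seen time per symbol, forward only newer rows) is the part most worth getting right: an off-by-one here either drops or duplicates trades. Isolated as a pure function (a sketch; it uses strict `>` against the stored cutoff so a trade exactly at the cutoff is not re-sent, and assumes fixed-width time strings, which compare lexicographically in the right order):

```python
def newer_trades(trades, cutoff):
    """Filter trades strictly newer than cutoff; return the filtered
    list and the new high-water mark (unchanged if nothing passed)."""
    if cutoff is not None:
        trades = [t for t in trades if t["time"] > cutoff]
    if trades:
        cutoff = max(t["time"] for t in trades)
    return trades, cutoff
```

Note the remaining gap in any poll-based design: two trades with identical timestamps, one arriving after the poll, can still be missed. Push-based fan-out from the tickerplant avoids this entirely.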
Handling Back-Pressure
When Q is producing data faster than consumers can handle it, you need a back-pressure strategy:
/ Simple back-pressure in Q: check send buffer before sending
broadcastWithBackPressure: {[h; data]
    / .z.W[h] = sizes of messages queued on handle h; sum for total bytes
    buffer_size: sum .z.W[h];
$[buffer_size > 1000000; / 1MB threshold
/ Buffer full: drop or queue
(-1 "Dropping update for slow consumer: ", string h);
/ Buffer OK: send
neg[h] .j.j data]
}
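The same decision translates directly to the Python side of a bridge. A bounded per-client queue with a drop-oldest policy, as an alternative to the drop-newest behavior in the Q snippet (sizes and names here are illustrative):

```python
from collections import deque

class BoundedSendQueue:
    """Per-client send queue that sheds load instead of blocking.
    When full, the oldest update is dropped: stale market data is
    worth less than fresh data."""

    def __init__(self, max_messages=1000):
        self.queue = deque(maxlen=max_messages)
        self.dropped = 0

    def offer(self, msg):
        if len(self.queue) == self.queue.maxlen:
            self.dropped += 1  # deque discards the oldest on append
        self.queue.append(msg)

    def drain(self):
        out = list(self.queue)
        self.queue.clear()
        return out
```

Tracking `dropped` per client gives you the observability hook: a client that is constantly shedding is a client that should be moved to Kafka-paced consumption.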
For Kafka as a buffer (the right architectural answer for high-throughput):
kdb+ tickerplant
→ publishes to Kafka topic (Q→Kafka producer)
→ Kafka topic acts as buffer
→ slow consumers read from Kafka at their own pace
This decouples the producer (kdb+) from the consumer’s throughput.
The Complete Real-Time Pipeline
Putting it together: Kafka → Q → WebSocket → Browser:
[Market Data Feed]
        ↓
[Kafka Topic: raw-trades]
        ↓
[kdb+ Feed Handler] → [Tickerplant] → [RDB]
                                       ↓           ↓
                      [WebSocket Server]       [REST API]
                                       ↓           ↓
                   [Browser Dashboard]   [External Consumers]
Each layer has a clear responsibility:
- Kafka: durable message queue, decouples feed from processing
- kdb+ tickerplant: time-stamping, logging, fan-out
- RDB: in-memory storage for today’s data
- WebSocket server: real-time push to browsers
- REST API: request-response for historical data
Q sits in the middle of this, doing what it does best: storing and querying time-series data at speed.
Developer Tooling: Linters, Formatters, and Making Q Less Lonely
Q development has historically been a solitary activity. You write code in a text editor, paste it into a REPL, and find out what’s wrong when it errors. There’s no static type checker, the formatter options are limited, and “test suite” in many Q shops means “I ran it and it seemed fine.”
This has improved. The improvement is modest compared to what Python, Rust, or JavaScript developers take for granted, but it’s real and worth using.
Linting: qls and the KX Language Server
The KX Language Server (qls) is the most capable static analysis tool available for Q. It powers the VS Code extension’s diagnostics and can be run standalone for CI integration.
What it checks:
- Undefined variables (within analyzable scope)
- Type errors it can infer statically
- Unused variable assignments
- Shadowed names
- Wrong number of arguments to known functions
- Some style issues
Using qls in VS Code
The KX VS Code extension installs and runs qls automatically. Underlined code in the editor with a red/yellow squiggle is qls talking to you. Take it seriously.
Common diagnostics:
/ qls will flag this:
myFunc: {[x; y]
result: x + z / 'z' is not defined in this scope
result * y
}
/ And this:
unused: 42 / assigned but never used in the function
/ And this (wrong arg count):
count[1; 2] / count takes 1 argument
/ This is fine:
f: {[x] x * 2}
g: {[x] f x}
qls in CI
For automated code quality checks:
# Install (part of the KX VS Code extension bundle, also available standalone)
npm install -g @kxsystems/kdb-lsp
# Run on Q files
qls --check src/**/*.q
# With specific rules
qls --check --rules=all src/
# JSON output for programmatic processing
qls --check --format=json src/*.q | jq '.diagnostics[] | select(.severity == "error")'
GitHub Actions integration:
# .github/workflows/lint.yml
name: Lint Q Code
on: [push, pull_request]
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install kdb-lsp
run: npm install -g @kxsystems/kdb-lsp
- name: Lint Q files
run: |
qls --check --format=json q/**/*.q > lint_results.json || true
cat lint_results.json
# Fail if any errors
errors=$(cat lint_results.json | jq '[.diagnostics[] | select(.severity == "error")] | length')
if [ "$errors" -gt "0" ]; then
echo "Found $errors lint errors"
exit 1
fi
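The jq one-liner in that workflow can be replaced with a small Python gate, which is easier to extend (fail on errors, tolerate warnings, print a summary). This assumes the JSON shape shown above: a top-level `diagnostics` array with a `severity` field per entry.

```python
import json
import sys

def count_errors(lint_json):
    """Count error-severity diagnostics in lint output."""
    report = json.loads(lint_json)
    return sum(1 for d in report.get("diagnostics", [])
               if d.get("severity") == "error")

def main(path):
    with open(path) as f:
        errors = count_errors(f.read())
    if errors:
        print(f"Found {errors} lint errors")
        return 1
    return 0

if __name__ == "__main__" and len(sys.argv) > 1:
    sys.exit(main(sys.argv[1]))
```

In the workflow, replace the jq step with `python lint_gate.py lint_results.json` (the filename is hypothetical).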
Formatting: The State of Q Formatting
Q has no official formatter equivalent to gofmt, rustfmt, or black. This is a gap in the tooling ecosystem.
What exists:
- qformat (community): A Python-based formatter for Q code. Not as opinionated as black, but handles basic whitespace and indentation.
- Manual conventions: Most Q shops define their own style guide and enforce it in code review.
Install and use qformat:
pip install qformat
# Format in place
qformat --in-place mycode.q
# Check without modifying (for CI)
qformat --check mycode.q && echo "Formatted" || echo "Needs formatting"
Q Style Conventions (The Practical Guide)
Since there’s no gofmt that enforces these for you, here are conventions worth adopting:
/ ─── Naming ──────────────────────────────────────────────────────────────────
/ Functions: camelCase
computeVwap: {[trades] ...}
/ Tables: lowercase
trade: ([] time:`timestamp$(); sym:`symbol$(); price:`float$())
/ Namespaces: lowercase with dot
.myapp.config: `host`port!("localhost"; 5001)
.myapp.init: {[] ...}
/ Constants: UPPERCASE (informal convention)
MAX_SIZE: 1000000
/ ─── Function Style ──────────────────────────────────────────────────────────
/ Named arguments for multi-arg functions (don't use x y z for non-trivial functions)
/ Note: q-sql resolves column names first, so parameters must not shadow
/ columns (where sym=sym would compare the column to itself)
computeOHLC: {[trades; s; d; barMins]
    select
        open: first price,
        high: max price,
        low: min price,
        close: last price
    by time: barMins xbar time.minute
    from trades
    where date=d, sym=s
 }
/ Short lambda: x y z are fine for 1-3 arg lambdas with obvious semantics
doubled: {x * 2}
sumPair: {x + y}
/ ─── Spacing ─────────────────────────────────────────────────────────────────
/ Space around operators in expressions
a: b + c * d / good
a:b+c*d / bad (hard to read at speed)
/ No space before colon in assignment (the q way)
result: 42 / good
result : 42 / bad
/ Space after semicolon in multi-statement function
f: {[x] a: x+1; b: a*2; b-1} / acceptable for short functions
/ Multi-line for complex functions
f: {[x]
a: x + 1;
b: a * 2;
b - 1
}
/ ─── Documentation ──────────────────────────────────────────────────────────
/ Use line comments for non-obvious logic
/ Use block comments for function documentation
/ Block comment style for function docs:
/ computeVwap: Compute volume-weighted average price
/   trades: trade table with columns time, sym, price, size
/   s: symbol to compute VWAP for
/   d: date to compute VWAP for
/   returns: float (VWAP) or null if no trades
/ (parameters named s and d so they don't shadow the sym and date columns)
computeVwap: {[trades; s; d]
    data: select from trades where date=d, sym=s;
    $[count data;
        data[`size] wavg data[`price];
        0n]
 }
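When porting analytics between Q and Python (the pykx workflow from earlier chapters), keeping a reference implementation on each side lets you cross-check results cheaply. A Python mirror of the VWAP above (a sketch; it takes parallel lists rather than a table):

```python
def compute_vwap(prices, sizes):
    """Volume-weighted average price: sum(price*size) / sum(size).
    Returns None for empty input, mirroring the 0n null in Q."""
    if not prices:
        return None
    total_size = sum(sizes)
    if total_size == 0:
        return None
    return sum(p * s for p, s in zip(prices, sizes)) / total_size
```

Running both against the same small fixture is the fastest way to catch the classic porting bugs: operator precedence, null handling, and integer-versus-float division.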
Testing: q-unit and Homemade Test Frameworks
Q doesn’t have a built-in test framework. The options are:
q-unit: A minimal test framework for Q. Similar in spirit to xUnit.
qspec: A BDD-style testing framework. Less used but cleaner for descriptive tests.
Roll your own: Many Q shops use a simple convention file that tests functions inline.
q-unit
# Download q-unit
# https://github.com/CharlesSkelton/qunit
# Place qunit.q in your q path
/ test_trade.q
\l qunit.q
/ Unary variant of computeVwap under test (defined here so the examples are self-contained)
computeVwap: {[trades] $[count trades; trades[`size] wavg trades[`price]; 0n]}
test_vwap_basic: {
trades: ([] price: 100.0 101.0 99.0; size: 100 200 150);
    / parenthesize each product: q evaluates strictly right-to-left
    expected: ((100.0*100) + (101.0*200) + (99.0*150)) % (100+200+150);
result: computeVwap trades;
.qunit.assertEqual[result; expected; "basic VWAP"]
}
test_vwap_empty: {
trades: ([] price:`float$(); size:`long$());
result: computeVwap trades;
.qunit.assertNull[result; "empty input returns null"]
}
test_vwap_single: {
trades: ([] price: enlist 100.0; size: enlist 100);
result: computeVwap trades;
.qunit.assertEqual[result; 100.0; "single trade VWAP"]
}
/ Run tests
.qunit.run[]
Simple Homemade Framework
If adding a dependency feels heavy, this pattern works fine for smaller codebases:
/ test.q — minimal test runner
.test.passed: 0;
.test.failed: 0;
/ assert: check a condition
.test.assert: {[name; condition]
$[condition;
[.test.passed +: 1;
-1 "[PASS] ", name];
[.test.failed +: 1;
-1 "[FAIL] ", name]]
}
/ assertEqual: check equality
.test.assertEqual: {[name; actual; expected]
.test.assert[name; actual ~ expected]
}
/ assertError: check that an expression throws
.test.assertError: {[name; expr]
result: @[value; expr; {[e] 1b}];
.test.assert[name; result ~ 1b]
}
/ run: execute each test function (passed as symbols), report results
.test.run: {[testFns]
    {(value x)[]} each testFns;
-1 "\nResults: ", (string .test.passed), " passed, ",
(string .test.failed), " failed";
if[.test.failed > 0; exit 1] / non-zero exit for CI
}
/ ─── Example tests ──────────────────────────────────────────────────────────
testComputeVwap: {[]
.test.assertEqual[
"basic VWAP";
computeVwap ([] price:100.0 101.0; size:100 200);
        ((100.0*100) + (101.0*200)) % 300 / parenthesize products: q is right-to-left
];
    .test.assertError[
        "non-table input throws";
        {[x] computeVwap 42}
    ]
}
testScalePrice: {[]
.test.assertEqual["scale by 2"; scalePrice[2.0; 100.0]; 200.0];
.test.assertEqual["scale by 0"; scalePrice[0.0; 100.0]; 0.0];
.test.assertEqual["negative scale"; scalePrice[-1.0; 100.0]; -100.0]
}
.test.run `testComputeVwap`testScalePrice
Run from the command line:
q test.q -q            # -q = quiet mode (suppresses startup output)
echo "Exit code: $?"   # 0 = all tests pass, 1 = failures
CI Integration for Tests
# .github/workflows/test.yml
name: Test Q Code
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Download kdb+
run: |
# The free on-demand license works for CI
# Set QLIC to point to your license file
wget -q https://files.kx.com/linuxx86.zip
unzip -q linuxx86.zip -d ~/q
echo "$HOME/q/l64" >> $GITHUB_PATH
- name: Run Q tests
env:
QLIC: ${{ secrets.KX_LICENSE_ENCODED }}
run: |
echo "$QLIC" | base64 -d > ~/q/kc.lic
q tests/test.q -q
Version Control Practices
Q code in version control isn’t different from any other code, but a few Q-specific practices help:
.gitignore for Q Projects
# kdb+ generated files
*.log
*.idx
*.d
# Splayed table files (the data, not the schema)
sym
*.dat
# Q session state
.q
.z_history
# Build artifacts
*.so
*.dylib
# License files (never commit these)
*.lic
kc.lic
Separating Schema from Data
Keep your table schemas in .q files under version control; keep the actual data files out:
project/
├── q/
│ ├── schema.q # table definitions
│ ├── functions.q # business logic
│ └── init.q # startup script
├── tests/
│ └── test.q
└── data/ # not in git
├── hdb/
└── rdb/
/ schema.q — version controlled
trade: ([]
time: `timestamp$();
sym: `symbol$();
price: `float$();
size: `long$()
)
quote: ([]
time: `timestamp$();
sym: `symbol$();
bid: `float$();
ask: `float$();
bsize: `long$();
asize: `long$()
)
Dependency Management
Q has no package manager. The ecosystem is small enough that this isn’t as painful as it sounds, but it’s something to manage intentionally.
Common patterns:
/ Explicit version tracking in a manifest
/ versions.q
\d .versions
kdbplus: "4.1.0"
kafkaq: "1.5.2"
websockets: "0.3.1"
\d .
/ Load dependencies with version checks
/ Parse "1.5.2" -> 1 5 2 for numeric comparison
vparse: {"J"$"." vs x}
/ a >= b, comparing parsed versions lexicographically (assumes same component count)
vgte: {[a; b]
    d: vparse[a] - vparse b;
    $[any d <> 0; 0 < d first where d <> 0; 1b]
 }
loadDep: {[name; minVersion]
    / Load the library; signal a descriptive error on failure
    / (name is projected into the trap lambda: inner lambdas don't see outer locals)
    @[system; "l ", string name; {[n; e] '"Failed to load ", string[n], ": ", e}[name]];
    / Check the recorded version (values in .versions are strings already)
    ver: .versions name;
    $[vgte[ver; minVersion];
        -1 "Loaded ", string[name], " v", ver;
        '"Version ", string[name], " ", ver, " < required ", minVersion]
 }
No lock files, no requirements.txt, no Cargo.toml. Manage versions explicitly and document them. It’s not ideal. It’s the reality.
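One concrete trap when managing versions by hand: dotted version strings do not compare correctly as strings ("1.10.0" sorts before "1.9.0" lexicographically). Parse to integer tuples first. A sketch, for the Python side of a polyglot deployment:

```python
def parse_version(v):
    """'1.10.2' -> (1, 10, 2); tuples compare component-wise,
    which gives the correct version ordering."""
    return tuple(int(part) for part in v.split("."))

def meets_minimum(installed, required):
    # Assumes plain dotted numeric versions (no "1.5.2-rc1" suffixes)
    return parse_version(installed) >= parse_version(required)
```

If your versions carry suffixes like `-rc1`, use a real implementation (the `packaging` library's `Version` class) rather than extending this sketch.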
Profiling Q Code
For performance work, Q has a built-in profiler:
/ Time a single expression
\t select from trade where date=.z.d, sym=`AAPL
/ Output: 12 (milliseconds)
/ Time over N iterations
\t:1000 count trade
/ Output: 3 (total ms for 1000 runs)
/ More detailed profiling with .Q.ts (binary: function and argument list)
.Q.ts[{select from trade where date=.z.d}; enlist(::)]
/ Output: (time ms; space bytes)
/ Memory usage
.Q.w[]
/ `used`heap`peak`wmax`mmap`mphy`syms`symw
/ See current memory allocation
For deeper profiling — finding hotspots in complex Q code — the combination of \t on individual functions with binary search is often the fastest approach:
/ Profile each step of a pipeline as (name; function) pairs
stages: (
    ("load";      {select from trade where date=.z.d});
    ("filter";    {select from trade where date=.z.d, sym=`AAPL});
    ("aggregate"; {select vwap: size wavg price by sym from trade where date=.z.d})
    / ("join";    {aj[`sym`time; ...; ...]})  / fill in your own tables
  );
/ Time each stage (\t is a command, not an expression, so diff .z.p instead)
{[s]
    t0: .z.p;
    (s 1)[];
    -1 s[0], ": ", string[`long$(.z.p - t0) div 1000000], " ms"
 } each stages;
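The same stage-timing harness is sometimes more convenient from the Python side, when the pipeline is driven through pykx and you want timings as the caller sees them (including IPC and deserialization overhead). A sketch; the stage functions here are stand-ins:

```python
import time

def time_stages(stages):
    """Run each (name, fn) stage once; return wall time per stage in ms."""
    timings = {}
    for name, fn in stages:
        t0 = time.perf_counter()
        fn()
        timings[name] = (time.perf_counter() - t0) * 1000.0
    return timings
```

In real use each `fn` would be a `q(...)` call against the connection; timing from the caller captures the round-trip cost that `\t` inside the Q process cannot see.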
There’s no flame graph generator for Q. Yet.
Where to Go From Here
You now have a map of the integration landscape. Let’s be direct about what you actually need.
The Honest Summary
If you’re building a data pipeline that feeds kdb+ from Kafka: Use the Python Kafka bridge pattern from the Streaming chapter. It’s the lowest-friction path, and Python’s Kafka client ecosystem is mature. If throughput demands it, graduate to the native KafkaQ library.
If your Python team needs kdb+ data: Give them the FastAPI gateway from the REST chapter. They get a standard REST API with OpenAPI docs, you retain control over what they can query. pykx if they need direct access; the REST API if they just need data.
If you’re doing R-based research on kdb+ data: rkdb plus the “aggregate in Q, analyze in R” pattern. The failure mode is pulling raw tick data into R. Don’t do that.
If you’re building high-performance infrastructure around kdb+: Rust via kdbplus. IPC for most use cases; FFI only when you need to write kdb+ extensions.
If your IDE situation is still “text editor and a REPL”: VS Code with the KX extension. Configure it with a connection to your dev kdb+ process. The interactive execution and diagnostics are worth the fifteen minutes of setup.
What We Didn’t Cover
A few areas deliberately left out:
C integration: The kdb+ C API is comprehensive and well-documented by KX. If you’re writing a feed handler in C, the KX documentation is more authoritative than anything here.
Java integration: jdbc for kdb+ exists and works. If your organization is a Java shop and you need Q integration, the JDBC driver is maintained by KX and integrates cleanly with standard Java database frameworks.
KX Insights and KDB.AI: KX’s commercial products have their own integration story. This book covers the open-source and community tooling; the commercial products have their own documentation.
kdb+ on cloud infrastructure: Deploying kdb+ on AWS/GCP/Azure has become more tractable with containers and KX’s cloud products. The integration patterns here work the same in cloud environments; the operational considerations are different.
Time-series ML with kdb+: The combination of kdb+ tick data and Python ML libraries (scikit-learn, PyTorch, etc.) is a whole book. The PyQ and pykx chapters give you the foundation.
The Patterns That Matter
Stepping back, three architectural patterns appear throughout this book:
Pattern 1: Q as the source of truth, everything else as a consumer
Tick → kdb+ → [Python analytics] [R research] [REST API] [WebSocket dashboard]
Q does the heavy lifting: ingestion, storage, time-series queries. External tools read from it. The integration layer (FastAPI, qPython, rkdb) is thin and stateless.
Pattern 2: Q as one node in a broader pipeline
Kafka → kdb+ → Kafka → [downstream services]
Q participates in a message-passing architecture. It consumes from Kafka, enriches or aggregates, and publishes back. Good for organizations that have already standardized on event streaming.
Pattern 3: Q embedded in a polyglot service
[Rust service] ←IPC→ [kdb+ process]
[Rust service] ←FFI→ [embedded kdb+]
A Rust (or Go, or C++) service uses kdb+ as a compute or storage engine, calling it via IPC or embedding it via FFI. The kdb+ component is not independently managed; it’s part of the service.
Most real systems are a hybrid. Know which pattern you’re using for each integration point.
The Community
Q’s community is small but substantive:
- KX Community (community.kx.com): Official forum. KX engineers answer questions. Good signal-to-noise ratio.
- kdb+/q Insights (kdb.ai): KX’s developer portal, tutorials, and documentation.
- GitHub: Search kdb or kdbplus; there's more community tooling than the sparse-ecosystem reputation suggests.
- Stack Overflow: the [kdb] and [q-lang] tags. Smaller than Python/JS, but answers exist for most common questions.
A Note on KX Licensing
The free 64-bit on-demand license from KX covers everything in this book. The license requires internet connectivity for validation. For air-gapped or cloud deployments, you need a commercial license.
The business reality: if kdb+ is generating value in your organization, you likely already have a commercial relationship with KX. If you’re evaluating or building with the free license, be aware of the validation requirement before you plan a production deployment.
One More Thing
Q developers who complain that the language is too opaque, the tooling is immature, and the community is too small are usually right on all three counts. They are often also the people who have been quietly building the most performant time-series systems in their organizations for a decade.
The gap between Q’s ergonomics and Q’s performance is real and it’s shrinking. The IDE support is better than it was. The integration libraries are better than they were. The free license tier exists now. None of that was true five years ago.
What makes Q worth the friction is what has always made it worth the friction: when you need to query a billion-row time-series table and get a result in milliseconds, there are very few alternatives that aren’t either considerably more expensive, considerably more complex to operate, or both. That’s a narrow but deep niche, and it’s not going away.
The rest of the stack — Python, Rust, R, REST, Kafka, WebSockets — is better at what it’s good at. The goal of this book was to make Q a first-class participant in that stack rather than a silo. Whether you got there depends on what you built.
Go build something.