Skip to content

feat(ilp): binary wire protocol#7

Open
mtopolnik wants to merge 255 commits intomainfrom
jh_experiment_new_ilp
Open

feat(ilp): binary wire protocol#7
mtopolnik wants to merge 255 commits intomainfrom
jh_experiment_new_ilp

Conversation

@mtopolnik
Copy link
Copy Markdown
Contributor

@mtopolnik mtopolnik commented Feb 25, 2026

TODO in run_tests_pipeline.yaml! Change before merging!

# TODO: remove branch once jh_experiment_new_ilp is merged
- script: git clone --depth 1 -b jh_experiment_new_ilp https://github.com/questdb/questdb.git ./questdb

Change to

- script: git clone --depth 1 https://github.com/questdb/questdb.git ./questdb

Summary

This PR adds a new WebSocket-based ingestion path to the Java client using QWP (QuestDB Wire Protocol), a binary protocol that replaces text-based ILP for higher-throughput data ingestion. The existing HTTP and TCP ILP senders remain unchanged.

Users select the new transport via Sender.builder(Transport.WEBSOCKET). The builder accepts WebSocket-specific options such as asyncMode, autoFlushBytes, autoFlushIntervalMillis, and inFlightWindowSize.

Architecture

The implementation follows a layered design:

Protocol layer (cutlass/qwp/protocol/)

  • QwpTableBuffer stores rows in columnar format using off-heap memory (zero-GC on the data path).
  • QwpSchemaHash computes XXHash64 over column names and types, enabling server-side schema caching. The client sends a full schema on the first batch and a hash reference on subsequent batches if the schema has not changed.
  • QwpGorillaEncoder applies delta-of-delta compression to timestamp columns.
  • QwpBitWriter handles bit-level packing for booleans and null bitmaps.
  • QwpConstants defines the wire format: "QWP1" magic bytes, type codes, feature flags, status codes.

Client layer (cutlass/qwp/client/)

  • QwpWebSocketSender implements the Sender interface. It uses a double-buffering scheme: the user thread writes rows into an active MicrobatchBuffer, which is sealed and handed to an I/O thread when an auto-flush trigger fires (row count, byte size, or time interval).
  • QwpWebSocketEncoder serializes QwpTableBuffer contents into binary QWP frames, including delta symbol dictionaries (only new symbols since the last acknowledged batch).
  • InFlightWindow implements a lock-free sliding window protocol that tracks batches awaiting server ACKs, providing backpressure from the server to the user thread.
  • WebSocketSendQueue runs the dedicated I/O thread, managing frame transmission and ACK/NACK response parsing.
  • GlobalSymbolDictionary assigns sequential integer IDs to symbol strings and supports delta encoding across batches.

WebSocket transport (cutlass/http/client/, cutlass/qwp/websocket/)

  • WebSocketClient is a zero-GC WebSocket implementation with platform-specific subclasses for Linux (epoll), macOS (kqueue), and Windows (select).
  • WebSocketFrameParser and WebSocketFrameWriter handle RFC 6455 frame serialization, including fragmentation, close-frame echo, and ping/pong.
  • WebSocketSendBuffer builds masked WebSocket frames directly in native memory.

Bug fixes and robustness improvements

The PR fixes a number of issues found during development and testing:

  • Fix native memory leaks in WebSocketClient constructor and on allocation failure.
  • Fix sendQueue leak on close when flush fails.
  • Fix integer overflows in buffer growth (WebSocketClient, WebSocketSendBuffer, QwpTableBuffer), array dimension products, and putBlockOfBytes().
  • Fix lone surrogate hash mismatch between schema hashing and wire encoding.
  • Fix receiveFrame() throwing instead of returning false, which masked I/O errors as timeouts.
  • Fix pong/close frames clobbering an in-progress send buffer.
  • Fix delta dictionary corruption on send failure by rolling back symbol IDs.
  • Fix stale array offsets after cancelRow() truncation.
  • Fix case-insensitive header validation in WebSocket handshake.
  • Cap receive buffer growth to prevent OOM.
  • Use SecureRnd (ChaCha20-based CSPRNG) for WebSocket masking keys instead of java.util.Random.
  • Validate table names, column names, WebSocket payload lengths, and UTF-8 low surrogates.

Code cleanup

The PR removes ~11,000 lines of dead code:

  • Delete unused utility classes: ConcurrentHashMap (3,791 lines), ConcurrentIntHashMap (3,612 lines), GenericLexer, Base64Helper, LongObjHashMap, FilesFacade, and others.
  • Remove unused methods from Numbers, Chars, Utf8s, Rnd, and ColumnType.
  • Delete obsolete classes: ParanoiaState, GeoHashes, BorrowedArray, HttpCookie.
  • Modernize code style: enhanced switch expressions, pattern variables in instanceof checks.
  • Upgrade minimum Java version from 11 to 17.

CI changes

  • Add a ClientIntegrationTests CI stage that starts a QuestDB server and runs the client's integration tests against it (both default and authenticated configurations).
  • Cache Maven dependencies in CI to speed up builds.
  • Fix sed portability for macOS CI runners.
  • Enable the HTTP server in CI test configurations (required for WebSocket).

Test plan

  • Unit tests cover all protocol building blocks: bit writer, Gorilla encoder, schema hash, column definitions, constants, table buffer, native buffer writer, off-heap memory
  • Unit tests cover WebSocket frame parsing/writing, send buffer, send queue, in-flight window, microbatch buffer, delta/global symbol dictionaries
  • QwpSenderTest (8,346 lines) exercises the full Sender API surface for all column types, null handling, cancelRow, schema changes, and error paths
  • QwpWebSocketSenderTest tests WebSocket-specific sender behavior including async mode
  • QwpWebSocketEncoderTest validates binary encoding for all column types and encoding modes
  • LineSenderBuilderWebSocketTest covers builder validation and configuration for the WebSocket transport
  • Integration tests run the client against a real QuestDB server in CI (default and authenticated)
  • assertMemoryLeak wrappers added to client tests to detect native memory leaks

bluestreak01 and others added 30 commits February 14, 2026 23:02
sendPongFrame() used the shared sendBuffer, calling reset()
which destroyed any partially-built frame the caller had in
progress via getSendBuffer(). This could happen when a PING
arrived during receiveFrame()/tryReceiveFrame() while the
caller was mid-way through constructing a data frame.

Add a dedicated 256-byte controlFrameBuffer for sending pong
responses. RFC 6455 limits control frame payloads to 125 bytes
plus a 14-byte max header, so 256 bytes is sufficient and never
needs to grow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
sendCloseFrame() used reason.length() (UTF-16 code units) to
calculate the payload size, but wrote reason.getBytes(UTF_8)
(UTF-8 bytes) into the buffer. For non-ASCII close reasons,
UTF-8 encoding can be longer than the UTF-16 length, causing
writes past the declared payload size. This corrupted the
frame header length, the masking range, and could overrun the
allocated buffer.

Compute the UTF-8 byte array upfront and use its length for
all sizing calculations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When receiving a CLOSE frame from the server, the client now echoes
a close frame back before marking the connection as no longer
upgraded. This is required by RFC 6455 Section 5.5.1.

The close code parsing was moved out of the handler-null check so
the code is always available for the echo. The echo uses the
dedicated controlFrameBuffer to avoid clobbering any in-progress
frame in the main send buffer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Handle CONTINUATION frames (opcode 0x0) in tryParseFrame()
which were previously silently dropped. Fragment payloads are
accumulated in a lazily-allocated native memory buffer and
delivered as a complete message to the handler when the final
FIN=1 frame arrives.

The FIN bit is now checked on TEXT/BINARY frames: FIN=0 starts
fragment accumulation, FIN=1 delivers immediately. Protocol
errors are raised for continuation without an initial fragment
and for overlapping fragmented messages.

The fragment buffer is freed in close() and the fragmentation
state is reset on disconnect().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a configurable maximum size for the WebSocket receive buffer,
mirroring the pattern already used by WebSocketSendBuffer. Previously,
growRecvBuffer() doubled the buffer without any upper bound, allowing
a malicious server to trigger out-of-memory by sending arbitrarily
large frames.

Add getMaximumResponseBufferSize() to HttpClientConfiguration
(defaulting to Integer.MAX_VALUE for backwards compatibility) and
enforce the limit in both growRecvBuffer() and
appendToFragmentBuffer(), which had the same unbounded growth issue
for fragmented messages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests that expect connection failure were hardcoding ports
(9000, 19999) which could collide with running services. When
a QuestDB server is running on port 9000, the WebSocket
connection succeeds and the test fails with "Expected
LineSenderException".

Replace hardcoded ports with dynamically allocated ephemeral
ports via ServerSocket(0). The port is bound and immediately
closed, guaranteeing nothing is listening when the test tries
to connect.

Affected tests:
- testBuilderWithWebSocketTransportCreatesCorrectSenderType
- testConnectionRefused
- testWsConfigString
- testWsConfigString_missingAddr_fails
- testWsConfigString_protocolAlreadyConfigured_fails

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The Sec-WebSocket-Accept header validation used case-sensitive
String.contains(), which violates RFC 7230 (HTTP headers are
case-insensitive). A server sending the header in a different
casing (e.g., sec-websocket-accept) would cause the handshake
to fail.

Replace with a containsHeaderValue() helper that uses
String.regionMatches(ignoreCase=true) for the header name
lookup, avoiding both the case-sensitivity bug and unnecessary
string allocation from toLowerCase().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace byte-by-byte native-heap copies in writeToSocket and
readFromSocket with Unsafe.copyMemory(), using the 5-argument
form that bridges native memory and Java byte arrays via
Unsafe.BYTE_OFFSET.

Add WebSocketChannelTest with a local echo server that verifies
data integrity through the copy paths across various payload
sizes and patterns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move maxSentSymbolId and sentSchemaHashes updates to after the
send/enqueue succeeds in both async and sync flush paths. Previously
these were updated before the send, so if sealAndSwapBuffer() threw
(async) or sendBinary()/waitForAck() threw (sync), the next batch's
delta dictionary would omit symbols the server never received,
silently corrupting subsequent data.

Also move sentSchemaHashes.add() inside the messageSize > 0 guard
in the sync path, where it was incorrectly marking schemas as sent
even when no data was produced.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The validate() range check used TYPE_DECIMAL256 (0x15) as the upper
bound, which excluded TYPE_CHAR (0x16). CHAR columns would throw
IllegalArgumentException on validation.

Extend the upper bound to TYPE_CHAR and add tests covering all valid
type codes, nullable CHAR, and invalid type rejection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace raw AssertionError with LineSenderException when a token
parameter is provided in ws:: or wss:: configuration strings. The
else branch in config string parsing was unreachable when the code
only supported HTTP and TCP, but became reachable after WebSocket
support was added. Users now get a clear "token is not supported
for WebSocket protocol" error instead of a cryptic AssertionError.

Add test assertions for both ws:: and wss:: schemas with token.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The static nextBatchId field was a plain long incremented with ++,
which is a non-atomic read-modify-write. Multiple threads creating
or resetting MicrobatchBuffer instances concurrently (e.g., several
Sender instances, or a user thread resetting while another constructs)
could read the same value and produce duplicate batch IDs.

Replace the plain long with an AtomicLong and use getAndIncrement()
in both the constructor and reset() to guarantee uniqueness.

Add MicrobatchBufferTest with two concurrent tests that confirm
batch ID uniqueness under contention from 8 threads.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QwpBitWriter's write methods (writeBits, writeByte, writeInt,
writeLong, flush) silently dropped data when the buffer was full
instead of signaling an error. The same pattern existed in
QwpGorillaEncoder.encodeTimestamps(), which returned partial
byte counts when capacity was insufficient for the first or
second uncompressed timestamp.

Replace all silent-drop guards with LineSenderException throws
so callers get a clear error instead of silently corrupted data.
Add QwpBitWriterTest covering overflow for each write method
and both Gorilla encoder early-return paths.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add test coverage for NativeBufferWriter, which previously had no
tests. The tests exercise skip(), patchInt(), and ensureCapacity()
mechanics, including the skip-then-patch pattern used by
QwpWebSocketEncoder to reserve and backfill length fields.

skip() and patchInt() were flagged as missing bounds checks, but
investigation showed both are internal-only methods called exclusively
by QwpWebSocketEncoder with structurally guaranteed-safe arguments:
skip()'s single caller already ensures capacity before the call, and
patchInt()'s two callers always pass the hardcoded offset 8 within a
12-byte header. Adding runtime checks would be dead branches on the
hot path, so only test coverage was added.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
truncateTo() rewound off-heap buffers (data, strings, aux) but
forgot to rewind arrayShapeOffset and arrayDataOffset. After
cancelling a row with array data and adding a replacement row,
the new values were written at a gap past where the encoder
reads, causing the encoder to serialize stale cancelled data.

Fix by walking the retained values to recompute both offsets,
matching the same traversal pattern the encoder uses. Add tests
that verify the encoder would read correct data after cancel
for 1D double arrays, 1D long arrays, and 2D double arrays.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The builder computed actualAutoFlushRows, actualAutoFlushBytes, and
actualAutoFlushIntervalNanos but the sync-mode WebSocket path called
QwpWebSocketSender.connect(host, port, tls), which hardcoded all
three to 0. This silently disabled auto-flush for every sync-mode
WebSocket sender, regardless of user configuration.

Add a connect() overload that accepts auto-flush parameters and
update the builder to call it. Also update createForTesting(h, p,
windowSize) to use default auto-flush values instead of zeros, so
it mirrors the production connect() path.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
jerrinot and others added 30 commits March 20, 2026 12:49
QwpWebSocketSender.checkedColumnName() validated every column
name on every row write, even though getOrCreateColumn() finds
the column already exists in most cases. Move the validation
into QwpTableBuffer.getOrCreateColumn() so it runs only when a
new column is created.

Add getOrCreateDesignatedTimestampColumn() for the designated
timestamp, which uses an empty-string sentinel name and must
bypass column name validation. Both QwpWebSocketSender and
QwpUdpSender now use this dedicated method.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
"Nullable" is a confusing concept due to QuestDB null sentinels, which
means a "non-nullable" column could still hold nulls via sentinel
values.
this avoid extra work when just global dictionary is needed
we need to keep the per-column dictionaries due to UDP. for now.
Replace the nullable type-code flag (0x80 high bit) with an
inline null-count byte at the start of each column's data.
QwpColumnWriter.writeNullHeader() now writes a single byte
(0 = no nulls, 1 = has nulls) followed by the bitmap only
when nulls are present.

QwpColumnDef no longer stores or OR's a nullable flag into
the type code. The hasNullBitmap field and the 3-argument
constructor are removed. QwpSchemaHash hashes only the base
type code.

QwpTableBuffer.ColumnBuffer defers null bitmap expansion to
addNull() calls instead of checking capacity on every row.
Non-null addXxx() methods no longer touch the bitmap at all.
The bitmap is tail-expanded to the full row count at
serialization time in writeNullHeader().

Safety fixes: isNull() returns false for indices beyond
bitmap capacity, truncateTo() and clearToEmptyFast() clamp
to the allocated bitmap size.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The nullable-flag-in-type-code mechanism was replaced by inline
null count encoding. Since the protocol was never released, remove
the dead constants and their tests entirely rather than
deprecating them.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add begin/addTable/finish streaming API to QwpWebSocketEncoder
so callers can encode multiple tables into a single QWP v1
message. Refactor encodeWithDeltaDict() to delegate to the new
methods for backward compatibility.

Change flushPendingRows() (async) and flushSync() (sync) in
QwpWebSocketSender to encode all non-empty tables into one
message instead of sending one message per table. The header's
tableCount field now reflects the actual number of tables. The
delta symbol dictionary and payload length are written once per
message. Sent-state updates (maxSentSymbolId, sentSchemaHashes)
are deferred until after the message is successfully enqueued or
ACKed, preserving the existing error-recovery guarantee.

The server already loops over tableCount when processing QWP
messages, so no server-side changes are needed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The client sends X-QWP-Max-Version and X-QWP-Client-Id
headers in the WebSocket upgrade request. After the server
responds with X-QWP-Version, the client reads the selected
version and uses it in the version byte of all subsequent
QWP message headers.

QwpWebSocketEncoder.writeHeader() now uses a configurable
version field instead of the hardcoded VERSION_1 constant.
QwpWebSocketSender.ensureConnected() sets the version on
the encoder after reading it from the upgrade response.

QwpConstants gains MAX_SUPPORTED_VERSION and CLIENT_ID
constants for the negotiation headers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QWP transmitted DECIMAL64/128/256 values in big-endian while all
other numeric types used little-endian. This was inherited from
Parquet's convention but served no purpose in QWP, which is its
own protocol.

Switch decimal columns to little-endian on the wire, matching
Cairo's in-memory layout. This replaces putLongBE() with
putLong() in QwpColumnWriter and deletes the now-unused
putLongBE() method from the buffer writer interface and its
implementations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The test decoder reads decimal bytes from the wire and passes
them directly to BigInteger, which expects big-endian byte
order. After commit 9af4a61 switched the QWP decimal wire
format to little-endian, the decoder produces wrong values.

Reverse the byte array before constructing BigInteger so the
decoder correctly interprets little-endian decimal data.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The decimal encoder wrote multi-word values with the most
significant word first (hi, lo). This made the wire format
inconsistent: individual words were little-endian but word
order was big-endian.

QwpColumnWriter now emits words least-significant-first,
matching the byte-level little-endian convention used by all
other multi-word QWP types (UUID, LONG256). The test decoder
reversal and QwpConstants docs are updated to match.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
While closing the sender, we send the IO thread the stop signal and
wait for it to complete. If it doesn't complete on time, we can't
just continue with the teardown, freeing the resources it's still
using. We must bail out of teardown.
Replace XXHash64-based schema hashes with client-assigned,
monotonically increasing schema IDs in the QWP wire protocol.

The previous mechanism computed a 64-bit hash over column names
and types, which could theoretically collide. Schema IDs are
dense integers starting at 0, assigned by the client during each
session, eliminating collisions entirely.

Wire format changes:
- SCHEMA_MODE_FULL now includes a varint schemaId before column
  definitions
- SCHEMA_MODE_REFERENCE sends a varint schemaId instead of an
  8-byte hash, reducing overhead to 1-2 bytes

Client side (java-questdb-client):
- QwpTableBuffer stores an int schemaId assigned by the sender
- QwpWebSocketSender tracks nextSchemaId counter and
  maxSentSchemaId instead of a LongHashSet of sent hashes
- QwpColumnWriter writes varint schemaId in both schema modes
- QwpUdpSender size estimate accounts for the new varint
- Delete QwpSchemaHash and its tests

Server side (core):
- QwpSchemaCache uses ObjList indexed by schemaId instead of
  LongObjHashMap keyed by combined hash
- QwpSchema removes schemaHash field, equals(), hashCode(), and
  the dependency on QwpSchemaHash
- QwpTableBlockCursor uses parseResult.schemaId for cache ops
- Delete QwpSchemaHash and its test

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add server-side validation that schema IDs arrive in strict
monotonic sequence and do not exceed a configurable per-connection
limit. The client-side sender exposes the limit through the
builder API and configuration string.

Server side:
- QwpSchemaCache.put() validates monotonic sequence and rejects
  IDs that exceed maxSchemasPerConnection
- Add INVALID_SCHEMA_ID error code to QwpParseException
- Add DEFAULT_MAX_SCHEMAS_PER_CONNECTION (65,535) to QwpConstants
- Wire the limit through LineHttpProcessorConfiguration,
  PropServerConfiguration, and QwpProcessorState
- QwpStreamingDecoder.resetConnectionState() clears schema cache
  on disconnect

Client side:
- Sender.LineSenderBuilder.maxSchemasPerConnection() builder
  method and max_schemas_per_connection config string parameter
- QwpWebSocketSender accepts and forwards the limit
- Replace static import of QwpConstants.* with explicit prefix

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace "hash lookup" with "ID lookup" in the
SCHEMA_MODE_REFERENCE javadoc, and "Schema hash not
recognized" with "Schema ID not recognized" in the
STATUS_SCHEMA_REQUIRED javadoc. The implementation now
uses monotonic integer IDs, not hashes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
WebSocketResponse used ad-hoc status code values (e.g.,
PARSE_ERROR=1, INTERNAL_ERROR=255) that diverged from the
QWP specification and the server's QwpConstants. This caused
the client to decode server error responses as UNKNOWN.

Update all WebSocketResponse status constants to match the
spec: PARSE_ERROR=0x05, SCHEMA_ERROR=0x03, WRITE_ERROR=0x09,
SECURITY_ERROR=0x08, INTERNAL_ERROR=0x06. Add the two new
status codes (SECURITY_ERROR, WRITE_ERROR) to the client's
QwpConstants and QwpConstantsTest.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants