Open
Conversation
…va-questdb-client into jh_experiment_new_ilp
sendPongFrame() used the shared sendBuffer, calling reset() which destroyed any partially-built frame the caller had in progress via getSendBuffer(). This could happen when a PING arrived during receiveFrame()/tryReceiveFrame() while the caller was mid-way through constructing a data frame. Add a dedicated 256-byte controlFrameBuffer for sending pong responses. RFC 6455 limits control frame payloads to 125 bytes plus a 14-byte max header, so 256 bytes is sufficient and never needs to grow. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
sendCloseFrame() used reason.length() (UTF-16 code units) to calculate the payload size, but wrote reason.getBytes(UTF_8) (UTF-8 bytes) into the buffer. For non-ASCII close reasons, UTF-8 encoding can be longer than the UTF-16 length, causing writes past the declared payload size. This corrupted the frame header length, the masking range, and could overrun the allocated buffer. Compute the UTF-8 byte array upfront and use its length for all sizing calculations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When receiving a CLOSE frame from the server, the client now echoes a close frame back before marking the connection as no longer upgraded. This is required by RFC 6455 Section 5.5.1. The close code parsing was moved out of the handler-null check so the code is always available for the echo. The echo uses the dedicated controlFrameBuffer to avoid clobbering any in-progress frame in the main send buffer. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Handle CONTINUATION frames (opcode 0x0) in tryParseFrame() which were previously silently dropped. Fragment payloads are accumulated in a lazily-allocated native memory buffer and delivered as a complete message to the handler when the final FIN=1 frame arrives. The FIN bit is now checked on TEXT/BINARY frames: FIN=0 starts fragment accumulation, FIN=1 delivers immediately. Protocol errors are raised for continuation without an initial fragment and for overlapping fragmented messages. The fragment buffer is freed in close() and the fragmentation state is reset on disconnect(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a configurable maximum size for the WebSocket receive buffer, mirroring the pattern already used by WebSocketSendBuffer. Previously, growRecvBuffer() doubled the buffer without any upper bound, allowing a malicious server to trigger out-of-memory by sending arbitrarily large frames. Add getMaximumResponseBufferSize() to HttpClientConfiguration (defaulting to Integer.MAX_VALUE for backwards compatibility) and enforce the limit in both growRecvBuffer() and appendToFragmentBuffer(), which had the same unbounded growth issue for fragmented messages. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests that expect connection failure were hardcoding ports (9000, 19999) which could collide with running services. When a QuestDB server is running on port 9000, the WebSocket connection succeeds and the test fails with "Expected LineSenderException". Replace hardcoded ports with dynamically allocated ephemeral ports via ServerSocket(0). The port is bound and immediately closed, guaranteeing nothing is listening when the test tries to connect. Affected tests: - testBuilderWithWebSocketTransportCreatesCorrectSenderType - testConnectionRefused - testWsConfigString - testWsConfigString_missingAddr_fails - testWsConfigString_protocolAlreadyConfigured_fails Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The Sec-WebSocket-Accept header validation used case-sensitive String.contains(), which violates RFC 7230 (HTTP headers are case-insensitive). A server sending the header in a different casing (e.g., sec-websocket-accept) would cause the handshake to fail. Replace with a containsHeaderValue() helper that uses String.regionMatches(ignoreCase=true) for the header name lookup, avoiding both the case-sensitivity bug and unnecessary string allocation from toLowerCase(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace byte-by-byte native-heap copies in writeToSocket and readFromSocket with Unsafe.copyMemory(), using the 5-argument form that bridges native memory and Java byte arrays via Unsafe.BYTE_OFFSET. Add WebSocketChannelTest with a local echo server that verifies data integrity through the copy paths across various payload sizes and patterns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move maxSentSymbolId and sentSchemaHashes updates to after the send/enqueue succeeds in both async and sync flush paths. Previously these were updated before the send, so if sealAndSwapBuffer() threw (async) or sendBinary()/waitForAck() threw (sync), the next batch's delta dictionary would omit symbols the server never received, silently corrupting subsequent data. Also move sentSchemaHashes.add() inside the messageSize > 0 guard in the sync path, where it was incorrectly marking schemas as sent even when no data was produced. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The validate() range check used TYPE_DECIMAL256 (0x15) as the upper bound, which excluded TYPE_CHAR (0x16). CHAR columns would throw IllegalArgumentException on validation. Extend the upper bound to TYPE_CHAR and add tests covering all valid type codes, nullable CHAR, and invalid type rejection. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace raw AssertionError with LineSenderException when a token parameter is provided in ws:: or wss:: configuration strings. The else branch in config string parsing was unreachable when the code only supported HTTP and TCP, but became reachable after WebSocket support was added. Users now get a clear "token is not supported for WebSocket protocol" error instead of a cryptic AssertionError. Add test assertions for both ws:: and wss:: schemas with token. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The static nextBatchId field was a plain long incremented with ++, which is a non-atomic read-modify-write. Multiple threads creating or resetting MicrobatchBuffer instances concurrently (e.g., several Sender instances, or a user thread resetting while another constructs) could read the same value and produce duplicate batch IDs. Replace the plain long with an AtomicLong and use getAndIncrement() in both the constructor and reset() to guarantee uniqueness. Add MicrobatchBufferTest with two concurrent tests that confirm batch ID uniqueness under contention from 8 threads. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QwpBitWriter's write methods (writeBits, writeByte, writeInt, writeLong, flush) silently dropped data when the buffer was full instead of signaling an error. The same pattern existed in QwpGorillaEncoder.encodeTimestamps(), which returned partial byte counts when capacity was insufficient for the first or second uncompressed timestamp. Replace all silent-drop guards with LineSenderException throws so callers get a clear error instead of silently corrupted data. Add QwpBitWriterTest covering overflow for each write method and both Gorilla encoder early-return paths. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add test coverage for NativeBufferWriter, which previously had no tests. The tests exercise skip(), patchInt(), and ensureCapacity() mechanics, including the skip-then-patch pattern used by QwpWebSocketEncoder to reserve and backfill length fields. skip() and patchInt() were flagged as missing bounds checks, but investigation showed both are internal-only methods called exclusively by QwpWebSocketEncoder with structurally guaranteed-safe arguments: skip()'s single caller already ensures capacity before the call, and patchInt()'s two callers always pass the hardcoded offset 8 within a 12-byte header. Adding runtime checks would be dead branches on the hot path, so only test coverage was added. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
truncateTo() rewound off-heap buffers (data, strings, aux) but forgot to rewind arrayShapeOffset and arrayDataOffset. After cancelling a row with array data and adding a replacement row, the new values were written at a gap past where the encoder reads, causing the encoder to serialize stale cancelled data. Fix by walking the retained values to recompute both offsets, matching the same traversal pattern the encoder uses. Add tests that verify the encoder would read correct data after cancel for 1D double arrays, 1D long arrays, and 2D double arrays. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The builder computed actualAutoFlushRows, actualAutoFlushBytes, and actualAutoFlushIntervalNanos but the sync-mode WebSocket path called QwpWebSocketSender.connect(host, port, tls), which hardcoded all three to 0. This silently disabled auto-flush for every sync-mode WebSocket sender, regardless of user configuration. Add a connect() overload that accepts auto-flush parameters and update the builder to call it. Also update createForTesting(h, p, windowSize) to use default auto-flush values instead of zeros, so it mirrors the production connect() path. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QwpWebSocketSender.checkedColumnName() validated every column name on every row write, even though getOrCreateColumn() finds the column already exists in most cases. Move the validation into QwpTableBuffer.getOrCreateColumn() so it runs only when a new column is created. Add getOrCreateDesignatedTimestampColumn() for the designated timestamp, which uses an empty-string sentinel name and must bypass column name validation. Both QwpWebSocketSender and QwpUdpSender now use this dedicated method. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
"Nullable" is a confusing concept due to QuestDB null sentinels, which means a "non-nullable" column could still hold nulls via sentinel values.
this avoid extra work when just global dictionary is needed we need to keep the per-column dictionaries due to UDP. for now.
…xperiment_new_ilp
Replace the nullable type-code flag (0x80 high bit) with an inline null-count byte at the start of each column's data. QwpColumnWriter.writeNullHeader() now writes a single byte (0 = no nulls, 1 = has nulls) followed by the bitmap only when nulls are present. QwpColumnDef no longer stores or OR's a nullable flag into the type code. The hasNullBitmap field and the 3-argument constructor are removed. QwpSchemaHash hashes only the base type code. QwpTableBuffer.ColumnBuffer defers null bitmap expansion to addNull() calls instead of checking capacity on every row. Non-null addXxx() methods no longer touch the bitmap at all. The bitmap is tail-expanded to the full row count at serialization time in writeNullHeader(). Safety fixes: isNull() returns false for indices beyond bitmap capacity, truncateTo() and clearToEmptyFast() clamp to the allocated bitmap size. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The nullable-flag-in-type-code mechanism was replaced by inline null count encoding. Since the protocol was never released, remove the dead constants and their tests entirely rather than deprecating them. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…xperiment_new_ilp
Add begin/addTable/finish streaming API to QwpWebSocketEncoder so callers can encode multiple tables into a single QWP v1 message. Refactor encodeWithDeltaDict() to delegate to the new methods for backward compatibility. Change flushPendingRows() (async) and flushSync() (sync) in QwpWebSocketSender to encode all non-empty tables into one message instead of sending one message per table. The header's tableCount field now reflects the actual number of tables. The delta symbol dictionary and payload length are written once per message. Sent-state updates (maxSentSymbolId, sentSchemaHashes) are deferred until after the message is successfully enqueued or ACKed, preserving the existing error-recovery guarantee. The server already loops over tableCount when processing QWP messages, so no server-side changes are needed. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The client sends X-QWP-Max-Version and X-QWP-Client-Id headers in the WebSocket upgrade request. After the server responds with X-QWP-Version, the client reads the selected version and uses it in the version byte of all subsequent QWP message headers. QwpWebSocketEncoder.writeHeader() now uses a configurable version field instead of the hardcoded VERSION_1 constant. QwpWebSocketSender.ensureConnected() sets the version on the encoder after reading it from the upgrade response. QwpConstants gains MAX_SUPPORTED_VERSION and CLIENT_ID constants for the negotiation headers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QWP transmitted DECIMAL64/128/256 values in big-endian while all other numeric types used little-endian. This was inherited from Parquet's convention but served no purpose in QWP, which is its own protocol. Switch decimal columns to little-endian on the wire, matching Cairo's in-memory layout. This replaces putLongBE() with putLong() in QwpColumnWriter and deletes the now-unused putLongBE() method from the buffer writer interface and its implementations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The test decoder reads decimal bytes from the wire and passes them directly to BigInteger, which expects big-endian byte order. After commit 9af4a61 switched the QWP decimal wire format to little-endian, the decoder produces wrong values. Reverse the byte array before constructing BigInteger so the decoder correctly interprets little-endian decimal data. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The decimal encoder wrote multi-word values with the most significant word first (hi, lo). This made the wire format inconsistent: individual words were little-endian but word order was big-endian. QwpColumnWriter now emits words least-significant-first, matching the byte-level little-endian convention used by all other multi-word QWP types (UUID, LONG256). The test decoder reversal and QwpConstants docs are updated to match. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
While closing the sender, we send the IO thread the stop signal and wait for it to complete. If it doesn't complete on time, we can't just continue with the teardown, freeing the resources it's still using. We must bail out of teardown.
Replace XXHash64-based schema hashes with client-assigned, monotonically increasing schema IDs in the QWP wire protocol. The previous mechanism computed a 64-bit hash over column names and types, which could theoretically collide. Schema IDs are dense integers starting at 0, assigned by the client during each session, eliminating collisions entirely. Wire format changes: - SCHEMA_MODE_FULL now includes a varint schemaId before column definitions - SCHEMA_MODE_REFERENCE sends a varint schemaId instead of an 8-byte hash, reducing overhead to 1-2 bytes Client side (java-questdb-client): - QwpTableBuffer stores an int schemaId assigned by the sender - QwpWebSocketSender tracks nextSchemaId counter and maxSentSchemaId instead of a LongHashSet of sent hashes - QwpColumnWriter writes varint schemaId in both schema modes - QwpUdpSender size estimate accounts for the new varint - Delete QwpSchemaHash and its tests Server side (core): - QwpSchemaCache uses ObjList indexed by schemaId instead of LongObjHashMap keyed by combined hash - QwpSchema removes schemaHash field, equals(), hashCode(), and the dependency on QwpSchemaHash - QwpTableBlockCursor uses parseResult.schemaId for cache ops - Delete QwpSchemaHash and its test Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add server-side validation that schema IDs arrive in strict monotonic sequence and do not exceed a configurable per-connection limit. The client-side sender exposes the limit through the builder API and configuration string. Server side: - QwpSchemaCache.put() validates monotonic sequence and rejects IDs that exceed maxSchemasPerConnection - Add INVALID_SCHEMA_ID error code to QwpParseException - Add DEFAULT_MAX_SCHEMAS_PER_CONNECTION (65,535) to QwpConstants - Wire the limit through LineHttpProcessorConfiguration, PropServerConfiguration, and QwpProcessorState - QwpStreamingDecoder.resetConnectionState() clears schema cache on disconnect Client side: - Sender.LineSenderBuilder.maxSchemasPerConnection() builder method and max_schemas_per_connection config string parameter - QwpWebSocketSender accepts and forwards the limit - Replace static import of QwpConstants.* with explicit prefix Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace "hash lookup" with "ID lookup" in the SCHEMA_MODE_REFERENCE javadoc, and "Schema hash not recognized" with "Schema ID not recognized" in the STATUS_SCHEMA_REQUIRED javadoc. The implementation now uses monotonic integer IDs, not hashes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
WebSocketResponse used ad-hoc status code values (e.g., PARSE_ERROR=1, INTERNAL_ERROR=255) that diverged from the QWP specification and the server's QwpConstants. This caused the client to decode server error responses as UNKNOWN. Update all WebSocketResponse status constants to match the spec: PARSE_ERROR=0x05, SCHEMA_ERROR=0x03, WRITE_ERROR=0x09, SECURITY_ERROR=0x08, INTERNAL_ERROR=0x06. Add the two new status codes (SECURITY_ERROR, WRITE_ERROR) to the client's QwpConstants and QwpConstantsTest. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
TODO in run_tests_pipeline.yaml! Change before merging!
Change to
Summary
This PR adds a new WebSocket-based ingestion path to the Java client using QWP (QuestDB Wire Protocol), a binary protocol that replaces text-based ILP for higher-throughput data ingestion. The existing HTTP and TCP ILP senders remain unchanged.
Users select the new transport via
Sender.builder(Transport.WEBSOCKET). The builder accepts WebSocket-specific options such asasyncMode,autoFlushBytes,autoFlushIntervalMillis, andinFlightWindowSize.Architecture
The implementation follows a layered design:
Protocol layer (
cutlass/qwp/protocol/)QwpTableBufferstores rows in columnar format using off-heap memory (zero-GC on the data path).QwpSchemaHashcomputes XXHash64 over column names and types, enabling server-side schema caching. The client sends a full schema on the first batch and a hash reference on subsequent batches if the schema has not changed.QwpGorillaEncoderapplies delta-of-delta compression to timestamp columns.QwpBitWriterhandles bit-level packing for booleans and null bitmaps.QwpConstantsdefines the wire format: "QWP1" magic bytes, type codes, feature flags, status codes.Client layer (
cutlass/qwp/client/)QwpWebSocketSenderimplements theSenderinterface. It uses a double-buffering scheme: the user thread writes rows into an activeMicrobatchBuffer, which is sealed and handed to an I/O thread when an auto-flush trigger fires (row count, byte size, or time interval).QwpWebSocketEncoderserializesQwpTableBuffercontents into binary QWP frames, including delta symbol dictionaries (only new symbols since the last acknowledged batch).InFlightWindowimplements a lock-free sliding window protocol that tracks batches awaiting server ACKs, providing backpressure from the server to the user thread.WebSocketSendQueueruns the dedicated I/O thread, managing frame transmission and ACK/NACK response parsing.GlobalSymbolDictionaryassigns sequential integer IDs to symbol strings and supports delta encoding across batches.WebSocket transport (
cutlass/http/client/,cutlass/qwp/websocket/)WebSocketClientis a zero-GC WebSocket implementation with platform-specific subclasses for Linux (epoll), macOS (kqueue), and Windows (select).WebSocketFrameParserandWebSocketFrameWriterhandle RFC 6455 frame serialization, including fragmentation, close-frame echo, and ping/pong.WebSocketSendBufferbuilds masked WebSocket frames directly in native memory.Bug fixes and robustness improvements
The PR fixes a number of issues found during development and testing:
WebSocketClientconstructor and on allocation failure.sendQueueleak on close when flush fails.WebSocketClient,WebSocketSendBuffer,QwpTableBuffer), array dimension products, andputBlockOfBytes().receiveFrame()throwing instead of returning false, which masked I/O errors as timeouts.cancelRow()truncation.SecureRnd(ChaCha20-based CSPRNG) for WebSocket masking keys instead ofjava.util.Random.Code cleanup
The PR removes ~11,000 lines of dead code:
ConcurrentHashMap(3,791 lines),ConcurrentIntHashMap(3,612 lines),GenericLexer,Base64Helper,LongObjHashMap,FilesFacade, and others.Numbers,Chars,Utf8s,Rnd, andColumnType.ParanoiaState,GeoHashes,BorrowedArray,HttpCookie.CI changes
ClientIntegrationTestsCI stage that starts a QuestDB server and runs the client's integration tests against it (both default and authenticated configurations).sedportability for macOS CI runners.Test plan
QwpSenderTest(8,346 lines) exercises the fullSenderAPI surface for all column types, null handling, cancelRow, schema changes, and error pathsQwpWebSocketSenderTesttests WebSocket-specific sender behavior including async modeQwpWebSocketEncoderTestvalidates binary encoding for all column types and encoding modesLineSenderBuilderWebSocketTestcovers builder validation and configuration for the WebSocket transportassertMemoryLeakwrappers added to client tests to detect native memory leaks