diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseColumn.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseColumn.java index 3baff8680..0c368f72b 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseColumn.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseColumn.java @@ -952,6 +952,10 @@ public String getColumnName() { return columnName; } + public String getColumnIndexAndName() { + return (columnIndex + 1) + " (`" + columnName + "`)"; + } + public String getOriginalTypeName() { return originalTypeName; } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java index 8abcdc65b..e5892748d 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java @@ -73,6 +73,8 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm private Map[] convertions; private boolean hasNext = true; private boolean initialState = true; // reader is in initial state, no records have been read yet + private long row = -1; // before first row + private long lastNextCallTs; // for exception to detect slow reader protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,BinaryStreamReader.ByteBufferAllocator byteBufferAllocator, Map> defaultTypeHintMap) { this.input = inputStream; @@ -92,6 +94,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer setSchema(schema); } this.dataTypeConverter = DataTypeConverter.INSTANCE; // singleton while no need to customize conversion + this.lastNextCallTs = System.currentTimeMillis(); } protected Object[] currentRecord; @@ -181,6 +184,7 @@ protected boolean readRecord(Object[] record) throws IOException { return false; } + row++; boolean firstColumn = true; for (int i = 0; i < columns.length; i++) { try { @@ -191,12 +195,12 @@ protected boolean readRecord(Object[] record) throws IOException { record[i] = null; } firstColumn = false; - } catch (EOFException e) { - if (firstColumn) { + } catch (IOException e) { + if (e instanceof EOFException && firstColumn) { endReached(); return false; } - throw e; + throw new IOException(recordReadExceptionMsg(columns[i].getColumnIndexAndName()), e); } } return true; @@ -238,35 +242,52 @@ protected void readNextRecord() { } } catch (IOException e) { endReached(); - throw new ClientException("Failed to read next row", e); + throw new ClientException(recordReadExceptionMsg(), e); } } + private long timeSinceLastNext() { + return System.currentTimeMillis() - lastNextCallTs; + } + + private String recordReadExceptionMsg() { + return recordReadExceptionMsg(null); + } + + private String recordReadExceptionMsg(String column) { + return "Reading " + (column != null ? "column " + column + " in " : "") + + " row " + (row + 1) + " (time since last next call " + timeSinceLastNext() + ")"; + } + @Override public Map next() { if (!hasNext) { return null; } - if (!nextRecordEmpty) { - Object[] tmp = currentRecord; - currentRecord = nextRecord; - nextRecord = tmp; - readNextRecord(); - return new RecordWrapper(currentRecord, schema); - } else { - try { - if (readRecord(currentRecord)) { - readNextRecord(); - return new RecordWrapper(currentRecord, schema); - } else { - currentRecord = null; - return null; + try { + if (!nextRecordEmpty) { + Object[] tmp = currentRecord; + currentRecord = nextRecord; + nextRecord = tmp; + readNextRecord(); + return new RecordWrapper(currentRecord, schema); + } else { + try { + if (readRecord(currentRecord)) { + readNextRecord(); + return new RecordWrapper(currentRecord, schema); + } else { + currentRecord = null; + return null; + } + } catch (IOException e) { + endReached(); + throw new ClientException(recordReadExceptionMsg(), e); } - } catch (IOException e) { - endReached(); - throw new ClientException("Failed to read row", e); } + } finally { + lastNextCallTs = System.currentTimeMillis(); } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 7ca25f473..24cb82aa5 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -22,7 +22,6 @@ import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; -import org.apache.hc.client5.http.entity.mime.MultipartPartBuilder; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; @@ -86,17 +85,16 @@ import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import java.util.Base64; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Properties; -import java.util.Arrays; -import java.util.HashMap; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -104,6 +102,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.regex.Pattern; +import java.util.stream.Stream; public class HttpAPIClientHelper { @@ -341,85 +340,130 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map 0 ? readClickHouseError(httpResponse.getEntity(), serverCode, queryId, httpResponse.getCode()) : + readNotClickHouseError(httpResponse.getEntity(), queryId, httpResponse.getCode()); + } catch (Exception e) { + LOG.error("Failed to read error message", e); + String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " (transport error: " + httpResponse.getCode() + ")"; + return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId); + } + } + + private ServerException readNotClickHouseError(HttpEntity httpEntity, String queryId, int httpCode) { + + byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE]; + + String msg = null; + InputStream body = null; + int offset = 0; + for (int i = 0; i < 2; i++) { + try { + if (body == null) { + body = httpEntity.getContent(); + } + int msgLen = body.read(buffer, offset, buffer.length - offset); + if (msgLen > 0) { + msg = new String(buffer, 0, msgLen, StandardCharsets.UTF_8).trim(); + if (msg.isEmpty()) { + msg = ""; } } - if (rBytes == -1) { - break; + break; + } catch (ClientException e) { + // Invalid LZ4 Magic + if (body instanceof ClickHouseLZ4InputStream) { + ClickHouseLZ4InputStream stream = (ClickHouseLZ4InputStream) body; + body = stream.getInputStream(); + byte[] lzHeader = stream.getHeaderBuffer(); // Here is read part of original body + offset = Math.min(lzHeader.length, buffer.length); + System.arraycopy(lzHeader, 0, buffer, 0, offset); + continue; + } + throw e; + } catch (Exception e) { + LOG.warn("Failed to read error message (queryId = " + queryId + ")", e); + break; + } + } + + String errormsg = msg == null ? "unknown server error" : msg; + return new ServerException(ServerException.CODE_UNKNOWN, errormsg + " (transport error: " + httpCode +")", httpCode, queryId); + } + + private static ServerException readClickHouseError(HttpEntity httpEntity, int serverCode, String queryId, int httpCode) throws Exception { + InputStream body = httpEntity.getContent(); + byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE]; + byte[] lookUpStr = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode).getBytes(StandardCharsets.UTF_8); + StringBuilder msgBuilder = new StringBuilder(); + boolean found = false; + while (true) { + int rBytes = -1; + try { + rBytes = body.read(buffer); + } catch (ClientException e) { + // Invalid LZ4 Magic + if (body instanceof ClickHouseLZ4InputStream) { + ClickHouseLZ4InputStream stream = (ClickHouseLZ4InputStream) body; + body = stream.getInputStream(); + byte[] headerBuffer = stream.getHeaderBuffer(); + System.arraycopy(headerBuffer, 0, buffer, 0, headerBuffer.length); + rBytes = headerBuffer.length; } + } + if (rBytes == -1) { + break; + } - for (int i = 0; i < rBytes; i++) { - if (buffer[i] == lookUpStr[0]) { - found = true; - for (int j = 1; j < Math.min(rBytes - i, lookUpStr.length); j++) { - if (buffer[i + j] != lookUpStr[j]) { - found = false; - break; - } - } - if (found) { - msgBuilder.append(new String(buffer, i, rBytes - i, StandardCharsets.UTF_8)); + for (int i = 0; i < rBytes; i++) { + if (buffer[i] == lookUpStr[0]) { + found = true; + for (int j = 1; j < Math.min(rBytes - i, lookUpStr.length); j++) { + if (buffer[i + j] != lookUpStr[j]) { + found = false; break; } } - } - - if (found) { - break; + if (found) { + msgBuilder.append(new String(buffer, i, rBytes - i, StandardCharsets.UTF_8)); + break; + } } } - while (true) { - int rBytes = body.read(buffer); - if (rBytes == -1) { - break; - } - msgBuilder.append(new String(buffer, 0, rBytes, StandardCharsets.UTF_8)); + if (found) { + break; } + } - String msg = msgBuilder.toString().replaceAll("\\s+", " ").replaceAll("\\\\n", " ") - .replaceAll("\\\\/", "/"); - if (msg.trim().isEmpty()) { - msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " (transport error: " + httpResponse.getCode() + ")"; + while (true) { + int rBytes = body.read(buffer); + if (rBytes == -1) { + break; } - return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId); - } catch (Exception e) { - LOG.error("Failed to read error message", e); - String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " (transport error: " + httpResponse.getCode() + ")"; - return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId); + msgBuilder.append(new String(buffer, 0, rBytes, StandardCharsets.UTF_8)); + } + + String msg = msgBuilder.toString().replaceAll("\\s+", " ").replaceAll("\\\\n", " ") + .replaceAll("\\\\/", "/"); + if (msg.trim().isEmpty()) { + msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " (transport error: " + httpCode + ")"; } + return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpCode, queryId); } private static final long POOL_VENT_TIMEOUT = 10000L; @@ -536,7 +580,7 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings."); } else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST || httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) { try { - throw readError(httpResponse); + throw readError(req, httpResponse); } finally { httpResponse.close(); } diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 0bf88ea46..ed571bb7b 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -9,6 +9,7 @@ import com.clickhouse.client.api.ServerException; import com.clickhouse.client.api.command.CommandResponse; import com.clickhouse.client.api.command.CommandSettings; +import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader; import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.enums.ProxyType; import com.clickhouse.client.api.insert.InsertResponse; @@ -26,6 +27,7 @@ import com.github.tomakehurst.wiremock.core.WireMockConfiguration; import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener; +import org.apache.hc.core5.http.ConnectionClosedException; import org.apache.hc.core5.http.ConnectionRequestTimeoutException; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpStatus; @@ -36,6 +38,7 @@ import org.testng.annotations.Test; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.net.InetAddress; import java.net.Socket; import java.nio.ByteBuffer; @@ -54,6 +57,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; +import java.util.zip.GZIPOutputStream; import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; import static org.testng.Assert.fail; @@ -400,6 +404,24 @@ public void testServerErrorHandling(ClickHouseFormat format, boolean serverCompr Assert.fail("Unexpected exception", e); } } + + if (!isCloud()) { + Client.Builder clientBuilder = new Client.Builder() + .addEndpoint("http://" + server.getHost() + ":" + server.getPort() + "/some-path") + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .compressServerResponse(serverCompression) + .useHttpCompression(useHttpCompression) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .serverSetting(ServerSettings.WAIT_END_OF_QUERY, "1"); + + try (Client client = clientBuilder.build()) { + client.queryAll("select 1"); + fail("Exception expected"); + } catch (ClientException e) { + Assert.assertTrue(e.getCause().getMessage().contains("no handle /some-path?")); + } + } } @DataProvider(name = "testServerErrorHandlingDataProvider") @@ -487,7 +509,7 @@ public void testServerErrorsUncompressed(int code, String message, String expect } catch (ServerException e) { e.printStackTrace(); Assert.assertEquals(e.getCode(), code); - Assert.assertTrue(e.getMessage().startsWith(expectedMessage)); + Assert.assertTrue(e.getMessage().startsWith(expectedMessage), "but started with " + e.getMessage()); } catch (Exception e) { e.printStackTrace(); Assert.fail("Unexpected exception", e); @@ -604,7 +626,9 @@ public void testServerSettings() { } static { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + if (Boolean.getBoolean("test.debug")) { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + } } @Test(groups = { "integration" }) @@ -1260,6 +1284,438 @@ public void testMultiPartRequest() { } } + @Test(groups = {"integration"}) + public void testSmallNetworkBufferDoesNotBreakColumnDecoding() throws Exception { + if (isCloud()) { + return; // mocked server + } + + final int rowsToRead = 2_000; + final int networkBufferSize = 8196; + final String query = "SELECT toUInt32(number), toUInt64(number), now() FROM system.numbers LIMIT " + rowsToRead; + byte[] validBody = fetchBinaryPayload(query, networkBufferSize, 60); + + Assert.assertTrue(validBody.length > 3, "Source binary payload is unexpectedly small"); + byte[] corruptedBody = Arrays.copyOf(validBody, validBody.length - 5); + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + try { + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withHeader("X-ClickHouse-Summary", "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}") + .withBody(corruptedBody)) + .build()); + Throwable thrown = assertBinaryDecodeFails(mockServer, query, networkBufferSize, 60, + "Expected failure when reading truncated binary stream"); + assertBinaryReadFailureContainsColumnName(thrown); + } finally { + mockServer.stop(); + } + } + + @Test(groups = {"integration"}) + public void testChunkedResponsePrematureEndIsReported() throws Exception { + if (isCloud()) { + return; // mocked server + } + + final String query = "SELECT toUInt32(number), toUInt64(number), now() FROM system.numbers LIMIT 10"; + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + try { + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withHeader(HttpHeaders.TRANSFER_ENCODING, "chunked") + .withFault(Fault.MALFORMED_RESPONSE_CHUNK)) + .build()); + Throwable thrown = assertBinaryDecodeFails(mockServer, query, null, 30, + "Expected failure when reading malformed chunked response"); + ConnectionClosedException connectionClosedException = findCause(thrown, ConnectionClosedException.class); + boolean hasChunkedPrematureCloseSignature = containsMessageInCauseChain(thrown, + "closing chunk expected", + "premature end of chunk coded message body", + "failed to read header"); + Assert.assertTrue(connectionClosedException != null || hasChunkedPrematureCloseSignature, + "Expected chunked/premature-close failure signature, but was: " + thrown); + } finally { + mockServer.stop(); + } + } + + @Test(groups = {"integration"}) + public void testTailCorruptedStreamFailsDecoding() throws Exception { + if (isCloud()) { + return; // mocked server + } + + final int rowsToRead = 100_000; + final int networkBufferSize = 8196; + final String query = "SELECT toUInt32(number), toUInt64(number), now() FROM system.numbers LIMIT " + rowsToRead; + byte[] validBody = fetchBinaryPayload(query, networkBufferSize, 60); + + final int removedBytes = 3; + Assert.assertTrue(validBody.length > removedBytes, "Source binary payload is unexpectedly small"); + byte[] corruptedBody = Arrays.copyOf(validBody, validBody.length - removedBytes); + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + try { + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withHeader("X-ClickHouse-Summary", "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}") + .withBody(corruptedBody)) + .build()); + Throwable thrown = assertBinaryDecodeFails(mockServer, query, networkBufferSize, 60, + "Expected failure when reading binary stream truncated at tail"); + assertBinaryReadFailureContainsColumnName(thrown); + } finally { + mockServer.stop(); + } + } + + @Test(groups = {"integration"}) + public void testTailStreamFailureReportsPositiveTimeSinceLastNextCall() throws Exception { + if (isCloud()) { + return; // mocked server + } + + final int rowsToRead = 2000; + final int networkBufferSize = 8196; + final String query = "SELECT toUInt32(number), toUInt64(number), now() FROM system.numbers LIMIT " + rowsToRead; + byte[] validBody = fetchBinaryPayload(query, networkBufferSize, 60); + final int removedBytes = 3; + Assert.assertTrue(validBody.length > removedBytes, "Source binary payload is unexpectedly small"); + byte[] corruptedBody = Arrays.copyOf(validBody, validBody.length - removedBytes); + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + try { + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withHeader("X-ClickHouse-Summary", "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}") + .withBody(corruptedBody)) + .build()); + + QuerySettings querySettings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes); + try (Client client = newMockServerClient(mockServer.port(), networkBufferSize) + .build(); + QueryResponse response = client.query(query, querySettings).get(60, TimeUnit.SECONDS); + ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response)) { + final int[] rowsRead = new int[] {0}; + Throwable thrown = Assert.expectThrows(Throwable.class, () -> { + while (true) { + if (rowsRead[0] >= 5) { + try { + Thread.sleep(25); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + if (reader.next() == null) { + return; + } + rowsRead[0]++; + } + }); + + Assert.assertTrue(rowsRead[0] >= 5, + "Expected to read at least a few rows before failure, but read " + rowsRead[0]); + ClientException clientException = findCause(thrown, ClientException.class); + Assert.assertNotNull(clientException, + "Expected ClientException in cause chain, but was: " + thrown); + Assert.assertTrue(containsMessageInCauseChain(thrown, "Reading column "), + "Expected column information in failure message chain, but was: " + thrown); + + String elapsedTimeMessage = findFirstMessageInCauseChain(thrown, "time since last next call"); + Assert.assertNotNull(elapsedTimeMessage, + "Expected elapsed-time fragment in failure message chain, but was: " + thrown); + + java.util.regex.Matcher matcher = Pattern.compile("time since last next call (\\d+)\\)") + .matcher(elapsedTimeMessage); + Assert.assertTrue(matcher.find(), + "Expected elapsed-time fragment in message: " + elapsedTimeMessage); + long elapsedSinceLastNext = Long.parseLong(matcher.group(1)); + Assert.assertTrue(elapsedSinceLastNext > 0, + "Expected positive elapsed time since last next call, but was " + elapsedSinceLastNext); + } + } finally { + mockServer.stop(); + } + } + + @Test(groups = {"integration"}, dataProvider = "testHttpStatusErrorBodyDataProvider") + public void testHttpStatusErrorsIncludeResponseBody(int httpStatus, String responseBody, String expectedBodyPart) throws Exception { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + + try { + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(httpStatus) + .withBody(responseBody)) + .build()); + + try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .compressServerResponse(false) + .build()) { + + Throwable thrown = Assert.expectThrows(Throwable.class, + () -> client.query("SELECT 1").get(1, TimeUnit.SECONDS)); + ServerException serverException = findServerException(thrown); + Assert.assertNotNull(serverException, "Expected ServerException in cause chain"); + Assert.assertEquals(serverException.getTransportProtocolCode(), httpStatus); + Assert.assertTrue(serverException.getMessage().contains(expectedBodyPart), + "Expected to contain '" + expectedBodyPart + "', but was: " + serverException.getMessage()); + } + } finally { + mockServer.stop(); + } + } + + @DataProvider(name = "testHttpStatusErrorBodyDataProvider") + public static Object[][] testHttpStatusErrorBodyDataProvider() { + return new Object[][]{ + {HttpStatus.SC_UNAUTHORIZED, "Unauthorized: invalid credentials for user default", "invalid credentials"}, + {HttpStatus.SC_FORBIDDEN, "Forbidden: user default has no access to this operation", "no access"}, + {HttpStatus.SC_NOT_FOUND, "Not found: requested endpoint does not exist", "requested endpoint"} + }; + } + + @Test(groups = {"integration"}, dataProvider = "testHttpStatusWithoutBodyDataProvider") + public void testHttpStatusErrorsWithoutBody(int httpStatus) throws Exception { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + + try { + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse().withStatus(httpStatus)) + .build()); + + try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .compressServerResponse(false) + .build()) { + + Throwable thrown = Assert.expectThrows(Throwable.class, + () -> client.query("SELECT 1").get(1, TimeUnit.SECONDS)); + ServerException serverException = findServerException(thrown); + Assert.assertNotNull(serverException, "Expected ServerException in cause chain"); + Assert.assertEquals(serverException.getTransportProtocolCode(), httpStatus); + Assert.assertTrue(serverException.getMessage().contains("unknown server error"), + "Expected unknown error message for empty body, but was: " + serverException.getMessage()); + } + } finally { + mockServer.stop(); + } + } + + @DataProvider(name = "testHttpStatusWithoutBodyDataProvider") + public static Object[][] testHttpStatusWithoutBodyDataProvider() { + return new Object[][]{ + {HttpStatus.SC_UNAUTHORIZED}, + {HttpStatus.SC_FORBIDDEN}, + {HttpStatus.SC_NOT_FOUND} + }; + } + + @Test(groups = {"integration"}, dataProvider = "testHttpStatusCompressedBodyDataProvider") + public void testHttpStatusErrorsWithHttpCompression(int httpStatus, String responseBody, String expectedBodyPart) throws Exception { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + + try { + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse() + .withStatus(httpStatus) + .withHeader(HttpHeaders.CONTENT_ENCODING, "gzip") + .withBody(gzip(responseBody))) + .build()); + + try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .useHttpCompression(true) + .compressServerResponse(true) + .build()) { + + Throwable thrown = Assert.expectThrows(Throwable.class, + () -> client.query("SELECT 1").get(1, TimeUnit.SECONDS)); + ServerException serverException = findServerException(thrown); + Assert.assertNotNull(serverException, "Expected ServerException in cause chain"); + Assert.assertEquals(serverException.getTransportProtocolCode(), httpStatus); + Assert.assertTrue(serverException.getMessage().contains(expectedBodyPart), + "Expected compressed body part '" + expectedBodyPart + "', but was: " + serverException.getMessage()); + } + } finally { + mockServer.stop(); + } + } + + @DataProvider(name = "testHttpStatusCompressedBodyDataProvider") + public static Object[][] testHttpStatusCompressedBodyDataProvider() { + return new Object[][]{ + {HttpStatus.SC_UNAUTHORIZED, "Unauthorized: token is expired", "token is expired"}, + {HttpStatus.SC_FORBIDDEN, "Forbidden: policy denies this query", "policy denies"}, + {HttpStatus.SC_NOT_FOUND, "Not found: route does not exist", "route does not exist"} + }; + } + + private byte[] fetchBinaryPayload(String query, int networkBufferSize, int timeoutSec) throws Exception { + QuerySettings querySettings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes); + try (Client client = newClient() + .useHttpCompression(false) + .compressServerResponse(false) + .setOption(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), String.valueOf(networkBufferSize)) + .build(); + QueryResponse response = client.query(query, querySettings).get(timeoutSec, TimeUnit.SECONDS)) { + return readAllBytes(response.getInputStream()); + } + } + + private Throwable assertBinaryDecodeFails(WireMockServer mockServer, String query, Integer networkBufferSize, + int timeoutSec, String assertionMessage) throws Exception { + QuerySettings querySettings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes); + try (Client client = newMockServerClient(mockServer.port(), networkBufferSize).build()) { + Throwable thrown = Assert.expectThrows(Throwable.class, () -> { + try (QueryResponse response = client.query(query, querySettings).get(timeoutSec, TimeUnit.SECONDS); + ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response)) { + readAllRows(reader); + } + }); + Assert.assertNotNull(thrown, assertionMessage); + return thrown; + } + } + + private Client.Builder newMockServerClient(int port, Integer networkBufferSize) { + Client.Builder builder = new Client.Builder() + .addEndpoint(Protocol.HTTP, "localhost", port, false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .serverSetting(ServerSettings.WAIT_END_OF_QUERY, "1") + .useHttpCompression(false) + .compressServerResponse(false); + if (networkBufferSize != null) { + builder.setOption(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), String.valueOf(networkBufferSize)); + } + return builder; + } + + private static void readAllRows(ClickHouseBinaryFormatReader reader) { + while (reader.next() != null) { + reader.getInteger(1); + reader.getLong(2); + reader.getString(3); + } + } + + private static byte[] gzip(String value) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) { + gzipOutputStream.write(value.getBytes(StandardCharsets.UTF_8)); + } + return out.toByteArray(); + } + + private static byte[] readAllBytes(java.io.InputStream inputStream) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buffer = new byte[8192]; + int read; + while ((read = inputStream.read(buffer)) != -1) { + out.write(buffer, 0, read); + } + return out.toByteArray(); + } + + private static ServerException findServerException(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof ServerException) { + return (ServerException) current; + } + current = current.getCause(); + } + return null; + } + + private static T findCause(Throwable throwable, Class clazz) { + Throwable current = throwable; + while (current != null) { + if (clazz.isInstance(current)) { + return clazz.cast(current); + } + current = current.getCause(); + } + return null; + } + + private static boolean containsMessageInCauseChain(Throwable throwable, String... parts) { + Throwable current = throwable; + while (current != null) { + String message = current.getMessage(); + if (message != null) { + String lower = message.toLowerCase(); + for (String part : parts) { + if (lower.contains(part.toLowerCase())) { + return true; + } + } + } + current = current.getCause(); + } + return false; + } + + private static String findFirstMessageInCauseChain(Throwable throwable, String part) { + Throwable current = throwable; + while (current != null) { + String message = current.getMessage(); + if (message != null && message.contains(part)) { + return message; + } + current = current.getCause(); + } + return null; + } + + private static void assertBinaryReadFailureContainsColumnName(Throwable thrown) { + ClientException clientException = findCause(thrown, ClientException.class); + Assert.assertNotNull(clientException, + "Expected ClientException in cause chain, but was: " + thrown); + Assert.assertTrue(containsMessageInCauseChain(thrown, "Reading column "), + "Expected column information in failure message chain, but was: " + thrown); + } + protected Client.Builder newClient() { ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); boolean isSecure = isCloud();