Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,10 @@ public String getColumnName() {
return columnName;
}

public String getColumnIndexAndName() {
return (columnIndex + 1) + " (`" + columnName + "`)";
}

public String getOriginalTypeName() {
return originalTypeName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
private Map[] convertions;
private boolean hasNext = true;
private boolean initialState = true; // reader is in initial state, no records have been read yet
private long row = -1; // before first row
private long lastNextCallTs; // for exception to detect slow reader

protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,BinaryStreamReader.ByteBufferAllocator byteBufferAllocator, Map<ClickHouseDataType, Class<?>> defaultTypeHintMap) {
this.input = inputStream;
Expand All @@ -92,6 +94,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
setSchema(schema);
}
this.dataTypeConverter = DataTypeConverter.INSTANCE; // singleton while no need to customize conversion
this.lastNextCallTs = System.currentTimeMillis();
}

protected Object[] currentRecord;
Expand Down Expand Up @@ -181,6 +184,7 @@ protected boolean readRecord(Object[] record) throws IOException {
return false;
}

row++;
boolean firstColumn = true;
for (int i = 0; i < columns.length; i++) {
try {
Expand All @@ -191,12 +195,12 @@ protected boolean readRecord(Object[] record) throws IOException {
record[i] = null;
}
firstColumn = false;
} catch (EOFException e) {
if (firstColumn) {
} catch (IOException e) {
if (e instanceof EOFException && firstColumn) {
endReached();
return false;
}
throw e;
throw new IOException(recordReadExceptionMsg(columns[i].getColumnIndexAndName()), e);
}
}
return true;
Expand Down Expand Up @@ -238,35 +242,52 @@ protected void readNextRecord() {
}
} catch (IOException e) {
endReached();
throw new ClientException("Failed to read next row", e);
throw new ClientException(recordReadExceptionMsg(), e);
}
}

private long timeSinceLastNext() {
return System.currentTimeMillis() - lastNextCallTs;
}

private String recordReadExceptionMsg() {
return recordReadExceptionMsg(null);
}

private String recordReadExceptionMsg(String column) {
return "Reading " + (column != null ? "column " + column + " in " : "")
+ " row " + (row + 1) + " (time since last next call " + timeSinceLastNext() + ")";
}

@Override
public Map<String, Object> next() {
if (!hasNext) {
return null;
}

if (!nextRecordEmpty) {
Object[] tmp = currentRecord;
currentRecord = nextRecord;
nextRecord = tmp;
readNextRecord();
return new RecordWrapper(currentRecord, schema);
} else {
try {
if (readRecord(currentRecord)) {
readNextRecord();
return new RecordWrapper(currentRecord, schema);
} else {
currentRecord = null;
return null;
try {
if (!nextRecordEmpty) {
Object[] tmp = currentRecord;
currentRecord = nextRecord;
nextRecord = tmp;
readNextRecord();
return new RecordWrapper(currentRecord, schema);
} else {
try {
if (readRecord(currentRecord)) {
readNextRecord();
return new RecordWrapper(currentRecord, schema);
} else {
currentRecord = null;
return null;
}
} catch (IOException e) {
endReached();
throw new ClientException(recordReadExceptionMsg(), e);
}
} catch (IOException e) {
endReached();
throw new ClientException("Failed to read row", e);
}
} finally {
lastNextCallTs = System.currentTimeMillis();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder;
import org.apache.hc.client5.http.entity.mime.MultipartPartBuilder;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
Expand Down Expand Up @@ -86,24 +85,24 @@
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class HttpAPIClientHelper {

Expand Down Expand Up @@ -341,85 +340,130 @@
return clientBuilder.build();
}

// private static final String ERROR_CODE_PREFIX_PATTERN = "Code: %d. DB::Exception:";
private static final String ERROR_CODE_PREFIX_PATTERN = "%d. DB::Exception:";


/**
* Reads status line and if error tries to parse response body to get server error message.
*
* @param httpResponse - HTTP response
* @return exception object with server code
*/
public Exception readError(ClassicHttpResponse httpResponse) {
final Header qIdHeader = httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
final String queryId = qIdHeader == null ? "" : qIdHeader.getValue();
public Exception readError(HttpPost req, ClassicHttpResponse httpResponse) {
final Header serverQueryIdHeader = httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
final Header clientQueryIdHeader = req.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
final Header queryHeader = Stream.of(serverQueryIdHeader, clientQueryIdHeader).filter(Objects::nonNull).findFirst().orElse(null);
final String queryId = queryHeader == null ? "" : queryHeader.getValue();
int serverCode = getHeaderInt(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE), 0);
InputStream body = null;
try {
body = httpResponse.getEntity().getContent();
byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE];
byte[] lookUpStr = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode).getBytes(StandardCharsets.UTF_8);
StringBuilder msgBuilder = new StringBuilder();
boolean found = false;
while (true) {
int rBytes = -1;
try {
rBytes = body.read(buffer);
} catch (ClientException e) {
// Invalid LZ4 Magic
if (body instanceof ClickHouseLZ4InputStream) {
ClickHouseLZ4InputStream stream = (ClickHouseLZ4InputStream) body;
body = stream.getInputStream();
byte[] headerBuffer = stream.getHeaderBuffer();
System.arraycopy(headerBuffer, 0, buffer, 0, headerBuffer.length);
rBytes = headerBuffer.length;
return serverCode > 0 ? readClickHouseError(httpResponse.getEntity(), serverCode, queryId, httpResponse.getCode()) :
readNotClickHouseError(httpResponse.getEntity(), queryId, httpResponse.getCode());
} catch (Exception e) {
LOG.error("Failed to read error message", e);
String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
}
}

private ServerException readNotClickHouseError(HttpEntity httpEntity, String queryId, int httpCode) {

Check failure on line 367 in client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ1P9o7PhtHiJuRVndbA&open=AZ1P9o7PhtHiJuRVndbA&pullRequest=2804

byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE];

String msg = null;
InputStream body = null;
int offset = 0;
for (int i = 0; i < 2; i++) {

Check warning on line 374 in client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Reduce the total number of break and continue statements in this loop to use at most one.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ1P9o7PhtHiJuRVnda_&open=AZ1P9o7PhtHiJuRVnda_&pullRequest=2804
try {
if (body == null) {
body = httpEntity.getContent();
}
int msgLen = body.read(buffer, offset, buffer.length - offset);
if (msgLen > 0) {
msg = new String(buffer, 0, msgLen, StandardCharsets.UTF_8).trim();
if (msg.isEmpty()) {
msg = "<empty body response>";
}
}
if (rBytes == -1) {
break;
break;
} catch (ClientException e) {
// Invalid LZ4 Magic
if (body instanceof ClickHouseLZ4InputStream) {
ClickHouseLZ4InputStream stream = (ClickHouseLZ4InputStream) body;
body = stream.getInputStream();
byte[] lzHeader = stream.getHeaderBuffer(); // Here is read part of original body
offset = Math.min(lzHeader.length, buffer.length);
System.arraycopy(lzHeader, 0, buffer, 0, offset);
continue;
}
throw e;
} catch (Exception e) {
LOG.warn("Failed to read error message (queryId = " + queryId + ")", e);
break;
}
}

String errormsg = msg == null ? "unknown server error" : msg;
return new ServerException(ServerException.CODE_UNKNOWN, errormsg + " (transport error: " + httpCode +")", httpCode, queryId);
}

private static ServerException readClickHouseError(HttpEntity httpEntity, int serverCode, String queryId, int httpCode) throws Exception {

Check failure on line 408 in client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 32 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ0m9TGxIqNXeWr2gTBp&open=AZ0m9TGxIqNXeWr2gTBp&pullRequest=2804

Check warning on line 408 in client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ0m9TGxIqNXeWr2gTBo&open=AZ0m9TGxIqNXeWr2gTBo&pullRequest=2804
InputStream body = httpEntity.getContent();
byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE];
byte[] lookUpStr = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode).getBytes(StandardCharsets.UTF_8);
StringBuilder msgBuilder = new StringBuilder();
boolean found = false;
while (true) {
int rBytes = -1;
try {
rBytes = body.read(buffer);
} catch (ClientException e) {
// Invalid LZ4 Magic
if (body instanceof ClickHouseLZ4InputStream) {
ClickHouseLZ4InputStream stream = (ClickHouseLZ4InputStream) body;
body = stream.getInputStream();
byte[] headerBuffer = stream.getHeaderBuffer();
System.arraycopy(headerBuffer, 0, buffer, 0, headerBuffer.length);
rBytes = headerBuffer.length;
}
}
if (rBytes == -1) {
break;
}

for (int i = 0; i < rBytes; i++) {
if (buffer[i] == lookUpStr[0]) {
found = true;
for (int j = 1; j < Math.min(rBytes - i, lookUpStr.length); j++) {
if (buffer[i + j] != lookUpStr[j]) {
found = false;
break;
}
}
if (found) {
msgBuilder.append(new String(buffer, i, rBytes - i, StandardCharsets.UTF_8));
for (int i = 0; i < rBytes; i++) {
if (buffer[i] == lookUpStr[0]) {
found = true;
for (int j = 1; j < Math.min(rBytes - i, lookUpStr.length); j++) {
if (buffer[i + j] != lookUpStr[j]) {
found = false;
break;
}
}
}

if (found) {
break;
if (found) {
msgBuilder.append(new String(buffer, i, rBytes - i, StandardCharsets.UTF_8));
break;
}
}
}

while (true) {
int rBytes = body.read(buffer);
if (rBytes == -1) {
break;
}
msgBuilder.append(new String(buffer, 0, rBytes, StandardCharsets.UTF_8));
if (found) {
break;
}
}

String msg = msgBuilder.toString().replaceAll("\\s+", " ").replaceAll("\\\\n", " ")
.replaceAll("\\\\/", "/");
if (msg.trim().isEmpty()) {
msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
while (true) {
int rBytes = body.read(buffer);
if (rBytes == -1) {
break;
}
return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
} catch (Exception e) {
LOG.error("Failed to read error message", e);
String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
msgBuilder.append(new String(buffer, 0, rBytes, StandardCharsets.UTF_8));
}

String msg = msgBuilder.toString().replaceAll("\\s+", " ").replaceAll("\\\\n", " ")
.replaceAll("\\\\/", "/");
if (msg.trim().isEmpty()) {
msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpCode + ")";
}
return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpCode, queryId);
}

private static final long POOL_VENT_TIMEOUT = 10000L;
Expand Down Expand Up @@ -536,7 +580,7 @@
throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings.");
} else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST || httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) {
try {
throw readError(httpResponse);
throw readError(req, httpResponse);
} finally {
httpResponse.close();
}
Expand Down
Loading
Loading