From 459334755497c42a27e26c5c34408249b8d529e5 Mon Sep 17 00:00:00 2001 From: zhihanz Date: Mon, 2 Sep 2024 17:56:45 +0800 Subject: [PATCH 1/4] feat: add discovery api on databend client --- .../actions/setup_databend_cluster/action.yml | 2 +- .github/workflows/test.yml | 2 +- .gitignore | 2 + .../com/databend/client/ClientSettings.java | 2 - .../com/databend/client/DatabendClient.java | 5 + .../client/DatabendClientFactory.java | 3 +- .../com/databend/client/DatabendClientV1.java | 111 +++++++++++++++--- .../com/databend/client/DatabendSession.java | 1 - .../java/com/databend/client/JsonCodec.java | 31 +++-- .../com/databend/client/JsonResponse.java | 61 +++------- .../java/com/databend/client/OkHttpUtils.java | 39 +++--- .../databend/client/PaginationOptions.java | 1 + .../databend/client/ParseJsonDataUtils.java | 12 +- .../java/com/databend/client/QueryAffect.java | 1 + .../java/com/databend/client/QueryStats.java | 1 + .../client/data/ColumnTypeHandler.java | 4 +- .../client/data/ColumnTypeHandlerFactory.java | 3 +- .../databend/client/data/DatabendRawType.java | 2 +- .../databend/client/data/StringHandler.java | 16 +-- .../client/errors/CloudErrorKinds.java | 9 +- .../databend/client/errors/CloudErrors.java | 11 +- .../databend/client/errors/QueryErrors.java | 10 +- .../com/databend/client/TestClientIT.java | 17 ++- .../com/databend/client/TestQueryAffect.java | 9 +- 24 files changed, 206 insertions(+), 149 deletions(-) diff --git a/.github/actions/setup_databend_cluster/action.yml b/.github/actions/setup_databend_cluster/action.yml index 3f770c88..cd89f93c 100644 --- a/.github/actions/setup_databend_cluster/action.yml +++ b/.github/actions/setup_databend_cluster/action.yml @@ -4,7 +4,7 @@ inputs: version: description: "query and meta service version" required: true - default: "1.2.616-nightly" + default: "1.2.629-nightly" target: description: "" required: true diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index cf84009e..32e6c1ed 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: runs-on: ubuntu-latest services: databend: - image: datafuselabs/databend + image: datafuselabs/databend:nightly env: QUERY_DEFAULT_USER: databend QUERY_DEFAULT_PASSWORD: databend diff --git a/.gitignore b/.gitignore index a3252e50..c869b3c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ .idea/ databend-jdbc/databend-jdbc-debug.log target/ +databend/ +.databend/ diff --git a/databend-client/src/main/java/com/databend/client/ClientSettings.java b/databend-client/src/main/java/com/databend/client/ClientSettings.java index a510445c..2cdb82fa 100644 --- a/databend-client/src/main/java/com/databend/client/ClientSettings.java +++ b/databend-client/src/main/java/com/databend/client/ClientSettings.java @@ -16,8 +16,6 @@ import java.util.HashMap; import java.util.Map; -import java.util.Objects; -import java.util.Random; public class ClientSettings { public static final Integer DEFAULT_QUERY_TIMEOUT = 300; diff --git a/databend-client/src/main/java/com/databend/client/DatabendClient.java b/databend-client/src/main/java/com/databend/client/DatabendClient.java index 6ace7bde..8e2c7f7d 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendClient.java +++ b/databend-client/src/main/java/com/databend/client/DatabendClient.java @@ -21,13 +21,18 @@ public interface DatabendClient extends Closeable { String getQuery(); + @Override void close(); DatabendSession getSession(); + String getHost(); + Map getAdditionalHeaders(); + QueryResults getResults(); + // execute Restful query request for the first time. // @param request the request to be executed // @return true if request finished with result diff --git a/databend-client/src/main/java/com/databend/client/DatabendClientFactory.java b/databend-client/src/main/java/com/databend/client/DatabendClientFactory.java index 6e7fa068..7750bc3b 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendClientFactory.java +++ b/databend-client/src/main/java/com/databend/client/DatabendClientFactory.java @@ -15,6 +15,7 @@ package com.databend.client; public final class DatabendClientFactory { - private DatabendClientFactory() {} + private DatabendClientFactory() { + } } diff --git a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java index d0014a6c..11243876 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java +++ b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java @@ -15,25 +15,20 @@ package com.databend.client; import com.databend.client.errors.CloudErrors; -import okhttp3.Headers; -import okhttp3.HttpUrl; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; +import okhttp3.*; import okio.Buffer; import javax.annotation.concurrent.ThreadSafe; - import java.io.IOException; -import java.io.UncheckedIOException; import java.net.ConnectException; import java.net.URI; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Logger; import java.util.function.Consumer; +import java.util.logging.Logger; import static com.databend.client.JsonCodec.jsonCodec; import static com.google.common.base.MoreObjects.firstNonNull; @@ -51,12 +46,14 @@ public class DatabendClientV1 firstNonNull(DatabendClientV1.class.getPackage().getImplementationVersion(), "jvm-unknown"); public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8"); public static final JsonCodec QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class); + public static final JsonCodec DISCOVERY_RESULT_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class); public static final String succeededState = "succeeded"; public static final String failedState = "failed"; public static final String runningState = "running"; public static final String QUERY_PATH = "/v1/query"; + public static final String DISCOVERY_PATH = "/v1/discovery_nodes"; private static final long MAX_MATERIALIZED_JSON_RESPONSE_SIZE = 128 * 1024; private final OkHttpClient httpClient; private final String query; @@ -96,14 +93,23 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett } } - public Request.Builder prepareRequest(HttpUrl url) { + public static List dicoverNodes(OkHttpClient httpClient, ClientSettings settings) { + requireNonNull(httpClient, "httpClient is null"); + requireNonNull(settings, "settings is null"); + requireNonNull(settings.getHost(), "settings.host is null"); + Request request = buildDiscoveryRequest(settings); + DiscoveryResponseCodec.DiscoveryResponse response = getDiscoveryResponse(httpClient, request, OptionalLong.empty(), settings.getQueryTimeoutSecs()); + return response.getNodes(); + } + + public static Request.Builder prepareRequest(HttpUrl url, Map additionalHeaders) { Request.Builder builder = new Request.Builder() .url(url) .header("User-Agent", USER_AGENT_VALUE) .header("Accept", "application/json") .header("Content-Type", "application/json"); - if (this.getAdditionalHeaders() != null) { - this.getAdditionalHeaders().forEach(builder::addHeader); + if (additionalHeaders != null) { + additionalHeaders.forEach(builder::addHeader); } return builder; } @@ -121,15 +127,90 @@ private Request buildQueryRequest(String query, ClientSettings settings) { throw new IllegalArgumentException("Invalid request: " + req); } url = url.newBuilder().encodedPath(QUERY_PATH).build(); - Request.Builder builder = prepareRequest(url); + Request.Builder builder = prepareRequest(url, this.additonalHeaders); return builder.post(okhttp3.RequestBody.create(MEDIA_TYPE_JSON, reqString)).build(); } + private static Request buildDiscoveryRequest(ClientSettings settings) { + HttpUrl url = HttpUrl.get(settings.getHost()); + if (url == null) { + // TODO(zhihanz) use custom exception + throw new IllegalArgumentException("Invalid host: " + settings.getHost()); + } + url = url.newBuilder().encodedPath(DISCOVERY_PATH).build(); + Request.Builder builder = prepareRequest(url, settings.getAdditionalHeaders()); + return builder.get().build(); + } + @Override public String getQuery() { return query; } + private static DiscoveryResponseCodec.DiscoveryResponse getDiscoveryResponse(OkHttpClient httpClient, Request request, OptionalLong materializedJsonSizeLimit, int requestTimeoutSecs) { + requireNonNull(request, "request is null"); + + long start = System.nanoTime(); + int attempts = 0; + Exception lastException = null; + + while (true) { + if (attempts > 0) { + Duration sinceStart = Duration.ofNanos(System.nanoTime() - start); + if (sinceStart.compareTo(Duration.ofSeconds(requestTimeoutSecs)) > 0) { + throw new RuntimeException(format("Error fetching discovery nodes (attempts: %s, duration: %s)", attempts, sinceStart.getSeconds()), lastException); + } + + try { + MILLISECONDS.sleep(attempts * 100); // Exponential backoff + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while fetching discovery nodes", e); + } + } + attempts++; + + JsonResponse response; + try { + response = JsonResponse.execute( + DISCOVERY_RESULT_CODEC, + httpClient, + request, + materializedJsonSizeLimit); + } catch (RuntimeException e) { + lastException = e; + if (e.getCause() instanceof ConnectException) { + // Retry on connection refused errors + continue; + } + throw new RuntimeException("Failed to fetch discovery nodes: " + e.getMessage(), e); + } + + if (response.getStatusCode() == HTTP_OK && response.hasValue()) { + DiscoveryResponseCodec.DiscoveryResponse discoveryResponse = response.getValue(); + if (discoveryResponse.getError() == null) { + return discoveryResponse; // Successful response + } + if (discoveryResponse.getError().notFound()) { + throw new UnsupportedOperationException("Discovery request feature not supported: " + discoveryResponse.getError()); + } + throw new RuntimeException("Discovery request failed: " + discoveryResponse.getError()); + } + + // Handle other HTTP error codes and response body parsing for errors + if (response.getResponseBody().isPresent()) { + CloudErrors errors = CloudErrors.tryParse(response.getResponseBody().get()); + if (errors != null && errors.tryGetErrorKind().canRetry()) { + continue; + } + } + + if (response.getStatusCode() != 520) { + throw new RuntimeException("Discovery request failed with status code: " + response.getStatusCode()); + } + } + } + private boolean executeInternal(Request request, OptionalLong materializedJsonSizeLimit) { requireNonNull(request, "request is null"); long start = System.nanoTime(); @@ -219,7 +300,7 @@ private void processResponse(Headers headers, QueryResults results) { if (results.getQueryId() != null && this.additonalHeaders.get(ClientSettings.X_Databend_Query_ID) == null) { this.additonalHeaders.put(ClientSettings.X_Databend_Query_ID, results.getQueryId()); } - if (headers != null && headers.get(ClientSettings.X_DATABEND_ROUTE_HINT) != null){ + if (headers != null && headers.get(ClientSettings.X_DATABEND_ROUTE_HINT) != null) { this.additonalHeaders.put(ClientSettings.X_DATABEND_ROUTE_HINT, headers.get(ClientSettings.X_DATABEND_ROUTE_HINT)); } currentResults.set(results); @@ -241,7 +322,7 @@ public boolean advance() { String nextUriPath = this.currentResults.get().getNextUri().toString(); HttpUrl url = HttpUrl.get(this.host); url = url.newBuilder().encodedPath(nextUriPath).build(); - Request.Builder builder = prepareRequest(url); + Request.Builder builder = prepareRequest(url, this.additonalHeaders); Request request = builder.get().build(); return executeInternal(request, OptionalLong.of(MAX_MATERIALIZED_JSON_RESPONSE_SIZE)); } @@ -291,7 +372,7 @@ private void closeQuery() { String path = uri.toString(); HttpUrl url = HttpUrl.get(this.host); url = url.newBuilder().encodedPath(path).build(); - Request r = prepareRequest(url).get().build(); + Request r = prepareRequest(url, this.additonalHeaders).get().build(); try { httpClient.newCall(r).execute().close(); } catch (IOException ignored) { diff --git a/databend-client/src/main/java/com/databend/client/DatabendSession.java b/databend-client/src/main/java/com/databend/client/DatabendSession.java index f195ab19..24178f4d 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendSession.java +++ b/databend-client/src/main/java/com/databend/client/DatabendSession.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/databend-client/src/main/java/com/databend/client/JsonCodec.java b/databend-client/src/main/java/com/databend/client/JsonCodec.java index 85e1c0ef..d4ba4e5c 100644 --- a/databend-client/src/main/java/com/databend/client/JsonCodec.java +++ b/databend-client/src/main/java/com/databend/client/JsonCodec.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import java.io.IOException; @@ -31,8 +32,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; -public class JsonCodec -{ +public class JsonCodec { // copy of https://github.com/airlift/airlift/blob/master/json/src/main/java/io/airlift/json/ObjectMapperProvider.java static final Supplier OBJECT_MAPPER_SUPPLIER = () -> new ObjectMapper() .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) @@ -45,46 +45,43 @@ public class JsonCodec .disable(MapperFeature.CAN_OVERRIDE_ACCESS_MODIFIERS) .disable(MapperFeature.INFER_PROPERTY_MUTATORS) .disable(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS) - .registerModule(new Jdk8Module()); + .registerModule(new Jdk8Module()).registerModule(new SimpleModule().addDeserializer( + DiscoveryResponseCodec.DiscoveryResponse.class, new DiscoveryResponseCodec.DiscoveryResponseDeserializer() + )); + private final ObjectMapper mapper; private final Type type; private final JavaType javaType; - private JsonCodec(ObjectMapper mapper, Type type) - { + + private JsonCodec(ObjectMapper mapper, Type type) { this.mapper = requireNonNull(mapper, "mapper is null"); this.type = requireNonNull(type, "type is null"); this.javaType = mapper.getTypeFactory().constructType(type); } - public static JsonCodec jsonCodec(Class type) - { + public static JsonCodec jsonCodec(Class type) { return new JsonCodec<>(OBJECT_MAPPER_SUPPLIER.get(), type); } - public Type getType() - { + public Type getType() { return type; } public T fromJson(String json) - throws JsonProcessingException - { + throws JsonProcessingException { try (JsonParser parser = mapper.createParser(json)) { T value = mapper.readerFor(javaType).readValue(parser); checkArgument(parser.nextToken() == null, "Found characters after the expected end of input"); return value; - } - catch (JsonProcessingException e) { + } catch (JsonProcessingException e) { throw e; - } - catch (IOException e) { + } catch (IOException e) { throw new UncheckedIOException(e); } } public T fromJson(InputStream inputStream) - throws IOException, JsonProcessingException - { + throws IOException, JsonProcessingException { try (JsonParser parser = mapper.createParser(inputStream)) { T value = mapper.readerFor(javaType).readValue(parser); checkArgument(parser.nextToken() == null, "Found characters after the expected end of input"); diff --git a/databend-client/src/main/java/com/databend/client/JsonResponse.java b/databend-client/src/main/java/com/databend/client/JsonResponse.java index 15c9590e..226fed69 100644 --- a/databend-client/src/main/java/com/databend/client/JsonResponse.java +++ b/databend-client/src/main/java/com/databend/client/JsonResponse.java @@ -15,19 +15,11 @@ package com.databend.client; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.util.concurrent.UncheckedExecutionException; -import okhttp3.Headers; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.ResponseBody; +import okhttp3.*; import javax.annotation.Nullable; - import java.io.IOException; import java.io.UncheckedIOException; -import java.net.ConnectException; import java.util.Optional; import java.util.OptionalLong; @@ -36,8 +28,7 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; -public final class JsonResponse -{ +public final class JsonResponse { private final int statusCode; private final String statusMessage; private final Headers headers; @@ -47,8 +38,7 @@ public final class JsonResponse private final T value; private final IllegalArgumentException exception; - private JsonResponse(int statusCode, String statusMessage, Headers headers, String responseBody) - { + private JsonResponse(int statusCode, String statusMessage, Headers headers, String responseBody) { this.statusCode = statusCode; this.statusMessage = statusMessage; this.headers = requireNonNull(headers, "headers is null"); @@ -59,8 +49,7 @@ private JsonResponse(int statusCode, String statusMessage, Headers headers, Stri this.exception = null; } - private JsonResponse(int statusCode, String statusMessage, Headers headers, @Nullable String responseBody, @Nullable T value, @Nullable IllegalArgumentException exception) - { + private JsonResponse(int statusCode, String statusMessage, Headers headers, @Nullable String responseBody, @Nullable T value, @Nullable IllegalArgumentException exception) { this.statusCode = statusCode; this.statusMessage = statusMessage; this.headers = requireNonNull(headers, "headers is null"); @@ -70,8 +59,7 @@ private JsonResponse(int statusCode, String statusMessage, Headers headers, @Nul this.hasValue = (exception == null); } - public static JsonResponse execute(JsonCodec codec, OkHttpClient client, Request request, OptionalLong materializedJsonSizeLimit) throws RuntimeException - { + public static JsonResponse execute(JsonCodec codec, OkHttpClient client, Request request, OptionalLong materializedJsonSizeLimit) throws RuntimeException { try (Response response = client.newCall(request).execute()) { // TODO: fix in OkHttp: https://github.com/square/okhttp/issues/3111 if ((response.code() == 307) || (response.code() == 308)) { @@ -92,19 +80,16 @@ public static JsonResponse execute(JsonCodec codec, OkHttpClient clien // Parse from input stream, response is either of unknown size or too large to materialize. Raw response body // will not be available if parsing fails value = codec.fromJson(responseBody.byteStream()); - } - else { + } else { // parse from materialized response body string body = responseBody.string(); value = codec.fromJson(body); } - } - catch (JsonProcessingException e) { + } catch (JsonProcessingException e) { String message; if (body != null) { message = format("Unable to create %s from JSON response:\n[%s]", codec.getType(), body); - } - else { + } else { message = format("Unable to create %s from JSON response", codec.getType()); } exception = new IllegalArgumentException(message, e); @@ -113,59 +98,49 @@ public static JsonResponse execute(JsonCodec codec, OkHttpClient clien return new JsonResponse<>(response.code(), response.message(), response.headers(), body, value, exception); } return new JsonResponse<>(response.code(), response.message(), response.headers(), responseBody.string()); - } - catch (IOException e) { + } catch (IOException e) { throw new UncheckedIOException(e); } } - private static boolean isJson(MediaType type) - { + private static boolean isJson(MediaType type) { return (type != null) && "application".equals(type.type()) && "json".equals(type.subtype()); } - public int getStatusCode() - { + public int getStatusCode() { return statusCode; } - public String getStatusMessage() - { + public String getStatusMessage() { return statusMessage; } - public Headers getHeaders() - { + public Headers getHeaders() { return headers; } - public boolean hasValue() - { + public boolean hasValue() { return hasValue; } - public T getValue() - { + public T getValue() { if (!hasValue) { throw new IllegalStateException("Response does not contain a JSON value", exception); } return value; } - public Optional getResponseBody() - { + public Optional getResponseBody() { return Optional.ofNullable(responseBody); } @Nullable - public IllegalArgumentException getException() - { + public IllegalArgumentException getException() { return exception; } @Override - public String toString() - { + public String toString() { return toStringHelper(this) .add("statusCode", statusCode) .add("statusMessage", statusMessage) diff --git a/databend-client/src/main/java/com/databend/client/OkHttpUtils.java b/databend-client/src/main/java/com/databend/client/OkHttpUtils.java index 5debecfb..5ec2f63f 100644 --- a/databend-client/src/main/java/com/databend/client/OkHttpUtils.java +++ b/databend-client/src/main/java/com/databend/client/OkHttpUtils.java @@ -22,7 +22,6 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; - import java.security.GeneralSecurityException; import java.security.SecureRandom; import java.security.cert.X509Certificate; @@ -32,22 +31,19 @@ import static com.google.common.net.HttpHeaders.AUTHORIZATION; import static java.util.Objects.requireNonNull; -public final class OkHttpUtils -{ - private OkHttpUtils() {} +public final class OkHttpUtils { + private OkHttpUtils() { + } - public static Interceptor userAgentInterceptor(String userAgent) - { + public static Interceptor userAgentInterceptor(String userAgent) { return chain -> chain.proceed(chain.request().newBuilder().header("User-Agent", userAgent).build()); } - public static Interceptor basicAuthInterceptor(String username, String password) - { + public static Interceptor basicAuthInterceptor(String username, String password) { return chain -> chain.proceed(chain.request().newBuilder().header("Authorization", Credentials.basic(username, password)).build()); } - public static Interceptor tokenAuth(String accessToken) - { + public static Interceptor tokenAuth(String accessToken) { requireNonNull(accessToken, "accessToken is null"); checkArgument(CharMatcher.inRange((char) 33, (char) 126).matchesAllOf(accessToken)); @@ -56,45 +52,38 @@ public static Interceptor tokenAuth(String accessToken) .build()); } - public static void setupTimeouts(OkHttpClient.Builder clientBuilder, int timeout, TimeUnit unit) - { + public static void setupTimeouts(OkHttpClient.Builder clientBuilder, int timeout, TimeUnit unit) { clientBuilder .connectTimeout(timeout, unit) .readTimeout(timeout, unit) .writeTimeout(timeout, unit); } - public static void setupInsecureSsl(OkHttpClient.Builder clientBuilder) - { + public static void setupInsecureSsl(OkHttpClient.Builder clientBuilder) { try { - X509TrustManager trustAllCerts = new X509TrustManager() - { + X509TrustManager trustAllCerts = new X509TrustManager() { @Override - public void checkClientTrusted(X509Certificate[] chain, String authType) - { + public void checkClientTrusted(X509Certificate[] chain, String authType) { throw new UnsupportedOperationException("checkClientTrusted should not be called"); } @Override - public void checkServerTrusted(X509Certificate[] chain, String authType) - { + public void checkServerTrusted(X509Certificate[] chain, String authType) { // skip validation of server certificate } @Override - public X509Certificate[] getAcceptedIssuers() - { + public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } }; SSLContext sslContext = SSLContext.getInstance("SSL"); - sslContext.init(null, new TrustManager[] {trustAllCerts}, new SecureRandom()); + sslContext.init(null, new TrustManager[]{trustAllCerts}, new SecureRandom()); clientBuilder.sslSocketFactory(sslContext.getSocketFactory(), trustAllCerts); clientBuilder.hostnameVerifier((hostname, session) -> true); - } - catch (GeneralSecurityException e) { + } catch (GeneralSecurityException e) { throw new RuntimeException("Error setting up SSL: " + e.getMessage(), e); } } diff --git a/databend-client/src/main/java/com/databend/client/PaginationOptions.java b/databend-client/src/main/java/com/databend/client/PaginationOptions.java index 4bbe72f7..d4e34a7a 100644 --- a/databend-client/src/main/java/com/databend/client/PaginationOptions.java +++ b/databend-client/src/main/java/com/databend/client/PaginationOptions.java @@ -28,6 +28,7 @@ public class PaginationOptions { private final int waitTimeSecs; private final int maxRowsInBuffer; private final int maxRowsPerPage; + @JsonCreator public PaginationOptions( @JsonProperty("wait_time_secs") int waitTimeSecs diff --git a/databend-client/src/main/java/com/databend/client/ParseJsonDataUtils.java b/databend-client/src/main/java/com/databend/client/ParseJsonDataUtils.java index 63e91e11..898b6a41 100644 --- a/databend-client/src/main/java/com/databend/client/ParseJsonDataUtils.java +++ b/databend-client/src/main/java/com/databend/client/ParseJsonDataUtils.java @@ -24,18 +24,18 @@ import static java.util.Collections.unmodifiableList; -final class ParseJsonDataUtils -{ - private ParseJsonDataUtils() {} +final class ParseJsonDataUtils { + private ParseJsonDataUtils() { + } + /** * parseRawData is used to convert json data an immutable list of data * input QuerySchema: contains column names and types * input List> : a list of rows parsed from QueryResponse * output Iterable> : convert the input rows into DatabendType and return an immutable list */ - public static List> parseRawData(List schema, List> data) - { - if (data == null || schema == null ) { + public static List> parseRawData(List schema, List> data) { + if (data == null || schema == null) { return null; } ColumnTypeHandler[] typeHandlers = createTypeHandlers(schema); diff --git a/databend-client/src/main/java/com/databend/client/QueryAffect.java b/databend-client/src/main/java/com/databend/client/QueryAffect.java index dc5f16f5..740f699a 100644 --- a/databend-client/src/main/java/com/databend/client/QueryAffect.java +++ b/databend-client/src/main/java/com/databend/client/QueryAffect.java @@ -115,6 +115,7 @@ public ChangeSettings( this.values = values; this.isGlobals = isGlobals; } + @JsonProperty public List getKeys() { return keys; diff --git a/databend-client/src/main/java/com/databend/client/QueryStats.java b/databend-client/src/main/java/com/databend/client/QueryStats.java index 855e906a..95bb1172 100644 --- a/databend-client/src/main/java/com/databend/client/QueryStats.java +++ b/databend-client/src/main/java/com/databend/client/QueryStats.java @@ -30,6 +30,7 @@ public class QueryStats { private final QueryProgress scanProgress; private final QueryProgress writeProgress; private final QueryProgress resultProgress; + @JsonCreator public QueryStats( @JsonProperty("running_time_ms") float runningTimeMS, diff --git a/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandler.java b/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandler.java index 16f5205a..325be59d 100644 --- a/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandler.java +++ b/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandler.java @@ -14,10 +14,10 @@ package com.databend.client.data; -public interface ColumnTypeHandler -{ +public interface ColumnTypeHandler { /** * Convert the input object to the corresponding Java Type + * * @param value raw input row value * @return parsed java type value */ diff --git a/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandlerFactory.java b/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandlerFactory.java index b97c8aef..994046c5 100644 --- a/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandlerFactory.java +++ b/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandlerFactory.java @@ -16,8 +16,7 @@ import java.util.Locale; -public class ColumnTypeHandlerFactory -{ +public class ColumnTypeHandlerFactory { public static ColumnTypeHandler getTypeHandler(DatabendRawType type) { if (type == null) { return null; diff --git a/databend-client/src/main/java/com/databend/client/data/DatabendRawType.java b/databend-client/src/main/java/com/databend/client/data/DatabendRawType.java index 23c40357..d02cbf60 100644 --- a/databend-client/src/main/java/com/databend/client/data/DatabendRawType.java +++ b/databend-client/src/main/java/com/databend/client/data/DatabendRawType.java @@ -81,7 +81,7 @@ public DatabendRawType(String type) { .map(DatabendRawType::new) .collect(Collectors.toList()); this.columnSize = subType.size(); - } else if (dataType == DatabendDataType.MAP) { + } else if (dataType == DatabendDataType.MAP) { // remove "Map(" and last ")" String subTypes = this.type.substring(4, this.type.length() - 1); // split by "," diff --git a/databend-client/src/main/java/com/databend/client/data/StringHandler.java b/databend-client/src/main/java/com/databend/client/data/StringHandler.java index 061fc22f..df4189c9 100644 --- a/databend-client/src/main/java/com/databend/client/data/StringHandler.java +++ b/databend-client/src/main/java/com/databend/client/data/StringHandler.java @@ -14,26 +14,23 @@ package com.databend.client.data; -class StringHandler implements ColumnTypeHandler -{ +class StringHandler implements ColumnTypeHandler { private final boolean isNullable; + public StringHandler() { this.isNullable = false; } - public StringHandler(boolean isNullable) - { + public StringHandler(boolean isNullable) { this.isNullable = isNullable; } @Override - public Object parseValue(Object value) - { + public Object parseValue(Object value) { if (value == null) { if (isNullable) { return null; - } - else { + } else { throw new IllegalArgumentException("String type is not nullable"); } } @@ -44,8 +41,7 @@ public Object parseValue(Object value) } @Override - public void setNullable(boolean isNullable) - { + public void setNullable(boolean isNullable) { // do nothing } } diff --git a/databend-client/src/main/java/com/databend/client/errors/CloudErrorKinds.java b/databend-client/src/main/java/com/databend/client/errors/CloudErrorKinds.java index a9ddd3d6..a82cd632 100644 --- a/databend-client/src/main/java/com/databend/client/errors/CloudErrorKinds.java +++ b/databend-client/src/main/java/com/databend/client/errors/CloudErrorKinds.java @@ -17,8 +17,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; // CloudErrorKinds is a list of error kinds that can be returned by the databend cloud service. -public enum CloudErrorKinds -{ +public enum CloudErrorKinds { TENANT_NOT_FOUND("TenantNotFound", "Tenant not found, please check your tenant id", false), TENANT_BOOTSTRAP_FAILED("TenantBootstrapFailed", "tenant bootstrap failed", false), WAREHOUSE_NOT_FOUND("WarehouseNotFound", "warehouse not found", false), @@ -41,9 +40,9 @@ public enum CloudErrorKinds HEALTH_CHECK_FAILED("HealthCheckFailed", "health check failed", false), BAD_TENANT("BadTenant", "bad tenant", false); - private final String kind; - private final String description; - private final boolean canRetry; + private final String kind; + private final String description; + private final boolean canRetry; CloudErrorKinds(String kind, String description, boolean canRetry) { this.kind = kind; diff --git a/databend-client/src/main/java/com/databend/client/errors/CloudErrors.java b/databend-client/src/main/java/com/databend/client/errors/CloudErrors.java index 5d5fbca1..97d621ed 100644 --- a/databend-client/src/main/java/com/databend/client/errors/CloudErrors.java +++ b/databend-client/src/main/java/com/databend/client/errors/CloudErrors.java @@ -21,25 +21,22 @@ import static com.google.common.base.MoreObjects.toStringHelper; /** - * CloudErrors is a list of errors that can be returned by the databend cloud service. + * CloudErrors is a list of errors that can be returned by the databend cloud service. */ -public class CloudErrors -{ +public class CloudErrors { private final String kind; private final String message; @JsonCreator public CloudErrors( @JsonProperty("kind") String kind, - @JsonProperty("message") String message) - { + @JsonProperty("message") String message) { this.kind = kind; this.message = message; } // return null if parse failed - public static CloudErrors tryParse(String json) - { + public static CloudErrors tryParse(String json) { try { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(json, CloudErrors.class); diff --git a/databend-client/src/main/java/com/databend/client/errors/QueryErrors.java b/databend-client/src/main/java/com/databend/client/errors/QueryErrors.java index 8721a227..1e1e0e51 100644 --- a/databend-client/src/main/java/com/databend/client/errors/QueryErrors.java +++ b/databend-client/src/main/java/com/databend/client/errors/QueryErrors.java @@ -22,16 +22,14 @@ /** * QueryErrors represent errors from databend server */ -public class QueryErrors -{ +public class QueryErrors { private final int code; private final String message; @JsonCreator public QueryErrors( @JsonProperty("code") int code, - @JsonProperty("message") String message) - { + @JsonProperty("message") String message) { this.code = code; this.message = message; } @@ -59,6 +57,10 @@ public String toString() { .toString(); } + public boolean notFound() { + return code == 404; + } + public static final class Builder { private int code; private String message; diff --git a/databend-client/src/test/java/com/databend/client/TestClientIT.java b/databend-client/src/test/java/com/databend/client/TestClientIT.java index bf3f8c9f..3214ab9a 100644 --- a/databend-client/src/test/java/com/databend/client/TestClientIT.java +++ b/databend-client/src/test/java/com/databend/client/TestClientIT.java @@ -18,8 +18,6 @@ import org.testng.Assert; import org.testng.annotations.Test; -import java.io.IOException; -import java.io.UncheckedIOException; import java.net.ConnectException; import java.util.HashMap; import java.util.List; @@ -96,4 +94,19 @@ public void testBasicQueryIDHeader() { Assert.assertEquals(cli1.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID1); } + @Test(groups = {"it"}) + public void testDiscoverNodes() { + OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build(); + String expectedUUID = UUID.randomUUID().toString(); + + Map additionalHeaders = new HashMap<>(); + additionalHeaders.put(X_Databend_Query_ID, expectedUUID); + ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS); + List nodes = DatabendClientV1.dicoverNodes(client, settings); + Assert.assertFalse(nodes.isEmpty()); + for (DiscoveryNode node : nodes) { + System.out.println(node.getAddress()); + } + } + } diff --git a/databend-client/src/test/java/com/databend/client/TestQueryAffect.java b/databend-client/src/test/java/com/databend/client/TestQueryAffect.java index dd6d62ce..dc2a1bc9 100644 --- a/databend-client/src/test/java/com/databend/client/TestQueryAffect.java +++ b/databend-client/src/test/java/com/databend/client/TestQueryAffect.java @@ -14,18 +14,19 @@ package com.databend.client; -import io.airlift.json.JsonCodec; +import com.fasterxml.jackson.core.JsonProcessingException; import org.testng.Assert; import org.testng.annotations.Test; -import static io.airlift.json.JsonCodec.jsonCodec; +import static com.databend.client.JsonCodec.jsonCodec; + @Test(timeOut = 10000) public class TestQueryAffect { private static final JsonCodec QUERY_AFFECT_JSON_CODEC = jsonCodec(QueryAffect.class); @Test( groups = {"unit"} ) - public void testQueryAffectUseDB() { + public void testQueryAffectUseDB() throws JsonProcessingException { String json = "{\"type\":\"UseDB\",\"name\":\"db1\"}"; QueryAffect clause = QUERY_AFFECT_JSON_CODEC.fromJson(json); @@ -36,7 +37,7 @@ public void testQueryAffectUseDB() { } @Test( groups = {"unit"} ) - public void testQueryAffectChangeSettings() { + public void testQueryAffectChangeSettings() throws JsonProcessingException { String json = "{\"type\":\"ChangeSettings\",\"keys\":[\"max_threads\"],\"values\":[\"1\"],\"is_globals\":[false]}"; QueryAffect clause = QUERY_AFFECT_JSON_CODEC.fromJson(json); From 1fe7a9f6e537d2aa68c6f078df3bc4c00fdccac5 Mon Sep 17 00:00:00 2001 From: zhihanz Date: Mon, 2 Sep 2024 18:03:31 +0800 Subject: [PATCH 2/4] chore: upgrade to v0.3.0 --- README.md | 2 +- databend-client/pom.xml | 4 ++-- databend-jdbc/pom.xml | 6 +++--- pom.xml | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index f3aa95d0..09cf2263 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Add following code block as a dependency com.databend databend-jdbc - 0.2.9 + 0.3.0 ``` diff --git a/databend-client/pom.xml b/databend-client/pom.xml index 38d91ac4..6119dfff 100644 --- a/databend-client/pom.xml +++ b/databend-client/pom.xml @@ -6,12 +6,12 @@ com.databend databend-base - 0.2.9 + 0.3.0 ../pom.xml com.databend databend-client - 0.2.9 + 0.3.0 diff --git a/databend-jdbc/pom.xml b/databend-jdbc/pom.xml index f0362f4b..17d8b8bd 100644 --- a/databend-jdbc/pom.xml +++ b/databend-jdbc/pom.xml @@ -6,12 +6,12 @@ com.databend databend-base - 0.2.9 + 0.3.0 ../pom.xml com.databend databend-jdbc - 0.2.9 + 0.3.0 @@ -24,7 +24,7 @@ com.databend databend-client - 0.2.9 + 0.3.0 com.squareup.okhttp3 diff --git a/pom.xml b/pom.xml index d4d6a33e..5ce8bf19 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ com.databend databend-base - 0.2.9 + 0.3.0 databend-base Databend pom From a7547de4ccd096ea38343edc04898e6cedb54715 Mon Sep 17 00:00:00 2001 From: zhihanz Date: Mon, 2 Sep 2024 18:10:51 +0800 Subject: [PATCH 3/4] fill out missing files --- .gitignore | 2 +- .../com/databend/client/DiscoveryNode.java | 61 ++++++++++++ .../client/DiscoveryResponseCodec.java | 97 +++++++++++++++++++ .../databend/client/TestDiscoveryNodes.java | 46 +++++++++ 4 files changed, 205 insertions(+), 1 deletion(-) create mode 100644 databend-client/src/main/java/com/databend/client/DiscoveryNode.java create mode 100644 databend-client/src/main/java/com/databend/client/DiscoveryResponseCodec.java create mode 100644 databend-client/src/test/java/com/databend/client/TestDiscoveryNodes.java diff --git a/.gitignore b/.gitignore index c869b3c2..518817bd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ .idea/ databend-jdbc/databend-jdbc-debug.log target/ -databend/ + .databend/ diff --git a/databend-client/src/main/java/com/databend/client/DiscoveryNode.java b/databend-client/src/main/java/com/databend/client/DiscoveryNode.java new file mode 100644 index 00000000..876a6e22 --- /dev/null +++ b/databend-client/src/main/java/com/databend/client/DiscoveryNode.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databend.client; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import static com.google.common.base.MoreObjects.toStringHelper; + +public class DiscoveryNode { + private final String address; + + @JsonCreator + public DiscoveryNode( + @JsonProperty("address") String address) { + this.address = address; + } + + // add builder + + @JsonProperty + public String getAddress() { + return address; + } + + @Override + public String toString() { + return toStringHelper(this) + .add("address", address) + .toString(); + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private String address; + + public Builder setAddress(String address) { + this.address = address; + return this; + } + + public DiscoveryNode build() { + return new DiscoveryNode(address); + } + } +} diff --git a/databend-client/src/main/java/com/databend/client/DiscoveryResponseCodec.java b/databend-client/src/main/java/com/databend/client/DiscoveryResponseCodec.java new file mode 100644 index 00000000..74284882 --- /dev/null +++ b/databend-client/src/main/java/com/databend/client/DiscoveryResponseCodec.java @@ -0,0 +1,97 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databend.client; + +import com.databend.client.errors.QueryErrors; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.module.SimpleModule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class DiscoveryResponseCodec { + + public static class DiscoveryResponse { + private final List nodes; + private final QueryErrors error; + + public DiscoveryResponse(List nodes, QueryErrors error) { + this.nodes = nodes; + this.error = error; + } + + // Getters for nodes and error + public List getNodes() { + return nodes; + } + + public QueryErrors getError() { + return error; + } + } + + + public static class DiscoveryResponseDeserializer extends StdDeserializer { + + public DiscoveryResponseDeserializer() { + this(null); + } + + public DiscoveryResponseDeserializer(Class vc) { + super(vc); + } + + @Override + public DiscoveryResponse deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException, JsonProcessingException { + ObjectMapper mapper = (ObjectMapper) jp.getCodec(); + JsonNode rootNode = mapper.readTree(jp); + + List nodes = new ArrayList<>(); + QueryErrors error = null; + + if (rootNode.has("error")) { + // Deserialize error + error = mapper.treeToValue(rootNode.get("error"), QueryErrors.class); + } else if (rootNode.isArray()) { + // Deserialize nodes + for (JsonNode element : rootNode) { + DiscoveryNode node = new DiscoveryNode(element.get("address").asText()); + nodes.add(node); + } + } else { + throw new JsonProcessingException("Unrecognized JSON format") { + }; + } + + return new DiscoveryResponse(nodes, error); + } + } + + // Method to deserialize from JSON string + public static T fromJson(String json, Class valueType) throws JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + SimpleModule module = new SimpleModule(); + module.addDeserializer(DiscoveryResponse.class, new DiscoveryResponseDeserializer()); + objectMapper.registerModule(module); + return objectMapper.readValue(json, valueType); + } +} diff --git a/databend-client/src/test/java/com/databend/client/TestDiscoveryNodes.java b/databend-client/src/test/java/com/databend/client/TestDiscoveryNodes.java new file mode 100644 index 00000000..0eb8c00d --- /dev/null +++ b/databend-client/src/test/java/com/databend/client/TestDiscoveryNodes.java @@ -0,0 +1,46 @@ +package com.databend.client; + +import com.databend.client.errors.QueryErrors; +import com.fasterxml.jackson.core.JsonProcessingException; +import io.airlift.json.JsonCodecFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.List; + +import static com.databend.client.JsonCodec.jsonCodec; + +public class TestDiscoveryNodes { + private static final JsonCodec QUERY_RESULTS_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class); + + @Test(groups = {"unit"}) + public void testDecodeValidJsonArray() throws JsonProcessingException { + String json = "[{\"address\":\"127.0.0.1:8003\"},{\"address\":\"127.0.0.1:8002\"},{\"address\":\"sfgsjsdhida:111\"}]"; + DiscoveryResponseCodec.DiscoveryResponse errorResponse = QUERY_RESULTS_CODEC.fromJson(json); + List nodesList = errorResponse.getNodes(); + Assert.assertEquals(nodesList.size(), 3); + Assert.assertEquals(nodesList.get(0).getAddress(), "127.0.0.1:8003"); + Assert.assertEquals(nodesList.get(1).getAddress(), "127.0.0.1:8002"); + Assert.assertEquals(nodesList.get(2).getAddress(), "sfgsjsdhida:111"); + } + + @Test(groups = {"unit"}) + public void testQueryError() throws JsonProcessingException { + String json = "{\"error\":{\"code\":404,\"message\":\"not found\"}}"; + DiscoveryResponseCodec.DiscoveryResponse errorResponse = QUERY_RESULTS_CODEC.fromJson(json); + QueryErrors error = errorResponse.getError(); + Assert.assertEquals(error.getMessage(), "not found"); + Assert.assertEquals(error.getCode(), 404); + } + + // test empty array + @Test(groups = {"unit"}) + public void testEmptyArray() throws JsonProcessingException { + String json = "[]"; + DiscoveryResponseCodec.DiscoveryResponse errorResponse = QUERY_RESULTS_CODEC.fromJson(json); + List nodesList = errorResponse.getNodes(); + Assert.assertEquals(nodesList.size(), 0); + } + + +} From 62fa5c6d4005f331a953caffc36824a213b71cb1 Mon Sep 17 00:00:00 2001 From: zhihanz Date: Mon, 2 Sep 2024 18:14:42 +0800 Subject: [PATCH 4/4] chore: update to minimum supported jdbc version --- .github/workflows/test_cluster.yml | 2 +- .gitignore | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test_cluster.yml b/.github/workflows/test_cluster.yml index e9e32d23..a9a348c4 100644 --- a/.github/workflows/test_cluster.yml +++ b/.github/workflows/test_cluster.yml @@ -31,7 +31,7 @@ jobs: - uses: ./.github/actions/setup_databend_cluster timeout-minutes: 15 with: - version: '1.2.616-nightly' + version: '1.2.629-nightly' target: 'x86_64-unknown-linux-gnu' - name: Run Maven clean deploy with release profile run: mvn test -DexcludedGroups=FLAKY diff --git a/.gitignore b/.gitignore index 518817bd..b510ae56 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ .idea/ databend-jdbc/databend-jdbc-debug.log target/ - +/databend/ .databend/