diff --git a/.github/actions/setup_databend_cluster/action.yml b/.github/actions/setup_databend_cluster/action.yml
index 3f770c88..cd89f93c 100644
--- a/.github/actions/setup_databend_cluster/action.yml
+++ b/.github/actions/setup_databend_cluster/action.yml
@@ -4,7 +4,7 @@ inputs:
version:
description: "query and meta service version"
required: true
- default: "1.2.616-nightly"
+ default: "1.2.629-nightly"
target:
description: ""
required: true
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index cf84009e..32e6c1ed 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
services:
databend:
- image: datafuselabs/databend
+ image: datafuselabs/databend:nightly
env:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
diff --git a/.github/workflows/test_cluster.yml b/.github/workflows/test_cluster.yml
index e9e32d23..a9a348c4 100644
--- a/.github/workflows/test_cluster.yml
+++ b/.github/workflows/test_cluster.yml
@@ -31,7 +31,7 @@ jobs:
- uses: ./.github/actions/setup_databend_cluster
timeout-minutes: 15
with:
- version: '1.2.616-nightly'
+ version: '1.2.629-nightly'
target: 'x86_64-unknown-linux-gnu'
- name: Run Maven clean deploy with release profile
run: mvn test -DexcludedGroups=FLAKY
diff --git a/.gitignore b/.gitignore
index a3252e50..b510ae56 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
.idea/
databend-jdbc/databend-jdbc-debug.log
target/
+/databend/
+.databend/
diff --git a/README.md b/README.md
index f3aa95d0..09cf2263 100644
--- a/README.md
+++ b/README.md
@@ -20,7 +20,7 @@ Add following code block as a dependency
com.databend
databend-jdbc
- 0.2.9
+ 0.3.0
```
diff --git a/databend-client/pom.xml b/databend-client/pom.xml
index 38d91ac4..6119dfff 100644
--- a/databend-client/pom.xml
+++ b/databend-client/pom.xml
@@ -6,12 +6,12 @@
com.databend
databend-base
- 0.2.9
+ 0.3.0
../pom.xml
com.databend
databend-client
- 0.2.9
+ 0.3.0
diff --git a/databend-client/src/main/java/com/databend/client/ClientSettings.java b/databend-client/src/main/java/com/databend/client/ClientSettings.java
index a510445c..2cdb82fa 100644
--- a/databend-client/src/main/java/com/databend/client/ClientSettings.java
+++ b/databend-client/src/main/java/com/databend/client/ClientSettings.java
@@ -16,8 +16,6 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
-import java.util.Random;
public class ClientSettings {
public static final Integer DEFAULT_QUERY_TIMEOUT = 300;
diff --git a/databend-client/src/main/java/com/databend/client/DatabendClient.java b/databend-client/src/main/java/com/databend/client/DatabendClient.java
index 6ace7bde..8e2c7f7d 100644
--- a/databend-client/src/main/java/com/databend/client/DatabendClient.java
+++ b/databend-client/src/main/java/com/databend/client/DatabendClient.java
@@ -21,13 +21,18 @@
public interface DatabendClient extends Closeable {
String getQuery();
+
@Override
void close();
DatabendSession getSession();
+
String getHost();
+
Map getAdditionalHeaders();
+
QueryResults getResults();
+
// execute Restful query request for the first time.
// @param request the request to be executed
// @return true if request finished with result
diff --git a/databend-client/src/main/java/com/databend/client/DatabendClientFactory.java b/databend-client/src/main/java/com/databend/client/DatabendClientFactory.java
index 6e7fa068..7750bc3b 100644
--- a/databend-client/src/main/java/com/databend/client/DatabendClientFactory.java
+++ b/databend-client/src/main/java/com/databend/client/DatabendClientFactory.java
@@ -15,6 +15,7 @@
package com.databend.client;
public final class DatabendClientFactory {
- private DatabendClientFactory() {}
+ private DatabendClientFactory() {
+ }
}
diff --git a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java
index d0014a6c..11243876 100644
--- a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java
+++ b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java
@@ -15,25 +15,20 @@
package com.databend.client;
import com.databend.client.errors.CloudErrors;
-import okhttp3.Headers;
-import okhttp3.HttpUrl;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
+import okhttp3.*;
import okio.Buffer;
import javax.annotation.concurrent.ThreadSafe;
-
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.net.URI;
import java.time.Duration;
+import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
import java.util.function.Consumer;
+import java.util.logging.Logger;
import static com.databend.client.JsonCodec.jsonCodec;
import static com.google.common.base.MoreObjects.firstNonNull;
@@ -51,12 +46,14 @@ public class DatabendClientV1
firstNonNull(DatabendClientV1.class.getPackage().getImplementationVersion(), "jvm-unknown");
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
public static final JsonCodec QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);
+ public static final JsonCodec DISCOVERY_RESULT_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class);
public static final String succeededState = "succeeded";
public static final String failedState = "failed";
public static final String runningState = "running";
public static final String QUERY_PATH = "/v1/query";
+ public static final String DISCOVERY_PATH = "/v1/discovery_nodes";
private static final long MAX_MATERIALIZED_JSON_RESPONSE_SIZE = 128 * 1024;
private final OkHttpClient httpClient;
private final String query;
@@ -96,14 +93,23 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett
}
}
- public Request.Builder prepareRequest(HttpUrl url) {
+ public static List dicoverNodes(OkHttpClient httpClient, ClientSettings settings) {
+ requireNonNull(httpClient, "httpClient is null");
+ requireNonNull(settings, "settings is null");
+ requireNonNull(settings.getHost(), "settings.host is null");
+ Request request = buildDiscoveryRequest(settings);
+ DiscoveryResponseCodec.DiscoveryResponse response = getDiscoveryResponse(httpClient, request, OptionalLong.empty(), settings.getQueryTimeoutSecs());
+ return response.getNodes();
+ }
+
+ public static Request.Builder prepareRequest(HttpUrl url, Map additionalHeaders) {
Request.Builder builder = new Request.Builder()
.url(url)
.header("User-Agent", USER_AGENT_VALUE)
.header("Accept", "application/json")
.header("Content-Type", "application/json");
- if (this.getAdditionalHeaders() != null) {
- this.getAdditionalHeaders().forEach(builder::addHeader);
+ if (additionalHeaders != null) {
+ additionalHeaders.forEach(builder::addHeader);
}
return builder;
}
@@ -121,15 +127,90 @@ private Request buildQueryRequest(String query, ClientSettings settings) {
throw new IllegalArgumentException("Invalid request: " + req);
}
url = url.newBuilder().encodedPath(QUERY_PATH).build();
- Request.Builder builder = prepareRequest(url);
+ Request.Builder builder = prepareRequest(url, this.additonalHeaders);
return builder.post(okhttp3.RequestBody.create(MEDIA_TYPE_JSON, reqString)).build();
}
+ private static Request buildDiscoveryRequest(ClientSettings settings) {
+ HttpUrl url = HttpUrl.get(settings.getHost());
+ if (url == null) {
+ // TODO(zhihanz) use custom exception
+ throw new IllegalArgumentException("Invalid host: " + settings.getHost());
+ }
+ url = url.newBuilder().encodedPath(DISCOVERY_PATH).build();
+ Request.Builder builder = prepareRequest(url, settings.getAdditionalHeaders());
+ return builder.get().build();
+ }
+
@Override
public String getQuery() {
return query;
}
+ private static DiscoveryResponseCodec.DiscoveryResponse getDiscoveryResponse(OkHttpClient httpClient, Request request, OptionalLong materializedJsonSizeLimit, int requestTimeoutSecs) {
+ requireNonNull(request, "request is null");
+
+ long start = System.nanoTime();
+ int attempts = 0;
+ Exception lastException = null;
+
+ while (true) {
+ if (attempts > 0) {
+ Duration sinceStart = Duration.ofNanos(System.nanoTime() - start);
+ if (sinceStart.compareTo(Duration.ofSeconds(requestTimeoutSecs)) > 0) {
+ throw new RuntimeException(format("Error fetching discovery nodes (attempts: %s, duration: %s)", attempts, sinceStart.getSeconds()), lastException);
+ }
+
+ try {
+ MILLISECONDS.sleep(attempts * 100); // Exponential backoff
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while fetching discovery nodes", e);
+ }
+ }
+ attempts++;
+
+ JsonResponse response;
+ try {
+ response = JsonResponse.execute(
+ DISCOVERY_RESULT_CODEC,
+ httpClient,
+ request,
+ materializedJsonSizeLimit);
+ } catch (RuntimeException e) {
+ lastException = e;
+ if (e.getCause() instanceof ConnectException) {
+ // Retry on connection refused errors
+ continue;
+ }
+ throw new RuntimeException("Failed to fetch discovery nodes: " + e.getMessage(), e);
+ }
+
+ if (response.getStatusCode() == HTTP_OK && response.hasValue()) {
+ DiscoveryResponseCodec.DiscoveryResponse discoveryResponse = response.getValue();
+ if (discoveryResponse.getError() == null) {
+ return discoveryResponse; // Successful response
+ }
+ if (discoveryResponse.getError().notFound()) {
+ throw new UnsupportedOperationException("Discovery request feature not supported: " + discoveryResponse.getError());
+ }
+ throw new RuntimeException("Discovery request failed: " + discoveryResponse.getError());
+ }
+
+ // Handle other HTTP error codes and response body parsing for errors
+ if (response.getResponseBody().isPresent()) {
+ CloudErrors errors = CloudErrors.tryParse(response.getResponseBody().get());
+ if (errors != null && errors.tryGetErrorKind().canRetry()) {
+ continue;
+ }
+ }
+
+ if (response.getStatusCode() != 520) {
+ throw new RuntimeException("Discovery request failed with status code: " + response.getStatusCode());
+ }
+ }
+ }
+
private boolean executeInternal(Request request, OptionalLong materializedJsonSizeLimit) {
requireNonNull(request, "request is null");
long start = System.nanoTime();
@@ -219,7 +300,7 @@ private void processResponse(Headers headers, QueryResults results) {
if (results.getQueryId() != null && this.additonalHeaders.get(ClientSettings.X_Databend_Query_ID) == null) {
this.additonalHeaders.put(ClientSettings.X_Databend_Query_ID, results.getQueryId());
}
- if (headers != null && headers.get(ClientSettings.X_DATABEND_ROUTE_HINT) != null){
+ if (headers != null && headers.get(ClientSettings.X_DATABEND_ROUTE_HINT) != null) {
this.additonalHeaders.put(ClientSettings.X_DATABEND_ROUTE_HINT, headers.get(ClientSettings.X_DATABEND_ROUTE_HINT));
}
currentResults.set(results);
@@ -241,7 +322,7 @@ public boolean advance() {
String nextUriPath = this.currentResults.get().getNextUri().toString();
HttpUrl url = HttpUrl.get(this.host);
url = url.newBuilder().encodedPath(nextUriPath).build();
- Request.Builder builder = prepareRequest(url);
+ Request.Builder builder = prepareRequest(url, this.additonalHeaders);
Request request = builder.get().build();
return executeInternal(request, OptionalLong.of(MAX_MATERIALIZED_JSON_RESPONSE_SIZE));
}
@@ -291,7 +372,7 @@ private void closeQuery() {
String path = uri.toString();
HttpUrl url = HttpUrl.get(this.host);
url = url.newBuilder().encodedPath(path).build();
- Request r = prepareRequest(url).get().build();
+ Request r = prepareRequest(url, this.additonalHeaders).get().build();
try {
httpClient.newCall(r).execute().close();
} catch (IOException ignored) {
diff --git a/databend-client/src/main/java/com/databend/client/DatabendSession.java b/databend-client/src/main/java/com/databend/client/DatabendSession.java
index f195ab19..24178f4d 100644
--- a/databend-client/src/main/java/com/databend/client/DatabendSession.java
+++ b/databend-client/src/main/java/com/databend/client/DatabendSession.java
@@ -19,7 +19,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/databend-client/src/main/java/com/databend/client/DiscoveryNode.java b/databend-client/src/main/java/com/databend/client/DiscoveryNode.java
new file mode 100644
index 00000000..876a6e22
--- /dev/null
+++ b/databend-client/src/main/java/com/databend/client/DiscoveryNode.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databend.client;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+public class DiscoveryNode {
+ private final String address;
+
+ @JsonCreator
+ public DiscoveryNode(
+ @JsonProperty("address") String address) {
+ this.address = address;
+ }
+
+ // add builder
+
+ @JsonProperty
+ public String getAddress() {
+ return address;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("address", address)
+ .toString();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private String address;
+
+ public Builder setAddress(String address) {
+ this.address = address;
+ return this;
+ }
+
+ public DiscoveryNode build() {
+ return new DiscoveryNode(address);
+ }
+ }
+}
diff --git a/databend-client/src/main/java/com/databend/client/DiscoveryResponseCodec.java b/databend-client/src/main/java/com/databend/client/DiscoveryResponseCodec.java
new file mode 100644
index 00000000..74284882
--- /dev/null
+++ b/databend-client/src/main/java/com/databend/client/DiscoveryResponseCodec.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databend.client;
+
+import com.databend.client.errors.QueryErrors;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DiscoveryResponseCodec {
+
+ public static class DiscoveryResponse {
+ private final List nodes;
+ private final QueryErrors error;
+
+ public DiscoveryResponse(List nodes, QueryErrors error) {
+ this.nodes = nodes;
+ this.error = error;
+ }
+
+ // Getters for nodes and error
+ public List getNodes() {
+ return nodes;
+ }
+
+ public QueryErrors getError() {
+ return error;
+ }
+ }
+
+
+ public static class DiscoveryResponseDeserializer extends StdDeserializer {
+
+ public DiscoveryResponseDeserializer() {
+ this(null);
+ }
+
+ public DiscoveryResponseDeserializer(Class> vc) {
+ super(vc);
+ }
+
+ @Override
+ public DiscoveryResponse deserialize(JsonParser jp, DeserializationContext ctxt)
+ throws IOException, JsonProcessingException {
+ ObjectMapper mapper = (ObjectMapper) jp.getCodec();
+ JsonNode rootNode = mapper.readTree(jp);
+
+ List nodes = new ArrayList<>();
+ QueryErrors error = null;
+
+ if (rootNode.has("error")) {
+ // Deserialize error
+ error = mapper.treeToValue(rootNode.get("error"), QueryErrors.class);
+ } else if (rootNode.isArray()) {
+ // Deserialize nodes
+ for (JsonNode element : rootNode) {
+ DiscoveryNode node = new DiscoveryNode(element.get("address").asText());
+ nodes.add(node);
+ }
+ } else {
+ throw new JsonProcessingException("Unrecognized JSON format") {
+ };
+ }
+
+ return new DiscoveryResponse(nodes, error);
+ }
+ }
+
+ // Method to deserialize from JSON string
+ public static T fromJson(String json, Class valueType) throws JsonProcessingException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule();
+ module.addDeserializer(DiscoveryResponse.class, new DiscoveryResponseDeserializer());
+ objectMapper.registerModule(module);
+ return objectMapper.readValue(json, valueType);
+ }
+}
diff --git a/databend-client/src/main/java/com/databend/client/JsonCodec.java b/databend-client/src/main/java/com/databend/client/JsonCodec.java
index 85e1c0ef..d4ba4e5c 100644
--- a/databend-client/src/main/java/com/databend/client/JsonCodec.java
+++ b/databend-client/src/main/java/com/databend/client/JsonCodec.java
@@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import java.io.IOException;
@@ -31,8 +32,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
-public class JsonCodec
-{
+public class JsonCodec {
// copy of https://github.com/airlift/airlift/blob/master/json/src/main/java/io/airlift/json/ObjectMapperProvider.java
static final Supplier OBJECT_MAPPER_SUPPLIER = () -> new ObjectMapper()
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
@@ -45,46 +45,43 @@ public class JsonCodec
.disable(MapperFeature.CAN_OVERRIDE_ACCESS_MODIFIERS)
.disable(MapperFeature.INFER_PROPERTY_MUTATORS)
.disable(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS)
- .registerModule(new Jdk8Module());
+ .registerModule(new Jdk8Module()).registerModule(new SimpleModule().addDeserializer(
+ DiscoveryResponseCodec.DiscoveryResponse.class, new DiscoveryResponseCodec.DiscoveryResponseDeserializer()
+ ));
+
private final ObjectMapper mapper;
private final Type type;
private final JavaType javaType;
- private JsonCodec(ObjectMapper mapper, Type type)
- {
+
+ private JsonCodec(ObjectMapper mapper, Type type) {
this.mapper = requireNonNull(mapper, "mapper is null");
this.type = requireNonNull(type, "type is null");
this.javaType = mapper.getTypeFactory().constructType(type);
}
- public static JsonCodec jsonCodec(Class type)
- {
+ public static JsonCodec jsonCodec(Class type) {
return new JsonCodec<>(OBJECT_MAPPER_SUPPLIER.get(), type);
}
- public Type getType()
- {
+ public Type getType() {
return type;
}
public T fromJson(String json)
- throws JsonProcessingException
- {
+ throws JsonProcessingException {
try (JsonParser parser = mapper.createParser(json)) {
T value = mapper.readerFor(javaType).readValue(parser);
checkArgument(parser.nextToken() == null, "Found characters after the expected end of input");
return value;
- }
- catch (JsonProcessingException e) {
+ } catch (JsonProcessingException e) {
throw e;
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public T fromJson(InputStream inputStream)
- throws IOException, JsonProcessingException
- {
+ throws IOException, JsonProcessingException {
try (JsonParser parser = mapper.createParser(inputStream)) {
T value = mapper.readerFor(javaType).readValue(parser);
checkArgument(parser.nextToken() == null, "Found characters after the expected end of input");
diff --git a/databend-client/src/main/java/com/databend/client/JsonResponse.java b/databend-client/src/main/java/com/databend/client/JsonResponse.java
index 15c9590e..226fed69 100644
--- a/databend-client/src/main/java/com/databend/client/JsonResponse.java
+++ b/databend-client/src/main/java/com/databend/client/JsonResponse.java
@@ -15,19 +15,11 @@
package com.databend.client;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import okhttp3.Headers;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
+import okhttp3.*;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.net.ConnectException;
import java.util.Optional;
import java.util.OptionalLong;
@@ -36,8 +28,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
-public final class JsonResponse
-{
+public final class JsonResponse {
private final int statusCode;
private final String statusMessage;
private final Headers headers;
@@ -47,8 +38,7 @@ public final class JsonResponse
private final T value;
private final IllegalArgumentException exception;
- private JsonResponse(int statusCode, String statusMessage, Headers headers, String responseBody)
- {
+ private JsonResponse(int statusCode, String statusMessage, Headers headers, String responseBody) {
this.statusCode = statusCode;
this.statusMessage = statusMessage;
this.headers = requireNonNull(headers, "headers is null");
@@ -59,8 +49,7 @@ private JsonResponse(int statusCode, String statusMessage, Headers headers, Stri
this.exception = null;
}
- private JsonResponse(int statusCode, String statusMessage, Headers headers, @Nullable String responseBody, @Nullable T value, @Nullable IllegalArgumentException exception)
- {
+ private JsonResponse(int statusCode, String statusMessage, Headers headers, @Nullable String responseBody, @Nullable T value, @Nullable IllegalArgumentException exception) {
this.statusCode = statusCode;
this.statusMessage = statusMessage;
this.headers = requireNonNull(headers, "headers is null");
@@ -70,8 +59,7 @@ private JsonResponse(int statusCode, String statusMessage, Headers headers, @Nul
this.hasValue = (exception == null);
}
- public static JsonResponse execute(JsonCodec codec, OkHttpClient client, Request request, OptionalLong materializedJsonSizeLimit) throws RuntimeException
- {
+ public static JsonResponse execute(JsonCodec codec, OkHttpClient client, Request request, OptionalLong materializedJsonSizeLimit) throws RuntimeException {
try (Response response = client.newCall(request).execute()) {
// TODO: fix in OkHttp: https://github.com/square/okhttp/issues/3111
if ((response.code() == 307) || (response.code() == 308)) {
@@ -92,19 +80,16 @@ public static JsonResponse execute(JsonCodec codec, OkHttpClient clien
// Parse from input stream, response is either of unknown size or too large to materialize. Raw response body
// will not be available if parsing fails
value = codec.fromJson(responseBody.byteStream());
- }
- else {
+ } else {
// parse from materialized response body string
body = responseBody.string();
value = codec.fromJson(body);
}
- }
- catch (JsonProcessingException e) {
+ } catch (JsonProcessingException e) {
String message;
if (body != null) {
message = format("Unable to create %s from JSON response:\n[%s]", codec.getType(), body);
- }
- else {
+ } else {
message = format("Unable to create %s from JSON response", codec.getType());
}
exception = new IllegalArgumentException(message, e);
@@ -113,59 +98,49 @@ public static JsonResponse execute(JsonCodec codec, OkHttpClient clien
return new JsonResponse<>(response.code(), response.message(), response.headers(), body, value, exception);
}
return new JsonResponse<>(response.code(), response.message(), response.headers(), responseBody.string());
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new UncheckedIOException(e);
}
}
- private static boolean isJson(MediaType type)
- {
+ private static boolean isJson(MediaType type) {
return (type != null) && "application".equals(type.type()) && "json".equals(type.subtype());
}
- public int getStatusCode()
- {
+ public int getStatusCode() {
return statusCode;
}
- public String getStatusMessage()
- {
+ public String getStatusMessage() {
return statusMessage;
}
- public Headers getHeaders()
- {
+ public Headers getHeaders() {
return headers;
}
- public boolean hasValue()
- {
+ public boolean hasValue() {
return hasValue;
}
- public T getValue()
- {
+ public T getValue() {
if (!hasValue) {
throw new IllegalStateException("Response does not contain a JSON value", exception);
}
return value;
}
- public Optional getResponseBody()
- {
+ public Optional getResponseBody() {
return Optional.ofNullable(responseBody);
}
@Nullable
- public IllegalArgumentException getException()
- {
+ public IllegalArgumentException getException() {
return exception;
}
@Override
- public String toString()
- {
+ public String toString() {
return toStringHelper(this)
.add("statusCode", statusCode)
.add("statusMessage", statusMessage)
diff --git a/databend-client/src/main/java/com/databend/client/OkHttpUtils.java b/databend-client/src/main/java/com/databend/client/OkHttpUtils.java
index 5debecfb..5ec2f63f 100644
--- a/databend-client/src/main/java/com/databend/client/OkHttpUtils.java
+++ b/databend-client/src/main/java/com/databend/client/OkHttpUtils.java
@@ -22,7 +22,6 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
-
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
@@ -32,22 +31,19 @@
import static com.google.common.net.HttpHeaders.AUTHORIZATION;
import static java.util.Objects.requireNonNull;
-public final class OkHttpUtils
-{
- private OkHttpUtils() {}
+public final class OkHttpUtils {
+ private OkHttpUtils() {
+ }
- public static Interceptor userAgentInterceptor(String userAgent)
- {
+ public static Interceptor userAgentInterceptor(String userAgent) {
return chain -> chain.proceed(chain.request().newBuilder().header("User-Agent", userAgent).build());
}
- public static Interceptor basicAuthInterceptor(String username, String password)
- {
+ public static Interceptor basicAuthInterceptor(String username, String password) {
return chain -> chain.proceed(chain.request().newBuilder().header("Authorization", Credentials.basic(username, password)).build());
}
- public static Interceptor tokenAuth(String accessToken)
- {
+ public static Interceptor tokenAuth(String accessToken) {
requireNonNull(accessToken, "accessToken is null");
checkArgument(CharMatcher.inRange((char) 33, (char) 126).matchesAllOf(accessToken));
@@ -56,45 +52,38 @@ public static Interceptor tokenAuth(String accessToken)
.build());
}
- public static void setupTimeouts(OkHttpClient.Builder clientBuilder, int timeout, TimeUnit unit)
- {
+ public static void setupTimeouts(OkHttpClient.Builder clientBuilder, int timeout, TimeUnit unit) {
clientBuilder
.connectTimeout(timeout, unit)
.readTimeout(timeout, unit)
.writeTimeout(timeout, unit);
}
- public static void setupInsecureSsl(OkHttpClient.Builder clientBuilder)
- {
+ public static void setupInsecureSsl(OkHttpClient.Builder clientBuilder) {
try {
- X509TrustManager trustAllCerts = new X509TrustManager()
- {
+ X509TrustManager trustAllCerts = new X509TrustManager() {
@Override
- public void checkClientTrusted(X509Certificate[] chain, String authType)
- {
+ public void checkClientTrusted(X509Certificate[] chain, String authType) {
throw new UnsupportedOperationException("checkClientTrusted should not be called");
}
@Override
- public void checkServerTrusted(X509Certificate[] chain, String authType)
- {
+ public void checkServerTrusted(X509Certificate[] chain, String authType) {
// skip validation of server certificate
}
@Override
- public X509Certificate[] getAcceptedIssuers()
- {
+ public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
};
SSLContext sslContext = SSLContext.getInstance("SSL");
- sslContext.init(null, new TrustManager[] {trustAllCerts}, new SecureRandom());
+ sslContext.init(null, new TrustManager[]{trustAllCerts}, new SecureRandom());
clientBuilder.sslSocketFactory(sslContext.getSocketFactory(), trustAllCerts);
clientBuilder.hostnameVerifier((hostname, session) -> true);
- }
- catch (GeneralSecurityException e) {
+ } catch (GeneralSecurityException e) {
throw new RuntimeException("Error setting up SSL: " + e.getMessage(), e);
}
}
diff --git a/databend-client/src/main/java/com/databend/client/PaginationOptions.java b/databend-client/src/main/java/com/databend/client/PaginationOptions.java
index 4bbe72f7..d4e34a7a 100644
--- a/databend-client/src/main/java/com/databend/client/PaginationOptions.java
+++ b/databend-client/src/main/java/com/databend/client/PaginationOptions.java
@@ -28,6 +28,7 @@ public class PaginationOptions {
private final int waitTimeSecs;
private final int maxRowsInBuffer;
private final int maxRowsPerPage;
+
@JsonCreator
public PaginationOptions(
@JsonProperty("wait_time_secs") int waitTimeSecs
diff --git a/databend-client/src/main/java/com/databend/client/ParseJsonDataUtils.java b/databend-client/src/main/java/com/databend/client/ParseJsonDataUtils.java
index 63e91e11..898b6a41 100644
--- a/databend-client/src/main/java/com/databend/client/ParseJsonDataUtils.java
+++ b/databend-client/src/main/java/com/databend/client/ParseJsonDataUtils.java
@@ -24,18 +24,18 @@
import static java.util.Collections.unmodifiableList;
-final class ParseJsonDataUtils
-{
- private ParseJsonDataUtils() {}
+final class ParseJsonDataUtils {
+ private ParseJsonDataUtils() {
+ }
+
/**
* parseRawData is used to convert json data an immutable list of data
* input QuerySchema: contains column names and types
* input List> : a list of rows parsed from QueryResponse
* output Iterable> : convert the input rows into DatabendType and return an immutable list
*/
- public static List> parseRawData(List schema, List> data)
- {
- if (data == null || schema == null ) {
+ public static List> parseRawData(List schema, List> data) {
+ if (data == null || schema == null) {
return null;
}
ColumnTypeHandler[] typeHandlers = createTypeHandlers(schema);
diff --git a/databend-client/src/main/java/com/databend/client/QueryAffect.java b/databend-client/src/main/java/com/databend/client/QueryAffect.java
index dc5f16f5..740f699a 100644
--- a/databend-client/src/main/java/com/databend/client/QueryAffect.java
+++ b/databend-client/src/main/java/com/databend/client/QueryAffect.java
@@ -115,6 +115,7 @@ public ChangeSettings(
this.values = values;
this.isGlobals = isGlobals;
}
+
@JsonProperty
public List getKeys() {
return keys;
diff --git a/databend-client/src/main/java/com/databend/client/QueryStats.java b/databend-client/src/main/java/com/databend/client/QueryStats.java
index 855e906a..95bb1172 100644
--- a/databend-client/src/main/java/com/databend/client/QueryStats.java
+++ b/databend-client/src/main/java/com/databend/client/QueryStats.java
@@ -30,6 +30,7 @@ public class QueryStats {
private final QueryProgress scanProgress;
private final QueryProgress writeProgress;
private final QueryProgress resultProgress;
+
@JsonCreator
public QueryStats(
@JsonProperty("running_time_ms") float runningTimeMS,
diff --git a/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandler.java b/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandler.java
index 16f5205a..325be59d 100644
--- a/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandler.java
+++ b/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandler.java
@@ -14,10 +14,10 @@
package com.databend.client.data;
-public interface ColumnTypeHandler
-{
+public interface ColumnTypeHandler {
/**
* Convert the input object to the corresponding Java Type
+ *
* @param value raw input row value
* @return parsed java type value
*/
diff --git a/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandlerFactory.java b/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandlerFactory.java
index b97c8aef..994046c5 100644
--- a/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandlerFactory.java
+++ b/databend-client/src/main/java/com/databend/client/data/ColumnTypeHandlerFactory.java
@@ -16,8 +16,7 @@
import java.util.Locale;
-public class ColumnTypeHandlerFactory
-{
+public class ColumnTypeHandlerFactory {
public static ColumnTypeHandler getTypeHandler(DatabendRawType type) {
if (type == null) {
return null;
diff --git a/databend-client/src/main/java/com/databend/client/data/DatabendRawType.java b/databend-client/src/main/java/com/databend/client/data/DatabendRawType.java
index 23c40357..d02cbf60 100644
--- a/databend-client/src/main/java/com/databend/client/data/DatabendRawType.java
+++ b/databend-client/src/main/java/com/databend/client/data/DatabendRawType.java
@@ -81,7 +81,7 @@ public DatabendRawType(String type) {
.map(DatabendRawType::new)
.collect(Collectors.toList());
this.columnSize = subType.size();
- } else if (dataType == DatabendDataType.MAP) {
+ } else if (dataType == DatabendDataType.MAP) {
// remove "Map(" and last ")"
String subTypes = this.type.substring(4, this.type.length() - 1);
// split by ","
diff --git a/databend-client/src/main/java/com/databend/client/data/StringHandler.java b/databend-client/src/main/java/com/databend/client/data/StringHandler.java
index 061fc22f..df4189c9 100644
--- a/databend-client/src/main/java/com/databend/client/data/StringHandler.java
+++ b/databend-client/src/main/java/com/databend/client/data/StringHandler.java
@@ -14,26 +14,23 @@
package com.databend.client.data;
-class StringHandler implements ColumnTypeHandler
-{
+class StringHandler implements ColumnTypeHandler {
private final boolean isNullable;
+
public StringHandler() {
this.isNullable = false;
}
- public StringHandler(boolean isNullable)
- {
+ public StringHandler(boolean isNullable) {
this.isNullable = isNullable;
}
@Override
- public Object parseValue(Object value)
- {
+ public Object parseValue(Object value) {
if (value == null) {
if (isNullable) {
return null;
- }
- else {
+ } else {
throw new IllegalArgumentException("String type is not nullable");
}
}
@@ -44,8 +41,7 @@ public Object parseValue(Object value)
}
@Override
- public void setNullable(boolean isNullable)
- {
+ public void setNullable(boolean isNullable) {
// do nothing
}
}
diff --git a/databend-client/src/main/java/com/databend/client/errors/CloudErrorKinds.java b/databend-client/src/main/java/com/databend/client/errors/CloudErrorKinds.java
index a9ddd3d6..a82cd632 100644
--- a/databend-client/src/main/java/com/databend/client/errors/CloudErrorKinds.java
+++ b/databend-client/src/main/java/com/databend/client/errors/CloudErrorKinds.java
@@ -17,8 +17,7 @@
import static com.google.common.base.MoreObjects.toStringHelper;
// CloudErrorKinds is a list of error kinds that can be returned by the databend cloud service.
-public enum CloudErrorKinds
-{
+public enum CloudErrorKinds {
TENANT_NOT_FOUND("TenantNotFound", "Tenant not found, please check your tenant id", false),
TENANT_BOOTSTRAP_FAILED("TenantBootstrapFailed", "tenant bootstrap failed", false),
WAREHOUSE_NOT_FOUND("WarehouseNotFound", "warehouse not found", false),
@@ -41,9 +40,9 @@ public enum CloudErrorKinds
HEALTH_CHECK_FAILED("HealthCheckFailed", "health check failed", false),
BAD_TENANT("BadTenant", "bad tenant", false);
- private final String kind;
- private final String description;
- private final boolean canRetry;
+ private final String kind;
+ private final String description;
+ private final boolean canRetry;
CloudErrorKinds(String kind, String description, boolean canRetry) {
this.kind = kind;
diff --git a/databend-client/src/main/java/com/databend/client/errors/CloudErrors.java b/databend-client/src/main/java/com/databend/client/errors/CloudErrors.java
index 5d5fbca1..97d621ed 100644
--- a/databend-client/src/main/java/com/databend/client/errors/CloudErrors.java
+++ b/databend-client/src/main/java/com/databend/client/errors/CloudErrors.java
@@ -21,25 +21,22 @@
import static com.google.common.base.MoreObjects.toStringHelper;
/**
- * CloudErrors is a list of errors that can be returned by the databend cloud service.
+ * CloudErrors is a list of errors that can be returned by the databend cloud service.
*/
-public class CloudErrors
-{
+public class CloudErrors {
private final String kind;
private final String message;
@JsonCreator
public CloudErrors(
@JsonProperty("kind") String kind,
- @JsonProperty("message") String message)
- {
+ @JsonProperty("message") String message) {
this.kind = kind;
this.message = message;
}
// return null if parse failed
- public static CloudErrors tryParse(String json)
- {
+ public static CloudErrors tryParse(String json) {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, CloudErrors.class);
diff --git a/databend-client/src/main/java/com/databend/client/errors/QueryErrors.java b/databend-client/src/main/java/com/databend/client/errors/QueryErrors.java
index 8721a227..1e1e0e51 100644
--- a/databend-client/src/main/java/com/databend/client/errors/QueryErrors.java
+++ b/databend-client/src/main/java/com/databend/client/errors/QueryErrors.java
@@ -22,16 +22,14 @@
/**
* QueryErrors represent errors from databend server
*/
-public class QueryErrors
-{
+public class QueryErrors {
private final int code;
private final String message;
@JsonCreator
public QueryErrors(
@JsonProperty("code") int code,
- @JsonProperty("message") String message)
- {
+ @JsonProperty("message") String message) {
this.code = code;
this.message = message;
}
@@ -59,6 +57,10 @@ public String toString() {
.toString();
}
+ public boolean notFound() {
+ return code == 404;
+ }
+
public static final class Builder {
private int code;
private String message;
diff --git a/databend-client/src/test/java/com/databend/client/TestClientIT.java b/databend-client/src/test/java/com/databend/client/TestClientIT.java
index bf3f8c9f..3214ab9a 100644
--- a/databend-client/src/test/java/com/databend/client/TestClientIT.java
+++ b/databend-client/src/test/java/com/databend/client/TestClientIT.java
@@ -18,8 +18,6 @@
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.List;
@@ -96,4 +94,19 @@ public void testBasicQueryIDHeader() {
Assert.assertEquals(cli1.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID1);
}
+ @Test(groups = {"it"})
+ public void testDiscoverNodes() {
+ OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();
+ String expectedUUID = UUID.randomUUID().toString();
+
+ Map additionalHeaders = new HashMap<>();
+ additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
+ ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
+ List nodes = DatabendClientV1.dicoverNodes(client, settings);
+ Assert.assertFalse(nodes.isEmpty());
+ for (DiscoveryNode node : nodes) {
+ System.out.println(node.getAddress());
+ }
+ }
+
}
diff --git a/databend-client/src/test/java/com/databend/client/TestDiscoveryNodes.java b/databend-client/src/test/java/com/databend/client/TestDiscoveryNodes.java
new file mode 100644
index 00000000..0eb8c00d
--- /dev/null
+++ b/databend-client/src/test/java/com/databend/client/TestDiscoveryNodes.java
@@ -0,0 +1,46 @@
+package com.databend.client;
+
+import com.databend.client.errors.QueryErrors;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.airlift.json.JsonCodecFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+import static com.databend.client.JsonCodec.jsonCodec;
+
+public class TestDiscoveryNodes {
+ private static final JsonCodec QUERY_RESULTS_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class);
+
+ @Test(groups = {"unit"})
+ public void testDecodeValidJsonArray() throws JsonProcessingException {
+ String json = "[{\"address\":\"127.0.0.1:8003\"},{\"address\":\"127.0.0.1:8002\"},{\"address\":\"sfgsjsdhida:111\"}]";
+ DiscoveryResponseCodec.DiscoveryResponse errorResponse = QUERY_RESULTS_CODEC.fromJson(json);
+ List nodesList = errorResponse.getNodes();
+ Assert.assertEquals(nodesList.size(), 3);
+ Assert.assertEquals(nodesList.get(0).getAddress(), "127.0.0.1:8003");
+ Assert.assertEquals(nodesList.get(1).getAddress(), "127.0.0.1:8002");
+ Assert.assertEquals(nodesList.get(2).getAddress(), "sfgsjsdhida:111");
+ }
+
+ @Test(groups = {"unit"})
+ public void testQueryError() throws JsonProcessingException {
+ String json = "{\"error\":{\"code\":404,\"message\":\"not found\"}}";
+ DiscoveryResponseCodec.DiscoveryResponse errorResponse = QUERY_RESULTS_CODEC.fromJson(json);
+ QueryErrors error = errorResponse.getError();
+ Assert.assertEquals(error.getMessage(), "not found");
+ Assert.assertEquals(error.getCode(), 404);
+ }
+
+ // test empty array
+ @Test(groups = {"unit"})
+ public void testEmptyArray() throws JsonProcessingException {
+ String json = "[]";
+ DiscoveryResponseCodec.DiscoveryResponse errorResponse = QUERY_RESULTS_CODEC.fromJson(json);
+ List nodesList = errorResponse.getNodes();
+ Assert.assertEquals(nodesList.size(), 0);
+ }
+
+
+}
diff --git a/databend-client/src/test/java/com/databend/client/TestQueryAffect.java b/databend-client/src/test/java/com/databend/client/TestQueryAffect.java
index dd6d62ce..dc2a1bc9 100644
--- a/databend-client/src/test/java/com/databend/client/TestQueryAffect.java
+++ b/databend-client/src/test/java/com/databend/client/TestQueryAffect.java
@@ -14,18 +14,19 @@
package com.databend.client;
-import io.airlift.json.JsonCodec;
+import com.fasterxml.jackson.core.JsonProcessingException;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static io.airlift.json.JsonCodec.jsonCodec;
+import static com.databend.client.JsonCodec.jsonCodec;
+
@Test(timeOut = 10000)
public class TestQueryAffect {
private static final JsonCodec QUERY_AFFECT_JSON_CODEC = jsonCodec(QueryAffect.class);
@Test( groups = {"unit"} )
- public void testQueryAffectUseDB() {
+ public void testQueryAffectUseDB() throws JsonProcessingException {
String json = "{\"type\":\"UseDB\",\"name\":\"db1\"}";
QueryAffect clause = QUERY_AFFECT_JSON_CODEC.fromJson(json);
@@ -36,7 +37,7 @@ public void testQueryAffectUseDB() {
}
@Test( groups = {"unit"} )
- public void testQueryAffectChangeSettings() {
+ public void testQueryAffectChangeSettings() throws JsonProcessingException {
String json = "{\"type\":\"ChangeSettings\",\"keys\":[\"max_threads\"],\"values\":[\"1\"],\"is_globals\":[false]}";
QueryAffect clause = QUERY_AFFECT_JSON_CODEC.fromJson(json);
diff --git a/databend-jdbc/pom.xml b/databend-jdbc/pom.xml
index f0362f4b..17d8b8bd 100644
--- a/databend-jdbc/pom.xml
+++ b/databend-jdbc/pom.xml
@@ -6,12 +6,12 @@
com.databend
databend-base
- 0.2.9
+ 0.3.0
../pom.xml
com.databend
databend-jdbc
- 0.2.9
+ 0.3.0
@@ -24,7 +24,7 @@
com.databend
databend-client
- 0.2.9
+ 0.3.0
com.squareup.okhttp3
diff --git a/pom.xml b/pom.xml
index d4d6a33e..5ce8bf19 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,7 @@
com.databend
databend-base
- 0.2.9
+ 0.3.0
databend-base
Databend
pom