Skip to content

Commit

Permalink
feat: add discovery api on databend client
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhiHanZ committed Sep 2, 2024
1 parent f2c3dd3 commit 4593347
Show file tree
Hide file tree
Showing 24 changed files with 206 additions and 149 deletions.
2 changes: 1 addition & 1 deletion .github/actions/setup_databend_cluster/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ inputs:
version:
description: "query and meta service version"
required: true
default: "1.2.616-nightly"
default: "1.2.629-nightly"
target:
description: ""
required: true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
services:
databend:
image: datafuselabs/databend
image: datafuselabs/databend:nightly
env:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.idea/
databend-jdbc/databend-jdbc-debug.log
target/
databend/
.databend/
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Random;

public class ClientSettings {
public static final Integer DEFAULT_QUERY_TIMEOUT = 300;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@

public interface DatabendClient extends Closeable {
String getQuery();

@Override
void close();

DatabendSession getSession();

String getHost();

Map<String, String> getAdditionalHeaders();

QueryResults getResults();

// execute Restful query request for the first time.
// @param request the request to be executed
// @return true if request finished with result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.databend.client;

public final class DatabendClientFactory {
private DatabendClientFactory() {}
private DatabendClientFactory() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,20 @@
package com.databend.client;

import com.databend.client.errors.CloudErrors;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.*;
import okio.Buffer;

import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.function.Consumer;
import java.util.logging.Logger;

import static com.databend.client.JsonCodec.jsonCodec;
import static com.google.common.base.MoreObjects.firstNonNull;
Expand All @@ -51,12 +46,14 @@ public class DatabendClientV1
firstNonNull(DatabendClientV1.class.getPackage().getImplementationVersion(), "jvm-unknown");
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
public static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);
public static final JsonCodec<DiscoveryResponseCodec.DiscoveryResponse> DISCOVERY_RESULT_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class);
public static final String succeededState = "succeeded";
public static final String failedState = "failed";
public static final String runningState = "running";


public static final String QUERY_PATH = "/v1/query";
public static final String DISCOVERY_PATH = "/v1/discovery_nodes";
private static final long MAX_MATERIALIZED_JSON_RESPONSE_SIZE = 128 * 1024;
private final OkHttpClient httpClient;
private final String query;
Expand Down Expand Up @@ -96,14 +93,23 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett
}
}

public Request.Builder prepareRequest(HttpUrl url) {
public static List<DiscoveryNode> dicoverNodes(OkHttpClient httpClient, ClientSettings settings) {
requireNonNull(httpClient, "httpClient is null");
requireNonNull(settings, "settings is null");
requireNonNull(settings.getHost(), "settings.host is null");
Request request = buildDiscoveryRequest(settings);
DiscoveryResponseCodec.DiscoveryResponse response = getDiscoveryResponse(httpClient, request, OptionalLong.empty(), settings.getQueryTimeoutSecs());
return response.getNodes();
}

public static Request.Builder prepareRequest(HttpUrl url, Map<String, String> additionalHeaders) {
Request.Builder builder = new Request.Builder()
.url(url)
.header("User-Agent", USER_AGENT_VALUE)
.header("Accept", "application/json")
.header("Content-Type", "application/json");
if (this.getAdditionalHeaders() != null) {
this.getAdditionalHeaders().forEach(builder::addHeader);
if (additionalHeaders != null) {
additionalHeaders.forEach(builder::addHeader);
}
return builder;
}
Expand All @@ -121,15 +127,90 @@ private Request buildQueryRequest(String query, ClientSettings settings) {
throw new IllegalArgumentException("Invalid request: " + req);
}
url = url.newBuilder().encodedPath(QUERY_PATH).build();
Request.Builder builder = prepareRequest(url);
Request.Builder builder = prepareRequest(url, this.additonalHeaders);
return builder.post(okhttp3.RequestBody.create(MEDIA_TYPE_JSON, reqString)).build();
}

private static Request buildDiscoveryRequest(ClientSettings settings) {
HttpUrl url = HttpUrl.get(settings.getHost());
if (url == null) {
// TODO(zhihanz) use custom exception
throw new IllegalArgumentException("Invalid host: " + settings.getHost());
}
url = url.newBuilder().encodedPath(DISCOVERY_PATH).build();
Request.Builder builder = prepareRequest(url, settings.getAdditionalHeaders());
return builder.get().build();
}

@Override
public String getQuery() {
return query;
}

private static DiscoveryResponseCodec.DiscoveryResponse getDiscoveryResponse(OkHttpClient httpClient, Request request, OptionalLong materializedJsonSizeLimit, int requestTimeoutSecs) {
requireNonNull(request, "request is null");

long start = System.nanoTime();
int attempts = 0;
Exception lastException = null;

while (true) {
if (attempts > 0) {
Duration sinceStart = Duration.ofNanos(System.nanoTime() - start);
if (sinceStart.compareTo(Duration.ofSeconds(requestTimeoutSecs)) > 0) {
throw new RuntimeException(format("Error fetching discovery nodes (attempts: %s, duration: %s)", attempts, sinceStart.getSeconds()), lastException);
}

try {
MILLISECONDS.sleep(attempts * 100); // Exponential backoff
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while fetching discovery nodes", e);
}
}
attempts++;

JsonResponse<DiscoveryResponseCodec.DiscoveryResponse> response;
try {
response = JsonResponse.execute(
DISCOVERY_RESULT_CODEC,
httpClient,
request,
materializedJsonSizeLimit);
} catch (RuntimeException e) {
lastException = e;
if (e.getCause() instanceof ConnectException) {
// Retry on connection refused errors
continue;
}
throw new RuntimeException("Failed to fetch discovery nodes: " + e.getMessage(), e);
}

if (response.getStatusCode() == HTTP_OK && response.hasValue()) {
DiscoveryResponseCodec.DiscoveryResponse discoveryResponse = response.getValue();
if (discoveryResponse.getError() == null) {
return discoveryResponse; // Successful response
}
if (discoveryResponse.getError().notFound()) {
throw new UnsupportedOperationException("Discovery request feature not supported: " + discoveryResponse.getError());
}
throw new RuntimeException("Discovery request failed: " + discoveryResponse.getError());
}

// Handle other HTTP error codes and response body parsing for errors
if (response.getResponseBody().isPresent()) {
CloudErrors errors = CloudErrors.tryParse(response.getResponseBody().get());
if (errors != null && errors.tryGetErrorKind().canRetry()) {
continue;
}
}

if (response.getStatusCode() != 520) {
throw new RuntimeException("Discovery request failed with status code: " + response.getStatusCode());
}
}
}

private boolean executeInternal(Request request, OptionalLong materializedJsonSizeLimit) {
requireNonNull(request, "request is null");
long start = System.nanoTime();
Expand Down Expand Up @@ -219,7 +300,7 @@ private void processResponse(Headers headers, QueryResults results) {
if (results.getQueryId() != null && this.additonalHeaders.get(ClientSettings.X_Databend_Query_ID) == null) {
this.additonalHeaders.put(ClientSettings.X_Databend_Query_ID, results.getQueryId());
}
if (headers != null && headers.get(ClientSettings.X_DATABEND_ROUTE_HINT) != null){
if (headers != null && headers.get(ClientSettings.X_DATABEND_ROUTE_HINT) != null) {
this.additonalHeaders.put(ClientSettings.X_DATABEND_ROUTE_HINT, headers.get(ClientSettings.X_DATABEND_ROUTE_HINT));
}
currentResults.set(results);
Expand All @@ -241,7 +322,7 @@ public boolean advance() {
String nextUriPath = this.currentResults.get().getNextUri().toString();
HttpUrl url = HttpUrl.get(this.host);
url = url.newBuilder().encodedPath(nextUriPath).build();
Request.Builder builder = prepareRequest(url);
Request.Builder builder = prepareRequest(url, this.additonalHeaders);
Request request = builder.get().build();
return executeInternal(request, OptionalLong.of(MAX_MATERIALIZED_JSON_RESPONSE_SIZE));
}
Expand Down Expand Up @@ -291,7 +372,7 @@ private void closeQuery() {
String path = uri.toString();
HttpUrl url = HttpUrl.get(this.host);
url = url.newBuilder().encodedPath(path).build();
Request r = prepareRequest(url).get().build();
Request r = prepareRequest(url, this.additonalHeaders).get().build();
try {
httpClient.newCall(r).execute().close();
} catch (IOException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down
31 changes: 14 additions & 17 deletions databend-client/src/main/java/com/databend/client/JsonCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

import java.io.IOException;
Expand All @@ -31,8 +32,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class JsonCodec<T>
{
public class JsonCodec<T> {
// copy of https://github.com/airlift/airlift/blob/master/json/src/main/java/io/airlift/json/ObjectMapperProvider.java
static final Supplier<ObjectMapper> OBJECT_MAPPER_SUPPLIER = () -> new ObjectMapper()
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
Expand All @@ -45,46 +45,43 @@ public class JsonCodec<T>
.disable(MapperFeature.CAN_OVERRIDE_ACCESS_MODIFIERS)
.disable(MapperFeature.INFER_PROPERTY_MUTATORS)
.disable(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS)
.registerModule(new Jdk8Module());
.registerModule(new Jdk8Module()).registerModule(new SimpleModule().addDeserializer(
DiscoveryResponseCodec.DiscoveryResponse.class, new DiscoveryResponseCodec.DiscoveryResponseDeserializer()
));

private final ObjectMapper mapper;
private final Type type;
private final JavaType javaType;
private JsonCodec(ObjectMapper mapper, Type type)
{

private JsonCodec(ObjectMapper mapper, Type type) {
this.mapper = requireNonNull(mapper, "mapper is null");
this.type = requireNonNull(type, "type is null");
this.javaType = mapper.getTypeFactory().constructType(type);
}

public static <T> JsonCodec<T> jsonCodec(Class<T> type)
{
public static <T> JsonCodec<T> jsonCodec(Class<T> type) {
return new JsonCodec<>(OBJECT_MAPPER_SUPPLIER.get(), type);
}

public Type getType()
{
public Type getType() {
return type;
}

public T fromJson(String json)
throws JsonProcessingException
{
throws JsonProcessingException {
try (JsonParser parser = mapper.createParser(json)) {
T value = mapper.readerFor(javaType).readValue(parser);
checkArgument(parser.nextToken() == null, "Found characters after the expected end of input");
return value;
}
catch (JsonProcessingException e) {
} catch (JsonProcessingException e) {
throw e;
}
catch (IOException e) {
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public T fromJson(InputStream inputStream)
throws IOException, JsonProcessingException
{
throws IOException, JsonProcessingException {
try (JsonParser parser = mapper.createParser(inputStream)) {
T value = mapper.readerFor(javaType).readValue(parser);
checkArgument(parser.nextToken() == null, "Found characters after the expected end of input");
Expand Down
Loading

0 comments on commit 4593347

Please sign in to comment.