Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move/rename some class #287

Merged
merged 6 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/setup_databend_cluster/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ inputs:
version:
description: "query and meta service version"
required: true
default: "1.2.661-nightly"
default: "1.2.663-nightly"
target:
description: ""
required: true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- uses: ./.github/actions/setup_databend_cluster
timeout-minutes: 15
with:
version: '1.2.661-nightly'
version: '1.2.663-nightly'
target: 'x86_64-unknown-linux-gnu'

- name: Test with conn to node 1
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@
import java.io.Closeable;
import java.util.Map;

public interface DatabendClient extends Closeable {
public interface DatabendQueryResult extends Closeable {
String getQuery();

@Override
void close();

DatabendSession getSession();

String getHost();

Map<String, String> getAdditionalHeaders();

QueryResults getResults();
QueryResponse getResults();

// execute Restful query request for the first time.
// @param request the request to be executed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package com.databend.client;

import com.databend.client.errors.CloudErrors;
import com.databend.client.utils.JsonCodec;
import com.databend.client.utils.JsonResponse;
import okhttp3.*;
import okio.Buffer;

Expand All @@ -30,7 +32,7 @@
import java.util.function.Consumer;
import java.util.logging.Logger;

import static com.databend.client.JsonCodec.jsonCodec;
import static com.databend.client.utils.JsonCodec.jsonCodec;
import static com.google.common.base.MoreObjects.firstNonNull;
import static java.lang.String.format;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
Expand All @@ -39,14 +41,14 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;

@ThreadSafe
public class DatabendClientV1
implements DatabendClient {
public class DatabendQueryResultV1
implements DatabendQueryResult {
private final AtomicReference<Boolean> finished = new AtomicReference<>(false);
public static final String USER_AGENT_VALUE = DatabendClientV1.class.getSimpleName() +
public static final String USER_AGENT_VALUE = DatabendQueryResultV1.class.getSimpleName() +
"/" +
firstNonNull(DatabendClientV1.class.getPackage().getImplementationVersion(), "jvm-unknown");
firstNonNull(DatabendQueryResultV1.class.getPackage().getImplementationVersion(), "jvm-unknown");
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
public static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);
public static final JsonCodec<QueryResponse> QUERY_RESULTS_CODEC = jsonCodec(QueryResponse.class);
public static final JsonCodec<DiscoveryResponseCodec.DiscoveryResponse> DISCOVERY_RESULT_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class);
public static final String succeededState = "succeeded";
public static final String failedState = "failed";
Expand All @@ -68,12 +70,12 @@ public class DatabendClientV1
// client session
private final AtomicReference<DatabendSession> databendSession;
private String nodeID;
private final AtomicReference<QueryResults> currentResults = new AtomicReference<>(null);
private static final Logger logger = Logger.getLogger(DatabendClientV1.class.getPackage().getName());
private final AtomicReference<QueryResponse> currentResults = new AtomicReference<>(null);
private static final Logger logger = Logger.getLogger(DatabendQueryResultV1.class.getPackage().getName());

private Consumer<DatabendSession> on_session_state_update;

public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer<DatabendSession> on_session_state_update, AtomicReference<String> last_node_id) {
public DatabendQueryResultV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer<DatabendSession> on_session_state_update, AtomicReference<String> last_node_id) {
requireNonNull(httpClient, "httpClient is null");
requireNonNull(sql, "sql is null");
requireNonNull(settings, "settings is null");
Expand Down Expand Up @@ -252,7 +254,7 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi
}
}
attempts++;
JsonResponse<QueryResults> response;
JsonResponse<QueryResponse> response;
try {
response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request, materializedJsonSizeLimit);
} catch (RuntimeException e) {
Expand Down Expand Up @@ -306,7 +308,7 @@ public boolean execute(Request request) {
return executeInternal(request, OptionalLong.empty());
}

private void processResponse(Headers headers, QueryResults results) {
private void processResponse(Headers headers, QueryResponse results) {
nodeID = results.getNodeId();
DatabendSession session = results.getSession();
if (session != null) {
Expand Down Expand Up @@ -357,7 +359,7 @@ public Map<String, String> getAdditionalHeaders() {
}

@Override
public QueryResults getResults() {
public QueryResponse getResults() {
return currentResults.get();
}

Expand All @@ -366,11 +368,6 @@ public DatabendSession getSession() {
return databendSession.get();
}

@Override
public String getHost() {
return this.host;
}

@Override
public void close() {
closeQuery();
Expand All @@ -380,7 +377,7 @@ private void closeQuery() {
if (!finished.compareAndSet(false, true)) {
return;
}
QueryResults q = this.currentResults.get();
QueryResponse q = this.currentResults.get();
if (q == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import static com.google.common.base.MoreObjects.toStringHelper;

public class QueryResults {
public class QueryResponse {
private final String queryId;
private final String nodeId;
private final String sessionId;
Expand All @@ -41,7 +41,7 @@ public class QueryResults {
private final URI killUri;

@JsonCreator
public QueryResults(
public QueryResponse(
@JsonProperty("id") String queryId,
@JsonProperty("node_id") String nodeId,
@JsonProperty("session_id") String sessionId,
Expand Down
41 changes: 0 additions & 41 deletions databend-client/src/main/java/com/databend/client/QuerySchema.java

This file was deleted.

35 changes: 0 additions & 35 deletions databend-client/src/main/java/com/databend/client/ServerInfo.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
* limitations under the License.
*/

package com.databend.client;
package com.databend.client.utils;

import com.databend.client.DiscoveryResponseCodec;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* limitations under the License.
*/

package com.databend.client;
package com.databend.client.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import okhttp3.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* limitations under the License.
*/

package com.databend.client;
package com.databend.client.utils;

import com.google.common.base.CharMatcher;
import okhttp3.Credentials;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.databend.client;

import com.databend.client.utils.OkHttpUtils;
import okhttp3.OkHttpClient;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -41,7 +42,7 @@ public void testBasicQueryPagination() {

ClientSettings settings = new ClientSettings(DATABEND_HOST);
AtomicReference<String> lastNodeID = new AtomicReference<>();
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
DatabendQueryResult cli = new DatabendQueryResultV1(client, "select 1", settings, null, lastNodeID);
System.out.println(cli.getResults().getData());
Assert.assertEquals(cli.getQuery(), "select 1");
Assert.assertEquals(cli.getSession().getDatabase(), DATABASE);
Expand All @@ -62,7 +63,7 @@ public void testConnectionRefused() {
AtomicReference<String> lastNodeID = new AtomicReference<>();

try {
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
DatabendQueryResult cli = new DatabendQueryResultV1(client, "select 1", settings, null, lastNodeID);
cli.getResults(); // This should trigger the connection attempt
Assert.fail("Expected exception was not thrown");
} catch (Exception e) {
Expand All @@ -82,7 +83,7 @@ public void testBasicQueryIDHeader() {
Map<String, String> additionalHeaders = new HashMap<>();
additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
DatabendQueryResult cli = new DatabendQueryResultV1(client, "select 1", settings, null, lastNodeID);
Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID);

String expectedUUID1 = UUID.randomUUID().toString();
Expand All @@ -91,7 +92,7 @@ public void testBasicQueryIDHeader() {
ClientSettings settings1 = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders1, null, DEFAULT_RETRY_ATTEMPTS);
Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID);
// check X_Databend_Query_ID won't change after calling next()
DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null, lastNodeID);
DatabendQueryResult cli1 = new DatabendQueryResultV1(client, "SELECT number from numbers(200000) order by number", settings1, null, lastNodeID);
for (int i = 1; i < 1000; i++) {
cli.advance();
Assert.assertEquals(cli1.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID1);
Expand All @@ -109,7 +110,7 @@ public void testDiscoverNodes() {
Map<String, String> additionalHeaders = new HashMap<>();
additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
List<DiscoveryNode> nodes = DatabendClientV1.discoverNodes(client, settings);
List<DiscoveryNode> nodes = DatabendQueryResultV1.discoverNodes(client, settings);
Assert.assertFalse(nodes.isEmpty());
for (DiscoveryNode node : nodes) {
System.out.println(node.getAddress());
Expand All @@ -126,7 +127,7 @@ public void testDiscoverNodesUnSupported() {
additionalHeaders.put("~mock.unsupported.discovery", "true");
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
try {
DatabendClientV1.discoverNodes(client, settings);
DatabendQueryResultV1.discoverNodes(client, settings);
Assert.fail("Expected exception was not thrown");
} catch (Exception e) {
System.out.println(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.databend.client;

import com.databend.client.errors.QueryErrors;
import com.databend.client.utils.JsonCodec;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.airlift.json.JsonCodecFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.List;

import static com.databend.client.JsonCodec.jsonCodec;
import static com.databend.client.utils.JsonCodec.jsonCodec;

public class TestDiscoveryNodes {
private static final JsonCodec<DiscoveryResponseCodec.DiscoveryResponse> QUERY_RESULTS_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

package com.databend.client;

import com.databend.client.utils.JsonCodec;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.testng.Assert;
import org.testng.annotations.Test;

import static com.databend.client.JsonCodec.jsonCodec;
import static com.databend.client.utils.JsonCodec.jsonCodec;


@Test(timeOut = 10000)
Expand Down
Loading
Loading