From c5ccaa4f09d10fe7465f38e4a0d07dee950b5b35 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 25 Nov 2024 09:27:21 +0800 Subject: [PATCH] rename --- ...ndClient.java => DatabendQueryResult.java} | 6 ++-- ...ientV1.java => DatabendQueryResultV1.java} | 29 +++++++-------- .../{QueryResults.java => QueryResponse.java} | 4 +-- .../com/databend/client/TestClientIT.java | 12 +++---- ...eryResults.java => TestQueryResponse.java} | 36 +++++++++---------- .../jdbc/AbstractDatabendResultSet.java | 4 +-- .../com/databend/jdbc/DatabendConnection.java | 16 ++++----- .../java/com/databend/jdbc/DatabendNodes.java | 4 +-- .../com/databend/jdbc/DatabendResultSet.java | 24 ++++++------- .../com/databend/jdbc/DatabendStatement.java | 16 ++++----- 10 files changed, 72 insertions(+), 79 deletions(-) rename databend-client/src/main/java/com/databend/client/{DatabendClient.java => DatabendQueryResult.java} (92%) rename databend-client/src/main/java/com/databend/client/{DatabendClientV1.java => DatabendQueryResultV1.java} (94%) rename databend-client/src/main/java/com/databend/client/{QueryResults.java => QueryResponse.java} (98%) rename databend-client/src/test/java/com/databend/client/{TestQueryResults.java => TestQueryResponse.java} (87%) diff --git a/databend-client/src/main/java/com/databend/client/DatabendClient.java b/databend-client/src/main/java/com/databend/client/DatabendQueryResult.java similarity index 92% rename from databend-client/src/main/java/com/databend/client/DatabendClient.java rename to databend-client/src/main/java/com/databend/client/DatabendQueryResult.java index 8e2c7f7d..7cea8d13 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendClient.java +++ b/databend-client/src/main/java/com/databend/client/DatabendQueryResult.java @@ -19,7 +19,7 @@ import java.io.Closeable; import java.util.Map; -public interface DatabendClient extends Closeable { +public interface DatabendQueryResult extends Closeable { String getQuery(); @Override @@ -27,11 +27,9 @@ public interface DatabendClient extends Closeable { DatabendSession getSession(); - String getHost(); - Map getAdditionalHeaders(); - QueryResults getResults(); + QueryResponse getResults(); // execute Restful query request for the first time. // @param request the request to be executed diff --git a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java b/databend-client/src/main/java/com/databend/client/DatabendQueryResultV1.java similarity index 94% rename from databend-client/src/main/java/com/databend/client/DatabendClientV1.java rename to databend-client/src/main/java/com/databend/client/DatabendQueryResultV1.java index a0b79fd0..93baf1d7 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java +++ b/databend-client/src/main/java/com/databend/client/DatabendQueryResultV1.java @@ -41,14 +41,14 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; @ThreadSafe -public class DatabendClientV1 - implements DatabendClient { +public class DatabendQueryResultV1 + implements DatabendQueryResult { private final AtomicReference finished = new AtomicReference<>(false); - public static final String USER_AGENT_VALUE = DatabendClientV1.class.getSimpleName() + + public static final String USER_AGENT_VALUE = DatabendQueryResultV1.class.getSimpleName() + "/" + - firstNonNull(DatabendClientV1.class.getPackage().getImplementationVersion(), "jvm-unknown"); + firstNonNull(DatabendQueryResultV1.class.getPackage().getImplementationVersion(), "jvm-unknown"); public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8"); - public static final JsonCodec QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class); + public static final JsonCodec QUERY_RESULTS_CODEC = jsonCodec(QueryResponse.class); public static final JsonCodec DISCOVERY_RESULT_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class); public static final String succeededState = "succeeded"; public static final String failedState = "failed"; @@ -70,12 +70,12 @@ public class DatabendClientV1 // client session private final AtomicReference databendSession; private String nodeID; - private final AtomicReference currentResults = new AtomicReference<>(null); - private static final Logger logger = Logger.getLogger(DatabendClientV1.class.getPackage().getName()); + private final AtomicReference currentResults = new AtomicReference<>(null); + private static final Logger logger = Logger.getLogger(DatabendQueryResultV1.class.getPackage().getName()); private Consumer on_session_state_update; - public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer on_session_state_update, AtomicReference last_node_id) { + public DatabendQueryResultV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer on_session_state_update, AtomicReference last_node_id) { requireNonNull(httpClient, "httpClient is null"); requireNonNull(sql, "sql is null"); requireNonNull(settings, "settings is null"); @@ -254,7 +254,7 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi } } attempts++; - JsonResponse response; + JsonResponse response; try { response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request, materializedJsonSizeLimit); } catch (RuntimeException e) { @@ -308,7 +308,7 @@ public boolean execute(Request request) { return executeInternal(request, OptionalLong.empty()); } - private void processResponse(Headers headers, QueryResults results) { + private void processResponse(Headers headers, QueryResponse results) { nodeID = results.getNodeId(); DatabendSession session = results.getSession(); if (session != null) { @@ -359,7 +359,7 @@ public Map getAdditionalHeaders() { } @Override - public QueryResults getResults() { + public QueryResponse getResults() { return currentResults.get(); } @@ -368,11 +368,6 @@ public DatabendSession getSession() { return databendSession.get(); } - @Override - public String getHost() { - return this.host; - } - @Override public void close() { closeQuery(); @@ -382,7 +377,7 @@ private void closeQuery() { if (!finished.compareAndSet(false, true)) { return; } - QueryResults q = this.currentResults.get(); + QueryResponse q = this.currentResults.get(); if (q == null) { return; } diff --git a/databend-client/src/main/java/com/databend/client/QueryResults.java b/databend-client/src/main/java/com/databend/client/QueryResponse.java similarity index 98% rename from databend-client/src/main/java/com/databend/client/QueryResults.java rename to databend-client/src/main/java/com/databend/client/QueryResponse.java index 35fe223b..63a7cc4e 100644 --- a/databend-client/src/main/java/com/databend/client/QueryResults.java +++ b/databend-client/src/main/java/com/databend/client/QueryResponse.java @@ -23,7 +23,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; -public class QueryResults { +public class QueryResponse { private final String queryId; private final String nodeId; private final String sessionId; @@ -41,7 +41,7 @@ public class QueryResults { private final URI killUri; @JsonCreator - public QueryResults( + public QueryResponse( @JsonProperty("id") String queryId, @JsonProperty("node_id") String nodeId, @JsonProperty("session_id") String sessionId, diff --git a/databend-client/src/test/java/com/databend/client/TestClientIT.java b/databend-client/src/test/java/com/databend/client/TestClientIT.java index 1341200d..290ccbe3 100644 --- a/databend-client/src/test/java/com/databend/client/TestClientIT.java +++ b/databend-client/src/test/java/com/databend/client/TestClientIT.java @@ -42,7 +42,7 @@ public void testBasicQueryPagination() { ClientSettings settings = new ClientSettings(DATABEND_HOST); AtomicReference lastNodeID = new AtomicReference<>(); - DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID); + DatabendQueryResult cli = new DatabendQueryResultV1(client, "select 1", settings, null, lastNodeID); System.out.println(cli.getResults().getData()); Assert.assertEquals(cli.getQuery(), "select 1"); Assert.assertEquals(cli.getSession().getDatabase(), DATABASE); @@ -63,7 +63,7 @@ public void testConnectionRefused() { AtomicReference lastNodeID = new AtomicReference<>(); try { - DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID); + DatabendQueryResult cli = new DatabendQueryResultV1(client, "select 1", settings, null, lastNodeID); cli.getResults(); // This should trigger the connection attempt Assert.fail("Expected exception was not thrown"); } catch (Exception e) { @@ -83,7 +83,7 @@ public void testBasicQueryIDHeader() { Map additionalHeaders = new HashMap<>(); additionalHeaders.put(X_Databend_Query_ID, expectedUUID); ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS); - DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID); + DatabendQueryResult cli = new DatabendQueryResultV1(client, "select 1", settings, null, lastNodeID); Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID); String expectedUUID1 = UUID.randomUUID().toString(); @@ -92,7 +92,7 @@ public void testBasicQueryIDHeader() { ClientSettings settings1 = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders1, null, DEFAULT_RETRY_ATTEMPTS); Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID); // check X_Databend_Query_ID won't change after calling next() - DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null, lastNodeID); + DatabendQueryResult cli1 = new DatabendQueryResultV1(client, "SELECT number from numbers(200000) order by number", settings1, null, lastNodeID); for (int i = 1; i < 1000; i++) { cli.advance(); Assert.assertEquals(cli1.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID1); @@ -110,7 +110,7 @@ public void testDiscoverNodes() { Map additionalHeaders = new HashMap<>(); additionalHeaders.put(X_Databend_Query_ID, expectedUUID); ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS); - List nodes = DatabendClientV1.discoverNodes(client, settings); + List nodes = DatabendQueryResultV1.discoverNodes(client, settings); Assert.assertFalse(nodes.isEmpty()); for (DiscoveryNode node : nodes) { System.out.println(node.getAddress()); @@ -127,7 +127,7 @@ public void testDiscoverNodesUnSupported() { additionalHeaders.put("~mock.unsupported.discovery", "true"); ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS); try { - DatabendClientV1.discoverNodes(client, settings); + DatabendQueryResultV1.discoverNodes(client, settings); Assert.fail("Expected exception was not thrown"); } catch (Exception e) { System.out.println(e.getMessage()); diff --git a/databend-client/src/test/java/com/databend/client/TestQueryResults.java b/databend-client/src/test/java/com/databend/client/TestQueryResponse.java similarity index 87% rename from databend-client/src/test/java/com/databend/client/TestQueryResults.java rename to databend-client/src/test/java/com/databend/client/TestQueryResponse.java index ebc1bd4c..1dd93e29 100644 --- a/databend-client/src/test/java/com/databend/client/TestQueryResults.java +++ b/databend-client/src/test/java/com/databend/client/TestQueryResponse.java @@ -23,13 +23,13 @@ import static io.airlift.json.JsonCodec.jsonCodec; @Test(timeOut = 10000) -public class TestQueryResults { - private static final JsonCodec QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class); +public class TestQueryResponse { + private static final JsonCodec QUERY_RESULTS_CODEC = jsonCodec(QueryResponse.class); @Test(groups = {"unit"}) public void testBasic() { String goldenValue = "{\"id\":\"5c4e776a-8171-462a-b2d3-6a34823d0552\",\"session_id\":\"3563624b-8767-44ff-a235-3f5bb4e54d03\",\"session\":{},\"schema\":[{\"name\":\"(number / 3)\",\"type\":\"Float64\"},{\"name\":\"(number + 1)\",\"type\":\"UInt64\"}],\"data\":[[\"0.0\",\"1\"],[\"0.3333333333333333\",\"2\"],[\"0.6666666666666666\",\"3\"],[\"1.0\",\"4\"],[\"1.3333333333333333\",\"5\"],[\"1.6666666666666667\",\"6\"],[\"2.0\",\"7\"],[\"2.3333333333333335\",\"8\"],[\"2.6666666666666665\",\"9\"],[\"3.0\",\"10\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":10,\"bytes\":80},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":10,\"bytes\":160},\"running_time_ms\":1.494205},\"affect\":null,\"stats_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552\",\"final_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552/final\",\"next_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552/final\",\"kill_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552/kill\"}"; - QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenValue); + QueryResponse queryResults = QUERY_RESULTS_CODEC.fromJson(goldenValue); Assert.assertEquals(queryResults.getQueryId(), "5c4e776a-8171-462a-b2d3-6a34823d0552"); Assert.assertEquals(queryResults.getSessionId(), "3563624b-8767-44ff-a235-3f5bb4e54d03"); Assert.assertEquals(queryResults.getSchema().size(), 2); @@ -46,7 +46,7 @@ public void testBasic() { @Test(groups = "unit") public void TestError() { String goldenValue = "{\"id\":\"\",\"session_id\":null,\"session\":null,\"schema\":[],\"data\":[],\"state\":\"Failed\",\"error\":{\"code\":1065,\"message\":\"error: \\n --> SQL:1:8\\n |\\n1 | select error\\n | ^^^^^ column doesn't exist\\n\\n\"},\"stats\":{\"scan_progress\":{\"rows\":0,\"bytes\":0},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":0,\"bytes\":0},\"running_time_ms\":0.0},\"affect\":null,\"stats_uri\":null,\"final_uri\":null,\"next_uri\":null,\"kill_uri\":null}"; - QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenValue); + QueryResponse queryResults = QUERY_RESULTS_CODEC.fromJson(goldenValue); Assert.assertEquals(queryResults.getQueryId(), ""); Assert.assertEquals(queryResults.getSessionId(), null); Assert.assertEquals(queryResults.getSession(), null); @@ -58,7 +58,7 @@ public void TestError() { @Test(groups = "unit") public void TestDateTime() { String goldenString = "{\"id\":\"1fbbaf5b-8807-47d3-bb9c-122a3b7c527c\",\"session_id\":\"ef4a4a66-7a81-4a90-b6ab-d484313111b8\",\"session\":{},\"schema\":[{\"name\":\"date\",\"type\":\"Date\"},{\"name\":\"ts\",\"type\":\"Timestamp\"}],\"data\":[[\"2022-04-07\",\"2022-04-07 01:01:01.123456\"],[\"2022-04-08\",\"2022-04-08 01:01:01.000000\"],[\"2022-04-07\",\"2022-04-07 01:01:01.123456\"],[\"2022-04-08\",\"2022-04-08 01:01:01.000000\"],[\"2022-04-07\",\"2022-04-07 01:01:01.123456\"],[\"2022-04-08\",\"2022-04-08 01:01:01.000000\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":6,\"bytes\":72},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":6,\"bytes\":72},\"running_time_ms\":7.681399},\"affect\":null,\"stats_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c\",\"final_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c/final\",\"next_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c/final\",\"kill_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c/kill\"}"; - QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); + QueryResponse queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); Assert.assertEquals(queryResults.getQueryId(), "1fbbaf5b-8807-47d3-bb9c-122a3b7c527c"); Assert.assertEquals(queryResults.getSessionId(), "ef4a4a66-7a81-4a90-b6ab-d484313111b8"); Assert.assertEquals(queryResults.getSession().getDatabase(), null); @@ -79,7 +79,7 @@ public void TestDateTime() { @Test(groups = "unit") public void TestUseDB() { String goldenString = "{\"id\":\"d0aa3285-0bf5-42da-b06b-0d3db55f10bd\",\"session_id\":\"ded852b7-0da2-46ba-8708-e6fcb1c33081\",\"session\":{\"database\":\"db2\"},\"schema\":[],\"data\":[],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":0,\"bytes\":0},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":0,\"bytes\":0},\"running_time_ms\":0.891883},\"affect\":{\"type\":\"UseDB\",\"name\":\"db2\"},\"stats_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd\",\"final_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd/final\",\"next_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd/final\",\"kill_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd/kill\"}"; - QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); + QueryResponse queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); Assert.assertEquals(queryResults.getQueryId(), "d0aa3285-0bf5-42da-b06b-0d3db55f10bd"); QueryAffect affect = queryResults.getAffect(); Assert.assertEquals(affect.getClass(), QueryAffect.UseDB.class); @@ -89,7 +89,7 @@ public void TestUseDB() { @Test(groups = "unit") public void TestChangeSettings() { String goldenString = "{\"id\":\"a59cf8ff-f8a0-4bf6-bb90-120d3ea140c0\",\"session_id\":\"3423881e-f57b-4c53-a432-cf665ac1fb3e\",\"session\":{\"settings\":{\"max_threads\":\"1\"}},\"schema\":[],\"data\":[],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":0,\"bytes\":0},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":0,\"bytes\":0},\"running_time_ms\":0.81772},\"affect\":{\"type\":\"ChangeSettings\",\"keys\":[\"max_threads\"],\"values\":[\"1\"],\"is_globals\":[false]},\"stats_uri\":\"/v1/query/a59cf8ff-f8a0-4bf6-bb90-120d3ea140c0\",\"final_uri\":\"/v1/query/a59cf8ff-f8a0-4bf6-bb90-120d3ea140c0/final\",\"next_uri\":\"/v1/query/a59cf8ff-f8a0-4bf6-bb90-120d3ea140c0/final\",\"kill_uri\":\"/v1/query/a59cf8ff-f8a0-4bf6-bb90-120d3ea140c0/kill\"}"; - QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); + QueryResponse queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); QueryAffect affect = queryResults.getAffect(); Assert.assertEquals(affect.getClass(), QueryAffect.ChangeSettings.class); Assert.assertEquals(((QueryAffect.ChangeSettings) affect).getKeys().size(), 1); @@ -104,22 +104,22 @@ public void TestChangeSettings() { @Test(groups = "unit") public void TestArray() { String goldenString = "{\"id\":\"eecb2440-0180-45cb-8b21-23f4a9975df3\",\"session_id\":\"ef692df6-657d-42b8-a10d-6e6cac657abe\",\"session\":{},\"schema\":[{\"name\":\"id\",\"type\":\"Int8\"},{\"name\":\"obj\",\"type\":\"Variant\"},{\"name\":\"d\",\"type\":\"Timestamp\"},{\"name\":\"s\",\"type\":\"String\"},{\"name\":\"arr\",\"type\":\"Array(Int64)\"}],\"data\":[[\"1\",\"{\\\"a\\\": 1,\\\"b\\\": 2}\",\"1983-07-12 21:30:55.888000\",\"hello world, 你好\",\"[1,2,3,4,5]\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":1,\"bytes\":131},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":1,\"bytes\":131},\"running_time_ms\":9.827047},\"affect\":null,\"stats_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3\",\"final_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3/final\",\"next_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3/final\",\"kill_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3/kill\"}"; - QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); - Assert.assertEquals(queryResults.getQueryId(), "eecb2440-0180-45cb-8b21-23f4a9975df3"); - Assert.assertEquals(queryResults.getSchema().size(), 5); - Assert.assertEquals(queryResults.getSchema().get(0).getName(), "id"); - Assert.assertEquals(queryResults.getSchema().get(0).getDataType().getType(), "Int8"); + QueryResponse queryResponse = QUERY_RESULTS_CODEC.fromJson(goldenString); + Assert.assertEquals(queryResponse.getQueryId(), "eecb2440-0180-45cb-8b21-23f4a9975df3"); + Assert.assertEquals(queryResponse.getSchema().size(), 5); + Assert.assertEquals(queryResponse.getSchema().get(0).getName(), "id"); + Assert.assertEquals(queryResponse.getSchema().get(0).getDataType().getType(), "Int8"); } @Test(groups = "unit") public void TestVariant() { String goldenString = "{\"id\":\"d74b2471-3a15-45e2-9ef4-ca8a39505661\",\"session_id\":\"f818e198-20d9-4c06-8de6-bc68ab6e9dc1\",\"session\":{},\"schema\":[{\"name\":\"var\",\"type\":\"Nullable(Variant)\"}],\"data\":[[\"1\"],[\"1.34\"],[\"true\"],[\"[1,2,3,[\\\"a\\\",\\\"b\\\",\\\"c\\\"]]\"],[\"{\\\"a\\\":1,\\\"b\\\":{\\\"c\\\":2}}\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":5,\"bytes\":168},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":5,\"bytes\":168},\"running_time_ms\":7.827281},\"affect\":null,\"stats_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661\",\"final_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661/final\",\"next_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661/final\",\"kill_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661/kill\"}\n"; - QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); - Assert.assertEquals(queryResults.getQueryId(), "d74b2471-3a15-45e2-9ef4-ca8a39505661"); - Assert.assertEquals(queryResults.getSchema().size(), 1); - Assert.assertEquals(queryResults.getSchema().get(0).getName(), "var"); - Assert.assertEquals(queryResults.getSchema().get(0).getDataType().getType(), "Variant"); - Assert.assertEquals(queryResults.getSchema().get(0).getDataType().isNullable(), true); + QueryResponse queryResponse = QUERY_RESULTS_CODEC.fromJson(goldenString); + Assert.assertEquals(queryResponse.getQueryId(), "d74b2471-3a15-45e2-9ef4-ca8a39505661"); + Assert.assertEquals(queryResponse.getSchema().size(), 1); + Assert.assertEquals(queryResponse.getSchema().get(0).getName(), "var"); + Assert.assertEquals(queryResponse.getSchema().get(0).getDataType().getType(), "Variant"); + Assert.assertEquals(queryResponse.getSchema().get(0).getDataType().isNullable(), true); } } diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/AbstractDatabendResultSet.java b/databend-jdbc/src/main/java/com/databend/jdbc/AbstractDatabendResultSet.java index 2eec7742..6e90ba15 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/AbstractDatabendResultSet.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/AbstractDatabendResultSet.java @@ -1,6 +1,6 @@ package com.databend.jdbc; -import com.databend.client.QueryResults; +import com.databend.client.QueryResponse; import com.databend.client.QueryRowField; import com.databend.client.data.ColumnTypeHandler; import com.databend.client.data.ColumnTypeHandlerFactory; @@ -155,7 +155,7 @@ private static BigDecimal parseBigDecimal(String value) .orElseThrow(() -> new SQLException("Value is not a number: " + value)); } - static SQLException resultsException(QueryResults results, String originalSQL) { + static SQLException resultsException(QueryResponse results, String originalSQL) { QueryErrors error = requireNonNull(results.getError()); String message = format("SQL: (%s) Query failed (#%s): %s", originalSQL, results.getQueryId(), error.getMessage()); return new SQLException(message, String.valueOf(error.getCode())); diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java index 7703db42..858b1946 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java @@ -1,8 +1,8 @@ package com.databend.jdbc; import com.databend.client.ClientSettings; -import com.databend.client.DatabendClient; -import com.databend.client.DatabendClientV1; +import com.databend.client.DatabendQueryResult; +import com.databend.client.DatabendQueryResultV1; import com.databend.client.DatabendSession; import com.databend.client.PaginationOptions; import com.databend.client.QueryRequest; @@ -51,8 +51,8 @@ import java.util.zip.GZIPOutputStream; import static com.databend.client.ClientSettings.*; -import static com.databend.client.DatabendClientV1.MEDIA_TYPE_JSON; -import static com.databend.client.DatabendClientV1.USER_AGENT_VALUE; +import static com.databend.client.DatabendQueryResultV1.MEDIA_TYPE_JSON; +import static com.databend.client.DatabendQueryResultV1.USER_AGENT_VALUE; import static com.google.common.base.Preconditions.checkState; import static java.net.URI.create; import static java.util.Collections.newSetFromMap; @@ -687,7 +687,7 @@ public void accept(DatabendSession session) { * @throws SQLException If the query fails after retrying the specified number of times. * @see DatabendClientLoadBalancingPolicy */ - DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws SQLException { + DatabendQueryResult startQueryWithFailover(String sql, StageAttachment attach) throws SQLException { Exception e = null; int times = getMaxFailoverRetries() + 1; @@ -726,7 +726,7 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws if (this.autoDiscovery) { tryAutoDiscovery(httpClient, s); } - return new DatabendClientV1(httpClient, sql, s, this, lastNodeID); + return new DatabendQueryResultV1(httpClient, sql, s, this, lastNodeID); } catch (RuntimeException e1) { e = e1; } catch (Exception e1) { @@ -762,11 +762,11 @@ void tryAutoDiscovery(OkHttpClient client, ClientSettings settings) { } - DatabendClient startQuery(String sql) throws SQLException { + DatabendQueryResult startQuery(String sql) throws SQLException { return startQueryWithFailover(sql, null); } - DatabendClient startQuery(String sql, StageAttachment attach) throws SQLException { + DatabendQueryResult startQuery(String sql, StageAttachment attach) throws SQLException { return startQueryWithFailover(sql, attach); } diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java index 86757251..355844a6 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java @@ -1,7 +1,7 @@ package com.databend.jdbc; import com.databend.client.ClientSettings; -import com.databend.client.DatabendClientV1; +import com.databend.client.DatabendQueryResultV1; import com.databend.client.DiscoveryNode; import lombok.Setter; import okhttp3.OkHttpClient; @@ -85,7 +85,7 @@ public void discoverUris(OkHttpClient client, ClientSettings settings) throws Un return; } try { - List new_nodes = DatabendClientV1.discoverNodes(client, settings); + List new_nodes = DatabendQueryResultV1.discoverNodes(client, settings); if (!new_nodes.isEmpty()) { // convert new nodes using lambda List new_uris = this.parseURI(new_nodes); diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendResultSet.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendResultSet.java index fe235179..3c55e181 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendResultSet.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendResultSet.java @@ -1,7 +1,7 @@ package com.databend.jdbc; -import com.databend.client.DatabendClient; -import com.databend.client.QueryResults; +import com.databend.client.DatabendQueryResult; +import com.databend.client.QueryResponse; import com.databend.client.QueryRowField; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.AbstractIterator; @@ -30,13 +30,13 @@ public class DatabendResultSet extends AbstractDatabendResultSet { private final String queryId; private final Statement statement; - private final DatabendClient client; + private final DatabendQueryResult client; @GuardedBy("this") private boolean closed; @GuardedBy("this") private boolean closeStatementOnClose; - private DatabendResultSet(Statement statement, DatabendClient client, List schema, long maxRows) throws SQLException { + private DatabendResultSet(Statement statement, DatabendQueryResult client, List schema, long maxRows) throws SQLException { super(Optional.of(requireNonNull(statement, "statement is null")), schema, new AsyncIterator<>(flatten(new ResultsPageIterator(client), maxRows), client), client.getResults().getQueryId()); this.statement = statement; @@ -44,7 +44,7 @@ private DatabendResultSet(Statement statement, DatabendClient client, List s = client.getResults().getSchema(); @@ -107,19 +107,19 @@ static class AsyncIterator extends AbstractIterator { private static final int MAX_QUEUED_ROWS = 50_000; private static final ExecutorService executorService = newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("Databend JDBC worker-%s").setDaemon(true).build()); - private final DatabendClient client; + private final DatabendQueryResult client; private final BlockingQueue rowQueue; private final Semaphore semaphore = new Semaphore(0); private final Future future; private volatile boolean cancelled; private volatile boolean finished; - public AsyncIterator(Iterator dataIterator, DatabendClient client) { + public AsyncIterator(Iterator dataIterator, DatabendQueryResult client) { this(dataIterator, client, Optional.empty()); } @VisibleForTesting - AsyncIterator(Iterator dataIterator, DatabendClient client, Optional> queue) { + AsyncIterator(Iterator dataIterator, DatabendQueryResult client, Optional> queue) { requireNonNull(dataIterator, "dataIterator is null"); this.client = client; this.rowQueue = queue.orElseGet(() -> new ArrayBlockingQueue<>(MAX_QUEUED_ROWS)); @@ -191,9 +191,9 @@ private void handleInterrupt(InterruptedException e) { } private static class ResultsPageIterator extends AbstractIterator>> { - private final DatabendClient client; + private final DatabendQueryResult client; - private ResultsPageIterator(DatabendClient client) { + private ResultsPageIterator(DatabendQueryResult client) { this.client = client; } @@ -201,7 +201,7 @@ private ResultsPageIterator(DatabendClient client) { @Override protected Iterable> computeNext() { while (client.hasNext()) { - QueryResults results = client.getResults(); + QueryResponse results = client.getResults(); List> rows = results.getData(); try { client.advance(); @@ -213,7 +213,7 @@ protected Iterable> computeNext() { } } // next uri is null, no more data - QueryResults results = client.getResults(); + QueryResponse results = client.getResults(); if (results.getError() != null) { throw new RuntimeException(resultsException(results, client.getQuery())); } diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendStatement.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendStatement.java index 0ee3bbe3..e6a8fef5 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendStatement.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendStatement.java @@ -1,8 +1,8 @@ package com.databend.jdbc; -import com.databend.client.DatabendClient; +import com.databend.client.DatabendQueryResult; import com.databend.client.DatabendSession; -import com.databend.client.QueryResults; +import com.databend.client.QueryResponse; import com.databend.client.StageAttachment; import com.databend.jdbc.annotation.NotImplemented; @@ -28,7 +28,7 @@ public class DatabendStatement implements Statement { private final Consumer onClose; private int currentUpdateCount = -1; private final AtomicReference currentResult = new AtomicReference<>(); - private final AtomicReference executingClient = new AtomicReference<>(); + private final AtomicReference executingClient = new AtomicReference<>(); private final AtomicLong maxRows = new AtomicLong(); private final AtomicBoolean closeOnCompletion = new AtomicBoolean(); @@ -58,7 +58,7 @@ public void close() return; } onClose.accept(this); - DatabendClient client = executingClient.get(); + DatabendQueryResult client = executingClient.get(); if (client != null) { client.close(); } @@ -118,7 +118,7 @@ public void setQueryTimeout(int i) public void cancel() throws SQLException { checkOpen(); - DatabendClient client = executingClient.get(); + DatabendQueryResult client = executingClient.get(); if (client != null) { client.close(); } @@ -165,7 +165,7 @@ private void clearCurrentResults() { currentResult.set(null); } - private void updateClientSession(QueryResults q) { + private void updateClientSession(QueryResponse q) { if (q == null) { return; } @@ -181,7 +181,7 @@ private void updateClientSession(QueryResults q) { final boolean internalExecute(String sql, StageAttachment attachment) throws SQLException { clearCurrentResults(); checkOpen(); - DatabendClient client = null; + DatabendQueryResult client = null; DatabendResultSet resultSet = null; try { @@ -202,7 +202,7 @@ final boolean internalExecute(String sql, StageAttachment attachment) throws SQL } executingClient.set(client); while (client.hasNext()) { - QueryResults results = client.getResults(); + QueryResponse results = client.getResults(); List> data = results.getData(); // List schema = results.getSchema(); if (data == null || data.isEmpty()) {