Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Nov 25, 2024
1 parent e66e96f commit c5ccaa4
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@
import java.io.Closeable;
import java.util.Map;

public interface DatabendClient extends Closeable {
public interface DatabendQueryResult extends Closeable {
String getQuery();

@Override
void close();

DatabendSession getSession();

String getHost();

Map<String, String> getAdditionalHeaders();

QueryResults getResults();
QueryResponse getResults();

// execute Restful query request for the first time.
// @param request the request to be executed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;

@ThreadSafe
public class DatabendClientV1
implements DatabendClient {
public class DatabendQueryResultV1
implements DatabendQueryResult {
private final AtomicReference<Boolean> finished = new AtomicReference<>(false);
public static final String USER_AGENT_VALUE = DatabendClientV1.class.getSimpleName() +
public static final String USER_AGENT_VALUE = DatabendQueryResultV1.class.getSimpleName() +
"/" +
firstNonNull(DatabendClientV1.class.getPackage().getImplementationVersion(), "jvm-unknown");
firstNonNull(DatabendQueryResultV1.class.getPackage().getImplementationVersion(), "jvm-unknown");
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
public static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);
public static final JsonCodec<QueryResponse> QUERY_RESULTS_CODEC = jsonCodec(QueryResponse.class);
public static final JsonCodec<DiscoveryResponseCodec.DiscoveryResponse> DISCOVERY_RESULT_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class);
public static final String succeededState = "succeeded";
public static final String failedState = "failed";
Expand All @@ -70,12 +70,12 @@ public class DatabendClientV1
// client session
private final AtomicReference<DatabendSession> databendSession;
private String nodeID;
private final AtomicReference<QueryResults> currentResults = new AtomicReference<>(null);
private static final Logger logger = Logger.getLogger(DatabendClientV1.class.getPackage().getName());
private final AtomicReference<QueryResponse> currentResults = new AtomicReference<>(null);
private static final Logger logger = Logger.getLogger(DatabendQueryResultV1.class.getPackage().getName());

private Consumer<DatabendSession> on_session_state_update;

public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer<DatabendSession> on_session_state_update, AtomicReference<String> last_node_id) {
public DatabendQueryResultV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer<DatabendSession> on_session_state_update, AtomicReference<String> last_node_id) {
requireNonNull(httpClient, "httpClient is null");
requireNonNull(sql, "sql is null");
requireNonNull(settings, "settings is null");
Expand Down Expand Up @@ -254,7 +254,7 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi
}
}
attempts++;
JsonResponse<QueryResults> response;
JsonResponse<QueryResponse> response;
try {
response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request, materializedJsonSizeLimit);
} catch (RuntimeException e) {
Expand Down Expand Up @@ -308,7 +308,7 @@ public boolean execute(Request request) {
return executeInternal(request, OptionalLong.empty());
}

private void processResponse(Headers headers, QueryResults results) {
private void processResponse(Headers headers, QueryResponse results) {
nodeID = results.getNodeId();
DatabendSession session = results.getSession();
if (session != null) {
Expand Down Expand Up @@ -359,7 +359,7 @@ public Map<String, String> getAdditionalHeaders() {
}

@Override
public QueryResults getResults() {
public QueryResponse getResults() {
return currentResults.get();
}

Expand All @@ -368,11 +368,6 @@ public DatabendSession getSession() {
return databendSession.get();
}

@Override
public String getHost() {
return this.host;
}

@Override
public void close() {
closeQuery();
Expand All @@ -382,7 +377,7 @@ private void closeQuery() {
if (!finished.compareAndSet(false, true)) {
return;
}
QueryResults q = this.currentResults.get();
QueryResponse q = this.currentResults.get();
if (q == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import static com.google.common.base.MoreObjects.toStringHelper;

public class QueryResults {
public class QueryResponse {
private final String queryId;
private final String nodeId;
private final String sessionId;
Expand All @@ -41,7 +41,7 @@ public class QueryResults {
private final URI killUri;

@JsonCreator
public QueryResults(
public QueryResponse(
@JsonProperty("id") String queryId,
@JsonProperty("node_id") String nodeId,
@JsonProperty("session_id") String sessionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testBasicQueryPagination() {

ClientSettings settings = new ClientSettings(DATABEND_HOST);
AtomicReference<String> lastNodeID = new AtomicReference<>();
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
DatabendQueryResult cli = new DatabendQueryResultV1(client, "select 1", settings, null, lastNodeID);
System.out.println(cli.getResults().getData());
Assert.assertEquals(cli.getQuery(), "select 1");
Assert.assertEquals(cli.getSession().getDatabase(), DATABASE);
Expand All @@ -63,7 +63,7 @@ public void testConnectionRefused() {
AtomicReference<String> lastNodeID = new AtomicReference<>();

try {
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
DatabendQueryResult cli = new DatabendQueryResultV1(client, "select 1", settings, null, lastNodeID);
cli.getResults(); // This should trigger the connection attempt
Assert.fail("Expected exception was not thrown");
} catch (Exception e) {
Expand All @@ -83,7 +83,7 @@ public void testBasicQueryIDHeader() {
Map<String, String> additionalHeaders = new HashMap<>();
additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
DatabendQueryResult cli = new DatabendQueryResultV1(client, "select 1", settings, null, lastNodeID);
Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID);

String expectedUUID1 = UUID.randomUUID().toString();
Expand All @@ -92,7 +92,7 @@ public void testBasicQueryIDHeader() {
ClientSettings settings1 = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders1, null, DEFAULT_RETRY_ATTEMPTS);
Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID);
// check X_Databend_Query_ID won't change after calling next()
DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null, lastNodeID);
DatabendQueryResult cli1 = new DatabendQueryResultV1(client, "SELECT number from numbers(200000) order by number", settings1, null, lastNodeID);
for (int i = 1; i < 1000; i++) {
cli.advance();
Assert.assertEquals(cli1.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID1);
Expand All @@ -110,7 +110,7 @@ public void testDiscoverNodes() {
Map<String, String> additionalHeaders = new HashMap<>();
additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
List<DiscoveryNode> nodes = DatabendClientV1.discoverNodes(client, settings);
List<DiscoveryNode> nodes = DatabendQueryResultV1.discoverNodes(client, settings);
Assert.assertFalse(nodes.isEmpty());
for (DiscoveryNode node : nodes) {
System.out.println(node.getAddress());
Expand All @@ -127,7 +127,7 @@ public void testDiscoverNodesUnSupported() {
additionalHeaders.put("~mock.unsupported.discovery", "true");
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
try {
DatabendClientV1.discoverNodes(client, settings);
DatabendQueryResultV1.discoverNodes(client, settings);
Assert.fail("Expected exception was not thrown");
} catch (Exception e) {
System.out.println(e.getMessage());
Expand Down
Loading

0 comments on commit c5ccaa4

Please sign in to comment.