Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support passive auto discovery #267

Merged
merged 3 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions databend-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<parent>
<groupId>com.databend</groupId>
<artifactId>databend-base</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.databend</groupId>
<artifactId>databend-client</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>

<properties>
<!--suppress UnresolvedMavenProperty -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett
}
}

public static List<DiscoveryNode> dicoverNodes(OkHttpClient httpClient, ClientSettings settings) {
public static List<DiscoveryNode> discoverNodes(OkHttpClient httpClient, ClientSettings settings) {
requireNonNull(httpClient, "httpClient is null");
requireNonNull(settings, "settings is null");
requireNonNull(settings.getHost(), "settings.host is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public DiscoveryNode(
this.address = address;
}

public static DiscoveryNode create(String address) {
return new DiscoveryNode(address);
}
// add builder

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testDiscoverNodes() {
Map<String, String> additionalHeaders = new HashMap<>();
additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
List<DiscoveryNode> nodes = DatabendClientV1.dicoverNodes(client, settings);
List<DiscoveryNode> nodes = DatabendClientV1.discoverNodes(client, settings);
Assert.assertFalse(nodes.isEmpty());
for (DiscoveryNode node : nodes) {
System.out.println(node.getAddress());
Expand All @@ -119,7 +119,7 @@ public void testDiscoverNodesUnSupported() {
additionalHeaders.put("~mock.unsupported.discovery", "true");
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
try {
DatabendClientV1.dicoverNodes(client, settings);
DatabendClientV1.discoverNodes(client, settings);
Assert.fail("Expected exception was not thrown");
} catch (Exception e) {
System.out.println(e.getMessage());
Expand Down
6 changes: 3 additions & 3 deletions databend-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<parent>
<groupId>com.databend</groupId>
<artifactId>databend-base</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>

<properties>
<!--suppress UnresolvedMavenProperty -->
Expand All @@ -24,7 +24,7 @@
<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-client</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public final class ConnectionProperties {
public static final ConnectionProperty<Integer> MAX_FAILOVER_RETRY = new MaxFailoverRetry();
public static final ConnectionProperty<String> LOAD_BALANCING_POLICY = new LoadBalancingPolicy();
public static final ConnectionProperty<Boolean> AUTO_DISCOVERY = new AutoDiscovery();

public static final ConnectionProperty<Boolean> ENABLE_MOCK = new EnableMock();
public static final ConnectionProperty<String> DATABASE = new Database();
public static final ConnectionProperty<String> ACCESS_TOKEN = new AccessToken();

Expand Down Expand Up @@ -162,6 +162,12 @@ public AutoDiscovery() {
}
}

private static class EnableMock extends AbstractConnectionProperty<Boolean> {
public EnableMock() {
super("enable_mock", Optional.of("false"), NOT_REQUIRED, ALLOWED, BOOLEAN_CONVERTER);
}
}

private static class AccessToken
extends AbstractConnectionProperty<String> {
public AccessToken() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@ public boolean copyPurge() {
return this.driverUri.copyPurge();
}

public boolean isAutoDiscovery() {
return this.autoDiscovery;
}

public String warehouse() {
return this.driverUri.getWarehouse();
}
Expand Down Expand Up @@ -718,17 +722,43 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws
ClientSettings s = sb.build();
logger.log(Level.FINE, "retry " + i + " times to execute query: " + sql + " on " + s.getHost());
// discover new hosts in need.
// if (this.autoDiscovery) {
//
// }
if (this.autoDiscovery) {
tryAutoDiscovery(httpClient, s);
}
return new DatabendClientV1(httpClient, sql, s, this);
} catch (RuntimeException e1) {
e = e1;
} catch (Exception e1) {
throw new SQLException("Error executing query: " + "SQL: " + sql + " " + e1.getMessage() + " cause: " + e1.getCause(), e1);
}
}
throw new SQLException("Failover Retry Error executing query after" + getMaxFailoverRetries() + "failover retry: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e);
throw new SQLException("Failover Retry Error executing query after " + getMaxFailoverRetries() + " failover retry: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e);
}

/**
* Try to auto discovery the databend nodes it will log exceptions when auto discovery failed and not affect real query execution
*
* @param client the http client to query on
* @param settings the client settings to use
*/
void tryAutoDiscovery(OkHttpClient client, ClientSettings settings) {
if (this.autoDiscovery) {
if (this.driverUri.enableMock()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat mock 👍

settings.getAdditionalHeaders().put("~mock.unsupported.discovery", "true");
}
DatabendNodes nodes = this.driverUri.getNodes();
if (nodes != null && nodes.needDiscovery()) {
try {
nodes.discoverUris(client, settings);
} catch (UnsupportedOperationException e) {
logger.log(Level.WARNING, "Current Query Node do not support auto discovery, close the functionality: " + e.getMessage());
this.autoDiscovery = false;
} catch (Exception e) {
logger.log(Level.FINE, "Error auto discovery: " + " cause: " + e.getCause() + " message: " + e.getMessage());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first time to find out FINE log level, what's it's normal use case?

(not an issue at all, just curious 👀)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

info level will print problem by default, I think it is similar to DEBUG level on golang

}
}
}

}

DatabendClient startQuery(String sql) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class DatabendDriverUri {
private final Integer connectionTimeout;
private final Integer maxFailoverRetry;
private final boolean autoDiscovery;
private final boolean enableMock;
private final Integer queryTimeout;
private final Integer socketTimeout;
private final Integer waitTimeSecs;
Expand All @@ -73,6 +74,7 @@ private DatabendDriverUri(String url, Properties driverProperties)
this.useSecureConnection = SSL.getValue(properties).orElse(false);
this.useVerify = USE_VERIFY.getValue(properties).orElse(false);
this.debug = DEBUG.getValue(properties).orElse(false);
this.enableMock = ENABLE_MOCK.getValue(properties).orElse(false);
this.strNullAsNull = STRNULL_AS_NULL.getValue(properties).orElse(true);
this.warehouse = WAREHOUSE.getValue(properties).orElse("");
this.sslmode = SSL_MODE.getValue(properties).orElse("disable");
Expand Down Expand Up @@ -117,7 +119,7 @@ private static void initDatabase(URI uri, Map<String, String> uriProperties) thr
uriProperties.put(DATABASE.getKey(), db);
}

private static List<URI> canonicalizeUris(List<URI> uris, boolean isSSLSecured, String sslmode) throws SQLException {
public static List<URI> canonicalizeUris(List<URI> uris, boolean isSSLSecured, String sslmode) throws SQLException {
List<URI> finalUris = new ArrayList<>();
for (URI uri : uris) {
finalUris.add(canonicalizeUri(uri, isSSLSecured, sslmode));
Expand Down Expand Up @@ -351,6 +353,10 @@ public boolean getDebug() {
return debug;
}

public boolean enableMock() {
return enableMock;
}

public String getSslmode() {
return sslmode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ public interface DatabendNodeRouter {

/**
* Discover all possible query uris through databend discovery api and update candidate node router list in need
*
* @return true if update operation executed, false otherwise
* Ref PR:
* https://github.com/datafuselabs/databend-jdbc/pull/264
* https://github.com/datafuselabs/databend/pull/16353
*/
boolean discoverUris(OkHttpClient client, ClientSettings settings);
void discoverUris(OkHttpClient client, ClientSettings settings) throws UnsupportedOperationException;

boolean needDiscovery();
}
59 changes: 39 additions & 20 deletions databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@
import com.databend.client.ClientSettings;
import com.databend.client.DatabendClientV1;
import com.databend.client.DiscoveryNode;
import lombok.Setter;
import okhttp3.OkHttpClient;

import java.net.URI;
import java.net.URISyntaxException;
import java.security.InvalidParameterException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.logging.Logger;

public class DatabendNodes implements DatabendNodeRouter {

private AtomicReference<List<URI>> query_nodes_uris;
protected final AtomicInteger index;
// keep track of latest discovery scheduled time
protected final AtomicReference<Long> lastDiscoveryTime = new AtomicReference<>(0L);
private static final Logger logger = Logger.getLogger(DatabendNodes.class.getPackage().getName());
@Setter
private boolean debug = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess if there is only one setter, define a setDebug is preferrable than using lomok

// minimum time between discovery
protected long discoveryInterval = 1000 * 60 * 5;
protected DatabendClientLoadBalancingPolicy policy;
Expand Down Expand Up @@ -64,28 +69,42 @@ public DatabendClientLoadBalancingPolicy getPolicy() {
}

@Override
public boolean discoverUris(OkHttpClient client, ClientSettings settings) {
public void discoverUris(OkHttpClient client, ClientSettings settings) throws UnsupportedOperationException {
// do nothing if discovery interval is not reached
Long lastDiscoveryTime = this.lastDiscoveryTime.get();
if (System.currentTimeMillis() - lastDiscoveryTime < discoveryInterval) {
return false;
return;
}
List<URI> current_nodes = query_nodes_uris.get();
List<URI> current_uris = query_nodes_uris.get();
if (!this.lastDiscoveryTime.compareAndSet(lastDiscoveryTime, System.currentTimeMillis())) {
return false;
return;
}

List<DiscoveryNode> new_nodes = DatabendClientV1.dicoverNodes(client, settings);
if (!new_nodes.isEmpty()) {
// convert new nodes using lambda
List<URI> new_uris = new_nodes.stream().map(node -> URI.create("http://" + node.getAddress())).collect(Collectors.toList());
updateNodes(new_uris);
return true;
try {
List<DiscoveryNode> new_nodes = DatabendClientV1.discoverNodes(client, settings);
if (!new_nodes.isEmpty()) {
// convert new nodes using lambda
List<URI> new_uris = this.parseURI(new_nodes);
if (this.query_nodes_uris.compareAndSet(current_uris, new_uris)) {
java.util.logging.Level level = debug ? java.util.logging.Level.INFO : java.util.logging.Level.FINE;
// the log would only show that when truly updated the nodes
logger.log(level, "Automatic Discovery updated nodes: " + new_uris);
}
}
} catch (UnsupportedOperationException e) {
throw e;
} catch (RuntimeException e) {
logger.log(java.util.logging.Level.WARNING, "Error updating nodes: " + e.getMessage());
}
return false;

}

private List<URI> parseURI(List<DiscoveryNode> nodes) throws SQLException {
@Override
public boolean needDiscovery() {
Long lastDiscoveryTime = this.lastDiscoveryTime.get();
return System.currentTimeMillis() - lastDiscoveryTime >= discoveryInterval;
}

public List<URI> parseURI(List<com.databend.client.DiscoveryNode> nodes) throws RuntimeException {
String host = null;
List<URI> uris = new ArrayList<>();
try {
Expand All @@ -103,20 +122,20 @@ private List<URI> parseURI(List<DiscoveryNode> nodes) throws SQLException {
} else if (hostAndPort.length == 1) {
host = hostAndPort[0];
} else {
throw new SQLException("Invalid host and port, url: " + uri);
throw new InvalidParameterException("Invalid host and port, url: " + uri);
}
if (host == null || host.isEmpty()) {
throw new SQLException("Invalid host " + host);
throw new InvalidParameterException("Invalid host " + host);
}

uris.add(new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uriPath, uriQuery, uriFragment));
}

return DatabendDriverUri.canonicalizeUris(uris, this.useSecureConnection, this.sslmode);
} catch (URISyntaxException e) {
throw new SQLException("Invalid URI", e.getMessage());
throw new InvalidParameterException("Invalid URI " + e.getMessage());
} catch (SQLException e) {
throw new RuntimeException("Error parsing URI " + e.getMessage());
}

return uris;
}

public URI pickUri(String query_id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;


import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@
import org.testng.annotations.Test;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ParameterMetaData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,9 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.nio.file.Files;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
Expand Down
Loading