Skip to content

Commit

Permalink
Merge pull request #125 from forcedotcom/develop
Browse files Browse the repository at this point in the history
@W-12387797: 1.17.0 release for tableau user to use hyper engine as default option
  • Loading branch information
soaggarwal authored Feb 13, 2023
2 parents 5fc9c93 + 2920b65 commit aeb6b11
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 164 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ Class.forName("com.salesforce.cdp.queryservice.QueryServiceDriver");
Properties properties = new Properties();
properties.put("user", <UserName>);
properties.put("password", <Password>);
properties.put("clientId", <Client Id of the connected App>);
properties.put("clientSecret", <Client Secret of the connected App>);
Connection connection = DriverManager.getConnection("jdbc:queryService-jdbc:https://login.salesforce.com", properties);
```
Expand Down Expand Up @@ -164,7 +167,9 @@ import jaydebeapi
// Sample properties with username and password flow.
properties = {
'user': "<UserName>",
'password': "<Password>"
'password': "<Password>",
'clientId': "<Client Id of the connected App>",
'clientSecret': "<Client Secret of the connected App>"
}
// Sample properties with key-pair authentication flow.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.queryService</groupId>
<artifactId>Salesforce-CDP-jdbc</artifactId>
<version>1.16.0</version>
<version>1.17.0</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse;
import com.salesforce.cdp.queryservice.enums.QueryEngineEnum;
import com.salesforce.cdp.queryservice.model.QueryServiceResponse;
import com.salesforce.cdp.queryservice.model.Type;
import com.salesforce.cdp.queryservice.util.ArrowUtil;
Expand Down Expand Up @@ -80,15 +81,15 @@ public QueryServiceAbstractStatement(QueryServiceConnection queryServiceConnecti
public ResultSet executeQuery(String sql) throws SQLException {
try {
this.sql = sql;
boolean isEnableStreamFlow = this.connection.isEnableStreamFlow();
QueryEngineEnum engineEnum = this.connection.getQueryEngineEnum();

boolean isCursorBasedPaginationReq = this.connection.isCursorBasedPaginationReq();

boolean requireManagedPagination = isTableauQuery() && !isCursorBasedPaginationReq;
Optional<Integer> limit = requireManagedPagination ? Optional.of(Constants.MAX_LIMIT) : Optional.empty();
Optional<String> orderby = requireManagedPagination ? Optional.of("1 ASC") : Optional.empty();

if(isEnableStreamFlow) {
if (QueryEngineEnum.HYPER == engineEnum) {
Iterator<AnsiSqlQueryStreamResponse> response = queryGrpcExecutor.executeQueryWithRetry(sql);
return createResultSetFromResponse(response);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,29 @@
package com.salesforce.cdp.queryservice.core;

import com.google.common.annotations.VisibleForTesting;
import com.salesforce.cdp.queryservice.enums.QueryEngineEnum;
import com.salesforce.cdp.queryservice.model.QueryConfigResponse;
import com.salesforce.cdp.queryservice.model.Token;
import com.salesforce.cdp.queryservice.util.Constants;
import com.salesforce.cdp.queryservice.util.HttpHelper;
import com.salesforce.cdp.queryservice.util.QueryExecutor;
import com.salesforce.cdp.queryservice.util.TokenHelper;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.salesforce.cdp.queryservice.util.Messages.QUERY_CONFIG_ERROR;

@Slf4j
public class QueryServiceConnection implements Connection {

private static final String TEST_CONNECT_QUERY = "select 1";

private AtomicBoolean closed = new AtomicBoolean(false);
private Properties properties;
private final String serviceRootUrl;
Expand All @@ -42,12 +49,15 @@ public class QueryServiceConnection implements Connection {
private final boolean isSocksProxyDisabled;
private boolean enableStreamFlow = false;
private String tenantUrl;
private QueryEngineEnum queryEngineEnum;

private boolean isValid = false;

public QueryServiceConnection(String url, Properties properties) throws SQLException {
this.properties = properties; // fixme: do deepCopy and modify the props
this.serviceRootUrl = getServiceRootUrl(url);
this.properties.put(Constants.LOGIN_URL, serviceRootUrl);
addClientSecretsIfRequired(serviceRootUrl, this.properties);
addClientUsernameIfRequired(this.properties);

// default `enableArrowStream` is false
enableArrowStream = Boolean.parseBoolean(this.properties.getProperty(Constants.ENABLE_ARROW_STREAM));
Expand All @@ -57,8 +67,12 @@ public QueryServiceConnection(String url, Properties properties) throws SQLExcep

this.isSocksProxyDisabled = Boolean.parseBoolean(this.properties.getProperty(Constants.DISABLE_SOCKS_PROXY));

boolean isTableauConnection = Constants.TABLEAU_USER_AGENT_VALUE.equals(properties.getProperty(Constants.USER_AGENT));

// default `enableStreamFlow` is false
enableStreamFlow = Boolean.parseBoolean(this.properties.getProperty(Constants.ENABLE_STREAM_FLOW, Constants.FALSE_STR));
enableStreamFlow = isTableauConnection || Boolean.parseBoolean(this.properties.getProperty(Constants.ENABLE_STREAM_FLOW, Constants.FALSE_STR));

log.info("isTableauConnection {}, enableStreamFlow {}", isTableauConnection, enableStreamFlow);

// use isValid to test connection
this.isValid(20);
Expand Down Expand Up @@ -87,37 +101,14 @@ static String getServiceRootUrl(String url) throws SQLException {
/**
* Adds client secrets to properties if not present and service url matches one of the existing envs.
*
* @param serviceRootUrl service url which is used to infer the environment
* @param properties Properties containing the config
* @throws SQLException when given service url doesn't match any envs and config doesn't have exists secrets
*/
@VisibleForTesting
static void addClientSecretsIfRequired(String serviceRootUrl, Properties properties) throws SQLException {
static void addClientUsernameIfRequired(Properties properties) throws SQLException {
if (properties.containsKey(Constants.USER) && !properties.containsKey(Constants.USER_NAME)) {
properties.put(Constants.USER_NAME, properties.get(Constants.USER));
}

if (properties.containsKey(Constants.USER_NAME)
&& !properties.containsKey(Constants.CLIENT_ID)
&& !properties.containsKey(Constants.CLIENT_SECRET)
&& !properties.containsKey(Constants.PRIVATE_KEY)) {
log.debug("adding client secrets for server {}", serviceRootUrl);
String serverUrl = serviceRootUrl.toLowerCase();
if (serverUrl.endsWith(Constants.NA45_SERVER_URL)) {
properties.put(Constants.CLIENT_ID, Constants.NA45_DEFAULT_CLIENT_ID);
properties.put(Constants.CLIENT_SECRET, Constants.NA45_DEFAULT_CLIENT_SECRET);
} else if (serverUrl.endsWith(Constants.NA46_SERVER_URL)) {
properties.put(Constants.CLIENT_ID, Constants.NA46_DEFAULT_CLIENT_ID);
properties.put(Constants.CLIENT_SECRET, Constants.NA46_DEFAULT_CLIENT_SECRET);
} else if (serverUrl.endsWith(Constants.PROD_SERVER_URL)) {
properties.put(Constants.CLIENT_ID, Constants.PROD_DEFAULT_CLIENT_ID);
properties.put(Constants.CLIENT_SECRET, Constants.PROD_DEFAULT_CLIENT_SECRET);
} else {
throw new SQLException("specified url didn't match any existing envs");
}
} else {
log.debug("No client secrets added for server {}", serviceRootUrl);
}
}

public boolean getEnableArrowStream() {
Expand All @@ -141,6 +132,10 @@ public boolean updateStreamFlow(boolean flag) {
return enableStreamFlow;
}

public QueryEngineEnum getQueryEngineEnum() {
return queryEngineEnum;
}

@Override
public Statement createStatement() throws SQLException {
return createStatement(ResultSet.TYPE_FORWARD_ONLY,
Expand Down Expand Up @@ -351,17 +346,17 @@ public boolean isValid(int timeout) throws SQLException {
}

try {
PreparedStatement statement = this.prepareStatement(TEST_CONNECT_QUERY);
return statement.execute();
if(this.isValid) {
log.info("Reusing connection");
return true;
}

QueryConfigResponse configResponse = getQueryConfigResponse();
this.queryEngineEnum = QueryEngineEnum.fromValue(configResponse.getQueryengine());
this.isValid = true;
return true;
} catch (Exception e) {
log.error("Exception while connecting to server", e);
if(isEnableStreamFlow()) {
// use http v2 api if hyper gRPC call is failing
updateStreamFlow(false);
try(PreparedStatement statement = this.prepareStatement(TEST_CONNECT_QUERY)) {
return statement.execute();
}
}
throw e;
}
}
Expand Down Expand Up @@ -457,4 +452,20 @@ public String getTenantUrl() {
public void setTenantUrl(String tenantUrl) {
this.tenantUrl = tenantUrl;
}

private QueryExecutor createQueryExecutor() {
return new QueryExecutor(this);
}

QueryConfigResponse getQueryConfigResponse() throws SQLException {
try {
QueryExecutor executor = createQueryExecutor();
Response response = executor.getQueryConfig();

return HttpHelper.handleSuccessResponse(response, QueryConfigResponse.class, false);
} catch (IOException e) {
log.error("Exception while getting config from query service", e);
throw new SQLException(QUERY_CONFIG_ERROR, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.salesforce.cdp.queryservice.enums;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

public enum QueryEngineEnum {
HYPER("hyper"),
TRINO("trino");

private final String value;

QueryEngineEnum(String value) {
this.value = value;
}

/**
* get QueryEngineEnum value.
*
* @param value String value
* @return QueryEngineEnum
*/
@JsonCreator
public static QueryEngineEnum fromValue(String value) {
for (QueryEngineEnum queryEngineEnum : QueryEngineEnum.values()) {
if (queryEngineEnum.value.equals(value)) {
return queryEngineEnum;
}
}

throw new IllegalArgumentException("Unexpected value for QueryEngineEnum: '" + value + "'");
}

@Override
@JsonValue
public String toString() {
return String.valueOf(this.value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.salesforce.cdp.queryservice.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class QueryConfigResponse {
String queryengine;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class Constants {
public static final String CDP_URL_V2 = "/api/v2";
public static final String ANSI_SQL_URL = "/query";
public static final String METADATA_URL = "/metadata";
public static final String QUERY_CONFIG_URL = "/query-config";
public static final String TOKEN_EXCHANGE_URL = "/services/a360/token";
public static final String TOKEN_REVOKE_URL = "/services/oauth2/revoke";
public static final String CORE_TOKEN_URL = "/services/oauth2/token";
Expand All @@ -34,18 +35,10 @@ public class Constants {
public static final String DRIVER_NAME = "QueryService-jdbc";
public static final String DRIVER_VERSION = "1.0";

// Common client id and secret information
// Common server information
public static final String PROD_SERVER_URL = ".salesforce.com";
public static final String PROD_DEFAULT_CLIENT_ID = "3MVG9VeAQy5y3BQVJqaUbFmV5jd8imcck2K5idmrTTGocSu9qZZ6qkbuEkxECKVYwmzm3WgvxkujqsxZDcBpL";
public static final String PROD_DEFAULT_CLIENT_SECRET = "1007FFFBA2B6B6B1EF21E2B03F4C4F692ADE10AB7DEF19E00D8AAF85EF6F6A12";

public static final String NA45_SERVER_URL = "na45.test1.pc-rnd.salesforce.com";
public static final String NA45_DEFAULT_CLIENT_ID = "3MVG9XjhiDAzhaqaC4RR0yon8blsafwWlTnckUT8bEduWr0v9UpiQ2cJkmhNtFI1kVqpY8WpyE9JYkG.ZtgiE";
public static final String NA45_DEFAULT_CLIENT_SECRET = "84642974065EDF90CA6F30FFEE23E2C16BDFA84D3083EF90E0AE905FA46131AD";

public static final String NA46_SERVER_URL = "na46.test1.pc-rnd.salesforce.com";
public static final String NA46_DEFAULT_CLIENT_ID = "3MVG9sA57VMGPDfeS67yma6IPflHn83FRhxVpmnuzp7R8uS42JYshQ7gWgWR63CQRgKL9gY5AfitSme.01ib6";
public static final String NA46_DEFAULT_CLIENT_SECRET = "BDAF015C3D2418008842CAE91B0C8DD2D672B41707FB11EA3CFC5A5392E31866";

//Audience constants for different environments
public static final String PROD_SERVER_AUD = "login.salesforce.com";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ public class Messages {

public static String RENEW_TOKEN = "Failed to Renew Token. Please retry";

public static String QUERY_CONFIG_ERROR = "Failed to get config from the server";

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,20 @@ public Response getMetadata() throws IOException, SQLException {
return getResponse(request);
}

public Response getQueryConfig() throws IOException, SQLException {
log.info("Getting query config from CDP Query Service");
Map<String, String> tokenWithTenantUrl = getTokenWithTenantUrl();
String url = Constants.PROTOCOL + tokenWithTenantUrl.get(Constants.TENANT_URL)
+ Constants.CDP_URL
+ Constants.QUERY_CONFIG_URL;

Map<String, String> headers = createHeaders(tokenWithTenantUrl, false);
headers.put(Constants.ENABLE_STREAM_FLOW, String.valueOf(this.connection.isEnableStreamFlow()));

Request request = HttpHelper.buildRequest(Constants.GET, url, null, headers);
return getResponse(request);
}

private Map<String, String> createHeaders(Map<String, String> tokenWithTenantUrl, boolean enableArrowStream) throws SQLException {
Properties properties = connection.getClientInfo();
Map<String, String> headers = new HashMap<>();
Expand All @@ -121,7 +135,7 @@ protected Response getResponse(Request request) throws IOException {
// use queryClient to fetch metadata or to execute the query
Response response = queryClient.newCall(request).execute();
long endTime = System.currentTimeMillis();
log.info("Total time taken to get response for url {} is {} ms", request.url(), endTime - startTime);
log.info("Total time taken to get response for url {} is {} ms and traceid {}", request.url(), endTime - startTime, response.headers(Constants.TRACE_ID));
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@
package com.salesforce.cdp.queryservice.util;

import com.salesforce.cdp.queryservice.core.QueryServiceConnection;
import com.salesforce.cdp.queryservice.enums.QueryEngineEnum;
import com.salesforce.cdp.queryservice.interceptors.MetadataCacheInterceptor;
import com.salesforce.cdp.queryservice.interceptors.RetryInterceptor;
import com.salesforce.cdp.queryservice.model.Token;
import com.salesforce.cdp.queryservice.util.internal.SFDefaultSocketFactoryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.FailsafeException;
import net.jodah.failsafe.RetryPolicy;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -71,7 +68,7 @@ public QueryTokenExecutor(QueryServiceConnection connection, OkHttpClient client
this.client = updateClientWithSocketFactory(client, connection.isSocksProxyDisabled());

// set TenantUrl in connection. This is mandatory in gRPC flow.
if(connection.isEnableStreamFlow()) {
if(QueryEngineEnum.HYPER == connection.getQueryEngineEnum()) {
try {
Map<String, String> token = getTokenWithTenantUrl();
connection.setTenantUrl(token.get(Constants.TENANT_URL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ public static Map<String, String> getTokenWithUrl(Token token) {
return tokenWithUrlMap;
}

public static boolean tokenExistsInCache(String coreToken) {
return tokenCache.getIfPresent(coreToken) != null;
}

private static void clearToken(String tokenKey) {
tokenCache.invalidate(tokenKey);
}
Expand Down
Loading

0 comments on commit aeb6b11

Please sign in to comment.