diff --git a/README.md b/README.md index b0e7a61..918bfe0 100644 --- a/README.md +++ b/README.md @@ -142,17 +142,16 @@ Class.forName("com.salesforce.cdp.queryservice.QueryServiceDriver"); Connection connection = DriverManager.getConnection("jdbc:queryService-jdbc:https://login.salesforce.com", properties); ``` -Create Statements/ Prepared Statements to execute Query and get ResultSet +Create Statements to execute Query and get ResultSet ``` -PreparedStatement preparedStatement = connection.prepareStatement("select FirstName__c, BirthDate__c, YearlyIncome__c from Individual__dlm where FirstName__c = ? and YearlyIncome__c > ?"); - preparedStatement.setString(0, "Angella"); - preparedStatement.setInt(1, 1000); - -ResultSet resultSet = preparedStatement.executeQuery(); +Statement statement = connection.createStatement(); +ResultSet resultSet = statement.executeQuery("select FirstName__c, BirthDate__c, YearlyIncome__c from Individual__dlm where FirstName__c = 'Angella' and YearlyIncome__c > 1000"); while (resultSet.next()) { - log.info("FirstName : {}, BirthDate__c : {}, YearlyIncome__c : {}", resultSet.getString("FirstName__c"), resultSet.getTimestamp("BirthDate__c"), resultSet.getInt("YearlyIncome__c")); + log.info("FirstName : {}, BirthDate__c : {}, YearlyIncome__c : {}", resultSet.getString("FirstName__c"), resultSet.getTimestamp("BirthDate__c"), resultSet.getInt("YearlyIncome__c")); +} ``` +_Note: We are not supporting PreparedStatement in the driver due to lack of parameters support in the query APIs._ # Python Code diff --git a/src/main/java/com/salesforce/cdp/queryservice/auth/TokenExchangeHelper.java b/src/main/java/com/salesforce/cdp/queryservice/auth/TokenExchangeHelper.java index e2739d2..fa2e1ef 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/auth/TokenExchangeHelper.java +++ b/src/main/java/com/salesforce/cdp/queryservice/auth/TokenExchangeHelper.java @@ -52,7 +52,7 @@ private OffcoreToken exchangeToken(String url, String coreToken, String dataspac Response response = null; try { response = login(requestBody, token_url); - OffcoreToken token = HttpHelper.handleSuccessResponse(response, OffcoreToken.class, false); + OffcoreToken token = HttpHelper.handleSuccessResponse(response, OffcoreToken.class); if (token.getErrorDescription() != null) { log.error("Token exchange failed with error {}", token.getErrorDescription()); TokenUtils.invalidateCoreToken(url, coreToken, client); diff --git a/src/main/java/com/salesforce/cdp/queryservice/auth/jwt/JwtLoginClient.java b/src/main/java/com/salesforce/cdp/queryservice/auth/jwt/JwtLoginClient.java index 6e1d7db..dee8f91 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/auth/jwt/JwtLoginClient.java +++ b/src/main/java/com/salesforce/cdp/queryservice/auth/jwt/JwtLoginClient.java @@ -58,7 +58,7 @@ public CoreToken keyPairAuthLogin( log.error("login with user credentials failed with status code {}", response.code()); HttpHelper.handleErrorResponse(response, Constants.ERROR_DESCRIPTION); } - return HttpHelper.handleSuccessResponse(response, CoreToken.class, false); + return HttpHelper.handleSuccessResponse(response, CoreToken.class); } catch (IOException e) { log.error("login with user credentials failed", e); throw new TokenException(FAILED_LOGIN, e); diff --git a/src/main/java/com/salesforce/cdp/queryservice/auth/refresh/RefreshTokenClient.java b/src/main/java/com/salesforce/cdp/queryservice/auth/refresh/RefreshTokenClient.java index aca9818..f617480 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/auth/refresh/RefreshTokenClient.java +++ b/src/main/java/com/salesforce/cdp/queryservice/auth/refresh/RefreshTokenClient.java @@ -37,7 +37,7 @@ public CoreToken getCoreToken(String url, String refreshToken, String clientId, requestBody.put(Constants.REFRESH_TOKEN_GRANT_TYPE, refreshToken); try { Response response = login(requestBody, token_url); - return HttpHelper.handleSuccessResponse(response, CoreToken.class, false); + return HttpHelper.handleSuccessResponse(response, CoreToken.class); } catch (IOException e) { log.error("Caught exception while renewing the core token", e); diff --git a/src/main/java/com/salesforce/cdp/queryservice/auth/unpwd/UnPwdAuthClient.java b/src/main/java/com/salesforce/cdp/queryservice/auth/unpwd/UnPwdAuthClient.java index b37402a..8b9a253 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/auth/unpwd/UnPwdAuthClient.java +++ b/src/main/java/com/salesforce/cdp/queryservice/auth/unpwd/UnPwdAuthClient.java @@ -59,7 +59,7 @@ public CoreToken un_pw_login( log.error("login with user credentials failed with status code {}", response.code()); HttpHelper.handleErrorResponse(response, Constants.ERROR_DESCRIPTION); } - return HttpHelper.handleSuccessResponse(response, CoreToken.class, false); + return HttpHelper.handleSuccessResponse(response, CoreToken.class); } catch (IOException e) { log.error("login with user credentials failed", e); throw new TokenException(FAILED_LOGIN, e); diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceAbstractStatement.java b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceAbstractStatement.java index b36f929..77e1278 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceAbstractStatement.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceAbstractStatement.java @@ -99,7 +99,7 @@ public ResultSet executeQuery(String sql) throws SQLException { log.error("Request query {} failed with response code {} and trace-Id {}", sql, response.code(), response.headers().get(Constants.TRACE_ID)); HttpHelper.handleErrorResponse(response, Constants.MESSAGE); } - QueryServiceResponse queryServiceResponse = HttpHelper.handleSuccessResponse(response, QueryServiceResponse.class, false); + QueryServiceResponse queryServiceResponse = HttpHelper.handleSuccessResponse(response, QueryServiceResponse.class); return createResultSetFromResponse(queryServiceResponse, isCursorBasedPaginationReq); } } catch (IOException e) { @@ -115,7 +115,7 @@ public ResultSet executeNextBatchQuery(String nextBatchId) throws SQLException { log.error("Request query {} failed with response code {} and trace-Id {}", sql, response.code(), response.headers().get(Constants.TRACE_ID)); HttpHelper.handleErrorResponse(response, Constants.MESSAGE); } - QueryServiceResponse queryServiceResponse = HttpHelper.handleSuccessResponse(response, QueryServiceResponse.class, false); + QueryServiceResponse queryServiceResponse = HttpHelper.handleSuccessResponse(response, QueryServiceResponse.class); return createResultSetFromResponse(queryServiceResponse, true); } catch (IOException e) { log.error("Exception while running the query", e); diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java index eb7a2d4..2226b23 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java @@ -23,9 +23,11 @@ import com.salesforce.cdp.queryservice.util.Constants; import com.salesforce.cdp.queryservice.util.HttpHelper; import com.salesforce.cdp.queryservice.util.QueryExecutor; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import okhttp3.Response; import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.sql.*; @@ -188,7 +190,7 @@ public boolean isClosed() throws SQLException { } @Override - public DatabaseMetaData getMetaData() throws SQLException { + public DatabaseMetaData getMetaData() { return new QueryServiceMetadata(this, serviceRootUrl, properties); } @@ -460,7 +462,7 @@ QueryConfigResponse getQueryConfigResponse() throws SQLException { QueryExecutor executor = createQueryExecutor(); Response response = executor.getQueryConfig(); - return HttpHelper.handleSuccessResponse(response, QueryConfigResponse.class, false); + return HttpHelper.handleSuccessResponse(response, QueryConfigResponse.class); } catch (IOException e) { log.error("Exception while getting config from query service", e); throw new SQLException(QUERY_CONFIG_ERROR, e); diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceMetadata.java b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceMetadata.java index 1a099e7..ea07c83 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceMetadata.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceMetadata.java @@ -22,7 +22,6 @@ import com.salesforce.cdp.queryservice.model.TableMetadata; import com.salesforce.cdp.queryservice.util.Constants; import com.salesforce.cdp.queryservice.util.HttpHelper; -import static com.salesforce.cdp.queryservice.util.Messages.METADATA_EXCEPTION; import com.salesforce.cdp.queryservice.util.QueryExecutor; import com.salesforce.cdp.queryservice.util.Utils; import lombok.extern.slf4j.Slf4j; @@ -31,11 +30,30 @@ import org.apache.commons.lang3.StringUtils; import java.io.IOException; -import java.sql.*; -import java.util.*; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.RowIdLifetime; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.regex.Pattern; -import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.*; +import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.EMPTY; +import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_CATALOGS; +import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_COLUMNS; +import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_PRIMARY_KEYS; +import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_SCHEMAS; +import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_TABLES; +import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_TABLE_PRIVILEGES; +import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_TABLE_TYPES; +import static com.salesforce.cdp.queryservice.util.Messages.METADATA_EXCEPTION; @Slf4j public class QueryServiceMetadata implements DatabaseMetaData { @@ -729,7 +747,7 @@ MetadataResponse getMetadataResponse() throws SQLException { response.code(), response.headers().get(Constants.TRACE_ID)); HttpHelper.handleErrorResponse(response, Constants.MESSAGE); } - return HttpHelper.handleSuccessResponse(response, MetadataResponse.class, true); + return HttpHelper.handleSuccessResponse(response, MetadataResponse.class); } catch (IOException e) { log.error("Exception while getting metadata from query service", e); throw new SQLException(METADATA_EXCEPTION, e); @@ -1101,6 +1119,6 @@ private ResultSet createColumnResultSet(MetadataResponse metadataResponse, Strin } protected QueryExecutor createQueryExecutor() { - return new QueryExecutor(queryServiceConnection); + return new QueryExecutor(queryServiceConnection, true); } } diff --git a/src/main/java/com/salesforce/cdp/queryservice/interceptors/MetadataCacheInterceptor.java b/src/main/java/com/salesforce/cdp/queryservice/interceptors/MetadataCacheInterceptor.java index 9f1108c..cece931 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/interceptors/MetadataCacheInterceptor.java +++ b/src/main/java/com/salesforce/cdp/queryservice/interceptors/MetadataCacheInterceptor.java @@ -16,35 +16,60 @@ package com.salesforce.cdp.queryservice.interceptors; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.salesforce.cdp.queryservice.util.Constants; -import com.salesforce.cdp.queryservice.util.MetadataCacheUtil; import lombok.extern.slf4j.Slf4j; -import okhttp3.*; +import okhttp3.Interceptor; +import okhttp3.MediaType; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; import org.apache.http.HttpStatus; import org.jetbrains.annotations.NotNull; import java.io.IOException; +import java.util.concurrent.TimeUnit; @Slf4j public class MetadataCacheInterceptor implements Interceptor { + private final Cache metaDataCache; + + public MetadataCacheInterceptor(int metaDataCacheDurationInMs) { + this.metaDataCache = CacheBuilder.newBuilder() + .expireAfterWrite(metaDataCacheDurationInMs, TimeUnit.MILLISECONDS) + .maximumSize(10).build(); + } @NotNull @Override public Response intercept(@NotNull Chain chain) throws IOException { Request request = chain.request(); + String cacheKey = request.url().toString(); Response response; - String responseString = MetadataCacheUtil.getMetadata(request.url().toString()); + String responseString = metaDataCache.getIfPresent(cacheKey); + + Response.Builder responseBuilder = new Response.Builder().code(HttpStatus.SC_OK). + request(request).protocol(Protocol.HTTP_1_1). + message("OK"); + if (responseString != null) { log.trace("Getting the metadata response from local cache"); - response = new Response.Builder().code(HttpStatus.SC_OK). - request(request).protocol(Protocol.HTTP_1_1). - message("OK"). - addHeader("from-local-cache", Constants.TRUE_STR). - body(ResponseBody.create(responseString, MediaType.parse(Constants.JSON_CONTENT))).build(); } else { log.trace("Cache miss for metadata response. Getting from server"); response = chain.proceed(request); + + if(!response.isSuccessful()) { + return response; + } else { + log.info("Caching the response"); + responseString = response.body().string(); + metaDataCache.put(cacheKey, responseString); + } } - return response; + + responseBuilder.body(ResponseBody.create(responseString, MediaType.parse(Constants.JSON_CONTENT))); + return responseBuilder.build(); } } diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java b/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java index dfc82f9..8000104 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java @@ -77,6 +77,7 @@ public class Constants { public static final String PD = "password"; public static final String PRIVATE_KEY = "privateKey"; public static final String MAX_RETRIES = "maxRetries"; + public static final String RESULT_SET_METADATA_CACHE_DURATION_IN_MS = "metaDataCacheDurationInMs"; // Response Constants public static final String MESSAGE = "message"; @@ -89,6 +90,7 @@ public class Constants { // Integer Constants public static final int REST_TIME_OUT = 600; public static final Integer MAX_LIMIT = 49999; + public static final int RESULT_SET_METADATA_CACHE_DURATION_IN_MS_VALUE = 600000; // Token Constants public static final String GRANT_TYPE_NAME = "grant_type"; diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/HttpHelper.java b/src/main/java/com/salesforce/cdp/queryservice/util/HttpHelper.java index c92ab1d..b6d2246 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/HttpHelper.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/HttpHelper.java @@ -45,12 +45,8 @@ public static void handleErrorResponse(String response, String propertyName) thr throw new IOException(message); } - public static T handleSuccessResponse(Response response, Class type, boolean cacheResponse) throws IOException { + public static T handleSuccessResponse(Response response, Class type) throws IOException { String responseString = response.body().string(); - if (response.headers().get("from-local-cache") == null && cacheResponse) { - log.info("Caching the response"); - MetadataCacheUtil.cacheMetadata(response.request().url().toString(), responseString); - } return handleSuccessResponse(responseString, type); } diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java b/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java index 6814856..d5b1266 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java @@ -46,23 +46,22 @@ public class QueryExecutor extends QueryTokenExecutor { } private final OkHttpClient queryClient; - public QueryExecutor(QueryServiceConnection connection) { - this(connection, null, null); + this(connection, null, null, null, false); } - public QueryExecutor(QueryServiceConnection connection, OkHttpClient tokenClient, OkHttpClient client) { - this(connection, tokenClient, client, null); + public QueryExecutor(QueryServiceConnection connection, boolean cached) { + this(connection, null, null, null, cached); } - QueryExecutor(QueryServiceConnection connection, OkHttpClient tokenClient, OkHttpClient client, TokenManager tokenManager) { + QueryExecutor(QueryServiceConnection connection, OkHttpClient tokenClient, OkHttpClient client, TokenManager tokenManager, boolean cached) { super(connection, tokenClient, tokenManager); - queryClient = getQueryClient(connection, client); + queryClient = getQueryClient(connection, client, cached); } - private OkHttpClient getQueryClient(QueryServiceConnection connection, OkHttpClient client) { + private OkHttpClient getQueryClient(QueryServiceConnection connection, OkHttpClient client, boolean cached) { client = client == null ? DEFAULT_QUERY_CLIENT : client; - return updateClientWithSocketFactory(client, connection.isSocksProxyDisabled()); + return updateClientWithSocketFactory(client, connection.isSocksProxyDisabled(), cached); } public Response executeQuery(String sql, boolean isV2Query, Optional limit, Optional offset, Optional orderby) throws IOException, SQLException { diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java b/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java index 73190b4..0725ae5 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java @@ -50,7 +50,6 @@ public class QueryTokenExecutor { .callTimeout(Constants.REST_TIME_OUT, TimeUnit.SECONDS) .retryOnConnectionFailure(true) .socketFactory(new SFDefaultSocketFactoryWrapper(false)) - .addInterceptor(new MetadataCacheInterceptor()) .build(); } @@ -86,7 +85,7 @@ public QueryTokenExecutor(QueryServiceConnection connection, OkHttpClient client } // this makes query executor not reuse across requests - this.client = updateClientWithSocketFactory(client, connection.isSocksProxyDisabled()); + this.client = updateClientWithSocketFactory(client, connection.isSocksProxyDisabled(), false); // set TenantUrl in connection. This is mandatory in gRPC flow. if(QueryEngineEnum.HYPER == connection.getQueryEngineEnum()) { @@ -143,13 +142,25 @@ private int getMaxRetries(Properties properties) { } } - protected static OkHttpClient updateClientWithSocketFactory(OkHttpClient client, boolean isSocksProxyDisabled) { + protected OkHttpClient updateClientWithSocketFactory(OkHttpClient client, boolean isSocksProxyDisabled, boolean cached) { + OkHttpClient.Builder builder = client.newBuilder(); if (isSocksProxyDisabled) { - return client.newBuilder() - .socketFactory(new SFDefaultSocketFactoryWrapper(true)) - .build(); + builder.socketFactory(new SFDefaultSocketFactoryWrapper(true)); } - return client; + int metaDataCacheDurationInMs; + + if(cached) { + try { + String defaultValue = String.valueOf(Constants.RESULT_SET_METADATA_CACHE_DURATION_IN_MS_VALUE); + Properties clientInfo = this.connection.getClientInfo(); + String metadataProperty = clientInfo.getProperty(Constants.RESULT_SET_METADATA_CACHE_DURATION_IN_MS, defaultValue); + metaDataCacheDurationInMs = Integer.parseInt(metadataProperty); + } catch (SQLException e) { + metaDataCacheDurationInMs = Constants.RESULT_SET_METADATA_CACHE_DURATION_IN_MS_VALUE; + } + builder.addInterceptor(new MetadataCacheInterceptor(metaDataCacheDurationInMs)); + } + return builder.build(); } protected CoreToken getCoreToken() throws SQLException, TokenException { return tokenManager.getCoreToken(); diff --git a/src/test/java/com/salesforce/cdp/queryservice/interceptors/MetadataCacheInterceptorTest.java b/src/test/java/com/salesforce/cdp/queryservice/interceptors/MetadataCacheInterceptorTest.java index 0cf1ba8..e679700 100644 --- a/src/test/java/com/salesforce/cdp/queryservice/interceptors/MetadataCacheInterceptorTest.java +++ b/src/test/java/com/salesforce/cdp/queryservice/interceptors/MetadataCacheInterceptorTest.java @@ -17,11 +17,12 @@ package com.salesforce.cdp.queryservice.interceptors; import com.salesforce.cdp.queryservice.ResponseEnum; +import com.salesforce.cdp.queryservice.core.QueryServiceConnection; import com.salesforce.cdp.queryservice.util.Constants; -import com.salesforce.cdp.queryservice.util.MetadataCacheUtil; import okhttp3.*; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; import java.io.IOException; @@ -35,11 +36,14 @@ public class MetadataCacheInterceptorTest { private Interceptor.Chain chain; private MetadataCacheInterceptor metadataCacheInterceptor; + @Mock + QueryServiceConnection connection; @Before public void init() { chain = mock(Interceptor.Chain.class); - metadataCacheInterceptor = new MetadataCacheInterceptor(); + connection = mock(QueryServiceConnection.class); + metadataCacheInterceptor = new MetadataCacheInterceptor(600000); doReturn(buildRequest()).when(chain).request(); } @@ -50,13 +54,6 @@ public void testMetadataRequestWithNoCachePresent() throws IOException { verify(chain, times(1)).proceed(any(Request.class)); } - @Test - public void testMetadataFromCache() throws IOException { - MetadataCacheUtil.cacheMetadata("https://mjrgg9bzgy2dsyzvmjrgkmzzg1.c360a.salesforce.com" + Constants.CDP_URL + Constants.METADATA_URL, TABLE_METADATA.getResponse()); - metadataCacheInterceptor.intercept(chain); - verify(chain, times(0)).proceed(any(Request.class)); - } - private Request buildRequest() { return new Request.Builder() .url("https://mjrgg9bzgy2dsyzvmjrgkmzzg1.c360a.salesforce.com" + Constants.CDP_URL + Constants.METADATA_URL) diff --git a/src/test/java/com/salesforce/cdp/queryservice/util/QueryExecutorTest.java b/src/test/java/com/salesforce/cdp/queryservice/util/QueryExecutorTest.java index 1afad55..2039d6c 100644 --- a/src/test/java/com/salesforce/cdp/queryservice/util/QueryExecutorTest.java +++ b/src/test/java/com/salesforce/cdp/queryservice/util/QueryExecutorTest.java @@ -58,7 +58,7 @@ public void init() throws Exception { doNothing().when(connection).setToken(any()); TokenManager mockTokenManager = mock(TokenManager.class); when(mockTokenManager.getOffcoreToken()).thenReturn(getToken()); - queryExecutor = new QueryExecutor(connection, null, null, mockTokenManager) { + queryExecutor = new QueryExecutor(connection, null, null, mockTokenManager, false) { @Override protected OkHttpClient createClient() { return mock(OkHttpClient.class);