Skip to content

Commit

Permalink
@W-16008197: Fix for caching the metadata across connections (#176)
Browse files Browse the repository at this point in the history
This commit resolves the caching problem that occurred when two different connections were used, resulting in the same metadata being retrieved due to caching based on the tenantUrl. The cache key has now been updated to include both the tenantUrl plus metadata path. Additionally, updating documentation by removing references to PreparedStatements.

Co-authored-by: Vishnu Prasad <[email protected]>
Co-authored-by: Vishnu Prasad V S <[email protected]>
  • Loading branch information
3 people authored Jul 16, 2024
1 parent 3bdaee1 commit cd2fbca
Show file tree
Hide file tree
Showing 15 changed files with 109 additions and 60 deletions.
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,16 @@ Class.forName("com.salesforce.cdp.queryservice.QueryServiceDriver");
Connection connection = DriverManager.getConnection("jdbc:queryService-jdbc:https://login.salesforce.com", properties);
```

Create Statements/ Prepared Statements to execute Query and get ResultSet
Create Statements to execute Query and get ResultSet
```
PreparedStatement preparedStatement = connection.prepareStatement("select FirstName__c, BirthDate__c, YearlyIncome__c from Individual__dlm where FirstName__c = ? and YearlyIncome__c > ?");
preparedStatement.setString(0, "Angella");
preparedStatement.setInt(1, 1000);
ResultSet resultSet = preparedStatement.executeQuery();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select FirstName__c, BirthDate__c, YearlyIncome__c from Individual__dlm where FirstName__c = 'Angella' and YearlyIncome__c > 1000");
while (resultSet.next()) {
log.info("FirstName : {}, BirthDate__c : {}, YearlyIncome__c : {}", resultSet.getString("FirstName__c"), resultSet.getTimestamp("BirthDate__c"), resultSet.getInt("YearlyIncome__c"));
log.info("FirstName : {}, BirthDate__c : {}, YearlyIncome__c : {}", resultSet.getString("FirstName__c"), resultSet.getTimestamp("BirthDate__c"), resultSet.getInt("YearlyIncome__c"));
}
```
_Note: We are not supporting PreparedStatement in the driver due to lack of parameters support in the query APIs._


# Python Code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private OffcoreToken exchangeToken(String url, String coreToken, String dataspac
Response response = null;
try {
response = login(requestBody, token_url);
OffcoreToken token = HttpHelper.handleSuccessResponse(response, OffcoreToken.class, false);
OffcoreToken token = HttpHelper.handleSuccessResponse(response, OffcoreToken.class);
if (token.getErrorDescription() != null) {
log.error("Token exchange failed with error {}", token.getErrorDescription());
TokenUtils.invalidateCoreToken(url, coreToken, client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public CoreToken keyPairAuthLogin(
log.error("login with user credentials failed with status code {}", response.code());
HttpHelper.handleErrorResponse(response, Constants.ERROR_DESCRIPTION);
}
return HttpHelper.handleSuccessResponse(response, CoreToken.class, false);
return HttpHelper.handleSuccessResponse(response, CoreToken.class);
} catch (IOException e) {
log.error("login with user credentials failed", e);
throw new TokenException(FAILED_LOGIN, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public CoreToken getCoreToken(String url, String refreshToken, String clientId,
requestBody.put(Constants.REFRESH_TOKEN_GRANT_TYPE, refreshToken);
try {
Response response = login(requestBody, token_url);
return HttpHelper.handleSuccessResponse(response, CoreToken.class, false);
return HttpHelper.handleSuccessResponse(response, CoreToken.class);
}
catch (IOException e) {
log.error("Caught exception while renewing the core token", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public CoreToken un_pw_login(
log.error("login with user credentials failed with status code {}", response.code());
HttpHelper.handleErrorResponse(response, Constants.ERROR_DESCRIPTION);
}
return HttpHelper.handleSuccessResponse(response, CoreToken.class, false);
return HttpHelper.handleSuccessResponse(response, CoreToken.class);
} catch (IOException e) {
log.error("login with user credentials failed", e);
throw new TokenException(FAILED_LOGIN, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public ResultSet executeQuery(String sql) throws SQLException {
log.error("Request query {} failed with response code {} and trace-Id {}", sql, response.code(), response.headers().get(Constants.TRACE_ID));
HttpHelper.handleErrorResponse(response, Constants.MESSAGE);
}
QueryServiceResponse queryServiceResponse = HttpHelper.handleSuccessResponse(response, QueryServiceResponse.class, false);
QueryServiceResponse queryServiceResponse = HttpHelper.handleSuccessResponse(response, QueryServiceResponse.class);
return createResultSetFromResponse(queryServiceResponse, isCursorBasedPaginationReq);
}
} catch (IOException e) {
Expand All @@ -115,7 +115,7 @@ public ResultSet executeNextBatchQuery(String nextBatchId) throws SQLException {
log.error("Request query {} failed with response code {} and trace-Id {}", sql, response.code(), response.headers().get(Constants.TRACE_ID));
HttpHelper.handleErrorResponse(response, Constants.MESSAGE);
}
QueryServiceResponse queryServiceResponse = HttpHelper.handleSuccessResponse(response, QueryServiceResponse.class, false);
QueryServiceResponse queryServiceResponse = HttpHelper.handleSuccessResponse(response, QueryServiceResponse.class);
return createResultSetFromResponse(queryServiceResponse, true);
} catch (IOException e) {
log.error("Exception while running the query", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import com.salesforce.cdp.queryservice.util.Constants;
import com.salesforce.cdp.queryservice.util.HttpHelper;
import com.salesforce.cdp.queryservice.util.QueryExecutor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.sql.*;
Expand Down Expand Up @@ -188,7 +190,7 @@ public boolean isClosed() throws SQLException {
}

@Override
public DatabaseMetaData getMetaData() throws SQLException {
public DatabaseMetaData getMetaData() {
return new QueryServiceMetadata(this, serviceRootUrl, properties);
}

Expand Down Expand Up @@ -460,7 +462,7 @@ QueryConfigResponse getQueryConfigResponse() throws SQLException {
QueryExecutor executor = createQueryExecutor();
Response response = executor.getQueryConfig();

return HttpHelper.handleSuccessResponse(response, QueryConfigResponse.class, false);
return HttpHelper.handleSuccessResponse(response, QueryConfigResponse.class);
} catch (IOException e) {
log.error("Exception while getting config from query service", e);
throw new SQLException(QUERY_CONFIG_ERROR, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.salesforce.cdp.queryservice.model.TableMetadata;
import com.salesforce.cdp.queryservice.util.Constants;
import com.salesforce.cdp.queryservice.util.HttpHelper;
import static com.salesforce.cdp.queryservice.util.Messages.METADATA_EXCEPTION;
import com.salesforce.cdp.queryservice.util.QueryExecutor;
import com.salesforce.cdp.queryservice.util.Utils;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -31,11 +30,30 @@
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.sql.*;
import java.util.*;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.RowIdLifetime;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.*;
import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.EMPTY;
import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_CATALOGS;
import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_COLUMNS;
import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_PRIMARY_KEYS;
import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_SCHEMAS;
import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_TABLES;
import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_TABLE_PRIVILEGES;
import static com.salesforce.cdp.queryservice.core.QueryServiceDbMetadata.GET_TABLE_TYPES;
import static com.salesforce.cdp.queryservice.util.Messages.METADATA_EXCEPTION;

@Slf4j
public class QueryServiceMetadata implements DatabaseMetaData {
Expand Down Expand Up @@ -729,7 +747,7 @@ MetadataResponse getMetadataResponse() throws SQLException {
response.code(), response.headers().get(Constants.TRACE_ID));
HttpHelper.handleErrorResponse(response, Constants.MESSAGE);
}
return HttpHelper.handleSuccessResponse(response, MetadataResponse.class, true);
return HttpHelper.handleSuccessResponse(response, MetadataResponse.class);
} catch (IOException e) {
log.error("Exception while getting metadata from query service", e);
throw new SQLException(METADATA_EXCEPTION, e);
Expand Down Expand Up @@ -1101,6 +1119,6 @@ private ResultSet createColumnResultSet(MetadataResponse metadataResponse, Strin
}

protected QueryExecutor createQueryExecutor() {
return new QueryExecutor(queryServiceConnection);
return new QueryExecutor(queryServiceConnection, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,60 @@

package com.salesforce.cdp.queryservice.interceptors;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.salesforce.cdp.queryservice.util.Constants;
import com.salesforce.cdp.queryservice.util.MetadataCacheUtil;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import okhttp3.Interceptor;
import okhttp3.MediaType;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.http.HttpStatus;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@Slf4j
public class MetadataCacheInterceptor implements Interceptor {
private final Cache<String, String> metaDataCache;

public MetadataCacheInterceptor(int metaDataCacheDurationInMs) {
this.metaDataCache = CacheBuilder.newBuilder()
.expireAfterWrite(metaDataCacheDurationInMs, TimeUnit.MILLISECONDS)
.maximumSize(10).build();
}

@NotNull
@Override
public Response intercept(@NotNull Chain chain) throws IOException {
Request request = chain.request();
String cacheKey = request.url().toString();
Response response;
String responseString = MetadataCacheUtil.getMetadata(request.url().toString());
String responseString = metaDataCache.getIfPresent(cacheKey);

Response.Builder responseBuilder = new Response.Builder().code(HttpStatus.SC_OK).
request(request).protocol(Protocol.HTTP_1_1).
message("OK");

if (responseString != null) {
log.trace("Getting the metadata response from local cache");
response = new Response.Builder().code(HttpStatus.SC_OK).
request(request).protocol(Protocol.HTTP_1_1).
message("OK").
addHeader("from-local-cache", Constants.TRUE_STR).
body(ResponseBody.create(responseString, MediaType.parse(Constants.JSON_CONTENT))).build();
} else {
log.trace("Cache miss for metadata response. Getting from server");
response = chain.proceed(request);

if(!response.isSuccessful()) {
return response;
} else {
log.info("Caching the response");
responseString = response.body().string();
metaDataCache.put(cacheKey, responseString);
}
}
return response;

responseBuilder.body(ResponseBody.create(responseString, MediaType.parse(Constants.JSON_CONTENT)));
return responseBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class Constants {
public static final String PD = "password";
public static final String PRIVATE_KEY = "privateKey";
public static final String MAX_RETRIES = "maxRetries";
public static final String RESULT_SET_METADATA_CACHE_DURATION_IN_MS = "metaDataCacheDurationInMs";

// Response Constants
public static final String MESSAGE = "message";
Expand All @@ -89,6 +90,7 @@ public class Constants {
// Integer Constants
public static final int REST_TIME_OUT = 600;
public static final Integer MAX_LIMIT = 49999;
public static final int RESULT_SET_METADATA_CACHE_DURATION_IN_MS_VALUE = 600000;

// Token Constants
public static final String GRANT_TYPE_NAME = "grant_type";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,8 @@ public static void handleErrorResponse(String response, String propertyName) thr
throw new IOException(message);
}

public static <T> T handleSuccessResponse(Response response, Class<T> type, boolean cacheResponse) throws IOException {
public static <T> T handleSuccessResponse(Response response, Class<T> type) throws IOException {
String responseString = response.body().string();
if (response.headers().get("from-local-cache") == null && cacheResponse) {
log.info("Caching the response");
MetadataCacheUtil.cacheMetadata(response.request().url().toString(), responseString);
}
return handleSuccessResponse(responseString, type);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,22 @@ public class QueryExecutor extends QueryTokenExecutor {
}

private final OkHttpClient queryClient;

public QueryExecutor(QueryServiceConnection connection) {
this(connection, null, null);
this(connection, null, null, null, false);
}

public QueryExecutor(QueryServiceConnection connection, OkHttpClient tokenClient, OkHttpClient client) {
this(connection, tokenClient, client, null);
public QueryExecutor(QueryServiceConnection connection, boolean cached) {
this(connection, null, null, null, cached);
}

QueryExecutor(QueryServiceConnection connection, OkHttpClient tokenClient, OkHttpClient client, TokenManager tokenManager) {
QueryExecutor(QueryServiceConnection connection, OkHttpClient tokenClient, OkHttpClient client, TokenManager tokenManager, boolean cached) {
super(connection, tokenClient, tokenManager);
queryClient = getQueryClient(connection, client);
queryClient = getQueryClient(connection, client, cached);
}

private OkHttpClient getQueryClient(QueryServiceConnection connection, OkHttpClient client) {
private OkHttpClient getQueryClient(QueryServiceConnection connection, OkHttpClient client, boolean cached) {
client = client == null ? DEFAULT_QUERY_CLIENT : client;
return updateClientWithSocketFactory(client, connection.isSocksProxyDisabled());
return updateClientWithSocketFactory(client, connection.isSocksProxyDisabled(), cached);
}

public Response executeQuery(String sql, boolean isV2Query, Optional<Integer> limit, Optional<Integer> offset, Optional<String> orderby) throws IOException, SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class QueryTokenExecutor {
.callTimeout(Constants.REST_TIME_OUT, TimeUnit.SECONDS)
.retryOnConnectionFailure(true)
.socketFactory(new SFDefaultSocketFactoryWrapper(false))
.addInterceptor(new MetadataCacheInterceptor())
.build();
}

Expand Down Expand Up @@ -86,7 +85,7 @@ public QueryTokenExecutor(QueryServiceConnection connection, OkHttpClient client
}

// this makes query executor not reuse across requests
this.client = updateClientWithSocketFactory(client, connection.isSocksProxyDisabled());
this.client = updateClientWithSocketFactory(client, connection.isSocksProxyDisabled(), false);

// set TenantUrl in connection. This is mandatory in gRPC flow.
if(QueryEngineEnum.HYPER == connection.getQueryEngineEnum()) {
Expand Down Expand Up @@ -143,13 +142,25 @@ private int getMaxRetries(Properties properties) {
}
}

protected static OkHttpClient updateClientWithSocketFactory(OkHttpClient client, boolean isSocksProxyDisabled) {
protected OkHttpClient updateClientWithSocketFactory(OkHttpClient client, boolean isSocksProxyDisabled, boolean cached) {
OkHttpClient.Builder builder = client.newBuilder();
if (isSocksProxyDisabled) {
return client.newBuilder()
.socketFactory(new SFDefaultSocketFactoryWrapper(true))
.build();
builder.socketFactory(new SFDefaultSocketFactoryWrapper(true));
}
return client;
int metaDataCacheDurationInMs;

if(cached) {
try {
String defaultValue = String.valueOf(Constants.RESULT_SET_METADATA_CACHE_DURATION_IN_MS_VALUE);
Properties clientInfo = this.connection.getClientInfo();
String metadataProperty = clientInfo.getProperty(Constants.RESULT_SET_METADATA_CACHE_DURATION_IN_MS, defaultValue);
metaDataCacheDurationInMs = Integer.parseInt(metadataProperty);
} catch (SQLException e) {
metaDataCacheDurationInMs = Constants.RESULT_SET_METADATA_CACHE_DURATION_IN_MS_VALUE;
}
builder.addInterceptor(new MetadataCacheInterceptor(metaDataCacheDurationInMs));
}
return builder.build();
}
protected CoreToken getCoreToken() throws SQLException, TokenException {
return tokenManager.getCoreToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package com.salesforce.cdp.queryservice.interceptors;

import com.salesforce.cdp.queryservice.ResponseEnum;
import com.salesforce.cdp.queryservice.core.QueryServiceConnection;
import com.salesforce.cdp.queryservice.util.Constants;
import com.salesforce.cdp.queryservice.util.MetadataCacheUtil;
import okhttp3.*;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.io.IOException;

Expand All @@ -35,11 +36,14 @@ public class MetadataCacheInterceptorTest {
private Interceptor.Chain chain;

private MetadataCacheInterceptor metadataCacheInterceptor;
@Mock
QueryServiceConnection connection;

@Before
public void init() {
chain = mock(Interceptor.Chain.class);
metadataCacheInterceptor = new MetadataCacheInterceptor();
connection = mock(QueryServiceConnection.class);
metadataCacheInterceptor = new MetadataCacheInterceptor(600000);
doReturn(buildRequest()).when(chain).request();
}

Expand All @@ -50,13 +54,6 @@ public void testMetadataRequestWithNoCachePresent() throws IOException {
verify(chain, times(1)).proceed(any(Request.class));
}

@Test
public void testMetadataFromCache() throws IOException {
MetadataCacheUtil.cacheMetadata("https://mjrgg9bzgy2dsyzvmjrgkmzzg1.c360a.salesforce.com" + Constants.CDP_URL + Constants.METADATA_URL, TABLE_METADATA.getResponse());
metadataCacheInterceptor.intercept(chain);
verify(chain, times(0)).proceed(any(Request.class));
}

private Request buildRequest() {
return new Request.Builder()
.url("https://mjrgg9bzgy2dsyzvmjrgkmzzg1.c360a.salesforce.com" + Constants.CDP_URL + Constants.METADATA_URL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void init() throws Exception {
doNothing().when(connection).setToken(any());
TokenManager mockTokenManager = mock(TokenManager.class);
when(mockTokenManager.getOffcoreToken()).thenReturn(getToken());
queryExecutor = new QueryExecutor(connection, null, null, mockTokenManager) {
queryExecutor = new QueryExecutor(connection, null, null, mockTokenManager, false) {
@Override
protected OkHttpClient createClient() {
return mock(OkHttpClient.class);
Expand Down

0 comments on commit cd2fbca

Please sign in to comment.