From f42ddc184b22e7c2f9e65bf9a60c86f1521fdc34 Mon Sep 17 00:00:00 2001 From: Parvathy Sasikala Devi Date: Wed, 15 Feb 2023 16:31:23 +0530 Subject: [PATCH 1/9] Dataspace support in JDBC (#120) * Added dataspace support --- .../core/QueryServiceConnection.java | 8 +++ .../core/QueryServiceMetadata.java | 58 +++++++++++++++++-- .../model/DataSpaceAttributes.java | 13 +++++ .../queryservice/model/DataspaceResponse.java | 14 +++++ .../cdp/queryservice/util/Constants.java | 6 ++ .../cdp/queryservice/util/QueryExecutor.java | 41 ++++++++----- .../queryservice/util/QueryTokenExecutor.java | 4 ++ .../cdp/queryservice/util/TokenHelper.java | 50 ++++++++++++---- 8 files changed, 165 insertions(+), 29 deletions(-) create mode 100644 src/main/java/com/salesforce/cdp/queryservice/model/DataSpaceAttributes.java create mode 100644 src/main/java/com/salesforce/cdp/queryservice/model/DataspaceResponse.java diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java index e37a685..3c08855 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java @@ -468,4 +468,12 @@ QueryConfigResponse getQueryConfigResponse() throws SQLException { throw new SQLException(QUERY_CONFIG_ERROR, e); } } + + public CharSequence getDataspace() { + return properties.getProperty(Constants.DATASPACE); + } + + public void setDataspace(String dataspace) { + properties.put(Constants.DATASPACE,dataspace); + } } \ No newline at end of file diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceMetadata.java b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceMetadata.java index e512ed5..30b8a23 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceMetadata.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceMetadata.java @@ -16,6 +16,8 @@ package com.salesforce.cdp.queryservice.core; +import com.salesforce.cdp.queryservice.model.DataSpaceAttributes; +import com.salesforce.cdp.queryservice.model.DataspaceResponse; import com.salesforce.cdp.queryservice.model.MetadataResponse; import com.salesforce.cdp.queryservice.model.TableMetadata; import com.salesforce.cdp.queryservice.util.Constants; @@ -37,7 +39,7 @@ @Slf4j public class QueryServiceMetadata implements DatabaseMetaData { - private String url; + private final String url; private Properties properties; private QueryServiceConnection queryServiceConnection; private QueryExecutor queryExecutor; @@ -652,17 +654,61 @@ public ResultSet getProcedureColumns(String catalog, String schemaPattern, Strin } @Override - public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException { + public ResultSet getTables(String catalog, String dataspace, String tableNamePattern, String[] types) throws SQLException { + if(StringUtils.isNotBlank(dataspace) && StringUtils.isBlank(queryServiceConnection.getDataspace())){ + queryServiceConnection.setDataspace(dataspace); + log.info("Selected dataspace :"+dataspace); + } + else if(StringUtils.isNotBlank(dataspace) && !dataspace.equals(queryServiceConnection.getDataspace())){ + throw new SQLException("Dataspace cannot be changed in the same connections"); + } MetadataResponse metadataResponse = getMetadataResponse(); return createTableResultSet(metadataResponse, tableNamePattern); } @Override public ResultSet getSchemas() throws SQLException { + if(isTableauClient()){ + return getDataSpaces(); + } return new QueryServiceResultSet(Collections.EMPTY_LIST, new QueryServiceResultSetMetaData(GET_SCHEMAS)); } + private QueryServiceResultSet getDataSpaces() throws SQLException { + List data = new ArrayList<>(); + try { + Response response = queryExecutor.getDataspaces(); + if (response.isSuccessful()) { + DataspaceResponse successResponse= HttpHelper.handleSuccessResponse(response.body().string(),DataspaceResponse.class); + for(DataSpaceAttributes attributes :successResponse.getRecords()){ + data.add(createDataSpaceRow(attributes.getName())); + } + } + if(data.isEmpty()){ + data.add(createDataSpaceRow("default")); + } + } catch (Exception e) { + log.error("Exception while getting dataspace from query service", e); + throw new SQLException(METADATA_EXCEPTION, e); + } + QueryServiceDbMetadata dbMetadata = GET_SCHEMAS; + return new QueryServiceResultSet(data, new QueryServiceResultSetMetaData(dbMetadata)); + } + + + private Map createDataSpaceRow(String dataspaceName) { + Map row = new LinkedHashMap<>(); + row.put("TABLE_CAT", Constants.CATALOG); + row.put("TABLE_SCHEM", dataspaceName); + return row; + } + + private boolean isTableauClient() { + String userAgent =String.valueOf(properties.get(Constants.USER_AGENT)); + return StringUtils.isNotBlank(userAgent) && userAgent.equals(Constants.TABLEAU_USER_AGENT_VALUE); + } + @Override public ResultSet getCatalogs() throws SQLException { return new QueryServiceResultSet(Collections.EMPTY_LIST, @@ -916,7 +962,11 @@ public RowIdLifetime getRowIdLifetime() throws SQLException { @Override public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException { - return null; + if(isTableauClient()){ + return getDataSpaces(); + } + return new QueryServiceResultSet(Collections.EMPTY_LIST, + new QueryServiceResultSetMetaData(GET_SCHEMAS)); } @Override @@ -1031,7 +1081,7 @@ private ResultSet createColumnResultSet(MetadataResponse metadataResponse, Strin } for (int i = 0; i < columns.size(); i++) { HashMap columnMap = new LinkedHashMap<>(); - columnMap.put("TABLE_CAT", "catalog"); + columnMap.put("TABLE_CAT", Constants.CATALOG); columnMap.put("TABLE_SCHEM", null); columnMap.put("TABLE_NAME", tableNamePattern); columnMap.put("COLUMN_NAME", columns.get(i).get("name")); diff --git a/src/main/java/com/salesforce/cdp/queryservice/model/DataSpaceAttributes.java b/src/main/java/com/salesforce/cdp/queryservice/model/DataSpaceAttributes.java new file mode 100644 index 0000000..38f442a --- /dev/null +++ b/src/main/java/com/salesforce/cdp/queryservice/model/DataSpaceAttributes.java @@ -0,0 +1,13 @@ +package com.salesforce.cdp.queryservice.model; + +import com.fasterxml.jackson.annotation.JsonAlias; +import lombok.Data; + +import java.util.Map; + +@Data +public class DataSpaceAttributes { + Map attributes; + @JsonAlias({ "Name" }) + String name; +} diff --git a/src/main/java/com/salesforce/cdp/queryservice/model/DataspaceResponse.java b/src/main/java/com/salesforce/cdp/queryservice/model/DataspaceResponse.java new file mode 100644 index 0000000..055e008 --- /dev/null +++ b/src/main/java/com/salesforce/cdp/queryservice/model/DataspaceResponse.java @@ -0,0 +1,14 @@ +package com.salesforce.cdp.queryservice.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +import java.util.List; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class DataspaceResponse { + List records; + Integer totalSize; + Boolean done; +} \ No newline at end of file diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java b/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java index f5563d6..59d2e5e 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java @@ -127,4 +127,10 @@ public class Constants { public static final int END_OF_STREAM = -1; public static final int START_OF_STREAM = 0; public static final String DATE_ISO_STD = "yyyy-MM-dd'T'HH:mm:ss"; + + public static final String DATASPACE ="dataspace"; + public static final String DATASPACE_URL = "/services/data/v56.0/query/?q=SELECT+name+from+Dataspace"; + public static final String BEARER = "Bearer"; + public static final Object CATALOG ="catalog" ; + public static final int USER_ERROR = 400 ; } diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java b/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java index 8b55574..0c3d2e5 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java @@ -62,18 +62,12 @@ public Response executeQuery(String sql, boolean isV2Query, Optional li Map tokenWithTenantUrl = getTokenWithTenantUrl(); StringBuilder url = new StringBuilder(Constants.PROTOCOL) .append(tokenWithTenantUrl.get(Constants.TENANT_URL)) - .append(isV2Query ? Constants.CDP_URL_V2: Constants.CDP_URL) + .append(isV2Query ? Constants.CDP_URL_V2 : Constants.CDP_URL) .append(Constants.ANSI_SQL_URL) .append(Constants.QUESTION_MARK); - if (limit.isPresent()) { - url.append(Constants.LIMIT).append(limit.get()).append(Constants.AND); - } - if (offset.isPresent()) { - url.append(Constants.OFFSET).append(offset.get()).append(Constants.AND); - } - if (orderby.isPresent()) { - url.append(Constants.ORDERBY).append(orderby.get()); - } + limit.ifPresent(integer -> url.append(Constants.LIMIT).append(integer).append(Constants.AND)); + offset.ifPresent(integer -> url.append(Constants.OFFSET).append(integer).append(Constants.AND)); + orderby.ifPresent(s -> url.append(Constants.ORDERBY).append(s)); Request request = HttpHelper.buildRequest(Constants.POST, url.toString(), body, createHeaders(tokenWithTenantUrl, this.connection.getEnableArrowStream())); return getResponse(request); } @@ -81,14 +75,13 @@ public Response executeQuery(String sql, boolean isV2Query, Optional li public Response executeNextBatchQuery(String nextBatchId) throws IOException, SQLException { log.info("Preparing to execute query for nextBatch {}", nextBatchId); Map tokenWithTenantUrl = getTokenWithTenantUrl(); - StringBuilder url = new StringBuilder(Constants.PROTOCOL + tokenWithTenantUrl.get(Constants.TENANT_URL) + String url = Constants.PROTOCOL + tokenWithTenantUrl.get(Constants.TENANT_URL) + Constants.CDP_URL_V2 + Constants.ANSI_SQL_URL + Constants.SLASH - + nextBatchId - ); + + nextBatchId; - Request request = HttpHelper.buildRequest(Constants.GET, url.toString(), null, createHeaders(tokenWithTenantUrl, this.connection.getEnableArrowStream())); + Request request = HttpHelper.buildRequest(Constants.GET, url, null, createHeaders(tokenWithTenantUrl, this.connection.getEnableArrowStream())); return getResponse(request); } @@ -138,4 +131,24 @@ protected Response getResponse(Request request) throws IOException { log.info("Total time taken to get response for url {} is {} ms and traceid {}", request.url(), endTime - startTime, response.headers(Constants.TRACE_ID)); return response; } + + public Response getDataspaces() { + try{ + + Properties connectionProperties = connection.getClientInfo(); + String url = connectionProperties.getProperty(Constants.LOGIN_URL)+Constants.DATASPACE_URL; + Map headers = new HashMap<>(); + headers.put(Constants.AUTHORIZATION,getCoreToken()); + headers.put(Constants.CONTENT_TYPE, Constants.JSON_CONTENT); + + Request request = HttpHelper.buildRequest(Constants.GET, url, null,headers ); + return getResponse(request); + } + catch(Exception e){ + log.error("Error while fetching dataspace",e); + } + return null; + } + + } diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java b/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java index fb8c34e..292b36b 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java @@ -26,6 +26,7 @@ import net.jodah.failsafe.FailsafeException; import net.jodah.failsafe.RetryPolicy; import okhttp3.OkHttpClient; +import org.apache.commons.lang3.StringUtils; import java.sql.SQLException; import java.util.Map; @@ -130,5 +131,8 @@ protected static OkHttpClient updateClientWithSocketFactory(OkHttpClient client, } return client; } + protected String getCoreToken() throws SQLException, TokenException { + return Constants.BEARER + StringUtils.SPACE + TokenHelper.getCoreToken(connection.getClientInfo(),client); + } } diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/TokenHelper.java b/src/main/java/com/salesforce/cdp/queryservice/util/TokenHelper.java index 141f667..f23a860 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/TokenHelper.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/TokenHelper.java @@ -53,6 +53,11 @@ @Slf4j public class TokenHelper { + public static boolean isCoreTokenExchanged() { + return coreTokenExchanged; + } + + private static boolean coreTokenExchanged = false; private static Cache tokenCache = CacheBuilder.newBuilder() .expireAfterWrite(7200000, TimeUnit.MILLISECONDS) .maximumSize(100).build(); @@ -82,7 +87,7 @@ public static Token getToken(Properties properties, OkHttpClient client) throws && properties.containsKey(Constants.PRIVATE_KEY)) { return retrieveTokenWithJWTBearerGrant(properties, client); } - Token newToken = exchangeToken(properties.getProperty(Constants.LOGIN_URL), properties.getProperty(Constants.CORETOKEN), client); + Token newToken = exchangeToken(properties.getProperty(Constants.LOGIN_URL), properties.getProperty(Constants.CORETOKEN), properties.getProperty(Constants.DATASPACE),client); tokenCache.put(properties.getProperty(Constants.CORETOKEN), newToken); return newToken; } @@ -94,7 +99,7 @@ public static Token getToken(Properties properties, OkHttpClient client) throws clearToken(properties.getProperty(Constants.CORETOKEN)); } return renewToken(properties.getProperty(Constants.LOGIN_URL), properties.getProperty(Constants.REFRESHTOKEN), - properties.getProperty(Constants.CLIENT_ID), properties.getProperty(Constants.CLIENT_SECRET), client); + properties.getProperty(Constants.CLIENT_ID), properties.getProperty(Constants.CLIENT_SECRET),properties.getProperty(Constants.DATASPACE), client); } } @@ -134,7 +139,7 @@ private static Token retrieveTokenWithPasswordGrant(Properties properties, OkHtt // And exchange the UN/PW flow authtoken for a scoped bearer token. coreTokenRenewResponse = HttpHelper.handleSuccessResponse(response, CoreTokenRenewResponse.class, false); - return exchangeToken(coreTokenRenewResponse.getInstance_url(), coreTokenRenewResponse.getAccess_token(), client); + return exchangeToken(coreTokenRenewResponse.getInstance_url(), coreTokenRenewResponse.getAccess_token(),properties.getProperty(Constants.DATASPACE), client); } catch (IOException e) { log.error("Caught exception while retrieving the token", e); throw new TokenException(TOKEN_FETCH_FAILURE, e); @@ -173,7 +178,7 @@ private static Token retrieveTokenWithJWTBearerGrant(Properties properties, OkHt // And exchange the Key/Pair flow authtoken for a scoped bearer token. coreTokenRenewResponse = HttpHelper.handleSuccessResponse(response, CoreTokenRenewResponse.class, false); - return exchangeToken(coreTokenRenewResponse.getInstance_url(), coreTokenRenewResponse.getAccess_token(), client); + return exchangeToken(coreTokenRenewResponse.getInstance_url(), coreTokenRenewResponse.getAccess_token(),properties.getProperty(Constants.DATASPACE), client); } catch(SQLException sqlException) { log.error("Caught exception while setting audience for JWT assertion", sqlException); throw new TokenException(TOKEN_FETCH_FAILURE, sqlException); @@ -200,7 +205,18 @@ private static void fillArray(byte[] bytes, byte val) { } } - private static Token renewToken(String url, String refreshToken, String clientId, String secret, OkHttpClient client) throws TokenException { + private static Token renewToken(String url, String refreshToken, String clientId, String secret, String dataspace,OkHttpClient client) throws TokenException { + + + CoreTokenRenewResponse coreTokenRenewResponse = getCoreToken(url,refreshToken,clientId,secret,client); + log.info("Renewed core token {}", coreTokenRenewResponse); + Token token = exchangeToken(url, coreTokenRenewResponse.getAccess_token(),dataspace, client); + tokenCache.put(coreTokenRenewResponse.getAccess_token(), token); + return token; + + } + + public static CoreTokenRenewResponse getCoreToken(String url, String refreshToken, String clientId, String secret, OkHttpClient client) throws TokenException { String token_url = url + Constants.CORE_TOKEN_URL; Map requestBody = new HashMap<>(); requestBody.put(Constants.GRANT_TYPE_NAME, Constants.REFRESH_TOKEN_GRANT_TYPE); @@ -211,22 +227,23 @@ private static Token renewToken(String url, String refreshToken, String clientId try { Response response = login(requestBody, token_url, client); coreTokenRenewResponse = HttpHelper.handleSuccessResponse(response, CoreTokenRenewResponse.class, false); - log.info("Renewed core token {}", coreTokenRenewResponse); - Token token = exchangeToken(url, coreTokenRenewResponse.getAccess_token(), client); - tokenCache.put(coreTokenRenewResponse.getAccess_token(), token); - return token; - } catch (IOException e) { + coreTokenExchanged = false; + } + catch (IOException e) { log.error("Caught exception while renewing the core token", e); throw new TokenException(RENEW_TOKEN, e); } + return coreTokenRenewResponse; } - private static Token exchangeToken(String url, String coreToken, OkHttpClient client) throws TokenException { + private static Token exchangeToken(String url, String coreToken,String dataspace, OkHttpClient client) throws TokenException { String token_url = url + Constants.TOKEN_EXCHANGE_URL; Map requestBody = new HashMap<>(); requestBody.put(Constants.GRANT_TYPE_NAME, Constants.GRANT_TYPE); requestBody.put(Constants.SUBJECT_TOKEN_TYPE_NAME, Constants.SUBJECT_TOKEN_TYPE); requestBody.put(Constants.SUBJECT_TOKEN, coreToken); + coreTokenExchanged=true; + if(StringUtils.isNotBlank(dataspace)) {requestBody.put(Constants.DATASPACE,dataspace);} Calendar expire_time = Calendar.getInstance(); Response response = null; try { @@ -272,6 +289,7 @@ private static Response login(Map requestBody, String url, OkHtt FormBody.Builder formBody = new FormBody.Builder(); requestBody.forEach(formBody::addEncoded); Map headers = Collections.singletonMap(Constants.CONTENT_TYPE, Constants.URL_ENCODED_CONTENT); + log.info(requestBody.toString()); try { Request request = HttpHelper.buildRequest(Constants.POST, url, formBody.build(), headers); Response response = client.newCall(request).execute(); @@ -303,6 +321,7 @@ private static void invalidateCoreToken(String url, String coreToken, OkHttpClie } } + private static Response un_pw_login( String grantType, String clientId, byte[] clientSecret, String userName, byte[] passwordBytes, @@ -406,4 +425,13 @@ private static RSAPrivateKey getPrivateKey(String rsaPrivateKey) throws NoSuchAl RSAPrivateKey privateKey = (RSAPrivateKey)kf.generatePrivate(keySpec); return privateKey; } + + public static String getCoreToken(Properties connectionProperties, OkHttpClient client) throws TokenException { + if(TokenHelper.isCoreTokenExchanged()){ + CoreTokenRenewResponse coreTokenRenewResponse = getCoreToken(connectionProperties.getProperty(Constants.LOGIN_URL), connectionProperties.getProperty(Constants.REFRESHTOKEN), + connectionProperties.getProperty(Constants.CLIENT_ID), connectionProperties.getProperty(Constants.CLIENT_SECRET), client); + return coreTokenRenewResponse.getAccess_token(); + } + return connectionProperties.getProperty(Constants.CORETOKEN); + } } From 90ff8344fa7b955ae241705621556ce97466caaa Mon Sep 17 00:00:00 2001 From: sonal-aggarwal Date: Mon, 27 Feb 2023 22:33:13 +0530 Subject: [PATCH 2/9] @W-12606762: fixing grpc dependency conflict --- pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pom.xml b/pom.xml index 331f974..a7ec5a2 100644 --- a/pom.xml +++ b/pom.xml @@ -222,6 +222,10 @@ io.netty ${shadeBase}.io.netty + + io.grpc + ${shadeBase}.io.grpc + From 5ed70a5fa69b38312633cb977105deefb951fe1c Mon Sep 17 00:00:00 2001 From: sonal-aggarwal Date: Tue, 28 Feb 2023 09:55:20 +0530 Subject: [PATCH 3/9] @W-12611809: grpc timeout removed --- .../com/salesforce/cdp/queryservice/util/QueryGrpcExecutor.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/QueryGrpcExecutor.java b/src/main/java/com/salesforce/cdp/queryservice/util/QueryGrpcExecutor.java index 92a7528..8e3d781 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/QueryGrpcExecutor.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/QueryGrpcExecutor.java @@ -45,7 +45,6 @@ public class QueryGrpcExecutor extends QueryTokenExecutor { private static ManagedChannel DEFAULT_CHANNEL = null; private static final int port = 443; - private static final int timeoutInMin = 5; // No retry on hyper for now. Fallback to v2 call if receive even one failure from hyper. private static final int GRPC_MAX_RETRY = 0; private static final Metadata.Key TRACE_ID_KEY = Metadata.Key.of(Constants.TRACE_ID, Metadata.ASCII_STRING_MARSHALLER); @@ -135,7 +134,6 @@ private Iterator executeQuery(String sql, AtomicRefe QueryServiceGrpc.QueryServiceBlockingStub stub = QueryServiceGrpc.newBlockingStub(channel); Properties properties = connection.getClientInfo(); return stub - .withDeadlineAfter(timeoutInMin, TimeUnit.MINUTES) .withMaxInboundMessageSize(MAX_MESSAGE_SIZE) .withInterceptors( new GrpcInterceptor(tokenWithTenantUrl, properties), From ae5c9d779c9051f71bd6e81b83feced33bf211b5 Mon Sep 17 00:00:00 2001 From: sonal-aggarwal Date: Thu, 2 Mar 2023 15:43:19 +0530 Subject: [PATCH 4/9] @W-12534388: OutOfMemory fix in arrow deserialization --- .../queryservice/util/ExtractArrowUtil.java | 53 +++++++++---------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java b/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java index fcf5f13..3bc69f8 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java @@ -22,12 +22,11 @@ @Slf4j public class ExtractArrowUtil extends ArrowUtil { private Iterator inputStream; - private RootAllocator streamRootAllocator; + private static final long ALLOCATOR_MAX_SIZE_IN_BYTES = 10 * 1024 * 1024; // 10MB public ExtractArrowUtil(Iterator inputStream) { super(); this.inputStream = inputStream; - streamRootAllocator = new RootAllocator(Long.MAX_VALUE); } public boolean isNextChunkPresent() { @@ -38,38 +37,38 @@ public List getRowsFromStreamResponse() throws SQLException { AnsiSqlQueryStreamResponse response = inputStream.next(); ByteString arrowResponseChunk = response.getArrowResponseChunk().getData(); InputStream chunkInputStream = new ByteArrayInputStream(arrowResponseChunk.toByteArray()); - ArrowStreamReader arrowStreamReader = new ArrowStreamReader(chunkInputStream, streamRootAllocator); + RootAllocator rootAllocator = new RootAllocator(ALLOCATOR_MAX_SIZE_IN_BYTES); + try (ArrowStreamReader arrowStreamReader = new ArrowStreamReader(chunkInputStream, rootAllocator)) { - VectorSchemaRoot vectorSchemaRoot; + VectorSchemaRoot vectorSchemaRoot; - try { - if (!arrowStreamReader.loadNextBatch()) { - throw new SQLException("Unable to load the record batch"); + try { + if (!arrowStreamReader.loadNextBatch()) { + throw new SQLException("Unable to load the record batch"); + } + vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot(); + } catch (IOException e) { + throw new SQLException("Error while getting VectorSchemaRoot"); } - vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot(); - } catch (IOException e) { - throw new SQLException("Error while getting VectorSchemaRoot"); - } - List fieldVectors = vectorSchemaRoot.getFieldVectors(); - List data = new ArrayList<>(); + List fieldVectors = vectorSchemaRoot.getFieldVectors(); + List data = new ArrayList<>(); - int rowCount = fieldVectors.get(0).getValueCount(); - for (int i = 0; i < rowCount; ++i) { - List row = new ArrayList<>(); - for (FieldVector fieldVector : fieldVectors) { - Object fieldValue = this.getFieldValue(fieldVector, i); - row.add(fieldValue); + int rowCount = fieldVectors.get(0).getValueCount(); + for (int i = 0; i < rowCount; ++i) { + List row = new ArrayList<>(); + for (FieldVector fieldVector : fieldVectors) { + Object fieldValue = this.getFieldValue(fieldVector, i); + row.add(fieldValue); + } + data.add(row); } - data.add(row); - } - return data; - } - public void closeReader() { - if (streamRootAllocator != null) { - streamRootAllocator.close(); - streamRootAllocator = null; + return data; + } catch (IOException e) { + throw new SQLException("Failed to parse the arrow stream", e); + } finally { + rootAllocator.close(); } } } \ No newline at end of file From 21ab3a151efc8572cc1008d055e1b564fe6e9b46 Mon Sep 17 00:00:00 2001 From: minukolunu <126959520+minukolunu@users.noreply.github.com> Date: Sat, 4 Mar 2023 17:41:53 +0530 Subject: [PATCH 5/9] Shade jackson classes for the driver to work with spark 3.2.1 for AWS DataWrangler --- pom.xml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index a7ec5a2..ea12579 100644 --- a/pom.xml +++ b/pom.xml @@ -226,6 +226,10 @@ io.grpc ${shadeBase}.io.grpc + + com.fasterxml.jackson + ${shadeBase}.com.fasterxml.jackson + @@ -316,4 +320,4 @@ - \ No newline at end of file + From fdeb0098d0ed29f82bad88015b6d57dfd79eb873 Mon Sep 17 00:00:00 2001 From: sonal-aggarwal Date: Tue, 7 Mar 2023 17:03:56 +0530 Subject: [PATCH 6/9] @W-12648471: fixing timezone in Hyper Date/Time fields --- .../core/QueryServiceHyperResultSet.java | 28 +++++ .../cdp/queryservice/util/ArrowUtil.java | 9 +- .../core/QueryServiceHyperResultSetTest.java | 112 ++++++++++++++++++ 3 files changed, 143 insertions(+), 6 deletions(-) create mode 100644 src/test/java/com/salesforce/cdp/queryservice/core/QueryServiceHyperResultSetTest.java diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceHyperResultSet.java b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceHyperResultSet.java index 3333005..d7d5eb1 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceHyperResultSet.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceHyperResultSet.java @@ -20,11 +20,16 @@ import com.salesforce.cdp.queryservice.util.ExtractArrowUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.DateUtils; +import java.sql.Date; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; +import java.util.Calendar; import java.util.Iterator; import java.util.List; @@ -165,6 +170,29 @@ public boolean isLast() throws SQLException { return !this.isNextChunkPresent() && this.currentRow == this.data.size() - 1; } + @Override + public Date getDate(String columnLabel, Calendar cal) throws SQLException { + errorOutIfClosed(); + Object value = getObject(columnLabel); + if (wasNull() || value== null || StringUtils.EMPTY.equals(value)) { + wasNull.set(true); + return null; + } + + if(value instanceof LocalDateTime) { + long epoch = ((LocalDateTime) value).toEpochSecond(ZoneOffset.UTC); + return new Date(epoch * 1000); + } + if(value instanceof Long) { + return new Date((Long)value); + } + if(value instanceof Date) { + return (Date) value; + } else { + throw new SQLException("Invalid date from server: " + value + ", columnLabel: " + columnLabel); + } + } + private boolean isNextChunkPresent() throws SQLException { try { return arrowUtil.isNextChunkPresent(); diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java b/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java index b6cbd93..a24d84f 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java @@ -28,6 +28,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.sql.Date; import java.sql.SQLException; import java.text.MessageFormat; import java.util.ArrayList; @@ -145,16 +146,12 @@ Object getFieldValue(FieldVector fieldVector, int index) throws SQLException { return ((TimeNanoVector) fieldVector).getObject(index); } else if (type == Types.MinorType.TIMESTAMPNANOTZ) { long epochNano = ((TimeStampNanoTZVector) fieldVector).getObject(index); - String date = new java.text.SimpleDateFormat(Constants.DATE_ISO_STD) - .format(new java.util.Date (epochNano/1000000)); - return date; + return epochNano/1000000; } else if (type == Types.MinorType.TIMESTAMPNANO) { return ((TimeStampNanoVector) fieldVector).getObject(index); } else if (type == Types.MinorType.TIMESTAMPMILLITZ) { long epochMillis = ((TimeStampMilliTZVector) fieldVector).getObject(index); - String date = new java.text.SimpleDateFormat(Constants.DATE_ISO_STD) - .format(new java.util.Date (epochMillis)); - return date; + return epochMillis; } else if (type == Types.MinorType.TIMESTAMPMILLI) { return ((TimeStampMilliVector) fieldVector).getObject(index); } diff --git a/src/test/java/com/salesforce/cdp/queryservice/core/QueryServiceHyperResultSetTest.java b/src/test/java/com/salesforce/cdp/queryservice/core/QueryServiceHyperResultSetTest.java new file mode 100644 index 0000000..108d0f9 --- /dev/null +++ b/src/test/java/com/salesforce/cdp/queryservice/core/QueryServiceHyperResultSetTest.java @@ -0,0 +1,112 @@ +package com.salesforce.cdp.queryservice.core; + +import com.fasterxml.jackson.databind.util.ArrayIterator; +import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse; +import com.salesforce.cdp.queryservice.util.ExtractArrowUtil; +import com.salesforce.cdp.queryservice.util.QueryExecutor; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; + +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class QueryServiceHyperResultSetTest { + + private ExtractArrowUtil arrowUtil; + + private QueryServiceHyperResultSet hyperResultSet; + + @Before + public void init() throws SQLException { + arrowUtil = Mockito.mock(ExtractArrowUtil.class); + ResultSetMetaData resultSetMetaData = mockResultSetMetadata(); + hyperResultSet = new QueryServiceHyperResultSet(null, resultSetMetaData, null); + when(arrowUtil.isNextChunkPresent()).thenReturn(true, false); + when(arrowUtil.getRowsFromStreamResponse()).thenReturn(mockData()); + Whitebox.setInternalState(hyperResultSet, "arrowUtil", arrowUtil); + } + + @Test + public void testGetObject() throws SQLException { + hyperResultSet.next(); + List data = (List)mockData().get(0); + for(int i=1; i<=data.size(); i++) { + Assert.assertEquals(data.get(i-1), hyperResultSet.getObject(i)); + } + } + + @Test + public void testGetDate() throws SQLException { + hyperResultSet.next(); + Assert.assertEquals(new Date(1678166106000L).toString(), hyperResultSet.getDate(4).toString()); + Assert.assertEquals(new Time(1678166106000L).toString(), hyperResultSet.getTime(4).toString()); + Assert.assertEquals(new Timestamp(1678166106000L).toString(), hyperResultSet.getTimestamp(4).toString()); + Assert.assertEquals(new Timestamp(1678166106000L).toString(), hyperResultSet.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).toString()); + + Assert.assertEquals(new Date(1678166106000L).toString(), hyperResultSet.getDate(5).toString()); + Assert.assertEquals(new Time(1678166106000L).toString(), hyperResultSet.getTime(5).toString()); + Assert.assertEquals(new Timestamp(1678166106000L).toString(), hyperResultSet.getTimestamp(5).toString()); + Assert.assertEquals(new Timestamp(1678166106000L).toString(), hyperResultSet.getTimestamp(5, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).toString()); + + Assert.assertEquals(new Date(1678166106000L).toString(), hyperResultSet.getDate(6).toString()); + Assert.assertEquals(new Time(1678166106000L).toString(), hyperResultSet.getTime(6).toString()); + Assert.assertEquals(new Timestamp(1678166106000L).toString(), hyperResultSet.getTimestamp(6).toString()); + Assert.assertEquals(new Timestamp(1678166106000L).toString(), hyperResultSet.getTimestamp(6, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).toString()); + + } + + private List mockData() { + List data = new ArrayList<>(); + List row = new ArrayList<>(); + row.add(1); + row.add("1"); + row.add(1.0); + row.add(1678166106000L); + row.add(LocalDateTime.ofEpochSecond(1678166106, 0, ZoneOffset.UTC)); + row.add(new Date(1678166106000L)); + data.add(row); + return data; + } + + private ResultSetMetaData mockResultSetMetadata() { + String[] columnNames = {"col1", "col2", "col3", "col4", "col5", "col6"}; + String[] columnTypes = {"INTEGER", "VARCHAR", "DECIMAL", "TIMESTAMP WITH TIME ZONE", "TIMESTAMP WITH TIME ZONE", "TIMESTAMP WITH TIME ZONE"}; + Integer[] columnTypeIds = {1,2,3,4,5,6}; + Map columnNameToPosition = new HashMap<>(); + + for(int i=0; i Date: Tue, 7 Mar 2023 18:26:36 +0530 Subject: [PATCH 7/9] Share add Dependencies --- pom.xml | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/pom.xml b/pom.xml index ea12579..aabd23c 100644 --- a/pom.xml +++ b/pom.xml @@ -230,6 +230,26 @@ com.fasterxml.jackson ${shadeBase}.com.fasterxml.jackson + + io.jsonwebtoken + ${shadeBase}.io.jsonwebtoken + + + io.vavr + ${shadeBase}.io.vavr + + + com.squareup + ${shadeBase}.com.squareup + + + com.google + ${shadeBase}.com.google + + + net.jodah + ${shadeBase}.net.jodah + From 7c1ef9f08baf31b1c8ad6777573c04c4196b7764 Mon Sep 17 00:00:00 2001 From: sonal-aggarwal Date: Fri, 10 Mar 2023 13:48:30 +0530 Subject: [PATCH 8/9] release version 1.19.0 with bug fixes --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4781992..9b27fb7 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.queryService Salesforce-CDP-jdbc - 1.18.0 + 1.19.0 UTF-8 From f2a2ed65b3dff992b0d6b0ccb5fcd8a1c78c2806 Mon Sep 17 00:00:00 2001 From: sonal-aggarwal Date: Fri, 10 Mar 2023 13:52:46 +0530 Subject: [PATCH 9/9] increasing size of allocator to 100MB --- .../com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java b/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java index 3bc69f8..69e1a23 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java @@ -22,7 +22,7 @@ @Slf4j public class ExtractArrowUtil extends ArrowUtil { private Iterator inputStream; - private static final long ALLOCATOR_MAX_SIZE_IN_BYTES = 10 * 1024 * 1024; // 10MB + private static final long ALLOCATOR_MAX_SIZE_IN_BYTES = 100 * 1024 * 1024; // 100MB public ExtractArrowUtil(Iterator inputStream) { super();