Skip to content

Commit

Permalink
Merge pull request #135 from forcedotcom/develop
Browse files Browse the repository at this point in the history
Dataspace changes and Bug fixes
  • Loading branch information
soaggarwal authored Mar 10, 2023
2 parents 97a2982 + f2a2ed6 commit 51e72df
Show file tree
Hide file tree
Showing 13 changed files with 360 additions and 64 deletions.
28 changes: 26 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.queryService</groupId>
<artifactId>Salesforce-CDP-jdbc</artifactId>
<version>1.18.0</version>
<version>1.19.0</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -226,6 +226,30 @@
<pattern>io.grpc</pattern>
<shadedPattern>${shadeBase}.io.grpc</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>${shadeBase}.com.fasterxml.jackson</shadedPattern>
</relocation>
<relocation>
<pattern>io.jsonwebtoken</pattern>
<shadedPattern>${shadeBase}.io.jsonwebtoken</shadedPattern>
</relocation>
<relocation>
<pattern>io.vavr</pattern>
<shadedPattern>${shadeBase}.io.vavr</shadedPattern>
</relocation>
<relocation>
<pattern>com.squareup</pattern>
<shadedPattern>${shadeBase}.com.squareup</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>${shadeBase}.com.google</shadedPattern>
</relocation>
<relocation>
<pattern>net.jodah</pattern>
<shadedPattern>${shadeBase}.net.jodah</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
Expand Down Expand Up @@ -316,4 +340,4 @@
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,12 @@ QueryConfigResponse getQueryConfigResponse() throws SQLException {
throw new SQLException(QUERY_CONFIG_ERROR, e);
}
}

public CharSequence getDataspace() {
return properties.getProperty(Constants.DATASPACE);
}

public void setDataspace(String dataspace) {
properties.put(Constants.DATASPACE,dataspace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
import com.salesforce.cdp.queryservice.util.ExtractArrowUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;

import java.sql.Date;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;

Expand Down Expand Up @@ -165,6 +170,29 @@ public boolean isLast() throws SQLException {
return !this.isNextChunkPresent() && this.currentRow == this.data.size() - 1;
}

@Override
public Date getDate(String columnLabel, Calendar cal) throws SQLException {
errorOutIfClosed();
Object value = getObject(columnLabel);
if (wasNull() || value== null || StringUtils.EMPTY.equals(value)) {
wasNull.set(true);
return null;
}

if(value instanceof LocalDateTime) {
long epoch = ((LocalDateTime) value).toEpochSecond(ZoneOffset.UTC);
return new Date(epoch * 1000);
}
if(value instanceof Long) {
return new Date((Long)value);
}
if(value instanceof Date) {
return (Date) value;
} else {
throw new SQLException("Invalid date from server: " + value + ", columnLabel: " + columnLabel);
}
}

private boolean isNextChunkPresent() throws SQLException {
try {
return arrowUtil.isNextChunkPresent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.salesforce.cdp.queryservice.core;

import com.salesforce.cdp.queryservice.model.DataSpaceAttributes;
import com.salesforce.cdp.queryservice.model.DataspaceResponse;
import com.salesforce.cdp.queryservice.model.MetadataResponse;
import com.salesforce.cdp.queryservice.model.TableMetadata;
import com.salesforce.cdp.queryservice.util.Constants;
Expand All @@ -37,7 +39,7 @@

@Slf4j
public class QueryServiceMetadata implements DatabaseMetaData {
private String url;
private final String url;
private Properties properties;
private QueryServiceConnection queryServiceConnection;
private QueryExecutor queryExecutor;
Expand Down Expand Up @@ -652,17 +654,61 @@ public ResultSet getProcedureColumns(String catalog, String schemaPattern, Strin
}

@Override
public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException {
public ResultSet getTables(String catalog, String dataspace, String tableNamePattern, String[] types) throws SQLException {
if(StringUtils.isNotBlank(dataspace) && StringUtils.isBlank(queryServiceConnection.getDataspace())){
queryServiceConnection.setDataspace(dataspace);
log.info("Selected dataspace :"+dataspace);
}
else if(StringUtils.isNotBlank(dataspace) && !dataspace.equals(queryServiceConnection.getDataspace())){
throw new SQLException("Dataspace cannot be changed in the same connections");
}
MetadataResponse metadataResponse = getMetadataResponse();
return createTableResultSet(metadataResponse, tableNamePattern);
}

@Override
public ResultSet getSchemas() throws SQLException {
if(isTableauClient()){
return getDataSpaces();
}
return new QueryServiceResultSet(Collections.EMPTY_LIST,
new QueryServiceResultSetMetaData(GET_SCHEMAS));
}

private QueryServiceResultSet getDataSpaces() throws SQLException {
List<Object> data = new ArrayList<>();
try {
Response response = queryExecutor.getDataspaces();
if (response.isSuccessful()) {
DataspaceResponse successResponse= HttpHelper.handleSuccessResponse(response.body().string(),DataspaceResponse.class);
for(DataSpaceAttributes attributes :successResponse.getRecords()){
data.add(createDataSpaceRow(attributes.getName()));
}
}
if(data.isEmpty()){
data.add(createDataSpaceRow("default"));
}
} catch (Exception e) {
log.error("Exception while getting dataspace from query service", e);
throw new SQLException(METADATA_EXCEPTION, e);
}
QueryServiceDbMetadata dbMetadata = GET_SCHEMAS;
return new QueryServiceResultSet(data, new QueryServiceResultSetMetaData(dbMetadata));
}


private Map<String, Object> createDataSpaceRow(String dataspaceName) {
Map<String, Object> row = new LinkedHashMap<>();
row.put("TABLE_CAT", Constants.CATALOG);
row.put("TABLE_SCHEM", dataspaceName);
return row;
}

private boolean isTableauClient() {
String userAgent =String.valueOf(properties.get(Constants.USER_AGENT));
return StringUtils.isNotBlank(userAgent) && userAgent.equals(Constants.TABLEAU_USER_AGENT_VALUE);
}

@Override
public ResultSet getCatalogs() throws SQLException {
return new QueryServiceResultSet(Collections.EMPTY_LIST,
Expand Down Expand Up @@ -916,7 +962,11 @@ public RowIdLifetime getRowIdLifetime() throws SQLException {

@Override
public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
return null;
if(isTableauClient()){
return getDataSpaces();
}
return new QueryServiceResultSet(Collections.EMPTY_LIST,
new QueryServiceResultSetMetaData(GET_SCHEMAS));
}

@Override
Expand Down Expand Up @@ -1031,7 +1081,7 @@ private ResultSet createColumnResultSet(MetadataResponse metadataResponse, Strin
}
for (int i = 0; i < columns.size(); i++) {
HashMap<String, Object> columnMap = new LinkedHashMap<>();
columnMap.put("TABLE_CAT", "catalog");
columnMap.put("TABLE_CAT", Constants.CATALOG);
columnMap.put("TABLE_SCHEM", null);
columnMap.put("TABLE_NAME", tableNamePattern);
columnMap.put("COLUMN_NAME", columns.get(i).get("name"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.salesforce.cdp.queryservice.model;

import com.fasterxml.jackson.annotation.JsonAlias;
import lombok.Data;

import java.util.Map;

@Data
public class DataSpaceAttributes {
Map<String,Object> attributes;
@JsonAlias({ "Name" })
String name;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.salesforce.cdp.queryservice.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;

import java.util.List;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class DataspaceResponse {
List<DataSpaceAttributes> records;
Integer totalSize;
Boolean done;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.sql.Date;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
Expand Down Expand Up @@ -145,16 +146,12 @@ Object getFieldValue(FieldVector fieldVector, int index) throws SQLException {
return ((TimeNanoVector) fieldVector).getObject(index);
} else if (type == Types.MinorType.TIMESTAMPNANOTZ) {
long epochNano = ((TimeStampNanoTZVector) fieldVector).getObject(index);
String date = new java.text.SimpleDateFormat(Constants.DATE_ISO_STD)
.format(new java.util.Date (epochNano/1000000));
return date;
return epochNano/1000000;
} else if (type == Types.MinorType.TIMESTAMPNANO) {
return ((TimeStampNanoVector) fieldVector).getObject(index);
} else if (type == Types.MinorType.TIMESTAMPMILLITZ) {
long epochMillis = ((TimeStampMilliTZVector) fieldVector).getObject(index);
String date = new java.text.SimpleDateFormat(Constants.DATE_ISO_STD)
.format(new java.util.Date (epochMillis));
return date;
return epochMillis;
} else if (type == Types.MinorType.TIMESTAMPMILLI) {
return ((TimeStampMilliVector) fieldVector).getObject(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,10 @@ public class Constants {
public static final int END_OF_STREAM = -1;
public static final int START_OF_STREAM = 0;
public static final String DATE_ISO_STD = "yyyy-MM-dd'T'HH:mm:ss";

public static final String DATASPACE ="dataspace";
public static final String DATASPACE_URL = "/services/data/v56.0/query/?q=SELECT+name+from+Dataspace";
public static final String BEARER = "Bearer";
public static final Object CATALOG ="catalog" ;
public static final int USER_ERROR = 400 ;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
@Slf4j
public class ExtractArrowUtil extends ArrowUtil {
private Iterator<AnsiSqlQueryStreamResponse> inputStream;
private RootAllocator streamRootAllocator;
private static final long ALLOCATOR_MAX_SIZE_IN_BYTES = 100 * 1024 * 1024; // 100MB

public ExtractArrowUtil(Iterator<AnsiSqlQueryStreamResponse> inputStream) {
super();
this.inputStream = inputStream;
streamRootAllocator = new RootAllocator(Long.MAX_VALUE);
}

public boolean isNextChunkPresent() {
Expand All @@ -38,38 +37,38 @@ public List<Object> getRowsFromStreamResponse() throws SQLException {
AnsiSqlQueryStreamResponse response = inputStream.next();
ByteString arrowResponseChunk = response.getArrowResponseChunk().getData();
InputStream chunkInputStream = new ByteArrayInputStream(arrowResponseChunk.toByteArray());
ArrowStreamReader arrowStreamReader = new ArrowStreamReader(chunkInputStream, streamRootAllocator);
RootAllocator rootAllocator = new RootAllocator(ALLOCATOR_MAX_SIZE_IN_BYTES);
try (ArrowStreamReader arrowStreamReader = new ArrowStreamReader(chunkInputStream, rootAllocator)) {

VectorSchemaRoot vectorSchemaRoot;
VectorSchemaRoot vectorSchemaRoot;

try {
if (!arrowStreamReader.loadNextBatch()) {
throw new SQLException("Unable to load the record batch");
try {
if (!arrowStreamReader.loadNextBatch()) {
throw new SQLException("Unable to load the record batch");
}
vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot();
} catch (IOException e) {
throw new SQLException("Error while getting VectorSchemaRoot");
}
vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot();
} catch (IOException e) {
throw new SQLException("Error while getting VectorSchemaRoot");
}

List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
List<Object> data = new ArrayList<>();
List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
List<Object> data = new ArrayList<>();

int rowCount = fieldVectors.get(0).getValueCount();
for (int i = 0; i < rowCount; ++i) {
List<Object> row = new ArrayList<>();
for (FieldVector fieldVector : fieldVectors) {
Object fieldValue = this.getFieldValue(fieldVector, i);
row.add(fieldValue);
int rowCount = fieldVectors.get(0).getValueCount();
for (int i = 0; i < rowCount; ++i) {
List<Object> row = new ArrayList<>();
for (FieldVector fieldVector : fieldVectors) {
Object fieldValue = this.getFieldValue(fieldVector, i);
row.add(fieldValue);
}
data.add(row);
}
data.add(row);
}
return data;
}

public void closeReader() {
if (streamRootAllocator != null) {
streamRootAllocator.close();
streamRootAllocator = null;
return data;
} catch (IOException e) {
throw new SQLException("Failed to parse the arrow stream", e);
} finally {
rootAllocator.close();
}
}
}
Loading

0 comments on commit 51e72df

Please sign in to comment.