Skip to content

Commit

Permalink
Support json arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
sbaldwin-rs committed Apr 7, 2022
1 parent 089134c commit de7031d
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 144 deletions.
11 changes: 5 additions & 6 deletions src/main/java/rockset/AvroParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import io.confluent.kafka.serializers.NonRecordContainer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -34,10 +34,9 @@ class AvroParser implements RecordParser {
);

@Override
public Map<String, Object> parseValue(SinkRecord record) {
// If there is no value then return an empty map
public List<Map<String, Object>> parseValue(SinkRecord record) {
if (record.value() == null) {
return new HashMap<String, Object>();
return Collections.emptyList();
}
AvroData avroData = new AvroData(1); // arg is cacheSize
Object val = avroData.fromConnectData(record.valueSchema(), record.value());
Expand All @@ -46,10 +45,10 @@ public Map<String, Object> parseValue(SinkRecord record) {
}
if (val instanceof Record) {
Map<String, Object> map = getMap(val);
return convertLogicalTypesMap(record.valueSchema(), map);
return Collections.singletonList(convertLogicalTypesMap(record.valueSchema(), map));
}

return getMap(val);
return Collections.singletonList(getMap(val));
}

private boolean isLogicalType(Schema schema) {
Expand Down
14 changes: 6 additions & 8 deletions src/main/java/rockset/RecordParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.confluent.connect.avro.AvroData;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -19,7 +20,7 @@

public interface RecordParser {

Map<String, Object> parseValue(SinkRecord record);
List<Map<String, Object>> parseValue(SinkRecord record);

/**
* Parse key from a sink record.
Expand All @@ -46,14 +47,14 @@ class JsonParser implements RecordParser {
private static final ObjectMapper mapper = new ObjectMapper();

@Override
public Map<String, Object> parseValue(SinkRecord record) {
public List<Map<String, Object>> parseValue(SinkRecord record) {
Exception cause;
// First try to deserialize as map
try {
if (record.value() == null) {
return new HashMap<String, Object>();
return Collections.emptyList();
}
return toMap(record.value());
return Collections.singletonList(toMap(record.value()));
} catch (Exception e) {
cause = e;
}
Expand All @@ -70,10 +71,7 @@ public Map<String, Object> parseValue(SinkRecord record) {
log.warn(message, cause);
throw new RuntimeException(message, e);
}

int size = maps.size();
checkArgument(size == 1, "Only 1 object allowed in list type messages. Found %s", size);
return maps.get(0);
return maps;
}

private static Map<String, Object> toMap(Object value) throws IOException {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/rockset/RocksetClientWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void addDoc(String topic, Collection<SinkRecord> records,
}

try {
Map<String, Object> doc = recordParser.parseValue(record);
messages.add(doc);
List<Map<String, Object>> docs = recordParser.parseValue(record);
messages.addAll(docs);
}
catch (Exception e) {
throw new ConnectException("Invalid JSON encountered in stream", e);
Expand Down
215 changes: 110 additions & 105 deletions src/main/java/rockset/RocksetRequestWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,121 +27,126 @@
import java.util.concurrent.TimeUnit;

public class RocksetRequestWrapper implements RocksetWrapper {
private static Logger log = LoggerFactory.getLogger(RocksetRequestWrapper.class);
private static Logger log = LoggerFactory.getLogger(RocksetRequestWrapper.class);

private static final MediaType JSON = MediaType.parse("application/json");
private static final MediaType JSON = MediaType.parse("application/json");

private static final String KAFKA_ENDPOINT = "/v1/receivers/kafka";
private static final ObjectMapper mapper = new ObjectMapper();
private static final String KAFKA_ENDPOINT = "/v1/receivers/kafka";
private static final ObjectMapper mapper = new ObjectMapper();

private OkHttpClient client;
private String integrationKeyEncoded;
private String apiServer;
private OkHttpClient client;
private String integrationKeyEncoded;
private String apiServer;

public RocksetRequestWrapper(RocksetConnectorConfig config) {
if (client == null) {
client = new OkHttpClient.Builder()
.connectTimeout(1, TimeUnit.MINUTES)
.writeTimeout(1, TimeUnit.MINUTES)
.readTimeout(1, TimeUnit.MINUTES)
.build();
/**
 * Creates a wrapper that writes documents to the Rockset Kafka receiver endpoint.
 *
 * @param config connector configuration supplying the integration key (used to
 *               build the Basic-auth credential) and the Rockset API server URL
 */
public RocksetRequestWrapper(RocksetConnectorConfig config) {
  // `client` is an instance field, so it is always null when a constructor
  // runs; the previous `if (client == null)` guard was dead and is removed.
  client = new OkHttpClient.Builder()
      .connectTimeout(1, TimeUnit.MINUTES)
      .writeTimeout(1, TimeUnit.MINUTES)
      .readTimeout(1, TimeUnit.MINUTES)
      .build();

  parseConnectionString(config.getRocksetIntegrationKey());
  this.apiServer = config.getRocksetApiServerUrl();
}

parseConnectionString(config.getRocksetIntegrationKey());
this.apiServer = config.getRocksetApiServerUrl();
}

/**
 * Test-only constructor that injects a pre-built HTTP client instead of
 * creating one.
 *
 * @param config connector configuration supplying credentials and the API URL
 * @param client the OkHttp client to use for all requests
 */
public RocksetRequestWrapper(RocksetConnectorConfig config,
                             OkHttpClient client) {
  this.client = client;

  // Fix: the Basic-auth credential must be built from the integration key,
  // not the API server URL — matches the primary constructor's behavior.
  parseConnectionString(config.getRocksetIntegrationKey());
  this.apiServer = config.getRocksetApiServerUrl();
}

/** Derives the Basic-auth form of the integration key and caches it. */
private void parseConnectionString(String integrationKey) {
  final String encoded = base64EncodeAsUserPassword(integrationKey);
  this.integrationKeyEncoded = encoded;
}

/**
 * Encodes the integration key as an HTTP Basic credential: the key is the
 * user name, the password is empty ("key:"), and the pair is Base64-encoded.
 *
 * @param integrationKey the Rockset integration key
 * @return the Base64-encoded "user:password" string for the Authorization header
 */
private static String base64EncodeAsUserPassword(String integrationKey) {
  final byte[] userPassword = (integrationKey + ":").getBytes(StandardCharsets.UTF_8);
  return Base64.getEncoder().encodeToString(userPassword);
}

/**
 * Reports whether an HTTP status code represents a transient server-side
 * condition for which the write should be retried.
 *
 * @param code HTTP status code returned by the Rockset API
 * @return true for 500/502/503/504 and 429 (rate limited); false otherwise
 */
private boolean isInternalError(int code) {
  switch (code) {
    case 500: // INTERNALERROR
    case 502:
    case 503: // NOT_READY
    case 504:
    case 429: // RESOURCEEXCEEDED
      return true;
    default:
      return false;
  }
}

@Override
public void addDoc(String topic, Collection<SinkRecord> records,
RecordParser recordParser, int batchSize) {
List<KafkaMessage> messages = new LinkedList<>();

for (SinkRecord record : records) {
// if the size exceeds batchsize, send the docs
if (messages.size() >= batchSize) {
sendDocs(topic, messages);
messages.clear();
}

try {
Object key = recordParser.parseKey(record);
Map<String, Object> doc = recordParser.parseValue(record);

KafkaMessage message = new KafkaMessage()
.document(doc)
.key(key)
.offset(record.kafkaOffset())
.partition(record.kafkaPartition());
messages.add(message);
} catch (Exception e) {
throw new ConnectException("Invalid JSON encountered in stream ", e);
}
/**
 * Test-only constructor that injects a pre-built HTTP client instead of
 * creating one.
 *
 * @param config connector configuration supplying credentials and the API URL
 * @param client the OkHttp client to use for all requests
 */
public RocksetRequestWrapper(RocksetConnectorConfig config,
                             OkHttpClient client) {
  this.client = client;

  // Fix: the Basic-auth credential must be built from the integration key,
  // not the API server URL — matches the primary constructor's behavior.
  parseConnectionString(config.getRocksetIntegrationKey());
  this.apiServer = config.getRocksetApiServerUrl();
}

// Derives the Basic-auth form of the integration key and caches it for use
// in the Authorization header of every request.
private void parseConnectionString(String integrationKey) {
this.integrationKeyEncoded = base64EncodeAsUserPassword(integrationKey);
}

// Encodes the integration key as an HTTP Basic credential: key as the user
// name with an empty password ("key:"), then Base64-encoded.
private static String base64EncodeAsUserPassword(String integrationKey) {
final String userPassword = integrationKey + ":"; // password is empty
return Base64.getEncoder().encodeToString(userPassword.getBytes(StandardCharsets.UTF_8));
}

// True when the HTTP status indicates a transient server-side condition
// (5xx availability/gateway codes, or 429 rate limiting) so the caller may
// retry the write instead of failing the connector task.
private boolean isInternalError(int code) {
return code == 500 // INTERNALERROR
|| code == 502
|| code == 503 // NOT_READY
|| code == 504
|| code == 429; // RESOURCEEXCEEDED
}

sendDocs(topic, messages);
}

private void sendDocs(String topic, List<KafkaMessage> messages) {
Preconditions.checkArgument(!messages.isEmpty());
log.debug("Sending batch of {} messages for topic: {} to Rockset", messages.size(), topic);

KafkaDocumentsRequest documentsRequest = new KafkaDocumentsRequest()
.kafkaMessages(messages)
.topic(topic);

try {
RequestBody requestBody = RequestBody.create(JSON, mapper.writeValueAsString(documentsRequest));
Request request = new Request.Builder()
.url(this.apiServer + KAFKA_ENDPOINT)
.addHeader("Authorization", "Basic " + integrationKeyEncoded)
.post(requestBody)
.build();

try (Response response = client.newCall(request).execute()) {
if (isInternalError(response.code())) {
// internal errors are retriable
throw new RetriableException(String.format(
"Received internal error code: %s, message: %s. Can Retry.",
response.code(), response.message()));
@Override
public void addDoc(String topic, Collection<SinkRecord> records,
RecordParser recordParser, int batchSize) {
List<KafkaMessage> messages = new LinkedList<>();

for (SinkRecord record : records) {
// if the size exceeds batchsize, send the docs
if (messages.size() >= batchSize) {
sendDocs(topic, messages);
messages.clear();
}

try {
Object key = recordParser.parseKey(record);
List<Map<String, Object>> docs = recordParser.parseValue(record);
docs
.stream()
.map(doc ->
new KafkaMessage()
.document(doc)
.key(key)
.offset(record.kafkaOffset())
.partition(record.kafkaPartition())
).forEach(messages::add);

} catch (Exception e) {
throw new ConnectException("Invalid JSON encountered in stream ", e);
}
}

if (response.code() != 200) {
throw new ConnectException(String.format("Unable to write document"
+ " in Rockset, cause: %s", response.message()));
sendDocs(topic, messages);
}

/**
 * Sends one batch of Kafka messages for the given topic to the Rockset
 * Kafka receiver endpoint. No-op for an empty batch.
 *
 * Throws RetriableException for transient failures (socket timeouts and
 * the status codes accepted by isInternalError) so the framework retries;
 * throws ConnectException for all other failures.
 */
private void sendDocs(String topic, List<KafkaMessage> messages) {
if (messages.isEmpty()) {
return;
}
log.debug("Sending batch of {} messages for topic: {} to Rockset", messages.size(), topic);

KafkaDocumentsRequest documentsRequest = new KafkaDocumentsRequest()
.kafkaMessages(messages)
.topic(topic);

try {
// Serialize the whole batch as one JSON request body.
RequestBody requestBody = RequestBody.create(JSON, mapper.writeValueAsString(documentsRequest));
Request request = new Request.Builder()
.url(this.apiServer + KAFKA_ENDPOINT)
.addHeader("Authorization", "Basic " + integrationKeyEncoded)
.post(requestBody)
.build();

// try-with-resources ensures the OkHttp response body is closed.
try (Response response = client.newCall(request).execute()) {
if (isInternalError(response.code())) {
// internal errors are retriable
throw new RetriableException(String.format(
"Received internal error code: %s, message: %s. Can Retry.",
response.code(), response.message()));
}

if (response.code() != 200) {
// Any other non-200 response is treated as a permanent failure.
throw new ConnectException(String.format("Unable to write document"
+ " in Rockset, cause: %s", response.message()));
}
}
} catch (SocketTimeoutException ste) {
throw new RetriableException("Encountered socket timeout exception. Can Retry", ste);
} catch (RetriableException e) {
// Re-throw as-is so it is not wrapped into a non-retriable ConnectException below.
throw e;
} catch (Exception e) {
throw new ConnectException(e);
}
}
} catch (SocketTimeoutException ste) {
throw new RetriableException("Encountered socket timeout exception. Can Retry", ste);
} catch (RetriableException e) {
throw e;
} catch (Exception e) {
throw new ConnectException(e);
}
}
}
Loading

0 comments on commit de7031d

Please sign in to comment.