From de7031dee0e120c9714492806f32a4a21fc6fa4a Mon Sep 17 00:00:00 2001
From: Steven Baldwin
Date: Wed, 6 Apr 2022 16:42:20 -0700
Subject: [PATCH] Support json arrays

---
 src/main/java/rockset/AvroParser.java       |  11 +-
 src/main/java/rockset/RecordParser.java     |  14 +-
 .../java/rockset/RocksetClientWrapper.java  |   4 +-
 .../java/rockset/RocksetRequestWrapper.java | 215 +++++++++---------
 src/test/java/rockset/AvroParserTest.java   |  30 +--
 src/test/java/rockset/JsonParserTest.java   |  16 +-
 .../rockset/RocksetRequestWrapperTest.java  |   4 +-
 7 files changed, 150 insertions(+), 144 deletions(-)

diff --git a/src/main/java/rockset/AvroParser.java b/src/main/java/rockset/AvroParser.java
index 53c0aa3..6e3fa13 100644
--- a/src/main/java/rockset/AvroParser.java
+++ b/src/main/java/rockset/AvroParser.java
@@ -7,7 +7,7 @@
 import io.confluent.kafka.serializers.NonRecordContainer;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -34,10 +34,9 @@ class AvroParser implements RecordParser {
   );
 
   @Override
-  public Map<String, Object> parseValue(SinkRecord record) {
-    // If there is no value then return an empty map
+  public List<Map<String, Object>> parseValue(SinkRecord record) {
     if (record.value() == null) {
-      return new HashMap();
+      return Collections.emptyList();
     }
     AvroData avroData = new AvroData(1); // arg is cacheSize
     Object val = avroData.fromConnectData(record.valueSchema(), record.value());
@@ -46,10 +45,10 @@ public Map<String, Object> parseValue(SinkRecord record) {
     }
     if (val instanceof Record) {
       Map<String, Object> map = getMap(val);
-      return convertLogicalTypesMap(record.valueSchema(), map);
+      return Collections.singletonList(convertLogicalTypesMap(record.valueSchema(), map));
     }
-    return getMap(val);
+    return Collections.singletonList(getMap(val));
   }
 
   private boolean isLogicalType(Schema schema) {
diff --git a/src/main/java/rockset/RecordParser.java b/src/main/java/rockset/RecordParser.java
index 504a815..4938a95 100644
--- a/src/main/java/rockset/RecordParser.java
+++ b/src/main/java/rockset/RecordParser.java
@@ -7,6 +7,7 @@
 import io.confluent.connect.avro.AvroData;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -19,7 +20,7 @@ public interface RecordParser {
 
-  Map<String, Object> parseValue(SinkRecord record);
+  List<Map<String, Object>> parseValue(SinkRecord record);
 
   /**
    * Parse key from a sink record.
@@ -46,14 +47,14 @@ class JsonParser implements RecordParser {
   private static final ObjectMapper mapper = new ObjectMapper();
 
   @Override
-  public Map<String, Object> parseValue(SinkRecord record) {
+  public List<Map<String, Object>> parseValue(SinkRecord record) {
     Exception cause;
     // First try to deserialize as map
     try {
       if (record.value() == null) {
-        return new HashMap();
+        return Collections.emptyList();
       }
-      return toMap(record.value());
+      return Collections.singletonList(toMap(record.value()));
     } catch (Exception e) {
       cause = e;
     }
@@ -70,10 +71,7 @@ public Map<String, Object> parseValue(SinkRecord record) {
       log.warn(message, cause);
       throw new RuntimeException(message, e);
     }
-
-    int size = maps.size();
-    checkArgument(size == 1, "Only 1 object allowed in list type messages. Found %s", size);
-    return maps.get(0);
+    return maps;
   }
 
   private static Map<String, Object> toMap(Object value) throws IOException {
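With this change parseValue returns a list of documents rather than a single map: a JSON object deserializes to a singleton list, a JSON array fans out to one document per element, and a null (tombstone) value yields an empty list. The old checkArgument guard that rejected multi-element arrays is gone. A minimal sketch of the new contract follows; it uses the same SinkRecord constructor the tests below use, and the inline JSON strings and pseudo-output are illustrative only:

    RecordParser parser = new JsonParser();

    // A plain JSON object still yields exactly one document.
    parser.parseValue(new SinkRecord("test-topic", 0, null, null, null,
        "{\"id\": 1}", 0));                 // => [{id=1}]

    // A JSON array now yields one document per element instead of
    // tripping the old "Only 1 object allowed" checkArgument.
    parser.parseValue(new SinkRecord("test-topic", 0, null, null, null,
        "[{\"id\": 1}, {\"id\": 2}]", 0));  // => [{id=1}, {id=2}]

    // A tombstone (null value) yields an empty list, where the old code
    // returned an empty map that was forwarded as an empty document.
    parser.parseValue(new SinkRecord("test-topic", 0, null, null, null,
        null, 0));                          // => []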
Found %s", size); - return maps.get(0); + return maps; } private static Map toMap(Object value) throws IOException { diff --git a/src/main/java/rockset/RocksetClientWrapper.java b/src/main/java/rockset/RocksetClientWrapper.java index 71f425a..de4363a 100644 --- a/src/main/java/rockset/RocksetClientWrapper.java +++ b/src/main/java/rockset/RocksetClientWrapper.java @@ -58,8 +58,8 @@ public void addDoc(String topic, Collection records, } try { - Map doc = recordParser.parseValue(record); - messages.add(doc); + List> docs = recordParser.parseValue(record); + messages.addAll(docs); } catch (Exception e) { throw new ConnectException("Invalid JSON encountered in stream", e); diff --git a/src/main/java/rockset/RocksetRequestWrapper.java b/src/main/java/rockset/RocksetRequestWrapper.java index 09fedac..fedfd08 100644 --- a/src/main/java/rockset/RocksetRequestWrapper.java +++ b/src/main/java/rockset/RocksetRequestWrapper.java @@ -27,121 +27,126 @@ import java.util.concurrent.TimeUnit; public class RocksetRequestWrapper implements RocksetWrapper { - private static Logger log = LoggerFactory.getLogger(RocksetRequestWrapper.class); + private static Logger log = LoggerFactory.getLogger(RocksetRequestWrapper.class); - private static final MediaType JSON = MediaType.parse("application/json"); + private static final MediaType JSON = MediaType.parse("application/json"); - private static final String KAFKA_ENDPOINT = "/v1/receivers/kafka"; - private static final ObjectMapper mapper = new ObjectMapper(); + private static final String KAFKA_ENDPOINT = "/v1/receivers/kafka"; + private static final ObjectMapper mapper = new ObjectMapper(); - private OkHttpClient client; - private String integrationKeyEncoded; - private String apiServer; + private OkHttpClient client; + private String integrationKeyEncoded; + private String apiServer; - public RocksetRequestWrapper(RocksetConnectorConfig config) { - if (client == null) { - client = new OkHttpClient.Builder() - .connectTimeout(1, TimeUnit.MINUTES) - .writeTimeout(1, TimeUnit.MINUTES) - .readTimeout(1, TimeUnit.MINUTES) - .build(); + public RocksetRequestWrapper(RocksetConnectorConfig config) { + if (client == null) { + client = new OkHttpClient.Builder() + .connectTimeout(1, TimeUnit.MINUTES) + .writeTimeout(1, TimeUnit.MINUTES) + .readTimeout(1, TimeUnit.MINUTES) + .build(); + } + + parseConnectionString(config.getRocksetIntegrationKey()); + this.apiServer = config.getRocksetApiServerUrl(); } - parseConnectionString(config.getRocksetIntegrationKey()); - this.apiServer = config.getRocksetApiServerUrl(); - } - - // used for testing - public RocksetRequestWrapper(RocksetConnectorConfig config, - OkHttpClient client) { - this.client = client; - - parseConnectionString(config.getRocksetApiServerUrl()); - this.apiServer = config.getRocksetApiServerUrl(); - } - - private void parseConnectionString(String integrationKey) { - this.integrationKeyEncoded = base64EncodeAsUserPassword(integrationKey); - } - - private static String base64EncodeAsUserPassword(String integrationKey) { - final String userPassword = integrationKey + ":"; // password is empty - return Base64.getEncoder().encodeToString(userPassword.getBytes(StandardCharsets.UTF_8)); - } - - private boolean isInternalError(int code) { - return code == 500 // INTERNALERROR - || code == 502 - || code == 503 // NOT_READY - || code == 504 - || code == 429; // RESOURCEEXCEEDED - } - - @Override - public void addDoc(String topic, Collection records, - RecordParser recordParser, int batchSize) { - List messages = new 
diff --git a/src/main/java/rockset/RocksetRequestWrapper.java b/src/main/java/rockset/RocksetRequestWrapper.java
index 09fedac..fedfd08 100644
--- a/src/main/java/rockset/RocksetRequestWrapper.java
+++ b/src/main/java/rockset/RocksetRequestWrapper.java
@@ -27,121 +27,126 @@ import java.util.concurrent.TimeUnit;
 
 public class RocksetRequestWrapper implements RocksetWrapper {
-  private static Logger log = LoggerFactory.getLogger(RocksetRequestWrapper.class);
+    private static Logger log = LoggerFactory.getLogger(RocksetRequestWrapper.class);
 
-  private static final MediaType JSON = MediaType.parse("application/json");
+    private static final MediaType JSON = MediaType.parse("application/json");
 
-  private static final String KAFKA_ENDPOINT = "/v1/receivers/kafka";
-  private static final ObjectMapper mapper = new ObjectMapper();
+    private static final String KAFKA_ENDPOINT = "/v1/receivers/kafka";
+    private static final ObjectMapper mapper = new ObjectMapper();
 
-  private OkHttpClient client;
-  private String integrationKeyEncoded;
-  private String apiServer;
+    private OkHttpClient client;
+    private String integrationKeyEncoded;
+    private String apiServer;
 
-  public RocksetRequestWrapper(RocksetConnectorConfig config) {
-    if (client == null) {
-      client = new OkHttpClient.Builder()
-          .connectTimeout(1, TimeUnit.MINUTES)
-          .writeTimeout(1, TimeUnit.MINUTES)
-          .readTimeout(1, TimeUnit.MINUTES)
-          .build();
+    public RocksetRequestWrapper(RocksetConnectorConfig config) {
+        if (client == null) {
+            client = new OkHttpClient.Builder()
+                .connectTimeout(1, TimeUnit.MINUTES)
+                .writeTimeout(1, TimeUnit.MINUTES)
+                .readTimeout(1, TimeUnit.MINUTES)
+                .build();
+        }
+
+        parseConnectionString(config.getRocksetIntegrationKey());
+        this.apiServer = config.getRocksetApiServerUrl();
     }
-    parseConnectionString(config.getRocksetIntegrationKey());
-    this.apiServer = config.getRocksetApiServerUrl();
-  }
-
-  // used for testing
-  public RocksetRequestWrapper(RocksetConnectorConfig config,
-      OkHttpClient client) {
-    this.client = client;
-
-    parseConnectionString(config.getRocksetApiServerUrl());
-    this.apiServer = config.getRocksetApiServerUrl();
-  }
-
-  private void parseConnectionString(String integrationKey) {
-    this.integrationKeyEncoded = base64EncodeAsUserPassword(integrationKey);
-  }
-
-  private static String base64EncodeAsUserPassword(String integrationKey) {
-    final String userPassword = integrationKey + ":"; // password is empty
-    return Base64.getEncoder().encodeToString(userPassword.getBytes(StandardCharsets.UTF_8));
-  }
-
-  private boolean isInternalError(int code) {
-    return code == 500 // INTERNALERROR
-        || code == 502
-        || code == 503 // NOT_READY
-        || code == 504
-        || code == 429; // RESOURCEEXCEEDED
-  }
-
-  @Override
-  public void addDoc(String topic, Collection<SinkRecord> records,
-      RecordParser recordParser, int batchSize) {
-    List<KafkaMessage> messages = new LinkedList<>();
-
-    for (SinkRecord record : records) {
-      // if the size exceeds batchsize, send the docs
-      if (messages.size() >= batchSize) {
-        sendDocs(topic, messages);
-        messages.clear();
-      }
-
-      try {
-        Object key = recordParser.parseKey(record);
-        Map<String, Object> doc = recordParser.parseValue(record);
-
-        KafkaMessage message = new KafkaMessage()
-            .document(doc)
-            .key(key)
-            .offset(record.kafkaOffset())
-            .partition(record.kafkaPartition());
-        messages.add(message);
-      } catch (Exception e) {
-        throw new ConnectException("Invalid JSON encountered in stream ", e);
-      }
+    // used for testing
+    public RocksetRequestWrapper(RocksetConnectorConfig config,
+        OkHttpClient client) {
+        this.client = client;
+
+        parseConnectionString(config.getRocksetApiServerUrl());
+        this.apiServer = config.getRocksetApiServerUrl();
+    }
+
+    private void parseConnectionString(String integrationKey) {
+        this.integrationKeyEncoded = base64EncodeAsUserPassword(integrationKey);
+    }
+
+    private static String base64EncodeAsUserPassword(String integrationKey) {
+        final String userPassword = integrationKey + ":"; // password is empty
+        return Base64.getEncoder().encodeToString(userPassword.getBytes(StandardCharsets.UTF_8));
+    }
+
+    private boolean isInternalError(int code) {
+        return code == 500 // INTERNALERROR
+            || code == 502
+            || code == 503 // NOT_READY
+            || code == 504
+            || code == 429; // RESOURCEEXCEEDED
     }
 
-    sendDocs(topic, messages);
-  }
-
-  private void sendDocs(String topic, List<KafkaMessage> messages) {
-    Preconditions.checkArgument(!messages.isEmpty());
-    log.debug("Sending batch of {} messages for topic: {} to Rockset", messages.size(), topic);
-
-    KafkaDocumentsRequest documentsRequest = new KafkaDocumentsRequest()
-        .kafkaMessages(messages)
-        .topic(topic);
-
-    try {
-      RequestBody requestBody = RequestBody.create(JSON, mapper.writeValueAsString(documentsRequest));
-      Request request = new Request.Builder()
-          .url(this.apiServer + KAFKA_ENDPOINT)
-          .addHeader("Authorization", "Basic " + integrationKeyEncoded)
-          .post(requestBody)
-          .build();
-
-      try (Response response = client.newCall(request).execute()) {
-        if (isInternalError(response.code())) {
-          // internal errors are retriable
-          throw new RetriableException(String.format(
-              "Received internal error code: %s, message: %s. Can Retry.",
-              response.code(), response.message()));
-        }
-        if (response.code() != 200) {
-          throw new ConnectException(String.format("Unable to write document"
-              + " in Rockset, cause: %s", response.message()));
-        }
-      }
-    } catch (SocketTimeoutException ste) {
-      throw new RetriableException("Encountered socket timeout exception. Can Retry", ste);
-    } catch (RetriableException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new ConnectException(e);
-    }
-  }
+
+    @Override
+    public void addDoc(String topic, Collection<SinkRecord> records,
+        RecordParser recordParser, int batchSize) {
+        List<KafkaMessage> messages = new LinkedList<>();
+
+        for (SinkRecord record : records) {
+            // if the size exceeds batchsize, send the docs
+            if (messages.size() >= batchSize) {
+                sendDocs(topic, messages);
+                messages.clear();
+            }
+
+            try {
+                Object key = recordParser.parseKey(record);
+                List<Map<String, Object>> docs = recordParser.parseValue(record);
+                docs
+                    .stream()
+                    .map(doc ->
+                        new KafkaMessage()
+                            .document(doc)
+                            .key(key)
+                            .offset(record.kafkaOffset())
+                            .partition(record.kafkaPartition())
+                    ).forEach(messages::add);
+
+            } catch (Exception e) {
+                throw new ConnectException("Invalid JSON encountered in stream ", e);
+            }
+        }
+
+        sendDocs(topic, messages);
+    }
+
+    private void sendDocs(String topic, List<KafkaMessage> messages) {
+        if (messages.isEmpty()) {
+            return;
+        }
+        log.debug("Sending batch of {} messages for topic: {} to Rockset", messages.size(), topic);
+
+        KafkaDocumentsRequest documentsRequest = new KafkaDocumentsRequest()
+            .kafkaMessages(messages)
+            .topic(topic);
+
+        try {
+            RequestBody requestBody = RequestBody.create(JSON, mapper.writeValueAsString(documentsRequest));
+            Request request = new Request.Builder()
+                .url(this.apiServer + KAFKA_ENDPOINT)
+                .addHeader("Authorization", "Basic " + integrationKeyEncoded)
+                .post(requestBody)
+                .build();
+
+            try (Response response = client.newCall(request).execute()) {
+                if (isInternalError(response.code())) {
+                    // internal errors are retriable
+                    throw new RetriableException(String.format(
+                        "Received internal error code: %s, message: %s. Can Retry.",
+                        response.code(), response.message()));
+                }
+
+                if (response.code() != 200) {
+                    throw new ConnectException(String.format("Unable to write document"
+                        + " in Rockset, cause: %s", response.message()));
+                }
+            }
+        } catch (SocketTimeoutException ste) {
+            throw new RetriableException("Encountered socket timeout exception. Can Retry", ste);
+        } catch (RetriableException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new ConnectException(e);
+        }
+    }
 }
Can Retry", ste); - } catch (RetriableException e) { - throw e; - } catch (Exception e) { - throw new ConnectException(e); } - } } diff --git a/src/test/java/rockset/AvroParserTest.java b/src/test/java/rockset/AvroParserTest.java index 8c15687..d2372ac 100644 --- a/src/test/java/rockset/AvroParserTest.java +++ b/src/test/java/rockset/AvroParserTest.java @@ -292,7 +292,7 @@ private void verifySimpleKey(Object key) throws IOException { Map expectedValue = ImmutableMap.of( "id", 2, "name", "my-name"); - assertEquals(expectedValue, parseValue(sr)); + assertEquals(expectedValue, parseSingleValue(sr)); } // test avro parsing without key @@ -327,7 +327,7 @@ public void testNoKeyAvro() throws IOException { "id", 1234, "details", expectedDetails, "name", "my-name"); - assertEquals(expectedValue, parseValue(sr)); + assertEquals(expectedValue, parseSingleValue(sr)); } @Test @@ -377,7 +377,7 @@ public void testAvroKey() throws IOException { "id", 1234, "details", expectedDetails, "name", "my-name"); - assertEquals(expectedValue, parseValue(sinkRecord)); + assertEquals(expectedValue, parseSingleValue(sinkRecord)); } } @@ -407,7 +407,7 @@ void testTime() { "time5", rocksetTimeType("00:00:00:000000")); SinkRecord sinkRecord = makeSinkRecord(null, null, timeSchema, value); - assertEquals(expected, parseValue(sinkRecord)); + assertEquals(expected, parseSingleValue(sinkRecord)); } @Test @@ -434,8 +434,8 @@ void testDate() { "date5", rocksetDateType("2016/07/18")); SinkRecord sinkRecord = makeSinkRecord(null, null, dateSchema, value); - System.out.println(parseValue(sinkRecord)); - assertEquals(expected, parseValue(sinkRecord)); + System.out.println(parseSingleValue(sinkRecord)); + assertEquals(expected, parseSingleValue(sinkRecord)); } @Test @@ -462,7 +462,7 @@ void testTimestamp() { "timestamp5", rocksetTimestampType(1583966531712L)); SinkRecord sinkRecord = makeSinkRecord(null, null, detailsSchema, value); - assertEquals(expected, parseValue(sinkRecord)); + assertEquals(expected, parseSingleValue(sinkRecord)); } @Test @@ -483,7 +483,7 @@ void testMultipleLogicalTypes() { "date", rocksetDateType("1970/01/02")); SinkRecord sinkRecord = makeSinkRecord(null, null, valueSchema, value); - assertEquals(expected, parseValue(sinkRecord)); + assertEquals(expected, parseSingleValue(sinkRecord)); } @Test @@ -516,7 +516,7 @@ void testNestedStructLogicalTypes() { "nested3", ImmutableMap.of("timestamp", rocksetTimestampType(1234L))); SinkRecord sr = makeSinkRecord(null, null, valueSchema, value); - assertEquals(expectedValue, parseValue(sr)); + assertEquals(expectedValue, parseSingleValue(sr)); } @Test @@ -541,7 +541,7 @@ void testNestedMapLogical() { "nested3", ImmutableMap.of("timestamp", rocksetTimestampType(1234L))); SinkRecord sr = makeSinkRecord(null, null, valueSchema, value); - assertEquals(expectedValue, parseValue(sr)); + assertEquals(expectedValue, parseSingleValue(sr)); } @Test @@ -584,7 +584,7 @@ void testArrayLogicalTypes() { rocksetTimestampType(1234L))); SinkRecord sr = makeSinkRecord(null, null, valueSchema, value); - assertEquals(expectedValue, parseValue(sr)); + assertEquals(expectedValue, parseSingleValue(sr)); } @Test @@ -633,7 +633,7 @@ void testNestedArrayLogicalTypes() { rocksetTimestampType(1234L)))); SinkRecord sr = makeSinkRecord(null, null, valueSchema, value); - assertEquals(expectedValue, parseValue(sr)); + assertEquals(expectedValue, parseSingleValue(sr)); } private ImmutableMap rocksetTimestampType(Object time) { @@ -648,8 +648,8 @@ private ImmutableMap rocksetTimeType(Object time) { 
diff --git a/src/test/java/rockset/JsonParserTest.java b/src/test/java/rockset/JsonParserTest.java
index e90fb26..090e1e5 100644
--- a/src/test/java/rockset/JsonParserTest.java
+++ b/src/test/java/rockset/JsonParserTest.java
@@ -19,7 +19,7 @@ public class JsonParserTest {
   public void testBasicJsonObject() {
     Map<String, Object> obj = ImmutableMap.of("foo", "bar", "foo2", "bar2");
     SinkRecord sr = new SinkRecord("test-topic", 1, null, null, null, serialize(obj), 0);
-    assertEquals(obj, parseValue(sr));
+    assertEquals(obj, parseSingleValue(sr));
   }
 
   @Test
@@ -27,7 +27,7 @@ public void testListTypeOneObject() {
     List<Map<String, Object>> obj = ImmutableList.of(
         ImmutableMap.of("foo", "bar", "foo2", "bar2"));
     SinkRecord sr = new SinkRecord("test-topic", 1, null, null, null, serialize(obj), 0);
-    assertEquals(obj.get(0), parseValue(sr));
+    assertEquals(obj.get(0), parseSingleValue(sr));
   }
 
   @Test
@@ -37,14 +37,14 @@ public void testListTypeMultipleObjects() {
         ImmutableMap.of("foo", "bar", "foo2", "bar2"),
         ImmutableMap.of("foo", "bar", "foo2", "bar2"));
     SinkRecord sr = new SinkRecord("test-topic", 1, null, null, null, serialize(obj), 0);
-    assertThrows(RuntimeException.class, () -> parseValue(sr));
+    assertEquals(obj, parseList(sr));
   }
 
   @Test
   public void testEmptyList() {
     List<Map<String, Object>> obj = ImmutableList.of();
     SinkRecord sr = new SinkRecord("test-topic", 1, null, null, null, serialize(obj), 0);
-    assertThrows(RuntimeException.class, () -> parseValue(sr));
+    assertThrows(RuntimeException.class, () -> parseSingleValue(sr));
   }
 
   private Object serialize(Object obj) {
@@ -55,9 +55,13 @@ private Object serialize(Object obj) {
     }
   }
 
-  public Map<String, Object> parseValue(SinkRecord record) {
+  public Map<String, Object> parseSingleValue(SinkRecord record) {
+    return parseList(record).get(0);
+  }
+
+  public List<Map<String, Object>> parseList(SinkRecord record) {
     return new JsonParser().parseValue(record);
   }
-}
\ No newline at end of file
+}
diff --git a/src/test/java/rockset/RocksetRequestWrapperTest.java b/src/test/java/rockset/RocksetRequestWrapperTest.java
index 5716dd3..952d245 100644
--- a/src/test/java/rockset/RocksetRequestWrapperTest.java
+++ b/src/test/java/rockset/RocksetRequestWrapperTest.java
@@ -90,7 +90,7 @@ public void testAddDocNullValue() throws Exception {
         new RocksetRequestWrapper(rcc, client);
 
     rrw.addDoc("testPut", Arrays.asList(sr), new JsonParser(), 10);
-    Mockito.verify(client, Mockito.times(1)).newCall(Mockito.any());
+    Mockito.verify(client, Mockito.times(0)).newCall(Mockito.any());
   }
 
   @Test
@@ -177,6 +177,6 @@ public void testAddDocAvroNullValue() throws Exception {
 
     rrw.addDoc("testPut", Arrays.asList(sr), new AvroParser(), 10);
 
-    Mockito.verify(client, Mockito.times(1)).newCall(Mockito.any());
+    Mockito.verify(client, Mockito.times(0)).newCall(Mockito.any());
   }
 }
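testListTypeMultipleObjects now asserts the fan-out instead of expecting a RuntimeException, and the two null-value wrapper tests flip to times(0) because empty batches never reach the HTTP client. A possible companion test, sketched under the same conventions (String-serialized values, the JsonParser from this patch; not part of the commit), would pin down both halves of that behavior:

    @Test
    public void testArrayFanOut() {
      SinkRecord sr = new SinkRecord("test-topic", 1, null, null, null,
          "[{\"a\": 1}, {\"a\": 2}]", 0);
      assertEquals(2, new JsonParser().parseValue(sr).size());

      SinkRecord tombstone = new SinkRecord("test-topic", 1, null, null, null, null, 0);
      assertTrue(new JsonParser().parseValue(tombstone).isEmpty());
    }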