diff --git a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConfig.java b/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConfig.java
index 841b284..6985cd3 100644
--- a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConfig.java
+++ b/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConfig.java
@@ -29,15 +29,19 @@ public class MongodbSourceConfig extends AbstractConfig {
     public static final String CONVERTER_CLASS = "converter.class";
     private static final String CONVERTER_CLASS_DOC = "Converter class used to transform a mongodb oplog into a Kafka message";
 
+    public static final String CUSTOM_SCHEMA = "custom.schema";
+    private static final String CUSTOM_SCHEMA_DOC = "Flag indicating whether a custom schema should be derived from the document data";
+
     public static ConfigDef config = new ConfigDef()
-        .define(URI, Type.STRING, Importance.HIGH, URI_DOC)
-        .define(HOST, Type.STRING, Importance.HIGH, HOST_DOC)
-        .define(PORT, Type.INT, Importance.HIGH, PORT_DOC)
-        .define(BATCH_SIZE, Type.INT, Importance.HIGH, BATCH_SIZE_DOC)
-        .define(SCHEMA_NAME, Type.STRING, Importance.HIGH, SCHEMA_NAME_DOC)
-        .define(TOPIC_PREFIX, Type.STRING, Importance.LOW, TOPIC_PREFIX_DOC)
-        .define(CONVERTER_CLASS, Type.STRING, StringStructConverter.class.getName(), Importance.LOW, CONVERTER_CLASS_DOC)
-        .define(DATABASES, Type.STRING, Importance.LOW, DATABASES_DOC);
+            .define(URI, Type.STRING, Importance.HIGH, URI_DOC)
+            .define(HOST, Type.STRING, Importance.HIGH, HOST_DOC)
+            .define(PORT, Type.INT, Importance.HIGH, PORT_DOC)
+            .define(BATCH_SIZE, Type.INT, Importance.HIGH, BATCH_SIZE_DOC)
+            .define(SCHEMA_NAME, Type.STRING, Importance.HIGH, SCHEMA_NAME_DOC)
+            .define(TOPIC_PREFIX, Type.STRING, Importance.LOW, TOPIC_PREFIX_DOC)
+            .define(CUSTOM_SCHEMA, Type.BOOLEAN, false, Importance.LOW, CUSTOM_SCHEMA_DOC)
+            .define(CONVERTER_CLASS, Type.STRING, StringStructConverter.class.getName(), Importance.LOW, CONVERTER_CLASS_DOC)
+            .define(DATABASES, Type.STRING, Importance.LOW, DATABASES_DOC);
 
     public MongodbSourceConfig(Map<String, String> props) {
         super(config, props);
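
For reference, the new flag rides on the standard ConfigDef machinery: Type.BOOLEAN with a default of false means AbstractConfig.getBoolean returns false whenever custom.schema is absent. A minimal sketch (property values are hypothetical):

    Map<String, String> props = new HashMap<>();
    props.put("uri", "mongodb://localhost:27017");
    props.put("host", "localhost");
    props.put("port", "27017");
    props.put("batch.size", "100");
    props.put("schema.name", "mongodbschema");
    props.put("topic.prefix", "mongo");
    props.put("databases", "test.users");
    // custom.schema deliberately not set
    MongodbSourceConfig config = new MongodbSourceConfig(props);
    config.getBoolean("custom.schema"); // -> false (the declared default)
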
diff --git a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConnector.java b/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConnector.java
index 0d60920..4254924 100644
--- a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConnector.java
+++ b/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConnector.java
@@ -21,6 +21,7 @@
 import static org.apache.kafka.connect.mongodb.MongodbSourceConfig.TOPIC_PREFIX;
 import static org.apache.kafka.connect.mongodb.MongodbSourceConfig.CONVERTER_CLASS;
 import static org.apache.kafka.connect.mongodb.MongodbSourceConfig.DATABASES;
+import static org.apache.kafka.connect.mongodb.MongodbSourceConfig.CUSTOM_SCHEMA;
 
 /**
  * MongodbSourceConnector implements the connector interface
@@ -39,6 +40,7 @@ public class MongodbSourceConnector extends SourceConnector {
     private String topicPrefix;
     private String converterClass;
     private String databases;
+    private String customSchema;
 
     /**
      * Get the version of this connector.
@@ -64,14 +66,14 @@ public void start(Map<String, String> map) {
         if (uri == null || uri.isEmpty()){
             host = map.get(HOST);
             if (host == null || host.isEmpty()){
-                throw new ConnectException("Missing " + HOST + "or " + URI + " config");
+                throw new ConnectException("Missing " + HOST + " or " + URI + " config");
             }
 
             port = map.get(PORT);
             if (port == null || port.isEmpty()){
-                throw new ConnectException("Missing " + PORT + "or " + URI + " config");
+                throw new ConnectException("Missing " + PORT + " or " + URI + " config");
             }
         }
         schemaName = map.get(SCHEMA_NAME);
         if (schemaName == null || schemaName.isEmpty())
             throw new ConnectException("Missing " + SCHEMA_NAME + " config");
@@ -83,9 +85,15 @@ public void start(Map<String, String> map) {
         databases = map.get(DATABASES);
 
         topicPrefix = map.get(TOPIC_PREFIX);
 
         converterClass = map.get(CONVERTER_CLASS);
 
+        if (map.containsKey(CUSTOM_SCHEMA))
+            customSchema = map.get(CUSTOM_SCHEMA);
+        else
+            customSchema = null;
+
+        LogUtils.dumpConfiguration(map, log);
     }
@@ -115,16 +123,18 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
             Map<String, String> config = new HashMap<>();
             config.put(URI, uri);
             if (host != null) {
                 config.put(HOST, host);
             }
             if (port != null) {
                 config.put(PORT, port);
             }
             config.put(SCHEMA_NAME, schemaName);
             config.put(BATCH_SIZE, batchSize);
             config.put(TOPIC_PREFIX, topicPrefix);
             config.put(CONVERTER_CLASS, converterClass);
             config.put(DATABASES, StringUtils.join(dbsGrouped.get(i), ","));
+            if (customSchema != null)
+                config.put(CUSTOM_SCHEMA, customSchema);
             configs.add(config);
         }
         return configs;
@@ -143,4 +153,4 @@ public ConfigDef config() {
     }
 
-}
+}
\ No newline at end of file
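
To see the plumbing end to end, a rough sketch (hypothetical values; assumes taskConfigs groups the configured databases internally, as the existing code does):

    Map<String, String> props = new HashMap<>();
    props.put("uri", "mongodb://localhost:27017");
    props.put("batch.size", "100");
    props.put("schema.name", "mongodbschema");
    props.put("databases", "test.users");
    props.put("custom.schema", "true");

    MongodbSourceConnector connector = new MongodbSourceConnector();
    connector.start(props);
    List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
    taskConfigs.get(0).get("custom.schema"); // -> "true"; omitted entirely when unset

Note that the flag travels to each task as a String and is re-parsed with Boolean.parseBoolean on the task side.
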
diff --git a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceTask.java b/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceTask.java
index ce9400d..af1785c 100644
--- a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceTask.java
+++ b/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceTask.java
@@ -1,12 +1,6 @@
 package org.apache.kafka.connect.mongodb;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.mongodb.MongoClient;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -16,11 +10,16 @@
 import org.apache.kafka.connect.mongodb.converter.StructConverter;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.utils.ConverterUtils;
+import org.bson.BsonDocument;
 import org.bson.BsonTimestamp;
+import org.bson.BsonValue;
 import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
+
 /**
  * MongodbSourceTask is a Task that reads mutations from a mongodb for storage in Kafka.
  *
@@ -41,6 +40,8 @@ public class MongodbSourceTask extends SourceTask {
 
     private MongodbReader reader;
 
+    private Boolean customSchema = false;
+
     Map<Map<String, String>, Map<String, Object>> offsets = new HashMap<>(0);
 
@@ -75,18 +76,26 @@ public void start(Map<String, String> map) {
         topicPrefix = map.get(MongodbSourceConfig.TOPIC_PREFIX);
         uri = map.get(MongodbSourceConfig.URI);
         host = map.get(MongodbSourceConfig.HOST);
 
         try {
             String structConverterClass = map.get(MongodbSourceConfig.CONVERTER_CLASS);
             if (structConverterClass == null || structConverterClass.isEmpty()) {
                 structConverterClass = StringStructConverter.class.getName();
             }
             structConverter = (StructConverter) Class.forName(structConverterClass).newInstance();
         } catch (Exception e) {
             throw new ConnectException(MongodbSourceConfig.CONVERTER_CLASS + " config should be a class of type StructConverter");
         }
 
+        // Boolean.parseBoolean never throws: any value other than "true" simply yields false
+        if (map.containsKey(MongodbSourceConfig.CUSTOM_SCHEMA))
+            customSchema = Boolean.parseBoolean(map.get(MongodbSourceConfig.CUSTOM_SCHEMA));
+
         databases = Arrays.asList(map.get(MongodbSourceConfig.DATABASES).split(","));
 
         log.trace("Creating schema");
@@ -130,12 +139,55 @@
     public List<SourceRecord> poll() throws InterruptException {
         List<SourceRecord> records = new ArrayList<>();
         while (!reader.isEmpty()) {
             Document message = reader.pool();
             Struct messageStruct = getStruct(message);
             String topic = getTopic(message);
             String db = getDB(message);
             String timestamp = getTimestamp(message);
-            records.add(new SourceRecord(Collections.singletonMap("mongodb", db), Collections.singletonMap(db, timestamp), topic, messageStruct.schema(), messageStruct));
+            Object objectKey = ((Map) message.get("o")).get("_id");
+
+            // custom schema enabled: use the document's _id as the record key
+            if (customSchema) {
+                // re-read the key as a BsonValue so its BSON type can be inspected
+                BsonDocument bson = message
+                        .toBsonDocument(
+                                BsonDocument.class, MongoClient.getDefaultCodecRegistry());
+                BsonValue bsonId = bson.get("o").asDocument().get("_id");
+                Schema keySchema;
+                Object key;
+                if (bsonId.isObjectId()) {
+                    keySchema = Schema.STRING_SCHEMA;
+                    key = bsonId.asObjectId().getValue().toString();
+                } else if (bsonId.isDocument()) {
+                    keySchema = ConverterUtils.createDynamicSchema((Map<String, Object>) objectKey, false);
+                    key = ConverterUtils.createDynamicStruct(keySchema, (Map<String, Object>) objectKey, false);
+                } else {
+                    keySchema = Schema.STRING_SCHEMA;
+                    key = String.valueOf(objectKey);
+                }
+
+                records.add(new SourceRecord(
+                        Collections.singletonMap("mongodb", db),
+                        Collections.singletonMap(db, timestamp),
+                        topic,
+                        keySchema,
+                        key,
+                        messageStruct.schema(),
+                        messageStruct));
+            }
+
+            // custom schema disabled: keep the old behaviour, a keyless record
+            else {
+                records.add(new SourceRecord(
+                        Collections.singletonMap("mongodb", db),
+                        Collections.singletonMap(db, timestamp),
+                        topic,
+                        messageStruct.schema(),
+                        messageStruct));
+            }
 
             log.trace(message.toString());
         }
@@ -203,8 +255,8 @@
      * @return message formatted as a Struct
      */
     private Struct getStruct(Document message) {
         final Schema schema = schemas.get(getDB(message).replaceAll("[\\s.]", "_"));
-        return structConverter.toStruct(message, schema);
+        return structConverter.toStruct(message, schema, customSchema);
     }
 
     /**
@@ -218,4 +270,4 @@ private void loadOffsets() {
         }
         offsets.putAll(context.offsetStorageReader().offsets(partitions));
     }
-}
+}
\ No newline at end of file
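
The key-typing logic above can be exercised in isolation. A sketch with a hand-built oplog fragment (the ObjectId value is made up):

    Document message = Document.parse(
            "{\"o\": {\"_id\": {\"$oid\": \"507f1f77bcf86cd799439011\"}, \"name\": \"Ada\"}}");
    BsonDocument bson = message.toBsonDocument(
            BsonDocument.class, MongoClient.getDefaultCodecRegistry());
    BsonValue bsonId = bson.get("o").asDocument().get("_id");
    bsonId.isObjectId();                       // true
    bsonId.asObjectId().getValue().toString(); // "507f1f77bcf86cd799439011"

So an ObjectId key becomes a plain string, a compound (document) _id becomes a Struct with a schema derived by ConverterUtils, and anything else falls back to String.valueOf.
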
diff --git a/src/main/java/org/apache/kafka/connect/mongodb/converter/JsonStructConverter.java b/src/main/java/org/apache/kafka/connect/mongodb/converter/JsonStructConverter.java
index 87e3334..e8b8d3a 100644
--- a/src/main/java/org/apache/kafka/connect/mongodb/converter/JsonStructConverter.java
+++ b/src/main/java/org/apache/kafka/connect/mongodb/converter/JsonStructConverter.java
@@ -7,13 +7,13 @@
 /**
  * Struct converter that stores the mongodb document as a JSON string.
  *
  * @author André Ignacio
 */
 public class JsonStructConverter implements StructConverter {
 
     @Override
-    public Struct toStruct(Document document, Schema schema) {
+    public Struct toStruct(Document document, Schema schema, Boolean customSchema) {
         final Struct messageStruct = new Struct(schema);
         final BsonTimestamp bsonTimestamp = (BsonTimestamp) document.get("ts");
         final Integer seconds = bsonTimestamp.getTime();
@@ -22,7 +22,7 @@
         messageStruct.put("order", order);
         messageStruct.put("operation", document.get("op"));
         messageStruct.put("database", document.get("ns"));
 
         final Document modifiedDocument = (Document) document.get("o");
         messageStruct.put("object", modifiedDocument.toJson());
diff --git a/src/main/java/org/apache/kafka/connect/mongodb/converter/StringStructConverter.java b/src/main/java/org/apache/kafka/connect/mongodb/converter/StringStructConverter.java
index 07f7b56..45a82b4 100644
--- a/src/main/java/org/apache/kafka/connect/mongodb/converter/StringStructConverter.java
+++ b/src/main/java/org/apache/kafka/connect/mongodb/converter/StringStructConverter.java
@@ -2,29 +2,50 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.utils.ConverterUtils;
 import org.bson.BsonTimestamp;
 import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * Default struct converter. This converter stores the mongodb document with .toString().
  *
  * @author André Ignacio
 */
 public class StringStructConverter implements StructConverter {
 
+    private final static Logger log = LoggerFactory.getLogger(StringStructConverter.class);
+
     @Override
-    public Struct toStruct(Document document, Schema schema) {
-        Struct messageStruct = new Struct(schema);
-        BsonTimestamp bsonTimestamp = (BsonTimestamp) document.get("ts");
-        Integer seconds = bsonTimestamp.getTime();
-        Integer order = bsonTimestamp.getInc();
-        messageStruct.put("timestamp", seconds);
-        messageStruct.put("order", order);
-        messageStruct.put("operation", document.get("op"));
-        messageStruct.put("database", document.get("ns"));
-        messageStruct.put("object", document.get("o").toString());
-
-        return messageStruct;
-    }
+    public Struct toStruct(Document document, Schema schema, Boolean customSchema) {
+        if (customSchema) {
+            Schema dynamicSchema = ConverterUtils.createDynamicSchema((Map<String, Object>) document.get("o"), true);
+            // fall back to the fixed oplog schema when no dynamic schema could be derived
+            if (dynamicSchema == null) {
+                return getDefault(document, schema);
+            }
+            Struct messageStruct = ConverterUtils.createDynamicStruct(dynamicSchema, (Map<String, Object>) document.get("o"), true);
+            if (messageStruct == null) {
+                return getDefault(document, schema);
+            }
+            return messageStruct;
+        }
+        return getDefault(document, schema);
+    }
+
+    private Struct getDefault(Document document, Schema schema) {
+        Struct messageStruct = new Struct(schema);
+        BsonTimestamp bsonTimestamp = (BsonTimestamp) document.get("ts");
+        Integer seconds = bsonTimestamp.getTime();
+        Integer order = bsonTimestamp.getInc();
+        messageStruct.put("timestamp", seconds);
+        messageStruct.put("order", order);
+        messageStruct.put("operation", document.get("op"));
+        messageStruct.put("database", document.get("ns"));
+        messageStruct.put("object", document.get("o").toString());
+        return messageStruct;
+    }
 }
diff --git a/src/main/java/org/apache/kafka/connect/mongodb/converter/StructConverter.java b/src/main/java/org/apache/kafka/connect/mongodb/converter/StructConverter.java
index 8e75c56..dfddc5f 100644
--- a/src/main/java/org/apache/kafka/connect/mongodb/converter/StructConverter.java
+++ b/src/main/java/org/apache/kafka/connect/mongodb/converter/StructConverter.java
@@ -6,10 +6,10 @@
 /**
  * Converts a document into a Struct.
  *
  * @author André Ignacio
 */
 public interface StructConverter {
-    Struct toStruct(Document document, Schema schema);
+    Struct toStruct(Document document, Schema schema, Boolean customSchema);
 }
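
Widening toStruct is a breaking change for any external StructConverter implementation: each one must accept the third parameter even if it ignores it, as JsonStructConverter above does. A hypothetical minimal implementation (assuming the fixed oplog schema built by MongodbSourceTask):

    public class OplogJsonConverter implements StructConverter {
        @Override
        public Struct toStruct(Document document, Schema schema, Boolean customSchema) {
            // customSchema deliberately ignored; always emit the fixed oplog schema
            Struct struct = new Struct(schema);
            struct.put("object", document.get("o").toString());
            return struct;
        }
    }
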
diff --git a/src/main/java/org/apache/kafka/connect/utils/ConverterUtils.java b/src/main/java/org/apache/kafka/connect/utils/ConverterUtils.java
new file mode 100644
index 0000000..66c991b
--- /dev/null
+++ b/src/main/java/org/apache/kafka/connect/utils/ConverterUtils.java
@@ -0,0 +1,247 @@
+package org.apache.kafka.connect.utils;
+
+import com.mongodb.MongoClient;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.bson.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class ConverterUtils {
+
+    private final static Logger log = LoggerFactory.getLogger(ConverterUtils.class);
+
+    // DateTimeFormatter is immutable and thread-safe, so one shared instance suffices
+    private final static DateTimeFormatter DATE_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+
+    public static Struct createDynamicStruct(Schema customSchema, Map<String, Object> doc, Boolean skipId) {
+        Struct customStruct = new Struct(customSchema);
+        Iterator<String> itKeys = doc.keySet().iterator();
+        while (itKeys.hasNext()) {
+            String key = itKeys.next();
+            Object value = doc.get(key);
+            log.debug("key_Struct: " + key);
+            log.debug("value_Struct: " + value);
+            Document generic = new Document(key, value);
+            BsonDocument bsonDocument =
+                    generic.toBsonDocument(
+                            BsonDocument.class, MongoClient.getDefaultCodecRegistry());
+            BsonValue bsonValue = bsonDocument.get(key);
+
+            try {
+                //skip "_id"
+                if (key.equals("_id") && skipId) {
+                    continue;
+                }
+
+                //value is an array
+                else if (bsonValue.isArray()) {
+                    BsonArray bsonArray = bsonValue.asArray();
+                    //empty array|isString|isInt32|isInt64|isDouble|isBoolean
+                    if (bsonArray.size() == 0
+                            || (!bsonArray.get(0).isDocument() && !bsonArray.get(0).isDateTime())) {
+                        customStruct.put(key, value);
+                    }
+
+                    //array of Documents (TODO: nested structs not working yet; stored as JSON strings for now)
+                    else if (bsonArray.get(0).isDocument()) {
+//                        List<Object> objectList = (List<Object>) value;
+//                        Schema elementSchema = createDynamicSchema((Map<String, Object>) objectList.get(0));
+//                        List<Struct> structList = new ArrayList<>();
+//                        for (Object object : objectList) {
+//                            structList.add(createDynamicStruct(elementSchema, (Map<String, Object>) object));
+//                        }
+//                        customStruct.put(key, structList);
+                        List<String> docStrList = new ArrayList<>();
+                        for (BsonValue element : bsonArray) {
+                            docStrList.add(element.asDocument().toJson());
+                        }
+                        customStruct.put(key, docStrList);
+                    }
+
+                    //array of Dates
+                    else {
+                        List<String> dateStrList = new ArrayList<>();
+                        for (BsonValue bsonDate : bsonArray) {
+                            long bsonDateLong = bsonDate.asDateTime().getValue();
+                            LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(bsonDateLong),
+                                    ZoneId.of("Z"));
+                            dateStrList.add(date.format(DATE_FORMATTER));
+                        }
+                        customStruct.put(key, dateStrList);
+                    }
+                }
+
+                //value isDocument: an embedded document becomes a nested Struct
+                else if (bsonValue.isDocument() && !bsonValue.isObjectId()) {
+                    customStruct.put(key, createDynamicStruct(
+                            customSchema.field(key).schema(), (Map<String, Object>) value, skipId));
+                }
+
+                //value isDateTime
+                else if (bsonValue.isDateTime()) {
+                    long bsonDateLong = bsonValue.asDateTime().getValue();
+                    LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(bsonDateLong),
+                            ZoneId.of("Z"));
+                    customStruct.put(key, date.format(DATE_FORMATTER));
+                }
+
+                //value is undefined
+                else if (value instanceof BsonUndefined) {
+                    log.debug("key: {} is undefined", key);
+                    continue;
+                }
+
+                //value is null
+                else if (bsonValue.isNull()) {
+                    log.debug("key: {} is null", key);
+                    continue;
+                }
+
+                //value isString|isInt64|isInt32|isDouble|isBoolean
+                else {
+                    customStruct.put(key, value);
+                }
+            } catch (Exception e) {
+                log.error("Error while building dynamic struct", e);
+                return null;
+            }
+        }
+        return customStruct;
+    }
+
+    public static Schema createDynamicSchema(Map<String, Object> doc, Boolean skipId) {
+        SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+        Iterator<String> itKeys = doc.keySet().iterator();
+        while (itKeys.hasNext()) {
+            String key = itKeys.next();
+            Object value = doc.get(key);
+            Document generic = new Document(key, value);
+            BsonDocument bsonDocument =
+                    generic.toBsonDocument(
+                            BsonDocument.class, MongoClient.getDefaultCodecRegistry());
+            BsonValue bsonValue = bsonDocument.get(key);
+
+            //skip "_id"
+            if (key.equals("_id") && skipId) {
+                continue;
+            }
+
+            //value isString|isDateTime: dates are rendered as ISO-8601 strings
+            else if (bsonValue.isString() || bsonValue.isDateTime()) {
+                schemaBuilder.field(key, Schema.STRING_SCHEMA);
+            }
+
+            //value is an array
+            else if (bsonValue.isArray()) {
+                schemaBuilder.field(key, getSchemaFromList(key, value));
+            }
+
+            //value isDocument
+            else if (bsonValue.isDocument()) {
+                log.debug("key: {} isDocument", key);
+                schemaBuilder.field(key, createDynamicSchema((Map<String, Object>) value, skipId));
+            }
+
+            //value isInt64
+            else if (bsonValue.isInt64()) {
+                log.debug("key: {} isInt64", key);
+                schemaBuilder.field(key, Schema.INT64_SCHEMA);
+            }
+
+            //value isInt32
+            else if (bsonValue.isInt32()) {
+                log.debug("key: {} isInt32", key);
+                schemaBuilder.field(key, Schema.INT32_SCHEMA);
+            }
+
+            //value isDouble
+            else if (bsonValue.isDouble()) {
+                schemaBuilder.field(key, Schema.FLOAT64_SCHEMA);
+            }
+
+            //value isBoolean
+            else if (bsonValue.isBoolean()) {
+                schemaBuilder.field(key, Schema.BOOLEAN_SCHEMA);
+            }
+
+            //value is undefined
+            else if (value instanceof BsonUndefined) {
+                log.debug("key: {} is undefined", key);
+                continue;
+            }
+
+            //value is null
+            else if (bsonValue.isNull()) {
+                log.debug("key: {} is null", key);
+                continue;
+            }
+
+            else {
+                log.error("schema option not implemented");
+                return null;
+            }
+        }
+        return schemaBuilder.build();
+    }
+
+    private static Schema getSchemaFromList(String key, Object value) {
+        Document generic = new Document(key, value);
+        BsonDocument bsonDocument =
+                generic.toBsonDocument(
+                        BsonDocument.class, MongoClient.getDefaultCodecRegistry());
+        BsonArray array = bsonDocument.get(key).asArray();
+
+        //empty array, or array of String/Date: both map to an array of strings
+        if (array.size() == 0
+                || array.get(0).isString() || array.get(0).isDateTime()) {
+            return SchemaBuilder.array(Schema.STRING_SCHEMA).build();
+        }
+
+        //array of Int32
+        else if (array.get(0).isInt32()) {
+            return SchemaBuilder.array(Schema.INT32_SCHEMA).build();
+        }
+
+        //array of Int64
+        else if (array.get(0).isInt64()) {
+            return SchemaBuilder.array(Schema.INT64_SCHEMA).build();
+        }
+
+        //array of Double
+        else if (array.get(0).isDouble()) {
+            return SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build();
+        }
+
+        //array of Boolean
+        else if (array.get(0).isBoolean()) {
+            return SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build();
+        }
+
+        //array of Documents (TODO: not working yet; fall back to an array of JSON strings)
+        else if (array.get(0).isDocument()) {
+//            List<Object> objectList = (List<Object>) value;
+//            log.debug("objectList(0): {}", objectList.get(0));
+//            Schema elementSchema = createDynamicSchema((Map<String, Object>) objectList.get(0));
+//            return SchemaBuilder.array(elementSchema).build();
+            return SchemaBuilder.array(Schema.STRING_SCHEMA).build();
+        } else {
+            log.error("array schema option not implemented");
+            return null;
+        }
+    }
+}
\ No newline at end of file
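
Taken together, a worked example of the dynamic schema derivation (document contents are invented; assumes mongo-java-driver and connect-api on the classpath):

    Document o = Document.parse(
            "{\"_id\": {\"$oid\": \"507f1f77bcf86cd799439011\"}, " +
            "\"name\": \"Ada\", \"age\": 36, \"tags\": [\"a\", \"b\"]}");

    Schema schema = ConverterUtils.createDynamicSchema(o, true);  // _id skipped
    Struct struct = ConverterUtils.createDynamicStruct(schema, o, true);
    // schema fields: name -> STRING, age -> INT32, tags -> ARRAY(STRING)

createDynamicSchema returns null for unsupported BSON types and createDynamicStruct returns null on any conversion error; that is what lets StringStructConverter fall back to the fixed oplog schema instead of failing the task.
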