From 2754cae9fc023a4e1b5d0fa86c06ad80e687989f Mon Sep 17 00:00:00 2001 From: Iosif Nicolae Date: Thu, 1 Sep 2022 15:11:18 +0300 Subject: [PATCH 1/6] try to build a json stream reader --- Dockerfile | 6 ++ .../s3/formats/JsonFormatStreamReader.scala | 66 +++++++++++++++++++ .../aws/s3/formats/S3FormatStreamReader.scala | 2 +- 3 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 Dockerfile create mode 100644 kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..459540449 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +# Sample docker file +FROM hseeberger/scala-sbt:8u222_1.3.5_2.13.1 as BUILD_S3_CONNECTOR_PLUGIN +WORKDIR /tmp +RUN apt-get update && apt-get install -y git +RUN git clone https://github.com/lensesio/stream-reactor.git +RUN cd /tmp/stream-reactor && sbt "project aws-s3-kafka-3-1" compile && sbt "project aws-s3-kafka-3-1" assembly \ No newline at end of file diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala new file mode 100644 index 000000000..9262dcb90 --- /dev/null +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2020 Lenses.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lenses.streamreactor.connect.aws.s3.formats + +import org.apache.kafka.connect.json.JsonConverter +import io.lenses.streamreactor.connect.aws.s3.model.SchemaAndValueSourceData +import io.lenses.streamreactor.connect.aws.s3.model.location.RemoteS3PathLocation +import org.apache.avro.file.DataFileStream +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.generic.GenericRecord + +import java.io.InputStream +import scala.util.Try + +class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: RemoteS3PathLocation) + extends S3FormatStreamReader[SchemaAndValueSourceData] { + + private val inputStream: InputStream = inputStreamFn() + private val source = Source.fromInputStream(inputStream, "UTF-8") + protected val sourceLines = source.getLines() + protected var lineNumber: Long = -1 + + private val jsonConverter = new JsonConverter + + jsonConverter.configure( + Map("schemas.enable" -> false).asJava, + false, + ) + + override def close(): Unit = { + val _ = Try(source.close()) + } + + override def hasNext: Boolean = sourceLines.hasNext + + + override def next(): StringSourceData = { + lineNumber += 1 + if (!sourceLines.hasNext) { + throw FormatWriterException( + "Invalid state reached: the file content has been consumed, no further calls to next() are possible.", + ) + } + val value = sourceLines.next() + val schemaAndValue = jsonConverter.toConnectData(this.topic, value) + SchemaAndValueSourceData(schemaAndValue, lineNumber) + } + + override def getBucketAndPath: RemoteS3PathLocation = bucketAndPath + + override def getLineNumber: Long = lineNumber +} diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/S3FormatStreamReader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/S3FormatStreamReader.scala index 021ca31e2..981eb980c 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/S3FormatStreamReader.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/S3FormatStreamReader.scala @@ -34,7 +34,7 @@ object S3FormatStreamReader { ): S3FormatStreamReader[_ <: SourceData] = format.format match { case Format.Avro => new AvroFormatStreamReader(inputStreamFn, bucketAndPath) - case Format.Json => new TextFormatStreamReader(inputStreamFn, bucketAndPath) + case Format.Json => new JsonFormatStreamReader(inputStreamFn, bucketAndPath) case Format.Text => new TextFormatStreamReader(inputStreamFn, bucketAndPath) case Format.Parquet => new ParquetFormatStreamReader(inputStreamFn, fileSizeFn, bucketAndPath) case Format.Csv => From 0f0ea59b7d8868b26860cb071fc71c7601917800 Mon Sep 17 00:00:00 2001 From: Iosif Nicolae Date: Thu, 1 Sep 2022 13:00:04 +0000 Subject: [PATCH 2/6] add json converter --- .../s3/formats/JsonFormatStreamReader.scala | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala index 9262dcb90..d03dfb5b8 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala @@ -16,16 +16,15 @@ package io.lenses.streamreactor.connect.aws.s3.formats -import org.apache.kafka.connect.json.JsonConverter import io.lenses.streamreactor.connect.aws.s3.model.SchemaAndValueSourceData import io.lenses.streamreactor.connect.aws.s3.model.location.RemoteS3PathLocation -import org.apache.avro.file.DataFileStream -import org.apache.avro.generic.GenericDatumReader -import org.apache.avro.generic.GenericRecord import java.io.InputStream +import scala.io.Source import scala.util.Try +import org.apache.kafka.connect.json.JsonConverter + class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: RemoteS3PathLocation) extends S3FormatStreamReader[SchemaAndValueSourceData] { @@ -36,10 +35,10 @@ class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: Re private val jsonConverter = new JsonConverter - jsonConverter.configure( - Map("schemas.enable" -> false).asJava, - false, - ) +// jsonConverter.configure( +// Map("schemas.enable" -> false).asJava, +// false, +// ) override def close(): Unit = { val _ = Try(source.close()) @@ -48,15 +47,15 @@ class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: Re override def hasNext: Boolean = sourceLines.hasNext - override def next(): StringSourceData = { + override def next(): SchemaAndValueSourceData = { lineNumber += 1 if (!sourceLines.hasNext) { throw FormatWriterException( "Invalid state reached: the file content has been consumed, no further calls to next() are possible.", ) } - val value = sourceLines.next() - val schemaAndValue = jsonConverter.toConnectData(this.topic, value) + val value = sourceLines.next().getBytes(); + val schemaAndValue = jsonConverter.toConnectData("", value) SchemaAndValueSourceData(schemaAndValue, lineNumber) } From c6696c55bde6c8d803d42b157c6f04210e2e3825 Mon Sep 17 00:00:00 2001 From: Iosif Nicolae Date: Thu, 1 Sep 2022 13:29:01 +0000 Subject: [PATCH 3/6] check for null values --- .../connect/aws/s3/formats/JsonFormatStreamReader.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala index d03dfb5b8..6fbd71b6f 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala @@ -54,8 +54,9 @@ class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: Re "Invalid state reached: the file content has been consumed, no further calls to next() are possible.", ) } - val value = sourceLines.next().getBytes(); - val schemaAndValue = jsonConverter.toConnectData("", value) + val genericRecordData = sourceLines.next(); + val genericRecord = if (genericRecordData == null) null else genericRecordData.getBytes(); + val schemaAndValue = jsonConverter.toConnectData("", genericRecord) SchemaAndValueSourceData(schemaAndValue, lineNumber) } From 8eadb8e8f6e1447df035a6cee5fa4ea491c79baa Mon Sep 17 00:00:00 2001 From: Iosif Nicolae Date: Thu, 1 Sep 2022 13:44:29 +0000 Subject: [PATCH 4/6] debug --- .gitignore | 2 ++ .../connect/aws/s3/formats/JsonFormatStreamReader.scala | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/.gitignore b/.gitignore index bebc272e6..4d7b45bce 100644 --- a/.gitignore +++ b/.gitignore @@ -86,3 +86,5 @@ classes/* /kafka-connect-hive/it/.project /kafka-connect-hive/it/.settings/ /kafka-connect-hive/it/bin/ + +!kafka-connect-aws-s3/target/kafka-3-1-jvm-2.13/libs/ diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala index 6fbd71b6f..ff340c234 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala @@ -54,9 +54,16 @@ class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: Re "Invalid state reached: the file content has been consumed, no further calls to next() are possible.", ) } + println("aaaaaaaaa..") val genericRecordData = sourceLines.next(); + println("genericRecordData:") + println(genericRecordData) val genericRecord = if (genericRecordData == null) null else genericRecordData.getBytes(); + println("genericRecord:") + println(genericRecord) val schemaAndValue = jsonConverter.toConnectData("", genericRecord) + println("schemaAndValue:") + println(schemaAndValue) SchemaAndValueSourceData(schemaAndValue, lineNumber) } From 92b9145b7cb219995db8519abc2916b0e6a5f887 Mon Sep 17 00:00:00 2001 From: Iosif Nicolae Date: Thu, 1 Sep 2022 14:30:24 +0000 Subject: [PATCH 5/6] configure json converter --- .gitignore | 2 -- CONTRIBUTING | 11 +++++++++++ META-INF/MANIFEST.MF | 9 +++++++++ .../aws/s3/formats/JsonFormatStreamReader.scala | 9 +++++---- 4 files changed, 25 insertions(+), 6 deletions(-) create mode 100644 CONTRIBUTING create mode 100644 META-INF/MANIFEST.MF diff --git a/.gitignore b/.gitignore index 4d7b45bce..bebc272e6 100644 --- a/.gitignore +++ b/.gitignore @@ -86,5 +86,3 @@ classes/* /kafka-connect-hive/it/.project /kafka-connect-hive/it/.settings/ /kafka-connect-hive/it/bin/ - -!kafka-connect-aws-s3/target/kafka-3-1-jvm-2.13/libs/ diff --git a/CONTRIBUTING b/CONTRIBUTING new file mode 100644 index 000000000..3fb8cef09 --- /dev/null +++ b/CONTRIBUTING @@ -0,0 +1,11 @@ +If you would like to contribute code to this project you can do so through +GitHub by forking the repository and sending a pull request. + +Before Comcast merges your code into the project you must sign the [Comcast +Contributor License Agreement (CLA)](https://gist.github.com/ComcastOSS/a7b8933dd8e368535378cda25c92d19a). + +If you haven't previously signed a Comcast CLA, you'll automatically be asked +to when you open a pull request. Alternatively, we can send you a PDF that +you can sign and scan back to us. Please create a new GitHub issue to request +a PDF version of the CLA. + diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF new file mode 100644 index 000000000..c65010e70 --- /dev/null +++ b/META-INF/MANIFEST.MF @@ -0,0 +1,9 @@ +Manifest-Version: 1.0 +Specification-Title: kafka-connect-aws-s3-kafka-3-1 +Specification-Version: 1.1-SNAPSHOT +Specification-Vendor: com.celonis.kafka.connect +Implementation-Title: kafka-connect-aws-s3-kafka-3-1 +Implementation-Version: 1.1-SNAPSHOT +Implementation-Vendor: com.celonis.kafka.connect +Implementation-Vendor-Id: com.celonis.kafka.connect + diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala index ff340c234..6122e330c 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala @@ -23,6 +23,7 @@ import java.io.InputStream import scala.io.Source import scala.util.Try +import scala.jdk.CollectionConverters.MapHasAsJava import org.apache.kafka.connect.json.JsonConverter class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: RemoteS3PathLocation) @@ -35,10 +36,10 @@ class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: Re private val jsonConverter = new JsonConverter -// jsonConverter.configure( -// Map("schemas.enable" -> false).asJava, -// false, -// ) + jsonConverter.configure( + Map("schemas.enable" -> false).asJava, + false, + ) override def close(): Unit = { val _ = Try(source.close()) From e05dafc5bb5f241ebbd9533c39c9b6a430e3866c Mon Sep 17 00:00:00 2001 From: Iosif Nicolae Date: Thu, 1 Sep 2022 17:29:39 +0000 Subject: [PATCH 6/6] remove debug code --- .../connect/aws/s3/formats/JsonFormatStreamReader.scala | 7 ------- 1 file changed, 7 deletions(-) diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala index 6122e330c..bab9d7353 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala @@ -55,16 +55,9 @@ class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: Re "Invalid state reached: the file content has been consumed, no further calls to next() are possible.", ) } - println("aaaaaaaaa..") val genericRecordData = sourceLines.next(); - println("genericRecordData:") - println(genericRecordData) val genericRecord = if (genericRecordData == null) null else genericRecordData.getBytes(); - println("genericRecord:") - println(genericRecord) val schemaAndValue = jsonConverter.toConnectData("", genericRecord) - println("schemaAndValue:") - println(schemaAndValue) SchemaAndValueSourceData(schemaAndValue, lineNumber) }