diff --git a/CONTRIBUTING b/CONTRIBUTING
new file mode 100644
index 000000000..3fb8cef09
--- /dev/null
+++ b/CONTRIBUTING
@@ -0,0 +1,11 @@
+If you would like to contribute code to this project you can do so through
+GitHub by forking the repository and sending a pull request.
+
+Before Comcast merges your code into the project you must sign the [Comcast
+Contributor License Agreement (CLA)](https://gist.github.com/ComcastOSS/a7b8933dd8e368535378cda25c92d19a).
+
+If you haven't previously signed a Comcast CLA, you'll automatically be asked
+to when you open a pull request. Alternatively, we can send you a PDF that
+you can sign and scan back to us. Please create a new GitHub issue to request
+a PDF version of the CLA.
+
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 000000000..459540449
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,6 @@
+# Sample Dockerfile: builds the S3 connector plugin from source
+FROM hseeberger/scala-sbt:8u222_1.3.5_2.13.1 as BUILD_S3_CONNECTOR_PLUGIN
+WORKDIR /tmp
+RUN apt-get update && apt-get install -y git
+RUN git clone https://github.com/lensesio/stream-reactor.git
+RUN cd /tmp/stream-reactor && sbt "project aws-s3-kafka-3-1" compile && sbt "project aws-s3-kafka-3-1" assembly
\ No newline at end of file
diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF
new file mode 100644
index 000000000..c65010e70
--- /dev/null
+++ b/META-INF/MANIFEST.MF
@@ -0,0 +1,9 @@
+Manifest-Version: 1.0
+Specification-Title: kafka-connect-aws-s3-kafka-3-1
+Specification-Version: 1.1-SNAPSHOT
+Specification-Vendor: com.celonis.kafka.connect
+Implementation-Title: kafka-connect-aws-s3-kafka-3-1
+Implementation-Version: 1.1-SNAPSHOT
+Implementation-Vendor: com.celonis.kafka.connect
+Implementation-Vendor-Id: com.celonis.kafka.connect
+
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala
new file mode 100644
index 000000000..bab9d7353
--- /dev/null
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/JsonFormatStreamReader.scala
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2020 Lenses.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.lenses.streamreactor.connect.aws.s3.formats
+
+import io.lenses.streamreactor.connect.aws.s3.model.SchemaAndValueSourceData
+import io.lenses.streamreactor.connect.aws.s3.model.location.RemoteS3PathLocation
+
+import java.io.InputStream
+import scala.io.Source
+import scala.util.Try
+
+import scala.jdk.CollectionConverters.MapHasAsJava
+import org.apache.kafka.connect.json.JsonConverter
+
+class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: RemoteS3PathLocation)
+    extends S3FormatStreamReader[SchemaAndValueSourceData] {
+
+  private val inputStream: InputStream = inputStreamFn()
+  private val source = Source.fromInputStream(inputStream, "UTF-8")
+  protected val sourceLines = source.getLines()
+  protected var lineNumber: Long = -1
+
+  private val jsonConverter = new JsonConverter
+
+  jsonConverter.configure(
+    Map("schemas.enable" -> false).asJava,
+    false,
+  )
+
+  override def close(): Unit = {
+    val _ = Try(source.close())
+  }
+
+  override def hasNext: Boolean = sourceLines.hasNext
+
+
+  override def next(): SchemaAndValueSourceData = {
+    lineNumber += 1
+    if (!sourceLines.hasNext) {
+      throw FormatWriterException(
+        "Invalid state reached: the file content has been consumed, no further calls to next() are possible.",
+      )
+    }
+    val genericRecordData = sourceLines.next()
+    val genericRecord = if (genericRecordData == null) null else genericRecordData.getBytes()
+    val schemaAndValue = jsonConverter.toConnectData("", genericRecord)
+    SchemaAndValueSourceData(schemaAndValue, lineNumber)
+  }
+
+  override def getBucketAndPath: RemoteS3PathLocation = bucketAndPath
+
+  override def getLineNumber: Long = lineNumber
+}
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/S3FormatStreamReader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/S3FormatStreamReader.scala
index 021ca31e2..981eb980c 100644
--- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/S3FormatStreamReader.scala
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/formats/S3FormatStreamReader.scala
@@ -34,7 +34,7 @@ object S3FormatStreamReader {
   ): S3FormatStreamReader[_ <: SourceData] = format.format match {
     case Format.Avro => new AvroFormatStreamReader(inputStreamFn, bucketAndPath)
-    case Format.Json => new TextFormatStreamReader(inputStreamFn, bucketAndPath)
+    case Format.Json => new JsonFormatStreamReader(inputStreamFn, bucketAndPath)
     case Format.Text => new TextFormatStreamReader(inputStreamFn, bucketAndPath)
     case Format.Parquet => new ParquetFormatStreamReader(inputStreamFn, fileSizeFn, bucketAndPath)
    case Format.Csv =>
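
Reviewer note (not part of the patch): the new JsonFormatStreamReader treats an S3 object as line-delimited JSON, converting each line to a Kafka Connect SchemaAndValue via JsonConverter with schemas disabled, and tracking a zero-based line number. Below is a minimal sketch of how the reader can be exercised against an in-memory stream. The RemoteS3PathLocation construction is hypothetical (check the project's actual API), and the destructuring assumes SchemaAndValueSourceData is a case class of (SchemaAndValue, line number), as its construction inside the reader suggests.

  import java.io.ByteArrayInputStream

  object JsonReaderSketch extends App {
    // Two JSON documents, one per line, as the reader expects.
    val payload = "{\"id\":1,\"name\":\"a\"}\n{\"id\":2,\"name\":\"b\"}"

    // Hypothetical constructor arguments; stands in for whatever location
    // the source task resolves.
    val location = RemoteS3PathLocation("test-bucket", "data/file.json")

    val reader = new JsonFormatStreamReader(
      () => new ByteArrayInputStream(payload.getBytes("UTF-8")),
      location,
    )
    try
      while (reader.hasNext) {
        // With schemas.enable=false, JsonConverter yields a schemaless value
        // (e.g. a java.util.Map for a JSON object) plus the zero-based line index.
        val SchemaAndValueSourceData(schemaAndValue, line) = reader.next()
        println(s"line $line: ${schemaAndValue.value()}")
      }
    finally reader.close()
  }

Together with the one-line dispatch change in S3FormatStreamReader, source connectors configured with Format.Json now parse each line as JSON instead of surfacing it as a raw string through TextFormatStreamReader.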