Add JsonFormatStreamReader #878

Open · wants to merge 6 commits into master
11 changes: 11 additions & 0 deletions CONTRIBUTING
@@ -0,0 +1,11 @@
If you would like to contribute code to this project you can do so through
GitHub by forking the repository and sending a pull request.

Before Comcast merges your code into the project you must sign the [Comcast
Contributor License Agreement (CLA)](https://gist.github.com/ComcastOSS/a7b8933dd8e368535378cda25c92d19a).

If you haven't previously signed a Comcast CLA, you'll automatically be asked
to sign it when you open a pull request. Alternatively, we can send you a PDF
that you can sign and scan back to us. Please create a new GitHub issue to
request a PDF version of the CLA.

6 changes: 6 additions & 0 deletions Dockerfile
@@ -0,0 +1,6 @@
# Sample Dockerfile
FROM hseeberger/scala-sbt:8u222_1.3.5_2.13.1 as BUILD_S3_CONNECTOR_PLUGIN
WORKDIR /tmp
RUN apt-get update && apt-get install -y git
RUN git clone https://github.com/lensesio/stream-reactor.git
RUN cd /tmp/stream-reactor && sbt "project aws-s3-kafka-3-1" compile && sbt "project aws-s3-kafka-3-1" assembly
9 changes: 9 additions & 0 deletions META-INF/MANIFEST.MF
@@ -0,0 +1,9 @@
Manifest-Version: 1.0
Specification-Title: kafka-connect-aws-s3-kafka-3-1
Specification-Version: 1.1-SNAPSHOT
Specification-Vendor: com.celonis.kafka.connect
Implementation-Title: kafka-connect-aws-s3-kafka-3-1
Implementation-Version: 1.1-SNAPSHOT
Implementation-Vendor: com.celonis.kafka.connect
Implementation-Vendor-Id: com.celonis.kafka.connect

@@ -0,0 +1,67 @@
/*
* Copyright 2020 Lenses.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.lenses.streamreactor.connect.aws.s3.formats

import io.lenses.streamreactor.connect.aws.s3.model.SchemaAndValueSourceData
import io.lenses.streamreactor.connect.aws.s3.model.location.RemoteS3PathLocation

import java.io.InputStream
import scala.io.Source
import scala.util.Try

import scala.jdk.CollectionConverters.MapHasAsJava
import org.apache.kafka.connect.json.JsonConverter

class JsonFormatStreamReader(inputStreamFn: () => InputStream, bucketAndPath: RemoteS3PathLocation)
    extends S3FormatStreamReader[SchemaAndValueSourceData] {

  private val inputStream: InputStream = inputStreamFn()
  private val source                   = Source.fromInputStream(inputStream, "UTF-8")
  protected val sourceLines            = source.getLines()
  protected var lineNumber: Long       = -1

  private val jsonConverter = new JsonConverter

  jsonConverter.configure(
    Map("schemas.enable" -> false).asJava,
    false,
  )

  override def close(): Unit = {
    val _ = Try(source.close())
  }

  override def hasNext: Boolean = sourceLines.hasNext

  override def next(): SchemaAndValueSourceData = {
    lineNumber += 1
    if (!sourceLines.hasNext) {
      throw FormatWriterException(
        "Invalid state reached: the file content has been consumed, no further calls to next() are possible.",
      )
    }
    val genericRecordData = sourceLines.next()
    val genericRecord     = if (genericRecordData == null) null else genericRecordData.getBytes()
    val schemaAndValue    = jsonConverter.toConnectData("", genericRecord)
    SchemaAndValueSourceData(schemaAndValue, lineNumber)
  }

  override def getBucketAndPath: RemoteS3PathLocation = bucketAndPath

  override def getLineNumber: Long = lineNumber
}
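The reader above treats the S3 object as newline-delimited JSON: each line from the stream is one record, converted via Kafka Connect's `JsonConverter`, with a zero-based line number carried alongside. A minimal, self-contained sketch of that line-by-line pattern (omitting the Kafka Connect dependency; the hypothetical `NdjsonLineReaderSketch` name and raw strings standing in for converted values are assumptions) might look like:

```scala
import java.io.ByteArrayInputStream
import scala.io.Source

// Sketch of the line-by-line pattern used by JsonFormatStreamReader:
// each line of the input stream is one JSON record, paired with a
// zero-based line number (mirroring the reader's lineNumber counter).
object NdjsonLineReaderSketch {
  def readLines(bytes: Array[Byte]): List[(Long, String)] = {
    val source = Source.fromInputStream(new ByteArrayInputStream(bytes), "UTF-8")
    try source.getLines().zipWithIndex.map { case (line, idx) => (idx.toLong, line) }.toList
    finally source.close()
  }
}
```

For an input of `{"a":1}\n{"b":2}`, this yields records numbered 0 and 1, matching how `SchemaAndValueSourceData` pairs each converted value with its line number.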
@@ -34,7 +34,7 @@ object S3FormatStreamReader {
): S3FormatStreamReader[_ <: SourceData] =
format.format match {
case Format.Avro => new AvroFormatStreamReader(inputStreamFn, bucketAndPath)
- case Format.Json => new TextFormatStreamReader(inputStreamFn, bucketAndPath)
+ case Format.Json => new JsonFormatStreamReader(inputStreamFn, bucketAndPath)
case Format.Text => new TextFormatStreamReader(inputStreamFn, bucketAndPath)
case Format.Parquet => new ParquetFormatStreamReader(inputStreamFn, fileSizeFn, bucketAndPath)
case Format.Csv =>