Spark Structured Streaming Kinesis Data Streams connector supports both GetRecords and SubscribeToShard (Enhanced Fan-Out, EFO)

awslabs/spark-sql-kinesis-connector

Amazon Kinesis Data Streams Connector for Spark Structured Streaming

An implementation of the Amazon Kinesis Data Streams connector for Spark Structured Streaming, with support for both the GetRecords and SubscribeToShard (Enhanced Fan-Out, EFO) consumer types.

Developer Setup

Clone SparkSqlKinesisConnector from the source repository on GitHub.

git clone https://github.com/awslabs/spark-sql-kinesis-connector.git
cd spark-sql-kinesis-connector

mvn clean install -DskipTests

This creates the file target/spark-streaming-sql-kinesis-connector_2.12-<kinesis-connector-version>-SNAPSHOT.jar, which contains the connector and its shaded dependencies. The jar file is also installed in the local Maven repository.

After the jar file is installed in the local Maven repository, configure your project's pom.xml (using Maven as an example):

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-sql-kinesis-connector_2.12</artifactId>
            <version>${kinesis-connector-version}</version>
        </dependency>

Current version is tested with Spark 3.2 and above.

Public jar file

For easier access, a public jar file is available on Amazon S3. For example, the path to the version 1.0.0 jar file is s3://awslabs-code-us-east-1/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar.

To run with spark-submit, include the jar file as below (using version 1.0.0 as an example):

--jars s3://awslabs-code-us-east-1/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar

The jar file can also be downloaded from https://awslabs-code-us-east-1.s3.amazonaws.com/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar

Change the version in the jar file name as needed. For example, the version 1.1.0 file is spark-streaming-sql-kinesis-connector_2.12-1.1.0.jar
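The naming pattern above can be captured in a small helper. This is only a sketch: the object and method names are hypothetical, and it assumes that other versions are published under the same bucket and prefix as version 1.0.0, which should be verified before relying on it.

```scala
object ConnectorJar {
  // Bucket and key prefix copied from the S3 path shown above.
  private val Prefix = "awslabs-code-us-east-1/spark-sql-kinesis-connector"

  // File name pattern taken from the examples above (Scala 2.12 build).
  def fileName(version: String): String =
    s"spark-streaming-sql-kinesis-connector_2.12-$version.jar"

  // Value that could be passed to spark-submit --jars.
  def s3Path(version: String): String =
    s"s3://$Prefix/${fileName(version)}"
}
```

For instance, ConnectorJar.s3Path("1.0.0") yields the same S3 path that is used with --jars above.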

How to use it

Code Examples

Configure Kinesis Source with GetRecords consumer type

Consume data from Kinesis using the GetRecords consumer type, which is the default.

// endpointUrl is assumed to be defined by the caller,
// e.g. "https://kinesis.us-east-2.amazonaws.com"
val kinesis = spark
              .readStream
              .format("aws-kinesis")
              .option("kinesis.region", "us-east-2")
              .option("kinesis.streamName", "teststream")
              .option("kinesis.consumerType", "GetRecords")
              .option("kinesis.endpointUrl", endpointUrl)
              .option("kinesis.startingPosition", "LATEST")
              .load()
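The example above starts from LATEST. According to the Kinesis Source Configuration table, the starting position also accepts a value of the form "AT_TIMESTAMP YYYY-MM-DDTHH:MM:SSZ". The following sketch builds such a value from a java.time.Instant; the object and method names are hypothetical, and it assumes a UTC timestamp truncated to whole seconds is what you want.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object StartingPosition {
  // Formats an instant as "AT_TIMESTAMP <ISO-8601 UTC time>", dropping
  // fractional seconds so the result matches the documented pattern.
  def atTimestamp(instant: Instant): String =
    s"AT_TIMESTAMP ${instant.truncatedTo(ChronoUnit.SECONDS)}"
}
```

The result can then be passed as the value of the kinesis.startingPosition option, for example StartingPosition.atTimestamp(Instant.now().minusSeconds(3600)) to start roughly one hour back.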

Add the following policy to the IAM role that runs the job:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KdsStreamSubscribeToShardPolicy",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:*:<account-id>:stream/<kinesis-stream-name>",
                "arn:aws:kinesis:*:<account-id>:stream/<Kinesis-stream-name>/*"
            ]
        }
    ]
}

Configure Kinesis Source with SubscribeToShard consumer type

Consume data from Kinesis using the SubscribeToShard (EFO) consumer type. Be aware that EFO may incur extra AWS costs.

val kinesis = spark
              .readStream
              .format("aws-kinesis")
              .option("kinesis.region", "us-east-2")
              .option("kinesis.streamName", "teststream")
              .option("kinesis.consumerType", "SubscribeToShard")
              .option("kinesis.endpointUrl", endpointUrl)
              .option("kinesis.startingPosition", "LATEST")
              .option("kinesis.consumerName", "TestConsumer")
              .load()

Add the following policy to the IAM role that runs the job:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KdsStreamSubscribeToShardPolicy",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards",
                "kinesis:DescribeStreamConsumer",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListStreamConsumers",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DeregisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:*:<account-id>:stream/<kinesis-stream-name>",
                "arn:aws:kinesis:*:<account-id>:stream/<Kinesis-stream-name>/*"
            ]
        }
    ]
}

Check Schema

kinesis.printSchema

root
|-- data: binary (nullable = true)
|-- streamName: string (nullable = true)
|-- partitionKey: string (nullable = true)
|-- sequenceNumber: string (nullable = true)
|-- approximateArrivalTimestamp: timestamp (nullable = true)

Start Query

// Needed for the .as[String] conversion below
import spark.implicits._

// Cast data into string and group by data column
val query = kinesis
            .selectExpr("CAST(data AS STRING)").as[String]
            .groupBy("data").count()
            .writeStream
            .format("console")
            .outputMode("complete")
            .start()

Gracefully Shutdown Query

The example below shows how to make sure the query is shut down gracefully before the streaming driver process stops.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.spark.sql.streaming.StreamingQuery

// add query stop to system shutdown hook
sys.addShutdownHook {
  query.stop()
}

// wait for the signal to stop query
// (writeToDir is the directory that is watched for the STOP_LOCK file)
waitForQueryStop(query, writeToDir)

def waitForQueryStop(query: StreamingQuery, path: String): Unit = {
    val stopLockPath = new Path(path, "STOP_LOCK")
    val fileContext = FileContext.getFileContext(stopLockPath.toUri, new Configuration())

    while (query.isActive) {
      // Stop the query when "STOP_LOCK" file is found
      if (fileContext.util().exists(stopLockPath)) {
        query.stop()
        fileContext.delete(stopLockPath, false)
      }

      Thread.sleep(500)
    }
}

Note: Even when graceful shutdown is implemented, consumer deregistration is not guaranteed to succeed, especially if the application is terminated abruptly.
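The loop above stops the query once a STOP_LOCK file appears in the watched directory. A minimal sketch of the signalling side follows, assuming the directory is on the local filesystem; the object and method names are hypothetical. For HDFS or S3 paths, the file would instead be created with the Hadoop filesystem tools or API.

```scala
import java.nio.file.{Files, Path, Paths}

object StopSignal {
  // Creates <dir>/STOP_LOCK if it does not exist yet and returns its path.
  // The waiting loop is expected to notice the file, stop the query and delete it.
  def requestStop(dir: String): Path = {
    val lock = Paths.get(dir, "STOP_LOCK")
    if (!Files.exists(lock)) Files.createFile(lock)
    lock
  }
}
```

Calling StopSignal.requestStop(writeToDir) from another process would, under these assumptions, cause waitForQueryStop to call query.stop() within about half a second.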

Using the Kinesis Sink

// Cast data into string and use a random value as the partition key.
// No aggregation is applied: append mode does not support streaming
// aggregations without a watermark.
kinesis
  .selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)").as[(String, String)]
  .writeStream
  .format("aws-kinesis")
  .outputMode("append")
  .option("kinesis.region", "us-east-1")
  .option("kinesis.streamName", "sparkSinkTest")
  .option("kinesis.endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()

Kinesis Connector Metadata storage

By default, the Kinesis connector's metadata is stored in the same HDFS/S3 folder as the checkpoint location.

It is also possible to save the metadata in DynamoDB by specifying the options as below:

      .option("kinesis.metadataCommitterType", "DYNAMODB")
      .option("kinesis.dynamodb.tableName", "kinesisTestMetadata")

To use DynamoDB, add the following policy to the IAM role that runs the job:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KDSConnectorAccess",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:DeleteItem",
                "dynamodb:PutItem",
                "dynamodb:GetItem",
                "dynamodb:Scan",
                "dynamodb:Query"
            ],
            "Resource": "arn:aws:dynamodb:<region>:<account-number>:table/<tablename>"
        }
    ]
}

EFO Consumer Registration/Deregistration

The Spark application needs to provide a kinesis.consumerName when it uses Kinesis Enhanced Fan-Out. Each application must have a unique stream consumer name. The Kinesis connector registers the stream consumer automatically when the application starts. If a consumer with the same kinesis.consumerName already exists, the connector reuses it.

The stream consumer is deregistered when the application is shut down gracefully by calling stop() on the query. Deregistration is not guaranteed to succeed, especially if the application is terminated abruptly. A stream consumer that is still registered is reused when the application restarts. Note that stream consumers that remain registered may incur extra AWS costs.
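Because the consumer is reused across restarts, the name should be stable for a given application and different between applications. One possible approach, shown here only as a sketch with hypothetical names, is to derive it from the application name. The allowed character set ([a-zA-Z0-9_.-]) and the 128-character limit are assumptions taken from the Kinesis RegisterStreamConsumer API and should be checked against the current AWS documentation.

```scala
object ConsumerNames {
  // Length limit assumed from the Kinesis RegisterStreamConsumer API.
  private val MaxLength = 128

  // Derives a deterministic consumer name from an application name by
  // replacing characters outside [a-zA-Z0-9_.-] and truncating the result.
  def fromAppName(appName: String): String = {
    val cleaned = appName.replaceAll("[^a-zA-Z0-9_.-]", "-").take(MaxLength)
    require(cleaned.nonEmpty, "application name must not be empty")
    cleaned
  }
}
```

The same input always produces the same name, so a restarted application would pick up its existing consumer rather than registering a new one.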

Avoid race conditions

  1. Speculative execution should be disabled (spark.speculation is off by default on EMR). Otherwise Spark may run two tasks for the same shard at the same time, which creates race conditions.

  2. For the same reason, if two jobs need to read from the same Kinesis stream at the same time, the Spark application should cache the DataFrame. Below is an example of caching a DataFrame in Scala. Although batchDF.count and batchDF.write start two jobs, batchDF.persist() ensures that the application reads from the Kinesis stream only once. batchDF.unpersist() releases the cache once processing is done.

    import org.apache.spark.sql.{DataFrame, SaveMode}
    import org.apache.spark.sql.streaming.Trigger

    // reader, writeToDir and checkpointDir are assumed to be defined elsewhere
    val batchProcessor: (DataFrame, Long) => Unit = (batchDF: DataFrame, batchId: Long) => {
      val now = System.currentTimeMillis()
      val writeToDirNow = s"${writeToDir}/${now}"
      // Cache the batch so that count() and write read from Kinesis only once
      batchDF.persist()
      if (batchDF.count() > 0) {
        batchDF.write
          .format("csv")
          .mode(SaveMode.Append)
          .save(writeToDirNow)
      }
      batchDF.unpersist()
    }

    val inputDf = reader.load()
      .selectExpr("CAST(data AS STRING)")

    val query = inputDf
      .writeStream
      .queryName("KinesisDataConsumerForeachBatch")
      .foreachBatch {batchProcessor}
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.ProcessingTime("15 seconds"))
      .start()

The same applies if you want to write the output of a streaming query to multiple locations:

    val query = inputDf
      .writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.persist()
        batchDF.write.format(...).save(...)  // location 1
        batchDF.write.format(...).save(...)  // location 2
        batchDF.unpersist()
      }
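
Item 1 of the list above can be enforced at startup with a small guard. This is a sketch with hypothetical names that works on a plain map of Spark settings. It assumes that speculation is controlled only through the spark.speculation key and that an absent key means disabled, which is the Spark default.

```scala
object SpeculationCheck {
  // True when the given Spark settings enable speculative execution.
  def speculationEnabled(conf: Map[String, String]): Boolean =
    conf.get("spark.speculation").exists(_.trim.equalsIgnoreCase("true"))

  // Throws IllegalArgumentException when speculation is switched on.
  def requireDisabled(conf: Map[String, String]): Unit =
    require(
      !speculationEnabled(conf),
      "spark.speculation must be false when reading from Kinesis with this connector")
}
```

In a Spark application the map could be obtained with spark.sparkContext.getConf.getAll.toMap and passed to SpeculationCheck.requireDisabled before the query is started.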

Credential Provider

The Kinesis connector uses the default credentials provider chain to supply the credentials used by your application. It looks for credentials in this order:

  1. Java System Properties - aws.accessKeyId and aws.secretAccessKey
  2. Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  3. Web Identity Token credentials from system properties or environment variables
  4. Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
  5. Credentials delivered through the Amazon EC2 container service if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable
  6. Instance profile credentials delivered through the Amazon EC2 metadata service

Refer to the AWS SDK for Java 2.x Developer Guide for details.
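
The "first source that yields credentials wins" behaviour of such a chain can be illustrated in plain Scala. This sketch is not the AWS SDK implementation and all names in it are hypothetical. It models only the first two steps of the list above (Java system properties, then environment variables), with the lookup functions passed in so that the order can be exercised without touching the real environment.

```scala
object CredentialLookupSketch {
  final case class Credentials(accessKeyId: String, secretAccessKey: String)

  // A source either yields credentials or nothing.
  type Source = () => Option[Credentials]

  // Builds a source that reads both values through the given lookup function.
  def source(lookup: String => Option[String], idKey: String, secretKey: String): Source =
    () => for (id <- lookup(idKey); secret <- lookup(secretKey)) yield Credentials(id, secret)

  // Tries the sources in order and returns the first result; later sources
  // are not evaluated once one has succeeded.
  def resolve(sources: Seq[Source]): Option[Credentials] =
    sources.iterator.map(_.apply()).collectFirst { case Some(c) => c }

  // Steps 1 and 2 of the list above: system properties, then environment variables.
  val firstTwoSteps: Seq[Source] = Seq(
    source(k => sys.props.get(k), "aws.accessKeyId", "aws.secretAccessKey"),
    source(k => sys.env.get(k), "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
  )
}
```

In this model, a key pair found in the system properties hides one set through environment variables, which matches the ordering described above.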

Cross Account Access using AssumeRole

There are scenarios where customers follow a multi-account approach resulting in Kinesis Data Streams and Spark consumer applications operating in different accounts.

The steps to access a Kinesis data stream in one account from a Spark structured streaming application in another account are:

  • Step 1 – Create an AWS Identity and Access Management (IAM) role in Account A that can access the Kinesis data stream and has a trust relationship with Account B.

Attach the policy below to the role in Account A:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KdsStreamPolicy",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards",
                "kinesis:DescribeStreamConsumer",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListStreamConsumers",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DeregisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:*:<AccountA-id>:stream/<SparkConnectorTestStream>",
                "arn:aws:kinesis:*:<AccountA-id>:stream/<SparkConnectorTestStream>/*"
            ]
        }
    ]
}

Trust policy of the role in Account A:

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "AccountATrust",
			"Effect": "Allow",
			"Principal": {"AWS":["arn:aws:iam::<AccountB-id>:root"]},
			"Action": "sts:AssumeRole"
		}
	]
}

  • Step 2 – Create an IAM role in Account B that can assume the role in Account A. This role is used to run the Spark application.

Add the permission below:

{
  "Sid": "AssumeRoleInSourceAccount",
  "Effect": "Allow",
  "Action": "sts:AssumeRole",
  "Resource": "<RoleArnInAccountA>"
}

  • Step 3 – Configure the Kinesis connector to assume the role in Account A so that it can read the Kinesis data stream in Account A, as below:

  .option("kinesis.stsRoleArn", "RoleArnInAccountA")
  .option("kinesis.stsSessionName", "StsSessionName")
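
The two STS options can be assembled and sanity-checked before they are handed to the reader. The sketch below uses hypothetical names. The ARN pattern is an assumption about the general shape of IAM role ARNs (arn:<partition>:iam::<12-digit account id>:role/<name>) and is not a check performed by the connector.

```scala
object CrossAccountOptions {
  // Assumed general shape of an IAM role ARN.
  private val RoleArn = "^arn:aws[a-z-]*:iam::\\d{12}:role/.+$".r

  // Returns the connector options for assuming a role in the stream's account.
  def stsOptions(roleArn: String, sessionName: String): Map[String, String] = {
    require(RoleArn.pattern.matcher(roleArn).matches(), s"not an IAM role ARN: $roleArn")
    require(sessionName.nonEmpty, "session name must not be empty")
    Map(
      "kinesis.stsRoleArn" -> roleArn,
      "kinesis.stsSessionName" -> sessionName
    )
  }
}
```

The resulting map can be applied in one call with .options(...) on the stream reader, alongside the other kinesis.* options.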

Cross Account Access using Access Key

It is also possible to access a Kinesis data stream in another account using a user's AWS credentials. The user belongs to the Kinesis account (Account A) and needs the Kinesis access permissions shown above.

  .option("kinesis.awsAccessKeyId", "awsAccessKeyId")
  .option("kinesis.awsSecretKey", "awsSecretKey")

Note: Using permanent credentials is not recommended due to security concerns.

Known Limitations

  • Speculative execution is not supported.
  • Trigger.AvailableNow is not supported.
  • Continuous Processing is not supported.

Kinesis Source Configuration

| Name | Default Value | Description |
|---|---|---|
| kinesis.endpointUrl | required, no default value | Endpoint URL for the Kinesis stream |
| kinesis.region | inferred from kinesis.endpointUrl | Region running the Kinesis connector |
| kinesis.streamName | required, no default value | Name of the stream |
| kinesis.consumerType | GetRecords | Consumer type. Possible values are "GetRecords", "SubscribeToShard" |
| kinesis.failOnDataLoss | false | Fail the streaming job if any active shard is missing or expired |
| kinesis.maxFetchRecordsPerShard | 100,000 | Maximum number of records to fetch per shard per microbatch |
| kinesis.maxFetchTimePerShardSec | - | Maximum time in seconds to fetch records per shard per microbatch. If kinesis.maxFetchTimePerShardSec is not set, the current task ends when kinesis.maxFetchRecordsPerShard is reached. If it is set, the current task ends when either kinesis.maxFetchRecordsPerShard or kinesis.maxFetchTimePerShardSec is reached, whichever comes first. The value must be at least 10 seconds so that the fetch can make progress. Note: If a shard is idle (no new data) for more than 10 seconds, the task ends even if neither kinesis.maxFetchTimePerShardSec nor kinesis.maxFetchRecordsPerShard has been reached. |
| kinesis.startingPosition | LATEST | Starting position in Kinesis to fetch data from. Possible values are "LATEST", "TRIM_HORIZON", "EARLIEST" (alias for TRIM_HORIZON), or "AT_TIMESTAMP YYYY-MM-DDTHH:MM:SSZ" (e.g. 2023-08-30T19:00:05Z, 2023-08-30T19:00:05-08:00) |
| kinesis.describeShardInterval | 1s | Minimum interval between two ListShards API calls to get the latest shards. Possible values are time values such as 50s, 100ms. |
| kinesis.minBatchesToRetain | same as spark.sql.streaming.minBatchesToRetain | Minimum number of batches of Kinesis metadata that must be retained and made recoverable |
| kinesis.checkNewRecordThreads | 8 | Number of threads in the Spark driver that check for new records in the Kinesis stream |
| kinesis.metadataCommitterType | HDFS | Where to save Kinesis connector metadata. Possible values are "HDFS", "DYNAMODB" |
| kinesis.metadataPath | same as checkpointLocation | A path on HDFS or S3. Only valid when kinesis.metadataCommitterType is HDFS. |
| kinesis.metadataNumRetries | 5 | Maximum number of retries for metadata requests |
| kinesis.metadataRetryIntervalsMs | 1000 (milliseconds) | Wait time before retrying metadata requests |
| kinesis.metadataMaxRetryIntervalMs | 10000 (milliseconds) | Maximum wait time between two retries of metadata requests |
| kinesis.clientNumRetries | 5 | Maximum number of retries for Kinesis API requests |
| kinesis.clientRetryIntervalsMs | 1000 (milliseconds) | Wait time before retrying Kinesis requests |
| kinesis.clientMaxRetryIntervalMs | 10000 (milliseconds) | Maximum wait time between two retries of Kinesis requests |
| kinesis.consumerName | required when kinesis.consumerType is "SubscribeToShard" | Kinesis stream Enhanced Fan-Out consumer name |
| kinesis.stsRoleArn | - | AWS STS role ARN for Kinesis operations (describe, read records, etc.) |
| kinesis.stsSessionName | - | AWS STS session name |
| kinesis.stsEndpointUrl | - | AWS STS endpoint URL |
| kinesis.awsAccessKeyId | - | awsAccessKeyId for Kinesis operations (describe, read records, etc.) |
| kinesis.awsSecretKey | - | awsSecretKey for Kinesis operations (describe, read records, etc.) |
| kinesis.kinesisRegion | inferred from kinesis.endpointUrl | Region the Kinesis stream belongs to |
| kinesis.credentialProviderClass | - | Custom connector credential provider that implements org.apache.spark.sql.connector.kinesis.ConnectorAwsCredentialsProvider. If the implementation returns temporary credentials, it is responsible for refreshing them before they expire. |
| kinesis.credentialProviderParam | - | Input parameter for the custom connector credential provider |
| kinesis.dynamodb.tableName | required when kinesis.metadataCommitterType is "DYNAMODB" | DynamoDB table name |
| kinesis.subscribeToShard.timeoutSec | 60 (seconds) | Timeout while waiting for subscribeToShard to finish |
| kinesis.subscribeToShard.maxRetries | 10 | Maximum retries of a subscribeToShard request |
| kinesis.getRecords.numberOfRecordsPerFetch | 10,000 | Maximum number of records to fetch per getRecords API call |
| kinesis.getRecords.fetchIntervalMs | 200 (milliseconds) | Minimum interval between two getRecords API calls |
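
Several of the constraints in this table can be checked before a query is started. The following is a minimal sketch with hypothetical names, not part of the connector. It covers only the requirements stated above: the two required options, the consumer name for SubscribeToShard, the table name for DYNAMODB, and the 10-second minimum for kinesis.maxFetchTimePerShardSec. It compares keys and values case-sensitively, which is an assumption; the connector's own parsing may be more lenient.

```scala
object SourceOptionsCheck {
  // Returns a list of problems; an empty list means the options pass these checks.
  def problems(opts: Map[String, String]): List[String] = {
    def missing(key: String): Boolean = !opts.get(key).exists(_.trim.nonEmpty)

    val required = List("kinesis.endpointUrl", "kinesis.streamName")
      .filter(missing)
      .map(k => s"$k is required")

    val efo =
      if (opts.get("kinesis.consumerType").contains("SubscribeToShard") &&
          missing("kinesis.consumerName"))
        List("kinesis.consumerName is required for SubscribeToShard")
      else Nil

    val dynamo =
      if (opts.get("kinesis.metadataCommitterType").contains("DYNAMODB") &&
          missing("kinesis.dynamodb.tableName"))
        List("kinesis.dynamodb.tableName is required for DYNAMODB")
      else Nil

    // Only checked when the option is present, since it has no default.
    val fetchTime = opts.get("kinesis.maxFetchTimePerShardSec").toList.flatMap { v =>
      if (scala.util.Try(v.trim.toInt).toOption.exists(_ >= 10)) Nil
      else List("kinesis.maxFetchTimePerShardSec must be an integer of at least 10")
    }

    required ++ efo ++ dynamo ++ fetchTime
  }
}
```

Collecting all problems in a list, rather than failing on the first one, lets a misconfigured job report everything that needs fixing in a single run.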

Kinesis Sink Configuration

| Name | Default Value | Description |
|---|---|---|
| kinesis.endpointUrl | required, no default value | Endpoint URL for the Kinesis stream |
| kinesis.region | inferred from the endpoint URL | Region running the Kinesis connector |
| kinesis.streamName | required, no default value | Name of the stream |
| kinesis.sink.flushWaitTimeMs | 100 (milliseconds) | Wait time while flushing records to Kinesis at the end of a task |
| kinesis.sink.recordMaxBufferedTimeMs | 1000 (milliseconds) | Maximum time a record is buffered |
| kinesis.sink.maxConnections | 1 | Maximum number of connections to Kinesis |
| kinesis.sink.aggregationEnabled | True | Whether records are aggregated before they are sent to Kinesis |
| kinesis.sink.recordTtl | 30000 (milliseconds) | Records not successfully written to Kinesis within this time are dropped |
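
The sink options can be kept together in a small settings type whose defaults mirror the table above. This is a sketch only: the case class and its field names are hypothetical, and the string forms of the values (for example "true" for the boolean) are an assumption about what the connector accepts.

```scala
final case class SinkSettings(
    endpointUrl: String,
    streamName: String,
    flushWaitTimeMs: Long = 100L,
    recordMaxBufferedTimeMs: Long = 1000L,
    maxConnections: Int = 1,
    aggregationEnabled: Boolean = true,
    recordTtlMs: Long = 30000L
) {
  // Renders the settings as connector option keys and string values.
  def toOptions: Map[String, String] = Map(
    "kinesis.endpointUrl" -> endpointUrl,
    "kinesis.streamName" -> streamName,
    "kinesis.sink.flushWaitTimeMs" -> flushWaitTimeMs.toString,
    "kinesis.sink.recordMaxBufferedTimeMs" -> recordMaxBufferedTimeMs.toString,
    "kinesis.sink.maxConnections" -> maxConnections.toString,
    "kinesis.sink.aggregationEnabled" -> aggregationEnabled.toString,
    "kinesis.sink.recordTtl" -> recordTtlMs.toString
  )
}
```

The map returned by toOptions can be passed to .options(...) on the stream writer, so that only the values that differ from the defaults need to be spelled out when the settings are constructed.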

Security

See CONTRIBUTING for more information.

Acknowledgement

This connector would not have been possible without the reference implementations of Kinesis Connector, Apache Flink AWS Connectors, spark-streaming-sql-s3-connector and Kinesis Client Library. The structure of some parts of the code is influenced by the excellent work done by various Apache Spark contributors.
