
Getting NullPointerException at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188) #221

Open
dongxiaohe opened this issue Dec 4, 2018 · 15 comments


@dongxiaohe

dongxiaohe commented Dec 4, 2018

Hi guys,

Currently we have a Kafka topic called maxwell with around 10 million records of lag across 200 partitions.
The Kafka Connect S3 sink keeps throwing an NPE during bootstrap (tasks go from the RUNNING state to FAILED within a few seconds). Can someone please help? The NPE on its own is not a useful exception for debugging the issue.

Also, the restart API doesn't seem to work either. When I tried to use the REST API to restart the failed task by calling
curl -X POST localhost:8083/connectors/maxwell-s3-connector-test/restart, it seems to restart only one consumer worker; the rest of the workers don't respond.
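
(Side note on the restart behaviour, as far as I understand the Connect REST API: POST /connectors/{name}/restart only restarts the Connector instance itself, not its tasks, and failed tasks have their own per-task restart endpoint. A rough sketch, with the task id taken from the status output:)

    # show connector and per-task state (look for FAILED task ids)
    curl localhost:8083/connectors/maxwell-s3-connector-test/status

    # restart a single failed task by id (repeat for each failed task)
    curl -X POST localhost:8083/connectors/maxwell-s3-connector-test/tasks/0/restart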

Here is status topic data:

.... ect.s3.S3SinkTask.put(S3SinkTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)\n\t... 10 more\n","worker_id":"172.31.16.219:8083","generation":26}
{"state":"UNASSIGNED","trace":null,"worker_id":"172.31.16.219:8083","generation":26}
{"state":"RUNNING","trace":null,"worker_id":"172.31.16.219:8083","generation":26}

I was expecting it to start 10 consumers (workers) instead (tasks.max = 10). The workaround we've had to use is to delete the Kafka connector and recreate it.
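
(i.e. roughly the following, with maxwell-s3.json standing in for whatever file holds the connector definition:)

    curl -X DELETE localhost:8083/connectors/maxwell-s3-connector-test
    curl -X POST -H "Content-Type: application/json" --data @maxwell-s3.json localhost:8083/connectors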

  • The error stack trace from status topic:
    {"state":"RUNNING","trace":null,"worker_id":"172.31.28.239:8083","generation":26} {"state":"RUNNING","trace":null,"worker_id":"172.31.28.239:8083","generation":26} {"state":"RUNNING","trace":null,"worker_id":"172.31.28.239:8083","generation":26} {"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)\n\t... 10 more\n","worker_id":"172.31.28.239:8083","generation":24}

  • Kafka connect version: confluentinc/cp-kafka-connect:5.0.0

  • Kafka s3 connector config:

      "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "tasks.max": 10,
          "topics": "maxwell",
          "flush.size": 10000,
          "rotate.schedule.interval.ms": 1800000,
          "s3.part.size": 26214400,
          "s3.region": "us-east-1",
          "s3.bucket.name": "maxwell-connect-dev",
          "s3.credentials.provider.class": "com.amazonaws.auth.EnvironmentVariableCredentialsProvider",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
          "format.bytearray.separator": "\n",
          "schema.compatibility": "NONE",
          "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
          "timestamp.extractor": "Wallclock",
          "partition.duration.ms": 3600000,
          "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
          "locale": "en-US",
          "timezone": "UTC"
      }
    
  • Some clarification about the config

    • "flush.size": 10000 and "rotate.schedule.interval.ms": 1800000. S3 is not a streaming service and it has request limit, so we would like to batch more records and then commit to s3.
@OneCricketeer

Not sure of the immediate issue, but would it be possible to enable debugging like I did here: confluentinc/kafka-connect-storage-common#91

Your stack trace aligns exactly with that one, and I'd like to know why your config would be causing it, since you didn't rename a topic like in the other post.

@OneCricketeer

Re: confluentinc/kafka-connect-storage-common#91 (comment): I think you meant to respond here.

I've been filling in working example stacks here over time: https://github.com/cricket007/kafka-connect-sandbox

Minio is a suitable replacement; it doesn't need to run on AWS.

@pranavoyo

I am hitting this issue in my setup

@msilvestre

msilvestre commented Nov 27, 2019

I'm getting this same exception, but only when I try to use transforms.

Without them it works just fine, but as soon as I add any of the following to the connector configuration I get the NPE.

transforms=dropPrefix
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex=.*
transforms.dropPrefix.replacement=prefix_$0

Or

transforms=dropPrefix
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex=avro_(.*)
transforms.dropPrefix.replacement=$1

Or

transforms=KeyTopic
transforms.KeyTopic.type=io.confluent.connect.transforms.ExtractTopic$Key
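
For illustration, taking a hypothetical source topic avro_users, these configurations rename records as follows (ExtractTopic$Key instead uses the record key as the new topic name):

    regex ".*"        + replacement "prefix_$0"   :  avro_users -> prefix_avro_users
    regex "avro_(.*)" + replacement "$1"          :  avro_users -> users
    ExtractTopic$Key                              :  new topic = the record's key value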

Exception (it's always the same for all examples above):

[2019-11-27 16:10:18,826] ERROR WorkerSinkTask{id=avro_writer-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)
java.lang.NullPointerException
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:181)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2019-11-27 16:10:18,827] ERROR WorkerSinkTask{id=avro_writer-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:181)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	... 10 more
[2019-11-27 16:10:18,828] ERROR WorkerSinkTask{id=avro_writer-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

@compchi

compchi commented Jan 2, 2020

Hi, same issue as @msilvestre ... I'm using io.confluent.connect.s3.S3SinkConnector and my transform config looks like this:

"transforms": "AddSuffix",
"transforms.AddSuffix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddSuffix.regex": ".*",
"transforms.AddSuffix.replacement": "$0_bak"

@aakashnshah

Hi @dongxiaohe, it looks like the NPE comes from the topic partition not being found - was there any topic or topic partition manipulation done?

@carlos-verdes

@aakashnshah the RegexRouter is changing the topic name dynamically, but this should work, as it does in other connectors.

@winglian

@aakashnshah We're having a similar issue with the NPE when modifying the topic name before it goes to the S3SinkConnector. Our expectation is that the S3SinkConnector would just use the new topic as part of the path that gets written to. Is it actually more nuanced than that? Is there more validation happening where it expects that topic to actually exist?

@pcfleischer

I can confirm that we also only see this with the transform. It seems like it might be specific to the S3/cloud-storage connector; the same setup for JDBCSinkConnector works for routing to different tables, just as this would route to specific folders. The stack trace is pretty unhelpful, so I'm wondering if there's actually a catch block somewhere that's obfuscating the underlying stack trace.

"transforms.ExtractTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value"
"transforms.ExtractTopic.field": "eventType"
"transforms.ExtractTopic.skip.missing.or.null": true

@pcfleischer

pcfleischer commented Mar 24, 2021

It looks like the writers are initialized in open() based on the connector's source topic partitions, so when the records are written after the mapping there are no writers initialized for the renamed topics.

I also notice they dropped the assignment model from the initialization in recent versions; perhaps the below would be a solution?

Initialization

  @Override
  public void open(Collection<TopicPartition> partitions) {
    // assignment should be empty, either because this is the initial call or because it follows
    // a call to "close".
    assignment.addAll(partitions);
    for (TopicPartition tp : assignment) {
      topicPartitionWriters.put(tp, newTopicPartitionWriter(tp));
    }
  }

Usage

  @Override
  public void put(Collection<SinkRecord> records) throws ConnectException {
    for (SinkRecord record : records) {
      String topic = record.topic();
      int partition = record.kafkaPartition();
      TopicPartition tp = new TopicPartition(topic, partition);
      // this won't handle a rename since topic partition writers map is built of sources
      topicPartitionWriters.get(tp).buffer(record);
    }
  ...
  }

Potential Solution

  @Override
  public void put(Collection<SinkRecord> records) throws ConnectException {
    for (SinkRecord record : records) {
      String topic = record.topic();
      int partition = record.kafkaPartition();
      TopicPartition tp = new TopicPartition(topic, partition);
      // will this work???
      TopicPartitionWriter writer = topicPartitionWriters.get(tp);
      if (writer == null) {
        writer = newTopicPartitionWriter(tp);
        assignment.add(tp);
        topicPartitionWriters.put(tp, writer);
      }
      writer.buffer(record);
    }
  ...
  }

@pcfleischer

I tried this solution locally and it didn't work, and it seems for good reason: the writer uses buffered writes per topic, so splitting/routing one topic to one renamed topic seems possible since it's still one consumer, but one-to-multiple (our use case) looks like a more fundamental change, since there would be multiple file buffers per consumer.

In our case, we're using filters with multiple connectors instead and changing the parent directory.
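
Roughly what that looks like, in case it's useful to others: one S3 sink connector per route, each keeping only its own slice via a Filter transform and writing under its own topics.dir (parent directory). Names, topics and patterns below are made up, Filter/predicates require Connect 2.6+, and the stock TopicNameMatches predicate only routes on topic name, so filtering on a field inside the record would need a different predicate:

    # connector "s3-events-clicks"; the second connector is identical apart from
    # the predicate pattern and topics.dir
    connector.class=io.confluent.connect.s3.S3SinkConnector
    topics.regex=events-.*
    topics.dir=clicks

    # drop every record whose topic does NOT match the predicate (negate=true)
    transforms=keepClicks
    transforms.keepClicks.type=org.apache.kafka.connect.transforms.Filter
    transforms.keepClicks.predicate=isClicks
    transforms.keepClicks.negate=true

    predicates=isClicks
    predicates.isClicks.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    predicates.isClicks.pattern=events-click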

@mdespriee

Same issue as @msilvestre and others here: RegexRouter on the S3 sink makes it throw an NPE.

(Not sure if it's the same cause as @dongxiaohe, though. Shouldn't this issue be split?)

@fmeyer

fmeyer commented Jan 10, 2022

I've found the cause of this NPE and I'm working on a fix.

Whenever we apply transforms to a topic, the task loses the ability to interface with the source topic/partition, so we need to keep track of it.

Here: fmeyer@8448b59

@fmeyer

fmeyer commented Jan 13, 2022

The PR for the transforms issue is here: #480
@pcfleischer @mdespriee, let me know if you can try it on your use cases.

@spider-123-eng

Hi @fmeyer, I compiled your code locally and copied the .jar file to my plugins directory, then ran the connector with the RegexRouter transformation. It is working as expected; I can see the topic name getting changed.

But I'm not sure why this PR hasn't been merged. Are there any unknown issues if we merge these changes?
