Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NullPointerException io.zeebe.kafka.connect.source.ZeebeSourceTask.commit #64

Open
lrealdi opened this issue Nov 21, 2021 · 1 comment

Comments

@lrealdi
Copy link

lrealdi commented Nov 21, 2021

Hi
I have this issue with kafka-connect-zeebe-0.32.1-SNAPSHOT.jar

This is my docker-compose file:

version: "3"

services:
    # Zeebe broker; its Elasticsearch exporter is configured through the
    # ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_* variables below.
    zeebe:
        container_name: zeebe
        image: camunda/zeebe:${CAMUNDA_CLOUD_VERSION:-1.2.4}
        depends_on:
            - elasticsearch
        networks:
            - camunda-cloud
        ports:
            # 26500 is the port operate and tasklist use as the gateway address
            - "26500:26500"
        volumes:
            # named volume declared under the top-level `volumes` key
            - zeebe:/usr/local/zeebe/data
        environment:
            ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_CLASSNAME: "io.camunda.zeebe.exporter.ElasticsearchExporter"
            ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_ARGS_URL: "http://elasticsearch:9200"
            ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_ARGS_BULK_SIZE: "1"

    # Camunda Operate; configured with the Zeebe gateway address and the
    # Elasticsearch instance from this file.
    operate:
        container_name: operate
        image: camunda/operate:${CAMUNDA_CLOUD_VERSION:-1.2.4}
        depends_on:
            - elasticsearch
        networks:
            - camunda-cloud
        ports:
            - "8080:8080"
        environment:
            CAMUNDA_OPERATE_ZEEBE_GATEWAYADDRESS: "zeebe:26500"
            CAMUNDA_OPERATE_ELASTICSEARCH_URL: "http://elasticsearch:9200"
            CAMUNDA_OPERATE_ZEEBEELASTICSEARCH_URL: "http://elasticsearch:9200"

    # Camunda Tasklist; configured with the Zeebe gateway address and the
    # Elasticsearch instance from this file.
    tasklist:
        container_name: tasklist
        image: camunda/tasklist:${CAMUNDA_CLOUD_VERSION:-1.2.4}
        depends_on:
            - elasticsearch
        networks:
            - camunda-cloud
        ports:
            # container port 8080 is published on host port 8081
            # (host port 8080 is already published by operate)
            - "8081:8080"
        environment:
            CAMUNDA_TASKLIST_ZEEBE_GATEWAYADDRESS: "zeebe:26500"
            CAMUNDA_TASKLIST_ELASTICSEARCH_URL: "http://elasticsearch:9200"
            CAMUNDA_TASKLIST_ZEEBEELASTICSEARCH_URL: "http://elasticsearch:9200"

    # Single-node Elasticsearch referenced by zeebe, operate and tasklist
    # as http://elasticsearch:9200.
    elasticsearch:
        container_name: elasticsearch
        image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION:-7.14.1}
        networks:
            - camunda-cloud
        volumes:
            # named volume declared under the top-level `volumes` key
            - elastic:/usr/share/elasticsearch/data
        # memlock limits are lifted alongside bootstrap.memory_lock=true below
        ulimits:
            memlock: {soft: -1, hard: -1}
        environment:
            cluster.name: "camunda-cloud"
            discovery.type: "single-node"
            bootstrap.memory_lock: "true"
            ES_JAVA_OPTS: "-Xms512m -Xmx512m"

    # ZooKeeper instance that the kafka service connects to (zookeeper:2181).
    zookeeper:
        # Pinned to the same Confluent Platform release as the kafka image
        # (cp-server:7.0.0) instead of the floating `latest` tag, so the
        # stack stays reproducible and the two versions cannot drift apart.
        image: confluentinc/cp-zookeeper:7.0.0
        restart: unless-stopped
        hostname: zookeeper
        environment:
            ZOOKEEPER_CLIENT_PORT: 2181
            ZOOKEEPER_TICK_TIME: 2000
        ports:
            # client port 2181 is published on host port 22181
            - "22181:2181"

    # Single Kafka broker (Confluent Server) registered in ZooKeeper.
    kafka:
        image: confluentinc/cp-server:7.0.0
        hostname: kafka
        container_name: kafka
        depends_on:
            - zookeeper
        ports:
            - "9092:9092"
            - "9101:9101"
        environment:
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
            # Two listeners: PLAINTEXT is advertised as kafka:29092 (the address
            # the other services in this file use), PLAINTEXT_HOST as
            # localhost:9092 (matching the published port 9092).
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
            # Replication factors / min ISR are 1 because there is one broker.
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
            KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
            KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
            KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
            KAFKA_JMX_PORT: 9101
            KAFKA_JMX_HOSTNAME: localhost
            # Container-to-container traffic must use the port the registry
            # listens on (8081, see SCHEMA_REGISTRY_LISTENERS); 8085 is only
            # the port published on the Docker host.
            KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
            CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
            CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
            CONFLUENT_METRICS_ENABLE: 'true'
            CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'


    # Confluent Schema Registry, using the kafka broker as its store.
    schema-registry:
        container_name: schema-registry
        hostname: schema-registry
        image: confluentinc/cp-schema-registry:7.0.0
        depends_on:
            - kafka
        ports:
            # listener port 8081 is published on host port 8085
            - '8085:8081'
        environment:
            - SCHEMA_REGISTRY_HOST_NAME=schema-registry
            - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:29092
            - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081

    # Kafka Connect worker; connector jars are loaded from ./connectors.
    connect:
        image: cnfldemos/cp-server-connect-datagen:0.5.0-6.2.0
        hostname: connect
        container_name: connect
        depends_on:
            - kafka
            - schema-registry
        ports:
            - "8083:8083"
        # The zeebe service is attached only to `camunda-cloud`, while kafka
        # and schema-registry live on the implicit `default` network. Join
        # both so a connector running in this worker can resolve zeebe:26500
        # as well as kafka:29092.
        networks:
            - default
            - camunda-cloud
        environment:
            CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
            CONNECT_REST_ADVERTISED_HOST_NAME: connect
            CONNECT_REST_PORT: 8083
            CONNECT_GROUP_ID: compose-connect-group
            CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
            CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
            CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
            CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
            CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
            CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
            # NOTE(review): the jar version (7.0.0) differs from the image tag
            # (…-6.2.0) — confirm this path exists inside the image.
            CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.0.0.jar
            CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
            CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
            CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
            CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
            CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
            CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
            CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
            # TRACE logging is enabled for the io.zeebe.kafka.connect package.
            CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=INFO,org.reflections=ERROR,io.zeebe.kafka.connect=TRACE,io.zeebe.client=INFO"
            # /etc/kafka-connect/jars is the mount point of ./connectors below.
            CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars'
        volumes:
            - ./connectors:/etc/kafka-connect/jars/
            
    # Confluent Control Center, pointed at the kafka, connect and
    # schema-registry services of this file.
    control-center:
        container_name: control-center
        hostname: control-center
        image: confluentinc/cp-enterprise-control-center:7.0.0
        ports:
            - '9021:9021'
        depends_on:
            - zookeeper
            - kafka
            - schema-registry
            - connect
        environment:
            - CONTROL_CENTER_BOOTSTRAP_SERVERS=kafka:29092
            - CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER=connect:8083
            - CONTROL_CENTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
            - CONTROL_CENTER_REPLICATION_FACTOR=1
            - CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS=1
            - CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS=1
            - CONFLUENT_METRICS_TOPIC_REPLICATION=1
            - PORT=9021
            
# Named volumes mounted by the zeebe and elasticsearch services.
volumes:
    zeebe: {driver: local}
    elastic: {driver: local}

# Bridge network joined by zeebe, operate, tasklist and elasticsearch.
networks:
    camunda-cloud: {driver: bridge}

When I add the ping connector, Control Center shows:
State: FAILED task id: connect:8083
immagine

In console I see:

 [2021-11-21 11:55:36,373] ERROR WorkerSourceTask{id=ping-0} Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask)
 java.lang.NullPointerException
 	at io.zeebe.kafka.connect.source.ZeebeSourceTask.commit(ZeebeSourceTask.java:114)
 	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:571)
 	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:515)
 	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
 	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.lambda$schedule$0(SourceTaskOffsetCommitter.java:84)
 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 	at java.base/java.lang.Thread.run(Thread.java:829)

Originally posted by @lrealdi in #57 (comment)

@lrealdi
Copy link
Author

lrealdi commented Nov 21, 2021

[2021-11-21 21:19:00,955] ERROR WorkerSourceTask{id=ping-0} Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.lang.NullPointerException
	at io.zeebe.kafka.connect.source.ZeebeSourceTask.commit(ZeebeSourceTask.java:114)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:571)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:515)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:252)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2021-11-21 21:19:00,959] ERROR WorkerSourceTask{id=ping-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.ExceptionInInitializerError
	at io.grpc.netty.NettyChannelBuilder.<clinit>(NettyChannelBuilder.java:82)
	at io.camunda.zeebe.client.impl.ZeebeClientImpl.buildChannel(ZeebeClientImpl.java:124)
	at io.camunda.zeebe.client.impl.ZeebeClientImpl.<init>(ZeebeClientImpl.java:80)
	at io.camunda.zeebe.client.impl.ZeebeClientBuilderImpl.build(ZeebeClientBuilderImpl.java:282)
	at io.zeebe.kafka.connect.util.ZeebeClientHelper.buildClient(ZeebeClientHelper.java:62)
	at io.zeebe.kafka.connect.source.ZeebeSourceTask.start(ZeebeSourceTask.java:52)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:225)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: class io.netty.channel.epoll.EpollSocketChannel
	at java.base/java.lang.Class.asSubclass(Class.java:3640)
	at io.grpc.netty.Utils.epollChannelType(Utils.java:318)
	at io.grpc.netty.Utils.<clinit>(Utils.java:110)
	... 14 more
[2021-11-21 21:19:00,966] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.lang.NullPointerException
	at io.zeebe.kafka.connect.source.ZeebeSourceTask.stop(ZeebeSourceTask.java:109)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:161)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:187)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant