retry on franz consumer group session errors #154

Merged
samirketema merged 7 commits into main from samir/retry-consumer-group-session-error on Jul 12, 2024

@samirketema samirketema commented Jul 11, 2024

Description

Implements retry logic for the kafka consumer on group join and leader errors.

Quick checks:

  • I have followed the Code Guidelines.
  • There is no other pull request for the same update/change.
  • I have written unit tests.
  • I have made sure that the PR is of reasonable size and can be easily reviewed.

Manual Test

Steps to manually test:

  1. Spin up a Kafka cluster using this Docker Compose file:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka1:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://:9092
      KAFKA_ADVERTISED_LISTENERS: INSIDE://<your-local-ip>:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    depends_on:
      - zookeeper
  kafka2:
    image: wurstmeister/kafka
    ports:
      - "9093:9093"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://:9093
      KAFKA_ADVERTISED_LISTENERS: INSIDE://<your-local-ip>:9093
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    depends_on:
      - zookeeper
  kafka3:
    image: wurstmeister/kafka
    ports:
      - "9094:9094"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://:9094
      KAFKA_ADVERTISED_LISTENERS: INSIDE://<your-local-ip>:9094
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    depends_on:
      - zookeeper

  2. Run the following pipelines against the Kafka cluster:
version: "2.2"
pipelines:
  - id: gen-kafka
    status: running
    connectors:
      - id: gen.in
        type: source
        plugin: builtin:generator
        settings:
          format.type: "structured"
          format.options.id: "int"
          format.options.name: "string"
          format.options.company: "string"
          format.options.trial: "bool"
          rate: 0.5
          recordCount: "1000"
      - id: kafka.out
        type: destination
        plugin: standalone:kafka
        settings:
          batchBytes: "10485760"
          sdk.batch.delay: 1s
          sdk.batch.size: "50000"
          servers: <your-local-ip>:9092,<your-local-ip>:9093,<your-local-ip>:9094
          topic: test
  - id: kafka-log
    status: running
    connectors:  
      - id: kafka.in
        type: source
        plugin: standalone:kafka
        settings:
          readFromBeginning: "true"
          servers: <your-local-ip>:9092,<your-local-ip>:9093,<your-local-ip>:9094
          topic: test
        processors:
          - id: unwrapopencdc
            type: builtin:unwrap.opencdc
            plugin: ""
            condition: ""
            settings:
              field: .Payload.After
      - id: log.out
        type: destination
        plugin: builtin:log
  3. Simulate a node going down by pausing or stopping one of the Kafka node containers.

Before

Note that the pipeline errors out and stops, preventing a recovery from the issue:

2024-07-10T20:49:31+00:00 ERR pipeline stopped error="node kafka-log:kafka.in stopped with error: source stream was stopped unexpectedly: error reading from source: read plugin error: failed getting a record: unable to join group session: unable to dial: dial tcp 127.0.0.1:9092: connect: connection refused" component=pipeline.Service pipeline_id=kafka-log stack=[{"file":"/Users/samir/go/src/conduit/pkg/pipeline/lifecycle.go","func":"github.com/conduitio/conduit/pkg/pipeline.(*Service).runPipeline.func2","line":582},{"file":"/Users/samir/go/src/conduit/pkg/pipeline/stream/source.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*SourceNode).Run","line":140},{"file":"/Users/samir/go/src/conduit/pkg/pipeline/stream/source.go","func":"github.com/conduitio/conduit/pkg/pipeline/stream.(*SourceNode).Run.func1","line":87}]

After

Note that the errors are logged, and the read is continually retried with ErrBackoffRetry. This way the failure is not silent: the error is still shown, but the pipeline does not stop.

After the broker comes back up, we can see that the connector continues processing records.

2024-07-10T20:08:17+00:00 ERR group session error, retrying: unable to join group session: unable to dial: dial tcp 127.0.0.1:9093: connect: connection refused component=plugin.standalone connector_id=kafka-log:kafka.in plugin_name=conduit-connector-kafka
2024-07-10T20:08:17+00:00 ERR join and sync loop errored component=plugin.standalone connector_id=kafka-log:kafka.in group=backoff plugin_name=conduit-connector-kafka
2024-07-10T20:08:17+00:00 WRN read from broker errored, killing connection after 0 successful responses (is SASL missing?) component=plugin.standalone connector_id=kafka-log:kafka.in plugin_name=conduit-connector-kafka req=err
2024-07-10T20:08:17+00:00 WRN read from broker errored, killing connection after 0 successful responses (is SASL missing?) component=plugin.standalone connector_id=gen-kafka:kafka.out plugin_name=conduit-connector-kafka req=err
2024-07-10T20:08:17+00:00 WRN read from broker errored, killing connection after 0 successful responses (is SASL missing?) component=plugin.standalone connector_id=kafka-log:kafka.in plugin_name=conduit-connector-kafka req=err
2024-07-10T20:08:18+00:00 WRN unable to open connection to broker addr=err component=plugin.standalone connector_id=kafka-log:kafka.in plugin_name=conduit-connector-kafka
2024-07-10T20:08:18+00:00 WRN read from broker errored, killing connection after 0 successful responses (is SASL missing?) component=plugin.standalone connector_id=gen-kafka:kafka.out plugin_name=conduit-connector-kafka req=err
2024-07-10T20:08:18+00:00 WRN produce partition load error, bumping error count on first stored batch broker=will_fail component=plugin.standalone connector_id=gen-kafka:kafka.out plugin_name=conduit-connector-kafka
2024-07-10T20:08:19+00:00 WRN read from broker errored, killing connection after 0 successful responses (is SASL missing?) component=plugin.standalone connector_id=kafka-log:kafka.in plugin_name=conduit-connector-kafka req=err
2024-07-10T20:08:19+00:00 WRN read from broker errored, killing connection after 0 successful responses (is SASL missing?) component=plugin.standalone connector_id=kafka-log:kafka.in plugin_name=conduit-connector-kafka req=err
2024-07-10T20:08:20+00:00 WRN read from broker errored, killing connection after 0 successful responses (is SASL missing?) component=plugin.standalone connector_id=kafka-log:kafka.in plugin_name=conduit-connector-kafka req=err
2024-07-10T20:08:21+00:00 WRN produce partition load error, bumping error count on first stored batch broker=will_fail component=plugin.standalone connector_id=gen-kafka:kafka.out plugin_name=conduit-connector-kafka
2024-07-10T20:08:21+00:00 WRN read from broker errored, killing connection after 0 successful responses (is SASL missing?) component=plugin.standalone connector_id=kafka-log:kafka.in plugin_name=conduit-connector-kafka req=err
2024-07-10T20:08:26+00:00 INF component=plugin connector_id=kafka-log:log.out plugin_name=builtin:log plugin_type=destination record={"key":"c2FjY2hhcmlmeQ==","metadata":{"conduit.source.connector.id":"gen-kafka:gen.in","opencdc.readAt":"1720667249647356000"},"operation":"create","payload":{"after":{"company":"redipper","id":1592616014876622300,"name":"oenanthaldehyde","trial":false},"before":null},"position":"eyJHcm91cElEIjoiOWIyNzFkN2UtYTBmNC00MzlhLTk4YWMtN2YyMjk2NjIxOTdhIiwiVG9waWMiOiJzYW1pci10ZXN0LTIiLCJQYXJ0aXRpb24iOjAsIk9mZnNldCI6MH0="}

@samirketema samirketema requested a review from a team as a code owner July 11, 2024 03:47
@samirketema samirketema requested a review from lovromazgon July 11, 2024 04:23

samirketema commented Jul 11, 2024

@lovromazgon ok, feedback has been addressed - thank you 🙇🏾

edit - also, I re-tested with these changes, and still works as expected 🙏🏾

@lovromazgon lovromazgon left a comment

👏

@samirketema samirketema merged commit 3eb3b3a into main Jul 12, 2024
4 checks passed
@samirketema samirketema deleted the samir/retry-consumer-group-session-error branch July 12, 2024 16:03
samirketema added a commit that referenced this pull request Jul 12, 2024
* retry on franz consumer group session errors

* address feedback

* Update README.md

Co-authored-by: Lovro Mažgon <[email protected]>

* remove unused retry leader error config

* remove retryLeaderErrors from tests

* Update source/franz.go

Co-authored-by: Lovro Mažgon <[email protected]>

* used -typed flag in franz client mock

---------

Co-authored-by: Lovro Mažgon <[email protected]>
(cherry picked from commit 3eb3b3a)