Invalid Offsets Committed during a rebalance on shutdown #3710

Closed
7 tasks done
mattatcha opened this issue Feb 4, 2022 · 9 comments

Comments

@mattatcha

Description

Invalid offsets are committed for a partition when the partition has been postponed while a rebalance is in progress and the EAGER load balancing strategy is in use.

This causes a single partition's offsets to be committed either out of range or back in time to a previous offset. As a result, our consumers re-process millions of messages that were already processed.
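To make the two failure modes concrete, here is a minimal Go sketch of a check that classifies a commit as a rewind or as out of range. `classifyCommit` and the watermark values are hypothetical and not part of librdkafka; only the two offsets come from the logs in this report.

```go
package main

import "fmt"

// CommitStatus describes how a new commit relates to the previous one.
type CommitStatus string

const (
	CommitOK         CommitStatus = "ok"
	CommitRewind     CommitStatus = "rewind"       // moved back to an earlier offset
	CommitOutOfRange CommitStatus = "out-of-range" // outside the partition's log
)

// classifyCommit is a hypothetical helper for illustration only.
// prev is the previously committed offset, next is the new commit,
// low and high are the partition's low and high watermarks.
func classifyCommit(prev, next, low, high int64) CommitStatus {
	switch {
	case next < low || next > high:
		return CommitOutOfRange
	case next < prev:
		return CommitRewind
	default:
		return CommitOK
	}
}

func main() {
	// Offsets are from the logs in this report; the watermarks are made up.
	fmt.Println(classifyCommit(411193513, 411018846, 400000000, 411300000))
}
```

A check like this could run in monitoring that compares successive committed offsets per partition; it only detects the problem and does not prevent it.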

For reference, we are using kafka-client-go version 1.8.2.

See the details below about the EAGER vs COOPERATIVE rebalance strategies. We switched to COOPERATIVE, and the problem happens far less often than with EAGER, but it still happens daily.

How to reproduce

When a consumer in a consumer group shuts down, it sometimes triggers a chain of events that commits incorrect offsets for a partition.

Checklist

Please provide the following information:

  • librdkafka version:
    v1.8.2

  • Apache Kafka version:
    2.6.1

  • librdkafka client configuration:

{
   	"client.id":                     "clearbit/kafka-go",
   	"bootstrap.servers":             "localhost:9092",
   	"enable.partition.eof":          "false",
   	"session.timeout.ms":            "45000",
   	"max.poll.interval.ms":          "86400000",
   	"heartbeat.interval.ms":         "3000",
   	"partition.assignment.strategy": "roundrobin",
   	"request.required.acks":         "-1",
   	"enable.auto.commit":            "true",
   	"auto.commit.interval.ms":       "5000",
   	"enable.auto.offset.store":      "false",
   	"auto.offset.reset":             "earliest",
   	"compression.codec":             "lz4",
   	"compression.type":              "lz4",
   	"partitioner":                   "random",
   	"batch.num.messages":            "10000",
   	"queue.buffering.max.ms":        "5",
   	"statistics.interval.ms":        "15000",
   	"security.protocol":             "ssl",
   	"ssl.ca.location":               "...",
   }
  • Operating system:
    Debian Linux (x64)

  • Provide logs (with debug=.. as necessary) from librdkafka

  • Provide broker log excerpts

  • Critical issue

The first log line below shows that the correct offset (411193513) was fetched, but the old version's offset minus one (411018847 - 1 = 411018846) is what was stored.
The stored offset 411018846 is what was committed, instead of 411193513, which was the previously committed offset.

1641579469.276|FETCHDEC|clearbit/kafka-go#consumer-1| [thrd:sasl_ssl://[REDACTED]/5]: Topic x.people [5]: fetch decide: updating to version 23 (was 16) at offset 411193513 (was 411018847)
1641579469.278|REMOVE|clearbit/kafka-go#consumer-1| [thrd:main]: Removing x.people [5] from assignment (started=true, pending=false, queried=false, stored offset=411018846)
1641579469.278|RESUME|clearbit/kafka-go#consumer-1| [thrd:main]: Not resuming stopped x.people [5]: at offset 411193513 (state stopped, v25)
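The relationship between these log lines can be checked mechanically. The sketch below, assuming the log format shown above, parses one FETCHDEC line and one REMOVE line and reports how far the stored offset trails the offset the fetcher was started at. `staleStore` is a hypothetical helper written for this illustration, not a librdkafka API.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// Patterns for the FETCHDEC and REMOVE debug lines quoted in this report.
var (
	fetchDecRe = regexp.MustCompile(`fetch decide: updating to version \d+ \(was \d+\) at offset (\d+) \(was (\d+)\)`)
	removeRe   = regexp.MustCompile(`from assignment \(.*stored offset=(\d+)\)`)
)

// staleStore returns the gap between the offset the fetcher was started at
// and the stored offset, and whether the stored offset equals the old
// version's offset minus one.
func staleStore(fetchDec, remove string) (gap int64, fromOldVersion bool, err error) {
	fd := fetchDecRe.FindStringSubmatch(fetchDec)
	rm := removeRe.FindStringSubmatch(remove)
	if fd == nil || rm == nil {
		return 0, false, fmt.Errorf("unrecognized log line")
	}
	started, err := strconv.ParseInt(fd[1], 10, 64)
	if err != nil {
		return 0, false, err
	}
	previous, err := strconv.ParseInt(fd[2], 10, 64)
	if err != nil {
		return 0, false, err
	}
	stored, err := strconv.ParseInt(rm[1], 10, 64)
	if err != nil {
		return 0, false, err
	}
	return started - stored, stored == previous-1, nil
}

func main() {
	gap, fromOld, err := staleStore(
		"Topic x.people [5]: fetch decide: updating to version 23 (was 16) at offset 411193513 (was 411018847)",
		"Removing x.people [5] from assignment (started=true, pending=false, queried=false, stored offset=411018846)",
	)
	fmt.Println(gap, fromOld, err)
}
```

For the lines above, the gap is 411193513 - 411018846 = 174667 offsets, and the stored offset matches the old version's offset minus one.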

Full debug logs from a single consumer

1641579426.381|HEARTBEAT|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" heartbeat error response in state up (join-state steady, 1 partition(s) assigned): Broker: Group rebalance in progress
1641579426.381|REBALANCE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" is rebalancing (EAGER) in state up (join-state steady) with 1 assigned partition(s): rebalance in progress
1641579426.381|CGRPJOINSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" changed join state steady -> wait-unassign-call (state up)
1641579426.381|ASSIGN|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": delegating revoke of 1 partition(s) to application on queue rd_kafka_cgrp_new: rebalance in progress
1641579426.381|PAUSE|clearbit/kafka-go#consumer-1| [thrd:main]: Pausing fetchers for 1 assigned partition(s): rebalance
1641579426.381|PAUSE|clearbit/kafka-go#consumer-1| [thrd:main]: Library pausing 1 partition(s)
1641579426.381|BARRIER|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [7]: rd_kafka_toppar_op_pause_resume:2449: new version barrier v17
1641579426.381|PAUSE|clearbit/kafka-go#consumer-1| [thrd:main]: Pause x.people [7] (v17)
1641579426.381|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": clearing group assignment
1641579426.381|OP|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [7] received op PAUSE (v17) in fetch-state active (opv16)
1641579426.381|PAUSE|clearbit/kafka-go#consumer-1| [thrd:main]: Pause x.people [7]: at offset 411269057 (state active, v17)
1641579426.381|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-unassign-call)
1641579426.381|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op ASSIGN in state up (join-state wait-unassign-call)
1641579426.381|CLEARASSIGN|clearbit/kafka-go#consumer-1| [thrd:main]: Clearing current assignment of 1 partition(s)
1641579426.381|CGRPJOINSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" changed join state wait-unassign-call -> wait-unassign-to-complete (state up)
1641579426.381|DUMP|clearbit/kafka-go#consumer-1| [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
1641579426.381|DUMP_ALL|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.381|DUMP_PND|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.381|DUMP_QRY|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.381|DUMP_REM|clearbit/kafka-go#consumer-1| [thrd:main]: List with 1 partition(s):
1641579426.381|DUMP_REM|clearbit/kafka-go#consumer-1| [thrd:main]:  x.people [7] offset STORED
1641579426.381|BARRIER|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [7]: rd_kafka_toppar_op_fetch_stop:2390: new version barrier v18
1641579426.381|CONSUMER|clearbit/kafka-go#consumer-1| [thrd:main]: Stop consuming x.people [7] (v18)
1641579426.381|BARRIER|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [7]: rd_kafka_toppar_op_pause_resume:2449: new version barrier v19
1641579426.381|RESUME|clearbit/kafka-go#consumer-1| [thrd:main]: Resume x.people [7] (v19)
1641579426.381|DESP|clearbit/kafka-go#consumer-1| [thrd:main]: Removing (un)desired topic x.people [7]
1641579426.381|REMOVE|clearbit/kafka-go#consumer-1| [thrd:main]: Removing x.people [7] from assignment (started=true, pending=false, queried=false, stored offset=411269042)
1641579426.381|REMOVE|clearbit/kafka-go#consumer-1| [thrd:main]: Served 1 removed partition(s), with 1 offset(s) to commit
1641579426.381|COMMIT|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: Committing offsets for 1 partition(s) with generation-id 13172 in join-state wait-unassign-to-complete: unassigned partitions
1641579426.381|OFFSET|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: Enqueue OffsetCommitRequest(v7, 1/1 partition(s))): unassigned partitions
1641579426.381|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 1 partitions awaiting stop and 1 offset commits in progress
1641579426.381|SEND|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Sent OffsetCommitRequest (v7, 164 bytes @ 0, CorrId 36501)
1641579426.381|OP|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [7] received op FETCH_STOP (v18) in fetch-state active (opv17)
1641579426.381|FETCH|clearbit/kafka-go#consumer-1| [thrd:main]: Stopping fetch for x.people [7] in state active (v18)
1641579426.381|PARTSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Partition x.people [7] changed fetch state active -> stopping
1641579426.381|STORETERM|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [7]: offset store terminating
1641579426.381|PARTSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Partition x.people [7] changed fetch state stopping -> stopped
1641579426.381|OP|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [7] received op PAUSE (v19) in fetch-state stopped (opv18)
1641579426.381|RESUME|clearbit/kafka-go#consumer-1| [thrd:main]: Not resuming stopped x.people [7]: at offset 411269057 (state stopped, v19)
1641579426.381|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op PARTITION_LEAVE in state up (join-state wait-unassign-to-complete) for x.people [7]
1641579426.381|PARTDEL|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": delete x.people [7]
1641579426.381|STOPSERVE|clearbit/kafka-go#consumer-1| [thrd:main]: All partitions awaiting stop are now stopped: serving assignment
1641579426.381|DUMP|clearbit/kafka-go#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
1641579426.381|DUMP_ALL|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.381|DUMP_PND|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.381|DUMP_QRY|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.381|DUMP_REM|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.381|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 0 partitions awaiting stop and 1 offset commits in progress
1641579426.384|RECV|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Received OffsetCommitResponse (v7, 28 bytes, CorrId 36501, rtt 2.41ms)
1641579426.384|COMMIT|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: OffsetCommit for 1 partition(s) in join-state wait-unassign-to-complete: unassigned partitions: returned: Success
1641579426.384|DUMP|clearbit/kafka-go#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
1641579426.384|DUMP_ALL|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.384|DUMP_PND|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.384|DUMP_QRY|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.384|DUMP_REM|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579426.384|ASSIGNDONE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": assignment operations done in join-state wait-unassign-to-complete (rebalance rejoin=false)
1641579426.384|UNASSIGN|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": unassign done in state up (join-state wait-unassign-to-complete)
1641579426.384|REJOIN|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": Rejoining group without an assignment: Unassignment done
1641579426.384|CGRPJOINSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" changed join state wait-unassign-to-complete -> init (state up)
1641579426.384|JOIN|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": join with 1 subscribed topic(s)
1641579426.384|CGRPMETADATA|clearbit/kafka-go#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (42561ms old)
1641579426.384|JOIN|clearbit/kafka-go#consumer-1| [thrd:main]: sasl_ssl://[REDACTED]/6: Joining group "core-services.x-index.x-people-v2" with 1 subscribed topic(s) and member id "clearbit/kafka-go-6159f910-0bd5-42eb-8c43-ddfe12d7fdc7"
1641579426.384|CGRPJOINSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" changed join state init -> wait-join (state up)
1641579426.384|SEND|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Sent JoinGroupRequest (v5, 186 bytes @ 0, CorrId 36502)
1641579426.384|BROADCAST|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
1641579426.527|RECV|clearbit/kafka-go#consumer-1| [thrd:sasl_ssl://[REDACTED]/6]: sasl_ssl://[REDACTED]/6: Received FetchResponse (v11, 1806 bytes, CorrId 405153, rtt 176.84ms)
1641579426.527|FETCH|clearbit/kafka-go#consumer-1| [thrd:sasl_ssl://[REDACTED]/6]: sasl_ssl://[REDACTED]/6: Topic x.people [7] MessageSet size 1736, error "Success", MaxOffset 411269058, LSO 411269058, Ver 16/16
1641579426.527|CONSUME|clearbit/kafka-go#consumer-1| [thrd:sasl_ssl://[REDACTED]/6]: sasl_ssl://[REDACTED]/6: Enqueue 1 message(s) (2181 bytes, 1 ops) on x.people [7] fetch queue (qlen 1, v16, last_offset 411269057, 0 ctrl msgs, lz4)
1641579426.527|FETCH|clearbit/kafka-go#consumer-1| [thrd:sasl_ssl://[REDACTED]/6]: sasl_ssl://[REDACTED]/6: Topic x.people [7] in state stopped at offset 411269057 (2/100000 msgs, 2/65536 kb queued, opv 16) is not fetchable: not in active fetch state
1641579426.527|FETCHADD|clearbit/kafka-go#consumer-1| [thrd:sasl_ssl://[REDACTED]/6]: sasl_ssl://[REDACTED]/6: Removed x.people [7] from fetch list (0 entries, opv 16): not in active fetch state
1641579431.048|DESIRED|clearbit/kafka-go#consumer-1| [thrd:app]: x.people [7]: marking as DESIRED
1641579469.188|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op SUBSCRIBE in state up (join-state wait-join)
1641579469.188|SUBSCRIBE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": subscribe to new unset subscription of 0 topics (join-state wait-join)
1641579469.188|SUBSCRIBE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": postponing subscribe until previous rebalance completes (join-state wait-join)
1641579469.262|RECV|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Received JoinGroupResponse (v5, 138 bytes, CorrId 36502, rtt 42877.66ms)
1641579469.262|JOINGROUP|clearbit/kafka-go#consumer-1| [thrd:main]: JoinGroup response: GenerationId 13173, Protocol roundrobin, LeaderId clearbit/kafka-go-a539d9f7-4d2d-4064-882f-af3a43724841, my MemberId clearbit/kafka-go-6159f910-0bd5-42eb-8c43-ddfe12d7fdc7, member metadata count 0: (no error)
1641579469.262|CGRPJOINSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" changed join state wait-join -> wait-sync (state up)
1641579469.262|SEND|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Sent SyncGroupRequest (v3, 132 bytes @ 0, CorrId 36503)
1641579469.262|BROADCAST|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
1641579469.274|RECV|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Received SyncGroupResponse (v3, 38 bytes, CorrId 36503, rtt 12.29ms)
1641579469.274|SYNCGROUP|clearbit/kafka-go#consumer-1| [thrd:main]: SyncGroup response: Success (28 bytes of MemberState data)
1641579469.274|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: List with 1 partition(s):
1641579469.274|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]:  x.people [5] offset INVALID
1641579469.274|CGRPJOINSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" changed join state wait-sync -> wait-assign-call (state up)
1641579469.274|ASSIGN|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": delegating assign of 1 partition(s) to application on queue rd_kafka_cgrp_new: new assignment
1641579469.274|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": setting group assignment to 1 partition(s)
1641579469.274|GRPASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: List with 1 partition(s):
1641579469.274|GRPASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]:  x.people [5] offset INVALID
1641579469.274|HEARTBEAT|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: Heartbeat for group "core-services.x-index.x-people-v2" generation id 13173
1641579469.275|SEND|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Sent HeartbeatRequest (v3, 128 bytes @ 0, CorrId 36504)
1641579469.275|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-assign-call)
1641579469.275|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op ASSIGN in state up (join-state wait-assign-call)
1641579469.275|ASSIGN|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": new assignment of 1 partition(s) in join-state wait-assign-call
1641579469.275|CLEARASSIGN|clearbit/kafka-go#consumer-1| [thrd:main]: No current assignment to clear
1641579469.275|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: Added 1 partition(s) to assignment which now consists of 1 partition(s) where of 1 are in pending state and 0 are being queried
1641579469.275|PAUSE|clearbit/kafka-go#consumer-1| [thrd:main]: Resuming fetchers for 1 assigned partition(s): assign called
1641579469.275|RESUME|clearbit/kafka-go#consumer-1| [thrd:main]: Library resuming 1 partition(s)
1641579469.275|BARRIER|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5]: rd_kafka_toppar_op_pause_resume:2449: new version barrier v20
1641579469.275|RESUME|clearbit/kafka-go#consumer-1| [thrd:main]: Resume x.people [5] (v20)
1641579469.275|CGRPJOINSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" changed join state wait-assign-call -> steady (state up)
1641579469.275|DUMP|clearbit/kafka-go#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
1641579469.275|DUMP_ALL|clearbit/kafka-go#consumer-1| [thrd:main]: List with 1 partition(s):
1641579469.275|DUMP_ALL|clearbit/kafka-go#consumer-1| [thrd:main]:  x.people [5] offset STORED
1641579469.275|DUMP_PND|clearbit/kafka-go#consumer-1| [thrd:main]: List with 1 partition(s):
1641579469.275|DUMP_PND|clearbit/kafka-go#consumer-1| [thrd:main]:  x.people [5] offset STORED
1641579469.275|DUMP_QRY|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.275|DUMP_REM|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.275|SRVPEND|clearbit/kafka-go#consumer-1| [thrd:main]: Querying committed offset for pending assigned partition x.people [5]
1641579469.275|OFFSETFETCH|clearbit/kafka-go#consumer-1| [thrd:main]: Fetching committed offsets for 1 pending partition(s) in assignment
1641579469.275|OFFSET|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: OffsetFetchRequest(v7) for 1/1 partition(s)
1641579469.275|OFFSET|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: Fetch committed offsets for 1/1 partition(s)
1641579469.275|SEND|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Sent OffsetFetchRequest (v7, 84 bytes @ 0, CorrId 36505)
1641579469.275|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: Current assignment of 1 partition(s) with 1 pending adds, 1 offset queries, 0 partitions awaiting stop and 0 offset commits in progress
1641579469.275|OP|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5] received op PAUSE (v20) in fetch-state stopped (opv19)
1641579469.275|RESUME|clearbit/kafka-go#consumer-1| [thrd:main]: Not resuming x.people [5]: partition is not paused by library
1641579469.275|RECV|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Received HeartbeatResponse (v3, 6 bytes, CorrId 36504, rtt 0.47ms)
1641579469.275|RECV|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Received OffsetFetchResponse (v7, 40 bytes, CorrId 36505, rtt 0.42ms)
1641579469.275|OFFSETFETCH|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: OffsetFetchResponse: x.people [5] offset 411193513, metadata 0 byte(s): NO_ERROR
1641579469.275|OFFFETCH|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: OffsetFetch for 1/1 partition(s) (0 unstable partition(s)) returned Success
1641579469.275|OFFSETFETCH|clearbit/kafka-go#consumer-1| [thrd:main]: Adding x.people [5] back to pending list with offset 411193513
1641579469.275|DUMP|clearbit/kafka-go#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
1641579469.275|DUMP_ALL|clearbit/kafka-go#consumer-1| [thrd:main]: List with 1 partition(s):
1641579469.275|DUMP_ALL|clearbit/kafka-go#consumer-1| [thrd:main]:  x.people [5] offset STORED
1641579469.275|DUMP_PND|clearbit/kafka-go#consumer-1| [thrd:main]: List with 1 partition(s):
1641579469.275|DUMP_PND|clearbit/kafka-go#consumer-1| [thrd:main]:  x.people [5] offset 411193513
1641579469.275|DUMP_QRY|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.275|DUMP_REM|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.275|SRVPEND|clearbit/kafka-go#consumer-1| [thrd:main]: Starting pending assigned partition x.people [5] at offset 411193513
1641579469.275|BARRIER|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5]: rd_kafka_toppar_op_pause_resume:2449: new version barrier v21
1641579469.275|RESUME|clearbit/kafka-go#consumer-1| [thrd:main]: Resume x.people [5] (v21)
1641579469.275|BARRIER|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5]: rd_kafka_toppar_op_fetch_start:2363: new version barrier v22
1641579469.275|CONSUMER|clearbit/kafka-go#consumer-1| [thrd:main]: Start consuming x.people [5] at offset 411193513 (v22)
1641579469.275|ASSIGNDONE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": assignment operations done in join-state steady (rebalance rejoin=false)
1641579469.275|REJOIN|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": Rejoining group with 1 owned partition(s): Applying next subscription
1641579469.275|CGRPJOINSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" changed join state steady -> init (state up)
1641579469.275|SUBSCRIBE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": invoking waiting postponed unsubscribe
1641579469.275|UNSUBSCRIBE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": unsubscribe from current subscription of size 1 (leave group=true, has joined=true, clearbit/kafka-go-6159f910-0bd5-42eb-8c43-ddfe12d7fdc7, join-state init)
1641579469.275|SUBSCRIPTION|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": clearing subscribed topics list (1)
1641579469.275|SUBSCRIPTION|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": effective subscription list changed from 1 to 0 topic(s):
1641579469.275|GRPLEADER|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": resetting group leader info: unsubscribe
1641579469.275|REBALANCE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2"      in state up (join-state init) with 1 assigned partition(s): unsubscribe
1641579469.275|CGRPJOINSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" changed join state init -> wait-unassign-call (state up)
1641579469.275|ASSIGN|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": delegating revoke of 1 partition(s) to application on queue rd_kafka_cgrp_new: unsubscribe
1641579469.275|PAUSE|clearbit/kafka-go#consumer-1| [thrd:main]: Pausing fetchers for 1 assigned partition(s): rebalance
1641579469.275|PAUSE|clearbit/kafka-go#consumer-1| [thrd:main]: Library pausing 1 partition(s)
1641579469.275|BARRIER|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5]: rd_kafka_toppar_op_pause_resume:2449: new version barrier v23
1641579469.275|PAUSE|clearbit/kafka-go#consumer-1| [thrd:main]: Pause x.people [5] (v23)
1641579469.275|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": clearing group assignment
1641579469.275|OP|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5] received op PAUSE (v21) in fetch-state stopped (opv20)
1641579469.275|RESUME|clearbit/kafka-go#consumer-1| [thrd:main]: Not resuming x.people [5]: partition is not paused by library
1641579469.275|OP|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5] received op FETCH_START (v22) in fetch-state stopped (opv21)
1641579469.275|FETCH|clearbit/kafka-go#consumer-1| [thrd:main]: Start fetch for x.people [5] in state stopped at offset 411193513 (v22)
1641579469.275|PARTSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Partition x.people [5] changed fetch state stopped -> active
1641579469.275|FETCH|clearbit/kafka-go#consumer-1| [thrd:main]: Partition x.people [5] start fetching at offset 411193513
1641579469.275|WAKEUP|clearbit/kafka-go#consumer-1| [thrd:main]: sasl_ssl://[REDACTED]/5: Wake-up
1641579469.275|OP|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5] received op PAUSE (v23) in fetch-state active (opv22)
1641579469.275|PAUSE|clearbit/kafka-go#consumer-1| [thrd:main]: Pause x.people [5]: at offset 411193513 (state active, v23)
1641579469.275|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op PARTITION_JOIN in state up (join-state wait-unassign-call) for x.people [5]
1641579469.277|PARTADD|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": add x.people [5]
1641579469.276|FETCHDEC|clearbit/kafka-go#consumer-1| [thrd:sasl_ssl://[REDACTED]/5]: Topic x.people [5]: fetch decide: updating to version 23 (was 16) at offset 411193513 (was 411018847)
1641579469.277|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-unassign-call)
1641579469.277|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op ASSIGN in state up (join-state wait-unassign-call)
1641579469.277|CLEARASSIGN|clearbit/kafka-go#consumer-1| [thrd:main]: Clearing current assignment of 1 partition(s)
1641579469.277|CGRPJOINSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" changed join state wait-unassign-call -> wait-unassign-to-complete (state up)
1641579469.277|DUMP|clearbit/kafka-go#consumer-1| [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
1641579469.277|DUMP_ALL|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.277|DUMP_PND|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.277|DUMP_QRY|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.277|DUMP_REM|clearbit/kafka-go#consumer-1| [thrd:main]: List with 1 partition(s):
1641579469.277|DUMP_REM|clearbit/kafka-go#consumer-1| [thrd:main]:  x.people [5] offset STORED
1641579469.277|BARRIER|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5]: rd_kafka_toppar_op_fetch_stop:2390: new version barrier v24
1641579469.277|CONSUMER|clearbit/kafka-go#consumer-1| [thrd:main]: Stop consuming x.people [5] (v24)
1641579469.277|BARRIER|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5]: rd_kafka_toppar_op_pause_resume:2449: new version barrier v25
1641579469.277|RESUME|clearbit/kafka-go#consumer-1| [thrd:main]: Resume x.people [5] (v25)
1641579469.278|DESP|clearbit/kafka-go#consumer-1| [thrd:main]: Removing (un)desired topic x.people [5]
1641579469.278|REMOVE|clearbit/kafka-go#consumer-1| [thrd:main]: Removing x.people [5] from assignment (started=true, pending=false, queried=false, stored offset=411018846)
1641579469.278|REMOVE|clearbit/kafka-go#consumer-1| [thrd:main]: Served 1 removed partition(s), with 1 offset(s) to commit
1641579469.278|COMMIT|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: Committing offsets for 1 partition(s) with generation-id 13173 in join-state wait-unassign-to-complete: unassigned partitions
1641579469.278|OFFSET|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: Enqueue OffsetCommitRequest(v7, 1/1 partition(s))): unassigned partitions
1641579469.278|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 1 partitions awaiting stop and 1 offset commits in progress
1641579469.278|CLOSE|clearbit/kafka-go#consumer-1| [thrd:app]: Closing consumer
1641579469.278|CLOSE|clearbit/kafka-go#consumer-1| [thrd:app]: Waiting for close events
1641579469.278|OP|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5] received op FETCH_STOP (v24) in fetch-state active (opv23)
1641579469.278|FETCH|clearbit/kafka-go#consumer-1| [thrd:main]: Stopping fetch for x.people [5] in state active (v24)
1641579469.278|PARTSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Partition x.people [5] changed fetch state active -> stopping
1641579469.278|STORETERM|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5]: offset store terminating
1641579469.278|PARTSTATE|clearbit/kafka-go#consumer-1| [thrd:main]: Partition x.people [5] changed fetch state stopping -> stopped
1641579469.278|OP|clearbit/kafka-go#consumer-1| [thrd:main]: x.people [5] received op PAUSE (v25) in fetch-state stopped (opv24)
1641579469.278|RESUME|clearbit/kafka-go#consumer-1| [thrd:main]: Not resuming stopped x.people [5]: at offset 411193513 (state stopped, v25)
1641579469.278|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op TERMINATE in state up (join-state wait-unassign-to-complete)
1641579469.278|CGRPTERM|clearbit/kafka-go#consumer-1| [thrd:main]: Terminating group "core-services.x-index.x-people-v2" in state up with 1 partition(s)
1641579469.278|CLEARASSIGN|clearbit/kafka-go#consumer-1| [thrd:main]: No current assignment to clear
1641579469.278|SEND|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Sent OffsetCommitRequest (v7, 164 bytes @ 0, CorrId 36506)
1641579469.278|DUMP|clearbit/kafka-go#consumer-1| [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=1)
1641579469.278|DUMP_ALL|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.278|DUMP_PND|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.278|DUMP_QRY|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.278|DUMP_REM|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.278|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 1 partitions awaiting stop and 1 offset commits in progress
1641579469.278|CGRPTERM|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": waiting for 1 toppar(s), assignment in progress, 1 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
1641579469.278|CGRPTERM|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": waiting for 1 toppar(s), assignment in progress, 1 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
1641579469.278|CGRPOP|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2" received op PARTITION_LEAVE in state up (join-state wait-unassign-to-complete) for x.people [5]
1641579469.278|PARTDEL|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": delete x.people [5]
1641579469.278|CGRPTERM|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": waiting for 0 toppar(s), assignment in progress, 1 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
1641579469.278|STOPSERVE|clearbit/kafka-go#consumer-1| [thrd:main]: All partitions awaiting stop are now stopped: serving assignment
1641579469.278|DUMP|clearbit/kafka-go#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
1641579469.278|DUMP_ALL|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.278|DUMP_PND|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.278|DUMP_QRY|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.278|DUMP_REM|clearbit/kafka-go#consumer-1| [thrd:main]: List with 0 partition(s):
1641579469.278|ASSIGNMENT|clearbit/kafka-go#consumer-1| [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 0 partitions awaiting stop and 1 offset commits in progress
1641579469.278|CGRPTERM|clearbit/kafka-go#consumer-1| [thrd:main]: Group "core-services.x-index.x-people-v2": waiting for 0 toppar(s), assignment in progress, 1 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
1641579469.281|RECV|clearbit/kafka-go#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/6: Received OffsetCommitResponse (v7, 28 bytes, CorrId 36506, rtt 3.66ms)
1641579469.281|COMMIT|clearbit/kafka-go#consumer-1| [thrd:main]: GroupCoordinator/6: OffsetCommit for 1 partition(s) in join-state wait-unassign-to-complete: unassigned partitions: returned: Success
1641579469.281|DUMP|clearbit/kafka-go#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
@deankarn

Checking to see if there's anything more needed to help get to the bottom of this?

This is happening daily in our environment.

@edenhill
Contributor

I believe this is the same issue as fixed by this PR:
#3774

@deankarn

Awesome @edenhill thanks for the update :)

Is there any ETA on getting that PR merged? Asking as it’s causing serious issues in our environment.

@edenhill
Contributor

It will be merged this week. The v1.9.0 release should be out in 2-3 weeks.

You will probably need to change your application anyway to handle the new error returned from offsets_store(), and perhaps not even attempt offsets_store() for partitions that are no longer assigned - and you could do this now as a workaround on the current version.
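
A minimal sketch of that workaround, assuming the application keeps its own record of owned partitions and updates it from its rebalance handling. `TopicPartition`, `AssignmentTracker`, and the `store` callback are hypothetical names for illustration and stand in for whatever types and offset-store call the client in use provides:

```go
package main

import (
	"fmt"
	"sync"
)

// TopicPartition is a hypothetical stand-in for the client's partition type.
type TopicPartition struct {
	Topic     string
	Partition int32
}

// AssignmentTracker records which partitions this consumer currently owns.
type AssignmentTracker struct {
	mu    sync.RWMutex
	owned map[TopicPartition]struct{}
}

func NewAssignmentTracker() *AssignmentTracker {
	return &AssignmentTracker{owned: make(map[TopicPartition]struct{})}
}

// Assign marks partitions as owned; intended to be called from the
// application's rebalance handling when partitions are assigned.
func (t *AssignmentTracker) Assign(parts ...TopicPartition) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, p := range parts {
		t.owned[p] = struct{}{}
	}
}

// Revoke marks partitions as no longer owned; intended to be called when
// partitions are revoked or lost.
func (t *AssignmentTracker) Revoke(parts ...TopicPartition) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, p := range parts {
		delete(t.owned, p)
	}
}

// StoreIfOwned runs store only while the partition is owned. The read lock is
// held across the call, so a concurrent Revoke waits for the store to finish.
// It reports whether store was invoked.
func (t *AssignmentTracker) StoreIfOwned(tp TopicPartition, offset int64,
	store func(TopicPartition, int64) error) (bool, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	if _, ok := t.owned[tp]; !ok {
		return false, nil
	}
	return true, store(tp, offset)
}

func main() {
	tracker := NewAssignmentTracker()
	tp := TopicPartition{Topic: "x.people", Partition: 5}
	// Placeholder for the real offset-store call of the client in use.
	store := func(tp TopicPartition, off int64) error {
		fmt.Printf("stored %s [%d] @ %d\n", tp.Topic, tp.Partition, off)
		return nil
	}

	tracker.Assign(tp)
	stored, _ := tracker.StoreIfOwned(tp, 100, store)
	fmt.Println("while assigned:", stored)

	tracker.Revoke(tp)
	stored, _ = tracker.StoreIfOwned(tp, 101, store)
	fmt.Println("after revoke:", stored)
}
```

Holding the read lock across the store call is one possible design choice: it keeps a revocation from slipping in between the ownership check and the store, at the cost of briefly blocking the revoke path.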

@deankarn

Ah so the confluent-kafka-go lib still needs to be updated to handle this new error 👍

TY for the quick response.

@edenhill
Contributor

The go client will not need updating, but your application will.
The root of the issue is that the application is storing offsets for partitions it does not own.
From the next release this will be rejected, but the application should pay more attention to not do processing for lost partitions.

@deankarn

deankarn commented Mar 29, 2022

Maybe that's a separate issue in the confluent-kafka-go library then, because we see it most often when pushing a new update to our application, which does a rolling restart.

The confluent-kafka-go client, on Close(), handles committing the offsets through librdkafka. It first Unsubscribes and then Unassigns here. It's during this transition that another application picks up the partition, which has not yet been unassigned, and this causes incorrect offsets to be committed as the current and new applications race each other.

Good to know that with the PR the other application will not be able to cause the issue, but I'm curious how the other application could pick up the currently assigned partition.

@pranavrth
Member

The fix was merged and released in v1.9.0. Closing the issue. Please raise a new issue if you face any problems.
