@@ -22,6 +22,7 @@ import ServiceLifecycle
22
22
/// `NIOAsyncSequenceProducerDelegate` that terminates the closes the producer when
23
23
/// `didTerminate()` is invoked.
24
24
internal struct KafkaConsumerCloseOnTerminate : Sendable {
25
+ let isMessageSequence : Bool
25
26
let stateMachine : NIOLockedValueBox < KafkaConsumer . StateMachine >
26
27
}
27
28
@@ -31,7 +32,7 @@ extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
31
32
}
32
33
33
34
func didTerminate( ) {
34
- self . stateMachine. withLockedValue { $0. messageSequenceTerminated ( ) }
35
+ self . stateMachine. withLockedValue { $0. messageSequenceTerminated ( isMessageSequence : isMessageSequence ) }
35
36
}
36
37
}
37
38
@@ -121,6 +122,12 @@ public final class KafkaConsumer: Sendable, Service {
121
122
NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ,
122
123
KafkaConsumerCloseOnTerminate
123
124
>
125
+ typealias ProducerEvents = NIOAsyncSequenceProducer <
126
+ KafkaConsumerEvent ,
127
+ NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ,
128
+ KafkaConsumerCloseOnTerminate
129
+ >
130
+
124
131
/// The configuration object of the consumer client.
125
132
private let config : KafkaConsumerConfiguration
126
133
/// A logger.
@@ -146,7 +153,8 @@ public final class KafkaConsumer: Sendable, Service {
146
153
client: RDKafkaClient ,
147
154
stateMachine: NIOLockedValueBox < StateMachine > ,
148
155
config: KafkaConsumerConfiguration ,
149
- logger: Logger
156
+ logger: Logger ,
157
+ eventSource: ProducerEvents . Source ? = nil
150
158
) throws {
151
159
self . config = config
152
160
self . stateMachine = stateMachine
@@ -155,7 +163,7 @@ public final class KafkaConsumer: Sendable, Service {
155
163
let sourceAndSequence = NIOThrowingAsyncSequenceProducer . makeSequence (
156
164
elementType: KafkaConsumerMessage . self,
157
165
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ( ) ,
158
- delegate: KafkaConsumerCloseOnTerminate ( stateMachine: self . stateMachine)
166
+ delegate: KafkaConsumerCloseOnTerminate ( isMessageSequence : true , stateMachine: self . stateMachine)
159
167
)
160
168
161
169
self . messages = KafkaConsumerMessages (
@@ -166,7 +174,8 @@ public final class KafkaConsumer: Sendable, Service {
166
174
self . stateMachine. withLockedValue {
167
175
$0. initialize (
168
176
client: client,
169
- source: sourceAndSequence. source
177
+ source: sourceAndSequence. source,
178
+ eventSource: eventSource
170
179
)
171
180
}
172
181
@@ -242,6 +251,11 @@ public final class KafkaConsumer: Sendable, Service {
242
251
if config. enableAutoCommit == false {
243
252
subscribedEvents. append ( . offsetCommit)
244
253
}
254
+ // Don't listen to statistics even if configured
255
+ // As there are no events instantiated
256
+ // if config.statisticsInterval != .zero {
257
+ // subscribedEvents.append(.statistics)
258
+ // }
245
259
246
260
let client = try RDKafkaClient . makeClient (
247
261
type: . consumer,
@@ -250,20 +264,22 @@ public final class KafkaConsumer: Sendable, Service {
250
264
logger: logger
251
265
)
252
266
253
- let consumer = try KafkaConsumer (
254
- client: client,
255
- stateMachine: stateMachine,
256
- config: config,
257
- logger: logger
258
- )
259
-
260
267
let sourceAndSequence = NIOAsyncSequenceProducer . makeSequence (
261
268
elementType: KafkaConsumerEvent . self,
262
269
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ( ) ,
263
- delegate: KafkaConsumerCloseOnTerminate ( stateMachine: stateMachine)
270
+ delegate: KafkaConsumerCloseOnTerminate ( isMessageSequence : false , stateMachine: stateMachine)
264
271
)
265
272
266
273
let eventsSequence = KafkaConsumerEvents ( wrappedSequence: sourceAndSequence. sequence)
274
+
275
+ let consumer = try KafkaConsumer (
276
+ client: client,
277
+ stateMachine: stateMachine,
278
+ config: config,
279
+ logger: logger,
280
+ eventSource: sourceAndSequence. source
281
+ )
282
+
267
283
return ( consumer, eventsSequence)
268
284
}
269
285
@@ -321,7 +337,7 @@ public final class KafkaConsumer: Sendable, Service {
321
337
while !Task. isCancelled {
322
338
let nextAction = self . stateMachine. withLockedValue { $0. nextPollLoopAction ( ) }
323
339
switch nextAction {
324
- case . pollForAndYieldMessage( let client, let source) :
340
+ case . pollForAndYieldMessage( let client, let source, let eventSource ) :
325
341
let events = client. eventPoll ( )
326
342
for event in events {
327
343
switch event {
@@ -332,8 +348,11 @@ public final class KafkaConsumer: Sendable, Service {
332
348
_ = source. yield ( message)
333
349
case . failure( let error) :
334
350
source. finish ( )
351
+ eventSource? . finish ( )
335
352
throw error
336
353
}
354
+ case . statistics( let statistics) :
355
+ _ = eventSource? . yield ( . statistics( statistics) )
337
356
default :
338
357
break // Ignore
339
358
}
@@ -383,8 +402,9 @@ public final class KafkaConsumer: Sendable, Service {
383
402
client: client,
384
403
logger: self . logger
385
404
)
386
- case . triggerGracefulShutdownAndFinishSource( let client, let source) :
405
+ case . triggerGracefulShutdownAndFinishSource( let client, let source, let eventSource ) :
387
406
source. finish ( )
407
+ eventSource? . finish ( )
388
408
self . _triggerGracefulShutdown (
389
409
client: client,
390
410
logger: self . logger
@@ -428,17 +448,20 @@ extension KafkaConsumer {
428
448
///
429
449
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
430
450
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
451
+ /// - Parameter eventSource: ``NIOAsyncSequenceProducer/Source`` used for yielding new events.
431
452
case initializing(
432
453
client: RDKafkaClient ,
433
- source: Producer . Source
454
+ source: Producer . Source ,
455
+ eventSource: ProducerEvents . Source ?
434
456
)
435
457
/// The ``KafkaConsumer`` is consuming messages.
436
458
///
437
459
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
438
- /// - Parameter source : ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
460
+ /// - Parameter eventSource : ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
439
461
case consuming(
440
462
client: RDKafkaClient ,
441
- source: Producer . Source
463
+ source: Producer . Source ,
464
+ eventSource: ProducerEvents . Source ?
442
465
)
443
466
/// Consumer is still running but the messages asynchronous sequence was terminated.
444
467
/// All incoming messages will be dropped.
@@ -461,14 +484,16 @@ extension KafkaConsumer {
461
484
/// not yet available when the normal initialization occurs.
462
485
mutating func initialize(
463
486
client: RDKafkaClient ,
464
- source: Producer . Source
487
+ source: Producer . Source ,
488
+ eventSource: ProducerEvents . Source ?
465
489
) {
466
490
guard case . uninitialized = self . state else {
467
491
fatalError ( " \( #function) can only be invoked in state .uninitialized, but was invoked in state \( self . state) " )
468
492
}
469
493
self . state = . initializing(
470
494
client: client,
471
- source: source
495
+ source: source,
496
+ eventSource: eventSource
472
497
)
473
498
}
474
499
@@ -480,7 +505,8 @@ extension KafkaConsumer {
480
505
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
481
506
case pollForAndYieldMessage(
482
507
client: RDKafkaClient ,
483
- source: Producer . Source
508
+ source: Producer . Source ,
509
+ eventSource: ProducerEvents . Source ?
484
510
)
485
511
/// The ``KafkaConsumer`` stopped consuming messages or
486
512
/// is in the process of shutting down.
@@ -502,8 +528,8 @@ extension KafkaConsumer {
502
528
fatalError ( " \( #function) invoked while still in state \( self . state) " )
503
529
case . initializing:
504
530
fatalError ( " Subscribe to consumer group / assign to topic partition pair before reading messages " )
505
- case . consuming( let client, let source) :
506
- return . pollForAndYieldMessage( client: client, source: source)
531
+ case . consuming( let client, let source, let eventSource ) :
532
+ return . pollForAndYieldMessage( client: client, source: source, eventSource : eventSource )
507
533
case . consumptionStopped( let client) :
508
534
return . pollWithoutYield( client: client)
509
535
case . finishing( let client) :
@@ -532,10 +558,11 @@ extension KafkaConsumer {
532
558
switch self . state {
533
559
case . uninitialized:
534
560
fatalError ( " \( #function) invoked while still in state \( self . state) " )
535
- case . initializing( let client, let source) :
561
+ case . initializing( let client, let source, let eventSource ) :
536
562
self . state = . consuming(
537
563
client: client,
538
- source: source
564
+ source: source,
565
+ eventSource: eventSource
539
566
)
540
567
return . setUpConnection( client: client)
541
568
case . consuming, . consumptionStopped, . finishing, . finished:
@@ -545,16 +572,30 @@ extension KafkaConsumer {
545
572
546
573
/// The messages asynchronous sequence was terminated.
547
574
/// All incoming messages will be dropped.
548
- mutating func messageSequenceTerminated( ) {
575
+ mutating func messageSequenceTerminated( isMessageSequence : Bool ) {
549
576
switch self . state {
550
577
case . uninitialized:
551
578
fatalError ( " \( #function) invoked while still in state \( self . state) " )
552
579
case . initializing:
553
580
fatalError ( " Call to \( #function) before setUpConnection() was invoked " )
554
581
case . consumptionStopped:
555
- fatalError ( " messageSequenceTerminated() must not be invoked more than once " )
556
- case . consuming( let client, _) :
557
- self . state = . consumptionStopped( client: client)
582
+ if isMessageSequence {
583
+ fatalError ( " messageSequenceTerminated() must not be invoked more than once " )
584
+ }
585
+ case . consuming( let client, let source, let eventSource) :
586
+ // only move to stopping if messages sequence was finished
587
+ if isMessageSequence {
588
+ self . state = . consumptionStopped( client: client)
589
+ // If message sequence is being terminated, it means class deinit is called
590
+ // see `messages` field, it is last change to call finish for `eventSource`
591
+ eventSource? . finish ( )
592
+ }
593
+ else {
594
+ // Messages are still consuming, only event source was finished
595
+ // Ok, probably, noone wants to listen to events,
596
+ // though it might be very bad for rebalancing
597
+ self . state = . consuming( client: client, source: source, eventSource: nil )
598
+ }
558
599
case . finishing, . finished:
559
600
break
560
601
}
@@ -576,7 +617,7 @@ extension KafkaConsumer {
576
617
fatalError ( " Subscribe to consumer group / assign to topic partition pair before committing offsets " )
577
618
case . consumptionStopped:
578
619
fatalError ( " Cannot store offset when consumption has been stopped " )
579
- case . consuming( let client, _) :
620
+ case . consuming( let client, _, _ ) :
580
621
return . storeOffset( client: client)
581
622
case . finishing, . finished:
582
623
fatalError ( " \( #function) invoked while still in state \( self . state) " )
@@ -607,7 +648,7 @@ extension KafkaConsumer {
607
648
fatalError ( " Subscribe to consumer group / assign to topic partition pair before committing offsets " )
608
649
case . consumptionStopped:
609
650
fatalError ( " Cannot commit when consumption has been stopped " )
610
- case . consuming( let client, _) :
651
+ case . consuming( let client, _, _ ) :
611
652
return . commitSync( client: client)
612
653
case . finishing, . finished:
613
654
return . throwClosedError
@@ -628,7 +669,8 @@ extension KafkaConsumer {
628
669
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
629
670
case triggerGracefulShutdownAndFinishSource(
630
671
client: RDKafkaClient ,
631
- source: Producer . Source
672
+ source: Producer . Source ,
673
+ eventSource: ProducerEvents . Source ?
632
674
)
633
675
}
634
676
@@ -642,11 +684,12 @@ extension KafkaConsumer {
642
684
fatalError ( " \( #function) invoked while still in state \( self . state) " )
643
685
case . initializing:
644
686
fatalError ( " subscribe() / assign() should have been invoked before \( #function) " )
645
- case . consuming( let client, let source) :
687
+ case . consuming( let client, let source, let eventSource ) :
646
688
self . state = . finishing( client: client)
647
689
return . triggerGracefulShutdownAndFinishSource(
648
690
client: client,
649
- source: source
691
+ source: source,
692
+ eventSource: eventSource
650
693
)
651
694
case . consumptionStopped( let client) :
652
695
self . state = . finishing( client: client)
0 commit comments