Skip to content

Commit

Permalink
fix: update to acct for feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Samantha Coyle <[email protected]>
  • Loading branch information
sicoyle committed Nov 26, 2024
1 parent 1c2a8ce commit e801cd9
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
k.latestSchemaCacheTTL = meta.SchemaLatestVersionCacheTTL
}
}

clients, err := k.latestClients()
if err != nil || clients == nil {
return fmt.Errorf("failed to get latest Kafka clients for initialization: %w", err)
}
if clients.producer == nil {
return errors.New("component is closed")
}
if clients.consumerGroup == nil {
return errors.New("component is closed")
}

k.logger.Debug("Kafka message bus initialization complete")

return nil
Expand Down Expand Up @@ -284,8 +296,15 @@ func (k *Kafka) Close() error {
k.subscribeLock.Unlock()

if k.clients != nil {
errs[0] = k.clients.producer.Close()
errs[1] = k.clients.consumerGroup.Close()
if k.clients.producer != nil {
errs[0] = k.clients.producer.Close()
k.clients.producer = nil
}
if k.clients.consumerGroup != nil {
errs[1] = k.clients.consumerGroup.Close()
k.clients.consumerGroup = nil
}

}
if k.awsAuthProvider != nil {
errs[2] = k.awsAuthProvider.Close()
Expand Down

0 comments on commit e801cd9

Please sign in to comment.