Skip to content

Commit

Permalink
Update eventhubs.go
Browse files Browse the repository at this point in the history
Recreate AEH Processor in the event of an error before retrying the processing operation

Signed-off-by: MattCosturos <[email protected]>
  • Loading branch information
MattCosturos authored Nov 26, 2024
1 parent 1137759 commit 03714ef
Showing 1 changed file with 30 additions and 30 deletions.
60 changes: 30 additions & 30 deletions common/component/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,6 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
}
topic := config.Topic

// Get the processor client
processor, err := aeh.getProcessorForTopic(subscribeCtx, topic)
if err != nil {
return fmt.Errorf("error trying to establish a connection: %w", err)
}

// This component has built-in retries because Event Hubs doesn't support N/ACK for messages
retryHandler := func(ctx context.Context, events []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error) {
b := aeh.backOffConfig.NewBackOffWithContext(ctx)
Expand Down Expand Up @@ -277,34 +271,40 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr

subscriptionLoopFinished := make(chan bool, 1)

// Process all partition clients as they come in
subscriberLoop := func() {
// Start the subscribe + processor loop
go func() {
for {
// This will block until a new partition client is available
// It returns nil if processor.Run terminates or if the context is canceled
partitionClient := processor.NextPartitionClient(subscribeCtx)
if partitionClient == nil {
subscriptionLoopFinished <- true
return
// Get the processor client
processor, err := aeh.getProcessorForTopic(subscribeCtx, topic)
if err != nil {
return fmt.Errorf("error trying to establish a connection: %w", err)

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.jetstream conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.jetstream conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / secretstores.local.env conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / secretstores.local.env conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.redis.v6 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.redis.v6 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.redis.v7 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.redis.v7 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / crypto.localstorage conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / crypto.localstorage conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.in-memory conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.in-memory conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.rabbitmq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.rabbitmq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.postgresql.v2.docker conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.postgresql.v2.docker conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.mqtt3-emqx conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.mqtt3-emqx conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.rabbitmq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.rabbitmq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.mongodb conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.mongodb conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.memcached conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.memcached conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.mqtt3-vernemq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.mqtt3-vernemq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / configuration.postgresql.docker conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / configuration.postgresql.docker conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / configuration.redis.v7 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / configuration.redis.v7 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.in-memory conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.in-memory conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.redis.v6 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.redis.v6 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.cockroachdb.v1 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.cockroachdb.v1 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.kafka-wurstmeister conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.kafka-wurstmeister conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.http conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.http conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.sqlite conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.sqlite conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.etcd.v1 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.etcd.v1 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.cron conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.cron conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.mqtt3-mosquitto conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.mqtt3-mosquitto conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.mqtt3-vernemq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.mqtt3-vernemq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.redis.v7 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.redis.v7 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.kubemq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.kubemq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / crypto.jwks conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / crypto.jwks conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / lock.redis.v7 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / lock.redis.v7 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.influx conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.influx conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.oracledatabase conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.oracledatabase conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.postgresql.docker conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.postgresql.docker conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / secretstores.local.file conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / secretstores.local.file conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.cassandra conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.cassandra conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.kafka-confluent conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.kafka-confluent conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.mysql.mariadb conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.mysql.mariadb conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.mysql.mysql conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.mysql.mysql conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.kafka-wurstmeister conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.kafka-wurstmeister conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.kubemq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.kubemq conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / secretstores.hashicorp.vault conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / secretstores.hashicorp.vault conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / secretstores.aws.secretsmanager.docker conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / secretstores.aws.secretsmanager.docker conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.etcd.v2 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.etcd.v2 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.mqtt3-emqx conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.mqtt3-emqx conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.postgresql.v1.docker conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.postgresql.v1.docker conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / lock.redis.v6 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / lock.redis.v6 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.rethinkdb conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / state.rethinkdb conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.kafka-confluent conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / bindings.kafka-confluent conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / configuration.redis.v6 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / configuration.redis.v6 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.pulsar conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.pulsar conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.redis.v6 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.redis.v6 conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.solace conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / pubsub.solace conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / secretstores.kubernetes conformance

too many return values

Check failure on line 280 in common/component/azure/eventhubs/eventhubs.go

View workflow job for this annotation

GitHub Actions / secretstores.kubernetes conformance

too many return values
}
aeh.logger.Debugf("Received client for partition %s", partitionClient.PartitionID())

// Once we get a partition client, process the events in a separate goroutine
go func() {
processErr := aeh.processEvents(subscribeCtx, partitionClient, retryConfig)
// Do not log context.Canceled which happens at shutdown
if processErr != nil && !errors.Is(processErr, context.Canceled) {
aeh.logger.Errorf("Error processing events from partition client: %v", processErr)
}
}()
}
}

// Start the processor
go func() {
for {
// Process all partition clients as they come in
subscriberLoop := func() {
for {
// This will block until a new partition client is available
// It returns nil if processor.Run terminates or if the context is canceled
partitionClient := processor.NextPartitionClient(subscribeCtx)
if partitionClient == nil {
subscriptionLoopFinished <- true
return
}
aeh.logger.Debugf("Received client for partition %s", partitionClient.PartitionID())

// Once we get a partition client, process the events in a separate goroutine
go func() {
processErr := aeh.processEvents(subscribeCtx, partitionClient, retryConfig)
// Do not log context.Canceled which happens at shutdown
if processErr != nil && !errors.Is(processErr, context.Canceled) {
aeh.logger.Errorf("Error processing events from partition client: %v", processErr)
}
}()
}
}

go subscriberLoop()
// This is a blocking call that runs until the context is canceled
// This is a blocking call that runs until the context is canceled or a non-recoverable error is returned.
err = processor.Run(subscribeCtx)
// Exit if the context is canceled
if err != nil && errors.Is(err, context.Canceled) {
Expand Down

0 comments on commit 03714ef

Please sign in to comment.