Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recreate AEH Processor in the event of an error before retrying the processing operation #3614

Merged
merged 12 commits into from
Dec 17, 2024
93 changes: 47 additions & 46 deletions common/component/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,6 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
}
topic := config.Topic

// Get the processor client
processor, err := aeh.getProcessorForTopic(subscribeCtx, topic)
if err != nil {
return fmt.Errorf("error trying to establish a connection: %w", err)
}

// This component has built-in retries because Event Hubs doesn't support N/ACK for messages
retryHandler := func(ctx context.Context, events []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error) {
b := aeh.backOffConfig.NewBackOffWithContext(ctx)
Expand Down Expand Up @@ -282,51 +276,58 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr

subscriptionLoopFinished := make(chan bool, 1)

// Process all partition clients as they come in
subscriberLoop := func() {
for {
// This will block until a new partition client is available
// It returns nil if processor.Run terminates or if the context is canceled
partitionClient := processor.NextPartitionClient(subscribeCtx)
if partitionClient == nil {
subscriptionLoopFinished <- true
return
}
aeh.logger.Debugf("Received client for partition %s", partitionClient.PartitionID())

// Once we get a partition client, process the events in a separate goroutine
go func() {
processErr := aeh.processEvents(subscribeCtx, partitionClient, retryConfig)
// Do not log context.Canceled which happens at shutdown
if processErr != nil && !errors.Is(processErr, context.Canceled) {
aeh.logger.Errorf("Error processing events from partition client: %v", processErr)
}
}()
}
}

// Start the processor
// Start the subscribe + processor loop
go func() {
for {
go subscriberLoop()
// This is a blocking call that runs until the context is canceled
err = processor.Run(subscribeCtx)
// Exit if the context is canceled
if err != nil && errors.Is(err, context.Canceled) {
return
}
// Get the processor client
processor, err := aeh.getProcessorForTopic(subscribeCtx, topic)
if err != nil {
aeh.logger.Errorf("Error from event processor: %v", err)
// %w is only meaningful to fmt.Errorf; log the error with %v instead.
aeh.logger.Errorf("error trying to establish a connection: %v", err)
} else {
aeh.logger.Debugf("Event processor terminated without error")
}
// wait for subscription loop finished signal
select {
case <-subscribeCtx.Done():
return
case <-subscriptionLoopFinished:
// noop
// Process all partition clients as they come in
subscriberLoop := func() {
for {
// This will block until a new partition client is available
// It returns nil if processor.Run terminates or if the context is canceled
partitionClient := processor.NextPartitionClient(subscribeCtx)
if partitionClient == nil {
subscriptionLoopFinished <- true
return
}
aeh.logger.Debugf("Received client for partition %s", partitionClient.PartitionID())

// Once we get a partition client, process the events in a separate goroutine
go func() {
processErr := aeh.processEvents(subscribeCtx, partitionClient, retryConfig)
// Do not log context.Canceled which happens at shutdown
if processErr != nil && !errors.Is(processErr, context.Canceled) {
aeh.logger.Errorf("Error processing events from partition client: %v", processErr)
}
}()
}
}

go subscriberLoop()
// This is a blocking call that runs until the context is canceled or a non-recoverable error is returned.
err = processor.Run(subscribeCtx)
// Exit if the context is canceled
if err != nil && errors.Is(err, context.Canceled) {
return
}
if err != nil {
aeh.logger.Errorf("Error from event processor: %v", err)
} else {
aeh.logger.Debugf("Event processor terminated without error")
}
// wait for subscription loop finished signal
select {
case <-subscribeCtx.Done():
return
case <-subscriptionLoopFinished:
// noop
}
}

// Waiting here is not strictly necessary; however, we wait for a short time to increase the likelihood that transient errors have disappeared
select {
case <-subscribeCtx.Done():
Expand Down
Loading