Skip to content

Commit

Permalink
refac: refatored log messages for waiting handler
Browse files Browse the repository at this point in the history
  • Loading branch information
WRichter72 committed Jan 30, 2025
1 parent b9df6bb commit deb4e9d
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions internal/handler/waiting.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type (
var WaitingHandlerService WaitingHandlerInterface = new(waitingHandler)

func (waitingHandler *waitingHandler) CheckWaitingEvents() {
log.Info().Msgf("Republish messages stucked in state WAITING")
log.Info().Msgf("WaitingHandler: Republish messages stucked in state WAITING")

minMessageAge := config.Current.WaitingHandler.MinMessageAge
maxMessageAge := config.Current.WaitingHandler.MaxMessageAge
Expand All @@ -37,52 +37,52 @@ func (waitingHandler *waitingHandler) CheckWaitingEvents() {
var ctx = cache.HandlerCache.NewLockContext(context.Background())

if acquired, _ := cache.HandlerCache.TryLockWithTimeout(ctx, cache.WaitingLockKey, 10*time.Millisecond); !acquired {
log.Debug().Msgf("Could not acquire lock for WaitingHandler entry: %s", cache.WaitingLockKey)
log.Debug().Msgf("WaitingHandler: Could not acquire lock for WaitingHandler entry: %s", cache.WaitingLockKey)
return
}

defer func() {
if err := cache.HandlerCache.Unlock(ctx, cache.WaitingLockKey); err != nil {
log.Error().Err(err).Msg("Error unlocking WaitingHandler")
log.Error().Err(err).Msg("WaitingHandler: Error unlocking WaitingHandler")
}
}()

// Get all subscriptions (distinct) for messages in state WAITING from db
dbSubscriptionsForWaitingEvents, err := mongo.CurrentConnection.FindDistinctSubscriptionsForWaitingEvents(time.Now().Add(-maxMessageAge), time.Now().Add(-minMessageAge))
if err != nil {
log.Error().Err(err).Msgf("Error while fetching distinct subscriptions for events stucked in state WAITING from db")
log.Error().Err(err).Msgf("WaitingHandler: Error while fetching distinct subscriptions for events stucked in state WAITING from db")
return
}

// If no subscriptions found, return
log.Debug().Msgf("Found %d subscriptions with waiting messages: %v", len(dbSubscriptionsForWaitingEvents), dbSubscriptionsForWaitingEvents)
log.Debug().Msgf("WaitingHandler: Found %d subscriptions with waiting messages: %v", len(dbSubscriptionsForWaitingEvents), dbSubscriptionsForWaitingEvents)
if len(dbSubscriptionsForWaitingEvents) == 0 {
return
}

// Get all republishing cache entries
republishingSubscriptionsMap, err := WaitingHandlerService.GetRepublishingSubscriptionsMap()
if err != nil {
log.Error().Err(err).Msgf("Error while fetching rebublishing cache entries for events stucked in state WAITING")
log.Error().Err(err).Msgf("WaitingHandler: Error while fetching rebublishing cache entries for events stucked in state WAITING")
return
}
log.Debug().Msgf("Found %d rebublishing entries: %v", len(republishingSubscriptionsMap), republishingSubscriptionsMap)
log.Debug().Msgf("WaitingHandler: Found %d rebublishing entries: %v", len(republishingSubscriptionsMap), republishingSubscriptionsMap)

// Get all circuit-breaker entries with status OPEN
circuitBreakerSubscriptionsMap, err := WaitingHandlerService.GetCircuitBreakerSubscriptionsMap()
if err != nil {
log.Error().Err(err).Msgf("Error while fetching circuit breaker cache entries for events stucked in state WAITING")
log.Error().Err(err).Msgf("WaitingHandler: Error while fetching circuit breaker cache entries for events stucked in state WAITING")
return
}
log.Debug().Msgf("Found %d circuitbreaker entries in state OPEN: %v", len(circuitBreakerSubscriptionsMap), circuitBreakerSubscriptionsMap)
log.Debug().Msgf("WaitingHandler: Found %d circuitbreaker entries in state OPEN: %v", len(circuitBreakerSubscriptionsMap), circuitBreakerSubscriptionsMap)

// Check if subscription is in republishing cache or in circuit breaker cache. If not create a republishing cache entry
for _, subscriptionId := range dbSubscriptionsForWaitingEvents {
log.Debug().Msgf("Checking subscription for events stucked in state WAITING. subscription: %v", subscriptionId)
log.Debug().Msgf("WaitingHandler: Checking subscription for events stucked in state WAITING. subscription: %v", subscriptionId)
_, inRepublishing := republishingSubscriptionsMap[subscriptionId]
_, inCircuitBreaker := circuitBreakerSubscriptionsMap[subscriptionId]
if !inRepublishing && !inCircuitBreaker {
log.Warn().Msgf("Subscription %v has waiting messages and no circuitbreaker entry and no republishing entry!. Creating republishing entry for events stucked in state WAITING", subscriptionId)
log.Warn().Msgf("WaitingHandler: Subscription %v has waiting messages and no circuitbreaker entry and no republishing entry!. Creating republishing entry for events stucked in state WAITING", subscriptionId)

// Create republishing cache entry for subscription with stuck waiting events
republishingCacheEntry := republish.RepublishingCacheEntry{
Expand All @@ -91,13 +91,13 @@ func (waitingHandler *waitingHandler) CheckWaitingEvents() {
PostponedUntil: time.Now(),
}
if err := cache.RepublishingCache.Set(context.Background(), subscriptionId, republishingCacheEntry); err != nil {
log.Error().Err(err).Msgf("Error while creating RepublishingCacheEntry entry for events stucked in state WAITING. subscriptionId: %s", subscriptionId)
log.Error().Err(err).Msgf("WaitingHandler: Error while creating RepublishingCacheEntry entry for events stucked in state WAITING. subscriptionId: %s", subscriptionId)
continue
}
log.Debug().Msgf("Successfully created RepublishingCacheEntry entry for for events stucked in state WAITING. subscriptionId: %s republishingEntry: %+v", subscriptionId, republishingCacheEntry)
log.Debug().Msgf("WaitingHandler: Successfully created RepublishingCacheEntry entry for for events stucked in state WAITING. subscriptionId: %s republishingEntry: %+v", subscriptionId, republishingCacheEntry)
}
}
log.Info().Msgf("Finished republishing messages stucked in state WAITING")
log.Info().Msgf("WaitingHandler: Finished republishing messages stucked in state WAITING")
}

func (waitingHandler *waitingHandler) GetCircuitBreakerSubscriptionsMap() (map[string]struct{}, error) {
Expand Down

0 comments on commit deb4e9d

Please sign in to comment.