diff --git a/internal/circuitbreaker/circuitbreaker.go b/internal/circuitbreaker/circuitbreaker.go index 40d5aa5..b1d609e 100644 --- a/internal/circuitbreaker/circuitbreaker.go +++ b/internal/circuitbreaker/circuitbreaker.go @@ -41,18 +41,18 @@ func HandleOpenCircuitBreaker(cbMessage message.CircuitBreakerMessage, subscript log.Debug().Msgf("Could not acquire lock for HealthCheckCacheEntry, skipping entry for subscriptionId %s", hcData.HealthCheckKey) return } - log.Debug().Msgf("Successfully locked HealthCheckCacheEntry with key %s", hcData.HealthCheckKey) // Ensure that the lock is released when the function is ended defer func() { - if hcData.IsAcquired == true { - if err := cache.HealthCheckCache.Unlock(hcData.Ctx, hcData.HealthCheckKey); err != nil { - log.Error().Err(err).Msgf("Error unlocking HealthCheckCacheEntry with key %s", hcData.HealthCheckKey) - } - log.Debug().Msgf("Successfully unlocked HealthCheckCacheEntry with key %s", hcData.HealthCheckKey) + if err := cache.HealthCheckCache.Unlock(hcData.Ctx, hcData.HealthCheckKey); err != nil { + log.Error().Err(err).Msgf("Error unlocking HealthCheckCacheEntry with key %s", hcData.HealthCheckKey) } + + log.Debug().Msgf("Successfully unlocked HealthCheckCacheEntry with key %s", hcData.HealthCheckKey) }() + log.Debug().Msgf("Successfully locked HealthCheckCacheEntry with key %s", hcData.HealthCheckKey) + err = forceDeleteRepublishingEntry(cbMessage, hcData) if err != nil { log.Error().Err(err).Msgf("Error while deleting Republishing cache entry for subscriptionId %s", cbMessage.SubscriptionId) diff --git a/internal/config/config.go b/internal/config/config.go index 1e89cd6..98606fb 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -50,7 +50,7 @@ func setDefaults() { viper.SetDefault("healthCheck.successfulResponseCodes", []int{200, 201, 202, 204}) viper.SetDefault("healthCheck.coolDownTime", "30s") viper.SetDefault("republishing.checkInterval", "30s") - 
viper.SetDefault("republishing.batchSize", 10) + viper.SetDefault("republishing.batchSize", 100) viper.SetDefault("republishing.throttlingIntervalTime", "1s") viper.SetDefault("republishing.deliveringStatesOffset", "70m") diff --git a/internal/handler/delivering.go b/internal/handler/delivering.go index 8bf925e..89dac9f 100644 --- a/internal/handler/delivering.go +++ b/internal/handler/delivering.go @@ -7,11 +7,11 @@ package handler import ( "context" "github.com/rs/zerolog/log" - "github.com/telekom/pubsub-horizon-go/tracing" "pubsub-horizon-golaris/internal/cache" "pubsub-horizon-golaris/internal/config" "pubsub-horizon-golaris/internal/kafka" "pubsub-horizon-golaris/internal/mongo" + "pubsub-horizon-golaris/internal/republish" "time" ) @@ -31,64 +31,42 @@ func CheckDeliveringEvents() { } }() - batchSize := config.Current.Republishing.BatchSize - upperThresholdTimestamp := time.Now().Add(-config.Current.Republishing.DeliveringStatesOffset) picker, err := kafka.NewPicker() if err != nil { log.Error().Err(err).Msg("Could not initialize picker for handling events in state DELIVERING") + return } defer picker.Close() + var cursor any for { - var lastCursor any - dbMessages, lastCursor, err := mongo.CurrentConnection.FindDeliveringMessagesByDeliveryType(upperThresholdTimestamp, lastCursor) + dbMessages, c, err := mongo.CurrentConnection.FindDeliveringMessagesByDeliveryType(upperThresholdTimestamp, cursor) + cursor = c + if err != nil { - log.Error().Err(err).Msgf("Error while fetching DELIVERING messages from MongoDb") + log.Error().Err(err).Msgf("Error while fetching DELIVERING messages from database") + return } if len(dbMessages) == 0 { return } - log.Debug().Msgf("Found %d DELIVERING messages in MongoDb", len(dbMessages)) - for _, dbMessage := range dbMessages { + log.Debug().Msgf("Found %d DELIVERING messages in database", len(dbMessages)) - if dbMessage.Coordinates == nil { - log.Warn().Msgf("Coordinates in message for subscriptionId %s are nil: %v", 
dbMessage.SubscriptionId, dbMessage) - return - } + for _, dbMessage := range dbMessages { + if err := republish.RepublishEvent(picker, &dbMessage, nil); err != nil { + log.Error().Err(err).Msgf("Error while republishing message for subscriptionId %s", dbMessage.SubscriptionId) - message, err := picker.Pick(&dbMessage) - if err != nil { - log.Error().Err(err).Msgf("Error while fetching message from kafka for subscriptionId %s", dbMessage.SubscriptionId) - return + continue } - var b3Ctx = tracing.WithB3FromMessage(context.Background(), message) - var traceCtx = tracing.NewTraceContext(b3Ctx, "golaris", config.Current.Tracing.DebugEnabled) - - traceCtx.StartSpan("republish delivering message") - traceCtx.SetAttribute("component", "Horizon Golaris") - traceCtx.SetAttribute("eventId", dbMessage.Event.Id) - traceCtx.SetAttribute("eventType", dbMessage.Event.Type) - traceCtx.SetAttribute("subscriptionId", dbMessage.SubscriptionId) - traceCtx.SetAttribute("uuid", string(message.Key)) - - err = kafka.CurrentHandler.RepublishMessage(traceCtx, message, "", "", false) - if err != nil { - log.Error().Err(err).Msgf("Error while republishing message for subscriptionId %s", dbMessage.SubscriptionId) - return - } log.Debug().Msgf("Successfully republished message in state DELIVERING for subscriptionId %s", dbMessage.SubscriptionId) } - - if len(dbMessages) < int(batchSize) { - break - } } } diff --git a/internal/handler/delivering_test.go b/internal/handler/delivering_test.go index df3fb87..b8f2f0b 100644 --- a/internal/handler/delivering_test.go +++ b/internal/handler/delivering_test.go @@ -8,7 +8,9 @@ import ( "context" "github.com/IBM/sarama" "github.com/stretchr/testify/mock" + "github.com/telekom/pubsub-horizon-go/enum" "github.com/telekom/pubsub-horizon-go/message" + "github.com/telekom/pubsub-horizon-go/resource" "pubsub-horizon-golaris/internal/cache" "pubsub-horizon-golaris/internal/config" "pubsub-horizon-golaris/internal/kafka" @@ -18,104 +20,266 @@ import ( "time" ) 
-func TestCheckDeliveringEvents_Success(t *testing.T) { - mockMongo := new(test.MockMongoHandler) - mongo.CurrentConnection = mockMongo - - mockKafka := new(test.MockKafkaHandler) - kafka.CurrentHandler = mockKafka - - mockPicker := new(test.MockPicker) - test.InjectMockPicker(mockPicker) +// mockStep represents a single paging result returned by FindDeliveringMessagesByDeliveryType. +type mockStep struct { + OutMessages []message.StatusMessage + OutNextCursor any + OutError error +} - deliveringHandler := new(test.DeliveringMockHandler) - cache.DeliveringHandler = deliveringHandler +// deliveringTestCase holds the data for table-driven tests of CheckDeliveringEvents. +type deliveringTestCase struct { + name string + mongoSteps []mockStep + dbMessages []message.StatusMessage + kafkaMessages []sarama.ConsumerMessage + republishErrors []error +} - deliveringHandler.On("NewLockContext", mock.Anything).Return(context.Background()) - deliveringHandler.On("TryLockWithTimeout", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) - deliveringHandler.On("Unlock", mock.Anything, mock.Anything).Return(nil) +// intPtr and int64Ptr are helper functions for pointer fields in message coordinates. +func intPtr(i int32) *int32 { return &i } +func int64Ptr(i int64) *int64 { return &i } +func TestCheckDeliveringEvents_TableDriven(t *testing.T) { + // Configure global parameters for the republishing logic. config.Current.Republishing.BatchSize = 5 config.Current.Republishing.DeliveringStatesOffset = 30 * time.Minute - partitionValue1 := int32(1) - offsetValue1 := int64(100) - partitionValue2 := int32(1) - offsetValue2 := int64(101) - - dbMessages := []message.StatusMessage{ + // Define test cases in a table-driven style. 
+ testCases := []deliveringTestCase{ { - Topic: "test-topic", - Status: "DELIVERING", - SubscriptionId: "sub123", - DeliveryType: "callback", - Coordinates: &message.Coordinates{ - Partition: &partitionValue1, - Offset: &offsetValue1, - }}, + name: "No events -> no republishing", + // Only one step returning an empty slice of messages. + mongoSteps: []mockStep{ + { + OutMessages: []message.StatusMessage{}, + OutNextCursor: nil, + OutError: nil, + }, + }, + dbMessages: []message.StatusMessage{}, + kafkaMessages: []sarama.ConsumerMessage{}, + republishErrors: []error{}, + }, { - Topic: "test-topic", - Status: "DELIVERING", - SubscriptionId: "sub123", - DeliveryType: "callback", - Coordinates: &message.Coordinates{ - Partition: &partitionValue2, - Offset: &offsetValue2, - }}, - } - - mockMongo.On("FindDeliveringMessagesByDeliveryType", mock.Anything, mock.Anything).Return(dbMessages, nil, nil) - - expectedKafkaMessage := &sarama.ConsumerMessage{ - Topic: "test-topic", - Partition: 0, - Offset: 100, - Key: []byte("test-key"), - Value: []byte(`{"uuid": "12345", "event": {"id": "67890"}}`), + name: "Multi-page fetch -> all republished successfully", + /* + * Three steps total: + * 1) Two messages + * 2) Two messages + * 3) Zero messages -> loop exits + */ + mongoSteps: []mockStep{ + { + OutMessages: []message.StatusMessage{ + { + Topic: "test-topic", + Status: "DELIVERING", + SubscriptionId: "sub123", + DeliveryType: "callback", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(100), + }, + }, + { + Topic: "test-topic", + Status: "DELIVERING", + SubscriptionId: "sub123", + DeliveryType: "callback", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(101), + }, + }, + }, + OutNextCursor: "cursor_page_1", + OutError: nil, + }, + { + OutMessages: []message.StatusMessage{ + { + Topic: "test-topic", + Status: "DELIVERING", + SubscriptionId: "sub123", + DeliveryType: "callback", + Coordinates: &message.Coordinates{ + 
Partition: intPtr(1), + Offset: int64Ptr(102), + }, + }, + { + Topic: "test-topic", + Status: "DELIVERING", + SubscriptionId: "sub123", + DeliveryType: "callback", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(103), + }, + }, + }, + OutNextCursor: nil, + OutError: nil, + }, + { + // Zero messages -> loop exits + OutMessages: []message.StatusMessage{}, + OutNextCursor: nil, + OutError: nil, + }, + }, + dbMessages: []message.StatusMessage{ + { + Topic: "test-topic", + Status: "DELIVERING", + SubscriptionId: "sub123", + DeliveryType: "callback", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(100), + }, + }, + { + Topic: "test-topic", + Status: "DELIVERING", + SubscriptionId: "sub123", + DeliveryType: "callback", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(101), + }, + }, + { + Topic: "test-topic", + Status: "DELIVERING", + SubscriptionId: "sub123", + DeliveryType: "callback", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(102), + }, + }, + { + Topic: "test-topic", + Status: "DELIVERING", + SubscriptionId: "sub123", + DeliveryType: "callback", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(103), + }, + }, + }, + kafkaMessages: []sarama.ConsumerMessage{ + {Topic: "test-topic", Partition: 1, Offset: 100, Value: []byte("test-content-1")}, + {Topic: "test-topic", Partition: 1, Offset: 101, Value: []byte("test-content-2")}, + {Topic: "test-topic", Partition: 1, Offset: 102, Value: []byte("test-content-3")}, + {Topic: "test-topic", Partition: 1, Offset: 103, Value: []byte("test-content-4")}, + }, + republishErrors: []error{nil, nil, nil, nil}, + }, + // Additional scenarios can be added here as needed. 
} - mockPicker.On("Pick", mock.AnythingOfType("*message.StatusMessage")).Return(expectedKafkaMessage, nil) - mockKafka.On("RepublishMessage", expectedKafkaMessage, "", "").Return(nil) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Sets up all mocks. + mockMongo := new(test.MockMongoHandler) + mongo.CurrentConnection = mockMongo - CheckDeliveringEvents() + mockKafka := new(test.MockKafkaHandler) + kafka.CurrentHandler = mockKafka - mockMongo.AssertExpectations(t) - mockMongo.AssertCalled(t, "FindDeliveringMessagesByDeliveryType", mock.Anything, mock.Anything) + mockPicker := new(test.MockPicker) + test.InjectMockPicker(mockPicker) - mockKafka.AssertExpectations(t) - mockKafka.AssertCalled(t, "RepublishMessage", expectedKafkaMessage, "", "") - mockPicker.AssertCalled(t, "Pick", mock.AnythingOfType("*message.StatusMessage")) -} + mockCache := new(test.SubscriptionMockCache) + cache.SubscriptionCache = mockCache -func TestCheckDeliveringEvents_NoEvents(t *testing.T) { - mockMongo := new(test.MockMongoHandler) - mongo.CurrentConnection = mockMongo + deliveringHandler := new(test.DeliveringMockHandler) + cache.DeliveringHandler = deliveringHandler - mockKafka := new(test.MockKafkaHandler) - kafka.CurrentHandler = mockKafka + // Mocks for lock mechanism. + deliveringHandler. + On("NewLockContext", mock.Anything). + Return(context.Background()). + Once() + deliveringHandler. + On("TryLockWithTimeout", mock.Anything, cache.DeliveringLockKey, mock.Anything). + Return(true, nil). + Once() + deliveringHandler. + On("Unlock", mock.Anything, cache.DeliveringLockKey). + Return(nil). + Once() - deliveringHandler := new(test.DeliveringMockHandler) - cache.DeliveringHandler = deliveringHandler + // Mocks for multi-page Mongo fetch. + for _, step := range tc.mongoSteps { + mockMongo. + On("FindDeliveringMessagesByDeliveryType", + mock.Anything, // time.Time + mock.Anything, // lastCursor + ). + Return(step.OutMessages, step.OutNextCursor, step.OutError). 
+ Once() + } - mockPicker := new(test.MockPicker) - test.InjectMockPicker(mockPicker) + // Picker and Kafka mocks are only configured if dbMessages are present. + for i, dbMsg := range tc.dbMessages { + mockPicker. + On("Pick", &dbMsg). + Return(&tc.kafkaMessages[i], nil). + Once() - deliveringHandler.On("NewLockContext", mock.Anything).Return(context.Background()) - deliveringHandler.On("TryLockWithTimeout", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) - deliveringHandler.On("Unlock", mock.Anything, mock.Anything).Return(nil) + errVal := tc.republishErrors[i] + mockKafka. + On("RepublishMessage", + mock.Anything, + &tc.kafkaMessages[i], + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + false, + ). + Return(errVal).Once() + } - config.Current.Republishing.BatchSize = 5 - config.Current.Republishing.DeliveringStatesOffset = 30 * time.Minute + // Subscription cache mock if relevant for the scenario. + if len(tc.dbMessages) > 0 { + subscription := &resource.SubscriptionResource{ + Spec: struct { + Subscription resource.Subscription `json:"subscription"` + Environment string `json:"environment"` + }{ + Subscription: resource.Subscription{ + SubscriptionId: "sub123", + DeliveryType: enum.DeliveryTypeCallback, + Callback: "http://test-callback.com", + }, + }, + } + mockCache. + On("Get", config.Current.Hazelcast.Caches.SubscriptionCache, "sub123"). + Return(subscription, nil). + Maybe() + } - mockMongo.On("FindDeliveringMessagesByDeliveryType", mock.Anything, mock.Anything).Return([]message.StatusMessage{}, nil, nil) + // Calls the function under test. + CheckDeliveringEvents() - CheckDeliveringEvents() + // Asserts that all expected mock calls were executed. 
+ mockMongo.AssertExpectations(t) + mockPicker.AssertExpectations(t) + mockKafka.AssertExpectations(t) + mockCache.AssertExpectations(t) + deliveringHandler.AssertExpectations(t) - mockKafka.AssertNotCalled(t, "RepublishMessage", mock.Anything, "", "") - mockPicker.AssertNotCalled(t, "Pick", mock.AnythingOfType("*message.StatusMessage")) - - mockMongo.AssertExpectations(t) - mockMongo.AssertCalled(t, "FindDeliveringMessagesByDeliveryType", mock.Anything, mock.Anything) - - mockKafka.AssertExpectations(t) + // Additional verification can be performed based on scenario requirements. + // For "no events," asserts that neither RepublishMessage nor Pick was called. + if len(tc.dbMessages) == 0 { + mockPicker.AssertNotCalled(t, "Pick", mock.Anything) + mockKafka.AssertNotCalled(t, "RepublishMessage", mock.Anything) + } + }) + } } diff --git a/internal/handler/failed.go b/internal/handler/failed.go index 0cb2468..09a1b7b 100644 --- a/internal/handler/failed.go +++ b/internal/handler/failed.go @@ -7,12 +7,11 @@ package handler import ( "context" "github.com/rs/zerolog/log" - "github.com/telekom/pubsub-horizon-go/message" - "github.com/telekom/pubsub-horizon-go/tracing" "pubsub-horizon-golaris/internal/cache" "pubsub-horizon-golaris/internal/config" "pubsub-horizon-golaris/internal/kafka" "pubsub-horizon-golaris/internal/mongo" + "pubsub-horizon-golaris/internal/republish" "time" ) @@ -32,23 +31,22 @@ func CheckFailedEvents() { } }() - batchSize := config.Current.Republishing.BatchSize - - var dbMessages []message.StatusMessage - var err error - picker, err := kafka.NewPicker() if err != nil { log.Error().Err(err).Msg("Could not initialize picker for handling events in state FAILED") + return } defer picker.Close() + var cursor any for { - var lastCursor any - dbMessages, _, err = mongo.CurrentConnection.FindFailedMessagesWithCallbackUrlNotFoundException(time.Now(), lastCursor) + + dbMessages, c, err := 
mongo.CurrentConnection.FindFailedMessagesWithCallbackUrlNotFoundException(time.Now(), cursor) + cursor = c + if err != nil { - log.Error().Err(err).Msgf("Error while fetching FAILED messages from MongoDb") + log.Error().Err(err).Msgf("Error while fetching messages for subscription from database") return } @@ -56,6 +54,8 @@ func CheckFailedEvents() { return } + log.Debug().Msgf("Found %d FAILED messages in database", len(dbMessages)) + for _, dbMessage := range dbMessages { subscriptionId := dbMessage.SubscriptionId @@ -65,44 +65,15 @@ func CheckFailedEvents() { return } - if subscription != nil { - if subscription.Spec.Subscription.DeliveryType == "sse" || subscription.Spec.Subscription.DeliveryType == "server_sent_event" { - var newDeliveryType = "SERVER_SENT_EVENT" - - if dbMessage.Coordinates == nil { - log.Warn().Msgf("Coordinates in message for subscriptionId %s are nil: %v", dbMessage.SubscriptionId, dbMessage) - return - } - - kafkaMessage, err := picker.Pick(&dbMessage) - if err != nil { - log.Error().Err(err).Msgf("Error while fetching message from kafka for subscriptionId %s", subscriptionId) - return - } - - var b3Ctx = tracing.WithB3FromMessage(context.Background(), kafkaMessage) - var traceCtx = tracing.NewTraceContext(b3Ctx, "golaris", config.Current.Tracing.DebugEnabled) - - traceCtx.StartSpan("republish failed message") - traceCtx.SetAttribute("component", "Horizon Golaris") - traceCtx.SetAttribute("eventId", dbMessage.Event.Id) - traceCtx.SetAttribute("eventType", dbMessage.Event.Type) - traceCtx.SetAttribute("subscriptionId", dbMessage.SubscriptionId) - traceCtx.SetAttribute("uuid", string(kafkaMessage.Key)) - - err = kafka.CurrentHandler.RepublishMessage(traceCtx, kafkaMessage, newDeliveryType, "", true) - if err != nil { - log.Error().Err(err).Msgf("Error while republishing message for subscriptionId %s", subscriptionId) - return - } - log.Debug().Msgf("Successfully republished message in state FAILED for subscriptionId %s", subscriptionId) + 
if subscription != nil && (subscription.Spec.Subscription.DeliveryType == "sse" || subscription.Spec.Subscription.DeliveryType == "server_sent_event") { + if err := republish.RepublishEvent(picker, &dbMessage, subscription); err != nil { + log.Error().Err(err).Msgf("Error while republishing message for subscriptionId %s", dbMessage.SubscriptionId) + continue } - } - } - if len(dbMessages) < int(batchSize) { - break + log.Debug().Msgf("Successfully republished message in state FAILED for subscriptionId %s", dbMessage.SubscriptionId) + } } } } diff --git a/internal/handler/failed_test.go b/internal/handler/failed_test.go index 3c0e852..2aa50d1 100644 --- a/internal/handler/failed_test.go +++ b/internal/handler/failed_test.go @@ -19,78 +19,190 @@ import ( "testing" ) -func TestCheckFailedEvents(t *testing.T) { - mockMongo := new(test.MockMongoHandler) - mongo.CurrentConnection = mockMongo - - mockKafka := new(test.MockKafkaHandler) - kafka.CurrentHandler = mockKafka - - mockCache := new(test.SubscriptionMockCache) - cache.SubscriptionCache = mockCache - - failedHandler := new(test.FailedMockHandler) - cache.FailedHandler = failedHandler - - mockPicker := new(test.MockPicker) - test.InjectMockPicker(mockPicker) - - failedHandler.On("NewLockContext", mock.Anything).Return(context.Background()) - failedHandler.On("TryLockWithTimeout", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) - failedHandler.On("Unlock", mock.Anything, mock.Anything).Return(nil) +// failedTestCase holds the data for table-driven tests of CheckFailedEvents. +type failedTestCase struct { + name string + mongoSteps []mockStep + dbMessages []message.StatusMessage + kafkaMessages []sarama.ConsumerMessage + republishErrors []error + subscription *resource.SubscriptionResource +} +func TestCheckFailedEvents_TableDriven(t *testing.T) { + // Global republishing settings. 
config.Current.Republishing.BatchSize = 5 - partitionValue := int32(1) - offsetValue := int64(100) - - dbMessage := []message.StatusMessage{ + // Table of test cases. + testCases := []failedTestCase{ { - Topic: "test-topic", - Status: "FAILED", - SubscriptionId: "sub123", - DeliveryType: enum.DeliveryTypeSse, - Coordinates: &message.Coordinates{ - Partition: &partitionValue, - Offset: &offsetValue, - }}, - } - - subscription := &resource.SubscriptionResource{ - Spec: struct { - Subscription resource.Subscription `json:"subscription"` - Environment string `json:"environment"` - }{ - Subscription: resource.Subscription{ - SubscriptionId: "sub123", - DeliveryType: enum.DeliveryTypeSse, + name: "No events -> no republish", + // Only one step returning an empty slice to simulate no messages. + mongoSteps: []mockStep{ + { + OutMessages: []message.StatusMessage{}, + OutNextCursor: nil, + OutError: nil, + }, }, + // No messages, so no Kafka or subscription data needed. + dbMessages: []message.StatusMessage{}, + kafkaMessages: []sarama.ConsumerMessage{}, + republishErrors: []error{}, + subscription: nil, // No subscription needed for zero messages }, + { + name: "One FAILED SSE event -> republish", + // Two steps: first returns one message, second returns an empty slice. + mongoSteps: []mockStep{ + { + OutMessages: []message.StatusMessage{ + { + Topic: "test-topic", + Status: "FAILED", + SubscriptionId: "sub123", + DeliveryType: enum.DeliveryTypeCallback, + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(100), + }, + }, + }, + OutNextCursor: nil, + OutError: nil, + }, + { + // Second step returns zero messages, causing the loop to exit. 
+ OutMessages: []message.StatusMessage{}, + OutNextCursor: nil, + OutError: nil, + }, + }, + dbMessages: []message.StatusMessage{ + { + Topic: "test-topic", + Status: "FAILED", + SubscriptionId: "sub123", + DeliveryType: enum.DeliveryTypeCallback, + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(100), + }, + }, + }, + kafkaMessages: []sarama.ConsumerMessage{ + { + Topic: "test-topic", + Partition: 1, + Offset: 100, + Key: []byte("test-key"), + Value: []byte(`{"uuid": "12345", "event": {"id": "67890"}}`), + }, + }, + republishErrors: []error{nil}, + subscription: &resource.SubscriptionResource{ + Spec: struct { + Subscription resource.Subscription `json:"subscription"` + Environment string `json:"environment"` + }{ + Subscription: resource.Subscription{ + SubscriptionId: "sub123", + DeliveryType: enum.DeliveryTypeSse, + }, + }, + }, + }, + // Additional scenarios can be added here if desired. } - mockMongo.On("FindFailedMessagesWithCallbackUrlNotFoundException", mock.Anything, mock.Anything).Return(dbMessage, nil, nil) - mockCache.On("Get", config.Current.Hazelcast.Caches.SubscriptionCache, "sub123").Return(subscription, nil) - - expectedKafkaMessage := &sarama.ConsumerMessage{ - Topic: "test-topic", - Partition: 1, - Offset: 100, - Key: []byte("test-key"), - Value: []byte(`{"uuid": "12345", "event": {"id": "67890"}}`), + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Mocks for locking. + failedHandler := new(test.FailedMockHandler) + cache.FailedHandler = failedHandler + + failedHandler. + On("NewLockContext", mock.Anything). + Return(context.Background()). + Once() + failedHandler. + On("TryLockWithTimeout", mock.Anything, cache.FailedLockKey, mock.Anything). + Return(true, nil). + Once() + failedHandler. + On("Unlock", mock.Anything, cache.FailedLockKey). + Return(nil). + Once() + + // Mongo mock to simulate multiple paging steps. 
+ mockMongo := new(test.MockMongoHandler) + mongo.CurrentConnection = mockMongo + + for _, step := range tc.mongoSteps { + mockMongo. + On("FindFailedMessagesWithCallbackUrlNotFoundException", + mock.Anything, // time.Time + mock.Anything, // cursor + ). + Return(step.OutMessages, step.OutNextCursor, step.OutError). + Once() + } + + // Subscription cache mock for SSE subscriptions. + mockCache := new(test.SubscriptionMockCache) + cache.SubscriptionCache = mockCache + + if tc.subscription != nil && len(tc.dbMessages) > 0 { + subId := tc.subscription.Spec.Subscription.SubscriptionId + mockCache. + On("Get", config.Current.Hazelcast.Caches.SubscriptionCache, subId). + Return(tc.subscription, nil). + Maybe() + } + + // Picker mock and Kafka mock for republishing messages. + mockPicker := new(test.MockPicker) + test.InjectMockPicker(mockPicker) + + mockKafka := new(test.MockKafkaHandler) + kafka.CurrentHandler = mockKafka + + for i, dbMsg := range tc.dbMessages { + mockPicker. + On("Pick", &dbMsg). + Return(&tc.kafkaMessages[i], nil). + Once() + + errVal := tc.republishErrors[i] + // SSE is republished as "SERVER_SENT_EVENT" with an empty callback URL + // according to the original unit test expectations. Adjust if needed. + mockKafka. + On("RepublishMessage", + mock.Anything, + &tc.kafkaMessages[i], + "SERVER_SENT_EVENT", + "", + false, + ). + Return(errVal). + Once() + } + + // Calls the function under test. + CheckFailedEvents() + + // Assertions to ensure all mocks were triggered as expected. + failedHandler.AssertExpectations(t) + mockMongo.AssertExpectations(t) + mockPicker.AssertExpectations(t) + mockKafka.AssertExpectations(t) + mockCache.AssertExpectations(t) + + // If no messages were given, verifies that none of the Picker/Kafka methods were called. 
+ if len(tc.dbMessages) == 0 { + mockPicker.AssertNotCalled(t, "Pick", mock.Anything) + mockKafka.AssertNotCalled(t, "RepublishMessage", mock.Anything) + } + }) } - - mockPicker.On("Pick", mock.AnythingOfType("*message.StatusMessage")).Return(expectedKafkaMessage, nil) - mockKafka.On("RepublishMessage", mock.Anything, "SERVER_SENT_EVENT", "").Return(nil) - - CheckFailedEvents() - - mockMongo.AssertExpectations(t) - mockMongo.AssertCalled(t, "FindFailedMessagesWithCallbackUrlNotFoundException", mock.Anything, mock.Anything) - - mockCache.AssertExpectations(t) - mockCache.AssertCalled(t, "Get", config.Current.Hazelcast.Caches.SubscriptionCache, "sub123") - - mockKafka.AssertExpectations(t) - mockKafka.AssertCalled(t, "RepublishMessage", expectedKafkaMessage, "SERVER_SENT_EVENT", "") - mockPicker.AssertCalled(t, "Pick", mock.AnythingOfType("*message.StatusMessage")) } diff --git a/internal/healthcheck/healthcheck.go b/internal/healthcheck/healthcheck.go index b82af6b..fac3b67 100644 --- a/internal/healthcheck/healthcheck.go +++ b/internal/healthcheck/healthcheck.go @@ -94,6 +94,7 @@ func CheckConsumerHealth(hcData *PreparedHealthCheckData, subscription *resource updateHealthCheckEntry(hcData.Ctx, hcData.HealthCheckKey, hcData.HealthCheckEntry, 0) return err } + log.Debug().Msgf("Received response for callback-url %s with http-status: %v", hcData.HealthCheckEntry.CallbackUrl, resp.StatusCode) hcData.HealthCheckEntry.LastCheckedStatus = resp.StatusCode diff --git a/internal/kafka/handler.go b/internal/kafka/handler.go index c8164d2..73a7f6b 100644 --- a/internal/kafka/handler.go +++ b/internal/kafka/handler.go @@ -60,7 +60,7 @@ func (kafkaHandler Handler) RepublishMessage(traceCtx *tracing.TraceContext, mes } if errorParams == true { - optionalMetadataMessage, err := updateMetaData(message) + optionalMetadataMessage, err := resetMessageErrorFields(message) if err != nil { log.Error().Err(err).Msg("Could not update message metadata") return err @@ -155,7 +155,7 @@ func 
updateMessage(message *sarama.ConsumerMessage, newDeliveryType string, newC return msg, nil } -func updateMetaData(message *sarama.ConsumerMessage) (*sarama.ProducerMessage, error) { +func resetMessageErrorFields(message *sarama.ConsumerMessage) (*sarama.ProducerMessage, error) { var messageValue map[string]any if err := json.Unmarshal(message.Value, &messageValue); err != nil { log.Error().Err(err).Msg("Could not unmarshal message value") diff --git a/internal/kafka/picker.go b/internal/kafka/picker.go index e11480a..f9e6166 100644 --- a/internal/kafka/picker.go +++ b/internal/kafka/picker.go @@ -5,9 +5,11 @@ package kafka import ( + "errors" "github.com/IBM/sarama" "github.com/rs/zerolog/log" "github.com/telekom/pubsub-horizon-go/message" + "net" "pubsub-horizon-golaris/internal/config" ) @@ -37,16 +39,37 @@ func (p *Picker) Close() { func (p *Picker) Pick(status *message.StatusMessage) (*sarama.ConsumerMessage, error) { var partition, offset = *status.Coordinates.Partition, *status.Coordinates.Offset - partConsumer, err := p.consumer.ConsumePartition(status.Topic, partition, offset) - if err != nil { - return nil, err - } + partConsumer, err := p.consumer.ConsumePartition(status.Topic, partition, offset) defer func() { if err := partConsumer.Close(); err != nil { log.Error().Err(err).Msg("Could not close picker gracefully") } }() + if err != nil { + var nErr *net.OpError + if errors.As(err, &nErr) { + return nil, err + } + var errorList = []error{ + sarama.ErrEligibleLeadersNotAvailable, + sarama.ErrPreferredLeaderNotAvailable, + sarama.ErrUnknownLeaderEpoch, + sarama.ErrFencedLeaderEpoch, + sarama.ErrNotLeaderForPartition, + sarama.ErrLeaderNotAvailable, + } + for _, e := range errorList { + if errors.Is(err, e) { + return nil, err + } + } + + log.Warn().Err(err).Msgf("Could not fetch message from kafka for subscriptionId %s", status.SubscriptionId) + + return nil, nil + } + return <-partConsumer.Messages(), nil } diff --git a/internal/republish/republish.go 
b/internal/republish/republish.go index f58d284..2b8e19c 100644 --- a/internal/republish/republish.go +++ b/internal/republish/republish.go @@ -7,39 +7,26 @@ package republish import ( "context" "encoding/gob" - "errors" - "github.com/1pkg/gohalt" - "github.com/IBM/sarama" "github.com/rs/zerolog/log" "github.com/telekom/pubsub-horizon-go/message" "github.com/telekom/pubsub-horizon-go/resource" "github.com/telekom/pubsub-horizon-go/tracing" - "net" "pubsub-horizon-golaris/internal/cache" "pubsub-horizon-golaris/internal/config" "pubsub-horizon-golaris/internal/kafka" "pubsub-horizon-golaris/internal/mongo" + "pubsub-horizon-golaris/internal/throttling" "strings" "time" ) -var republishPendingEventsFunc = RepublishPendingEvents -var throttler gohalt.Throttler +var republishPendingEventsFunc = republishPendingEvents // register the data type RepublishingCacheEntry to gob for encoding and decoding of binary data func init() { gob.Register(RepublishingCacheEntry{}) } -func createThrottler(redeliveriesPerSecond int, deliveryType string) gohalt.Throttler { - if deliveryType == "sse" || deliveryType == "server_sent_event" || redeliveriesPerSecond <= 0 { - return gohalt.NewThrottlerEcho(nil) - } - - log.Info().Msgf("Creating throttler with %d redeliveries", redeliveriesPerSecond) - return gohalt.NewThrottlerTimed(uint64(redeliveriesPerSecond), config.Current.Republishing.ThrottlingIntervalTime, 0) -} - // HandleRepublishingEntry manages the republishing process for a given subscription. // The function takes a SubscriptionResource object as a parameter. 
func HandleRepublishingEntry(subscription *resource.SubscriptionResource) { @@ -80,19 +67,17 @@ func HandleRepublishingEntry(subscription *resource.SubscriptionResource) { // Ensure that the lock is released if acquired before when the function is ended defer func() { - if acquired { - err = Unlock(ctx, subscriptionId) - if err != nil { - log.Debug().Msgf("Failed to unlock RepublishingCacheEntry with subscriptionId %s and error %v", subscriptionId, err) - } - log.Debug().Msgf("Successfully unlocked RepublishingCacheEntry with subscriptionId %s", subscriptionId) + err = Unlock(ctx, subscriptionId) + if err != nil { + log.Debug().Msgf("Failed to unlock RepublishingCacheEntry with subscriptionId %s and error %v", subscriptionId, err) } + log.Debug().Msgf("Successfully unlocked RepublishingCacheEntry with subscriptionId %s", subscriptionId) }() err = republishPendingEventsFunc(subscription, castedRepublishCacheEntry) if err != nil { log.Error().Err(err).Msgf("Error while republishing pending events for subscriptionId %s. Discarding rebublishing cache entry", subscriptionId) - return + return // republishingEntry from the cache won't be deleted } err = cache.RepublishingCache.Delete(ctx, subscriptionId) @@ -104,10 +89,59 @@ func HandleRepublishingEntry(subscription *resource.SubscriptionResource) { log.Debug().Msgf("Successfully processed RepublishingCacheEntry with subscriptionId %s", subscriptionId) } -// RepublishPendingEvents handles the republishing of pending events for a given subscription. -// The function fetches waiting events from the database and republishes them to Kafka. -// The function takes a subscriptionId as a parameter. -func RepublishPendingEvents(subscription *resource.SubscriptionResource, republishEntry RepublishingCacheEntry) error { +// RepublishEvent handles the republishing of a single event for a given subscription, using a picker instance. 
+// The function tries to pick the event message from Kafka using the given picker instance and republishes the event +// with an updated callbackUrl and deliveryType +// The function takes three parameters: +// - picker: a kafka picker instance +// - dbMessage: a reference to the status entry from the database +// - subscription: an optional reference to the subscription resource, which will be used to check for callbackUrl and deliveryType changes (can be nil) +func RepublishEvent(picker kafka.MessagePicker, dbMessage *message.StatusMessage, subscription *resource.SubscriptionResource) error { + subscriptionId := dbMessage.SubscriptionId + + if dbMessage.Coordinates == nil { + log.Warn().Msgf("Coordinates in message for subscriptionId %s are nil: %v", dbMessage.SubscriptionId, dbMessage) + + return nil + } + + kafkaMessage, err := picker.Pick(dbMessage) + if err != nil { + return err + } + + var b3Ctx = tracing.WithB3FromMessage(context.Background(), kafkaMessage) + var traceCtx = tracing.NewTraceContext(b3Ctx, "golaris", config.Current.Tracing.DebugEnabled) + + traceCtx.StartSpan("republish message") + defer traceCtx.EndCurrentSpan() + + traceCtx.SetAttribute("component", "Horizon Golaris") + traceCtx.SetAttribute("eventId", dbMessage.Event.Id) + traceCtx.SetAttribute("eventType", dbMessage.Event.Type) + traceCtx.SetAttribute("subscriptionId", dbMessage.SubscriptionId) + traceCtx.SetAttribute("uuid", dbMessage.Uuid) + + newDeliveryType, newCallbackUrl := checkForSubscriptionUpdates(dbMessage, subscription) + + if err := kafka.CurrentHandler.RepublishMessage(traceCtx, kafkaMessage, newDeliveryType, newCallbackUrl, false); err != nil { + log.Warn().Err(err).Msgf("Error while republishing message for subscriptionId %s: %v", subscriptionId, err) + traceCtx.CurrentSpan().RecordError(err) + + return err + } + + log.Debug().Msgf("Successfully republished message for subscriptionId %s", subscriptionId) + + return nil +} + +// republishPendingEvents handles the 
republishing of pending events for a given subscription. +// The function fetches waiting events from the database and calls RepublishEvent for each entry. +// The function takes two parameters: +// - subscription: a reference to the subscription resource +// - republishEntry: an entry of the republishing cache +func republishPendingEvents(subscription *resource.SubscriptionResource, republishEntry RepublishingCacheEntry) error { var subscriptionId = subscription.Spec.Subscription.SubscriptionId log.Info().Msgf("Republishing pending events for subscription %s", subscriptionId) @@ -123,135 +157,41 @@ func RepublishPendingEvents(subscription *resource.SubscriptionResource, republi } defer picker.Close() - batchSize := config.Current.Republishing.BatchSize - - throttler = createThrottler(subscription.Spec.Subscription.RedeliveriesPerSecond, string(subscription.Spec.Subscription.DeliveryType)) - defer throttler.Release(context.Background()) + throttler := throttling.NewSubscriptionAwareThrottler(subscription) + defer throttler.Release() cache.SetCancelStatus(subscriptionId, false) - var lastCursor any + var cursor any for { if cache.GetCancelStatus(subscriptionId) { log.Info().Msgf("Republishing for subscription %s has been cancelled", subscriptionId) - return nil } - var dbMessages []message.StatusMessage - var err error - - if republishEntry.OldDeliveryType == "sse" || republishEntry.OldDeliveryType == "server_sent_event" { - dbMessages, lastCursor, err = mongo.CurrentConnection.FindProcessedMessagesByDeliveryTypeSSE(time.Now(), lastCursor, subscriptionId) - if err != nil { - log.Error().Err(err).Msgf("Error while fetching PROCESSED messages for subscription %s from db", subscriptionId) - } - log.Debug().Msgf("Found %d PROCESSED messages in MongoDb", len(dbMessages)) - } else { - dbMessages, lastCursor, err = mongo.CurrentConnection.FindWaitingMessages(time.Now(), lastCursor, subscriptionId) - if err != nil { - log.Error().Err(err).Msgf("Error while fetching messages 
for subscription %s from db", subscriptionId) - } - log.Debug().Msgf("Found %d WAITING messages in MongoDb", len(dbMessages)) - } - - log.Debug().Msgf("Last cursor: %v", lastCursor) + dbMessages, c := findPendingDBMessages(subscriptionId, republishEntry.OldDeliveryType, cursor) + cursor = c if len(dbMessages) == 0 { break } for _, dbMessage := range dbMessages { + throttler.Throttle() if cache.GetCancelStatus(subscriptionId) { log.Info().Msgf("Republishing for subscription %s has been cancelled", subscriptionId) return nil } - for { - if acquireResult := throttler.Acquire(context.Background()); acquireResult != nil { - sleepInterval := time.Millisecond * 10 - totalSleepTime := config.Current.Republishing.ThrottlingIntervalTime - for slept := time.Duration(0); slept < totalSleepTime; slept += sleepInterval { - if cache.GetCancelStatus(subscriptionId) { - log.Info().Msgf("Republishing for subscription %s has been cancelled", subscriptionId) - return nil - } - - time.Sleep(sleepInterval) - } - continue - } - break - } - - var newDeliveryType string - if !strings.EqualFold(string(subscription.Spec.Subscription.DeliveryType), string(dbMessage.DeliveryType)) { - newDeliveryType = strings.ToUpper(string(subscription.Spec.Subscription.DeliveryType)) - } - - var newCallbackUrl string - if subscription.Spec.Subscription.Callback != "" && (subscription.Spec.Subscription.Callback != dbMessage.Properties["callbackUrl"]) { - newCallbackUrl = subscription.Spec.Subscription.Callback - } - - if dbMessage.Coordinates == nil { - log.Error().Msgf("Coordinates in message for subscriptionId %s are nil: %+v", subscriptionId, dbMessage) - continue - } - - kafkaMessage, err := picker.Pick(&dbMessage) - if err != nil { + if err := RepublishEvent(picker, &dbMessage, subscription); err != nil { // Returning an error results in NOT deleting the republishingEntry from the cache // so that the republishing job will get retried shortly - var nErr *net.OpError - if errors.As(err, &nErr) { - return 
err - } - - var errorList = []error{ - sarama.ErrEligibleLeadersNotAvailable, - sarama.ErrPreferredLeaderNotAvailable, - sarama.ErrUnknownLeaderEpoch, - sarama.ErrFencedLeaderEpoch, - sarama.ErrNotLeaderForPartition, - sarama.ErrLeaderNotAvailable, - } - - for _, e := range errorList { - if errors.Is(err, e) { - return err - } - } - - log.Error().Err(err).Msgf("Error while fetching message from kafka for subscriptionId %s", subscriptionId) - continue - } - - var b3Ctx = tracing.WithB3FromMessage(context.Background(), kafkaMessage) - var traceCtx = tracing.NewTraceContext(b3Ctx, "golaris", config.Current.Tracing.DebugEnabled) - - traceCtx.StartSpan("republish message") - traceCtx.SetAttribute("component", "Horizon Golaris") - traceCtx.SetAttribute("eventId", dbMessage.Event.Id) - traceCtx.SetAttribute("eventType", dbMessage.Event.Type) - traceCtx.SetAttribute("subscriptionId", dbMessage.SubscriptionId) - traceCtx.SetAttribute("uuid", string(kafkaMessage.Key)) - - err = kafka.CurrentHandler.RepublishMessage(traceCtx, kafkaMessage, newDeliveryType, newCallbackUrl, false) - if err != nil { - log.Warn().Msgf("Error while republishing message for subscriptionId %s: %v", subscriptionId, err) - traceCtx.CurrentSpan().RecordError(err) + return err } - log.Debug().Msgf("Successfully republished message for subscriptionId %s", subscriptionId) - - traceCtx.EndCurrentSpan() - } - - if len(dbMessages) < int(batchSize) { - break } } + return nil } @@ -296,3 +236,52 @@ func Unlock(ctx context.Context, subscriptionId string) error { } return nil } + +// checkForSubscriptionUpdates returns the new updated delivery type and callback URL for a subscription +// The function compares the delivery type and callback URL from database entry with those from the subscription resource +// and return the changes. If the subscription resource is not passed, the function tries to fetch it from the cache. 
+// The function takes two parameters:
+// - dbMessage: a reference to the status entry from the database
+// - subscription: an optional reference to the subscription resource, which will be used to check for callbackUrl and deliveryType changes (can be nil)
+func checkForSubscriptionUpdates(dbMessage *message.StatusMessage, subscription *resource.SubscriptionResource) (newDeliveryType string, newCallbackUrl string) {
+	subscriptionId := dbMessage.SubscriptionId
+
+	if subscription == nil {
+		subscription, _ = cache.SubscriptionCache.Get(config.Current.Hazelcast.Caches.SubscriptionCache, subscriptionId)
+	}
+
+	if subscription != nil {
+		if !strings.EqualFold(string(subscription.Spec.Subscription.DeliveryType), string(dbMessage.DeliveryType)) {
+			newDeliveryType = strings.ToUpper(string(subscription.Spec.Subscription.DeliveryType))
+		}
+
+		if subscription.Spec.Subscription.Callback != "" && (subscription.Spec.Subscription.Callback != dbMessage.Properties["callbackUrl"]) {
+			newCallbackUrl = subscription.Spec.Subscription.Callback
+		}
+	}
+
+	return
+}
+
+func findPendingDBMessages(subscriptionId string, oldDeliveryType string, cursor any) (dbMessages []message.StatusMessage, newCursor any) {
+	dbMessages = []message.StatusMessage{}
+	var err error
+
+	if oldDeliveryType == "sse" || oldDeliveryType == "server_sent_event" {
+		dbMessages, newCursor, err = mongo.CurrentConnection.FindProcessedMessagesByDeliveryTypeSSE(time.Now(), cursor, subscriptionId)
+		if err != nil {
+			log.Error().Err(err).Msgf("Error while fetching PROCESSED messages for subscription %s from db", subscriptionId)
+		}
+		log.Debug().Msgf("Found %d PROCESSED messages in MongoDb", len(dbMessages))
+	} else {
+		dbMessages, newCursor, err = mongo.CurrentConnection.FindWaitingMessages(time.Now(), cursor, subscriptionId)
+		if err != nil {
+			log.Error().Err(err).Msgf("Error while fetching messages for subscription %s from db", subscriptionId)
+		}
+		log.Debug().Msgf("Found %d WAITING messages in MongoDb",
len(dbMessages)) + } + + log.Debug().Msgf("Last cursor: %v", newCursor) + + return +} diff --git a/internal/republish/republish_test.go b/internal/republish/republish_test.go index 33b9eb2..a0dac3f 100644 --- a/internal/republish/republish_test.go +++ b/internal/republish/republish_test.go @@ -29,6 +29,7 @@ func TestMain(m *testing.M) { }) config.Current = test.BuildTestConfig() cache.Initialize() + code := m.Run() test.TeardownDocker() @@ -98,63 +99,6 @@ func TestHandleRepublishingEntry_NotAcquired(t *testing.T) { defer cache.RepublishingCache.Unlock(ctx, testSubscriptionId) } -func TestRepublishEvents(t *testing.T) { - // Initialize mocks - mockMongo := new(test.MockMongoHandler) - mockKafka := new(test.MockKafkaHandler) - - mockPicker := new(test.MockPicker) - test.InjectMockPicker(mockPicker) - - // Replace real handlers with mocks - mongo.CurrentConnection = mockMongo - kafka.CurrentHandler = mockKafka - - // Set configurations for the test - config.Current.Republishing.BatchSize = 10 - - // Mock data - subscriptionId := "sub123" - - partitionValue1 := int32(1) - offsetValue1 := int64(100) - partitionValue2 := int32(1) - offsetValue2 := int64(101) - dbMessages := []message.StatusMessage{ - {Topic: "test-topic", Coordinates: &message.Coordinates{Partition: &partitionValue1, Offset: &offsetValue1}}, - {Topic: "test-topic", Coordinates: &message.Coordinates{Partition: &partitionValue2, Offset: &offsetValue2}}, - } - - kafkaMessage := sarama.ConsumerMessage{Value: []byte("test-content")} - - // Expectations for the batch - mockMongo.On("FindWaitingMessages", mock.Anything, mock.Anything, subscriptionId).Return(dbMessages, nil, nil).Once() - - mockPicker.On("Pick", mock.AnythingOfType("*message.StatusMessage")).Return(&kafkaMessage, nil).Twice() - mockKafka.On("RepublishMessage", mock.AnythingOfType("*sarama.ConsumerMessage"), "CALLBACK", "http://new-callbackUrl/callback").Return(nil).Twice() - - // Call the function under test - subscription := 
&resource.SubscriptionResource{ - Spec: struct { - Subscription resource.Subscription `json:"subscription"` - Environment string `json:"environment"` - }{ - Subscription: resource.Subscription{ - SubscriptionId: "sub123", - DeliveryType: enum.DeliveryTypeCallback, - Callback: "http://new-callbackUrl/callback", - }, - }, - } - - RepublishPendingEvents(subscription, RepublishingCacheEntry{SubscriptionId: subscriptionId}) - - // Assertions - mockMongo.AssertExpectations(t) - mockKafka.AssertExpectations(t) - mockPicker.AssertExpectations(t) -} - func Test_Unlock_RepublishingEntryLocked(t *testing.T) { defer test.ClearCaches() var assertions = assert.New(t) @@ -226,3 +170,240 @@ func Test_ForceDelete_RepublishingEntryUnlocked(t *testing.T) { // Assertions assertions.False(cache.RepublishingCache.ContainsKey(ctx, testSubscriptionId)) } + +// mockStep beschreibt ein "Paging-Ergebnis" für FindWaitingMessages. +type mockStep struct { + OutMessages []message.StatusMessage + OutNextCursor any + OutError error +} + +// republishTestCase enthält die Test-Daten für unser Table-Driven-Testverfahren. +type republishTestCase struct { + name string + subscriptionId string + mongoSteps []mockStep + dbMessages []message.StatusMessage + kafkaMessages []sarama.ConsumerMessage + republishErrors []error + expectedError bool +} + +// Helper +func intPtr(i int32) *int32 { return &i } +func int64Ptr(i int64) *int64 { return &i } + +func TestRepublishPendingEvents_TableDriven(t *testing.T) { + config.Current.Republishing.BatchSize = 10 + config.Current.Tracing.Enabled = true + + cache.Initialize() + defer test.ClearCaches() + + // Test Case + testCases := []republishTestCase{ + { + name: "Successful republish with multi-page fetch", + subscriptionId: "success_multi_page", + /* + * Important: We need 4 calls (the 4th call returns 0 messages => break). 
+ * 1) 2 messages + * 2) 2 messages + * 3) 1 message + * 4) 0 messages => break + */ + mongoSteps: []mockStep{ + { + OutMessages: []message.StatusMessage{ + { + Topic: "test-topic", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(100), + }, + }, + { + Topic: "test-topic", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(101), + }, + }, + }, + OutNextCursor: "success_multi_page_cursor_1", + OutError: nil, + }, + { + OutMessages: []message.StatusMessage{ + { + Topic: "test-topic", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(102), + }, + }, + { + Topic: "test-topic", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(103), + }, + }, + }, + OutNextCursor: "success_multi_page_cursor_2", + OutError: nil, + }, + { + OutMessages: []message.StatusMessage{ + { + Topic: "test-topic", + Coordinates: &message.Coordinates{ + Partition: intPtr(1), + Offset: int64Ptr(104), + }, + }, + }, + OutNextCursor: nil, + OutError: nil, + }, + { + // 4th call: no messages => len(...) 
== 0 => break
+				OutMessages:   []message.StatusMessage{},
+				OutNextCursor: nil,
+				OutError:      nil,
+			},
+		},
+
+			// 2+2+1 = 5 messages in MongoDB
+			dbMessages: []message.StatusMessage{
+				{
+					Topic: "test-topic",
+					Coordinates: &message.Coordinates{
+						Partition: intPtr(1),
+						Offset:    int64Ptr(100),
+					},
+				},
+				{
+					Topic: "test-topic",
+					Coordinates: &message.Coordinates{
+						Partition: intPtr(1),
+						Offset:    int64Ptr(101),
+					},
+				},
+				{
+					Topic: "test-topic",
+					Coordinates: &message.Coordinates{
+						Partition: intPtr(1),
+						Offset:    int64Ptr(102),
+					},
+				},
+				{
+					Topic: "test-topic",
+					Coordinates: &message.Coordinates{
+						Partition: intPtr(1),
+						Offset:    int64Ptr(103),
+					},
+				},
+				{
+					Topic: "test-topic",
+					Coordinates: &message.Coordinates{
+						Partition: intPtr(1),
+						Offset:    int64Ptr(104),
+					},
+				},
+			},
+
+			// 5 corresponding Kafka messages
+			kafkaMessages: []sarama.ConsumerMessage{
+				{Value: []byte("test-content-1")},
+				{Value: []byte("test-content-2")},
+				{Value: []byte("test-content-3")},
+				{Value: []byte("test-content-4")},
+				{Value: []byte("test-content-5")},
+			},
+
+			// No republishing errors in this testcase
+			republishErrors: []error{nil, nil, nil, nil, nil},
+			expectedError:   false,
+		},
+	}
+
+	// Test execution
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// 1) Prepare mocks
+			mockMongo := new(test.MockMongoHandler)
+			mockKafka := new(test.MockKafkaHandler)
+			mockPicker := new(test.MockPicker)
+
+			mongo.CurrentConnection = mockMongo
+			kafka.CurrentHandler = mockKafka
+			test.InjectMockPicker(mockPicker)
+
+			// 2) Configure Mongo mock: as many "On()" calls as in tc.mongoSteps
+			for _, step := range tc.mongoSteps {
+				mockMongo.
+					On("FindWaitingMessages", mock.Anything, mock.Anything, tc.subscriptionId).
+					Return(step.OutMessages, step.OutNextCursor, step.OutError).
+					Once()
+			}
+
+			// 3) Picker mock: For each dbMessage, there is one "Pick" call (indexing avoids aliasing the loop variable)
+			for i := range tc.dbMessages {
+				mockPicker.
+					On("Pick", &tc.dbMessages[i]).
+					Return(&tc.kafkaMessages[i], nil).
+					Once()
+			}
+
+			// 4) Kafka mock: RepublishMessage with the new interface signature,
+			// expecting one "On()" call per Kafka message.
+			for i := range tc.kafkaMessages {
+				errVal := tc.republishErrors[i]
+
+				mockKafka.On(
+					"RepublishMessage",
+					mock.Anything,                 // context.Context
+					&tc.kafkaMessages[i],          // *sarama.ConsumerMessage
+					mock.AnythingOfType("string"), // newDeliveryType
+					mock.AnythingOfType("string"), // newCallbackUrl
+					false,                         // skipTopicSuffix
+				).Return(errVal).Once()
+
+			}
+
+			// 5) Subscription Resource
+			subscription := &resource.SubscriptionResource{
+				Spec: struct {
+					Subscription resource.Subscription `json:"subscription"`
+					Environment  string                `json:"environment"`
+				}{
+					Subscription: resource.Subscription{
+						SubscriptionId: tc.subscriptionId,
+						DeliveryType:   enum.DeliveryTypeCallback,
+						Callback:       "http://test-callback.com",
+					},
+				},
+			}
+
+			// 6) Entry
+			entry := RepublishingCacheEntry{
+				SubscriptionId: tc.subscriptionId,
+			}
+
+			// 7) Run test
+			err := republishPendingEvents(subscription, entry)
+
+			// 8) Assertions
+			if tc.expectedError {
+				assert.Error(t, err, "Expected an error, but got none for test %s", tc.name)
+			} else {
+				assert.NoError(t, err, "Did not expect an error, but got one for test %s", tc.name)
+			}
+
+			mockMongo.AssertExpectations(t)
+			mockKafka.AssertExpectations(t)
+			mockPicker.AssertExpectations(t)
+		})
+	}
+}
diff --git a/internal/test/kafkahandler_mock.go b/internal/test/kafkahandler_mock.go
index 11c2ad9..293ccde 100644
--- a/internal/test/kafkahandler_mock.go
+++ b/internal/test/kafkahandler_mock.go
@@ -18,7 +18,13 @@
 type MockKafkaHandler struct {
 	mock.Mock
 }
 
-func (m *MockKafkaHandler) RepublishMessage(traceCtx *tracing.TraceContext, message *sarama.ConsumerMessage, newDeliveryType string, newCallbackUrl string, errorParams bool) error {
-	args := m.Called(message, newDeliveryType, newCallbackUrl)
+func (m *MockKafkaHandler) RepublishMessage(
+	traceCtx *tracing.TraceContext,
+	message *sarama.ConsumerMessage,
+	newDeliveryType string,
+	newCallbackUrl string,
+	errorParams bool,
+) error {
+	args := m.Called(traceCtx, message, newDeliveryType, newCallbackUrl, errorParams)
 	return args.Error(0)
 }
diff --git a/internal/throttling/throttler.go b/internal/throttling/throttler.go
new file mode 100644
index 0000000..e34b755
--- /dev/null
+++ b/internal/throttling/throttler.go
@@ -0,0 +1,66 @@
+// Copyright 2024 Deutsche Telekom IT GmbH
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package throttling
+
+import (
+	"context"
+	"github.com/1pkg/gohalt"
+	"github.com/rs/zerolog/log"
+	"github.com/telekom/pubsub-horizon-go/resource"
+	"pubsub-horizon-golaris/internal/cache"
+	"pubsub-horizon-golaris/internal/config"
+	"time"
+)
+
+type SubscriptionAwareThrottler struct {
+	throttler    gohalt.Throttler
+	subscription *resource.SubscriptionResource
+}
+
+func NewSubscriptionAwareThrottler(subscription *resource.SubscriptionResource) SubscriptionAwareThrottler {
+	deliveryType := string(subscription.Spec.Subscription.DeliveryType)
+	redeliveriesPerSecond := subscription.Spec.Subscription.RedeliveriesPerSecond
+
+	var throttler gohalt.Throttler
+
+	if deliveryType == "sse" || deliveryType == "server_sent_event" || redeliveriesPerSecond <= 0 {
+		throttler = gohalt.NewThrottlerEcho(nil)
+	} else {
+		log.Info().Msgf("Creating throttler with %d redeliveries", redeliveriesPerSecond)
+		throttler = gohalt.NewThrottlerTimed(uint64(redeliveriesPerSecond), config.Current.Republishing.ThrottlingIntervalTime, 0)
+	}
+
+	return SubscriptionAwareThrottler{
+		throttler:    throttler,
+		subscription: subscription,
+	}
+}
+
+func (t SubscriptionAwareThrottler) Throttle() {
+	subscriptionId := t.subscription.Spec.Subscription.SubscriptionId
+
+	// Re-check the cancel status on every iteration so that a cancellation
+	// issued while waiting for free quota is picked up immediately.
+	for !cache.GetCancelStatus(subscriptionId) {
+		if err := t.throttler.Acquire(context.Background()); err != nil {
+			// throttling quota is drained
+			sleepInterval := time.Millisecond * 10
+			totalSleepTime := config.Current.Republishing.ThrottlingIntervalTime
+			for slept := time.Duration(0); slept < totalSleepTime; slept += sleepInterval {
+				if cache.GetCancelStatus(subscriptionId) {
+					return
+				}
+
+				time.Sleep(sleepInterval)
+			}
+		} else {
+			return // free throttling quota
+		}
+	}
+}
+
+func (t SubscriptionAwareThrottler) Release() {
+	t.throttler.Release(context.Background())
+}