From a6069b805d415a207ae433372626b035031e3287 Mon Sep 17 00:00:00 2001 From: "w.richter@telekom.de" Date: Thu, 20 Jun 2024 12:04:23 +0200 Subject: [PATCH] feat: initial logic for republishing off messages --- config/config.go | 1 + config/loader.go | 1 + golaris/health_check.go | 31 +++++++++++++++++++++++++++---- kafka/handler.go | 40 +++++++++++++--------------------------- mongo/query.go | 20 ++++++++++---------- service/service.go | 3 ++- 6 files changed, 54 insertions(+), 42 deletions(-) diff --git a/config/config.go b/config/config.go index 15d4f47..087717b 100644 --- a/config/config.go +++ b/config/config.go @@ -8,6 +8,7 @@ type Configuration struct { SuccessfulResponseCodes []int `mapstructure:"successfulResponseCodes"` RequestCooldownTime time.Duration `mapstructure:"requestCooldownResetTime"` + RepublishingBatchSize int64 `mapstructure:"republishingBatchSize"` Polling Polling `mapstructure:"polling"` diff --git a/config/loader.go b/config/loader.go index 32a2c7b..d0958f2 100644 --- a/config/loader.go +++ b/config/loader.go @@ -42,6 +42,7 @@ func setDefaults() { viper.SetDefault("port", 8080) viper.SetDefault("successfulResponseCodes", []int{200, 201, 202, 204}) + viper.SetDefault("republishingBatchSize", 10) // Polling viper.SetDefault("polling.openCbMessageInterval", "10ms") diff --git a/golaris/health_check.go b/golaris/health_check.go index 7a887eb..e1c6c7c 100644 --- a/golaris/health_check.go +++ b/golaris/health_check.go @@ -6,6 +6,8 @@ import ( "fmt" "github.com/go-resty/resty/v2" "github.com/rs/zerolog/log" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" "golaris/config" "golaris/health" "golaris/utils" @@ -68,13 +70,13 @@ func performHealthCheck(deps utils.Dependencies, subscription *resource.Subscrip log.Error().Err(err).Msgf("Failed to update health check for key %s", healthCheckKey) } - checkConsumerHealth(healthCheckData) + checkConsumerHealth(deps, healthCheckData) log.Info().Msgf("Successfully updated health check for key %s", healthCheckKey) return } -func checkConsumerHealth(healthCheckData health.HealthCheck) { +func checkConsumerHealth(deps utils.Dependencies, healthCheckData health.HealthCheck) { log.Debug().Msg("Checking consumer health") resp, err := executeHealthRequest(healthCheckData.CallbackUrl, healthCheckData.Method) @@ -87,7 +89,7 @@ func checkConsumerHealth(healthCheckData health.HealthCheck) { success := utils.Contains(config.Current.SuccessfulResponseCodes, resp.StatusCode()) if success { go func() { - republishPendingEvents() + republishPendingEvents(deps, healthCheckData.CheckingFor) closeCircuitBreaker() }() } else { @@ -118,11 +120,32 @@ func executeHealthRequest(callbackUrl string, httpMethod string) (*resty.Respons } } -func republishPendingEvents() { +func republishPendingEvents(deps utils.Dependencies, subscriptionId string) { //Set CircuitBreaker to Republishing //Get Waiting events from database pageable! + pageable := options.Find().SetLimit(config.Current.RepublishingBatchSize).SetSort(bson.D{{Key: "timestamp", Value: 1}}) + messages, err := deps.MongoConn.FindWaitingMessages(time.Now().UTC(), pageable, subscriptionId) + if err != nil { + log.Error().Err(err).Msgf("Error while fetching messages for subscription %s from mongo-db", subscriptionId) + } + + for _, message := range messages { + log.Debug().Msgf("Republishing message for subscription %s: %v", subscriptionId, message) + + // ToDo Check if coordinates are nil + kafkaMessage, err := deps.KafkaHandler.PickMessage(message.Topic, message.Coordinates.Partition, message.Coordinates.Offset) + if err != nil { + log.Warn().Msgf("Error while fetching message from kafka for subscription %s", subscriptionId) + } + err = deps.KafkaHandler.RepublishMessage(kafkaMessage) + if err != nil { + log.Warn().Msgf("Error while republishing message for subscription %s", subscriptionId) + } + log.Debug().Msgf("Successfully republished message for subscription %s", subscriptionId) + } + //Loop through the messages in the database //Pick SubscriptionId, Coordinates and topic from the message diff --git a/kafka/handler.go b/kafka/handler.go index 2124e89..d6626e8 100644 --- a/kafka/handler.go +++ b/kafka/handler.go @@ -2,10 +2,11 @@ package kafka import ( "encoding/json" + "eni.telekom.de/horizon2go/pkg/enum" "github.com/IBM/sarama" + "github.com/google/uuid" "github.com/rs/zerolog/log" "golaris/config" - "time" ) type Handler struct { @@ -52,18 +53,16 @@ func (kafkaHandler Handler) PickMessage(topic string, partition int32, offset in } func (kafkaHandler Handler) RepublishMessage(message *sarama.ConsumerMessage) error { - modifiedValue, header, err := updateMessageMetadata(message) + modifiedValue, err := updateMessage(message) if err != nil { log.Error().Err(err).Msg("Could not update message metadata") return err } msg := &sarama.ProducerMessage{ - Topic: message.Topic, - Headers: header, - Key: sarama.StringEncoder(message.Key), - Value: sarama.ByteEncoder(modifiedValue), - Timestamp: time.Now(), + Key: sarama.StringEncoder(message.Key), + Topic: message.Topic, + Value: sarama.ByteEncoder(modifiedValue), } _, _, err = kafkaHandler.producer.SendMessage(msg) @@ -77,11 +76,11 @@ func (kafkaHandler Handler) RepublishMessage(message *sarama.ConsumerMessage) er return nil } -func updateMessageMetadata(message *sarama.ConsumerMessage) ([]byte, []sarama.RecordHeader, error) { +func updateMessage(message *sarama.ConsumerMessage) ([]byte, error) { var messageValue map[string]any if err := json.Unmarshal(message.Value, &messageValue); err != nil { log.Error().Err(err).Msg("Could not unmarshal message value") - return nil, nil, err + return nil, err } // ToDo: If there are changes, we have to adjust the data here so that the current data is written to the kafka @@ -90,27 +89,14 @@ func updateMessageMetadata(message *sarama.ConsumerMessage) ([]byte, []sarama.Re // --> circuitBreakerOptOut? // --> HttpMethod? - var metadataValue = map[string]any{ - "uuid": messageValue["uuid"], - "event": map[string]any{ - "id": messageValue["event"].(map[string]any)["id"], - }, - "status": "PROCESSED", - } + messageValue["uuid"] = uuid.New().String() + messageValue["status"] = enum.StatusProcessed - // Add the messageType to METADATA? - // ToDo: Check if this is right here or do we need a message metaType? - // ToDo: if we only update the metadata, retentionTime does not restart - newMessageType := "METADATA" - newHeaders := []sarama.RecordHeader{ - {Key: []byte("type"), Value: []byte(newMessageType)}, - } - - modifiedValue, err := json.Marshal(metadataValue) + modifiedValue, err := json.Marshal(messageValue) if err != nil { log.Error().Err(err).Msg("Could not marshal modified message value") - return nil, nil, err + return nil, err } - return modifiedValue, newHeaders, nil + return modifiedValue, nil } diff --git a/mongo/query.go b/mongo/query.go index 37c6657..a158c6f 100644 --- a/mongo/query.go +++ b/mongo/query.go @@ -1,25 +1,25 @@ package mongo import ( + "context" "eni.telekom.de/horizon2go/pkg/message" - "github.com/gofiber/fiber/v2" "github.com/rs/zerolog/log" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" "time" ) -func (connection Connection) findMessagesByQuery(ctx *fiber.Ctx, query bson.M, pageable options.FindOptions) ([]message.StatusMessage, error) { +func (connection Connection) findMessagesByQuery(query bson.M, pageable options.FindOptions) ([]message.StatusMessage, error) { collection := connection.client.Database(connection.config.Database).Collection(connection.config.Collection) - cursor, err := collection.Find(ctx.Context(), query, &pageable) + cursor, err := collection.Find(context.Background(), query, &pageable) if err != nil { log.Error().Err(err).Msgf("Error finding documents: %v", err) return nil, err } var messages []message.StatusMessage - if err = cursor.All(ctx.Context(), &messages); err != nil { + if err = cursor.All(context.Background(), &messages); err != nil { log.Error().Err(err).Msgf("Error reading documents from cursor: %v", err) return nil, err } @@ -27,7 +27,7 @@ func (connection Connection) findMessagesByQuery(ctx *fiber.Ctx, query bson.M, p return messages, nil } -func (connection Connection) FindWaitingMessages(ctx *fiber.Ctx, timestamp time.Time, pageable options.FindOptions, subscriptionId string) ([]message.StatusMessage, error) { +func (connection Connection) FindWaitingMessages(timestamp time.Time, pageable *options.FindOptions, subscriptionId string) ([]message.StatusMessage, error) { query := bson.M{ "status": "WAITING", "subscriptionId": subscriptionId, @@ -36,10 +36,10 @@ func (connection Connection) FindWaitingMessages(ctx *fiber.Ctx, timestamp time. }, } - return connection.findMessagesByQuery(ctx, query, pageable) + return connection.findMessagesByQuery(query, *pageable) } -func (connection Connection) FindDeliveringMessagesByDeliveryType(ctx *fiber.Ctx, status string, timestamp time.Time, pageable options.FindOptions, deliveryType string) ([]message.StatusMessage, error) { +func (connection Connection) FindDeliveringMessagesByDeliveryType(status string, timestamp time.Time, pageable options.FindOptions, deliveryType string) ([]message.StatusMessage, error) { query := bson.M{ "status": status, "deliveryType": deliveryType, @@ -48,11 +48,11 @@ func (connection Connection) FindDeliveringMessagesByDeliveryType(ctx *fiber.Ctx }, } - return connection.findMessagesByQuery(ctx, query, pageable) + return connection.findMessagesByQuery(query, pageable) } // ToDo: Here we need to discuss which FAILED events we want to republish! -func (connection Connection) FindFailedMessagesWithXYZException(ctx *fiber.Ctx, status string, timestamp time.Time, pageable options.FindOptions) ([]message.StatusMessage, error) { +func (connection Connection) FindFailedMessagesWithXYZException(status string, timestamp time.Time, pageable options.FindOptions) ([]message.StatusMessage, error) { query := bson.M{ "status": status, "error.type": "", @@ -61,5 +61,5 @@ func (connection Connection) FindFailedMessagesWithXYZException(ctx *fiber.Ctx, }, } - return connection.findMessagesByQuery(ctx, query, pageable) + return connection.findMessagesByQuery(query, pageable) } diff --git a/service/service.go b/service/service.go index 68c6c7e..2b6fdae 100644 --- a/service/service.go +++ b/service/service.go @@ -31,7 +31,8 @@ func InitializeService() { var err error app = fiber.New() - app.Use(configureSecurity()) + // Todo Implement token handling (clientId, clientSecret) + // app.Use(configureSecurity()) app.Use(tracing.Middleware()) app.Use(healthcheck.New())