Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: initial logic for republishing off messages
Browse files Browse the repository at this point in the history
WRichter72 committed Jun 20, 2024
1 parent 818cfff commit a6069b8
Showing 6 changed files with 54 additions and 42 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ type Configuration struct {

SuccessfulResponseCodes []int `mapstructure:"successfulResponseCodes"`
RequestCooldownTime time.Duration `mapstructure:"requestCooldownResetTime"`
RepublishingBatchSize int64 `mapstructure:"republishingBatchSize"`

Polling Polling `mapstructure:"polling"`

1 change: 1 addition & 0 deletions config/loader.go
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@ func setDefaults() {
viper.SetDefault("port", 8080)

viper.SetDefault("successfulResponseCodes", []int{200, 201, 202, 204})
viper.SetDefault("republishingBatchSize", 10)

// Polling
viper.SetDefault("polling.openCbMessageInterval", "10ms")
31 changes: 27 additions & 4 deletions golaris/health_check.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@ import (
"fmt"
"github.com/go-resty/resty/v2"
"github.com/rs/zerolog/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"golaris/config"
"golaris/health"
"golaris/utils"
@@ -68,13 +70,13 @@ func performHealthCheck(deps utils.Dependencies, subscription *resource.Subscrip
log.Error().Err(err).Msgf("Failed to update health check for key %s", healthCheckKey)
}

checkConsumerHealth(healthCheckData)
checkConsumerHealth(deps, healthCheckData)

log.Info().Msgf("Successfully updated health check for key %s", healthCheckKey)
return
}

func checkConsumerHealth(healthCheckData health.HealthCheck) {
func checkConsumerHealth(deps utils.Dependencies, healthCheckData health.HealthCheck) {
log.Debug().Msg("Checking consumer health")

resp, err := executeHealthRequest(healthCheckData.CallbackUrl, healthCheckData.Method)
@@ -87,7 +89,7 @@ func checkConsumerHealth(healthCheckData health.HealthCheck) {
success := utils.Contains(config.Current.SuccessfulResponseCodes, resp.StatusCode())
if success {
go func() {
republishPendingEvents()
republishPendingEvents(deps, healthCheckData.CheckingFor)
closeCircuitBreaker()
}()
} else {
@@ -118,11 +120,32 @@ func executeHealthRequest(callbackUrl string, httpMethod string) (*resty.Respons
}
}

func republishPendingEvents() {
func republishPendingEvents(deps utils.Dependencies, subscriptionId string) {
//Set CircuitBreaker to Republishing

//Get Waiting events from database pageable!

pageable := options.Find().SetLimit(config.Current.RepublishingBatchSize).SetSort(bson.D{{Key: "timestamp", Value: 1}})
messages, err := deps.MongoConn.FindWaitingMessages(time.Now().UTC(), pageable, subscriptionId)
if err != nil {
log.Error().Err(err).Msgf("Error while fetching messages for subscription %s from mongo-db", subscriptionId)
}

for _, message := range messages {
log.Debug().Msgf("Republishing message for subscription %s: %v", subscriptionId, message)

// ToDo Check if coordinates are nil
kafkaMessage, err := deps.KafkaHandler.PickMessage(message.Topic, message.Coordinates.Partition, message.Coordinates.Offset)
if err != nil {
log.Warn().Msgf("Error while fetching message from kafka for subscription %s", subscriptionId)
}
err = deps.KafkaHandler.RepublishMessage(kafkaMessage)
if err != nil {
log.Warn().Msgf("Error while republishing message for subscription %s", subscriptionId)
}
log.Debug().Msgf("Successfully republished message for subscription %s", subscriptionId)
}

//Loop through the messages in the database

//Pick SubscriptionId, Coordinates and topic from the message
40 changes: 13 additions & 27 deletions kafka/handler.go
Original file line number Diff line number Diff line change
@@ -2,10 +2,11 @@ package kafka

import (
"encoding/json"
"eni.telekom.de/horizon2go/pkg/enum"
"github.com/IBM/sarama"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
"golaris/config"
"time"
)

type Handler struct {
@@ -52,18 +53,16 @@ func (kafkaHandler Handler) PickMessage(topic string, partition int32, offset in
}

func (kafkaHandler Handler) RepublishMessage(message *sarama.ConsumerMessage) error {
modifiedValue, header, err := updateMessageMetadata(message)
modifiedValue, err := updateMessage(message)
if err != nil {
log.Error().Err(err).Msg("Could not update message metadata")
return err
}

msg := &sarama.ProducerMessage{
Topic: message.Topic,
Headers: header,
Key: sarama.StringEncoder(message.Key),
Value: sarama.ByteEncoder(modifiedValue),
Timestamp: time.Now(),
Key: sarama.StringEncoder(message.Key),
Topic: message.Topic,
Value: sarama.ByteEncoder(modifiedValue),
}

_, _, err = kafkaHandler.producer.SendMessage(msg)
@@ -77,11 +76,11 @@ func (kafkaHandler Handler) RepublishMessage(message *sarama.ConsumerMessage) er
return nil
}

func updateMessageMetadata(message *sarama.ConsumerMessage) ([]byte, []sarama.RecordHeader, error) {
func updateMessage(message *sarama.ConsumerMessage) ([]byte, error) {
var messageValue map[string]any
if err := json.Unmarshal(message.Value, &messageValue); err != nil {
log.Error().Err(err).Msg("Could not unmarshal message value")
return nil, nil, err
return nil, err
}

// ToDo: If there are changes, we have to adjust the data here so that the current data is written to the kafka
@@ -90,27 +89,14 @@ func updateMessageMetadata(message *sarama.ConsumerMessage) ([]byte, []sarama.Re
// --> circuitBreakerOptOut?
// --> HttpMethod?

var metadataValue = map[string]any{
"uuid": messageValue["uuid"],
"event": map[string]any{
"id": messageValue["event"].(map[string]any)["id"],
},
"status": "PROCESSED",
}
messageValue["uuid"] = uuid.New().String()
messageValue["status"] = enum.StatusProcessed

// Add the messageType to METADATA?
// ToDo: Check if this is right here or do we need a message metaType?
// ToDo: if we only update the metadata, retentionTime does not restart
newMessageType := "METADATA"
newHeaders := []sarama.RecordHeader{
{Key: []byte("type"), Value: []byte(newMessageType)},
}

modifiedValue, err := json.Marshal(metadataValue)
modifiedValue, err := json.Marshal(messageValue)
if err != nil {
log.Error().Err(err).Msg("Could not marshal modified message value")
return nil, nil, err
return nil, err
}

return modifiedValue, newHeaders, nil
return modifiedValue, nil
}
20 changes: 10 additions & 10 deletions mongo/query.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
package mongo

import (
"context"
"eni.telekom.de/horizon2go/pkg/message"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)

func (connection Connection) findMessagesByQuery(ctx *fiber.Ctx, query bson.M, pageable options.FindOptions) ([]message.StatusMessage, error) {
func (connection Connection) findMessagesByQuery(query bson.M, pageable options.FindOptions) ([]message.StatusMessage, error) {
collection := connection.client.Database(connection.config.Database).Collection(connection.config.Collection)

cursor, err := collection.Find(ctx.Context(), query, &pageable)
cursor, err := collection.Find(context.Background(), query, &pageable)
if err != nil {
log.Error().Err(err).Msgf("Error finding documents: %v", err)
return nil, err
}

var messages []message.StatusMessage
if err = cursor.All(ctx.Context(), &messages); err != nil {
if err = cursor.All(context.Background(), &messages); err != nil {
log.Error().Err(err).Msgf("Error reading documents from cursor: %v", err)
return nil, err
}

return messages, nil
}

func (connection Connection) FindWaitingMessages(ctx *fiber.Ctx, timestamp time.Time, pageable options.FindOptions, subscriptionId string) ([]message.StatusMessage, error) {
func (connection Connection) FindWaitingMessages(timestamp time.Time, pageable *options.FindOptions, subscriptionId string) ([]message.StatusMessage, error) {
query := bson.M{
"status": "WAITING",
"subscriptionId": subscriptionId,
@@ -36,10 +36,10 @@ func (connection Connection) FindWaitingMessages(ctx *fiber.Ctx, timestamp time.
},
}

return connection.findMessagesByQuery(ctx, query, pageable)
return connection.findMessagesByQuery(query, *pageable)
}

func (connection Connection) FindDeliveringMessagesByDeliveryType(ctx *fiber.Ctx, status string, timestamp time.Time, pageable options.FindOptions, deliveryType string) ([]message.StatusMessage, error) {
func (connection Connection) FindDeliveringMessagesByDeliveryType(status string, timestamp time.Time, pageable options.FindOptions, deliveryType string) ([]message.StatusMessage, error) {
query := bson.M{
"status": status,
"deliveryType": deliveryType,
@@ -48,11 +48,11 @@ func (connection Connection) FindDeliveringMessagesByDeliveryType(ctx *fiber.Ctx
},
}

return connection.findMessagesByQuery(ctx, query, pageable)
return connection.findMessagesByQuery(query, pageable)
}

// ToDo: Here we need to discuss which FAILED events we want to republish!
func (connection Connection) FindFailedMessagesWithXYZException(ctx *fiber.Ctx, status string, timestamp time.Time, pageable options.FindOptions) ([]message.StatusMessage, error) {
func (connection Connection) FindFailedMessagesWithXYZException(status string, timestamp time.Time, pageable options.FindOptions) ([]message.StatusMessage, error) {
query := bson.M{
"status": status,
"error.type": "",
@@ -61,5 +61,5 @@ func (connection Connection) FindFailedMessagesWithXYZException(ctx *fiber.Ctx,
},
}

return connection.findMessagesByQuery(ctx, query, pageable)
return connection.findMessagesByQuery(query, pageable)
}
3 changes: 2 additions & 1 deletion service/service.go
Original file line number Diff line number Diff line change
@@ -31,7 +31,8 @@ func InitializeService() {
var err error
app = fiber.New()

app.Use(configureSecurity())
// Todo Implement token handling (clientId, clientSecret)
// app.Use(configureSecurity())
app.Use(tracing.Middleware())
app.Use(healthcheck.New())

0 comments on commit a6069b8

Please sign in to comment.