Skip to content

Commit

Permalink
feat: Refactoring and add some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
julian-spierefka committed Jun 20, 2024
1 parent d1841e7 commit 818cfff
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 102 deletions.
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import "time"

type Configuration struct {
LogLevel string `mapstructure:"logLevel"`
Port int `mapstructure:"port"`

Port int `mapstructure:"port"`
SuccessfulResponseCodes []int `mapstructure:"successfulResponseCodes"`
SuccessfulResponseCodes []int `mapstructure:"successfulResponseCodes"`
RequestCooldownTime time.Duration `mapstructure:"requestCooldownResetTime"`

Polling Polling `mapstructure:"polling"`

Expand Down
2 changes: 1 addition & 1 deletion config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func configureViper() {

func setDefaults() {
viper.SetDefault("logLevel", "info")

viper.SetDefault("port", 8080)

viper.SetDefault("successfulResponseCodes", []int{200, 201, 202, 204})

// Polling
Expand Down
26 changes: 0 additions & 26 deletions golaris/circuit_breaker.go

This file was deleted.

45 changes: 37 additions & 8 deletions golaris/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"eni.telekom.de/horizon2go/pkg/resource"
"fmt"
"github.com/go-resty/resty/v2"
"github.com/hazelcast/hazelcast-go-client"
"github.com/rs/zerolog/log"
"golaris/config"
"golaris/health"
Expand All @@ -14,7 +13,7 @@ import (
"time"
)

func performHealthCheck(subscription *resource.SubscriptionResource, healthCache *hazelcast.Map) {
func performHealthCheck(deps utils.Dependencies, subscription *resource.SubscriptionResource) {
// Specify HTTP method based on the subscription configuration
httpMethod := "HEAD"
if subscription.Spec.Subscription.EnforceGetHealthCheck == true {
Expand All @@ -24,23 +23,34 @@ func performHealthCheck(subscription *resource.SubscriptionResource, healthCache
healthCheckKey := fmt.Sprintf("%s:%s:%s", subscription.Spec.Environment, httpMethod, subscription.Spec.Subscription.Callback)

// Attempt to acquire a lock for the health check key
ctx := healthCache.NewLockContext(context.Background())
if acquired, _ := healthCache.TryLockWithTimeout(ctx, healthCheckKey, 10*time.Millisecond); !acquired {
ctx := deps.HealthCache.NewLockContext(context.Background())
if acquired, _ := deps.HealthCache.TryLockWithTimeout(ctx, healthCheckKey, 10*time.Millisecond); !acquired {
log.Debug().Msgf("Could not acquire lock for key %s, skipping health check", healthCheckKey)
return
}

// Ensure that the lock is released when the function is ended
defer func() {
if err := healthCache.Unlock(ctx, healthCheckKey); err != nil {
if err := deps.HealthCache.Unlock(ctx, healthCheckKey); err != nil {
log.Error().Err(err).Msgf("Error unlocking key %s", healthCheckKey)
}
log.Debug().Msgf("Successfully unlocked key %s", healthCheckKey)
}()

//Check if there is already a HealthCheck entry for the HealthCheckKey
_, err := healthCache.Get(context.Background(), healthCheckKey)
//ToDo: What do we want to do in this case if existing health check exists?
existingHealthCheckData, err := deps.HealthCache.Get(ctx, healthCheckKey)
if err != nil {
log.Error().Err(err).Msgf("Error retrieving health check for key %s", healthCheckKey)
}

if existingHealthCheckData != nil {
lastCheckedTime := existingHealthCheckData.(health.HealthCheck).LastChecked
duration := time.Since(lastCheckedTime)
if duration.Seconds() < config.Current.RequestCooldownTime.Seconds() {
log.Debug().Msgf("Skipping health check for key %s due to cooldown", healthCheckKey)
return
}
}

// Prepare the health check object
healthCheckData := health.HealthCheck{
Expand All @@ -53,7 +63,7 @@ func performHealthCheck(subscription *resource.SubscriptionResource, healthCache
}

// Update the health check in the health
err = healthCache.Set(ctx, healthCheckKey, healthCheckData)
err = deps.HealthCache.Set(ctx, healthCheckKey, healthCheckData)
if err != nil {
log.Error().Err(err).Msgf("Failed to update health check for key %s", healthCheckKey)
}
Expand All @@ -78,6 +88,7 @@ func checkConsumerHealth(healthCheckData health.HealthCheck) {
if success {
go func() {
republishPendingEvents()
closeCircuitBreaker()
}()
} else {
// ToDo: What do we want to do in case of a failed health check?
Expand Down Expand Up @@ -108,5 +119,23 @@ func executeHealthRequest(callbackUrl string, httpMethod string) (*resty.Respons
}

func republishPendingEvents() {
//Set CircuitBreaker to Republishing

//Get Waiting events from database pageable!

//Loop through the messages in the database

//Pick SubscriptionId, Coordinates and topic from the message

//Check if coords are null

//Pick message from kafka with topic, and partition and offset

//Check if message exist

//If exists, resetMessage (write new message into kafka)
}

func closeCircuitBreaker() {

}
26 changes: 9 additions & 17 deletions golaris/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,47 @@
package golaris

import (
"eni.telekom.de/horizon2go/pkg/cache"
"eni.telekom.de/horizon2go/pkg/enum"
"eni.telekom.de/horizon2go/pkg/message"
"eni.telekom.de/horizon2go/pkg/resource"
"github.com/go-co-op/gocron"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/predicate"
"github.com/rs/zerolog/log"
"golaris/config"
"golaris/utils"
"time"
)

type SchedulerParams struct {
cbCache *cache.Cache[message.CircuitBreakerMessage]
subCache *cache.Cache[resource.SubscriptionResource]
}

var scheduler *gocron.Scheduler

func InitializeScheduler(cbCache *cache.Cache[message.CircuitBreakerMessage], subCache *cache.Cache[resource.SubscriptionResource], healthCache *hazelcast.Map) {
func InitializeScheduler(deps utils.Dependencies) {
scheduler = gocron.NewScheduler(time.UTC)

if _, err := scheduler.Every(config.Current.Polling.OpenCbMessageInterval).Do(func() {
checkCircuitBreakersByStatus(cbCache, subCache, healthCache, enum.CircuitBreakerStatusOpen)
checkCircuitBreakersByStatus(deps, enum.CircuitBreakerStatusOpen)
}); err != nil {
log.Error().Msgf("Error while scheduling for OPEN CircuitBreakers: %v", err)
}

//ToDo: With a scheduler or any other mechanism?
//ToDo: When do we want to do this? Always or only on a pod restart?
if _, err := scheduler.Every(config.Current.Polling.RepublishingOrCheckingMessageInterval).Do(func() {
checkCircuitBreakersByStatus(cbCache, subCache, healthCache, enum.CircuitBreakerStatusRepublishing)
checkCircuitBreakersByStatus(cbCache, subCache, healthCache, enum.CircuitBreakerStatusChecking)
checkCircuitBreakersByStatus(deps, enum.CircuitBreakerStatusRepublishing)
checkCircuitBreakersByStatus(deps, enum.CircuitBreakerStatusChecking)
}); err != nil {
log.Error().Msgf("Error while scheduling for REPUBLISHING and CHECKING CircuitBreakers: %v", err)
}

scheduler.StartAsync()
}

func checkCircuitBreakersByStatus(cbCache *cache.Cache[message.CircuitBreakerMessage], subCache *cache.Cache[resource.SubscriptionResource], healthCache *hazelcast.Map, status enum.CircuitBreakerStatus) {
func checkCircuitBreakersByStatus(deps utils.Dependencies, status enum.CircuitBreakerStatus) {
statusQuery := predicate.Equal("status", string(status))
cbEntries, err := cbCache.GetQuery(config.Current.Hazelcast.Caches.CircuitBreakerCache, statusQuery)
cbEntries, err := deps.CbCache.GetQuery(config.Current.Hazelcast.Caches.CircuitBreakerCache, statusQuery)
if err != nil {
log.Debug().Msgf("Error while getting CircuitBreaker messages: %v", err)
return
}

for _, entry := range cbEntries {
log.Info().Msgf("Checking CircuitBreaker with id %s", entry.Status)
go checkSubscriptionForCbMessage(subCache, healthCache, entry)
go checkSubscriptionForCbMessage(deps, entry)
}
}
24 changes: 24 additions & 0 deletions golaris/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package golaris

import (
"eni.telekom.de/horizon2go/pkg/message"
"github.com/rs/zerolog/log"
"golaris/config"
"golaris/utils"
)

func checkSubscriptionForCbMessage(deps utils.Dependencies, cbMessage message.CircuitBreakerMessage) {
subscriptionId := cbMessage.SubscriptionId

subscription, err := deps.SubCache.Get(config.Current.Hazelcast.Caches.SubscriptionCache, subscriptionId)
if err != nil {
log.Error().Err(err).Msgf("Could not read subscription with id %s", subscriptionId)
return
}

log.Info().Msgf("Subscription with id %s found: %v", subscriptionId, subscription)

// ToDo: Check whether the subscription has changed

go performHealthCheck(deps, subscription)
}
48 changes: 13 additions & 35 deletions mongo/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ import (
"time"
)

type MessageParams struct {
ctx *fiber.Ctx
status string
timestamp time.Time
pageable options.FindOptions
}

func (connection Connection) findMessagesByQuery(ctx *fiber.Ctx, query bson.M, pageable options.FindOptions) ([]message.StatusMessage, error) {
collection := connection.client.Database(connection.config.Database).Collection(connection.config.Collection)

Expand All @@ -34,54 +27,39 @@ func (connection Connection) findMessagesByQuery(ctx *fiber.Ctx, query bson.M, p
return messages, nil
}

func (connection Connection) findWaitingMessagesOrWithCallbackUrlNotFoundException(messageParams MessageParams, statusList []string, subscriptionIDs []string) ([]message.StatusMessage, error) {
ctx := messageParams.ctx

func (connection Connection) FindWaitingMessages(ctx *fiber.Ctx, timestamp time.Time, pageable options.FindOptions, subscriptionId string) ([]message.StatusMessage, error) {
query := bson.M{
"$or": []bson.M{
{"status": "WAITING"},
//ToDo: Only Waiting messages
{"error.type": "de.telekom.horizon.dude.exception.CallbackUrlNotFoundException"},
},
"status": bson.M{
"$in": statusList,
},
"subscriptionId": bson.M{
"$in": subscriptionIDs,
},
"status": "WAITING",
"subscriptionId": subscriptionId,
"modified": bson.M{
"$lte": messageParams.timestamp,
"$lte": timestamp,
},
}

return connection.findMessagesByQuery(ctx, query, messageParams.pageable)
return connection.findMessagesByQuery(ctx, query, pageable)
}

func (connection Connection) findDeliveringMessagesByDeliveryType(messageParams MessageParams, deliveryType string) ([]message.StatusMessage, error) {
ctx := messageParams.ctx

func (connection Connection) FindDeliveringMessagesByDeliveryType(ctx *fiber.Ctx, status string, timestamp time.Time, pageable options.FindOptions, deliveryType string) ([]message.StatusMessage, error) {
query := bson.M{
"status": status,
"deliveryType": deliveryType,
"status": messageParams.status,
"modified": bson.M{
"$lte": messageParams.timestamp,
"$lte": timestamp,
},
}

return connection.findMessagesByQuery(ctx, query, messageParams.pageable)
return connection.findMessagesByQuery(ctx, query, pageable)
}

// ToDo: Here we need to discuss which FAILED events we want to republish!
func (connection Connection) findFailedMessagesWithXYZException(messageParams MessageParams) ([]message.StatusMessage, error) {
ctx := messageParams.ctx

func (connection Connection) FindFailedMessagesWithXYZException(ctx *fiber.Ctx, status string, timestamp time.Time, pageable options.FindOptions) ([]message.StatusMessage, error) {
query := bson.M{
"status": messageParams.status,
"status": status,
"error.type": "",
"modified": bson.M{
"$lte": messageParams.timestamp,
"$lte": timestamp,
},
}

return connection.findMessagesByQuery(ctx, query, messageParams.pageable)
return connection.findMessagesByQuery(ctx, query, pageable)
}
23 changes: 10 additions & 13 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@ import (
"golaris/mock"
"golaris/mongo"
"golaris/tracing"
"golaris/utils"
)

var (
SubCacheInstance *cache.Cache[resource.SubscriptionResource]
CbCacheInstance *cache.Cache[message.CircuitBreakerMessage]
HealthCheckInstance *hazelcast.Map
kafkaHandler *kafka.Handler
app *fiber.App
mongoConnection *mongo.Connection
app *fiber.App
deps utils.Dependencies
)

func InitializeService() {
Expand All @@ -42,35 +39,35 @@ func InitializeService() {
app.Get("/api/v1/circuit-breakers/:subscriptionId", getCircuitBreakerMessage)

cacheConfig := configureHazelcast()
SubCacheInstance, err = cache.NewCache[resource.SubscriptionResource](cacheConfig)
deps.SubCache, err = cache.NewCache[resource.SubscriptionResource](cacheConfig)
if err != nil {
log.Panic().Err(err).Msg("Error while initializing Hazelcast subscription health")
}

CbCacheInstance, err = cache.NewCache[message.CircuitBreakerMessage](cacheConfig)
deps.CbCache, err = cache.NewCache[message.CircuitBreakerMessage](cacheConfig)
if err != nil {
log.Panic().Err(err).Msg("Error while initializing CircuitBreaker health")
}

HealthCheckInstance, err = health.NewHealthCheckCache(hazelcast.Config{})
deps.HealthCache, err = health.NewHealthCheckCache(hazelcast.Config{})
if err != nil {
log.Panic().Err(err).Msg("Error while initializing HealthCheck cache")
}

mongoConnection, err = mongo.NewMongoConnection(&config.Current.Mongo)
deps.MongoConn, err = mongo.NewMongoConnection(&config.Current.Mongo)
if err != nil {
log.Panic().Err(err).Msg("Error while initializing MongoDB connection")
}

kafkaHandler, err = kafka.NewKafkaHandler()
deps.KafkaHandler, err = kafka.NewKafkaHandler()
if err != nil {
log.Panic().Err(err).Msg("Error while initializing Kafka Picker")
}

golaris.InitializeScheduler(CbCacheInstance, SubCacheInstance, HealthCheckInstance)
golaris.InitializeScheduler(deps)

// TODO Mock cb-messages until comet is adapted
mock.CreateMockedCircuitBreakerMessages(CbCacheInstance, 1)
mock.CreateMockedCircuitBreakerMessages(deps.CbCache, 1)
}

func configureSecurity() fiber.Handler {
Expand Down
Loading

0 comments on commit 818cfff

Please sign in to comment.