Skip to content

Commit

Permalink
feat: update healthCheckCache
Browse files Browse the repository at this point in the history
  • Loading branch information
julian-spierefka committed Jun 19, 2024
1 parent a2dcb1e commit 2554a0c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 37 deletions.
5 changes: 3 additions & 2 deletions golaris/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"eni.telekom.de/horizon2go/pkg/cache"
"eni.telekom.de/horizon2go/pkg/message"
"eni.telekom.de/horizon2go/pkg/resource"
"github.com/hazelcast/hazelcast-go-client"
"github.com/rs/zerolog/log"
"golaris/config"
)

func checkSubscriptionForCbMessage(subCache *cache.Cache[resource.SubscriptionResource], cbMessage message.CircuitBreakerMessage) {
func checkSubscriptionForCbMessage(subCache *cache.Cache[resource.SubscriptionResource], healthCache *hazelcast.Map, cbMessage message.CircuitBreakerMessage) {
subscriptionId := cbMessage.SubscriptionId

subscription, err := subCache.Get(config.Current.Hazelcast.Caches.SubscriptionCache, subscriptionId)
Expand All @@ -21,5 +22,5 @@ func checkSubscriptionForCbMessage(subCache *cache.Cache[resource.SubscriptionRe

// ToDo: Check whether the subscription has changed

go performHealthCheck(subscription)
go performHealthCheck(subscription, healthCache)
}
23 changes: 13 additions & 10 deletions golaris/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"eni.telekom.de/horizon2go/pkg/resource"
"fmt"
"github.com/hazelcast/hazelcast-go-client"
"github.com/rs/zerolog/log"
"golaris/cache"
"golaris/health"
"time"
)

func performHealthCheck(subscription *resource.SubscriptionResource) {
func performHealthCheck(subscription *resource.SubscriptionResource, healthCache *hazelcast.Map) {
// Specify HTTP method based on the subscription configuration
httpMethod := "HEAD"
if subscription.Spec.Subscription.EnforceGetHealthCheck == true {
Expand All @@ -20,31 +21,30 @@ func performHealthCheck(subscription *resource.SubscriptionResource) {
log.Info().Msgf("Created healthCheckKey is: %s", healthCheckKey)

// Attempt to acquire a lock for the health check key
ctx := cache.HealthCheckMap.NewLockContext(context.Background())
if acquired, _ := cache.HealthCheckMap.TryLockWithTimeout(ctx, healthCheckKey, 10*time.Millisecond); !acquired {
ctx := healthCache.NewLockContext(context.Background())
if acquired, _ := healthCache.TryLockWithTimeout(ctx, healthCheckKey, 10*time.Millisecond); !acquired {
log.Info().Msgf("Could not acquire lock for key %s, skipping health check", healthCheckKey)
return
}

// Ensure that the lock is released when the function is ended
defer func() {
if err := cache.HealthCheckMap.Unlock(context.Background(), healthCheckKey); err != nil {
if err := healthCache.Unlock(context.Background(), healthCheckKey); err != nil {
log.Error().Err(err).Msgf("Error unlocking key %s", healthCheckKey)
}
}()

//Check if there is already a HealthCheck entry for the HealthCheckKey
existingHealthCheck, err := cache.HealthCheckMap.Get(context.Background(), healthCheckKey)
existingHealthCheck, err := healthCache.Get(context.Background(), healthCheckKey)
if err != nil {
log.Error().Err(err).Msgf("Failed to retrieve health check for key %s", healthCheckKey)
} else if existingHealthCheck != nil {
//ToDo: What do we want to do in this case?
log.Info().Msgf("Health check for key %s already exists, skipping health check", healthCheckKey)
return
}

// Prepare the health check object
healthCheck := cache.HealthCheck{
healthCheckData := health.HealthCheck{
Environment: subscription.Spec.Environment,
Method: httpMethod,
CallbackUrl: subscription.Spec.Subscription.Callback,
Expand All @@ -53,12 +53,15 @@ func performHealthCheck(subscription *resource.SubscriptionResource) {
LastedCheckedStatus: 0,
}

// Update the health check in the cache
err = cache.HealthCheckMap.Set(context.Background(), healthCheckKey, healthCheck)
// Update the health check in the health
err = healthCache.Set(context.Background(), healthCheckKey, healthCheckData)
if err != nil {
log.Error().Err(err).Msgf("Failed to update health check for key %s", healthCheckKey)
return
}

existingHealthCheck2, _ := healthCache.Get(context.Background(), healthCheckKey)
log.Info().Msgf("Health check for key %s is: %v", healthCheckKey, existingHealthCheck2)

log.Info().Msgf("Successfully updated health check for key %s", healthCheckKey)
}
19 changes: 3 additions & 16 deletions cache/cache.go → health/cache.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package cache
package health

import (
"context"
"github.com/hazelcast/hazelcast-go-client"
"github.com/rs/zerolog/log"
"time"
)

Expand All @@ -16,29 +15,17 @@ type HealthCheck struct {
LastedCheckedStatus int `json:"lastCheckedStatus"`
}

var HealthCheckMap *hazelcast.Map

func init() {
cacheConfig := hazelcast.Config{}

var err error
HealthCheckMap, err = NewHealthCheckCache(cacheConfig)
if err != nil {
log.Error().Msgf("Failed to initialize health check cache: %v", err)
}
}

func NewHealthCheckCache(config hazelcast.Config) (*hazelcast.Map, error) {
hazelcastClient, err := hazelcast.StartNewClientWithConfig(context.Background(), config)
if err != nil {
return nil, err
}

mapName := "healthCheckEntries"
HealthCheckMap, err = hazelcastClient.GetMap(context.Background(), mapName)
HealthCheckInstance, err := hazelcastClient.GetMap(context.Background(), mapName)
if err != nil {
return nil, err
}

return HealthCheckMap, nil
return HealthCheckInstance, nil
}
2 changes: 1 addition & 1 deletion mock/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func CreateMockedCircuitBreakerMessages(cbCache *cache.Cache[message.CircuitBrea

for i := 1; i <= numberMessages; i++ {

subscriptionId := "subscriptionId"
subscriptionId := ""

circuitBreakerMessage := message.CircuitBreakerMessage{
SubscriptionId: subscriptionId,
Expand Down
23 changes: 15 additions & 8 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rs/zerolog/log"
"golaris/config"
"golaris/golaris"
"golaris/health"
"golaris/kafka"
"golaris/metrics"
"golaris/mock"
Expand All @@ -21,11 +22,12 @@ import (
)

var (
SubCacheInstance *cache.Cache[resource.SubscriptionResource]
CbCacheInstance *cache.Cache[message.CircuitBreakerMessage]
kafkaHandler *kafka.Handler
app *fiber.App
mongoConnection *mongo.Connection
SubCacheInstance *cache.Cache[resource.SubscriptionResource]
CbCacheInstance *cache.Cache[message.CircuitBreakerMessage]
HealthCheckInstance *hazelcast.Map
kafkaHandler *kafka.Handler
app *fiber.App
mongoConnection *mongo.Connection
)

func InitializeService() {
Expand All @@ -42,12 +44,17 @@ func InitializeService() {
cacheConfig := configureHazelcast()
SubCacheInstance, err = cache.NewCache[resource.SubscriptionResource](cacheConfig)
if err != nil {
log.Panic().Err(err).Msg("Error while initializing Hazelcast subscription cache")
log.Panic().Err(err).Msg("Error while initializing Hazelcast subscription health")
}

CbCacheInstance, err = cache.NewCache[message.CircuitBreakerMessage](cacheConfig)
if err != nil {
log.Panic().Err(err).Msg("Error while initializing CircuitBreaker cache")
log.Panic().Err(err).Msg("Error while initializing CircuitBreaker health")
}

HealthCheckInstance, err = health.NewHealthCheckCache(cacheConfig)
if err != nil {
log.Panic().Err(err).Msg("Error while initializing HealthCheck cache")
}

mongoConnection, err = mongo.NewMongoConnection(&config.Current.Mongo)
Expand All @@ -60,7 +67,7 @@ func InitializeService() {
log.Panic().Err(err).Msg("Error while initializing Kafka Picker")
}

golaris.InitializeScheduler(CbCacheInstance, SubCacheInstance)
golaris.InitializeScheduler(CbCacheInstance, SubCacheInstance, HealthCheckInstance)

// TODO Mock cb-messages until comet is adapted
mock.CreateMockedCircuitBreakerMessages(CbCacheInstance, 1)
Expand Down

0 comments on commit 2554a0c

Please sign in to comment.