Fix healthcheck server repeatedly updating instance ready/unready #244

Merged · 3 commits · Jan 16, 2025
13 changes: 4 additions & 9 deletions cmd/maestro/servecmd/cmd.go
@@ -13,7 +13,6 @@ import (
"github.com/openshift-online/maestro/cmd/maestro/server"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
@@ -40,8 +39,6 @@ func runServer(cmd *cobra.Command, args []string) {
klog.Fatalf("Unable to initialize environment: %s", err.Error())
}

healthcheckServer := server.NewHealthCheckServer()

// Create event broadcaster to broadcast resource status update events to subscribers
eventBroadcaster := event.NewEventBroadcaster()

@@ -60,23 +57,21 @@ func runServer(cmd *cobra.Command, args []string) {
subscriptionType := environments.Environment().Config.EventServer.SubscriptionType
switch config.SubscriptionType(subscriptionType) {
case config.SharedSubscriptionType:
statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource)
statusDispatcher = dispatcher.NewNoopDispatcher(environments.Environment().Database.SessionFactory, environments.Environment().Clients.CloudEventsSource)
case config.BroadcastSubscriptionType:
statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, dao.NewInstanceDao(&environments.Environment().Database.SessionFactory),
dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig)
statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, environments.Environment().Database.SessionFactory,
environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig)
default:
klog.Errorf("Unsupported subscription type: %s", subscriptionType)
}

// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
eventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory))
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
healthcheckServer := server.NewHealthCheckServer()
controllersServer := server.NewControllersServer(eventServer, eventFilter)

ctx, cancel := context.WithCancel(context.Background())
26 changes: 4 additions & 22 deletions cmd/maestro/server/healthcheck_server.go
@@ -11,15 +11,13 @@ import (
"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/dispatcher"
"gorm.io/gorm"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)

type HealthCheckServer struct {
httpServer *http.Server
statusDispatcher dispatcher.Dispatcher
lockFactory db.LockFactory
instanceDao dao.InstanceDao
instanceID string
@@ -49,10 +47,6 @@ func NewHealthCheckServer() *HealthCheckServer {
return server
}

func (s *HealthCheckServer) SetStatusDispatcher(dispatcher dispatcher.Dispatcher) {
s.statusDispatcher = dispatcher
}

func (s *HealthCheckServer) Start(ctx context.Context) {
klog.Infof("Starting HealthCheck server")

Expand Down Expand Up @@ -153,34 +147,22 @@ func (s *HealthCheckServer) checkInstances(ctx context.Context) {
inactiveInstanceIDs := []string{}
for _, instance := range instances {
// Instances pulsing within the last three check intervals are considered as active.
if instance.LastHeartbeat.After(time.Now().Add(time.Duration(int(-3*time.Second) * s.heartbeatInterval))) {
if s.brokerType == "mqtt" {
if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil {
klog.Errorf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error())
}
}
// mark the instance as active after it is added to the status dispatcher
if instance.LastHeartbeat.After(time.Now().Add(time.Duration(int(-3*time.Second)*s.heartbeatInterval))) && !instance.Ready {
activeInstanceIDs = append(activeInstanceIDs, instance.ID)
} else {
if s.brokerType == "mqtt" {
if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil {
klog.Errorf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error())
}
}
// mark the instance as inactive after it is removed from the status dispatcher
} else if instance.LastHeartbeat.Before(time.Now().Add(time.Duration(int(-3*time.Second)*s.heartbeatInterval))) && instance.Ready {
inactiveInstanceIDs = append(inactiveInstanceIDs, instance.ID)
}
}

if len(activeInstanceIDs) > 0 {
// batch mark active instances
// batch mark active instances, this will trigger the status dispatcher to call the onInstanceUp handler.
if err := s.instanceDao.MarkReadyByIDs(ctx, activeInstanceIDs); err != nil {
klog.Errorf("Unable to mark active maestro instances (%s): %s", activeInstanceIDs, err.Error())
}
}

if len(inactiveInstanceIDs) > 0 {
// batch mark inactive instances
// batch mark inactive instances, this will trigger the status dispatcher to call the onInstanceDown handler.
if err := s.instanceDao.MarkUnreadyByIDs(ctx, inactiveInstanceIDs); err != nil {
klog.Errorf("Unable to mark inactive maestro instances (%s): %s", inactiveInstanceIDs, err.Error())
}
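The core of the fix is that checkInstances now records a transition only when the stored Ready flag disagrees with the heartbeat-derived state, instead of invoking the dispatcher for every instance on every tick. A minimal, self-contained sketch of that guard follows; the Instance fields and the interval arithmetic are simplified stand-ins (Maestro stores the heartbeat interval as an integer), not the exact model:

```go
package main

import (
	"fmt"
	"time"
)

// Instance is a trimmed-down stand-in for Maestro's server instance model;
// only the fields needed by the readiness check are included.
type Instance struct {
	ID            string
	LastHeartbeat time.Time
	Ready         bool
}

// classify mirrors the reworked checkInstances logic: an instance is queued
// for a ready/unready update only when its stored Ready flag disagrees with
// the heartbeat-derived state, so repeated checks no longer rewrite rows (or
// re-notify the dispatchers) for instances whose state has not changed.
func classify(instances []Instance, heartbeatInterval time.Duration, now time.Time) (toReady, toUnready []string) {
	cutoff := now.Add(-3 * heartbeatInterval)
	for _, instance := range instances {
		if instance.LastHeartbeat.After(cutoff) && !instance.Ready {
			toReady = append(toReady, instance.ID)
		} else if instance.LastHeartbeat.Before(cutoff) && instance.Ready {
			toUnready = append(toUnready, instance.ID)
		}
	}
	return toReady, toUnready
}

func main() {
	now := time.Now()
	instances := []Instance{
		{ID: "a", LastHeartbeat: now, Ready: true},                   // fresh heartbeat, already ready: no update
		{ID: "b", LastHeartbeat: now, Ready: false},                  // just came up: mark ready
		{ID: "c", LastHeartbeat: now.Add(-time.Minute), Ready: true}, // stale heartbeat: mark unready
	}
	toReady, toUnready := classify(instances, 10*time.Second, now)
	fmt.Println(toReady, toUnready) // [b] [c]
}
```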
11 changes: 9 additions & 2 deletions pkg/dao/instance.go
@@ -2,6 +2,8 @@ package dao

import (
"context"
"fmt"
"strings"
"time"

"gorm.io/gorm/clause"
@@ -67,7 +69,10 @@ func (d *sqlInstanceDao) MarkReadyByIDs(ctx context.Context, ids []string) error
db.MarkForRollback(ctx, err)
return err
}
return nil

// call pg_notify to notify the server_instances channel
notify := fmt.Sprintf("select pg_notify('%s', '%s')", "server_instances", fmt.Sprintf("ready:%s", strings.Join(ids, ",")))
return g2.Exec(notify).Error
}

func (d *sqlInstanceDao) MarkUnreadyByIDs(ctx context.Context, ids []string) error {
@@ -76,7 +81,9 @@ func (d *sqlInstanceDao) MarkUnreadyByIDs(ctx context.Context, ids []string) err
db.MarkForRollback(ctx, err)
return err
}
return nil
// call pg_notify to notify the server_instances channel
notify := fmt.Sprintf("select pg_notify('%s', '%s')", "server_instances", fmt.Sprintf("unready:%s", strings.Join(ids, ",")))
return g2.Exec(notify).Error
}

func (d *sqlInstanceDao) Delete(ctx context.Context, id string) error {
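The DAO now pairs each batch update with a pg_notify on the server_instances channel, using a payload of the form "&lt;state&gt;:&lt;id1&gt;,&lt;id2&gt;,...". A small standalone sketch of that contract, showing both the producing and consuming sides; the helper names are illustrative and not part of the Maestro code:

```go
package main

import (
	"fmt"
	"strings"
)

// buildNotifyPayload shows the payload format sent through pg_notify on the
// "server_instances" channel: "<state>:<id1>,<id2>,...".
func buildNotifyPayload(state string, ids []string) string {
	return fmt.Sprintf("%s:%s", state, strings.Join(ids, ","))
}

// parseNotifyPayload mirrors what the dispatchers' onInstanceUpdate callbacks
// do on the receiving side: split off the state, then split the ID list.
func parseNotifyPayload(payload string) (state string, ids []string, ok bool) {
	parts := strings.Split(payload, ":")
	if len(parts) != 2 {
		return "", nil, false
	}
	return parts[0], strings.Split(parts[1], ","), true
}

func main() {
	payload := buildNotifyPayload("ready", []string{"instance-1", "instance-2"})
	fmt.Println(payload) // ready:instance-1,instance-2

	state, ids, ok := parseNotifyPayload(payload)
	fmt.Println(state, ids, ok) // ready [instance-1 instance-2] true
}
```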
4 changes: 0 additions & 4 deletions pkg/dispatcher/dispatcher.go
@@ -13,8 +13,4 @@ type Dispatcher interface {
Start(ctx context.Context)
// Dispatch determines if the current Maestro instance should process the resource status update based on the consumer ID.
Dispatch(consumerName string) bool
// OnInstanceUp is called when a new maestro instance is up.
OnInstanceUp(instanceID string) error
// OnInstanceDown is called when a maestro instance is inactive.
OnInstanceDown(instanceID string) error
}
67 changes: 49 additions & 18 deletions pkg/dispatcher/hash_dispatcher.go
@@ -3,6 +3,7 @@ package dispatcher
import (
"context"
"fmt"
"strings"
"sync"
"time"

@@ -13,9 +14,11 @@ import (
"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/logger"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

var _ Dispatcher = &HashDispatcher{}
@@ -24,23 +27,25 @@ var _ Dispatcher = &HashDispatcher{}
// Only the maestro instance that is mapped to a consumer will process the resource status update from that consumer.
// Need to trigger status resync for the consumer when an instance is up or down.
type HashDispatcher struct {
instanceID string
instanceDao dao.InstanceDao
consumerDao dao.ConsumerDao
sourceClient cloudevents.SourceClient
consumerSet mapset.Set[string]
workQueue workqueue.RateLimitingInterface
consistent *consistent.Consistent
instanceID string
sessionFactory db.SessionFactory
instanceDao dao.InstanceDao
consumerDao dao.ConsumerDao
sourceClient cloudevents.SourceClient
consumerSet mapset.Set[string]
workQueue workqueue.RateLimitingInterface
consistent *consistent.Consistent
}

func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient, consistentHashingConfig *config.ConsistentHashConfig) *HashDispatcher {
func NewHashDispatcher(instanceID string, sessionFactory db.SessionFactory, sourceClient cloudevents.SourceClient, consistentHashingConfig *config.ConsistentHashConfig) *HashDispatcher {
return &HashDispatcher{
instanceID: instanceID,
instanceDao: instanceDao,
consumerDao: consumerDao,
sourceClient: sourceClient,
consumerSet: mapset.NewSet[string](),
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "hash-dispatcher"),
instanceID: instanceID,
sessionFactory: sessionFactory,
instanceDao: dao.NewInstanceDao(&sessionFactory),
consumerDao: dao.NewConsumerDao(&sessionFactory),
sourceClient: sourceClient,
consumerSet: mapset.NewSet[string](),
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "hash-dispatcher"),
consistent: consistent.New(nil, consistent.Config{
PartitionCount: consistentHashingConfig.PartitionCount,
ReplicationFactor: consistentHashingConfig.ReplicationFactor,
Expand All @@ -58,6 +63,10 @@ func (d *HashDispatcher) Start(ctx context.Context) {
// start a goroutine to periodically check the instances and consumers.
go wait.UntilWithContext(ctx, d.check, 5*time.Second)

// listen for server_instance update
klog.Infof("HashDispatcher listening for server_instances updates")
go d.sessionFactory.NewListener(ctx, "server_instances", d.onInstanceUpdate)

// start a goroutine to resync current consumers for this source when the client is reconnected
go d.resyncOnReconnect(ctx)

@@ -66,6 +75,28 @@ func (d *HashDispatcher) Start(ctx context.Context) {
d.workQueue.ShutDown()
}

func (d *HashDispatcher) onInstanceUpdate(ids string) {
states := strings.Split(ids, ":")
if len(states) != 2 {
klog.Infof("watched server instances updated with invalid ids: %s", ids)
return
}
idList := strings.Split(states[1], ",")
if states[0] == "ready" {
for _, id := range idList {
if err := d.onInstanceUp(id); err != nil {
klog.Errorf("failed to call OnInstancesUp for instance %s: %s", id, err)
}
}
} else {
for _, id := range idList {
if err := d.onInstanceDown(id); err != nil {
klog.Errorf("failed to call OnInstancesDown for instance %s: %s", id, err)
}
}
}
}

// resyncOnReconnect listens for the client reconnected signal and resyncs current consumers for this source.
func (d *HashDispatcher) resyncOnReconnect(ctx context.Context) {
log := logger.NewOCMLogger(ctx)
@@ -90,8 +121,8 @@ func (d *HashDispatcher) Dispatch(consumerName string) bool {
return d.consumerSet.Contains(consumerName)
}

// OnInstanceUp adds the new instance to the hashing ring and updates the consumer set for the current instance.
func (d *HashDispatcher) OnInstanceUp(instanceID string) error {
// onInstanceUp adds the new instance to the hashing ring and updates the consumer set for the current instance.
func (d *HashDispatcher) onInstanceUp(instanceID string) error {
members := d.consistent.GetMembers()
for _, member := range members {
if member.String() == instanceID {
@@ -110,8 +141,8 @@ func (d *HashDispatcher) OnInstanceUp(instanceID string) error {
return d.updateConsumerSet()
}

// OnInstanceDown removes the instance from the hashing ring and updates the consumer set for the current instance.
func (d *HashDispatcher) OnInstanceDown(instanceID string) error {
// onInstanceDown removes the instance from the hashing ring and updates the consumer set for the current instance.
func (d *HashDispatcher) onInstanceDown(instanceID string) error {
members := d.consistent.GetMembers()
deletedMember := true
for _, member := range members {
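Both dispatchers rely on a SessionFactory.NewListener(ctx, channel, callback) helper that is not part of this diff. Assuming it is backed by Postgres LISTEN/NOTIFY, a minimal sketch of what such a listener could look like with github.com/lib/pq follows; the connection string, reconnect intervals, and function names here are illustrative assumptions, not Maestro's actual implementation:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/lib/pq"
)

// listen is a stand-in for what SessionFactory.NewListener is expected to do:
// subscribe to a Postgres channel and invoke the callback with the payload of
// every notification received on it.
func listen(ctx context.Context, connStr, channel string, callback func(payload string)) {
	listener := pq.NewListener(connStr, 10*time.Second, time.Minute,
		func(ev pq.ListenerEventType, err error) {
			if err != nil {
				log.Printf("listener event %v: %v", ev, err)
			}
		})
	if err := listener.Listen(channel); err != nil {
		log.Fatalf("failed to listen on channel %s: %v", channel, err)
	}
	for {
		select {
		case <-ctx.Done():
			listener.Close()
			return
		case n := <-listener.Notify:
			if n != nil {
				callback(n.Extra) // e.g. "ready:instance-1,instance-2"
			}
		}
	}
}

func main() {
	ctx := context.Background()
	connStr := "postgres://user:pass@localhost/maestro?sslmode=disable" // illustrative only
	go listen(ctx, connStr, "server_instances", func(payload string) {
		log.Printf("server_instances update: %s", payload)
	})
	select {} // block forever in this toy example
}
```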
49 changes: 36 additions & 13 deletions pkg/dispatcher/noop_dispatcher.go
@@ -3,10 +3,13 @@ package dispatcher
import (
"context"
"fmt"
"strings"

"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/logger"
"k8s.io/klog/v2"
)

var _ Dispatcher = &NoopDispatcher{}
@@ -15,22 +18,32 @@ var _ Dispatcher = &NoopDispatcher{}
// to the current maestro instance. This is the default implementation when shared subscription is enabled.
// Need to trigger status resync from all consumers when an instance is down.
type NoopDispatcher struct {
consumerDao dao.ConsumerDao
sourceClient cloudevents.SourceClient
sessionFactory db.SessionFactory
consumerDao dao.ConsumerDao
sourceClient cloudevents.SourceClient
}

// NewNoopDispatcher creates a new NoopDispatcher instance.
func NewNoopDispatcher(consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient) *NoopDispatcher {
func NewNoopDispatcher(sessionFactory db.SessionFactory, sourceClient cloudevents.SourceClient) *NoopDispatcher {
return &NoopDispatcher{
consumerDao: consumerDao,
sourceClient: sourceClient,
sessionFactory: sessionFactory,
consumerDao: dao.NewConsumerDao(&sessionFactory),
sourceClient: sourceClient,
}
}

// Start is a no-op implementation.
func (d *NoopDispatcher) Start(ctx context.Context) {
// handle client reconnected signal and resync status from consumers for this source
d.resyncOnReconnect(ctx)
go d.resyncOnReconnect(ctx)

// listen for server_instance update
klog.Infof("NoopDispatcher listening for server_instances updates")
go d.sessionFactory.NewListener(ctx, "server_instances", d.onInstanceUpdate)

// wait until context is canceled
<-ctx.Done()

}

// resyncOnReconnect listens for client reconnected signal and resyncs all consumers for this source.
@@ -61,18 +74,28 @@ func (d *NoopDispatcher) resyncOnReconnect(ctx context.Context) {
}
}

func (d *NoopDispatcher) onInstanceUpdate(ids string) {
states := strings.Split(ids, ":")
if len(states) != 2 {
klog.Infof("watched server instances updated with invalid ids: %s", ids)
return
}
idList := strings.Split(states[1], ",")
if states[0] == "unready" && len(idList) > 0 {
// only call onInstanceDown once per notification to reduce the number of status resync requests
if err := d.onInstanceDown(); err != nil {
klog.Errorf("failed to call OnInstancesDown: %s", err)
}
}
}

// Dispatch always returns true, indicating that the current maestro instance should process the resource status update.
func (d *NoopDispatcher) Dispatch(consumerID string) bool {
return true
}

// OnInstanceUp is a no-op implementation.
func (d *NoopDispatcher) OnInstanceUp(instanceID string) error {
return nil
}

// OnInstanceDown triggers status resync from all consumers.
func (d *NoopDispatcher) OnInstanceDown(instanceID string) error {
// onInstanceDown calls status resync when there is down instance watched.
func (d *NoopDispatcher) onInstanceDown() error {
// send resync request to each consumer
// TODO: optimize this to only resync resource status for necessary consumers
consumerIDs := []string{}
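Taken together, readiness coordination is now database-driven end to end: the healthcheck server batches ready/unready transitions through the instance DAO, the DAO emits pg_notify on the server_instances channel, and each dispatcher reacts through its listener callback rather than through direct OnInstanceUp/OnInstanceDown calls from the healthcheck server. The HashDispatcher adjusts its consistent-hash ring per instance ID for both ready and unready payloads, while the NoopDispatcher (used with shared subscriptions) only needs to know that some instance went down, so it ignores ready payloads and triggers one status resync across all consumers per unready notification.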