decouple health check server and status dispatcher.
Signed-off-by: morvencao <[email protected]>
morvencao committed Jan 13, 2025
1 parent b160af1 commit 934e8ce
Showing 8 changed files with 101 additions and 88 deletions.
13 changes: 4 additions & 9 deletions cmd/maestro/servecmd/cmd.go
@@ -13,7 +13,6 @@ import (
"github.com/openshift-online/maestro/cmd/maestro/server"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
@@ -40,8 +39,6 @@ func runServer(cmd *cobra.Command, args []string) {
klog.Fatalf("Unable to initialize environment: %s", err.Error())
}

healthcheckServer := server.NewHealthCheckServer()

// Create event broadcaster to broadcast resource status update events to subscribers
eventBroadcaster := event.NewEventBroadcaster()

@@ -60,23 +57,21 @@ func runServer(cmd *cobra.Command, args []string) {
subscriptionType := environments.Environment().Config.EventServer.SubscriptionType
switch config.SubscriptionType(subscriptionType) {
case config.SharedSubscriptionType:
statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource)
statusDispatcher = dispatcher.NewNoopDispatcher(environments.Environment().Database.SessionFactory, environments.Environment().Clients.CloudEventsSource)
case config.BroadcastSubscriptionType:
statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, dao.NewInstanceDao(&environments.Environment().Database.SessionFactory),
dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig)
statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, environments.Environment().Database.SessionFactory,
environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig)
default:
klog.Errorf("Unsupported subscription type: %s", subscriptionType)
}

// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
eventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory))
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
healthcheckServer := server.NewHealthCheckServer()
controllersServer := server.NewControllersServer(eventServer, eventFilter)

ctx, cancel := context.WithCancel(context.Background())
22 changes: 2 additions & 20 deletions cmd/maestro/server/healthcheck_server.go
@@ -11,15 +11,13 @@ import (
"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/dispatcher"
"gorm.io/gorm"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)

type HealthCheckServer struct {
httpServer *http.Server
statusDispatcher dispatcher.Dispatcher
lockFactory db.LockFactory
instanceDao dao.InstanceDao
instanceID string
@@ -49,10 +47,6 @@ func NewHealthCheckServer() *HealthCheckServer {
return server
}

func (s *HealthCheckServer) SetStatusDispatcher(dispatcher dispatcher.Dispatcher) {
s.statusDispatcher = dispatcher
}

func (s *HealthCheckServer) Start(ctx context.Context) {
klog.Infof("Starting HealthCheck server")

@@ -154,33 +148,21 @@ func (s *HealthCheckServer) checkInstances(ctx context.Context) {
for _, instance := range instances {
// Instances pulsing within the last three check intervals are considered as active.
if instance.LastHeartbeat.After(time.Now().Add(time.Duration(int(-3*time.Second)*s.heartbeatInterval))) && !instance.Ready {
if s.brokerType == "mqtt" {
if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil {
klog.Errorf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error())
}
}
// mark the instance as active after it is added to the status dispatcher
activeInstanceIDs = append(activeInstanceIDs, instance.ID)
} else if instance.LastHeartbeat.Before(time.Now().Add(time.Duration(int(-3*time.Second)*s.heartbeatInterval))) && instance.Ready {
if s.brokerType == "mqtt" {
if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil {
klog.Errorf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error())
}
}
// mark the instance as inactive after it is removed from the status dispatcher
inactiveInstanceIDs = append(inactiveInstanceIDs, instance.ID)
}
}

if len(activeInstanceIDs) > 0 {
// batch mark active instances
// batch mark active instances; this triggers the status dispatcher to call the onInstanceUp handler.
if err := s.instanceDao.MarkReadyByIDs(ctx, activeInstanceIDs); err != nil {
klog.Errorf("Unable to mark active maestro instances (%s): %s", activeInstanceIDs, err.Error())
}
}

if len(inactiveInstanceIDs) > 0 {
// batch mark inactive instances
// batch mark inactive instances; this triggers the status dispatcher to call the onInstanceDown handler.
if err := s.instanceDao.MarkUnreadyByIDs(ctx, inactiveInstanceIDs); err != nil {
klog.Errorf("Unable to mark inactive maestro instances (%s): %s", inactiveInstanceIDs, err.Error())
}
11 changes: 9 additions & 2 deletions pkg/dao/instance.go
@@ -2,6 +2,8 @@ package dao

import (
"context"
"fmt"
"strings"
"time"

"gorm.io/gorm/clause"
@@ -67,7 +69,10 @@ func (d *sqlInstanceDao) MarkReadyByIDs(ctx context.Context, ids []string) error
db.MarkForRollback(ctx, err)
return err
}
return nil

// call pg_notify to notify the server_instances channel
notify := fmt.Sprintf("select pg_notify('%s', '%s')", "server_instances", fmt.Sprintf("ready:%s", strings.Join(ids, ",")))
return g2.Exec(notify).Error
}

func (d *sqlInstanceDao) MarkUnreadyByIDs(ctx context.Context, ids []string) error {
@@ -76,7 +81,9 @@ func (d *sqlInstanceDao) MarkUnreadyByIDs(ctx context.Context, ids []string) err
db.MarkForRollback(ctx, err)
return err
}
return nil
// call pg_notify to notify the server_instances channel
notify := fmt.Sprintf("select pg_notify('%s', '%s')", "server_instances", fmt.Sprintf("unready:%s", strings.Join(ids, ",")))
return g2.Exec(notify).Error
}

func (d *sqlInstanceDao) Delete(ctx context.Context, id string) error {
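
For reference, the payload these two DAO methods publish on the server_instances channel is a plain string of the form <state>:<comma-separated instance IDs>. A minimal standalone sketch of building and parsing that payload (the helper names below are illustrative, not part of the commit):

// Illustrative sketch, not part of the commit.
package main

import (
	"fmt"
	"strings"
)

// buildPayload mirrors what MarkReadyByIDs/MarkUnreadyByIDs pass to pg_notify,
// e.g. "ready:inst-1,inst-2" or "unready:inst-3".
func buildPayload(state string, ids []string) string {
	return fmt.Sprintf("%s:%s", state, strings.Join(ids, ","))
}

// parsePayload splits the payload back apart, matching the logic the dispatchers use.
func parsePayload(payload string) (state string, ids []string, ok bool) {
	parts := strings.Split(payload, ":")
	if len(parts) != 2 {
		return "", nil, false
	}
	return parts[0], strings.Split(parts[1], ","), true
}

func main() {
	p := buildPayload("ready", []string{"inst-1", "inst-2"})
	state, ids, _ := parsePayload(p)
	fmt.Println(p, state, ids) // ready:inst-1,inst-2 ready [inst-1 inst-2]
}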
4 changes: 0 additions & 4 deletions pkg/dispatcher/dispatcher.go
@@ -13,8 +13,4 @@ type Dispatcher interface {
Start(ctx context.Context)
// Dispatch determines if the current Maestro instance should process the resource status update based on the consumer ID.
Dispatch(consumerName string) bool
// OnInstanceUp is called when a new maestro instance is up.
OnInstanceUp(instanceID string) error
// OnInstanceDown is called when a maestro instance is inactive.
OnInstanceDown(instanceID string) error
}
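
With OnInstanceUp and OnInstanceDown removed, callers of the interface only start the dispatcher and ask it whether to process a given consumer. A hedged sketch of how a status-update path might consult the trimmed interface (handleStatusUpdate and process are illustrative names, not taken from the codebase):

// Illustrative sketch, not part of the commit.
package example

import "github.com/openshift-online/maestro/pkg/dispatcher"

// handleStatusUpdate processes a resource status update only if this maestro
// instance is responsible for the given consumer.
func handleStatusUpdate(d dispatcher.Dispatcher, consumerName string, process func() error) error {
	if !d.Dispatch(consumerName) {
		// another instance owns this consumer; skip the update
		return nil
	}
	return process()
}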
67 changes: 49 additions & 18 deletions pkg/dispatcher/hash_dispatcher.go
@@ -3,6 +3,7 @@ package dispatcher
import (
"context"
"fmt"
"strings"
"sync"
"time"

@@ -13,9 +14,11 @@ import (
"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/logger"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

var _ Dispatcher = &HashDispatcher{}
@@ -24,23 +27,25 @@ var _ Dispatcher = &HashDispatcher{}
// Only the maestro instance that is mapped to a consumer will process the resource status update from that consumer.
// Need to trigger status resync for the consumer when an instance is up or down.
type HashDispatcher struct {
instanceID string
instanceDao dao.InstanceDao
consumerDao dao.ConsumerDao
sourceClient cloudevents.SourceClient
consumerSet mapset.Set[string]
workQueue workqueue.RateLimitingInterface
consistent *consistent.Consistent
instanceID string
sessionFactory db.SessionFactory
instanceDao dao.InstanceDao
consumerDao dao.ConsumerDao
sourceClient cloudevents.SourceClient
consumerSet mapset.Set[string]
workQueue workqueue.RateLimitingInterface
consistent *consistent.Consistent
}

func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient, consistentHashingConfig *config.ConsistentHashConfig) *HashDispatcher {
func NewHashDispatcher(instanceID string, sessionFactory db.SessionFactory, sourceClient cloudevents.SourceClient, consistentHashingConfig *config.ConsistentHashConfig) *HashDispatcher {
return &HashDispatcher{
instanceID: instanceID,
instanceDao: instanceDao,
consumerDao: consumerDao,
sourceClient: sourceClient,
consumerSet: mapset.NewSet[string](),
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "hash-dispatcher"),
instanceID: instanceID,
sessionFactory: sessionFactory,
instanceDao: dao.NewInstanceDao(&sessionFactory),
consumerDao: dao.NewConsumerDao(&sessionFactory),
sourceClient: sourceClient,
consumerSet: mapset.NewSet[string](),
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "hash-dispatcher"),
consistent: consistent.New(nil, consistent.Config{
PartitionCount: consistentHashingConfig.PartitionCount,
ReplicationFactor: consistentHashingConfig.ReplicationFactor,
@@ -58,6 +63,10 @@ func (d *HashDispatcher) Start(ctx context.Context) {
// start a goroutine to periodically check the instances and consumers.
go wait.UntilWithContext(ctx, d.check, 5*time.Second)

// listen for server_instance update
klog.Infof("HashDispatcher listening for server_instances updates")
go d.sessionFactory.NewListener(ctx, "server_instances", d.onInstanceUpdate)

// start a goroutine to resync current consumers for this source when the client is reconnected
go d.resyncOnReconnect(ctx)

@@ -66,6 +75,28 @@ func (d *HashDispatcher) Start(ctx context.Context) {
d.workQueue.ShutDown()
}

func (d *HashDispatcher) onInstanceUpdate(ids string) {
states := strings.Split(ids, ":")
if len(states) != 2 {
klog.Infof("watched server instances updated with invalid ids: %s", ids)
return
}
idList := strings.Split(states[1], ",")
if states[0] == "ready" {
for _, id := range idList {
if err := d.onInstanceUp(id); err != nil {
klog.Errorf("failed to call OnInstancesUp for instance %s: %s", id, err)
}
}
} else {
for _, id := range idList {
if err := d.onInstanceDown(id); err != nil {
klog.Errorf("failed to call OnInstancesDown for instance %s: %s", id, err)
}
}
}
}

// resyncOnReconnect listens for the client reconnected signal and resyncs current consumers for this source.
func (d *HashDispatcher) resyncOnReconnect(ctx context.Context) {
log := logger.NewOCMLogger(ctx)
@@ -90,8 +121,8 @@ func (d *HashDispatcher) Dispatch(consumerName string) bool {
return d.consumerSet.Contains(consumerName)
}

// OnInstanceUp adds the new instance to the hashing ring and updates the consumer set for the current instance.
func (d *HashDispatcher) OnInstanceUp(instanceID string) error {
// onInstanceUp adds the new instance to the hashing ring and updates the consumer set for the current instance.
func (d *HashDispatcher) onInstanceUp(instanceID string) error {
members := d.consistent.GetMembers()
for _, member := range members {
if member.String() == instanceID {
@@ -110,8 +141,8 @@ func (d *HashDispatcher) OnInstanceUp(instanceID string) error {
return d.updateConsumerSet()
}

// OnInstanceDown removes the instance from the hashing ring and updates the consumer set for the current instance.
func (d *HashDispatcher) OnInstanceDown(instanceID string) error {
// onInstanceDown removes the instance from the hashing ring and updates the consumer set for the current instance.
func (d *HashDispatcher) onInstanceDown(instanceID string) error {
members := d.consistent.GetMembers()
deletedMember := true
for _, member := range members {
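
The dispatcher relies on SessionFactory.NewListener(ctx, "server_instances", handler) to deliver each NOTIFY payload to onInstanceUpdate; that listener implementation is not part of this diff. Purely to illustrate the underlying LISTEN/NOTIFY mechanism, a standalone loop could look roughly like this using github.com/lib/pq (the connection string and error handling are placeholders):

// Illustrative sketch, not part of the commit.
package main

import (
	"context"
	"log"
	"time"

	"github.com/lib/pq"
)

// listen subscribes to a Postgres NOTIFY channel and passes each payload
// (e.g. "ready:inst-1,inst-2") to handler until ctx is canceled.
func listen(ctx context.Context, connStr, channel string, handler func(payload string)) error {
	l := pq.NewListener(connStr, 10*time.Second, time.Minute, nil)
	defer l.Close()
	if err := l.Listen(channel); err != nil {
		return err
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case n := <-l.Notify:
			if n == nil {
				// a nil notification is delivered after a reconnect; nothing to handle
				continue
			}
			handler(n.Extra)
		}
	}
}

func main() {
	ctx := context.Background()
	// placeholder connection string
	err := listen(ctx, "postgres://user:pass@localhost/maestro?sslmode=disable",
		"server_instances", func(p string) { log.Printf("server_instances: %s", p) })
	if err != nil {
		log.Fatal(err)
	}
}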
49 changes: 36 additions & 13 deletions pkg/dispatcher/noop_dispatcher.go
@@ -3,10 +3,13 @@ package dispatcher
import (
"context"
"fmt"
"strings"

"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/logger"
"k8s.io/klog/v2"
)

var _ Dispatcher = &NoopDispatcher{}
@@ -15,22 +18,32 @@ var _ Dispatcher = &NoopDispatcher{}
// to the current maestro instance. This is the default implementation when shared subscription is enabled.
// Need to trigger status resync from all consumers when an instance is down.
type NoopDispatcher struct {
consumerDao dao.ConsumerDao
sourceClient cloudevents.SourceClient
sessionFactory db.SessionFactory
consumerDao dao.ConsumerDao
sourceClient cloudevents.SourceClient
}

// NewNoopDispatcher creates a new NoopDispatcher instance.
func NewNoopDispatcher(consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient) *NoopDispatcher {
func NewNoopDispatcher(sessionFactory db.SessionFactory, sourceClient cloudevents.SourceClient) *NoopDispatcher {
return &NoopDispatcher{
consumerDao: consumerDao,
sourceClient: sourceClient,
sessionFactory: sessionFactory,
consumerDao: dao.NewConsumerDao(&sessionFactory),
sourceClient: sourceClient,
}
}

// Start is a no-op implementation.
func (d *NoopDispatcher) Start(ctx context.Context) {
// handle client reconnected signal and resync status from consumers for this source
d.resyncOnReconnect(ctx)
go d.resyncOnReconnect(ctx)

// listen for server_instance update
klog.Infof("NoopDispatcher listening for server_instances updates")
go d.sessionFactory.NewListener(ctx, "server_instances", d.onInstanceUpdate)

// wait until context is canceled
<-ctx.Done()

}

// resyncOnReconnect listens for client reconnected signal and resyncs all consumers for this source.
@@ -61,18 +74,28 @@ func (d *NoopDispatcher) resyncOnReconnect(ctx context.Context) {
}
}

func (d *NoopDispatcher) onInstanceUpdate(ids string) {
states := strings.Split(ids, ":")
if len(states) != 2 {
klog.Infof("watched server instances updated with invalid ids: %s", ids)
return
}
idList := strings.Split(states[1], ",")
if states[0] == "unready" && len(idList) > 0 {
// only call onInstanceDown once with empty instance id to reduce the number of status resync requests
if err := d.onInstanceDown(); err != nil {
klog.Errorf("failed to call OnInstancesDown: %s", err)
}
}
}

// Dispatch always returns true, indicating that the current maestro instance should process the resource status update.
func (d *NoopDispatcher) Dispatch(consumerID string) bool {
return true
}

// OnInstanceUp is a no-op implementation.
func (d *NoopDispatcher) OnInstanceUp(instanceID string) error {
return nil
}

// OnInstanceDown triggers status resync from all consumers.
func (d *NoopDispatcher) OnInstanceDown(instanceID string) error {
// onInstanceDown calls status resync when there is down instance watched.
func (d *NoopDispatcher) onInstanceDown() error {
// send resync request to each consumer
// TODO: optimize this to only resync resource status for necessary consumers
consumerIDs := []string{}
4 changes: 1 addition & 3 deletions test/helper.go
@@ -143,12 +143,10 @@ func NewHelper(t *testing.T) *Helper {
if helper.Broker != "grpc" {
statusDispatcher := dispatcher.NewHashDispatcher(
helper.Env().Config.MessageBroker.ClientID,
dao.NewInstanceDao(&helper.Env().Database.SessionFactory),
dao.NewConsumerDao(&helper.Env().Database.SessionFactory),
helper.Env().Database.SessionFactory,
helper.Env().Clients.CloudEventsSource,
helper.Env().Config.EventServer.ConsistentHashConfig,
)
helper.HealthCheckServer.SetStatusDispatcher(statusDispatcher)
helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher)
helper.EventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory))
} else {