From 82bcceeb02ad29cd0229559652ca89d5175b16b8 Mon Sep 17 00:00:00 2001 From: morvencao Date: Mon, 9 Sep 2024 08:01:13 +0000 Subject: [PATCH] support configuration for consistent hash. Signed-off-by: morvencao --- cmd/maestro/server/pulse_server.go | 3 +- pkg/config/config.go | 2 +- pkg/config/pulse_server.go | 50 +++++++++++++++--- pkg/config/pulse_server_test.go | 83 ++++++++++++++++++++++++++++++ pkg/dispatcher/hash_dispatcher.go | 9 ++-- 5 files changed, 134 insertions(+), 13 deletions(-) create mode 100644 pkg/config/pulse_server_test.go diff --git a/cmd/maestro/server/pulse_server.go b/cmd/maestro/server/pulse_server.go index d606e2fa..24abde0b 100644 --- a/cmd/maestro/server/pulse_server.go +++ b/cmd/maestro/server/pulse_server.go @@ -70,7 +70,8 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer { case config.SharedSubscriptionType: statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource) case config.BroadcastSubscriptionType: - statusDispatcher = dispatcher.NewHashDispatcher(env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&env().Database.SessionFactory), dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource) + statusDispatcher = dispatcher.NewHashDispatcher(env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&env().Database.SessionFactory), + dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource, env().Config.PulseServer.ConsistentHashConfig) default: glog.Fatalf("Unsupported subscription type: %s", env().Config.PulseServer.SubscriptionType) } diff --git a/pkg/config/config.go b/pkg/config/config.go index e705e1e6..fbcbd052 100755 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -30,7 +30,7 @@ func NewApplicationConfig() *ApplicationConfig { GRPCServer: NewGRPCServerConfig(), Metrics: NewMetricsConfig(), HealthCheck: NewHealthCheckConfig(), - PulseServer: MewPulseServerConfig(), 
+ PulseServer: NewPulseServerConfig(), Database: NewDatabaseConfig(), MessageBroker: NewMessageBrokerConfig(), OCM: NewOCMConfig(), diff --git a/pkg/config/pulse_server.go b/pkg/config/pulse_server.go index e4d48e0f..3b29b80f 100644 --- a/pkg/config/pulse_server.go +++ b/pkg/config/pulse_server.go @@ -13,15 +13,36 @@ const ( // PulseServerConfig contains the configuration for the maestro pulse server. type PulseServerConfig struct { - PulseInterval int64 `json:"pulse_interval"` - SubscriptionType string `json:"subscription_type"` + PulseInterval int64 `json:"pulse_interval"` + SubscriptionType string `json:"subscription_type"` + ConsistentHashConfig *ConsistentHashConfig `json:"consistent_hash_config"` +} + +// ConsistentHashConfig contains the configuration for the consistent hashing algorithm. +type ConsistentHashConfig struct { + PartitionCount int `json:"partition_count"` + ReplicationFactor int `json:"replication_factor"` + Load float64 `json:"load"` } // NewPulseServerConfig creates a new PulseServerConfig with default 15 second pulse interval. -func MewPulseServerConfig() *PulseServerConfig { +func NewPulseServerConfig() *PulseServerConfig { return &PulseServerConfig{ - PulseInterval: 15, - SubscriptionType: "shared", + PulseInterval: 15, + SubscriptionType: "shared", + ConsistentHashConfig: NewConsistentHashConfig(), + } +} + +// NewConsistentHashConfig creates a new ConsistentHashConfig with default values. +// - PartitionCount: 7 +// - ReplicationFactor: 20 +// - Load: 1.25 +func NewConsistentHashConfig() *ConsistentHashConfig { + return &ConsistentHashConfig{ + PartitionCount: 7, + ReplicationFactor: 20, + Load: 1.25, } } @@ -31,11 +52,26 @@ func MewPulseServerConfig() *PulseServerConfig { // - "subscription-type" specifies the subscription type for resource status updates from message broker, either "shared" or "broadcast". // "shared" subscription type uses MQTT feature to ensure only one Maestro instance receives resource status messages. 
// "broadcast" subscription type will make all Maestro instances to receive resource status messages and hash the message to determine which instance should process it. +// If subscription type is "broadcast", ConsistentHashConfig settings can be configured for the hashing algorithm. func (c *PulseServerConfig) AddFlags(fs *pflag.FlagSet) { - fs.Int64Var(&c.PulseInterval, "pulse-interval", c.PulseInterval, "Sets the pulse interval for maestro instances (seconds) to indicate liveness (default: 10 seconds)") - fs.StringVar(&c.SubscriptionType, "subscription-type", c.SubscriptionType, "Sets the subscription type for resource status updates from message broker, Options: \"shared\" (only one instance receives resource status message, MQTT feature ensures exclusivity) or \"broadcast\" (all instances receive messages, hashed to determine processing instance) (default: \"shared\")") + fs.Int64Var(&c.PulseInterval, "pulse-interval", c.PulseInterval, "Sets the pulse interval for maestro instances (seconds) to indicate liveness") + fs.StringVar(&c.SubscriptionType, "subscription-type", c.SubscriptionType, "Sets the subscription type for resource status updates from message broker, Options: \"shared\" (only one instance receives resource status message, MQTT feature ensures exclusivity) or \"broadcast\" (all instances receive messages, hashed to determine processing instance)") + c.ConsistentHashConfig.AddFlags(fs) } func (c *PulseServerConfig) ReadFiles() error { + c.ConsistentHashConfig.ReadFiles() + return nil +} + +// AddFlags configures the ConsistentHashConfig with command line flags. The flags only take effect when the subscription type is "broadcast". +// It allows users to customize the partition count, replication factor, and load for the consistent hashing algorithm. 
+func (c *ConsistentHashConfig) AddFlags(fs *pflag.FlagSet) { + fs.IntVar(&c.PartitionCount, "consistent-hash-partition-count", c.PartitionCount, "Sets the partition count for consistent hashing algorithm, select a big PartitionCount for more consumers. only take effect when subscription type is \"broadcast\"") + fs.IntVar(&c.ReplicationFactor, "consistent-hash-replication-factor", c.ReplicationFactor, "Sets the replication factor for maestro instances to be replicated on consistent hash ring. only take effect when subscription type is \"broadcast\"") + fs.Float64Var(&c.Load, "consistent-hash-load", c.Load, "Sets the load for consistent hashing algorithm, only take effect when subscription type is \"broadcast\"") +} + +func (c *ConsistentHashConfig) ReadFiles() error { return nil } diff --git a/pkg/config/pulse_server_test.go b/pkg/config/pulse_server_test.go new file mode 100644 index 00000000..803c5dc1 --- /dev/null +++ b/pkg/config/pulse_server_test.go @@ -0,0 +1,83 @@ +package config + +import ( + "reflect" + "testing" + + "github.com/spf13/pflag" +) + +func TestPulseServerConfig(t *testing.T) { + cases := []struct { + name string + input map[string]string + want *PulseServerConfig + }{ + { + name: "default subscription type", + input: map[string]string{}, + want: &PulseServerConfig{ + PulseInterval: 15, + SubscriptionType: "shared", + ConsistentHashConfig: &ConsistentHashConfig{ + PartitionCount: 7, + ReplicationFactor: 20, + Load: 1.25, + }, + }, + }, + { + name: "broadcast subscription type", + input: map[string]string{ + "subscription-type": "broadcast", + }, + want: &PulseServerConfig{ + PulseInterval: 15, + SubscriptionType: "broadcast", + ConsistentHashConfig: &ConsistentHashConfig{ + PartitionCount: 7, + ReplicationFactor: 20, + Load: 1.25, + }, + }, + }, + { + name: "custom consistent hash config", + input: map[string]string{ + "subscription-type": "broadcast", + "consistent-hash-partition-count": "10", + "consistent-hash-replication-factor": "30", + 
"consistent-hash-load": "1.5", + }, + want: &PulseServerConfig{ + PulseInterval: 15, + SubscriptionType: "broadcast", + ConsistentHashConfig: &ConsistentHashConfig{ + PartitionCount: 10, + ReplicationFactor: 30, + Load: 1.5, + }, + }, + }, + } + + config := NewPulseServerConfig() + pflag.NewFlagSet("test", pflag.ContinueOnError) + fs := pflag.CommandLine + config.AddFlags(fs) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // set flags + for key, value := range tc.input { + fs.Set(key, value) + } + if !reflect.DeepEqual(config, tc.want) { + t.Errorf("NewPulseServerConfig() = %v; want %v", config, tc.want) + } + // clear flags + fs.VisitAll(func(f *pflag.Flag) { + fs.Lookup(f.Name).Changed = false + }) + }) + } +} diff --git a/pkg/dispatcher/hash_dispatcher.go b/pkg/dispatcher/hash_dispatcher.go index d618dfcf..6a94c13b 100644 --- a/pkg/dispatcher/hash_dispatcher.go +++ b/pkg/dispatcher/hash_dispatcher.go @@ -11,6 +11,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/client/cloudevents" + "github.com/openshift-online/maestro/pkg/config" "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/logger" "k8s.io/apimachinery/pkg/util/wait" @@ -32,7 +33,7 @@ type HashDispatcher struct { consistent *consistent.Consistent } -func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient) *HashDispatcher { +func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient, consistentHashingConfig *config.ConsistentHashConfig) *HashDispatcher { return &HashDispatcher{ instanceID: instanceID, instanceDao: instanceDao, @@ -41,9 +42,9 @@ func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerD consumerSet: mapset.NewSet[string](), workQueue: 
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "hash-dispatcher"), consistent: consistent.New(nil, consistent.Config{ - PartitionCount: 7, // consumer IDs are distributed among partitions, select a big PartitionCount for more consumers. - ReplicationFactor: 20, // the numbers for maestro instances to be replicated on consistent hash ring. - Load: 1.25, // Load is used to calculate average load, 1.25 is reasonable for most cases. + PartitionCount: consistentHashingConfig.PartitionCount, + ReplicationFactor: consistentHashingConfig.ReplicationFactor, + Load: consistentHashingConfig.Load, Hasher: hasher{}, }), }