Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support configuration for consistent hash. #192

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/maestro/server/pulse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer {
case config.SharedSubscriptionType:
statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource)
case config.BroadcastSubscriptionType:
statusDispatcher = dispatcher.NewHashDispatcher(env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&env().Database.SessionFactory), dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource)
statusDispatcher = dispatcher.NewHashDispatcher(env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&env().Database.SessionFactory),
dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource, env().Config.PulseServer.ConsistentHashConfig)
default:
glog.Fatalf("Unsupported subscription type: %s", env().Config.PulseServer.SubscriptionType)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewApplicationConfig() *ApplicationConfig {
GRPCServer: NewGRPCServerConfig(),
Metrics: NewMetricsConfig(),
HealthCheck: NewHealthCheckConfig(),
PulseServer: MewPulseServerConfig(),
PulseServer: NewPulseServerConfig(),
Database: NewDatabaseConfig(),
MessageBroker: NewMessageBrokerConfig(),
OCM: NewOCMConfig(),
Expand Down
50 changes: 43 additions & 7 deletions pkg/config/pulse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,36 @@ const (

// PulseServerConfig contains the configuration for the maestro pulse server.
type PulseServerConfig struct {
PulseInterval int64 `json:"pulse_interval"`
SubscriptionType string `json:"subscription_type"`
PulseInterval int64 `json:"pulse_interval"`
SubscriptionType string `json:"subscription_type"`
ConsistentHashConfig *ConsistentHashConfig `json:"consistent_hash_config"`
}

// ConsistentHashConfig contains the configuration for the consistent hashing algorithm.
type ConsistentHashConfig struct {
PartitionCount int `json:"partition_count"`
ReplicationFactor int `json:"replication_factor"`
Load float64 `json:"load"`
}

// NewPulseServerConfig creates a new PulseServerConfig with default 15 second pulse interval.
func MewPulseServerConfig() *PulseServerConfig {
func NewPulseServerConfig() *PulseServerConfig {
return &PulseServerConfig{
PulseInterval: 15,
SubscriptionType: "shared",
PulseInterval: 15,
SubscriptionType: "shared",
ConsistentHashConfig: NewConsistentHashConfig(),
}
}

// NewConsistentHashConfig creates a new ConsistentHashConfig with default values.
// - PartitionCount: 7
// - ReplicationFactor: 20
// - Load: 1.25
func NewConsistentHashConfig() *ConsistentHashConfig {
return &ConsistentHashConfig{
PartitionCount: 7,
ReplicationFactor: 20,
Load: 1.25,
}
}

Expand All @@ -31,11 +52,26 @@ func MewPulseServerConfig() *PulseServerConfig {
// - "subscription-type" specifies the subscription type for resource status updates from message broker, either "shared" or "broadcast".
// "shared" subscription type uses MQTT feature to ensure only one Maestro instance receives resource status messages.
// "broadcast" subscription type will make all Maestro instances to receive resource status messages and hash the message to determine which instance should process it.
// If subscription type is "broadcast", ConsistentHashConfig settings can be configured for the hashing algorithm.
func (c *PulseServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.Int64Var(&c.PulseInterval, "pulse-interval", c.PulseInterval, "Sets the pulse interval for maestro instances (seconds) to indicate liveness (default: 10 seconds)")
fs.StringVar(&c.SubscriptionType, "subscription-type", c.SubscriptionType, "Sets the subscription type for resource status updates from message broker, Options: \"shared\" (only one instance receives resource status message, MQTT feature ensures exclusivity) or \"broadcast\" (all instances receive messages, hashed to determine processing instance) (default: \"shared\")")
fs.Int64Var(&c.PulseInterval, "pulse-interval", c.PulseInterval, "Sets the pulse interval for maestro instances (seconds) to indicate liveness")
fs.StringVar(&c.SubscriptionType, "subscription-type", c.SubscriptionType, "Sets the subscription type for resource status updates from message broker, Options: \"shared\" (only one instance receives resource status message, MQTT feature ensures exclusivity) or \"broadcast\" (all instances receive messages, hashed to determine processing instance)")
c.ConsistentHashConfig.AddFlags(fs)
}

func (c *PulseServerConfig) ReadFiles() error {
c.ConsistentHashConfig.ReadFiles()
return nil
}

// AddFlags configures the ConsistentHashConfig with command line flags. Only take effect when subscription type is "broadcast".
// It allows users to customize the partition count, replication factor, and load for the consistent hashing algorithm.
func (c *ConsistentHashConfig) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&c.PartitionCount, "consistent-hash-partition-count", c.PartitionCount, "Sets the partition count for consistent hashing algorithm, select a big PartitionCount for more consumers. only take effect when subscription type is \"broadcast\"")
fs.IntVar(&c.ReplicationFactor, "consistent-hash-replication-factor", c.ReplicationFactor, "Sets the replication factor for maestro instances to be replicated on consistent hash ring. only take effect when subscription type is \"broadcast\"")
fs.Float64Var(&c.Load, "consistent-hash-load", c.Load, "Sets the load for consistent hashing algorithm, only take effect when subscription type is \"broadcast\"")
}

func (c *ConsistentHashConfig) ReadFiles() error {
return nil
}
83 changes: 83 additions & 0 deletions pkg/config/pulse_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package config

import (
"reflect"
"testing"

"github.com/spf13/pflag"
)

func TestPulseServerConfig(t *testing.T) {
cases := []struct {
name string
input map[string]string
want *PulseServerConfig
}{
{
name: "default subscription type",
input: map[string]string{},
want: &PulseServerConfig{
PulseInterval: 15,
SubscriptionType: "shared",
ConsistentHashConfig: &ConsistentHashConfig{
PartitionCount: 7,
ReplicationFactor: 20,
Load: 1.25,
},
},
},
{
name: "broadcast subscription type",
input: map[string]string{
"subscription-type": "broadcast",
},
want: &PulseServerConfig{
PulseInterval: 15,
SubscriptionType: "broadcast",
ConsistentHashConfig: &ConsistentHashConfig{
PartitionCount: 7,
ReplicationFactor: 20,
Load: 1.25,
},
},
},
{
name: "custom consistent hash config",
input: map[string]string{
"subscription-type": "broadcast",
"consistent-hash-partition-count": "10",
"consistent-hash-replication-factor": "30",
"consistent-hash-load": "1.5",
},
want: &PulseServerConfig{
PulseInterval: 15,
SubscriptionType: "broadcast",
ConsistentHashConfig: &ConsistentHashConfig{
PartitionCount: 10,
ReplicationFactor: 30,
Load: 1.5,
},
},
},
}

config := NewPulseServerConfig()
pflag.NewFlagSet("test", pflag.ContinueOnError)
fs := pflag.CommandLine
config.AddFlags(fs)
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// set flags
for key, value := range tc.input {
fs.Set(key, value)
}
if !reflect.DeepEqual(config, tc.want) {
t.Errorf("NewPulseServerConfig() = %v; want %v", config, tc.want)
}
// clear flags
fs.VisitAll(func(f *pflag.Flag) {
fs.Lookup(f.Name).Changed = false
})
})
}
}
9 changes: 5 additions & 4 deletions pkg/dispatcher/hash_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
mapset "github.com/deckarep/golang-set/v2"
"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/logger"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -32,7 +33,7 @@ type HashDispatcher struct {
consistent *consistent.Consistent
}

func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient) *HashDispatcher {
func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient, consistentHashingConfig *config.ConsistentHashConfig) *HashDispatcher {
return &HashDispatcher{
instanceID: instanceID,
instanceDao: instanceDao,
Expand All @@ -41,9 +42,9 @@ func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerD
consumerSet: mapset.NewSet[string](),
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "hash-dispatcher"),
consistent: consistent.New(nil, consistent.Config{
PartitionCount: 7, // consumer IDs are distributed among partitions, select a big PartitionCount for more consumers.
ReplicationFactor: 20, // the numbers for maestro instances to be replicated on consistent hash ring.
Load: 1.25, // Load is used to calculate average load, 1.25 is reasonable for most cases.
PartitionCount: consistentHashingConfig.PartitionCount,
ReplicationFactor: consistentHashingConfig.ReplicationFactor,
Load: consistentHashingConfig.Load,
Hasher: hasher{},
}),
}
Expand Down
Loading