Skip to content

Commit

Permalink
sections for dedicated broker and presence manager configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Sep 27, 2024
1 parent 05d2cda commit 0852bfc
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 150 deletions.
3 changes: 3 additions & 0 deletions internal/cli/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func DefaultConfig(configFile string, baseFile string, dryRun bool) {

supportedExtensions := []string{"json", "toml", "yaml", "yml"}

// This is an unreleased feature so we remove RedisNats from generated config.
conf.Broker.RedisNats = nil

jsonBytes, err := json.MarshalIndent(conf, "", " ")
if err != nil {
fmt.Printf("error: %v\n", err)
Expand Down
41 changes: 23 additions & 18 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,22 @@ type Config struct {
// TLSExternal enables TLS only for external HTTP endpoints.
TLSExternal bool `mapstructure:"tls_external" json:"tls_external" envconfig:"tls_external" toml:"tls_external" yaml:"tls_external"`

// Engine to use: memory or redis. By default, memory engine is used. Memory engine is superfast,
// but it's not distributed and all data stored in memory (thus lost after node restart). Redis engine
// provides seamless horizontal scalability, fault-tolerance, and persistence over Centrifugo restarts.
// See also Broker option to run Centrifugo with Nats (only implements at most once PUB/SUB semantics).
Engine string `mapstructure:"engine" json:"engine" envconfig:"engine" default:"memory" toml:"engine" yaml:"engine"`
// Broker to use: the only option is nats.
Broker string `mapstructure:"broker" json:"broker" envconfig:"broker" toml:"broker" yaml:"broker"`

// Redis is a configuration for Redis engine.
Redis configtypes.RedisEngine `mapstructure:"redis" json:"redis" envconfig:"redis" toml:"redis" yaml:"redis"`
// Nats is a configuration for NATS broker.
Nats configtypes.NatsBroker `mapstructure:"nats" json:"nats" envconfig:"nats" toml:"nats" yaml:"nats"`
// Engine is a configuration for Centrifugo engine. It's a handy combination of Broker and PresenceManager.
// Currently only memory and redis engines are supported – both implement all the features. For more granular
// control use Broker and PresenceManager options.
Engine configtypes.Engine `mapstructure:"engine" json:"engine" envconfig:"engine" toml:"engine" yaml:"engine"`
// Broker allows to configure a message broker to use. Broker is responsible for PUB/SUB functionality
// and channel message history and idempotency cache .
// By default, memory Broker is used. Memory broker is superfast, but it's not distributed and all
// data stored in memory (thus lost after node restart). Redis Broker provides seamless horizontal
// scalability, fault-tolerance, and persistence over Centrifugo restarts. Centrifugo also supports
// Nats Broker which only implements at most once PUB/SUB semantics.
Broker configtypes.Broker `mapstructure:"broker" json:"broker" envconfig:"broker" toml:"broker" yaml:"broker"`
// PresenceManager allows to configure a presence manager to use. Presence manager is responsible for
// presence information storage and retrieval. By default, memory PresenceManager is used. Memory
// PresenceManager is superfast, but it's not distributed. Redis PresenceManager provides a seamless
// horizontal scalability.
PresenceManager configtypes.PresenceManager `mapstructure:"presence_manager" json:"presence_manager" envconfig:"presence_manager" toml:"presence_manager" yaml:"presence_manager"`

// Client contains real-time client connection related configuration.
Client configtypes.Client `mapstructure:"client" json:"client" envconfig:"client" toml:"client" yaml:"client"`
Expand Down Expand Up @@ -135,8 +139,8 @@ func DefineFlags(rootCmd *cobra.Command) {
rootCmd.Flags().StringP("port", "p", "8000", "port to bind HTTP server to")
rootCmd.Flags().StringP("internal_address", "", "", "custom interface address to listen on for internal endpoints")
rootCmd.Flags().StringP("internal_port", "", "", "custom port for internal endpoints")
rootCmd.Flags().StringP("engine", "e", "memory", "engine to use: memory or redis")
rootCmd.Flags().StringP("broker", "", "", "custom broker to use: ex. nats")
rootCmd.Flags().StringP("engine.type", "", "memory", "broker to use: ex. redis")
rootCmd.Flags().StringP("presence_manager.type", "", "memory", "presence manager to use: ex. redis")
rootCmd.Flags().StringP("log_level", "", "info", "set the log level: trace, debug, info, error, fatal or none")
rootCmd.Flags().StringP("log_file", "", "", "optional log file - if not specified logs go to STDOUT")
rootCmd.Flags().StringP("pid_file", "", "", "optional path to create PID file")
Expand Down Expand Up @@ -170,10 +174,11 @@ func GetConfig(cmd *cobra.Command, configFile string) (Config, Meta, error) {
if cmd != nil {
bindPFlags := []string{
"port", "address", "internal_port", "internal_address", "log_level", "log_file", "pid_file",
"engine", "broker", "debug.enabled", "admin.enabled", "admin.external", "admin.insecure",
"client.insecure", "http_api.insecure", "http_api.external", "prometheus.enabled", "health.enabled",
"grpc_api.enabled", "grpc_api.port", "uni_grpc.enabled", "uni_grpc.port", "uni_websocket.enabled",
"uni_sse.enabled", "uni_http_stream.enabled", "sse.enabled", "http_stream.enabled", "swagger.enabled",
"engine.type", "broker.enabled", "broker.type", "presence_manager.enabled", "presence_manager.type",
"debug.enabled", "admin.enabled", "admin.external", "admin.insecure", "client.insecure", "http_api.insecure",
"http_api.external", "prometheus.enabled", "health.enabled", "grpc_api.enabled", "grpc_api.port",
"uni_grpc.enabled", "uni_grpc.port", "uni_websocket.enabled", "uni_sse.enabled", "uni_http_stream.enabled",
"sse.enabled", "http_stream.enabled", "swagger.enabled",
}
for _, flag := range bindPFlags {
_ = v.BindPFlag(flag, cmd.Flags().Lookup(flag))
Expand Down
4 changes: 2 additions & 2 deletions internal/config/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

// Validate validates config and returns error if problems found.
func (c Config) Validate() error {
if c.Broker != "" && c.Broker != "nats" {
return fmt.Errorf("unknown broker: %s", c.Broker)
if c.Broker.Type != "memory" && c.Broker.Type != "nats" && c.Broker.Type != "redis" && c.Broker.Type != "redisnats" {
return fmt.Errorf("unknown broker: %s", c.Broker.Type)
}

if err := validateTokens(c); err != nil {
Expand Down
70 changes: 70 additions & 0 deletions internal/configtypes/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package configtypes

type Engine struct {
// Type of broker to use. Can be "memory" or "redis" at this point.
Type string `mapstructure:"type" default:"memory" json:"type" envconfig:"type" yaml:"type" toml:"type"`
// Redis is a configuration for "redis" broker.
Redis RedisEngine `mapstructure:"redis" json:"redis" envconfig:"redis" toml:"redis" yaml:"redis"`
}

type RedisBrokerCommon struct {
UseLists bool `mapstructure:"use_lists" json:"use_lists" envconfig:"use_lists" yaml:"use_lists" toml:"use_lists"`
}

type RedisBroker struct {
Redis `mapstructure:",squash" yaml:",inline"`
RedisBrokerCommon `mapstructure:",squash" yaml:",inline"`
}

type EngineRedisBroker struct {
RedisBrokerCommon `mapstructure:",squash" yaml:",inline"`
}

type RedisPresenceManagerCommon struct {
PresenceTTL Duration `mapstructure:"presence_ttl" json:"presence_ttl" envconfig:"presence_ttl" default:"60s" yaml:"presence_ttl" toml:"presence_ttl"`
PresenceHashFieldTTL bool `mapstructure:"presence_hash_field_ttl" json:"presence_hash_field_ttl" envconfig:"presence_hash_field_ttl" yaml:"presence_hash_field_ttl" toml:"presence_hash_field_ttl"`
PresenceUserMapping bool `mapstructure:"presence_user_mapping" json:"presence_user_mapping" envconfig:"presence_user_mapping" yaml:"presence_user_mapping" toml:"presence_user_mapping"`
}

type EngineRedisPresenceManager struct {
RedisPresenceManagerCommon `mapstructure:",squash" yaml:",inline"`
}

type RedisPresenceManager struct {
Redis `mapstructure:",squash" yaml:",inline"`
RedisPresenceManagerCommon `mapstructure:",squash" yaml:",inline"`
}

// RedisNatsBroker configuration.
type RedisNatsBroker struct {
Redis RedisBroker `mapstructure:"redis" json:"redis" envconfig:"redis" toml:"redis" yaml:"redis"`
Nats NatsBroker `mapstructure:"nats" json:"nats" envconfig:"nats" toml:"nats" yaml:"nats"`
}

// RedisEngine configuration.
type RedisEngine struct {
Redis `mapstructure:",squash" yaml:",inline"`
EngineRedisBroker `mapstructure:",squash" yaml:",inline"`
EngineRedisPresenceManager `mapstructure:",squash" yaml:",inline"`
}

type Broker struct {
Enabled bool `mapstructure:"enabled" json:"enabled" envconfig:"enabled" yaml:"enabled" toml:"enabled"`
// Type of broker to use. Can be "memory", "redis", "nats" at this point.
Type string `mapstructure:"type" default:"memory" json:"type" envconfig:"type" yaml:"type" toml:"type"`
// Redis is a configuration for "redis" broker.
Redis RedisBroker `mapstructure:"redis" json:"redis" envconfig:"redis" toml:"redis" yaml:"redis"`
// Nats is a configuration for NATS broker. It does not support history/recovery/cache.
Nats NatsBroker `mapstructure:"nats" json:"nats" envconfig:"nats" toml:"nats" yaml:"nats"`
// RedisNats is a configuration for Redis + NATS broker. It's highly experimental, undocumented and
// can only be used when enable_unreleased_features option is set to true.
RedisNats *RedisNatsBroker `mapstructure:"redisnats" json:"redisnats,omitempty" envconfig:"redisnats" toml:"redisnats,omitempty" yaml:"redisnats,omitempty"`
}

type PresenceManager struct {
Enabled bool `mapstructure:"enabled" json:"enabled" envconfig:"enabled" yaml:"enabled" toml:"enabled"`
// Type of presence manager to use. Can be "memory" or "redis" at this point.
Type string `mapstructure:"type" default:"memory" json:"type" envconfig:"type" yaml:"type" toml:"type"`
// Redis is a configuration for "redis" broker.
Redis RedisPresenceManager `mapstructure:"redis" json:"redis" envconfig:"redis" toml:"redis" yaml:"redis"`
}
35 changes: 0 additions & 35 deletions internal/configtypes/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,3 @@ type Redis struct {
TLS TLSConfig `mapstructure:"tls" json:"tls" envconfig:"tls" yaml:"tls" toml:"tls"`
SentinelTLS TLSConfig `mapstructure:"sentinel_tls" json:"sentinel_tls" envconfig:"sentinel_tls" yaml:"sentinel_tls" toml:"sentinel_tls"`
}

type RedisBrokerCommon struct {
UseLists bool `mapstructure:"use_lists" json:"use_lists" envconfig:"use_lists" yaml:"use_lists" toml:"use_lists"`
}

type RedisBroker struct {
Redis `mapstructure:",squash" yaml:",inline"`
RedisBrokerCommon `mapstructure:",squash" yaml:",inline"`
}

type EngineRedisBroker struct {
RedisBrokerCommon `mapstructure:",squash" yaml:",inline"`
}

type RedisPresenceManagerCommon struct {
PresenceTTL Duration `mapstructure:"presence_ttl" json:"presence_ttl" envconfig:"presence_ttl" default:"60s" yaml:"presence_ttl" toml:"presence_ttl"`
PresenceHashFieldTTL bool `mapstructure:"presence_hash_field_ttl" json:"presence_hash_field_ttl" envconfig:"presence_hash_field_ttl" yaml:"presence_hash_field_ttl" toml:"presence_hash_field_ttl"`
PresenceUserMapping bool `mapstructure:"presence_user_mapping" json:"presence_user_mapping" envconfig:"presence_user_mapping" yaml:"presence_user_mapping" toml:"presence_user_mapping"`
}

type EngineRedisPresenceManager struct {
RedisPresenceManagerCommon `mapstructure:",squash" yaml:",inline"`
}

type RedisPresenceManager struct {
Redis `mapstructure:",squash" yaml:",inline"`
RedisPresenceManagerCommon `mapstructure:",squash" yaml:",inline"`
}

// RedisEngine configuration.
type RedisEngine struct {
Redis `mapstructure:",squash" yaml:",inline"`
EngineRedisBroker `mapstructure:",squash" yaml:",inline"`
EngineRedisPresenceManager `mapstructure:",squash" yaml:",inline"`
}
2 changes: 1 addition & 1 deletion internal/natsbroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func New(n *centrifuge.Node, conf Config) (*NatsBroker, error) {
clientChannelPrefix: conf.Prefix + ".client.",
}
if conf.RawMode.Enabled {
log.Info().Str("rawModePrefix", conf.RawMode.Prefix).Msg("Nats raw mode enabled")
log.Info().Str("raw_mode_prefix", conf.RawMode.Prefix).Msg("raw mode of Nats enabled")
if len(conf.RawMode.ChannelReplacements) > 0 {
var replacerArgs []string
for k, v := range conf.RawMode.ChannelReplacements {
Expand Down
15 changes: 9 additions & 6 deletions internal/proxy/rpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,16 @@ type rpcHandleTestCase struct {

func (c rpcHandleTestCase) invokeHandle() (reply centrifuge.RPCReply, err error) {
rpcHandler := c.rpcProxyHandler.Handle(c.node)
cfgContainer, _ := config.NewContainer(config.Config{
RPC: configtypes.RPC{
WithoutNamespace: configtypes.RpcOptions{
RpcProxyName: "test",
},
cfg := config.DefaultConfig()
cfg.RPC = configtypes.RPC{
WithoutNamespace: configtypes.RpcOptions{
RpcProxyName: "test",
},
})
}
cfgContainer, err := config.NewContainer(cfg)
if err != nil {
return centrifuge.RPCReply{}, err
}
reply, err = rpcHandler(c.client, centrifuge.RPCEvent{}, cfgContainer, PerCallData{})

return reply, err
Expand Down
Loading

0 comments on commit 0852bfc

Please sign in to comment.