Skip to content

Commit

Permalink
Control whether account NRG is enabled on a per-account basis
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Aug 7, 2024
1 parent 946003c commit 302ed18
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 22 deletions.
2 changes: 2 additions & 0 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type Account struct {
// Guarantee that only one goroutine can be running either checkJetStreamMigrate
// or clearObserverState at a given time for this account to prevent interleaving.
jscmMu sync.Mutex
// Should we send NRG traffic inside this account instead of the system account.
accountNRG atomic.Bool
}

const (
Expand Down
1 change: 0 additions & 1 deletion server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,6 @@ func (s *Server) sendStatsz(subj string) {
// JetStream
if js := s.js.Load(); js != nil {
jStat := &JetStreamVarz{}
jStat.AccountNRGActive = s.accountNRG.Load()
s.mu.RUnlock()
js.mu.RLock()
c := js.config
Expand Down
3 changes: 0 additions & 3 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type JetStreamConfig struct {
Domain string `json:"domain,omitempty"`
CompressOK bool `json:"compress_ok,omitempty"`
UniqueTag string `json:"unique_tag,omitempty"`
AccountNRG bool `json:"account_nrg_enabled,omitempty"`
}

// Statistics about JetStream for this server.
Expand Down Expand Up @@ -545,7 +544,6 @@ func (s *Server) restartJetStream() error {
MaxMemory: opts.JetStreamMaxMemory,
MaxStore: opts.JetStreamMaxStore,
Domain: opts.JetStreamDomain,
AccountNRG: opts.JetStreamAccountNRG,
}
s.Noticef("Restarting JetStream")
err := s.EnableJetStream(&cfg)
Expand Down Expand Up @@ -2515,7 +2513,6 @@ func (s *Server) dynJetStreamConfig(storeDir string, maxStore, maxMem int64) *Je
}

opts := s.getOpts()
jsc.AccountNRG = opts.JetStreamAccountNRG

// Sync options.
jsc.SyncInterval = opts.SyncInterval
Expand Down
11 changes: 4 additions & 7 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,11 +1226,10 @@ type Varz struct {

// JetStreamVarz contains basic runtime information about jetstream
type JetStreamVarz struct {
Config *JetStreamConfig `json:"config,omitempty"`
Stats *JetStreamStats `json:"stats,omitempty"`
Meta *MetaClusterInfo `json:"meta,omitempty"`
Limits *JSLimitOpts `json:"limits,omitempty"`
AccountNRGActive bool `json:"account_nrg_active,omitempty"`
Config *JetStreamConfig `json:"config,omitempty"`
Stats *JetStreamStats `json:"stats,omitempty"`
Meta *MetaClusterInfo `json:"meta,omitempty"`
Limits *JSLimitOpts `json:"limits,omitempty"`
}

// ClusterOptsVarz contains monitoring cluster information
Expand Down Expand Up @@ -1447,7 +1446,6 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
}

func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
v.AccountNRGActive = s.accountNRG.Load()
if doConfig {
js.mu.RLock()
// We want to snapshot the config since it will then be available outside
Expand Down Expand Up @@ -1827,7 +1825,6 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
// Now update server's varz
s.mu.RLock()
sv := &s.varz.JetStream
sv.AccountNRGActive = s.accountNRG.Load()
if created {
sv.Config = v.Config
}
Expand Down
9 changes: 6 additions & 3 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ type Options struct {
JetStreamLimits JSLimitOpts
JetStreamTpm JSTpmOpts
JetStreamMaxCatchup int64
JetStreamAccountNRG bool
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
SyncAlways bool `json:"-"`
Expand Down Expand Up @@ -2129,6 +2128,12 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error {
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
}
jsLimits.MaxAckPending = int(vv)
case "account_nrg":
vv, ok := mv.(bool)
if !ok {
return &configErr{tk, fmt.Sprintf("Expected a boolean for %q, got %v", mk, mv)}
}
acc.accountNRG.Store(vv)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down Expand Up @@ -2363,8 +2368,6 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
}
opts.JetStreamMaxCatchup = s
case "account_nrg":
opts.JetStreamAccountNRG = mv.(bool)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down
9 changes: 5 additions & 4 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,11 @@ func (n *raft) RecreateInternalSubs() error {
}

func (n *raft) recreateInternalSubsLocked() error {
// Is account NRG enabled at the server level?
n.s.optsMu.RLock()
acc := n.s.opts.JetStreamAccountNRG
n.s.optsMu.RUnlock()
// Is account NRG enabled in this account?
var acc bool
if a, _ := n.s.lookupAccount(n.accName); a != nil {
acc = a.accountNRG.Load()
}

// Check whether the peers in this group all claim to support
// moving the NRG traffic into the account.
Expand Down
4 changes: 0 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,6 @@ type Server struct {

// Queue to process JS API requests that come from routes (or gateways)
jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq]

// Whether account NRG is supported cluster-wide or not.
accountNRG atomic.Bool
}

// For tracking JS nodes.
Expand Down Expand Up @@ -2334,7 +2331,6 @@ func (s *Server) Start() {
Domain: opts.JetStreamDomain,
CompressOK: true,
UniqueTag: opts.JetStreamUniqueTag,
AccountNRG: opts.JetStreamAccountNRG,
}
if err := s.EnableJetStream(cfg); err != nil {
s.Fatalf("Can't start JetStream: %v", err)
Expand Down

0 comments on commit 302ed18

Please sign in to comment.