diff --git a/server/accounts.go b/server/accounts.go index 81c9c7d417d..e2bfbc91400 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -107,8 +107,6 @@ type Account struct { // Guarantee that only one goroutine can be running either checkJetStreamMigrate // or clearObserverState at a given time for this account to prevent interleaving. jscmMu sync.Mutex - // Should we send NRG traffic inside this account instead of the system account. - accountNRG atomic.Bool } const ( diff --git a/server/events.go b/server/events.go index 611021b4d50..6e16037246c 100644 --- a/server/events.go +++ b/server/events.go @@ -494,11 +494,14 @@ RESET: si.Version = VERSION si.Time = time.Now().UTC() si.Tags = tags + si.Flags = 0 if js { // New capability based flags. si.SetJetStreamEnabled() si.SetBinaryStreamSnapshot() - si.SetAccountNRG() + if s.accountNRGAllowed.Load() { + si.SetAccountNRG() + } } } var b []byte diff --git a/server/jetstream.go b/server/jetstream.go index 7627417d942..cbf99afb576 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -173,6 +173,9 @@ type jsAccount struct { updatesSub *subscription lupdate time.Time utimer *time.Timer + + // Should we send NRG traffic inside this account instead of the system account. + accountNRG atomic.Bool } // Track general usage for this account. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index de730920871..73585186387 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2471,52 +2471,103 @@ func TestJetStreamClusterAccountNRG(t *testing.T) { require_NoError(t, err) rg := stream.node.(*raft) - // System account should have interest, but the global account - // shouldn't. - for _, s := range c.servers { - require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true)) - require_False(t, s.gacc.sl.hasInterest(rg.asubj, true)) - } + t.Run("Disabled", func(t *testing.T) { + // Switch off account NRG on all servers in the cluster. + for _, s := range c.servers { + s.accountNRGAllowed.Store(false) + s.sendStatszUpdate() + } + time.Sleep(time.Millisecond * 100) + for _, s := range c.servers { + s.GlobalAccount().js.accountNRG.Store(false) + s.updateNRGAccountStatus() + } - // First of all check that the Raft traffic is in the system - // account, as we haven't moved it elsewhere yet. - { - sub, err := snc.SubscribeSync(rg.asubj) - require_NoError(t, err) - require_NoError(t, sub.AutoUnsubscribe(1)) + // System account should have interest, but the global account + // shouldn't. + for _, s := range c.servers { + require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_False(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } - msg, err := sub.NextMsg(time.Second * 3) - require_NoError(t, err) - require_True(t, msg != nil) - } + // First of all check that the Raft traffic is in the system + // account, as we haven't moved it elsewhere yet. + { + sub, err := snc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) - // Switch on account NRG on all servers in the cluster. Then - // we wait, as we will need statsz to be sent for all servers - // in the cluster. - for _, s := range c.servers { - s.GlobalAccount().accountNRG.Store(true) - s.updateNRGAccountStatus() - } + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + }) - // Now check that the traffic has moved into the asset acc. - // In this case the system account should no longer have - // subscriptions for those subjects. - { - sub, err := nc.SubscribeSync(rg.asubj) - require_NoError(t, err) - require_NoError(t, sub.AutoUnsubscribe(1)) + t.Run("Mixed", func(t *testing.T) { + // Switch on account NRG on a single server in the cluster and + // leave it off on the rest. + for i, s := range c.servers { + s.accountNRGAllowed.Store(i == 0) + s.sendStatszUpdate() + } + time.Sleep(time.Millisecond * 100) + for i, s := range c.servers { + s.GlobalAccount().js.accountNRG.Store(i == 0) + s.updateNRGAccountStatus() + } - msg, err := sub.NextMsg(time.Second * 3) - require_NoError(t, err) - require_True(t, msg != nil) - } + // System account should have interest, but the global account + // shouldn't. + for _, s := range c.servers { + require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_False(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } - // The global account should now have interest and the - // system account shouldn't. - for _, s := range c.servers { - require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true)) - require_True(t, s.gacc.sl.hasInterest(rg.asubj, true)) - } + // First of all check that the Raft traffic is in the system + // account, as we haven't moved it elsewhere yet. + { + sub, err := snc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + }) + + t.Run("Enabled", func(t *testing.T) { + // Switch on account NRG on all servers in the cluster. + for _, s := range c.servers { + s.accountNRGAllowed.Store(true) + s.sendStatszUpdate() + } + time.Sleep(time.Millisecond * 100) + for _, s := range c.servers { + s.GlobalAccount().js.accountNRG.Store(true) + s.updateNRGAccountStatus() + } + + // Now check that the traffic has moved into the asset acc. + // In this case the system account should no longer have + // subscriptions for those subjects. + { + sub, err := nc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + + // The global account should now have interest and the + // system account shouldn't. + for _, s := range c.servers { + require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_True(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } + }) } func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) { diff --git a/server/opts.go b/server/opts.go index a20d2cc296f..fa078305ab6 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2133,7 +2133,7 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error { if !ok { return &configErr{tk, fmt.Sprintf("Expected a boolean for %q, got %v", mk, mv)} } - acc.accountNRG.Store(vv) + acc.js.accountNRG.Store(vv) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/raft.go b/server/raft.go index d0d02f4f56c..4f79fdcebf7 100644 --- a/server/raft.go +++ b/server/raft.go @@ -550,14 +550,18 @@ func (n *raft) RecreateInternalSubs() error { func (n *raft) recreateInternalSubsLocked() error { // Is account NRG enabled in this account? - var acc bool - if a, _ := n.s.lookupAccount(n.accName); a != nil { - acc = a.accountNRG.Load() + acc := n.s.accountNRGAllowed.Load() + if acc { + // Check whether the specific account has account NRG enabled. + if a, _ := n.s.lookupAccount(n.accName); a != nil && a.js != nil { + acc = a.js.accountNRG.Load() + } + } + if acc { + // Check whether the peers in this group all claim to support + // moving the NRG traffic into the account. + acc = n.checkAccountNRGStatus(acc) } - - // Check whether the peers in this group all claim to support - // moving the NRG traffic into the account. - acc = n.checkAccountNRGStatus(acc) if n.aesub != nil && n.inAcc == acc { // Subscriptions already exist and the account NRG state // hasn't changed. diff --git a/server/server.go b/server/server.go index 49499ea6723..6df836d694c 100644 --- a/server/server.go +++ b/server/server.go @@ -360,6 +360,9 @@ type Server struct { // Queue to process JS API requests that come from routes (or gateways) jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq] + + // Whether moving NRG traffic into accounts is permitted on this server. + accountNRGAllowed atomic.Bool } // For tracking JS nodes. @@ -724,6 +727,10 @@ func NewServer(opts *Options) (*Server, error) { syncOutSem: make(chan struct{}, maxConcurrentSyncRequests), } + // By default we'll allow account NRG. + // TODO: Maybe make it a configuration option? + s.accountNRGAllowed.Store(true) + // Fill up the maximum in flight syncRequests for this server. // Used in JetStream catchup semantics. for i := 0; i < maxConcurrentSyncRequests; i++ {