diff --git a/server/accounts.go b/server/accounts.go index ae4b1b39430..e55eeeea1df 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3686,16 +3686,19 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim if a.js != nil { // Check whether the account NRG status changed. If it has then we need to notify the // Raft groups running on the system so that they can move their subs if needed. - wantAccountNRG := a.js.accountNRG.Load() + a.mu.Lock() + previous := a.js.nrgAccount switch strings.ToLower(ac.NRGAccount) { case "account": - wantAccountNRG = true + a.js.nrgAccount = a.Name case "system": - wantAccountNRG = false + a.js.nrgAccount = "" default: s.Errorf("Account claim for %q has invalid value %q for account NRG status", a.Name, ac.NRGAccount) } - if wasAccountNRG := a.js.accountNRG.Swap(wantAccountNRG); wasAccountNRG != wantAccountNRG { + changed := a.js.nrgAccount != previous + a.mu.Unlock() + if changed { s.updateNRGAccountStatus() } } diff --git a/server/jetstream.go b/server/jetstream.go index cbf99afb576..43e707a279e 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -174,8 +174,8 @@ type jsAccount struct { lupdate time.Time utimer *time.Timer - // Should we send NRG traffic inside this account instead of the system account. - accountNRG atomic.Bool + // Which account to send NRG traffic into. Empty string is system account. + nrgAccount string } // Track general usage for this account. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 73585186387..6e549915d22 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2479,7 +2479,7 @@ func TestJetStreamClusterAccountNRG(t *testing.T) { } time.Sleep(time.Millisecond * 100) for _, s := range c.servers { - s.GlobalAccount().js.accountNRG.Store(false) + s.GlobalAccount().js.nrgAccount = "" s.updateNRGAccountStatus() } @@ -2512,7 +2512,11 @@ func TestJetStreamClusterAccountNRG(t *testing.T) { } time.Sleep(time.Millisecond * 100) for i, s := range c.servers { - s.GlobalAccount().js.accountNRG.Store(i == 0) + if i == 0 { + s.GlobalAccount().js.nrgAccount = globalAccountName + } else { + s.GlobalAccount().js.nrgAccount = "" + } s.updateNRGAccountStatus() } @@ -2544,7 +2548,7 @@ func TestJetStreamClusterAccountNRG(t *testing.T) { } time.Sleep(time.Millisecond * 100) for _, s := range c.servers { - s.GlobalAccount().js.accountNRG.Store(true) + s.GlobalAccount().js.nrgAccount = globalAccountName s.updateNRGAccountStatus() } diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index dc8ed2a620f..31b033ea523 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -1586,7 +1586,12 @@ func TestJetStreamJWTClusterAccountNRG(t *testing.T) { // Check that everything looks like it should. require_True(t, acc != nil) require_True(t, acc.js != nil) - require_Equal(t, acc.js.accountNRG.Load(), state == "account") + switch state { + case "account": + require_Equal(t, acc.js.nrgAccount, aExpPub) + case "system": + require_Equal(t, acc.js.nrgAccount, _EMPTY_) + } // Now get a list of all of the Raft nodes that should // have been updated by now. @@ -1605,10 +1610,14 @@ func TestJetStreamJWTClusterAccountNRG(t *testing.T) { // in-account or not. for _, rg := range raftNodes { rg.Lock() - inAcc := rg.inAcc + rgAcc := rg.acc rg.Unlock() - - require_Equal(t, inAcc, state == "account") + switch state { + case "account": + require_Equal(t, rgAcc.Name, aExpPub) + case "system": + require_Equal(t, rgAcc.Name, syspub) + } } } } diff --git a/server/monitor.go b/server/monitor.go index f7b156b2e2b..285b133a2bf 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2802,12 +2802,11 @@ type JSInfo struct { Config JetStreamConfig `json:"config,omitempty"` Limits *JSLimitOpts `json:"limits,omitempty"` JetStreamStats - Streams int `json:"streams"` - Consumers int `json:"consumers"` - Messages uint64 `json:"messages"` - Bytes uint64 `json:"bytes"` - Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` - AccountNRGActive bool `json:"account_nrg_active,omitempty"` + Streams int `json:"streams"` + Consumers int `json:"consumers"` + Messages uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` + Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` // aggregate raft info AccountDetails []*AccountDetail `json:"account_details,omitempty"` diff --git a/server/opts.go b/server/opts.go index 1f2e1c0258c..c40e1b3c9ab 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2135,9 +2135,9 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error { } switch strings.ToLower(vv) { case "system": - acc.js.accountNRG.Store(false) + acc.js.nrgAccount = "" case "account": - acc.js.accountNRG.Store(true) + acc.js.nrgAccount = acc.Name default: return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'account' string value for %q, got %v", mk, mv)} } diff --git a/server/raft.go b/server/raft.go index 26e7f97f25f..c71434fcc24 100644 --- a/server/raft.go +++ b/server/raft.go @@ -131,7 +131,6 @@ type raft struct { created time.Time // Time that the group was created accName string // Account name of the asset this raft group is for acc *Account // Account that NRG traffic will be sent/received in - inAcc bool // Is the NRG traffic in-account right now? group string // Raft group sd string // Store directory id string // Node ID @@ -530,10 +529,11 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe // Returns whether peers within this group claim to support // moving NRG traffic into the asset account. // Lock must be held. -func (n *raft) checkAccountNRGStatus(enabled bool) bool { - if !enabled { +func (n *raft) checkAccountNRGStatus() bool { + if !n.s.accountNRGAllowed.Load() { return false } + enabled := true for pn := range n.peers { if si, ok := n.s.nodeToInfo.Load(pn); ok && si != nil { enabled = enabled && si.(nodeInfo).accountNRG @@ -549,26 +549,31 @@ func (n *raft) RecreateInternalSubs() error { } func (n *raft) recreateInternalSubsLocked() error { - // Is account NRG enabled in this account? - acc := n.s.accountNRGAllowed.Load() - if acc { - // Check whether the specific account has account NRG enabled. + // Default is the system account. + nrgAcc := n.s.sys.account + + // Is account NRG enabled in this account and do all group + // peers claim to also support account NRG? + if n.checkAccountNRGStatus() { + // Check whether the account that the asset belongs to + // has volunteered a different NRG account. + target := nrgAcc.Name if a, _ := n.s.lookupAccount(n.accName); a != nil { a.mu.RLock() - ajs := a.js + if a.js != nil { + target = a.js.nrgAccount + } a.mu.RUnlock() - // Check whether the specific account has JetStream enabled. - if ajs != nil { - acc = ajs.accountNRG.Load() + } + + // If the target account exists, then we'll use that. + if target != _EMPTY_ { + if a, _ := n.s.lookupAccount(target); a != nil { + nrgAcc = a } } } - if acc { - // Check whether the peers in this group all claim to support - // moving the NRG traffic into the account. - acc = n.checkAccountNRGStatus(acc) - } - if n.aesub != nil && n.inAcc == acc { + if n.aesub != nil && n.acc == nrgAcc { // Subscriptions already exist and the account NRG state // hasn't changed. return nil @@ -594,19 +599,6 @@ func (n *raft) recreateInternalSubsLocked() error { c.closeConnection(InternalClient) } - // Look up which account we think we should be participating - // on. This will either be the system account (default) or it - // will be the account that the asset is resident in. - var nrgAcc *Account - if n.s.sys != nil { - nrgAcc = n.s.sys.account - } - if acc { // Should we setup in the asset account? - var err error - if nrgAcc, err = n.s.lookupAccount(n.accName); err != nil { - return err - } - } if n.acc != nrgAcc { n.debug("Subscribing in '%s'", nrgAcc.GetName()) } @@ -619,7 +611,6 @@ func (n *raft) recreateInternalSubsLocked() error { n.c = c n.sq = nrgAcc.sq n.acc = nrgAcc - n.inAcc = acc // Recreate any internal subscriptions for voting, append // entries etc in the new account. diff --git a/server/sendq.go b/server/sendq.go index 2e7b5d03452..8f486b36621 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -45,7 +45,6 @@ func (sq *sendq) internalLoop() { defer s.grWG.Done() - //c := s.createInternalAccountClient() c := s.createInternalSystemClient() c.registerWithAccount(sq.a) c.noIcb = true