Skip to content

Commit

Permalink
Modify logic to accept potential third account in future
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Aug 7, 2024
1 parent 2a2e60e commit 76d570f
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 53 deletions.
11 changes: 7 additions & 4 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3686,16 +3686,19 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
if a.js != nil {
// Check whether the account NRG status changed. If it has then we need to notify the
// Raft groups running on the system so that they can move their subs if needed.
wantAccountNRG := a.js.accountNRG.Load()
a.mu.Lock()
previous := a.js.nrgAccount
switch strings.ToLower(ac.NRGAccount) {
case "account":
wantAccountNRG = true
a.js.nrgAccount = a.Name
case "system":
wantAccountNRG = false
a.js.nrgAccount = ""
default:
s.Errorf("Account claim for %q has invalid value %q for account NRG status", a.Name, ac.NRGAccount)
}
if wasAccountNRG := a.js.accountNRG.Swap(wantAccountNRG); wasAccountNRG != wantAccountNRG {
changed := a.js.nrgAccount != previous
a.mu.Unlock()
if changed {
s.updateNRGAccountStatus()
}
}
Expand Down
4 changes: 2 additions & 2 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ type jsAccount struct {
lupdate time.Time
utimer *time.Timer

// Should we send NRG traffic inside this account instead of the system account.
accountNRG atomic.Bool
// Which account to send NRG traffic into. Empty string is system account.
nrgAccount string
}

// Track general usage for this account.
Expand Down
10 changes: 7 additions & 3 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2479,7 +2479,7 @@ func TestJetStreamClusterAccountNRG(t *testing.T) {
}
time.Sleep(time.Millisecond * 100)
for _, s := range c.servers {
s.GlobalAccount().js.accountNRG.Store(false)
s.GlobalAccount().js.nrgAccount = ""
s.updateNRGAccountStatus()
}

Expand Down Expand Up @@ -2512,7 +2512,11 @@ func TestJetStreamClusterAccountNRG(t *testing.T) {
}
time.Sleep(time.Millisecond * 100)
for i, s := range c.servers {
s.GlobalAccount().js.accountNRG.Store(i == 0)
if i == 0 {
s.GlobalAccount().js.nrgAccount = globalAccountName
} else {
s.GlobalAccount().js.nrgAccount = ""
}
s.updateNRGAccountStatus()
}

Expand Down Expand Up @@ -2544,7 +2548,7 @@ func TestJetStreamClusterAccountNRG(t *testing.T) {
}
time.Sleep(time.Millisecond * 100)
for _, s := range c.servers {
s.GlobalAccount().js.accountNRG.Store(true)
s.GlobalAccount().js.nrgAccount = globalAccountName
s.updateNRGAccountStatus()
}

Expand Down
17 changes: 13 additions & 4 deletions server/jetstream_jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1586,7 +1586,12 @@ func TestJetStreamJWTClusterAccountNRG(t *testing.T) {
// Check that everything looks like it should.
require_True(t, acc != nil)
require_True(t, acc.js != nil)
require_Equal(t, acc.js.accountNRG.Load(), state == "account")
switch state {
case "account":
require_Equal(t, acc.js.nrgAccount, aExpPub)
case "system":
require_Equal(t, acc.js.nrgAccount, _EMPTY_)
}

// Now get a list of all of the Raft nodes that should
// have been updated by now.
Expand All @@ -1605,10 +1610,14 @@ func TestJetStreamJWTClusterAccountNRG(t *testing.T) {
// in-account or not.
for _, rg := range raftNodes {
rg.Lock()
inAcc := rg.inAcc
rgAcc := rg.acc
rg.Unlock()

require_Equal(t, inAcc, state == "account")
switch state {
case "account":
require_Equal(t, rgAcc.Name, aExpPub)
case "system":
require_Equal(t, rgAcc.Name, syspub)
}
}
}
}
Expand Down
11 changes: 5 additions & 6 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2802,12 +2802,11 @@ type JSInfo struct {
Config JetStreamConfig `json:"config,omitempty"`
Limits *JSLimitOpts `json:"limits,omitempty"`
JetStreamStats
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Messages uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
AccountNRGActive bool `json:"account_nrg_active,omitempty"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Messages uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`

// aggregate raft info
AccountDetails []*AccountDetail `json:"account_details,omitempty"`
Expand Down
4 changes: 2 additions & 2 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2135,9 +2135,9 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error {
}
switch strings.ToLower(vv) {
case "system":
acc.js.accountNRG.Store(false)
acc.js.nrgAccount = ""
case "account":
acc.js.accountNRG.Store(true)
acc.js.nrgAccount = acc.Name
default:
return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'account' string value for %q, got %v", mk, mv)}
}
Expand Down
53 changes: 22 additions & 31 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ type raft struct {
created time.Time // Time that the group was created
accName string // Account name of the asset this raft group is for
acc *Account // Account that NRG traffic will be sent/received in
inAcc bool // Is the NRG traffic in-account right now?
group string // Raft group
sd string // Store directory
id string // Node ID
Expand Down Expand Up @@ -530,10 +529,11 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
// Returns whether peers within this group claim to support
// moving NRG traffic into the asset account.
// Lock must be held.
func (n *raft) checkAccountNRGStatus(enabled bool) bool {
if !enabled {
func (n *raft) checkAccountNRGStatus() bool {
if !n.s.accountNRGAllowed.Load() {
return false
}
enabled := true
for pn := range n.peers {
if si, ok := n.s.nodeToInfo.Load(pn); ok && si != nil {
enabled = enabled && si.(nodeInfo).accountNRG
Expand All @@ -549,26 +549,31 @@ func (n *raft) RecreateInternalSubs() error {
}

func (n *raft) recreateInternalSubsLocked() error {
// Is account NRG enabled in this account?
acc := n.s.accountNRGAllowed.Load()
if acc {
// Check whether the specific account has account NRG enabled.
// Default is the system account.
nrgAcc := n.s.sys.account

// Is account NRG enabled in this account and do all group
// peers claim to also support account NRG?
if n.checkAccountNRGStatus() {
// Check whether the account that the asset belongs to
// has volunteered a different NRG account.
target := nrgAcc.Name
if a, _ := n.s.lookupAccount(n.accName); a != nil {
a.mu.RLock()
ajs := a.js
if a.js != nil {
target = a.js.nrgAccount
}
a.mu.RUnlock()
// Check whether the specific account has JetStream enabled.
if ajs != nil {
acc = ajs.accountNRG.Load()
}

// If the target account exists, then we'll use that.
if target != _EMPTY_ {
if a, _ := n.s.lookupAccount(target); a != nil {
nrgAcc = a
}
}
}
if acc {
// Check whether the peers in this group all claim to support
// moving the NRG traffic into the account.
acc = n.checkAccountNRGStatus(acc)
}
if n.aesub != nil && n.inAcc == acc {
if n.aesub != nil && n.acc == nrgAcc {
// Subscriptions already exist and the account NRG state
// hasn't changed.
return nil
Expand All @@ -594,19 +599,6 @@ func (n *raft) recreateInternalSubsLocked() error {
c.closeConnection(InternalClient)
}

// Look up which account we think we should be participating
// on. This will either be the system account (default) or it
// will be the account that the asset is resident in.
var nrgAcc *Account
if n.s.sys != nil {
nrgAcc = n.s.sys.account
}
if acc { // Should we setup in the asset account?
var err error
if nrgAcc, err = n.s.lookupAccount(n.accName); err != nil {
return err
}
}
if n.acc != nrgAcc {
n.debug("Subscribing in '%s'", nrgAcc.GetName())
}
Expand All @@ -619,7 +611,6 @@ func (n *raft) recreateInternalSubsLocked() error {
n.c = c
n.sq = nrgAcc.sq
n.acc = nrgAcc
n.inAcc = acc

// Recreate any internal subscriptions for voting, append
// entries etc in the new account.
Expand Down
1 change: 0 additions & 1 deletion server/sendq.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (sq *sendq) internalLoop() {

defer s.grWG.Done()

//c := s.createInternalAccountClient()
c := s.createInternalSystemClient()
c.registerWithAccount(sq.a)
c.noIcb = true
Expand Down

0 comments on commit 76d570f

Please sign in to comment.