From 207057415df0fe8a251a64abb32404534a77fa5c Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 21 Feb 2024 16:15:15 +0000 Subject: [PATCH] Move NRG traffic into asset account This adds a new account NRG capability into statsz so that we can detect when all servers in the cluster support moving traffic into the asset account, instead of all being in the system account. Signed-off-by: Neil Twigg --- go.mod | 2 +- go.sum | 8 +- server/accounts.go | 21 +++++ server/events.go | 47 +++++++++- server/jetstream.go | 3 + server/jetstream_cluster_4_test.go | 144 +++++++++++++++++++++++++++++ server/jetstream_jwt_test.go | 107 +++++++++++++++++++++ server/opts.go | 13 +++ server/raft.go | 132 +++++++++++++++++++++++--- server/route.go | 2 +- server/sendq.go | 7 +- server/server.go | 13 ++- 12 files changed, 470 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index b54d38bcce6..2f288e50e9c 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/google/go-tpm v0.9.0 github.com/klauspost/compress v1.17.9 github.com/minio/highwayhash v1.0.3 - github.com/nats-io/jwt/v2 v2.5.8 + github.com/nats-io/jwt/v2 v2.6.0 github.com/nats-io/nats.go v1.36.0 github.com/nats-io/nkeys v0.4.7 github.com/nats-io/nuid v1.0.1 diff --git a/go.sum b/go.sum index 30d7d5952ab..531f9b1b157 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2 github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= -github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= -github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/jwt/v2 v2.6.0 h1:yXoBTdEotZw3NujMT+Nnu1UPNlFWdKQ3d0JJF/+pJag= +github.com/nats-io/jwt/v2 v2.6.0/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= @@ -22,13 +22,9 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= diff --git a/server/accounts.go b/server/accounts.go index 9a99b218597..eab290cb5f4 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -62,6 +62,7 @@ type Account struct { sqmu sync.Mutex sl *Sublist ic *client + sq *sendq isid uint64 etmr *time.Timer ctmr *time.Timer @@ -3679,6 +3680,26 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim a.enableAllJetStreamServiceImportsAndMappings() } + if a.js != nil { + // Check whether the account NRG status changed. If it has then we need to notify the + // Raft groups running on the system so that they can move their subs if needed. + a.mu.Lock() + previous := a.js.nrgAccount + switch tokens := strings.SplitN(ac.ClusterTraffic, ":", 2); tokens[0] { + case "system": + a.js.nrgAccount = _EMPTY_ + case "owner": + a.js.nrgAccount = a.Name + default: + s.Errorf("Account claim for %q has invalid value %q for cluster traffic account", a.Name, ac.ClusterTraffic) + } + changed := a.js.nrgAccount != previous + a.mu.Unlock() + if changed { + s.updateNRGAccountStatus() + } + } + for i, c := range clients { a.mu.RLock() exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns) diff --git a/server/events.go b/server/events.go index c8272d42c6f..36fcc13b961 100644 --- a/server/events.go +++ b/server/events.go @@ -264,6 +264,7 @@ type ServerInfo struct { const ( JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled. BinaryStreamSnapshot // New stream snapshot capability. + AccountNRG // Move NRG traffic out of system account. ) // Set JetStream capability. @@ -289,6 +290,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool { return si.Flags&BinaryStreamSnapshot != 0 } +// Set account NRG capability. +func (si *ServerInfo) SetAccountNRG() { + si.Flags |= AccountNRG +} + +// AccountNRG indicates whether or not we support moving the NRG traffic out of the +// system account and into the asset account. +func (si *ServerInfo) AccountNRG() bool { + return si.Flags&AccountNRG != 0 +} + // ClientInfo is detailed information about the client forming a connection. type ClientInfo struct { Start *time.Time `json:"start,omitempty"` @@ -475,10 +487,14 @@ RESET: si.Version = VERSION si.Time = time.Now().UTC() si.Tags = tags + si.Flags = 0 if js { // New capability based flags. si.SetJetStreamEnabled() si.SetBinaryStreamSnapshot() + if s.accountNRGAllowed.Load() { + si.SetAccountNRG() + } } } var b []byte @@ -1616,7 +1632,8 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su } node := getHash(si.Name) - s.nodeToInfo.Store(node, nodeInfo{ + accountNRG := si.AccountNRG() + oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{ si.Name, si.Version, si.Cluster, @@ -1628,7 +1645,14 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + accountNRG, }) + if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG { + // One of the servers we received statsz from changed its mind about + // whether or not it supports in-account NRG, so update the groups + // with this information. + s.updateNRGAccountStatus() + } } // updateRemoteServer is called when we have an update from a remote server. @@ -1675,14 +1699,35 @@ func (s *Server) processNewServer(si *ServerInfo) { false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + si.AccountNRG(), }) } } + go s.updateNRGAccountStatus() // Announce ourselves.. // Do this in a separate Go routine. go s.sendStatszUpdate() } +// Works out whether all nodes support moving the NRG traffic into +// the account and moves it appropriately. +// Server lock MUST NOT be held on entry. +func (s *Server) updateNRGAccountStatus() { + s.rnMu.Lock() + raftNodes := make([]RaftNode, 0, len(s.raftNodes)) + for _, n := range s.raftNodes { + raftNodes = append(raftNodes, n) + } + s.rnMu.Unlock() + for _, n := range raftNodes { + // In the event that the node is happy that all nodes that + // it cares about haven't changed, this will be a no-op. + if err := n.RecreateInternalSubs(); err != nil { + n.Stop() + } + } +} + // If GW is enabled on this server and there are any leaf node connections, // this function will send a LeafNode connect system event to the super cluster // to ensure that the GWs are in interest-only mode for this account. diff --git a/server/jetstream.go b/server/jetstream.go index 6dc6de6e98e..ac7c8a1e11f 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -174,6 +174,9 @@ type jsAccount struct { updatesSub *subscription lupdate time.Time utimer *time.Timer + + // Which account to send NRG traffic into. Empty string is system account. + nrgAccount string } // Track general usage for this account. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 3f97518590a..13a941c10c7 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2505,6 +2505,150 @@ func TestJetStreamClusterConsumerLeak(t *testing.T) { } } +func TestJetStreamClusterAccountNRG(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: nats.MemoryStorage, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + leader := c.streamLeader(globalAccountName, "TEST") + stream, err := leader.gacc.lookupStream("TEST") + require_NoError(t, err) + rg := stream.node.(*raft) + + t.Run("Disabled", func(t *testing.T) { + // Switch off account NRG on all servers in the cluster. + for _, s := range c.servers { + s.accountNRGAllowed.Store(false) + s.sendStatszUpdate() + } + time.Sleep(time.Millisecond * 100) + for _, s := range c.servers { + s.GlobalAccount().js.nrgAccount = "" + s.updateNRGAccountStatus() + } + + // Check account interest for the AppendEntry subject. + checkFor(t, time.Second, time.Millisecond*25, func() error { + for _, s := range c.servers { + if !s.sys.account.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("system account should have interest") + } + if s.gacc.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("global account shouldn't have interest") + } + } + return nil + }) + + // Check that the Raft traffic is in the system account, as we + // haven't moved it elsewhere yet. + { + sub, err := snc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + }) + + t.Run("Mixed", func(t *testing.T) { + // Switch on account NRG on a single server in the cluster and + // leave it off on the rest. + for i, s := range c.servers { + s.accountNRGAllowed.Store(i == 0) + s.sendStatszUpdate() + } + time.Sleep(time.Millisecond * 100) + for i, s := range c.servers { + if i == 0 { + s.GlobalAccount().js.nrgAccount = globalAccountName + } else { + s.GlobalAccount().js.nrgAccount = "" + } + s.updateNRGAccountStatus() + } + + // Check account interest for the AppendEntry subject. + checkFor(t, time.Second, time.Millisecond*25, func() error { + for _, s := range c.servers { + if !s.sys.account.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("system account should have interest") + } + if s.gacc.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("global account shouldn't have interest") + } + } + return nil + }) + + // Check that the Raft traffic is in the system account, as we + // don't claim support for account NRG on all nodes in the group. + { + sub, err := snc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + }) + + t.Run("Enabled", func(t *testing.T) { + // Switch on account NRG on all servers in the cluster. + for _, s := range c.servers { + s.accountNRGAllowed.Store(true) + s.sendStatszUpdate() + } + time.Sleep(time.Millisecond * 100) + for _, s := range c.servers { + s.GlobalAccount().js.nrgAccount = globalAccountName + s.updateNRGAccountStatus() + } + + // Check account interest for the AppendEntry subject. + checkFor(t, time.Second, time.Millisecond*25, func() error { + for _, s := range c.servers { + if s.sys.account.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("system account shouldn't have interest") + } + if !s.gacc.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("global account should have interest") + } + } + return nil + }) + + // Check that the traffic moved into the global account as + // expected. + { + sub, err := nc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + }) +} + func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index f8f6466f69e..9f5e937ea1e 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -1532,3 +1532,110 @@ func TestJetStreamJWTClusteredTiersR3StreamWithR1ConsumersAndAccounting(t *testi require_Equal(t, r3.Streams, 1) require_Equal(t, r3.Consumers, 0) } + +func TestJetStreamJWTClusterAccountNRG(t *testing.T) { + _, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + + _, aExpPub := createKey(t) + accClaim := jwt.NewAccountClaims(aExpPub) + accClaim.Name = "acc" + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 10, Streams: 1} + accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 1, Streams: 1} + accJwt := encodeClaim(t, accClaim, aExpPub) + + _, aExpPub2 := createKey(t) + accClaim2 := jwt.NewAccountClaims(aExpPub2) + accClaim2.Name = "another_acc" + accJwt2 := encodeClaim(t, accClaim2, aExpPub2) + + tmlp := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { + listen: 127.0.0.1:-1 + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + fmt.Sprintf(` + operator: %s + system_account: %s + resolver = MEMORY + resolver_preload = { + %s : %s + %s : %s + %s : %s + } + `, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt, aExpPub2, accJwt2) + + c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3) + defer c.shutdown() + + // We'll try flipping the state a few times and then do some sanity + // checks to check that it took effect. + thirdAcc := fmt.Sprintf("account:%s", aExpPub2) + // TODO: Not currently testing thirdAcc because we haven't enabled this + // functionality yet. If/when we do enable, this test is ready just by + // uncommenting the third state below. + for _, state := range []string{"system", "owner" /*, thirdAcc */} { + accClaim.ClusterTraffic = state + accJwt = encodeClaim(t, accClaim, aExpPub) + + for _, s := range c.servers { + // Update the account claim for our "third account". + acc, err := s.lookupAccount(aExpPub2) + require_NoError(t, err) + require_NoError(t, s.updateAccountWithClaimJWT(acc, accJwt2)) + + // Then update the account claim for the asset account. + acc, err = s.lookupAccount(aExpPub) + require_NoError(t, err) + require_NoError(t, s.updateAccountWithClaimJWT(acc, accJwt)) + + // Check that everything looks like it should. + require_True(t, acc != nil) + require_True(t, acc.js != nil) + switch state { + case "system": + require_Equal(t, acc.js.nrgAccount, _EMPTY_) + case "owner": + require_Equal(t, acc.js.nrgAccount, aExpPub) + case thirdAcc: + require_Equal(t, acc.js.nrgAccount, aExpPub2) + } + + // Now get a list of all of the Raft nodes that should + // have been updated by now. + s.rnMu.Lock() + raftNodes := make([]*raft, 0, len(s.raftNodes)) + for _, n := range s.raftNodes { + rg := n.(*raft) + if rg.accName != acc.Name { + continue + } + raftNodes = append(raftNodes, rg) + } + s.rnMu.Unlock() + + // Check whether each of the Raft nodes reports being + // in-account or not. + for _, rg := range raftNodes { + rg.Lock() + rgAcc := rg.acc + rg.Unlock() + switch state { + case "system": + require_Equal(t, rgAcc.Name, syspub) + case "owner": + require_Equal(t, rgAcc.Name, aExpPub) + case thirdAcc: + require_Equal(t, rgAcc.Name, aExpPub2) + } + } + } + } +} diff --git a/server/opts.go b/server/opts.go index f562d925a67..88e0ad8c5ae 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2141,6 +2141,19 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error { return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } jsLimits.MaxAckPending = int(vv) + case "cluster_traffic": + vv, ok := mv.(string) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'account' string value for %q, got %v", mk, mv)} + } + switch tokens := strings.SplitN(vv, ":", 2); strings.ToLower(tokens[0]) { + case "system": + acc.js.nrgAccount = _EMPTY_ + case "owner": + acc.js.nrgAccount = acc.Name + default: + return &configErr{tk, fmt.Sprintf("Expected 'system' or 'owner' string value for %q, got %v", mk, mv)} + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/raft.go b/server/raft.go index f45f89f5073..56e69e37960 100644 --- a/server/raft.go +++ b/server/raft.go @@ -76,6 +76,7 @@ type RaftNode interface { Stop() Delete() Wipe() + RecreateInternalSubs() error } type WAL interface { @@ -129,6 +130,7 @@ type raft struct { created time.Time // Time that the group was created accName string // Account name of the asset this raft group is for + acc *Account // Account that NRG traffic will be sent/received in group string // Raft group sd string // Store directory id string // Node ID @@ -351,8 +353,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe s.mu.RUnlock() return nil, ErrNoSysAccount } - sq := s.sys.sq - sacc := s.sys.account hash := s.sys.shash s.mu.RUnlock() @@ -380,9 +380,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe acks: make(map[uint64]map[string]struct{}), pae: make(map[uint64]*appendEntry), s: s, - c: s.createInternalSystemClient(), js: s.getJetStream(), - sq: sq, quit: make(chan struct{}), reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"), votes: newIPQueue[*voteResponse](s, qpfx+"vresp"), @@ -395,7 +393,14 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe observer: cfg.Observer, extSt: ps.domainExt, } - n.c.registerWithAccount(sacc) + + // Setup our internal subscriptions for proposals, votes and append entries. + // If we fail to do this for some reason then this is fatal — we cannot + // continue setting up or the Raft node may be partially/totally isolated. + if err := n.RecreateInternalSubs(); err != nil { + n.shutdown(false) + return nil, err + } if atomic.LoadInt32(&s.logging.debug) > 0 { n.dflag = true @@ -493,14 +498,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } } - // Setup our internal subscriptions for proposals, votes and append entries. - // If we fail to do this for some reason then this is fatal — we cannot - // continue setting up or the Raft node may be partially/totally isolated. - if err := n.createInternalSubs(); err != nil { - n.shutdown(false) - return nil, err - } - n.debug("Started") // Check if we need to start in observer mode due to lame duck status. @@ -529,6 +526,109 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe return n, nil } +// Returns whether peers within this group claim to support +// moving NRG traffic into the asset account. +// Lock must be held. +func (n *raft) checkAccountNRGStatus() bool { + if !n.s.accountNRGAllowed.Load() { + return false + } + enabled := true + for pn := range n.peers { + if si, ok := n.s.nodeToInfo.Load(pn); ok && si != nil { + enabled = enabled && si.(nodeInfo).accountNRG + } + } + return enabled +} + +func (n *raft) RecreateInternalSubs() error { + n.Lock() + defer n.Unlock() + return n.recreateInternalSubsLocked() +} + +func (n *raft) recreateInternalSubsLocked() error { + // Sanity check for system account, as it can disappear when + // the system is shutting down. + if n.s == nil { + return fmt.Errorf("server not found") + } + n.s.mu.RLock() + sys := n.s.sys + n.s.mu.RUnlock() + if sys == nil { + return fmt.Errorf("system account not found") + } + + // Default is the system account. + nrgAcc := sys.account + + // Is account NRG enabled in this account and do all group + // peers claim to also support account NRG? + if n.checkAccountNRGStatus() { + // Check whether the account that the asset belongs to + // has volunteered a different NRG account. + target := nrgAcc.Name + if a, _ := n.s.lookupAccount(n.accName); a != nil { + a.mu.RLock() + if a.js != nil { + target = a.js.nrgAccount + } + a.mu.RUnlock() + } + + // If the target account exists, then we'll use that. + if target != _EMPTY_ { + if a, _ := n.s.lookupAccount(target); a != nil { + nrgAcc = a + } + } + } + if n.aesub != nil && n.acc == nrgAcc { + // Subscriptions already exist and the account NRG state + // hasn't changed. + return nil + } + + // Need to cancel any in-progress catch-ups, otherwise the + // inboxes are about to be pulled out from underneath it in + // the next step... + n.cancelCatchup() + + // If we have an existing client then tear down any existing + // subscriptions and close the internal client. + if c := n.c; c != nil { + c.mu.Lock() + subs := make([]*subscription, 0, len(c.subs)) + for _, sub := range c.subs { + subs = append(subs, sub) + } + c.mu.Unlock() + for _, sub := range subs { + n.unsubscribe(sub) + } + c.closeConnection(InternalClient) + } + + if n.acc != nrgAcc { + n.debug("Subscribing in '%s'", nrgAcc.GetName()) + } + + c := n.s.createInternalSystemClient() + c.registerWithAccount(nrgAcc) + if nrgAcc.sq == nil { + nrgAcc.sq = n.s.newSendQ(nrgAcc) + } + n.c = c + n.sq = nrgAcc.sq + n.acc = nrgAcc + + // Recreate any internal subscriptions for voting, append + // entries etc in the new account. + return n.createInternalSubs() +} + // outOfResources checks to see if we are out of resources. func (n *raft) outOfResources() bool { js := n.js @@ -1753,9 +1853,8 @@ func (n *raft) unsubscribe(sub *subscription) { } } +// Lock should be held. func (n *raft) createInternalSubs() error { - n.Lock() - defer n.Unlock() n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group), n.newInbox() n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, n.group), n.newInbox() n.psubj = fmt.Sprintf(raftPropSubj, n.group) @@ -2905,6 +3004,9 @@ func (n *raft) adjustClusterSizeAndQuorum() { go n.sendHeartbeat() } } + if ncsz != pcsz { + n.recreateInternalSubsLocked() + } } // Track interactions with this peer. diff --git a/server/route.go b/server/route.go index 61f3c6dbb49..d46b76b626d 100644 --- a/server/route.go +++ b/server/route.go @@ -2128,7 +2128,7 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod // check to be consistent and future proof. but will be same domain if s.sameDomain(info.Domain) { s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false}) + nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false}) } } diff --git a/server/sendq.go b/server/sendq.go index 0287c5548a7..8f486b36621 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -29,10 +29,11 @@ type sendq struct { mu sync.Mutex q *ipQueue[*outMsg] s *Server + a *Account } -func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ")} +func (s *Server) newSendQ(acc *Account) *sendq { + sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ"), a: acc} s.startGoRoutine(sq.internalLoop) return sq } @@ -45,7 +46,7 @@ func (sq *sendq) internalLoop() { defer s.grWG.Done() c := s.createInternalSystemClient() - c.registerWithAccount(s.SystemAccount()) + c.registerWithAccount(sq.a) c.noIcb = true defer c.closeConnection(ClientClosed) diff --git a/server/server.go b/server/server.go index 94d8a7bc484..26d048e6327 100644 --- a/server/server.go +++ b/server/server.go @@ -360,6 +360,11 @@ type Server struct { // Queue to process JS API requests that come from routes (or gateways) jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq] + + // Whether moving NRG traffic into accounts is permitted on this server. + // Controls whether or not the account NRG capability is set in statsz. + // Currently used by unit tests to simulate nodes not supporting account NRG. + accountNRGAllowed atomic.Bool } // For tracking JS nodes. @@ -375,6 +380,7 @@ type nodeInfo struct { offline bool js bool binarySnapshots bool + accountNRG bool } // Make sure all are 64bits for atomic use @@ -723,6 +729,9 @@ func NewServer(opts *Options) (*Server, error) { syncOutSem: make(chan struct{}, maxConcurrentSyncRequests), } + // By default we'll allow account NRG. + s.accountNRGAllowed.Store(true) + // Fill up the maximum in flight syncRequests for this server. // Used in JetStream catchup semantics. for i := 0; i < maxConcurrentSyncRequests; i++ { @@ -766,7 +775,7 @@ func NewServer(opts *Options) (*Server, error) { opts.Tags, &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, nil, - false, true, true, + false, true, true, true, }) } @@ -1754,7 +1763,7 @@ func (s *Server) setSystemAccount(acc *Account) error { recvq: newIPQueue[*inSysMsg](s, "System recvQ"), recvqp: newIPQueue[*inSysMsg](s, "System recvQ Pings"), resetCh: make(chan struct{}), - sq: s.newSendQ(), + sq: s.newSendQ(acc), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval,