diff --git a/go.mod b/go.mod index 6d3e6ac4509..17f7e8778db 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/nats-io/nats-server/v2 go 1.22.0 +replace github.com/nats-io/jwt/v2 => github.com/nats-io/jwt/v2 v2.5.9-0.20240910093232-2e1251e51094 + require ( github.com/google/go-tpm v0.9.0 github.com/klauspost/compress v1.17.9 diff --git a/go.sum b/go.sum index 2d80355d6a7..d2247aebd91 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2 github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= -github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= -github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/jwt/v2 v2.5.9-0.20240910093232-2e1251e51094 h1:8yMEZG1tTWFB7h4xEiucEJEYd4geU3YO5C3hFYjZToc= +github.com/nats-io/jwt/v2 v2.5.9-0.20240910093232-2e1251e51094/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= diff --git a/server/accounts.go b/server/accounts.go index 9a99b218597..a13d323b777 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -62,6 +62,7 @@ type Account struct { sqmu sync.Mutex sl *Sublist ic *client + sq *sendq isid uint64 etmr *time.Timer ctmr *time.Timer @@ -3679,6 +3680,28 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim a.enableAllJetStreamServiceImportsAndMappings() } + if a.js != nil { + // Check whether the account NRG status changed. If it has then we need to notify the + // Raft groups running on the system so that they can move their subs if needed. + a.mu.Lock() + previous := a.js.nrgAccount + switch tokens := strings.SplitN(ac.ClusterTraffic, ":", 2); tokens[0] { + case "system": + a.js.nrgAccount = "" + case "owner": + a.js.nrgAccount = a.Name + case "account": + a.js.nrgAccount = tokens[1] + default: + s.Errorf("Account claim for %q has invalid value %q for account NRG status", a.Name, ac.ClusterTraffic) + } + changed := a.js.nrgAccount != previous + a.mu.Unlock() + if changed { + s.updateNRGAccountStatus() + } + } + for i, c := range clients { a.mu.RLock() exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns) diff --git a/server/events.go b/server/events.go index c8272d42c6f..36fcc13b961 100644 --- a/server/events.go +++ b/server/events.go @@ -264,6 +264,7 @@ type ServerInfo struct { const ( JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled. BinaryStreamSnapshot // New stream snapshot capability. + AccountNRG // Move NRG traffic out of system account. ) // Set JetStream capability. @@ -289,6 +290,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool { return si.Flags&BinaryStreamSnapshot != 0 } +// Set account NRG capability. +func (si *ServerInfo) SetAccountNRG() { + si.Flags |= AccountNRG +} + +// AccountNRG indicates whether or not we support moving the NRG traffic out of the +// system account and into the asset account. +func (si *ServerInfo) AccountNRG() bool { + return si.Flags&AccountNRG != 0 +} + // ClientInfo is detailed information about the client forming a connection. type ClientInfo struct { Start *time.Time `json:"start,omitempty"` @@ -475,10 +487,14 @@ RESET: si.Version = VERSION si.Time = time.Now().UTC() si.Tags = tags + si.Flags = 0 if js { // New capability based flags. si.SetJetStreamEnabled() si.SetBinaryStreamSnapshot() + if s.accountNRGAllowed.Load() { + si.SetAccountNRG() + } } } var b []byte @@ -1616,7 +1632,8 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su } node := getHash(si.Name) - s.nodeToInfo.Store(node, nodeInfo{ + accountNRG := si.AccountNRG() + oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{ si.Name, si.Version, si.Cluster, @@ -1628,7 +1645,14 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + accountNRG, }) + if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG { + // One of the servers we received statsz from changed its mind about + // whether or not it supports in-account NRG, so update the groups + // with this information. + s.updateNRGAccountStatus() + } } // updateRemoteServer is called when we have an update from a remote server. @@ -1675,14 +1699,35 @@ func (s *Server) processNewServer(si *ServerInfo) { false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + si.AccountNRG(), }) } } + go s.updateNRGAccountStatus() // Announce ourselves.. // Do this in a separate Go routine. go s.sendStatszUpdate() } +// Works out whether all nodes support moving the NRG traffic into +// the account and moves it appropriately. +// Server lock MUST NOT be held on entry. +func (s *Server) updateNRGAccountStatus() { + s.rnMu.Lock() + raftNodes := make([]RaftNode, 0, len(s.raftNodes)) + for _, n := range s.raftNodes { + raftNodes = append(raftNodes, n) + } + s.rnMu.Unlock() + for _, n := range raftNodes { + // In the event that the node is happy that all nodes that + // it cares about haven't changed, this will be a no-op. + if err := n.RecreateInternalSubs(); err != nil { + n.Stop() + } + } +} + // If GW is enabled on this server and there are any leaf node connections, // this function will send a LeafNode connect system event to the super cluster // to ensure that the GWs are in interest-only mode for this account. diff --git a/server/jetstream.go b/server/jetstream.go index 6dc6de6e98e..ac7c8a1e11f 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -174,6 +174,9 @@ type jsAccount struct { updatesSub *subscription lupdate time.Time utimer *time.Timer + + // Which account to send NRG traffic into. Empty string is system account. + nrgAccount string } // Track general usage for this account. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 3f97518590a..13a941c10c7 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2505,6 +2505,150 @@ func TestJetStreamClusterConsumerLeak(t *testing.T) { } } +func TestJetStreamClusterAccountNRG(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: nats.MemoryStorage, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + leader := c.streamLeader(globalAccountName, "TEST") + stream, err := leader.gacc.lookupStream("TEST") + require_NoError(t, err) + rg := stream.node.(*raft) + + t.Run("Disabled", func(t *testing.T) { + // Switch off account NRG on all servers in the cluster. + for _, s := range c.servers { + s.accountNRGAllowed.Store(false) + s.sendStatszUpdate() + } + time.Sleep(time.Millisecond * 100) + for _, s := range c.servers { + s.GlobalAccount().js.nrgAccount = "" + s.updateNRGAccountStatus() + } + + // Check account interest for the AppendEntry subject. + checkFor(t, time.Second, time.Millisecond*25, func() error { + for _, s := range c.servers { + if !s.sys.account.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("system account should have interest") + } + if s.gacc.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("global account shouldn't have interest") + } + } + return nil + }) + + // Check that the Raft traffic is in the system account, as we + // haven't moved it elsewhere yet. + { + sub, err := snc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + }) + + t.Run("Mixed", func(t *testing.T) { + // Switch on account NRG on a single server in the cluster and + // leave it off on the rest. + for i, s := range c.servers { + s.accountNRGAllowed.Store(i == 0) + s.sendStatszUpdate() + } + time.Sleep(time.Millisecond * 100) + for i, s := range c.servers { + if i == 0 { + s.GlobalAccount().js.nrgAccount = globalAccountName + } else { + s.GlobalAccount().js.nrgAccount = "" + } + s.updateNRGAccountStatus() + } + + // Check account interest for the AppendEntry subject. + checkFor(t, time.Second, time.Millisecond*25, func() error { + for _, s := range c.servers { + if !s.sys.account.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("system account should have interest") + } + if s.gacc.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("global account shouldn't have interest") + } + } + return nil + }) + + // Check that the Raft traffic is in the system account, as we + // don't claim support for account NRG on all nodes in the group. + { + sub, err := snc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + }) + + t.Run("Enabled", func(t *testing.T) { + // Switch on account NRG on all servers in the cluster. + for _, s := range c.servers { + s.accountNRGAllowed.Store(true) + s.sendStatszUpdate() + } + time.Sleep(time.Millisecond * 100) + for _, s := range c.servers { + s.GlobalAccount().js.nrgAccount = globalAccountName + s.updateNRGAccountStatus() + } + + // Check account interest for the AppendEntry subject. + checkFor(t, time.Second, time.Millisecond*25, func() error { + for _, s := range c.servers { + if s.sys.account.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("system account shouldn't have interest") + } + if !s.gacc.sl.hasInterest(rg.asubj, true) { + return fmt.Errorf("global account should have interest") + } + } + return nil + }) + + // Check that the traffic moved into the global account as + // expected. + { + sub, err := nc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + }) +} + func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index f8f6466f69e..fc7b9c313e7 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -1532,3 +1532,107 @@ func TestJetStreamJWTClusteredTiersR3StreamWithR1ConsumersAndAccounting(t *testi require_Equal(t, r3.Streams, 1) require_Equal(t, r3.Consumers, 0) } + +func TestJetStreamJWTClusterAccountNRG(t *testing.T) { + _, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + + _, aExpPub := createKey(t) + accClaim := jwt.NewAccountClaims(aExpPub) + accClaim.Name = "acc" + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 10, Streams: 1} + accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 1, Streams: 1} + accJwt := encodeClaim(t, accClaim, aExpPub) + + _, aExpPub2 := createKey(t) + accClaim2 := jwt.NewAccountClaims(aExpPub2) + accClaim2.Name = "another_acc" + accJwt2 := encodeClaim(t, accClaim2, aExpPub2) + + tmlp := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { + listen: 127.0.0.1:-1 + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + fmt.Sprintf(` + operator: %s + system_account: %s + resolver = MEMORY + resolver_preload = { + %s : %s + %s : %s + %s : %s + } + `, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt, aExpPub2, accJwt2) + + c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3) + defer c.shutdown() + + // We'll try flipping the state a few times and then do some sanity + // checks to check that it took effect. + thirdAcc := fmt.Sprintf("account:%s", aExpPub2) + for _, state := range []string{"system", "owner", thirdAcc} { + accClaim.ClusterTraffic = state + accJwt = encodeClaim(t, accClaim, aExpPub) + + for _, s := range c.servers { + // Update the account claim for our "third account". + acc, err := s.lookupAccount(aExpPub2) + require_NoError(t, err) + require_NoError(t, s.updateAccountWithClaimJWT(acc, accJwt2)) + + // Then update the account claim for the asset account. + acc, err = s.lookupAccount(aExpPub) + require_NoError(t, err) + require_NoError(t, s.updateAccountWithClaimJWT(acc, accJwt)) + + // Check that everything looks like it should. + require_True(t, acc != nil) + require_True(t, acc.js != nil) + switch state { + case "system": + require_Equal(t, acc.js.nrgAccount, _EMPTY_) + case "owner": + require_Equal(t, acc.js.nrgAccount, aExpPub) + case thirdAcc: + require_Equal(t, acc.js.nrgAccount, aExpPub2) + } + + // Now get a list of all of the Raft nodes that should + // have been updated by now. + s.rnMu.Lock() + raftNodes := make([]*raft, 0, len(s.raftNodes)) + for _, n := range s.raftNodes { + rg := n.(*raft) + if rg.accName != acc.Name { + continue + } + raftNodes = append(raftNodes, rg) + } + s.rnMu.Unlock() + + // Check whether each of the Raft nodes reports being + // in-account or not. + for _, rg := range raftNodes { + rg.Lock() + rgAcc := rg.acc + rg.Unlock() + switch state { + case "system": + require_Equal(t, rgAcc.Name, syspub) + case "owner": + require_Equal(t, rgAcc.Name, aExpPub) + case thirdAcc: + require_Equal(t, rgAcc.Name, aExpPub2) + } + } + } + } +} diff --git a/server/opts.go b/server/opts.go index f562d925a67..13a59af8740 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2141,6 +2141,21 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error { return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } jsLimits.MaxAckPending = int(vv) + case "cluster_traffic": + vv, ok := mv.(string) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'account' string value for %q, got %v", mk, mv)} + } + switch tokens := strings.SplitN(vv, ":", 2); strings.ToLower(tokens[0]) { + case "system": + acc.js.nrgAccount = "" + case "owner": + acc.js.nrgAccount = acc.Name + case "account": + acc.js.nrgAccount = tokens[1] + default: + return &configErr{tk, fmt.Sprintf("Expected 'system', 'owner' or 'account:ACCNAME' string value for %q, got %v", mk, mv)} + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/raft.go b/server/raft.go index f45f89f5073..4a5a15fc241 100644 --- a/server/raft.go +++ b/server/raft.go @@ -76,6 +76,7 @@ type RaftNode interface { Stop() Delete() Wipe() + RecreateInternalSubs() error } type WAL interface { @@ -129,6 +130,7 @@ type raft struct { created time.Time // Time that the group was created accName string // Account name of the asset this raft group is for + acc *Account // Account that NRG traffic will be sent/received in group string // Raft group sd string // Store directory id string // Node ID @@ -351,8 +353,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe s.mu.RUnlock() return nil, ErrNoSysAccount } - sq := s.sys.sq - sacc := s.sys.account hash := s.sys.shash s.mu.RUnlock() @@ -380,9 +380,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe acks: make(map[uint64]map[string]struct{}), pae: make(map[uint64]*appendEntry), s: s, - c: s.createInternalSystemClient(), js: s.getJetStream(), - sq: sq, quit: make(chan struct{}), reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"), votes: newIPQueue[*voteResponse](s, qpfx+"vresp"), @@ -395,7 +393,14 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe observer: cfg.Observer, extSt: ps.domainExt, } - n.c.registerWithAccount(sacc) + + // Setup our internal subscriptions for proposals, votes and append entries. + // If we fail to do this for some reason then this is fatal — we cannot + // continue setting up or the Raft node may be partially/totally isolated. + if err := n.RecreateInternalSubs(); err != nil { + n.shutdown(false) + return nil, err + } if atomic.LoadInt32(&s.logging.debug) > 0 { n.dflag = true @@ -493,14 +498,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } } - // Setup our internal subscriptions for proposals, votes and append entries. - // If we fail to do this for some reason then this is fatal — we cannot - // continue setting up or the Raft node may be partially/totally isolated. - if err := n.createInternalSubs(); err != nil { - n.shutdown(false) - return nil, err - } - n.debug("Started") // Check if we need to start in observer mode due to lame duck status. @@ -529,6 +526,97 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe return n, nil } +// Returns whether peers within this group claim to support +// moving NRG traffic into the asset account. +// Lock must be held. +func (n *raft) checkAccountNRGStatus() bool { + if !n.s.accountNRGAllowed.Load() { + return false + } + enabled := true + for pn := range n.peers { + if si, ok := n.s.nodeToInfo.Load(pn); ok && si != nil { + enabled = enabled && si.(nodeInfo).accountNRG + } + } + return enabled +} + +func (n *raft) RecreateInternalSubs() error { + n.Lock() + defer n.Unlock() + return n.recreateInternalSubsLocked() +} + +func (n *raft) recreateInternalSubsLocked() error { + // Default is the system account. + nrgAcc := n.s.sys.account + + // Is account NRG enabled in this account and do all group + // peers claim to also support account NRG? + if n.checkAccountNRGStatus() { + // Check whether the account that the asset belongs to + // has volunteered a different NRG account. + target := nrgAcc.Name + if a, _ := n.s.lookupAccount(n.accName); a != nil { + a.mu.RLock() + if a.js != nil { + target = a.js.nrgAccount + } + a.mu.RUnlock() + } + + // If the target account exists, then we'll use that. + if target != _EMPTY_ { + if a, _ := n.s.lookupAccount(target); a != nil { + nrgAcc = a + } + } + } + if n.aesub != nil && n.acc == nrgAcc { + // Subscriptions already exist and the account NRG state + // hasn't changed. + return nil + } + + // Need to cancel any in-progress catch-ups, otherwise the + // inboxes are about to be pulled out from underneath it in + // the next step... + n.cancelCatchup() + + // If we have an existing client then tear down any existing + // subscriptions and close the internal client. + if c := n.c; c != nil { + c.mu.Lock() + subs := make([]*subscription, 0, len(c.subs)) + for _, sub := range c.subs { + subs = append(subs, sub) + } + c.mu.Unlock() + for _, sub := range subs { + n.unsubscribe(sub) + } + c.closeConnection(InternalClient) + } + + if n.acc != nrgAcc { + n.debug("Subscribing in '%s'", nrgAcc.GetName()) + } + + c := n.s.createInternalSystemClient() + c.registerWithAccount(nrgAcc) + if nrgAcc.sq == nil { + nrgAcc.sq = n.s.newSendQ(nrgAcc) + } + n.c = c + n.sq = nrgAcc.sq + n.acc = nrgAcc + + // Recreate any internal subscriptions for voting, append + // entries etc in the new account. + return n.createInternalSubs() +} + // outOfResources checks to see if we are out of resources. func (n *raft) outOfResources() bool { js := n.js @@ -1753,9 +1841,8 @@ func (n *raft) unsubscribe(sub *subscription) { } } +// Lock should be held. func (n *raft) createInternalSubs() error { - n.Lock() - defer n.Unlock() n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group), n.newInbox() n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, n.group), n.newInbox() n.psubj = fmt.Sprintf(raftPropSubj, n.group) @@ -2905,6 +2992,9 @@ func (n *raft) adjustClusterSizeAndQuorum() { go n.sendHeartbeat() } } + if ncsz != pcsz { + n.recreateInternalSubsLocked() + } } // Track interactions with this peer. diff --git a/server/route.go b/server/route.go index 61f3c6dbb49..d46b76b626d 100644 --- a/server/route.go +++ b/server/route.go @@ -2128,7 +2128,7 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod // check to be consistent and future proof. but will be same domain if s.sameDomain(info.Domain) { s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false}) + nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false}) } } diff --git a/server/sendq.go b/server/sendq.go index 0287c5548a7..8f486b36621 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -29,10 +29,11 @@ type sendq struct { mu sync.Mutex q *ipQueue[*outMsg] s *Server + a *Account } -func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ")} +func (s *Server) newSendQ(acc *Account) *sendq { + sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ"), a: acc} s.startGoRoutine(sq.internalLoop) return sq } @@ -45,7 +46,7 @@ func (sq *sendq) internalLoop() { defer s.grWG.Done() c := s.createInternalSystemClient() - c.registerWithAccount(s.SystemAccount()) + c.registerWithAccount(sq.a) c.noIcb = true defer c.closeConnection(ClientClosed) diff --git a/server/server.go b/server/server.go index 94d8a7bc484..b84c93d687b 100644 --- a/server/server.go +++ b/server/server.go @@ -360,6 +360,9 @@ type Server struct { // Queue to process JS API requests that come from routes (or gateways) jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq] + + // Whether moving NRG traffic into accounts is permitted on this server. + accountNRGAllowed atomic.Bool } // For tracking JS nodes. @@ -375,6 +378,7 @@ type nodeInfo struct { offline bool js bool binarySnapshots bool + accountNRG bool } // Make sure all are 64bits for atomic use @@ -723,6 +727,10 @@ func NewServer(opts *Options) (*Server, error) { syncOutSem: make(chan struct{}, maxConcurrentSyncRequests), } + // By default we'll allow account NRG. + // TODO: Maybe make it a configuration option? + s.accountNRGAllowed.Store(true) + // Fill up the maximum in flight syncRequests for this server. // Used in JetStream catchup semantics. for i := 0; i < maxConcurrentSyncRequests; i++ { @@ -766,7 +774,7 @@ func NewServer(opts *Options) (*Server, error) { opts.Tags, &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, nil, - false, true, true, + false, true, true, true, }) } @@ -1754,7 +1762,7 @@ func (s *Server) setSystemAccount(acc *Account) error { recvq: newIPQueue[*inSysMsg](s, "System recvQ"), recvqp: newIPQueue[*inSysMsg](s, "System recvQ Pings"), resetCh: make(chan struct{}), - sq: s.newSendQ(), + sq: s.newSendQ(acc), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval,