diff --git a/server/accounts.go b/server/accounts.go index aca96b5d24b..bf6fcfb9564 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -61,6 +61,7 @@ type Account struct { sqmu sync.Mutex sl *Sublist ic *client + sq *sendq isid uint64 etmr *time.Timer ctmr *time.Timer diff --git a/server/events.go b/server/events.go index ff940571a00..6ba450fec46 100644 --- a/server/events.go +++ b/server/events.go @@ -263,6 +263,7 @@ type ServerInfo struct { const ( JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled. BinaryStreamSnapshot // New stream snapshot capability. + AccountNRG // Move NRG traffic out of system account. ) // Set JetStream capability. @@ -288,6 +289,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool { return si.Flags&BinaryStreamSnapshot != 0 } +// Set account NRG capability. +func (si *ServerInfo) SetAccountNRG() { + si.Flags |= AccountNRG +} + +// AccountNRG indicates whether or not we support moving the NRG traffic out of the +// system account and into the asset account. +func (si *ServerInfo) AccountNRG() bool { + return si.Flags&AccountNRG != 0 +} + // ClientInfo is detailed information about the client forming a connection. type ClientInfo struct { Start *time.Time `json:"start,omitempty"` @@ -486,6 +498,7 @@ RESET: // New capability based flags. si.SetJetStreamEnabled() si.SetBinaryStreamSnapshot() + si.SetAccountNRG() } } var b []byte @@ -912,6 +925,7 @@ func (s *Server) sendStatsz(subj string) { // JetStream if js := s.js.Load(); js != nil { jStat := &JetStreamVarz{} + jStat.AccountNRGActive = s.accountNRG.Load() s.mu.RUnlock() js.mu.RLock() c := js.config @@ -1589,7 +1603,9 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + si.AccountNRG(), }) + s.updateNRGAccountStatus() } // updateRemoteServer is called when we have an update from a remote server. @@ -1636,14 +1652,56 @@ func (s *Server) processNewServer(si *ServerInfo) { false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + si.AccountNRG(), }) } } + go s.updateNRGAccountStatus() // Announce ourselves.. // Do this in a separate Go routine. go s.sendStatszUpdate() } +// Works out whether all nodes support moving the NRG traffic into +// the account and moves it appropriately. +// Server lock MUST NOT be held on entry. +func (s *Server) updateNRGAccountStatus() { + var raftNodes []RaftNode + s.optsMu.RLock() + supported := s.opts.JetStreamAccountNRG + s.optsMu.RUnlock() + if supported { + s.rnMu.Lock() + raftNodes = make([]RaftNode, 0, len(s.raftNodes)) + for _, n := range s.raftNodes { + raftNodes = append(raftNodes, n) + } + s.rnMu.Unlock() + s.mu.Lock() + s.nodeToInfo.Range(func(key, value any) bool { + si := value.(nodeInfo) + if !s.sameDomain(si.domain) { + return true + } + if supported = supported && si.accountNRG; !supported { + return false + } + return true + }) + s.mu.Unlock() + } + if s.accountNRG.CompareAndSwap(!supported, supported) { + if supported { + s.Noticef("Moving NRG traffic into asset accounts") + } else { + s.Warnf("Moving NRG traffic back into system account due to old nodes coming online") + } + for _, n := range raftNodes { + n.RecreateInternalSubs(supported) + } + } +} + // If GW is enabled on this server and there are any leaf node connections, // this function will send a LeafNode connect system event to the super cluster // to ensure that the GWs are in interest-only mode for this account. diff --git a/server/jetstream.go b/server/jetstream.go index 7627417d942..14aa7bb488a 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -48,6 +48,7 @@ type JetStreamConfig struct { Domain string `json:"domain,omitempty"` CompressOK bool `json:"compress_ok,omitempty"` UniqueTag string `json:"unique_tag,omitempty"` + AccountNRG bool `json:"account_nrg_enabled,omitempty"` } // Statistics about JetStream for this server. @@ -544,6 +545,7 @@ func (s *Server) restartJetStream() error { MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, Domain: opts.JetStreamDomain, + AccountNRG: opts.JetStreamAccountNRG, } s.Noticef("Restarting JetStream") err := s.EnableJetStream(&cfg) @@ -2513,6 +2515,7 @@ func (s *Server) dynJetStreamConfig(storeDir string, maxStore, maxMem int64) *Je } opts := s.getOpts() + jsc.AccountNRG = opts.JetStreamAccountNRG // Sync options. jsc.SyncInterval = opts.SyncInterval diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 8407187e99b..93bb609adff 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2329,3 +2329,115 @@ func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) { } } } + +func TestJetStreamClusterAccountNRG(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: nats.MemoryStorage, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + leader := c.streamLeader(globalAccountName, "TEST") + stream, err := leader.gacc.lookupStream("TEST") + require_NoError(t, err) + rg := stream.node.(*raft) + + // System account should have interest, but the global account + // shouldn't. + for _, s := range c.servers { + require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_False(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } + + // Check that varz tells us what we expect. + // set a header to make sure request parsing knows to ignore them + ri := snc.NewRespInbox() + replies, err := snc.SubscribeSync(ri) + require_NoError(t, err) + require_NoError(t, snc.PublishMsg(&nats.Msg{ + Subject: "$SYS.REQ.SERVER.PING.VARZ", + Reply: ri, + })) + for i := 0; i < len(c.servers); i++ { + msg, err := replies.NextMsg(time.Second * 5) + require_NoError(t, err) + var v Varz + require_NoError(t, json.Unmarshal(msg.Data, &v)) + //require_Equal(t, v.JetStream.Config.AccountNRG, false) + require_Equal(t, v.JetStream.AccountNRGActive, false) + } + p, _, _ := replies.Pending() + require_Equal(t, p, 0) + + // First of all check that the Raft traffic is in the system + // account, as we haven't moved it elsewhere yet. + { + sub, err := snc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + + // Switch on account NRG on all servers in the cluster. Then + // we wait, as we will need statsz to be sent for all servers + // in the cluster. + for _, s := range c.servers { + s.optsMu.Lock() + s.opts.JetStreamAccountNRG = true + s.optsMu.Unlock() + s.updateNRGAccountStatus() + } + + // Now check that the traffic has moved into the asset acc. + // In this case the system account should no longer have + // subscriptions for those subjects. + { + sub, err := nc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + + // The global account should now have interest and the + // system account shouldn't. + for _, s := range c.servers { + require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_True(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } + + // Check varz again. + require_NoError(t, snc.PublishMsg(&nats.Msg{ + Subject: "$SYS.REQ.SERVER.PING.VARZ", + Reply: ri, + })) + for i := 0; i < len(c.servers); i++ { + msg, err := replies.NextMsg(time.Second * 5) + require_NoError(t, err) + var v Varz + //Server Varz `json:"server"` + //} + require_NoError(t, json.Unmarshal(msg.Data, &v)) + t.Logf("JSON: %s", msg.Data) + t.Logf("Varz: %+v", v) + //require_Equal(t, v.JetStream.Config.AccountNRG, true) + //require_Equal(t, v.JetStream.AccountNRGActive, true) + } +} diff --git a/server/monitor.go b/server/monitor.go index c0c2bc08e83..9a17609533e 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1226,9 +1226,10 @@ type Varz struct { // JetStreamVarz contains basic runtime information about jetstream type JetStreamVarz struct { - Config *JetStreamConfig `json:"config,omitempty"` - Stats *JetStreamStats `json:"stats,omitempty"` - Meta *MetaClusterInfo `json:"meta,omitempty"` + Config *JetStreamConfig `json:"config,omitempty"` + Stats *JetStreamStats `json:"stats,omitempty"` + Meta *MetaClusterInfo `json:"meta,omitempty"` + AccountNRGActive bool `json:"account_nrg_active,omitempty"` } // ClusterOptsVarz contains monitoring cluster information @@ -1443,6 +1444,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { } func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) { + v.AccountNRGActive = s.accountNRG.Load() if doConfig { js.mu.RLock() // We want to snapshot the config since it will then be available outside @@ -1821,6 +1823,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { // Now update server's varz s.mu.RLock() sv := &s.varz.JetStream + sv.AccountNRGActive = s.accountNRG.Load() if created { sv.Config = v.Config } @@ -2794,11 +2797,12 @@ type JSInfo struct { Disabled bool `json:"disabled,omitempty"` Config JetStreamConfig `json:"config,omitempty"` JetStreamStats - Streams int `json:"streams"` - Consumers int `json:"consumers"` - Messages uint64 `json:"messages"` - Bytes uint64 `json:"bytes"` - Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` + Streams int `json:"streams"` + Consumers int `json:"consumers"` + Messages uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` + Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` + AccountNRGActive bool `json:"account_nrg_active,omitempty"` // aggregate raft info AccountDetails []*AccountDetail `json:"account_details,omitempty"` diff --git a/server/opts.go b/server/opts.go index e717f26ea43..8fe776a5fa8 100644 --- a/server/opts.go +++ b/server/opts.go @@ -317,6 +317,7 @@ type Options struct { JetStreamLimits JSLimitOpts JetStreamTpm JSTpmOpts JetStreamMaxCatchup int64 + JetStreamAccountNRG bool StoreDir string `json:"-"` SyncInterval time.Duration `json:"-"` SyncAlways bool `json:"-"` @@ -2310,6 +2311,8 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)} } opts.JetStreamMaxCatchup = s + case "account_nrg": + opts.JetStreamAccountNRG = mv.(bool) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/raft.go b/server/raft.go index 442ea2b7ec8..b7084aa0f25 100644 --- a/server/raft.go +++ b/server/raft.go @@ -76,6 +76,7 @@ type RaftNode interface { Stop() Delete() Wipe() + RecreateInternalSubs(acc bool) error } type WAL interface { @@ -129,6 +130,8 @@ type raft struct { created time.Time // Time that the group was created accName string // Account name of the asset this raft group is for + acc *Account // Account that NRG traffic will be sent/received in + inAcc bool // Is the NRG traffic in-account right now? group string // Raft group sd string // Store directory id string // Node ID @@ -352,8 +355,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe s.mu.RUnlock() return nil, ErrNoSysAccount } - sq := s.sys.sq - sacc := s.sys.account hash := s.sys.shash s.mu.RUnlock() @@ -381,9 +382,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe acks: make(map[uint64]map[string]struct{}), pae: make(map[uint64]*appendEntry), s: s, - c: s.createInternalSystemClient(), js: s.getJetStream(), - sq: sq, quit: make(chan struct{}), reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"), votes: newIPQueue[*voteResponse](s, qpfx+"vresp"), @@ -397,7 +396,14 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe observer: cfg.Observer, extSt: ps.domainExt, } - n.c.registerWithAccount(sacc) + + // Setup our internal subscriptions for proposals, votes and append entries. + // If we fail to do this for some reason then this is fatal — we cannot + // continue setting up or the Raft node may be partially/totally isolated. + if err := n.RecreateInternalSubs(n.s.opts.JetStreamAccountNRG); err != nil { + n.shutdown(true) + return nil, err + } if atomic.LoadInt32(&s.logging.debug) > 0 { n.dflag = true @@ -495,14 +501,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } } - // Setup our internal subscriptions for proposals, votes and append entries. - // If we fail to do this for some reason then this is fatal — we cannot - // continue setting up or the Raft node may be partially/totally isolated. - if err := n.createInternalSubs(); err != nil { - n.shutdown(true) - return nil, err - } - n.debug("Started") // Check if we need to start in observer mode due to lame duck status. @@ -531,6 +529,83 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe return n, nil } +// Returns whether peers within this group claim to support +// moving NRG traffic into the asset account. +// Lock must be held. +func (n *raft) checkAccountNRGStatus(acc bool) bool { + if !acc { + return false + } + supported := true + for pn := range n.peers { + if si, ok := n.s.nodeToInfo.Load(pn); ok && si != nil { + supported = supported && si.(nodeInfo).accountNRG + } + } + return supported +} + +func (n *raft) RecreateInternalSubs(acc bool) error { + n.Lock() + defer n.Unlock() + + // Check whether the peers in this group all claim to support + // moving the NRG traffic into the account. + acc = n.checkAccountNRGStatus(acc) + if n.aesub != nil && n.inAcc == acc { + // Subscriptions already exist and the account NRG state + // hasn't changed. + return nil + } + + // Need to cancel any in-progress catch-ups, otherwise the + // inboxes are about to be pulled out from underneath it in + // the next step... + n.cancelCatchup() + + // If we have an existing client then tear down any existing + // subscriptions and close the internal client. + if c := n.c; c != nil { + c.mu.Lock() + subs := make([]*subscription, 0, len(c.subs)) + for _, sub := range c.subs { + subs = append(subs, sub) + } + c.mu.Unlock() + for _, sub := range subs { + n.unsubscribe(sub) + } + c.closeConnection(InternalClient) + } + + // Look up which account we think we should be participating + // on. This will either be the system account (default) or it + // will be the account that the asset is resident in. + var nrgAcc *Account + if n.s.sys != nil { + nrgAcc = n.s.sys.account + } + if acc { // Should we setup in the asset account? + var err error + if nrgAcc, err = n.s.lookupAccount(n.accName); err != nil { + return err + } + } + c := n.s.createInternalSystemClient() + c.registerWithAccount(nrgAcc) + if nrgAcc.sq == nil { + nrgAcc.sq = n.s.newSendQ(nrgAcc) + } + n.c = c + n.sq = nrgAcc.sq + n.acc = nrgAcc + n.inAcc = acc + + // Recreate any internal subscriptions for voting, append + // entries etc in the new account. + return n.createInternalSubs() +} + // outOfResources checks to see if we are out of resources. func (n *raft) outOfResources() bool { js := n.js @@ -1747,9 +1822,8 @@ func (n *raft) unsubscribe(sub *subscription) { } } +// Lock should be held. func (n *raft) createInternalSubs() error { - n.Lock() - defer n.Unlock() n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group), n.newInbox() n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, n.group), n.newInbox() n.psubj = fmt.Sprintf(raftPropSubj, n.group) diff --git a/server/route.go b/server/route.go index 7bbcccddf80..f5354a18f89 100644 --- a/server/route.go +++ b/server/route.go @@ -2065,7 +2065,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string // check to be consistent and future proof. but will be same domain if s.sameDomain(info.Domain) { s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false}) + nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false}) } } diff --git a/server/sendq.go b/server/sendq.go index 0287c5548a7..2e7b5d03452 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -29,10 +29,11 @@ type sendq struct { mu sync.Mutex q *ipQueue[*outMsg] s *Server + a *Account } -func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ")} +func (s *Server) newSendQ(acc *Account) *sendq { + sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ"), a: acc} s.startGoRoutine(sq.internalLoop) return sq } @@ -44,8 +45,9 @@ func (sq *sendq) internalLoop() { defer s.grWG.Done() + //c := s.createInternalAccountClient() c := s.createInternalSystemClient() - c.registerWithAccount(s.SystemAccount()) + c.registerWithAccount(sq.a) c.noIcb = true defer c.closeConnection(ClientClosed) diff --git a/server/server.go b/server/server.go index 4f26b86a670..05914907cba 100644 --- a/server/server.go +++ b/server/server.go @@ -358,6 +358,9 @@ type Server struct { // Queue to process JS API requests that come from routes (or gateways) jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq] + + // Whether account NRG is supported cluster-wide or not. + accountNRG atomic.Bool } // For tracking JS nodes. @@ -373,6 +376,7 @@ type nodeInfo struct { offline bool js bool binarySnapshots bool + accountNRG bool } // Make sure all are 64bits for atomic use @@ -764,7 +768,7 @@ func NewServer(opts *Options) (*Server, error) { opts.Tags, &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, nil, - false, true, true, + false, true, true, true, }) } @@ -1742,7 +1746,7 @@ func (s *Server) setSystemAccount(acc *Account) error { sendq: newIPQueue[*pubMsg](s, "System sendQ"), recvq: newIPQueue[*inSysMsg](s, "System recvQ"), resetCh: make(chan struct{}), - sq: s.newSendQ(), + sq: s.newSendQ(acc), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval, @@ -2319,6 +2323,7 @@ func (s *Server) Start() { Domain: opts.JetStreamDomain, CompressOK: true, UniqueTag: opts.JetStreamUniqueTag, + AccountNRG: opts.JetStreamAccountNRG, } if err := s.EnableJetStream(cfg); err != nil { s.Fatalf("Can't start JetStream: %v", err)