From f6b2099baa9fa4e72399be03db1da7fe50f0500f Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 21 Feb 2024 16:15:15 +0000 Subject: [PATCH] Move NRG traffic into asset account This adds a new account NRG capability into statsz so that we can detect when all servers in the cluster support moving traffic into the asset account, instead of all being in the system account. Signed-off-by: Neil Twigg --- server/accounts.go | 1 + server/events.go | 58 +++++++++++++++ server/jetstream.go | 3 + server/jetstream_cluster_4_test.go | 112 +++++++++++++++++++++++++++++ server/monitor.go | 20 +++--- server/opts.go | 3 + server/raft.go | 104 +++++++++++++++++++++++---- server/route.go | 2 +- server/sendq.go | 8 ++- server/server.go | 9 ++- 10 files changed, 291 insertions(+), 29 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index aca96b5d24b..bf6fcfb9564 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -61,6 +61,7 @@ type Account struct { sqmu sync.Mutex sl *Sublist ic *client + sq *sendq isid uint64 etmr *time.Timer ctmr *time.Timer diff --git a/server/events.go b/server/events.go index ff940571a00..6ba450fec46 100644 --- a/server/events.go +++ b/server/events.go @@ -263,6 +263,7 @@ type ServerInfo struct { const ( JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled. BinaryStreamSnapshot // New stream snapshot capability. + AccountNRG // Move NRG traffic out of system account. ) // Set JetStream capability. @@ -288,6 +289,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool { return si.Flags&BinaryStreamSnapshot != 0 } +// Set account NRG capability. +func (si *ServerInfo) SetAccountNRG() { + si.Flags |= AccountNRG +} + +// AccountNRG indicates whether or not we support moving the NRG traffic out of the +// system account and into the asset account. +func (si *ServerInfo) AccountNRG() bool { + return si.Flags&AccountNRG != 0 +} + // ClientInfo is detailed information about the client forming a connection. 
type ClientInfo struct { Start *time.Time `json:"start,omitempty"` @@ -486,6 +498,7 @@ RESET: // New capability based flags. si.SetJetStreamEnabled() si.SetBinaryStreamSnapshot() + si.SetAccountNRG() } } var b []byte @@ -912,6 +925,7 @@ func (s *Server) sendStatsz(subj string) { // JetStream if js := s.js.Load(); js != nil { jStat := &JetStreamVarz{} + jStat.AccountNRGActive = s.accountNRG.Load() s.mu.RUnlock() js.mu.RLock() c := js.config @@ -1589,7 +1603,9 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + si.AccountNRG(), }) + s.updateNRGAccountStatus() } // updateRemoteServer is called when we have an update from a remote server. @@ -1636,14 +1652,56 @@ func (s *Server) processNewServer(si *ServerInfo) { false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + si.AccountNRG(), }) } } + go s.updateNRGAccountStatus() // Announce ourselves.. // Do this in a separate Go routine. go s.sendStatszUpdate() } +// Works out whether all nodes support moving the NRG traffic into +// the account and moves it appropriately. +// Server lock MUST NOT be held on entry. 
+func (s *Server) updateNRGAccountStatus() { + s.optsMu.RLock() + supported := s.opts.JetStreamAccountNRG + s.optsMu.RUnlock() + if supported { + s.mu.Lock() + s.nodeToInfo.Range(func(key, value any) bool { + si := value.(nodeInfo) + if !s.sameDomain(si.domain) { + return true + } + supported = si.accountNRG + return supported + }) + s.mu.Unlock() + } + if s.accountNRG.CompareAndSwap(!supported, supported) { + if supported { + s.Noticef("Moving NRG traffic into asset accounts") + } else { + s.Warnf("Moving NRG traffic back into system account due to old nodes coming online") + } + // Snapshot the Raft nodes only once the state has actually flipped, in either direction. + s.rnMu.Lock() + raftNodes := make([]RaftNode, 0, len(s.raftNodes)) + for _, n := range s.raftNodes { + raftNodes = append(raftNodes, n) + } + s.rnMu.Unlock() + for _, n := range raftNodes { + if err := n.RecreateInternalSubs(supported); err != nil { + s.Warnf("Error moving NRG traffic for Raft node: %v", err) + } + } + } +} + // If GW is enabled on this server and there are any leaf node connections, // this function will send a LeafNode connect system event to the super cluster // to ensure that the GWs are in interest-only mode for this account. diff --git a/server/jetstream.go b/server/jetstream.go index 7627417d942..14aa7bb488a 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -48,6 +48,7 @@ type JetStreamConfig struct { Domain string `json:"domain,omitempty"` CompressOK bool `json:"compress_ok,omitempty"` UniqueTag string `json:"unique_tag,omitempty"` + AccountNRG bool `json:"account_nrg_enabled,omitempty"` } // Statistics about JetStream for this server. 
@@ -544,6 +545,7 @@ func (s *Server) restartJetStream() error { MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, Domain: opts.JetStreamDomain, + AccountNRG: opts.JetStreamAccountNRG, } s.Noticef("Restarting JetStream") err := s.EnableJetStream(&cfg) @@ -2513,6 +2515,7 @@ func (s *Server) dynJetStreamConfig(storeDir string, maxStore, maxMem int64) *Je } opts := s.getOpts() + jsc.AccountNRG = opts.JetStreamAccountNRG // Sync options. jsc.SyncInterval = opts.SyncInterval diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 8407187e99b..93bb609adff 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2329,3 +2329,115 @@ func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) { } } } + +func TestJetStreamClusterAccountNRG(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: nats.MemoryStorage, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + leader := c.streamLeader(globalAccountName, "TEST") + stream, err := leader.gacc.lookupStream("TEST") + require_NoError(t, err) + rg := stream.node.(*raft) + + // System account should have interest, but the global account + // shouldn't. + for _, s := range c.servers { + require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_False(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } + + // Check that varz tells us what we expect. 
+ // set a header to make sure request parsing knows to ignore them + ri := snc.NewRespInbox() + replies, err := snc.SubscribeSync(ri) + require_NoError(t, err) + require_NoError(t, snc.PublishMsg(&nats.Msg{ + Subject: "$SYS.REQ.SERVER.PING.VARZ", + Reply: ri, + })) + for i := 0; i < len(c.servers); i++ { + msg, err := replies.NextMsg(time.Second * 5) + require_NoError(t, err) + var v Varz + require_NoError(t, json.Unmarshal(msg.Data, &v)) + //require_Equal(t, v.JetStream.Config.AccountNRG, false) + require_Equal(t, v.JetStream.AccountNRGActive, false) + } + p, _, _ := replies.Pending() + require_Equal(t, p, 0) + + // First of all check that the Raft traffic is in the system + // account, as we haven't moved it elsewhere yet. + { + sub, err := snc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + + // Switch on account NRG on all servers in the cluster. Then + // we wait, as we will need statsz to be sent for all servers + // in the cluster. + for _, s := range c.servers { + s.optsMu.Lock() + s.opts.JetStreamAccountNRG = true + s.optsMu.Unlock() + s.updateNRGAccountStatus() + } + + // Now check that the traffic has moved into the asset acc. + // In this case the system account should no longer have + // subscriptions for those subjects. + { + sub, err := nc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + + // The global account should now have interest and the + // system account shouldn't. + for _, s := range c.servers { + require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_True(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } + + // Check varz again. 
+ require_NoError(t, snc.PublishMsg(&nats.Msg{ + Subject: "$SYS.REQ.SERVER.PING.VARZ", + Reply: ri, + })) + for i := 0; i < len(c.servers); i++ { + msg, err := replies.NextMsg(time.Second * 5) + require_NoError(t, err) + var v Varz + //Server Varz `json:"server"` + //} + require_NoError(t, json.Unmarshal(msg.Data, &v)) + t.Logf("JSON: %s", msg.Data) + t.Logf("Varz: %+v", v) + //require_Equal(t, v.JetStream.Config.AccountNRG, true) + //require_Equal(t, v.JetStream.AccountNRGActive, true) + } +} diff --git a/server/monitor.go b/server/monitor.go index c0c2bc08e83..9a17609533e 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1226,9 +1226,10 @@ type Varz struct { // JetStreamVarz contains basic runtime information about jetstream type JetStreamVarz struct { - Config *JetStreamConfig `json:"config,omitempty"` - Stats *JetStreamStats `json:"stats,omitempty"` - Meta *MetaClusterInfo `json:"meta,omitempty"` + Config *JetStreamConfig `json:"config,omitempty"` + Stats *JetStreamStats `json:"stats,omitempty"` + Meta *MetaClusterInfo `json:"meta,omitempty"` + AccountNRGActive bool `json:"account_nrg_active,omitempty"` } // ClusterOptsVarz contains monitoring cluster information @@ -1443,6 +1444,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { } func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) { + v.AccountNRGActive = s.accountNRG.Load() if doConfig { js.mu.RLock() // We want to snapshot the config since it will then be available outside @@ -1821,6 +1823,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { // Now update server's varz s.mu.RLock() sv := &s.varz.JetStream + sv.AccountNRGActive = s.accountNRG.Load() if created { sv.Config = v.Config } @@ -2794,11 +2797,12 @@ type JSInfo struct { Disabled bool `json:"disabled,omitempty"` Config JetStreamConfig `json:"config,omitempty"` JetStreamStats - Streams int `json:"streams"` - Consumers int `json:"consumers"` - Messages uint64 
`json:"messages"` + Bytes uint64 `json:"bytes"` + Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` + AccountNRGActive bool `json:"account_nrg_active,omitempty"` // aggregate raft info AccountDetails []*AccountDetail `json:"account_details,omitempty"` diff --git a/server/opts.go b/server/opts.go index e717f26ea43..8fe776a5fa8 100644 --- a/server/opts.go +++ b/server/opts.go @@ -317,6 +317,7 @@ type Options struct { JetStreamLimits JSLimitOpts JetStreamTpm JSTpmOpts JetStreamMaxCatchup int64 + JetStreamAccountNRG bool StoreDir string `json:"-"` SyncInterval time.Duration `json:"-"` SyncAlways bool `json:"-"` @@ -2310,6 +2311,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)} } opts.JetStreamMaxCatchup = s + case "account_nrg": + enabled, ok := mv.(bool) + if !ok { + return &configErr{tk, fmt.Sprintf("%s must be a boolean, got %T", strings.ToLower(mk), mv)} + } + opts.JetStreamAccountNRG = enabled default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/raft.go b/server/raft.go index 442ea2b7ec8..b7084aa0f25 100644 --- a/server/raft.go +++ b/server/raft.go @@ -76,6 +76,7 @@ type RaftNode interface { Stop() Delete() Wipe() + RecreateInternalSubs(acc bool) error } type WAL interface { @@ -129,6 +130,8 @@ type raft struct { created time.Time // Time that the group was created accName string // Account name of the asset this raft group is for + acc *Account // Account that NRG traffic will be sent/received in + inAcc bool // Is the NRG traffic in-account right now? 
group string // Raft group sd string // Store directory id string // Node ID @@ -352,8 +355,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe s.mu.RUnlock() return nil, ErrNoSysAccount } - sq := s.sys.sq - sacc := s.sys.account hash := s.sys.shash s.mu.RUnlock() @@ -381,9 +382,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe acks: make(map[uint64]map[string]struct{}), pae: make(map[uint64]*appendEntry), s: s, - c: s.createInternalSystemClient(), js: s.getJetStream(), - sq: sq, quit: make(chan struct{}), reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"), votes: newIPQueue[*voteResponse](s, qpfx+"vresp"), @@ -397,7 +396,14 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe observer: cfg.Observer, extSt: ps.domainExt, } - n.c.registerWithAccount(sacc) + + // Setup our internal subscriptions for proposals, votes and append entries. + // If we fail to do this for some reason then this is fatal — we cannot + // continue setting up or the Raft node may be partially/totally isolated. + if err := n.RecreateInternalSubs(s.getOpts().JetStreamAccountNRG); err != nil { + n.shutdown(true) + return nil, err + } if atomic.LoadInt32(&s.logging.debug) > 0 { n.dflag = true @@ -495,14 +501,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } } - // Setup our internal subscriptions for proposals, votes and append entries. - // If we fail to do this for some reason then this is fatal — we cannot - // continue setting up or the Raft node may be partially/totally isolated. - if err := n.createInternalSubs(); err != nil { - n.shutdown(true) - return nil, err - } - n.debug("Started") // Check if we need to start in observer mode due to lame duck status. 
@@ -531,6 +529,93 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe return n, nil } +// Returns whether peers within this group claim to support +// moving NRG traffic into the asset account. +// Lock must be held. +func (n *raft) checkAccountNRGStatus(acc bool) bool { + if !acc { + return false + } + supported := true + for pn := range n.peers { + if si, ok := n.s.nodeToInfo.Load(pn); ok && si != nil { + supported = supported && si.(nodeInfo).accountNRG + } + } + return supported +} + +// RecreateInternalSubs moves the internal client and subscriptions of +// this node into the asset account (when acc is true and every known +// peer in the group claims support) or into the system account. It does +// nothing if the subscriptions already exist in the desired account. +func (n *raft) RecreateInternalSubs(acc bool) error { + n.Lock() + defer n.Unlock() + + // Check whether the peers in this group all claim to support + // moving the NRG traffic into the account. + acc = n.checkAccountNRGStatus(acc) + if n.aesub != nil && n.inAcc == acc { + // Subscriptions already exist and the account NRG state + // hasn't changed. + return nil + } + + // Look up which account we think we should be participating + // on. This will either be the system account (default) or it + // will be the account that the asset is resident in. Resolve it + // before tearing anything down so that a failed lookup leaves the + // existing client and subscriptions untouched. + var nrgAcc *Account + if n.s.sys != nil { + nrgAcc = n.s.sys.account + } + if acc { // Should we setup in the asset account? 
+ var err error + if nrgAcc, err = n.s.lookupAccount(n.accName); err != nil { + return err + } + } + if nrgAcc == nil { + return ErrNoSysAccount + } + + // Need to cancel any in-progress catch-ups, otherwise the + // inboxes are about to be pulled out from underneath it in + // the next step... + n.cancelCatchup() + + // If we have an existing client then tear down any existing + // subscriptions and close the internal client. + if c := n.c; c != nil { + c.mu.Lock() + subs := make([]*subscription, 0, len(c.subs)) + for _, sub := range c.subs { + subs = append(subs, sub) + } + c.mu.Unlock() + for _, sub := range subs { + n.unsubscribe(sub) + } + c.closeConnection(InternalClient) + } + + c := n.s.createInternalSystemClient() + c.registerWithAccount(nrgAcc) + if nrgAcc.sq == nil { + nrgAcc.sq = n.s.newSendQ(nrgAcc) + } + n.c = c + n.sq = nrgAcc.sq + n.acc = nrgAcc + n.inAcc = acc + + // Recreate any internal subscriptions for voting, append + // entries etc in the new account. + return n.createInternalSubs() +} + // outOfResources checks to see if we are out of resources. func (n *raft) outOfResources() bool { js := n.js @@ -1747,9 +1832,8 @@ func (n *raft) unsubscribe(sub *subscription) { } } +// Lock should be held. func (n *raft) createInternalSubs() error { - n.Lock() - defer n.Unlock() n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group), n.newInbox() n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, n.group), n.newInbox() n.psubj = fmt.Sprintf(raftPropSubj, n.group) diff --git a/server/route.go b/server/route.go index 7bbcccddf80..f5354a18f89 100644 --- a/server/route.go +++ b/server/route.go @@ -2065,7 +2065,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string // check to be consistent and future proof. 
but will be same domain if s.sameDomain(info.Domain) { s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false}) + nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false}) } } diff --git a/server/sendq.go b/server/sendq.go index 0287c5548a7..2e7b5d03452 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -29,10 +29,11 @@ type sendq struct { mu sync.Mutex q *ipQueue[*outMsg] s *Server + a *Account } -func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ")} +func (s *Server) newSendQ(acc *Account) *sendq { + sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ"), a: acc} s.startGoRoutine(sq.internalLoop) return sq } @@ -44,8 +45,8 @@ func (sq *sendq) internalLoop() { defer s.grWG.Done() c := s.createInternalSystemClient() - c.registerWithAccount(s.SystemAccount()) + c.registerWithAccount(sq.a) c.noIcb = true defer c.closeConnection(ClientClosed) diff --git a/server/server.go b/server/server.go index 4f26b86a670..05914907cba 100644 --- a/server/server.go +++ b/server/server.go @@ -358,6 +358,9 @@ type Server struct { // Queue to process JS API requests that come from routes (or gateways) jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq] + + // Whether account NRG is supported cluster-wide or not. + accountNRG atomic.Bool } // For tracking JS nodes. 
@@ -373,6 +376,7 @@ type nodeInfo struct { offline bool js bool binarySnapshots bool + accountNRG bool } // Make sure all are 64bits for atomic use @@ -764,7 +768,7 @@ func NewServer(opts *Options) (*Server, error) { opts.Tags, &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, nil, - false, true, true, + false, true, true, true, }) } @@ -1742,7 +1746,7 @@ func (s *Server) setSystemAccount(acc *Account) error { sendq: newIPQueue[*pubMsg](s, "System sendQ"), recvq: newIPQueue[*inSysMsg](s, "System recvQ"), resetCh: make(chan struct{}), - sq: s.newSendQ(), + sq: s.newSendQ(acc), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval, @@ -2319,6 +2323,7 @@ func (s *Server) Start() { Domain: opts.JetStreamDomain, CompressOK: true, UniqueTag: opts.JetStreamUniqueTag, + AccountNRG: opts.JetStreamAccountNRG, } if err := s.EnableJetStream(cfg); err != nil { s.Fatalf("Can't start JetStream: %v", err)