diff --git a/go.mod b/go.mod index 5f3178b346c..2c931d506f2 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/nats-io/nats-server/v2 go 1.21.0 +replace github.com/nats-io/jwt/v2 => github.com/nats-io/jwt/v2 v2.5.9-0.20240730132529-79732145f9be + require ( github.com/google/go-tpm v0.9.0 github.com/klauspost/compress v1.17.9 diff --git a/go.sum b/go.sum index 6fa3decb262..e7c20f789ac 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2 github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= -github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= -github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/jwt/v2 v2.5.9-0.20240730132529-79732145f9be h1:0EpVNpiLRtjqTgsz8TFtVeNzOwpjC/Xu7yXDiqJSxyI= +github.com/nats-io/jwt/v2 v2.5.9-0.20240730132529-79732145f9be/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= diff --git a/server/accounts.go b/server/accounts.go index e2bfbc91400..72c74795ea6 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3683,6 +3683,14 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim a.enableAllJetStreamServiceImportsAndMappings() } + if a.js != nil { + // Check whether the account NRG status changed. If it has then we need to notify the + // Raft groups running on the system so that they can move their subs if needed. + if wasAccountNRG := a.js.accountNRG.Swap(ac.AccountNRG); wasAccountNRG != ac.AccountNRG { + s.updateNRGAccountStatus() + } + } + for i, c := range clients { a.mu.RLock() exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns) diff --git a/server/events.go b/server/events.go index 6e16037246c..99a3c19548c 100644 --- a/server/events.go +++ b/server/events.go @@ -1710,7 +1710,9 @@ func (s *Server) updateNRGAccountStatus() { for _, n := range raftNodes { // In the event that the node is happy that all nodes that // it cares about haven't changed, this will be a no-op. - n.RecreateInternalSubs() + if err := n.RecreateInternalSubs(); err != nil { + n.Stop() + } } } diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index f8f6466f69e..bbc5d2d1237 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -1532,3 +1532,84 @@ func TestJetStreamJWTClusteredTiersR3StreamWithR1ConsumersAndAccounting(t *testi require_Equal(t, r3.Streams, 1) require_Equal(t, r3.Consumers, 0) } + +func TestJetStreamJWTClusterAccountNRG(t *testing.T) { + _, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + + _, aExpPub := createKey(t) + accClaim := jwt.NewAccountClaims(aExpPub) + accClaim.Name = "acc" + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 10, Streams: 1} + accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 1, Streams: 1} + accJwt := encodeClaim(t, accClaim, aExpPub) + + tmlp := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { + listen: 127.0.0.1:-1 + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + fmt.Sprintf(` + operator: %s + system_account: %s + resolver = MEMORY + resolver_preload = { + %s : %s + %s : %s + } + `, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt) + + c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3) + defer c.shutdown() + + // We'll try flipping the state a few times and then do some sanity + // checks to check that it took effect. + for _, state := range []bool{true, false, true} { + accClaim.AccountNRG = state + accJwt = encodeClaim(t, accClaim, aExpPub) + + for _, s := range c.servers { + // Find the account. + acc, err := s.lookupAccount(aExpPub) + require_NoError(t, err) + + // Submit a claim update using the new JWT. + require_NoError(t, s.updateAccountWithClaimJWT(acc, accJwt)) + + // Check that everything looks like it should. + require_True(t, acc != nil) + require_True(t, acc.js != nil) + require_Equal(t, acc.js.accountNRG.Load(), state) + + // Now get a list of all of the Raft nodes that should + // have been updated by now. + s.rnMu.Lock() + raftNodes := make([]*raft, 0, len(s.raftNodes)) + for _, n := range s.raftNodes { + rg := n.(*raft) + if rg.accName != acc.Name { + continue + } + raftNodes = append(raftNodes, rg) + } + s.rnMu.Unlock() + + // Check whether each of the Raft nodes reports being + // in-account or not. + for _, rg := range raftNodes { + rg.Lock() + inAcc := rg.inAcc + rg.Unlock() + + require_Equal(t, inAcc, state) + } + } + } +} diff --git a/server/opts.go b/server/opts.go index fa078305ab6..7c77ad2e122 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2128,7 +2128,7 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error { return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } jsLimits.MaxAckPending = int(vv) - case "account_nrg": + case "nrg_in_account": vv, ok := mv.(bool) if !ok { return &configErr{tk, fmt.Sprintf("Expected a boolean for %q, got %v", mk, mv)} diff --git a/server/raft.go b/server/raft.go index 4f79fdcebf7..078b438a0d2 100644 --- a/server/raft.go +++ b/server/raft.go @@ -399,7 +399,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe // If we fail to do this for some reason then this is fatal — we cannot // continue setting up or the Raft node may be partially/totally isolated. if err := n.RecreateInternalSubs(); err != nil { - n.shutdown(true) + n.shutdown(false) return nil, err }