Skip to content

Commit

Permalink
Support setting account NRG in JWT claim, rename option to `nrg_in_ac…
Browse files Browse the repository at this point in the history
…count`

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Jul 30, 2024
1 parent 72b6541 commit c7c6e1c
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 5 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/nats-io/nats-server/v2

go 1.21.0

replace github.com/nats-io/jwt/v2 => github.com/nats-io/jwt/v2 v2.5.9-0.20240730132529-79732145f9be

require (
github.com/google/go-tpm v0.9.0
github.com/klauspost/compress v1.17.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/jwt/v2 v2.5.9-0.20240730132529-79732145f9be h1:0EpVNpiLRtjqTgsz8TFtVeNzOwpjC/Xu7yXDiqJSxyI=
github.com/nats-io/jwt/v2 v2.5.9-0.20240730132529-79732145f9be/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand Down
8 changes: 8 additions & 0 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3680,6 +3680,14 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.enableAllJetStreamServiceImportsAndMappings()
}

if a.js != nil {
// Check whether the account NRG status changed. If it has then we need to notify the
// Raft groups running on the system so that they can move their subs if needed.
if wasAccountNRG := a.js.accountNRG.Swap(ac.AccountNRG); wasAccountNRG != ac.AccountNRG {
s.updateNRGAccountStatus()
}
}

for i, c := range clients {
a.mu.RLock()
exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns)
Expand Down
4 changes: 3 additions & 1 deletion server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1710,7 +1710,9 @@ func (s *Server) updateNRGAccountStatus() {
for _, n := range raftNodes {
// In the event that the node is happy that all nodes that
// it cares about haven't changed, this will be a no-op.
n.RecreateInternalSubs()
if err := n.RecreateInternalSubs(); err != nil {
n.Stop()
}
}
}

Expand Down
81 changes: 81 additions & 0 deletions server/jetstream_jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1532,3 +1532,84 @@ func TestJetStreamJWTClusteredTiersR3StreamWithR1ConsumersAndAccounting(t *testi
require_Equal(t, r3.Streams, 1)
require_Equal(t, r3.Consumers, 0)
}

func TestJetStreamJWTClusterAccountNRG(t *testing.T) {
_, syspub := createKey(t)
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)

_, aExpPub := createKey(t)
accClaim := jwt.NewAccountClaims(aExpPub)
accClaim.Name = "acc"
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 10, Streams: 1}
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{DiskStorage: 1100, Consumer: 1, Streams: 1}
accJwt := encodeClaim(t, accClaim, aExpPub)

tmlp := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
leaf {
listen: 127.0.0.1:-1
}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
` + fmt.Sprintf(`
operator: %s
system_account: %s
resolver = MEMORY
resolver_preload = {
%s : %s
%s : %s
}
`, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)

c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3)
defer c.shutdown()

// We'll try flipping the state a few times and then do some sanity
// checks to check that it took effect.
for _, state := range []bool{true, false, true} {
accClaim.AccountNRG = state
accJwt = encodeClaim(t, accClaim, aExpPub)

for _, s := range c.servers {
// Find the account.
acc, err := s.lookupAccount(aExpPub)
require_NoError(t, err)

// Submit a claim update using the new JWT.
require_NoError(t, s.updateAccountWithClaimJWT(acc, accJwt))

// Check that everything looks like it should.
require_True(t, acc != nil)
require_True(t, acc.js != nil)
require_Equal(t, acc.js.accountNRG.Load(), state)

// Now get a list of all of the Raft nodes that should
// have been updated by now.
s.rnMu.Lock()
raftNodes := make([]*raft, 0, len(s.raftNodes))
for _, n := range s.raftNodes {
rg := n.(*raft)
if rg.accName != acc.Name {
continue
}
raftNodes = append(raftNodes, rg)
}
s.rnMu.Unlock()

// Check whether each of the Raft nodes reports being
// in-account or not.
for _, rg := range raftNodes {
rg.Lock()
inAcc := rg.inAcc
rg.Unlock()

require_Equal(t, inAcc, state)
}
}
}
}
2 changes: 1 addition & 1 deletion server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error {
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
}
jsLimits.MaxAckPending = int(vv)
case "account_nrg":
case "nrg_in_account":
vv, ok := mv.(bool)
if !ok {
return &configErr{tk, fmt.Sprintf("Expected a boolean for %q, got %v", mk, mv)}
Expand Down
2 changes: 1 addition & 1 deletion server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
// If we fail to do this for some reason then this is fatal — we cannot
// continue setting up or the Raft node may be partially/totally isolated.
if err := n.RecreateInternalSubs(); err != nil {
n.shutdown(true)
n.shutdown(false)
return nil, err
}

Expand Down

0 comments on commit c7c6e1c

Please sign in to comment.