Skip to content

Commit

Permalink
Move NRG traffic into asset account
Browse files Browse the repository at this point in the history
This adds a new account NRG capability into statsz so that we can
detect when all servers in the cluster support moving traffic into
the asset account, instead of all being in the system account.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Sep 10, 2024
1 parent 7f92c34 commit 13ba3f0
Show file tree
Hide file tree
Showing 12 changed files with 459 additions and 24 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/nats-io/nats-server/v2

go 1.22.0

replace github.com/nats-io/jwt/v2 => github.com/nats-io/jwt/v2 v2.5.9-0.20240910093232-2e1251e51094

require (
github.com/google/go-tpm v0.9.0
github.com/klauspost/compress v1.17.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/jwt/v2 v2.5.9-0.20240910093232-2e1251e51094 h1:8yMEZG1tTWFB7h4xEiucEJEYd4geU3YO5C3hFYjZToc=
github.com/nats-io/jwt/v2 v2.5.9-0.20240910093232-2e1251e51094/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand Down
23 changes: 23 additions & 0 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Account struct {
sqmu sync.Mutex
sl *Sublist
ic *client
sq *sendq
isid uint64
etmr *time.Timer
ctmr *time.Timer
Expand Down Expand Up @@ -3679,6 +3680,28 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.enableAllJetStreamServiceImportsAndMappings()
}

if a.js != nil {
// Check whether the account NRG status changed. If it has then we need to notify the
// Raft groups running on the system so that they can move their subs if needed.
a.mu.Lock()
previous := a.js.nrgAccount
switch tokens := strings.SplitN(ac.ClusterTraffic, ":", 2); tokens[0] {
case "system":
a.js.nrgAccount = ""
case "owner":
a.js.nrgAccount = a.Name
case "account":
a.js.nrgAccount = tokens[1]
default:
s.Errorf("Account claim for %q has invalid value %q for account NRG status", a.Name, ac.ClusterTraffic)
}
changed := a.js.nrgAccount != previous
a.mu.Unlock()
if changed {
s.updateNRGAccountStatus()
}
}

for i, c := range clients {
a.mu.RLock()
exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns)
Expand Down
47 changes: 46 additions & 1 deletion server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ type ServerInfo struct {
const (
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
BinaryStreamSnapshot // New stream snapshot capability.
AccountNRG // Move NRG traffic out of system account.
)

// Set JetStream capability.
Expand All @@ -289,6 +290,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool {
return si.Flags&BinaryStreamSnapshot != 0
}

// Set account NRG capability.
func (si *ServerInfo) SetAccountNRG() {
si.Flags |= AccountNRG
}

// AccountNRG indicates whether or not we support moving the NRG traffic out of the
// system account and into the asset account.
func (si *ServerInfo) AccountNRG() bool {
return si.Flags&AccountNRG != 0
}

// ClientInfo is detailed information about the client forming a connection.
type ClientInfo struct {
Start *time.Time `json:"start,omitempty"`
Expand Down Expand Up @@ -475,10 +487,14 @@ RESET:
si.Version = VERSION
si.Time = time.Now().UTC()
si.Tags = tags
si.Flags = 0
if js {
// New capability based flags.
si.SetJetStreamEnabled()
si.SetBinaryStreamSnapshot()
if s.accountNRGAllowed.Load() {
si.SetAccountNRG()
}
}
}
var b []byte
Expand Down Expand Up @@ -1616,7 +1632,8 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
}

node := getHash(si.Name)
s.nodeToInfo.Store(node, nodeInfo{
accountNRG := si.AccountNRG()
oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{
si.Name,
si.Version,
si.Cluster,
Expand All @@ -1628,7 +1645,14 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
accountNRG,
})
if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG {
// One of the servers we received statsz from changed its mind about
// whether or not it supports in-account NRG, so update the groups
// with this information.
s.updateNRGAccountStatus()
}
}

// updateRemoteServer is called when we have an update from a remote server.
Expand Down Expand Up @@ -1675,14 +1699,35 @@ func (s *Server) processNewServer(si *ServerInfo) {
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
si.AccountNRG(),
})
}
}
go s.updateNRGAccountStatus()
// Announce ourselves..
// Do this in a separate Go routine.
go s.sendStatszUpdate()
}

// Works out whether all nodes support moving the NRG traffic into
// the account and moves it appropriately.
// Server lock MUST NOT be held on entry.
func (s *Server) updateNRGAccountStatus() {
s.rnMu.Lock()
raftNodes := make([]RaftNode, 0, len(s.raftNodes))
for _, n := range s.raftNodes {
raftNodes = append(raftNodes, n)
}
s.rnMu.Unlock()
for _, n := range raftNodes {
// In the event that the node is happy that all nodes that
// it cares about haven't changed, this will be a no-op.
if err := n.RecreateInternalSubs(); err != nil {
n.Stop()
}
}
}

// If GW is enabled on this server and there are any leaf node connections,
// this function will send a LeafNode connect system event to the super cluster
// to ensure that the GWs are in interest-only mode for this account.
Expand Down
3 changes: 3 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type jsAccount struct {
updatesSub *subscription
lupdate time.Time
utimer *time.Timer

// Which account to send NRG traffic into. Empty string is system account.
nrgAccount string
}

// Track general usage for this account.
Expand Down
144 changes: 144 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2505,6 +2505,150 @@ func TestJetStreamClusterConsumerLeak(t *testing.T) {
}
}

func TestJetStreamClusterAccountNRG(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: nats.MemoryStorage,
Retention: nats.WorkQueuePolicy,
Replicas: 3,
})
require_NoError(t, err)

leader := c.streamLeader(globalAccountName, "TEST")
stream, err := leader.gacc.lookupStream("TEST")
require_NoError(t, err)
rg := stream.node.(*raft)

t.Run("Disabled", func(t *testing.T) {
// Switch off account NRG on all servers in the cluster.
for _, s := range c.servers {
s.accountNRGAllowed.Store(false)
s.sendStatszUpdate()
}
time.Sleep(time.Millisecond * 100)
for _, s := range c.servers {
s.GlobalAccount().js.nrgAccount = ""
s.updateNRGAccountStatus()
}

// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("system account should have interest")
}
if s.gacc.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("global account shouldn't have interest")
}
}
return nil
})

// Check that the Raft traffic is in the system account, as we
// haven't moved it elsewhere yet.
{
sub, err := snc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}
})

t.Run("Mixed", func(t *testing.T) {
// Switch on account NRG on a single server in the cluster and
// leave it off on the rest.
for i, s := range c.servers {
s.accountNRGAllowed.Store(i == 0)
s.sendStatszUpdate()
}
time.Sleep(time.Millisecond * 100)
for i, s := range c.servers {
if i == 0 {
s.GlobalAccount().js.nrgAccount = globalAccountName
} else {
s.GlobalAccount().js.nrgAccount = ""
}
s.updateNRGAccountStatus()
}

// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("system account should have interest")
}
if s.gacc.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("global account shouldn't have interest")
}
}
return nil
})

// Check that the Raft traffic is in the system account, as we
// don't claim support for account NRG on all nodes in the group.
{
sub, err := snc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}
})

t.Run("Enabled", func(t *testing.T) {
// Switch on account NRG on all servers in the cluster.
for _, s := range c.servers {
s.accountNRGAllowed.Store(true)
s.sendStatszUpdate()
}
time.Sleep(time.Millisecond * 100)
for _, s := range c.servers {
s.GlobalAccount().js.nrgAccount = globalAccountName
s.updateNRGAccountStatus()
}

// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if s.sys.account.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("system account shouldn't have interest")
}
if !s.gacc.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("global account should have interest")
}
}
return nil
})

// Check that the traffic moved into the global account as
// expected.
{
sub, err := nc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}
})
}

func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down
Loading

0 comments on commit 13ba3f0

Please sign in to comment.