Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NRG (2.11): Add ability to move cluster traffic into asset accounts #5466

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/google/go-tpm v0.9.0
github.com/klauspost/compress v1.17.9
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/jwt/v2 v2.6.0
github.com/nats-io/nats.go v1.36.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/jwt/v2 v2.6.0 h1:yXoBTdEotZw3NujMT+Nnu1UPNlFWdKQ3d0JJF/+pJag=
github.com/nats-io/jwt/v2 v2.6.0/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand All @@ -22,13 +22,9 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
Expand Down
22 changes: 22 additions & 0 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Account struct {
sqmu sync.Mutex
sl *Sublist
ic *client
sq *sendq
isid uint64
etmr *time.Timer
ctmr *time.Timer
Expand Down Expand Up @@ -3655,6 +3656,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim

a.updated = time.Now()
clients := a.getClientsLocked()
ajs := a.js
a.mu.Unlock()

// Sort if we are over the limit.
Expand All @@ -3679,6 +3681,26 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.enableAllJetStreamServiceImportsAndMappings()
}

if ajs != nil {
// Check whether the account NRG status changed. If it has then we need to notify the
// Raft groups running on the system so that they can move their subs if needed.
a.mu.Lock()
previous := ajs.nrgAccount
switch tokens := strings.SplitN(ac.ClusterTraffic, ":", 2); tokens[0] {
case "system":
a.js.nrgAccount = _EMPTY_
case "owner":
a.js.nrgAccount = a.Name
default:
s.Errorf("Account claim for %q has invalid value %q for cluster traffic account", a.Name, ac.ClusterTraffic)
Copy link
Member

@aricart aricart Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a PR for the validation of the JWT. If this is the case, the JWT can be validated after you decode it. See nats-io/jwt#224

}
changed := ajs.nrgAccount != previous
a.mu.Unlock()
if changed {
s.updateNRGAccountStatus()
}
}

for i, c := range clients {
a.mu.RLock()
exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns)
Expand Down
47 changes: 46 additions & 1 deletion server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ type ServerInfo struct {
// Capability bits carried in ServerInfo.Flags. Each constant takes the next
// bit via `1 << iota`, so a new capability must be appended at the end:
// inserting or reordering entries would change the values of existing bits.
const (
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
BinaryStreamSnapshot // New stream snapshot capability.
AccountNRG // Move NRG traffic out of system account.
)

// Set JetStream capability.
Expand All @@ -289,6 +290,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool {
return si.Flags&BinaryStreamSnapshot != 0
}

// SetAccountNRG marks this server as supporting the account NRG capability
// by turning on the AccountNRG bit in Flags.
func (si *ServerInfo) SetAccountNRG() {
	si.Flags = si.Flags | AccountNRG
}

// AccountNRG reports whether the AccountNRG capability bit is set in Flags,
// i.e. whether or not we support moving the NRG traffic out of the system
// account and into the asset account.
func (si *ServerInfo) AccountNRG() bool {
	capability := si.Flags & AccountNRG
	return capability != 0
}

// ClientInfo is detailed information about the client forming a connection.
type ClientInfo struct {
Start *time.Time `json:"start,omitempty"`
Expand Down Expand Up @@ -475,10 +487,14 @@ RESET:
si.Version = VERSION
si.Time = time.Now().UTC()
si.Tags = tags
si.Flags = 0
if js {
// New capability based flags.
si.SetJetStreamEnabled()
si.SetBinaryStreamSnapshot()
if s.accountNRGAllowed.Load() {
si.SetAccountNRG()
}
}
}
var b []byte
Expand Down Expand Up @@ -1616,7 +1632,8 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
}

node := getHash(si.Name)
s.nodeToInfo.Store(node, nodeInfo{
accountNRG := si.AccountNRG()
oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{
si.Name,
si.Version,
si.Cluster,
Expand All @@ -1628,7 +1645,14 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
accountNRG,
})
if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG {
// One of the servers we received statsz from changed its mind about
// whether or not it supports in-account NRG, so update the groups
// with this information.
s.updateNRGAccountStatus()
}
}

// updateRemoteServer is called when we have an update from a remote server.
Expand Down Expand Up @@ -1675,14 +1699,35 @@ func (s *Server) processNewServer(si *ServerInfo) {
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
si.AccountNRG(),
})
}
}
go s.updateNRGAccountStatus()
// Announce ourselves..
// Do this in a separate Go routine.
go s.sendStatszUpdate()
}

// updateNRGAccountStatus asks every Raft node registered on this server to
// recreate its internal subscriptions, so that each group can work out
// whether all nodes support moving the NRG traffic into the account and
// move it appropriately.
//
// A node that fails to recreate its subscriptions is stopped. The failure is
// logged first so that the stop does not happen silently.
//
// Server lock MUST NOT be held on entry.
func (s *Server) updateNRGAccountStatus() {
	// Snapshot the nodes under the read lock so that the calls below run
	// without holding rnMu.
	s.rnMu.RLock()
	raftNodes := make([]RaftNode, 0, len(s.raftNodes))
	for _, n := range s.raftNodes {
		raftNodes = append(raftNodes, n)
	}
	s.rnMu.RUnlock()

	for _, n := range raftNodes {
		// In the event that the node is happy that all nodes that
		// it cares about haven't changed, this will be a no-op.
		if err := n.RecreateInternalSubs(); err != nil {
			s.Errorf("Error recreating internal subscriptions for Raft node, stopping it: %v", err)
			n.Stop()
		}
	}
}

// If GW is enabled on this server and there are any leaf node connections,
// this function will send a LeafNode connect system event to the super cluster
// to ensure that the GWs are in interest-only mode for this account.
Expand Down
3 changes: 3 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type jsAccount struct {
updatesSub *subscription
lupdate time.Time
utimer *time.Timer

// Which account to send NRG traffic into. Empty string is system account.
nrgAccount string
}

// Track general usage for this account.
Expand Down
144 changes: 144 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2505,6 +2505,150 @@ func TestJetStreamClusterConsumerLeak(t *testing.T) {
}
}

// TestJetStreamClusterAccountNRG checks where the Raft (NRG) AppendEntry
// traffic of a replicated stream lives, depending on which servers allow
// account NRG:
//   - "Disabled": no server allows it, traffic stays in the system account.
//   - "Mixed": only one server allows it, traffic still stays in the system
//     account.
//   - "Enabled": every server allows it, traffic moves to the global account.
//
// The subtests share one cluster and one stream and run in the order listed.
func TestJetStreamClusterAccountNRG(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Regular client connection, used to create the stream and later to observe
// traffic in the global account.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Second connection with the "admin" credentials, used below to observe
// traffic in the system account.
snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: nats.MemoryStorage,
Retention: nats.WorkQueuePolicy,
Replicas: 3,
})
require_NoError(t, err)

// Grab the stream's Raft node on the leader; rg.asubj is the AppendEntry
// subject that every subtest inspects.
leader := c.streamLeader(globalAccountName, "TEST")
stream, err := leader.gacc.lookupStream("TEST")
require_NoError(t, err)
rg := stream.node.(*raft)

t.Run("Disabled", func(t *testing.T) {
// Switch off account NRG on all servers in the cluster.
for _, s := range c.servers {
s.accountNRGAllowed.Store(false)
s.sendStatszUpdate()
}
// NOTE(review): presumably this pause lets the statsz updates reach the
// other servers before the groups are re-evaluated — confirm.
time.Sleep(time.Millisecond * 100)
// Set the account's NRG target directly (empty string means system
// account) and ask the Raft nodes to re-evaluate their subscriptions.
for _, s := range c.servers {
s.GlobalAccount().js.nrgAccount = ""
s.updateNRGAccountStatus()
}

// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("system account should have interest")
}
if s.gacc.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("global account shouldn't have interest")
}
}
return nil
})

// Check that the Raft traffic is in the system account, as we
// haven't moved it elsewhere yet.
{
sub, err := snc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}
})

t.Run("Mixed", func(t *testing.T) {
// Switch on account NRG on a single server in the cluster and
// leave it off on the rest.
for i, s := range c.servers {
s.accountNRGAllowed.Store(i == 0)
s.sendStatszUpdate()
}
// NOTE(review): presumably this pause lets the statsz updates reach the
// other servers before the groups are re-evaluated — confirm.
time.Sleep(time.Millisecond * 100)
// Only the first server targets the global account; the others keep the
// system account (empty string).
for i, s := range c.servers {
if i == 0 {
s.GlobalAccount().js.nrgAccount = globalAccountName
} else {
s.GlobalAccount().js.nrgAccount = ""
}
s.updateNRGAccountStatus()
}

// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("system account should have interest")
}
if s.gacc.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("global account shouldn't have interest")
}
}
return nil
})

// Check that the Raft traffic is in the system account, as we
// don't claim support for account NRG on all nodes in the group.
{
sub, err := snc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}
})

t.Run("Enabled", func(t *testing.T) {
// Switch on account NRG on all servers in the cluster.
for _, s := range c.servers {
s.accountNRGAllowed.Store(true)
s.sendStatszUpdate()
}
// NOTE(review): presumably this pause lets the statsz updates reach the
// other servers before the groups are re-evaluated — confirm.
time.Sleep(time.Millisecond * 100)
// Every server now targets the global account for NRG traffic.
for _, s := range c.servers {
s.GlobalAccount().js.nrgAccount = globalAccountName
s.updateNRGAccountStatus()
}

// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if s.sys.account.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("system account shouldn't have interest")
}
if !s.gacc.sl.hasInterest(rg.asubj, true) {
return fmt.Errorf("global account should have interest")
}
}
return nil
})

// Check that the traffic moved into the global account as
// expected.
{
sub, err := nc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}
})
}

func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down
Loading