Skip to content

Commit

Permalink
Store account NRG enabled in jsAccount instead of Account
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Aug 7, 2024
1 parent 44938ec commit 201770c
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 51 deletions.
2 changes: 0 additions & 2 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ type Account struct {
// Guarantee that only one goroutine can be running either checkJetStreamMigrate
// or clearObserverState at a given time for this account to prevent interleaving.
jscmMu sync.Mutex
// Should we send NRG traffic inside this account instead of the system account.
accountNRG atomic.Bool
}

const (
Expand Down
5 changes: 4 additions & 1 deletion server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,14 @@ RESET:
si.Version = VERSION
si.Time = time.Now().UTC()
si.Tags = tags
si.Flags = 0
if js {
// New capability based flags.
si.SetJetStreamEnabled()
si.SetBinaryStreamSnapshot()
si.SetAccountNRG()
if s.accountNRGAllowed.Load() {
si.SetAccountNRG()
}
}
}
var b []byte
Expand Down
3 changes: 3 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ type jsAccount struct {
updatesSub *subscription
lupdate time.Time
utimer *time.Timer

// Should we send NRG traffic inside this account instead of the system account.
accountNRG atomic.Bool
}

// Track general usage for this account.
Expand Down
131 changes: 91 additions & 40 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2471,52 +2471,103 @@ func TestJetStreamClusterAccountNRG(t *testing.T) {
require_NoError(t, err)
rg := stream.node.(*raft)

// System account should have interest, but the global account
// shouldn't.
for _, s := range c.servers {
require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true))
require_False(t, s.gacc.sl.hasInterest(rg.asubj, true))
}
t.Run("Disabled", func(t *testing.T) {
// Switch off account NRG on all servers in the cluster.
for _, s := range c.servers {
s.accountNRGAllowed.Store(false)
s.sendStatszUpdate()
}
time.Sleep(time.Millisecond * 100)
for _, s := range c.servers {
s.GlobalAccount().js.accountNRG.Store(false)
s.updateNRGAccountStatus()
}

// First of all check that the Raft traffic is in the system
// account, as we haven't moved it elsewhere yet.
{
sub, err := snc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))
// System account should have interest, but the global account
// shouldn't.
for _, s := range c.servers {
require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true))
require_False(t, s.gacc.sl.hasInterest(rg.asubj, true))
}

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}
// First of all check that the Raft traffic is in the system
// account, as we haven't moved it elsewhere yet.
{
sub, err := snc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

// Switch on account NRG on all servers in the cluster. Then
// we wait, as we will need statsz to be sent for all servers
// in the cluster.
for _, s := range c.servers {
s.GlobalAccount().accountNRG.Store(true)
s.updateNRGAccountStatus()
}
msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}
})

// Now check that the traffic has moved into the asset acc.
// In this case the system account should no longer have
// subscriptions for those subjects.
{
sub, err := nc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))
t.Run("Mixed", func(t *testing.T) {
// Switch on account NRG on a single server in the cluster and
// leave it off on the rest.
for i, s := range c.servers {
s.accountNRGAllowed.Store(i == 0)
s.sendStatszUpdate()
}
time.Sleep(time.Millisecond * 100)
for i, s := range c.servers {
s.GlobalAccount().js.accountNRG.Store(i == 0)
s.updateNRGAccountStatus()
}

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}
// System account should have interest, but the global account
// shouldn't.
for _, s := range c.servers {
require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true))
require_False(t, s.gacc.sl.hasInterest(rg.asubj, true))
}

// The global account should now have interest and the
// system account shouldn't.
for _, s := range c.servers {
require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true))
require_True(t, s.gacc.sl.hasInterest(rg.asubj, true))
}
// First of all check that the Raft traffic is in the system
// account, as we haven't moved it elsewhere yet.
{
sub, err := snc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}
})

t.Run("Enabled", func(t *testing.T) {
// Switch on account NRG on all servers in the cluster.
for _, s := range c.servers {
s.accountNRGAllowed.Store(true)
s.sendStatszUpdate()
}
time.Sleep(time.Millisecond * 100)
for _, s := range c.servers {
s.GlobalAccount().js.accountNRG.Store(true)
s.updateNRGAccountStatus()
}

// Now check that the traffic has moved into the asset acc.
// In this case the system account should no longer have
// subscriptions for those subjects.
{
sub, err := nc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}

// The global account should now have interest and the
// system account shouldn't.
for _, s := range c.servers {
require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true))
require_True(t, s.gacc.sl.hasInterest(rg.asubj, true))
}
})
}

func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2133,7 +2133,7 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error {
if !ok {
return &configErr{tk, fmt.Sprintf("Expected a boolean for %q, got %v", mk, mv)}
}
acc.accountNRG.Store(vv)
acc.js.accountNRG.Store(vv)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down
18 changes: 11 additions & 7 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,18 @@ func (n *raft) RecreateInternalSubs() error {

func (n *raft) recreateInternalSubsLocked() error {
// Is account NRG enabled in this account?
var acc bool
if a, _ := n.s.lookupAccount(n.accName); a != nil {
acc = a.accountNRG.Load()
acc := n.s.accountNRGAllowed.Load()
if acc {
// Check whether the specific account has account NRG enabled.
if a, _ := n.s.lookupAccount(n.accName); a != nil && a.js != nil {
acc = a.js.accountNRG.Load()
}
}
if acc {
// Check whether the peers in this group all claim to support
// moving the NRG traffic into the account.
acc = n.checkAccountNRGStatus(acc)
}

// Check whether the peers in this group all claim to support
// moving the NRG traffic into the account.
acc = n.checkAccountNRGStatus(acc)
if n.aesub != nil && n.inAcc == acc {
// Subscriptions already exist and the account NRG state
// hasn't changed.
Expand Down
7 changes: 7 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ type Server struct {

// Queue to process JS API requests that come from routes (or gateways)
jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq]

// Whether moving NRG traffic into accounts is permitted on this server.
accountNRGAllowed atomic.Bool
}

// For tracking JS nodes.
Expand Down Expand Up @@ -724,6 +727,10 @@ func NewServer(opts *Options) (*Server, error) {
syncOutSem: make(chan struct{}, maxConcurrentSyncRequests),
}

// By default we'll allow account NRG.
// TODO: Maybe make it a configuration option?
s.accountNRGAllowed.Store(true)

// Fill up the maximum in flight syncRequests for this server.
// Used in JetStream catchup semantics.
for i := 0; i < maxConcurrentSyncRequests; i++ {
Expand Down

0 comments on commit 201770c

Please sign in to comment.