Commit: Merge branch 'nats-io:main' into main

souravagrawal authored Dec 24, 2024
2 parents a309fcf + c4b778c commit 0e11988
Showing 10 changed files with 90 additions and 57 deletions.
6 changes: 3 additions & 3 deletions .github/actions/nightly-release/action.yaml
@@ -32,11 +32,11 @@ runs:

   - name: goreleaser
     # Use commit hash here to avoid a re-tagging attack, as this is a third-party action
-    # Commit 5742e2a039330cbb23ebf35f046f814d4c6ff811 = tag v5
-    uses: goreleaser/goreleaser-action@5742e2a039330cbb23ebf35f046f814d4c6ff811
+    # Commit 9ed2f89a662bf1735a48bc8557fd212fa902bebf = tag v6.1.0
+    uses: goreleaser/goreleaser-action@9ed2f89a662bf1735a48bc8557fd212fa902bebf
     with:
       workdir: "${{ inputs.workdir }}"
-      version: latest
+      version: '~> v2'
       args: release --snapshot --config .goreleaser-nightly.yml

   - name: images
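Note on the change above: pinning a third-party action by commit hash defends against re-tagging, since a tag like v6.1.0 can be silently moved to different code while a commit hash cannot. As a minimal sketch of how one might check what the tag currently resolves to (assumes git is on PATH; this helper is illustrative and not part of this repo):

package main

import (
	"fmt"
	"os/exec"
)

// Illustrative check: ask the remote what the v6.1.0 tag of goreleaser-action
// currently points at. For an annotated tag, the peeled "^{}" line carries the
// commit hash, which should match the hash pinned in the workflow above
// (9ed2f89a662bf1735a48bc8557fd212fa902bebf). If the tag has been moved, the
// hashes will differ.
func main() {
	out, err := exec.Command("git", "ls-remote",
		"https://github.com/goreleaser/goreleaser-action", "refs/tags/v6.1.0").Output()
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}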
4 changes: 3 additions & 1 deletion .goreleaser-nightly.yml
@@ -1,4 +1,5 @@
 project_name: nats-server
+version: 2

 builds:
 - main: .
@@ -13,6 +14,7 @@ builds:
   - linux
   goarch:
   - amd64
+  mod_timestamp: "{{ .CommitTimestamp }}"

 dockers:
 - goos: linux
@@ -32,4 +34,4 @@ checksum:
   algorithm: sha256

 snapshot:
-  name_template: '{{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}'
+  version_template: '{{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}'
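Besides declaring the v2 config format, this hunk renames snapshot.name_template to snapshot.version_template — the GoReleaser v2 name for the key — while the template body itself is unchanged. For readers parsing that one-liner, here is a minimal standalone sketch of how it evaluates, using Go's text/template with stand-in replace/time helpers (GoReleaser's real template engine and context differ in detail):

package main

import (
	"os"
	"strings"
	"text/template"
	"time"
)

// Stand-in for GoReleaser's template context; the field names mirror what the
// template above references, everything else here is assumption.
type ctx struct {
	Env    map[string]string
	Branch string
}

func main() {
	funcs := template.FuncMap{
		"replace": strings.ReplaceAll,
		"time":    func(layout string) string { return time.Now().UTC().Format(layout) },
	}
	const tpl = `{{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}` +
		`{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}` +
		`{{ else }}nightly{{ end }}-{{ time "20060102" }}`
	t := template.Must(template.New("v").Funcs(funcs).Parse(tpl))
	// A feature branch yields e.g. "fix-npc-20241224"; main, dev, or an empty
	// branch falls back to "nightly-<date>"; IMAGE_NAME overrides both.
	if err := t.Execute(os.Stdout, ctx{Env: map[string]string{}, Branch: "fix/npc"}); err != nil {
		panic(err)
	}
}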
4 changes: 3 additions & 1 deletion .goreleaser.yml
@@ -1,4 +1,5 @@
 project_name: nats-server
+version: 2

 release:
   github:
@@ -8,7 +9,7 @@ release:
   draft: true

 changelog:
-  skip: true
+  disable: true

 builds:
 - main: .
@@ -45,6 +46,7 @@ builds:
     goarch: arm64
   - goos: freebsd
     goarch: 386
+  mod_timestamp: "{{ .CommitTimestamp }}"

 nfpms:
 - file_name_template: '{{.ProjectName}}-{{.Tag}}-{{.Arch}}{{if .Arm}}{{.Arm}}{{end}}'
2 changes: 1 addition & 1 deletion .travis.yml
@@ -55,7 +55,7 @@ script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
 deploy:
   provider: script
   cleanup: true
-  script: curl -o /tmp/goreleaser.tar.gz -sLf https://github.com/goreleaser/goreleaser/releases/download/v1.26.2/goreleaser_Linux_x86_64.tar.gz && tar -xvf /tmp/goreleaser.tar.gz -C /tmp/ && /tmp/goreleaser
+  script: curl -o /tmp/goreleaser.tar.gz -sLf https://github.com/goreleaser/goreleaser/releases/download/v2.5.0/goreleaser_Linux_x86_64.tar.gz && tar -xvf /tmp/goreleaser.tar.gz -C /tmp/ && /tmp/goreleaser
   on:
     tags: true
     condition: ($TRAVIS_GO_VERSION =~ 1.23) && ($TEST_SUITE = "compile")
2 changes: 1 addition & 1 deletion go.mod
@@ -5,7 +5,7 @@ go 1.22
 toolchain go1.22.8

 require (
-	github.com/google/go-tpm v0.9.0
+	github.com/google/go-tpm v0.9.3
 	github.com/klauspost/compress v1.17.11
 	github.com/minio/highwayhash v1.0.3
 	github.com/nats-io/jwt/v2 v2.7.3
4 changes: 2 additions & 2 deletions go.sum
@@ -1,7 +1,7 @@
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/google/go-tpm v0.9.0 h1:sQF6YqWMi+SCXpsmS3fd21oPy/vSddwZry4JnmltHVk=
-github.com/google/go-tpm v0.9.0/go.mod h1:FkNVkc6C+IsvDI9Jw1OveJmxGZUUaKxtrpOS47QWKfU=
+github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc=
+github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
 github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
 github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
 github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
13 changes: 9 additions & 4 deletions server/consumer.go
@@ -5835,17 +5835,22 @@ func (o *consumer) requestNextMsgSubject() string {

 func (o *consumer) decStreamPending(sseq uint64, subj string) {
 	o.mu.Lock()
-	// Update our cached num pending only if we think deliverMsg has not done so.
-	if sseq >= o.sseq && o.isFilteredMatch(subj) {
-		o.npc--
-	}

 	// Check if this message was pending.
 	p, wasPending := o.pending[sseq]
 	var rdc uint64 = 1
 	if o.rdc != nil {
 		rdc = o.rdc[sseq]
 	}
+
+	// Update our cached num pending only if we think deliverMsg has not done so.
+	// Either we have not reached the message yet, or we've hit the race condition
+	// when there is contention at the beginning of the stream. In which case we can
+	// only decrement if the ack floor is still low enough to be able to detect it.
+	if o.isFilteredMatch(subj) && sseq > o.asflr && (sseq >= o.sseq || !wasPending) {
+		o.npc--
+	}
+
 	o.mu.Unlock()

 	// If it was pending process it like an ack.
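The heart of the consumer.go change is the rewritten guard. A minimal standalone sketch of that predicate (hypothetical names; in the real code sseq is the stream sequence being removed, o.sseq the consumer's next sequence to deliver, and o.asflr its ack floor):

package main

import "fmt"

// shouldDecNumPending mirrors the new condition: decrement the cached
// num-pending only if the message matches the filter, the ack floor has not
// passed it, and either delivery has not reached it yet or it was never
// marked pending (so deliverMsg cannot have decremented it already).
func shouldDecNumPending(sseq, nextDeliverSeq, ackFloor uint64, filteredMatch, wasPending bool) bool {
	return filteredMatch && sseq > ackFloor && (sseq >= nextDeliverSeq || !wasPending)
}

func main() {
	fmt.Println(shouldDecNumPending(3, 2, 0, true, false))   // true: not yet delivered, safe to decrement
	fmt.Println(shouldDecNumPending(2, 100, 0, true, true))  // false: delivered and still pending, already counted
	fmt.Println(shouldDecNumPending(2, 100, 5, true, false)) // false: ack floor moved past it, race undetectable
}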
8 changes: 3 additions & 5 deletions server/jetstream_cluster.go
@@ -2415,7 +2415,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 		compactInterval = 2 * time.Minute
 		compactSizeMin  = 8 * 1024 * 1024
 		compactNumMin   = 65536
-		minSnapDelta    = 10 * time.Second
 	)

 	// Spread these out for large numbers on server restart.
@@ -2439,16 +2438,15 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	// a complete and detailed state which could be costly in terms of memory, cpu and GC.
 	// This only entails how many messages, and the first and last sequence of the stream.
 	// This is all that is needed to detect a change, and we can get this from FilteredState()
-	// with and empty filter.
+	// with an empty filter.
 	var lastState SimpleState
-	var lastSnapTime time.Time

 	// Don't allow the upper layer to install snapshots until we have
 	// fully recovered from disk.
 	isRecovering := true

 	doSnapshot := func() {
-		if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta {
+		if mset == nil || isRecovering || isRestore {
 			return
 		}

@@ -2466,7 +2464,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 		}

 		if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
-			lastState, lastSnapTime = curState, time.Now()
+			lastState = curState
 		} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
 			s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
 		}
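With minSnapDelta and lastSnapTime removed, snapshots are no longer throttled by a 10-second cooldown: doSnapshot now installs one whenever the cheap state summary differs from the last one installed (and no recovery or restore is in progress). A minimal sketch of that change detection, assuming a SimpleState subset with only the fields named in the comment above:

package main

import "fmt"

// SimpleState stands in for the server's cheap summary: how many messages,
// plus the first and last sequence. (An illustrative subset, not the full type.)
type SimpleState struct {
	Msgs, First, Last uint64
}

func main() {
	last := SimpleState{Msgs: 10, First: 1, Last: 10}
	cur := SimpleState{Msgs: 11, First: 1, Last: 11}
	// Struct comparison is the whole check: any movement in the count or the
	// sequence bounds means the last installed snapshot is stale.
	if cur != last {
		fmt.Println("state changed -> install snapshot")
		last = cur
	}
	fmt.Println(last == cur) // true once installed
}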
39 changes: 0 additions & 39 deletions server/jetstream_cluster_1_test.go
@@ -6945,45 +6945,6 @@ func TestJetStreamClusterConsumerInfoAfterCreate(t *testing.T) {
 	require_NoError(t, err)
 }

-func TestJetStreamClusterDontSnapshotTooOften(t *testing.T) {
-	c := createJetStreamClusterExplicit(t, "R3S", 3)
-	defer c.shutdown()
-
-	nc, js := jsClientConnect(t, c.randomServer())
-	defer nc.Close()
-
-	_, err := js.AddStream(&nats.StreamConfig{
-		Name:     "TEST",
-		Subjects: []string{"foo"},
-		Replicas: 3,
-	})
-	require_NoError(t, err)
-
-	// We force the snapshot compact size to hit multiple times.
-	// But, we should not be making snapshots too often since that would degrade performance.
-	data := make([]byte, 1024*1024) // 1MB payload
-	_, err = crand.Read(data)
-	require_NoError(t, err)
-	for i := 0; i < 50; i++ {
-		// We do synchronous publishes so we're more likely to have entries pass through the apply queue.
-		_, err = js.Publish("foo", data)
-		require_NoError(t, err)
-	}
-
-	for _, s := range c.servers {
-		acc, err := s.lookupAccount(globalAccountName)
-		require_NoError(t, err)
-		mset, err := acc.lookupStream("TEST")
-		require_NoError(t, err)
-		snap, err := mset.node.(*raft).loadLastSnapshot()
-		require_NoError(t, err)
-		// This measure is not exact and more of a side effect.
-		// We expect one snapshot to be made pretty soon and to be on cooldown after.
-		// So no snapshots should be made after that.
-		require_LessThan(t, snap.lastIndex, 20)
-	}
-}
-
 //
 // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
 // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
65 changes: 65 additions & 0 deletions server/jetstream_test.go
@@ -24737,3 +24737,68 @@ func TestJetStreamWouldExceedLimits(t *testing.T) {
 	require_True(t, js.wouldExceedLimits(MemoryStorage, int(js.config.MaxMemory)+1))
 	require_True(t, js.wouldExceedLimits(FileStorage, int(js.config.MaxStore)+1))
 }
+
+func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) {
+	s := RunBasicJetStreamServer(t)
+	defer s.Shutdown()
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
+	require_NoError(t, err)
+
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
+	require_NoError(t, err)
+
+	acc, err := s.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+	o := mset.lookupConsumer("CONSUMER")
+
+	requireExpected := func(expected int64) {
+		t.Helper()
+		o.mu.RLock()
+		defer o.mu.RUnlock()
+		require_Equal(t, o.npc, expected)
+	}
+
+	// Should initially report no messages available.
+	requireExpected(0)
+
+	// A new message is available, should report in pending.
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	requireExpected(1)
+
+	// Pending count should decrease when the message is deleted.
+	err = js.DeleteMsg("TEST", 1)
+	require_NoError(t, err)
+	requireExpected(0)
+
+	// Make more messages available, should report in pending.
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	requireExpected(2)
+
+	// Simulate getNextMsg being called and the starting sequence to skip over a deleted message.
+	// Also simulate one pending message.
+	o.mu.Lock()
+	o.sseq = 100
+	o.npc--
+	o.pending = make(map[uint64]*Pending)
+	o.pending[2] = &Pending{}
+	o.mu.Unlock()
+
+	// Since this message is pending we should not decrement pending count as we've done so already.
+	o.decStreamPending(2, "foo")
+	requireExpected(1)
+
+	// This is the deleted message that was skipped, and we can decrement the pending count
+	// because it's not pending and only as long as the ack floor hasn't moved up yet.
+	o.decStreamPending(3, "foo")
+	requireExpected(0)
+}
