Skip to content

Commit

Permalink
[FIXED] Do not overwrite ordered consumer deliver policy if start tim…
Browse files Browse the repository at this point in the history
…e is set (#1742)

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Nov 29, 2024
1 parent e963b77 commit f563c66
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 5 deletions.
8 changes: 3 additions & 5 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,18 +631,16 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
c.cfg.DeliverPolicy == DeliverAllPolicy {

cfg.OptStartSeq = 0
} else if c.cfg.DeliverPolicy == DeliverByStartTimePolicy {
cfg.OptStartSeq = 0
cfg.OptStartTime = c.cfg.OptStartTime
} else {
cfg.OptStartSeq = c.cfg.OptStartSeq
}

if cfg.DeliverPolicy == DeliverLastPerSubjectPolicy && len(c.cfg.FilterSubjects) == 0 {
cfg.FilterSubjects = []string{">"}
}
if c.cfg.OptStartTime != nil {
cfg.OptStartSeq = 0
cfg.DeliverPolicy = DeliverByStartTimePolicy
cfg.OptStartTime = c.cfg.OptStartTime
}

return cfg
}
Expand Down
50 changes: 50 additions & 0 deletions jetstream/test/ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2051,6 +2051,56 @@ func TestOrderedConsumerConfig(t *testing.T) {
HeadersOnly: true,
},
},
{
name: "both start seq and time set, deliver policy start seq",
config: jetstream.OrderedConsumerConfig{
FilterSubjects: []string{"foo.a", "foo.b"},
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
OptStartSeq: 10,
OptStartTime: &time.Time{},
ReplayPolicy: jetstream.ReplayOriginalPolicy,
InactiveThreshold: 10 * time.Second,
HeadersOnly: true,
},
expected: jetstream.ConsumerConfig{
FilterSubjects: []string{"foo.a", "foo.b"},
OptStartSeq: 10,
OptStartTime: nil,
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
AckPolicy: jetstream.AckNonePolicy,
MaxDeliver: -1,
MaxWaiting: 512,
InactiveThreshold: 10 * time.Second,
Replicas: 1,
MemoryStorage: true,
HeadersOnly: true,
},
},
{
name: "both start seq and time set, deliver policy start time",
config: jetstream.OrderedConsumerConfig{
FilterSubjects: []string{"foo.a", "foo.b"},
DeliverPolicy: jetstream.DeliverByStartTimePolicy,
OptStartSeq: 10,
OptStartTime: &time.Time{},
ReplayPolicy: jetstream.ReplayOriginalPolicy,
InactiveThreshold: 10 * time.Second,
HeadersOnly: true,
},
expected: jetstream.ConsumerConfig{
FilterSubjects: []string{"foo.a", "foo.b"},
OptStartSeq: 0,
OptStartTime: &time.Time{},
DeliverPolicy: jetstream.DeliverByStartTimePolicy,
AckPolicy: jetstream.AckNonePolicy,
MaxDeliver: -1,
MaxWaiting: 512,
InactiveThreshold: 10 * time.Second,
Replicas: 1,
MemoryStorage: true,
HeadersOnly: true,
},
},
}

for _, test := range tests {
Expand Down

0 comments on commit f563c66

Please sign in to comment.