Skip to content

Commit

Permalink
Merge pull request #868 from ripienaar/stream_consumer_limits
Browse files Browse the repository at this point in the history
Support consumer limits
  • Loading branch information
ripienaar authored Sep 21, 2023
2 parents f77374d + b32965d commit f7e1c2c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
15 changes: 15 additions & 0 deletions cli/stream_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type streamCmd struct {
metadataIsSet bool
compression string
firstSeq uint64
limitInactiveThreshold time.Duration
limitMaxAckPending int

fServer string
fCluster string
Expand Down Expand Up @@ -209,6 +211,11 @@ func configureStreamCommand(app commandHost) {
f.Flag("republish-source", "Republish messages to --republish-destination").PlaceHolder("SOURCE").StringVar(&c.repubSource)
f.Flag("republish-destination", "Republish destination for messages in --republish-source").PlaceHolder("DEST").StringVar(&c.repubDest)
f.Flag("republish-headers", "Republish only message headers, no bodies").UnNegatableBoolVar(&c.repubHeadersOnly)
if !edit {
f.Flag("limit-consumer-inactive", "The maximum Consumer inactive threshold the Stream allows").PlaceHolder("THRESHOLD").DurationVar(&c.limitInactiveThreshold)
f.Flag("limit-consumer-max-pending", "The maximum Consumer Ack Pending the stream Allows").PlaceHolder("PENDING").IntVar(&c.limitMaxAckPending)
}

if edit {
f.Flag("no-republish", "Removes current republish configuration").UnNegatableBoolVar(&c.noRepub)
f.Flag("no-transform", "Removes current subject transform configuration").UnNegatableBoolVar(&c.noSubjectTransform)
Expand Down Expand Up @@ -2417,6 +2424,14 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St
DiscardNewPer: c.discardPerSubj,
}

if c.limitInactiveThreshold > 0 {
cfg.ConsumerLimits.InactiveThreshold = c.limitInactiveThreshold
}

if c.limitMaxAckPending > 0 {
cfg.ConsumerLimits.MaxAckPending = c.limitMaxAckPending
}

if len(c.metadata) > 0 {
cfg.Metadata = c.metadata
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ require (
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/klauspost/compress v1.17.0
github.com/mattn/go-isatty v0.0.19
github.com/nats-io/jsm.go v0.1.0
github.com/nats-io/nats-server/v2 v2.10.0
github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8
github.com/nats-io/nats-server/v2 v2.10.1
github.com/nats-io/nats.go v1.30.0
github.com/nats-io/nuid v1.0.1
github.com/prometheus/client_golang v1.16.0
Expand All @@ -33,7 +33,7 @@ require (
)

require (
github.com/antonmedv/expr v1.15.2 // indirect
github.com/antonmedv/expr v1.15.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXY
github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63nhn5WAunQHLTznkw5W8b1Xc0dNjp83s=
github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/antonmedv/expr v1.15.2 h1:afFXpDWIC2n3bF+kTZE1JvFo+c34uaM3sTqh8z0xfdU=
github.com/antonmedv/expr v1.15.2/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE=
github.com/antonmedv/expr v1.15.3 h1:q3hOJZNvLvhqE8OHBs1cFRdbXFNKuA+bHmRaI+AmRmI=
github.com/antonmedv/expr v1.15.3/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand Down Expand Up @@ -80,12 +80,12 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQ
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jsm.go v0.1.0 h1:H2gYCee/iyBDjUftPOr5fEPWAcG/+fyVl89IWiy6AC4=
github.com/nats-io/jsm.go v0.1.0/go.mod h1:snnYORje42cEDCX5QygzeoVA2KiWVbiIJbLfGIvXW08=
github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8 h1:OKm9e1//rlcl4i9zXQ6QQxj7DJaeL+Oe8WBgAKO4cqI=
github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8/go.mod h1:hB4Qd+IKoRvAAWTOI1HkCy4wotjFwOIT+codHCFOZqk=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg=
github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ=
github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nats.go v1.30.0 h1:bj/rVsRCrFXxmm9mJiDhb74UKl2HhKpDwKRBtvCjZjc=
github.com/nats-io/nats.go v1.30.0/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
Expand Down

0 comments on commit f7e1c2c

Please sign in to comment.