Skip to content

Commit

Permalink
[pubsub/jetstream] Add support for concurrencyMode (#3222)
Browse files Browse the repository at this point in the history
Signed-off-by: Byron Ruth <[email protected]>
  • Loading branch information
bruth authored Nov 13, 2023
1 parent 3bcd0c7 commit dd8d2ba
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 24 deletions.
21 changes: 17 additions & 4 deletions pubsub/jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,8 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
natsHandler := func(m *nats.Msg) {
jsm, err := m.Metadata()
if err != nil {
// If we get an error, then we don't have a valid JetStream
// message.
// If we get an error, then we don't have a valid JetStream message.
js.l.Error(err)

return
}

Expand Down Expand Up @@ -260,7 +258,22 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
subscription, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, nats.Bind(streamName, consumerInfo.Name))
} else {
js.l.Debugf("nats: subscribed to subject %s", req.Topic)
subscription, err = js.jsc.Subscribe(req.Topic, natsHandler, nats.Bind(streamName, consumerInfo.Name))
subscription, err = js.jsc.Subscribe(
req.Topic,
func(msg *nats.Msg) {
switch js.meta.Concurrency {
case pubsub.Single:
natsHandler(msg)
case pubsub.Parallel:
js.wg.Add(1)
go func() {
natsHandler(msg)
js.wg.Done()
}()
}
},
nats.Bind(streamName, consumerInfo.Name),
)
}
if err != nil {
return err
Expand Down
17 changes: 16 additions & 1 deletion pubsub/jetstream/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@ type metadata struct {
internalAckPolicy nats.AckPolicy `mapstructure:"-"`
Domain string `mapstructure:"domain"`
APIPrefix string `mapstructure:"apiPrefix"`

Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"`
}

func parseMetadata(psm pubsub.Metadata) (metadata, error) {
var m metadata
m := metadata{
Concurrency: pubsub.Single,
}

err := kitmd.DecodeMetadata(psm.Properties, &m)
if err != nil {
Expand Down Expand Up @@ -119,5 +123,16 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
m.internalAckPolicy = nats.AckExplicitPolicy
}

// Explicit check to prevent overriding the Single default
// (the previous behavior) if not set.
// TODO: See https://github.com/dapr/components-contrib/pull/3222#discussion_r1389772053
if psm.Properties[pubsub.ConcurrencyKey] != "" {
c, err := pubsub.Concurrency(psm.Properties)
if err != nil {
return metadata{}, err
}
m.Concurrency = c
}

return m, nil
}
41 changes: 22 additions & 19 deletions pubsub/jetstream/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,32 +74,34 @@ func TestParseMetadata(t *testing.T) {
internalDeliverPolicy: nats.DeliverAllPolicy,
internalAckPolicy: nats.AckExplicitPolicy,
Domain: "hub",
Concurrency: pubsub.Single,
},
expectErr: false,
},
{
desc: "Valid Metadata with token",
input: pubsub.Metadata{Base: mdata.Base{
Properties: map[string]string{
"natsURL": "nats://localhost:4222",
"name": "myName",
"durableName": "myDurable",
"queueGroupName": "myQueue",
"startTime": "1629328511",
"flowControl": "true",
"ackWait": "2s",
"maxDeliver": "10",
"backOff": "500ms, 2s, 10s",
"maxAckPending": "5000",
"replicas": "3",
"memoryStorage": "true",
"rateLimit": "20000",
"heartbeat": "1s",
"token": "myToken",
"deliverPolicy": "sequence",
"startSequence": "5",
"ackPolicy": "all",
"apiPrefix": "HUB",
"natsURL": "nats://localhost:4222",
"name": "myName",
"durableName": "myDurable",
"queueGroupName": "myQueue",
"startTime": "1629328511",
"flowControl": "true",
"ackWait": "2s",
"maxDeliver": "10",
"backOff": "500ms, 2s, 10s",
"maxAckPending": "5000",
"replicas": "3",
"memoryStorage": "true",
"rateLimit": "20000",
"heartbeat": "1s",
"token": "myToken",
"deliverPolicy": "sequence",
"startSequence": "5",
"ackPolicy": "all",
"apiPrefix": "HUB",
"concurrencyMode": "parallel",
},
}},
want: metadata{
Expand All @@ -125,6 +127,7 @@ func TestParseMetadata(t *testing.T) {
internalDeliverPolicy: nats.DeliverByStartSequencePolicy,
internalAckPolicy: nats.AckAllPolicy,
APIPrefix: "HUB",
Concurrency: pubsub.Parallel,
},
expectErr: false,
},
Expand Down

0 comments on commit dd8d2ba

Please sign in to comment.