Skip to content

Commit

Permalink
Add configurable ackDeadline to GCP Pub/Sub component (#3422)
Browse files Browse the repository at this point in the history
Signed-off-by: [email protected] <[email protected]>
Signed-off-by: Bernd Verst <[email protected]>
Co-authored-by: [email protected] <[email protected]>
Co-authored-by: Bernd Verst <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
Co-authored-by: Alessandro (Ale) Segala <[email protected]>
  • Loading branch information
5 people authored Jun 26, 2024
1 parent bf07ca5 commit 1375f08
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 12 deletions.
25 changes: 14 additions & 11 deletions pubsub/gcp/pubsub/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ limitations under the License.

package pubsub

import "time"

// GCPPubSubMetaData pubsub metadata.
type metadata struct {
// Ignored by metadata parser because included in built-in authentication profile
Expand All @@ -29,15 +31,16 @@ type metadata struct {
AuthProviderCertURL string `mapstructure:"authProviderX509CertUrl" mdignore:"true"`
ClientCertURL string `mapstructure:"clientX509CertUrl" mdignore:"true"`

DisableEntityManagement bool `mapstructure:"disableEntityManagement"`
EnableMessageOrdering bool `mapstructure:"enableMessageOrdering"`
MaxReconnectionAttempts int `mapstructure:"maxReconnectionAttempts"`
ConnectionRecoveryInSec int `mapstructure:"connectionRecoveryInSec"`
ConnectionEndpoint string `mapstructure:"endpoint"`
OrderingKey string `mapstructure:"orderingKey"`
DeadLetterTopic string `mapstructure:"deadLetterTopic"`
MaxDeliveryAttempts int `mapstructure:"maxDeliveryAttempts"`
MaxOutstandingMessages int `mapstructure:"maxOutstandingMessages"`
MaxOutstandingBytes int `mapstructure:"maxOutstandingBytes"`
MaxConcurrentConnections int `mapstructure:"maxConcurrentConnections"`
DisableEntityManagement bool `mapstructure:"disableEntityManagement"`
EnableMessageOrdering bool `mapstructure:"enableMessageOrdering"`
MaxReconnectionAttempts int `mapstructure:"maxReconnectionAttempts"`
ConnectionRecoveryInSec int `mapstructure:"connectionRecoveryInSec"`
ConnectionEndpoint string `mapstructure:"endpoint"`
OrderingKey string `mapstructure:"orderingKey"`
DeadLetterTopic string `mapstructure:"deadLetterTopic"`
MaxDeliveryAttempts int `mapstructure:"maxDeliveryAttempts"`
MaxOutstandingMessages int `mapstructure:"maxOutstandingMessages"`
MaxOutstandingBytes int `mapstructure:"maxOutstandingBytes"`
MaxConcurrentConnections int `mapstructure:"maxConcurrentConnections"`
AckDeadline time.Duration `mapstructure:"ackDeadline"`
}
6 changes: 6 additions & 0 deletions pubsub/gcp/pubsub/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,9 @@ metadata:
Max number of concurrent streaming-pull connections to maintain
type: number
example: '10'
- name: ackDeadline
description: |
Message acknowledgement duration deadline.
Allows users to specify a custom message acknowledgment deadline after which a redelivery of the message will be performed if the message was not acknowledged.
default: '20s'
example: '1m'
9 changes: 8 additions & 1 deletion pubsub/gcp/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ const (
// Metadata keys.
metadataProjectIDKey = "projectId"
metedataOrderingKeyKey = "orderingKey"
metadataAckDeadlineKey = "ackDeadline"

// Defaults.
defaultMaxReconnectionAttempts = 30
defaultConnectionRecoveryInSec = 2
defaultMaxDeliveryAttempts = 5
defaultAckDeadline = 20 * time.Second
)

// GCPPubSub type.
Expand Down Expand Up @@ -125,6 +127,7 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
MaxReconnectionAttempts: defaultMaxReconnectionAttempts,
ConnectionRecoveryInSec: defaultConnectionRecoveryInSec,
MaxDeliveryAttempts: defaultMaxDeliveryAttempts,
AckDeadline: defaultAckDeadline,
}

err := kitmd.DecodeMetadata(pubSubMetadata.Properties, &result)
Expand All @@ -136,6 +139,10 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
return &result, fmt.Errorf("%s missing attribute %s", errorMessagePrefix, metadataProjectIDKey)
}

if result.AckDeadline <= 0 {
return nil, fmt.Errorf("%s invalid AckDeadline %s. Value must be a positive Go duration string or integer", errorMessagePrefix, pubSubMetadata.Properties[metadataAckDeadlineKey])
}

return &result, nil
}

Expand Down Expand Up @@ -456,7 +463,7 @@ func (g *GCPPubSub) ensureSubscription(parentCtx context.Context, subscription s
exists, subErr := entity.Exists(parentCtx)
if !exists {
subConfig := gcppubsub.SubscriptionConfig{
AckDeadline: 20 * time.Second,
AckDeadline: g.metadata.AckDeadline,
Topic: g.getTopic(topic),
EnableMessageOrdering: g.metadata.EnableMessageOrdering,
}
Expand Down
35 changes: 35 additions & 0 deletions pubsub/gcp/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pubsub

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -257,4 +258,38 @@ func TestInit(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "maxConcurrentConnections")
})
t.Run("valid ackDeadline", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
"ackDeadline": "30s", // Valid custom ack deadline in seconds
}

md, err := createMetadata(m)
require.NoError(t, err)
assert.Equal(t, 30*time.Second, md.AckDeadline, "AckDeadline should match the provided configuration")
})

t.Run("invalid ackDeadline", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
"ackDeadline": "-10m", // Invalid ack deadline
}

_, err := createMetadata(m)
require.Error(t, err, "Should return an error for invalid ackDeadline")
assert.Contains(t, err.Error(), "invalid AckDeadline", "Error message should indicate the invalid ack deadline")
})

t.Run("default ackDeadline when not specified", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project", // No ackDeadline specified
}

md, err := createMetadata(m)
require.NoError(t, err)
assert.Equal(t, defaultAckDeadline, md.AckDeadline, "Should use the default AckDeadline when none is specified")
})
}

0 comments on commit 1375f08

Please sign in to comment.