From 1375f081c68eeabd7aed83eeddb95a9193d41113 Mon Sep 17 00:00:00 2001 From: mahparaashley <47503096+mahparaashley@users.noreply.github.com> Date: Wed, 26 Jun 2024 15:59:44 -0400 Subject: [PATCH] Add configurable ackDeadline to GCP Pub/Sub component (#3422) Signed-off-by: mashley@rechargeapps.com Signed-off-by: Bernd Verst Co-authored-by: mashley@rechargeapps.com Co-authored-by: Bernd Verst Co-authored-by: Yaron Schneider Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> --- pubsub/gcp/pubsub/metadata.go | 25 +++++++++++++---------- pubsub/gcp/pubsub/metadata.yaml | 6 ++++++ pubsub/gcp/pubsub/pubsub.go | 9 +++++++- pubsub/gcp/pubsub/pubsub_test.go | 35 ++++++++++++++++++++++++++++++++ 4 files changed, 63 insertions(+), 12 deletions(-) diff --git a/pubsub/gcp/pubsub/metadata.go b/pubsub/gcp/pubsub/metadata.go index 168d990968..3bfa21e0c2 100644 --- a/pubsub/gcp/pubsub/metadata.go +++ b/pubsub/gcp/pubsub/metadata.go @@ -13,6 +13,8 @@ limitations under the License. package pubsub +import "time" + // GCPPubSubMetaData pubsub metadata. type metadata struct { // Ignored by metadata parser because included in built-in authentication profile @@ -29,15 +31,16 @@ type metadata struct { AuthProviderCertURL string `mapstructure:"authProviderX509CertUrl" mdignore:"true"` ClientCertURL string `mapstructure:"clientX509CertUrl" mdignore:"true"` - DisableEntityManagement bool `mapstructure:"disableEntityManagement"` - EnableMessageOrdering bool `mapstructure:"enableMessageOrdering"` - MaxReconnectionAttempts int `mapstructure:"maxReconnectionAttempts"` - ConnectionRecoveryInSec int `mapstructure:"connectionRecoveryInSec"` - ConnectionEndpoint string `mapstructure:"endpoint"` - OrderingKey string `mapstructure:"orderingKey"` - DeadLetterTopic string `mapstructure:"deadLetterTopic"` - MaxDeliveryAttempts int `mapstructure:"maxDeliveryAttempts"` - MaxOutstandingMessages int `mapstructure:"maxOutstandingMessages"` - MaxOutstandingBytes int `mapstructure:"maxOutstandingBytes"` - MaxConcurrentConnections int `mapstructure:"maxConcurrentConnections"` + DisableEntityManagement bool `mapstructure:"disableEntityManagement"` + EnableMessageOrdering bool `mapstructure:"enableMessageOrdering"` + MaxReconnectionAttempts int `mapstructure:"maxReconnectionAttempts"` + ConnectionRecoveryInSec int `mapstructure:"connectionRecoveryInSec"` + ConnectionEndpoint string `mapstructure:"endpoint"` + OrderingKey string `mapstructure:"orderingKey"` + DeadLetterTopic string `mapstructure:"deadLetterTopic"` + MaxDeliveryAttempts int `mapstructure:"maxDeliveryAttempts"` + MaxOutstandingMessages int `mapstructure:"maxOutstandingMessages"` + MaxOutstandingBytes int `mapstructure:"maxOutstandingBytes"` + MaxConcurrentConnections int `mapstructure:"maxConcurrentConnections"` + AckDeadline time.Duration `mapstructure:"ackDeadline"` } diff --git a/pubsub/gcp/pubsub/metadata.yaml b/pubsub/gcp/pubsub/metadata.yaml index 4adffd2d3a..7ade285fb4 100644 --- a/pubsub/gcp/pubsub/metadata.yaml +++ b/pubsub/gcp/pubsub/metadata.yaml @@ -75,3 +75,9 @@ metadata: Max number of concurrent streaming-pull connections to maintain type: number example: '10' + - name: ackDeadline + description: | + Message acknowledgement duration deadline. + Allows users to specify a custom message acknowledgment deadline after which a redelivery of the message will be performed if the message was not acknowledged. + default: '20s' + example: '1m' diff --git a/pubsub/gcp/pubsub/pubsub.go b/pubsub/gcp/pubsub/pubsub.go index 562545db39..22088f5ddc 100644 --- a/pubsub/gcp/pubsub/pubsub.go +++ b/pubsub/gcp/pubsub/pubsub.go @@ -41,11 +41,13 @@ const ( // Metadata keys. metadataProjectIDKey = "projectId" metedataOrderingKeyKey = "orderingKey" + metadataAckDeadlineKey = "ackDeadline" // Defaults. defaultMaxReconnectionAttempts = 30 defaultConnectionRecoveryInSec = 2 defaultMaxDeliveryAttempts = 5 + defaultAckDeadline = 20 * time.Second ) // GCPPubSub type. @@ -125,6 +127,7 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) { MaxReconnectionAttempts: defaultMaxReconnectionAttempts, ConnectionRecoveryInSec: defaultConnectionRecoveryInSec, MaxDeliveryAttempts: defaultMaxDeliveryAttempts, + AckDeadline: defaultAckDeadline, } err := kitmd.DecodeMetadata(pubSubMetadata.Properties, &result) @@ -136,6 +139,10 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) { return &result, fmt.Errorf("%s missing attribute %s", errorMessagePrefix, metadataProjectIDKey) } + if result.AckDeadline <= 0 { + return nil, fmt.Errorf("%s invalid AckDeadline %s. Value must be a positive Go duration string or integer", errorMessagePrefix, pubSubMetadata.Properties[metadataAckDeadlineKey]) + } + return &result, nil } @@ -456,7 +463,7 @@ func (g *GCPPubSub) ensureSubscription(parentCtx context.Context, subscription s exists, subErr := entity.Exists(parentCtx) if !exists { subConfig := gcppubsub.SubscriptionConfig{ - AckDeadline: 20 * time.Second, + AckDeadline: g.metadata.AckDeadline, Topic: g.getTopic(topic), EnableMessageOrdering: g.metadata.EnableMessageOrdering, } diff --git a/pubsub/gcp/pubsub/pubsub_test.go b/pubsub/gcp/pubsub/pubsub_test.go index 0ddfaaad7c..8cda0ee16f 100644 --- a/pubsub/gcp/pubsub/pubsub_test.go +++ b/pubsub/gcp/pubsub/pubsub_test.go @@ -15,6 +15,7 @@ package pubsub import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -257,4 +258,38 @@ func TestInit(t *testing.T) { require.Error(t, err) require.ErrorContains(t, err, "maxConcurrentConnections") }) + t.Run("valid ackDeadline", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + "ackDeadline": "30s", // Valid custom ack deadline in seconds + } + + md, err := createMetadata(m) + require.NoError(t, err) + assert.Equal(t, 30*time.Second, md.AckDeadline, "AckDeadline should match the provided configuration") + }) + + t.Run("invalid ackDeadline", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + "ackDeadline": "-10m", // Invalid ack deadline + } + + _, err := createMetadata(m) + require.Error(t, err, "Should return an error for invalid ackDeadline") + assert.Contains(t, err.Error(), "invalid AckDeadline", "Error message should indicate the invalid ack deadline") + }) + + t.Run("default ackDeadline when not specified", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", // No ackDeadline specified + } + + md, err := createMetadata(m) + require.NoError(t, err) + assert.Equal(t, defaultAckDeadline, md.AckDeadline, "Should use the default AckDeadline when none is specified") + }) }