diff --git a/bindings/azure/eventhubs/metadata.yaml b/bindings/azure/eventhubs/metadata.yaml index 841758b940..e5272bd62e 100644 --- a/bindings/azure/eventhubs/metadata.yaml +++ b/bindings/azure/eventhubs/metadata.yaml @@ -161,3 +161,12 @@ metadata: description: | Storage container name. example: '"myeventhubstoragecontainer"' + - name: getAllMessageProperties + required: false + default: "false" + example: "false" + binding: + input: true + output: false + description: | + When set to true, will retrieve all message properties and include them in the returned event metadata diff --git a/common/component/azure/eventhubs/eventhubs.go b/common/component/azure/eventhubs/eventhubs.go index e5b4cfd6c0..2eb079617b 100644 --- a/common/component/azure/eventhubs/eventhubs.go +++ b/common/component/azure/eventhubs/eventhubs.go @@ -127,6 +127,11 @@ func (aeh *AzureEventHubs) EventHubName() string { return aeh.metadata.hubName } +// GetAllMessageProperties returns a boolean to indicate whether to return all properties for an event hubs message. +func (aeh *AzureEventHubs) GetAllMessageProperties() bool { + return aeh.metadata.GetAllMessageProperties +} + // Publish a batch of messages. func (aeh *AzureEventHubs) Publish(ctx context.Context, topic string, messages []*azeventhubs.EventData, batchOpts *azeventhubs.EventDataBatchOptions) error { // Get the producer client @@ -165,7 +170,7 @@ func (aeh *AzureEventHubs) GetBindingsHandlerFunc(topic string, getAllProperties return nil, fmt.Errorf("expected 1 message, got %d", len(messages)) } - bindingsMsg, err := NewBindingsReadResponseFromEventData(messages[0], topic, getAllProperties) + bindingsMsg, err := NewBindingsReadResponseFromEventData(messages[0], topic, aeh.GetAllMessageProperties()) if err != nil { return nil, fmt.Errorf("failed to get bindings read response from azure eventhubs message: %w", err) } diff --git a/common/component/azure/eventhubs/metadata.go b/common/component/azure/eventhubs/metadata.go index 00ed07fa7d..1b003b3d34 100644 --- a/common/component/azure/eventhubs/metadata.go +++ b/common/component/azure/eventhubs/metadata.go @@ -39,6 +39,7 @@ type AzureEventHubsMetadata struct { SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"` ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"` EnableInOrderMessageDelivery bool `json:"enableInOrderMessageDelivery,string" mapstructure:"enableInOrderMessageDelivery"` + GetAllMessageProperties bool `json:"getAllMessageProperties,string" mapstructure:"getAllMessageProperties"` // Binding only EventHub string `json:"eventHub" mapstructure:"eventHub" mdonly:"bindings"` diff --git a/pubsub/azure/eventhubs/eventhubs.go b/pubsub/azure/eventhubs/eventhubs.go index 65617239bf..ae6db85377 100644 --- a/pubsub/azure/eventhubs/eventhubs.go +++ b/pubsub/azure/eventhubs/eventhubs.go @@ -130,6 +130,10 @@ func (aeh *AzureEventHubs) Subscribe(ctx context.Context, req pubsub.SubscribeRe // Check if requireAllProperties is set and is truthy getAllProperties := utils.IsTruthy(req.Metadata["requireAllProperties"]) + if !getAllProperties { + getAllProperties = aeh.GetAllMessageProperties() + } + checkPointFrequencyPerPartition := commonutils.GetIntValFromString(req.Metadata["checkPointFrequencyPerPartition"], impl.DefaultCheckpointFrequencyPerPartition) pubsubHandler := aeh.GetPubSubHandlerFunc(topic, getAllProperties, handler) @@ -155,6 +159,9 @@ func (aeh *AzureEventHubs) BulkSubscribe(ctx context.Context, req pubsub.Subscri // Check if requireAllProperties is set and is truthy getAllProperties := utils.IsTruthy(req.Metadata["requireAllProperties"]) + if !getAllProperties { + getAllProperties = aeh.GetAllMessageProperties() + } checkPointFrequencyPerPartition := commonutils.GetIntValFromString(req.Metadata["checkPointFrequencyPerPartition"], impl.DefaultCheckpointFrequencyPerPartition) maxBulkSubCount := commonutils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxMessagesCount, impl.DefaultMaxBulkSubCount) maxBulkSubAwaitDurationMs := commonutils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxAwaitDurationMs, impl.DefaultMaxBulkSubAwaitDurationMs) diff --git a/pubsub/azure/eventhubs/metadata.yaml b/pubsub/azure/eventhubs/metadata.yaml index 57c73c721c..0f56fc0932 100644 --- a/pubsub/azure/eventhubs/metadata.yaml +++ b/pubsub/azure/eventhubs/metadata.yaml @@ -110,3 +110,12 @@ metadata: description: | The name of the Event Hubs Consumer Group to listen on. example: '"group1"' + - name: getAllMessageProperties + required: false + default: "false" + example: "false" + binding: + input: true + output: false + description: | + When set to true, will retrieve all message properties and include them in the returned event metadata