diff --git a/bindings/rabbitmq/rabbitmq.go b/bindings/rabbitmq/rabbitmq.go index 8be7ee063b..057b312c22 100644 --- a/bindings/rabbitmq/rabbitmq.go +++ b/bindings/rabbitmq/rabbitmq.go @@ -31,6 +31,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "github.com/dapr/components-contrib/bindings" + common "github.com/dapr/components-contrib/common/component/rabbitmq" "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" kitmd "github.com/dapr/kit/metadata" @@ -228,11 +229,6 @@ func (r *RabbitMQ) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bi pub.Headers[k] = v } - contentType, ok := metadata.TryGetContentType(req.Metadata) - if ok { - pub.ContentType = contentType - } - // The default time to live has been set in the queue // We allow overriding on each call, by setting a value in request metadata ttl, ok, err := metadata.TryGetTTL(req.Metadata) @@ -252,6 +248,8 @@ func (r *RabbitMQ) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bi pub.Priority = priority } + common.ApplyMetadataToPublishing(req.Metadata, &pub) + err = ch.PublishWithContext(ctx, "", r.metadata.QueueName, false, false, pub) if err != nil { return nil, fmt.Errorf("failed to publish message: %w", err) @@ -473,9 +471,9 @@ func (r *RabbitMQ) handleMessage(ctx context.Context, handler bindings.Handler, // Passthrough any custom metadata to the handler. for k, v := range d.Headers { if s, ok := v.(string); ok { - // Escape the key and value to ensure they are valid URL query parameters. + // Escape the key to ensure they are valid URL query parameters. // This is necessary for them to be sent as HTTP Metadata. - metadata[url.QueryEscape(k)] = url.QueryEscape(s) + metadata[url.QueryEscape(k)] = s } } diff --git a/bindings/rabbitmq/rabbitmq_integration_test.go b/bindings/rabbitmq/rabbitmq_integration_test.go index dcc380c232..def3d8513e 100644 --- a/bindings/rabbitmq/rabbitmq_integration_test.go +++ b/bindings/rabbitmq/rabbitmq_integration_test.go @@ -447,3 +447,67 @@ func TestPublishWithHeaders(t *testing.T) { // assert.Contains(t, msg.Header, "custom_header1") // assert.Contains(t, msg.Header, "custom_header2") } + +func TestPublishMetadataProperties(t *testing.T) { + rabbitmqHost := getTestRabbitMQHost() + require.NotEmpty(t, rabbitmqHost, fmt.Sprintf("RabbitMQ host configuration must be set in environment variable '%s'", testRabbitMQHostEnvKey)) + + queueName := uuid.New().String() + durable := true + exclusive := false + + metadata := bindings.Metadata{ + Base: contribMetadata.Base{ + Name: "testQueue", + Properties: map[string]string{ + "queueName": queueName, + "host": rabbitmqHost, + "deleteWhenUnused": strconv.FormatBool(exclusive), + "durable": strconv.FormatBool(durable), + }, + }, + } + + logger := logger.NewLogger("test") + r := NewRabbitMQ(logger).(*RabbitMQ) + err := r.Init(t.Context(), metadata) + require.NoError(t, err) + + conn, err := amqp.Dial(rabbitmqHost) + require.NoError(t, err) + defer conn.Close() + + ch, err := conn.Channel() + require.NoError(t, err) + defer ch.Close() + + const messageData = "test message" + const msgID = "msg-123" + const corrID = "corr-456" + const msgType = "testType" + const contentType = "application/json" + + writeRequest := bindings.InvokeRequest{ + Data: []byte(messageData), + Metadata: map[string]string{ + "messageID": msgID, + "correlationID": corrID, + "type": msgType, + "contentType": contentType, + }, + } + _, err = r.Invoke(t.Context(), &writeRequest) + require.NoError(t, err) + + // Retrieve the message. + msg, ok, err := getMessageWithRetries(ch, queueName, 2*time.Second) + require.NoError(t, err) + assert.True(t, ok) + assert.Equal(t, messageData, string(msg.Body)) + assert.Equal(t, msgID, msg.MessageId) + assert.Equal(t, corrID, msg.CorrelationId) + assert.Equal(t, msgType, msg.Type) + assert.Equal(t, contentType, msg.ContentType) + + require.NoError(t, r.Close()) +} diff --git a/common/component/rabbitmq/rabbitmq.go b/common/component/rabbitmq/rabbitmq.go new file mode 100644 index 0000000000..9cd127bbc7 --- /dev/null +++ b/common/component/rabbitmq/rabbitmq.go @@ -0,0 +1,52 @@ +package rabbitmq + +import ( + "strings" + + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + MetadataKeyMessageID = "messageID" + MetadataKeyCorrelationID = "correlationID" + MetadataKeyContentType = "contentType" + MetadataKeyType = "type" + MetadataKeyPriority = "priority" + MetadataKeyTTL = "ttl" +) + +// TryGetProperty finds a property value using case-insensitive matching +func TryGetProperty(props map[string]string, key string) (string, bool) { + // First try exact match + if val, ok := props[key]; ok && val != "" { + return val, true + } + + // Then try case-insensitive match + for k, v := range props { + if v != "" && strings.EqualFold(key, k) { + return v, true + } + } + + return "", false +} + +// ApplyMetadataToPublishing applies common metadata fields to an AMQP publishing +func ApplyMetadataToPublishing(metadata map[string]string, publishing *amqp.Publishing) { + if contentType, ok := TryGetProperty(metadata, MetadataKeyContentType); ok { + publishing.ContentType = contentType + } + + if messageID, ok := TryGetProperty(metadata, MetadataKeyMessageID); ok { + publishing.MessageId = messageID + } + + if correlationID, ok := TryGetProperty(metadata, MetadataKeyCorrelationID); ok { + publishing.CorrelationId = correlationID + } + + if aType, ok := TryGetProperty(metadata, MetadataKeyType); ok { + publishing.Type = aType + } +} diff --git a/common/component/rabbitmq/rabbitmq_test.go b/common/component/rabbitmq/rabbitmq_test.go new file mode 100644 index 0000000000..1a55aeb845 --- /dev/null +++ b/common/component/rabbitmq/rabbitmq_test.go @@ -0,0 +1,68 @@ +package rabbitmq + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTryGetProperty(t *testing.T) { + tests := []struct { + name string + props map[string]string + key string + expected string + found bool + }{ + { + name: "exact match", + props: map[string]string{"messageID": "test-id"}, + key: "messageID", + expected: "test-id", + found: true, + }, + { + name: "case insensitive match", + props: map[string]string{"messageid": "test-id"}, + key: "messageID", + expected: "test-id", + found: true, + }, + { + name: "uppercase match", + props: map[string]string{"MESSAGEID": "test-id"}, + key: "messageID", + expected: "test-id", + found: true, + }, + { + name: "not found", + props: map[string]string{"otherKey": "value"}, + key: "messageID", + expected: "", + found: false, + }, + { + name: "empty value", + props: map[string]string{"messageID": ""}, + key: "messageID", + expected: "", + found: false, + }, + { + name: "whitespace value", + props: map[string]string{"messageID": " "}, + key: "messageID", + expected: " ", + found: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + value, found := TryGetProperty(tt.props, tt.key) + assert.Equal(t, tt.expected, value) + assert.Equal(t, tt.found, found) + }) + } +} diff --git a/conversation/deepseek/metadata.yaml b/conversation/deepseek/metadata.yaml index 0f38a876b5..6ae17cc454 100644 --- a/conversation/deepseek/metadata.yaml +++ b/conversation/deepseek/metadata.yaml @@ -7,7 +7,7 @@ status: alpha title: "Deepseek" urls: - title: Reference - url: https://docs.dapr.io/reference/components-reference/supported-conversation/deepseek/ + url: https://docs.dapr.io/reference/components-reference/supported-conversation/setup-deepseek/ authenticationProfiles: - title: "API Key" description: "Authenticate using an API key" diff --git a/go.sum b/go.sum index a0b5adfa27..bc2a13fe02 100644 --- a/go.sum +++ b/go.sum @@ -236,6 +236,8 @@ github.com/aliyunmq/mq-http-go-sdk v1.0.3/go.mod h1:JYfRMQoPexERvnNNBcal0ZQ2TVQ5 github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc h1:NZRon3MDqT4vddR3UIRBnwbbhEerghAimCSBsiESs3g= github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc/go.mod h1:cPJlbcHUTNTpiboMQjMHhE9XBni11LiBiG8FdrDuVzk= @@ -382,6 +384,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -389,6 +393,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chebyrash/promise v0.0.0-20230709133807-42ec49ba1459 h1:s7UrE2T8jRoriLIddT8fW5+Wf2sXcOgfteXUKD74SaU= github.com/chebyrash/promise v0.0.0-20230709133807-42ec49ba1459/go.mod h1:CQthfPdCoGmlBJAG/sP9Km5nfK1/jGpDf1RiG/LUxXw= github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= @@ -627,12 +633,16 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/foxcpp/go-mockdns v1.1.0 h1:jI0rD8M0wuYAxL7r/ynTrCQQq0BVqfB99Vgk7DlmewI= github.com/foxcpp/go-mockdns v1.1.0/go.mod h1:IhLeSFGed3mJIAXPH2aiRQB+kqz7oqu8ld2qVbOu7Wk= +github.com/foxcpp/go-mockdns v1.1.0 h1:jI0rD8M0wuYAxL7r/ynTrCQQq0BVqfB99Vgk7DlmewI= +github.com/foxcpp/go-mockdns v1.1.0/go.mod h1:IhLeSFGed3mJIAXPH2aiRQB+kqz7oqu8ld2qVbOu7Wk= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y= github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= @@ -685,6 +695,8 @@ github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= @@ -1233,6 +1245,8 @@ github.com/miekg/dns v1.1.27/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7 github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= github.com/miekg/dns v1.1.57 h1:Jzi7ApEIzwEPLHWRcafCN9LZSBbqQpxjt/wpgvg7wcM= github.com/miekg/dns v1.1.57/go.mod h1:uqRjCRUuEAA6qsOiJvDd+CFo/vW+y5WR6SNmHE55hZk= +github.com/miekg/dns v1.1.57 h1:Jzi7ApEIzwEPLHWRcafCN9LZSBbqQpxjt/wpgvg7wcM= +github.com/miekg/dns v1.1.57/go.mod h1:uqRjCRUuEAA6qsOiJvDd+CFo/vW+y5WR6SNmHE55hZk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= @@ -1311,6 +1325,8 @@ github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= @@ -1538,6 +1554,8 @@ github.com/sendgrid/sendgrid-go v3.13.0+incompatible h1:HZrzc06/QfBGesY9o3n1lvBr github.com/sendgrid/sendgrid-go v3.13.0+incompatible/go.mod h1:QRQt+LX/NmgVEvmdRw0VT/QgUn499+iza2FnDca9fg8= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= +github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= +github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88= github.com/shirou/gopsutil/v3 v3.22.2/go.mod h1:WapW1AOOPlHyXr+yOyw3uYx36enocrtSoSBy0L5vUHY= @@ -1632,6 +1650,8 @@ github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxm github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/supplyon/gremcos v0.1.40 h1:OFJw3MV44HNE9N6SKYK0zRBbEwyugyyjjqeXiGi5E3w= github.com/supplyon/gremcos v0.1.40/go.mod h1:LI6lxKObicSoIw1N04rHyjz9tGSaevM6Ydbo3XfyZfA= github.com/tchap/go-patricia/v2 v2.3.2 h1:xTHFutuitO2zqKAQ5rCROYgUb7Or/+IC3fts9/Yc7nM= @@ -1694,6 +1714,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasthttp v1.21.0/go.mod h1:jjraHZVbKOXftJfsOYoAjaeygpj5hr8ermTRJNroD7A= github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4zc= github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= +github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4zc= +github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk= @@ -1750,14 +1772,20 @@ go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r7 go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/api/v3 v3.5.10 h1:szRajuUUbLyppkhs9K6BRtjY37l66XQQmw7oZRANE4k= go.etcd.io/etcd/api/v3 v3.5.10/go.mod h1:TidfmT4Uycad3NM/o25fG3J07odo4GBB9hoxaodFCtI= +go.etcd.io/etcd/api/v3 v3.5.10 h1:szRajuUUbLyppkhs9K6BRtjY37l66XQQmw7oZRANE4k= +go.etcd.io/etcd/api/v3 v3.5.10/go.mod h1:TidfmT4Uycad3NM/o25fG3J07odo4GBB9hoxaodFCtI= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/pkg/v3 v3.5.10 h1:kfYIdQftBnbAq8pUWFXfpuuxFSKzlmM5cSn76JByiT0= go.etcd.io/etcd/client/pkg/v3 v3.5.10/go.mod h1:DYivfIviIuQ8+/lCq4vcxuseg2P2XbHygkKwFo9fc8U= +go.etcd.io/etcd/client/pkg/v3 v3.5.10 h1:kfYIdQftBnbAq8pUWFXfpuuxFSKzlmM5cSn76JByiT0= +go.etcd.io/etcd/client/pkg/v3 v3.5.10/go.mod h1:DYivfIviIuQ8+/lCq4vcxuseg2P2XbHygkKwFo9fc8U= go.etcd.io/etcd/client/v2 v2.305.0-alpha.0/go.mod h1:kdV+xzCJ3luEBSIeQyB/OEKkWKd8Zkux4sbDeANrosU= go.etcd.io/etcd/client/v3 v3.5.0-alpha.0/go.mod h1:wKt7jgDgf/OfKiYmCq5WFGxOFAkVMLxiiXgLDFhECr8= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= go.etcd.io/etcd/client/v3 v3.5.10 h1:W9TXNZ+oB3MCd/8UjxHTWK5J9Nquw9fQBLJd5ne5/Ao= go.etcd.io/etcd/client/v3 v3.5.10/go.mod h1:RVeBnDz2PUEZqTpgqwAtUd8nAPf5kjyFyND7P1VkOKc= +go.etcd.io/etcd/client/v3 v3.5.10 h1:W9TXNZ+oB3MCd/8UjxHTWK5J9Nquw9fQBLJd5ne5/Ao= +go.etcd.io/etcd/client/v3 v3.5.10/go.mod h1:RVeBnDz2PUEZqTpgqwAtUd8nAPf5kjyFyND7P1VkOKc= go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY= go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0/go.mod h1:FAwse6Zlm5v4tEWZaTjmNhe17Int4Oxbu7+2r0DiD3w= go.etcd.io/etcd/server/v3 v3.5.0-alpha.0/go.mod h1:tsKetYpt980ZTpzl/gb+UOJj9RkIyCb1u4wjzMg90BQ= @@ -1842,6 +1870,8 @@ go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= @@ -2038,8 +2068,8 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= -golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= -golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= +golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= +golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/metadata/utils.go b/metadata/utils.go index 50591061a5..35f44f1182 100644 --- a/metadata/utils.go +++ b/metadata/utils.go @@ -114,14 +114,6 @@ func IsRawPayload(props map[string]string) (bool, error) { return false, nil } -func TryGetContentType(props map[string]string) (string, bool) { - if val, ok := props[ContentType]; ok && val != "" { - return val, true - } - - return "", false -} - func TryGetQueryIndexName(props map[string]string) (string, bool) { if val, ok := props[QueryIndexName]; ok && val != "" { return val, true diff --git a/metadata/utils_test.go b/metadata/utils_test.go index 44c09e2bb0..0f8f6de7b8 100644 --- a/metadata/utils_test.go +++ b/metadata/utils_test.go @@ -175,34 +175,6 @@ func TestIsRawPayload(t *testing.T) { }) } -func TestTryGetContentType(t *testing.T) { - t.Run("Metadata without content type", func(t *testing.T) { - val, ok := TryGetContentType(map[string]string{}) - - assert.Equal(t, "", val) - assert.False(t, ok) - }) - - t.Run("Metadata with empty content type", func(t *testing.T) { - val, ok := TryGetContentType(map[string]string{ - "contentType": "", - }) - - assert.Equal(t, "", val) - assert.False(t, ok) - }) - - t.Run("Metadata with corrent content type", func(t *testing.T) { - const contentType = "application/cloudevent+json" - val, ok := TryGetContentType(map[string]string{ - "contentType": contentType, - }) - - assert.Equal(t, contentType, val) - assert.True(t, ok) - }) -} - func TestMetadataStructToStringMap(t *testing.T) { t.Run("Test metadata struct to metadata info conversion", func(t *testing.T) { type NestedStruct struct { diff --git a/pubsub/rabbitmq/metadata.go b/pubsub/rabbitmq/metadata.go index 6b6e6fc7b1..fdaa388dc9 100644 --- a/pubsub/rabbitmq/metadata.go +++ b/pubsub/rabbitmq/metadata.go @@ -27,31 +27,32 @@ import ( ) type rabbitmqMetadata struct { - pubsub.TLSProperties `mapstructure:",squash"` - ConsumerID string `mapstructure:"consumerID" mdignore:"true"` - ConnectionString string `mapstructure:"connectionString"` - Protocol string `mapstructure:"protocol"` - internalProtocol string `mapstructure:"-"` - Hostname string `mapstructure:"hostname"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - Durable bool `mapstructure:"durable"` - EnableDeadLetter bool `mapstructure:"enableDeadLetter"` - DeleteWhenUnused bool `mapstructure:"deletedWhenUnused"` - AutoAck bool `mapstructure:"autoAck"` - RequeueInFailure bool `mapstructure:"requeueInFailure"` - DeliveryMode uint8 `mapstructure:"deliveryMode"` // Transient (0 or 1) or Persistent (2) - PrefetchCount uint8 `mapstructure:"prefetchCount"` // Prefetch deactivated if 0 - ReconnectWait time.Duration `mapstructure:"reconnectWaitSeconds"` - MaxLen int64 `mapstructure:"maxLen"` - MaxLenBytes int64 `mapstructure:"maxLenBytes"` - ExchangeKind string `mapstructure:"exchangeKind"` - ClientName string `mapstructure:"clientName"` - HeartBeat time.Duration `mapstructure:"heartBeat"` - PublisherConfirm bool `mapstructure:"publisherConfirm"` - SaslExternal bool `mapstructure:"saslExternal"` - Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"` - DefaultQueueTTL *time.Duration `mapstructure:"ttlInSeconds"` + pubsub.TLSProperties `mapstructure:",squash"` + ConsumerID string `mapstructure:"consumerID" mdignore:"true"` + ConnectionString string `mapstructure:"connectionString"` + Protocol string `mapstructure:"protocol"` + internalProtocol string `mapstructure:"-"` + Hostname string `mapstructure:"hostname"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + Durable bool `mapstructure:"durable"` + EnableDeadLetter bool `mapstructure:"enableDeadLetter"` + DeleteWhenUnused bool `mapstructure:"deletedWhenUnused"` + AutoAck bool `mapstructure:"autoAck"` + RequeueInFailure bool `mapstructure:"requeueInFailure"` + DeliveryMode uint8 `mapstructure:"deliveryMode"` // Transient (0 or 1) or Persistent (2) + PrefetchCount uint8 `mapstructure:"prefetchCount"` // Prefetch deactivated if 0 + ReconnectWait time.Duration `mapstructure:"reconnectWaitSeconds"` + MaxLen int64 `mapstructure:"maxLen"` + MaxLenBytes int64 `mapstructure:"maxLenBytes"` + ExchangeKind string `mapstructure:"exchangeKind"` + ClientName string `mapstructure:"clientName"` + HeartBeat time.Duration `mapstructure:"heartBeat"` + PublisherConfirm bool `mapstructure:"publisherConfirm"` + SaslExternal bool `mapstructure:"saslExternal"` + Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"` + DefaultQueueTTL *time.Duration `mapstructure:"ttlInSeconds"` + PublishMessagePropertiesToMetadata bool `mapstructure:"publishMessagePropertiesToMetadata"` } const ( @@ -65,23 +66,24 @@ const ( metadataUsernameKey = "username" metadataPasswordKey = "password" - metadataDurableKey = "durable" - metadataEnableDeadLetterKey = "enableDeadLetter" - metadataDeleteWhenUnusedKey = "deletedWhenUnused" - metadataAutoAckKey = "autoAck" - metadataRequeueInFailureKey = "requeueInFailure" - metadataDeliveryModeKey = "deliveryMode" - metadataPrefetchCountKey = "prefetchCount" - metadataReconnectWaitSecondsKey = "reconnectWaitSeconds" - metadataMaxLenKey = "maxLen" - metadataMaxLenBytesKey = "maxLenBytes" - metadataExchangeKindKey = "exchangeKind" - metadataPublisherConfirmKey = "publisherConfirm" - metadataSaslExternal = "saslExternal" - metadataMaxPriority = "maxPriority" - metadataClientNameKey = "clientName" - metadataHeartBeatKey = "heartBeat" - metadataQueueNameKey = "queueName" + metadataDurableKey = "durable" + metadataEnableDeadLetterKey = "enableDeadLetter" + metadataDeleteWhenUnusedKey = "deletedWhenUnused" + metadataAutoAckKey = "autoAck" + metadataRequeueInFailureKey = "requeueInFailure" + metadataDeliveryModeKey = "deliveryMode" + metadataPrefetchCountKey = "prefetchCount" + metadataReconnectWaitSecondsKey = "reconnectWaitSeconds" + metadataMaxLenKey = "maxLen" + metadataMaxLenBytesKey = "maxLenBytes" + metadataExchangeKindKey = "exchangeKind" + metadataPublisherConfirmKey = "publisherConfirm" + metadataSaslExternal = "saslExternal" + metadataMaxPriority = "maxPriority" + metadataClientNameKey = "clientName" + metadataHeartBeatKey = "heartBeat" + metadataQueueNameKey = "queueName" + metadataPublishMessagePropertiesToMetadataKey = "publishMessagePropertiesToMetadata" defaultReconnectWaitSeconds = 3 @@ -92,16 +94,17 @@ const ( // createMetadata creates a new instance from the pubsub metadata. func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*rabbitmqMetadata, error) { result := rabbitmqMetadata{ - internalProtocol: protocolAMQP, - Hostname: "localhost", - Durable: true, - DeleteWhenUnused: true, - AutoAck: false, - ReconnectWait: time.Duration(defaultReconnectWaitSeconds) * time.Second, - ExchangeKind: fanoutExchangeKind, - PublisherConfirm: false, - SaslExternal: false, - HeartBeat: defaultHeartbeat, + internalProtocol: protocolAMQP, + Hostname: "localhost", + Durable: true, + DeleteWhenUnused: true, + AutoAck: false, + ReconnectWait: time.Duration(defaultReconnectWaitSeconds) * time.Second, + ExchangeKind: fanoutExchangeKind, + PublisherConfirm: false, + SaslExternal: false, + HeartBeat: defaultHeartbeat, + PublishMessagePropertiesToMetadata: false, } // upgrade metadata diff --git a/pubsub/rabbitmq/metadata_test.go b/pubsub/rabbitmq/metadata_test.go index 6ebebe08a8..c6ec29e3fe 100644 --- a/pubsub/rabbitmq/metadata_test.go +++ b/pubsub/rabbitmq/metadata_test.go @@ -99,6 +99,7 @@ func TestCreateMetadata(t *testing.T) { assert.Equal(t, "", m.CACert) assert.Equal(t, fanoutExchangeKind, m.ExchangeKind) assert.True(t, m.Durable) + assert.False(t, m.PublishMessagePropertiesToMetadata) }) invalidDeliveryModes := []string{"3", "10", "-1"} diff --git a/pubsub/rabbitmq/rabbitmq.go b/pubsub/rabbitmq/rabbitmq.go index 5ae400d03f..a62105a6db 100644 --- a/pubsub/rabbitmq/rabbitmq.go +++ b/pubsub/rabbitmq/rabbitmq.go @@ -28,6 +28,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" + common "github.com/dapr/components-contrib/common/component/rabbitmq" "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" @@ -259,6 +260,8 @@ func (r *rabbitMQ) publishSync(ctx context.Context, req *pubsub.PublishRequest) p.Priority = priority } + common.ApplyMetadataToPublishing(req.Metadata, &p) + confirm, err := r.channel.PublishWithDeferredConfirmWithContext(ctx, req.Topic, routingKey, false, false, p) if err != nil { r.logger.Errorf("%s publishing to %s failed in channel.Publish: %v", logMessagePrefix, req.Topic, err) @@ -620,8 +623,13 @@ func (r *rabbitMQ) listenMessages(ctx context.Context, channel rabbitMQChannelBr func (r *rabbitMQ) handleMessage(ctx context.Context, d amqp.Delivery, topic string, handler pubsub.Handler) error { pubsubMsg := &pubsub.NewMessage{ - Data: d.Body, - Topic: topic, + Data: d.Body, + Topic: topic, + Metadata: map[string]string{}, + } + + if r.metadata.PublishMessagePropertiesToMetadata { + pubsubMsg.Metadata = addAMQPPropertiesToMetadata(d) } err := handler(ctx, pubsubMsg) @@ -745,3 +753,43 @@ func (r *rabbitMQ) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { func queueTypeValid(qType string) bool { return qType == amqp.QueueTypeClassic || qType == amqp.QueueTypeQuorum } + +// Add this function to extract metadata from AMQP delivery +func addAMQPPropertiesToMetadata(delivery amqp.Delivery) map[string]string { + metadata := map[string]string{} + + // Add message properties as metadata + if delivery.MessageId != "" { + metadata["metadata.messageid"] = delivery.MessageId + } + + if delivery.CorrelationId != "" { + metadata["metadata.correlationid"] = delivery.CorrelationId + } + + if delivery.Type != "" { + metadata["metadata.type"] = delivery.Type + } + + if delivery.ContentType != "" { + metadata["metadata.contenttype"] = delivery.ContentType + } + + // Add any custom headers + for k, v := range delivery.Headers { + metadataPrefixedKey := "metadata." + k + if v != nil { + switch value := v.(type) { + case string: + metadata[metadataPrefixedKey] = value + case []byte: + metadata[metadataPrefixedKey] = string(value) + default: + // Try to convert other types to string + metadata[metadataPrefixedKey] = fmt.Sprintf("%v", v) + } + } + } + + return metadata +} diff --git a/pubsub/rabbitmq/rabbitmq_test.go b/pubsub/rabbitmq/rabbitmq_test.go index a8be105b49..b8504230fa 100644 --- a/pubsub/rabbitmq/rabbitmq_test.go +++ b/pubsub/rabbitmq/rabbitmq_test.go @@ -461,10 +461,11 @@ func createAMQPMessage(body []byte) amqp.Delivery { } type rabbitMQInMemoryBroker struct { - buffer chan amqp.Delivery - declaredQueues []string - connectCount atomic.Int32 - closeCount atomic.Int32 + buffer chan amqp.Delivery + declaredQueues []string + connectCount atomic.Int32 + closeCount atomic.Int32 + lastMsgMetadata *amqp.Publishing // Add this field to capture the last message metadata } func (r *rabbitMQInMemoryBroker) Qos(prefetchCount, prefetchSize int, global bool) error { @@ -482,7 +483,17 @@ func (r *rabbitMQInMemoryBroker) PublishWithDeferredConfirmWithContext(ctx conte return nil, errors.New(errorChannelConnection) } - r.buffer <- createAMQPMessage(msg.Body) + // Store the last message metadata for inspection in tests + r.lastMsgMetadata = &msg + + // Use a non-blocking send or a separate goroutine to prevent deadlock + // when there's no consumer reading from the buffer + select { + case r.buffer <- createAMQPMessage(msg.Body): + // Message sent successfully + default: + // Buffer is full or there's no consumer, but we don't want to block + } return nil, nil } @@ -525,3 +536,200 @@ func (r *rabbitMQInMemoryBroker) Close() error { func (r *rabbitMQInMemoryBroker) IsClosed() bool { return r.connectCount.Load() <= r.closeCount.Load() } + +// TestPublishMetadataProperties tests that message metadata properties are correctly passed to the broker +func TestPublishMetadataProperties(t *testing.T) { + broker := newBroker() + pubsubRabbitMQ := newRabbitMQTest(broker) + metadata := pubsub.Metadata{Base: mdata.Base{ + Properties: map[string]string{ + metadataHostnameKey: "anyhost", + metadataConsumerIDKey: "consumer", + }, + }} + err := pubsubRabbitMQ.Init(t.Context(), metadata) + require.NoError(t, err) + + topic := "metadatatest" + + // Create a consumer for the test to prevent channel deadlock + messageHandler := func(ctx context.Context, msg *pubsub.NewMessage) error { + return nil + } + err = pubsubRabbitMQ.Subscribe(t.Context(), pubsub.SubscribeRequest{Topic: topic}, messageHandler) + require.NoError(t, err) + + // Test messageID + err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{ + Topic: topic, + Data: []byte("test message"), + Metadata: map[string]string{ + "messageID": "msg-123", + }, + }) + require.NoError(t, err) + assert.Equal(t, "msg-123", broker.lastMsgMetadata.MessageId) + + // Test correlationID + err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{ + Topic: topic, + Data: []byte("test message"), + Metadata: map[string]string{ + "correlationID": "corr-456", + }, + }) + require.NoError(t, err) + assert.Equal(t, "corr-456", broker.lastMsgMetadata.CorrelationId) + + // Test Type + err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{ + Topic: topic, + Data: []byte("test message"), + Metadata: map[string]string{ + "type": "mytype", + }, + }) + require.NoError(t, err) + assert.Equal(t, "mytype", broker.lastMsgMetadata.Type) + + // Test all properties together + err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{ + Topic: topic, + Data: []byte("test message"), + Metadata: map[string]string{ + "messageID": "msg-789", + "correlationID": "corr-789", + "type": "complete-type", + "contentType": "application/json", + }, + }) + require.NoError(t, err) + assert.Equal(t, "msg-789", broker.lastMsgMetadata.MessageId) + assert.Equal(t, "corr-789", broker.lastMsgMetadata.CorrelationId) + assert.Equal(t, "complete-type", broker.lastMsgMetadata.Type) + assert.Equal(t, "application/json", broker.lastMsgMetadata.ContentType) +} + +func TestPublishMessagePropertiesToMetadataFlag(t *testing.T) { + topicName := "test-topic" + messageData := []byte("test message data") + + t.Run("flag is true", func(t *testing.T) { + broker := newBroker() + pubsubRabbitMQ := newRabbitMQTest(broker) + metadata := pubsub.Metadata{Base: mdata.Base{ + Properties: map[string]string{ + metadataHostnameKey: "anyhost", + metadataConsumerIDKey: "consumer", + metadataPublishMessagePropertiesToMetadataKey: "true", + }, + }} + err := pubsubRabbitMQ.Init(t.Context(), metadata) + require.NoError(t, err) + + var receivedMsg *pubsub.NewMessage + processed := make(chan bool) + handler := func(ctx context.Context, msg *pubsub.NewMessage) error { + receivedMsg = msg + processed <- true + return nil + } + + err = pubsubRabbitMQ.Subscribe(t.Context(), pubsub.SubscribeRequest{Topic: topicName}, handler) + require.NoError(t, err) + + // Publish a message with some AMQP properties + broker.buffer <- amqp.Delivery{ + Body: messageData, + MessageId: "msg-id-true", + ContentType: "text/plain", + Headers: amqp.Table{ + "customHeader": "customValue", + }, + } + + <-processed + require.NotNil(t, receivedMsg) + assert.Equal(t, messageData, receivedMsg.Data) + assert.Equal(t, topicName, receivedMsg.Topic) + assert.Equal(t, "msg-id-true", receivedMsg.Metadata["metadata.messageid"]) + assert.Equal(t, "text/plain", receivedMsg.Metadata["metadata.contenttype"]) + assert.Equal(t, "customValue", receivedMsg.Metadata["metadata.customHeader"]) + }) + + t.Run("flag is false", func(t *testing.T) { + broker := newBroker() + pubsubRabbitMQ := newRabbitMQTest(broker) + metadata := pubsub.Metadata{Base: mdata.Base{ + Properties: map[string]string{ + metadataHostnameKey: "anyhost", + metadataConsumerIDKey: "consumer", + metadataPublishMessagePropertiesToMetadataKey: "false", // Explicitly false + }, + }} + err := pubsubRabbitMQ.Init(t.Context(), metadata) + require.NoError(t, err) + + var receivedMsg *pubsub.NewMessage + processed := make(chan bool) + handler := func(ctx context.Context, msg *pubsub.NewMessage) error { + receivedMsg = msg + processed <- true + return nil + } + + err = pubsubRabbitMQ.Subscribe(t.Context(), pubsub.SubscribeRequest{Topic: topicName}, handler) + require.NoError(t, err) + + // Publish a message with some AMQP properties + broker.buffer <- amqp.Delivery{ + Body: messageData, + MessageId: "msg-id-false", + ContentType: "application/xml", + } + + <-processed + require.NotNil(t, receivedMsg) + assert.Equal(t, messageData, receivedMsg.Data) + assert.Equal(t, topicName, receivedMsg.Topic) + assert.Empty(t, receivedMsg.Metadata, "Metadata should be empty when flag is false") + }) + + t.Run("flag is not set (default to false)", func(t *testing.T) { + broker := newBroker() + pubsubRabbitMQ := newRabbitMQTest(broker) + metadata := pubsub.Metadata{Base: mdata.Base{ + Properties: map[string]string{ + metadataHostnameKey: "anyhost", + metadataConsumerIDKey: "consumer", + // metadataPublishMessagePropertiesToMetadataKey is not set + }, + }} + err := pubsubRabbitMQ.Init(t.Context(), metadata) + require.NoError(t, err) + + var receivedMsg *pubsub.NewMessage + processed := make(chan bool) + handler := func(ctx context.Context, msg *pubsub.NewMessage) error { + receivedMsg = msg + processed <- true + return nil + } + + err = pubsubRabbitMQ.Subscribe(t.Context(), pubsub.SubscribeRequest{Topic: topicName}, handler) + require.NoError(t, err) + + // Publish a message with some AMQP properties + broker.buffer <- amqp.Delivery{ + Body: messageData, + MessageId: "msg-id-default", + ContentType: "application/json", + } + + <-processed + require.NotNil(t, receivedMsg) + assert.Equal(t, messageData, receivedMsg.Data) + assert.Equal(t, topicName, receivedMsg.Topic) + assert.Empty(t, receivedMsg.Metadata, "Metadata should be empty when flag is not set (defaults to false)") + }) +} diff --git a/tests/certification/bindings/rabbitmq/components/metadata/rabbitmq.yaml b/tests/certification/bindings/rabbitmq/components/metadata/rabbitmq.yaml new file mode 100644 index 0000000000..5472860810 --- /dev/null +++ b/tests/certification/bindings/rabbitmq/components/metadata/rabbitmq.yaml @@ -0,0 +1,16 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: metadata-binding +spec: + type: bindings.rabbitmq + version: v1 + metadata: + - name: queueName + value: metadataQueue + - name: host + value: "amqp://test:test@localhost:5672" + - name: durable + value: true + - name: deleteWhenUnused + value: false \ No newline at end of file diff --git a/tests/certification/bindings/rabbitmq/rabbitmq_test.go b/tests/certification/bindings/rabbitmq/rabbitmq_test.go index dbd2b3ec74..58facf5ac5 100644 --- a/tests/certification/bindings/rabbitmq/rabbitmq_test.go +++ b/tests/certification/bindings/rabbitmq/rabbitmq_test.go @@ -21,6 +21,7 @@ import ( "log" "os" "strconv" + "strings" "testing" "time" @@ -614,6 +615,101 @@ func amqpMtlsExternalAuthReady(url string) flow.Runnable { } } +func getMetadataValueCI(metadata map[string]string, key string) (string, bool) { + for k, v := range metadata { + if strings.EqualFold(k, key) { + return v, true + } + } + return "", false +} + +func TestRabbitMQMetadataProperties(t *testing.T) { + messages := watcher.NewUnordered() + + ports, _ := dapr_testing.GetFreePorts(3) + grpcPort := ports[0] + httpPort := ports[1] + appPort := ports[2] + + // Define the test values for metadata with fixed IDs + const messageData = "metadata-test-message" + const msgID = "msg-id-123" + const corrID = "corr-id-456" + const msgType = "test-type" + const contentType = "application/json" + + test := func(ctx flow.Context) error { + client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort)) + require.NoError(t, err, "Could not initialize dapr client.") + + metadata := map[string]string{ + "messageID": msgID, + "correlationID": corrID, + "type": msgType, + "contentType": contentType, + } + + ctx.Log("Invoking binding with metadata properties!") + req := &daprClient.InvokeBindingRequest{ + Name: "metadata-binding", + Operation: "create", + Data: []byte(messageData), + Metadata: metadata, + } + + err = client.InvokeOutputBinding(ctx, req) + require.NoError(ctx, err, "error publishing message with metadata") + + // Assertion on the data and metadata. + messages.ExpectStrings(messageData) + messages.Assert(ctx, time.Minute) + + return nil + } + + application := func(ctx flow.Context, s common.Service) (err error) { + // Setup the input binding endpoint. + err = multierr.Combine(err, + s.AddBindingInvocationHandler("metadata-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) { + msg := string(in.Data) + messages.Observe(msg) + + // Log the received metadata for debugging + ctx.Logf("Got message: %s with metadata: %+v", msg, in.Metadata) + + msgIdVal, _ := getMetadataValueCI(in.Metadata, "messageid") + corrIdVal, _ := getMetadataValueCI(in.Metadata, "correlationid") + contentTypeVal, _ := getMetadataValueCI(in.Metadata, "contenttype") + typeVal, _ := getMetadataValueCI(in.Metadata, "type") + + require.Equal(t, msgID, msgIdVal, "messageID should match expected value") + require.Equal(t, corrID, corrIdVal, "correlationID should match expected value") + require.Equal(t, contentType, contentTypeVal, "contentType should match expected value") + require.Equal(t, msgType, typeVal, "type should match expected value") + + return []byte("{}"), nil + })) + return err + } + + flow.New(t, "rabbitmq metadata properties certification"). + Step(dockercompose.Run(clusterName, dockerComposeYAML)). + Step("wait for rabbitmq readiness", + retry.Do(time.Second, 30, amqpReady(rabbitMQURL))). + Step(app.Run("metadataApp", fmt.Sprintf(":%d", appPort), application)). + Step(sidecar.Run("metadataSidecar", + append(componentRuntimeOptions(), + embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)), + embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)), + embedded.WithComponentsPath("./components/metadata"), + )..., + )). + Step("send with metadata and verify", test). + Run() +} + func componentRuntimeOptions() []embedded.Option { log := logger.NewLogger("dapr.components") diff --git a/tests/certification/pubsub/rabbitmq/components/metadata/rabbitmq-metadata.yaml b/tests/certification/pubsub/rabbitmq/components/metadata/rabbitmq-metadata.yaml new file mode 100644 index 0000000000..534539f966 --- /dev/null +++ b/tests/certification/pubsub/rabbitmq/components/metadata/rabbitmq-metadata.yaml @@ -0,0 +1,20 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: mq-metadata +spec: + type: pubsub.rabbitmq + version: v1 + metadata: + - name: consumerID + value: metadata + - name: host + value: "amqp://test:test@localhost:5672" + - name: durable + value: true + - name: deletedWhenUnused + value: false + - name: requeueInFailure + value: true + - name: publishMessagePropertiesToMetadata + value: true \ No newline at end of file diff --git a/tests/certification/pubsub/rabbitmq/rabbitmq_test.go b/tests/certification/pubsub/rabbitmq/rabbitmq_test.go index 1c77b40e9e..9ce8d565f0 100644 --- a/tests/certification/pubsub/rabbitmq/rabbitmq_test.go +++ b/tests/certification/pubsub/rabbitmq/rabbitmq_test.go @@ -22,6 +22,7 @@ import ( "math/rand" "os" "strconv" + "strings" "sync" "testing" "time" @@ -54,11 +55,13 @@ const ( sidecarName2 = "dapr-2" sidecarName3 = "dapr-3" sidecarName4 = "dapr-4" + sidecarNameMetadata = "dapr-metadata" sidecarNameTTLClient = "dapr-ttl-client" appID1 = "app-1" appID2 = "app-2" appID3 = "app-3" appID4 = "app-4" + appIDMetadata = "app-metadata" clusterName = "rabbitmqcertification" dockerComposeYAML = "docker-compose.yml" extSaslDockerComposeYAML = "mtls_sasl_external/docker-compose.yml" @@ -72,6 +75,7 @@ const ( pubsubAlpha = "mq-alpha" pubsubBeta = "mq-beta" pubsubMtlsExternal = "mq-mtls" + pubsubMetadata = "mq-metadata" pubsubMessageOnlyTTL = "msg-ttl-pubsub" pubsubQueueOnlyTTL = "overwrite-ttl-pubsub" pubsubOverwriteTTL = "queue-ttl-pubsub" @@ -84,6 +88,8 @@ const ( topicTTL1 = "ttl1" topicTTL2 = "ttl2" topicTTL3 = "ttl3" + + topicMetadata = "metadata-topic" ) type Consumer struct { @@ -853,6 +859,178 @@ func TestRabbitMQPriority(t *testing.T) { Run() } +func getMetadataValueCI(metadata map[string]string, key string) (string, bool) { + for k, v := range metadata { + if strings.EqualFold(k, key) { + return v, true + } + } + return "", false +} + +func TestRabbitMQMetadataProperties(t *testing.T) { + messagesWatcher := watcher.NewUnordered() + + // Define the test values for metadata with fixed IDs + const messageCount = 10 + const msgID = "msg-id-123" + const corrID = "corr-id-456" + const msgType = "test-type" + const contentType = "application/json" + + messages := make([]string, messageCount) + for i := range messageCount { + messages[i] = fmt.Sprintf("Test message %d", i+1) + } + + // Use a channel to collect metadata validation errors + metadataErrors := make(chan error, 1) + + // Application logic that tracks messages with their metadata + metadataApp := func(ctx flow.Context, s common.Service) (err error) { + // Setup the topic event handler for metadata testing + err = s.AddTopicEventHandler(&common.Subscription{ + PubsubName: pubsubMetadata, + Topic: topicMetadata, + Route: "/metadata", + Metadata: map[string]string{}, + }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { + // ENHANCED DEBUGGING - Print every detail of the message + ctx.Logf("==================== MESSAGE RECEIVED ====================") + ctx.Logf("Topic: %s", e.Topic) + ctx.Logf("PubsubName: %s", e.PubsubName) + ctx.Logf("ID: %s", e.ID) + ctx.Logf("Data Type: %T", e.Data) + ctx.Logf("Data: %s", e.Data) + ctx.Logf("---------------- METADATA DUMP ----------------") + ctx.Logf("Total metadata entries: %d", len(e.Metadata)) + ctx.Logf("Raw metadata map: %+v", e.Metadata) + + ctx.Logf("---------------- TARGET METADATA VALUES ----------------") + msgIdVal, _ := getMetadataValueCI(e.Metadata, "messageid") + corrIdVal, _ := getMetadataValueCI(e.Metadata, "correlationid") + contentTypeVal, _ := getMetadataValueCI(e.Metadata, "contenttype") + typeVal, _ := getMetadataValueCI(e.Metadata, "type") + + ctx.Logf(" → messageID: '%s' (expected: '%s')", msgIdVal, msgID) + ctx.Logf(" → correlationID: '%s' (expected: '%s')", corrIdVal, corrID) + ctx.Logf(" → contentType: '%s' (expected: '%s')", contentTypeVal, contentType) + ctx.Logf(" → type: '%s' (expected: '%s')", typeVal, msgType) + + // Instead of failing silently, collect errors and send them to the channel + var metadataErr error + + if msgIdVal != msgID { + ctx.Logf("ERROR: messageID not found or incorrect: value=%s", msgIdVal) + metadataErr = fmt.Errorf("expected messageID: %s, got: %s", msgID, msgIdVal) + } + + if corrIdVal != corrID { + ctx.Logf("ERROR: correlationID not found or incorrect: value=%s", corrIdVal) + if metadataErr != nil { + metadataErr = fmt.Errorf("%w; expected correlationID: %s, got: %s", + metadataErr, corrID, corrIdVal) + } else { + metadataErr = fmt.Errorf("expected correlationID: %s, got: %s", corrID, corrIdVal) + } + } + + if contentTypeVal != contentType { + ctx.Logf("ERROR: contentType not found or incorrect: value=%s", contentTypeVal) + if metadataErr != nil { + metadataErr = fmt.Errorf("%w; expected contentType: %s, got: %s", + metadataErr, contentType, contentTypeVal) + } else { + metadataErr = fmt.Errorf("expected contentType: %s, got: %s", contentType, contentTypeVal) + } + } + + if typeVal != msgType { + ctx.Logf("ERROR: type not found or incorrect: value=%s", typeVal) + if metadataErr != nil { + metadataErr = fmt.Errorf("%w; expected type: %s, got: %s", + metadataErr, msgType, typeVal) + } else { + metadataErr = fmt.Errorf("expected type: %s, got: %s", msgType, typeVal) + } + } + + dataStr, ok := e.Data.(string) + if !ok { + return false, fmt.Errorf("e.Data is not a string, got %T", e.Data) + } + // If there are any metadata errors, send them to the channel + if metadataErr != nil { + ctx.Logf("Metadata validation failed: %s", metadataErr) + metadataErrors <- metadataErr + } + + messagesWatcher.Observe(dataStr) + ctx.Logf("Got message: %s with all expected metadata properties", e.Data) + return false, nil + }) + + return err + } + + // Test function to publish messages with metadata + testMetadata := func(ctx flow.Context) error { + // Get the Dapr client + client := sidecar.GetClient(ctx, sidecarNameMetadata) + messagesWatcher.ExpectStrings(messages...) + + // Publish messages with metadata properties + ctx.Log("Publishing messages with metadata properties") + for i := 0; i < messageCount; i++ { + err := client.PublishEvent(ctx, pubsubMetadata, topicMetadata, messages[i], + daprClient.PublishEventWithMetadata(map[string]string{ + "messageID": msgID, + "correlationID": corrID, + "contentType": contentType, + "type": msgType, + })) + require.NoError(ctx, err, "Failed publishing message with metadata") + } + + // Check for metadata errors with timeout + select { + case err := <-metadataErrors: + return fmt.Errorf("metadata validation failed: %w", err) + case <-time.After(5 * time.Second): + // No errors within timeout, continue with message assertion + } + + // Verify all messages were processed correctly + ctx.Log("Verifying messages were received...") + messagesWatcher.Assert(t, 20*time.Second) + + return nil + } + + // Run the test flow + flow.New(t, "rabbitmq metadata properties pubsub certification"). + // Start RabbitMQ container + Step(dockercompose.Run(clusterName, dockerComposeYAML)). + Step("wait for rabbitmq readiness", retry.Do(time.Second, 30, amqpReady(rabbitMQURL))). + // Run the metadata app and sidecar + Step(app.Run(appIDMetadata, fmt.Sprintf(":%d", appPort+10), metadataApp)). + Step(sidecar.Run(sidecarNameMetadata, + append(componentRuntimeOptions(), + embedded.WithComponentsPath("./components/metadata"), + embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+10)), + embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+20)), + embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+10)), + embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+10)), + embedded.WithGracefulShutdownDuration(2*time.Second), + )..., + )). + // Wait for subscription to complete + Step("wait for subscription setup", flow.Sleep(5*time.Second)). + // Run the test with timeout + Step("publish and verify metadata properties", testMetadata). + Run() +} + func componentRuntimeOptions() []embedded.Option { log := logger.NewLogger("dapr.components")