Skip to content

Commit

Permalink
Merge pull request #1850 from ItalyPaleAle/fix-1845
Browse files Browse the repository at this point in the history
Fixing #1845
  • Loading branch information
berndverst authored Jul 5, 2022
2 parents cb0ffd1 + 7466f7f commit ab984ff
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 51 deletions.
104 changes: 73 additions & 31 deletions bindings/azure/servicebusqueues/metadata.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package servicebusqueues

import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"

Expand All @@ -27,6 +28,18 @@ type serviceBusQueuesMetadata struct {
}

const (
// Keys
connectionString = "connectionString"
namespaceName = "namespaceName"
queueName = "queueName"
timeoutInSec = "timeoutInSec"
maxConnectionRecoveryInSec = "maxConnectionRecoveryInSec"
minConnectionRecoveryInSec = "minConnectionRecoveryInSec"
maxRetriableErrorsPerSec = "maxRetriableErrorsPerSec"
maxActiveMessages = "maxActiveMessages"
lockRenewalInSec = "lockRenewalInSec"
maxConcurrentHandlers = "maxConcurrentHandlers"

// Default time to live for queues, which is 14 days. The same way Azure Portal does.
defaultMessageTimeToLive = time.Hour * 24 * 14

Expand All @@ -50,18 +63,19 @@ const (

// Default rate of retriable errors per second
defaultMaxRetriableErrorsPerSec = 10

errorMessagePrefix = "azure service bus error:"
)

func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serviceBusQueuesMetadata, error) {
b, err := json.Marshal(metadata.Properties)
if err != nil {
return nil, err
m := serviceBusQueuesMetadata{}

if val, ok := metadata.Properties[connectionString]; ok && val != "" {
m.ConnectionString = val
}

var m serviceBusQueuesMetadata
err = json.Unmarshal(b, &m)
if err != nil {
return nil, err
if val, ok := metadata.Properties[namespaceName]; ok && val != "" {
m.NamespaceName = val
}

if m.ConnectionString != "" && m.NamespaceName != "" {
Expand All @@ -78,46 +92,74 @@ func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serv
}
m.ttl = ttl

// Queue names are case-insensitive and are forced to lowercase. This mimics the Azure portal's behavior.
m.QueueName = strings.ToLower(m.QueueName)

if m.TimeoutInSec < 1 {
m.TimeoutInSec = defaultTimeoutInSec
if val, ok := metadata.Properties[queueName]; ok && val != "" {
// Queue names are case-insensitive and are forced to lowercase. This mimics the Azure portal's behavior.
m.QueueName = strings.ToLower(val)
}

if m.MinConnectionRecoveryInSec < 1 {
m.MinConnectionRecoveryInSec = defaultMinConnectionRecoveryInSec
/* Optional configuration settings - defaults will be set by the client. */
m.TimeoutInSec = defaultTimeoutInSec
if val, ok := metadata.Properties[timeoutInSec]; ok && val != "" {
m.TimeoutInSec, err = strconv.Atoi(val)
if err != nil {
return &m, fmt.Errorf("%s invalid timeoutInSec %s, %s", errorMessagePrefix, val, err)
}
}

if m.MaxConnectionRecoveryInSec < 1 {
m.MaxConnectionRecoveryInSec = defaultMaxConnectionRecoveryInSec
m.MinConnectionRecoveryInSec = defaultMinConnectionRecoveryInSec
if val, ok := metadata.Properties[minConnectionRecoveryInSec]; ok && val != "" {
m.MinConnectionRecoveryInSec, err = strconv.Atoi(val)
if err != nil {
return &m, fmt.Errorf("%s invalid minConnectionRecoveryInSec %s, %s", errorMessagePrefix, val, err)
}
}

if m.MinConnectionRecoveryInSec > m.MaxConnectionRecoveryInSec {
return nil, errors.New("maxConnectionRecoveryInSec must be greater than minConnectionRecoveryInSec")
m.MaxConnectionRecoveryInSec = defaultMaxConnectionRecoveryInSec
if val, ok := metadata.Properties[maxConnectionRecoveryInSec]; ok && val != "" {
m.MaxConnectionRecoveryInSec, err = strconv.Atoi(val)
if err != nil {
return &m, fmt.Errorf("%s invalid maxConnectionRecoveryInSec %s, %s", errorMessagePrefix, val, err)
}
}

if m.MaxActiveMessages < 1 {
m.MaxActiveMessages = defaultMaxActiveMessages
m.MaxActiveMessages = defaultMaxActiveMessages
if val, ok := metadata.Properties[maxActiveMessages]; ok && val != "" {
m.MaxActiveMessages, err = strconv.Atoi(val)
if err != nil {
return &m, fmt.Errorf("%s invalid maxActiveMessages %s, %s", errorMessagePrefix, val, err)
}
}

if m.MaxConcurrentHandlers < 1 {
m.MaxConcurrentHandlers = defaultMaxConcurrentHandlers
m.MaxConcurrentHandlers = defaultMaxConcurrentHandlers
if val, ok := metadata.Properties[maxConcurrentHandlers]; ok && val != "" {
m.MaxConcurrentHandlers, err = strconv.Atoi(val)
if err != nil {
return &m, fmt.Errorf("%s invalid maxConcurrentHandlers %s, %s", errorMessagePrefix, val, err)
}
}

if m.MaxConcurrentHandlers > m.MaxActiveMessages {
return nil, errors.New("maxConcurrentHandlers cannot be bigger than maxActiveMessages")
return nil, fmt.Errorf("%s maxConcurrentHandlers cannot be bigger than maxActiveMessages, %s", errorMessagePrefix, err)
}

if m.LockRenewalInSec < 1 {
m.LockRenewalInSec = defaultLockRenewalInSec
m.LockRenewalInSec = defaultLockRenewalInSec
if val, ok := metadata.Properties[lockRenewalInSec]; ok && val != "" {
m.LockRenewalInSec, err = strconv.Atoi(val)
if err != nil {
return &m, fmt.Errorf("%s invalid lockRenewalInSec %s, %s", errorMessagePrefix, val, err)
}
}

if m.MaxRetriableErrorsPerSec == nil {
m.MaxRetriableErrorsPerSec = to.Ptr(defaultMaxRetriableErrorsPerSec)
}
if *m.MaxRetriableErrorsPerSec < 0 {
return nil, errors.New("maxRetriableErrorsPerSec must be non-negative")
m.MaxRetriableErrorsPerSec = to.Ptr(defaultMaxRetriableErrorsPerSec)
if val, ok := metadata.Properties[maxRetriableErrorsPerSec]; ok && val != "" {
mRetriableErrorsPerSec, err := strconv.Atoi(val)
if err != nil {
return &m, fmt.Errorf("%s invalid lockRenewalInSec %s, %s", errorMessagePrefix, val, err)
}
if mRetriableErrorsPerSec < 0 {
return nil, fmt.Errorf("%smaxRetriableErrorsPerSec must be non-negative, %s", errorMessagePrefix, err)
}
m.MaxRetriableErrorsPerSec = to.Ptr(mRetriableErrorsPerSec)
}

return &m, nil
Expand Down
82 changes: 62 additions & 20 deletions bindings/azure/servicebusqueues/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,74 @@ func TestParseMetadata(t *testing.T) {
oneSecondDuration := time.Second

testCases := []struct {
name string
properties map[string]string
expectedConnectionString string
expectedQueueName string
expectedTTL time.Duration
name string
properties map[string]string
expectedConnectionString string
expectedQueueName string
expectedTTL time.Duration
expectedTimeoutInSec int
expectedMaxConnectionRecoveryInSec int
expectedMinConnectionRecoveryInSec int
expectedMaxRetriableErrorsPerSec int
expectedMaxActiveMessages int
expectedLockRenewalInSec int
expectedMaxConcurrentHandlers int
}{
{
name: "ConnectionString and queue name",
properties: map[string]string{"connectionString": "connString", "queueName": "queue1"},
expectedConnectionString: "connString",
expectedQueueName: "queue1",
expectedTTL: defaultMessageTimeToLive,
name: "ConnectionString and queue name",
properties: map[string]string{"connectionString": "connString", "queueName": "queue1"},
expectedConnectionString: "connString",
expectedQueueName: "queue1",
expectedTTL: defaultMessageTimeToLive,
expectedTimeoutInSec: defaultTimeoutInSec,
expectedMaxConnectionRecoveryInSec: defaultMaxConnectionRecoveryInSec,
expectedMinConnectionRecoveryInSec: defaultMinConnectionRecoveryInSec,
expectedMaxRetriableErrorsPerSec: defaultMaxRetriableErrorsPerSec,
expectedMaxActiveMessages: defaultMaxActiveMessages,
expectedLockRenewalInSec: defaultLockRenewalInSec,
expectedMaxConcurrentHandlers: defaultMaxConcurrentHandlers,
},
{
name: "Empty TTL",
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: ""},
expectedConnectionString: "connString",
expectedQueueName: "queue1",
expectedTTL: defaultMessageTimeToLive,
name: "ConnectionString, queue name and all optional values",
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", "timeoutInSec": "30", "minConnectionRecoveryInSec": "1", "maxConnectionRecoveryInSec": "200", "maxRetriableErrorsPerSec": "20", "maxActiveMessages": "10", "maxConcurrentHandlers": "2", "lockRenewalInSec": "30"},
expectedConnectionString: "connString",
expectedQueueName: "queue1",
expectedTTL: defaultMessageTimeToLive,
expectedTimeoutInSec: 30,
expectedMaxConnectionRecoveryInSec: 200,
expectedMinConnectionRecoveryInSec: 1,
expectedMaxRetriableErrorsPerSec: 20,
expectedMaxActiveMessages: 10,
expectedMaxConcurrentHandlers: 2,
expectedLockRenewalInSec: 30,
},
{
name: "With TTL",
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: "1"},
expectedConnectionString: "connString",
expectedQueueName: "queue1",
expectedTTL: oneSecondDuration,
name: "Empty TTL",
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: ""},
expectedConnectionString: "connString",
expectedQueueName: "queue1",
expectedTTL: defaultMessageTimeToLive,
expectedTimeoutInSec: defaultTimeoutInSec,
expectedMaxConnectionRecoveryInSec: defaultMaxConnectionRecoveryInSec,
expectedMinConnectionRecoveryInSec: defaultMinConnectionRecoveryInSec,
expectedMaxRetriableErrorsPerSec: defaultMaxRetriableErrorsPerSec,
expectedMaxActiveMessages: defaultMaxActiveMessages,
expectedLockRenewalInSec: defaultLockRenewalInSec,
expectedMaxConcurrentHandlers: defaultMaxConcurrentHandlers,
},
{
name: "With TTL",
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: "1"},
expectedConnectionString: "connString",
expectedQueueName: "queue1",
expectedTTL: oneSecondDuration,
expectedTimeoutInSec: defaultTimeoutInSec,
expectedMaxConnectionRecoveryInSec: defaultMaxConnectionRecoveryInSec,
expectedMinConnectionRecoveryInSec: defaultMinConnectionRecoveryInSec,
expectedMaxRetriableErrorsPerSec: defaultMaxRetriableErrorsPerSec,
expectedMaxActiveMessages: defaultMaxActiveMessages,
expectedLockRenewalInSec: defaultLockRenewalInSec,
expectedMaxConcurrentHandlers: defaultMaxConcurrentHandlers,
},
}

Expand Down

0 comments on commit ab984ff

Please sign in to comment.