Skip to content

Commit

Permalink
add change log and add tls to stream
Browse files Browse the repository at this point in the history
Signed-off-by: zhangchao <[email protected]>
  • Loading branch information
Taction committed Sep 18, 2023
1 parent 958dadc commit d39125a
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Updated AWS SDK and updated all the aws scalers ([#4905](https://github.com/kedacore/keda/issues/4905))
- **Azure Pod Identity**: Introduce validation to prevent usage of empty identity ID for Azure identity providers ([#4528](https://github.com/kedacore/keda/issues/4528))
- **Prometheus Scaler**: Remove trailing whitespaces in customAuthHeader and customAuthValue ([#4960](https://github.com/kedacore/keda/issues/4960))
- **Redis Scalers**: Add TLS authentication support for Redis and Redis stream scalers ([#4917](https://github.com/kedacore/keda/issues/4917))

### Fixes
- **RabbitMQ Scaler**: Allow subpaths along with vhost in connection string ([#2634](https://github.com/kedacore/keda/issues/2634))
Expand Down
54 changes: 31 additions & 23 deletions pkg/scalers/redis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,67 +192,75 @@ func createRedisScalerWithClient(client *redis.Client, meta *redisMetadata, scri
}
}

func parseRedisMetadata(config *ScalerConfig, parserFn redisAddressParser) (*redisMetadata, error) {
connInfo, err := parserFn(config.TriggerMetadata, config.ResolvedEnv, config.AuthParams)
if err != nil {
return nil, err
}
meta := redisMetadata{
connectionInfo: connInfo,
}

func parseTLSConfigIntoConnectionInfo(config *ScalerConfig, connInfo *redisConnectionInfo) error {
enableTLS := defaultEnableTLS
if val, ok := config.TriggerMetadata["enableTLS"]; ok {
tls, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("enableTLS parsing error %w", err)
return fmt.Errorf("enableTLS parsing error %w", err)
}
enableTLS = tls
}

meta.connectionInfo.unsafeSsl = false
connInfo.unsafeSsl = false
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
parsedVal, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %w", err)
return fmt.Errorf("error parsing unsafeSsl: %w", err)
}
meta.connectionInfo.unsafeSsl = parsedVal
connInfo.unsafeSsl = parsedVal
}

// parse tls config defined in auth params
if val, ok := config.AuthParams["tls"]; ok {
val = strings.TrimSpace(val)
if enableTLS {
return nil, errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together")
return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together")
}
switch val {
case stringEnable:
enableTLS = true
case stringDisable:
enableTLS = false
default:
return nil, fmt.Errorf("error incorrect TLS value given, got %s", val)
return fmt.Errorf("error incorrect TLS value given, got %s", val)
}
}
if enableTLS {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return nil, errors.New("key must be provided with cert")
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return nil, errors.New("cert must be provided with key")
return errors.New("cert must be provided with key")
}
meta.connectionInfo.ca = config.AuthParams["ca"]
meta.connectionInfo.cert = config.AuthParams["cert"]
meta.connectionInfo.key = config.AuthParams["key"]
connInfo.ca = config.AuthParams["ca"]
connInfo.cert = config.AuthParams["cert"]
connInfo.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.connectionInfo.keyPassword = value
connInfo.keyPassword = value
} else {
meta.connectionInfo.keyPassword = ""
connInfo.keyPassword = ""
}
}
meta.connectionInfo.enableTLS = enableTLS
connInfo.enableTLS = enableTLS
return nil
}

func parseRedisMetadata(config *ScalerConfig, parserFn redisAddressParser) (*redisMetadata, error) {
connInfo, err := parserFn(config.TriggerMetadata, config.ResolvedEnv, config.AuthParams)
if err != nil {
return nil, err
}
meta := redisMetadata{
connectionInfo: connInfo,
}

err = parseTLSConfigIntoConnectionInfo(config, &meta.connectionInfo)
if err != nil {
return nil, err
}

meta.listLength = defaultListLength
if val, ok := config.TriggerMetadata["listLength"]; ok {
Expand Down
19 changes: 3 additions & 16 deletions pkg/scalers/redis_streams_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,22 +265,9 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser)
connectionInfo: connInfo,
}

meta.connectionInfo.enableTLS = defaultEnableTLS
if val, ok := config.TriggerMetadata["enableTLS"]; ok {
tls, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("enableTLS parsing error %w", err)
}
meta.connectionInfo.enableTLS = tls
}

meta.connectionInfo.unsafeSsl = false
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
parsedVal, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %w", err)
}
meta.connectionInfo.unsafeSsl = parsedVal
err = parseTLSConfigIntoConnectionInfo(config, &meta.connectionInfo)
if err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata[streamNameMetadata]; ok {
Expand Down
37 changes: 37 additions & 0 deletions pkg/scalers/redis_streams_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,43 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) {
},
wantErr: nil,
},
{
name: "tls in auth param",
metadata: map[string]string{
"hosts": "a, b, c",
"ports": "1, 2, 3",
"stream": "my-stream",
"pendingEntriesCount": "5",
"consumerGroup": "consumer1",
},
authParams: map[string]string{
"password": "password",
"tls": "enable",
"ca": "caaa",
"cert": "ceert",
"key": "keey",
"keyPassword": "keeyPassword",
},
wantMeta: &redisStreamsMetadata{
streamName: "my-stream",
targetPendingEntriesCount: 5,
activationLagCount: 0,
consumerGroupName: "consumer1",
connectionInfo: redisConnectionInfo{
addresses: []string{"a:1", "b:2", "c:3"},
hosts: []string{"a", "b", "c"},
ports: []string{"1", "2", "3"},
password: "password",
enableTLS: true,
ca: "caaa",
cert: "ceert",
key: "keey",
keyPassword: "keeyPassword",
},
scaleFactor: xPendingFactor,
},
wantErr: nil,
},
{
name: "stream is provided",
metadata: map[string]string{
Expand Down

0 comments on commit d39125a

Please sign in to comment.