Skip to content

Commit

Permalink
Merge pull request #68 from blnkfinance/jerry/redis-fix
Browse files Browse the repository at this point in the history
fix(redis): improve URL parsing for Managed and Docker Redis connections
  • Loading branch information
jerry-enebeli authored Dec 23, 2024
2 parents 25ae315 + 421d7fe commit c1435a4
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 13 deletions.
3 changes: 1 addition & 2 deletions blnk.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package blnk
import (
"context"
"embed"
"fmt"

"github.com/typesense/typesense-go/typesense/api"

Expand Down Expand Up @@ -60,7 +59,7 @@ func NewBlnk(db database.IDataSource) (*Blnk, error) {
if err != nil {
return nil, err
}
redisClient, err := redis_db.NewRedisClient([]string{fmt.Sprintf("redis://%s", configuration.Redis.Dns)})
redisClient, err := redis_db.NewRedisClient([]string{configuration.Redis.Dns})
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion cmd/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/jerry-enebeli/blnk"
"github.com/jerry-enebeli/blnk/config"
redis_db "github.com/jerry-enebeli/blnk/internal/redis-db"
trace "github.com/jerry-enebeli/blnk/internal/traces"
"github.com/jerry-enebeli/blnk/model"

Expand Down Expand Up @@ -173,8 +174,13 @@ func workerCommands(b *blnkInstance) *cobra.Command {
}

// Initialize the Asynq server with Redis as the backend and the queue configuration.
redisOption, err := redis_db.ParseRedisURL(conf.Redis.Dns)
if err != nil {
log.Fatalf("Error parsing Redis URL: %v", err)
}

srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: conf.Redis.Dns},
asynq.RedisClientOpt{Addr: redisOption.Addr, Password: redisOption.Password, DB: redisOption.DB},
asynq.Config{
Concurrency: 1, // Set the concurrency level for processing tasks
Queues: queues,
Expand Down
3 changes: 1 addition & 2 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package cache
import (
"context"
"errors"
"fmt"
"time"

"github.com/go-redis/cache/v9"
Expand Down Expand Up @@ -71,7 +70,7 @@ func NewCache() (Cache, error) {
}

// Initialize Redis cache with the configured Redis DNS
ca, err := newRedisCache([]string{fmt.Sprintf("redis://%s", cfg.Redis.Dns)})
ca, err := newRedisCache([]string{cfg.Redis.Dns})
if err != nil {
return nil, err
}
Expand Down
89 changes: 86 additions & 3 deletions internal/redis-db/redisdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -17,7 +17,10 @@ limitations under the License.
package redis_db

import (
"crypto/tls"
"errors"
"fmt"
"strings"

"github.com/redis/go-redis/v9"
)
Expand All @@ -29,6 +32,57 @@ type Redis struct {
client redis.UniversalClient // Redis universal client (works for both single and clustered Redis)
}

// parseRedisURL parses a Redis URL into Redis options, handling various URL formats
// including Azure Redis Cache URLs with special characters in passwords.
func ParseRedisURL(rawURL string) (*redis.Options, error) {
// Don't modify docker-style addresses (e.g. redis:6379)
if strings.Count(rawURL, ":") == 1 && !strings.Contains(rawURL, "@") && !strings.Contains(rawURL, "//") {
return &redis.Options{
Addr: rawURL,
}, nil
}

// Handle URLs that have redis:// prefix with password but no colon
if strings.HasPrefix(rawURL, "redis://") && strings.Contains(rawURL, "@") {
parts := strings.Split(strings.TrimPrefix(rawURL, "redis://"), "@")
if len(parts) == 2 {
rawURL = fmt.Sprintf("redis://:%s@%s", parts[0], parts[1])
}
}

// Parse the URL
opts, err := redis.ParseURL(rawURL)
if err != nil {
// If ParseURL fails, try manual parsing
host := rawURL
var password string

// Extract password if present
if strings.Contains(rawURL, "@") {
parts := strings.Split(rawURL, "@")
if len(parts) == 2 {
password = strings.TrimPrefix(parts[0], "redis://")
host = parts[1]
}
}

opts = &redis.Options{
Addr: host,
Password: password,
DB: 0,
}

// Enable TLS for Azure Redis
if strings.Contains(host, "redis.cache.windows.net") {
opts.TLSConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
}
}
}

return opts, nil
}

// NewRedisClient creates a new Redis client connection based on the provided list of addresses.
// It automatically detects if the connection is for a single Redis instance or a Redis Cluster.
//
Expand All @@ -48,16 +102,45 @@ func NewRedisClient(addresses []string) (*Redis, error) {

// If a single address is provided, use it to create a standalone Redis client
if len(addresses) == 1 {
opts, err := redis.ParseURL(addresses[0])
opts, err := ParseRedisURL(addresses[0])
if err != nil {
return nil, err
}

client = redis.NewClient(opts)
} else {
// For multiple addresses, create a Redis Cluster client
// Parse each URL for cluster setup
var clusterAddrs []string
var password string
useTLS := false

for _, addr := range addresses {
opts, err := ParseRedisURL(addr)
if err != nil {
return nil, err
}
clusterAddrs = append(clusterAddrs, opts.Addr)

// Use the password from the first URL that has one
if password == "" && opts.Password != "" {
password = opts.Password
}

// Enable TLS if any URL requires it
if opts.TLSConfig != nil {
useTLS = true
}
}
var tlsConfig *tls.Config
if useTLS {
tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
}

client = redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: addresses,
Addrs: clusterAddrs,
Password: password,
TLSConfig: tlsConfig,
})
}

Expand Down
10 changes: 8 additions & 2 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/jerry-enebeli/blnk/config"
redis_db "github.com/jerry-enebeli/blnk/internal/redis-db"

"github.com/hibiken/asynq"
"github.com/jerry-enebeli/blnk/model"
Expand Down Expand Up @@ -57,8 +58,13 @@ type TransactionTypePayload struct {
// Returns:
// - *Queue: A pointer to the newly created Queue instance.
func NewQueue(conf *config.Configuration) *Queue {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: conf.Redis.Dns})
inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: conf.Redis.Dns})
redisOption, err := redis_db.ParseRedisURL(conf.Redis.Dns)
if err != nil {
log.Fatalf("Error parsing Redis URL: %v", err)
}
queueOptions := asynq.RedisClientOpt{Addr: redisOption.Addr, Password: redisOption.Password, DB: redisOption.DB}
client := asynq.NewClient(queueOptions)
inspector := asynq.NewInspector(queueOptions)
return &Queue{
Client: client,
Inspector: inspector,
Expand Down
13 changes: 10 additions & 3 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@ package blnk
import (
"context"
"encoding/json"
"log"
"testing"

"github.com/hibiken/asynq"
"github.com/jerry-enebeli/blnk/config"
redis_db "github.com/jerry-enebeli/blnk/internal/redis-db"
"github.com/stretchr/testify/assert"
)

func TestEnqueueImmediateTransactionSuccess(t *testing.T) {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"})
redisOption, err := redis_db.ParseRedisURL("localhost:6379")
if err != nil {
log.Fatalf("Error parsing Redis URL: %v", err)
}
queueOptions := asynq.RedisClientOpt{Addr: redisOption.Addr, Password: redisOption.Password, DB: redisOption.DB}
client := asynq.NewClient(queueOptions)
inspector := asynq.NewInspector(queueOptions)

q := NewQueue(&config.Configuration{
Redis: config.RedisConfig{
Expand All @@ -39,7 +46,7 @@ func TestEnqueueImmediateTransactionSuccess(t *testing.T) {
q.Inspector = inspector

transaction := getTransactionMock(100, false)
_, err := json.Marshal(transaction)
_, err = json.Marshal(transaction)
assert.NoError(t, err)

err = q.Enqueue(context.Background(), &transaction)
Expand Down

0 comments on commit c1435a4

Please sign in to comment.