Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Support for Redis Cluster Client #46

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
log "github.com/massenz/slf4go/logging"
protos "github.com/massenz/statemachine-proto/golang/api"
"net"
"strings"
"sync"
)

Expand Down Expand Up @@ -76,7 +77,11 @@ func main() {
var localOnly = flag.Bool("local", false,
"If set, it only listens to incoming requests from the local host")
var port = flag.Int("http-port", 7399, "HTTP Server port for the REST API")
var redisUrl = flag.String("redis", "", "URI for the Redis cluster (host:port)")
var redisUrl = flag.String("redis", "", "For single node redis instances: URI "+
"for the Redis instance (host:port). For redis clusters: a comma-separated list of redis nodes. "+
"If using an ElastiCache redis cluster with cluster mode enabled, you can supply the configuration endpoint.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"If using an ElastiCache redis cluster with cluster mode enabled, you can supply the configuration endpoint.")
"If using an ElastiCache Redis cluster with cluster mode enabled, you can supply the configuration endpoint.")

Does it apply on any Redis deployment in cluster mode? or just Elasticache? If any, then we can remove the "Elasticache" part

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, updated

var cluster = flag.Bool("cluster", false,
z-cran marked this conversation as resolved.
Show resolved Hide resolved
"Needs to be set if connecting to a Redis instance with cluster mode enabled")
var awsEndpoint = flag.String("endpoint-url", "",
"HTTP URL for AWS SQS to connect to; usually best left undefined, "+
"unless required for local testing purposes (LocalStack uses http://localhost:4566)")
Expand All @@ -102,13 +107,20 @@ func main() {
}
addr := fmt.Sprintf("%s:%d", host, *port)

if *redisUrl == "" {
logger.Warn("in-memory storage configured, all data will NOT survive a server restart")
store = storage.NewInMemoryStore()
} else {
if *cluster {
redisNodes := strings.Split(*redisUrl, ",")
z-cran marked this conversation as resolved.
Show resolved Hide resolved
logger.Info("Connecting to Redis cluster at with the following nodes:")
for _, n := range redisNodes {
z-cran marked this conversation as resolved.
Show resolved Hide resolved
logger.Info(n)
}
store = storage.NewRedisClusterStore(redisNodes, *timeout, *maxRetries)
} else if *redisUrl != "" {
logger.Info("Connecting to Redis server at %s", *redisUrl)
logger.Info("with timeout: %s, max-retries: %d", *timeout, *maxRetries)
store = storage.NewRedisStore(*redisUrl, 1, *timeout, *maxRetries)
} else {
logger.Warn("in-memory storage configured, all data will NOT survive a server restart")
store = storage.NewInMemoryStore()
}
server.SetStore(store)

Expand Down
4 changes: 2 additions & 2 deletions docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ then
endpoint="--endpoint-url ${AWS_ENDPOINT}"
fi

echo -e "./sm-server -port ${SERVER_PORT} ${endpoint:-} ${DEBUG}\n\
echo -e "./sm-server -port ${SERVER_PORT} ${endpoint:-} ${CLUSTER} ${DEBUG}\n\
-redis ${REDIS}:${REDIS_PORT} -timeout ${TIMEOUT:-25ms} -max-retries ${RETRIES:-3}\n\
-events ${EVENTS_Q} -errors ${ERRORS_Q}\n\
$@"

./sm-server -http-port "${SERVER_PORT}" ${endpoint:-} ${DEBUG} \
./sm-server -http-port "${SERVER_PORT}" ${endpoint:-} ${CLUSTER} ${DEBUG} \
-redis ${REDIS}:${REDIS_PORT} -timeout ${TIMEOUT:-25ms} -max-retries ${RETRIES:-3} \
-events "${EVENTS_Q}" -errors "${ERRORS_Q}" \
"$@"
22 changes: 21 additions & 1 deletion storage/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package storage

import (
"context"
"crypto/tls"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
Expand All @@ -42,7 +43,7 @@ const (

type RedisStore struct {
logger *slf4go.Log
client *redis.Client
client RedisClient
z-cran marked this conversation as resolved.
Show resolved Hide resolved
Timeout time.Duration
MaxRetries int
}
Expand Down Expand Up @@ -142,11 +143,30 @@ func NewRedisStoreWithDefaults(address string) StoreManager {
return NewRedisStore(address, DefaultRedisDb, DefaultTimeout, DefaultMaxRetries)
}

func NewRedisClusterStore(addresses []string, timeout time.Duration, maxRetries int) StoreManager {
logger := slf4go.NewLog(fmt.Sprintf("redis cluster: %v nodes", len(addresses)))
z-cran marked this conversation as resolved.
Show resolved Hide resolved

return &RedisStore{
logger: logger,
client: redis.NewClusterClient(&redis.ClusterOptions{
TLSConfig: &tls.Config{
z-cran marked this conversation as resolved.
Show resolved Hide resolved
MinVersion: tls.VersionTLS12,
},
Addrs: addresses,
}),
Timeout: timeout,
MaxRetries: maxRetries,
}
}

func NewRedisStore(address string, db int, timeout time.Duration, maxRetries int) StoreManager {
logger := slf4go.NewLog(fmt.Sprintf("redis://%s/%d", address, db))
return &RedisStore{
logger: logger,
client: redis.NewClient(&redis.Options{
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
Addr: address,
DB: db, // 0 means default DB
}),
Expand Down
8 changes: 8 additions & 0 deletions storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package storage

import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
log "github.com/massenz/slf4go/logging"
protos "github.com/massenz/statemachine-proto/golang/api"
"time"
Expand Down Expand Up @@ -69,3 +71,9 @@ type StoreManager interface {
GetTimeout() time.Duration
Health() error
}

type RedisClient interface {
z-cran marked this conversation as resolved.
Show resolved Hide resolved
Get(ctx context.Context, id string) *redis.StringCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
Ping(ctx context.Context) *redis.StatusCmd
}