Skip to content

Commit

Permalink
Use RESP3 in valkey connections
Browse files Browse the repository at this point in the history
  • Loading branch information
secwall committed Feb 3, 2025
1 parent 7dccc04 commit f520524
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 101 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.6
github.com/stretchr/testify v1.10.0
github.com/valkey-io/valkey-go v1.0.53
github.com/valkey-io/valkey-go v1.0.54
gopkg.in/yaml.v2 v2.4.0
)

Expand Down Expand Up @@ -51,7 +51,6 @@ require (
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
Expand Down
14 changes: 6 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k=
github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY=
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
Expand Down Expand Up @@ -240,8 +240,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/valkey-io/valkey-go v1.0.53 h1:bntDqQVPzkLdE/4ypXBrHalXJB+BOTMk+JwXNRCGudg=
github.com/valkey-io/valkey-go v1.0.53/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM=
github.com/valkey-io/valkey-go v1.0.54 h1:pmFRGcMRJW8mHvsWLd/2MSgY6i3WNygpUl904KUaxao=
github.com/valkey-io/valkey-go v1.0.54/go.mod h1:NE+C8cjb3+XvLazNhiorcLJGhJa9MBAkFNoAW/48/fk=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -271,8 +271,6 @@ golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand All @@ -291,8 +289,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
114 changes: 27 additions & 87 deletions internal/valkey/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func NewNode(config *config.Config, logger *slog.Logger, fqdn string) (*Node, er
Password: config.Valkey.AuthPassword,
Dialer: net.Dialer{Timeout: config.Valkey.DialTimeout},
ConnWriteTimeout: config.Valkey.WriteTimeout,
AlwaysRESP2: true,
ForceSingleClient: true,
DisableAutoPipelining: true,
DisableCache: true,
Expand Down Expand Up @@ -200,25 +199,32 @@ func (n *Node) configRewrite(ctx context.Context) error {
return n.conn.Do(ctx, n.conn.B().ConfigRewrite().Build()).Error()
}

// IsReplPaused returns pause status of replication on node
func (n *Node) IsReplPaused(ctx context.Context) (bool, error) {
// configGet returns str value of config key
func (n *Node) configGet(ctx context.Context, key string) (string, error) {
err := n.ensureConn()
if err != nil {
return false, err
return "", err
}
cmd := n.conn.Do(ctx, n.conn.B().ConfigGet().Parameter("repl-paused").Build())
cmd := n.conn.Do(ctx, n.conn.B().ConfigGet().Parameter(key).Build())
err = cmd.Error()
if err != nil {
return false, err
return "", err
}
vals, err := cmd.AsStrSlice()
vals, err := cmd.AsStrMap()
if err != nil {
return false, err
return "", err
}
if len(vals) != 2 {
return false, fmt.Errorf("unexpected config get result for repl-paused: %v", vals)
val, ok := vals[key]
if !ok {
return "", fmt.Errorf("unexpected config get result for %s: %v", key, vals)
}
return vals[1] == "yes", nil
return val, nil
}

// IsReplPaused returns pause status of replication on node
func (n *Node) IsReplPaused(ctx context.Context) (bool, error) {
val, err := n.configGet(ctx, "repl-paused")
return val == "yes", err
}

// PauseReplication pauses replication from master on node
Expand Down Expand Up @@ -251,23 +257,8 @@ func (n *Node) ResumeReplication(ctx context.Context) error {

// IsOffline returns Offline status for node
func (n *Node) IsOffline(ctx context.Context) (bool, error) {
err := n.ensureConn()
if err != nil {
return false, err
}
cmd := n.conn.Do(ctx, n.conn.B().ConfigGet().Parameter("offline").Build())
err = cmd.Error()
if err != nil {
return false, err
}
vals, err := cmd.AsStrSlice()
if err != nil {
return false, err
}
if len(vals) != 2 {
return false, fmt.Errorf("unexpected config get result for offline: %v", vals)
}
return vals[1] == "yes", nil
val, err := n.configGet(ctx, "offline")
return val == "yes", err
}

// SetOffline disallows non-localhost connections and drops all existing clients (except rdsync ones)
Expand Down Expand Up @@ -306,23 +297,11 @@ func (n *Node) DisconnectClients(ctx context.Context, ctype string) error {

// GetNumQuorumReplicas returns number of connected replicas to accept writes on node
func (n *Node) GetNumQuorumReplicas(ctx context.Context) (int, error) {
err := n.ensureConn()
if err != nil {
return 0, err
}
cmd := n.conn.Do(ctx, n.conn.B().ConfigGet().Parameter("quorum-replicas-to-write").Build())
err = cmd.Error()
val, err := n.configGet(ctx, "quorum-replicas-to-write")
if err != nil {
return 0, err
}
vals, err := cmd.AsStrSlice()
if err != nil {
return 0, err
}
if len(vals) != 2 {
return 0, fmt.Errorf("unexpected config get result for quorum-replicas-to-write: %v", vals)
}
ret, err := strconv.ParseInt(vals[1], 10, 32)
ret, err := strconv.ParseInt(val, 10, 32)
if err != nil {
return 0, fmt.Errorf("unable to parse quorum-replicas-to-write value: %s", err.Error())
}
Expand All @@ -345,23 +324,11 @@ func (n *Node) SetNumQuorumReplicas(ctx context.Context, value int) (error, erro

// GetQuorumReplicas returns a set of quorum replicas
func (n *Node) GetQuorumReplicas(ctx context.Context) (string, error) {
err := n.ensureConn()
if err != nil {
return "", err
}
cmd := n.conn.Do(ctx, n.conn.B().ConfigGet().Parameter("quorum-replicas").Build())
err = cmd.Error()
if err != nil {
return "", err
}
vals, err := cmd.AsStrSlice()
val, err := n.configGet(ctx, "quorum-replicas")
if err != nil {
return "", err
}
if len(vals) != 2 {
return "", fmt.Errorf("unexpected config get result for quorum-replicas: %v", vals)
}
split := strings.Split(vals[1], " ")
split := strings.Split(val, " ")
sort.Strings(split)
return strings.Join(split, " "), nil
}
Expand Down Expand Up @@ -400,23 +367,8 @@ func (n *Node) EmptyQuorumReplicas(ctx context.Context) error {

// GetAppendonly returns a setting of appendonly config
func (n *Node) GetAppendonly(ctx context.Context) (bool, error) {
err := n.ensureConn()
if err != nil {
return false, err
}
cmd := n.conn.Do(ctx, n.conn.B().ConfigGet().Parameter("appendonly").Build())
err = cmd.Error()
if err != nil {
return false, err
}
vals, err := cmd.AsStrSlice()
if err != nil {
return false, err
}
if len(vals) != 2 {
return false, fmt.Errorf("unexpected config get result for repl-paused: %v", vals)
}
return vals[1] == "yes", nil
val, err := n.configGet(ctx, "appendonly")
return val == "yes", err
}

// SetOffline disallows non-localhost connections and drops all existing clients (except rdsync ones)
Expand All @@ -438,23 +390,11 @@ func (n *Node) SetAppendonly(ctx context.Context, value bool) error {

// GetMinReplicasToWrite returns number of replicas required to write on node
func (n *Node) GetMinReplicasToWrite(ctx context.Context) (int64, error) {
err := n.ensureConn()
val, err := n.configGet(ctx, "min-replicas-to-write")
if err != nil {
return 0, err
}
cmd := n.conn.Do(ctx, n.conn.B().ConfigGet().Parameter("min-replicas-to-write").Build())
err = cmd.Error()
if err != nil {
return 0, err
}
vals, err := cmd.AsStrSlice()
if err != nil {
return 0, err
}
if len(vals) != 2 {
return 0, fmt.Errorf("unexpected config get result for min-replicas-to-write: %v", vals)
}
ret, err := strconv.ParseInt(vals[1], 10, 64)
ret, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return 0, fmt.Errorf("unable to parse min-replicas-to-write value: %s", err.Error())
}
Expand Down
1 change: 0 additions & 1 deletion internal/valkey/senticache.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func NewRemoteSentiCacheNode(config *config.Config, host string, logger *slog.Lo
Password: config.SentinelMode.CacheAuthPassword,
Dialer: net.Dialer{Timeout: config.Valkey.DialTimeout},
ConnWriteTimeout: config.Valkey.WriteTimeout,
AlwaysRESP2: true,
ForceSingleClient: true,
DisableAutoPipelining: true,
DisableCache: true,
Expand Down
15 changes: 12 additions & 3 deletions tests/rdsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ func (tctx *testContext) connectZookeeper(addrs []string, timeout time.Duration)
func (tctx *testContext) connectValkey(addr string, timeout time.Duration) (client.Client, error) {
opts := client.ClientOption{
InitAddress: []string{addr},
AlwaysRESP2: true,
ForceSingleClient: true,
DisableAutoPipelining: true,
DisableCache: true,
Expand Down Expand Up @@ -250,7 +249,6 @@ func (tctx *testContext) connectValkey(addr string, timeout time.Duration) (clie
func (tctx *testContext) connectSenticache(addr string, timeout time.Duration) (client.Client, error) {
opts := client.ClientOption{
InitAddress: []string{addr},
AlwaysRESP2: true,
ForceSingleClient: true,
DisableAutoPipelining: true,
DisableCache: true,
Expand Down Expand Up @@ -343,7 +341,18 @@ func (tctx *testContext) runValkeyCmd(host string, cmd []string) (string, error)
tctx.valkeyCmdResult = err.Error()
return tctx.valkeyCmdResult, err
}
if message.IsArray() {
if message.IsMap() {
strMap, err := message.AsStrMap()
if err != nil {
tctx.valkeyCmdResult = err.Error()
} else {
var pairs []string
for k, v := range strMap {
pairs = append(pairs, k+" "+v)
}
tctx.valkeyCmdResult = strings.Join(pairs, " ")
}
} else if message.IsArray() {
strSlice, err := message.AsStrSlice()
if err != nil {
tctx.valkeyCmdResult = err.Error()
Expand Down

0 comments on commit f520524

Please sign in to comment.