Skip to content

Commit

Permalink
support multi commands transaction in cluster mode
Browse files Browse the repository at this point in the history
  • Loading branch information
HDT3213 committed Jun 20, 2021
1 parent ef5639f commit 5d05e2e
Show file tree
Hide file tree
Showing 14 changed files with 312 additions and 34 deletions.
19 changes: 11 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ middleware using golang.

Key Features:

- support string, list, hash, set, sorted set
- ttl
- publish/subscribe
- geo
- aof and aof rewrite
- Transaction. The `multi` command is Atomic and Isolated. If any errors are encountered during execution, godis will rollback the executed commands
- server side cluster which is transparent to client. You can connect to any node in the cluster to
- Support string, list, hash, set, sorted set
- TTL
- Publish/Subscribe
- GEO
- AOF and AOF Rewrite
- MULTI Commands Transaction is Atomic and Isolated. If any errors are encountered during execution, godis will rollback the executed commands
- Server-side Cluster which is transparent to client. You can connect to any node in the cluster to
access all data in the cluster.
- a concurrent core, so you don't have to worry about your commands blocking the server too much.
- `MSET`, `DEL` command is supported and atomically executed in cluster mode
- `Rename`, `RenameNX` command is supported within slot in cluster mode
- MULTI Commands Transaction is supported within slot in cluster mode
- Concurrent Core, so you don't have to worry about your commands blocking the server too much.

If you could read Chinese, you can find more details in [My Blog](https://www.cnblogs.com/Finley/category/1598973.html).

Expand Down
5 changes: 4 additions & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ Godis 是一个用 Go 语言实现的 Redis 服务器。本项目旨在为尝试
- 发布订阅
- 地理位置
- AOF 持久化及AOF重写
- 事务. Multi 命令开启的事务具有`原子性``隔离性`. 若在执行过程中遇到错误, godis 会回滚已执行的命令
- Multi 命令开启的事务具有`原子性``隔离性`. 若在执行过程中遇到错误, godis 会回滚已执行的命令
- 内置集群模式. 集群对客户端是透明的, 您可以像使用单机版 redis 一样使用 godis 集群
- `MSET`, `DEL` 命令在集群模式下原子性执行
- `Rename`, `RenameNX` 命令在集群模式下支持在同一个 slot 内执行
- Multi 命令开启的事务在集群模式下支持在同一个 slot 内执行
- 并行引擎, 无需担心您的操作会阻塞整个服务器.

可以在[我的博客](https://www.cnblogs.com/Finley/category/1598973.html)了解更多关于
Expand Down
34 changes: 27 additions & 7 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,45 @@ func isAuthenticated(c redis.Connection) bool {
}

// Exec executes command on cluster
func (cluster *Cluster) Exec(c redis.Connection, cmdArgs [][]byte) (result redis.Reply) {
func (cluster *Cluster) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {
defer func() {
if err := recover(); err != nil {
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
result = &reply.UnknownErrReply{}
}
}()
cmd := strings.ToLower(string(cmdArgs[0]))
if cmd == "auth" {
return godis.Auth(cluster.db, c, cmdArgs[1:])
cmdName := strings.ToLower(string(cmdLine[0]))
if cmdName == "auth" {
return godis.Auth(cluster.db, c, cmdLine[1:])
}
if !isAuthenticated(c) {
return reply.MakeErrReply("NOAUTH Authentication required")
}
cmdFunc, ok := router[cmd]

if cmdName == "multi" {
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName)
}
return godis.StartMulti(cluster.db, c)
} else if cmdName == "discard" {
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName)
}
return godis.DiscardMulti(cluster.db, c)
} else if cmdName == "exec" {
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName)
}
return execMulti(cluster, c, nil)
}
if c != nil && c.InMultiState() {
return godis.EnqueueCmd(cluster.db, c, cmdLine)
}
cmdFunc, ok := router[cmdName]
if !ok {
return reply.MakeErrReply("ERR unknown command '" + cmd + "', or not supported in cluster mode")
return reply.MakeErrReply("ERR unknown command '" + cmdName + "', or not supported in cluster mode")
}
result = cmdFunc(cluster, c, cmdArgs)
result = cmdFunc(cluster, c, cmdLine)
return
}

Expand Down
95 changes: 95 additions & 0 deletions cluster/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package cluster

import (
"github.com/hdt3213/godis"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/reply"
)

const relayMulti = "_multi"

var relayMultiBytes = []byte(relayMulti)

// cmdLine == []string{"exec"}
func execMulti(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if !conn.InMultiState() {
return reply.MakeErrReply("ERR EXEC without MULTI")
}
defer conn.SetMultiState(false)
cmdLines := conn.GetQueuedCmdLine()

// analysis related keys
keys := make([]string, 0) // may contains duplicate
for _, cl := range cmdLines {
wKeys, rKeys := cluster.db.GetRelatedKeys(cl)
keys = append(keys, wKeys...)
keys = append(keys, rKeys...)
}
if len(keys) == 0 {
// empty transaction or only `PING`s
return godis.ExecMulti(cluster.db, cmdLines)
}
groupMap := cluster.groupBy(keys)
if len(groupMap) > 1 {
return reply.MakeErrReply("ERR MULTI commands transaction must within one slot in cluster mode")
}
var peer string
// assert len(groupMap) == 1
for p := range groupMap {
peer = p
}

// out parser not support reply.MultiRawReply, so we have to encode it
if peer == cluster.self {
return godis.ExecMulti(cluster.db, cmdLines)
}
return execMultiOnOtherNode(cluster, conn, peer, cmdLines)
}

func execMultiOnOtherNode(cluster *Cluster, conn redis.Connection, peer string, cmdLines []CmdLine) redis.Reply {
defer func() {
conn.ClearQueuedCmds()
conn.SetMultiState(false)
}()
relayCmdLine := [][]byte{ // relay it to executing node
relayMultiBytes,
}
relayCmdLine = append(relayCmdLine, encodeCmdLine(cmdLines)...)
rawRelayResult := cluster.relay(peer, conn, relayCmdLine)
if reply.IsErrorReply(rawRelayResult) {
return rawRelayResult
}
relayResult, ok := rawRelayResult.(*reply.MultiBulkReply)
if !ok {
return reply.MakeErrReply("execute failed")
}
rep, err := parseEncodedMultiRawReply(relayResult.Args)
if err != nil {
return reply.MakeErrReply(err.Error())
}
return rep
}

// execRelayedMulti execute relayed multi commands transaction
// cmdLine format: _multi base64ed-cmdLine
// result format: base64ed-reply list
func execRelayedMulti(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
decoded, err := parseEncodedMultiRawReply(cmdLine[1:])
if err != nil {
return reply.MakeErrReply(err.Error())
}
var cmdLines []CmdLine
for _, rep := range decoded.Replies {
mbr, ok := rep.(*reply.MultiBulkReply)
if !ok {
return reply.MakeErrReply("exec failed")
}
cmdLines = append(cmdLines, mbr.Args)
}
rawResult := godis.ExecMulti(cluster.db, cmdLines)
resultMBR, ok := rawResult.(*reply.MultiRawReply)
if !ok {
return reply.MakeErrReply("exec failed")
}
return encodeMultiRawReply(resultMBR)
}
47 changes: 47 additions & 0 deletions cluster/multi_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cluster

import (
"bytes"
"encoding/base64"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/reply"
)

func encodeCmdLine(cmdLines []CmdLine) [][]byte {
var result [][]byte
for _, line := range cmdLines {
raw := reply.MakeMultiBulkReply(line).ToBytes()
encoded := make([]byte, base64.StdEncoding.EncodedLen(len(raw)))
base64.StdEncoding.Encode(encoded, raw)
result = append(result, encoded)
}
return result
}

func parseEncodedMultiRawReply(args [][]byte) (*reply.MultiRawReply, error) {
cmdBuf := new(bytes.Buffer)
for _, arg := range args {
dbuf := make([]byte, base64.StdEncoding.DecodedLen(len(arg)))
n, err := base64.StdEncoding.Decode(dbuf, arg)
if err != nil {
continue
}
cmdBuf.Write(dbuf[:n])
}
cmds, err := parser.ParseBytes(cmdBuf.Bytes())
if err != nil {
return nil, reply.MakeErrReply(err.Error())
}
return reply.MakeMultiRawReply(cmds), nil
}

func encodeMultiRawReply(src *reply.MultiRawReply) *reply.MultiBulkReply {
args := make([][]byte, 0, len(src.Replies))
for _, rep := range src.Replies {
raw := rep.ToBytes()
encoded := make([]byte, base64.StdEncoding.EncodedLen(len(raw)))
base64.StdEncoding.Encode(encoded, raw)
args = append(args, encoded)
}
return reply.MakeMultiBulkReply(args)
}
73 changes: 73 additions & 0 deletions cluster/multi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package cluster

import (
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/reply"
"github.com/hdt3213/godis/redis/reply/asserts"
"testing"
)

func TestMultiExecOnSelf(t *testing.T) {
testCluster.db.Flush()
conn := new(connection.FakeConn)
result := testCluster.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
key := utils.RandString(10)
value := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("set", key, value))
key2 := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("rpush", key2, value))
result = testCluster.Exec(conn, utils.ToCmdLine("exec"))
asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("get", key))
asserts.AssertBulkReply(t, result, value)
result = testCluster.Exec(conn, utils.ToCmdLine("lrange", key2, "0", "-1"))
asserts.AssertMultiBulkReply(t, result, []string{value})
}

func TestEmptyMulti(t *testing.T) {
testCluster.db.Flush()
conn := new(connection.FakeConn)
result := testCluster.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("PING"))
asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("EXEC"))
asserts.AssertNotError(t, result)
mbr := result.(*reply.MultiRawReply)
asserts.AssertStatusReply(t, mbr.Replies[0], "PONG")
}

func TestMultiExecOnOthers(t *testing.T) {
testCluster.db.Flush()
conn := new(connection.FakeConn)
result := testCluster.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
key := utils.RandString(10)
value := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("rpush", key, value))
testCluster.Exec(conn, utils.ToCmdLine("lrange", key, "0", "-1"))

cmdLines := conn.GetQueuedCmdLine()
relayCmdLine := [][]byte{ // relay it to executing node
relayMultiBytes,
}
relayCmdLine = append(relayCmdLine, encodeCmdLine(cmdLines)...)
rawRelayResult := execRelayedMulti(testCluster, conn, relayCmdLine)
if reply.IsErrorReply(rawRelayResult) {
t.Error()
}
relayResult, ok := rawRelayResult.(*reply.MultiBulkReply)
if !ok {
t.Error()
}
rep, err := parseEncodedMultiRawReply(relayResult.Args)
if err != nil {
t.Error()
}
if len(rep.Replies) != 2 {
t.Errorf("expect 2 replies actual %d", len(rep.Replies))
}
asserts.AssertMultiBulkReply(t, rep.Replies[1], []string{value})
}
2 changes: 1 addition & 1 deletion cluster/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func makeRouter() map[string]CmdFunc {

routerMap["flushdb"] = FlushDB
routerMap["flushall"] = FlushAll
//routerMap["writeKeys"] = Keys
routerMap[relayMulti] = execRelayedMulti

return routerMap
}
Expand Down
File renamed without changes.
File renamed without changes.
6 changes: 3 additions & 3 deletions exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (db *DB) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {
return result
}
if c != nil && c.InMultiState() {
return enqueueCmd(db, c, cmdLine)
return EnqueueCmd(db, c, cmdLine)
}

// normal commands
Expand All @@ -60,12 +60,12 @@ func execSpecialCmd(c redis.Connection, cmdLine [][]byte, cmdName string, db *DB
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName), true
}
return startMulti(db, c), true
return StartMulti(db, c), true
} else if cmdName == "discard" {
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName), true
}
return discardMulti(db, c), true
return DiscardMulti(db, c), true
} else if cmdName == "exec" {
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName), true
Expand Down
6 changes: 3 additions & 3 deletions geo.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,20 @@ func execGeoPos(db *DB, args [][]byte) redis.Reply {
return &reply.NullBulkReply{}
}

positions := make([][]byte, len(args)-1)
positions := make([]redis.Reply, len(args)-1)
for i := 0; i < len(args)-1; i++ {
member := string(args[i+1])
elem, exists := sortedSet.Get(member)
if !exists {
positions[i] = (&reply.EmptyMultiBulkReply{}).ToBytes()
positions[i] = (&reply.EmptyMultiBulkReply{})
continue
}
lat, lng := geohash.Decode(uint64(elem.Score))
lngStr := strconv.FormatFloat(lng, 'f', -1, 64)
latStr := strconv.FormatFloat(lat, 'f', -1, 64)
positions[i] = reply.MakeMultiBulkReply([][]byte{
[]byte(lngStr), []byte(latStr),
}).ToBytes()
})
}
return reply.MakeMultiRawReply(positions)
}
Expand Down
Loading

0 comments on commit 5d05e2e

Please sign in to comment.