Skip to content

Commit

Permalink
back to stable version
Browse files Browse the repository at this point in the history
  • Loading branch information
LLiuJJ committed Sep 25, 2024
1 parent 9910ca7 commit a228f67
Show file tree
Hide file tree
Showing 25 changed files with 1,371 additions and 568 deletions.
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ IMAGE_VERSION := v0.0.1

BUILDER_IMAGE := $(or $(BUILDER_IMAGE),eraft/eraftbook:$(IMAGE_VERSION))

default: meta_cli shard_server shard_cli meta_server
default: meta_cli shard_server shard_cli meta_server kv_server kv_cli

image:
docker build -f Dockerfile --network=host -t $(BUILDER_IMAGE) .
Expand All @@ -49,6 +49,12 @@ shard_server:
shard_cli:
go build -o output/shardcli cmd/shardcli/shardcli.go

kv_server:
go build -o output/kvserver cmd/kvraft/kvserver.go

kv_cli:
go build -o output/kvcli cmd/kvcli/kvcli.go

clean:
rm -rf output/*

Expand All @@ -69,3 +75,5 @@ run-demo:

stop-demo:
docker stop kvserver-node1 kvserver-node2 kvserver-node3 kvserver-node4 kvserver-node5 kvserver-node6 metaserver-node1 metaserver-node2 metaserver-node3


6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,9 @@ go test -run TestBasicClusterRW tests/integration_test.go -v
```
go test -run TestClusterRwBench tests/integration_test.go -v
```

运行单分片集群读写基准测试

```
go test -run TestClusterSingleShardRwBench tests/integration_test.go -v
```
138 changes: 138 additions & 0 deletions cmd/kvcli/kvcli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//
// MIT License

// Copyright (c) 2022 eraft dev group

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//

package main

import (
"context"
"crypto/rand"
"fmt"
"math/big"
"os"
"os/signal"
"strconv"
"time"

"github.com/eraft-io/eraft/common"
pb "github.com/eraft-io/eraft/raftpb"

"github.com/eraft-io/eraft/raftcore"
)

type KvClient struct {
rpcCli *raftcore.RaftClientEnd
leaderId int64
clientId int64
commandId int64
}

func (kvCli *KvClient) Close() {
kvCli.rpcCli.CloseAllConn()
}

func nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
return bigx.Int64()
}

func MakeKvClient(targetId int, targetAddr string) *KvClient {
cli := raftcore.MakeRaftClientEnd(targetAddr, uint64(targetId))
return &KvClient{
rpcCli: cli,
leaderId: 0,
clientId: nrand(),
commandId: 0,
}
}

func (kvCli *KvClient) Get(key string) string {
cmdReq := &pb.CommandRequest{
Key: key,
OpType: pb.OpType_OpGet,
ClientId: kvCli.clientId,
}
resp, err := (*kvCli.rpcCli.GetRaftServiceCli()).DoCommand(context.Background(), cmdReq)
if err != nil {
return "err"
}
return resp.Value
}

func (kvCli *KvClient) Put(key, value string) string {
cmdReq := &pb.CommandRequest{
Key: key,
Value: value,
ClientId: kvCli.clientId,
OpType: pb.OpType_OpPut,
}
_, err := (*kvCli.rpcCli.GetRaftServiceCli()).DoCommand(context.Background(), cmdReq)
if err != nil {
return "err"
}
return "ok"
}

func main() {
if len(os.Args) < 2 {
fmt.Println("usage: kvcli [serveraddr] [count]")
return
}
sigs := make(chan os.Signal, 1)

kvCli := MakeKvClient(99, os.Args[1])

count, err := strconv.Atoi(os.Args[2])
if err != nil {
panic(err)
}

sigChan := make(chan os.Signal)
signal.Notify(sigChan)

go func() {
sig := <-sigs
fmt.Println(sig)
kvCli.rpcCli.CloseAllConn()
os.Exit(-1)
}()

keys := make([]string, count)
vals := make([]string, count)

for i := 0; i < count; i++ {
rndK := common.RandStringRunes(8)
rndV := common.RandStringRunes(8)
keys[i] = rndK
vals[i] = rndV
}

startTs := time.Now()
for i := 0; i < count; i++ {
kvCli.Put(keys[i], vals[i])
}
elapsed := time.Since(startTs).Seconds()
fmt.Printf("total cost %f s\n", elapsed)

}
83 changes: 83 additions & 0 deletions cmd/kvraft/kvserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//
// MIT License

// Copyright (c) 2022 eraft dev group

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//

package main

import (
"fmt"
"net"
"os"
"os/signal"
"strconv"
"syscall"

kvsvr "github.com/eraft-io/eraft/kvserver"
pb "github.com/eraft-io/eraft/raftpb"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

func main() {
if len(os.Args) < 2 {
fmt.Println("usage: server [nodeId]")
return
}
sigs := make(chan os.Signal, 1)

signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

nodeIdStr := os.Args[1]
nodeId, err := strconv.Atoi(nodeIdStr)
if err != nil {
panic(err)
}

kvServer := kvsvr.MakeKvServer(nodeId)
lis, err := net.Listen("tcp", kvsvr.PeersMap[nodeId])
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
fmt.Printf("server listen on: %s \n", kvsvr.PeersMap[nodeId])
s := grpc.NewServer()
pb.RegisterRaftServiceServer(s, kvServer)

sigChan := make(chan os.Signal)

signal.Notify(sigChan)

go func() {
sig := <-sigs
fmt.Println(sig)
kvServer.Rf.CloseEndsConn()
os.Exit(-1)
}()

reflection.Register(s)
err = s.Serve(lis)
if err != nil {
fmt.Printf("failed to serve: %v", err)
return
}
}
120 changes: 57 additions & 63 deletions cmd/shardcli/shardcli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,8 @@
package main

import (
"encoding/json"
"fmt"
"os"
"os/signal"
"strconv"
"strings"

"github.com/eraft-io/eraft/shardkvserver"
)

func main() {
Expand All @@ -45,64 +39,64 @@ func main() {
"kvcli [configserver addr] insertbucketkv [gid] [bid] [key] [value]\n")
return
}
sigs := make(chan os.Signal, 1)
// sigs := make(chan os.Signal, 1)

shardKvCli := shardkvserver.MakeKvClient(os.Args[1])
// shardKvCli := shardkvserver.MakeKvClient(os.Args[1])

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan)
// sigChan := make(chan os.Signal, 1)
// signal.Notify(sigChan)

switch os.Args[2] {
case "put":
if err := shardKvCli.Put(os.Args[3], os.Args[4]); err != nil {
fmt.Println("err: " + err.Error())
return
}
case "get":
v, err := shardKvCli.Get(os.Args[3])
if err != nil {
fmt.Println("err: " + err.Error())
return
}
fmt.Println("got value: " + v)
case "getbuckets":
gid, _ := strconv.Atoi(os.Args[3])
bidsStr := os.Args[4]
bids := []int64{}
bidsStrArr := strings.Split(bidsStr, ",")
for _, bidStr := range bidsStrArr {
bid, _ := strconv.Atoi(bidStr)
bids = append(bids, int64(bid))
}
datas := shardKvCli.GetBucketDatas(gid, bids)
fmt.Println("get buckets datas: " + datas)
case "delbuckets":
gid, _ := strconv.Atoi(os.Args[3])
bidsStr := os.Args[4]
bids := []int64{}
bidsStrArr := strings.Split(bidsStr, ",")
for _, bidStr := range bidsStrArr {
bid, _ := strconv.Atoi(bidStr)
bids = append(bids, int64(bid))
}
shardKvCli.DeleteBucketDatas(gid, bids)
case "insertbucketkv":
gid, _ := strconv.Atoi(os.Args[3])
bid, _ := strconv.Atoi(os.Args[4])
bucketDatas := &shardkvserver.BucketDatasVo{}
bucketDatas.Datas = make(map[int]map[string]string)
kv := map[string]string{os.Args[5]: os.Args[6]}
bucketDatas.Datas[bid] = kv
datas, _ := json.Marshal(bucketDatas)
shardKvCli.InsertBucketDatas(gid, []int64{int64(bid)}, datas)
}
go func() {
sig := <-sigs
fmt.Println(sig)
for _, cli := range shardKvCli.GetCsClient().GetRpcClis() {
cli.CloseAllConn()
}
shardKvCli.GetRpcClient().CloseAllConn()
os.Exit(-1)
}()
// switch os.Args[2] {
// case "put":
// if err := shardKvCli.Put(os.Args[3], os.Args[4]); err != nil {
// fmt.Println("err: " + err.Error())
// return
// }
// case "get":
// v, err := shardKvCli.Get(os.Args[3])
// if err != nil {
// fmt.Println("err: " + err.Error())
// return
// }
// fmt.Println("got value: " + v)
// case "getbuckets":
// gid, _ := strconv.Atoi(os.Args[3])
// bidsStr := os.Args[4]
// bids := []int64{}
// bidsStrArr := strings.Split(bidsStr, ",")
// for _, bidStr := range bidsStrArr {
// bid, _ := strconv.Atoi(bidStr)
// bids = append(bids, int64(bid))
// }
// datas := shardKvCli.GetBucketDatas(gid, bids)
// fmt.Println("get buckets datas: " + datas)
// case "delbuckets":
// gid, _ := strconv.Atoi(os.Args[3])
// bidsStr := os.Args[4]
// bids := []int64{}
// bidsStrArr := strings.Split(bidsStr, ",")
// for _, bidStr := range bidsStrArr {
// bid, _ := strconv.Atoi(bidStr)
// bids = append(bids, int64(bid))
// }
// shardKvCli.DeleteBucketDatas(gid, bids)
// case "insertbucketkv":
// gid, _ := strconv.Atoi(os.Args[3])
// bid, _ := strconv.Atoi(os.Args[4])
// bucketDatas := &shardkvserver.BucketDatasVo{}
// bucketDatas.Datas = make(map[int]map[string]string)
// kv := map[string]string{os.Args[5]: os.Args[6]}
// bucketDatas.Datas[bid] = kv
// datas, _ := json.Marshal(bucketDatas)
// shardKvCli.InsertBucketDatas(gid, []int64{int64(bid)}, datas)
// }
// go func() {
// sig := <-sigs
// fmt.Println(sig)
// for _, cli := range shardKvCli.GetCsClient().GetRpcClis() {
// cli.CloseAllConn()
// }
// shardKvCli.GetRpcClient().CloseAllConn()
// os.Exit(-1)
// }()
}
Loading

0 comments on commit a228f67

Please sign in to comment.