Skip to content

Commit

Permalink
Merge pull request #54 from RealFax/dev
Browse files Browse the repository at this point in the history
[Client::bugfix fixed client nil pointer]
  • Loading branch information
PotatoCloud authored Sep 7, 2023
2 parents c26630c + cd4ccf9 commit 4ca4c35
Show file tree
Hide file tree
Showing 12 changed files with 311 additions and 39 deletions.
4 changes: 3 additions & 1 deletion client/example/basic-key-value/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"sync"
"sync/atomic"
Expand All @@ -15,7 +17,7 @@ func main() {
//"127.0.0.1:3230",
//"127.0.0.1:4230",
"127.0.0.1:5230",
}, false)
}, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion client/example/distributed-lock/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"context"
"github.com/RealFax/RedQueen/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"time"
)
Expand All @@ -15,7 +17,7 @@ func main() {
"127.0.0.1:3230",
"127.0.0.1:4230",
"127.0.0.1:5230",
}, false)
}, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
Expand Down
6 changes: 4 additions & 2 deletions client/r_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ func (c *internalClient) AppendCluster(ctx context.Context, serverID, peerAddr s
if err != nil {
return err
}
defer client.conn.Release()

_, err = client.AppendCluster(ctx, &serverpb.AppendClusterRequest{
_, err = client.instance.AppendCluster(ctx, &serverpb.AppendClusterRequest{
ServerId: serverID,
PeerAddr: peerAddr,
Voter: voter,
Expand All @@ -41,12 +42,13 @@ func (c *internalClient) LeaderMonitor(ctx context.Context, recv *chan bool) err
if err != nil {
return err
}
defer client.conn.Release()

var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()

monitor, err := client.LeaderMonitor(ctx, &serverpb.LeaderMonitorRequest{})
monitor, err := client.instance.LeaderMonitor(ctx, &serverpb.LeaderMonitorRequest{})
if err != nil {
return err
}
Expand Down
21 changes: 14 additions & 7 deletions client/r_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func (c *kvClient) Set(ctx context.Context, key, value []byte, ttl uint32, names
if err != nil {
return err
}
defer client.conn.Release()

_, err = client.Set(ctx, &serverpb.SetRequest{
_, err = client.instance.Set(ctx, &serverpb.SetRequest{
Key: key,
Value: value,
Ttl: ttl,
Expand All @@ -49,8 +50,9 @@ func (c *kvClient) Get(ctx context.Context, key []byte, namespace *string) (*Val
if err != nil {
return nil, err
}
defer client.conn.Release()

resp, err := client.Get(ctx, &serverpb.GetRequest{
resp, err := client.instance.Get(ctx, &serverpb.GetRequest{
Key: key,
Namespace: namespace,
})
Expand All @@ -68,8 +70,9 @@ func (c *kvClient) PrefixScan(ctx context.Context, prefix []byte, offset, limit
if err != nil {
return nil, err
}
defer client.conn.Release()

resp, err := client.PrefixScan(ctx, &serverpb.PrefixScanRequest{
resp, err := client.instance.PrefixScan(ctx, &serverpb.PrefixScanRequest{
Prefix: prefix,
Offset: offset,
Limit: limit,
Expand Down Expand Up @@ -97,8 +100,9 @@ func (c *kvClient) TrySet(ctx context.Context, key, value []byte, ttl uint32, na
if err != nil {
return err
}
defer client.conn.Release()

_, err = client.TrySet(ctx, &serverpb.SetRequest{
_, err = client.instance.TrySet(ctx, &serverpb.SetRequest{
Key: key,
Value: value,
Ttl: ttl,
Expand All @@ -114,8 +118,9 @@ func (c *kvClient) Delete(ctx context.Context, key []byte, namespace *string) er
if err != nil {
return err
}
defer client.conn.Release()

_, err = client.Delete(ctx, &serverpb.DeleteRequest{
_, err = client.instance.Delete(ctx, &serverpb.DeleteRequest{
Key: key,
Namespace: namespace,
})
Expand All @@ -131,8 +136,9 @@ func (c *kvClient) Watch(ctx context.Context, watcher *Watcher) error {
if err != nil {
return err
}
defer client.conn.Release()

watch, err := client.Watch(ctx, &serverpb.WatchRequest{
watch, err := client.instance.Watch(ctx, &serverpb.WatchRequest{
Key: watcher.key,
IgnoreErrors: watcher.ignoreErrors,
Namespace: watcher.namespace,
Expand Down Expand Up @@ -178,8 +184,9 @@ func (c *kvClient) WatchPrefix(ctx context.Context, watcher *Watcher) error {
if err != nil {
return err
}
defer client.conn.Release()

watch, err := client.WatchPrefix(ctx, &serverpb.WatchPrefixRequest{
watch, err := client.instance.WatchPrefix(ctx, &serverpb.WatchPrefixRequest{
Prefix: watcher.key,
Namespace: watcher.namespace,
BufSize: func() *uint32 {
Expand Down
9 changes: 6 additions & 3 deletions client/r_locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ func (c *lockerClient) Lock(ctx context.Context, lockID string, ttl int32) error
if err != nil {
return err
}
defer client.conn.Release()

_, err = client.Lock(ctx, &serverpb.LockRequest{
_, err = client.instance.Lock(ctx, &serverpb.LockRequest{
LockId: lockID,
Ttl: ttl,
})
Expand All @@ -35,8 +36,9 @@ func (c *lockerClient) Unlock(ctx context.Context, lockID string) error {
if err != nil {
return err
}
defer client.conn.Release()

_, err = client.Unlock(ctx, &serverpb.UnlockRequest{
_, err = client.instance.Unlock(ctx, &serverpb.UnlockRequest{
LockId: lockID,
})
return err
Expand All @@ -47,8 +49,9 @@ func (c *lockerClient) TryLock(ctx context.Context, lockID string, ttl int32, de
if err != nil {
return err
}
defer client.conn.Release()

_, err = client.TryLock(ctx, &serverpb.TryLockRequest{
_, err = client.instance.TryLock(ctx, &serverpb.TryLockRequest{
LockId: lockID,
Ttl: ttl,
Deadline: deadline,
Expand Down
53 changes: 35 additions & 18 deletions client/t_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
)

type Conn interface {
ReadOnly() (*grpc.ClientConn, error)
WriteOnly() (*grpc.ClientConn, error)
ReadOnly() (*GrpcPoolConn, error)
WriteOnly() (*GrpcPoolConn, error)
Close() error
}

type clientConn struct {
state atomic.Bool
mu sync.Mutex
ctx context.Context
writeOnly *grpc.ClientConn
readOnly map[string]*grpc.ClientConn
writeOnly *GrpcPool
readOnly map[string]*GrpcPool

endpoints []string
}
Expand All @@ -49,7 +49,8 @@ func (c *clientConn) listenLeader() {
wg := sync.WaitGroup{}
wg.Add(len(c.readOnly))

finalTry := func(conn *grpc.ClientConn) {
finalTry := func(conn *GrpcPoolConn) {
defer conn.Release()
var (
err error
monitor serverpb.RedQueen_LeaderMonitorClient
Expand Down Expand Up @@ -87,13 +88,17 @@ func (c *clientConn) listenLeader() {
}

for _, conn := range c.readOnly {
go finalTry(conn)
cc, err := conn.Alloc()
if err != nil {
panic(err)
}
go finalTry(cc)
}

wg.Wait()
}

func (c *clientConn) ReadOnly() (*grpc.ClientConn, error) {
func (c *clientConn) ReadOnly() (*GrpcPoolConn, error) {
size := len(c.readOnly)
if size == 0 {
return nil, errors.New("read-only not maintained")
Expand All @@ -104,21 +109,31 @@ func (c *clientConn) ReadOnly() (*grpc.ClientConn, error) {
round, _ = rand.Int(rand.Reader, big.NewInt(int64(size)))
)

for _, conn := range c.readOnly {
for _, pool := range c.readOnly {
step++
if step == round.Int64() {
return conn, nil
if step != round.Int64() {
continue
}

cc, err := pool.Alloc()
if err != nil {
return nil, err
}
return cc, nil
}

return nil, errors.New("unexpected")
}

func (c *clientConn) WriteOnly() (*grpc.ClientConn, error) {
func (c *clientConn) WriteOnly() (*GrpcPoolConn, error) {
if c.writeOnly == nil {
return nil, errors.New("write-only not maintained")
}
return c.writeOnly, nil
cc, err := c.writeOnly.Alloc()
if err != nil {
return nil, err
}
return cc, nil
}

func (c *clientConn) Close() error {
Expand All @@ -127,8 +142,10 @@ func (c *clientConn) Close() error {
}
c.state.Store(false)

c.writeOnly.Close()
c.writeOnly = nil
if c.writeOnly != nil {
c.writeOnly.Close()
c.writeOnly = nil
}

for key, conn := range c.readOnly {
conn.Close()
Expand All @@ -143,22 +160,22 @@ func NewClientConn(ctx context.Context, endpoints []string, opts ...grpc.DialOpt
state: atomic.Bool{},
ctx: ctx,
writeOnly: nil,
readOnly: make(map[string]*grpc.ClientConn),
readOnly: make(map[string]*GrpcPool),
endpoints: endpoints,
}
cc.state.Store(true)

var (
err error
conn *grpc.ClientConn
pool *GrpcPool
)

// init
for _, endpoint := range endpoints {
if conn, err = grpc.DialContext(ctx, endpoint, opts...); err != nil {
if pool, err = NewGrpcPool(ctx, endpoint, 16, opts...); err != nil {
return nil, err
}
cc.readOnly[endpoint] = conn
cc.readOnly[endpoint] = pool
}

// start listen
Expand Down
Loading

0 comments on commit 4ca4c35

Please sign in to comment.