Skip to content

Commit

Permalink
Merge pull request #378 from unknowntpo/main
Browse files Browse the repository at this point in the history
feat: avoid using QUIT command
  • Loading branch information
rueian authored Sep 29, 2023
2 parents 4691df7 + 11398f1 commit 08096d8
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 38 deletions.
5 changes: 1 addition & 4 deletions internal/cmds/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ var (
RoleCmd = Completed{
cs: newCommandSlice([]string{"ROLE"}),
}
// QuitCmd is predefined QUIT
QuitCmd = Completed{
cs: newCommandSlice([]string{"QUIT"}),
}

// UnsubscribeCmd is predefined UNSUBSCRIBE
UnsubscribeCmd = Completed{
cs: newCommandSlice([]string{"UNSUBSCRIBE"}),
Expand Down
2 changes: 1 addition & 1 deletion mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNewMux(t *testing.T) {
ReplyString("OK")
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LIB_NAME, "LIB-VER", LIB_VER).
ReplyError("UNKNOWN COMMAND")
mock.Expect("QUIT").ReplyString("OK")
mock.Expect("PING").ReplyString("OK")
mock.Close()
}()
m := makeMux("", &ClientOption{}, func(dst string, opt *ClientOption) (net.Conn, error) {
Expand Down
6 changes: 3 additions & 3 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (p *pipe) _background() {
atomic.CompareAndSwapInt32(&p.state, 2, 3) // make write goroutine to exit
atomic.AddInt32(&p.waits, 1)
go func() {
<-p.queue.PutOne(cmds.QuitCmd)
<-p.queue.PutOne(cmds.PingCmd)
atomic.AddInt32(&p.waits, -1)
}()
}
Expand Down Expand Up @@ -421,7 +421,7 @@ func (p *pipe) _backgroundWrite() (err error) {
err = writeCmd(p.w, cmd.Commands())
}
if err != nil {
if err != ErrClosing { // ignore ErrClosing to allow final QUIT command to be sent
if err != ErrClosing { // ignore ErrClosing to allow final PING command to be sent
return
}
runtime.Gosched()
Expand Down Expand Up @@ -1304,7 +1304,7 @@ func (p *pipe) Close() {
p.background()
}
if block == 1 && (stopping1 || stopping2) { // make sure there is no block cmd
<-p.queue.PutOne(cmds.QuitCmd)
<-p.queue.PutOne(cmds.PingCmd)
}
}
atomic.AddInt32(&p.waits, -1)
Expand Down
41 changes: 19 additions & 22 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func setup(t *testing.T, option ClientOption) (*pipe, *redisMock, func(), func()
t.Fatalf("pipe setup failed, unexpected version: %v", p.Version())
}
return p, mock, func() {
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
for atomic.LoadInt32(&p.state) != 4 {
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestNewPipe(t *testing.T) {
if err != nil {
t.Fatalf("pipe setup failed: %v", err)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestNewPipe(t *testing.T) {
if err != nil {
t.Fatalf("pipe setup failed: %v", err)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestNewPipe(t *testing.T) {
if err != nil {
t.Fatalf("pipe setup failed: %v", err)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestNewPipe(t *testing.T) {
if err != nil {
t.Fatalf("pipe setup failed: %v", err)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -356,7 +356,7 @@ func TestNewPipe(t *testing.T) {
if err != nil {
t.Fatalf("pipe setup failed: %v", err)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -393,7 +393,7 @@ func TestNewPipe(t *testing.T) {
if err != nil {
t.Fatalf("pipe setup failed: %v", err)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -430,7 +430,7 @@ func TestNewPipe(t *testing.T) {
if err != nil {
t.Fatalf("pipe setup failed: %v", err)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand All @@ -447,7 +447,7 @@ func TestNewPipe(t *testing.T) {
t.Run("Auth Credentials Function Error", func(t *testing.T) {
n1, n2 := net.Pipe()
mock := &redisMock{buf: bufio.NewReader(n2), conn: n2, t: t}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
_, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{
SelectDB: 1,
AuthCredentialsFn: func(context AuthCredentialsContext) (AuthCredentials, error) {
Expand Down Expand Up @@ -476,7 +476,7 @@ func TestNewRESP2Pipe(t *testing.T) {
ReplyError("ERR unknown subcommand or wrong number of arguments for 'TRACKING'. Try CLIENT HELP")
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LIB_NAME, "LIB-VER", LIB_VER).
ReplyError("UNKNOWN COMMAND")
mock.Expect("QUIT").ReplyString("OK")
mock.Expect("PING").ReplyString("OK")
}()
if _, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{}); !errors.Is(err, ErrNoCache) {
t.Fatalf("unexpected err: %v", err)
Expand All @@ -495,7 +495,7 @@ func TestNewRESP2Pipe(t *testing.T) {
ReplyString("OK")
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LIB_NAME, "LIB-VER", LIB_VER).
ReplyError("UNKNOWN COMMAND")
mock.Expect("QUIT").ReplyString("OK")
mock.Expect("PING").ReplyString("OK")
}()
if _, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{}); !errors.Is(err, ErrNoCache) {
t.Fatalf("unexpected err: %v", err)
Expand Down Expand Up @@ -527,7 +527,7 @@ func TestNewRESP2Pipe(t *testing.T) {
if p.version >= 6 {
t.Fatalf("unexpected p.version: %v", p.version)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -564,7 +564,7 @@ func TestNewRESP2Pipe(t *testing.T) {
if p.version >= 6 {
t.Fatalf("unexpected p.version: %v", p.version)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -602,7 +602,7 @@ func TestNewRESP2Pipe(t *testing.T) {
if p.version >= 6 {
t.Fatalf("unexpected p.version: %v", p.version)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -644,7 +644,7 @@ func TestNewRESP2Pipe(t *testing.T) {
if p.version >= 6 {
t.Fatalf("unexpected p.version: %v", p.version)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -686,7 +686,7 @@ func TestNewRESP2Pipe(t *testing.T) {
if p.version >= 6 {
t.Fatalf("unexpected p.version: %v", p.version)
}
go func() { mock.Expect("QUIT").ReplyString("OK") }()
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -2054,7 +2054,6 @@ func TestDisableClientSideCaching(t *testing.T) {
Expect("GET", "c").
ReplyString("2").
ReplyString("3")

}()

v, _ := p.DoCache(context.Background(), Cacheable(cmds.NewCompleted([]string{"GET", "a"})), 10*time.Second).ToMessage()
Expand Down Expand Up @@ -2616,7 +2615,7 @@ func TestPubSub(t *testing.T) {
})

t.Run("PubSub Unexpected Subscribe", func(t *testing.T) {
var shouldPanic = func(push string) (pass bool) {
shouldPanic := func(push string) (pass bool) {
defer func() { pass = recover() == protocolbug }()

p, mock, _, _ := setup(t, ClientOption{})
Expand Down Expand Up @@ -2646,7 +2645,7 @@ func TestPubSub(t *testing.T) {
})

t.Run("PubSub MULTI/EXEC Subscribe", func(t *testing.T) {
var shouldPanic = func(cmd Completed) (pass bool) {
shouldPanic := func(cmd Completed) (pass bool) {
defer func() { pass = recover() == multiexecsub }()

p, mock, _, _ := setup(t, ClientOption{})
Expand Down Expand Up @@ -3141,7 +3140,7 @@ func TestCloseAndWaitPendingCMDs(t *testing.T) {
}
r.ReplyString("b")
}
mock.Expect("QUIT").ReplyString("OK")
mock.Expect("PING").ReplyString("OK")
mock.Close()
wg.Wait()
}
Expand Down Expand Up @@ -3692,7 +3691,6 @@ func TestBlockingCommandNoDeadline(t *testing.T) {
close(wait)
time.Sleep(2 * timeout)
mock.Expect("READ").Expect("BLOCK").ReplyString("OK").ReplyString("READ").ReplyString("OK")

}()
<-wait
if val, err := p.DoMulti(context.Background(),
Expand Down Expand Up @@ -3805,5 +3803,4 @@ func TestNoHelloRegex(t *testing.T) {
}
})
}

}
8 changes: 4 additions & 4 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ func TestRing(t *testing.T) {
if one, _, ch := ring.NextWriteCmd(); ch == nil {
go func() {
time.Sleep(time.Millisecond * 100)
ring.PutOne(cmds.QuitCmd)
ring.PutOne(cmds.PingCmd)
}()
if one, _, ch = ring.WaitForWrite(); ch != nil && one.Commands()[0] == cmds.QuitCmd.Commands()[0] {
if one, _, ch = ring.WaitForWrite(); ch != nil && one.Commands()[0] == cmds.PingCmd.Commands()[0] {
return
}
}
Expand All @@ -131,9 +131,9 @@ func TestRing(t *testing.T) {
if _, multi, ch := ring.NextWriteCmd(); ch == nil {
go func() {
time.Sleep(time.Millisecond * 100)
ring.PutMulti([]Completed{cmds.QuitCmd}, nil)
ring.PutMulti([]Completed{cmds.PingCmd}, nil)
}()
if _, multi, ch = ring.WaitForWrite(); ch != nil && multi[0].Commands()[0] == cmds.QuitCmd.Commands()[0] {
if _, multi, ch = ring.WaitForWrite(); ch != nil && multi[0].Commands()[0] == cmds.PingCmd.Commands()[0] {
return
}
}
Expand Down
8 changes: 4 additions & 4 deletions rueidis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestNewClusterClientError(t *testing.T) {
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LIB_NAME, "LIB-VER", LIB_VER).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLUSTER", "SLOTS").Reply(RedisMessage{typ: '-', string: "other error"})
mock.Expect("QUIT").ReplyString("OK")
mock.Expect("PING").ReplyString("OK")
mock.Close()
close(done)
}()
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestFallBackSingleClient(t *testing.T) {
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LIB_NAME, "LIB-VER", LIB_VER).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLUSTER", "SLOTS").Reply(RedisMessage{typ: '-', string: "ERR This instance has cluster support disabled"})
mock.Expect("QUIT").ReplyString("OK")
mock.Expect("PING").ReplyString("OK")
mock.Close()
close(done)
}()
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestForceSingleClient(t *testing.T) {
}
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LIB_NAME, "LIB-VER", LIB_VER).
ReplyError("UNKNOWN COMMAND")
mock.Expect("QUIT").ReplyString("OK")
mock.Expect("PING").ReplyString("OK")
mock.Close()
close(done)
}()
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestTLSClient(t *testing.T) {
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LIB_NAME, "LIB-VER", LIB_VER).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLUSTER", "SLOTS").Reply(RedisMessage{typ: '-', string: "ERR This instance has cluster support disabled"})
mock.Expect("QUIT").ReplyString("OK")
mock.Expect("PING").ReplyString("OK")
mock.Close()
close(done)
}()
Expand Down

0 comments on commit 08096d8

Please sign in to comment.