diff --git a/mux_test.go b/mux_test.go index 75b87fc8..8179f068 100644 --- a/mux_test.go +++ b/mux_test.go @@ -177,7 +177,7 @@ func TestMuxReuseWire(t *testing.T) { m.Close() }) - t.Run("reuse blocking pool", func(t *testing.T) { + t.Run("reuse blocking (dpool) pool", func(t *testing.T) { blocking := make(chan struct{}) response := make(chan RedisResult) m, checkClean := setupMux([]*mockWire{ @@ -227,6 +227,158 @@ func TestMuxReuseWire(t *testing.T) { <-blocking }) + t.Run("reuse blocking (spool) pool", func(t *testing.T) { + blocking := make(chan struct{}) + response := make(chan RedisResult) + m, checkClean := setupMux([]*mockWire{ + { + // leave first wire for pipeline calls + }, + { + DoFn: func(cmd Completed) RedisResult { + return newResult(RedisMessage{typ: '+', string: "ACQUIRED"}, nil) + }, + }, + { + DoFn: func(cmd Completed) RedisResult { + blocking <- struct{}{} + return <-response + }, + }, + }) + m.usePool = true // switch to spool + defer checkClean(t) + defer m.Close() + if err := m.Dial(); err != nil { + t.Fatalf("unexpected dial error %v", err) + } + + wire1 := m.spool.Acquire() + + go func() { + // this should use the second wire + if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil { + t.Errorf("unexpected error %v", err) + } else if val != "BLOCK_RESPONSE" { + t.Errorf("unexpected response %v", val) + } + close(blocking) + }() + <-blocking + + m.spool.Store(wire1) + // this should use the first wire + if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil { + t.Fatalf("unexpected error %v", err) + } else if val != "ACQUIRED" { + t.Fatalf("unexpected response %v", val) + } + + response <- newResult(RedisMessage{typ: '+', string: "BLOCK_RESPONSE"}, nil) + <-blocking + }) + + t.Run("reuse blocking (dpool) pool DoMulti", func(t *testing.T) { + blocking := make(chan struct{}) + response := make(chan RedisResult) + m, checkClean := setupMux([]*mockWire{ + { + // leave first wire for pipeline calls + }, + { + DoMultiFn: func(cmd ...Completed) *redisresults { + return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "ACQUIRED"}, nil)}} + }, + }, + { + DoMultiFn: func(cmd ...Completed) *redisresults { + blocking <- struct{}{} + return &redisresults{s: []RedisResult{<-response}} + }, + }, + }) + m.usePool = true // switch to spool + defer checkClean(t) + defer m.Close() + if err := m.Dial(); err != nil { + t.Fatalf("unexpected dial error %v", err) + } + + wire1 := m.spool.Acquire() + + go func() { + // this should use the second wire + if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil { + t.Errorf("unexpected error %v", err) + } else if val != "BLOCK_RESPONSE" { + t.Errorf("unexpected response %v", val) + } + close(blocking) + }() + <-blocking + + m.spool.Store(wire1) + // this should use the first wire + if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil { + t.Fatalf("unexpected error %v", err) + } else if val != "ACQUIRED" { + t.Fatalf("unexpected response %v", val) + } + + response <- newResult(RedisMessage{typ: '+', string: "BLOCK_RESPONSE"}, nil) + <-blocking + }) + + t.Run("reuse blocking (spool) pool DoMulti", func(t *testing.T) { + blocking := make(chan struct{}) + response := make(chan RedisResult) + m, checkClean := setupMux([]*mockWire{ + { + // leave first wire for pipeline calls + }, + { + DoMultiFn: func(cmd ...Completed) *redisresults { + return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "ACQUIRED"}, nil)}} + }, + }, + { + DoMultiFn: func(cmd ...Completed) *redisresults { + blocking <- struct{}{} + return &redisresults{s: []RedisResult{<-response}} + }, + }, + }) + defer checkClean(t) + defer m.Close() + if err := m.Dial(); err != nil { + t.Fatalf("unexpected dial error %v", err) + } + + wire1 := m.dpool.Acquire() + + go func() { + // this should use the second wire + if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil { + t.Errorf("unexpected error %v", err) + } else if val != "BLOCK_RESPONSE" { + t.Errorf("unexpected response %v", val) + } + close(blocking) + }() + <-blocking + + m.dpool.Store(wire1) + // this should use the first wire + if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil { + t.Fatalf("unexpected error %v", err) + } else if val != "ACQUIRED" { + t.Fatalf("unexpected response %v", val) + } + + response <- newResult(RedisMessage{typ: '+', string: "BLOCK_RESPONSE"}, nil) + <-blocking + }) + t.Run("unsubscribe blocking pool", func(t *testing.T) { cleaned := false m, checkClean := setupMux([]*mockWire{