Add MaxFlushDelay option #158

Merged · 2 commits · Dec 10, 2022
pipe.go (18 changes: 15 additions & 3 deletions)
@@ -56,6 +56,7 @@ type pipe struct {
 	info          map[string]RedisMessage
 	timeout       time.Duration
 	pinggap       time.Duration
+	maxFlushDelay time.Duration
 	once          sync.Once
 	r2mu          sync.Mutex
 	version       int32
@@ -87,8 +88,9 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool)
 		ssubs: newSubs(),
 		close: make(chan struct{}),
 
-		timeout: option.ConnWriteTimeout,
-		pinggap: option.Dialer.KeepAlive,
+		timeout:       option.ConnWriteTimeout,
+		pinggap:       option.Dialer.KeepAlive,
+		maxFlushDelay: option.MaxFlushDelay,
 
 		r2ps: r2ps,
 	}
@@ -291,7 +293,17 @@ func (p *pipe) _backgroundWrite() (err error) {
			if p.w.Buffered() == 0 {
				err = p.Error()
			} else {
-				err = p.w.Flush()
+				if p.maxFlushDelay == 0 {
+					err = p.w.Flush()
+				} else {
+					if atomic.LoadInt32(&p.waits) == 1 {
+						err = p.w.Flush()
+					} else {
+						ts := time.Now()
+						err = p.w.Flush()
+						time.Sleep(p.maxFlushDelay - time.Since(ts))
+					}
+				}
			}
			if err == nil {
				if atomic.LoadInt32(&p.state) == 1 {
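
The branch added above only delays when more than one caller is waiting (p.waits > 1), since with a single waiter a pause would add latency without batching anything, and it subtracts the time spent in Flush itself so each iteration never pauses longer than maxFlushDelay. A minimal standalone sketch of that pattern, with illustrative names (flushWithDelay, waits) rather than the library's actual code:

package pipesketch

import (
	"bufio"
	"sync/atomic"
	"time"
)

// flushWithDelay mirrors the branch added above: always flush, but when a
// delay is configured and more than one caller is waiting, pause for the
// remainder of maxFlushDelay so the next loop iteration can batch more
// pending commands into a single Flush (i.e. a single write syscall).
func flushWithDelay(w *bufio.Writer, waits *int32, maxFlushDelay time.Duration) error {
	if maxFlushDelay == 0 || atomic.LoadInt32(waits) == 1 {
		// No delay configured, or only one pending caller: sleeping here
		// would add latency without any batching benefit.
		return w.Flush()
	}
	ts := time.Now()
	err := w.Flush()
	// Sleep only for whatever is left of the budget after the flush itself;
	// time.Sleep returns immediately if the remaining duration is negative.
	time.Sleep(maxFlushDelay - time.Since(ts))
	return err
}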
pipe_test.go (20 changes: 20 additions & 0 deletions)
@@ -464,6 +464,26 @@ func TestWriteSinglePipelineFlush(t *testing.T) {
 	}
 }
 
+func TestWriteWithMaxFlushDelay(t *testing.T) {
+	p, mock, cancel, _ := setup(t, ClientOption{
+		AlwaysPipelining: true,
+		MaxFlushDelay:    20 * time.Microsecond,
+	})
+	defer cancel()
+	times := 2000
+	wg := sync.WaitGroup{}
+	wg.Add(times)
+
+	for i := 0; i < times; i++ {
+		go func() {
+			ExpectOK(t, p.Do(context.Background(), cmds.NewCompleted([]string{"PING"})))
+		}()
+	}
+	for i := 0; i < times; i++ {
+		mock.Expect("PING").ReplyString("OK")
+	}
+}
+
 func TestWriteMultiFlush(t *testing.T) {
 	p, mock, cancel, _ := setup(t, ClientOption{})
 	defer cancel()
rueidis.go (8 changes: 8 additions & 0 deletions)
@@ -118,6 +118,14 @@ type ClientOption struct {
 	DisableCache bool
 	// AlwaysPipelining makes rueidis.Client always pipeline redis commands even if they are not issued concurrently.
 	AlwaysPipelining bool
+	// MaxFlushDelay, when greater than zero, pauses the pipeline write loop for some time (not larger than MaxFlushDelay)
+	// after each flush of data to the connection. This gives the pipeline a chance to collect more commands to send
+	// to Redis. Adding this delay increases latency and reduces throughput, but in most cases it may significantly reduce
+	// application and Redis CPU utilization because fewer system calls are executed. By default, rueidis flushes data
+	// to the connection without extra delay. The best value of MaxFlushDelay depends on network latency and
+	// application-specific conditions; something like 20 microseconds should not affect latency/throughput much but
+	// can still produce a notable CPU usage reduction under load.
+	MaxFlushDelay time.Duration
 }
 
 // SentinelOption contains MasterSet,
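
Enabling the new option from application code is just a matter of setting MaxFlushDelay on ClientOption. A minimal usage sketch (the address is a placeholder, and the import path is the module path at the time of this PR):

package main

import (
	"context"
	"time"

	"github.com/rueian/rueidis"
)

func main() {
	// Trade a bounded amount of extra latency (at most 20µs per flush) for
	// fewer write syscalls under concurrent load.
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress:   []string{"127.0.0.1:6379"},
		MaxFlushDelay: 20 * time.Microsecond,
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Commands are issued as usual; batching happens inside the pipeline.
	if err := client.Do(context.Background(), client.B().Ping().Build()).Error(); err != nil {
		panic(err)
	}
}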