reset max query time of blocking queries in client after retries #25039

Merged · 2 commits · Feb 7, 2025
3 changes: 3 additions & 0 deletions .changelog/25039.txt
@@ -0,0 +1,3 @@
```release-note:bug
client: Fixed a bug where temporary RPC errors cause the client to poll for changes more frequently thereafter
```
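
The release note describes the symptom; the cause is that the retry path in `client/rpc.go` mutates the caller's request (`MaxQueryTime`, via `SetTimeToBlock`) and, before this change, never put the original value back. Because callers reuse the same request struct across blocking queries, the shrunken block time persisted and the client polled faster and faster. A minimal sketch of that failure mode, using a made-up `req` type and `rpcWithRetry` helper rather than Nomad's actual code:

```go
package main

import (
	"fmt"
	"time"
)

// req stands in for a blocking-query request that a caller reuses in a loop.
type req struct {
	MaxQueryTime time.Duration
}

// rpcWithRetry simulates a retry path that shrinks the block time so a retry
// still finishes by the original deadline, but never restores the old value.
func rpcWithRetry(r *req) {
	deadline := time.Now().Add(r.MaxQueryTime)
	// ...pretend the first attempt failed with a temporary RPC error...
	r.MaxQueryTime = time.Until(deadline) / 2 // shrunk so the retry fits the deadline
	// without a deferred restore, the caller's next query inherits this value
}

func main() {
	r := &req{MaxQueryTime: 5 * time.Minute}
	for i := 0; i < 3; i++ {
		rpcWithRetry(r)
		fmt.Println("block time for next query:", r.MaxQueryTime)
	}
	// The printed block time shrinks on every pass, so the client ends up
	// polling the server far more often than the configured 5 minutes.
}
```

The change below scopes any such adjustment to a single `rpc` call by restoring the original value with `defer`.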
20 changes: 14 additions & 6 deletions client/rpc.go
@@ -91,7 +91,9 @@ func (c *Client) rpc(method string, args any, reply any) error {

// If its a blocking query, allow the time specified by the request
if info, ok := args.(structs.RPCInfo); ok {
deadline = deadline.Add(info.TimeToBlock())
oldBlockTime := info.TimeToBlock()
deadline = deadline.Add(oldBlockTime)
defer info.SetTimeToBlock(oldBlockTime)
}

TRY:
@@ -127,9 +129,12 @@ TRY:
}

if time.Now().After(deadline) {
// Blocking queries are tricky. jitters and rpcholdtimes in multiple places can result in our server call taking longer than we wanted it to. For example:
// a block time of 5s may easily turn into the server blocking for 10s since it applies its own RPCHoldTime. If the server dies at t=7s we still want to retry
// so before we give up on blocking queries make one last attempt for an immediate answer
// Blocking queries are tricky. jitters and rpcholdtimes in multiple
// places can result in our server call taking longer than we wanted it
// to. For example: a block time of 5s may easily turn into the server
// blocking for 10s since it applies its own RPCHoldTime. If the server
// dies at t=7s we still want to retry so before we give up on blocking
// queries make one last attempt for an immediate answer
if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
info.SetTimeToBlock(0)
return c.RPC(method, args, reply)
@@ -144,10 +149,13 @@

select {
case <-timer.C:
// If we are going to retry a blocking query we need to update the time to block so it finishes by our deadline.
// If we are going to retry a blocking query we need to update the time
// to block so it finishes by our deadline.

if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
newBlockTime := time.Until(deadline)
// We can get below 0 here on slow computers because we slept for jitter so at least try to get an immediate response
// We can get below 0 here on slow computers because we slept for
// jitter so at least try to get an immediate response
if newBlockTime < 0 {
newBlockTime = 0
}
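
The heart of the fix is the `defer info.SetTimeToBlock(oldBlockTime)` added at the top of `rpc`: retries may still shrink the block time so they fit the overall deadline, but the caller's request is restored on the way out. A compact sketch of that save-and-restore pattern, using an illustrative `blockTimer` interface and `doBlockingRPC` helper rather than Nomad's real types:

```go
package rpcsketch

import "time"

// blockTimer is a stand-in for the relevant part of structs.RPCInfo; the
// names here are illustrative only.
type blockTimer interface {
	TimeToBlock() time.Duration
	SetTimeToBlock(time.Duration)
}

// doBlockingRPC sketches the save-and-restore pattern from the fix: remember
// the caller's block time, restore it with defer, and only shrink it
// transiently so each retry still finishes by the overall deadline.
func doBlockingRPC(info blockTimer, attempt func() error, deadline time.Time) error {
	orig := info.TimeToBlock()
	defer info.SetTimeToBlock(orig) // caller's request is untouched on return

	for {
		if err := attempt(); err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			// mirror the "one last attempt for an immediate answer" in rpc.go
			info.SetTimeToBlock(0)
			return attempt()
		}
		remaining := time.Until(deadline)
		if remaining < 0 {
			remaining = 0 // jitter sleeps can push us just past the deadline
		}
		info.SetTimeToBlock(remaining) // the retry must fit the remaining window
		// (the real code also sleeps with jitter before retrying)
	}
}
```

Restoring via `defer` rather than at each return site keeps the restore correct even for the recursive `c.RPC(...)` call and any early returns added later.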
55 changes: 55 additions & 0 deletions client/rpc_test.go
@@ -6,6 +6,7 @@ package client
import (
"errors"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/config"
@@ -191,3 +192,57 @@ func Test_resolveServer(t *testing.T) {
}

}

func TestRpc_RetryBlockTime(t *testing.T) {
Member:

A test!!! 🎉

Member Author:

The test took 10x longer to write than the fix, unfortunately. Not having any way of controlling the behavior of the lower layers of the RPC "stack" we have is probably why we have fairly poor test coverage of the error handling paths. 😿

ci.Parallel(t)

// Timeouts have to allow for multiple passes thru the recursive c.rpc
// call. Unconfigurable internal timeouts prevent us from using a shorter
// MaxQueryTime base for this test
expectMaxQueryTime := time.Second
rpcHoldTimeout := 5 * time.Second
unblockTimeout := 7 * time.Second

srv, cleanupSrv := nomad.TestServer(t, func(c *nomad.Config) {
c.NumSchedulers = 0
c.BootstrapExpect = 3 // we intentionally don't want a leader
})
t.Cleanup(func() { cleanupSrv() })

c, cleanupC := TestClient(t, func(c *config.Config) {
c.Servers = []string{srv.GetConfig().RPCAddr.String()}
c.RPCHoldTimeout = rpcHoldTimeout
})
t.Cleanup(func() { cleanupC() })

req := structs.NodeSpecificRequest{
NodeID: c.NodeID(),
SecretID: c.secretNodeID(),
QueryOptions: structs.QueryOptions{
Region: c.Region(),
AuthToken: c.secretNodeID(),
MinQueryIndex: 10000, // some far-flung index we know won't exist yet
MaxQueryTime: expectMaxQueryTime,
},
}

resp := structs.NodeClientAllocsResponse{}
errCh := make(chan error)

go func() {
err := c.rpc("Node.GetClientAllocs", &req, &resp)
errCh <- err
}()

// wait for the blocking query to run long enough for 2 passes thru,
// including jitter
select {
case err := <-errCh:
must.NoError(t, err)
case <-time.After(unblockTimeout):
cleanupC() // force unblock
}

must.Eq(t, expectMaxQueryTime, req.MaxQueryTime,
must.Sprintf("MaxQueryTime was changed during retries but not reset"))
}
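
The final assertion is the crux of the test: `req` is the very struct the retry loop mutated, so without the deferred restore its `MaxQueryTime` would come back smaller than `expectMaxQueryTime`. That matters because real callers reuse a single request across many blocking queries. A purely illustrative sketch of such a loop, written as if it lived in `package client` (this is not Nomad's actual allocation watcher):

```go
// Hypothetical watcher loop (illustrative only): the same req is reused for
// every blocking query, which is why a block time that leaks out of c.rpc
// would change the polling cadence for all subsequent queries.
func watchAllocs(c *Client, req *structs.NodeSpecificRequest) {
	for {
		var resp structs.NodeClientAllocsResponse
		if err := c.rpc("Node.GetClientAllocs", req, &resp); err != nil {
			time.Sleep(time.Second) // crude backoff for the sketch
			continue
		}
		req.MinQueryIndex = resp.Index // advance the index and block again
	}
}
```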