Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: support to set timeout in http client #7847

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,21 @@ const (
httpsScheme = "https"
networkErrorStatus = "network error"

defaultMembersInfoUpdateInterval = time.Minute
defaultTimeout = 30 * time.Second
defaultTimeout = 30 * time.Second
)

// respHandleFunc is the function to handle the HTTP response.
type respHandleFunc func(resp *http.Response, res any) error

type clientInnerOption func(ci *clientInner)

// withInnerTimeout configures the inner client with the given timeout config.
func withInnerTimeout(dur time.Duration) clientInnerOption {
return func(c *clientInner) {
c.timeout = dur
}
}

// clientInner is the inner implementation of the PD HTTP client, which contains some fundamental fields.
// It is wrapped by the `client` struct to make sure the inner implementation won't be exposed and could
// be consistent during the copy.
Expand All @@ -63,6 +71,7 @@ type clientInner struct {
source string
tlsConf *tls.Config
cli *http.Client
timeout time.Duration

requestCounter *prometheus.CounterVec
executionDuration *prometheus.HistogramVec
Expand All @@ -71,13 +80,22 @@ type clientInner struct {
}

func newClientInner(ctx context.Context, cancel context.CancelFunc, source string) *clientInner {
return &clientInner{ctx: ctx, cancel: cancel, source: source}
return &clientInner{ctx: ctx, cancel: cancel, source: source, timeout: defaultTimeout}
}

func (ci *clientInner) clone(opts ...clientInnerOption) *clientInner {
newCi := &clientInner{ctx: ci.ctx, cancel: ci.cancel, source: ci.source, timeout: ci.timeout}
for _, op := range opts {
op(newCi)
}
newCi.init(ci.sd)
return newCi
}

func (ci *clientInner) init(sd pd.ServiceDiscovery) {
// Init the HTTP client if it's not configured.
if ci.cli == nil {
ci.cli = &http.Client{Timeout: defaultTimeout}
ci.cli = &http.Client{Timeout: ci.timeout}
if ci.tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = ci.tlsConf
Expand Down Expand Up @@ -265,6 +283,13 @@ func WithHTTPClient(cli *http.Client) ClientOption {
}
}

// WithTimeout configures the client with the given timeout config.
func WithTimeout(dur time.Duration) ClientOption {
return func(c *client) {
c.inner.timeout = dur
}
}

// WithTLSConfig configures the client with the given TLS config.
// This option won't work if the client is configured with WithHTTPClient.
func WithTLSConfig(tlsConf *tls.Config) ClientOption {
Expand Down Expand Up @@ -352,6 +377,13 @@ func (c *client) WithBackoffer(bo *retry.Backoffer) Client {
return &newClient
}

// WithTimeout sets and returns a new client with a new `http.Client` whose timeout is the given one.
func (c *client) WithTimeout(dur time.Duration) Client {
newClient := *c
newClient.inner = c.inner.clone(withInnerTimeout(dur))
return &newClient
}
Comment on lines +380 to +385
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial purpose of introducing clientInner is to allow safe sharing of the same inner core, even with multiple Client interfaces. This ensures that common components such as service discovery are reused effectively. However, this constraint appears to be violated. What about just keeping the timeout option as an initialization parameter instead of a dynamic variable?

Copy link
Member

@JmPotato JmPotato Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we could make the timeout an upper layer's behavior rather than the HTTP client configuration. Like a Context with a timeout.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, Common components don't rely on clientInner(http.Client). So I think it's safe to clone the clientInner. However, I do not consider this parameter to be a dynamic variable because it creates a new clientInner without affecting the previous clientInner.

In fact, the reason of why I do this is because the timeout of http. Client in the store helper in TiDB is 0. However, the timeout value of http. Client in client-go is 30 seconds. Setting a default timeout for http.Client eliminates the need to set a timeout in context each time for most requests.


// WithTargetURL sets and returns a new client with the given target URL.
func (c *client) WithTargetURL(targetURL string) Client {
newClient := *c
Expand Down
3 changes: 3 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
Expand Down Expand Up @@ -116,6 +117,8 @@ type Client interface {
WithRespHandler(func(resp *http.Response, res any) error) Client
// WithBackoffer sets and returns a new client with the given backoffer.
WithBackoffer(*retry.Backoffer) Client
// WithTimeout sets and returns a new client with the timeout config.
WithTimeout(time.Duration) Client
// WithTargetURL sets and returns a new client with the given target URL.
WithTargetURL(string) Client
// Close gracefully closes the HTTP client.
Expand Down
6 changes: 6 additions & 0 deletions server/api/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package api

import (
"net/http"
"time"

"github.com/pingcap/failpoint"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/unrolled/render"
)
Expand Down Expand Up @@ -47,5 +49,9 @@ func (h *versionHandler) GetVersion(w http.ResponseWriter, r *http.Request) {
version := &version{
Version: versioninfo.PDReleaseVersion,
}
failpoint.Inject("slowGetVersion", func(val failpoint.Value) {
//nolint:durationcheck
time.Sleep(time.Millisecond * time.Duration(val.(int)))
})
h.rd.JSON(w, http.StatusOK, version)
}
37 changes: 34 additions & 3 deletions tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package client_test

import (
"context"
"fmt"
"math"
"net/http"
"net/url"
Expand All @@ -25,6 +26,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -119,16 +121,16 @@ func (suite *httpClientTestSuite) TearDownSuite() {
}

// RunTestInTwoModes is to run test in two modes.
func (suite *httpClientTestSuite) RunTestInTwoModes(test func(mode mode, client pd.Client)) {
func (suite *httpClientTestSuite) RunTestInTwoModes(test func(mode mode, client pd.Client), opts ...pd.ClientOption) {
// Run test with specific service discovery.
cli := setupCli(suite.Require(), suite.env[specificServiceDiscovery].ctx, suite.env[specificServiceDiscovery].endpoints)
sd := cli.GetServiceDiscovery()
client := pd.NewClientWithServiceDiscovery("pd-http-client-it-grpc", sd)
client := pd.NewClientWithServiceDiscovery("pd-http-client-it-grpc", sd, opts...)
test(specificServiceDiscovery, client)
client.Close()

// Run test with default service discovery.
client = pd.NewClient("pd-http-client-it-http", suite.env[defaultServiceDiscovery].endpoints)
client = pd.NewClient("pd-http-client-it-http", suite.env[defaultServiceDiscovery].endpoints, opts...)
test(defaultServiceDiscovery, client)
client.Close()
}
Expand Down Expand Up @@ -720,6 +722,35 @@ func (suite *httpClientTestSuite) checkWithBackoffer(mode mode, client pd.Client
re.Nil(rule)
}

func (suite *httpClientTestSuite) TestWithTimeout() {
suite.RunTestInTwoModes(suite.checkWithTimeout, pd.WithTimeout(1*time.Second))
}

func (suite *httpClientTestSuite) checkWithTimeout(mode mode, client pd.Client) {
re := suite.Require()
env := suite.env[mode]

re.NoError(failpoint.Enable("github.com/tikv/pd/server/api/slowGetVersion", fmt.Sprintf("return(%d)", 700)))
ctx, cancel := context.WithTimeout(env.ctx, 500*time.Millisecond)
_, err := client.GetPDVersion(ctx)
re.ErrorIs(err, context.DeadlineExceeded)
cancel()

newClient := client.WithTimeout(500 * time.Millisecond)
_, err = newClient.GetPDVersion(env.ctx)
re.ErrorContains(err, context.DeadlineExceeded.Error())

version, err := client.GetPDVersion(env.ctx)
re.NoError(err)
re.Equal("None", version)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/api/slowGetVersion"))

re.NoError(failpoint.Enable("github.com/tikv/pd/server/api/slowGetVersion", fmt.Sprintf("return(%d)", 1200)))
_, err = client.GetPDVersion(ctx)
re.ErrorContains(err, context.DeadlineExceeded.Error())
re.NoError(failpoint.Disable("github.com/tikv/pd/server/api/slowGetVersion"))
}

func (suite *httpClientTestSuite) TestRedirectWithMetrics() {
re := suite.Require()
env := suite.env[defaultServiceDiscovery]
Expand Down
Loading