Skip to content

Commit

Permalink
Add more config for HTTP connections
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Jan 9, 2025
1 parent 817dcee commit d92a404
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 19 deletions.
4 changes: 4 additions & 0 deletions assets/docs/configuration/targets/http-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ target {
# Optional path to the file containing template which is used to build HTTP request based on a batch of input data
template_file = "myTemplate.file"

max_connections = 100
max_idle_connections = 100
max_idle_connections_per_host = 100

# Optional HTTP response rules which are used to match HTTP response code/body and categorize it as either invalid data or target setup error.
# For example, we can have 2 invalid + 1 setup error rules:
response_rules {
Expand Down
7 changes: 7 additions & 0 deletions config/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func TestCreateTargetComponentHCL(t *testing.T) {
KeyFile: "",
CaFile: "",
SkipVerifyTLS: false,
MaxConnections: 0,
MaxIdleConnections: 100,
MaxIdleConnectionsPerHost: 100,
ResponseRules: &target.ResponseRules{
Invalid: []target.Rule{},
SetupError: []target.Rule{},
Expand Down Expand Up @@ -118,6 +121,10 @@ func TestCreateTargetComponentHCL(t *testing.T) {
SkipVerifyTLS: true,
DynamicHeaders: true,
TemplateFile: "myTemplate.file",

MaxConnections: 100,
MaxIdleConnections: 100,
MaxIdleConnectionsPerHost: 100,
ResponseRules: &target.ResponseRules{
Invalid: []target.Rule{
{
Expand Down
28 changes: 26 additions & 2 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type HTTPTargetConfig struct {

TemplateFile string `hcl:"template_file,optional"`
ResponseRules *ResponseRules `hcl:"response_rules,block"`

MaxConnections int `hcl:"max_connections,optional"`
MaxIdleConnections int `hcl:"max_idle_connections,optional"`
MaxIdleConnectionsPerHost int `hcl:"max_idle_connections_per_host,optional"`
}

// ResponseRules is part of HTTP target configuration. It provides rules how HTTP respones should be handled. Response can be categerized as 'invalid' (bad data), as setup error or (if none of the rules matches) as a transient error.
Expand Down Expand Up @@ -99,6 +103,10 @@ type HTTPTarget struct {
requestTemplate *template.Template
approxTmplSize int
responseRules *ResponseRules

maxConnections int
maxIdleConnections int
maxIdleConnectionsPerHost int
}

func checkURL(str string) error {
Expand Down Expand Up @@ -159,7 +167,11 @@ func newHTTPTarget(
oAuth2RefreshToken string,
oAuth2TokenURL string,
templateFile string,
responseRules *ResponseRules) (*HTTPTarget, error) {
responseRules *ResponseRules,
maxConnections int,
maxIdleConnections int,
maxIdleConnectionsPerHost int,
) (*HTTPTarget, error) {
err := checkURL(httpURL)
if err != nil {
return nil, err
Expand All @@ -169,7 +181,9 @@ func newHTTPTarget(
return nil, err1
}
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConnsPerHost = transport.MaxIdleConns
transport.MaxConnsPerHost = maxConnections
transport.MaxIdleConns = maxIdleConnections
transport.MaxIdleConnsPerHost = maxIdleConnectionsPerHost

tlsConfig, err2 := common.CreateTLSConfiguration(certFile, keyFile, caFile, skipVerifyTLS)
if err2 != nil {
Expand Down Expand Up @@ -208,6 +222,10 @@ func newHTTPTarget(
requestTemplate: requestTemplate,
approxTmplSize: approxTmplSize,
responseRules: responseRules,

maxConnections: maxConnections,
maxIdleConnections: maxIdleConnections,
maxIdleConnectionsPerHost: maxIdleConnectionsPerHost,
}, nil
}

Expand Down Expand Up @@ -290,6 +308,9 @@ func HTTPTargetConfigFunction(c *HTTPTargetConfig) (*HTTPTarget, error) {
c.OAuth2TokenURL,
c.TemplateFile,
c.ResponseRules,
c.MaxConnections,
c.MaxIdleConnections,
c.MaxIdleConnectionsPerHost,
)
}

Expand Down Expand Up @@ -318,6 +339,9 @@ func (f HTTPTargetAdapter) ProvideDefault() (interface{}, error) {
Invalid: []Rule{},
SetupError: []Rule{},
},
MaxConnections: 0, // no limit
MaxIdleConnections: 100,
MaxIdleConnectionsPerHost: 100,
}

return cfg, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/target/http_oauth2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func runTest(t *testing.T, inputClientID string, inputClientSecret string, input
}

func oauth2Target(t *testing.T, targetURL string, inputClientID string, inputClientSecret string, inputRefreshToken string, tokenServerURL string) *HTTPTarget {
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules())
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules(), 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down
32 changes: 16 additions & 16 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,20 +308,20 @@ func TestHTTP_AddHeadersToRequest_WithDynamicHeaders(t *testing.T) {
func TestHTTP_NewHTTPTarget(t *testing.T) {
assert := assert.New(t)

httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), 0, 100, 100)

assert.Nil(err)
assert.NotNil(httpTarget)

failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), 0, 100, 100)

assert.NotNil(err1)
if err1 != nil {
assert.Equal("Invalid url for HTTP target: 'something'", err1.Error())
}
assert.Nil(failedHTTPTarget)

failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), 0, 100, 100)
assert.NotNil(err2)
if err2 != nil {
assert.Equal("Invalid url for HTTP target: ''", err2.Error())
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestHTTP_Write_Simple(t *testing.T) {
server := createTestServerWithResponseCode(&results, tt.ResponseCode, "")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -413,7 +413,7 @@ func TestHTTP_Write_Batched(t *testing.T) {
server := createTestServerWithResponseCode(&results, 200, "")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestHTTP_Write_Concurrent(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -516,7 +516,7 @@ func TestHTTP_Write_Failure(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestHTTP_Write_InvalidResponseCode(t *testing.T) {
var results [][]byte
server := createTestServerWithResponseCode(&results, tt.ResponseCode, "")
defer server.Close()
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -593,7 +593,7 @@ func TestHTTP_Write_Oversized(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules(), 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -636,7 +636,7 @@ func TestHTTP_Write_EnabledTemplating(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules(), 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -694,7 +694,7 @@ func TestHTTP_Write_Invalid(t *testing.T) {
},
}

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules, 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -722,7 +722,7 @@ func TestHTTP_Write_Setup(t *testing.T) {
},
}

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules, 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -768,7 +768,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
defaultResponseRules())
defaultResponseRules(), 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -810,7 +810,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
defaultResponseRules())
defaultResponseRules(), 0, 100, 100)
if err2 != nil {
t.Fatal(err2)
}
Expand Down Expand Up @@ -845,7 +845,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
defaultResponseRules())
defaultResponseRules(), 0, 100, 100)
if err4 != nil {
t.Fatal(err4)
}
Expand Down Expand Up @@ -1000,7 +1000,7 @@ func TestHTTP_Write_GroupedRequests(t *testing.T) {
defer server.Close()

//dynamicHeaders enabled
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, true, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, true, "", "", "", "", "", defaultResponseRules(), 0, 100, 100)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit d92a404

Please sign in to comment.