Skip to content

Commit

Permalink
add client reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
tombokombo committed Oct 28, 2024
1 parent 16488f5 commit 6b0734a
Showing 1 changed file with 42 additions and 23 deletions.
65 changes: 42 additions & 23 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"bytes"
"crypto/tls"
"io/ioutil"
Expand All @@ -11,6 +12,7 @@ import (
"sync"
"time"
"strings"
"io"

"gopkg.in/yaml.v2"

Expand Down Expand Up @@ -43,6 +45,7 @@ type Config map[string][]BackendConfig
type ProxyServer struct {
configPath string
config sync.Map // sync.Map for thread-safe access to the configuration
clients sync.Map
queue chan *ForwardedRequest // Capped channel acting as the request queue
workerCount int

Expand Down Expand Up @@ -118,8 +121,7 @@ func NewProxyServer(configPath string, queueSize, workerCount int) *ProxyServer
func (p *ProxyServer) loadConfig() error {
data, err := ioutil.ReadFile(p.configPath)
if err != nil {
log.Printf("Error reading config file: %v\n", err)
return err
log.Fatal("Error reading config file: %v\n", err)
}

var newConfig Config
Expand Down Expand Up @@ -192,6 +194,37 @@ func (p *ProxyServer) getBackendsForHost(host string) ([]BackendConfig, bool) {
return nil, false
}

func (p *ProxyServer) getClientForHost(hostBackend string, timeout float32) (*http.Client) {
timeoutStr := fmt.Sprintf("%f", timeout)
hostBackend = hostBackend+timeoutStr

if client, found := p.clients.Load(hostBackend); found {
log.Printf("Reuse client for host: %s", hostBackend)
return client.(*http.Client)
}
client := &http.Client{
//The timeout includes connection time, any
// redirects, and reading the response body. The timer remains
// running after Get, Head, Post, or Do return and will
// interrupt reading of the Response.Body.
Timeout: (time.Duration(timeout) + 1) * time.Second,
Transport: &http.Transport{
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
MaxIdleConnsPerHost: 10,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // Skip certificate verification (not recommended in production)
DialContext: (&net.Dialer{
// Timeout is the maximum amount of time a dial will wait for
// a connect to complete.
Timeout: time.Duration(timeout) * time.Second,
}).DialContext,
},
}
p.clients.Store(hostBackend, client)
log.Printf("New client for host: %s", hostBackend)
return client
}

// proxyRequest forwards the request to the backend with retries based on the configuration
func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {
// Increment total requests counter
Expand All @@ -209,25 +242,7 @@ func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {

var lastErr error
for _, backend := range backends {
client := &http.Client{
//The timeout includes connection time, any
// redirects, and reading the response body. The timer remains
// running after Get, Head, Post, or Do return and will
// interrupt reading of the Response.Body.
Timeout: (time.Duration(backend.Timeout) + 1) * time.Second,
Transport: &http.Transport{
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
MaxIdleConnsPerHost: 10,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // Skip certificate verification (not recommended in production)
DialContext: (&net.Dialer{
// Timeout is the maximum amount of time a dial will wait for
// a connect to complete.
Timeout: time.Duration(backend.Timeout) * time.Second,
}).DialContext,
},
}

client := p.getClientForHost(host + backend.Backend, backend.Timeout)
for i := 0; i <= backend.Retries; i++ {
req, err := http.NewRequest(r.Req.Method, backend.Backend+r.Req.URL.Path, bytes.NewReader(r.Body));
if err != nil {
Expand All @@ -237,13 +252,17 @@ func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {

req.Header = r.Req.Header
resp, err := client.Do(req)
// Read the response body to ensure the connection can be reused
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
log.Printf("Failed to read %v response body: %v\n", backend.Backend +r.Req.URL.Path, err)
}

resp.Body.Close()
if err == nil && resp.StatusCode < 400 {
defer resp.Body.Close()
// Successfully forwarded request
p.totalForwarded.Inc()
return
}

lastErr = err
p.totalRetries.Inc() // Increment retries counter
time.Sleep(time.Duration(backend.Delay) * time.Second) // Small delay before retrying
Expand Down

0 comments on commit 6b0734a

Please sign in to comment.