-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
406 lines (358 loc) · 14.5 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
package main
import (
"flag"
"fmt"
"bytes"
"crypto/tls"
"io/ioutil"
"net"
"net/http"
"os"
"syscall"
"os/signal"
"sync"
"time"
"strings"
"io"
"gopkg.in/yaml.v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/jessevdk/go-flags"
"k8s.io/klog/v2"
)
type Opts struct {
QueueSize int `long:"queue_size" short:"q" env:"QUEUE_SIZE" description:"buffer queue size/num of buffered requests" default:"100"`
WorkerCount int `long:"worker_count" short:"w" env:"WORKER_COUNT" description:"count of worker goroutines to spawn" default:"5"`
ListenAddress string `long:"listen" short:"l" env:"LISTEN_ADDRESS" description:"listen bind address" default:":8080"`
MetricsAddress string `long:"metrics_address" short:"m" env:"METRICS_ADDRESS" description:"metrics bind address" default:":9091"`
ConfigPath string `long:"config_path" short:"c" env:"CONFIG_PATH" description:"path to backend map file in yaml format" default:"/etc/backends.yaml"`
LogLevel int `long:"loglevel" short:"v" env:"LOG_LEVEL" description:"log verbosity level for klog" default:"2"`
TerminateFile string `long:"terminate_file_path" short:"t" env:"TERMINATE_FILE_PATH" description:"if file exists will terminate" default:""`
}
// BackendConfig represents the configuration for each backend
type BackendConfig struct {
Backend string `yaml:"backend"`
Retries int `yaml:"retries"`
Delay float32 `yaml:"delay`
Timeout float32 `yaml:"timeout"` // Timeout in seconds
}
// Config represents the YAML structure mapping front-facing hosts to their backends
type Config map[string][]BackendConfig
// ProxyServer handles the proxying logic
type ProxyServer struct {
configPath string
config sync.Map // sync.Map for thread-safe access to the configuration
clients sync.Map
queue chan *ForwardedRequest // Capped channel acting as the request queue
workerCount int
// Prometheus metrics
totalRequests *prometheus.CounterVec
totalForwarded *prometheus.CounterVec
totalRetries *prometheus.CounterVec
totalFailed *prometheus.CounterVec
totalDropped prometheus.Counter
totalFailedBodyRead prometheus.Counter
usedQueueLength prometheus.Gauge
}
type ForwardedRequest struct {
Req *http.Request
Body []byte
}
// NewProxyServer initializes a new ProxyServer with Prometheus metrics and worker goroutines
func NewProxyServer(configPath string, queueSize, workerCount int, terminateFile string) *ProxyServer {
ps := &ProxyServer{
configPath: configPath,
queue: make(chan *ForwardedRequest, queueSize), // Capped channel
workerCount: workerCount,
// Initialize Prometheus metrics
totalRequests: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pxfd_http_proxy_requests_total",
Help: "Total number of incoming requests",
},
[]string{"host"},
),
totalForwarded: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pxfd_http_proxy_forwarded_total",
Help: "Total number of successfully forwarded requests",
},
[]string{"host", "backend"},
),
totalRetries: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pxfd_http_proxy_retries_total",
Help: "Total number of retries",
},
[]string{"host", "backend"},
),
totalFailed: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pxfd_http_proxy_failed_total",
Help: "Total number of failed requests",
},
[]string{"host"},
),
totalDropped: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pxfd_http_proxy_queue_full_dropped_total",
Help: "Total number of dropped requests",
}),
totalFailedBodyRead: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pxfd_http_proxy_failed_body_read_total",
Help: "Total number of requests with failed body reads",
}),
usedQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "pxfd_http_proxy_queue_length",
Help: "Current length of the request queue",
}),
}
// Register Prometheus metrics
prometheus.MustRegister(ps.totalRequests, ps.totalForwarded, ps.totalRetries, ps.totalFailed, ps.totalDropped, ps.totalFailedBodyRead, ps.usedQueueLength)
ps.loadConfig() // Load config initially
go ps.reloadConfigPeriodically() // Start goroutine to reload config every 30 seconds
if len(terminateFile) > 0 {
go ps.checkTerminateFile(terminateFile) // Start goroutine to reload config every 30 seconds
}
go ps.updateQueueLengthPeriodically() // Start goroutine to update queue length metrics
// Start worker goroutines
for i := 0; i < ps.workerCount; i++ {
go ps.worker(i)
}
signalCh := make(chan os.Signal)
signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
go func() {
for sig := range signalCh {
switch sig {
case os.Interrupt:
klog.V(2).Infof("Got interrput signal")
case syscall.SIGTERM:
klog.V(2).Infof("Got SIGTERM signal")
// deliver rest of queue to FINAL destination
case syscall.SIGQUIT:
klog.V(2).Infof("Got SIGQUIT signal")
// deliver rest of queue to FINAL destination
default:
klog.V(2).Infof("Some signal received: %v", sig)
}
klog.V(1).Infof("Queue len is: %v", ps.getQueueLen())
}
}()
return ps
}
// loadConfig loads the configuration from the YAML file
func (p *ProxyServer) getQueueLen() int {
return len(p.queue)
}
// loadConfig loads the configuration from the YAML file
func (p *ProxyServer) loadConfig() error {
data, err := ioutil.ReadFile(p.configPath)
if err != nil {
klog.Fatal("Error reading config file: %v", err)
}
var newConfig Config
if err := yaml.Unmarshal(data, &newConfig); err != nil {
klog.V(1).Infof("Error parsing config file: %v", err)
return err
}
// Update sync.Map with new config
for host, backends := range newConfig {
p.config.Store(host, backends)
}
// Clear old hosts in sync.Map
p.config.Range(func(key, value interface{}) bool {
found := false
for host, _ := range newConfig {
if host == key {
found = true
}
}
if !found {
klog.V(2).Infof("Deleting old host %v", key)
p.config.Delete(key)
}
return true
})
klog.V(4).Infof("Configuration reloaded successfully")
return nil
}
// reloadConfigPeriodically reloads the config from the file every 30 seconds
func (p *ProxyServer) reloadConfigPeriodically() {
for {
time.Sleep(30 * time.Second)
if err := p.loadConfig(); err != nil {
klog.V(1).Infof("Failed to reload config:", err)
}
}
}
func (p *ProxyServer) checkTerminateFile(terminateFile string) {
for {
if _, err := os.Stat(terminateFile); err == nil {
klog.V(1).Infof("File exists %s - terminating",terminateFile)
os.Exit(0)
}
time.Sleep(5 * time.Second)
}
}
// updateQueueLengthPeriodically updates the queue length every 10 seconds
func (p *ProxyServer) updateQueueLengthPeriodically() {
for {
p.usedQueueLength.Set(float64(len(p.queue))) // Update queue length gauge
time.Sleep(10 * time.Second)
}
}
// worker processes requests from the queue
func (p *ProxyServer) worker(id int) {
klog.V(2).Infof("Worker %d started", id)
for req := range p.queue {
p.proxyRequest(req) // Process each request from the queue
}
}
// getBackendsForHost returns the backend configurations for a given host
func (p *ProxyServer) getBackendsForHost(host string) ([]BackendConfig, bool) {
if backends, found := p.config.Load(host); found {
return backends.([]BackendConfig), true
}
return nil, false
}
func (p *ProxyServer) getClientForHost(hostBackend string, timeout float32) (*http.Client) {
timeoutStr := fmt.Sprintf("%f", timeout)
hostBackend = hostBackend+timeoutStr
if client, found := p.clients.Load(hostBackend); found {
return client.(*http.Client)
}
client := &http.Client{
//The timeout includes connection time, any
// redirects, and reading the response body. The timer remains
// running after Get, Head, Post, or Do return and will
// interrupt reading of the Response.Body.
Timeout: (time.Duration(timeout) + 1) * time.Second,
Transport: &http.Transport{
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
MaxIdleConnsPerHost: 10,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // Skip certificate verification (not recommended in production)
DialContext: (&net.Dialer{
// Timeout is the maximum amount of time a dial will wait for
// a connect to complete.
Timeout: time.Duration(timeout) * time.Second,
}).DialContext,
},
}
p.clients.Store(hostBackend, client)
return client
}
// proxyRequest forwards the request to the backend with retries based on the configuration
func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {
if r == nil || r.Req == nil {
klog.Error("Received a nil request, skipping processing")
return
}
host := r.Req.Host
host, _, _ = strings.Cut(host, ":") // strip port
backends, found := p.getBackendsForHost(host)
p.totalRequests.WithLabelValues(host).Inc()
p.totalFailed.WithLabelValues(host).Add(0) // just to let metrics visible
if !found || len(backends) == 0 {
p.totalFailed.WithLabelValues(host).Inc() // Increment failed request counter
klog.V(1).Infof("Error host: '%v' not found in config file, dropping request", host)
return
}
var lastErr error
for _, backend := range backends {
p.totalForwarded.WithLabelValues(host, backend.Backend).Add(0)
client := p.getClientForHost(host+backend.Backend, backend.Timeout)
if client == nil {
klog.V(1).Infof("Client for backend %s is nil, skipping", backend.Backend)
continue
}
for i := 0; i <= backend.Retries; i++ {
req, err := http.NewRequest(r.Req.Method, backend.Backend+r.Req.URL.Path, bytes.NewReader(r.Body))
if err != nil {
klog.V(4).Infof("Message failed for host %s with error: %v", backend.Backend, err)
lastErr = err
continue
}
req.Header = r.Req.Header
resp, err := client.Do(req)
if err != nil {
klog.V(1).Infof("Failed to process request to %s with error: %v", backend.Backend, err)
lastErr = err
p.totalRetries.WithLabelValues(host, backend.Backend).Inc()
time.Sleep(time.Duration(backend.Delay) * time.Second)
continue
}
// Read and discard response body
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
klog.V(1).Infof("Failed to read response body from %v: %v", backend.Backend+r.Req.URL.Path, err)
}
resp.Body.Close()
if resp.StatusCode < 400 {
p.totalForwarded.WithLabelValues(host, backend.Backend).Inc()
p.totalRetries.WithLabelValues(host, backend.Backend).Add(0)
klog.V(4).Infof("Request success to %s", backend.Backend)
return
} else {
klog.V(4).Infof("Request to %s failed on not acceptable status code %v", backend.Backend, resp.StatusCode)
p.totalRetries.WithLabelValues(host, backend.Backend).Inc()
time.Sleep(time.Duration(backend.Delay) * time.Second)
continue
}
}
}
p.totalFailed.WithLabelValues(host).Inc()
klog.V(1).Infof("All backends failed for host %s: %v", host, lastErr)
}
// handleIncomingRequest queues incoming requests
func (p *ProxyServer) handleIncomingRequest(w http.ResponseWriter, r *http.Request) {
// Increment failed body read counter if the body can't be read
body, err := ioutil.ReadAll(r.Body)
if err != nil {
p.totalFailedBodyRead.Inc()
http.Error(w, "Failed to read body", http.StatusBadRequest)
return
}
r.Body.Close() // Close the original body
// Try to add the request to the queue
select {
case p.queue <- &ForwardedRequest{Req: r, Body: body}:
//log.Printf(w, "Request queued\n")
default:
p.totalDropped.Inc() // Increment dropped requests counter
http.Error(w, "Queue is full", http.StatusServiceUnavailable)
}
}
func getEnv(key string, defaultValue string) string {
if value, exists := os.LookupEnv(key); exists {
return value
}
return defaultValue
}
func main() {
klog.InitFlags(nil)
var conf Opts
_, err := flags.Parse(&conf)
if err != nil {
klog.Fatalf("Error parsing flags: %v", err)
}
flag.Set("v", fmt.Sprintf("%d", conf.LogLevel))
// Create a new proxy server with the path to the YAML config
proxy := NewProxyServer(conf.ConfigPath, conf.QueueSize, conf.WorkerCount, conf.TerminateFile) // queue size = 100, worker count = 5
// HTTP handler for incoming requests
http.HandleFunc("/", proxy.handleIncomingRequest)
// Start the Prometheus metrics server on a separate port
go func() {
metricsMux := http.NewServeMux()
metricsMux.Handle("/metrics", promhttp.Handler())
klog.V(1).Infof("Prometheus metrics server listening on %s", conf.MetricsAddress)
err := http.ListenAndServe(conf.MetricsAddress, metricsMux)
if err != nil {
klog.Fatalf("Failed to start server: %v", err) // %v formats the error as a string
}
}()
// Start the proxy server
klog.V(1).Infof("Proxy server is listening on %s", conf.ListenAddress)
err = http.ListenAndServe(conf.ListenAddress, nil)
if err != nil {
klog.Fatalf("Failed to start server: %v", err) // %v formats the error as a string
}
}