Skip to content

Commit

Permalink
add terminate file
Browse files Browse the repository at this point in the history
  • Loading branch information
tombokombo committed Nov 6, 2024
1 parent 218cb35 commit a9d15be
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Opts struct {
MetricsAddress string `long:"metrics_address" short:"m" env:"METRICS_ADDRESS" description:"metrics bind address" default:":9091"`
ConfigPath string `long:"config_path" short:"c" env:"CONFIG_PATH" description:"path to backend map file in yaml format" default:"/etc/backends.yaml"`
LogLevel int `long:"loglevel" short:"v" env:"LOG_LEVEL" description:"log verbosity level for klog" default:"2"`
TerminateFile string `long:"terminate_file_path" short:"t" env:"TERMINATE_FILE_PATH" description:"if file exists will terminate" default:""`
}

// BackendConfig represents the configuration for each backend
Expand Down Expand Up @@ -70,7 +71,7 @@ type ForwardedRequest struct {
}

// NewProxyServer initializes a new ProxyServer with Prometheus metrics and worker goroutines
func NewProxyServer(configPath string, queueSize, workerCount int) *ProxyServer {
func NewProxyServer(configPath string, queueSize, workerCount int, terminateFile string) *ProxyServer {
ps := &ProxyServer{
configPath: configPath,
queue: make(chan *ForwardedRequest, queueSize), // Capped channel
Expand Down Expand Up @@ -105,7 +106,7 @@ func NewProxyServer(configPath string, queueSize, workerCount int) *ProxyServer
[]string{"host"},
),
totalDropped: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pxfd_http_proxy_dropped_total",
Name: "pxfd_http_proxy_queue_full_dropped_total",
Help: "Total number of dropped requests",
}),
totalFailedBodyRead: prometheus.NewCounter(prometheus.CounterOpts{
Expand All @@ -123,6 +124,9 @@ func NewProxyServer(configPath string, queueSize, workerCount int) *ProxyServer

ps.loadConfig() // Load config initially
go ps.reloadConfigPeriodically() // Start goroutine to reload config every 30 seconds
if len(terminateFile) > 0 {
go ps.checkTerminateFile(terminateFile) // Start goroutine to reload config every 30 seconds
}
go ps.updateQueueLengthPeriodically() // Start goroutine to update queue length metrics

// Start worker goroutines
Expand Down Expand Up @@ -204,6 +208,16 @@ func (p *ProxyServer) reloadConfigPeriodically() {
}
}

func (p *ProxyServer) checkTerminateFile(terminateFile string) {
for {
if _, err := os.Stat(terminateFile); err == nil {
klog.V(1).Infof("File exists %s - terminating",terminateFile)
os.Exit(0)
}
time.Sleep(5 * time.Second)
}
}

// updateQueueLengthPeriodically updates the queue length every 10 seconds
func (p *ProxyServer) updateQueueLengthPeriodically() {
for {
Expand Down Expand Up @@ -269,6 +283,7 @@ func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {
backends, found := p.getBackendsForHost(host)

p.totalRequests.WithLabelValues(host).Inc()
p.totalFailed.WithLabelValues(host).Add(0) // just to let metrics visible

if !found || len(backends) == 0 {
p.totalFailed.WithLabelValues(host).Inc() // Increment failed request counter
Expand All @@ -278,6 +293,7 @@ func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {

var lastErr error
for _, backend := range backends {
p.totalForwarded.WithLabelValues(host, backend.Backend).Add(0)
client := p.getClientForHost(host+backend.Backend, backend.Timeout)
if client == nil {
klog.V(1).Infof("Client for backend %s is nil, skipping", backend.Backend)
Expand Down Expand Up @@ -310,10 +326,12 @@ func (p *ProxyServer) proxyRequest(r *ForwardedRequest) {

if resp.StatusCode < 400 {
p.totalForwarded.WithLabelValues(host, backend.Backend).Inc()
p.totalRetries.WithLabelValues(host, backend.Backend).Add(0)
klog.V(4).Infof("Request success to %s", backend.Backend)
return
} else {
klog.V(4).Infof("Request to %s failed on not acceptable status code %v", backend.Backend, resp.StatusCode)
p.totalRetries.WithLabelValues(host, backend.Backend).Inc()
time.Sleep(time.Duration(backend.Delay) * time.Second)
continue
}
Expand Down Expand Up @@ -363,7 +381,7 @@ func main() {
flag.Set("v", fmt.Sprintf("%d", conf.LogLevel))

// Create a new proxy server with the path to the YAML config
proxy := NewProxyServer(conf.ConfigPath, conf.QueueSize, conf.WorkerCount) // queue size = 100, worker count = 5
proxy := NewProxyServer(conf.ConfigPath, conf.QueueSize, conf.WorkerCount, conf.TerminateFile) // queue size = 100, worker count = 5

// HTTP handler for incoming requests
http.HandleFunc("/", proxy.handleIncomingRequest)
Expand Down

0 comments on commit a9d15be

Please sign in to comment.