From aa0bcc8f1fd551a483c85e47443fe59bdc452b91 Mon Sep 17 00:00:00 2001 From: Germano Eichenberg Date: Mon, 14 Feb 2022 10:50:57 -0300 Subject: [PATCH] Add support for ha, cleanup, refactor Squashed commit of the following: commit 52a9c113b6cfe8c2ada17213f96148855dfcefda Author: Germano Eichenberg Date: Mon Feb 14 10:42:47 2022 -0300 Remove dead code commit 02400a87f867170d46f1c15f8c439f4699d52bb5 Author: Germano Eichenberg Date: Mon Feb 14 10:38:47 2022 -0300 Add call to increment requests_routed_received commit d8b7f16bc560f4e3409efd29be1554b0b3be627c Author: Germano Eichenberg Date: Sun Feb 13 01:26:02 2022 -0300 Fix context cancelled errors, add logging to missing places commit b509fea3097b778231b6baba3d812953682ae32e Author: Germano Eichenberg Date: Sun Feb 13 01:00:52 2022 -0300 Remove context cancellation from globals commit df5f912e6285b45afae3264a617682890ed50880 Author: Germano Eichenberg Date: Sat Feb 12 15:54:37 2022 -0300 Implement graceful shutdowns commit b430cb92ebc7568aa3ff35f41275c7e5263b020a Author: Germano Eichenberg Date: Fri Feb 11 15:27:48 2022 -0300 Re-add context cancellation on FireGlobalRequest commit b4e753e3d97ddae4f1e28aadba23ba3e16e4ca4b Author: Germano Eichenberg Date: Fri Feb 11 15:08:54 2022 -0300 Remove timeout from routeRequest commit 87a2ce909a6311b054e8f5853c87e7e1722a8866 Author: Germano Eichenberg Date: Fri Feb 11 15:01:54 2022 -0300 Remove context call from FireGlobalRequest commit f55744835a69906ff9b8e2699d78e5bcee42ef1d Author: Germano Eichenberg Date: Fri Feb 11 14:38:45 2022 -0300 Add more context around log errors for main handlers commit d3650c507f14ea0f7a94be57a89fa581b361ed89 Author: Germano Eichenberg Date: Fri Feb 11 12:34:23 2022 -0300 Attempt to fix context canceled errors commit 4da3dcd14db4d6ef8eb4daa1afa6de257361457b Author: Germano Eichenberg Date: Fri Feb 11 12:01:12 2022 -0300 Tweak client config to be more permissive commit 9a89bd3b534a2e666f5c8cb6439a124039bc667c Author: Germano Eichenberg Date: Fri Feb 11 11:01:01 2022 -0300 More readme work commit fdf8d31a768e23a9f3b91fea90b0d72cccf770c1 Author: Germano Eichenberg Date: Thu Feb 10 18:01:40 2022 -0300 Fix small typo in readme commit 1d09b86fb63b365b3362cb307ce1ac565a4e4099 Author: Germano Eichenberg Date: Thu Feb 10 17:57:23 2022 -0300 Move HTTP logic into QueueManager, add support for routing requests to other nodes, HA mvp commit ae897b3be7a69a24976f5231e2b04648ad03f45c Author: Germano Eichenberg Date: Wed Feb 9 20:14:56 2022 -0300 Working implementation of memberlist commit 4cf9d372efdf9b675441825d70a3bf2a3dd46c2a Author: Germano Eichenberg Date: Tue Feb 8 19:30:19 2022 -0300 Cleanup, move responsibilities of queue creation from main to queue, handle 401s and invalid tokens in one place --- .gitignore | 2 +- CONFIG.md | 27 ++- README.md | 64 ++++-- e2e/Queue_test.go | 386 ------------------------------------- e2e/util.go | 41 ---- go.mod | 1 + go.sum | 34 ++++ lib/discord.go | 35 +--- lib/distributed_global.go | 88 +++++++++ lib/http.go | 36 ++++ lib/memberlist.go | 45 +++++ lib/memberlist_delegate.go | 24 +++ lib/memberlist_events.go | 23 +++ lib/metrics.go | 15 ++ lib/queue.go | 54 ++++-- lib/queue_manager.go | 265 +++++++++++++++++++++++++ main.go | 141 +++++++------- 17 files changed, 719 insertions(+), 562 deletions(-) delete mode 100644 e2e/Queue_test.go delete mode 100644 e2e/util.go create mode 100644 lib/distributed_global.go create mode 100644 lib/http.go create mode 100644 lib/memberlist.go create mode 100644 lib/memberlist_delegate.go create mode 100644 lib/memberlist_events.go create mode 100644 lib/queue_manager.go diff --git a/.gitignore b/.gitignore index 9e594c6..fea24ab 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ .idea/ +nirn-proxy.* nirn-proxy -nirn-proxy.exe .env *.txt *.log \ No newline at end of file diff --git a/CONFIG.md b/CONFIG.md index 2781f60..652b114 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -1,4 +1,6 @@ # Config +All variables are optional unless stated otherwise + ##### LOG_LEVEL Logrus log level. Passed directly to [ParseLevel](https://github.com/sirupsen/logrus/blob/master/logrus.go#L25-L45) @@ -6,10 +8,10 @@ Logrus log level. Passed directly to [ParseLevel](https://github.com/sirupsen/lo The port to listen for requests on ##### METRICS_PORT -The port for to listen on for metrics +The port to listen for metrics requests on ##### ENABLE_METRICS -Wether to enable and register metrics. Disabling may improve resource usage +Toggle to enable and register metrics. Disabling may improve resource usage ##### ENABLE_PPROF Enables the performance profiling handler. Read more [here](https://github.com/google/pprof/blob/master/doc/README.md) @@ -21,12 +23,27 @@ Decreasing this will improve memory usage, but beware that once a channel buffer ##### OUTBOUND_IP The local address to use when firing requests to discord. -Example: `"120.121.122.123"` +Example: `120.121.122.123` ##### BIND_IP The IP to bind the HTTP server on (both for requests and metrics). 127.0.0.1 will only allow requests coming from the loopback interface. Useful for preventing the proxy from being accessed from outside of LAN, for example. -Example: `"10.0.0.42"` - Would only listen on LAN +Example: `10.0.0.42` - Would only listen on LAN ##### REQUEST_TIMEOUT -Defines the amount of time the proxy will wait for a response from discord. Does not include time waiting for ratelimits to clear. \ No newline at end of file +Defines the amount of time the proxy will wait for a response from discord. Does not include time waiting for ratelimits to clear. + +##### CLUSTER_PORT +Sets the port that's going to be used to communicate with other cluster members. Default 7946 + +##### CLUSTER_MEMBERS +Comma separated list of stable/known members of the cluster. Does not need to include all members, a gossip protocol is used for discovery. You may include a port along with the address and if you don't, CLUSTER_PORT is used. This variable overrides CLUSTER_DNS. + +Example: `10.0.0.2,10.0.0.3:7244` + +##### CLUSTER_DNS +DNS address that will resolve to multiple members of the cluster. Does not need to include all members, a gossip protocol is used for discovery. While this is the recommended method of discovery for Kubernetes or similar, it does come with a limitation, which is that all nodes must use the same port for communication since DNS can't return port information. The port used by the proxy for requests is broadcasted automatically and doesn't need to be the same for nodes. + +If using Kubernetes, create a headless service and use it here for easy clustering. + +Example: `nirn-headless.default.svc.cluster.local` or `nirn.mydomain.com` \ No newline at end of file diff --git a/README.md b/README.md index c35ec38..6ffc7de 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,11 @@ # Nirn-proxy -Nirn is a transparent & dynamic HTTP proxy that handles Discord ratelimits for you and exports meaningful prometheus metrics. It is considered beta software but is being used in production by [Dyno](https://dyno.gg) on the scale of hundreds of requests per second. +Nirn-proxy is a highly available, transparent & dynamic HTTP proxy that handles Discord ratelimits for you and exports meaningful prometheus metrics. It is considered beta software but is being used in production by [Dyno](https://dyno.gg) on the scale of hundreds of requests per second. + +It is designed to be minimally invasive and exploits common library patterns to make the adoption as simple as a URL change. #### Features + +- Highly available, horizontally scalable - Transparent ratelimit handling, per-route and global - Multi-bot support with automatic detection for elevated REST limits (big bot sharding) - Works with any API version (Also supports using two or more versions for the same bot) @@ -17,17 +21,20 @@ The proxy sits between the client and discord. Essentially, instead of pointing Configuration options are -| Variable | Value | Default | -|-----------------|--------|---------| -| LOG_LEVEL | panic, fatal, error, warn, info, debug, trace | info | -| PORT | number | 8080 | -| METRICS_PORT | number | 9000 | -| ENABLE_METRICS | boolean| true | -| ENABLE_PPROF | boolean| false | -| BUFFER_SIZE | number | 50 | -| OUTBOUND_IP | string | "" | -| BIND_IP | string | 0.0.0.0 | -| REQUEST_TIMEOUT | number (milliseconds) | 5000 | +| Variable | Value | Default | +|-----------------|-----------------------------------------------|-------------------------| +| LOG_LEVEL | panic, fatal, error, warn, info, debug, trace | info | +| PORT | number | 8080 | +| METRICS_PORT | number | 9000 | +| ENABLE_METRICS | boolean | true | +| ENABLE_PPROF | boolean | false | +| BUFFER_SIZE | number | 50 | +| OUTBOUND_IP | string | "" | +| BIND_IP | string | 0.0.0.0 | +| REQUEST_TIMEOUT | number (milliseconds) | 5000 | +| CLUSTER_PORT | number | 7946 | +| CLUSTER_MEMBERS | string list (comma separated) | "" | +| CLUSTER_DNS | string | "" | Information on each config var can be found [here](https://github.com/germanoeich/nirn-proxy/blob/main/CONFIG.md) @@ -61,14 +68,37 @@ This will vary depending on your usage, how many unique routes you see, etc. For ### Metrics -| Key | Labels | Description | -|-------------------|----------------------------------------|------------------------------------------------| -|nirn_proxy_error | none | Counter for errors | -|nirn_proxy_requests| method, status, route, clientId | Histogram that keeps track of all request metrics| -|nirn_proxy_open_connections| none | Gauge for open client connections with the proxy| +| Key | Labels | Description | +|------------------------------------|----------------------------------------|------------------------------------------------------------| +|nirn_proxy_error | none | Counter for errors | +|nirn_proxy_requests | method, status, route, clientId | Histogram that keeps track of all request metrics | +|nirn_proxy_open_connections | none | Gauge for open client connections with the proxy | +|nirn_proxy_requests_routed_sent | none | Counter for requests routed to other nodes | +|nirn_proxy_requests_routed_received | none | Counter for requests received from other nodes | +|nirn_proxy_requests_routed_error | none | Counter for requests routed that failed | Note: 429s can produce two status: 429 Too Many Requests or 429 Shared. The latter is only produced for requests that return with the x-ratelimit-scope header set to "shared", which means they don't count towards the cloudflare firewall limit and thus should not be used for alerts, etc. +### High availability + +The proxy can be run in a cluster by setting either `CLUSTER_MEMBERS` or `CLUSTER_DNS` env vars. When in cluster mode, all nodes are a suitable gateway for all requests and the proxy will route requests consistently using the bucket hash. + +It's recommended that all nodes are reachable through LAN. Please reach out if a WAN cluster is desired for your use case. + +If a node fails, there is a brief period where it will be unhealthy but requests will still be routed to it. When these requests fail, the proxy will mock a 429 to send back to the user. The 429 will signal the client to wait 1s and will have a custom header `generated-by-proxy`. This is done in order to allow seamless retries when a member fails. If you want to backoff, use the custom header to override your lib retry logic. + +The cluster uses [SWIM](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf), which is an [AP protocol](https://en.wikipedia.org/wiki/CAP_theorem) and is powered by hashicorps excellent [memberlist](https://github.com/hashicorp/memberlist) implementation. + +Being an AP system means that the cluster will tolerate a network partition and needs no quorum to function. In case a network partition occurs, you'll have two clusters running independently, which may or may not be desirable. Configure your network accordingly. + +In case you want to specifically target a node (i.e, for troubleshooting), set the `nirn-routed-to` header on the request. The value doesn't matter. This will prevent the node from routing the request to another node. + +During recovery periods or when nodes join/leave the cluster, you might notice increased 429s. This is expected since the hashing table is changing as members change. Once the cluster settles into a stable state, it'll go back to normal. + +Global ratelimits are handled by a single node on the cluster, however this affinity is soft. There is no concept of leader or elections and if this node leaves, the cluster will simply pick a new one. This is a bottleneck and might increase tail latency, but the other options were either too complex, required an external storage, or would require quorum for the proxy to function. Webhooks and other requests with no token bypass this mechanism completely. + +The best deployment strategy for the cluster is to kill nodes one at a time, preferably with the replacement node already up. + ### Profiling The proxy can be profiled at runtime by enabling the ENABLE_PPROF flag and browsing to `http://ip:7654/debug/pprof/` diff --git a/e2e/Queue_test.go b/e2e/Queue_test.go deleted file mode 100644 index c743249..0000000 --- a/e2e/Queue_test.go +++ /dev/null @@ -1,386 +0,0 @@ -package e2e - -import ( - "fmt" - "github.com/germanoeich/nirn-proxy/lib" - "github.com/sirupsen/logrus/hooks/test" - "github.com/stretchr/testify/assert" - "math/rand" - "net/http" - "net/http/httptest" - "runtime" - "strconv" - "strings" - "sync" - "sync/atomic" - "testing" - "time" -) - -var logger, hook = test.NewNullLogger() -func Init() int { - rand.Seed(time.Now().Unix()) - lib.SetLogger(logger) - return 0 -} -var _ = Init() - -var server_200_noheaders = httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - res.WriteHeader(200) - res.Write([]byte("body")) -})) - -// global, reset: 500ms -var server_429_global_500 = httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - res.Header().Set("x-ratelimit-global", "true") - res.Header().Set("x-ratelimit-reset-after", "0.5") - res.WriteHeader(429) - res.Write([]byte("body")) -})) - -// remain: 0, limit: 1, reset: 500ms -var server_429_0_1_500 = httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - res.Header().Set("x-ratelimit-reset-after", "0.5") - res.Header().Set("x-ratelimit-remaining", "0") - res.Header().Set("x-ratelimit-limit", "1") - res.WriteHeader(429) - res.Write([]byte("body")) -})) - -// remain: 0, limit: 1, reset: 1ms -var server_429_0_1_1 = httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - res.Header().Set("x-ratelimit-reset-after", "0.001") - res.Header().Set("x-ratelimit-remaining", "0") - res.Header().Set("x-ratelimit-limit", "1") - res.WriteHeader(429) - res.Write([]byte("body")) -})) - - -func TestQueueWorks(t *testing.T) { - var count int64 = 0 - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - req, _ := http.NewRequest(http.MethodGet, server_200_noheaders.URL, nil) - res, _ := http.DefaultClient.Do(req) - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 50, 50) - for i := 0; i < 50; i++ { - req := httptest.NewRequest("GET", "/api/v9/guilds/915995872213471273/audit-logs", nil) - go q.Queue(req, nil) - } - - <- time.After(200 * time.Millisecond) - assert.Equal(t, int64(50), count) -} - -func TestQueueFiresSequentially(t *testing.T) { - var count int64 = 0 - mu := sync.RWMutex{} - mu.Lock() - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - mu.RLock() - req, _ := http.NewRequest(http.MethodGet, server_200_noheaders.URL, nil) - res, _ := http.DefaultClient.Do(req) - if strings.Contains(item.Req.URL.Path, "2") { - <- time.After(250 * time.Millisecond) - } - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 100, 100) - for i := 0; i < 100; i++ { - // Force a "sequence" inside the internal channel - time.Sleep(2 * time.Millisecond) - uri := "/api/v9/guilds/111111111111111111/messages/111111111111111111" - if i == 30 { - uri = "/api/v9/guilds/111111111111111111/messages/111111111111111112" - } - req, _ := http.NewRequest("GET", uri, nil) - go q.Queue(req, nil) - } - - mu.Unlock() - <- time.After(100 * time.Millisecond) - assert.Equal(t, int64(30), count) - <- time.After(500 * time.Millisecond) - assert.Equal(t, int64(100), count) -} - -func TestQueueLocksOnDiscordGlobal(t *testing.T) { - var count int64 = 0 - mu := sync.RWMutex{} - mu.Lock() - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - mu.RLock() - req, _ := http.NewRequest(http.MethodGet, server_429_global_500.URL, nil) - res, _ := http.DefaultClient.Do(req) - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 50, 100) - for i := 0; i < 2; i++ { - uri := "/api/v9/guilds/111111111111111111/messages/111111111111111111" - req, _ := http.NewRequest("GET", uri, nil) - go q.Queue(req, nil) - } - - mu.Unlock() - <- time.After(100 * time.Millisecond) - assert.Equal(t, int64(1), count) - <- time.After(550 * time.Millisecond) - assert.Equal(t, int64(2), count) -} - -func TestQueueGlobalRatelimitWorks(t *testing.T) { - var count int64 = 0 - mu := sync.RWMutex{} - mu.Lock() - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - mu.RLock() - req, _ := http.NewRequest(http.MethodGet, server_200_noheaders.URL, nil) - res, _ := http.DefaultClient.Do(req) - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 50, 100) - for i := 0; i < 70; i++ { - uri := "/api/v9/guilds/111111111111111111/messages/111111111111111111" - req, _ := http.NewRequest("GET", uri, nil) - go q.Queue(req, nil) - } - - mu.Unlock() - <- time.After(250 * time.Millisecond) - assert.Equal(t, int64(50), count) - <- time.After(1100 * time.Millisecond) - assert.Equal(t, int64(70), count) -} - -func TestQueueWorksOnMultipleChannels(t *testing.T) { - // This test relies on the fact that a bucket will lock when it encounters a 429 - var count int64 = 0 - mu := sync.RWMutex{} - mu.Lock() - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - mu.RLock() - req, _ := http.NewRequest(http.MethodGet, server_429_0_1_500.URL, nil) - res, _ := http.DefaultClient.Do(req) - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 9999, 100) - for i := 0; i < 99; i++ { - indexstr := strconv.Itoa(i) - //Generate a unique bucket per route - uri := "/api/v9/guilds/1111111111111111" + indexstr + "/messages/111111111111111111" - req, _ := http.NewRequest("GET", uri, nil) - go q.Queue(req, nil) - } - - mu.Unlock() - <- time.After(200 * time.Millisecond) - assert.Equal(t, int64(99), count) -} - -func TestQueueBucketLocksUnlocksOn429(t *testing.T) { - var count int64 = 0 - mu := sync.RWMutex{} - mu.Lock() - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - mu.RLock() - req, _ := http.NewRequest(http.MethodGet, server_429_0_1_500.URL, nil) - res, _ := http.DefaultClient.Do(req) - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 9999, 100) - for i := 0; i < 3; i++ { - uri := "/api/v9/guilds/111111111111111111/messages/111111111111111111" - req, _ := http.NewRequest("GET", uri, nil) - go q.Queue(req, nil) - } - - mu.Unlock() - <- time.After(100 * time.Millisecond) - assert.Equal(t, int64(1), count) - <- time.After(500 * time.Millisecond) - assert.Equal(t, int64(2), count) - <- time.After(500 * time.Millisecond) - assert.Equal(t, int64(3), count) -} - -// This test is non-deterministic and random in nature -func TestQueueRandomPermutationsFireSimultaneously(t *testing.T) { - var count int64 = 0 - mu := sync.RWMutex{} - mu.Lock() - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - mu.RLock() - req, _ := http.NewRequest(http.MethodGet, server_200_noheaders.URL, nil) - res, err := http.DefaultClient.Do(req) - if err != nil { - fmt.Println(err) - } - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 1000, 100) - for i := 0; i < 3000; i++ { - uri := GetRandomRoute() - req, err := http.NewRequest("GET", uri, nil) - if err != nil { - fmt.Println(err) - } - go q.Queue(req, nil) - } - - mu.Unlock() - <- time.After(5000 * time.Millisecond) - assert.Equal(t, int64(3000), count) - runtime.GC() -} - -// This test is non-deterministic and random in nature -func TestQueueRandomPermutationsFireSequentially(t *testing.T) { - var count int64 = 0 - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - req, _ := http.NewRequest(http.MethodGet, server_200_noheaders.URL, nil) - res, err := http.DefaultClient.Do(req) - if err != nil { - fmt.Println(err) - } - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 1000, 100) - for i := 0; i < 3000; i++ { - uri := GetRandomRoute() - req, err := http.NewRequest("GET", uri, nil) - if err != nil { - fmt.Println(err) - } - q.Queue(req, nil) - } - - <- time.After(100 * time.Millisecond) - assert.Equal(t, int64(3000), count) - runtime.GC() -} - -// This test is non-deterministic and random in nature -func TestQueueRandomPermutationsFireRandomDelay(t *testing.T) { - var count int64 = 0 - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - req, _ := http.NewRequest(http.MethodGet, server_200_noheaders.URL, nil) - res, err := http.DefaultClient.Do(req) - if err != nil { - fmt.Println(err) - } - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 1000, 100) - for i := 0; i < 3000; i++ { - go func() { - // between 0 and 2ms - time.Sleep(time.Duration(rand.Intn(2000)) * time.Microsecond) - uri := GetRandomRoute() - req, err := http.NewRequest("GET", uri, nil) - if err != nil { - fmt.Println(err) - } - q.Queue(req, nil) - }() - - } - - <- time.After(2 * 3000 * time.Millisecond) - assert.Equal(t, int64(3000), count) - runtime.GC() -} - - -// This test is non-deterministic and random in nature -func TestQueueFixedPermutationsFireRandomDelay(t *testing.T) { - var routes []string - for i := 0; i < 15; i++ { - routes = append(routes, GetRandomRoute()) - } - var count int64 = 0 - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - req, _ := http.NewRequest(http.MethodGet, server_200_noheaders.URL, nil) - res, err := http.DefaultClient.Do(req) - if err != nil { - fmt.Println(err) - } - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 1000, 100) - for i := 0; i < 3000; i++ { - go func() { - // between 0 and 2ms - time.Sleep(time.Duration(rand.Intn(2000)) * time.Microsecond) - uri := routes[rand.Intn(len(routes))] - req, err := http.NewRequest("GET", uri, nil) - if err != nil { - fmt.Println(err) - } - q.Queue(req, nil) - }() - - } - - <- time.After(2 * 3000 * time.Millisecond) - assert.Equal(t, int64(3000), count) - runtime.GC() -} - -// This test is non-deterministic and random in nature -func TestQueueFixedPermutationsFireRandomDelayAll429s(t *testing.T) { - var routes []string - for i := 0; i < 15; i++ { - routes = append(routes, GetRandomRoute()) - } - var count int64 = 0 - genericProcessor := func(item *lib.QueueItem) (*http.Response, error) { - req, _ := http.NewRequest(http.MethodGet, server_429_0_1_1.URL, nil) - res, err := http.DefaultClient.Do(req) - if err != nil { - fmt.Println(err) - } - go atomic.AddInt64(&count, 1) - return res, nil - } - - q := lib.NewRequestQueue(genericProcessor, 1000, 100) - for i := 0; i < 3000; i++ { - go func() { - // between 0 and 2ms - time.Sleep(time.Duration(rand.Intn(2000)) * time.Microsecond) - uri := routes[rand.Intn(len(routes))] - req, err := http.NewRequest("GET", uri, nil) - if err != nil { - fmt.Println(err) - } - q.Queue(req, nil) - }() - - } - - <- time.After(3 * 3000 * time.Millisecond) - assert.Equal(t, int64(3000), count) - runtime.GC() -} \ No newline at end of file diff --git a/e2e/util.go b/e2e/util.go deleted file mode 100644 index 35ddfa0..0000000 --- a/e2e/util.go +++ /dev/null @@ -1,41 +0,0 @@ -package e2e - -import ( - "github.com/bwmarrin/snowflake" - "math/rand" - "strconv" - "strings" -) - -var node, _ = snowflake.NewNode(1) - -var routes = []string{ - "/api/v9/guilds/!/audit-logs", - "/api/v9/guilds/!/members/!", - "/api/v9/channels/!/messages", - "/api/v9/channels/!", - "/api/v9/interactions/!/aW50ZXJhY3Rpb246OTE1ODAxMzMwMzQ4NjAxNDA1OkNoQml5bXJ3TUw5WGNIN2NSdFRMNlVAHFGWm1EUUVTSW84a3ZIY0FyQzRpRFQ4YUVqOXFpR09Idkd4Y3Fsc09kblFDbzQyZEh5cmJTblZwYXd1eXZqbmFVOURyVk5ScDNWODhOVEx2dnVObXVJZzQzaW5Rd3ZFa0JVdTFvdXBB", - "/api/v9/users/!/channels", - "/api/v9/guilds/!/channels", - "/api/v9/guilds/!/members/!/roles/!", - "/api/v9/channels/!/messages/!", - "/api/v9/guilds/!/bans/!", - "/api/v9/guilds/!/bans", - "/api/v9/guilds/!/webhooks", - "/api/v9/users/!", - "/api/v9/webhooks/!", - "/api/v9/guilds/!/roles/!", - "/api/v9/guilds/!/invites", -} - -func GenerateSnowflake() int64 { - return int64(node.Generate()) -} - -func GenerateSnowflakeStr() string { - return strconv.FormatInt(GenerateSnowflake(), 10) -} - -func GetRandomRoute() string { - return strings.ReplaceAll(routes[rand.Intn(len(routes))], "!", GenerateSnowflakeStr()) -} \ No newline at end of file diff --git a/go.mod b/go.mod index bec3a4b..0f5af6e 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.16 require ( github.com/Clever/leakybucket v1.2.0 github.com/bwmarrin/snowflake v0.3.0 // indirect + github.com/hashicorp/memberlist v0.3.1 // indirect github.com/joho/godotenv v1.4.0 // indirect github.com/prometheus/client_golang v1.11.0 github.com/sirupsen/logrus v1.8.1 diff --git a/go.sum b/go.sum index 10b5734..2e8eb38 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= +github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/aws/aws-sdk-go v1.29.31/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -40,6 +42,8 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -47,6 +51,21 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= +github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4= +github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= +github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs= +github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/memberlist v0.3.1 h1:MXgUXLqva1QvpVEDQW1IQLG0wivQAtmFlHRQ+1vWZfM= +github.com/hashicorp/memberlist v0.3.1/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg= github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= @@ -64,12 +83,15 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU= +github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -93,6 +115,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= +github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -107,18 +131,24 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -126,6 +156,8 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -141,6 +173,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/lib/discord.go b/lib/discord.go index 52b3318..a94797f 100644 --- a/lib/discord.go +++ b/lib/discord.go @@ -10,13 +10,10 @@ import ( "math" "net" "net/http" - "strings" "time" ) -var client *http.Client = &http.Client{ - Timeout: 60 * time.Second, -} +var client *http.Client var contextTimeout time.Duration @@ -57,10 +54,10 @@ func createTransport(ip string) http.RoundTripper { transport := http.Transport{ ForceAttemptHTTP2: true, - MaxIdleConns: 100, + MaxIdleConns: 1000, IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, + ExpectContinueTimeout: 2 * time.Second, DialContext: dialContext, ResponseHeaderTimeout: 0, } @@ -90,7 +87,8 @@ func GetBotGlobalLimit(token string) (uint, error) { switch { case bot.StatusCode == 401: - return 0, errors.New("invalid token - nirn-proxy") + // In case a 401 is encountered, we return math.MaxUint32 to allow requests through to fail fast + return math.MaxUint32, errors.New("invalid token - nirn-proxy") case bot.StatusCode == 429: return 0, errors.New("429 on gateway/bot") case bot.StatusCode == 500: @@ -148,18 +146,6 @@ func GetBotUser(token string) (*BotUserResponse, error) { return &s, nil } -func copyHeader(dst, src http.Header) { - dst["Date"] = nil - dst["Content-Type"] = nil - for k, vv := range src { - for _, v := range vv { - if k != "Content-Length" { - dst[strings.ToLower(k)] = []string{v} - } - } - } -} - func doDiscordReq(ctx context.Context, path string, method string, body io.ReadCloser, header http.Header, query string) (*http.Response, error) { discordReq, err := http.NewRequestWithContext(ctx, method, "https://discord.com" + path + "?" + query, body) discordReq.Header = header @@ -218,17 +204,8 @@ func ProcessRequest(ctx context.Context, item *QueueItem) (*http.Response, error "discordBucket": discordResp.Header.Get("x-ratelimit-bucket"), }).Debug("Discord request") - body, err := ioutil.ReadAll(discordResp.Body) - if err != nil { - res.WriteHeader(500) - _, _ = res.Write([]byte(err.Error())) - return nil, err - } - - copyHeader(res.Header(), discordResp.Header) - res.WriteHeader(discordResp.StatusCode) + err = CopyResponseToResponseWriter(discordResp, item.Res) - _, err = res.Write(body) if err != nil { return nil, err } diff --git a/lib/distributed_global.go b/lib/distributed_global.go new file mode 100644 index 0000000..b1610cc --- /dev/null +++ b/lib/distributed_global.go @@ -0,0 +1,88 @@ +package lib + +import ( + "context" + "errors" + "github.com/Clever/leakybucket" + "github.com/Clever/leakybucket/memory" + "github.com/sirupsen/logrus" + "net/http" + "strconv" + "sync" + "time" +) + +type ClusterGlobalRateLimiter struct { + sync.RWMutex + globalBucketsMap map[uint64]*leakybucket.Bucket + memStorage *memory.Storage +} + +func NewClusterGlobalRateLimiter() *ClusterGlobalRateLimiter { + memStorage := memory.New() + return &ClusterGlobalRateLimiter{ + memStorage: memStorage, + globalBucketsMap: make(map[uint64]*leakybucket.Bucket), + } +} + +func (c *ClusterGlobalRateLimiter) Take(botHash uint64, botLimit uint) { + bucket := *c.getOrCreate(botHash, botLimit) +takeGlobal: + _, err := bucket.Add(1) + if err != nil { + reset := bucket.Reset() + logger.WithFields(logrus.Fields{ + "waitTime": time.Until(reset), + }).Trace("Failed to grab global token, sleeping for a bit") + time.Sleep(time.Until(reset)) + goto takeGlobal + } +} + +func (c *ClusterGlobalRateLimiter) getOrCreate(botHash uint64, botLimit uint) *leakybucket.Bucket { + c.RLock() + b, ok := c.globalBucketsMap[botHash] + c.RUnlock() + if !ok { + c.Lock() + // Check if it wasn't created while we didnt hold the exclusive lock + b, ok = c.globalBucketsMap[botHash] + if ok { + c.Unlock() + return b + } + + globalBucket, _ := c.memStorage.Create(strconv.FormatUint(botHash, 10), botLimit, 1 * time.Second) + c.globalBucketsMap[botHash] = &globalBucket + c.Unlock() + return &globalBucket + } else { + return b + } +} + + +func (c *ClusterGlobalRateLimiter) FireGlobalRequest(ctx context.Context, addr string, botHash uint64, botLimit uint) error { + globalReq, err := http.NewRequestWithContext(ctx, "GET", "http://" + addr + "/nirn/global", nil) + if err != nil { + return err + } + + globalReq.Header.Set("bot-hash", strconv.FormatUint(botHash, 10)) + globalReq.Header.Set("bot-limit", strconv.FormatUint(uint64(botLimit), 10)) + + // The node handling the request will only return if we grabbed a token or an error was thrown + resp, err := client.Do(globalReq) + logger.Trace("Got go-ahead for global") + + if err != nil { + return err + } + + if resp.StatusCode != 200 { + return errors.New("global request failed with status " + resp.Status) + } + + return nil +} \ No newline at end of file diff --git a/lib/http.go b/lib/http.go new file mode 100644 index 0000000..969e902 --- /dev/null +++ b/lib/http.go @@ -0,0 +1,36 @@ +package lib + +import ( + "io/ioutil" + "net/http" + "strings" +) + +func copyHeader(dst, src http.Header) { + dst["Date"] = nil + dst["Content-Type"] = nil + for k, vv := range src { + for _, v := range vv { + dst[strings.ToLower(k)] = []string{v} + } + } +} + +func CopyResponseToResponseWriter(resp *http.Response, respWriter *http.ResponseWriter) error { + writer := *respWriter + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + writer.WriteHeader(500) + _, _ = writer.Write([]byte(err.Error())) + return err + } + + copyHeader(writer.Header(), resp.Header) + writer.WriteHeader(resp.StatusCode) + + _, err = writer.Write(body) + if err != nil { + return err + } + return nil +} \ No newline at end of file diff --git a/lib/memberlist.go b/lib/memberlist.go new file mode 100644 index 0000000..d153ffa --- /dev/null +++ b/lib/memberlist.go @@ -0,0 +1,45 @@ +package lib + +import ( + "github.com/hashicorp/memberlist" + "os" + "time" +) + +func InitMemberList(knownMembers []string, port int, proxyPort string, manager *QueueManager) *memberlist.Memberlist { + config := memberlist.DefaultLANConfig() + config.BindPort = port + config.AdvertisePort = port + config.Delegate = NirnDelegate{ + proxyPort: proxyPort, + } + + config.Events = manager.GetEventDelegate() + + //DEBUG CODE + if os.Getenv("NODE_NAME") != "" { + config.Name = os.Getenv("NODE_NAME") + config.DeadNodeReclaimTime = 1 * time.Nanosecond + } + + list, err := memberlist.Create(config) + if err != nil { + panic("Failed to create memberlist: " + err.Error()) + } + + manager.SetCluster(list, proxyPort) + + _, err = list.Join(knownMembers) + if err != nil { + logger.Info("Failed to join existing cluster, ok if this is the first node") + logger.Error(err) + } + + var members string + for _, member := range list.Members() { + members += member.Name + " " + } + + logger.Info("Connected to cluster nodes: [ " + members + "]") + return list +} \ No newline at end of file diff --git a/lib/memberlist_delegate.go b/lib/memberlist_delegate.go new file mode 100644 index 0000000..0ed35ce --- /dev/null +++ b/lib/memberlist_delegate.go @@ -0,0 +1,24 @@ +package lib + +import "github.com/hashicorp/memberlist" + +type NirnDelegate struct { + memberlist.Delegate + proxyPort string +} + +func (d NirnDelegate) NodeMeta(limit int) []byte { + return []byte(d.proxyPort) +} + +func (d NirnDelegate) NotifyMsg(msg []byte) {} + +func (d NirnDelegate) GetBroadcasts(overhead int, limit int) [][]byte { + return [][]byte{} +} + +func (d NirnDelegate) LocalState(join bool) []byte { + return []byte{} +} + +func (d NirnDelegate) MergeRemoteState(buf []byte, join bool) {} diff --git a/lib/memberlist_events.go b/lib/memberlist_events.go new file mode 100644 index 0000000..69938a0 --- /dev/null +++ b/lib/memberlist_events.go @@ -0,0 +1,23 @@ +package lib + +import "github.com/hashicorp/memberlist" + +type NirnEvents struct { + memberlist.EventDelegate + OnJoin func(node *memberlist.Node) + OnLeave func(node *memberlist.Node) +} + +func formatNodeInfo(node *memberlist.Node) string { + return node.Name + " - " + node.Address() + " - listenport: " + string(node.Meta) +} + +func (d NirnEvents) NotifyJoin(node *memberlist.Node) { + logger.Info("Node joined the cluster: " + formatNodeInfo(node)) + d.OnJoin(node) +} +func (d NirnEvents) NotifyLeave(node *memberlist.Node) { + logger.Info("Node left the cluster: " + formatNodeInfo(node)) + d.OnLeave(node) +} +func (d NirnEvents) NotifyUpdate(node *memberlist.Node) {} \ No newline at end of file diff --git a/lib/metrics.go b/lib/metrics.go index bff7834..d43782b 100644 --- a/lib/metrics.go +++ b/lib/metrics.go @@ -23,6 +23,21 @@ var ( Name: "nirn_proxy_open_connections", Help: "Gauge for client connections currently open with the proxy", }) + + RequestsRoutedSent = promauto.NewCounter(prometheus.CounterOpts{ + Name: "nirn_proxy_requests_routed_sent", + Help: "Counter for requests routed from this node into other nodes", + }) + + RequestsRoutedRecv = promauto.NewCounter(prometheus.CounterOpts{ + Name: "nirn_proxy_requests_routed_received", + Help: "Counter for requests received from other nodes", + }) + + RequestsRoutedError = promauto.NewCounter(prometheus.CounterOpts{ + Name: "nirn_proxy_requests_routed_error", + Help: "Counter for failed requests routed from this node", + }) ) func StartMetrics(addr string) { diff --git a/lib/queue.go b/lib/queue.go index 466e435..22e41b4 100644 --- a/lib/queue.go +++ b/lib/queue.go @@ -38,14 +38,37 @@ type RequestQueue struct { user *BotUserResponse identifier string isTokenInvalid *int64 + botLimit uint } -func NewRequestQueue(processor func(ctx context.Context, item *QueueItem) (*http.Response, error), globalLimit uint, bufferSize int, user *BotUserResponse) *RequestQueue { +func NewRequestQueue(processor func(ctx context.Context, item *QueueItem) (*http.Response, error), token string, bufferSize int) (*RequestQueue, error) { + limit, err := GetBotGlobalLimit(token) memStorage := memory.New() - globalBucket, err := memStorage.Create("global", globalLimit, 1 * time.Second) + globalBucket, _ := memStorage.Create("global", limit, 1 * time.Second) if err != nil { - panic(err) + if strings.HasPrefix(err.Error(), "invalid token") { + // Return a queue that will only return 401s + var invalid = new(int64) + *invalid = 999 + return &RequestQueue{ + queues: make(map[uint64]*QueueChannel), + processor: processor, + globalBucket: globalBucket, + globalLockedUntil: new(int64), + bufferSize: bufferSize, + user: nil, + identifier: "InvalidTokenQueue", + isTokenInvalid: invalid, + botLimit: limit, + }, nil + } + return nil, err + } + + user, err := GetBotUser(token) + if err != nil && token != "" { + return nil, err } identifier := "NoAuth" @@ -59,13 +82,16 @@ func NewRequestQueue(processor func(ctx context.Context, item *QueueItem) (*http globalBucket: globalBucket, globalLockedUntil: new(int64), bufferSize: bufferSize, - user: user, - identifier: identifier, - isTokenInvalid: new(int64), + user: user, + identifier: identifier, + isTokenInvalid: new(int64), + botLimit: limit, } + logger.WithFields(logrus.Fields{ "globalLimit": limit, "identifier": identifier, "bufferSize": bufferSize }).Info("Created new queue") + go ret.tickSweep() - return ret + return ret, nil } func (q *RequestQueue) sweep() { @@ -91,15 +117,14 @@ func (q *RequestQueue) tickSweep() { } } -func (q *RequestQueue) Queue(req *http.Request, res *http.ResponseWriter) (string, *http.Response, error) { - path := GetOptimisticBucketPath(req.URL.Path, req.Method) +func (q *RequestQueue) Queue(req *http.Request, res *http.ResponseWriter, path string, pathHash uint64) (string, *http.Response, error) { logger.WithFields(logrus.Fields{ "bucket": path, "path": req.URL.Path, "method": req.Method, }).Trace("Inbound request") - ch := q.getQueueChannel(path) + ch := q.getQueueChannel(path, pathHash) doneChan := make(chan *http.Response) errChan := make(chan error) @@ -112,8 +137,7 @@ func (q *RequestQueue) Queue(req *http.Request, res *http.ResponseWriter) (strin } } -func (q *RequestQueue) getQueueChannel(path string) *QueueChannel { - pathHash := HashCRC64(path) +func (q *RequestQueue) getQueueChannel(path string, pathHash uint64) *QueueChannel { q.Lock() defer q.Unlock() t := time.Now() @@ -185,6 +209,7 @@ func parseHeaders(headers *http.Header) (int64, int64, time.Duration, bool, erro func (q *RequestQueue) takeGlobal(path string) { takeGlobal: waitTime := atomic.LoadInt64(q.globalLockedUntil) + if waitTime > 0 { logger.WithFields(logrus.Fields{ "bucket": path, @@ -196,6 +221,7 @@ takeGlobal: logger.Info("Unlocked global bucket") } } + _, err := q.globalBucket.Add(1) if err != nil { reset := q.globalBucket.Reset() @@ -254,6 +280,7 @@ func (q *RequestQueue) subscribe(ch *QueueChannel, path string, pathHash uint64) // Fail fast path for webhook 404s var ret404 = false for item := range ch.ch { + ctx := context.WithValue(item.Req.Context(), "identifier", q.identifier) if ret404 { return404webhook(item) continue @@ -266,7 +293,6 @@ func (q *RequestQueue) subscribe(ch *QueueChannel, path string, pathHash uint64) continue } - ctx := context.WithValue(item.Req.Context(), "identifier", q.identifier) resp, err := q.processor(ctx, item) if err != nil { item.errChan <- err @@ -321,7 +347,7 @@ func (q *RequestQueue) subscribe(ch *QueueChannel, path string, pathHash uint64) if resp.StatusCode == 401 { // Permanently lock this queue - atomic.StoreInt64(q.globalLockedUntil, 999) + atomic.StoreInt64(q.isTokenInvalid, 999) } if remaining == 0 || resp.StatusCode == 429 { time.Sleep(time.Until(time.Now().Add(resetAfter))) diff --git a/lib/queue_manager.go b/lib/queue_manager.go new file mode 100644 index 0000000..155eeb7 --- /dev/null +++ b/lib/queue_manager.go @@ -0,0 +1,265 @@ +package lib + +import ( + "github.com/hashicorp/memberlist" + "github.com/sirupsen/logrus" + "net/http" + "sort" + "strconv" + "sync" + "time" +) + +type QueueManager struct { + sync.RWMutex + queues map[string]*RequestQueue + bufferSize int + cluster *memberlist.Memberlist + clusterGlobalRateLimiter *ClusterGlobalRateLimiter + orderedClusterMembers []string + nameToAddressMap map[string]string + localNodeName string + localNodeIP string + localNodeProxyListenAddr string +} + +func NewQueueManager(bufferSize int) *QueueManager { + q := &QueueManager{ + queues: make(map[string]*RequestQueue), + bufferSize: bufferSize, + cluster: nil, + clusterGlobalRateLimiter: NewClusterGlobalRateLimiter(), + } + + return q +} + +func (m *QueueManager) Shutdown() { + if m.cluster != nil { + m.cluster.Leave(30 * time.Second) + } +} + +func (m *QueueManager) reindexMembers() { + if m.cluster == nil { + logger.Warn("reindexMembers called but cluster is nil") + return + } + + m.Lock() + defer m.Unlock() + + members := m.cluster.Members() + var orderedMembers []string + nameToAddressMap := make(map[string]string) + for _, m := range members { + orderedMembers = append(orderedMembers, m.Name) + nameToAddressMap[m.Name] = m.Addr.String() + ":" + string(m.Meta) + } + sort.Strings(orderedMembers) + + m.orderedClusterMembers = orderedMembers + m.nameToAddressMap = nameToAddressMap +} + +func (m *QueueManager) onNodeJoin(node *memberlist.Node) { + // Running in goroutine prevents a deadlock inside memberlist + go m.reindexMembers() +} +func (m *QueueManager) onNodeLeave(node *memberlist.Node) { + // Running in goroutine prevents a deadlock inside memberlist + go m.reindexMembers() +} + +func (m *QueueManager) GetEventDelegate() *NirnEvents { + return &NirnEvents{ + OnJoin: m.onNodeJoin, + OnLeave: m.onNodeLeave, + } +} + +func (m *QueueManager) SetCluster(cluster *memberlist.Memberlist, proxyPort string) { + m.cluster = cluster + m.localNodeName = cluster.LocalNode().Name + m.localNodeIP = cluster.LocalNode().Addr.String() + m.localNodeProxyListenAddr = m.localNodeIP + ":" + proxyPort + m.reindexMembers() +} + +func (m *QueueManager) calculateRoute(pathHash uint64) string { + if m.cluster == nil { + // Route to self, proxy in stand-alone mode + return "" + } + + m.RLock() + defer m.RUnlock() + + members := m.orderedClusterMembers + count := uint64(len(members)) + + chosenIndex := pathHash % count + addr := m.nameToAddressMap[members[chosenIndex]] + if addr == m.localNodeProxyListenAddr { + return "" + } + return addr +} + +func (m *QueueManager) routeRequest(addr string, req *http.Request) (*http.Response, error) { + nodeReq, err := http.NewRequestWithContext(req.Context(), req.Method, "http://" + addr + req.URL.Path + "?" + req.URL.RawQuery, req.Body) + nodeReq.Header = req.Header.Clone() + nodeReq.Header.Set("nirn-routed-to", addr) + if err != nil { + return nil, err + } + + logger.WithFields(logrus.Fields{ + "to": addr, + "path": req.URL.Path, + "method": req.Method, + }).Trace("Routing request to node in cluster") + resp, err := client.Do(nodeReq) + logger.WithFields(logrus.Fields{ + "to": addr, + "path": req.URL.Path, + "method": req.Method, + }).Trace("Received response from node") + if err == nil { + RequestsRoutedSent.Inc() + } else { + RequestsRoutedError.Inc() + } + + return resp, err +} + +func (m *QueueManager) Generate429(resp *http.ResponseWriter) { + writer := *resp + writer.Header().Set("generated-by-proxy", "true") + writer.Header().Set("x-ratelimit-scope", "user") + writer.Header().Set("x-ratelimit-limit", "1") + writer.Header().Set("x-ratelimit-remaining", "0") + writer.Header().Set("x-ratelimit-reset", string(time.Now().Add(1 * time.Second).Unix())) + writer.Header().Set("x-ratelimit-after", "1") + writer.Header().Set("retry-after", "1") + writer.Header().Set("content-type", "application/json") + writer.WriteHeader(429) + writer.Write([]byte("{\n\t\"global\": false,\n\t\"message\": \"You are being rate limited.\",\n\t\"retry_after\": 1\n}")) +} + +func (m *QueueManager) DiscordRequestHandler(resp http.ResponseWriter, req *http.Request) { + ConnectionsOpen.Inc() + defer ConnectionsOpen.Dec() + + token := req.Header.Get("Authorization") + + m.RLock() + q, ok := m.queues[token] + m.RUnlock() + + if !ok { + m.Lock() + // Check if it wasn't created while we didn't hold the lock + q, ok = m.queues[token] + if !ok { + var err error + q, err = NewRequestQueue(ProcessRequest, token, m.bufferSize) + + if err != nil { + resp.WriteHeader(500) + resp.Write([]byte(err.Error())) + logger.Error(err) + m.Unlock() + return + } + + m.queues[token] = q + } + m.Unlock() + } + + path := GetOptimisticBucketPath(req.URL.Path, req.Method) + pathHash := HashCRC64(path) + var botHash uint64 = 0 + if q.user != nil { + botHash = HashCRC64(q.user.Id) + } + + routeTo := m.calculateRoute(pathHash) + globalRouteTo := m.calculateRoute(botHash) + + routeToHeader := req.Header.Get("nirn-routed-to") + req.Header.Del("nirn-routed-to") + + if routeToHeader != "" { + RequestsRoutedRecv.Inc() + } + + var err error + if routeTo == "" || routeToHeader != "" { + if q.identifier != "NoAuth" && m.cluster != nil { + botLimit := q.botLimit + + if globalRouteTo == "" { + m.clusterGlobalRateLimiter.Take(botHash, botLimit) + } else { + err = m.clusterGlobalRateLimiter.FireGlobalRequest(req.Context(), globalRouteTo, botHash, botLimit) + if err != nil { + logger.WithFields(logrus.Fields{"function": "FireGlobalRequest"}).Error(err) + ErrorCounter.Inc() + m.Generate429(&resp) + return + } + } + } + _, _, err = q.Queue(req, &resp, path, pathHash) + if err != nil { + logger.WithFields(logrus.Fields{"function": "Queue"}).Error(err) + } + } else { + var res *http.Response + res, err = m.routeRequest(routeTo, req) + if err == nil { + err = CopyResponseToResponseWriter(res, &resp) + if err != nil { + logger.WithFields(logrus.Fields{"function": "CopyResponseToResponseWriter"}).Error(err) + } + } else { + logger.WithFields(logrus.Fields{"function": "routeRequest"}).Error(err) + m.Generate429(&resp) + } + } + + if err != nil { + ErrorCounter.Inc() + return + } +} + +func (m *QueueManager) HandleGlobal(w http.ResponseWriter, r *http.Request) { + botHashStr := r.Header.Get("bot-hash") + botLimitStr := r.Header.Get("bot-limit") + + botHash, err := strconv.ParseUint(botHashStr, 10, 64) + if err != nil { + w.WriteHeader(400) + return + } + + botLimit, err := strconv.ParseUint(botLimitStr, 10, 64) + if err != nil { + w.WriteHeader(400) + return + } + + m.clusterGlobalRateLimiter.Take(botHash, uint(botLimit)) + logger.Trace("Returned OK for global request") +} + +func (m *QueueManager) CreateMux() *http.ServeMux { + mux := http.NewServeMux() + mux.HandleFunc("/", m.DiscordRequestHandler) + mux.HandleFunc("/nirn/global", m.HandleGlobal) + return mux +} \ No newline at end of file diff --git a/main.go b/main.go index 8656378..2d4a7fc 100644 --- a/main.go +++ b/main.go @@ -1,82 +1,24 @@ package main import ( + "context" "github.com/germanoeich/nirn-proxy/lib" + "github.com/hashicorp/memberlist" _ "github.com/joho/godotenv/autoload" "github.com/sirupsen/logrus" + "net" "net/http" "os" + "os/signal" "strings" - "sync" + "syscall" "time" ) var logger = logrus.New() // token : queue map -var queues = make(map[string]*lib.RequestQueue) -// Store invalid tokens to prevent a storm when a token gets reset -var invalidTokens = make(map[string]bool) -var queueMu = sync.RWMutex{} var bufferSize = 50 -type GenericHandler struct{} -func (_ *GenericHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - lib.ConnectionsOpen.Inc() - defer lib.ConnectionsOpen.Dec() - - token := req.Header.Get("Authorization") - queueMu.RLock() - // No token will work and fall under "" on the map - _, isInvalid := invalidTokens[token] - if isInvalid { - resp.WriteHeader(401) - _, err := resp.Write([]byte("Known bad token - nirn-proxy")) - if err != nil { - logger.Error(err) - } - queueMu.RUnlock() - return - } - q, ok := queues[token] - queueMu.RUnlock() - if !ok { - queueMu.Lock() - // Check if it wasn't created while we didn't hold the lock - q, ok = queues[token] - if !ok { - limit, err := lib.GetBotGlobalLimit(token) - if err != nil { - if strings.HasPrefix(err.Error(), "invalid token") { - invalidTokens[token] = true - } - logger.Error(err) - resp.WriteHeader(500) - _, err := resp.Write([]byte("Unable to fetch gateway info - nirn-proxy")) - if err != nil { - logger.Error(err) - } - queueMu.Unlock() - return - } - - user, _ := lib.GetBotUser(token) - - q = lib.NewRequestQueue(lib.ProcessRequest, limit, bufferSize, user) - clientId := lib.GetBotId(token) - logger.WithFields(logrus.Fields{ "globalLimit": limit, "clientId": clientId, "bufferSize": bufferSize }).Info("Created new queue") - queues[token] = q - } - queueMu.Unlock() - } - - _, _, err := q.Queue(req, &resp) - if err != nil { - logger.Error(err) - lib.ErrorCounter.Inc() - return - } -} - func setupLogger() { logLevel := lib.EnvGet("LOG_LEVEL", "info") lvl, err := logrus.ParseLevel(logLevel) @@ -89,6 +31,39 @@ func setupLogger() { lib.SetLogger(logger) } +func initCluster(proxyPort string, manager *lib.QueueManager) *memberlist.Memberlist { + port := lib.EnvGetInt("CLUSTER_PORT", 7946) + + memberEnv := os.Getenv("CLUSTER_MEMBERS") + dns := os.Getenv("CLUSTER_DNS") + + if memberEnv == "" && dns == "" { + logger.Info("Running in stand-alone mode") + return nil + } + + logger.Info("Attempting to create/join cluster") + var members []string + if memberEnv != "" { + members = strings.Split(memberEnv, ",") + } else { + ips, err := net.LookupIP(dns) + if err != nil { + logger.Panic(err) + } + + if len(ips) == 0 { + logger.Panic("no ips returned by dns") + } + + for _, ip := range ips { + members = append(members, ip.String()) + } + } + + return lib.InitMemberList(members, port, proxyPort, manager) +} + func main() { outboundIp := os.Getenv("OUTBOUND_IP") @@ -100,11 +75,18 @@ func main() { bindIp := lib.EnvGet("BIND_IP", "0.0.0.0") setupLogger() - logger.Info("Starting proxy on " + bindIp + ":" + port) + + bufferSize = lib.EnvGetInt("BUFFER_SIZE", 50) + + manager := lib.NewQueueManager(bufferSize) + + initCluster(port, manager) + + mux := manager.CreateMux() s := &http.Server{ Addr: bindIp + ":" + port, - Handler: &GenericHandler{}, + Handler: mux, ReadTimeout: 10 * time.Second, WriteTimeout: 1 * time.Hour, MaxHeaderBytes: 1 << 20, @@ -119,10 +101,31 @@ func main() { go lib.StartMetrics(bindIp + ":" + port) } - bufferSize = lib.EnvGetInt("BUFFER_SIZE", 50) - err := s.ListenAndServe() - if err != nil { - panic(err) + done := make(chan os.Signal, 1) + signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + go func() { + if err := s.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.WithFields(logrus.Fields{"function": "http.ListenAndServe"}).Panic(err) + } + }() + + logger.Info("Starting proxy on " + bindIp + ":" + port) + <-done + logger.Info("Server received shutdown signal") + + logger.Info("Broadcasting leave message to cluster, if in cluster mode") + manager.Shutdown() + + logger.Info("Gracefully shutting down HTTP server") + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + if err := s.Shutdown(ctx); err != nil { + logger.WithFields(logrus.Fields{"function": "http.Shutdown"}).Error(err) } + + logger.Info("Bye bye") } \ No newline at end of file