diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1f4a499..ce25e9b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -46,6 +46,6 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-go@v2 with: - go-version: 1.14 + go-version: 1.22 - run: go get - run: ./test.sh diff --git a/integration/go.mod b/integration/go.mod index 6e964c2..b5012a4 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -1,6 +1,6 @@ module github.com/knyar/nginx-lua-prometheus/integration -go 1.14 +go 1.22 require ( github.com/golang/protobuf v1.3.2 @@ -9,3 +9,9 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 ) + +require ( + github.com/kr/text v0.1.0 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect +) diff --git a/integration/nginx.conf b/integration/nginx.conf index e66f82d..3e1ea4d 100644 --- a/integration/nginx.conf +++ b/integration/nginx.conf @@ -15,7 +15,9 @@ http { error_log stderr; init_worker_by_lua_block { - prometheus = require("prometheus").init("prometheus_metrics", {sync_interval=0.4}) + prometheus = require("prometheus").init("prometheus_metrics", {sync_interval=0.1}) + + -- basic_test metric_requests = prometheus:counter("requests_total", "Number of HTTP requests", {"host", "path", "status"}) metric_latency = prometheus:histogram("request_duration_seconds", @@ -23,12 +25,20 @@ http { {0.08, 0.089991, 0.1, 0.2, 0.75, 1, 1.5, 3.123232001, 5, 15, 120, 350.5, 1500, 75000, 1500000}) metric_connections = prometheus:gauge("connections", "Number of HTTP connections", {"state"}) + + -- reset_test + metric_gauge = prometheus:gauge("reset_test_gauge", "Sample gauge for reset test", {"label"}) } server { listen 18000; server_name metrics; + location /health { + content_by_lua_block { + ngx.say("ok") + } + } location /metrics { content_by_lua_block { metric_connections:set(ngx.var.connections_reading, {"reading"}) @@ -44,9 +54,10 @@ http { server_name basic_test; log_by_lua_block { - metric_requests:inc(1, {ngx.var.server_name, ngx.var.request_uri, ngx.var.status}) - metric_latency:observe(tonumber(ngx.var.request_time), - {ngx.var.request_uri}) + if ngx.var.request_uri ~= "/health" then + metric_requests:inc(1, {ngx.var.server_name, ngx.var.request_uri, ngx.var.status}) + metric_latency:observe(tonumber(ngx.var.request_time), {ngx.var.request_uri}) + end } location /fast { @@ -64,5 +75,33 @@ http { location /error { return 500; } + location /health { + content_by_lua_block { + ngx.say("ok") + } + } + } + + server { + listen 18002; + server_name reset_test; + + location /reset_gauge { + content_by_lua_block { + metric_gauge:reset() + ngx.say("ok") + } + } + location /set_gauge { + content_by_lua_block { + metric_gauge:set(tonumber(ngx.var.arg_metricvalue), {ngx.var.arg_labelvalue}) + ngx.say("ok") + } + } + location /health { + content_by_lua_block { + ngx.say("ok") + } + } } } diff --git a/integration/test.go b/integration/test.go index 7660655..890d5fc 100644 --- a/integration/test.go +++ b/integration/test.go @@ -5,6 +5,7 @@ import ( "context" "flag" "fmt" + "io" "log" "net/http" "sync" @@ -18,12 +19,15 @@ var ( testDuration = flag.Duration("duration", 10*time.Second, "duration of the test") ) +const healthURL = "http://localhost:18000/health" const metricsURL = "http://localhost:18000/metrics" type testRunner struct { - client *http.Client - tests []testFunc - checks []checkFunc + ctx context.Context + client *http.Client + tests []testFunc + checks []checkFunc + healthURLs []string } type testFunc func() error @@ -36,26 +40,39 @@ type testData struct { func main() { flag.Parse() - // Use a custom http client with a lower idle connection timeout and a request timeout. - client := &http.Client{ - Timeout: 500 * time.Millisecond, - Transport: &http.Transport{IdleConnTimeout: 400 * time.Millisecond}, - } - - // Wait for nginx to start. - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + // Tests are expected to manage their duration themselves based on testDuration. + // Context timeout is longer than test duration to allow tests to complete + // without timing out. + ctx, cancel := context.WithTimeout(context.Background(), time.Duration((*testDuration).Nanoseconds())*2) defer cancel() - if err := waitFor(ctx, client, metricsURL); err != nil { - log.Fatal(err) + tr := &testRunner{ + // Use a custom http client with a lower idle connection timeout and a request timeout. + client: &http.Client{ + Timeout: 500 * time.Millisecond, + Transport: &http.Transport{ + IdleConnTimeout: 400 * time.Millisecond, + MaxIdleConns: 100, + MaxConnsPerHost: 100, + }, + }, + ctx: ctx, + healthURLs: []string{healthURL}, } // Register tests. - t := &testRunner{client: client} - registerBasic(t) + registerBasicTest(tr) + registerResetTest(tr) + + // Wait for all nginx servers to come up. + for _, url := range tr.healthURLs { + if err := tr.waitFor(url, 5*time.Second); err != nil { + log.Fatal(err) + } + } // Run tests. var wg sync.WaitGroup - for _, tt := range t.tests { + for _, tt := range tr.tests { wg.Add(1) go func(tt testFunc) { if err := tt(); err != nil { @@ -71,22 +88,12 @@ func main() { time.Sleep(500 * time.Millisecond) // Collect metrics. - resp, err := client.Get(metricsURL) - if err != nil { - log.Fatalf("Could not collect metrics: %v", err) - } - defer resp.Body.Close() - - // Parse metrics. - var parser expfmt.TextParser - res := &testData{} - res.metrics, err = parser.TextToMetricFamilies(resp.Body) - if err != nil { - log.Fatalf("Could not parse metrics: %v", err) + res := &testData{ + metrics: tr.mustGetMetrics(context.Background()), } // Run test checks. - for _, ch := range t.checks { + for _, ch := range tr.checks { if err := ch(res); err != nil { log.Fatal(err) } @@ -95,19 +102,67 @@ func main() { log.Print("All ok") } -func waitFor(ctx context.Context, c *http.Client, url string) error { - log.Printf("Waiting for %s...", url) +func (tr *testRunner) mustGetMetrics(ctx context.Context) map[string]*dto.MetricFamily { + var res map[string]*dto.MetricFamily + tr.mustGetContext(ctx, metricsURL, func(r *http.Response) error { + if r.StatusCode != 200 { + return fmt.Errorf("expected response 200 got %v", r) + } + var parser expfmt.TextParser + var err error + res, err = parser.TextToMetricFamilies(r.Body) + return err + }) + return res +} + +func (tr *testRunner) getContext(ctx context.Context, url string, cb func(*http.Response) error) error { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return fmt.Errorf("creating request for %s: %v", url, err) } + resp, err := tr.client.Do(req) + if err != nil { + return fmt.Errorf("could not fetch URL %s: %v", url, err) + } + defer resp.Body.Close() + if cb != nil { + return cb(resp) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("could not read HTTP response for %s: %v", url, err) + } + if resp.StatusCode != 200 || string(body) != "ok\n" { + return fmt.Errorf("unexpected response %q from %s; expected 'ok'", string(body), url) + } + return nil +} + +func (tr *testRunner) mustGetContext(ctx context.Context, url string, cb func(*http.Response) error) { + if err := tr.getContext(ctx, url, cb); err != nil { + log.Fatal(err) + } +} + +func (tr *testRunner) get(url string) error { + return tr.getContext(tr.ctx, url, nil) +} + +func (tr *testRunner) mustGet(url string) { + if err := tr.get(url); err != nil { + log.Fatal(err) + } +} + +func (tr *testRunner) waitFor(url string, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(tr.ctx, timeout) + defer cancel() + log.Printf("Waiting for %s for %v...", url, timeout) for { - resp, err := c.Do(req) + err := tr.getContext(ctx, url, nil) if err == nil { - resp.Body.Close() - if resp.StatusCode == 200 { - return nil - } + return nil } if err := ctx.Err(); err != nil { return err diff --git a/integration/test.sh b/integration/test.sh index 816f47c..be58ad4 100755 --- a/integration/test.sh +++ b/integration/test.sh @@ -23,7 +23,7 @@ function cleanup { cleanup trap cleanup EXIT -docker run -d --name ${container_name} -p 18000-18001:18000-18001 \ +docker run -d --name ${container_name} -p 18000-18010:18000-18010 \ -v "${base_dir}/../:/nginx-lua-prometheus" ${image_name} \ nginx -c /nginx-lua-prometheus/integration/nginx.conf diff --git a/integration/test_basic.go b/integration/test_basic.go index 664c0e0..f6e93c6 100644 --- a/integration/test_basic.go +++ b/integration/test_basic.go @@ -8,6 +8,7 @@ import ( "log" "math" "math/rand" + "net/http" "sync" "time" @@ -40,10 +41,14 @@ var urls = map[requestType]string{ reqError: "http://localhost:18001/error", } -// Verify bucket boundaries. This should match the buckets defined in nginx.conf. +// Expected bucket boundaries. This should match the buckets defined in nginx.conf. var buckets = []float64{0.08, 0.089991, 0.1, 0.2, 0.75, 1, 1.5, 3.123232001, 5, 15, 120, 350.5, 1500, 75000, 1500000, math.Inf(1)} -func registerBasic(tr *testRunner) { +// Register a basic test that will send requests to 'fast', 'slow' and 'error' +// endpoints and verify that request counters and latency measurements are +// accurate. +func registerBasicTest(tr *testRunner) { + tr.healthURLs = append(tr.healthURLs, "http://localhost:18001/health") results := make(chan map[requestType]int64, *concurrency) tr.tests = append(tr.tests, func() error { log.Printf("Running basic test with %d concurrent clients for %v", *concurrency, *testDuration) @@ -62,17 +67,16 @@ func registerBasic(tr *testRunner) { // 5% are errors t = reqError } - resp, err := tr.client.Get(urls[t]) - if err != nil { - log.Fatalf("Could not fetch URL %s: %v", urls[t], err) - } - body, err := io.ReadAll(resp.Body) - if err != nil { - log.Fatalf("Could not read HTTP response for %s: %v", urls[t], err) - } - resp.Body.Close() - if t != reqError && string(body) != "ok\n" { - log.Fatalf("Unexpected response %q from %s; expected 'ok'", string(body), urls[t]) + if t == reqError { + tr.mustGetContext(tr.ctx, urls[t], func(r *http.Response) error { + io.Copy(io.Discard, r.Body) + if r.StatusCode != 500 { + return fmt.Errorf("expected response 500, got %+v", r) + } + return nil + }) + } else { + tr.mustGet(urls[t]) } result[t]++ } @@ -192,8 +196,10 @@ func getHistogramSum(mfs map[string]*dto.MetricFamily, metric string, labels [][ func hasMetricFamily(mfs map[string]*dto.MetricFamily, want *dto.MetricFamily) error { sortFn := func(x, y interface{}) bool { return pretty.Sprint(x) < pretty.Sprint(y) } for _, mf := range mfs { - if *mf.Name == *want.Name { + if mf.GetName() == want.GetName() { if diff := cmp.Diff(want, mf, cmpopts.SortSlices(sortFn)); diff != "" { + log.Printf("Want: %+v", want) + log.Printf("Got: %+v", mf) return fmt.Errorf("unexpected metric family %v (-want +got):\n%s", mf.Name, diff) } return nil diff --git a/integration/test_reset.go b/integration/test_reset.go new file mode 100644 index 0000000..e818daf --- /dev/null +++ b/integration/test_reset.go @@ -0,0 +1,89 @@ +// This is a simple integration test for nginx-lua-prometheus. +package main + +import ( + "fmt" + "log" + "math/rand" + "sync" + "time" + + dto "github.com/prometheus/client_model/go" +) + +func registerResetTest(tr *testRunner) { + tr.healthURLs = append(tr.healthURLs, "http://localhost:18002/health") + + const setURL = "http://localhost:18002/set_gauge" + const resetURL = "http://localhost:18002/reset_gauge" + const metricName = "reset_test_gauge" + tr.tests = append(tr.tests, func() error { + log.Printf("Running reset test with %d concurrent clients for %v", *concurrency, *testDuration) + var wg sync.WaitGroup + var mu sync.RWMutex + for i := 1; i <= *concurrency; i++ { + wg.Add(1) + go func(i int) { + labelValue := fmt.Sprintf("client%d", i) + setUrl := func(value int) string { + return fmt.Sprintf("%s?labelvalue=%s&metricvalue=%d", setURL, labelValue, value) + } + // Check that returned metrics contain a value for this worker. + // If wantValue is 0, it means the metric should not exist at all. + checkValue := func(mfs map[string]*dto.MetricFamily, wantValue int) { + for _, mf := range mfs { + if mf.GetName() != metricName { + continue + } + if wantValue == 0 { + log.Fatalf("client %d: metric %s exists while it should not; %+v", i, metricName, mf) + } + for _, m := range mf.Metric { + if len(m.Label) != 1 { + log.Fatalf("client %d: expected metric %s to have 1 label, got %+v", i, metricName, m) + } + if m.Label[0].GetValue() != labelValue { + continue + } + if m.GetGauge().GetValue() != float64(wantValue) { + log.Fatalf("client %d: expected metric %s to have value of %d, got %+v", i, metricName, wantValue, m) + } + return + } + log.Fatalf("client %d: metric %s does not have label %s while it should; %+v", i, metricName, labelValue, mf) + } + if wantValue != 0 { + log.Fatalf("client %d: metric %s not found in %+v", i, metricName, mfs) + } + } + for start := time.Now(); time.Since(start) < *testDuration; { + // Call the URL that sets a label value and confirm that it + // exists in the returned metrics. + value := 1 + rand.Intn(9000) + mu.RLock() + tr.mustGet(setUrl(value)) + metrics := tr.mustGetMetrics(tr.ctx) + checkValue(metrics, value) + mu.RUnlock() + + // Occasionally, reset the metric and confirm that it does + // not get returned. A mutex ensures that no other clients + // attempt to change or reset the gauge at the same time. + if rand.Intn(100) < 5 { + mu.Lock() + tr.mustGet(resetURL) + metrics := tr.mustGetMetrics(tr.ctx) + checkValue(metrics, 0) + // Wait for slightly longer than sync_interval to ensure that + // metric reset gets propagated to all workers. + time.Sleep(105 * time.Millisecond) + mu.Unlock() + } + } + wg.Done() + }(i) + } + wg.Wait() + return nil + }) +}