From f0a3420b0e1949e1d72b48422f83470b8e790e67 Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Mon, 11 Mar 2024 20:49:54 -0400 Subject: [PATCH 1/2] patch: update legacy eventType patch: update legacy eventType chore: update wrp-go remove unnecessary metric labels --- eventDispatcher.go | 11 ++++--- eventDispatcher_test.go | 72 ++++++++++++++++++++++++++++++++++++----- go.mod | 2 +- go.sum | 2 ++ metrics.go | 16 ++++----- workerPool.go | 6 ++-- workerPool_test.go | 10 +++--- 7 files changed, 88 insertions(+), 31 deletions(-) diff --git a/eventDispatcher.go b/eventDispatcher.go index 3ccd951f..c5656c0c 100644 --- a/eventDispatcher.go +++ b/eventDispatcher.go @@ -132,10 +132,13 @@ func (d *eventDispatcher) OnDeviceEvent(event *device.Event) { destination := routable.To() contentType := event.Format.ContentType() if strings.HasPrefix(destination, EventPrefix) { - eventType = destination[len(EventPrefix):] - url, err = d.dispatchEvent(eventType, contentType, event.Contents) - if err != nil { - d.logger.Error("Error dispatching event", zap.Any("eventType", eventType), zap.Any("destination", destination), zap.Error(err)) + var l wrp.Locator + if l, err = wrp.ParseLocator(destination); err == nil { + eventType = l.Authority + url, err = d.dispatchEvent(eventType, contentType, event.Contents) + if err != nil { + d.logger.Error("Error dispatching event", zap.Any("eventType", eventType), zap.Any("destination", destination), zap.Error(err)) + } } } else if strings.HasPrefix(destination, DNSPrefix) { eventType = event.Type.String() diff --git a/eventDispatcher_test.go b/eventDispatcher_test.go index 60885714..ba1aa211 100644 --- a/eventDispatcher_test.go +++ b/eventDispatcher_test.go @@ -5,6 +5,7 @@ package main import ( "bytes" "errors" + "fmt" "io" "testing" "time" @@ -234,9 +235,11 @@ func testEventDispatcherOnDeviceEventDispatchEvent(t *testing.T) { func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) { var ( - b bytes.Buffer - assert = assert.New(t) - require = require.New(t) + b bytes.Buffer + assert = assert.New(t) + require = require.New(t) + expectedEventType = "node-change" + outbounder = &Outbounder{ RequestTimeout: 100 * time.Millisecond, EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}}, @@ -257,24 +260,76 @@ func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) { require.NoError(err) d.(*eventDispatcher).outbounds = make(chan outboundEnvelope) - dm.On("With", prometheus.Labels{eventLabel: "iot", codeLabel: messageDroppedCode, reasonLabel: fullQueueReason, urlLabel: "nowhere.com"}).Return().Once() + dm.On("With", prometheus.Labels{eventLabel: expectedEventType, codeLabel: messageDroppedCode, reasonLabel: fullQueueReason, urlLabel: "nowhere.com"}).Return().Once() dm.On("Add", 1.).Return().Once() d.OnDeviceEvent(&device.Event{ Type: device.MessageReceived, - Message: &wrp.Message{Destination: "event:iot"}, + Message: &wrp.Message{Destination: fmt.Sprintf("event:%s/mac:11:22:33:44:55:66/Online/unknown/deb2eb69999", expectedEventType)}, Contents: []byte{1, 2}, }) assert.Greater(b.Len(), 0) dm.AssertExpectations(t) } + +func testEventDispatcherOnDeviceEventMessageReceived(t *testing.T) { + var ( + assert = assert.New(t) + require = require.New(t) + b bytes.Buffer + expectedEventType = "node-change" + m = wrp.Message{Destination: fmt.Sprintf("event:%s/mac:11:22:33:44:55:66/Online/unknown/deb2eb69999", expectedEventType)} + o = Outbounder{ + Method: "PATCH", + EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}}, + Logger: zap.New( + zapcore.NewCore(zapcore.NewJSONEncoder( + zapcore.EncoderConfig{ + MessageKey: "message", + }), zapcore.AddSync(&b), zapcore.ErrorLevel), + ), + } + dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &o, nil) + ) + + require.NotNil(dispatcher) + require.NotNil(outbounds) + require.NoError(err) + + dispatcher.OnDeviceEvent(&device.Event{ + Type: device.MessageReceived, + Message: &m, + }) + + require.Equal(1, len(outbounds)) + e := <-outbounds + e.cancel() + <-e.request.Context().Done() + + assert.Equal(o.method(), e.request.Method) + assert.Zero(b) + eventType, ok := e.request.Context().Value(eventTypeContextKey{}).(string) + require.True(ok) + assert.Equal(expectedEventType, eventType) +} + func testEventDispatcherOnDeviceEventFilterError(t *testing.T) { var ( assert = assert.New(t) require = require.New(t) urlFilter = new(mockURLFilter) expectedError = errors.New("expected") - - dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), nil, urlFilter) + b bytes.Buffer + o = Outbounder{ + Method: "PATCH", + EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}}, + Logger: zap.New( + zapcore.NewCore(zapcore.NewJSONEncoder( + zapcore.EncoderConfig{ + MessageKey: "message", + }), zapcore.AddSync(&b), zapcore.ErrorLevel), + ), + } + dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &o, urlFilter) ) require.NotNil(dispatcher) @@ -289,8 +344,8 @@ func testEventDispatcherOnDeviceEventFilterError(t *testing.T) { Message: &wrp.Message{Destination: "dns:doesnotmatter.com"}, }) - // TODO verify logger's buffer isn't empty assert.Equal(0, len(outbounds)) + assert.NotZero(b) urlFilter.AssertExpectations(t) } @@ -438,6 +493,7 @@ func TestEventDispatcherOnDeviceEvent(t *testing.T) { test func(*testing.T) }{ {"ConnectEvent", testEventDispatcherOnDeviceEventConnectEvent}, + {"CorrectEventType", testEventDispatcherOnDeviceEventMessageReceived}, {"DisconnectEvent", testEventDispatcherOnDeviceEventDisconnectEvent}, {"Unroutable", testEventDispatcherOnDeviceEventUnroutable}, {"BadURLFilter", testEventDispatcherOnDeviceEventBadURLFilter}, diff --git a/go.mod b/go.mod index b3f1a5f8..d5946065 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/xmidt-org/sallust v0.2.2 github.com/xmidt-org/touchstone v0.1.5 github.com/xmidt-org/webpa-common/v2 v2.3.2 - github.com/xmidt-org/wrp-go/v3 v3.4.0 + github.com/xmidt-org/wrp-go/v3 v3.4.3 go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.40.0 go.uber.org/zap v1.27.0 ) diff --git a/go.sum b/go.sum index e0bc4412..1c5e3ea5 100644 --- a/go.sum +++ b/go.sum @@ -568,6 +568,8 @@ github.com/xmidt-org/webpa-common/v2 v2.3.2 h1:66RUmlkltI3iet55WLsSVUW6D5Z7+JpxW github.com/xmidt-org/webpa-common/v2 v2.3.2/go.mod h1:WnMf2dLIZOQ5Gvje9Ges/ovHl2pqERFpfP+ST49v6bw= github.com/xmidt-org/wrp-go/v3 v3.4.0 h1:CZ11X4LdPPSpk76bddl8PdNyW0TiCaIXtbUmKGhp9HQ= github.com/xmidt-org/wrp-go/v3 v3.4.0/go.mod h1:jVMp/NDHgLnteXjVKryCVpqAaEs8HQun8bb19et8XUU= +github.com/xmidt-org/wrp-go/v3 v3.4.3 h1:0Rk7UtTP2nNPVHhVFzKC/HF4xV9f36x7q4301p1xC6k= +github.com/xmidt-org/wrp-go/v3 v3.4.3/go.mod h1:j1kLLoPJmKkMFz/vlwP238WBoFhJgbPyJDN9W2V1TxY= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/metrics.go b/metrics.go index 87f0e4c5..ab4ee440 100644 --- a/metrics.go +++ b/metrics.go @@ -89,10 +89,9 @@ const ( connectionUnexpectedlyClosedEOFReason = "connection_unexpectedly_closed_eof" noErrReason = "no_err" expectedCodeReason = "expected_code" + non202CodeReason = "non202" // dropped message codes - non202Code = "non202" - expected202Code = "202" messageDroppedCode = "message_dropped" // outbound event delivery outcomes @@ -124,7 +123,7 @@ func Metrics() []xmetrics.Metric { Name: OutboundRequestSizeBytes, Type: xmetrics.HistogramType, Help: "A histogram of request sizes for outbound requests", - LabelNames: []string{eventLabel, codeLabel, reasonLabel, urlLabel}, + LabelNames: []string{eventLabel, codeLabel}, Buckets: []float64{200, 500, 900, 1500, 3000, 6000, 12000, 24000, 48000, 96000, 192000, 384000, 768000, 1536000}, }, { @@ -264,12 +263,9 @@ func InstrumentOutboundSize(obs HistogramVec, next http.RoundTripper) promhttp.R code = strconv.Itoa(response.StatusCode) } - labels = prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: getDoErrReason(err), urlLabel: request.URL.String()} + labels = prometheus.Labels{eventLabel: eventType, codeLabel: code} } else { - labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason, urlLabel: request.URL.String()} - if response.StatusCode != http.StatusAccepted { - labels[reasonLabel] = non202Code - } + labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode)} } obs.With(labels).Observe(float64(size)) @@ -299,7 +295,7 @@ func InstrumentOutboundDuration(obs HistogramVec, next http.RoundTripper) promht } else { labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason, urlLabel: request.URL.String()} if response.StatusCode != http.StatusAccepted { - labels[reasonLabel] = non202Code + labels[reasonLabel] = non202CodeReason } } @@ -329,7 +325,7 @@ func InstrumentOutboundCounter(counter CounterVec, next http.RoundTripper) promh } else { labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: noErrReason, urlLabel: request.URL.String()} if response.StatusCode != http.StatusAccepted { - labels[reasonLabel] = non202Code + labels[reasonLabel] = non202CodeReason } } diff --git a/workerPool.go b/workerPool.go index 50704b8d..73e3e709 100644 --- a/workerPool.go +++ b/workerPool.go @@ -95,10 +95,10 @@ func (wp *WorkerPool) transact(e outboundEnvelope) { url := e.request.URL.String() switch response.StatusCode { case http.StatusAccepted: - wp.logger.Debug("HTTP response", zap.String("status", response.Status), zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, expected202Code), zap.String(urlLabel, url)) + wp.logger.Debug("HTTP response", zap.String("status", response.Status), zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, expectedCodeReason), zap.String(urlLabel, url)) default: - wp.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: non202Code, urlLabel: url}).Add(1) - wp.logger.Warn("HTTP response", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, non202Code), zap.String(urlLabel, url)) + wp.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: non202CodeReason, urlLabel: url}).Add(1) + wp.logger.Warn("HTTP response", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, non202CodeReason), zap.String(urlLabel, url)) } io.Copy(io.Discard, response.Body) diff --git a/workerPool_test.go b/workerPool_test.go index a11c7db0..8fd82349 100644 --- a/workerPool_test.go +++ b/workerPool_test.go @@ -48,7 +48,7 @@ func testWorkerPoolTransactHTTPSuccess(t *testing.T) { } ) - dm.On("With", prometheus.Labels{eventLabel: EventPrefix, codeLabel: strconv.Itoa(http.StatusAccepted), reasonLabel: non202Code, urlLabel: target}).Panic("Func dm.With should have not been called") + dm.On("With", prometheus.Labels{eventLabel: EventPrefix, codeLabel: strconv.Itoa(http.StatusAccepted), reasonLabel: non202CodeReason, urlLabel: target}).Panic("Func dm.With should have not been called") dm.On("Add", 1.).Panic("Func dm.Add should have not been called") require.NotPanics(func() { wp.transact(envelope) }) assert.Equal(b.Len(), 0) @@ -80,7 +80,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) { }, }, expectedCode: http.StatusInternalServerError, - expectedReason: non202Code, + expectedReason: non202CodeReason, }, { description: "failure 415, caduceus /notify response case", @@ -96,7 +96,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) { }, }, expectedCode: http.StatusUnsupportedMediaType, - expectedReason: non202Code, + expectedReason: non202CodeReason, }, { description: "failure 503, caduceus /notify response case", @@ -112,7 +112,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) { }, }, expectedCode: http.StatusServiceUnavailable, - expectedReason: non202Code, + expectedReason: non202CodeReason, }, { description: "failure 400, caduceus /notify response case", @@ -128,7 +128,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) { }, }, expectedCode: http.StatusBadRequest, - expectedReason: non202Code, + expectedReason: non202CodeReason, }, { description: "failure 408 timeout, caduceus /notify response case", From d04a4f8d6c50e754c0a1112e27daae40745b868a Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Wed, 13 Mar 2024 11:41:32 -0400 Subject: [PATCH 2/2] chore: decrease OutboundRequestSizeBytes buckets --- metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics.go b/metrics.go index ab4ee440..cc231702 100644 --- a/metrics.go +++ b/metrics.go @@ -124,7 +124,7 @@ func Metrics() []xmetrics.Metric { Type: xmetrics.HistogramType, Help: "A histogram of request sizes for outbound requests", LabelNames: []string{eventLabel, codeLabel}, - Buckets: []float64{200, 500, 900, 1500, 3000, 6000, 12000, 24000, 48000, 96000, 192000, 384000, 768000, 1536000}, + Buckets: []float64{200, 500, 900, 1500, 3000, 6000, 12000, 24000, 48000, 96000, 192000}, }, { Name: TotalOutboundEvents,