Skip to content

Commit

Permalink
Merge pull request #399 from xmidt-org/denopink/patch/update-legacy-e…
Browse files Browse the repository at this point in the history
…ventType

patch: update legacy eventType
  • Loading branch information
denopink authored Mar 13, 2024
2 parents 5da7e1a + 69859e9 commit 3075bd9
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 31 deletions.
11 changes: 7 additions & 4 deletions eventDispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ func (d *eventDispatcher) OnDeviceEvent(event *device.Event) {
destination := routable.To()
contentType := event.Format.ContentType()
if strings.HasPrefix(destination, EventPrefix) {
eventType = destination[len(EventPrefix):]
url, err = d.dispatchEvent(eventType, contentType, event.Contents)
if err != nil {
d.logger.Error("Error dispatching event", zap.Any("eventType", eventType), zap.Any("destination", destination), zap.Error(err))
var l wrp.Locator
if l, err = wrp.ParseLocator(destination); err == nil {
eventType = l.Authority
url, err = d.dispatchEvent(eventType, contentType, event.Contents)
if err != nil {
d.logger.Error("Error dispatching event", zap.Any("eventType", eventType), zap.Any("destination", destination), zap.Error(err))
}
}
} else if strings.HasPrefix(destination, DNSPrefix) {
eventType = event.Type.String()
Expand Down
72 changes: 64 additions & 8 deletions eventDispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main
import (
"bytes"
"errors"
"fmt"
"io"
"testing"
"time"
Expand Down Expand Up @@ -234,9 +235,11 @@ func testEventDispatcherOnDeviceEventDispatchEvent(t *testing.T) {

func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) {
var (
b bytes.Buffer
assert = assert.New(t)
require = require.New(t)
b bytes.Buffer
assert = assert.New(t)
require = require.New(t)
expectedEventType = "node-change"

outbounder = &Outbounder{
RequestTimeout: 100 * time.Millisecond,
EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}},
Expand All @@ -257,24 +260,76 @@ func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) {
require.NoError(err)

d.(*eventDispatcher).outbounds = make(chan outboundEnvelope)
dm.On("With", prometheus.Labels{eventLabel: "iot", codeLabel: messageDroppedCode, reasonLabel: fullQueueReason, urlLabel: "nowhere.com"}).Return().Once()
dm.On("With", prometheus.Labels{eventLabel: expectedEventType, codeLabel: messageDroppedCode, reasonLabel: fullQueueReason, urlLabel: "nowhere.com"}).Return().Once()
dm.On("Add", 1.).Return().Once()
d.OnDeviceEvent(&device.Event{
Type: device.MessageReceived,
Message: &wrp.Message{Destination: "event:iot"},
Message: &wrp.Message{Destination: fmt.Sprintf("event:%s/mac:11:22:33:44:55:66/Online/unknown/deb2eb69999", expectedEventType)},
Contents: []byte{1, 2},
})
assert.Greater(b.Len(), 0)
dm.AssertExpectations(t)
}

func testEventDispatcherOnDeviceEventMessageReceived(t *testing.T) {
var (
assert = assert.New(t)
require = require.New(t)
b bytes.Buffer
expectedEventType = "node-change"
m = wrp.Message{Destination: fmt.Sprintf("event:%s/mac:11:22:33:44:55:66/Online/unknown/deb2eb69999", expectedEventType)}
o = Outbounder{
Method: "PATCH",
EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}},
Logger: zap.New(
zapcore.NewCore(zapcore.NewJSONEncoder(
zapcore.EncoderConfig{
MessageKey: "message",
}), zapcore.AddSync(&b), zapcore.ErrorLevel),
),
}
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &o, nil)
)

require.NotNil(dispatcher)
require.NotNil(outbounds)
require.NoError(err)

dispatcher.OnDeviceEvent(&device.Event{
Type: device.MessageReceived,
Message: &m,
})

require.Equal(1, len(outbounds))
e := <-outbounds
e.cancel()
<-e.request.Context().Done()

assert.Equal(o.method(), e.request.Method)
assert.Zero(b)
eventType, ok := e.request.Context().Value(eventTypeContextKey{}).(string)
require.True(ok)
assert.Equal(expectedEventType, eventType)
}

func testEventDispatcherOnDeviceEventFilterError(t *testing.T) {
var (
assert = assert.New(t)
require = require.New(t)
urlFilter = new(mockURLFilter)
expectedError = errors.New("expected")

dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), nil, urlFilter)
b bytes.Buffer
o = Outbounder{
Method: "PATCH",
EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}},
Logger: zap.New(
zapcore.NewCore(zapcore.NewJSONEncoder(
zapcore.EncoderConfig{
MessageKey: "message",
}), zapcore.AddSync(&b), zapcore.ErrorLevel),
),
}
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &o, urlFilter)
)

require.NotNil(dispatcher)
Expand All @@ -289,8 +344,8 @@ func testEventDispatcherOnDeviceEventFilterError(t *testing.T) {
Message: &wrp.Message{Destination: "dns:doesnotmatter.com"},
})

// TODO verify logger's buffer isn't empty
assert.Equal(0, len(outbounds))
assert.NotZero(b)
urlFilter.AssertExpectations(t)
}

Expand Down Expand Up @@ -438,6 +493,7 @@ func TestEventDispatcherOnDeviceEvent(t *testing.T) {
test func(*testing.T)
}{
{"ConnectEvent", testEventDispatcherOnDeviceEventConnectEvent},
{"CorrectEventType", testEventDispatcherOnDeviceEventMessageReceived},
{"DisconnectEvent", testEventDispatcherOnDeviceEventDisconnectEvent},
{"Unroutable", testEventDispatcherOnDeviceEventUnroutable},
{"BadURLFilter", testEventDispatcherOnDeviceEventBadURLFilter},
Expand Down
18 changes: 7 additions & 11 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,9 @@ const (
connectionUnexpectedlyClosedEOFReason = "connection_unexpectedly_closed_eof"
noErrReason = "no_err"
expectedCodeReason = "expected_code"
non202CodeReason = "non202"

// dropped message codes
non202Code = "non202"
expected202Code = "202"
messageDroppedCode = "message_dropped"

// outbound event delivery outcomes
Expand Down Expand Up @@ -124,8 +123,8 @@ func Metrics() []xmetrics.Metric {
Name: OutboundRequestSizeBytes,
Type: xmetrics.HistogramType,
Help: "A histogram of request sizes for outbound requests",
LabelNames: []string{eventLabel, codeLabel, reasonLabel, urlLabel},
Buckets: []float64{200, 500, 900, 1500, 3000, 6000, 12000, 24000, 48000, 96000, 192000, 384000, 768000, 1536000},
LabelNames: []string{eventLabel, codeLabel},
Buckets: []float64{200, 500, 900, 1500, 3000, 6000, 12000, 24000, 48000, 96000, 192000},
},
{
Name: TotalOutboundEvents,
Expand Down Expand Up @@ -264,12 +263,9 @@ func InstrumentOutboundSize(obs HistogramVec, next http.RoundTripper) promhttp.R
code = strconv.Itoa(response.StatusCode)
}

labels = prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: getDoErrReason(err), urlLabel: request.URL.String()}
labels = prometheus.Labels{eventLabel: eventType, codeLabel: code}
} else {
labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason, urlLabel: request.URL.String()}
if response.StatusCode != http.StatusAccepted {
labels[reasonLabel] = non202Code
}
labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode)}
}

obs.With(labels).Observe(float64(size))
Expand Down Expand Up @@ -299,7 +295,7 @@ func InstrumentOutboundDuration(obs HistogramVec, next http.RoundTripper) promht
} else {
labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason, urlLabel: request.URL.String()}
if response.StatusCode != http.StatusAccepted {
labels[reasonLabel] = non202Code
labels[reasonLabel] = non202CodeReason
}
}

Expand Down Expand Up @@ -329,7 +325,7 @@ func InstrumentOutboundCounter(counter CounterVec, next http.RoundTripper) promh
} else {
labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: noErrReason, urlLabel: request.URL.String()}
if response.StatusCode != http.StatusAccepted {
labels[reasonLabel] = non202Code
labels[reasonLabel] = non202CodeReason
}
}

Expand Down
6 changes: 3 additions & 3 deletions workerPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ func (wp *WorkerPool) transact(e outboundEnvelope) {
url := e.request.URL.String()
switch response.StatusCode {
case http.StatusAccepted:
wp.logger.Debug("HTTP response", zap.String("status", response.Status), zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, expected202Code), zap.String(urlLabel, url))
wp.logger.Debug("HTTP response", zap.String("status", response.Status), zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, expectedCodeReason), zap.String(urlLabel, url))
default:
wp.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: non202Code, urlLabel: url}).Add(1)
wp.logger.Warn("HTTP response", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, non202Code), zap.String(urlLabel, url))
wp.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: non202CodeReason, urlLabel: url}).Add(1)
wp.logger.Warn("HTTP response", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, non202CodeReason), zap.String(urlLabel, url))
}

io.Copy(io.Discard, response.Body)
Expand Down
10 changes: 5 additions & 5 deletions workerPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func testWorkerPoolTransactHTTPSuccess(t *testing.T) {
}
)

dm.On("With", prometheus.Labels{eventLabel: EventPrefix, codeLabel: strconv.Itoa(http.StatusAccepted), reasonLabel: non202Code, urlLabel: target}).Panic("Func dm.With should have not been called")
dm.On("With", prometheus.Labels{eventLabel: EventPrefix, codeLabel: strconv.Itoa(http.StatusAccepted), reasonLabel: non202CodeReason, urlLabel: target}).Panic("Func dm.With should have not been called")
dm.On("Add", 1.).Panic("Func dm.Add should have not been called")
require.NotPanics(func() { wp.transact(envelope) })
assert.Equal(b.Len(), 0)
Expand Down Expand Up @@ -80,7 +80,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) {
},
},
expectedCode: http.StatusInternalServerError,
expectedReason: non202Code,
expectedReason: non202CodeReason,
},
{
description: "failure 415, caduceus /notify response case",
Expand All @@ -96,7 +96,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) {
},
},
expectedCode: http.StatusUnsupportedMediaType,
expectedReason: non202Code,
expectedReason: non202CodeReason,
},
{
description: "failure 503, caduceus /notify response case",
Expand All @@ -112,7 +112,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) {
},
},
expectedCode: http.StatusServiceUnavailable,
expectedReason: non202Code,
expectedReason: non202CodeReason,
},
{
description: "failure 400, caduceus /notify response case",
Expand All @@ -128,7 +128,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) {
},
},
expectedCode: http.StatusBadRequest,
expectedReason: non202Code,
expectedReason: non202CodeReason,
},
{
description: "failure 408 timeout, caduceus /notify response case",
Expand Down

0 comments on commit 3075bd9

Please sign in to comment.