diff --git a/src/talaria/outbounder.go b/src/talaria/outbounder.go index 97aac4bc..b1abbb76 100644 --- a/src/talaria/outbounder.go +++ b/src/talaria/outbounder.go @@ -53,7 +53,7 @@ type listener struct { requestFactory requestFactory } -func (l *listener) OnDeviceEvent(e *device.Event) { +func (l *listener) onDeviceEvent(e *device.Event) { if e.Type != device.MessageReceived { return } @@ -65,9 +65,10 @@ func (l *listener) OnDeviceEvent(e *device.Event) { } select { - case l.outbounds <- outbound: - default: + case <-outbound.request.Context().Done(): l.logger.Error("Dropping outbound message for device [%s]: %s->%s", e.Device.ID(), e.Message.From(), e.Message.To()) + outbound.cancel() + case l.outbounds <- outbound: } } @@ -79,7 +80,9 @@ type workerPool struct { transactor func(*http.Request) (*http.Response, error) } -func (wp *workerPool) send(outbound *outboundEnvelope) { +// transact performs all the logic necessary to fulfill and outbound request. +// This method ensures that the Context associated with the request is properly cancelled. +func (wp *workerPool) transact(outbound *outboundEnvelope) { defer outbound.cancel() response, err := wp.transactor(outbound.request) @@ -100,7 +103,7 @@ func (wp *workerPool) send(outbound *outboundEnvelope) { func (wp *workerPool) worker() { for outbound := range wp.outbounds { - wp.send(outbound) + wp.transact(outbound) } } @@ -239,5 +242,5 @@ func (o *Outbounder) Start() device.Listener { ) workerPool.run(o.WorkerPoolSize) - return listener.OnDeviceEvent + return listener.onDeviceEvent }