Skip to content

Commit

Permalink
Honor the context's cancellation semantics when enqueuing outbound re…
Browse files Browse the repository at this point in the history
…quests; don't export listener method
  • Loading branch information
John Bass authored and John Bass committed Apr 12, 2017
1 parent 3a159ac commit eaccbbd
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions src/talaria/outbounder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type listener struct {
requestFactory requestFactory
}

func (l *listener) OnDeviceEvent(e *device.Event) {
func (l *listener) onDeviceEvent(e *device.Event) {
if e.Type != device.MessageReceived {
return
}
Expand All @@ -65,9 +65,10 @@ func (l *listener) OnDeviceEvent(e *device.Event) {
}

select {
case l.outbounds <- outbound:
default:
case <-outbound.request.Context().Done():
l.logger.Error("Dropping outbound message for device [%s]: %s->%s", e.Device.ID(), e.Message.From(), e.Message.To())
outbound.cancel()
case l.outbounds <- outbound:
}
}

Expand All @@ -79,7 +80,9 @@ type workerPool struct {
transactor func(*http.Request) (*http.Response, error)
}

func (wp *workerPool) send(outbound *outboundEnvelope) {
// transact performs all the logic necessary to fulfill and outbound request.
// This method ensures that the Context associated with the request is properly cancelled.
func (wp *workerPool) transact(outbound *outboundEnvelope) {
defer outbound.cancel()

response, err := wp.transactor(outbound.request)
Expand All @@ -100,7 +103,7 @@ func (wp *workerPool) send(outbound *outboundEnvelope) {

func (wp *workerPool) worker() {
for outbound := range wp.outbounds {
wp.send(outbound)
wp.transact(outbound)
}
}

Expand Down Expand Up @@ -239,5 +242,5 @@ func (o *Outbounder) Start() device.Listener {
)

workerPool.run(o.WorkerPoolSize)
return listener.OnDeviceEvent
return listener.onDeviceEvent
}

0 comments on commit eaccbbd

Please sign in to comment.