From cdd0297f9ebc52cbacdb094ba9a99374f274b7fb Mon Sep 17 00:00:00 2001 From: John Bass Date: Tue, 11 Apr 2017 16:47:06 -0700 Subject: [PATCH 1/4] Renamed this handler to match the webpa-common API --- src/talaria/{inbound.go => primaryHandler.go} | 2 +- src/talaria/talaria.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename src/talaria/{inbound.go => primaryHandler.go} (93%) diff --git a/src/talaria/inbound.go b/src/talaria/primaryHandler.go similarity index 93% rename from src/talaria/inbound.go rename to src/talaria/primaryHandler.go index 8de06fb8..4e211142 100644 --- a/src/talaria/inbound.go +++ b/src/talaria/primaryHandler.go @@ -9,7 +9,7 @@ import ( "net/http" ) -func NewInboundHandler(logger logging.Logger, manager device.Manager, v *viper.Viper) (http.Handler, error) { +func NewPrimaryHandler(logger logging.Logger, manager device.Manager, v *viper.Viper) (http.Handler, error) { poolFactory, err := wrp.NewPoolFactory(v.Sub(wrp.ViperKey)) if err != nil { return nil, err diff --git a/src/talaria/talaria.go b/src/talaria/talaria.go index d3f8c551..596a5deb 100644 --- a/src/talaria/talaria.go +++ b/src/talaria/talaria.go @@ -58,13 +58,13 @@ func talaria(arguments []string) int { deviceOptions.Listeners = []device.Listener{outbounder.Start()} manager := device.NewManager(deviceOptions, nil) - deviceHandler, err := NewInboundHandler(logger, manager, v) + primaryHandler, err := NewPrimaryHandler(logger, manager, v) if err != nil { fmt.Fprintf(os.Stderr, "Unable to initialize inbound handler: %s\n", err) return 1 } - _, runnable := webPA.Prepare(logger, deviceHandler) + _, runnable := webPA.Prepare(logger, primaryHandler) waitGroup, shutdown, err := concurrent.Execute(runnable) if err != nil { fmt.Fprintf(os.Stderr, "Unable to start device manager: %s\n", err) From 45c1baeacb8b3bd045c58be14ea3d3d45f4f2444 Mon Sep 17 00:00:00 2001 From: John Bass Date: Tue, 11 Apr 2017 16:54:59 -0700 Subject: [PATCH 2/4] Simplifications using the new webpa-common API; removed message failed as there is no back channel in this version --- src/talaria/outbounder.go | 88 ++++++++------------------------------- 1 file changed, 17 insertions(+), 71 deletions(-) diff --git a/src/talaria/outbounder.go b/src/talaria/outbounder.go index 33b0c425..89bcafd7 100644 --- a/src/talaria/outbounder.go +++ b/src/talaria/outbounder.go @@ -19,9 +19,6 @@ const ( // OutbounderKey is the Viper subkey which is expected to hold Outbounder configuration OutbounderKey = "device.outbound" - // OutboundContentType is the Content-Type header value for device messages leaving talaria - OutboundContentType = "application/wrp" - EventPrefix = "event:" URLPrefix = "url:" @@ -33,9 +30,6 @@ const ( DefaultRequestQueueSize = 1000 DefaultRequestTimeout time.Duration = 5 * time.Second DefaultClientTimeout time.Duration = 3 * time.Second - DefaultEncoderPoolSize = 100 - DefaultMessageFailedSource = "talaria" - DefaultMessageFailedHeader = "X-Webpa-Message-Delivery-Failure" DefaultMaxIdleConns = 0 DefaultMaxIdleConnsPerHost = 100 DefaultIdleConnTimeout time.Duration = 0 @@ -47,67 +41,26 @@ type requestFactory func(device.Interface, wrp.Routable, []byte) (*http.Request, // listener is the internal device Listener type that dispatches requests over HTTP. type listener struct { - logger logging.Logger - requests chan<- *http.Request - requestFactory requestFactory - messageFailedHeader string - encoderPool *wrp.EncoderPool -} - -func (l *listener) messageReceived(device device.Interface, message wrp.Routable, encoded []byte) { - request, err := l.requestFactory(device, message, encoded) - if err != nil { - l.logger.Error("Unable to create request for device [%s]: %s", device.ID(), err) - return - } - - select { - case l.requests <- request: - default: - l.logger.Error("Dropping outbound message for device [%s]: %s->%s", device.ID(), message.From(), message.To()) - } + logger logging.Logger + requests chan<- *http.Request + requestFactory requestFactory } -func (l *listener) messageFailed(device device.Interface, message wrp.Routable, encoded []byte, sendError error) { - if message.MessageType() == wrp.SimpleEventMessageType { - return - } - - var ( - failureResponse = message.Response("TODO", 1) - encodedFailure bytes.Buffer - ) - - if err := l.encoderPool.Encode(&encodedFailure, failureResponse); err != nil { - l.logger.Error("Could not encode returned message for device [%s]: %s", device.ID(), err) +func (l *listener) OnDeviceEvent(e *device.Event) { + if e.Type != device.MessageReceived { return } - request, err := l.requestFactory(device, failureResponse, encodedFailure.Bytes()) + request, err := l.requestFactory(e.Device, e.Message, e.Contents) if err != nil { - l.logger.Error("Unable to create returned message request for device [%s]: %s", device.ID(), err) + l.logger.Error("Unable to create request for device [%s]: %s", e.Device.ID(), err) return } - if sendError != nil { - request.Header.Set(l.messageFailedHeader, sendError.Error()) - } else { - request.Header.Set(l.messageFailedHeader, "Disconnected") - } - select { case l.requests <- request: default: - l.logger.Error("Dropping returned message for device [%s]: %s->%s", device.ID(), failureResponse.From(), failureResponse.To()) - } -} - -func (l *listener) OnDeviceEvent(e *device.Event) { - switch e.Type { - case device.MessageReceived: - l.messageReceived(e.Device, e.Message, e.Contents) - case device.MessageFailed: - l.messageFailed(e.Device, e.Message, e.Contents, e.Error) + l.logger.Error("Dropping outbound message for device [%s]: %s->%s", e.Device.ID(), e.Message.From(), e.Message.To()) } } @@ -156,9 +109,6 @@ type Outbounder struct { RequestQueueSize int RequestTimeout time.Duration ClientTimeout time.Duration - EncoderPoolSize int - MessageFailedSource string - MessageFailedHeader string MaxIdleConns int MaxIdleConnsPerHost int IdleConnTimeout time.Duration @@ -179,8 +129,6 @@ func NewOutbounder(logger logging.Logger, v *viper.Viper) (o *Outbounder, err er RequestQueueSize: DefaultRequestQueueSize, RequestTimeout: DefaultRequestTimeout, ClientTimeout: DefaultClientTimeout, - EncoderPoolSize: DefaultEncoderPoolSize, - MessageFailedSource: DefaultMessageFailedSource, MaxIdleConns: DefaultMaxIdleConns, MaxIdleConnsPerHost: DefaultMaxIdleConnsPerHost, IdleConnTimeout: DefaultIdleConnTimeout, @@ -221,14 +169,14 @@ func (o *Outbounder) newRequestFactory() requestFactory { allowedSchemes[scheme] = true } - return func(device device.Interface, message wrp.Routable, encoded []byte) (r *http.Request, err error) { - destination := message.To() + return func(d device.Interface, m wrp.Routable, c []byte) (r *http.Request, err error) { + destination := m.To() if strings.HasPrefix(destination, EventPrefix) { // route this to the configured endpoint that receives all events - r, err = http.NewRequest(o.Method, o.EventEndpoint, bytes.NewBuffer(encoded)) + r, err = http.NewRequest(o.Method, o.EventEndpoint, bytes.NewReader(c)) } else if strings.HasPrefix(destination, URLPrefix) { // route this to the given URL, subject to some validation - if r, err = http.NewRequest(o.Method, destination[len(URLPrefix):], bytes.NewBuffer(encoded)); err == nil { + if r, err = http.NewRequest(o.Method, destination[len(URLPrefix):], bytes.NewReader(c)); err == nil { if len(r.URL.Scheme) == 0 { // if no scheme is supplied, use the configured AssumeScheme r.URL.Scheme = o.AssumeScheme @@ -244,8 +192,8 @@ func (o *Outbounder) newRequestFactory() requestFactory { return } - r.Header.Set(o.DeviceNameHeader, string(device.ID())) - r.Header.Set("Content-Type", OutboundContentType) + r.Header.Set(o.DeviceNameHeader, string(d.ID())) + r.Header.Set("Content-Type", wrp.Msgpack.ContentType()) // TODO: Need to set Convey? ctx, _ := context.WithTimeout(context.Background(), o.RequestTimeout) @@ -266,11 +214,9 @@ func (o *Outbounder) Start() device.Listener { } listener = &listener{ - logger: o.Logger, - requestFactory: o.newRequestFactory(), - messageFailedHeader: o.MessageFailedHeader, - requests: requests, - encoderPool: wrp.NewEncoderPool(o.EncoderPoolSize, wrp.Msgpack), + logger: o.Logger, + requestFactory: o.newRequestFactory(), + requests: requests, } ) From 3a159accefb723503ace70a904f77e39d9061de5 Mon Sep 17 00:00:00 2001 From: John Bass Date: Tue, 11 Apr 2017 18:28:47 -0700 Subject: [PATCH 3/4] Added cancellation semantics to the outbound queue --- src/talaria/outbounder.go | 94 +++++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 38 deletions(-) diff --git a/src/talaria/outbounder.go b/src/talaria/outbounder.go index 89bcafd7..97aac4bc 100644 --- a/src/talaria/outbounder.go +++ b/src/talaria/outbounder.go @@ -27,22 +27,29 @@ const ( DefaultAssumeScheme = "https" DefaultAllowedScheme = "https" DefaultWorkerPoolSize = 100 - DefaultRequestQueueSize = 1000 - DefaultRequestTimeout time.Duration = 5 * time.Second + DefaultOutboundQueueSize = 1000 + DefaultRequestTimeout time.Duration = 15 * time.Second DefaultClientTimeout time.Duration = 3 * time.Second DefaultMaxIdleConns = 0 DefaultMaxIdleConnsPerHost = 100 DefaultIdleConnTimeout time.Duration = 0 ) +// outboundEnvelope is a tuple of information related to handling an asynchronous HTTP request +type outboundEnvelope struct { + request *http.Request + cancel func() +} + // RequestFactory is a simple function type for creating an outbound HTTP request -// for a given WRP message. -type requestFactory func(device.Interface, wrp.Routable, []byte) (*http.Request, error) +// for a given WRP message. This factory function type wraps the created HTTP request in +// an envelope with other data related to cancellation and queue management. +type requestFactory func(device.Interface, wrp.Routable, []byte) (*outboundEnvelope, error) // listener is the internal device Listener type that dispatches requests over HTTP. type listener struct { logger logging.Logger - requests chan<- *http.Request + outbounds chan<- *outboundEnvelope requestFactory requestFactory } @@ -51,14 +58,14 @@ func (l *listener) OnDeviceEvent(e *device.Event) { return } - request, err := l.requestFactory(e.Device, e.Message, e.Contents) + outbound, err := l.requestFactory(e.Device, e.Message, e.Contents) if err != nil { l.logger.Error("Unable to create request for device [%s]: %s", e.Device.ID(), err) return } select { - case l.requests <- request: + case l.outbounds <- outbound: default: l.logger.Error("Dropping outbound message for device [%s]: %s->%s", e.Device.ID(), e.Message.From(), e.Message.To()) } @@ -68,26 +75,32 @@ func (l *listener) OnDeviceEvent(e *device.Event) { // a transactor function type workerPool struct { logger logging.Logger - requests <-chan *http.Request + outbounds <-chan *outboundEnvelope transactor func(*http.Request) (*http.Response, error) } -func (wp *workerPool) worker() { - for request := range wp.requests { - response, err := wp.transactor(request) - if err != nil { - wp.logger.Error("HTTP error: %s", err) - continue - } +func (wp *workerPool) send(outbound *outboundEnvelope) { + defer outbound.cancel() - if response.StatusCode < 400 { - wp.logger.Debug("HTTP response status: %s", response.Status) - } else { - wp.logger.Error("HTTP response status: %s", response.Status) - } + response, err := wp.transactor(outbound.request) + if err != nil { + wp.logger.Error("HTTP error: %s", err) + return + } + + if response.StatusCode < 400 { + wp.logger.Debug("HTTP response status: %s", response.Status) + } else { + wp.logger.Error("HTTP response status: %s", response.Status) + } - io.Copy(ioutil.Discard, response.Body) - response.Body.Close() + io.Copy(ioutil.Discard, response.Body) + response.Body.Close() +} + +func (wp *workerPool) worker() { + for outbound := range wp.outbounds { + wp.send(outbound) } } @@ -106,7 +119,7 @@ type Outbounder struct { AssumeScheme string AllowedSchemes []string WorkerPoolSize int - RequestQueueSize int + OutboundQueueSize int RequestTimeout time.Duration ClientTimeout time.Duration MaxIdleConns int @@ -126,7 +139,7 @@ func NewOutbounder(logger logging.Logger, v *viper.Viper) (o *Outbounder, err er AssumeScheme: DefaultAssumeScheme, AllowedSchemes: []string{DefaultAllowedScheme}, WorkerPoolSize: DefaultWorkerPoolSize, - RequestQueueSize: DefaultRequestQueueSize, + OutboundQueueSize: DefaultOutboundQueueSize, RequestTimeout: DefaultRequestTimeout, ClientTimeout: DefaultClientTimeout, MaxIdleConns: DefaultMaxIdleConns, @@ -169,19 +182,20 @@ func (o *Outbounder) newRequestFactory() requestFactory { allowedSchemes[scheme] = true } - return func(d device.Interface, m wrp.Routable, c []byte) (r *http.Request, err error) { + return func(d device.Interface, m wrp.Routable, c []byte) (outbound *outboundEnvelope, err error) { destination := m.To() + var request *http.Request if strings.HasPrefix(destination, EventPrefix) { // route this to the configured endpoint that receives all events - r, err = http.NewRequest(o.Method, o.EventEndpoint, bytes.NewReader(c)) + request, err = http.NewRequest(o.Method, o.EventEndpoint, bytes.NewReader(c)) } else if strings.HasPrefix(destination, URLPrefix) { // route this to the given URL, subject to some validation - if r, err = http.NewRequest(o.Method, destination[len(URLPrefix):], bytes.NewReader(c)); err == nil { - if len(r.URL.Scheme) == 0 { + if request, err = http.NewRequest(o.Method, destination[len(URLPrefix):], bytes.NewReader(c)); err == nil { + if len(request.URL.Scheme) == 0 { // if no scheme is supplied, use the configured AssumeScheme - r.URL.Scheme = o.AssumeScheme - } else if !allowedSchemes[r.URL.Scheme] { - err = fmt.Errorf("Scheme not allowed: %s", r.URL.Scheme) + request.URL.Scheme = o.AssumeScheme + } else if !allowedSchemes[request.URL.Scheme] { + err = fmt.Errorf("Scheme not allowed: %s", request.URL.Scheme) } } } else { @@ -192,31 +206,35 @@ func (o *Outbounder) newRequestFactory() requestFactory { return } - r.Header.Set(o.DeviceNameHeader, string(d.ID())) - r.Header.Set("Content-Type", wrp.Msgpack.ContentType()) + request.Header.Set(o.DeviceNameHeader, string(d.ID())) + request.Header.Set("Content-Type", wrp.Msgpack.ContentType()) // TODO: Need to set Convey? - ctx, _ := context.WithTimeout(context.Background(), o.RequestTimeout) - r = r.WithContext(ctx) + ctx, cancel := context.WithTimeout(request.Context(), o.RequestTimeout) + outbound = &outboundEnvelope{ + request: request.WithContext(ctx), + cancel: cancel, + } return } } +// Start spawns all necessary goroutines and returns a device.Listener func (o *Outbounder) Start() device.Listener { var ( - requests = make(chan *http.Request, o.RequestQueueSize) + outbounds = make(chan *outboundEnvelope, o.OutboundQueueSize) workerPool = workerPool{ logger: o.Logger, - requests: requests, + outbounds: outbounds, transactor: o.newTransactor(), } listener = &listener{ logger: o.Logger, requestFactory: o.newRequestFactory(), - requests: requests, + outbounds: outbounds, } ) From eaccbbdd6a6d29fd15d5b43c53a0cb1e1006a626 Mon Sep 17 00:00:00 2001 From: John Bass Date: Tue, 11 Apr 2017 21:45:13 -0700 Subject: [PATCH 4/4] Honor the context's cancellation semantics when enqueuing outbound requests; don't export listener method --- src/talaria/outbounder.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/talaria/outbounder.go b/src/talaria/outbounder.go index 97aac4bc..b1abbb76 100644 --- a/src/talaria/outbounder.go +++ b/src/talaria/outbounder.go @@ -53,7 +53,7 @@ type listener struct { requestFactory requestFactory } -func (l *listener) OnDeviceEvent(e *device.Event) { +func (l *listener) onDeviceEvent(e *device.Event) { if e.Type != device.MessageReceived { return } @@ -65,9 +65,10 @@ func (l *listener) OnDeviceEvent(e *device.Event) { } select { - case l.outbounds <- outbound: - default: + case <-outbound.request.Context().Done(): l.logger.Error("Dropping outbound message for device [%s]: %s->%s", e.Device.ID(), e.Message.From(), e.Message.To()) + outbound.cancel() + case l.outbounds <- outbound: } } @@ -79,7 +80,9 @@ type workerPool struct { transactor func(*http.Request) (*http.Response, error) } -func (wp *workerPool) send(outbound *outboundEnvelope) { +// transact performs all the logic necessary to fulfill and outbound request. +// This method ensures that the Context associated with the request is properly cancelled. +func (wp *workerPool) transact(outbound *outboundEnvelope) { defer outbound.cancel() response, err := wp.transactor(outbound.request) @@ -100,7 +103,7 @@ func (wp *workerPool) send(outbound *outboundEnvelope) { func (wp *workerPool) worker() { for outbound := range wp.outbounds { - wp.send(outbound) + wp.transact(outbound) } } @@ -239,5 +242,5 @@ func (o *Outbounder) Start() device.Listener { ) workerPool.run(o.WorkerPoolSize) - return listener.OnDeviceEvent + return listener.onDeviceEvent }