Skip to content

Commit

Permalink
Merge pull request #4 from Comcast/feature/remove_async_logic
Browse files Browse the repository at this point in the history
Feature/remove async logic
  • Loading branch information
johnabass authored Apr 12, 2017
2 parents 4c4a103 + eaccbbd commit 27e2bca
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 106 deletions.
173 changes: 70 additions & 103 deletions src/talaria/outbounder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ const (
// OutbounderKey is the Viper subkey which is expected to hold Outbounder configuration
OutbounderKey = "device.outbound"

// OutboundContentType is the Content-Type header value for device messages leaving talaria
OutboundContentType = "application/wrp"

EventPrefix = "event:"
URLPrefix = "url:"

Expand All @@ -30,111 +27,83 @@ const (
DefaultAssumeScheme = "https"
DefaultAllowedScheme = "https"
DefaultWorkerPoolSize = 100
DefaultRequestQueueSize = 1000
DefaultRequestTimeout time.Duration = 5 * time.Second
DefaultOutboundQueueSize = 1000
DefaultRequestTimeout time.Duration = 15 * time.Second
DefaultClientTimeout time.Duration = 3 * time.Second
DefaultEncoderPoolSize = 100
DefaultMessageFailedSource = "talaria"
DefaultMessageFailedHeader = "X-Webpa-Message-Delivery-Failure"
DefaultMaxIdleConns = 0
DefaultMaxIdleConnsPerHost = 100
DefaultIdleConnTimeout time.Duration = 0
)

// outboundEnvelope is a tuple of information related to handling an asynchronous HTTP request
type outboundEnvelope struct {
request *http.Request
cancel func()
}

// RequestFactory is a simple function type for creating an outbound HTTP request
// for a given WRP message.
type requestFactory func(device.Interface, wrp.Routable, []byte) (*http.Request, error)
// for a given WRP message. This factory function type wraps the created HTTP request in
// an envelope with other data related to cancellation and queue management.
type requestFactory func(device.Interface, wrp.Routable, []byte) (*outboundEnvelope, error)

// listener is the internal device Listener type that dispatches requests over HTTP.
type listener struct {
logger logging.Logger
requests chan<- *http.Request
requestFactory requestFactory
messageFailedHeader string
encoderPool *wrp.EncoderPool
logger logging.Logger
outbounds chan<- *outboundEnvelope
requestFactory requestFactory
}

func (l *listener) messageReceived(device device.Interface, message wrp.Routable, encoded []byte) {
request, err := l.requestFactory(device, message, encoded)
if err != nil {
l.logger.Error("Unable to create request for device [%s]: %s", device.ID(), err)
func (l *listener) onDeviceEvent(e *device.Event) {
if e.Type != device.MessageReceived {
return
}

select {
case l.requests <- request:
default:
l.logger.Error("Dropping outbound message for device [%s]: %s->%s", device.ID(), message.From(), message.To())
}
}

func (l *listener) messageFailed(device device.Interface, message wrp.Routable, encoded []byte, sendError error) {
if message.MessageType() == wrp.SimpleEventMessageType {
return
}

var (
failureResponse = message.Response("TODO", 1)
encodedFailure bytes.Buffer
)

if err := l.encoderPool.Encode(&encodedFailure, failureResponse); err != nil {
l.logger.Error("Could not encode returned message for device [%s]: %s", device.ID(), err)
return
}

request, err := l.requestFactory(device, failureResponse, encodedFailure.Bytes())
outbound, err := l.requestFactory(e.Device, e.Message, e.Contents)
if err != nil {
l.logger.Error("Unable to create returned message request for device [%s]: %s", device.ID(), err)
l.logger.Error("Unable to create request for device [%s]: %s", e.Device.ID(), err)
return
}

if sendError != nil {
request.Header.Set(l.messageFailedHeader, sendError.Error())
} else {
request.Header.Set(l.messageFailedHeader, "Disconnected")
}

select {
case l.requests <- request:
default:
l.logger.Error("Dropping returned message for device [%s]: %s->%s", device.ID(), failureResponse.From(), failureResponse.To())
}
}

func (l *listener) OnDeviceEvent(e *device.Event) {
switch e.Type {
case device.MessageReceived:
l.messageReceived(e.Device, e.Message, e.Contents)
case device.MessageFailed:
l.messageFailed(e.Device, e.Message, e.Contents, e.Error)
case <-outbound.request.Context().Done():
l.logger.Error("Dropping outbound message for device [%s]: %s->%s", e.Device.ID(), e.Message.From(), e.Message.To())
outbound.cancel()
case l.outbounds <- outbound:
}
}

// workerPool describes a pool of goroutines that dispatch http.Request objects to
// a transactor function
type workerPool struct {
logger logging.Logger
requests <-chan *http.Request
outbounds <-chan *outboundEnvelope
transactor func(*http.Request) (*http.Response, error)
}

func (wp *workerPool) worker() {
for request := range wp.requests {
response, err := wp.transactor(request)
if err != nil {
wp.logger.Error("HTTP error: %s", err)
continue
}
// transact performs all the logic necessary to fulfill and outbound request.
// This method ensures that the Context associated with the request is properly cancelled.
func (wp *workerPool) transact(outbound *outboundEnvelope) {
defer outbound.cancel()

if response.StatusCode < 400 {
wp.logger.Debug("HTTP response status: %s", response.Status)
} else {
wp.logger.Error("HTTP response status: %s", response.Status)
}
response, err := wp.transactor(outbound.request)
if err != nil {
wp.logger.Error("HTTP error: %s", err)
return
}

io.Copy(ioutil.Discard, response.Body)
response.Body.Close()
if response.StatusCode < 400 {
wp.logger.Debug("HTTP response status: %s", response.Status)
} else {
wp.logger.Error("HTTP response status: %s", response.Status)
}

io.Copy(ioutil.Discard, response.Body)
response.Body.Close()
}

func (wp *workerPool) worker() {
for outbound := range wp.outbounds {
wp.transact(outbound)
}
}

Expand All @@ -153,12 +122,9 @@ type Outbounder struct {
AssumeScheme string
AllowedSchemes []string
WorkerPoolSize int
RequestQueueSize int
OutboundQueueSize int
RequestTimeout time.Duration
ClientTimeout time.Duration
EncoderPoolSize int
MessageFailedSource string
MessageFailedHeader string
MaxIdleConns int
MaxIdleConnsPerHost int
IdleConnTimeout time.Duration
Expand All @@ -176,11 +142,9 @@ func NewOutbounder(logger logging.Logger, v *viper.Viper) (o *Outbounder, err er
AssumeScheme: DefaultAssumeScheme,
AllowedSchemes: []string{DefaultAllowedScheme},
WorkerPoolSize: DefaultWorkerPoolSize,
RequestQueueSize: DefaultRequestQueueSize,
OutboundQueueSize: DefaultOutboundQueueSize,
RequestTimeout: DefaultRequestTimeout,
ClientTimeout: DefaultClientTimeout,
EncoderPoolSize: DefaultEncoderPoolSize,
MessageFailedSource: DefaultMessageFailedSource,
MaxIdleConns: DefaultMaxIdleConns,
MaxIdleConnsPerHost: DefaultMaxIdleConnsPerHost,
IdleConnTimeout: DefaultIdleConnTimeout,
Expand Down Expand Up @@ -221,19 +185,20 @@ func (o *Outbounder) newRequestFactory() requestFactory {
allowedSchemes[scheme] = true
}

return func(device device.Interface, message wrp.Routable, encoded []byte) (r *http.Request, err error) {
destination := message.To()
return func(d device.Interface, m wrp.Routable, c []byte) (outbound *outboundEnvelope, err error) {
destination := m.To()
var request *http.Request
if strings.HasPrefix(destination, EventPrefix) {
// route this to the configured endpoint that receives all events
r, err = http.NewRequest(o.Method, o.EventEndpoint, bytes.NewBuffer(encoded))
request, err = http.NewRequest(o.Method, o.EventEndpoint, bytes.NewReader(c))
} else if strings.HasPrefix(destination, URLPrefix) {
// route this to the given URL, subject to some validation
if r, err = http.NewRequest(o.Method, destination[len(URLPrefix):], bytes.NewBuffer(encoded)); err == nil {
if len(r.URL.Scheme) == 0 {
if request, err = http.NewRequest(o.Method, destination[len(URLPrefix):], bytes.NewReader(c)); err == nil {
if len(request.URL.Scheme) == 0 {
// if no scheme is supplied, use the configured AssumeScheme
r.URL.Scheme = o.AssumeScheme
} else if !allowedSchemes[r.URL.Scheme] {
err = fmt.Errorf("Scheme not allowed: %s", r.URL.Scheme)
request.URL.Scheme = o.AssumeScheme
} else if !allowedSchemes[request.URL.Scheme] {
err = fmt.Errorf("Scheme not allowed: %s", request.URL.Scheme)
}
}
} else {
Expand All @@ -244,36 +209,38 @@ func (o *Outbounder) newRequestFactory() requestFactory {
return
}

r.Header.Set(o.DeviceNameHeader, string(device.ID()))
r.Header.Set("Content-Type", OutboundContentType)
request.Header.Set(o.DeviceNameHeader, string(d.ID()))
request.Header.Set("Content-Type", wrp.Msgpack.ContentType())
// TODO: Need to set Convey?

ctx, _ := context.WithTimeout(context.Background(), o.RequestTimeout)
r = r.WithContext(ctx)
ctx, cancel := context.WithTimeout(request.Context(), o.RequestTimeout)
outbound = &outboundEnvelope{
request: request.WithContext(ctx),
cancel: cancel,
}

return
}
}

// Start spawns all necessary goroutines and returns a device.Listener
func (o *Outbounder) Start() device.Listener {
var (
requests = make(chan *http.Request, o.RequestQueueSize)
outbounds = make(chan *outboundEnvelope, o.OutboundQueueSize)

workerPool = workerPool{
logger: o.Logger,
requests: requests,
outbounds: outbounds,
transactor: o.newTransactor(),
}

listener = &listener{
logger: o.Logger,
requestFactory: o.newRequestFactory(),
messageFailedHeader: o.MessageFailedHeader,
requests: requests,
encoderPool: wrp.NewEncoderPool(o.EncoderPoolSize, wrp.Msgpack),
logger: o.Logger,
requestFactory: o.newRequestFactory(),
outbounds: outbounds,
}
)

workerPool.run(o.WorkerPoolSize)
return listener.OnDeviceEvent
return listener.onDeviceEvent
}
2 changes: 1 addition & 1 deletion src/talaria/inbound.go → src/talaria/primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"net/http"
)

func NewInboundHandler(logger logging.Logger, manager device.Manager, v *viper.Viper) (http.Handler, error) {
func NewPrimaryHandler(logger logging.Logger, manager device.Manager, v *viper.Viper) (http.Handler, error) {
poolFactory, err := wrp.NewPoolFactory(v.Sub(wrp.ViperKey))
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions src/talaria/talaria.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ func talaria(arguments []string) int {

deviceOptions.Listeners = []device.Listener{outbounder.Start()}
manager := device.NewManager(deviceOptions, nil)
deviceHandler, err := NewInboundHandler(logger, manager, v)
primaryHandler, err := NewPrimaryHandler(logger, manager, v)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to initialize inbound handler: %s\n", err)
return 1
}

_, runnable := webPA.Prepare(logger, deviceHandler)
_, runnable := webPA.Prepare(logger, primaryHandler)
waitGroup, shutdown, err := concurrent.Execute(runnable)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to start device manager: %s\n", err)
Expand Down

0 comments on commit 27e2bca

Please sign in to comment.