Skip to content

Commit

Permalink
Fix http handler chain for wrp fanout (#89)
Browse files Browse the repository at this point in the history
* Fix http handler chain for fanout

* update wrp-go version

* update changelog
  • Loading branch information
joe94 authored Feb 17, 2020
1 parent d869994 commit 3892108
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 52 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.4.1]
- fix bug in wiring of WRP and Fanout handler chains [#89](https://github.com/xmidt-org/scytale/pull/89)

## [v0.4.0]
- add configurable feature to authorize WRP PartnerIDs from predefined JWT claims [#86](https://github.com/xmidt-org/scytale/pull/86)

Expand All @@ -31,7 +34,8 @@ Switching to new build process
- initial creation


[Unreleased]: https://github.com/Comcast/scytale/compare/v0.4.0...HEAD
[Unreleased]: https://github.com/Comcast/scytale/compare/v0.4.1...HEAD
[v0.4.1]: https://github.com/Comcast/scytale/compare/v0.4.0...v0.4.1
[v0.4.0]: https://github.com/Comcast/scytale/compare/v0.3.1...v0.4.0
[v0.3.1]: https://github.com/Comcast/scytale/compare/v0.3.0...v0.3.1
[v0.3.0]: https://github.com/Comcast/scytale/compare/v0.2.0...v0.3.0
Expand Down
6 changes: 2 additions & 4 deletions WRPHandler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"fmt"
"net/http"

gokithttp "github.com/go-kit/kit/transport/http"
Expand Down Expand Up @@ -31,7 +30,7 @@ func newWRPFanoutHandler(fanoutHandler http.Handler) wrphttp.HandlerFunc {
panic("fanoutHandler must be defined")
}
return func(w wrphttp.ResponseWriter, r *wrphttp.Request) {
fanoutPrep(r.Original, r.Entity.Source, r.Entity)
fanoutPrep(r.Original, r.Entity.Bytes, r.Entity)
fanoutHandler.ServeHTTP(w, r.Original)
}
}
Expand All @@ -48,11 +47,10 @@ func newWRPFanoutHandlerWithPIDCheck(fanoutHandler http.Handler, p wrpAccessAuth
ctx = r.Context()
entity = r.Entity
fanout = r.Original
fanoutBody = r.Entity.Source
fanoutBody = r.Entity.Bytes
)

modified, err := p.authorizeWRP(ctx, &entity.Message)
fmt.Println(err == nil)

if err != nil {
encodeError(ctx, err, w)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ require (
github.com/stretchr/testify v1.3.0
github.com/xmidt-org/bascule v0.8.0
github.com/xmidt-org/webpa-common v1.6.2
github.com/xmidt-org/wrp-go/v2 v2.0.0
github.com/xmidt-org/wrp-go/v2 v2.0.1
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ github.com/xmidt-org/wrp-go v1.3.5-0.20200211000703-4ca6fd2bfee9 h1:6+6npiqsf8/l
github.com/xmidt-org/wrp-go v1.3.5-0.20200211000703-4ca6fd2bfee9/go.mod h1:5mZuvwCWAA7r3sXB7fAKykwROd5oMVygajutTBqddz0=
github.com/xmidt-org/wrp-go/v2 v2.0.0 h1:5qWc3uZDQNxjunUqK9HMrWZcdCaTtUVCtR+SSYWSK6I=
github.com/xmidt-org/wrp-go/v2 v2.0.0/go.mod h1:v0HK0go/7OSVDvKbnXsUn6c+M987p0yyxWEs8/Fmf60=
github.com/xmidt-org/wrp-go/v2 v2.0.1 h1:JWMpAvNCkD1pLXdZLmAs/4g3twxTM7K4YU57dapJvB0=
github.com/xmidt-org/wrp-go/v2 v2.0.1/go.mod h1:v0HK0go/7OSVDvKbnXsUn6c+M987p0yyxWEs8/Fmf60=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
104 changes: 58 additions & 46 deletions primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/xmidt-org/webpa-common/logging/logginghttp"
"github.com/xmidt-org/webpa-common/service"
"github.com/xmidt-org/webpa-common/service/monitor"
"github.com/xmidt-org/webpa-common/xhttp"
"github.com/xmidt-org/webpa-common/xhttp/fanout"
"github.com/xmidt-org/webpa-common/xmetrics"
"github.com/xmidt-org/wrp-go/v2"
Expand Down Expand Up @@ -201,35 +202,12 @@ func NewPrimaryHandler(logger log.Logger, v *viper.Viper, registry xmetrics.Regi
}

var (
handlerChain = authChain.Extend(
fanout.NewChain(
cfg,
logginghttp.SetLogger(
logger,
logginghttp.RequestInfo,

// custom logger func that extracts the intended destination of requests
func(kv []interface{}, request *http.Request) []interface{} {
if deviceName := request.Header.Get("X-Webpa-Device-Name"); len(deviceName) > 0 {
return append(kv, "X-Webpa-Device-Name", deviceName)
}

if variables := mux.Vars(request); len(variables) > 0 {
if deviceID := variables["deviceID"]; len(deviceID) > 0 {
return append(kv, "deviceID", deviceID)
}
}
return kv
},
),
),
)

transactor = fanout.NewTransactor(cfg)
options = []fanout.Option{
fanout.WithTransactor(transactor),
}
)

if len(cfg.Authorization) > 0 {
options = append(
options,
Expand All @@ -245,25 +223,55 @@ func NewPrimaryHandler(logger log.Logger, v *viper.Viper, registry xmetrics.Regi
)

router.NotFoundHandler = http.HandlerFunc(func(response http.ResponseWriter, _ *http.Request) {
response.WriteHeader(http.StatusBadRequest)
xhttp.WriteError(response, http.StatusBadRequest, "Invalid endpoint")
})

fanoutHandler := fanout.New(
endpoints,
append(
options,
fanout.WithFanoutBefore(
fanout.UsePath(fmt.Sprintf("%s/%s/device/send", baseURI, version)),
),
fanout.WithFanoutFailure(
fanout.ReturnHeadersWithPrefix("X-"),
),
fanout.WithFanoutAfter(
fanout.ReturnHeadersWithPrefix("X-"),
),
)...,
fanoutChain := fanout.NewChain(
cfg,
logginghttp.SetLogger(
logger,
logginghttp.RequestInfo,

// custom logger func that extracts the intended destination of requests
func(kv []interface{}, request *http.Request) []interface{} {
if deviceName := request.Header.Get("X-Webpa-Device-Name"); len(deviceName) > 0 {
return append(kv, "X-Webpa-Device-Name", deviceName)
}

if variables := mux.Vars(request); len(variables) > 0 {
if deviceID := variables["deviceID"]; len(deviceID) > 0 {
return append(kv, "deviceID", deviceID)
}
}
return kv
},
),
)

HTTPFanoutHandler := fanoutChain.Then(
fanout.New(
endpoints,
append(
options,
fanout.WithFanoutBefore(
fanout.ForwardHeaders("Content-Type", "X-Webpa-Device-Name"),
fanout.UsePath(fmt.Sprintf("%s/%s/device/send", baseURI, version)),

func(ctx context.Context, _, fanout *http.Request, body []byte) (context.Context, error) {
fanout.Body, fanout.GetBody = xhttp.NewRewindBytes(body)
fanout.ContentLength = int64(len(body))
return ctx, nil
},
),
fanout.WithFanoutFailure(
fanout.ReturnHeadersWithPrefix("X-"),
),
fanout.WithFanoutAfter(
fanout.ReturnHeadersWithPrefix("X-"),
),
)...,
))

var (
wrpCheckConfig WRPCheckConfig
WRPFanoutHandler wrphttp.Handler
Expand All @@ -283,28 +291,32 @@ func NewPrimaryHandler(logger log.Logger, v *viper.Viper, registry xmetrics.Regi

if wrpCheckConfig.Type == "enforce" || wrpCheckConfig.Type == "monitor" {
WRPFanoutHandler = newWRPFanoutHandlerWithPIDCheck(
fanoutHandler,
HTTPFanoutHandler,
&wrpPartnersAccess{
strict: wrpCheckConfig.Type == "enforce",
receivedWRPMessageCount: NewReceivedWRPCounter(registry),
})
} else {
WRPFanoutHandler = newWRPFanoutHandler(fanoutHandler)
WRPFanoutHandler = newWRPFanoutHandler(HTTPFanoutHandler)
}

sendWRPHandler := wrphttp.NewHTTPHandler(WRPFanoutHandler,
wrphttp.WithDecoder(wrphttp.DecodeEntityFromSources(wrp.Msgpack, true)),
wrphttp.WithNewResponseWriter(nonWRPResponseWriterFactory))

sendSubrouter.Headers(
wrphttp.MessageTypeHeader, "",
"Content-Type", wrp.Msgpack.ContentType(),
"Content-Type", wrp.JSON.ContentType()).
Handler(handlerChain.Then(sendWRPHandler))
wrphttp.MessageTypeHeader, "").
Handler(authChain.Then(sendWRPHandler))

sendSubrouter.Headers("Content-Type", wrp.Msgpack.ContentType()).
Handler(authChain.Then(sendWRPHandler))

sendSubrouter.Headers("Content-Type", wrp.JSON.ContentType()).
Handler(authChain.Then(sendWRPHandler))

router.Handle(
fmt.Sprintf("%s/%s/device/{deviceID}/stat", baseURI, version),
handlerChain.Then(
authChain.Extend(fanoutChain).Then(
fanout.New(
endpoints,
append(
Expand Down

0 comments on commit 3892108

Please sign in to comment.