Skip to content

Commit

Permalink
Merge pull request #1 from tmpim/emma/multi-header
Browse files Browse the repository at this point in the history
feat: Allow use of multiple headers for loadbalancing policy
  • Loading branch information
1lann authored Jul 30, 2020
2 parents 7b0a0cc + a2e865d commit 67b9a15
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 30 deletions.
25 changes: 15 additions & 10 deletions caskethttp/proxy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ type Policy interface {
}

func init() {
RegisterPolicy("random", func(arg string) Policy { return &Random{} })
RegisterPolicy("least_conn", func(arg string) Policy { return &LeastConn{} })
RegisterPolicy("round_robin", func(arg string) Policy { return &RoundRobin{} })
RegisterPolicy("ip_hash", func(arg string) Policy { return &IPHash{} })
RegisterPolicy("first", func(arg string) Policy { return &First{} })
RegisterPolicy("uri_hash", func(arg string) Policy { return &URIHash{} })
RegisterPolicy("header", func(arg string) Policy { return &Header{arg} })
RegisterPolicy("random", func(args []string) Policy { return &Random{} })
RegisterPolicy("least_conn", func(args []string) Policy { return &LeastConn{} })
RegisterPolicy("round_robin", func(args []string) Policy { return &RoundRobin{} })
RegisterPolicy("ip_hash", func(args []string) Policy { return &IPHash{} })
RegisterPolicy("first", func(args []string) Policy { return &First{} })
RegisterPolicy("uri_hash", func(args []string) Policy { return &URIHash{} })
RegisterPolicy("header", func(args []string) Policy { return &Header{args} })
}

// Random is a policy that selects up hosts from a pool at random.
Expand Down Expand Up @@ -183,17 +183,22 @@ func (r *First) Select(pool HostPool, request *http.Request) *UpstreamHost {
type Header struct {
// The name of the request header, the value of which will determine
// how the request is routed
Name string
Names []string
}

var roundRobinPolicier RoundRobin

// Select selects the host based on hashing the header value
func (r *Header) Select(pool HostPool, request *http.Request) *UpstreamHost {
if r.Name == "" {
if r.Names == nil {
return nil
}
val := request.Header.Get(r.Name)

val := ""
for _, name := range r.Names {
val += request.Header.Get(name)
}

if val == "" {
// fallback to RoundRobin policy in case no Header in request
return roundRobinPolicier.Select(pool, request)
Expand Down
24 changes: 12 additions & 12 deletions caskethttp/proxy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,18 +327,18 @@ func TestHeaderPolicy(t *testing.T) {
NilHost bool
HostIndex int
}{
{"empty config", &Header{""}, "", "", true, 0},
{"empty config+header+value", &Header{""}, "Affinity", "somevalue", true, 0},
{"empty config+header", &Header{""}, "Affinity", "", true, 0},

{"no header(fallback to roundrobin)", &Header{"Affinity"}, "", "", false, 1},
{"no header(fallback to roundrobin)", &Header{"Affinity"}, "", "", false, 2},
{"no header(fallback to roundrobin)", &Header{"Affinity"}, "", "", false, 0},

{"hash route to host", &Header{"Affinity"}, "Affinity", "somevalue", false, 1},
{"hash route to host", &Header{"Affinity"}, "Affinity", "somevalue2", false, 0},
{"hash route to host", &Header{"Affinity"}, "Affinity", "somevalue3", false, 2},
{"hash route with empty value", &Header{"Affinity"}, "Affinity", "", false, 1},
{"empty config", &Header{nil}, "", "", true, 0},
{"empty config+header+value", &Header{nil}, "Affinity", "somevalue", true, 0},
{"empty config+header", &Header{nil}, "Affinity", "", true, 0},

{"no header(fallback to roundrobin)", &Header{[]string{"Affinity"}}, "", "", false, 1},
{"no header(fallback to roundrobin)", &Header{[]string{"Affinity"}}, "", "", false, 2},
{"no header(fallback to roundrobin)", &Header{[]string{"Affinity"}}, "", "", false, 0},

{"hash route to host", &Header{[]string{"Affinity"}}, "Affinity", "somevalue", false, 1},
{"hash route to host", &Header{[]string{"Affinity"}}, "Affinity", "somevalue2", false, 0},
{"hash route to host", &Header{[]string{"Affinity"}}, "Affinity", "somevalue3", false, 2},
{"hash route with empty value", &Header{[]string{"Affinity"}}, "Affinity", "", false, 1},
}

for idx, test := range tests {
Expand Down
14 changes: 7 additions & 7 deletions caskethttp/proxy/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ import (

"crypto/tls"

"github.com/tmpim/casket/caskethttp/httpserver"
"github.com/tmpim/casket/casketfile"
"github.com/tmpim/casket/caskethttp/httpserver"
)

var (
supportedPolicies = make(map[string]func(string) Policy)
supportedPolicies = make(map[string]func([]string) Policy)
)

type staticUpstream struct {
Expand Down Expand Up @@ -346,11 +346,11 @@ func parseBlock(c *casketfile.Dispenser, u *staticUpstream, hasSrv bool) error {
if !ok {
return c.ArgErr()
}
arg := ""
if c.NextArg() {
arg = c.Val()
var args []string
for c.NextArg() {
args = append(args, c.Val())
}
u.Policy = policyCreateFunc(arg)
u.Policy = policyCreateFunc(args)
case "fallback_delay":
if !c.NextArg() {
return c.ArgErr()
Expand Down Expand Up @@ -780,7 +780,7 @@ func (u *staticUpstream) Stop() error {
}

// RegisterPolicy adds a custom policy to the proxy.
func RegisterPolicy(name string, policy func(string) Policy) {
func RegisterPolicy(name string, policy func([]string) Policy) {
supportedPolicies[name] = policy
}

Expand Down
2 changes: 1 addition & 1 deletion caskethttp/proxy/upstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestSelect(t *testing.T) {
func TestRegisterPolicy(t *testing.T) {
name := "custom"
customPolicy := &customPolicy{}
RegisterPolicy(name, func(string) Policy { return customPolicy })
RegisterPolicy(name, func([]string) Policy { return customPolicy })
if _, ok := supportedPolicies[name]; !ok {
t.Error("Expected supportedPolicies to have a custom policy.")
}
Expand Down

0 comments on commit 67b9a15

Please sign in to comment.