Skip to content

Commit

Permalink
- add policy support (sequential/weighted-random)
Browse files Browse the repository at this point in the history
- rename simple selector to sequential
- update docs + add examples

Signed-off-by: Gleb Kogtev <[email protected]>
  • Loading branch information
glebkin committed Aug 20, 2024
1 parent e6c4c37 commit a6446d3
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 92 deletions.
26 changes: 24 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ Each incoming DNS query that hits the CoreDNS fanout plugin will be replicated i
(Cloudflare) will not work.

* `worker-count` is the number of parallel queries per request. By default equals to count of IP list. Use this only for reducing parallel queries per request.
* `server-count` is the number of DNS servers to be requested. Equals to the number of specified IPs by default. If this parameter is lower than the number of specified IP addresses, servers are randomly selected based on the `load-factor` parameter.
* `load-factor` - the probability of selecting a server. This is specified in the order of the list of IP addresses and takes values between 1 and 100. By default, all servers have an equal probability of 100.
* `policy` - specifies the policy of DNS server selection mechanism. The default is `sequential`.
* `sequential` - select DNS servers one-by-one based on its order
* `weighted-random` - select DNS servers randomly based on `server-count` and `load-factor` params:
* `server-count` is the number of DNS servers to be requested. Equals to the number of specified IPs by default.
* `load-factor` - the probability of selecting a server. This is specified in the order of the list of IP addresses and takes values between 1 and 100. By default, all servers have an equal probability of 100.
* `network` is a specific network protocol. Could be `tcp`, `udp`, `tcp-tls`.
* `except` is a list is a space-separated list of domains to exclude from proxying.
* `except-file` is the path to file with line-separated list of domains to exclude from proxying.
Expand Down Expand Up @@ -112,3 +115,22 @@ If `race` is enable, we will get `NXDOMAIN` result quickly, otherwise we will ge
}
}
~~~

Sends parallel requests between two randomly selected resolvers. Note, that `127.0.0.1:9007` would be selected more frequently as it has the highest `load-factor`.
~~~ corefile
example.org {
fanout . 127.0.0.1:9005 127.0.0.1:9006 127.0.0.1:9007
policy weighted-random {
server-count 2
load-factor 50 70 100
}
}
~~~

Sends parallel requests between three resolver sequentially (default mode).
~~~ corefile
example.org {
fanout . 127.0.0.1:9005 127.0.0.1:9006 127.0.0.1:9007
policy sequential
}
~~~
26 changes: 14 additions & 12 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ package fanout
import "time"

const (
maxIPCount = 100
maxLoadFactor = 100
minLoadFactor = 1
maxWorkerCount = 32
minWorkerCount = 2
maxTimeout = 2 * time.Second
defaultTimeout = 30 * time.Second
readTimeout = 2 * time.Second
attemptDelay = time.Millisecond * 100
tcptls = "tcp-tls"
tcp = "tcp"
udp = "udp"
maxIPCount = 100
maxLoadFactor = 100
minLoadFactor = 1
policyWeightedRandom = "weighted-random"
policySequential = "sequential"
maxWorkerCount = 32
minWorkerCount = 2
maxTimeout = 2 * time.Second
defaultTimeout = 30 * time.Second
readTimeout = 2 * time.Second
attemptDelay = time.Millisecond * 100
tcptls = "tcp-tls"
tcp = "tcp"
udp = "udp"
)
53 changes: 21 additions & 32 deletions fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,43 @@ import (
clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
"github.com/networkservicemesh/fanout/internal/selector"
"github.com/pkg/errors"
)

var log = clog.NewWithPlugin("fanout")

// Fanout represents a plugin instance that can do async requests to list of DNS servers.
type Fanout struct {
clients []Client
tlsConfig *tls.Config
excludeDomains Domain
tlsServerName string
timeout time.Duration
race bool
net string
from string
attempts int
workerCount int
serverCount int
loadFactor []int
tapPlugin *dnstap.Dnstap
Next plugin.Handler
clients []Client
tlsConfig *tls.Config
excludeDomains Domain
tlsServerName string
timeout time.Duration
race bool
net string
from string
attempts int
workerCount int
serverCount int
serverSelectionPolicy policy
tapPlugin *dnstap.Dnstap
Next plugin.Handler
}

// New returns reference to new Fanout plugin instance with default configs.
func New() *Fanout {
return &Fanout{
tlsConfig: new(tls.Config),
net: "udp",
attempts: 3,
timeout: defaultTimeout,
excludeDomains: NewDomain(),
tlsConfig: new(tls.Config),
net: "udp",
attempts: 3,
timeout: defaultTimeout,
excludeDomains: NewDomain(),
serverSelectionPolicy: &sequentialPolicy{}, // default policy
}
}

func (f *Fanout) addClient(p Client) {
f.clients = append(f.clients, p)
f.loadFactor = append(f.loadFactor, maxLoadFactor)
f.workerCount++
f.serverCount++
}
Expand Down Expand Up @@ -110,18 +109,8 @@ func (f *Fanout) ServeDNS(ctx context.Context, w dns.ResponseWriter, m *dns.Msg)
return 0, nil
}

type clientSelector interface {
Pick() Client
}

func (f *Fanout) runWorkers(ctx context.Context, req *request.Request) chan *response {
var sel clientSelector
if f.serverCount == len(f.clients) {
sel = selector.NewSimpleSelector(f.clients)
} else {
sel = selector.NewWeightedRandSelector(f.clients, f.loadFactor)
}

sel := f.serverSelectionPolicy.selector(f.clients)
workerCh := make(chan Client, f.workerCount)
responseCh := make(chan *response, f.serverCount)
go func() {
Expand Down
1 change: 1 addition & 0 deletions fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ func (t *fanoutTestSuite) TestServerCount() {
c1 := NewClient(s1.addr, t.network)
c2 := NewClient(s2.addr, t.network)
f := New()
f.serverSelectionPolicy = &weightedPolicy{loadFactor: []int{50, 100}}
f.net = t.network
f.from = "."
f.addClient(c1)
Expand Down
12 changes: 6 additions & 6 deletions internal/selector/simple.go → internal/selector/sequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@

package selector

// Simple selector acts like a queue and picks elements one-by-one starting from the first element
type Simple[T any] struct {
// Sequential selector acts like a queue and picks elements one-by-one starting from the first element
type Sequential[T any] struct {
values []T
idx int
}

// NewSimpleSelector inits Simple selector with default starting index 0
func NewSimpleSelector[T any](values []T) *Simple[T] {
return &Simple[T]{
// NewSequentialSelector inits Sequential selector with default starting index 0
func NewSequentialSelector[T any](values []T) *Sequential[T] {
return &Sequential[T]{
values: values,
idx: 0,
}
}

// Pick returns next available element from values array if exists.
// Returns default value of type T otherwise
func (s *Simple[T]) Pick() T {
func (s *Sequential[T]) Pick() T {
var result T
if s.idx >= len(s.values) {
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestSimple_Pick(t *testing.T) {
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
wrs := NewSimpleSelector(tc.values)
wrs := NewSequentialSelector(tc.values)

actual := make([]string, 0, tc.picksCount)
for i := 0; i < tc.picksCount; i++ {
Expand Down
46 changes: 46 additions & 0 deletions policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2024 MWS and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fanout

import "github.com/networkservicemesh/fanout/internal/selector"

type policy interface {
selector(clients []Client) clientSelector
}

type clientSelector interface {
Pick() Client
}

// sequentialPolicy is used to select clients based on its sequential order
type sequentialPolicy struct {
}

// creates new sequential selector of provided clients
func (p *sequentialPolicy) selector(clients []Client) clientSelector {
return selector.NewSequentialSelector(clients)
}

// weightedPolicy is used to select clients randomly based on its loadFactor (weights)
type weightedPolicy struct {
loadFactor []int
}

// creates new weighted random selector of provided clients based on loadFactor
func (p *weightedPolicy) selector(clients []Client) clientSelector {
return selector.NewWeightedRandSelector(clients, p.loadFactor)
}
92 changes: 67 additions & 25 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,14 @@ func parsefanoutStanza(c *caddyfile.Dispenser) (*Fanout, error) {
return f, err
}
for c.NextBlock() {
err = parseValue(strings.ToLower(c.Val()), f, c)
err = parseValue(strings.ToLower(c.Val()), f, c, toHosts)
if err != nil {
return nil, err
}
}
initClients(f, toHosts)
if f.serverCount > len(toHosts) || f.serverCount == 0 {
f.serverCount = len(toHosts)
}
if len(f.loadFactor) == 0 {
for i := 0; i < len(toHosts); i++ {
f.loadFactor = append(f.loadFactor, maxLoadFactor)
}
}
if len(f.loadFactor) != len(toHosts) {
return nil, errors.New("load-factor params count must be the same as the number of hosts")
if f.serverCount > len(f.clients) || f.serverCount == 0 {
f.serverCount = len(f.clients)
}

if f.workerCount > len(f.clients) || f.workerCount == 0 {
Expand All @@ -163,7 +155,7 @@ func initClients(f *Fanout, hosts []string) {
}
}

func parseValue(v string, f *Fanout, c *caddyfile.Dispenser) error {
func parseValue(v string, f *Fanout, c *caddyfile.Dispenser, hosts []string) error {
switch v {
case "tls":
return parseTLS(f, c)
Expand All @@ -173,12 +165,8 @@ func parseValue(v string, f *Fanout, c *caddyfile.Dispenser) error {
return parseTLSServer(f, c)
case "worker-count":
return parseWorkerCount(f, c)
case "server-count":
num, err := parsePositiveInt(c)
f.serverCount = num
return err
case "load-factor":
return parseLoadFactor(f, c)
case "policy":
return parsePolicy(f, c, hosts)
case "timeout":
return parseTimeout(f, c)
case "race":
Expand All @@ -196,6 +184,59 @@ func parseValue(v string, f *Fanout, c *caddyfile.Dispenser) error {
}
}

func parsePolicy(f *Fanout, c *caddyfile.Dispenser, hosts []string) error {
if !c.NextArg() {
return c.ArgErr()
}

switch c.Val() {
case policyWeightedRandom:
// omit "{"
c.Next()
if c.Val() != "{" {
return c.Err("Wrong policy configuration")
}
case policySequential:
f.serverSelectionPolicy = &sequentialPolicy{}
return nil
default:
return errors.Errorf("unknown policy %q", c.Val())
}

var loadFactor []int
for c.Next() {
if c.Val() == "}" {
break
}

var err error
switch c.Val() {
case "server-count":
f.serverCount, err = parsePositiveInt(c)
case "load-factor":
loadFactor, err = parseLoadFactor(c)
default:
return errors.Errorf("unknown property %q", c.Val())
}
if err != nil {
return err
}
}

if len(loadFactor) == 0 {
for i := 0; i < len(hosts); i++ {
loadFactor = append(loadFactor, maxLoadFactor)
}
}
if len(loadFactor) != len(hosts) {
return errors.New("load-factor params count must be the same as the number of hosts")
}

f.serverSelectionPolicy = &weightedPolicy{loadFactor: loadFactor}

return nil
}

func parseTimeout(f *Fanout, c *caddyfile.Dispenser) error {
if !c.NextArg() {
return c.ArgErr()
Expand Down Expand Up @@ -263,29 +304,30 @@ func parseWorkerCount(f *Fanout, c *caddyfile.Dispenser) error {
return err
}

func parseLoadFactor(f *Fanout, c *caddyfile.Dispenser) error {
func parseLoadFactor(c *caddyfile.Dispenser) ([]int, error) {
args := c.RemainingArgs()
if len(args) == 0 {
return c.ArgErr()
return nil, c.ArgErr()
}

result := make([]int, 0, len(args))
for _, arg := range args {
loadFactor, err := strconv.Atoi(arg)
if err != nil {
return c.ArgErr()
return nil, c.ArgErr()
}

if loadFactor < minLoadFactor {
return errors.New("load-factor should be more or equal 1")
return nil, errors.New("load-factor should be more or equal 1")
}
if loadFactor > maxLoadFactor {
return errors.Errorf("load-factor %d should be less than %d", loadFactor, maxLoadFactor)
return nil, errors.Errorf("load-factor %d should be less than %d", loadFactor, maxLoadFactor)
}

f.loadFactor = append(f.loadFactor, loadFactor)
result = append(result, loadFactor)
}

return nil
return result, nil
}

func parsePositiveInt(c *caddyfile.Dispenser) (int, error) {
Expand Down
Loading

0 comments on commit a6446d3

Please sign in to comment.