From 2cdf000eb2e748a70210631959c549b33c9df0c3 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Thu, 10 Jan 2019 17:32:18 +0100 Subject: [PATCH] Close #21. Improve doc --- booster.go | 64 +++++++++++++++++++++++++++++++++++++++------ cmd/booster/main.go | 3 ++- metrics/exporter.go | 35 +++++++++++++++++-------- source/interface.go | 12 ++++----- 4 files changed, 88 insertions(+), 26 deletions(-) diff --git a/booster.go b/booster.go index 6de09c5..2b82abd 100644 --- a/booster.go +++ b/booster.go @@ -8,8 +8,7 @@ License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -20,33 +19,55 @@ package booster import ( "context" "net" + "sync" "github.com/booster-proj/booster/core" "upspin.io/log" ) // New returns an instance of a booster dialer. -func New(b *core.Balancer) core.Dialer { - return &dialer{b} +func New(b *core.Balancer) *Dialer { + return &Dialer{b: b} } -type dialer struct { - *core.Balancer +// MetricsExporter is an inteface around the IncSelectedSource function, +// which is used to collect a metric when a source is selected for use. +type MetricsExporter interface { + IncSelectedSource(labels map[string]string) } -func (d *dialer) DialContext(ctx context.Context, network, address string) (conn net.Conn, err error) { +// Dialer is a core.Dialer implementation, which uses a core.Balancer +// instance to to retrieve a source to use when it comes to dial a network +// connection. +type Dialer struct { + b *core.Balancer + + metrics struct { + sync.Mutex + exporter MetricsExporter + } +} + +// DialContext dials a connection using `network` to `address`. The connection returned +// is dialed through a specific network interface, which is chosen using the dialer's +// interal balancer provided. If it fails to create a connection using a source, it +// tries to dial it using another source, until source exhaustion. It that case, +// only the last error received is returned. +func (d *Dialer) DialContext(ctx context.Context, network, address string) (conn net.Conn, err error) { bl := make([]core.Source, 0, d.Len()) // blacklisted sources // If the dialing fails, keep on trying with the other sources until exaustion. for i := 0; len(bl) < d.Len(); i++ { var src core.Source - src, err = d.Get(ctx, bl...) + src, err = d.b.Get(ctx, bl...) if err != nil { // Fail directly if the balancer returns an error, as // we do not have any source to use. return } + d.sendMetrics(src.Name(), address) + log.Debug.Printf("DialContext: Attempt #%d to connect to %v (source %v)", i, address, src.Name()) conn, err = src.DialContext(ctx, "tcp4", address) @@ -63,3 +84,30 @@ func (d *dialer) DialContext(ctx context.Context, network, address string) (conn return } + +// Len returns the number of sources that the dialer as at it's disposal. +func (d *Dialer) Len() int { + return d.b.Len() +} + +// SetMetricsExporter makes the receiver use exp as metrics exporter. +func (d *Dialer) SetMetricsExporter(exp MetricsExporter) { + d.metrics.Lock() + defer d.metrics.Unlock() + + d.metrics.exporter = exp +} + +func (d *Dialer) sendMetrics(name, target string) { + if d.metrics.exporter == nil { + return + } + + d.metrics.Lock() + defer d.metrics.Unlock() + + d.metrics.exporter.IncSelectedSource(map[string]string{ + "source": name, + "target": target, + }) +} diff --git a/cmd/booster/main.go b/cmd/booster/main.go index 723cabe..1a6b162 100644 --- a/cmd/booster/main.go +++ b/cmd/booster/main.go @@ -92,13 +92,14 @@ func main() { } b := new(core.Balancer) - exp := new(metrics.Exporter) rs := store.New(b) + exp := new(metrics.Exporter) l := source.NewListener(source.Config{ Store: rs, MetricsExporter: exp, }) d := booster.New(b) + d.SetMetricsExporter(exp) router := remote.NewRouter() router.Store = rs diff --git a/metrics/exporter.go b/metrics/exporter.go index 4622475..0a06fee 100644 --- a/metrics/exporter.go +++ b/metrics/exporter.go @@ -27,16 +27,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -// Exporter can be used to both capture and serve metrics. -type Exporter struct { -} - -// ServeHTTP is just a wrapper around the ServeHTTP function -// of the prohttp default Handler. -func (b *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { - promhttp.Handler().ServeHTTP(w, r) -} - const namespace = "booster" var ( @@ -51,18 +41,35 @@ var ( Name: "network_receive_bytes", Help: "Received bytes for network source", }, []string{"source", "target"}) + + selectSource = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "select_source_total", + Help: "Number of times a source was chosen", + }, []string{"source", "target"}) ) func init() { prometheus.MustRegister(sendBytes) prometheus.MustRegister(receiveBytes) + prometheus.MustRegister(selectSource) +} + +// Exporter can be used to both capture and serve metrics. +type Exporter struct { +} + +// ServeHTTP is just a wrapper around the ServeHTTP function +// of the prohttp default Handler. +func (exp *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { + promhttp.Handler().ServeHTTP(w, r) } // SendDataFlow can be used to update the metrics exported by the broker // about network usage, in particular upload and download bandwidth. `data` // Type should either be "read" or "write", referring respectively to download // and upload operations. -func (b *Exporter) SendDataFlow(labels map[string]string, data *source.DataFlow) { +func (exp *Exporter) SendDataFlow(labels map[string]string, data *source.DataFlow) { switch data.Type { case "read": receiveBytes.With(prometheus.Labels(labels)).Add(float64(data.N)) @@ -71,3 +78,9 @@ func (b *Exporter) SendDataFlow(labels map[string]string, data *source.DataFlow) default: } } + +// INcSelectedSource is used to update the number of times a source was +// chosen. +func (exp *Exporter) IncSelectedSource(labels map[string]string) { + selectSource.With(prometheus.Labels(labels)).Inc() +} diff --git a/source/interface.go b/source/interface.go index 4cdeebf..2a8f1cb 100644 --- a/source/interface.go +++ b/source/interface.go @@ -47,19 +47,19 @@ type Interface struct { metrics struct { sync.Mutex - broker MetricsExporter + exporter MetricsExporter } conns *conns } -// SetMetricsExporter sets br as the default MetricsExporter of interface +// SetMetricsExporter sets exp as the default MetricsExporter of interface // `i`. It is safe to use by multiple goroutines. -func (i *Interface) SetMetricsExporter(br MetricsExporter) { +func (i *Interface) SetMetricsExporter(exp MetricsExporter) { i.metrics.Lock() defer i.metrics.Unlock() - i.metrics.broker = br + i.metrics.exporter = exp } // Name implements the core.Source interface. @@ -121,14 +121,14 @@ func (i *Interface) Follow(conn net.Conn) net.Conn { // SendMetrics sends the data using the Interface's MetricsExporter. // It is safe to use by multiple goroutines. func (i *Interface) SendMetrics(labels map[string]string, data *DataFlow) { - if i.metrics.broker == nil { + if i.metrics.exporter == nil { return } i.metrics.Lock() defer i.metrics.Unlock() - i.metrics.broker.SendDataFlow(labels, data) + i.metrics.exporter.SendDataFlow(labels, data) } // Close closes all open connections.