Skip to content
This repository has been archived by the owner on Apr 2, 2020. It is now read-only.

Commit

Permalink
Merge pull request #22 from booster-proj/issue/21
Browse files Browse the repository at this point in the history
Export selected source metrics
  • Loading branch information
dmorn authored Jan 10, 2019
2 parents 686dcc2 + 2cdf000 commit 6aa182f
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 26 deletions.
64 changes: 56 additions & 8 deletions booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Expand All @@ -20,33 +19,55 @@ package booster
import (
"context"
"net"
"sync"

"github.com/booster-proj/booster/core"
"upspin.io/log"
)

// New returns an instance of a booster dialer.
func New(b *core.Balancer) core.Dialer {
return &dialer{b}
func New(b *core.Balancer) *Dialer {
return &Dialer{b: b}
}

type dialer struct {
*core.Balancer
// MetricsExporter is an inteface around the IncSelectedSource function,
// which is used to collect a metric when a source is selected for use.
type MetricsExporter interface {
IncSelectedSource(labels map[string]string)
}

func (d *dialer) DialContext(ctx context.Context, network, address string) (conn net.Conn, err error) {
// Dialer is a core.Dialer implementation, which uses a core.Balancer
// instance to to retrieve a source to use when it comes to dial a network
// connection.
type Dialer struct {
b *core.Balancer

metrics struct {
sync.Mutex
exporter MetricsExporter
}
}

// DialContext dials a connection using `network` to `address`. The connection returned
// is dialed through a specific network interface, which is chosen using the dialer's
// interal balancer provided. If it fails to create a connection using a source, it
// tries to dial it using another source, until source exhaustion. It that case,
// only the last error received is returned.
func (d *Dialer) DialContext(ctx context.Context, network, address string) (conn net.Conn, err error) {
bl := make([]core.Source, 0, d.Len()) // blacklisted sources

// If the dialing fails, keep on trying with the other sources until exaustion.
for i := 0; len(bl) < d.Len(); i++ {
var src core.Source
src, err = d.Get(ctx, bl...)
src, err = d.b.Get(ctx, bl...)
if err != nil {
// Fail directly if the balancer returns an error, as
// we do not have any source to use.
return
}

d.sendMetrics(src.Name(), address)

log.Debug.Printf("DialContext: Attempt #%d to connect to %v (source %v)", i, address, src.Name())

conn, err = src.DialContext(ctx, "tcp4", address)
Expand All @@ -63,3 +84,30 @@ func (d *dialer) DialContext(ctx context.Context, network, address string) (conn

return
}

// Len returns the number of sources that the dialer as at it's disposal.
func (d *Dialer) Len() int {
return d.b.Len()
}

// SetMetricsExporter makes the receiver use exp as metrics exporter.
func (d *Dialer) SetMetricsExporter(exp MetricsExporter) {
d.metrics.Lock()
defer d.metrics.Unlock()

d.metrics.exporter = exp
}

func (d *Dialer) sendMetrics(name, target string) {
if d.metrics.exporter == nil {
return
}

d.metrics.Lock()
defer d.metrics.Unlock()

d.metrics.exporter.IncSelectedSource(map[string]string{
"source": name,
"target": target,
})
}
3 changes: 2 additions & 1 deletion cmd/booster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,14 @@ func main() {
}

b := new(core.Balancer)
exp := new(metrics.Exporter)
rs := store.New(b)
exp := new(metrics.Exporter)
l := source.NewListener(source.Config{
Store: rs,
MetricsExporter: exp,
})
d := booster.New(b)
d.SetMetricsExporter(exp)

router := remote.NewRouter()
router.Store = rs
Expand Down
35 changes: 24 additions & 11 deletions metrics/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Exporter can be used to both capture and serve metrics.
type Exporter struct {
}

// ServeHTTP is just a wrapper around the ServeHTTP function
// of the prohttp default Handler.
func (b *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
promhttp.Handler().ServeHTTP(w, r)
}

const namespace = "booster"

var (
Expand All @@ -51,18 +41,35 @@ var (
Name: "network_receive_bytes",
Help: "Received bytes for network source",
}, []string{"source", "target"})

selectSource = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "select_source_total",
Help: "Number of times a source was chosen",
}, []string{"source", "target"})
)

func init() {
prometheus.MustRegister(sendBytes)
prometheus.MustRegister(receiveBytes)
prometheus.MustRegister(selectSource)
}

// Exporter can be used to both capture and serve metrics.
type Exporter struct {
}

// ServeHTTP is just a wrapper around the ServeHTTP function
// of the prohttp default Handler.
func (exp *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
promhttp.Handler().ServeHTTP(w, r)
}

// SendDataFlow can be used to update the metrics exported by the broker
// about network usage, in particular upload and download bandwidth. `data`
// Type should either be "read" or "write", referring respectively to download
// and upload operations.
func (b *Exporter) SendDataFlow(labels map[string]string, data *source.DataFlow) {
func (exp *Exporter) SendDataFlow(labels map[string]string, data *source.DataFlow) {
switch data.Type {
case "read":
receiveBytes.With(prometheus.Labels(labels)).Add(float64(data.N))
Expand All @@ -71,3 +78,9 @@ func (b *Exporter) SendDataFlow(labels map[string]string, data *source.DataFlow)
default:
}
}

// INcSelectedSource is used to update the number of times a source was
// chosen.
func (exp *Exporter) IncSelectedSource(labels map[string]string) {
selectSource.With(prometheus.Labels(labels)).Inc()
}
12 changes: 6 additions & 6 deletions source/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@ type Interface struct {

metrics struct {
sync.Mutex
broker MetricsExporter
exporter MetricsExporter
}

conns *conns
}

// SetMetricsExporter sets br as the default MetricsExporter of interface
// SetMetricsExporter sets exp as the default MetricsExporter of interface
// `i`. It is safe to use by multiple goroutines.
func (i *Interface) SetMetricsExporter(br MetricsExporter) {
func (i *Interface) SetMetricsExporter(exp MetricsExporter) {
i.metrics.Lock()
defer i.metrics.Unlock()

i.metrics.broker = br
i.metrics.exporter = exp
}

// Name implements the core.Source interface.
Expand Down Expand Up @@ -121,14 +121,14 @@ func (i *Interface) Follow(conn net.Conn) net.Conn {
// SendMetrics sends the data using the Interface's MetricsExporter.
// It is safe to use by multiple goroutines.
func (i *Interface) SendMetrics(labels map[string]string, data *DataFlow) {
if i.metrics.broker == nil {
if i.metrics.exporter == nil {
return
}

i.metrics.Lock()
defer i.metrics.Unlock()

i.metrics.broker.SendDataFlow(labels, data)
i.metrics.exporter.SendDataFlow(labels, data)
}

// Close closes all open connections.
Expand Down

0 comments on commit 6aa182f

Please sign in to comment.