Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Commit

Permalink
Merge pull request #186 from safing/feature/better-connect-metrics
Browse files Browse the repository at this point in the history
Improve connect op metrics
  • Loading branch information
dhaavi authored Oct 6, 2023
2 parents f903805 + c579c62 commit 70b087d
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 47 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '^1.20'
go-version: '^1.21'

- name: Get dependencies
run: go mod download
Expand All @@ -46,7 +46,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '^1.20'
go-version: '^1.21'

- name: Get dependencies
run: go mod download
Expand Down
6 changes: 3 additions & 3 deletions crew/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (t *Tunnel) connectWorker(ctx context.Context) (err error) {
// TODO: Clean this up.
t.connInfo.Lock()
defer t.connInfo.Unlock()
t.connInfo.Failed(fmt.Sprintf("failed to establish route: %s", err), "")
t.connInfo.Failed(fmt.Sprintf("SPN failed to establish route: %s", err), "")
t.connInfo.Save()

tracer.Warningf("spn/crew: failed to establish route for %s: %s", t.connInfo, err)
Expand All @@ -97,11 +97,11 @@ func (t *Tunnel) connectWorker(ctx context.Context) (err error) {

t.connInfo.Lock()
defer t.connInfo.Unlock()
t.connInfo.Failed(tErr.Error(), "")
t.connInfo.Failed(fmt.Sprintf("SPN failed to initialize data tunnel (connect op): %s", tErr.Error()), "")
t.connInfo.Save()

// TODO: try with another route?
tracer.Warningf("spn/crew: failed to initialize tunnel for %s: %s", t.connInfo, err)
tracer.Warningf("spn/crew: failed to initialize data tunnel (connect op) for %s: %s", t.connInfo, err)
return tErr
}

Expand Down
75 changes: 72 additions & 3 deletions crew/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ import (
)

var (
newConnectOp *metrics.Counter
connectOpCnt *metrics.Counter
connectOpCntError *metrics.Counter
connectOpCntBadRequest *metrics.Counter
connectOpCntCanceled *metrics.Counter
connectOpCntFailed *metrics.Counter
connectOpCntConnected *metrics.Counter
connectOpCntRateLimited *metrics.Counter

connectOpIncomingBytes *metrics.Counter
connectOpOutgoingBytes *metrics.Counter

Expand All @@ -29,9 +36,9 @@ func registerMetrics() (err error) {
return nil
}

// Connect Op Stats.
// Connect Op Stats on client.

newConnectOp, err = metrics.NewCounter(
connectOpCnt, err = metrics.NewCounter(
"spn/op/connect/total",
nil,
&metrics.Options{
Expand All @@ -45,6 +52,68 @@ func registerMetrics() (err error) {
return err
}

// Connect Op Stats on server.

connectOpCntOptions := &metrics.Options{
Name: "SPN Total Connect Operations",
Permission: api.PermitUser,
Persist: true,
}

connectOpCntError, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "error"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntBadRequest, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "bad_request"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntCanceled, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "canceled"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntFailed, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "failed"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntConnected, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "connected"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntRateLimited, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "rate_limited"},
connectOpCntOptions,
)
if err != nil {
return err
}

_, err = metrics.NewGauge(
"spn/op/connect/active",
nil,
Expand Down
75 changes: 59 additions & 16 deletions crew/op_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func init() {
// NewConnectOp starts a new connect operation.
func NewConnectOp(tunnel *Tunnel) (*ConnectOp, *terminal.Error) {
// Submit metrics.
newConnectOp.Inc()
connectOpCnt.Inc()

// Create request.
request := &ConnectRequest{
Expand Down Expand Up @@ -168,9 +168,6 @@ func NewConnectOp(tunnel *Tunnel) (*ConnectOp, *terminal.Error) {
}

func startConnectOp(t terminal.Terminal, opID uint32, data *container.Container) (terminal.Operation, *terminal.Error) {
// Submit metrics.
newConnectOp.Inc()

// Check if we are running a public hub.
if !conf.PublicHub() {
return nil, terminal.ErrPermissionDenied.With("connecting is only allowed on public hubs")
Expand All @@ -180,14 +177,17 @@ func startConnectOp(t terminal.Terminal, opID uint32, data *container.Container)
request := &ConnectRequest{}
_, err := dsd.Load(data.CompileData(), request)
if err != nil {
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
return nil, terminal.ErrMalformedData.With("failed to parse connect request: %w", err)
}
if request.QueueSize == 0 || request.QueueSize > terminal.MaxQueueSize {
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
return nil, terminal.ErrInvalidOptions.With("invalid queue size of %d", request.QueueSize)
}

// Check if IP seems valid.
if len(request.IP) != net.IPv4len && len(request.IP) != net.IPv6len {
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
return nil, terminal.ErrInvalidOptions.With("ip address is not valid")
}

Expand All @@ -213,6 +213,7 @@ func (op *ConnectOp) handleSetup(_ context.Context) error {
if sessionTerm, ok := op.t.(terminal.SessionTerminal); ok {
session = sessionTerm.GetSession()
} else {
connectOpCntError.Inc()
log.Errorf("spn/crew: %T is not a session terminal, aborting op %s#%d", op.t, op.t.FmtID(), op.ID())
op.Stop(op, terminal.ErrInternalError.With("no session available"))
return nil
Expand All @@ -225,6 +226,7 @@ func (op *ConnectOp) handleSetup(_ context.Context) error {

// If context was canceled, stop operation.
if cancelErr != nil {
connectOpCntCanceled.Inc()
op.Stop(op, terminal.ErrCanceled.With(cancelErr.Error()))
}

Expand All @@ -235,11 +237,14 @@ func (op *ConnectOp) handleSetup(_ context.Context) error {
func (op *ConnectOp) setup(session *terminal.Session) {
// Rate limit before connecting.
if tErr := session.RateLimit(); tErr != nil {
// Fake connection error when rate limited.
// Add rate limit info to error.
if tErr.Is(terminal.ErrRateLimited) {
connectOpCntRateLimited.Inc()
op.Stop(op, tErr.With(session.RateLimitInfo()))
return
}

connectOpCntError.Inc()
op.Stop(op, tErr)
return
}
Expand All @@ -248,27 +253,31 @@ func (op *ConnectOp) setup(session *terminal.Session) {
ipScope := netutils.GetIPScope(op.request.IP)
if ipScope != netutils.Global {
session.ReportSuspiciousActivity(terminal.SusFactorQuiteUnusual)
connectOpCntBadRequest.Inc()
op.Stop(op, terminal.ErrPermissionDenied.With("denied request to connect to non-global IP %s", op.request.IP))
return
}

// Check exit policy.
if tErr := checkExitPolicy(op.request); tErr != nil {
session.ReportSuspiciousActivity(terminal.SusFactorQuiteUnusual)
connectOpCntBadRequest.Inc()
op.Stop(op, tErr)
return
}

// Check one last time before connecting if operation was not canceled.
if op.Ctx().Err() != nil {
op.Stop(op, terminal.ErrCanceled.With(op.Ctx().Err().Error()))
connectOpCntCanceled.Inc()
return
}

// Connect to destination.
dialNet := op.request.DialNetwork()
if dialNet == "" {
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
connectOpCntBadRequest.Inc()
op.Stop(op, terminal.ErrIncorrectUsage.With("protocol %s is not supported", op.request.Protocol))
return
}
Expand All @@ -285,10 +294,13 @@ func (op *ConnectOp) setup(session *terminal.Session) {
switch {
case errors.As(err, &netError) && netError.Timeout():
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
connectOpCntFailed.Inc()
case errors.Is(err, context.Canceled):
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
connectOpCntCanceled.Inc()
default:
session.ReportSuspiciousActivity(terminal.SusFactorWeirdButOK)
connectOpCntFailed.Inc()
}

op.Stop(op, terminal.ErrConnectionError.With("failed to connect to %s: %w", op.request, err))
Expand All @@ -301,6 +313,7 @@ func (op *ConnectOp) setup(session *terminal.Session) {
module.StartWorker("connect op conn writer", op.connWriter)
module.StartWorker("connect op flow handler", op.dfq.FlowHandler)

connectOpCntConnected.Inc()
log.Infof("spn/crew: connected op %s#%d to %s", op.t.FmtID(), op.ID(), op.request)
}

Expand Down Expand Up @@ -516,18 +529,48 @@ func (op *ConnectOp) HandleStop(err *terminal.Error) (errorToSend *terminal.Erro
// Cancel workers.
op.cancelCtx()

// Avoid connecting to destination via this Hub if there was a connection
// error and no data was received.
if op.entry && // On clients only.
err.Is(terminal.ErrConnectionError) &&
op.outgoingTraffic.Load() == 0 {
// Only if no data was received (ie. sent to local application).
op.tunnel.avoidDestinationHub()
}
// Special client-side handling.
if op.entry {
// Mark the connection as failed if there was an error and no data was sent to the app yet.
if err.IsError() && op.outgoingTraffic.Load() == 0 {
// Set connection to failed and save it to propagate the update.
c := op.tunnel.connInfo
func() {
c.Lock()
defer c.Unlock()

if err.IsExternal() {
c.Failed(fmt.Sprintf(
"the exit node reported an error: %s", err,
), "")
} else {
c.Failed(fmt.Sprintf(
"connection failed locally: %s", err,
), "")
}

// If we are on the client, don't leak local errors to the server.
if op.entry && !err.IsExternal() {
return terminal.ErrStopping
c.Save()
}()
}

// Avoid connecting to the destination via this Hub if:
// - The error is external - ie. from the server.
// - The error is a connection error.
// - No data was received.
// This indicates that there is some network level issue that we can
// possibly work around by using another exit node.
if err.IsError() && err.IsExternal() &&
err.Is(terminal.ErrConnectionError) &&
op.outgoingTraffic.Load() == 0 {
op.tunnel.avoidDestinationHub()
}

// Don't leak local errors to the server.
if !err.IsExternal() {
// Change error that is reported.
return terminal.ErrStopping
}
}

return err
}
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/safing/spn

go 1.19
go 1.21

require (
github.com/awalterschulze/gographviz v2.0.3+incompatible
Expand All @@ -11,12 +11,12 @@ require (
github.com/rot256/pblind v0.0.0-20230622102829-4dc2c6e4b857
github.com/safing/jess v0.3.1
github.com/safing/portbase v0.18.2
github.com/safing/portmaster v1.4.9
github.com/safing/portmaster v1.4.10-0.20231006102818-4f0adc87e70c
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.4
github.com/tevino/abool v1.2.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/net v0.15.0
golang.org/x/exp v0.0.0-20231005195138-3e424a577f31
golang.org/x/net v0.16.0
)

require (
Expand Down Expand Up @@ -68,13 +68,13 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.etcd.io/bbolt v1.3.7 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.13.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gvisor.dev/gvisor v0.0.0-20220817001344-846276b3dbc5 // indirect
gvisor.dev/gvisor v0.0.0-20231006032704-15cc3fcbbd77 // indirect
)
Loading

0 comments on commit 70b087d

Please sign in to comment.