Skip to content

Commit

Permalink
Decorate RPC errors with additional information about whether they ar…
Browse files Browse the repository at this point in the history
…e transmission errors or errors returned by the host (#1039)
  • Loading branch information
ChrisSchinnerl authored Mar 13, 2024
2 parents 4539e40 + e5df42c commit 0567309
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 61 deletions.
11 changes: 6 additions & 5 deletions autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/build"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/webhooks"
Expand Down Expand Up @@ -299,7 +300,7 @@ func (ap *Autopilot) Run() error {

// perform maintenance
setChanged, err := ap.c.performContractMaintenance(ap.shutdownCtx, w)
if err != nil && isErr(err, context.Canceled) {
if err != nil && utils.IsErr(err, context.Canceled) {
return
} else if err != nil {
ap.logger.Errorf("contract maintenance failed, err: %v", err)
Expand Down Expand Up @@ -405,9 +406,9 @@ func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) (configure
cancel()

// if the config was not found, or we were unable to fetch it, keep blocking
if isErr(err, context.Canceled) {
if utils.IsErr(err, context.Canceled) {
return
} else if isErr(err, api.ErrAutopilotNotFound) {
} else if utils.IsErr(err, api.ErrAutopilotNotFound) {
once.Do(func() { ap.logger.Info("autopilot is waiting to be configured...") })
} else if err != nil {
ap.logger.Errorf("autopilot is unable to fetch its configuration from the bus, err: %v", err)
Expand Down Expand Up @@ -438,7 +439,7 @@ func (ap *Autopilot) blockUntilOnline() (online bool) {
online = len(peers) > 0
cancel()

if isErr(err, context.Canceled) {
if utils.IsErr(err, context.Canceled) {
return
} else if err != nil {
ap.logger.Errorf("failed to get peers, err: %v", err)
Expand Down Expand Up @@ -472,7 +473,7 @@ func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, block
cancel()

// if an error occurred, or if we're not synced, we continue
if isErr(err, context.Canceled) {
if utils.IsErr(err, context.Canceled) {
return
} else if err != nil {
ap.logger.Errorf("failed to get consensus state, err: %v", err)
Expand Down
19 changes: 10 additions & 9 deletions autopilot/contract_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/siad/build"
)

Expand Down Expand Up @@ -65,14 +66,14 @@ func (pm pruneMetrics) String() string {
func (pr pruneResult) toAlert() (id types.Hash256, alert *alerts.Alert) {
id = alertIDForContract(alertPruningID, pr.fcid)

if shouldTrigger := pr.err != nil && !((isErr(pr.err, errInvalidMerkleProof) && build.VersionCmp(pr.version, "1.6.0") < 0) ||
isErr(pr.err, api.ErrContractNotFound) || // contract got archived
isErr(pr.err, errConnectionRefused) ||
isErr(pr.err, errConnectionTimedOut) ||
isErr(pr.err, errConnectionResetByPeer) ||
isErr(pr.err, errInvalidHandshakeSignature) ||
isErr(pr.err, errNoRouteToHost) ||
isErr(pr.err, errNoSuchHost)); shouldTrigger {
if shouldTrigger := pr.err != nil && !((utils.IsErr(pr.err, errInvalidMerkleProof) && build.VersionCmp(pr.version, "1.6.0") < 0) ||
utils.IsErr(pr.err, api.ErrContractNotFound) || // contract got archived
utils.IsErr(pr.err, errConnectionRefused) ||
utils.IsErr(pr.err, errConnectionTimedOut) ||
utils.IsErr(pr.err, errConnectionResetByPeer) ||
utils.IsErr(pr.err, errInvalidHandshakeSignature) ||
utils.IsErr(pr.err, errNoRouteToHost) ||
utils.IsErr(pr.err, errNoSuchHost)); shouldTrigger {
alert = newContractPruningFailedAlert(pr.hk, pr.version, pr.fcid, pr.err)
}
return
Expand Down Expand Up @@ -196,7 +197,7 @@ func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneRes
pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract)
if err != nil && pruned == 0 {
return pruneResult{fcid: fcid, hk: host.PublicKey, version: host.Settings.Version, err: err}
} else if err != nil && isErr(err, context.DeadlineExceeded) {
} else if err != nil && utils.IsErr(err, context.DeadlineExceeded) {
err = nil
}

Expand Down
5 changes: 3 additions & 2 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/wallet"
"go.sia.tech/renterd/worker"
"go.uber.org/zap"
Expand Down Expand Up @@ -1425,7 +1426,7 @@ func (c *contractor) renewContract(ctx context.Context, w Worker, ci contractInf
"renterFunds", renterFunds,
"expectedNewStorage", expectedNewStorage,
)
if strings.Contains(err.Error(), wallet.ErrInsufficientBalance.Error()) {
if utils.IsErr(err, wallet.ErrInsufficientBalance) && !worker.IsErrHost(err) {
return api.ContractMetadata{}, false, err
}
return api.ContractMetadata{}, true, err
Expand Down Expand Up @@ -1508,7 +1509,7 @@ func (c *contractor) refreshContract(ctx context.Context, w Worker, ci contractI
return api.ContractMetadata{}, true, err
}
c.logger.Errorw("refresh failed", zap.Error(err), "hk", hk, "fcid", fcid)
if strings.Contains(err.Error(), wallet.ErrInsufficientBalance.Error()) {
if utils.IsErr(err, wallet.ErrInsufficientBalance) && !worker.IsErrHost(err) {
return api.ContractMetadata{}, false, err
}
return api.ContractMetadata{}, true, err
Expand Down
10 changes: 2 additions & 8 deletions autopilot/ipfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/internal/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -137,7 +138,7 @@ func (r *ipResolver) lookup(hostIP string) ([]string, error) {
addrs, err := r.resolver.LookupIPAddr(ctx, host)
if err != nil {
// check the cache if it's an i/o timeout or server misbehaving error
if isErr(err, errIOTimeout) || isErr(err, errServerMisbehaving) {
if utils.IsErr(err, errIOTimeout) || utils.IsErr(err, errServerMisbehaving) {
if entry, found := r.cache[hostIP]; found && time.Since(entry.created) < ipCacheEntryValidity {
r.logger.Debugf("using cached IP addresses for %v, err: %v", hostIP, err)
return entry.subnets, nil
Expand Down Expand Up @@ -188,10 +189,3 @@ func parseSubnets(addresses []net.IPAddr) []string {

return subnets
}

func isErr(err error, target error) bool {
if errors.Is(err, target) {
return true
}
return err != nil && target != nil && strings.Contains(err.Error(), target.Error())
}
13 changes: 7 additions & 6 deletions autopilot/ipfilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/internal/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -61,20 +62,20 @@ func TestIPResolver(t *testing.T) {

// test lookup error
r.setNextErr(errors.New("unknown error"))
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errors.New("unknown error")) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errors.New("unknown error")) {
t.Fatal("unexpected error", err)
}

// test IO timeout - no cache entry
r.setNextErr(errIOTimeout)
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errIOTimeout) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errIOTimeout) {
t.Fatal("unexpected error", err)
}

// test IO timeout - expired cache entry
ipr.cache["example.com:1234"] = ipCacheEntry{subnets: []string{"a"}}
r.setNextErr(errIOTimeout)
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errIOTimeout) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errIOTimeout) {
t.Fatal("unexpected error", err)
}

Expand All @@ -89,19 +90,19 @@ func TestIPResolver(t *testing.T) {

// test too many addresses - more than two
r.setAddr("example.com", []net.IPAddr{{}, {}, {}})
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errTooManyAddresses) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errTooManyAddresses) {
t.Fatal("unexpected error", err)
}

// test too many addresses - two of the same type
r.setAddr("example.com", []net.IPAddr{{IP: net.IPv4(1, 2, 3, 4)}, {IP: net.IPv4(1, 2, 3, 4)}})
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errTooManyAddresses) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errTooManyAddresses) {
t.Fatal("unexpected error", err)
}

// test invalid addresses
r.setAddr("example.com", []net.IPAddr{{IP: ipv4Localhost}, {IP: net.IP{127, 0, 0, 2}}})
if _, err := ipr.lookup("example.com:1234"); !isErr(err, errTooManyAddresses) {
if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errTooManyAddresses) {
t.Fatal("unexpected error", err)
}

Expand Down
3 changes: 2 additions & 1 deletion autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/stats"
"go.uber.org/zap"
Expand Down Expand Up @@ -156,7 +157,7 @@ func (m *migrator) performMigrations(p *workerPool) {

if err != nil {
m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err)
skipAlert := isErr(err, api.ErrSlabNotFound)
skipAlert := utils.IsErr(err, api.ErrSlabNotFound)
if !skipAlert {
if res.SurchargeApplied {
m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, err))
Expand Down
3 changes: 2 additions & 1 deletion autopilot/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/internal/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -314,7 +315,7 @@ func (s *scanner) launchScanWorkers(ctx context.Context, w scanWorker, reqs chan
scan, err := w.RHPScan(ctx, req.hostKey, req.hostIP, s.currentTimeout())
if err != nil {
break // abort
} else if !isErr(errors.New(scan.ScanError), errIOTimeout) && scan.Ping > 0 {
} else if !utils.IsErr(errors.New(scan.ScanError), errIOTimeout) && scan.Ping > 0 {
s.tracker.addDataPoint(time.Duration(scan.Ping))
}

Expand Down
20 changes: 20 additions & 0 deletions internal/utils/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package utils

import (
"errors"
"strings"
)

// IsErr can be used to compare an error to a target and also works when used on
// errors that haven't been wrapped since it will fall back to a string
// comparison. Useful to check errors returned over the network.
func IsErr(err error, target error) bool {
if (err == nil) != (target == nil) {
return false
} else if errors.Is(err, target) {
return true
}
// TODO: we can get rid of the lower casing once siad is gone and
// renterd/hostd use the same error messages
return strings.Contains(strings.ToLower(err.Error()), strings.ToLower(target.Error()))
}
75 changes: 56 additions & 19 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"math"
"math/big"
"net"
"strings"
"sync"
"time"

Expand All @@ -20,6 +19,7 @@ import (
"go.sia.tech/mux/v1"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/siad/crypto"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -47,6 +47,12 @@ const (
)

var (
// errHost is used to wrap rpc errors returned by the host.
errHost = errors.New("host responded with error")

// errTransport is used to wrap rpc errors caused by the transport.
errTransport = errors.New("transport error")

// errBalanceInsufficient occurs when a withdrawal failed because the
// account balance was insufficient.
errBalanceInsufficient = errors.New("ephemeral account balance was insufficient")
Expand Down Expand Up @@ -83,31 +89,42 @@ var (
errWithdrawalExpired = errors.New("withdrawal request expired")
)

func isBalanceInsufficient(err error) bool { return isError(err, errBalanceInsufficient) }
func isBalanceMaxExceeded(err error) bool { return isError(err, errBalanceMaxExceeded) }
// IsErrHost indicates whether an error was returned by a host as part of an RPC.
func IsErrHost(err error) bool {
return utils.IsErr(err, errHost)
}

func isBalanceInsufficient(err error) bool { return utils.IsErr(err, errBalanceInsufficient) }
func isBalanceMaxExceeded(err error) bool { return utils.IsErr(err, errBalanceMaxExceeded) }
func isClosedStream(err error) bool {
return isError(err, mux.ErrClosedStream) || isError(err, net.ErrClosed)
return utils.IsErr(err, mux.ErrClosedStream) || utils.IsErr(err, net.ErrClosed)
}
func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) }
func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) }
func isPriceTableGouging(err error) bool { return isError(err, errPriceTableGouging) }
func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) }
func isInsufficientFunds(err error) bool { return utils.IsErr(err, ErrInsufficientFunds) }
func isPriceTableExpired(err error) bool { return utils.IsErr(err, errPriceTableExpired) }
func isPriceTableGouging(err error) bool { return utils.IsErr(err, errPriceTableGouging) }
func isPriceTableNotFound(err error) bool { return utils.IsErr(err, errPriceTableNotFound) }
func isSectorNotFound(err error) bool {
return isError(err, errSectorNotFound) || isError(err, errSectorNotFoundOld)
return utils.IsErr(err, errSectorNotFound) || utils.IsErr(err, errSectorNotFoundOld)
}
func isWithdrawalsInactive(err error) bool { return isError(err, errWithdrawalsInactive) }
func isWithdrawalExpired(err error) bool { return isError(err, errWithdrawalExpired) }
func isWithdrawalsInactive(err error) bool { return utils.IsErr(err, errWithdrawalsInactive) }
func isWithdrawalExpired(err error) bool { return utils.IsErr(err, errWithdrawalExpired) }

func isError(err error, target error) bool {
if err == nil {
return err == target
// wrapRPCErr extracts the innermost error, wraps it in either a errHost or
// errTransport and finally wraps it using the provided fnName.
func wrapRPCErr(err *error, fnName string) {
if *err == nil {
return
}
innerErr := *err
for errors.Unwrap(innerErr) != nil {
innerErr = errors.Unwrap(innerErr)
}
// compare error first
if errors.Is(err, target) {
return true
if errors.As(*err, new(*rhpv3.RPCError)) {
*err = fmt.Errorf("%w: '%w'", errHost, innerErr)
} else {
*err = fmt.Errorf("%w: '%w'", errTransport, innerErr)
}
// then compare the string in case the error was returned by a host
return strings.Contains(strings.ToLower(err.Error()), strings.ToLower(target.Error()))
*err = fmt.Errorf("%s: %w", fnName, *err)
}

// transportV3 is a reference-counted wrapper for rhpv3.Transport.
Expand All @@ -125,6 +142,26 @@ type streamV3 struct {
*rhpv3.Stream
}

func (s *streamV3) ReadResponse(resp rhpv3.ProtocolObject, maxLen uint64) (err error) {
defer wrapRPCErr(&err, "ReadResponse")
return s.Stream.ReadResponse(resp, maxLen)
}

func (s *streamV3) WriteResponse(resp rhpv3.ProtocolObject) (err error) {
defer wrapRPCErr(&err, "WriteResponse")
return s.Stream.WriteResponse(resp)
}

func (s *streamV3) ReadRequest(req rhpv3.ProtocolObject, maxLen uint64) (err error) {
defer wrapRPCErr(&err, "ReadRequest")
return s.Stream.ReadRequest(req, maxLen)
}

func (s *streamV3) WriteRequest(rpcID types.Specifier, req rhpv3.ProtocolObject) (err error) {
defer wrapRPCErr(&err, "WriteRequest")
return s.Stream.WriteRequest(rpcID, req)
}

// Close closes the stream and cancels the goroutine launched by DialStream.
func (s *streamV3) Close() error {
s.cancel()
Expand Down
Loading

0 comments on commit 0567309

Please sign in to comment.