Skip to content

Commit

Permalink
Record wallet metrics in the bus (#1458)
Browse files Browse the repository at this point in the history
This PR adds a recorder in the bus that periodically records wallet
metrics.

Kept it simple, figured that is fine for now, couple of things I
considered were:
- expose the update interval
- increasing interval and add a `Trigger` which we call on
`FundTransaction`
  • Loading branch information
ChrisSchinnerl authored Aug 19, 2024
2 parents fdea7f6 + d17ea3e commit 60ee37f
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 10 deletions.
1 change: 1 addition & 0 deletions api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type (
Confirmed types.Currency `json:"confirmed"`
Spendable types.Currency `json:"spendable"`
Unconfirmed types.Currency `json:"unconfirmed"`
Immature types.Currency `json:"immature"`
}

WalletMetricsQueryOpts struct{}
Expand Down
22 changes: 16 additions & 6 deletions bus/bus.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package bus

// TODOs:
// - add wallet metrics
// - add UPNP support

import (
Expand Down Expand Up @@ -33,9 +32,10 @@ import (
)

const (
defaultPinUpdateInterval = 5 * time.Minute
defaultPinRateWindow = 6 * time.Hour
stdTxnSize = 1200 // bytes
defaultWalletRecordMetricInterval = 5 * time.Minute
defaultPinUpdateInterval = 5 * time.Minute
defaultPinRateWindow = 6 * time.Hour
stdTxnSize = 1200 // bytes
)

// Client re-exports the client from the client package.
Expand Down Expand Up @@ -284,6 +284,7 @@ type (
RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error

WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error)
RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error
}

// A SettingStore stores settings.
Expand All @@ -293,6 +294,10 @@ type (
Settings(ctx context.Context) ([]string, error)
UpdateSetting(ctx context.Context, key, value string) error
}

WalletMetricsRecorder interface {
Shutdown(context.Context) error
}
)

type Bus struct {
Expand All @@ -314,8 +319,9 @@ type Bus struct {
mtrcs MetricsStore
ss SettingStore

contractLocker ContractLocker
sectors UploadingSectorsCache
contractLocker ContractLocker
sectors UploadingSectorsCache
walletMetricsRecorder WalletMetricsRecorder

logger *zap.SugaredLogger
}
Expand Down Expand Up @@ -365,6 +371,9 @@ func New(ctx context.Context, am AlertManager, wm WebhooksManager, cm ChainManag
// create chain subscriber
b.cs = ibus.NewChainSubscriber(wm, cm, store, w, announcementMaxAge, l)

// create wallet metrics recorder
b.walletMetricsRecorder = ibus.NewWalletMetricRecorder(store, w, defaultWalletRecordMetricInterval, l)

return b, nil
}

Expand Down Expand Up @@ -514,6 +523,7 @@ func (b *Bus) Handler() http.Handler {
// Shutdown shuts down the bus.
func (b *Bus) Shutdown(ctx context.Context) error {
return errors.Join(
b.walletMetricsRecorder.Shutdown(ctx),
b.accountsMgr.Shutdown(ctx),
b.webhooksMgr.Shutdown(ctx),
b.pinMgr.Shutdown(ctx),
Expand Down
104 changes: 104 additions & 0 deletions internal/bus/walletmetricsrecorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package bus

import (
"context"
"sync"
"time"

"go.sia.tech/coreutils/wallet"
"go.sia.tech/renterd/api"
"go.uber.org/zap"
)

type (
WalletMetricsRecorder struct {
store MetricsStore
wallet WalletBalance

shutdownChan chan struct{}
wg sync.WaitGroup

logger *zap.SugaredLogger
}

MetricsStore interface {
RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error
}

WalletBalance interface {
Balance() (wallet.Balance, error)
}
)

// NewWalletMetricRecorder returns a recorder that periodically records wallet
// metrics. The recorder is already running and can be stopped by calling
// Shutdown.
func NewWalletMetricRecorder(store MetricsStore, wallet WalletBalance, interval time.Duration, logger *zap.Logger) *WalletMetricsRecorder {
logger = logger.Named("walletmetricsrecorder")
recorder := &WalletMetricsRecorder{
store: store,
wallet: wallet,
shutdownChan: make(chan struct{}),
logger: logger.Sugar(),
}
recorder.run(interval)
return recorder
}

func (wmr *WalletMetricsRecorder) run(interval time.Duration) {
wmr.wg.Add(1)
go func() {
defer wmr.wg.Done()

t := time.NewTicker(interval)
defer t.Stop()

for {
balance, err := wmr.wallet.Balance()
if err != nil {
wmr.logger.Error("failed to get wallet balance", zap.Error(err))
} else {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
if err = wmr.store.RecordWalletMetric(ctx, api.WalletMetric{
Timestamp: api.TimeRFC3339(time.Now().UTC()),
Spendable: balance.Spendable,
Confirmed: balance.Confirmed,
Unconfirmed: balance.Unconfirmed,
Immature: balance.Immature,
}); err != nil {
wmr.logger.Error("failed to record wallet metric", zap.Error(err))
} else {
wmr.logger.Debugw("successfully recorded wallet metrics",
zap.Stringer("spendable", balance.Spendable),
zap.Stringer("confirmed", balance.Confirmed),
zap.Stringer("unconfirmed", balance.Unconfirmed),
zap.Stringer("immature", balance.Immature))
}
cancel()
}

select {
case <-wmr.shutdownChan:
return
case <-t.C:
}
}
}()
}

func (wmr *WalletMetricsRecorder) Shutdown(ctx context.Context) error {
close(wmr.shutdownChan)

waitChan := make(chan struct{})
go func() {
wmr.wg.Wait()
close(waitChan)
}()

select {
case <-ctx.Done():
return context.Cause(ctx)
case <-waitChan:
return nil
}
}
6 changes: 6 additions & 0 deletions internal/sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ var (
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00001_idx_contracts_fcid_timestamp", log)
},
},
{
ID: "00002_idx_wallet_metrics_immature",
Migrate: func(tx Tx) error {
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00002_idx_wallet_metrics_immature", log)
},
},
}
}
)
Expand Down
1 change: 0 additions & 1 deletion internal/test/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func TestMetrics(t *testing.T) {
}

// check wallet metrics
t.Skip("TODO: check wallet metrics")
wm, err := b.WalletMetrics(context.Background(), start, 10, time.Minute, api.WalletMetricsQueryOpts{})
tt.OK(err)
if len(wm) == 0 {
Expand Down
1 change: 1 addition & 0 deletions stores/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ func TestWalletMetrics(t *testing.T) {
Confirmed: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)),
Unconfirmed: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)),
Spendable: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)),
Immature: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)),
}
if err := ss.RecordWalletMetric(context.Background(), metric); err != nil {
t.Fatal(err)
Expand Down
5 changes: 4 additions & 1 deletion stores/sql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func RecordPerformanceMetric(ctx context.Context, tx sql.Tx, metrics ...api.Perf
}

func RecordWalletMetric(ctx context.Context, tx sql.Tx, metrics ...api.WalletMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO wallets (created_at, timestamp, confirmed_lo, confirmed_hi, spendable_lo, spendable_hi, unconfirmed_lo, unconfirmed_hi) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
insertStmt, err := tx.Prepare(ctx, "INSERT INTO wallets (created_at, timestamp, confirmed_lo, confirmed_hi, spendable_lo, spendable_hi, unconfirmed_lo, unconfirmed_hi, immature_hi, immature_lo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare statement to insert wallet metric: %w", err)
}
Expand All @@ -382,6 +382,8 @@ func RecordWalletMetric(ctx context.Context, tx sql.Tx, metrics ...api.WalletMet
Unsigned64(metric.Spendable.Hi),
Unsigned64(metric.Unconfirmed.Lo),
Unsigned64(metric.Unconfirmed.Hi),
Unsigned64(metric.Immature.Lo),
Unsigned64(metric.Immature.Hi),
)
if err != nil {
return fmt.Errorf("failed to insert wallet metric: %w", err)
Expand All @@ -407,6 +409,7 @@ func WalletMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, in
(*Unsigned64)(&m.Confirmed.Lo), (*Unsigned64)(&m.Confirmed.Hi),
(*Unsigned64)(&m.Spendable.Lo), (*Unsigned64)(&m.Spendable.Hi),
(*Unsigned64)(&m.Unconfirmed.Lo), (*Unsigned64)(&m.Unconfirmed.Hi),
(*Unsigned64)(&m.Immature.Lo), (*Unsigned64)(&m.Immature.Hi),
)
if err != nil {
err = fmt.Errorf("failed to scan contract set metric: %w", err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE `wallets` ADD COLUMN `immature_lo` bigint NOT NULL, ADD COLUMN `immature_hi` bigint NOT NULL;
CREATE INDEX `idx_wallets_immature` ON `wallets`(`immature_lo`,`immature_hi`);
5 changes: 4 additions & 1 deletion stores/sql/mysql/migrations/metrics/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,12 @@ CREATE TABLE `wallets` (
`spendable_hi` bigint NOT NULL,
`unconfirmed_lo` bigint NOT NULL,
`unconfirmed_hi` bigint NOT NULL,
`immature_lo` bigint NOT NULL,
`immature_hi` bigint NOT NULL,
PRIMARY KEY (`id`),
KEY `idx_wallets_timestamp` (`timestamp`),
KEY `idx_confirmed` (`confirmed_lo`,`confirmed_hi`),
KEY `idx_spendable` (`spendable_lo`,`spendable_hi`),
KEY `idx_unconfirmed` (`unconfirmed_lo`,`unconfirmed_hi`)
KEY `idx_unconfirmed` (`unconfirmed_lo`,`unconfirmed_hi`),
KEY `idx_wallets_immature` (`immature_lo`,`immature_hi`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE `wallets` ADD COLUMN `immature_lo` BIGINT NOT NULL, ADD COLUMN `immature_hi` BIGINT NOT NULL;
CREATE INDEX `idx_wallets_immature` ON `wallets`(`immature_lo`,`immature_hi`);
3 changes: 2 additions & 1 deletion stores/sql/sqlite/migrations/metrics/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ CREATE INDEX `idx_performance_action` ON `performance`(`action`);
CREATE INDEX `idx_performance_timestamp` ON `performance`(`timestamp`);

-- dbWalletMetric
CREATE TABLE `wallets` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`timestamp` BIGINT NOT NULL,`confirmed_lo` BIGINT NOT NULL,`confirmed_hi` BIGINT NOT NULL,`spendable_lo` BIGINT NOT NULL,`spendable_hi` BIGINT NOT NULL,`unconfirmed_lo` BIGINT NOT NULL,`unconfirmed_hi` BIGINT NOT NULL);
CREATE TABLE `wallets` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`timestamp` BIGINT NOT NULL,`confirmed_lo` BIGINT NOT NULL,`confirmed_hi` BIGINT NOT NULL,`spendable_lo` BIGINT NOT NULL,`spendable_hi` BIGINT NOT NULL,`unconfirmed_lo` BIGINT NOT NULL,`unconfirmed_hi` BIGINT NOT NULL,`immature_lo` BIGINT NOT NULL,`immature_hi` BIGINT NOT NULL);
CREATE INDEX `idx_unconfirmed` ON `wallets`(`unconfirmed_lo`,`unconfirmed_hi`);
CREATE INDEX `idx_spendable` ON `wallets`(`spendable_lo`,`spendable_hi`);
CREATE INDEX `idx_confirmed` ON `wallets`(`confirmed_lo`,`confirmed_hi`);
CREATE INDEX `idx_wallets_immature` ON `wallets`(`immature_lo`,`immature_hi`);
CREATE INDEX `idx_wallets_timestamp` ON `wallets`(`timestamp`);

0 comments on commit 60ee37f

Please sign in to comment.