From eb24b32744256b9d1c1876f0df82e68e58cf3a61 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 19 Aug 2024 13:54:47 +0200 Subject: [PATCH 1/2] bus: record wallet metrics --- api/metrics.go | 1 + bus/bus.go | 22 ++-- internal/bus/walletmetricsrecorder.go | 100 ++++++++++++++++++ internal/sql/migrations.go | 6 ++ internal/test/e2e/metrics_test.go | 1 - stores/metrics_test.go | 1 + stores/sql/metrics.go | 5 +- ...tion_00002_idx_wallet_metrics_immature.sql | 2 + .../sql/mysql/migrations/metrics/schema.sql | 5 +- ...tion_00002_idx_wallet_metrics_immature.sql | 2 + .../sql/sqlite/migrations/metrics/schema.sql | 3 +- 11 files changed, 138 insertions(+), 10 deletions(-) create mode 100644 internal/bus/walletmetricsrecorder.go create mode 100644 stores/sql/mysql/migrations/metrics/migration_00002_idx_wallet_metrics_immature.sql create mode 100644 stores/sql/sqlite/migrations/metrics/migration_00002_idx_wallet_metrics_immature.sql diff --git a/api/metrics.go b/api/metrics.go index 412435e34..98c6f06b0 100644 --- a/api/metrics.go +++ b/api/metrics.go @@ -110,6 +110,7 @@ type ( Confirmed types.Currency `json:"confirmed"` Spendable types.Currency `json:"spendable"` Unconfirmed types.Currency `json:"unconfirmed"` + Immature types.Currency `json:"immature"` } WalletMetricsQueryOpts struct{} diff --git a/bus/bus.go b/bus/bus.go index e7492cfc5..ec6807705 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -1,7 +1,6 @@ package bus // TODOs: -// - add wallet metrics // - add UPNP support import ( @@ -33,9 +32,10 @@ import ( ) const ( - defaultPinUpdateInterval = 5 * time.Minute - defaultPinRateWindow = 6 * time.Hour - stdTxnSize = 1200 // bytes + defaultWalletRecordMetricInterval = 5 * time.Minute + defaultPinUpdateInterval = 5 * time.Minute + defaultPinRateWindow = 6 * time.Hour + stdTxnSize = 1200 // bytes ) // Client re-exports the client from the client package. @@ -284,6 +284,7 @@ type ( RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) + RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error } // A SettingStore stores settings. @@ -293,6 +294,10 @@ type ( Settings(ctx context.Context) ([]string, error) UpdateSetting(ctx context.Context, key, value string) error } + + WalletMetricsRecorder interface { + Shutdown(context.Context) error + } ) type Bus struct { @@ -314,8 +319,9 @@ type Bus struct { mtrcs MetricsStore ss SettingStore - contractLocker ContractLocker - sectors UploadingSectorsCache + contractLocker ContractLocker + sectors UploadingSectorsCache + walletMetricsRecorder WalletMetricsRecorder logger *zap.SugaredLogger } @@ -365,6 +371,9 @@ func New(ctx context.Context, am AlertManager, wm WebhooksManager, cm ChainManag // create chain subscriber b.cs = ibus.NewChainSubscriber(wm, cm, store, w, announcementMaxAge, l) + // create wallet metrics recorder + b.walletMetricsRecorder = ibus.NewWalletMetricRecorder(store, w, defaultWalletRecordMetricInterval, l) + return b, nil } @@ -514,6 +523,7 @@ func (b *Bus) Handler() http.Handler { // Shutdown shuts down the bus. func (b *Bus) Shutdown(ctx context.Context) error { return errors.Join( + b.walletMetricsRecorder.Shutdown(ctx), b.accountsMgr.Shutdown(ctx), b.webhooksMgr.Shutdown(ctx), b.pinMgr.Shutdown(ctx), diff --git a/internal/bus/walletmetricsrecorder.go b/internal/bus/walletmetricsrecorder.go new file mode 100644 index 000000000..43b3f9004 --- /dev/null +++ b/internal/bus/walletmetricsrecorder.go @@ -0,0 +1,100 @@ +package bus + +import ( + "context" + "sync" + "time" + + "go.sia.tech/coreutils/wallet" + "go.sia.tech/renterd/api" + "go.uber.org/zap" +) + +type ( + WalletMetricsRecorder struct { + store MetricsStore + wallet WalletBalance + + shutdownChan chan struct{} + wg sync.WaitGroup + + logger *zap.SugaredLogger + } + + MetricsStore interface { + RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error + } + + WalletBalance interface { + Balance() (wallet.Balance, error) + } +) + +// NewWalletMetricRecorder returns a recorder that periodically records wallet +// metrics. The recorder is already running and can be stopped by calling +// Shutdown. +func NewWalletMetricRecorder(store MetricsStore, wallet WalletBalance, interval time.Duration, logger *zap.Logger) *WalletMetricsRecorder { + logger = logger.Named("walletmetricsrecorder") + recorder := &WalletMetricsRecorder{ + store: store, + wallet: wallet, + shutdownChan: make(chan struct{}), + logger: logger.Sugar(), + } + recorder.run(interval) + return recorder +} + +func (wmr *WalletMetricsRecorder) run(interval time.Duration) { + wmr.wg.Add(1) + go func() { + defer wmr.wg.Done() + + t := time.NewTicker(interval) + defer t.Stop() + + for { + balance, err := wmr.wallet.Balance() + if err != nil { + wmr.logger.Error("failed to get wallet balance", zap.Error(err)) + } else { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + if err = wmr.store.RecordWalletMetric(ctx, api.WalletMetric{ + Timestamp: api.TimeRFC3339(time.Now().UTC()), + Spendable: balance.Spendable, + Confirmed: balance.Confirmed, + Unconfirmed: balance.Unconfirmed, + Immature: balance.Immature, + }); err != nil { + wmr.logger.Error("failed to record wallet metric", zap.Error(err)) + } else { + wmr.logger.Debug("successfully recorded wallet metrics") + } + cancel() + } + + select { + case <-wmr.shutdownChan: + return + case <-t.C: + } + } + }() +} + +func (wmr *WalletMetricsRecorder) Shutdown(ctx context.Context) error { + close(wmr.shutdownChan) + + waitChan := make(chan struct{}) + go func() { + wmr.wg.Wait() + close(waitChan) + }() + + select { + case <-ctx.Done(): + return context.Cause(ctx) + case <-waitChan: + return nil + } +} diff --git a/internal/sql/migrations.go b/internal/sql/migrations.go index c0b567e99..18214c07f 100644 --- a/internal/sql/migrations.go +++ b/internal/sql/migrations.go @@ -214,6 +214,12 @@ var ( return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00001_idx_contracts_fcid_timestamp", log) }, }, + { + ID: "00002_idx_wallet_metrics_immature", + Migrate: func(tx Tx) error { + return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00002_idx_wallet_metrics_immature", log) + }, + }, } } ) diff --git a/internal/test/e2e/metrics_test.go b/internal/test/e2e/metrics_test.go index eb40c787b..fcec2d6c1 100644 --- a/internal/test/e2e/metrics_test.go +++ b/internal/test/e2e/metrics_test.go @@ -80,7 +80,6 @@ func TestMetrics(t *testing.T) { } // check wallet metrics - t.Skip("TODO: check wallet metrics") wm, err := b.WalletMetrics(context.Background(), start, 10, time.Minute, api.WalletMetricsQueryOpts{}) tt.OK(err) if len(wm) == 0 { diff --git a/stores/metrics_test.go b/stores/metrics_test.go index 0e9092cb6..9a9f7b71b 100644 --- a/stores/metrics_test.go +++ b/stores/metrics_test.go @@ -528,6 +528,7 @@ func TestWalletMetrics(t *testing.T) { Confirmed: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), Unconfirmed: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), Spendable: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), + Immature: types.NewCurrency(frand.Uint64n(math.MaxUint64), frand.Uint64n(math.MaxUint64)), } if err := ss.RecordWalletMetric(context.Background(), metric); err != nil { t.Fatal(err) diff --git a/stores/sql/metrics.go b/stores/sql/metrics.go index 510048114..6f6e5420f 100644 --- a/stores/sql/metrics.go +++ b/stores/sql/metrics.go @@ -366,7 +366,7 @@ func RecordPerformanceMetric(ctx context.Context, tx sql.Tx, metrics ...api.Perf } func RecordWalletMetric(ctx context.Context, tx sql.Tx, metrics ...api.WalletMetric) error { - insertStmt, err := tx.Prepare(ctx, "INSERT INTO wallets (created_at, timestamp, confirmed_lo, confirmed_hi, spendable_lo, spendable_hi, unconfirmed_lo, unconfirmed_hi) VALUES (?, ?, ?, ?, ?, ?, ?, ?)") + insertStmt, err := tx.Prepare(ctx, "INSERT INTO wallets (created_at, timestamp, confirmed_lo, confirmed_hi, spendable_lo, spendable_hi, unconfirmed_lo, unconfirmed_hi, immature_hi, immature_lo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") if err != nil { return fmt.Errorf("failed to prepare statement to insert wallet metric: %w", err) } @@ -382,6 +382,8 @@ func RecordWalletMetric(ctx context.Context, tx sql.Tx, metrics ...api.WalletMet Unsigned64(metric.Spendable.Hi), Unsigned64(metric.Unconfirmed.Lo), Unsigned64(metric.Unconfirmed.Hi), + Unsigned64(metric.Immature.Lo), + Unsigned64(metric.Immature.Hi), ) if err != nil { return fmt.Errorf("failed to insert wallet metric: %w", err) @@ -407,6 +409,7 @@ func WalletMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, in (*Unsigned64)(&m.Confirmed.Lo), (*Unsigned64)(&m.Confirmed.Hi), (*Unsigned64)(&m.Spendable.Lo), (*Unsigned64)(&m.Spendable.Hi), (*Unsigned64)(&m.Unconfirmed.Lo), (*Unsigned64)(&m.Unconfirmed.Hi), + (*Unsigned64)(&m.Immature.Lo), (*Unsigned64)(&m.Immature.Hi), ) if err != nil { err = fmt.Errorf("failed to scan contract set metric: %w", err) diff --git a/stores/sql/mysql/migrations/metrics/migration_00002_idx_wallet_metrics_immature.sql b/stores/sql/mysql/migrations/metrics/migration_00002_idx_wallet_metrics_immature.sql new file mode 100644 index 000000000..edc14f373 --- /dev/null +++ b/stores/sql/mysql/migrations/metrics/migration_00002_idx_wallet_metrics_immature.sql @@ -0,0 +1,2 @@ +ALTER TABLE `wallets` ADD COLUMN `immature_lo` bigint NOT NULL, ADD COLUMN `immature_hi` bigint NOT NULL; +CREATE INDEX `idx_wallets_immature` ON `wallets`(`immature_lo`,`immature_hi`); diff --git a/stores/sql/mysql/migrations/metrics/schema.sql b/stores/sql/mysql/migrations/metrics/schema.sql index da4db5a6e..7c4c27d6c 100644 --- a/stores/sql/mysql/migrations/metrics/schema.sql +++ b/stores/sql/mysql/migrations/metrics/schema.sql @@ -114,9 +114,12 @@ CREATE TABLE `wallets` ( `spendable_hi` bigint NOT NULL, `unconfirmed_lo` bigint NOT NULL, `unconfirmed_hi` bigint NOT NULL, + `immature_lo` bigint NOT NULL, + `immature_hi` bigint NOT NULL, PRIMARY KEY (`id`), KEY `idx_wallets_timestamp` (`timestamp`), KEY `idx_confirmed` (`confirmed_lo`,`confirmed_hi`), KEY `idx_spendable` (`spendable_lo`,`spendable_hi`), - KEY `idx_unconfirmed` (`unconfirmed_lo`,`unconfirmed_hi`) + KEY `idx_unconfirmed` (`unconfirmed_lo`,`unconfirmed_hi`), + KEY `idx_wallets_immature` (`immature_lo`,`immature_hi`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; \ No newline at end of file diff --git a/stores/sql/sqlite/migrations/metrics/migration_00002_idx_wallet_metrics_immature.sql b/stores/sql/sqlite/migrations/metrics/migration_00002_idx_wallet_metrics_immature.sql new file mode 100644 index 000000000..6844e5184 --- /dev/null +++ b/stores/sql/sqlite/migrations/metrics/migration_00002_idx_wallet_metrics_immature.sql @@ -0,0 +1,2 @@ +ALTER TABLE `wallets` ADD COLUMN `immature_lo` BIGINT NOT NULL, ADD COLUMN `immature_hi` BIGINT NOT NULL; +CREATE INDEX `idx_wallets_immature` ON `wallets`(`immature_lo`,`immature_hi`); \ No newline at end of file diff --git a/stores/sql/sqlite/migrations/metrics/schema.sql b/stores/sql/sqlite/migrations/metrics/schema.sql index 63dae7d65..dfb8e3cf1 100644 --- a/stores/sql/sqlite/migrations/metrics/schema.sql +++ b/stores/sql/sqlite/migrations/metrics/schema.sql @@ -46,8 +46,9 @@ CREATE INDEX `idx_performance_action` ON `performance`(`action`); CREATE INDEX `idx_performance_timestamp` ON `performance`(`timestamp`); -- dbWalletMetric -CREATE TABLE `wallets` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`timestamp` BIGINT NOT NULL,`confirmed_lo` BIGINT NOT NULL,`confirmed_hi` BIGINT NOT NULL,`spendable_lo` BIGINT NOT NULL,`spendable_hi` BIGINT NOT NULL,`unconfirmed_lo` BIGINT NOT NULL,`unconfirmed_hi` BIGINT NOT NULL); +CREATE TABLE `wallets` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`timestamp` BIGINT NOT NULL,`confirmed_lo` BIGINT NOT NULL,`confirmed_hi` BIGINT NOT NULL,`spendable_lo` BIGINT NOT NULL,`spendable_hi` BIGINT NOT NULL,`unconfirmed_lo` BIGINT NOT NULL,`unconfirmed_hi` BIGINT NOT NULL,`immature_lo` BIGINT NOT NULL,`immature_hi` BIGINT NOT NULL); CREATE INDEX `idx_unconfirmed` ON `wallets`(`unconfirmed_lo`,`unconfirmed_hi`); CREATE INDEX `idx_spendable` ON `wallets`(`spendable_lo`,`spendable_hi`); CREATE INDEX `idx_confirmed` ON `wallets`(`confirmed_lo`,`confirmed_hi`); +CREATE INDEX `idx_wallets_immature` ON `wallets`(`immature_lo`,`immature_hi`); CREATE INDEX `idx_wallets_timestamp` ON `wallets`(`timestamp`); From d17ea3ef6ec1766a2d88d6266e9b9afdd936a5e6 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 19 Aug 2024 13:59:35 +0200 Subject: [PATCH 2/2] bus: log wallet balance --- internal/bus/walletmetricsrecorder.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/bus/walletmetricsrecorder.go b/internal/bus/walletmetricsrecorder.go index 43b3f9004..4d3205043 100644 --- a/internal/bus/walletmetricsrecorder.go +++ b/internal/bus/walletmetricsrecorder.go @@ -68,7 +68,11 @@ func (wmr *WalletMetricsRecorder) run(interval time.Duration) { }); err != nil { wmr.logger.Error("failed to record wallet metric", zap.Error(err)) } else { - wmr.logger.Debug("successfully recorded wallet metrics") + wmr.logger.Debugw("successfully recorded wallet metrics", + zap.Stringer("spendable", balance.Spendable), + zap.Stringer("confirmed", balance.Confirmed), + zap.Stringer("unconfirmed", balance.Unconfirmed), + zap.Stringer("immature", balance.Immature)) } cancel() }