diff --git a/stores/metrics.go b/stores/metrics.go index fe6a3422c..f9f7b060c 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -6,11 +6,8 @@ import ( "fmt" "time" - "go.sia.tech/core/types" "go.sia.tech/renterd/api" sql "go.sia.tech/renterd/stores/sql" - "gorm.io/gorm" - "gorm.io/gorm/clause" ) type ( @@ -189,41 +186,17 @@ func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.P } func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error { - dbMetrics := make([]dbWalletMetric, len(metrics)) - for i, metric := range metrics { - dbMetrics[i] = dbWalletMetric{ - Timestamp: unixTimeMS(metric.Timestamp), - ConfirmedLo: unsigned64(metric.Confirmed.Lo), - ConfirmedHi: unsigned64(metric.Confirmed.Hi), - SpendableLo: unsigned64(metric.Spendable.Lo), - SpendableHi: unsigned64(metric.Spendable.Hi), - UnconfirmedLo: unsigned64(metric.Unconfirmed.Lo), - UnconfirmedHi: unsigned64(metric.Unconfirmed.Hi), - } - } - return s.dbMetrics.Transaction(func(tx *gorm.DB) error { - return tx.Create(&dbMetrics).Error + return s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) error { + return tx.RecordWalletMetric(ctx, metrics...) }) } -func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) { - metrics, err := s.walletMetrics(ctx, start, n, interval, opts) - if err != nil { - return nil, err - } - resp := make([]api.WalletMetric, len(metrics)) - toCurr := func(lo, hi unsigned64) types.Currency { - return types.NewCurrency(uint64(lo), uint64(hi)) - } - for i := range resp { - resp[i] = api.WalletMetric{ - Timestamp: api.TimeRFC3339(time.Time(metrics[i].Timestamp).UTC()), - Confirmed: toCurr(metrics[i].ConfirmedLo, metrics[i].ConfirmedHi), - Spendable: toCurr(metrics[i].SpendableLo, metrics[i].SpendableHi), - Unconfirmed: toCurr(metrics[i].UnconfirmedLo, metrics[i].UnconfirmedHi), - } - } - return resp, nil +func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) (metrics []api.WalletMetric, err error) { + err = s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) (txErr error) { + metrics, txErr = tx.WalletMetrics(ctx, start, n, interval, opts) + return + }) + return } func (s *SQLStore) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error { @@ -265,56 +238,3 @@ func normaliseTimestamp(start time.Time, interval time.Duration, t unixTimeMS) u normalizedMS := (toNormaliseMS-startMS)/intervalMS*intervalMS + start.UnixMilli() return unixTimeMS(time.UnixMilli(normalizedMS)) } - -// findPeriods is the core of all methods retrieving metrics. By using integer -// division rounding combined with a GROUP BY operation, all rows of a table are -// split into intervals and the row with the lowest timestamp for each interval -// is returned. The result is then joined with the original table to retrieve -// only the metrics we want. -func (s *SQLStore) findPeriods(ctx context.Context, table string, dst interface{}, start time.Time, n uint64, interval time.Duration, whereExpr clause.Expr) error { - if n > api.MetricMaxIntervals { - return api.ErrMaxIntervalsExceeded - } - end := start.Add(time.Duration(n) * interval) - return s.dbMetrics.WithContext(ctx).Raw(fmt.Sprintf(` - WITH RECURSIVE periods AS ( - SELECT ? AS period_start - UNION ALL - SELECT period_start + ? - FROM periods - WHERE period_start < ? - ? - ) - SELECT %s.* FROM %s - INNER JOIN ( - SELECT - p.period_start as Period, - MIN(obj.id) AS id - FROM - periods p - INNER JOIN - %s obj ON obj.timestamp >= p.period_start AND obj.timestamp < p.period_start + ? - WHERE ? - GROUP BY - p.period_start - ) i ON %s.id = i.id ORDER BY Period ASC - `, table, table, table, table), - unixTimeMS(start), - interval.Milliseconds(), - unixTimeMS(end), - interval.Milliseconds(), - interval.Milliseconds(), - whereExpr, - ).Scan(dst). - Error -} - -func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) (metrics []dbWalletMetric, err error) { - err = s.findPeriods(ctx, dbWalletMetric{}.TableName(), &metrics, start, n, interval, gorm.Expr("TRUE")) - if err != nil { - return nil, fmt.Errorf("failed to fetch wallet metrics: %w", err) - } - for i, m := range metrics { - metrics[i].Timestamp = normaliseTimestamp(start, interval, m.Timestamp) - } - return -} diff --git a/stores/sql/database.go b/stores/sql/database.go index d071604c6..ff6f04660 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -292,6 +292,12 @@ type ( // RecordPerformanceMetric records performance metrics. RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error + + // RecordWalletMetric records wallet metrics. + RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error + + // WalletMetrics returns wallet metrics for the given time range + WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) } UsedContract struct { diff --git a/stores/sql/metrics.go b/stores/sql/metrics.go index 1755baac0..ab618658d 100644 --- a/stores/sql/metrics.go +++ b/stores/sql/metrics.go @@ -335,6 +335,58 @@ func RecordPerformanceMetric(ctx context.Context, tx sql.Tx, metrics ...api.Perf return nil } +func RecordWalletMetric(ctx context.Context, tx sql.Tx, metrics ...api.WalletMetric) error { + insertStmt, err := tx.Prepare(ctx, "INSERT INTO wallets (created_at, timestamp, confirmed_lo, confirmed_hi, spendable_lo, spendable_hi, unconfirmed_lo, unconfirmed_hi) VALUES (?, ?, ?, ?, ?, ?, ?, ?)") + if err != nil { + return fmt.Errorf("failed to prepare statement to insert wallet metric: %w", err) + } + defer insertStmt.Close() + + for _, metric := range metrics { + res, err := insertStmt.Exec(ctx, + time.Now().UTC(), + UnixTimeMS(metric.Timestamp), + Unsigned64(metric.Confirmed.Lo), + Unsigned64(metric.Confirmed.Hi), + Unsigned64(metric.Spendable.Lo), + Unsigned64(metric.Spendable.Hi), + Unsigned64(metric.Unconfirmed.Lo), + Unsigned64(metric.Unconfirmed.Hi), + ) + if err != nil { + return fmt.Errorf("failed to insert wallet metric: %w", err) + } else if n, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } else if n == 0 { + return fmt.Errorf("failed to insert wallet metric: no rows affected") + } + } + + return nil +} + +func WalletMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) { + return queryPeriods(ctx, tx, start, n, interval, opts, func(rows *sql.LoggedRows) (m api.WalletMetric, err error) { + var placeHolder int64 + var placeHolderTime time.Time + var timestamp UnixTimeMS + err = rows.Scan( + &placeHolder, + &placeHolderTime, + ×tamp, + (*Unsigned64)(&m.Confirmed.Lo), (*Unsigned64)(&m.Confirmed.Hi), + (*Unsigned64)(&m.Spendable.Lo), (*Unsigned64)(&m.Spendable.Hi), + (*Unsigned64)(&m.Unconfirmed.Lo), (*Unsigned64)(&m.Unconfirmed.Hi), + ) + if err != nil { + err = fmt.Errorf("failed to scan contract set metric: %w", err) + return + } + m.Timestamp = api.TimeRFC3339(normaliseTimestamp(start, interval, timestamp)) + return + }) +} + func queryPeriods[T any](ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts interface{}, scanRowFn func(*sql.LoggedRows) (T, error)) ([]T, error) { if n > api.MetricMaxIntervals { return nil, api.ErrMaxIntervalsExceeded @@ -524,6 +576,8 @@ func whereClauseFromQueryOpts(opts interface{}) (where whereClause, _ error) { where.query += " AND origin = ?" where.params = append(where.params, opts.Origin) } + case api.WalletMetricsQueryOpts: + where.table = "wallets" default: return whereClause{}, fmt.Errorf("unknown query opts type: %T", opts) } diff --git a/stores/sql/mysql/metrics.go b/stores/sql/mysql/metrics.go index 74be45bc8..21a80bdc7 100644 --- a/stores/sql/mysql/metrics.go +++ b/stores/sql/mysql/metrics.go @@ -111,3 +111,11 @@ func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metric func (tx *MetricsDatabaseTx) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error { return ssql.RecordPerformanceMetric(ctx, tx, metrics...) } + +func (tx *MetricsDatabaseTx) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error { + return ssql.RecordWalletMetric(ctx, tx, metrics...) +} + +func (tx *MetricsDatabaseTx) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) { + return ssql.WalletMetrics(ctx, tx, start, n, interval, opts) +} diff --git a/stores/sql/sqlite/metrics.go b/stores/sql/sqlite/metrics.go index 35293be39..b3b375013 100644 --- a/stores/sql/sqlite/metrics.go +++ b/stores/sql/sqlite/metrics.go @@ -110,3 +110,11 @@ func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metric func (tx *MetricsDatabaseTx) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error { return ssql.RecordPerformanceMetric(ctx, tx, metrics...) } + +func (tx *MetricsDatabaseTx) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error { + return ssql.RecordWalletMetric(ctx, tx, metrics...) +} + +func (tx *MetricsDatabaseTx) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) { + return ssql.WalletMetrics(ctx, tx, start, n, interval, opts) +}