Skip to content

Commit

Permalink
Migrate WalletMetrics to raw SQL (#1313)
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan authored Jun 20, 2024
1 parent b2404ee commit b447afc
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 88 deletions.
96 changes: 8 additions & 88 deletions stores/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ import (
"fmt"
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
sql "go.sia.tech/renterd/stores/sql"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type (
Expand Down Expand Up @@ -189,41 +186,17 @@ func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.P
}

func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error {
dbMetrics := make([]dbWalletMetric, len(metrics))
for i, metric := range metrics {
dbMetrics[i] = dbWalletMetric{
Timestamp: unixTimeMS(metric.Timestamp),
ConfirmedLo: unsigned64(metric.Confirmed.Lo),
ConfirmedHi: unsigned64(metric.Confirmed.Hi),
SpendableLo: unsigned64(metric.Spendable.Lo),
SpendableHi: unsigned64(metric.Spendable.Hi),
UnconfirmedLo: unsigned64(metric.Unconfirmed.Lo),
UnconfirmedHi: unsigned64(metric.Unconfirmed.Hi),
}
}
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
return s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) error {
return tx.RecordWalletMetric(ctx, metrics...)
})
}

func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) {
metrics, err := s.walletMetrics(ctx, start, n, interval, opts)
if err != nil {
return nil, err
}
resp := make([]api.WalletMetric, len(metrics))
toCurr := func(lo, hi unsigned64) types.Currency {
return types.NewCurrency(uint64(lo), uint64(hi))
}
for i := range resp {
resp[i] = api.WalletMetric{
Timestamp: api.TimeRFC3339(time.Time(metrics[i].Timestamp).UTC()),
Confirmed: toCurr(metrics[i].ConfirmedLo, metrics[i].ConfirmedHi),
Spendable: toCurr(metrics[i].SpendableLo, metrics[i].SpendableHi),
Unconfirmed: toCurr(metrics[i].UnconfirmedLo, metrics[i].UnconfirmedHi),
}
}
return resp, nil
func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) (metrics []api.WalletMetric, err error) {
err = s.bMetrics.Transaction(ctx, func(tx sql.MetricsDatabaseTx) (txErr error) {
metrics, txErr = tx.WalletMetrics(ctx, start, n, interval, opts)
return
})
return
}

func (s *SQLStore) PruneMetrics(ctx context.Context, metric string, cutoff time.Time) error {
Expand Down Expand Up @@ -265,56 +238,3 @@ func normaliseTimestamp(start time.Time, interval time.Duration, t unixTimeMS) u
normalizedMS := (toNormaliseMS-startMS)/intervalMS*intervalMS + start.UnixMilli()
return unixTimeMS(time.UnixMilli(normalizedMS))
}

// findPeriods is the core of all methods retrieving metrics. By using integer
// division rounding combined with a GROUP BY operation, all rows of a table are
// split into intervals and the row with the lowest timestamp for each interval
// is returned. The result is then joined with the original table to retrieve
// only the metrics we want.
func (s *SQLStore) findPeriods(ctx context.Context, table string, dst interface{}, start time.Time, n uint64, interval time.Duration, whereExpr clause.Expr) error {
if n > api.MetricMaxIntervals {
return api.ErrMaxIntervalsExceeded
}
end := start.Add(time.Duration(n) * interval)
return s.dbMetrics.WithContext(ctx).Raw(fmt.Sprintf(`
WITH RECURSIVE periods AS (
SELECT ? AS period_start
UNION ALL
SELECT period_start + ?
FROM periods
WHERE period_start < ? - ?
)
SELECT %s.* FROM %s
INNER JOIN (
SELECT
p.period_start as Period,
MIN(obj.id) AS id
FROM
periods p
INNER JOIN
%s obj ON obj.timestamp >= p.period_start AND obj.timestamp < p.period_start + ?
WHERE ?
GROUP BY
p.period_start
) i ON %s.id = i.id ORDER BY Period ASC
`, table, table, table, table),
unixTimeMS(start),
interval.Milliseconds(),
unixTimeMS(end),
interval.Milliseconds(),
interval.Milliseconds(),
whereExpr,
).Scan(dst).
Error
}

func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) (metrics []dbWalletMetric, err error) {
err = s.findPeriods(ctx, dbWalletMetric{}.TableName(), &metrics, start, n, interval, gorm.Expr("TRUE"))
if err != nil {
return nil, fmt.Errorf("failed to fetch wallet metrics: %w", err)
}
for i, m := range metrics {
metrics[i].Timestamp = normaliseTimestamp(start, interval, m.Timestamp)
}
return
}
6 changes: 6 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ type (

// RecordPerformanceMetric records performance metrics.
RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error

// RecordWalletMetric records wallet metrics.
RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error

// WalletMetrics returns wallet metrics for the given time range
WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error)
}

UsedContract struct {
Expand Down
54 changes: 54 additions & 0 deletions stores/sql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,58 @@ func RecordPerformanceMetric(ctx context.Context, tx sql.Tx, metrics ...api.Perf
return nil
}

func RecordWalletMetric(ctx context.Context, tx sql.Tx, metrics ...api.WalletMetric) error {
insertStmt, err := tx.Prepare(ctx, "INSERT INTO wallets (created_at, timestamp, confirmed_lo, confirmed_hi, spendable_lo, spendable_hi, unconfirmed_lo, unconfirmed_hi) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare statement to insert wallet metric: %w", err)
}
defer insertStmt.Close()

for _, metric := range metrics {
res, err := insertStmt.Exec(ctx,
time.Now().UTC(),
UnixTimeMS(metric.Timestamp),
Unsigned64(metric.Confirmed.Lo),
Unsigned64(metric.Confirmed.Hi),
Unsigned64(metric.Spendable.Lo),
Unsigned64(metric.Spendable.Hi),
Unsigned64(metric.Unconfirmed.Lo),
Unsigned64(metric.Unconfirmed.Hi),
)
if err != nil {
return fmt.Errorf("failed to insert wallet metric: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
} else if n == 0 {
return fmt.Errorf("failed to insert wallet metric: no rows affected")
}
}

return nil
}

func WalletMetrics(ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) {
return queryPeriods(ctx, tx, start, n, interval, opts, func(rows *sql.LoggedRows) (m api.WalletMetric, err error) {
var placeHolder int64
var placeHolderTime time.Time
var timestamp UnixTimeMS
err = rows.Scan(
&placeHolder,
&placeHolderTime,
&timestamp,
(*Unsigned64)(&m.Confirmed.Lo), (*Unsigned64)(&m.Confirmed.Hi),
(*Unsigned64)(&m.Spendable.Lo), (*Unsigned64)(&m.Spendable.Hi),
(*Unsigned64)(&m.Unconfirmed.Lo), (*Unsigned64)(&m.Unconfirmed.Hi),
)
if err != nil {
err = fmt.Errorf("failed to scan contract set metric: %w", err)
return
}
m.Timestamp = api.TimeRFC3339(normaliseTimestamp(start, interval, timestamp))
return
})
}

func queryPeriods[T any](ctx context.Context, tx sql.Tx, start time.Time, n uint64, interval time.Duration, opts interface{}, scanRowFn func(*sql.LoggedRows) (T, error)) ([]T, error) {
if n > api.MetricMaxIntervals {
return nil, api.ErrMaxIntervalsExceeded
Expand Down Expand Up @@ -524,6 +576,8 @@ func whereClauseFromQueryOpts(opts interface{}) (where whereClause, _ error) {
where.query += " AND origin = ?"
where.params = append(where.params, opts.Origin)
}
case api.WalletMetricsQueryOpts:
where.table = "wallets"
default:
return whereClause{}, fmt.Errorf("unknown query opts type: %T", opts)
}
Expand Down
8 changes: 8 additions & 0 deletions stores/sql/mysql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,11 @@ func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metric
func (tx *MetricsDatabaseTx) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error {
return ssql.RecordPerformanceMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error {
return ssql.RecordWalletMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) {
return ssql.WalletMetrics(ctx, tx, start, n, interval, opts)
}
8 changes: 8 additions & 0 deletions stores/sql/sqlite/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,11 @@ func (tx *MetricsDatabaseTx) RecordContractSetMetric(ctx context.Context, metric
func (tx *MetricsDatabaseTx) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error {
return ssql.RecordPerformanceMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error {
return ssql.RecordWalletMetric(ctx, tx, metrics...)
}

func (tx *MetricsDatabaseTx) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) {
return ssql.WalletMetrics(ctx, tx, start, n, interval, opts)
}

0 comments on commit b447afc

Please sign in to comment.