feat: curio: storage index gc task (#11884)
* curio storage path gc: lay out the structure

* curio gc: Implement storage metadata gc

* move bored singleton task impl to harmonytask

* curio: run storage gc task on storage node

* make gen
magik6k authored Apr 16, 2024
1 parent fd7f1a9 commit c785e59
Showing 8 changed files with 424 additions and 30 deletions.
7 changes: 7 additions & 0 deletions cmd/curio/tasks/tasks.go
@@ -18,6 +18,7 @@ import (
	curio "github.com/filecoin-project/lotus/curiosrc"
	"github.com/filecoin-project/lotus/curiosrc/chainsched"
	"github.com/filecoin-project/lotus/curiosrc/ffi"
	"github.com/filecoin-project/lotus/curiosrc/gc"
	"github.com/filecoin-project/lotus/curiosrc/message"
	"github.com/filecoin-project/lotus/curiosrc/piece"
	"github.com/filecoin-project/lotus/curiosrc/seal"
@@ -136,6 +137,12 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
		}
	}

	if hasAnySealingTask {
		// Sealing nodes maintain storage index when bored
		storageEndpointGcTask := gc.NewStorageEndpointGC(si, stor, db)
		activeTasks = append(activeTasks, storageEndpointGcTask)
	}

	if needProofParams {
		for spt := range dependencies.ProofTypes {
			if err := modules.GetParams(true)(spt); err != nil {
276 changes: 276 additions & 0 deletions curiosrc/gc/storage_endpoint_gc.go
@@ -0,0 +1,276 @@
package gc

import (
	"context"
	"os"
	"strings"
	"sync"
	"time"

	logging "github.com/ipfs/go-log/v2"
	"github.com/samber/lo"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
	"github.com/filecoin-project/lotus/lib/harmony/resources"
	"github.com/filecoin-project/lotus/lib/result"
	"github.com/filecoin-project/lotus/storage/paths"
	"github.com/filecoin-project/lotus/storage/sealer/fsutil"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

var log = logging.Logger("curiogc")

const StorageEndpointGCInterval = 2 * time.Minute // todo bump post testing
const StorageEndpointDeadTime = 15 * time.Minute
const MaxParallelEndpointChecks = 32

type StorageEndpointGC struct {
	si     *paths.DBIndex
	remote *paths.Remote
	db     *harmonydb.DB
}

func NewStorageEndpointGC(si *paths.DBIndex, remote *paths.Remote, db *harmonydb.DB) *StorageEndpointGC {
	return &StorageEndpointGC{
		si:     si,
		remote: remote,
		db:     db,
	}
}

func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
	/*
		1. Get all storage paths + urls (endpoints)
		2. Ping each url, record results
		3. Update sector_path_url_liveness with success/failure
		4.1 If a URL was consistently down for StorageEndpointDeadTime, remove it from the storage_path table
		4.2 Remove storage paths with no URLs remaining
		4.2.1 in the same transaction remove sector refs to the dead path
	*/

	ctx := context.Background()

	var pathRefs []struct {
		StorageID     storiface.ID `db:"storage_id"`
		Urls          string       `db:"urls"`
		LastHeartbeat *time.Time   `db:"last_heartbeat"`
	}

	err = s.db.Select(ctx, &pathRefs, `SELECT storage_id, urls, last_heartbeat FROM storage_path`)
	if err != nil {
		return false, xerrors.Errorf("getting path metadata: %w", err)
	}

	type pingResult struct {
		storageID storiface.ID
		url       string

		res result.Result[fsutil.FsStat]
	}

	var pingResults []pingResult
	var resultLk sync.Mutex
	var resultThrottle = make(chan struct{}, MaxParallelEndpointChecks)

	for _, pathRef := range pathRefs {
		pathRef := pathRef
		urls := strings.Split(pathRef.Urls, paths.URLSeparator)

		for _, url := range urls {
			url := url

			select {
			case resultThrottle <- struct{}{}:
			case <-ctx.Done():
				return false, ctx.Err()
			}

			go func() {
				defer func() {
					<-resultThrottle
				}()

				st, err := s.remote.StatUrl(ctx, url, pathRef.StorageID)

				res := pingResult{
					storageID: pathRef.StorageID,
					url:       url,
					res:       result.Wrap(st, err),
				}

				resultLk.Lock()
				pingResults = append(pingResults, res)
				resultLk.Unlock()
			}()
		}
	}

	// Wait for all pings to finish
	for i := 0; i < MaxParallelEndpointChecks; i++ {
		select {
		case resultThrottle <- struct{}{}:
		case <-ctx.Done():
			return false, ctx.Err()
		}
	}

	// Update the liveness table

	/*
		create table sector_path_url_liveness (
			storage_id text,
			url text,
			last_checked timestamp not null,
			last_live timestamp,
			last_dead timestamp,
			last_dead_reason text,
			primary key (storage_id, url),
			foreign key (storage_id) references storage_path (storage_id) on delete cascade
		)
	*/

	currentTime := time.Now().UTC()

	committed, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
		for _, pingResult := range pingResults {
			var lastLive, lastDead, lastDeadReason interface{}
			if pingResult.res.Error == nil {
				lastLive = currentTime.UTC()
				lastDead = nil
				lastDeadReason = nil
			} else {
				lastLive = nil
				lastDead = currentTime.UTC()
				lastDeadReason = pingResult.res.Error.Error()
			}

			_, err := tx.Exec(`
				INSERT INTO sector_path_url_liveness (storage_id, url, last_checked, last_live, last_dead, last_dead_reason)
				VALUES ($1, $2, $3, $4, $5, $6)
				ON CONFLICT (storage_id, url) DO UPDATE
				SET last_checked = EXCLUDED.last_checked,
					last_live = COALESCE(EXCLUDED.last_live, sector_path_url_liveness.last_live),
					last_dead = COALESCE(EXCLUDED.last_dead, sector_path_url_liveness.last_dead),
					last_dead_reason = COALESCE(EXCLUDED.last_dead_reason, sector_path_url_liveness.last_dead_reason)
			`, pingResult.storageID, pingResult.url, currentTime, lastLive, lastDead, lastDeadReason)
			if err != nil {
				return false, xerrors.Errorf("updating liveness data: %w", err)
			}
		}

		return true, nil
	}, harmonydb.OptionRetry())
	if err != nil {
		return false, xerrors.Errorf("sector_path_url_liveness update: %w", err)
	}
	if !committed {
		return false, xerrors.Errorf("sector_path_url_liveness update: transaction didn't commit")
	}

	///////
	// Now we do the actual database cleanup
	if !stillOwned() {
		return false, xerrors.Errorf("task no longer owned")
	}

	committed, err = s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
		// Identify URLs that are consistently down
		var deadURLs []struct {
			StorageID storiface.ID
			URL       string
		}
		err = tx.Select(&deadURLs, `
			SELECT storage_id, url FROM sector_path_url_liveness
			WHERE last_dead > last_live AND last_dead < $1
		`, currentTime.Add(-StorageEndpointDeadTime).UTC())
		if err != nil {
			return false, xerrors.Errorf("selecting dead URLs: %w", err)
		}

		log.Debugw("dead urls", "dead_urls", deadURLs)

		// Remove dead URLs from storage_path entries and handle path cleanup
		for _, du := range deadURLs {
			// Fetch the current URLs for the storage path
			var currentPath struct {
				URLs string
			}
			err = tx.Get(&currentPath, "SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID)
			if err != nil {
				return false, xerrors.Errorf("fetching storage paths: %w", err)
			}

			// Filter out the dead URL using lo.Reject and prepare the updated list
			urls := strings.Split(currentPath.URLs, paths.URLSeparator)
			urls = lo.Reject(urls, func(u string, _ int) bool {
				return u == du.URL
			})

			log.Debugw("filtered urls", "urls", urls, "dead_url", du.URL, "storage_id", du.StorageID)

			if os.Getenv("CURIO_STORAGE_META_GC_DRYRUN") != "no" { // todo drop this after testing
				log.Debugw("dryrun: not updating storage path", "storage_id", du.StorageID, "urls", urls, "dead_url", du.URL, "current_urls", currentPath.URLs, "dead_urls", deadURLs)
				continue
			}

			if len(urls) == 0 {
				// If no URLs left, remove the storage path entirely
				_, err = tx.Exec("DELETE FROM storage_path WHERE storage_id = $1", du.StorageID)
				if err != nil {
					return false, xerrors.Errorf("deleting storage path: %w", err)
				}
				_, err = tx.Exec("DELETE FROM sector_location WHERE storage_id = $1", du.StorageID)
				if err != nil {
					return false, xerrors.Errorf("deleting sector locations: %w", err)
				}
			} else {
				// Update the storage path with the filtered URLs
				newURLs := strings.Join(urls, paths.URLSeparator)
				_, err = tx.Exec("UPDATE storage_path SET urls = $1 WHERE storage_id = $2", newURLs, du.StorageID)
				if err != nil {
					return false, xerrors.Errorf("updating storage path urls: %w", err)
				}
			}
		}

		return true, nil
	}, harmonydb.OptionRetry())
	if err != nil {
		return false, xerrors.Errorf("removing dead URLs and cleaning storage paths: %w", err)
	}
	if !committed {
		return false, xerrors.Errorf("transaction for removing dead URLs and cleaning paths did not commit")
	}

	return true, nil
}

func (s *StorageEndpointGC) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
	id := ids[0]
	return &id, nil
}

func (s *StorageEndpointGC) TypeDetails() harmonytask.TaskTypeDetails {
	return harmonytask.TaskTypeDetails{
		Max:  1,
		Name: "StorageMetaGC",
		Cost: resources.Resources{
			Cpu: 1,
			Ram: 64 << 20,
			Gpu: 0,
		},
		IAmBored: harmonytask.SingletonTaskAdder(StorageEndpointGCInterval, s),
	}
}

func (s *StorageEndpointGC) Adder(taskFunc harmonytask.AddTaskFunc) {
	// lazy endpoint, added when bored
	return
}

var _ harmonytask.TaskInterface = &StorageEndpointGC{}
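
Note: the ping fan-out above uses a buffered channel as a counting semaphore, and "wait for all pings" works by re-acquiring every slot, which only succeeds once each in-flight goroutine has released its own. A standalone sketch of that pattern (illustrative only, not part of this commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxParallel = 4
	throttle := make(chan struct{}, maxParallel) // buffered channel used as a semaphore

	for i := 0; i < 10; i++ {
		i := i
		throttle <- struct{}{} // acquire a slot; blocks while maxParallel workers are running
		go func() {
			defer func() { <-throttle }() // release the slot when the worker finishes
			time.Sleep(10 * time.Millisecond)
			fmt.Println("checked endpoint", i)
		}()
	}

	// Wait for all workers: re-acquire every slot. This only completes
	// once every outstanding goroutine has released its slot.
	for i := 0; i < maxParallel; i++ {
		throttle <- struct{}{}
	}
	fmt.Println("all checks finished")
}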
8 changes: 8 additions & 0 deletions lib/harmony/harmonydb/sql/20240416-harmony_singleton_task.sql
@@ -0,0 +1,8 @@
create table harmony_task_singletons (
    task_name varchar(255) not null,
    task_id bigint,
    last_run_time timestamp,

    primary key (task_name),
    foreign key (task_id) references harmony_task (id) on delete set null
);
13 changes: 13 additions & 0 deletions lib/harmony/harmonydb/sql/20240417-sector_index_gc.sql
@@ -0,0 +1,13 @@
create table sector_path_url_liveness (
    storage_id text,
    url text,

    last_checked timestamp not null,
    last_live timestamp,
    last_dead timestamp,
    last_dead_reason text,

    primary key (storage_id, url),

    foreign key (storage_id) references storage_path (storage_id) on delete cascade
)
4 changes: 4 additions & 0 deletions lib/harmony/harmonydb/userfuncs.go
@@ -236,6 +236,10 @@ func (t *Tx) Select(sliceOfStructPtr any, sql rawStringOnly, arguments ...any) e
	return pgxscan.Select(t.ctx, t.Tx, sliceOfStructPtr, string(sql), arguments...)
}

func (t *Tx) Get(s any, sql rawStringOnly, arguments ...any) error {
	return pgxscan.Get(t.ctx, t.Tx, s, string(sql), arguments...)
}

func IsErrUniqueContraint(err error) bool {
	var e2 *pgconn.PgError
	return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation
52 changes: 52 additions & 0 deletions lib/harmony/harmonytask/singleton_task.go
@@ -0,0 +1,52 @@
package harmonytask

import (
	"errors"
	"time"

	"github.com/jackc/pgx/v5"

	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
	"github.com/filecoin-project/lotus/lib/passcall"
)

func SingletonTaskAdder(minInterval time.Duration, task TaskInterface) func(AddTaskFunc) error {
	return passcall.Every(minInterval, func(add AddTaskFunc) error {
		taskName := task.TypeDetails().Name

		add(func(taskID TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
			var existingTaskID *int64
			var lastRunTime time.Time

			// Query to check the existing task entry
			err = tx.QueryRow(`SELECT task_id, last_run_time FROM harmony_task_singletons WHERE task_name = $1`, taskName).Scan(&existingTaskID, &lastRunTime)
			if err != nil {
				if !errors.Is(err, pgx.ErrNoRows) {
					return false, err // return error if query failed and it's not because of missing row
				}
			}

			now := time.Now()
			// Determine if the task should run based on the absence of a record or outdated last_run_time
			shouldRun := err == pgx.ErrNoRows || (existingTaskID == nil && lastRunTime.Add(minInterval).Before(now))
			if !shouldRun {
				return false, nil
			}

			// Conditionally insert or update the task entry
			n, err := tx.Exec(`
				INSERT INTO harmony_task_singletons (task_name, task_id, last_run_time)
				VALUES ($1, $2, $3)
				ON CONFLICT (task_name) DO UPDATE
				SET task_id = COALESCE(harmony_task_singletons.task_id, $2),
					last_run_time = $3
				WHERE harmony_task_singletons.task_id IS NULL
			`, taskName, taskID, now)
			if err != nil {
				return false, err
			}
			return n > 0, nil
		})
		return nil
	})
}
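
SingletonTaskAdder leans on passcall.Every, which is not part of this diff. A minimal sketch of the assumed behavior — run the wrapped callback at most once per interval and make intermediate calls no-ops (the generic signature here is an assumption, not the actual lotus implementation):

package passcall

import (
	"sync"
	"time"
)

// Every returns a wrapper around cb that invokes it at most once per
// passInterval; calls made before the interval has elapsed return nil
// without invoking cb. (Sketch only; names and signature are assumed.)
func Every[P any](passInterval time.Duration, cb func(P) error) func(P) error {
	var lk sync.Mutex
	var lastCall time.Time

	return func(param P) error {
		lk.Lock()
		defer lk.Unlock()

		if time.Since(lastCall) < passInterval {
			return nil // throttled: the interval has not elapsed yet
		}
		lastCall = time.Now()

		return cb(param)
	}
}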