Skip to content

Commit 7eae0ac

Browse files
authored
feat: add job to cancel stalled retrievals (#1233) (#1239)
* feat: add job to clean up stalled retrievals * chore: run configs doc gen * chore: refactor stalled retrieval name add warning if logs duration is less than stalled retrieval timeout
1 parent 4079f83 commit 7eae0ac

File tree

7 files changed

+69
-11
lines changed

7 files changed

+69
-11
lines changed

node/builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ func ConfigBoost(cfg *config.Boost) Option {
531531
Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode),
532532
Override(new(rmnet.RetrievalMarketNetwork), lotus_modules.RetrievalNetwork),
533533
Override(new(retrievalmarket.RetrievalProvider), lotus_modules.RetrievalProvider),
534-
Override(HandleRetrievalEventsKey, modules.HandleRetrievalGraphsyncUpdates(time.Duration(cfg.Dealmaking.RetrievalLogDuration))),
534+
Override(HandleRetrievalEventsKey, modules.HandleRetrievalGraphsyncUpdates(time.Duration(cfg.Dealmaking.RetrievalLogDuration), time.Duration(cfg.Dealmaking.StalledRetrievalTimeout))),
535535
Override(HandleRetrievalKey, lotus_modules.HandleRetrieval),
536536
Override(new(*lp2pimpl.TransportsListener), modules.NewTransportsListener(cfg)),
537537
Override(new(*protocolproxy.ProtocolProxy), modules.NewProtocolProxy(cfg)),

node/config/def.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func DefaultBoost() *Boost {
8989

9090
DealProposalLogDuration: Duration(time.Hour * 24),
9191
RetrievalLogDuration: Duration(time.Hour * 24),
92+
StalledRetrievalTimeout: Duration(time.Minute * 30),
9293

9394
RetrievalPricing: &lotus_config.RetrievalPricing{
9495
Strategy: RetrievalPricingDefaultMode,

node/config/doc_gen.go

Lines changed: 9 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/config/types.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,11 @@ type DealmakingConfig struct {
175175
// The amount of time to keep deal proposal logs for before cleaning them up.
176176
DealProposalLogDuration Duration
177177
// The amount of time to keep retrieval deal logs for before cleaning them up.
178+
// Note RetrievalLogDuration should exceed the StalledRetrievalTimeout as the
179+
// logs db is leveraged for pruning stalled retrievals.
178180
RetrievalLogDuration Duration
181+
// The amount of time stalled retrieval deals will remain open before being canceled.
182+
StalledRetrievalTimeout Duration
179183

180184
// A command used for fine-grained evaluation of storage deals
181185
// see https://boost.filecoin.io/configuration/deal-filters for more details

node/modules/retrieval.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ func NewRetrievalLogDB(db *RetrievalSqlDB) *rtvllog.RetrievalLogDB {
142142
}
143143

144144
// Write graphsync retrieval updates to the database
145-
func HandleRetrievalGraphsyncUpdates(duration time.Duration) func(lc fx.Lifecycle, db *rtvllog.RetrievalLogDB, m lotus_retrievalmarket.RetrievalProvider, dt lotus_dtypes.ProviderDataTransfer) {
145+
func HandleRetrievalGraphsyncUpdates(duration time.Duration, stalledDuration time.Duration) func(lc fx.Lifecycle, db *rtvllog.RetrievalLogDB, m lotus_retrievalmarket.RetrievalProvider, dt lotus_dtypes.ProviderDataTransfer) {
146146
return func(lc fx.Lifecycle, db *rtvllog.RetrievalLogDB, m lotus_retrievalmarket.RetrievalProvider, dt lotus_dtypes.ProviderDataTransfer) {
147-
rel := rtvllog.NewRetrievalLog(db, duration)
147+
rel := rtvllog.NewRetrievalLog(db, duration, dt, stalledDuration)
148148

149149
relctx, cancel := context.WithCancel(context.Background())
150150
type unsubFn func()

retrievalmarket/rtvllog/db.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ func (d *RetrievalLogDB) List(ctx context.Context, cursor *time.Time, offset int
123123
return d.list(ctx, offset, limit, where, whereArgs...)
124124
}
125125

126+
func (d *RetrievalLogDB) ListLastUpdatedAndOpen(ctx context.Context, lastUpdated time.Time) ([]RetrievalDealState, error) {
127+
return d.list(ctx, 0, 0, "UpdatedAt <= ? AND Status != 'DealStatusCompleted' AND Status != 'DealStatusCancelled'", lastUpdated)
128+
}
129+
126130
func (d *RetrievalLogDB) list(ctx context.Context, offset int, limit int, where string, whereArgs ...interface{}) ([]RetrievalDealState, error) {
127131
qry := "SELECT " +
128132
"CreatedAt, " +

retrievalmarket/rtvllog/retrieval_log.go

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,32 +7,42 @@ import (
77

88
datatransfer "github.com/filecoin-project/go-data-transfer"
99
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
10+
lotus_dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
1011
logging "github.com/ipfs/go-log/v2"
1112
)
1213

1314
var log = logging.Logger("rtrvlog")
1415

1516
type RetrievalLog struct {
16-
db *RetrievalLogDB
17-
duration time.Duration
18-
ctx context.Context
17+
db *RetrievalLogDB
18+
duration time.Duration
19+
dataTransfer lotus_dtypes.ProviderDataTransfer
20+
stalledTimeout time.Duration
21+
ctx context.Context
1922

2023
lastUpdateLk sync.Mutex
2124
lastUpdate map[string]time.Time
2225
}
2326

24-
func NewRetrievalLog(db *RetrievalLogDB, duration time.Duration) *RetrievalLog {
27+
func NewRetrievalLog(db *RetrievalLogDB, duration time.Duration, dt lotus_dtypes.ProviderDataTransfer, stalledTimeout time.Duration) *RetrievalLog {
28+
if duration < stalledTimeout {
29+
log.Warnf("the RetrievalLogDuration (%s) should exceed the StalledRetrievalTimeout (%s)", duration.String(), stalledTimeout.String())
30+
}
31+
2532
return &RetrievalLog{
26-
db: db,
27-
duration: duration,
28-
lastUpdate: make(map[string]time.Time),
33+
db: db,
34+
duration: duration,
35+
dataTransfer: dt,
36+
stalledTimeout: stalledTimeout,
37+
lastUpdate: make(map[string]time.Time),
2938
}
3039
}
3140

3241
func (r *RetrievalLog) Start(ctx context.Context) {
3342
r.ctx = ctx
3443
go r.gcUpdateMap(ctx)
3544
go r.gcDatabase(ctx)
45+
go r.gcRetrievals(ctx)
3646
}
3747

3848
// Called when there is a retrieval ask query
@@ -225,3 +235,34 @@ func (r *RetrievalLog) gcDatabase(ctx context.Context) {
225235
}
226236
}
227237
}
238+
239+
// Periodically cancels stalled retrievals older than 30mins
240+
func (r *RetrievalLog) gcRetrievals(ctx context.Context) {
241+
ticker := time.NewTicker(5 * time.Minute)
242+
defer ticker.Stop()
243+
244+
for {
245+
select {
246+
case <-ctx.Done():
247+
return
248+
case now := <-ticker.C:
249+
// Get retrievals last updated
250+
rows, err := r.db.ListLastUpdatedAndOpen(ctx, now.Add(-r.stalledTimeout))
251+
252+
if err != nil {
253+
log.Errorw("error fetching open, stalled retrievals", "err", err)
254+
continue
255+
}
256+
257+
for _, row := range rows {
258+
chid := datatransfer.ChannelID{Initiator: row.PeerID, Responder: row.LocalPeerID, ID: row.TransferID}
259+
err := r.dataTransfer.CloseDataTransferChannel(ctx, chid)
260+
if err != nil {
261+
log.Errorw("error canceling retrieval", "dealID", row.DealID, "err", err)
262+
} else {
263+
log.Infof("Canceled retrieval %s, older than %s", row.DealID, r.stalledTimeout)
264+
}
265+
}
266+
}
267+
}
268+
}

0 commit comments

Comments
 (0)