Skip to content

Commit

Permalink
feat(shed): actor state diff stats
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Jan 6, 2025
1 parent fc70735 commit f2e4bb3
Showing 1 changed file with 79 additions and 10 deletions.
89 changes: 79 additions & 10 deletions cmd/lotus-shed/state-stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,12 +618,20 @@ The top level stats reported for an actor is computed independently of all field
accounting of the true size of the actor in the state datastore.
The calculation of these stats results in the actor state being traversed twice. The dag-cache-size flag can be used
to reduce the number of decode operations performed by caching the decoded object after first access.`,
to reduce the number of decode operations performed by caching the decoded object after first access.
When using the diff-tipset flag, the stats output will only include the mutated state between the two tipsets, not
the total state of the actor in either tipset.
`,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "tipset",
Usage: "specify tipset to call method on (pass comma separated array of cids)",
},
&cli.StringFlag{
Name: "diff-tipset",
Usage: "specify tipset to diff against, stat output will include only the mutated state between the two tipsets (pass comma separated array of cids)",
},
&cli.IntFlag{
Name: "workers",
Usage: "number of workers to use when processing",
Expand Down Expand Up @@ -688,12 +696,12 @@ to reduce the number of decode operations performed by caching the decoded objec
numWorkers := cctx.Int("workers")
dagCacheSize := cctx.Int("dag-cache-size")

eg, egctx := errgroup.WithContext(ctx)

jobs := make(chan address.Address, numWorkers)
results := make(chan actorStats, numWorkers)

worker := func(ctx context.Context, id int) error {
sc := &statCollector{}

worker := func(ctx context.Context, id int, ts *types.TipSet) error {
completed := 0
defer func() {
log.Infow("worker done", "id", id, "completed", completed)
Expand All @@ -720,7 +728,7 @@ to reduce the number of decode operations performed by caching the decoded objec
}
}

actStats, err := collectStats(ctx, addr, actor, dag)
actStats, err := sc.collectStats(ctx, addr, actor, dag)
if err != nil {
return err
}
Expand All @@ -738,20 +746,68 @@ to reduce the number of decode operations performed by caching the decoded objec
}
}

eg, egctx := errgroup.WithContext(ctx)
for w := 0; w < numWorkers; w++ {
id := w
eg.Go(func() error {
return worker(egctx, id)
return worker(egctx, id, ts)
})
}

done := make(chan struct{})
go func() {
defer close(jobs)
defer func() {
close(jobs)
close(done)
}()
for _, addr := range addrs {
jobs <- addr
}
}()

// if diff-tipset is set, we need to load the actors from the diff tipset and compare, so we'll
// discard the results for this run, then run the workers again with a new set of jobs and take
// the results from the second run which should just include the diff.

if diffTs := cctx.String("diff-tipset"); diffTs != "" {
// read and discard results
go func() {
for range results {

Check failure on line 775 in cmd/lotus-shed/state-stats.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

empty-block: this block is empty, you can remove it (revive)
}
}()

_ = eg.Wait()
log.Infow("done with first pass, starting diff")
close(results)

<-done

dts, err := lcli.ParseTipSetRef(ctx, tsr, diffTs)
if err != nil {
return err
}
// TODO: if anyone cares for the "all" case, re-load actors here
log.Infow("diff tipset", "parentstate", dts.ParentState())

jobs = make(chan address.Address, numWorkers)
results = make(chan actorStats, numWorkers)

eg, egctx = errgroup.WithContext(ctx)
for w := 0; w < numWorkers; w++ {
id := w
eg.Go(func() error {
return worker(egctx, id, dts)
})
}

go func() {
defer close(jobs)
for _, addr := range addrs {
jobs <- addr
}
}()
}

go func() {
// error is checked later
_ = eg.Wait()
Expand Down Expand Up @@ -866,7 +922,12 @@ func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter,
return results, nil
}

func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
// statCollector carries the CID sets that collectStats hands to
// merkledag.Walk as visit callbacks, so the sets persist across collectStats
// calls made on the same collector instead of starting empty for every actor.
//
// The zero value leaves both fields nil; collectStats populates them the
// first time it finds rootCidSet unset.
//
// NOTE(review): collectStats fills and uses these fields without any locking;
// if one collector is shared by several workers, confirm that is safe.
type statCollector struct {
// rootCidSet is the visit set used for the walk that starts at actor.Head.
rootCidSet *cid.Set
// fieldCidSets holds one visit set per actor state field, keyed by field name.
// NOTE(review): entries are only created for the fields present on the call
// that initialises the collector — confirm later actors cannot bring field
// names that are missing from this map.
fieldCidSets map[string]*cid.Set
}

func (sc *statCollector) collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))

nd, err := dag.Get(ctx, actor.Head)
Expand Down Expand Up @@ -903,6 +964,14 @@ func collectStats(ctx context.Context, addr address.Address, actor *types.Actor,
}
}

if sc.rootCidSet == nil {
sc.rootCidSet = cid.NewSet()
sc.fieldCidSets = make(map[string]*cid.Set)
for _, field := range fields {
sc.fieldCidSets[field.Name] = cid.NewSet()
}
}

actStats := actorStats{
Address: addr,
Actor: actor,
Expand All @@ -913,7 +982,7 @@ func collectStats(ctx context.Context, addr address.Address, actor *types.Actor,
walk: carWalkFunc,
}

if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, cid.NewSet().Visit, merkledag.Concurrent()); err != nil {
if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, sc.rootCidSet.Visit, merkledag.Concurrent()); err != nil {
return actorStats{}, err
}

Expand All @@ -925,7 +994,7 @@ func collectStats(ctx context.Context, addr address.Address, actor *types.Actor,
walk: carWalkFunc,
}

if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, cid.NewSet().Visit, merkledag.Concurrent()); err != nil {
if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, sc.fieldCidSets[field.Name].Visit, merkledag.Concurrent()); err != nil {
return actorStats{}, err
}

Expand Down

0 comments on commit f2e4bb3

Please sign in to comment.