Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: curio ffiselect: Isolate gpu calls in a subprocess #11994

Merged
merged 22 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ BINS+=lotus-miner

curio: $(BUILD_DEPS)
rm -f curio
$(GOCC) build $(GOFLAGS) -o curio ./cmd/curio
$(GOCC) build $(GOFLAGS) -o curio -ldflags " \
-X github.com/filecoin-project/lotus/curiosrc/build.IsOpencl=$(FFI_USE_OPENCL) \
-X github.com/filecoin-project/lotus/curiosrc/build.Commit=`git log -1 --format=%h_%cI`" \
./cmd/curio
.PHONY: curio
BINS+=curio

Expand Down
13 changes: 0 additions & 13 deletions cmd/curio/deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (

"github.com/BurntSushi/toml"
"github.com/gbrlsnchs/jwt/v3"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"
"github.com/samber/lo"
"github.com/urfave/cli/v2"
Expand All @@ -28,7 +26,6 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
Expand Down Expand Up @@ -170,7 +167,6 @@ type Deps struct {
DB *harmonydb.DB // has itest capability
Full api.FullNode
Verif storiface.Verifier
LW *sealer.LocalWorker
As *multictladdr.MultiAddressSelector
Maddrs map[dtypes.MinerAddress]bool
ProofTypes map[abi.RegisteredSealProof]bool
Expand Down Expand Up @@ -308,16 +304,7 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
if deps.Stor == nil {
deps.Stor = paths.NewRemote(deps.LocalStore, deps.Si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
}
if deps.LW == nil {
wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore()))

// todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper
// maybe with a curio specific abstraction. LocalWorker does persistent call tracking which we probably
// don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask)
deps.LW = sealer.NewLocalWorker(sealer.WorkerConfig{
MaxParallelChallengeReads: deps.Cfg.Proving.ParallelCheckLimit,
}, deps.Stor, deps.LocalStore, deps.Si, nil, wstates)
}
if deps.Maddrs == nil {
deps.Maddrs = map[dtypes.MinerAddress]bool{}
}
Expand Down
71 changes: 71 additions & 0 deletions cmd/curio/ffi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"encoding/gob"
"fmt"
"os"
"reflect"

"github.com/ipfs/go-cid"
"github.com/samber/lo"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/lib/ffiselect"
ffidirect "github.com/filecoin-project/lotus/lib/ffiselect/ffidirect"
"github.com/filecoin-project/lotus/lib/must"
)

var ffiCmd = &cli.Command{
Name: "ffi",
Hidden: true,
Flags: []cli.Flag{
layersFlag,
},
Action: func(cctx *cli.Context) (err error) {
output := os.NewFile(uintptr(3), "out")

defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
if err != nil {
err = gob.NewEncoder(output).Encode(ffiselect.ValErr{Val: nil, Err: err.Error()})
if err != nil {
panic(err)
}
}
}()
var callInfo ffiselect.FFICall
if err := gob.NewDecoder(os.Stdin).Decode(&callInfo); err != nil {
return xerrors.Errorf("ffi subprocess can not decode: %w", err)
}

args := lo.Map(callInfo.Args, func(arg any, i int) reflect.Value {
return reflect.ValueOf(arg)
})

resAry := reflect.ValueOf(ffidirect.FFI{}).MethodByName(callInfo.Fn).Call(args)
res := lo.Map(resAry, func(res reflect.Value, i int) any {
return res.Interface()
})

err = gob.NewEncoder(output).Encode(ffiselect.ValErr{Val: res, Err: ""})
if err != nil {
return xerrors.Errorf("ffi subprocess can not encode: %w", err)
}

return output.Close()
},
}

func ffiSelfTest() {
val1, val2 := 12345678, must.One(cid.Parse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"))
ret1, ret2, err := ffiselect.FFISelect{}.SelfTest(val1, val2)
if err != nil {
panic("ffi self test failed:" + err.Error())
}
if ret1 != val1 || !val2.Equals(ret2) {
panic(fmt.Sprint("ffi self test failed: values do not match: ", val1, val2, ret1, ret2))
}
}
1 change: 1 addition & 0 deletions cmd/curio/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func main() {
sealCmd,
marketCmd,
fetchParamCmd,
ffiCmd,
}

jaeger := tracing.SetupJaegerTracing("curio")
Expand Down
2 changes: 1 addition & 1 deletion cmd/curio/proving.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o
}

wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := curio.WindowPostScheduler(
ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, deps.LW, nil, nil,
ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, nil, nil,
deps.As, deps.Maddrs, deps.DB, deps.Stor, deps.Si, deps.Cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return err
Expand Down
20 changes: 9 additions & 11 deletions cmd/curio/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/cmd/curio/rpc"
Expand Down Expand Up @@ -89,11 +87,7 @@ var runCmd = &cli.Command{
log.Errorf("ensuring tempdir exists: %s", err)
}

ctx, _ := tag.New(lcli.DaemonContext(cctx),
tag.Insert(metrics.Version, build.BuildVersion),
tag.Insert(metrics.Commit, build.CurrentCommit),
tag.Insert(metrics.NodeType, "curio"),
)
ctx := lcli.DaemonContext(cctx)
shutdownChan := make(chan struct{})
{
var ctxclose func()
Expand Down Expand Up @@ -126,6 +120,8 @@ var runCmd = &cli.Command{
return err
}

go ffiSelfTest() // Panics on failure

taskEngine, err := tasks.StartTasks(ctx, dependencies)

if err != nil {
Expand All @@ -150,6 +146,11 @@ var runCmd = &cli.Command{
},
}

var layersFlag = &cli.StringSliceFlag{
Name: "layers",
Usage: "list of layers to be interpreted (atop defaults). Default: base",
}

var webCmd = &cli.Command{
Name: "web",
Usage: "Start Curio web interface",
Expand All @@ -165,10 +166,7 @@ var webCmd = &cli.Command{
Name: "nosync",
Usage: "don't check full-node sync status",
},
&cli.StringSliceFlag{
Name: "layers",
Usage: "list of layers to be interpreted (atop defaults). Default: base",
},
layersFlag,
},
Action: func(cctx *cli.Context) error {

Expand Down
6 changes: 3 additions & 3 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
db := dependencies.DB
full := dependencies.Full
verif := dependencies.Verif
lw := dependencies.LW
as := dependencies.As
maddrs := dependencies.Maddrs
stor := dependencies.Stor
Expand All @@ -61,7 +60,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task

if cfg.Subsystems.EnableWindowPost {
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := curio.WindowPostScheduler(
ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender, chainSched,
ctx, cfg.Fees, cfg.Proving, full, verif, sender, chainSched,
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)

if err != nil {
Expand All @@ -72,7 +71,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
}

if cfg.Subsystems.EnableWinningPost {
winPoStTask := winning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, lw, verif, full, maddrs)
pl := dependencies.LocalStore
winPoStTask := winning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, pl, verif, full, maddrs)
activeTasks = append(activeTasks, winPoStTask)
needProofParams = true
}
Expand Down
9 changes: 9 additions & 0 deletions curiosrc/build/build.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package build

// IsOpencl is set to the value of FFI_USE_OPENCL
var IsOpencl string

// Format: 8 HEX then underscore then ISO8701 date
// Ex: 4c5e98f28_2024-05-17T18:42:27-04:00
// NOTE: git date for repeatabile builds.
var Commit string
5 changes: 2 additions & 3 deletions curiosrc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@ import (
"github.com/filecoin-project/lotus/node/config"
dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

//var log = logging.Logger("provider")

func WindowPostScheduler(ctx context.Context, fc config.CurioFees, pc config.CurioProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, sender *message.Sender, chainSched *chainsched.CurioChainSched,
api api.FullNode, verif storiface.Verifier, sender *message.Sender, chainSched *chainsched.CurioChainSched,
as *multictladdr.MultiAddressSelector, addresses map[dtypes.MinerAddress]bool, db *harmonydb.DB,
stor paths.Store, idx paths.SectorIndex, max int) (*window.WdPostTask, *window.WdPostSubmitTask, *window.WdPostRecoverDeclareTask, error) {

// todo config
ft := window.NewSimpleFaultTracker(stor, idx, pc.ParallelCheckLimit, time.Duration(pc.SingleCheckTimeout), time.Duration(pc.PartitionCheckTimeout))

computeTask, err := window.NewWdPostTask(db, api, ft, lw, verif, chainSched, addresses, max)
computeTask, err := window.NewWdPostTask(db, api, ft, stor, verif, chainSched, addresses, max, pc.ParallelCheckLimit, time.Duration(pc.SingleCheckTimeout))
if err != nil {
return nil, nil, nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions curiosrc/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (
"github.com/puzpuzpuz/xsync/v2"
"golang.org/x/xerrors"

// TODO everywhere here that we call this we should call our proxy instead.
ffi "github.com/filecoin-project/filecoin-ffi"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi"
proof2 "github.com/filecoin-project/go-state-types/proof"

"github.com/filecoin-project/lotus/curiosrc/proof"
"github.com/filecoin-project/lotus/lib/ffiselect"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/proofpaths"
Expand Down Expand Up @@ -256,7 +258,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto
}
}

sl, uns, err := ffi.SealPreCommitPhase2(p1o, fspaths.Cache, fspaths.Sealed)
sl, uns, err := ffiselect.FFISelect{}.SealPreCommitPhase2(sector.ID, p1o, fspaths.Cache, fspaths.Sealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("computing seal proof: %w", err)
}
Expand Down Expand Up @@ -307,7 +309,7 @@ func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sea
return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err)
}

proof, err := ffi.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner)
proof, err := ffiselect.FFISelect{}.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner)
if err != nil {
return nil, xerrors.Errorf("computing seal proof failed: %w", err)
}
Expand Down
Loading
Loading