diff --git a/cmd/curio/deps/deps.go b/cmd/curio/deps/deps.go index 4ce660739f5..c25741cbc11 100644 --- a/cmd/curio/deps/deps.go +++ b/cmd/curio/deps/deps.go @@ -388,7 +388,7 @@ func GetConfig(cctx *cli.Context, db *harmonydb.DB) (*config.CurioConfig, error) for _, k := range meta.Keys() { have = append(have, strings.Join(k, " ")) } - log.Infow("Using layer", "layer", layer, "config", curioConfig) + log.Debugw("Using layer", "layer", layer, "config", curioConfig) } _ = have // FUTURE: verify that required fields are here. // If config includes 3rd-party config, consider JSONSchema as a way that diff --git a/cmd/curio/main.go b/cmd/curio/main.go index 2175d24c5f8..9a58768f371 100644 --- a/cmd/curio/main.go +++ b/cmd/curio/main.go @@ -54,6 +54,7 @@ func main() { webCmd, guidedsetup.GuidedsetupCmd, sealCmd, + marketCmd, } jaeger := tracing.SetupJaegerTracing("curio") diff --git a/cmd/curio/market.go b/cmd/curio/market.go new file mode 100644 index 00000000000..cc562db932c --- /dev/null +++ b/cmd/curio/market.go @@ -0,0 +1,70 @@ +package main + +import ( + "fmt" + "sort" + + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/cmd/curio/deps" + "github.com/filecoin-project/lotus/curiosrc/market/lmrpc" +) + +var marketCmd = &cli.Command{ + Name: "market", + Subcommands: []*cli.Command{ + marketRPCInfoCmd, + }, +} + +var marketRPCInfoCmd = &cli.Command{ + Flags: []cli.Flag{ + &cli.StringSliceFlag{ + Name: "layers", + Usage: "list of layers to be interpreted (atop defaults). Default: base", + }, + }, + Action: func(cctx *cli.Context) error { + db, err := deps.MakeDB(cctx) + if err != nil { + return err + } + + cfg, err := deps.GetConfig(cctx, db) + if err != nil { + return xerrors.Errorf("get config: %w", err) + } + + ts, err := lmrpc.MakeTokens(cfg) + if err != nil { + return xerrors.Errorf("make tokens: %w", err) + } + + var addrTokens []struct { + Address string + Token string + } + + for address, s := range ts { + addrTokens = append(addrTokens, struct { + Address string + Token string + }{ + Address: address.String(), + Token: s, + }) + } + + sort.Slice(addrTokens, func(i, j int) bool { + return addrTokens[i].Address < addrTokens[j].Address + }) + + for _, at := range addrTokens { + fmt.Printf("[lotus-miner/boost compatible] %s %s\n", at.Address, at.Token) + } + + return nil + }, + Name: "rpc-info", +} diff --git a/cmd/curio/run.go b/cmd/curio/run.go index 5974a540503..0f2785f2a3c 100644 --- a/cmd/curio/run.go +++ b/cmd/curio/run.go @@ -10,12 +10,14 @@ import ( "github.com/urfave/cli/v2" "go.opencensus.io/stats" "go.opencensus.io/tag" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/cmd/curio/deps" "github.com/filecoin-project/lotus/cmd/curio/rpc" "github.com/filecoin-project/lotus/cmd/curio/tasks" + "github.com/filecoin-project/lotus/curiosrc/market/lmrpc" "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" @@ -133,6 +135,11 @@ var runCmd = &cli.Command{ if err != nil { return err } + + if err := lmrpc.ServeCurioMarketRPCFromConfig(dependencies.DB, dependencies.Full, dependencies.Cfg); err != nil { + return xerrors.Errorf("starting market RPCs: %w", err) + } + finishCh := node.MonitorShutdown(shutdownChan) //node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, //node.ShutdownHandler{Component: "curio", StopFunc: stop}, diff --git a/cmd/lotus-shed/deal.go b/cmd/lotus-shed/deal.go index ba6979a140b..029e2966643 100644 --- a/cmd/lotus-shed/deal.go +++ b/cmd/lotus-shed/deal.go @@ -2,22 +2,15 @@ package main import ( "bytes" - "context" "fmt" "io" - "net" "net/http" "net/http/httptest" "net/url" "os" - "sync" - "time" "github.com/fatih/color" - "github.com/google/uuid" - "github.com/jackc/pgx/v5" "github.com/mitchellh/go-homedir" - manet "github.com/multiformats/go-multiaddr/net" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -32,19 +25,9 @@ import ( "github.com/filecoin-project/go-state-types/builtin/v9/market" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" - "github.com/filecoin-project/lotus/cmd/curio/deps" - cumarket "github.com/filecoin-project/lotus/curiosrc/market" - "github.com/filecoin-project/lotus/curiosrc/market/fakelm" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/must" - "github.com/filecoin-project/lotus/lib/nullreader" - "github.com/filecoin-project/lotus/metrics/proxy" - "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/sealer/storiface" ) var lpUtilCmd = &cli.Command{ @@ -52,7 +35,6 @@ var lpUtilCmd = &cli.Command{ Usage: "lotus provider utility commands", Subcommands: []*cli.Command{ lpUtilStartDealCmd, - lpBoostProxyCmd, }, } @@ -300,360 +282,3 @@ var lpUtilStartDealCmd = &cli.Command{ return nil }, } - -var lpBoostProxyCmd = &cli.Command{ - Name: "boost-proxy", - Usage: "Start a legacy lotus-miner rpc proxy", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "actor-address", - Usage: "Address of the miner actor", - Required: true, - }, - - &cli.StringFlag{ - Name: "db-host", - EnvVars: []string{"LOTUS_DB_HOST"}, - Usage: "Command separated list of hostnames for yugabyte cluster", - Value: "yugabyte", - }, - &cli.StringFlag{ - Name: "db-name", - EnvVars: []string{"LOTUS_DB_NAME", "LOTUS_HARMONYDB_HOSTS"}, - Value: "yugabyte", - }, - &cli.StringFlag{ - Name: "db-user", - EnvVars: []string{"LOTUS_DB_USER", "LOTUS_HARMONYDB_USERNAME"}, - Value: "yugabyte", - }, - &cli.StringFlag{ - Name: "db-password", - EnvVars: []string{"LOTUS_DB_PASSWORD", "LOTUS_HARMONYDB_PASSWORD"}, - Value: "yugabyte", - }, - &cli.StringFlag{ - Name: "db-port", - EnvVars: []string{"LOTUS_DB_PORT", "LOTUS_HARMONYDB_PORT"}, - Hidden: true, - Value: "5433", - }, - &cli.StringFlag{ - Name: "layers", - EnvVars: []string{"LOTUS_LAYERS", "LOTUS_CONFIG_LAYERS"}, - Value: "base", - }, - - &cli.StringFlag{ - Name: "listen", - Usage: "Address to listen on", - Value: ":32100", - }, - }, - Action: func(cctx *cli.Context) error { - ctx := lcli.ReqContext(cctx) - - db, err := deps.MakeDB(cctx) - if err != nil { - return err - } - - maddr, err := address.NewFromString(cctx.String("actor-address")) - if err != nil { - return xerrors.Errorf("parsing miner address: %w", err) - } - - full, closer, err := lcli.GetFullNodeAPIV1(cctx) - if err != nil { - return err - } - - defer closer() - - pin := cumarket.NewPieceIngester(db, full) - - si := paths.NewDBIndex(nil, db) - - mid, err := address.IDFromAddress(maddr) - if err != nil { - return xerrors.Errorf("getting miner id: %w", err) - } - - mi, err := full.StateMinerInfo(ctx, maddr, types.EmptyTSK) - if err != nil { - return xerrors.Errorf("getting miner info: %w", err) - } - - lp := fakelm.NewLMRPCProvider(si, full, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, cctx.String("layers")) - - laddr, err := net.ResolveTCPAddr("tcp", cctx.String("listen")) - if err != nil { - return xerrors.Errorf("net resolve: %w", err) - } - - if len(laddr.IP) == 0 { - // set localhost - laddr.IP = net.IPv4(127, 0, 0, 1) - } - rootUrl := url.URL{ - Scheme: "http", - Host: laddr.String(), - } - - ast := api.StorageMinerStruct{} - - ast.CommonStruct.Internal.Version = func(ctx context.Context) (api.APIVersion, error) { - return api.APIVersion{ - Version: "lp-proxy-v0", - APIVersion: api.MinerAPIVersion0, - BlockDelay: build.BlockDelaySecs, - }, nil - } - - ast.CommonStruct.Internal.AuthNew = lp.AuthNew - - ast.Internal.ActorAddress = lp.ActorAddress - ast.Internal.WorkerJobs = lp.WorkerJobs - ast.Internal.SectorsStatus = lp.SectorsStatus - ast.Internal.SectorsList = lp.SectorsList - ast.Internal.SectorsSummary = lp.SectorsSummary - ast.Internal.SectorsListInStates = lp.SectorsListInStates - ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal - ast.Internal.ComputeDataCid = lp.ComputeDataCid - - type pieceInfo struct { - data storiface.Data - size abi.UnpaddedPieceSize - - done chan struct{} - } - - pieceInfoLk := new(sync.Mutex) - pieceInfos := map[uuid.UUID][]pieceInfo{} - - ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) { - origPieceData := pieceData - defer func() { - closer, ok := origPieceData.(io.Closer) - if !ok { - log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData) - return - } - if err := closer.Close(); err != nil { - log.Warnw("closing pieceData in DataCid", "error", err) - } - }() - - pi := pieceInfo{ - data: pieceData, - size: pieceSize, - - done: make(chan struct{}), - } - - pieceUUID := uuid.New() - - color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID) - - pieceInfoLk.Lock() - pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi) - pieceInfoLk.Unlock() - - // /piece?piece_cid=xxxx - dataUrl := rootUrl - dataUrl.Path = "/piece" - dataUrl.RawQuery = "piece_id=" + pieceUUID.String() - - // add piece entry - - var refID int64 - var pieceWasCreated bool - - comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - var pieceID int64 - // Attempt to select the piece ID first - err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID) - - if err != nil { - if err == pgx.ErrNoRows { - // Piece does not exist, attempt to insert - err = tx.QueryRow(` - INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) - VALUES ($1, $2, $3) - ON CONFLICT (piece_cid) DO NOTHING - RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID) - if err != nil { - return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err) - } - pieceWasCreated = true // New piece was created - } else { - // Some other error occurred during select - return false, xerrors.Errorf("checking existing parked piece: %w", err) - } - } else { - pieceWasCreated = false // Piece already exists, no new piece was created - } - - // Add parked_piece_ref - err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url) - VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID) - if err != nil { - return false, xerrors.Errorf("inserting parked piece ref: %w", err) - } - - // If everything went well, commit the transaction - return true, nil // This will commit the transaction - }, harmonydb.OptionRetry()) - if err != nil { - return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err) - } - if !comm { - return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit") - } - - // wait for piece to be parked - if pieceWasCreated { - <-pi.done - } else { - // If the piece was not created, we need to close the done channel - close(pi.done) - - go func() { - // close the data reader (drain to eof if it's not a closer) - if closer, ok := pieceData.(io.Closer); ok { - if err := closer.Close(); err != nil { - log.Warnw("closing pieceData in DataCid", "error", err) - } - } else { - log.Warnw("pieceData is not an io.Closer", "type", fmt.Sprintf("%T", pieceData)) - - _, err := io.Copy(io.Discard, pieceData) - if err != nil { - log.Warnw("draining pieceData in DataCid", "error", err) - } - } - }() - } - - pieceIDUrl := url.URL{ - Scheme: "pieceref", - Opaque: fmt.Sprintf("%d", refID), - } - - // make a sector - so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), pieceIDUrl, nil) - if err != nil { - return api.SectorOffset{}, err - } - - color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset) - - return so, nil - } - - ast.Internal.StorageList = si.StorageList - ast.Internal.StorageDetach = si.StorageDetach - ast.Internal.StorageReportHealth = si.StorageReportHealth - ast.Internal.StorageDeclareSector = si.StorageDeclareSector - ast.Internal.StorageDropSector = si.StorageDropSector - ast.Internal.StorageFindSector = si.StorageFindSector - ast.Internal.StorageInfo = si.StorageInfo - ast.Internal.StorageBestAlloc = si.StorageBestAlloc - ast.Internal.StorageLock = si.StorageLock - ast.Internal.StorageTryLock = si.StorageTryLock - ast.Internal.StorageGetLocks = si.StorageGetLocks - - var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { - // /piece?piece_id=xxxx - pieceUUID := r.URL.Query().Get("piece_id") - - pu, err := uuid.Parse(pieceUUID) - if err != nil { - http.Error(w, "bad piece id", http.StatusBadRequest) - return - } - - if r.Method != http.MethodGet { - http.Error(w, "bad method", http.StatusMethodNotAllowed) - return - } - - fmt.Printf("%s request for piece from %s\n", pieceUUID, r.RemoteAddr) - - pieceInfoLk.Lock() - pis, ok := pieceInfos[pu] - if !ok { - http.Error(w, "piece not found", http.StatusNotFound) - color.Red("%s not found", pu) - pieceInfoLk.Unlock() - return - } - - // pop - pi := pis[0] - pis = pis[1:] - - pieceInfos[pu] = pis - if len(pis) == 0 { - delete(pieceInfos, pu) - } - - pieceInfoLk.Unlock() - - start := time.Now() - - pieceData := io.LimitReader(io.MultiReader( - pi.data, - nullreader.Reader{}, - ), int64(pi.size)) - - n, err := io.Copy(w, pieceData) - close(pi.done) - - took := time.Since(start) - mbps := float64(n) / (1024 * 1024) / took.Seconds() - - if err != nil { - log.Errorf("copying piece data: %s", err) - return - } - - color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pu, float64(n)/(1024*1024), took, mbps) - } - - finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast) - - mh, err := node.MinerHandler(finalApi, false) // todo permissioned - if err != nil { - return err - } - - mux := http.NewServeMux() - mux.Handle("/piece", pieceHandler) - mux.Handle("/", mh) - - { - tok, err := lp.AuthNew(ctx, api.AllPermissions) - if err != nil { - return err - } - - // parse listen into multiaddr - ma, err := manet.FromNetAddr(laddr) - if err != nil { - return xerrors.Errorf("net from addr (%v): %w", laddr, err) - } - - fmt.Printf("Token: %s:%s\n", tok, ma) - } - - server := &http.Server{ - Addr: cctx.String("listen"), - Handler: mux, - ReadTimeout: 48 * time.Hour, - WriteTimeout: 48 * time.Hour, // really high because we block until TreeD - } - - return server.ListenAndServe() - }, -} diff --git a/curiosrc/market/fakelm/lmimpl.go b/curiosrc/market/fakelm/lmimpl.go index 1f6c5b91dee..9dc19e627d8 100644 --- a/curiosrc/market/fakelm/lmimpl.go +++ b/curiosrc/market/fakelm/lmimpl.go @@ -6,7 +6,6 @@ import ( "net/http" "net/url" - "github.com/BurntSushi/toml" "github.com/gbrlsnchs/jwt/v3" "github.com/google/uuid" "golang.org/x/xerrors" @@ -37,21 +36,21 @@ type LMRPCProvider struct { ssize abi.SectorSize - pi market.Ingester - db *harmonydb.DB - confLayer string + pi market.Ingester + db *harmonydb.DB + conf *config.CurioConfig } -func NewLMRPCProvider(si paths.SectorIndex, full api.FullNode, maddr address.Address, minerID abi.ActorID, ssize abi.SectorSize, pi market.Ingester, db *harmonydb.DB, confLayer string) *LMRPCProvider { +func NewLMRPCProvider(si paths.SectorIndex, full api.FullNode, maddr address.Address, minerID abi.ActorID, ssize abi.SectorSize, pi market.Ingester, db *harmonydb.DB, conf *config.CurioConfig) *LMRPCProvider { return &LMRPCProvider{ - si: si, - full: full, - maddr: maddr, - minerID: minerID, - ssize: ssize, - pi: pi, - db: db, - confLayer: confLayer, + si: si, + full: full, + maddr: maddr, + minerID: minerID, + ssize: ssize, + pi: pi, + db: db, + conf: conf, } } @@ -330,24 +329,6 @@ func (l *LMRPCProvider) AllocatePieceToSector(ctx context.Context, maddr address } func (l *LMRPCProvider) AuthNew(ctx context.Context, perms []auth.Permission) ([]byte, error) { - var cs []struct { - Config string - } - - err := l.db.Select(ctx, &cs, "select config from harmony_config where title = $1", l.confLayer) - if err != nil { - return nil, err - } - - if len(cs) == 0 { - return nil, xerrors.Errorf("no harmony config found") - } - - lp := config.DefaultCurioConfig() - if _, err := toml.Decode(cs[0].Config, lp); err != nil { - return nil, xerrors.Errorf("decode harmony config: %w", err) - } - type jwtPayload struct { Allow []auth.Permission } @@ -356,7 +337,7 @@ func (l *LMRPCProvider) AuthNew(ctx context.Context, perms []auth.Permission) ([ Allow: perms, } - sk, err := base64.StdEncoding.DecodeString(lp.Apis.StorageRPCSecret) + sk, err := base64.StdEncoding.DecodeString(l.conf.Apis.StorageRPCSecret) if err != nil { return nil, xerrors.Errorf("decode secret: %w", err) } diff --git a/curiosrc/market/lmrpc/lmrpc.go b/curiosrc/market/lmrpc/lmrpc.go new file mode 100644 index 00000000000..b760b7e1ee3 --- /dev/null +++ b/curiosrc/market/lmrpc/lmrpc.go @@ -0,0 +1,422 @@ +package lmrpc + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/google/uuid" + logging "github.com/ipfs/go-log/v2" + "github.com/jackc/pgx/v5" + manet "github.com/multiformats/go-multiaddr/net" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" + cumarket "github.com/filecoin-project/lotus/curiosrc/market" + "github.com/filecoin-project/lotus/curiosrc/market/fakelm" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/nullreader" + "github.com/filecoin-project/lotus/metrics/proxy" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +var log = logging.Logger("lmrpc") + +func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *config.CurioConfig) error { + return forEachMarketRPC(cfg, func(maddr string, listen string) error { + addr, err := address.NewFromString(maddr) + if err != nil { + return xerrors.Errorf("parsing actor address: %w", err) + } + + go func() { + err := ServeCurioMarketRPC(db, full, addr, cfg, listen) + if err != nil { + log.Errorf("failed to serve market rpc: %s", err) + } + }() + + return nil + }) +} + +func MakeTokens(cfg *config.CurioConfig) (map[address.Address]string, error) { + out := map[address.Address]string{} + + err := forEachMarketRPC(cfg, func(smaddr string, listen string) error { + ctx := context.Background() + + laddr, err := net.ResolveTCPAddr("tcp", listen) + if err != nil { + return xerrors.Errorf("net resolve: %w", err) + } + + if len(laddr.IP) == 0 { + // set localhost + laddr.IP = net.IPv4(127, 0, 0, 1) + } + + // need minimal provider with just the config + lp := fakelm.NewLMRPCProvider(nil, nil, address.Undef, 0, 0, nil, nil, cfg) + + tok, err := lp.AuthNew(ctx, api.AllPermissions) + if err != nil { + return err + } + + // parse listen into multiaddr + ma, err := manet.FromNetAddr(laddr) + if err != nil { + return xerrors.Errorf("net from addr (%v): %w", laddr, err) + } + + maddr, err := address.NewFromString(smaddr) + if err != nil { + return xerrors.Errorf("parsing actor address: %w", err) + } + + token := fmt.Sprintf("%s:%s", tok, ma) + out[maddr] = token + + return nil + }) + + return out, err +} + +func forEachMarketRPC(cfg *config.CurioConfig, cb func(string, string) error) error { + for n, server := range cfg.Subsystems.BoostAdapters { + n := n + + // server: [f0.. actor address]:[bind address] + // bind address is either a numeric port or a full address + + // first split at first : to get the actor address and the bind address + split := strings.SplitN(server, ":", 2) + + // if the split length is not 2, return an error + if len(split) != 2 { + return fmt.Errorf("bad market rpc server config %d %s, expected [f0.. actor address]:[bind address]", n, server) + } + + // get the actor address and the bind address + strMaddr, strListen := split[0], split[1] + + maddr, err := address.NewFromString(strMaddr) + if err != nil { + return xerrors.Errorf("parsing actor address: %w", err) + } + + // check the listen address + if strListen == "" { + return fmt.Errorf("bad market rpc server config %d %s, expected [f0.. actor address]:[bind address]", n, server) + } + // if listen address is numeric, prepend the default host + if _, err := strconv.Atoi(strListen); err == nil { + strListen = "0.0.0.0:" + strListen + } + // check if the listen address is a valid address + if _, _, err := net.SplitHostPort(strListen); err != nil { + return fmt.Errorf("bad market rpc server config %d %s, expected [f0.. actor address]:[bind address]", n, server) + } + + log.Infow("Starting market RPC server", "actor", maddr, "listen", strListen) + + if err := cb(strMaddr, strListen); err != nil { + return err + } + } + + return nil +} + +func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Address, conf *config.CurioConfig, listen string) error { + ctx := context.Background() + + pin := cumarket.NewPieceIngester(db, full) + + si := paths.NewDBIndex(nil, db) + + mid, err := address.IDFromAddress(maddr) + if err != nil { + return xerrors.Errorf("getting miner id: %w", err) + } + + mi, err := full.StateMinerInfo(ctx, maddr, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting miner info: %w", err) + } + + lp := fakelm.NewLMRPCProvider(si, full, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, conf) + + laddr, err := net.ResolveTCPAddr("tcp", listen) + if err != nil { + return xerrors.Errorf("net resolve: %w", err) + } + + if len(laddr.IP) == 0 { + // set localhost + laddr.IP = net.IPv4(127, 0, 0, 1) + } + rootUrl := url.URL{ + Scheme: "http", + Host: laddr.String(), + } + + ast := api.StorageMinerStruct{} + + ast.CommonStruct.Internal.Version = func(ctx context.Context) (api.APIVersion, error) { + return api.APIVersion{ + Version: "curio-proxy-v0", + APIVersion: api.MinerAPIVersion0, + BlockDelay: build.BlockDelaySecs, + }, nil + } + + ast.CommonStruct.Internal.AuthNew = lp.AuthNew + + ast.Internal.ActorAddress = lp.ActorAddress + ast.Internal.WorkerJobs = lp.WorkerJobs + ast.Internal.SectorsStatus = lp.SectorsStatus + ast.Internal.SectorsList = lp.SectorsList + ast.Internal.SectorsSummary = lp.SectorsSummary + ast.Internal.SectorsListInStates = lp.SectorsListInStates + ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal + ast.Internal.ComputeDataCid = lp.ComputeDataCid + + type pieceInfo struct { + data storiface.Data + size abi.UnpaddedPieceSize + + done chan struct{} + } + + pieceInfoLk := new(sync.Mutex) + pieceInfos := map[uuid.UUID][]pieceInfo{} + + ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) { + origPieceData := pieceData + defer func() { + closer, ok := origPieceData.(io.Closer) + if !ok { + log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData) + return + } + if err := closer.Close(); err != nil { + log.Warnw("closing pieceData in DataCid", "error", err) + } + }() + + pi := pieceInfo{ + data: pieceData, + size: pieceSize, + + done: make(chan struct{}), + } + + pieceUUID := uuid.New() + + //color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID) + log.Infow("piece assign request", "piece_cid", deal.DealProposal.PieceCID, "provider", deal.DealProposal.Provider, "piece_uuid", pieceUUID) + + pieceInfoLk.Lock() + pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi) + pieceInfoLk.Unlock() + + // /piece?piece_cid=xxxx + dataUrl := rootUrl + dataUrl.Path = "/piece" + dataUrl.RawQuery = "piece_id=" + pieceUUID.String() + + // add piece entry + + var refID int64 + var pieceWasCreated bool + + comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + var pieceID int64 + // Attempt to select the piece ID first + err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID) + + if err != nil { + if err == pgx.ErrNoRows { + // Piece does not exist, attempt to insert + err = tx.QueryRow(` + INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) + VALUES ($1, $2, $3) + ON CONFLICT (piece_cid) DO NOTHING + RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID) + if err != nil { + return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err) + } + pieceWasCreated = true // New piece was created + } else { + // Some other error occurred during select + return false, xerrors.Errorf("checking existing parked piece: %w", err) + } + } else { + pieceWasCreated = false // Piece already exists, no new piece was created + } + + // Add parked_piece_ref + err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url) + VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID) + if err != nil { + return false, xerrors.Errorf("inserting parked piece ref: %w", err) + } + + // If everything went well, commit the transaction + return true, nil // This will commit the transaction + }, harmonydb.OptionRetry()) + if err != nil { + return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err) + } + if !comm { + return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit") + } + + // wait for piece to be parked + if pieceWasCreated { + <-pi.done + } else { + // If the piece was not created, we need to close the done channel + close(pi.done) + + go func() { + // close the data reader (drain to eof if it's not a closer) + if closer, ok := pieceData.(io.Closer); ok { + if err := closer.Close(); err != nil { + log.Warnw("closing pieceData in DataCid", "error", err) + } + } else { + log.Warnw("pieceData is not an io.Closer", "type", fmt.Sprintf("%T", pieceData)) + + _, err := io.Copy(io.Discard, pieceData) + if err != nil { + log.Warnw("draining pieceData in DataCid", "error", err) + } + } + }() + } + + pieceIDUrl := url.URL{ + Scheme: "pieceref", + Opaque: fmt.Sprintf("%d", refID), + } + + // make a sector + so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), pieceIDUrl, nil) + if err != nil { + return api.SectorOffset{}, err + } + + log.Infow("piece assigned to sector", "piece_cid", deal.DealProposal.PieceCID, "sector", so.Sector, "offset", so.Offset) + + return so, nil + } + + ast.Internal.StorageList = si.StorageList + ast.Internal.StorageDetach = si.StorageDetach + ast.Internal.StorageReportHealth = si.StorageReportHealth + ast.Internal.StorageDeclareSector = si.StorageDeclareSector + ast.Internal.StorageDropSector = si.StorageDropSector + ast.Internal.StorageFindSector = si.StorageFindSector + ast.Internal.StorageInfo = si.StorageInfo + ast.Internal.StorageBestAlloc = si.StorageBestAlloc + ast.Internal.StorageLock = si.StorageLock + ast.Internal.StorageTryLock = si.StorageTryLock + ast.Internal.StorageGetLocks = si.StorageGetLocks + + var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { + // /piece?piece_id=xxxx + pieceUUID := r.URL.Query().Get("piece_id") + + pu, err := uuid.Parse(pieceUUID) + if err != nil { + http.Error(w, "bad piece id", http.StatusBadRequest) + return + } + + if r.Method != http.MethodGet { + http.Error(w, "bad method", http.StatusMethodNotAllowed) + return + } + + fmt.Printf("%s request for piece from %s\n", pieceUUID, r.RemoteAddr) + + pieceInfoLk.Lock() + pis, ok := pieceInfos[pu] + if !ok { + http.Error(w, "piece not found", http.StatusNotFound) + log.Warnw("piece not found", "piece_uuid", pu) + pieceInfoLk.Unlock() + return + } + + // pop + pi := pis[0] + pis = pis[1:] + + pieceInfos[pu] = pis + if len(pis) == 0 { + delete(pieceInfos, pu) + } + + pieceInfoLk.Unlock() + + start := time.Now() + + pieceData := io.LimitReader(io.MultiReader( + pi.data, + nullreader.Reader{}, + ), int64(pi.size)) + + n, err := io.Copy(w, pieceData) + close(pi.done) + + took := time.Since(start) + mbps := float64(n) / (1024 * 1024) / took.Seconds() + + if err != nil { + log.Errorf("copying piece data: %s", err) + return + } + + log.Infow("piece served", "piece_uuid", pu, "size", float64(n)/(1024*1024), "duration", took, "speed", mbps) + } + + finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast) + + mh, err := node.MinerHandler(finalApi, false) // todo permissioned + if err != nil { + return err + } + + mux := http.NewServeMux() + mux.Handle("/piece", pieceHandler) + mux.Handle("/", mh) + + server := &http.Server{ + Addr: listen, + Handler: mux, + ReadTimeout: 48 * time.Hour, + WriteTimeout: 48 * time.Hour, // really high because we block until pieces are saved in PiecePark + } + + return server.ListenAndServe() +} diff --git a/documentation/en/cli-curio.md b/documentation/en/cli-curio.md index 296a45c437d..f7c4f2d0c00 100644 --- a/documentation/en/cli-curio.md +++ b/documentation/en/cli-curio.md @@ -18,6 +18,7 @@ COMMANDS: web Start Curio web interface guided-setup Run the guided setup for migrating from lotus-miner to Curio seal Manage the sealing pipeline + market auth Manage RPC permissions log Manage logging wait-api Wait for lotus api to come online @@ -347,6 +348,35 @@ OPTIONS: --help, -h show help ``` +## curio market +``` +NAME: + curio market + +USAGE: + curio market command [command options] [arguments...] + +COMMANDS: + rpc-info + help, h Shows a list of commands or help for one command + +OPTIONS: + --help, -h show help +``` + +### curio market rpc-info +``` +NAME: + curio market rpc-info + +USAGE: + curio market rpc-info [command options] [arguments...] + +OPTIONS: + --layers value [ --layers value ] list of layers to be interpreted (atop defaults). Default: base + --help, -h show help +``` + ## curio auth ``` NAME: diff --git a/documentation/en/default-curio-config.toml b/documentation/en/default-curio-config.toml index 11c073696fb..f4e4c19b853 100644 --- a/documentation/en/default-curio-config.toml +++ b/documentation/en/default-curio-config.toml @@ -139,6 +139,27 @@ # type: int #MoveStorageMaxTasks = 0 + # BoostAdapters is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests. + # This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations. + # Strings should be in the format "actor:port" or "actor:ip:port". Default listen address is 0.0.0.0 + # Example: "f0123:32100", "f0123:127.0.0.1:32100". Multiple addresses can be specified. + # + # When a market node like boost gives Curio's market RPC a deal to placing into a sector, Curio will first store the + # deal data in a temporary location "Piece Park" before assigning it to a sector. This requires that at least one + # node in the cluster has the EnableParkPiece option enabled and has sufficient scratch space to store the deal data. + # This is different from lotus-miner which stored the deal data into an "unsealed" sector as soon as the deal was + # received. Deal data in PiecePark is accessed when the sector TreeD and TreeR are computed, but isn't needed for + # the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are + # sealed. + # + # To get API info for boost configuration run 'curio market rpc-info' + # + # NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on + # a machine which handles ParkPiece tasks. + # + # type: []string + #BoostAdapters = [] + # EnableWebGui enables the web GUI on this lotus-provider instance. The UI has minimal local overhead, but it should # only need to be run on a single machine in the cluster. # diff --git a/node/config/def.go b/node/config/def.go index 661bed56f4d..0307001b843 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -330,7 +330,8 @@ const ( func DefaultCurioConfig() *CurioConfig { return &CurioConfig{ Subsystems: CurioSubsystemsConfig{ - GuiAddress: ":4701", + GuiAddress: ":4701", + BoostAdapters: []string{}, }, Fees: CurioFees{ DefaultMaxFee: DefaultDefaultMaxFee, diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index b61a136ce8c..3b4124f8a5a 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -561,6 +561,28 @@ SDRTrees machine into long-term storage. This task runs after the Finalize task. Comment: `The maximum amount of MoveStorage tasks that can run simultaneously. Note that the maximum number of tasks will also be bounded by resources available on the machine. It is recommended that this value is set to a number which uses all available network (or disk) bandwidth on the machine without causing bottlenecks.`, + }, + { + Name: "BoostAdapters", + Type: "[]string", + + Comment: `BoostAdapters is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests. +This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations. +Strings should be in the format "actor:port" or "actor:ip:port". Default listen address is 0.0.0.0 +Example: "f0123:32100", "f0123:127.0.0.1:32100". Multiple addresses can be specified. + +When a market node like boost gives Curio's market RPC a deal to placing into a sector, Curio will first store the +deal data in a temporary location "Piece Park" before assigning it to a sector. This requires that at least one +node in the cluster has the EnableParkPiece option enabled and has sufficient scratch space to store the deal data. +This is different from lotus-miner which stored the deal data into an "unsealed" sector as soon as the deal was +received. Deal data in PiecePark is accessed when the sector TreeD and TreeR are computed, but isn't needed for +the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are +sealed. + +To get API info for boost configuration run 'curio market rpc-info' + +NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on +a machine which handles ParkPiece tasks.`, }, { Name: "EnableWebGui", diff --git a/node/config/types.go b/node/config/types.go index 09e8d7800db..82d9f4a99cf 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -200,6 +200,25 @@ type CurioSubsystemsConfig struct { // uses all available network (or disk) bandwidth on the machine without causing bottlenecks. MoveStorageMaxTasks int + // BoostAdapters is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests. + // This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations. + // Strings should be in the format "actor:port" or "actor:ip:port". Default listen address is 0.0.0.0 + // Example: "f0123:32100", "f0123:127.0.0.1:32100". Multiple addresses can be specified. + // + // When a market node like boost gives Curio's market RPC a deal to placing into a sector, Curio will first store the + // deal data in a temporary location "Piece Park" before assigning it to a sector. This requires that at least one + // node in the cluster has the EnableParkPiece option enabled and has sufficient scratch space to store the deal data. + // This is different from lotus-miner which stored the deal data into an "unsealed" sector as soon as the deal was + // received. Deal data in PiecePark is accessed when the sector TreeD and TreeR are computed, but isn't needed for + // the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are + // sealed. + // + // To get API info for boost configuration run 'curio market rpc-info' + // + // NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on + // a machine which handles ParkPiece tasks. + BoostAdapters []string + // EnableWebGui enables the web GUI on this lotus-provider instance. The UI has minimal local overhead, but it should // only need to be run on a single machine in the cluster. EnableWebGui bool diff --git a/node/modules/storageminer_svc.go b/node/modules/storageminer_svc.go index 1a909b4ec93..17eb987ef39 100644 --- a/node/modules/storageminer_svc.go +++ b/node/modules/storageminer_svc.go @@ -2,26 +2,15 @@ package modules import ( "context" - "strings" "go.uber.org/fx" "golang.org/x/xerrors" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/api/v1api" - "github.com/filecoin-project/lotus/chain/types" cliutil "github.com/filecoin-project/lotus/cli/util" - "github.com/filecoin-project/lotus/curiosrc/market" - "github.com/filecoin-project/lotus/curiosrc/market/fakelm" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" - "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/helpers" - "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sectorblocks" ) @@ -30,98 +19,8 @@ type MinerStorageService api.StorageMiner var _ sectorblocks.SectorBuilder = *new(MinerSealingService) -func harmonyApiInfoToConf(apiInfo string) (config.HarmonyDB, error) { - hc := config.HarmonyDB{} - - // apiInfo - harmony:layer:maddr:user:pass:dbname:host:port - - parts := strings.Split(apiInfo, ":") - - if len(parts) != 8 { - return config.HarmonyDB{}, xerrors.Errorf("invalid harmonydb info '%s'", apiInfo) - } - - hc.Username = parts[3] - hc.Password = parts[4] - hc.Database = parts[5] - hc.Hosts = []string{parts[6]} - hc.Port = parts[7] - - return hc, nil -} - -func connectHarmony(apiInfo string, fapi v1api.FullNode, mctx helpers.MetricsCtx, lc fx.Lifecycle) (api.StorageMiner, error) { - log.Info("Connecting to harmonydb") - - hc, err := harmonyApiInfoToConf(apiInfo) - if err != nil { - return nil, err - } - - db, err := harmonydb.NewFromConfig(hc) - if err != nil { - return nil, xerrors.Errorf("connecting to harmonydb: %w", err) - } - - parts := strings.Split(apiInfo, ":") - maddr, err := address.NewFromString(parts[2]) - if err != nil { - return nil, xerrors.Errorf("parsing miner address: %w", err) - } - - pin := market.NewPieceIngester(db, fapi) - - si := paths.NewDBIndex(nil, db) - - mid, err := address.IDFromAddress(maddr) - if err != nil { - return nil, xerrors.Errorf("getting miner id: %w", err) - } - - mi, err := fapi.StateMinerInfo(mctx, maddr, types.EmptyTSK) - if err != nil { - return nil, xerrors.Errorf("getting miner info: %w", err) - } - - lp := fakelm.NewLMRPCProvider(si, fapi, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, parts[1]) - - ast := api.StorageMinerStruct{} - - ast.CommonStruct.Internal.AuthNew = lp.AuthNew - - ast.Internal.ActorAddress = lp.ActorAddress - ast.Internal.WorkerJobs = lp.WorkerJobs - ast.Internal.SectorsStatus = lp.SectorsStatus - ast.Internal.SectorsList = lp.SectorsList - ast.Internal.SectorsSummary = lp.SectorsSummary - ast.Internal.SectorsListInStates = lp.SectorsListInStates - ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal - ast.Internal.ComputeDataCid = lp.ComputeDataCid - ast.Internal.SectorAddPieceToAny = func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data, p3 api.PieceDealInfo) (api.SectorOffset, error) { - panic("implement me") - } - - ast.Internal.StorageList = si.StorageList - ast.Internal.StorageDetach = si.StorageDetach - ast.Internal.StorageReportHealth = si.StorageReportHealth - ast.Internal.StorageDeclareSector = si.StorageDeclareSector - ast.Internal.StorageDropSector = si.StorageDropSector - ast.Internal.StorageFindSector = si.StorageFindSector - ast.Internal.StorageInfo = si.StorageInfo - ast.Internal.StorageBestAlloc = si.StorageBestAlloc - ast.Internal.StorageLock = si.StorageLock - ast.Internal.StorageTryLock = si.StorageTryLock - ast.Internal.StorageGetLocks = si.StorageGetLocks - - return &ast, nil -} - func connectMinerService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) { - if strings.HasPrefix(apiInfo, "harmony:") { - return connectHarmony(apiInfo, fapi, mctx, lc) - } - ctx := helpers.LifecycleCtx(mctx, lc) info := cliutil.ParseApiInfo(apiInfo) addr, err := info.DialArgs("v0")