Skip to content

Commit

Permalink
curio: Command for generating market tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Mar 27, 2024
1 parent 26b190c commit ee76f36
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 30 deletions.
1 change: 1 addition & 0 deletions cmd/curio/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func main() {
guidedsetup.GuidedsetupCmd,
configMigrateCmd,
sealCmd,
marketCmd,
}

jaeger := tracing.SetupJaegerTracing("curio")
Expand Down
64 changes: 64 additions & 0 deletions cmd/curio/market.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"fmt"
"sort"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/curiosrc/market/lmrpc"
)

var marketCmd = &cli.Command{
Name: "market",
Subcommands: []*cli.Command{
marketRPCInfoCmd,
},
}

var marketRPCInfoCmd = &cli.Command{
Action: func(cctx *cli.Context) error {
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}

cfg, err := deps.GetConfig(cctx, db)
if err != nil {
return xerrors.Errorf("get config: %w", err)
}

ts, err := lmrpc.MakeTokens(cfg)
if err != nil {
return xerrors.Errorf("make tokens: %w", err)
}

var addrTokens []struct {
Address string
Token string
}

for address, s := range ts {
addrTokens = append(addrTokens, struct {
Address string
Token string
}{
Address: address.String(),
Token: s,
})
}

sort.Slice(addrTokens, func(i, j int) bool {
return addrTokens[i].Address < addrTokens[j].Address
})

for _, at := range addrTokens {
fmt.Printf("%s %s\n", at.Address, at.Token)
}

return nil
},
Name: "rpc-info",
}
99 changes: 71 additions & 28 deletions curiosrc/market/lmrpc/lmrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync"
"time"

"github.com/fatih/color"
"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
"github.com/jackc/pgx/v5"
Expand All @@ -39,6 +38,68 @@ import (
var log = logging.Logger("lmrpc")

func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *config.CurioConfig) error {
return forEachMarketRPC(cfg, func(maddr string, listen string) error {
addr, err := address.NewFromString(maddr)
if err != nil {
return xerrors.Errorf("parsing actor address: %w", err)
}

go func() {
err := ServeCurioMarketRPC(db, full, addr, cfg, listen)
if err != nil {
log.Errorf("failed to serve market rpc: %s", err)
}
}()

return nil
})
}

func MakeTokens(cfg *config.CurioConfig) (map[address.Address]string, error) {
out := map[address.Address]string{}

err := forEachMarketRPC(cfg, func(smaddr string, listen string) error {
ctx := context.Background()

laddr, err := net.ResolveTCPAddr("tcp", listen)
if err != nil {
return xerrors.Errorf("net resolve: %w", err)
}

if len(laddr.IP) == 0 {
// set localhost
laddr.IP = net.IPv4(127, 0, 0, 1)
}

// need minimal provider with just the config
lp := fakelm.NewLMRPCProvider(nil, nil, address.Undef, 0, 0, nil, nil, cfg)

tok, err := lp.AuthNew(ctx, api.AllPermissions)
if err != nil {
return err
}

// parse listen into multiaddr
ma, err := manet.FromNetAddr(laddr)
if err != nil {
return xerrors.Errorf("net from addr (%v): %w", laddr, err)
}

maddr, err := address.NewFromString(smaddr)
if err != nil {
return xerrors.Errorf("parsing actor address: %w", err)
}

token := fmt.Sprintf("%s:%s", tok, ma)
out[maddr] = token

return nil
})

return out, err
}

func forEachMarketRPC(cfg *config.CurioConfig, cb func(string, string) error) error {
for n, server := range cfg.Subsystems.MarketRPCServers {
n := n

Expand Down Expand Up @@ -76,13 +137,9 @@ func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *con

log.Infow("Starting market RPC server", "actor", maddr, "listen", strListen)

// serve the market rpc
go func() {
err := ServeCurioMarketRPC(db, full, maddr, cfg, strListen)
if err != nil {
log.Errorf("failed to serve market rpc %d for %s: %s", n, maddr, err)
}
}()
if err := cb(strMaddr, strListen); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -125,7 +182,7 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr

ast.CommonStruct.Internal.Version = func(ctx context.Context) (api.APIVersion, error) {
return api.APIVersion{
Version: "lp-proxy-v0",
Version: "curio-proxy-v0",
APIVersion: api.MinerAPIVersion0,
BlockDelay: build.BlockDelaySecs,
}, nil
Expand Down Expand Up @@ -174,7 +231,8 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr

pieceUUID := uuid.New()

color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID)
//color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID)
log.Infow("piece assign request", "piece_cid", deal.DealProposal.PieceCID, "provider", deal.DealProposal.Provider, "piece_uuid", pieceUUID)

pieceInfoLk.Lock()
pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi)
Expand Down Expand Up @@ -267,7 +325,7 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
return api.SectorOffset{}, err
}

color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset)
log.Infow("piece assigned to sector", "piece_cid", deal.DealProposal.PieceCID, "sector", so.Sector, "offset", so.Offset)

return so, nil
}
Expand Down Expand Up @@ -305,7 +363,7 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
pis, ok := pieceInfos[pu]
if !ok {
http.Error(w, "piece not found", http.StatusNotFound)
color.Red("%s not found", pu)
log.Warnw("piece not found", "piece_uuid", pu)
pieceInfoLk.Unlock()
return
}
Expand Down Expand Up @@ -339,7 +397,7 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
return
}

color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pu, float64(n)/(1024*1024), took, mbps)
log.Infow("piece served", "piece_uuid", pu, "size", float64(n)/(1024*1024), "duration", took, "speed", mbps)
}

finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast)
Expand All @@ -353,21 +411,6 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
mux.Handle("/piece", pieceHandler)
mux.Handle("/", mh)

{
tok, err := lp.AuthNew(ctx, api.AllPermissions)
if err != nil {
return err
}

// parse listen into multiaddr
ma, err := manet.FromNetAddr(laddr)
if err != nil {
return xerrors.Errorf("net from addr (%v): %w", laddr, err)
}

fmt.Printf("Token: %s:%s\n", tok, ma)
}

server := &http.Server{
Addr: listen,
Handler: mux,
Expand Down
2 changes: 1 addition & 1 deletion node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ type CurioSubsystemsConfig struct {
// the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are
// sealed.
//
// To get API info for boost configuration run 'curio cli market rpc-info'
// To get API info for boost configuration run 'curio market rpc-info'
//
// NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on
// a machine which handles ParkPiece tasks.
Expand Down

0 comments on commit ee76f36

Please sign in to comment.