diff --git a/api/admin.go b/api/admin.go index fd03355..9136941 100644 --- a/api/admin.go +++ b/api/admin.go @@ -22,9 +22,9 @@ type ImportWalletRequest struct { PrivateKey string `json:"private_key"` } -// It creates a new wallet and saves it to the database +// ConfigureAdminRouter It creates a new wallet and saves it to the database +// It configures the admin router func ConfigureAdminRouter(e *echo.Group, node *core.DeltaNode) { - adminWallet := e.Group("/wallet") adminStats := e.Group("/stats") adminStats.GET("/miner/:minerId", handleAdminStatsMiner(node)) @@ -34,6 +34,8 @@ func ConfigureAdminRouter(e *echo.Group, node *core.DeltaNode) { } // It creates a new wallet address and saves it to the database +// `handleAdminCreateWallet` is a function that takes a `DeltaNode` and returns a function that takes an `echo.Context` and +// returns an `error` func handleAdminCreateWallet(node *core.DeltaNode) func(c echo.Context) error { return func(c echo.Context) error { authorizationString := c.Request().Header.Get("Authorization") @@ -80,6 +82,8 @@ func handleAdminCreateWallet(node *core.DeltaNode) func(c echo.Context) error { } // Creating a new wallet address and saving it to the database. +// `handleAdminRegisterWallet` is a function that takes a `DeltaNode` and returns a function that takes an `echo.Context` +// and returns an `error` func handleAdminRegisterWallet(node *core.DeltaNode) func(c echo.Context) error { return func(c echo.Context) error { authorizationString := c.Request().Header.Get("Authorization") @@ -132,6 +136,8 @@ func handleAdminRegisterWallet(node *core.DeltaNode) func(c echo.Context) error // It takes the authorization header from the request, splits it into two parts, and then uses the second part to find all // wallets owned by the user +// `handleAdminListWallets` is a function that takes a `DeltaNode` and returns a function that takes an `echo.Context` and +// returns an `error` func handleAdminListWallets(node *core.DeltaNode) func(c echo.Context) error { return func(c echo.Context) error { authorizationString := c.Request().Header.Get("Authorization") @@ -153,6 +159,7 @@ func handleAdminListWallets(node *core.DeltaNode) func(c echo.Context) error { } // A function that returns a function that returns an error. +// It returns a function that takes a context and returns an error func handleAdminStatsMiner(node *core.DeltaNode) func(c echo.Context) error { return func(c echo.Context) error { diff --git a/api/deal.go b/api/deal.go index 661cc80..b360a5d 100644 --- a/api/deal.go +++ b/api/deal.go @@ -72,7 +72,7 @@ func ConfigureDealRouter(e *echo.Group, node *core.DeltaNode) { dealStatus := dealMake.Group("/status") dealMake.POST("/content", func(c echo.Context) error { - return handleContentAdd(c, node, *statsService) + return handleContentAdd(c, node) }) dealMake.POST("/existing/content", func(c echo.Context) error { @@ -84,16 +84,16 @@ func ConfigureDealRouter(e *echo.Group, node *core.DeltaNode) { }) dealMake.POST("/piece-commitment", func(c echo.Context) error { - return handleCommPieceAdd(c, node, *statsService) + return handleCommPieceAdd(c, node) }) dealMake.POST("/existing/piece-commitment", func(c echo.Context) error { - return handleCommPieceAdd(c, node, *statsService) + return handleCommPieceAdd(c, node) }) // make piece-commitments dealMake.POST("/piece-commitments", func(c echo.Context) error { - return handleCommPiecesAdd(c, node, *statsService) + return handleCommPiecesAdd(c, node) }) dealPrepare.POST("/content", func(c echo.Context) error { @@ -121,10 +121,10 @@ func ConfigureDealRouter(e *echo.Group, node *core.DeltaNode) { }) dealStatus.POST("/content/:contentId", func(c echo.Context) error { - return handleContentStats(c, node, *statsService) + return handleContentStats(c, *statsService) }) dealStatus.POST("/piece-commitment/:piece-commitmentId", func(c echo.Context) error { - return handleCommitmentPieceStats(c, node, *statsService) + return handleCommitmentPieceStats(c, *statsService) }) } @@ -530,12 +530,15 @@ func handleExistingContentAdd(c echo.Context, node *core.DeltaNode) error { node.Dispatcher.AddJobAndDispatch(dispatchJobs, 1) - c.JSON(200, DealResponse{ + err = c.JSON(200, DealResponse{ Status: "success", Message: "File uploaded and pinned successfully", ContentId: content.ID, DealRequest: dealRequest, }) + if err != nil { + return err + } return nil } @@ -550,7 +553,7 @@ func handleExistingContentAdd(c echo.Context, node *core.DeltaNode) error { // @Success 200 {object} ContentMakeDealResponse // @Failure 500 {object} ContentMakeDealResponse // @Router /deal/content/{contentId} [post] -func handleContentAdd(c echo.Context, node *core.DeltaNode, stats core.StatsService) error { +func handleContentAdd(c echo.Context, node *core.DeltaNode) error { var dealRequest DealRequest // lets record this. @@ -560,7 +563,10 @@ func handleContentAdd(c echo.Context, node *core.DeltaNode, stats core.StatsServ meta := c.FormValue("metadata") // validate the meta - json.Unmarshal([]byte(meta), &dealRequest) + err = json.Unmarshal([]byte(meta), &dealRequest) + if err != nil { + return err + } err = ValidateMeta(dealRequest) if err != nil { @@ -739,12 +745,15 @@ func handleContentAdd(c echo.Context, node *core.DeltaNode, stats core.StatsServ node.Dispatcher.AddJobAndDispatch(dispatchJobs, 1) - c.JSON(200, DealResponse{ + err = c.JSON(200, DealResponse{ Status: "success", Message: "File uploaded and pinned successfully", ContentId: content.ID, DealRequest: dealRequest, }) + if err != nil { + return err + } return nil } @@ -755,7 +764,7 @@ func handleContentAdd(c echo.Context, node *core.DeltaNode, stats core.StatsServ // @Tags deals // @Accept json // @Produce json -func handleCommPieceAdd(c echo.Context, node *core.DeltaNode, statsService core.StatsService) error { +func handleCommPieceAdd(c echo.Context, node *core.DeltaNode) error { var dealRequest DealRequest // lets record this. @@ -933,12 +942,15 @@ func handleCommPieceAdd(c echo.Context, node *core.DeltaNode, statsService core. node.Dispatcher.AddJobAndDispatch(dispatchJobs, 1) - c.JSON(200, DealResponse{ + err = c.JSON(200, DealResponse{ Status: "success", Message: "File uploaded and pinned successfully", ContentId: content.ID, DealRequest: dealRequest, }) + if err != nil { + return err + } return nil } @@ -948,7 +960,7 @@ func handleCommPieceAdd(c echo.Context, node *core.DeltaNode, statsService core. // @Tags CommP // @Accept json // @Produce json -func handleCommPiecesAdd(c echo.Context, node *core.DeltaNode, statsService core.StatsService) error { +func handleCommPiecesAdd(c echo.Context, node *core.DeltaNode) error { var dealRequests []DealRequest // lets record this. @@ -1137,13 +1149,16 @@ func handleCommPiecesAdd(c echo.Context, node *core.DeltaNode, statsService core } node.Dispatcher.Start(len(dealRequests)) - c.JSON(http.StatusOK, dealResponses) + err = c.JSON(http.StatusOK, dealResponses) + if err != nil { + return err + } return nil } // It takes a contentId as a parameter, looks up the status of the content, and returns the status as JSON -func handleContentStats(c echo.Context, node *core.DeltaNode, statsService core.StatsService) error { +func handleContentStats(c echo.Context, statsService core.StatsService) error { contentIdParam := c.Param("contentId") contentId, err := strconv.Atoi(contentIdParam) if err != nil { @@ -1163,7 +1178,7 @@ func handleContentStats(c echo.Context, node *core.DeltaNode, statsService core. } // It takes a piece commitment ID, looks up the status of the piece commitment, and returns the status -func handleCommitmentPieceStats(c echo.Context, node *core.DeltaNode, statsService core.StatsService) error { +func handleCommitmentPieceStats(c echo.Context, statsService core.StatsService) error { pieceCommitmentIdParam := c.Param("piece-commitmentId") pieceCommitmentId, err := strconv.Atoi(pieceCommitmentIdParam) if err != nil { @@ -1206,10 +1221,12 @@ func ValidateMeta(dealRequest DealRequest) error { return errors.New("miner is required") } + // connection mode is required if (DealRequest{} != dealRequest && (dealRequest.ConnectionMode != utils.CONNECTION_MODE_E2E && dealRequest.ConnectionMode != utils.CONNECTION_MODE_IMPORT)) { return errors.New("connection mode can only be e2e or import") } + // piece commitment is required if (PieceCommitmentRequest{} != dealRequest.PieceCommitment && dealRequest.PieceCommitment.Piece == "") { return errors.New("piece commitment is invalid, make sure you have the cid, piece_cid, size and padded_piece_size or unpadded_piece_size") } diff --git a/api/metrics.go b/api/metrics.go index 326c8bd..f410dc3 100644 --- a/api/metrics.go +++ b/api/metrics.go @@ -1,3 +1,4 @@ +// It configures the metrics router package api import ( @@ -7,7 +8,8 @@ import ( "net/http" ) -// Configuring the metrics router. +// ConfigMetricsRouter Configuring the metrics router. +// > ConfigMetricsRouter is a function that takes a pointer to an echo.Group and returns nothing func ConfigMetricsRouter(e *echo.Group) { // metrics phandle := promhttp.Handler() diff --git a/api/miner.go b/api/miner.go deleted file mode 100644 index 9c4345c..0000000 --- a/api/miner.go +++ /dev/null @@ -1,34 +0,0 @@ -package api - -import ( - "delta/core" - model "github.com/application-research/delta-db/db_models" - "github.com/labstack/echo/v4" -) - -// https://api.estuary.tech/public/miners/storage/query/f01963614 -// https://api.estuary.tech/public/miners/ - -// ConfigureMinerRouter ConfigureAdminRouter configures the admin router -// This is the router that is used to administer the node -func ConfigureMinerRouter(e *echo.Group, node *core.DeltaNode) { - - configureMiner := e.Group("/miner") - - // get stats of miner - configureMiner.GET("/stats/:minerId", handleMinerStats(node)) - - // get the stats of miner and content - // get the stats of miner and content id -} - -// A function that returns a function that takes a context and returns an error. -func handleMinerStats(node *core.DeltaNode) func(c echo.Context) error { - return func(c echo.Context) error { - minerId := c.Param("minerId") - var dealsListMiner []model.ContentDeal - node.DB.Model(&model.ContentDeal{}).Where("miner = ?", minerId).Order("created_at desc").Find(&dealsListMiner) - c.JSON(200, dealsListMiner) - return nil - } -} diff --git a/api/router.go b/api/router.go index 2043b44..602d5b7 100644 --- a/api/router.go +++ b/api/router.go @@ -100,6 +100,16 @@ func InitializeEchoRouterConfig(ln *core.DeltaNode, config config.DeltaConfig) { authorizationString := c.Request().Header.Get("Authorization") authParts := strings.Split(authorizationString, " ") + if len(authParts) != 2 { + return c.JSON(http.StatusUnauthorized, HttpErrorResponse{ + Error: HttpError{ + Code: http.StatusUnauthorized, + Reason: http.StatusText(http.StatusUnauthorized), + Details: "Invalid authorization header", + }, + }) + } + response, err := http.Post( "https://auth.estuary.tech/check-api-key", "application/json", @@ -166,7 +176,6 @@ func InitializeEchoRouterConfig(ln *core.DeltaNode, config config.DeltaConfig) { ConfigureDealRouter(apiGroup, ln) ConfigureStatsCheckRouter(apiGroup, ln) ConfigureRepairRouter(apiGroup, ln) - ConfigureMinerRouter(apiGroup, ln) // open api ConfigureNodeInfoRouter(openApiGroup, ln) diff --git a/api/stats.go b/api/stats.go index 27a2888..a3042d2 100644 --- a/api/stats.go +++ b/api/stats.go @@ -19,6 +19,7 @@ type StatsCheckResponse struct { } // ConfigureStatsCheckRouter Creating a new router and adding a route to it. +// It configures the router for the stats check API func ConfigureStatsCheckRouter(e *echo.Group, node *core.DeltaNode) { e.GET("/stats/miner/:minerId/content", func(c echo.Context) error { @@ -70,7 +71,6 @@ func ConfigureStatsCheckRouter(e *echo.Group, node *core.DeltaNode) { }) e.GET("/stats/deal-proposal/:id", func(c echo.Context) error { - return nil }) @@ -82,25 +82,14 @@ func ConfigureStatsCheckRouter(e *echo.Group, node *core.DeltaNode) { return handleGetContents(c, node) }) - e.GET("/stats/miner/:minerId", func(c echo.Context) error { + e.GET("/stats/miner/:minerId", handleMinerStats(node)) - authorizationString := c.Request().Header.Get("Authorization") - authParts := strings.Split(authorizationString, " ") - - var contents []model.Content - node.DB.Raw("select c.* from content_deals cd, contents c where cd.content = c.id and cd.miner = ? and c.requesting_api_key = ?", c.Param("minerId"), authParts[1]).Scan(&contents) - - var contentMinerAssignment []model.ContentMiner - node.DB.Raw("select cma.* from content_miners cma, contents c where cma.content = c.id and cma.miner = ? and c.requesting_api_key = ?", c.Param("minerId"), authParts[1]).Scan(&contentMinerAssignment) + e.GET("/stats", handleStats(node)) - return c.JSON(200, map[string]interface{}{ - "content": contents, - "cmas": contentMinerAssignment, - }) - return nil - }) +} - e.GET("/stats", func(c echo.Context) error { +func handleStats(node *core.DeltaNode) func(c echo.Context) error { + return func(c echo.Context) error { authorizationString := c.Request().Header.Get("Authorization") authParts := strings.Split(authorizationString, " ") @@ -121,8 +110,31 @@ func ConfigureStatsCheckRouter(e *echo.Group, node *core.DeltaNode) { "deals": contentDeal, "piece_commitments": pieceCommitments, }) - }) + } +} + +// `handleMinerStats` is a function that takes a `*core.DeltaNode` and returns a function that takes an `echo.Context` and +// returns an `error` +// `handleMinerStats` is a function that takes a `*core.DeltaNode` and returns a function that takes an `echo.Context` and +// returns an `error` +func handleMinerStats(node *core.DeltaNode) func(c echo.Context) error { + return func(c echo.Context) error { + + authorizationString := c.Request().Header.Get("Authorization") + authParts := strings.Split(authorizationString, " ") + + var contents []model.Content + node.DB.Raw("select c.* from content_deals cd, contents c where cd.content = c.id and cd.miner = ? and c.requesting_api_key = ?", c.Param("minerId"), authParts[1]).Scan(&contents) + var contentMinerAssignment []model.ContentMiner + node.DB.Raw("select cma.* from content_miners cma, contents c where cma.content = c.id and cma.miner = ? and c.requesting_api_key = ?", c.Param("minerId"), authParts[1]).Scan(&contentMinerAssignment) + + return c.JSON(200, map[string]interface{}{ + "content": contents, + "cmas": contentMinerAssignment, + }) + return nil + } } // A function that takes in a commitment and a piece number and returns the piece of the commitment. diff --git a/api/websocket.go b/api/websocket.go index 0eb41d5..0547b58 100644 --- a/api/websocket.go +++ b/api/websocket.go @@ -44,7 +44,8 @@ func ConfigureWebsocketRouter(e *echo.Group, ln *core.DeltaNode) { } -// Handling a websocket connection. +// It upgrades the HTTP connection to a WebSocket connection, registers the new client, and then reads messages from the +// client and sends them to the broadcast channel func handleWebsocketContent(ln *core.DeltaNode) func(c echo.Context) error { return func(c echo.Context) error { // Upgrade HTTP connection to WebSocket connection @@ -82,7 +83,8 @@ func handleWebsocketContent(ln *core.DeltaNode) func(c echo.Context) error { } } -// Handling a websocket connection. +// It upgrades the HTTP connection to a WebSocket connection, registers the new client, and then reads messages from the +// client and sends them to the broadcast channel func handleWebsocketPieceCommitment(ln *core.DeltaNode) func(c echo.Context) error { return func(c echo.Context) error { // Upgrade HTTP connection to WebSocket connection @@ -120,7 +122,8 @@ func handleWebsocketPieceCommitment(ln *core.DeltaNode) func(c echo.Context) err } } -// Handling a websocket connection. +// It upgrades the HTTP connection to a WebSocket connection, registers the new client, and then reads messages from the +// client and sends them to the broadcast channel func handleWebsocketContentDeal(ln *core.DeltaNode) func(c echo.Context) error { return func(c echo.Context) error { // Upgrade HTTP connection to WebSocket connection diff --git a/cmd/commp.go b/cmd/commp.go index 181da05..570f680 100644 --- a/cmd/commp.go +++ b/cmd/commp.go @@ -25,6 +25,7 @@ type CommpResult struct { } // CommpCmd A CLI command that generates a piece commitment for a given file. +// `CommpCmd()` returns a slice of `*cli.Command`s func CommpCmd() []*cli.Command { // add a command to run API node var commpCommands []*cli.Command diff --git a/cmd/daemon.go b/cmd/daemon.go index a5bdf26..0886ea4 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -116,7 +116,6 @@ func DaemonCmd(cfg *c.DeltaConfig) []*cli.Command { // launch the API node api.InitializeEchoRouterConfig(ln, *cfg) - api.LoopForever() return nil @@ -132,6 +131,7 @@ func DaemonCmd(cfg *c.DeltaConfig) []*cli.Command { // Run the cron jobs. // The cron jobs are run every 12 hours and are responsible for cleaning up the database and the blockstore. // It also retries the failed transfers. +// `runScheduledCron` is a function that runs a cron job on a node func runScheduledCron(ln *core.DeltaNode) { maxCleanUpJobs := ln.Config.Dispatcher.MaxCleanupWorkers @@ -141,8 +141,7 @@ func runScheduledCron(ln *core.DeltaNode) { dispatcher := core.CreateNewDispatcher() dispatcher.AddJob(jobs.NewItemContentCleanUpProcessor(ln)) dispatcher.AddJob(jobs.NewRetryProcessor(ln)) - dispatcher.AddJob(jobs.NewMinerCheckProcessor(ln)) - dispatcher.Start(maxCleanUpJobs) // fix 100 workers for now. + dispatcher.Start(maxCleanUpJobs) }) s.Start() @@ -150,6 +149,7 @@ func runScheduledCron(ln *core.DeltaNode) { } // Setting the global node meta. +// > This function sets the global node metadata for the given node func setGlobalNodeMeta(ln *core.DeltaNode, repo string) *model.InstanceMeta { // get the 80% of the total memory usage diff --git a/config/config.go b/config/config.go index 5a3557c..c058a52 100644 --- a/config/config.go +++ b/config/config.go @@ -15,7 +15,7 @@ var ( type DeltaConfig struct { Node struct { - Name string `env:"NODE_NAME" envDefault:"stg-deal-maker"` + Name string `env:"NODE_NAME" envDefault:"delta-deal-maker"` Description string `env:"NODE_DESCRIPTION"` Type string `env:"NODE_TYPE"` } diff --git a/core/commp.go b/core/commp.go index a4c7c5f..aec781b 100644 --- a/core/commp.go +++ b/core/commp.go @@ -19,11 +19,13 @@ type CommpService struct { } // GenerateCommPFile Generating a CommP file from a payload file. +// Generating a CommP file from a payload file. func (c CommpService) GenerateCommPFile(context context.Context, payloadCid cid.Cid, blockstore blockstore.Blockstore) (pieceCid cid.Cid, payloadSize uint64, unPaddedPieceSize abi.UnpaddedPieceSize, err error) { return filclient.GeneratePieceCommitment(context, payloadCid, blockstore) } // GenerateCommPCarV2 Generating a CommP file from a CARv2 file. +// Generating a CommP file from a CARv2 file. func (c CommpService) GenerateCommPCarV2(readerFromFile io.Reader) (*abi.PieceInfo, error) { bytesFromCar, err := io.ReadAll(readerFromFile) rd, err := carv2.NewReader(bytes.NewReader(bytesFromCar)) @@ -72,6 +74,7 @@ func (c CommpService) GenerateCommPCarV2(readerFromFile io.Reader) (*abi.PieceIn } // GetSize Getting the size of the file. +// Getting the size of the file. func (c CommpService) GetSize(stream io.Reader) int { buf := new(bytes.Buffer) buf.ReadFrom(stream) @@ -79,6 +82,7 @@ func (c CommpService) GetSize(stream io.Reader) int { } // GetCarSize Getting the size of the CARv2 file. +// Getting the size of the CARv2 file. func (c CommpService) GetCarSize(stream io.Reader, rd *carv2.Reader) (int64, error) { var size int64 switch rd.Version { diff --git a/core/job.go b/core/job.go index 5b8a9ac..a401eff 100644 --- a/core/job.go +++ b/core/job.go @@ -116,6 +116,7 @@ func (d *Dispatcher) AddJob(je IProcessor) { } // AddJobAndDispatch Adding a job to the jobQueue, and then starting the dispatcher with a number of workers. +// It's adding a job to the jobQueue, and then starting the dispatcher with a number of workers. func (d *Dispatcher) AddJobAndDispatch(je IProcessor, numWorkers int) { j := &Job{ID: d.jobCounter, Processor: je} go func() { d.jobQueue <- j }() @@ -125,6 +126,7 @@ func (d *Dispatcher) AddJobAndDispatch(je IProcessor, numWorkers int) { } // Finished It's a method that returns true if the jobCounter is less than 1. +// It's a method that returns true if the jobCounter is less than 1. func (d *Dispatcher) Finished() bool { if d.jobCounter < 1 { return true diff --git a/core/libp2p.go b/core/libp2p.go index 60161ec..cf16b86 100644 --- a/core/libp2p.go +++ b/core/libp2p.go @@ -13,6 +13,7 @@ import ( ) // SetFilclientLibp2pSubscribe It subscribes to the libp2p transfer manager and updates the database with the status of the transfer +// It sets the filclient libp2p subscribe. func SetFilclientLibp2pSubscribe(filc *fc.FilClient, i *DeltaNode) { filc.Libp2pTransferMgr.Subscribe(func(dbid uint, fst fc.ChannelState) { switch fst.Status { diff --git a/core/node.go b/core/node.go index a61c324..d22d39d 100644 --- a/core/node.go +++ b/core/node.go @@ -84,6 +84,7 @@ type NewLightNodeParams struct { } // NewLightNode Creating a new light node. +// > This function creates a new DeltaNode with a new light client func NewLightNode(repo NewLightNodeParams) (*DeltaNode, error) { // database @@ -167,6 +168,7 @@ func LotusConnection(fullNodeApiInfo string) (v1api.FullNode, jsonrpc.ClientClos } // SetupWallet Creating a new wallet and setting it as the default wallet. +// > SetupWallet creates a new wallet and returns it func SetupWallet(dir string) (*wallet.LocalWallet, error) { kstore, err := keystore.OpenOrInitKeystore(dir) if err != nil { @@ -198,6 +200,7 @@ func SetupWallet(dir string) (*wallet.LocalWallet, error) { } // GetPublicIP Getting the public IP of the node. +// > GetPublicIP() returns the public IP address of the machine it's running on func GetPublicIP() (string, error) { resp, err := http.Get("https://ifconfig.me") // important to get the public ip if possible. if err != nil { diff --git a/core/repair.go b/core/repair.go index 86b1e9b..e3cbc64 100644 --- a/core/repair.go +++ b/core/repair.go @@ -16,6 +16,7 @@ func NewRepairService(dn DeltaNode) *RepairService { } } +// RecreateDeal A method of RepairService. // A method of RepairService. func (r RepairService) RecreateDeal(param RepairParam) (RepairResult, error) { return RepairResult{}, nil diff --git a/core/stats.go b/core/stats.go index 152f2ee..40f626f 100644 --- a/core/stats.go +++ b/core/stats.go @@ -46,7 +46,7 @@ func NewStatsStatsService(deltaNode *DeltaNode) *StatsService { } } -// A function that returns a StatsResult and an error. +// Status A function that returns a StatsResult and an error. func (s *StatsService) Status(param StatsParam) (StatsResult, error) { var content []model.Content s.DeltaNode.DB.Raw("select c.* from content_deals cd, contents c where cd.content = c.id and c.requesting_api_key = ?", param.RequestingApiKey).Scan(&content) @@ -64,7 +64,7 @@ func (s *StatsService) Status(param StatsParam) (StatsResult, error) { PieceCommitments: pieceCommitments}, nil } -// A function that returns a StatsPieceCommitmentResult and an error. +// PieceCommitmentStatus A function that returns a StatsPieceCommitmentResult and an error. func (s *StatsService) PieceCommitmentStatus(param PieceCommitmentStatsParam) (StatsPieceCommitmentResult, error) { // select * from piece_commitments pc, content c where c.piece_commitment_id = pc.id and c.requesting_api_key = ?; var pieceCommitment model.PieceCommitment @@ -74,7 +74,7 @@ func (s *StatsService) PieceCommitmentStatus(param PieceCommitmentStatsParam) (S PieceCommitments: pieceCommitment}, nil } -// A function that returns a StatsContentResult and an error. +// ContentStatus A function that returns a StatsContentResult and an error. func (s *StatsService) ContentStatus(param ContentStatsParam) (StatsContentResult, error) { var content model.Content s.DeltaNode.DB.Raw("select c.* from content_deals cd, contents c where cd.content = c.id and c.requesting_api_key = ? and c.id = ?", param.RequestingApiKey, param.ContentId).Scan(&content) @@ -82,7 +82,7 @@ func (s *StatsService) ContentStatus(param ContentStatsParam) (StatsContentResul return StatsContentResult{Content: content}, nil } -// A function that returns a StatsDealResult and an error. +// DealStatus A function that returns a StatsDealResult and an error. func (s *StatsService) DealStatus(param DealStatsParam) (StatsDealResult, error) { var contentDeal model.ContentDeal s.DeltaNode.DB.Raw("select cd.* from content_deals cd, contents c where cd.content = c.id and c.requesting_api_key = ? and cd.id = ?", param.RequestingApiKey, param.DealId).Scan(&contentDeal) diff --git a/core/status_logger.go b/core/status_logger.go index b328d78..4e6aeee 100644 --- a/core/status_logger.go +++ b/core/status_logger.go @@ -24,7 +24,7 @@ func (s *StatusLogger) UpdateContentStatus(content model.Content, status string) return nil } -// Updating the status of a piece commitment object. +// UpdatePieceCommStatus Updating the status of a piece commitment object. func (s *StatusLogger) UpdatePieceCommStatus(pieceCommp model.PieceCommitment, status string) error { tx := s.LightNode.DB.Model(&pieceCommp).Update("status", status) if tx.Error != nil { @@ -33,7 +33,7 @@ func (s *StatusLogger) UpdatePieceCommStatus(pieceCommp model.PieceCommitment, s return nil } -// Updating the status of a content deal object. +// UpdateContentDealStatus Updating the status of a content deal object. func (s *StatusLogger) UpdateContentDealStatus(pieceCommp model.ContentDeal, status string) error { tx := s.LightNode.DB.Model(&pieceCommp).Update("status", status) if tx.Error != nil { diff --git a/core/upload_offline.go b/core/upload_offline.go deleted file mode 100644 index 0840682..0000000 --- a/core/upload_offline.go +++ /dev/null @@ -1,42 +0,0 @@ -package core - -import "time" - -type UploadOfflineParam struct { - File []byte - Filename string - Replication int - Miner string - WalletID string -} - -type UploadOfflineCommpParam struct { - UploadOnlineParam - Duration time.Duration - Size int64 -} - -type UploadOfflineResult struct { -} - -type UploadOfflineService struct { - DeltaNode DeltaNode -} - -func NewUploadOfflineService(dn DeltaNode) *UploadOfflineService { - return &UploadOfflineService{ - DeltaNode: dn, - } -} - -func (u *UploadOfflineService) Add(param UploadOfflineParam) (UploadOfflineResult, error) { - return UploadOfflineResult{}, nil -} - -func (u *UploadOfflineService) List(param UploadOfflineParam) (UploadOfflineResult, error) { - return UploadOfflineResult{}, nil -} - -func (u *UploadOfflineService) Commp(param UploadOfflineCommpParam) (UploadOfflineResult, error) { - return UploadOfflineResult{}, nil -} diff --git a/core/upload_online.go b/core/upload_online.go deleted file mode 100644 index 6a0eacc..0000000 --- a/core/upload_online.go +++ /dev/null @@ -1,42 +0,0 @@ -package core - -import "time" - -type UploadOnlineParam struct { - File []byte - Filename string - Replication int - Miner string - WalletID string -} - -type UploadOnlineCommpParam struct { - UploadOnlineParam - Duration time.Duration - Size int64 -} - -type UploadOnlineResult struct { -} - -type UploadOnlineService struct { - DeltaNode DeltaNode -} - -func NewUploadOnlineService(dn DeltaNode) *UploadOnlineService { - return &UploadOnlineService{ - DeltaNode: dn, - } -} - -func (u UploadOnlineService) Add(param UploadOnlineParam) (UploadOnlineResult, error) { - return UploadOnlineResult{}, nil -} - -func (u UploadOnlineService) List(param UploadOnlineParam) (UploadOnlineResult, error) { - return UploadOnlineResult{}, nil -} - -func (u UploadOnlineService) Commp(param UploadOnlineCommpParam) (UploadOnlineResult, error) { - return UploadOnlineResult{}, nil -} diff --git a/core/wallet.go b/core/wallet.go index 2c914ec..d8bfbfe 100644 --- a/core/wallet.go +++ b/core/wallet.go @@ -60,14 +60,14 @@ type WalletService struct { DeltaNode *DeltaNode } -// Creating a new wallet service. +// NewWalletService Creating a new wallet service. func NewWalletService(dn *DeltaNode) *WalletService { return &WalletService{ DeltaNode: dn, } } -// Creating a new wallet and saving it to the database. +// Create Creating a new wallet and saving it to the database. func (w WalletService) Create(param CreateWalletParam) (AddWalletResult, error) { newWallet, err := wallet.NewWallet(wallet.NewMemKeyStore()) if err != nil { @@ -97,7 +97,7 @@ func (w WalletService) Create(param CreateWalletParam) (AddWalletResult, error) }, nil } -// Importing a wallet. +// Import Importing a wallet. func (w WalletService) Import(param ImportWalletParam) (ImportWalletResult, error) { newWallet, err := wallet.NewWallet(wallet.NewMemKeyStore()) if err != nil { @@ -136,8 +136,7 @@ func (w WalletService) Import(param ImportWalletParam) (ImportWalletResult, erro }, nil } -// Deleting the wallet from the database. - +// Remove Deleting the wallet from the database. func (w WalletService) Remove(param RemoveWalletParam) (DeleteWalletResult, error) { err := w.DeltaNode.DB.Delete(&model.Wallet{}).Where("owner = ? and addr = ?", param.RequestingApiKey, param.Address).Error if err != nil { @@ -152,7 +151,7 @@ func (w WalletService) Remove(param RemoveWalletParam) (DeleteWalletResult, erro }, nil } -// A function that takes a WalletParam and returns a list of model.Wallet and an error. +// List A function that takes a WalletParam and returns a list of model.Wallet and an error. func (w WalletService) List(param WalletParam) ([]model.Wallet, error) { var wallets []model.Wallet w.DeltaNode.DB.Model(&model.Wallet{}).Where("owner = ?", param.RequestingApiKey).Find(&wallets) @@ -160,6 +159,7 @@ func (w WalletService) List(param WalletParam) ([]model.Wallet, error) { // Getting the wallet from the database. } +// Getting the wallet from the database. func (w WalletService) Get(param GetWalletParam) (model.Wallet, error) { var wallet model.Wallet w.DeltaNode.DB.Model(&model.Wallet{}).Where("owner = ? and addr = ?", param.RequestingApiKey, param.Address).Find(&wallet) diff --git a/core/websocket.go b/core/websocket.go index 9258d1e..89b304d 100644 --- a/core/websocket.go +++ b/core/websocket.go @@ -4,12 +4,15 @@ type WebsocketService struct { DeltaNode *DeltaNode } +// `NewWebsocketService` creates a new `WebsocketService` struct and returns a pointer to it func NewWebsocketService(dn *DeltaNode) *WebsocketService { return &WebsocketService{ DeltaNode: dn, } } +// HandlePieceCommitmentMessages A function that is listening to the channel `ws.DeltaNode.WebsocketBroadcast.PieceCommitmentChannel.Channel` and when it +// receives a message, it broadcasts it to all clients. func (ws *WebsocketService) HandlePieceCommitmentMessages() error { for { message := <-ws.DeltaNode.WebsocketBroadcast.PieceCommitmentChannel.Channel @@ -21,11 +24,12 @@ func (ws *WebsocketService) HandlePieceCommitmentMessages() error { delete(ws.DeltaNode.WebsocketBroadcast.PieceCommitmentChannel.Clients, client) } } - } return nil } +// HandleContentDealMessages Listening to the channel `ws.DeltaNode.WebsocketBroadcast.ContentDealChannel.Channel` and when it +// // receives a message, it broadcasts it to all clients. func (ws *WebsocketService) HandleContentDealMessages() error { for { message := <-ws.DeltaNode.WebsocketBroadcast.ContentDealChannel.Channel @@ -42,6 +46,8 @@ func (ws *WebsocketService) HandleContentDealMessages() error { return nil } +// HandleContentMessages Listening to the channel `ws.DeltaNode.WebsocketBroadcast.ContentChannel.Channel` and when it +// // receives a message, it broadcasts it to all clients. func (ws *WebsocketService) HandleContentMessages() error { for { message := <-ws.DeltaNode.WebsocketBroadcast.ContentChannel.Channel diff --git a/go.mod b/go.mod index 50417de..1aa89fe 100644 --- a/go.mod +++ b/go.mod @@ -303,4 +303,4 @@ require ( replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi -//replace github.com/application-research/delta-db => ../delta-events +replace github.com/application-research/delta-db => ../delta-events diff --git a/jobs/make_signed_deal.go b/jobs/make_signed_deal.go new file mode 100644 index 0000000..042d274 --- /dev/null +++ b/jobs/make_signed_deal.go @@ -0,0 +1,308 @@ +package jobs + +import ( + "context" + "delta/core" + "delta/utils" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + model "github.com/application-research/delta-db/db_models" + fc "github.com/application-research/filclient" + smtypes "github.com/filecoin-project/boost/storagemarket/types" + "github.com/filecoin-project/boost/transport/httptransport" + boosttypes "github.com/filecoin-project/boost/transport/types" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/builtin/v9/market" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/wallet" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multiaddr" + "golang.org/x/xerrors" + "time" +) + +type SignedDealMakerProcessor struct { + // It creates a new `StorageDealMakerProcessor` object, which is a type of `IProcessor` object + Context context.Context + LightNode *core.DeltaNode + Content *model.Content + PieceComm *model.PieceCommitment +} + +func NewSignedDealMakerProcessor(ln *core.DeltaNode, content model.Content, commitment model.PieceCommitment) IProcessor { + return &SignedDealMakerProcessor{ + LightNode: ln, + Content: &content, + PieceComm: &commitment, + Context: context.Background(), + } +} + +// Run The above code is a function that is part of the StorageDealMakerProcessor struct. It is a function that is called when +// the StorageDealMakerProcessor is run. It calls the makeStorageDeal function, which is defined in the same file. +func (i SignedDealMakerProcessor) Run() error { + err := i.makeUnsignedStorageDeal(i.Content, i.PieceComm) + if err != nil { + fmt.Println(err) + return err + } + return nil +} + +// Making a deal with the miner. +func (i *SignedDealMakerProcessor) makeUnsignedStorageDeal(content *model.Content, pieceComm *model.PieceCommitment) error { + + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_MAKING_PROPOSAL, //"making-deal-proposal", + }) + + // any error here, fail the content + var minerAddress = i.GetAssignedMinerForContent(*content).Address + var filClient, err = i.GetAssignedFilclientForContent(*content) + //var WallerSigner, err = i.GetAssignedWalletForContent(*content) + var dealProposal = i.GetDealProposalForContent(*content) + + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + return err + } + + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + return err + } + + priceBigInt, err := types.BigFromString("0") + + var dealDuration = utils.DEFAULT_DURATION + if dealProposal.ID != 0 { + dealDuration = int(dealProposal.Duration) + } + duration := abi.ChainEpoch(dealDuration) + payloadCid, err := cid.Decode(pieceComm.Cid) + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + } + + pieceCid, err := cid.Decode(pieceComm.Piece) + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + } + + // label deal + label, err := market.NewLabelFromString(dealProposal.Label) + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + } + + prop, rawUnsigned, err := filClient.MakeDealUnsigned(i.Context, minerAddress, payloadCid, priceBigInt, duration, + fc.DealWithVerified(true), + fc.DealWithFastRetrieval(!dealProposal.RemoveUnsealedCopy), + fc.DealWithLabel(label), + fc.DealWithPieceInfo(fc.DealPieceInfo{ + Cid: pieceCid, + Size: abi.PaddedPieceSize(pieceComm.PaddedPieceSize), + PayloadSize: uint64(pieceComm.Size), + }), + ) + + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + return err + } + + // struct to json + encodedDealProposal, err := json.Marshal(prop) + encodedUnsignedDealProposal := hex.EncodeToString(rawUnsigned) + // save the unsigned deal + var unsignedDeal = model.ContentDealProposal{ + Content: content.ID, + Unsigned: encodedUnsignedDealProposal, + Meta: string(encodedDealProposal), + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + } + i.LightNode.DB.Create(&unsignedDeal) + return nil + +} + +// GetAssignedMinerForContent Getting the miner address for the content. +func (i *SignedDealMakerProcessor) GetAssignedMinerForContent(content model.Content) MinerAddress { + var storageMinerAssignment model.ContentMiner + i.LightNode.DB.Model(&model.ContentMiner{}).Where("content = ?", content.ID).Find(&storageMinerAssignment) + if storageMinerAssignment.ID != 0 { + address.CurrentNetwork = address.Mainnet + a, err := address.NewFromString(storageMinerAssignment.Miner) + if err != nil { + fmt.Println("error on miner address", err, a) + } + return MinerAddress{Address: a} + } + return i.GetStorageProviders()[0] +} + +// Getting the content deal proposal parameters for a given content. +func (i *SignedDealMakerProcessor) GetDealProposalForContent(content model.Content) model.ContentDealProposalParameters { + var contentDealProposalParameters model.ContentDealProposalParameters + i.LightNode.DB.Model(&model.ContentDealProposalParameters{}).Where("content = ?", content.ID).Find(&contentDealProposalParameters) + return contentDealProposalParameters +} + +// Getting the assigned filclient for the content. +func (i *SignedDealMakerProcessor) GetAssignedFilclientForContent(content model.Content) (*fc.FilClient, error) { + api, _, err := core.LotusConnection(utils.LOTUS_API) + if err != nil { + return nil, err + } + + var storageWalletAssignment model.ContentWallet + i.LightNode.DB.Model(&model.ContentWallet{}).Where("content = ?", content.ID).Find(&storageWalletAssignment) + + if storageWalletAssignment.ID != 0 { + newWallet, err := wallet.NewWallet(wallet.NewMemKeyStore()) + + var walletMeta WalletMeta + + json.Unmarshal([]byte(storageWalletAssignment.Wallet), &walletMeta) + unhexPkey, err := hex.DecodeString(walletMeta.PrivateKey) + decodedPkey, err := base64.StdEncoding.DecodeString(string(unhexPkey)) + + if err != nil { + fmt.Println("error on unhex", err) + return nil, err + } + + newWalletAddr, err := newWallet.WalletImport(context.Background(), &types.KeyInfo{ + Type: types.KeyType(walletMeta.KeyType), + PrivateKey: decodedPkey, + }) + + if err != nil { + fmt.Println("error on wallet_estuary import", err) + return nil, err + } + // new filclient just for this request + filclient, err := fc.NewClient(i.LightNode.Node.Host, api, newWallet, newWalletAddr, i.LightNode.Node.Blockstore, i.LightNode.Node.Datastore, i.LightNode.Node.Config.DatastoreDir.Directory) + if err != nil { + fmt.Println("error on filclient", err) + return nil, err + } + core.SetFilclientLibp2pSubscribe(filclient, i.LightNode) + return filclient, err + } + + return i.LightNode.FilClient, err +} + +// GetStorageProviders Getting the storage providers. +func (i *SignedDealMakerProcessor) GetStorageProviders() []MinerAddress { + var storageProviders []MinerAddress + for _, s := range mainnetMinerStrs { + address.CurrentNetwork = address.Mainnet + a, err := address.NewFromString(s) + if err != nil { + fmt.Println("error on miner address", err, a) + } + storageProviders = append(storageProviders, MinerAddress{Address: a}) + } + return storageProviders +} + +// Sending a proposal to the peer. +func (i *SignedDealMakerProcessor) sendProposalV120(ctx context.Context, netprop network.Proposal, propCid cid.Cid, dealUUID uuid.UUID, dbid uint, skipIpniAnnounce bool) (bool, error) { + // Create an auth token to be used in the request + authToken, err := httptransport.GenerateAuthToken() + if err != nil { + return false, xerrors.Errorf("generating auth token for deal: %w", err) + } + + rootCid := netprop.Piece.Root + size := netprop.Piece.RawBlockSize + var announceAddr multiaddr.Multiaddr + + if len(i.LightNode.Node.Config.AnnounceAddrs) == 0 { + return false, xerrors.Errorf("cannot serve deal data: no announce address configured for estuary node") + } + + addrstr := i.LightNode.Node.Config.AnnounceAddrs[1] + "/p2p/" + i.LightNode.Node.Host.ID().String() + announceAddr, err = multiaddr.NewMultiaddr(addrstr) + if err != nil { + return false, xerrors.Errorf("cannot parse announce address '%s': %w", addrstr, err) + } + + // Add an auth token for the data to the auth DB + err = i.LightNode.FilClient.Libp2pTransferMgr.PrepareForDataRequest(ctx, dbid, authToken, propCid, rootCid, size) + if err != nil { + return false, xerrors.Errorf("preparing for data request: %w", err) + } + + // Send the deal proposal to the storage provider + transferParams, err := json.Marshal(boosttypes.HttpRequest{ + URL: "libp2p://" + announceAddr.String(), + Headers: map[string]string{ + "Authorization": httptransport.BasicAuthHeader("", authToken), + }, + }) + + // Send the deal proposal to the storage provider + var propPhase bool + //var err error + if i.Content.ConnectionMode == utils.CONNECTION_MODE_IMPORT { + propPhase, err = i.LightNode.FilClient.SendProposalV120WithOptions( + ctx, netprop, + fc.ProposalV120WithDealUUID(dealUUID), + fc.ProposalV120WithLibp2pTransfer(announceAddr, authToken, dbid), + fc.ProposalV120WithOffline(true), + fc.ProposalV120WithSkipIPNIAnnounce(skipIpniAnnounce), + fc.ProposalV120WithTransfer(smtypes.Transfer{ + Type: "libp2p", + ClientID: fmt.Sprintf("%d", dbid), + Params: transferParams, + Size: netprop.Piece.RawBlockSize, + }), + ) + } else { + propPhase, err = i.LightNode.FilClient.SendProposalV120WithOptions( + ctx, netprop, + fc.ProposalV120WithDealUUID(dealUUID), + fc.ProposalV120WithLibp2pTransfer(announceAddr, authToken, dbid), + fc.ProposalV120WithSkipIPNIAnnounce(skipIpniAnnounce), + fc.ProposalV120WithTransfer(smtypes.Transfer{ + Type: "libp2p", + ClientID: fmt.Sprintf("%d", dbid), + Params: transferParams, + Size: netprop.Piece.RawBlockSize, + }), + ) + } + + if err != nil { + i.LightNode.FilClient.Libp2pTransferMgr.CleanupPreparedRequest(i.Context, dbid, authToken) + } + + return propPhase, err +} diff --git a/jobs/make_unsigned_deal.go b/jobs/make_unsigned_deal.go new file mode 100644 index 0000000..0337da8 --- /dev/null +++ b/jobs/make_unsigned_deal.go @@ -0,0 +1,308 @@ +package jobs + +import ( + "context" + "delta/core" + "delta/utils" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + model "github.com/application-research/delta-db/db_models" + fc "github.com/application-research/filclient" + smtypes "github.com/filecoin-project/boost/storagemarket/types" + "github.com/filecoin-project/boost/transport/httptransport" + boosttypes "github.com/filecoin-project/boost/transport/types" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/builtin/v9/market" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/wallet" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multiaddr" + "golang.org/x/xerrors" + "time" +) + +type UnsignedDealMakerProcessor struct { + // It creates a new `StorageDealMakerProcessor` object, which is a type of `IProcessor` object + Context context.Context + LightNode *core.DeltaNode + Content *model.Content + PieceComm *model.PieceCommitment +} + +func NewUnsignedDealMakerProcessor(ln *core.DeltaNode, content model.Content, commitment model.PieceCommitment) IProcessor { + return &UnsignedDealMakerProcessor{ + LightNode: ln, + Content: &content, + PieceComm: &commitment, + Context: context.Background(), + } +} + +// Run The above code is a function that is part of the StorageDealMakerProcessor struct. It is a function that is called when +// the StorageDealMakerProcessor is run. It calls the makeStorageDeal function, which is defined in the same file. +func (i UnsignedDealMakerProcessor) Run() error { + err := i.makeUnsignedStorageDeal(i.Content, i.PieceComm) + if err != nil { + fmt.Println(err) + return err + } + return nil +} + +// Making a deal with the miner. +func (i *UnsignedDealMakerProcessor) makeUnsignedStorageDeal(content *model.Content, pieceComm *model.PieceCommitment) error { + + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_MAKING_PROPOSAL, //"making-deal-proposal", + }) + + // any error here, fail the content + var minerAddress = i.GetAssignedMinerForContent(*content).Address + var filClient, err = i.GetAssignedFilclientForContent(*content) + //var WallerSigner, err = i.GetAssignedWalletForContent(*content) + var dealProposal = i.GetDealProposalForContent(*content) + + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + return err + } + + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + return err + } + + priceBigInt, err := types.BigFromString("0") + + var dealDuration = utils.DEFAULT_DURATION + if dealProposal.ID != 0 { + dealDuration = int(dealProposal.Duration) + } + duration := abi.ChainEpoch(dealDuration) + payloadCid, err := cid.Decode(pieceComm.Cid) + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + } + + pieceCid, err := cid.Decode(pieceComm.Piece) + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + } + + // label deal + label, err := market.NewLabelFromString(dealProposal.Label) + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + } + + prop, rawUnsigned, err := filClient.MakeDealUnsigned(i.Context, minerAddress, payloadCid, priceBigInt, duration, + fc.DealWithVerified(true), + fc.DealWithFastRetrieval(!dealProposal.RemoveUnsealedCopy), + fc.DealWithLabel(label), + fc.DealWithPieceInfo(fc.DealPieceInfo{ + Cid: pieceCid, + Size: abi.PaddedPieceSize(pieceComm.PaddedPieceSize), + PayloadSize: uint64(pieceComm.Size), + }), + ) + + if err != nil { + i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ + Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", + LastMessage: err.Error(), + }) + return err + } + + // struct to json + encodedDealProposal, err := json.Marshal(prop) + encodedUnsignedDealProposal := hex.EncodeToString(rawUnsigned) + // save the unsigned deal + var unsignedDeal = model.ContentDealProposal{ + Content: content.ID, + Unsigned: encodedUnsignedDealProposal, + Meta: string(encodedDealProposal), + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + } + i.LightNode.DB.Create(&unsignedDeal) + return nil + +} + +// GetAssignedMinerForContent Getting the miner address for the content. +func (i *UnsignedDealMakerProcessor) GetAssignedMinerForContent(content model.Content) MinerAddress { + var storageMinerAssignment model.ContentMiner + i.LightNode.DB.Model(&model.ContentMiner{}).Where("content = ?", content.ID).Find(&storageMinerAssignment) + if storageMinerAssignment.ID != 0 { + address.CurrentNetwork = address.Mainnet + a, err := address.NewFromString(storageMinerAssignment.Miner) + if err != nil { + fmt.Println("error on miner address", err, a) + } + return MinerAddress{Address: a} + } + return i.GetStorageProviders()[0] +} + +// Getting the content deal proposal parameters for a given content. +func (i *UnsignedDealMakerProcessor) GetDealProposalForContent(content model.Content) model.ContentDealProposalParameters { + var contentDealProposalParameters model.ContentDealProposalParameters + i.LightNode.DB.Model(&model.ContentDealProposalParameters{}).Where("content = ?", content.ID).Find(&contentDealProposalParameters) + return contentDealProposalParameters +} + +// Getting the assigned filclient for the content. +func (i *UnsignedDealMakerProcessor) GetAssignedFilclientForContent(content model.Content) (*fc.FilClient, error) { + api, _, err := core.LotusConnection(utils.LOTUS_API) + if err != nil { + return nil, err + } + + var storageWalletAssignment model.ContentWallet + i.LightNode.DB.Model(&model.ContentWallet{}).Where("content = ?", content.ID).Find(&storageWalletAssignment) + + if storageWalletAssignment.ID != 0 { + newWallet, err := wallet.NewWallet(wallet.NewMemKeyStore()) + + var walletMeta WalletMeta + + json.Unmarshal([]byte(storageWalletAssignment.Wallet), &walletMeta) + unhexPkey, err := hex.DecodeString(walletMeta.PrivateKey) + decodedPkey, err := base64.StdEncoding.DecodeString(string(unhexPkey)) + + if err != nil { + fmt.Println("error on unhex", err) + return nil, err + } + + newWalletAddr, err := newWallet.WalletImport(context.Background(), &types.KeyInfo{ + Type: types.KeyType(walletMeta.KeyType), + PrivateKey: decodedPkey, + }) + + if err != nil { + fmt.Println("error on wallet_estuary import", err) + return nil, err + } + // new filclient just for this request + filclient, err := fc.NewClient(i.LightNode.Node.Host, api, newWallet, newWalletAddr, i.LightNode.Node.Blockstore, i.LightNode.Node.Datastore, i.LightNode.Node.Config.DatastoreDir.Directory) + if err != nil { + fmt.Println("error on filclient", err) + return nil, err + } + core.SetFilclientLibp2pSubscribe(filclient, i.LightNode) + return filclient, err + } + + return i.LightNode.FilClient, err +} + +// GetStorageProviders Getting the storage providers. +func (i *UnsignedDealMakerProcessor) GetStorageProviders() []MinerAddress { + var storageProviders []MinerAddress + for _, s := range mainnetMinerStrs { + address.CurrentNetwork = address.Mainnet + a, err := address.NewFromString(s) + if err != nil { + fmt.Println("error on miner address", err, a) + } + storageProviders = append(storageProviders, MinerAddress{Address: a}) + } + return storageProviders +} + +// Sending a proposal to the peer. +func (i *UnsignedDealMakerProcessor) sendProposalV120(ctx context.Context, netprop network.Proposal, propCid cid.Cid, dealUUID uuid.UUID, dbid uint, skipIpniAnnounce bool) (bool, error) { + // Create an auth token to be used in the request + authToken, err := httptransport.GenerateAuthToken() + if err != nil { + return false, xerrors.Errorf("generating auth token for deal: %w", err) + } + + rootCid := netprop.Piece.Root + size := netprop.Piece.RawBlockSize + var announceAddr multiaddr.Multiaddr + + if len(i.LightNode.Node.Config.AnnounceAddrs) == 0 { + return false, xerrors.Errorf("cannot serve deal data: no announce address configured for estuary node") + } + + addrstr := i.LightNode.Node.Config.AnnounceAddrs[1] + "/p2p/" + i.LightNode.Node.Host.ID().String() + announceAddr, err = multiaddr.NewMultiaddr(addrstr) + if err != nil { + return false, xerrors.Errorf("cannot parse announce address '%s': %w", addrstr, err) + } + + // Add an auth token for the data to the auth DB + err = i.LightNode.FilClient.Libp2pTransferMgr.PrepareForDataRequest(ctx, dbid, authToken, propCid, rootCid, size) + if err != nil { + return false, xerrors.Errorf("preparing for data request: %w", err) + } + + // Send the deal proposal to the storage provider + transferParams, err := json.Marshal(boosttypes.HttpRequest{ + URL: "libp2p://" + announceAddr.String(), + Headers: map[string]string{ + "Authorization": httptransport.BasicAuthHeader("", authToken), + }, + }) + + // Send the deal proposal to the storage provider + var propPhase bool + //var err error + if i.Content.ConnectionMode == utils.CONNECTION_MODE_IMPORT { + propPhase, err = i.LightNode.FilClient.SendProposalV120WithOptions( + ctx, netprop, + fc.ProposalV120WithDealUUID(dealUUID), + fc.ProposalV120WithLibp2pTransfer(announceAddr, authToken, dbid), + fc.ProposalV120WithOffline(true), + fc.ProposalV120WithSkipIPNIAnnounce(skipIpniAnnounce), + fc.ProposalV120WithTransfer(smtypes.Transfer{ + Type: "libp2p", + ClientID: fmt.Sprintf("%d", dbid), + Params: transferParams, + Size: netprop.Piece.RawBlockSize, + }), + ) + } else { + propPhase, err = i.LightNode.FilClient.SendProposalV120WithOptions( + ctx, netprop, + fc.ProposalV120WithDealUUID(dealUUID), + fc.ProposalV120WithLibp2pTransfer(announceAddr, authToken, dbid), + fc.ProposalV120WithSkipIPNIAnnounce(skipIpniAnnounce), + fc.ProposalV120WithTransfer(smtypes.Transfer{ + Type: "libp2p", + ClientID: fmt.Sprintf("%d", dbid), + Params: transferParams, + Size: netprop.Piece.RawBlockSize, + }), + ) + } + + if err != nil { + i.LightNode.FilClient.Libp2pTransferMgr.CleanupPreparedRequest(i.Context, dbid, authToken) + } + + return propPhase, err +} diff --git a/jobs/piece_commp_compute.go b/jobs/piece_commp_compute.go index f63cae9..b237d4d 100644 --- a/jobs/piece_commp_compute.go +++ b/jobs/piece_commp_compute.go @@ -70,6 +70,7 @@ func (i PieceCommpProcessor) Run() error { i.LightNode.DB.Model(&model.Content{}).Where("id = ?", i.Content.ID).Updates(model.Content{ Status: utils.CONTENT_FAILED_TO_PROCESS, LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -79,6 +80,7 @@ func (i PieceCommpProcessor) Run() error { unPaddedPieceSize = pieceInfo.Size.Unpadded() payloadSize = uint64(len(bytesFromCar)) + } else { pieceCid, payloadSize, unPaddedPieceSize, err = i.CommpService.GenerateCommPFile(i.Context, payloadCid, i.LightNode.Node.Blockstore) paddedPieceSize = unPaddedPieceSize.Padded() @@ -86,6 +88,7 @@ func (i PieceCommpProcessor) Run() error { i.LightNode.DB.Model(&model.Content{}).Where("id = ?", i.Content.ID).Updates(model.Content{ Status: utils.CONTENT_FAILED_TO_PROCESS, LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } diff --git a/jobs/storage_deal_maker.go b/jobs/storage_deal_maker.go index 600f469..6d7817f 100644 --- a/jobs/storage_deal_maker.go +++ b/jobs/storage_deal_maker.go @@ -86,8 +86,10 @@ func (i StorageDealMakerProcessor) Run() error { // Making a deal with the miner. func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, pieceComm *model.PieceCommitment) error { + // update the status i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ - Status: utils.CONTENT_DEAL_MAKING_PROPOSAL, //"making-deal-proposal", + Status: utils.CONTENT_DEAL_MAKING_PROPOSAL, //"making-deal-proposal", + UpdatedAt: time.Now(), }) // any error here, fail the content @@ -100,6 +102,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -108,6 +111,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -124,6 +128,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) } @@ -132,6 +137,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) } @@ -141,6 +147,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) } @@ -159,6 +166,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -174,6 +182,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -184,6 +193,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -196,7 +206,6 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec DealProtocolVersion: string(proto), CreatedAt: time.Now(), UpdatedAt: time.Now(), - //MinerVersion: ask.MinerVersion, } if err := i.LightNode.DB.Create(deal).Error; err != nil { i.LightNode.Dispatcher.AddJobAndDispatch(NewStorageDealMakerProcessor(i.LightNode, *content, *pieceComm), 1) @@ -211,8 +220,11 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec // log and send the proposal over i.LightNode.DB.Create(&model.ContentDealProposal{ - Content: content.ID, - Meta: propString, + Content: content.ID, + Meta: propString, + Signed: propString, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), }) // send the proposal. @@ -222,10 +234,12 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -235,10 +249,12 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return nil } @@ -247,10 +263,12 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return nil } @@ -259,11 +277,13 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -272,11 +292,13 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -285,6 +307,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", @@ -296,10 +319,12 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -312,6 +337,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -320,10 +346,12 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -332,6 +360,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", @@ -344,10 +373,12 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) return err } @@ -356,10 +387,12 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&deal).Where("id = ?", deal.ID).Updates(model.ContentDeal{ LastMessage: err.Error(), Failed: true, // mark it as failed + UpdatedAt: time.Now(), }) i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_FAILED, //"failed", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) // retry i.LightNode.Dispatcher.AddJobAndDispatch(NewStorageDealMakerProcessor(i.LightNode, *content, *pieceComm), 1) @@ -375,6 +408,7 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec i.LightNode.DB.Model(&content).Where("id = ?", content.ID).Updates(model.Content{ Status: utils.CONTENT_DEAL_PROPOSAL_SENT, //"deal-proposal-sent", LastMessage: err.Error(), + UpdatedAt: time.Now(), }) } @@ -394,6 +428,11 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec pieceComm.Status = utils.COMMP_STATUS_COMITTED //"committed" content.Status = utils.DEAL_STATUS_TRANSFER_STARTED //"transfer-started" deal.LastMessage = utils.DEAL_STATUS_TRANSFER_STARTED //"transfer-started" + + pieceComm.UpdatedAt = time.Now() + content.UpdatedAt = time.Now() + deal.UpdatedAt = time.Now() + deal.DTChan = channelId.String() i.LightNode.DB.Transaction(func(tx *gorm.DB) error { tx.Model(&model.PieceCommitment{}).Where("id = ?", pieceComm.ID).Save(pieceComm) @@ -407,6 +446,11 @@ func (i *StorageDealMakerProcessor) makeStorageDeal(content *model.Content, piec pieceComm.Status = utils.COMMP_STATUS_COMITTED //"committed" content.Status = utils.CONTENT_DEAL_PROPOSAL_SENT deal.LastMessage = utils.CONTENT_DEAL_PROPOSAL_SENT + + pieceComm.UpdatedAt = time.Now() + content.UpdatedAt = time.Now() + deal.UpdatedAt = time.Now() + i.LightNode.DB.Transaction(func(tx *gorm.DB) error { tx.Model(&model.PieceCommitment{}).Where("id = ?", pieceComm.ID).Save(pieceComm) tx.Model(&model.Content{}).Where("id = ?", content.ID).Save(content) @@ -434,7 +478,6 @@ func (i *StorageDealMakerProcessor) GetAssignedMinerForContent(content model.Con return i.GetStorageProviders()[0] } -// Getting the content deal proposal parameters for a given content. // Getting the content deal proposal parameters for a given content. func (i *StorageDealMakerProcessor) GetDealProposalForContent(content model.Content) model.ContentDealProposalParameters { var contentDealProposalParameters model.ContentDealProposalParameters @@ -442,7 +485,7 @@ func (i *StorageDealMakerProcessor) GetDealProposalForContent(content model.Cont return contentDealProposalParameters } -// *|CURSOR_MARCADOR|* +// Creating a new filclient for the content. func (i *StorageDealMakerProcessor) GetAssignedFilclientForContent(content model.Content) (*fc.FilClient, error) { api, _, err := core.LotusConnection(utils.LOTUS_API) if err != nil { diff --git a/metrics/metrics.go b/metrics/metrics.go index 43032b6..be5b831 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -36,11 +36,14 @@ func Exporter() http.Handler { return exporter } +// It takes a URL parameter, and then serves the corresponding profile +// It takes a URL parameter, and if it's one of the supported profiling types, it serves the profile data func ServeProfile(c echo.Context) error { httpprof.Handler(c.Param("prof")).ServeHTTP(c.Response().Writer, c.Request()) return nil } +// It writes the stack traces of all goroutines to the given writer func WriteAllGoroutineStacks(w io.Writer) error { buf := make([]byte, 64<<20) for i := 0; ; i++ {