From cdbaff1ed3450a75f7f40e4ae2744435e3cee353 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 21 Aug 2024 18:20:44 +0300 Subject: [PATCH] services: add new service for fetching blocks from NeoFS Close #3496 Signed-off-by: Ekaterina Pavlova --- cli/server/dump_bin.go | 88 ++++ cli/server/server.go | 7 + config/protocol.testnet.yml | 16 + go.mod | 2 +- pkg/config/application_config.go | 21 +- pkg/config/blockfetcher_config.go | 35 ++ pkg/network/server.go | 23 +- pkg/network/server_config.go | 40 +- pkg/services/blockfetcher/blockfetcher.go | 381 ++++++++++++++++++ .../blockfetcher/blockfetcher_test.go | 75 ++++ pkg/services/oracle/neofs/neofs.go | 79 +++- 11 files changed, 727 insertions(+), 40 deletions(-) create mode 100644 cli/server/dump_bin.go create mode 100644 pkg/config/blockfetcher_config.go create mode 100644 pkg/services/blockfetcher/blockfetcher.go create mode 100644 pkg/services/blockfetcher/blockfetcher_test.go diff --git a/cli/server/dump_bin.go b/cli/server/dump_bin.go new file mode 100644 index 0000000000..04628d2c89 --- /dev/null +++ b/cli/server/dump_bin.go @@ -0,0 +1,88 @@ +package server + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/nspcc-dev/neo-go/cli/cmdargs" + "github.com/nspcc-dev/neo-go/cli/options" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/urfave/cli/v2" +) + +func dumpBin(ctx *cli.Context) error { + if err := cmdargs.EnsureNone(ctx); err != nil { + return err + } + cfg, err := options.GetConfigFromContext(ctx) + if err != nil { + return cli.Exit(err, 1) + } + log, _, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration) + if err != nil { + return cli.Exit(err, 1) + } + if logCloser != nil { + defer func() { _ = logCloser() }() + } + count := uint32(ctx.Uint("count")) + start := uint32(ctx.Uint("start")) + + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) + if err != nil { + return err + } + defer func() { + pprof.ShutDown() + prometheus.ShutDown() + chain.Close() + }() + + blocksCount := chain.BlockHeight() + 1 + if start+count > blocksCount { + return cli.Exit(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", blocksCount-1, count, start), 1) + } + if count == 0 { + count = blocksCount - start + } + + out := ctx.String("out") + if out == "" { + return cli.Exit("output directory is not specified", 1) + } + if _, err = os.Stat(out); os.IsNotExist(err) { + if err = os.MkdirAll(out, os.ModePerm); err != nil { + return cli.Exit(fmt.Sprintf("failed to create directory %s: %v", out, err), 1) + } + } + + for i := start; i < start+count; i++ { + bh := chain.GetHeaderHash(i) + blk, err := chain.GetBlock(bh) + if err != nil { + return cli.Exit(fmt.Sprintf("failed to get block %d: %s", i, err), 1) + } + filePath := filepath.Join(out, fmt.Sprintf("block-%d.bin", i)) + if err = saveBlockToFile(blk, filePath); err != nil { + return cli.Exit(fmt.Sprintf("failed to save block %d to file %s: %s", i, filePath, err), 1) + } + } + return nil +} + +func saveBlockToFile(blk *block.Block, filePath string) error { + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + + writer := io.NewBinWriterFromIO(file) + blk.EncodeBinary(writer) + if writer.Err != nil { + return writer.Err + } + return nil +} diff --git a/cli/server/server.go b/cli/server/server.go index 67787ced9b..fe8394d19a 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -105,6 +105,13 @@ func NewCommands() []*cli.Command { Action: dumpDB, Flags: cfgCountOutFlags, }, + { + Name: "dump-bin", + Usage: "Dump blocks (starting with the genesis or specified block) to the directory in binary format", + UsageText: "neo-go db dump-bin -o directory [-s start] [-c count] [--config-path path] [-p/-m/-t] [--config-file file]", + Action: dumpBin, + Flags: cfgCountOutFlags, + }, { Name: "restore", Usage: "Restore blocks from the file", diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index bdab3cd20d..d1c8e1edb3 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -100,3 +100,19 @@ ApplicationConfiguration: Enabled: false Addresses: - ":2113" +# NeoFSBlockFetcher: +# Enabled: true +# UnlockWallet: +# Path: "/notary_wallet.json" +# Password: "pass" +# Addresses: +# - st1.t5.fs.neo.org:8080 +# Timeout: 10s +# ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG" +# Mode: "indexSearch" +# BatchSize: 100 +# BlockAttribute: "index_block_2" +# OidAttribute: "block_oids" +# HeaderAttribute: "index_header" + + diff --git a/go.mod b/go.mod index 73d0953100..a6d73c5ee3 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( golang.org/x/term v0.18.0 golang.org/x/text v0.14.0 golang.org/x/tools v0.19.0 + google.golang.org/grpc v1.62.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -67,7 +68,6 @@ require ( golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect - google.golang.org/grpc v1.62.0 // indirect google.golang.org/protobuf v1.33.0 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 2e94961d7b..7834b92470 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -23,12 +23,13 @@ type ApplicationConfiguration struct { Pprof BasicService `yaml:"Pprof"` Prometheus BasicService `yaml:"Prometheus"` - Relay bool `yaml:"Relay"` - Consensus Consensus `yaml:"Consensus"` - RPC RPC `yaml:"RPC"` - Oracle OracleConfiguration `yaml:"Oracle"` - P2PNotary P2PNotary `yaml:"P2PNotary"` - StateRoot StateRoot `yaml:"StateRoot"` + Relay bool `yaml:"Relay"` + Consensus Consensus `yaml:"Consensus"` + RPC RPC `yaml:"RPC"` + Oracle OracleConfiguration `yaml:"Oracle"` + P2PNotary P2PNotary `yaml:"P2PNotary"` + StateRoot StateRoot `yaml:"StateRoot"` + NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"` } // EqualsButServices returns true when the o is the same as a except for services @@ -145,3 +146,11 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error) } return addrs, nil } + +// Validate checks the configuration for correctness. +func (a *ApplicationConfiguration) Validate() error { + if err := a.NeoFSBlockFetcher.Validate(); err != nil { + return err + } + return nil +} diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go new file mode 100644 index 0000000000..b694ee9a50 --- /dev/null +++ b/pkg/config/blockfetcher_config.go @@ -0,0 +1,35 @@ +package config + +import ( + "errors" + "fmt" + "time" + + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" +) + +// NeoFSBlockFetcher represents the configuration for the blockfetcher service. +type NeoFSBlockFetcher struct { + InternalService `yaml:",inline"` + Timeout time.Duration `yaml:"Timeout"` + ContainerID string `yaml:"ContainerID"` + Mode string `yaml:"Mode"` + Addresses []string `yaml:"Addresses"` + BatchSize int `yaml:"BatchSize"` + BlockAttribute string `yaml:"BlockAttribute"` + OidAttribute string `yaml:"OidAttribute"` + HeaderAttribute string `yaml:"HeaderAttribute"` +} + +// Validate checks the configuration for the blockfetcher service. +func (f NeoFSBlockFetcher) Validate() error { + if f.ContainerID == "" { + return errors.New("container ID is not set") + } + var containerID cid.ID + err := containerID.DecodeString(f.ContainerID) + if err != nil { + return fmt.Errorf("invalid container ID: %w", err) + } + return nil +} diff --git a/pkg/network/server.go b/pkg/network/server.go index 34da851b52..e7cd6c79e8 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -103,10 +104,12 @@ type ( chain Ledger bQueue *bqueue.Queue bSyncQueue *bqueue.Queue + bFetcherQueue *bqueue.Queue mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + blockFetcher *blockfetcher.Service serviceLock sync.RWMutex services map[string]Service @@ -220,6 +223,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric) + s.bFetcherQueue = bqueue.New(chain, log, nil, updateBlockQueueLenMetric) + s.blockFetcher = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, s.bFetcherQueue, log) + if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), @@ -295,6 +301,10 @@ func (s *Server) Start() { go s.relayBlocksLoop() go s.bQueue.Run() go s.bSyncQueue.Run() + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + go s.bFetcherQueue.Run() + s.blockFetcher.Start() + } for _, tr := range s.transports { go tr.Accept() } @@ -319,6 +329,7 @@ func (s *Server) Shutdown() { } s.bQueue.Discard() s.bSyncQueue.Discard() + s.bFetcherQueue.Discard() s.serviceLock.RLock() for _, svc := range s.services { svc.Shutdown() @@ -706,7 +717,7 @@ func (s *Server) IsInSync() bool { var peersNumber int var notHigher int - if s.stateSync.IsActive() { + if s.stateSync.IsActive() || s.blockFetcher.IsActive() { return false } @@ -766,6 +777,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { + if s.blockFetcher.IsActive() { + return nil + } if s.stateSync.IsActive() { return s.bSyncQueue.PutBlock(block) } @@ -786,6 +800,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { } func (s *Server) requestBlocksOrHeaders(p Peer) error { + if s.blockFetcher.IsActive() { + return nil + } if s.stateSync.NeedHeaders() { if s.chain.HeaderHeight() < p.LastBlockIndex() { return s.requestHeaders(p) @@ -1434,6 +1451,10 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } func (s *Server) tryInitStateSync() { + if s.blockFetcher.IsActive() { + s.log.Info("Postponing StateSync until BlockFetcher completes") + return + } if !s.stateSync.IsActive() { s.bSyncQueue.Discard() return diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index c0f1e727f6..2e3fb141a4 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -76,6 +76,9 @@ type ( // BroadcastFactor is the factor (0-100) for fan-out optimization. BroadcastFactor int + + // NeoFSBlockFetcherCfg is the configuration for the blockfetcher service. + NeoFSBlockFetcherCfg config.NeoFSBlockFetcher } ) @@ -89,24 +92,25 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) { return ServerConfig{}, fmt.Errorf("failed to parse addresses: %w", err) } c := ServerConfig{ - UserAgent: cfg.GenerateUserAgent(), - Addresses: addrs, - Net: protoConfig.Magic, - Relay: appConfig.Relay, - Seeds: protoConfig.SeedList, - DialTimeout: appConfig.P2P.DialTimeout, - ProtoTickInterval: appConfig.P2P.ProtoTickInterval, - PingInterval: appConfig.P2P.PingInterval, - PingTimeout: appConfig.P2P.PingTimeout, - MaxPeers: appConfig.P2P.MaxPeers, - AttemptConnPeers: appConfig.P2P.AttemptConnPeers, - MinPeers: appConfig.P2P.MinPeers, - TimePerBlock: protoConfig.TimePerBlock, - OracleCfg: appConfig.Oracle, - P2PNotaryCfg: appConfig.P2PNotary, - StateRootCfg: appConfig.StateRoot, - ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, - BroadcastFactor: appConfig.P2P.BroadcastFactor, + UserAgent: cfg.GenerateUserAgent(), + Addresses: addrs, + Net: protoConfig.Magic, + Relay: appConfig.Relay, + Seeds: protoConfig.SeedList, + DialTimeout: appConfig.P2P.DialTimeout, + ProtoTickInterval: appConfig.P2P.ProtoTickInterval, + PingInterval: appConfig.P2P.PingInterval, + PingTimeout: appConfig.P2P.PingTimeout, + MaxPeers: appConfig.P2P.MaxPeers, + AttemptConnPeers: appConfig.P2P.AttemptConnPeers, + MinPeers: appConfig.P2P.MinPeers, + TimePerBlock: protoConfig.TimePerBlock, + OracleCfg: appConfig.Oracle, + P2PNotaryCfg: appConfig.P2PNotary, + StateRootCfg: appConfig.StateRoot, + ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, + BroadcastFactor: appConfig.P2P.BroadcastFactor, + NeoFSBlockFetcherCfg: appConfig.NeoFSBlockFetcher, } return c, nil } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go new file mode 100644 index 0000000000..65291cc114 --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -0,0 +1,381 @@ +package blockfetcher + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "io" + "net/url" + "sync" + "sync/atomic" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/block" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/bqueue" + "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +const ( + ModeIndexSearch = "indexSearch" + ModeOidSearch = "oidSearch" + + oidSize = sha256.Size + batchSize = 2000 +) + +// Ledger is an interface to Blockchain sufficient for Service. +type Ledger interface { + AddBlock(block *block.Block) error + GetConfig() config.Blockchain + BlockHeight() uint32 + AddHeaders(...*block.Header) error + HeaderHeight() uint32 +} + +// Service is a service that fetches blocks from NeoFS. +type Service struct { + chain Ledger + client *client.Client + log *zap.Logger + quit chan bool + bFetcherExitCh chan struct{} + cfg config.NeoFSBlockFetcher + queue *bqueue.Queue + account *wallet.Account + started atomic.Bool + stateRootInHeader bool +} + +// New creates a new BlockFetcherService. +func New(chain Ledger, cfg config.NeoFSBlockFetcher, queue *bqueue.Queue, logger *zap.Logger) *Service { + var account *wallet.Account + if cfg.UnlockWallet.Path != "" { + walletFromFile, err := wallet.NewWalletFromFile(cfg.UnlockWallet.Path) + if err != nil { + logger.Error("Failed to load wallet from file", zap.Error(err)) + } + for _, acc := range walletFromFile.Accounts { + if err := acc.Decrypt(cfg.UnlockWallet.Password, walletFromFile.Scrypt); err == nil { + account = acc + break + } + } + if account == nil { + logger.Info("no wallet account could be unlocked, creating a new one") + err = walletFromFile.CreateAccount("blockfetcher", cfg.UnlockWallet.Password) + if err != nil { + return nil + } + account = walletFromFile.Accounts[len(walletFromFile.Accounts)-1] + } + } + + return &Service{ + chain: chain, + log: logger, + quit: make(chan bool), + bFetcherExitCh: make(chan struct{}), + cfg: cfg, + queue: queue, + account: account, + stateRootInHeader: chain.GetConfig().StateRootInHeader, + } +} + +// Start implements the Service interface. +func (bfs *Service) Start() { + if bfs.started.CompareAndSwap(false, true) == false { + return + } + bfs.log.Info("starting NeoFS block fetcher service") + var err error + bfs.client, err = neofs.GetSDKClient(context.Background(), bfs.cfg.Addresses[0]) + if err != nil { + bfs.log.Error("Failed to create SDK client", zap.Error(err)) + return + } + go bfs.start() +} + +func (bfs *Service) start() { + defer close(bfs.bFetcherExitCh) + + var err error + switch bfs.cfg.Mode { + case ModeIndexSearch: + err = bfs.fetchData() + case ModeOidSearch: + err = bfs.fetchDataWithOid() + default: + bfs.log.Error("Invalid mode specified", zap.String("mode", bfs.cfg.Mode)) + return + } + + if err != nil { + bfs.log.Error("Fetch operation failed", zap.Error(err)) + close(bfs.quit) + return + } +} + +// Shutdown implements the Service interface. +func (bfs *Service) Shutdown() { + if bfs.started.CompareAndSwap(true, false) { + bfs.log.Info("Shutting down Block Fetcher Service") + bfs.client.Close() + close(bfs.quit) + <-bfs.bFetcherExitCh + _ = bfs.log.Sync() + } +} + +// IsActive implements the Service interface. +func (bfs *Service) IsActive() bool { + return bfs.started.Load() +} + +func (bfs *Service) fetchData() error { + startIndex := bfs.chain.BlockHeight() + batchSize := uint32(bfs.cfg.BatchSize) + + for { + select { + case <-bfs.quit: + bfs.log.Info("Stopping fetchData operation.") + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE) + prm.SetFilters(filters) + + blockOids, err := bfs.search(prm) + bfs.log.Info(fmt.Sprintf("Found %d blocks from index %d to %d", len(blockOids), startIndex, startIndex+batchSize-1)) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find block with index %d", startIndex), zap.Error(err)) + return err + } + + if len(blockOids) == 0 { + bfs.log.Info(fmt.Sprintf("No block found with index %d, stopping.", startIndex)) + return errors.New("no block found") + } + err = bfs.fetchAndProcessBlocks(blockOids) + if err != nil { + return err + } + startIndex += batchSize + } + } +} + +func (bfs *Service) fetchAndProcessBlocks(blockOids []oid.ID) error { + var wg sync.WaitGroup + + for _, oidBlock := range blockOids { + wg.Add(1) + go func(oidBlock oid.ID) { + defer wg.Done() + + rc, err := bfs.get(oidBlock.String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch block %s", oidBlock.String()), zap.Error(err)) + return + } + + bfs.log.Info(fmt.Sprintf("Fetched block with OID %s", oidBlock.String())) + + err = bfs.processBlock(rc) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process block with OID %s", oidBlock.String()), zap.Error(err)) + } + }(oidBlock) + } + + wg.Wait() + return nil +} + +func (bfs *Service) fetchDataWithOid() error { + startIndex := bfs.chain.BlockHeight() / 2000 + skip := bfs.chain.BlockHeight() % 2000 + + for { + select { + case <-bfs.quit: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.OidAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual) + prm.SetFilters(filters) + + blockOidsObject, err := bfs.search(prm) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find 'block_oids' object with index %d", startIndex), zap.Error(err)) + return err + } + if len(blockOidsObject) == 0 { + bfs.log.Info(fmt.Sprintf("No 'block_oids' object found with index %d, stopping.", startIndex)) + return errors.New("no 'block_oids' object found") + } + + blockOidsData, err := bfs.get(blockOidsObject[0].String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch 'block_oids' object %d", startIndex), zap.Error(err)) + return err + } + blockOIDs, err := parseBlockOIDs(blockOidsData) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to parse 'block_oids' object with index %d", startIndex), zap.Error(err)) + return err + } + + if len(blockOIDs) == 0 { + bfs.log.Info(fmt.Sprintf("No block found with index %d, stopping.", startIndex)) + return errors.New("no block found") + } + + blockOIDs = blockOIDs[skip:] + + for len(blockOIDs) > 0 { + batch := blockOIDs + if len(blockOIDs) > bfs.cfg.BatchSize { + batch = blockOIDs[:bfs.cfg.BatchSize] + blockOIDs = blockOIDs[bfs.cfg.BatchSize:] + } else { + blockOIDs = nil + } + + err = bfs.fetchAndProcessBlocks(batch) + if err != nil { + return err + } + } + + startIndex++ + skip = 0 + } + } +} + +func (bfs *Service) processBlock(rc io.ReadCloser) error { + b := block.New(bfs.stateRootInHeader) + r := gio.NewBinReaderFromIO(rc) + b.DecodeBinary(r) + rc.Close() + err := bfs.queue.PutBlock(b) + if err != nil { + bfs.log.Error(fmt.Sprintf("failed to queue block index: %d", b.Index), zap.Error(err)) + return err + } + return nil +} + +func (bfs *Service) GetHeaders() error { + startIndex := bfs.chain.HeaderHeight() / 2000 + + for { + select { + case <-bfs.quit: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.HeaderAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter(bfs.cfg.HeaderAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumLE) + prm.SetFilters(filters) + + headerOids, err := bfs.search(prm) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find 'index_header' object with index %d", startIndex), zap.Error(err)) + return err + } + + if len(headerOids) == 0 { + bfs.log.Info(fmt.Sprintf("No 'index_header' object found with index %d, stopping.", startIndex)) + return errors.New("no 'index_header' object found") + } + + rc, err := bfs.get(headerOids[0].String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch 'index_header' object with index %d", startIndex), zap.Error(err)) + return err + } + + err = bfs.processHeaders(rc) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process headers for index %d", startIndex), zap.Error(err)) + return err + } + startIndex++ + } + } +} + +func (bfs *Service) processHeaders(rc io.ReadCloser) error { + defer rc.Close() + var resHeader payload.Headers + br := gio.NewBinReaderFromIO(rc) + resHeader.DecodeBinary(br) + err := bfs.chain.AddHeaders(resHeader.Hdrs...) + if err != nil { + return err + } + return nil +} + +func (bfs *Service) get(oid string) (io.ReadCloser, error) { + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + defer cancel() + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid)) + if err != nil { + return nil, err + } + rc, err := neofs.GetWithClient(ctx, bfs.client, bfs.account.PrivateKey(), u, bfs.cfg.Addresses[0]) + if err != nil { + return nil, err + } + + return rc, nil +} + +func (bfs *Service) search(prm client.PrmObjectSearch) ([]oid.ID, error) { + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + defer cancel() + return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm) +} + +func parseBlockOIDs(rc io.ReadCloser) ([]oid.ID, error) { + defer rc.Close() + data, err := io.ReadAll(rc) + if err != nil { + return nil, fmt.Errorf("failed to read data: %w", err) + } + oids := make([]oid.ID, 0, batchSize) + + if len(data)%oidSize != 0 { + return nil, fmt.Errorf("invalid data length: not a multiple of oid size") + } + + for i := 0; i < len(data); i += oidSize { + oidBytes := data[i : i+oidSize] + var oidBlock oid.ID + err := oidBlock.Decode(oidBytes) + if err != nil { + return nil, fmt.Errorf("failed to decode OID: %w", err) + } + oids = append(oids, oidBlock) + } + + return oids, nil +} diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go new file mode 100644 index 0000000000..473d0e25f2 --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -0,0 +1,75 @@ +package blockfetcher_test + +import ( + "context" + "fmt" + "io" + "net/url" + "testing" + "time" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/stretchr/testify/require" +) + +func TestService(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", + InternalService: config.InternalService{ + Enabled: true, + UnlockWallet: config.Wallet{Password: "one", Path: "./"}, + }, + Addresses: []string{"st1.t5.fs.neo.org:8080"}, + Timeout: 15 * time.Second, + } + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + + filters.AddFilter("index_block", fmt.Sprintf("%d", 0), object.MatchStringEqual) + filters.AddFilter("index_block", fmt.Sprintf("%d", 0), object.MatchNumLE) + prm.SetFilters(filters) + + privateKey, err := keys.NewPrivateKey() + require.NoError(t, err) + + var containerID cid.ID + err = containerID.DecodeString(cfg.ContainerID) + require.NoError(t, err) + var ( + s = user.NewAutoIDSignerRFC6979(privateKey.PrivateKey) + objectIDs []oid.ID + ) + ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout) + defer cancel() + + neofsClient, err := neofs.GetSDKClient(ctx, + cfg.Addresses[0]) + require.NoError(t, err) + + reader, err := neofsClient.ObjectSearchInit(ctx, containerID, s, prm) + require.NoError(t, err) + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + require.NoError(t, err) + fmt.Println(objectIDs) + + oid := "3uHQb3SYPEhoxJigTtRALwhiha3nCzL7GsN6PGYMjwhT" + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", containerID, oid)) + require.NoError(t, err) + rc, err := neofs.GetWithClient(ctx, neofsClient, privateKey, u, "") + require.NoError(t, err) + data, err := io.ReadAll(rc) + require.NoError(t, err) + fmt.Println(data) +} diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/oracle/neofs/neofs.go index 27351ba8b2..6abe4dabf1 100644 --- a/pkg/services/oracle/neofs/neofs.go +++ b/pkg/services/oracle/neofs/neofs.go @@ -17,6 +17,8 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -45,28 +47,26 @@ var ( // URI scheme is "neofs://". // If Command is not provided, full object is requested. func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { - objectAddr, ps, err := parseNeoFSURL(u) + c, err := GetSDKClient(ctx, addr) if err != nil { - return nil, err + return clientCloseWrapper{c: c}, fmt.Errorf("failed to create client: %w", err) } + return GetWithClient(ctx, c, priv, u, addr) +} - c, err := client.New(client.PrmInit{}) +// GetWithClient returns a neofs object from the provided url using the provided client. +// URI scheme is "neofs://". +// If Command is not provided, full object is requested. +func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { + objectAddr, ps, err := parseNeoFSURL(u) if err != nil { - return nil, fmt.Errorf("failed to create client: %w", err) + return nil, err } - var ( - res = clientCloseWrapper{c: c} - prmd client.PrmDial + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + res = clientCloseWrapper{c: c} ) - prmd.SetServerURI(addr) - prmd.SetContext(ctx) - err = c.Dial(prmd) //nolint:contextcheck // contextcheck: Function `Dial->Balance->SendUnary->Init->setNeoFSAPIServer` should pass the context parameter - if err != nil { - return res, err - } - var s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) switch { case len(ps) == 0 || ps[0] == "": // Get request res.ReadCloser, err = getPayload(ctx, s, c, objectAddr) @@ -220,3 +220,54 @@ func parseRange(s string) (*object.Range, error) { r.SetLength(length) return r, nil } + +// ObjectSearch returns a list of object IDs from the provided container. +func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) { + var ( + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + objectIDs []oid.ID + containerID cid.ID + ) + err := containerID.DecodeString(containerIDStr) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrInvalidContainer, err) + } + reader, err := c.ObjectSearchInit(ctx, containerID, s, prm) + if err != nil { + return nil, fmt.Errorf("failed to initiate object search: %w", err) + } + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + if err != nil { + return nil, fmt.Errorf("error during object ID iteration: %w", err) + } + return objectIDs, nil +} + +// GetSDKClient returns a NeoFS SDK client configured with the specified address and context. +func GetSDKClient(ctx context.Context, addr string) (*client.Client, error) { + var ( + prmDial client.PrmDial + ) + + prmDial.SetServerURI(addr) + prmDial.SetContext(ctx) + + c, err := client.New(client.PrmInit{}) + if err != nil { + return nil, fmt.Errorf("can't create SDK client: %w", err) + } + + if err := c.Dial(prmDial); err != nil { + if status.Code(err) == codes.Unimplemented { + return c, nil + } + return nil, fmt.Errorf("can't init SDK client: %w", err) + } + + return c, nil +}