diff --git a/cli/server/dump_bin.go b/cli/server/dump_bin.go new file mode 100644 index 0000000000..4d9c948bc6 --- /dev/null +++ b/cli/server/dump_bin.go @@ -0,0 +1,95 @@ +package server + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/nspcc-dev/neo-go/cli/cmdargs" + "github.com/nspcc-dev/neo-go/cli/options" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/urfave/cli/v2" +) + +func dumpBin(ctx *cli.Context) error { + var ( + err error + ) + if err := cmdargs.EnsureNone(ctx); err != nil { + return err + } + cfg, err := options.GetConfigFromContext(ctx) + if err != nil { + return cli.Exit(err, 1) + } + log, _, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration) + if err != nil { + return cli.Exit(err, 1) + } + if logCloser != nil { + defer func() { _ = logCloser() }() + } + count := uint32(ctx.Uint("count")) + start := uint32(ctx.Uint("start")) + + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) + if err != nil { + return err + } + defer func() { + pprof.ShutDown() + prometheus.ShutDown() + chain.Close() + }() + + chainCount := chain.BlockHeight() + 1 + if start+count > chainCount { + return cli.Exit(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", chainCount-1, count, start), 1) + } + if count == 0 { + count = chainCount - start + } + + testDir := "./test/" + if _, err = os.Stat(testDir); os.IsNotExist(err) { + if err = os.MkdirAll(testDir, 0755); err != nil { + return cli.Exit(fmt.Sprintf("failed to create directory %s: %v", testDir, err), 1) + } + } + + for i := start; i < start+count; i++ { + bh := chain.GetHeaderHash(i) + blk, err2 := chain.GetBlock(bh) + if err2 != nil { + return cli.Exit(fmt.Sprintf("failed to get block %d: %v", i, err2), 1) + } + filePath := filepath.Join(testDir, fmt.Sprintf("block-%d.bin", i)) + if err = saveBlockToFile(blk, filePath); err != nil { + return cli.Exit(fmt.Sprintf("failed to save 
block %d to file: %v", i, err), 1) + } + } + + return nil +} + +func saveBlockToFile(blk *block.Block, filePath string) error { + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + + writer := io.NewBinWriterFromIO(file) + + var buf = io.NewBufBinWriter() + blk.EncodeBinary(buf.BinWriter) + bytes := buf.Bytes() + + writer.WriteU32LE(uint32(len(bytes))) + blk.EncodeBinary(writer) + if writer.Err != nil { + return writer.Err + } + return nil +} diff --git a/cli/server/server.go b/cli/server/server.go index 67787ced9b..d276c8224b 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -22,6 +22,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/services/metrics" "github.com/nspcc-dev/neo-go/pkg/services/notary" "github.com/nspcc-dev/neo-go/pkg/services/oracle" @@ -105,6 +106,13 @@ func NewCommands() []*cli.Command { Action: dumpDB, Flags: cfgCountOutFlags, }, + { + Name: "dump-bin", + Usage: "Dump blocks (starting with the genesis or specified block) to the file", + UsageText: "neo-go db dump-bin [-o file] [-s start] [-c count] [--config-path path] [-p/-m/-t] [--config-file file]", + Action: dumpBin, + Flags: cfgCountOutFlags, + }, { Name: "restore", Usage: "Restore blocks from the file", @@ -287,9 +295,9 @@ func restoreDB(ctx *cli.Context) error { gctx := newGraceContext() var lastIndex uint32 - dump := newDump() + dump := blockfetcher.NewDump() defer func() { - _ = dump.tryPersist(dumpDir, lastIndex) + _ = dump.TryPersist(dumpDir, lastIndex) }() var f = func(b *block.Block) error { @@ -312,10 +320,10 @@ func restoreDB(ctx *cli.Context) error { if batch == nil && b.Index == 0 { return nil } - dump.add(b.Index, batch) + dump.Add(b.Index, batch) lastIndex = b.Index if b.Index%1000 == 0 { - if err := dump.tryPersist(dumpDir, b.Index); err != 
nil { + if err := dump.TryPersist(dumpDir, b.Index); err != nil { return fmt.Errorf("can't dump storage to file: %w", err) } } diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index bdab3cd20d..13bb975d4e 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -100,3 +100,10 @@ ApplicationConfiguration: Enabled: false Addresses: - ":2113" + NeoFS: + Nodes: + - st1.t5.fs.neo.org:8080 + Timeout: 100s + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG" + DumpDir: "./chains" + Restore: true diff --git a/go.mod b/go.mod index 73d0953100..a6d73c5ee3 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( golang.org/x/term v0.18.0 golang.org/x/text v0.14.0 golang.org/x/tools v0.19.0 + google.golang.org/grpc v1.62.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -67,7 +68,6 @@ require ( golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect - google.golang.org/grpc v1.62.0 // indirect google.golang.org/protobuf v1.33.0 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index a2553b61a1..5420bef155 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -405,6 +406,11 @@ func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transac panic("TODO") } +// LastBatch returns last persisted storage batch. +func (chain *FakeChain) LastBatch() *storage.MemBatch { + panic("TODO") +} + // AddBlock implements the StateSync interface. 
func (s *FakeStateSync) AddBlock(block *block.Block) error { panic("TODO") diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 2e94961d7b..75bb2e9bb3 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -29,6 +29,7 @@ type ApplicationConfiguration struct { Oracle OracleConfiguration `yaml:"Oracle"` P2PNotary P2PNotary `yaml:"P2PNotary"` StateRoot StateRoot `yaml:"StateRoot"` + NeoFS NeoFS `yaml:"NeoFS"` } // EqualsButServices returns true when the o is the same as a except for services diff --git a/pkg/config/neofs.go b/pkg/config/neofs.go new file mode 100644 index 0000000000..1d8f97f619 --- /dev/null +++ b/pkg/config/neofs.go @@ -0,0 +1,14 @@ +package config + +import "time" + +// NeoFS represents the configuration for the blockfetcher service. +type ( + NeoFS struct { + Nodes []string `yaml:"Nodes"` + Timeout time.Duration `yaml:"Timeout"` + ContainerID string `yaml:"ContainerID"` + DumpDir string `yaml:"DumpDir"` + Restore bool `yaml:"Restore"` + } +) diff --git a/pkg/network/server.go b/pkg/network/server.go index 34da851b52..65158b3705 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -21,6 +21,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" @@ -28,6 +29,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -77,6 +79,7 @@ type ( RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) 
SubscribeForBlocks(ch chan *block.Block) UnsubscribeFromBlocks(ch chan *block.Block) + LastBatch() *storage.MemBatch } // Service is a service abstraction (oracle, state root, consensus, etc). @@ -107,6 +110,7 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + blockFetcher *blockfetcher.Service serviceLock sync.RWMutex services map[string]Service @@ -220,6 +224,8 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric) + s.blockFetcher = blockfetcher.New(chain, s.NeoFSCfg, log) + if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), @@ -295,6 +301,15 @@ func (s *Server) Start() { go s.relayBlocksLoop() go s.bQueue.Run() go s.bSyncQueue.Run() + if s.ServerConfig.NeoFSCfg.Restore { + done := make(chan struct{}) + s.blockFetcher.Start( + func() { + s.log.Info("BlockFetcher service finished") + close(done) + }) + <-done + } for _, tr := range s.transports { go tr.Accept() } diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index c0f1e727f6..2da10968e0 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -76,6 +76,9 @@ type ( // BroadcastFactor is the factor (0-100) for fan-out optimization. BroadcastFactor int + + // NeoFSCfg is NeoFS configuration. 
+ NeoFSCfg config.NeoFS } ) @@ -107,6 +110,7 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) { StateRootCfg: appConfig.StateRoot, ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, BroadcastFactor: appConfig.P2P.BroadcastFactor, + NeoFSCfg: appConfig.NeoFS, } return c, nil } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go new file mode 100644 index 0000000000..56c97ebefe --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -0,0 +1,281 @@ +package blockfetcher + +import ( + "context" + "fmt" + "io" + "net/url" + "os" + "strconv" + "time" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/chaindump" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +// Ledger is an interface to Blockchain sufficient for Service. +type Ledger interface { + LastBatch() *storage.MemBatch + AddBlock(block *block.Block) error + GetBlock(hash util.Uint256) (*block.Block, error) + GetConfig() config.Blockchain + GetHeaderHash(u uint32) util.Uint256 + BlockHeight() uint32 +} + +// Service is a service that fetches blocks from NeoFS. +type Service struct { + chain Ledger + client *client.Client + log *zap.Logger + quit chan bool + containerID cid.ID + Timeout time.Duration + Nodes []string + dumpDir string +} + +// New creates a new BlockFetcherService. 
+func New(chain Ledger, cfg config.NeoFS, logger *zap.Logger) *Service { + var containerID cid.ID + err := containerID.DecodeString(cfg.ContainerID) + if err != nil { + logger.Error("Failed to decode container ID", zap.Error(err)) + return nil + } + return &Service{ + chain: chain, + log: logger, + quit: make(chan bool), + dumpDir: cfg.DumpDir, + containerID: containerID, + Nodes: cfg.Nodes, + Timeout: cfg.Timeout, + } +} + +// Name implements the core.Service interface. +func (bfs *Service) Name() string { + return "BlockFetcherService" +} + +// Start implements the core.Service interface. +func (bfs *Service) Start(done func()) { + bfs.log.Info("Starting Block Fetcher Service") + ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout) + defer cancel() + c, err := neofs.GetSDKClient(ctx, bfs.Nodes[0]) + if err != nil { + bfs.log.Error("Failed to create SDK client", zap.Error(err)) + close(bfs.quit) + done() + return + } + bfs.client = c + + err = bfs.fetchData() + //err = bfs.fetchData3() + //err = bfs.fetchDataWithOid() + if err != nil { + close(bfs.quit) + done() + return + } + done() +} + +// Shutdown implements the core.Service interface. 
+func (bfs *Service) Shutdown() { + bfs.log.Info("Shutting down Block Fetcher Service") + close(bfs.quit) +} + +func (bfs *Service) fetchData() error { + select { + case <-bfs.quit: + return nil + default: + startIndex := bfs.chain.BlockHeight() + for { + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter("index_block_2", fmt.Sprintf("%d", startIndex), object.MatchStringEqual) + prm.SetFilters(filters) + + blockOids, err := bfs.search(prm) + bfs.log.Info(fmt.Sprintf("Found %d blocks with index %d", len(blockOids), startIndex)) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find block with index %d", startIndex), zap.Error(err)) + return err + } + + if len(blockOids) == 0 { + bfs.log.Info(fmt.Sprintf("No block found with index %d, stopping.", startIndex)) + break + } + + data, err := bfs.get(blockOids[0].String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch block %s", strconv.Itoa(int(startIndex))), zap.Error(err)) + return err + } + + err = bfs.ProcessBlock(data, 1) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process block with index %d", startIndex), zap.Error(err)) + return err + } + + //if startIndex == 20 { + // break + //} + startIndex++ + } + } + close(bfs.quit) + return nil +} + +func (bfs *Service) ProcessBlock(data []byte, count uint32) error { + br := gio.NewBinReaderFromBuf(data) + dump := NewDump() + var lastIndex uint32 + + err := chaindump.Restore(bfs.chain, br, 0, count, func(b *block.Block) error { + batch := bfs.chain.LastBatch() + if batch != nil { + dump.Add(b.Index, batch) + lastIndex = b.Index + if b.Index%1000 == 0 { + if err := dump.TryPersist(bfs.dumpDir, lastIndex); err != nil { + return fmt.Errorf("can't dump storage to file: %w", err) + } + } + } + return nil + }) + + if err != nil { + return fmt.Errorf("failed to restore blocks: %w", err) + } + if err = dump.TryPersist(bfs.dumpDir, lastIndex); err != nil { + return fmt.Errorf("final persistence failed: %w", 
err) + } + return nil +} + +func (bfs *Service) get(oid string) ([]byte, error) { + privateKey, err := keys.NewPrivateKey() + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout) + defer cancel() + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.containerID, oid)) + if err != nil { + return nil, err + } + rc, err := neofs.GetWithClient(ctx, bfs.client, privateKey, u, bfs.Nodes[0]) + if err != nil { + return nil, err + } + data, err := io.ReadAll(rc) + if err != nil { + return nil, err + } + return data, nil +} + +func (bfs *Service) search(prm client.PrmObjectSearch) ([]oid.ID, error) { + privateKey, err := keys.NewPrivateKey() + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout) + defer cancel() + if err != nil { + return nil, err + } + return neofs.ObjectSearch(ctx, bfs.client, privateKey, bfs.containerID, prm) +} + +func (bfs *Service) GenerateBlockOIDFiles() error { + startIndex := uint32(4000) + oids := make([]oid.ID, 0, 2000) + fileIndex := 2 + + for { + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter("index_block_2", fmt.Sprintf("%d", startIndex), object.MatchStringEqual) + prm.SetFilters(filters) + + blockOids, err := bfs.search(prm) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find block with index %d", startIndex), zap.Error(err)) + return err + } + + if len(blockOids) == 0 { + bfs.log.Info(fmt.Sprintf("No more blocks found with index %d. 
Stopping.", startIndex)) + break + } + + oids = append(oids, blockOids[0]) + + if len(oids) >= 2000 { + err = bfs.writeOIDFile(oids[:2000], fileIndex) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to write OID file %d", fileIndex), zap.Error(err)) + return err + } + + oids = oids[2000:] + fileIndex++ + } + + startIndex++ + } + + if len(oids) > 0 { + err := bfs.writeOIDFile(oids, fileIndex) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to write final OID file %d", fileIndex), zap.Error(err)) + return err + } + } + + return nil +} + +func (bfs *Service) writeOIDFile(oids []oid.ID, fileIndex int) error { + fileName := fmt.Sprintf("./test/oid-%d.bin", fileIndex) + file, err := os.Create(fileName) + if err != nil { + return fmt.Errorf("failed to create file %s: %w", fileName, err) + } + defer file.Close() + + for _, oid := range oids { + oidBytes := make([]byte, 32) + oid.Encode(oidBytes) + _, err = file.Write(oidBytes) + if err != nil { + return fmt.Errorf("failed to write OID to file %s: %w", fileName, err) + } + } + + bfs.log.Info(fmt.Sprintf("Successfully wrote %d OIDs to %s", len(oids), fileName)) + return nil +} diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go new file mode 100644 index 0000000000..cf1f526102 --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -0,0 +1,108 @@ +package blockfetcher_test + +import ( + "context" + "fmt" + "io" + "net/url" + "testing" + "time" + + "github.com/nspcc-dev/neo-go/internal/basicchain" + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/chaindump" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/neotest" + "github.com/nspcc-dev/neo-go/pkg/neotest/chain" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid 
"github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestProcessBlock(t *testing.T) { + bc, validators, committee := chain.NewMultiWithCustomConfig(t, + func(c *config.Blockchain) { + c.P2PSigExtensions = true + }) + e := neotest.NewExecutor(t, bc, validators, committee) + + basicchain.Init(t, "../../../", e) + require.True(t, bc.BlockHeight() > 5) + + w := gio.NewBufBinWriter() + require.NoError(t, chaindump.Dump(bc, w.BinWriter, 0, bc.BlockHeight()+1)) + require.NoError(t, w.Err) + buf := w.Bytes() + bc2, _, _ := chain.NewMultiWithCustomConfig(t, func(c *config.Blockchain) { + c.P2PSigExtensions = true + }) + cfg := config.NeoFS{ + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", + Nodes: []string{"https://test"}, + Timeout: 10 * time.Second, + DumpDir: "./", + } + serv := blockfetcher.New(bc2, cfg, zaptest.NewLogger(t)) + err := serv.ProcessBlock(buf, bc.BlockHeight()+1) + require.NoError(t, err) + require.Equal(t, bc.BlockHeight(), bc2.BlockHeight()) +} + +func TestService(t *testing.T) { + cfg := config.NeoFS{ + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", + Nodes: []string{"st1.t5.fs.neo.org:8080"}, + Timeout: 15 * time.Second, + DumpDir: "./", + } + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + + filters.AddFilter("index_block", fmt.Sprintf("%d", 0), object.MatchStringEqual) + filters.AddFilter("index_block", fmt.Sprintf("%d", 0), object.MatchNumLE) + prm.SetFilters(filters) + + privateKey, err := keys.NewPrivateKey() + require.NoError(t, err) + + var containerID cid.ID + err = containerID.DecodeString(cfg.ContainerID) + require.NoError(t, err) + var ( + s = user.NewAutoIDSignerRFC6979(privateKey.PrivateKey) + objectIDs []oid.ID + ) + ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout) + 
defer cancel() + + neofsClient, err := neofs.GetSDKClient(ctx, + cfg.Nodes[0]) + require.NoError(t, err) + + reader, err := neofsClient.ObjectSearchInit(ctx, containerID, s, prm) + require.NoError(t, err) + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + require.NoError(t, err) + fmt.Println(objectIDs) + + oid := "3uHQb3SYPEhoxJigTtRALwhiha3nCzL7GsN6PGYMjwhT" + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", containerID, oid)) + require.NoError(t, err) + rc, err := neofs.GetWithClient(ctx, neofsClient, privateKey, u, cfg.Nodes[0]) + require.NoError(t, err) + data, err := io.ReadAll(rc) + require.NoError(t, err) + fmt.Println(data) +} diff --git a/pkg/services/blockfetcher/dump.go b/pkg/services/blockfetcher/dump.go new file mode 100644 index 0000000000..3fa51a76d3 --- /dev/null +++ b/pkg/services/blockfetcher/dump.go @@ -0,0 +1,106 @@ +package blockfetcher + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/core/storage/dboper" +) + +type Dump []blockDump + +type blockDump struct { + Block uint32 `json:"block"` + Size int `json:"size"` + Storage []dboper.Operation `json:"storage"` +} + +func (d *Dump) Size() int { + return len(*d) +} + +func NewDump() *Dump { + return new(Dump) +} + +func (d *Dump) Add(index uint32, batch *storage.MemBatch) { + ops := storage.BatchToOperations(batch) + *d = append(*d, blockDump{ + Block: index, + Size: len(ops), + Storage: ops, + }) +} + +func (d *Dump) TryPersist(prefix string, index uint32) error { + if len(*d) == 0 { + return nil + } + path, err := getPath(prefix, index) + if err != nil { + return err + } + old, err := readFile(path) + if err == nil { + *old = append(*old, *d...) 
+ } else { + old = d + } + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + enc := json.NewEncoder(f) + enc.SetIndent("", " ") + if err := enc.Encode(*old); err != nil { + return err + } + + *d = (*d)[:0] + + return nil +} + +func readFile(path string) (*Dump, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + d := NewDump() + if err := json.Unmarshal(data, d); err != nil { + return nil, err + } + return d, err +} + +// getPath returns filename for storing blocks up to index. +// Directory structure is the following: +// https://github.com/NeoResearch/neo-storage-audit#folder-organization-where-to-find-the-desired-block +// Dir `BlockStorage_$DIRNO` contains blocks up to $DIRNO (from $DIRNO-100k) +// Inside it there are files grouped by 1k blocks. +// File dump-block-$FILENO.json contains blocks from $FILENO-999, $FILENO +// Example: file `BlockStorage_100000/dump-block-6000.json` contains blocks from 5001 to 6000. +func getPath(prefix string, index uint32) (string, error) { + dirN := ((index + 99999) / 100000) * 100000 + dir := fmt.Sprintf("BlockStorage_%d", dirN) + + path := filepath.Join(prefix, dir) + info, err := os.Stat(path) + if os.IsNotExist(err) { + err := os.MkdirAll(path, os.ModePerm) + if err != nil { + return "", err + } + } else if !info.IsDir() { + return "", fmt.Errorf("file `%s` is not a directory", path) + } + + fileN := ((index + 999) / 1000) * 1000 + file := fmt.Sprintf("dump-block-%d.json", fileN) + return filepath.Join(path, file), nil +} diff --git a/cli/server/dump_test.go b/pkg/services/blockfetcher/dump_test.go similarity index 96% rename from cli/server/dump_test.go rename to pkg/services/blockfetcher/dump_test.go index 74877e44b0..8e919d30b8 100644 --- a/cli/server/dump_test.go +++ b/pkg/services/blockfetcher/dump_test.go @@ -1,4 +1,4 @@ -package server +package blockfetcher import ( "path/filepath" diff --git a/pkg/services/blockfetcher/experiments.go 
b/pkg/services/blockfetcher/experiments.go new file mode 100644 index 0000000000..94dc397ef3 --- /dev/null +++ b/pkg/services/blockfetcher/experiments.go @@ -0,0 +1,229 @@ +package blockfetcher + +import ( + "fmt" + "strconv" + "sync" + + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +func (bfs *Service) fetchData2() error { + startIndex := bfs.chain.BlockHeight() + select { + case <-bfs.quit: + return nil + default: + workers := uint32(150) + wg := sync.WaitGroup{} + oids := make([]oid.ID, workers) + res := make([][]byte, workers) + fmt.Println("Start index ", startIndex) + + for i := startIndex; i < startIndex+workers; i++ { + wg.Add(1) + go func(i uint32) { + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter("index_block_2", fmt.Sprintf("%d", i), object.MatchStringEqual) + prm.SetFilters(filters) + + blockOids, err := bfs.search(prm) + bfs.log.Info(fmt.Sprintf("Found %d blocks with index %d", len(blockOids), i)) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find block with index %d", startIndex), zap.Error(err)) + } + + if len(blockOids) == 0 { + bfs.log.Info(fmt.Sprintf("No block found with index %d, stopping.", startIndex)) + } + oids[i-startIndex] = blockOids[0] + wg.Done() + }(i) + } + wg.Wait() + fmt.Println(oids) + wg2 := sync.WaitGroup{} + for j := range oids { + wg2.Add(1) + go func(j int) { + data, err := bfs.get(oids[j].String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch block %s", strconv.Itoa(int(startIndex))), zap.Error(err)) + } + res[j] = data + fmt.Println("Block fetched ", j) + wg2.Done() + }(j) + } + wg2.Wait() + err := bfs.ProcessBlock(concatenateBatch(res), workers) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process block with index %d", startIndex), zap.Error(err)) + return err + } + close(bfs.quit) + return nil + } +} + +func (bfs 
*Service) fetchData3() error { + startIndex := bfs.chain.BlockHeight() + workers := 10 + batchSize := 100 + + oidsChan := make(chan struct { + index uint32 + oid oid.ID + }, workers*2) + resultsChan := make(chan struct { + index uint32 + data []byte + }, workers*2) + errChan := make(chan error, 1) + + searchJobChan := make(chan uint32, workers) + fetchJobChan := make(chan struct { + index uint32 + oid oid.ID + }, workers) + + var wgSearch, wgFetch, wgProcess sync.WaitGroup + + for w := 0; w < workers; w++ { + wgSearch.Add(1) + go func() { + defer wgSearch.Done() + for index := range searchJobChan { + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter("index_block_2", fmt.Sprintf("%d", index), object.MatchStringEqual) + prm.SetFilters(filters) + + blockOids, err := bfs.search(prm) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find block with index %d", index), zap.Error(err)) + errChan <- err + return + } + + if len(blockOids) > 0 { + oidsChan <- struct { + index uint32 + oid oid.ID + }{index: index, oid: blockOids[0]} + } else { + errChan <- nil + return + } + } + }() + } + + for w := 0; w < workers; w++ { + wgFetch.Add(1) + go func() { + defer wgFetch.Done() + for job := range fetchJobChan { + data, err := bfs.get(job.oid.String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch block %s", job.oid.String()), zap.Error(err)) + errChan <- err + return + } + resultsChan <- struct { + index uint32 + data []byte + }{index: job.index, data: data} + } + }() + } + + go func() { + currentIndex := startIndex + for { + searchJobChan <- currentIndex + currentIndex++ + } + }() + + go func() { + for job := range oidsChan { + fetchJobChan <- job + } + close(fetchJobChan) + wgFetch.Wait() + close(resultsChan) + }() + + wgProcess.Add(1) + go func() { + defer wgProcess.Done() + batch := make([][]byte, 0, batchSize) + expectedIndex := startIndex + buffer := make(map[uint32][]byte) + + for result := range resultsChan { 
+ if result.index == expectedIndex { + batch = append(batch, result.data) + expectedIndex++ + + for { + if bufferedData, exists := buffer[expectedIndex]; exists { + batch = append(batch, bufferedData) + delete(buffer, expectedIndex) + expectedIndex++ + } else { + break + } + + if len(batch) >= batchSize { + if err := bfs.ProcessBlock(concatenateBatch(batch), uint32(len(batch))); err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process batch ending with index %d", result.index), zap.Error(err)) + errChan <- err + return + } + batch = batch[:0] + } + } + } else { + buffer[result.index] = result.data + } + } + + if len(batch) > 0 { + if err := bfs.ProcessBlock(concatenateBatch(batch), uint32(len(batch))); err != nil { + bfs.log.Error("Failed to process final batch", zap.Error(err)) + errChan <- err + } + } + }() + + select { + case err := <-errChan: + if err != nil { + return err + } + case <-bfs.quit: + return nil + } + + wgProcess.Wait() + close(bfs.quit) + return nil +} + +func concatenateBatch(batch [][]byte) []byte { + totalSize := 0 + for _, b := range batch { + totalSize += len(b) + } + res := make([]byte, 0, totalSize) + for _, b := range batch { + res = append(res, b...) 
+ } + return res +} diff --git a/pkg/services/blockfetcher/oid_obj_experiments.go b/pkg/services/blockfetcher/oid_obj_experiments.go new file mode 100644 index 0000000000..7c97e85d41 --- /dev/null +++ b/pkg/services/blockfetcher/oid_obj_experiments.go @@ -0,0 +1,89 @@ +package blockfetcher + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +func (bfs *Service) fetchDataWithOid() error { + startIndex := uint32(0) + + for { + select { + case <-bfs.quit: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter("block_oids", fmt.Sprintf("%d", startIndex), object.MatchStringEqual) + prm.SetFilters(filters) + + blockOidsObject, err := bfs.search(prm) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to find 'block_oids' object with index %d", startIndex), zap.Error(err)) + return err + } + fmt.Println("Found ", len(blockOidsObject), " blocks with index ", startIndex) + if len(blockOidsObject) == 0 { + bfs.log.Info(fmt.Sprintf("No 'block_oids' object found with index %d, stopping.", startIndex)) + break + } + + blockOidsData, err := bfs.get(blockOidsObject[0].String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch 'block_oids' object %d", startIndex), zap.Error(err)) + return err + } + fmt.Println("Block OIDs data: ") + blockOIDs, err := parseBlockOIDs(blockOidsData) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to parse 'block_oids' object with index %d", startIndex), zap.Error(err)) + return err + } + fmt.Println("Block OIDs: ", len(blockOIDs)) + var res []byte + for _, oid := range blockOIDs { + data, err := bfs.get(oid.String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch block with OID %s", oid.String()), zap.Error(err)) + return err + } + bfs.log.Info(fmt.Sprintf("Fetched block with OID %s", oid.String())) + res = 
append(res, data...) + } + + err = bfs.ProcessBlock(res, uint32(len(blockOIDs))) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process blocks with index %d", startIndex), zap.Error(err)) + return err + } + + startIndex++ + } + } +} + +func parseBlockOIDs(data []byte) ([]oid.ID, error) { + var oids []oid.ID + oidSize := 32 + + if len(data)%oidSize != 0 { + return nil, fmt.Errorf("invalid data length: not a multiple of oid size") + } + + for i := 0; i < len(data); i += oidSize { + oidBytes := data[i : i+oidSize] + var oid oid.ID + err := oid.Decode(oidBytes) + if err != nil { + return nil, fmt.Errorf("failed to decode OID: %w", err) + } + oids = append(oids, oid) + } + + return oids, nil +} diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/oracle/neofs/neofs.go index 27351ba8b2..98f6982c12 100644 --- a/pkg/services/oracle/neofs/neofs.go +++ b/pkg/services/oracle/neofs/neofs.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" @@ -17,6 +18,8 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -82,6 +85,34 @@ func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (i return res, err } +// GetWithClient returns a neofs object from the provided url using the provided client. +// URI scheme is "neofs://". +// If Command is not provided, full object is requested. 
+func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { + objectAddr, ps, err := parseNeoFSURL(u) + if err != nil { + return nil, err + } + var ( + res = clientCloseWrapper{c: c} + ) + + var s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + switch { + case len(ps) == 0 || ps[0] == "": // Get request + res.ReadCloser, err = getPayload(ctx, s, c, objectAddr) + case ps[0] == rangeCmd: + res.ReadCloser, err = getRange(ctx, s, c, objectAddr, ps[1:]...) + case ps[0] == headerCmd: + res.ReadCloser, err = getHeader(ctx, s, c, objectAddr) + case ps[0] == hashCmd: + res.ReadCloser, err = getHash(ctx, s, c, objectAddr, ps[1:]...) + default: + err = ErrInvalidCommand + } + return res, err +} + type clientCloseWrapper struct { io.ReadCloser c *client.Client @@ -220,3 +251,59 @@ func parseRange(s string) (*object.Range, error) { r.SetLength(length) return r, nil } + +// ObjectSearch returns a list of object IDs from the provided container. +func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerID cid.ID, prm client.PrmObjectSearch) ([]oid.ID, error) { + var ( + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + objectIDs []oid.ID + ) + + reader, err := c.ObjectSearchInit(ctx, containerID, s, prm) + if err != nil { + return nil, fmt.Errorf("failed to initiate object search: %w", err) + } + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + if err != nil { + return nil, fmt.Errorf("error during object ID iteration: %w", err) + } + return objectIDs, nil +} + +// GetSDKClient returns a NeoFS SDK client configured with the specified address and context. 
+func GetSDKClient(ctx context.Context, addr string) (*client.Client, error) { + var ( + prmInit client.PrmInit + prmDial client.PrmDial + ) + + prmDial.SetServerURI(addr) + prmDial.SetContext(ctx) + + deadline, ok := ctx.Deadline() + if ok { + if timeout := time.Until(deadline); timeout > 0 { + prmDial.SetTimeout(timeout) + prmDial.SetStreamTimeout(timeout) + } + } + + c, err := client.New(prmInit) + if err != nil { + return nil, fmt.Errorf("can't create SDK client: %w", err) + } + + if err := c.Dial(prmDial); err != nil { + if status.Code(err) == codes.Unimplemented { + return c, nil + } + return nil, fmt.Errorf("can't init SDK client: %w", err) + } + + return c, nil +}