Skip to content

Commit

Permalink
services: add new service for fetching blocks from NeoFS
Browse files Browse the repository at this point in the history
Close #3496

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jul 22, 2024
1 parent c207b9b commit db88338
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 19 deletions.
14 changes: 7 additions & 7 deletions cli/server/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/storage/dboper"
)

type dump []blockDump
type Dump []blockDump

type blockDump struct {
Block uint32 `json:"block"`
Size int `json:"size"`
Storage []dboper.Operation `json:"storage"`
}

func newDump() *dump {
return new(dump)
func NewDump() *Dump {
return new(Dump)
}

func (d *dump) add(index uint32, batch *storage.MemBatch) {
func (d *Dump) Add(index uint32, batch *storage.MemBatch) {
ops := storage.BatchToOperations(batch)
*d = append(*d, blockDump{
Block: index,
Expand All @@ -31,7 +31,7 @@ func (d *dump) add(index uint32, batch *storage.MemBatch) {
})
}

func (d *dump) tryPersist(prefix string, index uint32) error {
func (d *Dump) TryPersist(prefix string, index uint32) error {
if len(*d) == 0 {
return nil
}
Expand Down Expand Up @@ -62,12 +62,12 @@ func (d *dump) tryPersist(prefix string, index uint32) error {
return nil
}

func readFile(path string) (*dump, error) {
func readFile(path string) (*Dump, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
d := newDump()
d := NewDump()
if err := json.Unmarshal(data, d); err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,9 @@ func restoreDB(ctx *cli.Context) error {

gctx := newGraceContext()
var lastIndex uint32
dump := newDump()
dump := NewDump()
defer func() {
_ = dump.tryPersist(dumpDir, lastIndex)
_ = dump.TryPersist(dumpDir, lastIndex)
}()

var f = func(b *block.Block) error {
Expand All @@ -312,10 +312,10 @@ func restoreDB(ctx *cli.Context) error {
if batch == nil && b.Index == 0 {
return nil
}
dump.add(b.Index, batch)
dump.Add(b.Index, batch)
lastIndex = b.Index
if b.Index%1000 == 0 {
if err := dump.tryPersist(dumpDir, b.Index); err != nil {
if err := dump.TryPersist(dumpDir, b.Index); err != nil {
return fmt.Errorf("can't dump storage to file: %w", err)
}
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/core/storage/dbconfig/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,14 @@ type (
ReadOnly bool `yaml:"ReadOnly"`
}
)

// FilePath returns the file path for the DB. In case "inmemory" DB is used, it returns an empty string.
func (db DBConfiguration) FilePath() string {
switch db.Type {
case "boltdb":
return db.BoltDBOptions.FilePath
case "leveldb":
return db.LevelDBOptions.DataDirectoryPath

Check warning on line 32 in pkg/core/storage/dbconfig/store_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/storage/dbconfig/store_config.go#L27-L32

Added lines #L27 - L32 were not covered by tests
}
return ""

Check warning on line 34 in pkg/core/storage/dbconfig/store_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/storage/dbconfig/store_config.go#L34

Added line #L34 was not covered by tests
}
156 changes: 156 additions & 0 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package blockfetcher

import (
"context"
"fmt"
"io"
"net/url"
"time"

cli "github.com/nspcc-dev/neo-go/cli/server"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/chaindump"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"go.uber.org/zap"
)

type Service struct {
chain *core.Blockchain
log *zap.Logger
client *client.Client
db storage.Store
quit chan bool
containerID cid.ID
//HashCid string
//one more cid
Timeout time.Duration
Nodes []string
dumpDir string
}

// New creates a new BlockFetcherService.
func New(chain *core.Blockchain, store storage.Store, logger *zap.Logger) *Service {
neofsClient, err := client.New(client.PrmInit{})
if err != nil {
logger.Error("Failed to create NeoFS client", zap.Error(err))
return nil

Check warning on line 42 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}
var containerID cid.ID
err = containerID.DecodeString("9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG")
if err != nil {
logger.Error("Failed to decode container ID", zap.Error(err))
return nil

Check warning on line 48 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}
return &Service{
chain: chain,
log: logger,
client: neofsClient,
db: store,
quit: make(chan bool),
dumpDir: "./dump",
containerID: containerID,
Nodes: []string{"st1.t5.fs.neo.org:8080"},
Timeout: 20 * time.Second,
}
}

// Name implements the core.Service interface.
func (bfs *Service) Name() string {
return "BlockFetcherService"
}

// Start implements the core.Service interface.
func (bfs *Service) Start() {
bfs.log.Info("Starting Block Fetcher Service")
err := bfs.fetchBlocks()
if err != nil {
return

Check warning on line 73 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L73

Added line #L73 was not covered by tests
}
}

// Shutdown implements the core.Service interface.
func (bfs *Service) Shutdown() {
bfs.log.Info("Shutting down Block Fetcher Service")
close(bfs.quit)

Check warning on line 80 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L78-L80

Added lines #L78 - L80 were not covered by tests
}

func (bfs *Service) fetchBlocks() error {
for {
select {
case <-bfs.quit:
return nil
default:
privateKey, err := keys.NewPrivateKey()
if err != nil {
bfs.log.Error("Failed to create private key", zap.Error(err))
continue

Check warning on line 92 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L91-L92

Added lines #L91 - L92 were not covered by tests
}
ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout)
defer cancel()
blocks, err := neofs.ObjectSearch(ctx, bfs.client, privateKey, bfs.containerID, bfs.Nodes[0])
fmt.Println(blocks)
if err != nil {
bfs.log.Error("Failed to fetch object IDs", zap.Error(err))
continue

Check warning on line 100 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}
for _, obj := range blocks {
u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.containerID, obj))
if err != nil {
bfs.log.Error("Failed to parse URL", zap.Error(err))
continue

Check warning on line 106 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L105-L106

Added lines #L105 - L106 were not covered by tests
}
rc, err := neofs.Get(ctx, bfs.client, privateKey, u, bfs.Nodes[0])
if err != nil {
bfs.log.Error("Failed to get object", zap.Error(err))
continue

Check warning on line 111 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L110-L111

Added lines #L110 - L111 were not covered by tests
}
data, err := io.ReadAll(rc)
if err != nil {
bfs.log.Error("Failed to read data", zap.Error(err))
continue

Check warning on line 116 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L115-L116

Added lines #L115 - L116 were not covered by tests
}
err = bfs.processBlock(data, 2)
if err != nil {
bfs.log.Error("Failed to process block", zap.Error(err))
}
}
cancel()
close(bfs.quit)
}
}
}

func (bfs *Service) processBlock(data []byte, count uint32) error {
br := gio.NewBinReaderFromBuf(data)
dump := cli.NewDump()
var lastIndex uint32

err := chaindump.Restore(bfs.chain, br, bfs.chain.BlockHeight(), count, func(b *block.Block) error {
batch := bfs.chain.LastBatch()
if batch != nil {
dump.Add(b.Index, batch)
lastIndex = b.Index
if b.Index%1000 == 0 {

Check warning on line 139 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L137-L139

Added lines #L137 - L139 were not covered by tests
//need test for this
if err := dump.TryPersist(bfs.dumpDir, lastIndex); err != nil {
return fmt.Errorf("can't dump storage to file: %w", err)

Check warning on line 142 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L141-L142

Added lines #L141 - L142 were not covered by tests
}
}
}
return nil
})
if err != nil {
return fmt.Errorf("failed to restore blocks: %w", err)
}

if err = dump.TryPersist(bfs.dumpDir, lastIndex); err != nil {
return fmt.Errorf("final persistence failed: %w", err)

Check warning on line 153 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L153

Added line #L153 was not covered by tests
}
return err
}
52 changes: 52 additions & 0 deletions pkg/services/blockfetcher/blockfetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package blockfetcher

import (
"fmt"
"testing"

"github.com/nspcc-dev/neo-go/internal/basicchain"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/chaindump"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/neotest"
"github.com/nspcc-dev/neo-go/pkg/neotest/chain"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

func TestProcessBlock(t *testing.T) {
bc, validators, committee := chain.NewMultiWithCustomConfig(t,
func(c *config.Blockchain) {
c.P2PSigExtensions = true
})
e := neotest.NewExecutor(t, bc, validators, committee)

basicchain.Init(t, "../../../", e)
require.True(t, bc.BlockHeight() > 5)

w := gio.NewBufBinWriter()
require.NoError(t, chaindump.Dump(bc, w.BinWriter, 0, bc.BlockHeight()+1))
require.NoError(t, w.Err)
buf := w.Bytes()
bc2, _, _ := chain.NewMultiWithCustomConfig(t, func(c *config.Blockchain) {
c.P2PSigExtensions = true
})
serv := New(bc2, storage.NewMemoryStore(), zaptest.NewLogger(t))
err := serv.processBlock(buf, bc.BlockHeight()+1)
require.NoError(t, err)
require.Equal(t, bc.BlockHeight(), bc2.BlockHeight())
}

func TestService(t *testing.T) {
bc, _, _ := chain.NewMultiWithCustomConfig(t,
func(c *config.Blockchain) {
c.P2PSigExtensions = true
})

serv := New(bc, storage.NewMemoryStore(), zaptest.NewLogger(t))
require.NotNil(t, serv)
require.Equal(t, "BlockFetcherService", serv.Name())
serv.Start()
fmt.Println(bc.BlockHeight())
}
40 changes: 33 additions & 7 deletions pkg/services/oracle/neofs/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,11 @@ var (
// Get returns a neofs object from the provided url.
// URI scheme is "neofs:<Container-ID>/<Object-ID/<Command>/<Params>".
// If Command is not provided, full object is requested.
func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) {
func Get(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) {
objectAddr, ps, err := parseNeoFSURL(u)
if err != nil {
return nil, err
}

c, err := client.New(client.PrmInit{})
if err != nil {
return nil, fmt.Errorf("failed to create client: %w", err)
}

var (
res = clientCloseWrapper{c: c}
prmd client.PrmDial
Expand Down Expand Up @@ -220,3 +214,35 @@ func parseRange(s string) (*object.Range, error) {
r.SetLength(length)
return r, nil
}

// ObjectSearch returns a list of object IDs from the provided container.
func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerID cid.ID, addr string) ([]oid.ID, error) {
var (
prmd client.PrmDial
s = user.NewAutoIDSignerRFC6979(priv.PrivateKey)
objectIDs []oid.ID
)

prmd.SetServerURI(addr)
prmd.SetContext(ctx)
err := c.Dial(prmd)
if err != nil {
return nil, err

Check warning on line 230 in pkg/services/oracle/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/oracle/neofs/neofs.go#L230

Added line #L230 was not covered by tests
}
prm := client.PrmObjectSearch{}

reader, err := c.ObjectSearchInit(ctx, containerID, s, prm)
if err != nil {
return nil, fmt.Errorf("failed to initiate object search: %w", err)

Check warning on line 236 in pkg/services/oracle/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/oracle/neofs/neofs.go#L236

Added line #L236 was not covered by tests
}
defer reader.Close()

err = reader.Iterate(func(oid oid.ID) bool {
objectIDs = append(objectIDs, oid)
return false
})
if err != nil {
return nil, fmt.Errorf("error during object ID iteration: %w", err)

Check warning on line 245 in pkg/services/oracle/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/oracle/neofs/neofs.go#L245

Added line #L245 was not covered by tests
}
return objectIDs, nil
}
23 changes: 23 additions & 0 deletions pkg/services/oracle/neofs/neofs_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package neofs

import (
"context"
"net/url"
"testing"
"time"

"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -72,3 +77,21 @@ func TestParseNeoFSURL(t *testing.T) {
})
}
}

func TestFetchListFromNeoFS(t *testing.T) {
privateKey, err := keys.NewPrivateKey()
require.NoError(t, err)

cStr := "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG"
var containerID cid.ID
if err := containerID.DecodeString(cStr); err != nil {
require.NoError(t, err)
}
neofsClient, err := client.New(client.PrmInit{})
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

_, err = ObjectSearch(ctx, neofsClient, privateKey, containerID, "st1.t5.fs.neo.org:8080")
require.NoError(t, err)
}
Loading

0 comments on commit db88338

Please sign in to comment.