Skip to content

Commit

Permalink
services: add new service for fetching blocks from NeoFS
Browse files Browse the repository at this point in the history
Close #3496

Co-authored-by: Anna Shaleva <[email protected]>
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland and AnnaShaleva committed Sep 10, 2024
2 parents 68161b4 + 9f186ce commit 5027493
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 57 deletions.
3 changes: 2 additions & 1 deletion config/protocol.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ ApplicationConfiguration:
Timeout: 10m
DownloaderWorkersCount: 500
OIDBatchSize: 8000
BQueueSize: 32000 # must be larger than OIDBatchSize; highly recommended to be 2*OIDBatchSize or 3*OIDBatchSize
BQueueSize: 16000 # must be larger than OIDBatchSize; recommended to be 2*OIDBatchSize or 3*OIDBatchSize
SkipIndexFilesSearch: false
IndexFileSize: 128000
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
IndexFileAttribute: "oid"
2 changes: 1 addition & 1 deletion docs/neofs-blockstorage.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ attributes:
Each index file is an object containing a constant-sized batch of raw block object
IDs in binary form ordered by block index. Each index file is marked with the
following attributes:
- index file identifier with consecutive file index value (`index:0`)
- index file identifier with consecutive file index value (`oid:0`)
- the number of OIDs included into index file (`size:128000`)

### NeoFS BlockFetcher
Expand Down
4 changes: 4 additions & 0 deletions docs/node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ BlockFetcher module and has the following structure:
ContainerID: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH"
BlockAttribute: "block"
IndexFileAttribute: "oid"
IndexFileSize: 128000
```
where:
- `Enabled` enables NeoFS BlockFetcher module.
Expand All @@ -198,6 +199,9 @@ where:
to be `2*OIDBatchSize` or `3*OIDBatchSize`.
- `SkipIndexFilesSearch` is a flag that allows to skip index files search and search
for blocks directly. It is set to `false` by default.
- `IndexFileSize` is the number of OID objects stored in the index files. This
setting depends on the NeoFS block storage configuration and is applicable only if
`SkipIndexFilesSearch` is set to `false`.

### Metrics Services Configuration

Expand Down
2 changes: 1 addition & 1 deletion pkg/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error)
// configuration is valid and safe to use for further operations.
func (a *ApplicationConfiguration) Validate() error {
if err := a.NeoFSBlockFetcher.Validate(); err != nil {
return fmt.Errorf("failed to validate NeoFSBlockFetcher section: %w", err)
return fmt.Errorf("invalid NeoFSBlockFetcher config: %w", err)

Check warning on line 151 in pkg/config/application_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/application_config.go#L151

Added line #L151 was not covered by tests
}
return nil
}
4 changes: 4 additions & 0 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type NeoFSBlockFetcher struct {
DownloaderWorkersCount int `yaml:"DownloaderWorkersCount"`
BQueueSize int `yaml:"BQueueSize"`
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
IndexFileSize uint32 `yaml:"IndexFileSize"`
}

// Validate checks NeoFSBlockFetcher for internal consistency and ensures
Expand All @@ -43,5 +44,8 @@ func (cfg *NeoFSBlockFetcher) Validate() error {
if len(cfg.Addresses) == 0 {
return errors.New("addresses are not set")

Check warning on line 45 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L44-L45

Added lines #L44 - L45 were not covered by tests
}
if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 {
return errors.New("IndexFileSize is not set")

Check warning on line 48 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}
return nil

Check warning on line 50 in pkg/config/blockfetcher_config.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/blockfetcher_config.go#L50

Added line #L50 was not covered by tests
}
128 changes: 90 additions & 38 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -25,8 +26,6 @@ import (
const (
// oidSize is the size of the object ID in NeoFS.
oidSize = sha256.Size
// indexFileSize is the number of OIDs in a single index file.
indexFileSize = 128000
// defaultTimeout is the default timeout for NeoFS requests.
defaultTimeout = 5 * time.Minute
// defaultOIDBatchSize is the default number of OIDs to search and fetch at once.
Expand Down Expand Up @@ -59,18 +58,22 @@ type Service struct {
// wg is a wait group for block downloaders.
wg sync.WaitGroup

// Global context for download operations cancellation.
ctx context.Context
ctxCancel context.CancelFunc

// A set of routines managing graceful Service shutdown.
quit chan struct{}
quit chan bool
quitOnce sync.Once
exiterToOIDDownloader chan struct{}
exiterToShutdown chan struct{}
oidDownloaderToExiter chan struct{}
blockPutterToExiter chan struct{}
blockQueuerToExiter chan struct{}

shutdownCallback func()
}

// New creates a new BlockFetcherService.
// New creates a new BlockFetcher Service.
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) {
var (
account *wallet.Account
Expand Down Expand Up @@ -106,7 +109,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
if cfg.DownloaderWorkersCount <= 0 {
cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount
}
if cfg.Addresses == nil {
if len(cfg.Addresses) == 0 {
return &Service{}, errors.New("no addresses provided")
}
return &Service{
Expand All @@ -119,11 +122,11 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
stateRootInHeader: chain.GetConfig().StateRootInHeader,
shutdownCallback: shutdownCallback,

quit: make(chan struct{}),
quit: make(chan bool),
exiterToOIDDownloader: make(chan struct{}),
exiterToShutdown: make(chan struct{}),
oidDownloaderToExiter: make(chan struct{}),
blockPutterToExiter: make(chan struct{}),
blockQueuerToExiter: make(chan struct{}),

// Use buffer of two batch sizes to load OIDs in advance:
// * first full block of OIDs is processing by Downloader
Expand All @@ -145,7 +148,8 @@ func (bfs *Service) Start() error {
bfs.log.Info("starting NeoFS BlockFetcher service")

var err error
bfs.client, err = neofs.GetSDKClient(context.Background(), bfs.cfg.Addresses[0], 10*time.Minute)
bfs.ctx, bfs.ctxCancel = context.WithCancel(context.Background())
bfs.client, err = neofs.GetSDKClient(bfs.ctx, bfs.cfg.Addresses[0], 10*time.Minute)
if err != nil {
bfs.isActive.CompareAndSwap(true, false)
return fmt.Errorf("create SDK client: %w", err)
Expand Down Expand Up @@ -179,56 +183,74 @@ func (bfs *Service) oidDownloader() {
} else {
err = bfs.fetchOIDsFromIndexFiles()

Check warning on line 184 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L180-L184

Added lines #L180 - L184 were not covered by tests
}
var force bool
if err != nil {
bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err))
force = true

Check warning on line 189 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L186-L189

Added lines #L186 - L189 were not covered by tests
}
// Gracefully stop the service since there's nothing to do anymore.
bfs.stopService()
// Stop the service since there's nothing to do anymore.
bfs.stopService(force)

Check warning on line 192 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L192

Added line #L192 was not covered by tests
}

// blockDownloader downloads the block from NeoFS and sends it to the blocks channel.
func (bfs *Service) blockDownloader() {
defer bfs.wg.Done()

Check warning on line 197 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L196-L197

Added lines #L196 - L197 were not covered by tests

for blkOid := range bfs.oidsCh {
ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer cancel()

Check warning on line 201 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L199-L201

Added lines #L199 - L201 were not covered by tests

rc, err := bfs.objectGet(ctx, blkOid.String())
if err != nil {
if isContextCanceledErr(err) {
return

Check warning on line 206 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L203-L206

Added lines #L203 - L206 were not covered by tests
}
bfs.log.Error("failed to objectGet block", zap.Error(err))
bfs.stopService()
bfs.stopService(true)
return

Check warning on line 210 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L208-L210

Added lines #L208 - L210 were not covered by tests
}

b, err := bfs.readBlock(rc)
if err != nil {
if isContextCanceledErr(err) {
return

Check warning on line 216 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L213-L216

Added lines #L213 - L216 were not covered by tests
}
bfs.log.Error("failed to read block", zap.Error(err))
bfs.stopService()
bfs.stopService(true)
return

Check warning on line 220 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L218-L220

Added lines #L218 - L220 were not covered by tests
}
bfs.blocksCh <- b
select {
case <-bfs.ctx.Done():
return
case bfs.blocksCh <- b:

Check warning on line 225 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L222-L225

Added lines #L222 - L225 were not covered by tests
}
}
}

// blockQueuer puts the block into the bqueue.
func (bfs *Service) blockQueuer() {
defer close(bfs.blockPutterToExiter)
defer close(bfs.blockQueuerToExiter)

Check warning on line 232 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L231-L232

Added lines #L231 - L232 were not covered by tests

for b := range bfs.blocksCh {
err := bfs.enqueueBlock(b)
if err != nil {
bfs.log.Error("failed to enqueue block", zap.Error(err))
bfs.stopService()
select {
case <-bfs.ctx.Done():
return
default:
err := bfs.enqueueBlock(b)
if err != nil {
bfs.log.Error("failed to enqueue block", zap.Error(err))
bfs.stopService(true)
return

Check warning on line 243 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L234-L243

Added lines #L234 - L243 were not covered by tests
}
}
}
}

// fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first.
func (bfs *Service) fetchOIDsFromIndexFiles() error {
h := bfs.chain.BlockHeight()
startIndex := h/indexFileSize + 1
skip := h % indexFileSize
startIndex := h/bfs.cfg.IndexFileSize + 1
skip := h % bfs.cfg.IndexFileSize

Check warning on line 253 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L250-L253

Added lines #L250 - L253 were not covered by tests

for {
select {
Expand All @@ -240,26 +262,35 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
filters.AddFilter(bfs.cfg.IndexFileAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
prm.SetFilters(filters)

Check warning on line 263 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L255-L263

Added lines #L255 - L263 were not covered by tests

ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
blockOidsObject, err := bfs.objectSearch(ctx, prm)
cancel()
if err != nil {
if isContextCanceledErr(err) {
return nil

Check warning on line 270 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L265-L270

Added lines #L265 - L270 were not covered by tests
}
return fmt.Errorf("failed to find '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)

Check warning on line 272 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L272

Added line #L272 was not covered by tests
}
if len(blockOidsObject) == 0 {
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no '%s' object found with index %d, stopping", bfs.cfg.IndexFileAttribute, startIndex))
return nil

Check warning on line 276 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L274-L276

Added lines #L274 - L276 were not covered by tests
}

blockCtx, blockCancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout)
blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer blockCancel()
blockOidsData, err := bfs.objectGet(blockCtx, blockOidsObject[0].String())
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String())
if err != nil {
if isContextCanceledErr(err) {
return nil

Check warning on line 284 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L279-L284

Added lines #L279 - L284 were not covered by tests
}
return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)

Check warning on line 286 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L286

Added line #L286 was not covered by tests
}

err = bfs.streamBlockOIDs(blockOidsData, int(skip))
err = bfs.streamBlockOIDs(oidsRC, int(skip))
if err != nil {
if isContextCanceledErr(err) {
return nil

Check warning on line 292 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L289-L292

Added lines #L289 - L292 were not covered by tests
}
return fmt.Errorf("failed to stream block OIDs with index %d: %w", startIndex, err)

Check warning on line 294 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L294

Added line #L294 was not covered by tests
}

Expand Down Expand Up @@ -302,8 +333,8 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {

oidsProcessed++

Check warning on line 334 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L334

Added line #L334 was not covered by tests
}
if oidsProcessed != indexFileSize {
return fmt.Errorf("block OIDs count mismatch: expected %d, processed %d", indexFileSize, oidsProcessed)
if oidsProcessed != int(bfs.cfg.IndexFileSize) {
return fmt.Errorf("block OIDs count mismatch: expected %d, processed %d", bfs.cfg.IndexFileSize, oidsProcessed)

Check warning on line 337 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L336-L337

Added lines #L336 - L337 were not covered by tests
}
return nil

Check warning on line 339 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L339

Added line #L339 was not covered by tests
}
Expand All @@ -323,10 +354,13 @@ func (bfs *Service) fetchOIDsBySearch() error {
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
prm.SetFilters(filters)
ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
blockOids, err := bfs.objectSearch(ctx, prm)
cancel()
if err != nil {
if isContextCanceledErr(err) {
return nil

Check warning on line 362 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L347-L362

Added lines #L347 - L362 were not covered by tests
}
return err

Check warning on line 364 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L364

Added line #L364 was not covered by tests
}

Expand Down Expand Up @@ -355,20 +389,22 @@ func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) {
return b, r.Err

Check warning on line 389 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L384-L389

Added lines #L384 - L389 were not covered by tests
}

// Shutdown stops the NeoFS BlockFetcher service gracefully. It prevents service from new
// block OIDs search and waits until all downloaders finish their work.
// Shutdown stops the NeoFS BlockFetcher service. It prevents service from new
// block OIDs search, cancels all in-progress downloading operations and waits
// until all service routines finish their work.
func (bfs *Service) Shutdown() {
if !bfs.IsActive() {
return

Check warning on line 397 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L395-L397

Added lines #L395 - L397 were not covered by tests
}
bfs.stopService()
bfs.stopService(true)
<-bfs.exiterToShutdown

Check warning on line 400 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L399-L400

Added lines #L399 - L400 were not covered by tests
}

// stopService close quitting goroutine once. It's the only entrypoint to shutdown
// procedure.
func (bfs *Service) stopService() {
func (bfs *Service) stopService(force bool) {
bfs.quitOnce.Do(func() {
bfs.quit <- force
close(bfs.quit)
})

Check warning on line 409 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L405-L409

Added lines #L405 - L409 were not covered by tests
}
Expand All @@ -377,8 +413,16 @@ func (bfs *Service) stopService() {
// Service shutdown process.
func (bfs *Service) exiter() {

Check warning on line 414 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L414

Added line #L414 was not covered by tests
// Closing signal may come from anyone, but only once.
<-bfs.quit
bfs.log.Info("shutting down NeoFS BlockFetcher service")
force := <-bfs.quit
bfs.log.Info("shutting down NeoFS BlockFetcher service",
zap.Bool("force", force),
)

Check warning on line 419 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L416-L419

Added lines #L416 - L419 were not covered by tests

// Cansel all pending OIDs/blocks downloads in case if shutdown requested by user
// or caused by downloading error.
if force {
bfs.ctxCancel()

Check warning on line 424 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L423-L424

Added lines #L423 - L424 were not covered by tests
}

// Send signal to OID downloader to stop. Wait until OID downloader finishes his
// work.
Expand All @@ -392,10 +436,11 @@ func (bfs *Service) exiter() {

// Send signal to block putter to finish his work. Wait until it's finished.
close(bfs.blocksCh)
<-bfs.blockPutterToExiter
<-bfs.blockQueuerToExiter

Check warning on line 439 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L438-L439

Added lines #L438 - L439 were not covered by tests

// Everything is done, turn off the activity marker and let the server know about
// it.
// Everything is done, release resources, turn off the activity marker and let
// the server know about it.
_ = bfs.client.Close()
_ = bfs.log.Sync()
bfs.isActive.CompareAndSwap(true, false)
bfs.shutdownCallback()

Check warning on line 446 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L443-L446

Added lines #L443 - L446 were not covered by tests
Expand Down Expand Up @@ -425,3 +470,10 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e
func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) {
return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm)

Check warning on line 471 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L470-L471

Added lines #L470 - L471 were not covered by tests
}

// isContextCanceledErr returns whether error is a wrapped [context.Canceled].
// Ref. https://github.com/nspcc-dev/neofs-sdk-go/issues/624.
func isContextCanceledErr(err error) bool {
return errors.Is(err, context.Canceled) ||
strings.Contains(err.Error(), "context canceled")

Check warning on line 478 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L476-L478

Added lines #L476 - L478 were not covered by tests
}
Loading

0 comments on commit 5027493

Please sign in to comment.