Skip to content

Commit

Permalink
blockfetcher: add headerSizeMap to GetRange headers accordingly
Browse files Browse the repository at this point in the history
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Feb 6, 2025
1 parent 0c71ba3 commit 7419fdd
Showing 1 changed file with 43 additions and 18 deletions.
61 changes: 43 additions & 18 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (p poolWrapper) Close() error {
return nil
}

type indexedOID struct {
Index int
OID oid.ID
}

// Service is a service that fetches blocks from NeoFS.
type Service struct {
// isActive denotes whether the service is working or in the process of shutdown.
Expand All @@ -71,15 +76,15 @@ type Service struct {
operationMode OperationMode

stateRootInHeader bool
// headerSize is the size of the header in bytes.
headerSize int
// headerSizeMap is a map of height to expected header size.
headerSizeMap map[int]int

chain Ledger
pool poolWrapper
enqueue func(obj any) error
account *wallet.Account

oidsCh chan oid.ID
oidsCh chan indexedOID
// wg is a wait group for block downloaders.
wg sync.WaitGroup

Expand All @@ -97,7 +102,7 @@ type Service struct {
shutdownCallback func()

// Depends on the OperationMode, the following functions are set to the appropriate functions.
getFunc func(ctx context.Context, oid string) (io.ReadCloser, error)
getFunc func(ctx context.Context, oid string, index int) (io.ReadCloser, error)
readFunc func(rc io.ReadCloser) (any, error)
heightFunc func() uint32
}
Expand Down Expand Up @@ -164,7 +169,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
log: logger,
cfg: cfg,
operationMode: opt,
headerSize: getHeaderSize(chain.GetConfig()),
headerSizeMap: getHeaderSizeMap(chain.GetConfig()),

enqueue: put,
account: account,
Expand All @@ -180,12 +185,17 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
// * first full block of OIDs is processing by Downloader
// * second full block of OIDs is available to be fetched by Downloader immediately
// * third half-filled block of OIDs is being collected by OIDsFetcher.
oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize),
oidsCh: make(chan indexedOID, 2*cfg.OIDBatchSize),
}, nil
}

func getHeaderSize(chain config.Blockchain) int {
return block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0))
func getHeaderSizeMap(chain config.Blockchain) map[int]int {
headerSizeMap := make(map[int]int)
headerSizeMap[0] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0))
for height := range chain.CommitteeHistory {
headerSizeMap[int(height)] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(height))
}

Check warning on line 197 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L196-L197

Added lines #L196 - L197 were not covered by tests
return headerSizeMap
}

// Start runs the NeoFS BlockFetcher service.
Expand Down Expand Up @@ -276,11 +286,13 @@ func (bfs *Service) oidDownloader() {
func (bfs *Service) blockDownloader() {
defer bfs.wg.Done()

for blkOid := range bfs.oidsCh {
for indexedOid := range bfs.oidsCh {
index := indexedOid.Index
blkOid := indexedOid.OID

Check warning on line 291 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L289-L291

Added lines #L289 - L291 were not covered by tests
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer cancel()

rc, err := bfs.getFunc(ctx, blkOid.String())
rc, err := bfs.getFunc(ctx, blkOid.String(), index)

Check warning on line 295 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L295

Added line #L295 was not covered by tests
if err != nil {
if isContextCanceledErr(err) {
return
Expand Down Expand Up @@ -346,15 +358,15 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {

blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer blockCancel()
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String())
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String(), -1)

Check warning on line 361 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L361

Added line #L361 was not covered by tests
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)
}

err = bfs.streamBlockOIDs(oidsRC, int(skip))
err = bfs.streamBlockOIDs(oidsRC, int(startIndex), int(skip))

Check warning on line 369 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L369

Added line #L369 was not covered by tests
if err != nil {
if isContextCanceledErr(err) {
return nil
Expand All @@ -369,7 +381,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
}

// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, startIndex, skip int) error {

Check warning on line 384 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L384

Added line #L384 was not covered by tests
defer rc.Close()
oidBytes := make([]byte, oid.Size)
oidsProcessed := 0
Expand All @@ -396,7 +408,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oidBlock:
case bfs.oidsCh <- indexedOID{Index: startIndex*int(bfs.cfg.IndexFileSize) + oidsProcessed, OID: oidBlock}:

Check warning on line 411 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L411

Added line #L411 was not covered by tests
}

oidsProcessed++
Expand Down Expand Up @@ -441,12 +453,14 @@ func (bfs *Service) fetchOIDsBySearch() error {
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex))
return nil
}
index := int(startIndex)

Check warning on line 456 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L456

Added line #L456 was not covered by tests
for _, oid := range blockOids {
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oid:
case bfs.oidsCh <- indexedOID{Index: index, OID: oid}:

Check warning on line 461 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L461

Added line #L461 was not covered by tests
}
index++ //Won't work properly if neofs.ObjectSearch results are not ordered.

Check warning on line 463 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L463

Added line #L463 was not covered by tests
}
startIndex += batchSize
}
Expand Down Expand Up @@ -580,7 +594,7 @@ func (bfs *Service) retry(action func() error) error {
return err
}

func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
func (bfs *Service) objectGet(ctx context.Context, oid string, index int) (io.ReadCloser, error) {

Check warning on line 597 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L597

Added line #L597 was not covered by tests
u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid))
if err != nil {
return nil, err
Expand All @@ -593,8 +607,19 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e
return rc, err
}

func (bfs *Service) objectGetRange(ctx context.Context, oid string) (io.ReadCloser, error) {
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, bfs.headerSize))
func (bfs *Service) objectGetRange(ctx context.Context, oid string, height int) (io.ReadCloser, error) {
nearestHeight := 0
for h := range bfs.headerSizeMap {
if h <= height && h > nearestHeight {
nearestHeight = h
}
if nearestHeight >= height {
break

Check warning on line 617 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L610-L617

Added lines #L610 - L617 were not covered by tests
}
}

size := bfs.headerSizeMap[nearestHeight]
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, size))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 7419fdd

Please sign in to comment.