Skip to content

Commit

Permalink
blockfetcher: add headerSizeMap to GetRange headers accordingly
Browse files Browse the repository at this point in the history
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Feb 6, 2025
1 parent 5d9de68 commit 833b783
Showing 1 changed file with 43 additions and 18 deletions.
61 changes: 43 additions & 18 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ func (p poolWrapper) Close() error {
return nil
}

type indexedOID struct {
Index int
OID oid.ID
}

// Service is a service that fetches blocks from NeoFS.
type Service struct {
// isActive denotes whether the service is working or in the process of shutdown.
Expand All @@ -72,15 +77,15 @@ type Service struct {
operationMode OperationMode

stateRootInHeader bool
// headerSize is the size of the header in bytes.
headerSize int
// headerSizeMap is a map of height to expected header size.
headerSizeMap map[int]int

chain Ledger
pool poolWrapper
enqueue func(obj bqueue.Indexable) error
account *wallet.Account

oidsCh chan oid.ID
oidsCh chan indexedOID
// wg is a wait group for block downloaders.
wg sync.WaitGroup

Expand All @@ -98,7 +103,7 @@ type Service struct {
shutdownCallback func()

// Depends on the OperationMode, the following functions are set to the appropriate functions.
getFunc func(ctx context.Context, oid string) (io.ReadCloser, error)
getFunc func(ctx context.Context, oid string, index int) (io.ReadCloser, error)
readFunc func(rc io.ReadCloser) (any, error)
heightFunc func() uint32
}
Expand Down Expand Up @@ -165,7 +170,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
log: logger,
cfg: cfg,
operationMode: opt,
headerSize: getHeaderSize(chain.GetConfig()),
headerSizeMap: getHeaderSizeMap(chain.GetConfig()),

enqueue: put,
account: account,
Expand All @@ -181,12 +186,17 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
// * first full block of OIDs is processing by Downloader
// * second full block of OIDs is available to be fetched by Downloader immediately
// * third half-filled block of OIDs is being collected by OIDsFetcher.
oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize),
oidsCh: make(chan indexedOID, 2*cfg.OIDBatchSize),
}, nil
}

func getHeaderSize(chain config.Blockchain) int {
return block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0))
func getHeaderSizeMap(chain config.Blockchain) map[int]int {
headerSizeMap := make(map[int]int)
headerSizeMap[0] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0))
for height := range chain.CommitteeHistory {
headerSizeMap[int(height)] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(height))
}

Check warning on line 198 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L197-L198

Added lines #L197 - L198 were not covered by tests
return headerSizeMap
}

// Start runs the NeoFS BlockFetcher service.
Expand Down Expand Up @@ -277,11 +287,13 @@ func (bfs *Service) oidDownloader() {
func (bfs *Service) blockDownloader() {
defer bfs.wg.Done()

for blkOid := range bfs.oidsCh {
for indexedOid := range bfs.oidsCh {
index := indexedOid.Index
blkOid := indexedOid.OID

Check warning on line 292 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L290-L292

Added lines #L290 - L292 were not covered by tests
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer cancel()

rc, err := bfs.getFunc(ctx, blkOid.String())
rc, err := bfs.getFunc(ctx, blkOid.String(), index)

Check warning on line 296 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L296

Added line #L296 was not covered by tests
if err != nil {
if isContextCanceledErr(err) {
return
Expand Down Expand Up @@ -347,15 +359,15 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {

blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer blockCancel()
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String())
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String(), -1)

Check warning on line 362 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L362

Added line #L362 was not covered by tests
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)
}

err = bfs.streamBlockOIDs(oidsRC, int(skip))
err = bfs.streamBlockOIDs(oidsRC, int(startIndex), int(skip))

Check warning on line 370 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L370

Added line #L370 was not covered by tests
if err != nil {
if isContextCanceledErr(err) {
return nil
Expand All @@ -370,7 +382,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
}

// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, startIndex, skip int) error {

Check warning on line 385 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L385

Added line #L385 was not covered by tests
defer rc.Close()
oidBytes := make([]byte, oid.Size)
oidsProcessed := 0
Expand All @@ -397,7 +409,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oidBlock:
case bfs.oidsCh <- indexedOID{Index: startIndex*int(bfs.cfg.IndexFileSize) + oidsProcessed, OID: oidBlock}:

Check warning on line 412 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L412

Added line #L412 was not covered by tests
}

oidsProcessed++
Expand Down Expand Up @@ -442,12 +454,14 @@ func (bfs *Service) fetchOIDsBySearch() error {
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex))
return nil
}
index := int(startIndex)

Check warning on line 457 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L457

Added line #L457 was not covered by tests
for _, oid := range blockOids {
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oid:
case bfs.oidsCh <- indexedOID{Index: index, OID: oid}:

Check warning on line 462 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L462

Added line #L462 was not covered by tests
}
index++ //Won't work properly if neofs.ObjectSearch results are not ordered.

Check warning on line 464 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L464

Added line #L464 was not covered by tests
}
startIndex += batchSize
}
Expand Down Expand Up @@ -581,7 +595,7 @@ func (bfs *Service) retry(action func() error) error {
return err
}

func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
func (bfs *Service) objectGet(ctx context.Context, oid string, index int) (io.ReadCloser, error) {

Check warning on line 598 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L598

Added line #L598 was not covered by tests
u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid))
if err != nil {
return nil, err
Expand All @@ -594,8 +608,19 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e
return rc, err
}

func (bfs *Service) objectGetRange(ctx context.Context, oid string) (io.ReadCloser, error) {
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, bfs.headerSize))
func (bfs *Service) objectGetRange(ctx context.Context, oid string, height int) (io.ReadCloser, error) {
nearestHeight := 0
for h := range bfs.headerSizeMap {
if h <= height && h > nearestHeight {
nearestHeight = h
}
if nearestHeight >= height {
break

Check warning on line 618 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L611-L618

Added lines #L611 - L618 were not covered by tests
}
}

size := bfs.headerSizeMap[nearestHeight]
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, size))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 833b783

Please sign in to comment.