Skip to content

Commit

Permalink
cli: adjust index file creation in upload-bin
Browse files Browse the repository at this point in the history
If a search result is incomplete, each missing OID is searched for
individually and processed. If there are duplicates, the first one found
is written to the index file.

Close #3647

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Oct 30, 2024
1 parent 9d26ca9 commit a7ce9c0
Showing 1 changed file with 106 additions and 44 deletions.
150 changes: 106 additions & 44 deletions cli/util/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,13 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun

var (
errCh = make(chan error)
buffer = make([]byte, indexFileSize*oidSize)
oidCh = make(chan oid.ID, indexFileSize)
oidFetcherToProcessor = make(chan struct{}, indexFileSize)

// processedIndices: position in index file -> block index.
processedIndices = sync.Map{}
buffer = make([]byte, indexFileSize*oidSize)

Check warning on line 360 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L357-L360

Added lines #L357 - L360 were not covered by tests
emptyOid = make([]byte, oidSize)
)
defer close(oidCh)
Expand Down Expand Up @@ -382,58 +385,54 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
}
return
}
offset := (uint(blockIndex) % indexFileSize) * oidSize
id.Encode(buffer[offset:])
indexFilePosition := uint(blockIndex) % indexFileSize
if _, exists := processedIndices.LoadOrStore(int(indexFilePosition), blockIndex); !exists {
id.Encode(buffer[indexFilePosition*oidSize:])
}

Check warning on line 391 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L388-L391

Added lines #L388 - L391 were not covered by tests
oidFetcherToProcessor <- struct{}{}
}
}()
}

for i := existingIndexCount; i < expectedIndexCount; i++ {
startIndex := i * indexFileSize
endIndex := startIndex + indexFileSize
go func() {
for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
remaining := int(endIndex) - j
end := j + min(searchBatchSize, remaining)

prm = client.PrmObjectSearch{}
filters = object.NewSearchFilters()
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
prm.SetFilters(filters)
var objIDs []oid.ID
err := retry(func() error {
var errSearchIndex error
objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
return errSearchIndex
})

if err != nil {
select {
case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
default:
}
return
}

for _, id := range objIDs {
oidCh <- id
}
}
}()

var completed int
waitLoop:
for {
// Helper to wait for all objects to be processed
waitForCompletion := func(expectedCount int, errCh <-chan error, completionCh <-chan struct{}) error {
completed := 0
for completed < expectedCount {

Check warning on line 400 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L398-L400

Added lines #L398 - L400 were not covered by tests
select {
case err := <-errCh:
return err
case <-oidFetcherToProcessor:
case <-completionCh:

Check warning on line 404 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L404

Added line #L404 was not covered by tests
completed++
if completed == int(indexFileSize) {
break waitLoop
}
}
return nil

Check warning on line 408 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L408

Added line #L408 was not covered by tests
}

// Main processing loop for each index file
for i := existingIndexCount; i < expectedIndexCount; i++ {
startIndex := i * indexFileSize
endIndex := startIndex + indexFileSize
objIDs, err := searchObj(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches)
if err != nil {
return err
}
for _, id := range objIDs {
oidCh <- id
}
if err = waitForCompletion(len(objIDs), errCh, oidFetcherToProcessor); err != nil {
return err
}

Check warning on line 424 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L412-L424

Added lines #L412 - L424 were not covered by tests
// Check whether any OID slots in the created index file are still empty.
// This can happen if searchObj did not return all objects, or if there were duplicates.
// In that case, search for each missing object individually.
for idx := range indexFileSize {
if _, exists := processedIndices.Load(int(idx)); !exists {
objIDs, err = searchObj(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1)
if err != nil {
return err

Check warning on line 432 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L428-L432

Added lines #L428 - L432 were not covered by tests
}
processedIndices.Store(int(idx), objIDs[0])
objIDs[0].Encode(buffer[idx*oidSize:])

Check warning on line 435 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L434-L435

Added lines #L434 - L435 were not covered by tests
}
}
// Check if there are any empty oids in the created index file.
Expand All @@ -448,17 +447,80 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
}
err := retry(func() error {
err = retry(func() error {

Check warning on line 450 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L450

Added line #L450 was not covered by tests
return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
})
if err != nil {
return fmt.Errorf("failed to upload index file %d: %w", i, err)
}
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
buffer = make([]byte, indexFileSize*oidSize)

Check warning on line 457 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L457

Added line #L457 was not covered by tests
}
return nil
}

// searchObj searches in parallel for objects with attribute GE startIndex and LT endIndex.
func searchObj(ctx context.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int) ([]oid.ID, error) {
var (
res []oid.ID
mu sync.Mutex
wg sync.WaitGroup
errCh = make(chan error)
sem = make(chan struct{}, maxParallelSearches)
)

for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errCh:
return nil, err
case sem <- struct{}{}:

Check warning on line 478 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L463-L478

Added lines #L463 - L478 were not covered by tests
}

wg.Add(1)
go func(j int) {
defer wg.Done()
defer func() { <-sem }()

Check warning on line 484 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L481-L484

Added lines #L481 - L484 were not covered by tests

remaining := int(endIndex) - j
end := j + min(searchBatchSize, remaining)

prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
prm.SetFilters(filters)

var objIDs []oid.ID
err := retry(func() error {
var errSearchIndex error
objIDs, errSearchIndex = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
return errSearchIndex
})
if err != nil {
select {
case errCh <- fmt.Errorf("failed to search for objects from %d to %d: %w", j, end, err):
default:

Check warning on line 504 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L486-L504

Added lines #L486 - L504 were not covered by tests
}
return

Check warning on line 506 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L506

Added line #L506 was not covered by tests
}

mu.Lock()
res = append(res, objIDs...)
mu.Unlock()

Check warning on line 511 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L509-L511

Added lines #L509 - L511 were not covered by tests
}(j)
}
wg.Wait()

select {
case err := <-errCh:
return nil, err
default:
return res, nil

Check warning on line 520 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L514-L520

Added lines #L514 - L520 were not covered by tests
}
}

// uploadObj uploads the block to the container using the pool.
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, HomomorphicHashingDisabled bool) error {
var (
Expand Down

0 comments on commit a7ce9c0

Please sign in to comment.