diff --git a/cli/util/uploader.go b/cli/util/uploader.go
index d716d9b092..6e13d6a2b7 100644
--- a/cli/util/uploader.go
+++ b/cli/util/uploader.go
@@ -351,10 +351,13 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 
 	var (
 		errCh                 = make(chan error)
-		buffer                = make([]byte, indexFileSize*oidSize)
 		oidCh                 = make(chan oid.ID, indexFileSize)
 		oidFetcherToProcessor = make(chan struct{}, indexFileSize)
+		// processedIndices: position in index file -> block index.
+		processedIndices = sync.Map{}
+		buffer           = make([]byte, indexFileSize*oidSize)
+		emptyOid         = make([]byte, oidSize)
 	)
 	defer close(oidCh)
 
@@ -382,58 +385,54 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 					}
 					return
 				}
-				offset := (uint(blockIndex) % indexFileSize) * oidSize
-				id.Encode(buffer[offset:])
+				indexFilePosition := uint(blockIndex) % indexFileSize
+				if _, exists := processedIndices.LoadOrStore(int(indexFilePosition), blockIndex); !exists {
+					id.Encode(buffer[indexFilePosition*oidSize:])
+				}
 				oidFetcherToProcessor <- struct{}{}
 			}
 		}()
 	}
 
-	for i := existingIndexCount; i < expectedIndexCount; i++ {
-		startIndex := i * indexFileSize
-		endIndex := startIndex + indexFileSize
-		go func() {
-			for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
-				remaining := int(endIndex) - j
-				end := j + min(searchBatchSize, remaining)
-
-				prm = client.PrmObjectSearch{}
-				filters = object.NewSearchFilters()
-				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
-				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
-				prm.SetFilters(filters)
-				var objIDs []oid.ID
-				err := retry(func() error {
-					var errSearchIndex error
-					objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
-					return errSearchIndex
-				})
-
-				if err != nil {
-					select {
-					case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
-					default:
-					}
-					return
-				}
-
-				for _, id := range objIDs {
-					oidCh <- id
-				}
-			}
-		}()
-
-		var completed int
-	waitLoop:
-		for {
+	// Helper to wait for all objects to be processed.
+	waitForCompletion := func(expectedCount int, errCh <-chan error, completionCh <-chan struct{}) error {
+		completed := 0
+		for completed < expectedCount {
 			select {
 			case err := <-errCh:
 				return err
-			case <-oidFetcherToProcessor:
+			case <-completionCh:
 				completed++
-				if completed == int(indexFileSize) {
-					break waitLoop
-				}
+			}
+		}
+		return nil
+	}
+
+	// Main processing loop for each index file.
+	for i := existingIndexCount; i < expectedIndexCount; i++ {
+		startIndex := i * indexFileSize
+		endIndex := startIndex + indexFileSize
+		objIDs, err := searchObj(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches)
+		if err != nil {
+			return err
+		}
+		for _, id := range objIDs {
+			oidCh <- id
+		}
+		if err = waitForCompletion(len(objIDs), errCh, oidFetcherToProcessor); err != nil {
+			return err
+		}
+		// Check if there are any empty OIDs in the created index file.
+		// This can happen if searchObj did not return all objects or returned duplicates.
+		// In this case, we need to retry the search.
+		for idx := range indexFileSize {
+			if _, exists := processedIndices.Load(int(idx)); !exists {
+				objIDs, err = searchObj(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1)
+				if err != nil {
+					return err
+				}
+				processedIndices.Store(int(idx), objIDs[0])
+				objIDs[0].Encode(buffer[idx*oidSize:])
 			}
 		}
 		// Check if there are any empty oids in the created index file.
@@ -448,17 +447,80 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 			*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
 			*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
 		}
-		err := retry(func() error {
+		err = retry(func() error {
 			return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
 		})
 		if err != nil {
 			return fmt.Errorf("failed to upload index file %d: %w", i, err)
 		}
 		fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
+		buffer = make([]byte, indexFileSize*oidSize)
 	}
 	return nil
 }
 
+// searchObj searches in parallel for objects with attribute GE startIndex and LT endIndex.
+func searchObj(ctx context.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int) ([]oid.ID, error) {
+	var (
+		res   []oid.ID
+		mu    sync.Mutex
+		wg    sync.WaitGroup
+		errCh = make(chan error)
+		sem   = make(chan struct{}, maxParallelSearches)
+	)
+
+	for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case err := <-errCh:
+			return nil, err
+		case sem <- struct{}{}:
+		}
+
+		wg.Add(1)
+		go func(j int) {
+			defer wg.Done()
+			defer func() { <-sem }()
+
+			remaining := int(endIndex) - j
+			end := j + min(searchBatchSize, remaining)
+
+			prm := client.PrmObjectSearch{}
+			filters := object.NewSearchFilters()
+			filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
+			filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
+			prm.SetFilters(filters)
+
+			var objIDs []oid.ID
+			err := retry(func() error {
+				var errSearchIndex error
+				objIDs, errSearchIndex = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
+				return errSearchIndex
+			})
+			if err != nil {
+				select {
+				case errCh <- fmt.Errorf("failed to search for objects from %d to %d: %w", j, end, err):
+				default:
+				}
+				return
+			}
+
+			mu.Lock()
+			res = append(res, objIDs...)
+			mu.Unlock()
+		}(j)
+	}
+	wg.Wait()
+
+	select {
+	case err := <-errCh:
+		return nil, err
+	default:
+		return res, nil
+	}
+}
+
 // uploadObj uploads the block to the container using the pool.
 func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, HomomorphicHashingDisabled bool) error {
 	var (