diff --git a/pkg/blobindex/multihashmap.go b/pkg/blobindex/multihashmap.go
index 7ef70f8..16698b2 100644
--- a/pkg/blobindex/multihashmap.go
+++ b/pkg/blobindex/multihashmap.go
@@ -1,52 +1,11 @@
 package blobindex
 
 import (
-	"iter"
-	"maps"
-
 	mh "github.com/multiformats/go-multihash"
-	"github.com/storacha-network/go-ucanto/core/iterable"
+	"github.com/storacha-network/indexing-service/pkg/internal/bytemap"
 )
 
-type multihashMap[T any] struct {
-	data map[string]T
-}
-
 // NewMultihashMap returns a new map of multihash to a data type
 func NewMultihashMap[T any](sizeHint int) MultihashMap[T] {
-	var stringMap map[string]T
-	if sizeHint == -1 {
-		stringMap = make(map[string]T)
-	} else {
-		stringMap = make(map[string]T, sizeHint)
-	}
-	return &multihashMap[T]{stringMap}
-}
-
-func (mhm *multihashMap[T]) Get(mh mh.Multihash) T {
-	return mhm.data[string(mh)]
-}
-
-func (mhm *multihashMap[T]) Has(mh mh.Multihash) bool {
-	_, ok := mhm.data[string(mh)]
-	return ok
-}
-
-func (mhm *multihashMap[T]) Set(mh mh.Multihash, t T) {
-	mhm.data[string(mh)] = t
-}
-
-func (mhm *multihashMap[T]) Delete(mh mh.Multihash) bool {
-	_, ok := mhm.data[string(mh)]
-	delete(mhm.data, string(mh))
-	return ok
-}
-
-func (mhm *multihashMap[T]) Size() int {
-	return len(mhm.data)
-}
-func (mhm *multihashMap[T]) Iterator() iter.Seq2[mh.Multihash, T] {
-	return iterable.Map2(func(str string, t T) (mh.Multihash, T) {
-		return mh.Multihash(str), t
-	}, maps.All(mhm.data))
+	return bytemap.NewByteMap[mh.Multihash, T](sizeHint)
 }
diff --git a/pkg/internal/bytemap/bytemap.go b/pkg/internal/bytemap/bytemap.go
new file mode 100644
index 0000000..902a25d
--- /dev/null
+++ b/pkg/internal/bytemap/bytemap.go
@@ -0,0 +1,62 @@
+package bytemap
+
+import (
+	"iter"
+	"maps"
+
+	"github.com/storacha-network/go-ucanto/core/iterable"
+)
+
+type byteMap[K ~[]byte, T any] struct {
+	data map[string]T
+}
+
+// ByteMap is a generic interface for mapping byte-array-like types to arbitrary data types
+type ByteMap[K ~[]byte, T any] interface {
+	Get(K) T
+	Has(K) bool
+	Set(K, T)
+	Delete(K) bool
+	Size() int
+	Iterator() iter.Seq2[K, T]
+}
+
+// NewByteMap returns a new map of a byte-array-like key type to a data type
+func NewByteMap[K ~[]byte, T any](sizeHint int) ByteMap[K, T] {
+	var stringMap map[string]T
+	if sizeHint == -1 {
+		stringMap = make(map[string]T)
+	} else {
+		stringMap = make(map[string]T, sizeHint)
+	}
+	return &byteMap[K, T]{stringMap}
+}
+
+func (bm *byteMap[K, T]) Get(b K) T {
+	return bm.data[string(b)]
+}
+
+func (bm *byteMap[K, T]) Has(b K) bool {
+	_, ok := bm.data[string(b)]
+	return ok
+}
+
+func (bm *byteMap[K, T]) Set(b K, t T) {
+	bm.data[string(b)] = t
+}
+
+func (bm *byteMap[K, T]) Delete(b K) bool {
+	_, ok := bm.data[string(b)]
+	delete(bm.data, string(b))
+	return ok
+}
+
+func (bm *byteMap[K, T]) Size() int {
+	return len(bm.data)
+}
+
+func (bm *byteMap[K, T]) Iterator() iter.Seq2[K, T] {
+	return iterable.Map2(func(str string, t T) (K, T) {
+		return K(str), t
+	}, maps.All(bm.data))
+}
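As a usage sketch (not part of this change): the generalized bytemap keeps the old multihash-map behaviour but works for any ~[]byte key. Since the package is internal it is only importable from within indexing-service, and the digest below is purely illustrative.

package example

import (
	"fmt"

	mh "github.com/multiformats/go-multihash"

	"github.com/storacha-network/indexing-service/pkg/internal/bytemap"
)

func demo() error {
	// A map keyed by multihash; -1 means "no size hint".
	counts := bytemap.NewByteMap[mh.Multihash, int](-1)

	digest, err := mh.Sum([]byte("hello"), mh.SHA2_256, -1)
	if err != nil {
		return err
	}
	counts.Set(digest, 1)

	if counts.Has(digest) {
		fmt.Println("count:", counts.Get(digest)) // count: 1
	}
	// Iterator returns an iter.Seq2, so it works with range-over-func (Go 1.23+).
	for k, v := range counts.Iterator() {
		fmt.Println(k.B58String(), v)
	}
	return nil
}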
diff --git a/pkg/internal/jobwalker/jobwalker.go b/pkg/internal/jobwalker/jobwalker.go
new file mode 100644
index 0000000..6bf1d85
--- /dev/null
+++ b/pkg/internal/jobwalker/jobwalker.go
@@ -0,0 +1,19 @@
+package jobwalker
+
+import "context"
+
+// WrappedState is a wrapper around any state to enable atomic access and modification
+type WrappedState[State any] interface {
+	Access() State
+	Modify(func(State) State)
+	// CmpSwap calls the "willModify" function (potentially multiple times) and calls modify if it returns true
+	CmpSwap(willModify func(State) bool, modify func(State) State) bool
+}
+
+// JobHandler handles the specified job and uses the passed-in function to spawn more
+// jobs.
+// The handler should stop processing if spawn errors, returning the error from spawn
+type JobHandler[Job any, State any] func(ctx context.Context, j Job, spawn func(Job) error, state WrappedState[State]) error
+
+// JobWalker processes a set of jobs that spawn other jobs, all while modifying a final state
+type JobWalker[Job, State any] func(ctx context.Context, initial []Job, initialState State, handler JobHandler[Job, State]) (State, error)
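To make the JobHandler/WrappedState contract concrete, here is a toy handler — purely illustrative, not part of this change — that walks a binary tree of ints, spawning children and counting handled jobs in the shared state. The two JobWalker implementations that can run it follow below.

package example

import (
	"context"

	"github.com/storacha-network/indexing-service/pkg/internal/jobwalker"
)

// countingHandler returns a JobHandler over int jobs: each job n spawns 2n and
// 2n+1 while they stay under limit, and the shared int state counts every job
// handled. Hypothetical example for illustration only.
func countingHandler(limit int) jobwalker.JobHandler[int, int] {
	return func(ctx context.Context, n int, spawn func(int) error, state jobwalker.WrappedState[int]) error {
		state.Modify(func(count int) int { return count + 1 })
		if 2*n+1 < limit {
			// per the JobHandler contract, stop and return the error if spawn fails
			if err := spawn(2 * n); err != nil {
				return err
			}
			if err := spawn(2*n + 1); err != nil {
				return err
			}
		}
		return nil
	}
}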
diff --git a/pkg/internal/jobwalker/parallelwalk/parallelwalk.go b/pkg/internal/jobwalker/parallelwalk/parallelwalk.go
new file mode 100644
index 0000000..feabe9f
--- /dev/null
+++ b/pkg/internal/jobwalker/parallelwalk/parallelwalk.go
@@ -0,0 +1,141 @@
+package parallelwalk
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	"github.com/storacha-network/indexing-service/pkg/internal/jobwalker"
+)
+
+type threadSafeState[State any] struct {
+	state State
+	lk    sync.RWMutex
+}
+
+func (ts *threadSafeState[State]) Access() State {
+	ts.lk.RLock()
+	defer ts.lk.RUnlock()
+	return ts.state
+}
+
+func (ts *threadSafeState[State]) Modify(modify func(State) State) {
+	ts.lk.Lock()
+	defer ts.lk.Unlock()
+	ts.modify(modify)
+}
+
+func (ts *threadSafeState[State]) modify(modify func(State) State) {
+	ts.state = modify(ts.state)
+}
+
+func (ts *threadSafeState[State]) CmpSwap(willModify func(State) bool, modify func(State) State) bool {
+	if !willModify(ts.Access()) {
+		return false
+	}
+	ts.lk.Lock()
+	defer ts.lk.Unlock()
+	if !willModify(ts.state) {
+		return false
+	}
+	ts.modify(modify)
+	return true
+}
+
+// NewParallelWalk generates a function to handle a series of jobs that may spawn more jobs
+// It will execute jobs in parallel, with the specified concurrency until all initial jobs
+// and all spawned jobs (recursively) are handled, or a job errors
+// This code is adapted from https://github.com/ipfs/go-merkledag/blob/master/merkledag.go#L464C6-L584
+func NewParallelWalk[Job, State any](concurrency int) jobwalker.JobWalker[Job, State] {
+	return func(ctx context.Context, initial []Job, initialState State, handler jobwalker.JobHandler[Job, State]) (State, error) {
+		if len(initial) == 0 {
+			return initialState, errors.New("must provide at least one initial job")
+		}
+		jobFeed := make(chan Job)
+		spawnedJobs := make(chan Job)
+		jobFinishes := make(chan struct{})
+
+		state := &threadSafeState[State]{
+			state: initialState,
+		}
+		var wg sync.WaitGroup
+
+		errChan := make(chan error)
+
+		jobFeedCtx, cancel := context.WithCancel(ctx)
+
+		defer wg.Wait()
+		defer cancel()
+		for i := 0; i < concurrency; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for job := range jobFeed {
+
+					err := handler(jobFeedCtx, job, func(next Job) error {
+						select {
+						case spawnedJobs <- next:
+							return nil
+						case <-jobFeedCtx.Done():
+							return jobFeedCtx.Err()
+						}
+					}, state)
+
+					if err != nil {
+						select {
+						case errChan <- err:
+						case <-jobFeedCtx.Done():
+						}
+						return
+					}
+
+					select {
+					case jobFinishes <- struct{}{}:
+					case <-jobFeedCtx.Done():
+					}
+				}
+			}()
+		}
+		defer close(jobFeed)
+
+		jobProcessor := jobFeed
+		nextJob, queuedJobs := initial[0], initial[1:]
+
+		var inProgress int
+
+		hasNextJob := func() bool {
+			return jobProcessor != nil
+		}
+
+		for {
+			select {
+			case jobProcessor <- nextJob:
+				inProgress++
+				if len(queuedJobs) > 0 {
+					nextJob = queuedJobs[0]
+					queuedJobs = queuedJobs[1:]
+				} else {
+					var empty Job
+					nextJob = empty
+					jobProcessor = nil
+				}
+			case <-jobFinishes:
+				inProgress--
+				if inProgress == 0 && !hasNextJob() {
+					return state.Access(), nil
+				}
+			case queued := <-spawnedJobs:
+				if !hasNextJob() {
+					nextJob = queued
+					jobProcessor = jobFeed
+				} else {
+					queuedJobs = append(queuedJobs, queued)
+				}
+			case err := <-errChan:
+				return state.Access(), err
+			case <-ctx.Done():
+				return state.Access(), ctx.Err()
+			}
+		}
+	}
+}
diff --git a/pkg/internal/jobwalker/singlewalk/singlewalk.go b/pkg/internal/jobwalker/singlewalk/singlewalk.go
new file mode 100644
index 0000000..e7d000b
--- /dev/null
+++ b/pkg/internal/jobwalker/singlewalk/singlewalk.go
@@ -0,0 +1,54 @@
+package singlewalk
+
+import (
+	"context"
+
+	"github.com/storacha-network/indexing-service/pkg/internal/jobwalker"
+)
+
+type singleState[State any] struct {
+	m State
+}
+
+// Access implements jobwalker.WrappedState.
+func (s *singleState[State]) Access() State {
+	return s.m
+}
+
+// CmpSwap implements jobwalker.WrappedState.
+func (s *singleState[State]) CmpSwap(willModify func(State) bool, modify func(State) State) bool {
+	if !willModify(s.m) {
+		return false
+	}
+	s.m = modify(s.m)
+	return true
+}
+
+// Modify implements jobwalker.WrappedState.
+func (s *singleState[State]) Modify(modify func(State) State) {
+	s.m = modify(s.m)
+}
+
+var _ jobwalker.WrappedState[any] = &singleState[any]{}
+
+// SingleWalker processes jobs that spawn more jobs, sequentially, depth first, in a single thread
+func SingleWalker[Job, State any](ctx context.Context, initial []Job, initialState State, handler jobwalker.JobHandler[Job, State]) (State, error) {
+	stack := initial
+	state := &singleState[State]{initialState}
+	for len(stack) > 0 {
+		select {
+		case <-ctx.Done():
+			return state.Access(), ctx.Err()
+		default:
+		}
+		next := stack[len(stack)-1]
+		stack = stack[:len(stack)-1]
+		if err := handler(ctx, next, func(j Job) error {
+			stack = append(stack, j)
+			return nil
+		}, state); err != nil {
+			return state.Access(), err
+		}
+	}
+	return state.Access(), nil
+}
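Continuing the illustrative countingHandler sketched after jobwalker.go above, either walker can be plugged in interchangeably. This is only a sketch and assumes the fmt, singlewalk and parallelwalk imports.

// Hypothetical usage of the two walkers with the toy countingHandler above.
func runToyWalks(ctx context.Context) error {
	// Sequential, depth-first walk starting from job 1 with initial state 0.
	seqTotal, err := singlewalk.SingleWalker[int, int](ctx, []int{1}, 0, countingHandler(100))
	if err != nil {
		return err
	}

	// The same walk with up to 5 jobs being handled concurrently.
	parTotal, err := parallelwalk.NewParallelWalk[int, int](5)(ctx, []int{1}, 0, countingHandler(100))
	if err != nil {
		return err
	}

	fmt.Println(seqTotal, parTotal) // both walks should report the same count
	return nil
}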
diff --git a/pkg/internal/parallelwalk/parallelwalk.go b/pkg/internal/parallelwalk/parallelwalk.go
deleted file mode 100644
index 5292347..0000000
--- a/pkg/internal/parallelwalk/parallelwalk.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package parallelwalk
-
-import (
-	"context"
-	"errors"
-	"sync"
-)
-
-// JobHandler handles the specified job and uses the passed in function to spawn more
-// jobs.
-// The handler should stop processing if spawn errors, returning the error from spawn
-type JobHandler[Job any, State any] func(ctx context.Context, j Job, spawn func(Job) error, stateModifier func(func(State) State)) error
-
-// ParallelWalk is a function to handle a series of jobs that may spawn more jobs
-// It will execute jobs in parallel, with the specified concurrency until all initial jobs
-// and all spawned jobs (recursively) are handled, or a job errors
-// This code is adapted from https://github.com/ipfs/go-merkledag/blob/master/merkledag.go#L464C6-L584
-func ParallelWalk[Job, State any](ctx context.Context, initial []Job, state State, handler JobHandler[Job, State], concurrency int) (State, error) {
-	if len(initial) == 0 {
-		return state, errors.New("must provide at least one initial job")
-	}
-	jobFeed := make(chan Job)
-	spawnedJobs := make(chan Job)
-	jobFinishes := make(chan struct{})
-
-	var stateLk sync.Mutex
-
-	var wg sync.WaitGroup
-
-	errChan := make(chan error)
-
-	jobFeedCtx, cancel := context.WithCancel(ctx)
-
-	defer wg.Wait()
-	defer cancel()
-	for i := 0; i < concurrency; i++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			for job := range jobFeed {
-
-				err := handler(jobFeedCtx, job, func(next Job) error {
-					select {
-					case spawnedJobs <- next:
-						return nil
-					case <-jobFeedCtx.Done():
-						return jobFeedCtx.Err()
-					}
-				}, func(modifyState func(State) State) {
-					stateLk.Lock()
-					state = modifyState(state)
-					stateLk.Unlock()
-				})
-
-				if err != nil {
-					select {
-					case errChan <- err:
-					case <-jobFeedCtx.Done():
-					}
-					return
-				}
-
-				select {
-				case jobFinishes <- struct{}{}:
-				case <-jobFeedCtx.Done():
-				}
-			}
-		}()
-	}
-	defer close(jobFeed)
-
-	jobProcessor := jobFeed
-	nextJob, queuedJobs := initial[0], initial[1:]
-
-	var inProgress int
-
-	hasNextJob := func() bool {
-		return jobProcessor != nil
-	}
-
-	for {
-		select {
-		case jobProcessor <- nextJob:
-			inProgress++
-			if len(queuedJobs) > 0 {
-				nextJob = queuedJobs[0]
-				queuedJobs = queuedJobs[1:]
-			} else {
-				var empty Job
-				nextJob = empty
-				jobProcessor = nil
-			}
-		case <-jobFinishes:
-			inProgress--
-			if inProgress == 0 && !hasNextJob() {
-				return state, nil
-			}
-		case queued := <-spawnedJobs:
-			if !hasNextJob() {
-				nextJob = queued
-				jobProcessor = jobFeed
-			} else {
-				queuedJobs = append(queuedJobs, queued)
-			}
-		case err := <-errChan:
-			return state, err
-		case <-ctx.Done():
-			return state, ctx.Err()
-		}
-	}
-}
diff --git a/pkg/service/service.go b/pkg/service/service.go
index 160222d..81d21e6 100644
--- a/pkg/service/service.go
+++ b/pkg/service/service.go
@@ -13,7 +13,10 @@ import (
 	"github.com/storacha-network/go-ucanto/core/delegation"
 	"github.com/storacha-network/go-ucanto/did"
 	"github.com/storacha-network/indexing-service/pkg/blobindex"
-	"github.com/storacha-network/indexing-service/pkg/internal/parallelwalk"
+	"github.com/storacha-network/indexing-service/pkg/internal/bytemap"
+	"github.com/storacha-network/indexing-service/pkg/internal/jobwalker"
+	"github.com/storacha-network/indexing-service/pkg/internal/jobwalker/parallelwalk"
+	"github.com/storacha-network/indexing-service/pkg/internal/jobwalker/singlewalk"
 	"github.com/storacha-network/indexing-service/pkg/metadata"
 	"github.com/storacha-network/indexing-service/pkg/service/providerindex"
 	"github.com/storacha-network/indexing-service/pkg/types"
@@ -21,18 +24,21 @@ import (
 
 const defaultConcurrency = 5
 
+// Match narrows parameters for locating providers/claims for a set of multihashes
 type Match struct {
 	Subject []did.DID
 }
 
+// Query is a query for several multihashes
 type Query struct {
 	Hashes []multihash.Multihash
 	Match  Match
 }
 
+// QueryResult describes the found claims and indexes for a given query
 type QueryResult struct {
-	Claims  []delegation.Delegation
-	Indexes []blobindex.ShardedDagIndexView
+	Claims  map[cid.Cid]delegation.Delegation
+	Indexes bytemap.ByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView]
 }
 
 // ProviderIndex is a read/write interface to a local cache of providers that falls back to IPNI
@@ -71,100 +77,155 @@ type BlobIndexLookup interface {
 	Find(ctx context.Context, contextID types.EncodedContextID, fetchURL url.URL, rng *metadata.Range) (blobindex.ShardedDagIndexView, error)
 }
 
+// IndexingService implements read/write logic for indexing data with IPNI, content claims, sharded dag indexes, and a cache layer
 type IndexingService struct {
 	blobIndexLookup BlobIndexLookup
 	claimLookup     ClaimLookup
 	providerIndex   ProviderIndex
+	jobWalker       jobwalker.JobWalker[job, queryState]
 }
 
 type job struct {
-	mh           multihash.Multihash
-	isIndex      bool
-	targetClaims []multicodec.Code
+	mh         multihash.Multihash
+	indexForMh *multihash.Multihash
+	jobType    jobType
 }
 
-var allClaims = []multicodec.Code{metadata.EqualsClaimID, metadata.IndexClaimID, metadata.LocationCommitmentID}
-var justLocation = []multicodec.Code{metadata.LocationCommitmentID}
-var equalsOrLocation = []multicodec.Code{metadata.IndexClaimID, metadata.LocationCommitmentID}
+type jobKey string
 
-// Query returns back relevant content claims for the given query using the following steps
-// 1. Query the IPNIIndex for all matching records
-// 2. For any index records, query the IPNIIndex for any location claims for that index cid
-// 3. For any index claims, query the IPNIIndex for location claims for the index cid
-// 4. Query the BlobIndexLookup to get the full ShardedDagIndex for any index claims
-// 5. Query IPNIIndex for any location claims for any shards that contain the multihash based on the ShardedDagIndex
-// 6. Read the requisite claims from the ClaimLookup
-// 7. Return all discovered claims and sharded dag indexes
-func (is *IndexingService) Query(ctx context.Context, q Query) (QueryResult, error) {
-	initialJobs := make([]job, 0, len(q.Hashes))
-	for _, mh := range q.Hashes {
-		initialJobs = append(initialJobs, job{mh, false, allClaims})
+func (j job) key() jobKey {
+	k := jobKey(j.mh) + jobKey(j.jobType)
+	if j.indexForMh != nil {
+		k += jobKey(*j.indexForMh)
+	}
+	return k
+}
+
+type jobType string
+
+const standardJobType jobType = "standard"
+const locationJobType jobType = "location"
+const equalsOrLocationJobType jobType = "equals_or_location"
+
+var targetClaims = map[jobType][]multicodec.Code{
+	standardJobType:         {metadata.EqualsClaimID, metadata.IndexClaimID, metadata.LocationCommitmentID},
+	locationJobType:         {metadata.LocationCommitmentID},
+	equalsOrLocationJobType: {metadata.IndexClaimID, metadata.LocationCommitmentID},
+}
+
+type queryState struct {
+	q      *Query
+	qr     *QueryResult
+	visits map[jobKey]struct{}
+}
+
+func (is *IndexingService) jobHandler(mhCtx context.Context, j job, spawn func(job) error, state jobwalker.WrappedState[queryState]) error {
+
+	// check if node has already been visited and ignore if that is the case
+	if !state.CmpSwap(func(qs queryState) bool {
+		_, ok := qs.visits[j.key()]
+		return !ok
+	}, func(qs queryState) queryState {
+		qs.visits[j.key()] = struct{}{}
+		return qs
+	}) {
+		return nil
 	}
-	return parallelwalk.ParallelWalk(ctx, initialJobs, QueryResult{}, func(mhCtx context.Context, j job, spawn func(job) error, stateModifier func(func(QueryResult) QueryResult)) error {
-		results, err := is.providerIndex.Find(mhCtx, providerindex.QueryKey{
-			Hash:         j.mh,
-			Spaces:       q.Match.Subject,
-			TargetClaims: j.targetClaims,
-		})
+
+	// find provider records related to this multihash
+	results, err := is.providerIndex.Find(mhCtx, providerindex.QueryKey{
+		Hash:         j.mh,
+		Spaces:       state.Access().q.Match.Subject,
+		TargetClaims: targetClaims[j.jobType],
+	})
 	if err != nil {
 		return err
 	}
 	for _, result := range results {
-		md := metadata.MetadataContext.New()
-		err = md.UnmarshalBinary(result.Metadata)
+		// unmarshal metadata for this provider
+		md := metadata.MetadataContext.New()
+		err = md.UnmarshalBinary(result.Metadata)
 		if err != nil {
 			return err
 		}
-		for _, code := range md.Protocols() {
-			protocol := md.Get(code)
-			if hasClaimCid, ok := protocol.(metadata.HasClaimCid); ok {
-				claimCid := hasClaimCid.GetClaimCid()
-				url := is.fetchClaimUrl(*result.Provider, claimCid)
-				claim, err := is.claimLookup.LookupClaim(mhCtx, claimCid, url)
-				if err != nil {
+		// the provider may list one or more protocols for this CID
+		// in our case, the protocols are just different types of content claims
+		for _, code := range md.Protocols() {
+			protocol := md.Get(code)
+			// make sure this is some kind of claim protocol, ignore if not
+			hasClaimCid, ok := protocol.(metadata.HasClaimCid)
+			if !ok {
+				continue
+			}
+			// fetch (from cache or url) the actual content claim
+			claimCid := hasClaimCid.GetClaimCid()
+			url := is.fetchClaimUrl(*result.Provider, claimCid)
+			claim, err := is.claimLookup.LookupClaim(mhCtx, claimCid, url)
+			if err != nil {
 				return err
 			}
-				stateModifier(func(qr QueryResult) QueryResult {
-					qr.Claims = append(qr.Claims, claim)
-					return qr
-				})
+			// add the fetched claim to the results, if we don't already have it
+			state.CmpSwap(
+				func(qs queryState) bool {
+					_, ok := qs.qr.Claims[claimCid]
+					return !ok
+				},
+				func(qs queryState) queryState {
+					qs.qr.Claims[claimCid] = claim
+					return qs
+				})
+
+			// handle each type of protocol
+			switch typedProtocol := protocol.(type) {
+			case *metadata.EqualsClaimMetadata:
+				// for an equals claim, it's published on both the content and equals multihashes
+				// we follow with a query for location claim on the OTHER side of the multihash
+				if string(typedProtocol.Equals.Hash()) != string(j.mh) {
+					// lookup was the content hash, queue the equals hash
+					if err := spawn(job{typedProtocol.Equals.Hash(), nil, locationJobType}); err != nil {
+						return err
+					}
+				} else {
+					// lookup was the equals hash, queue the content hash
+					if err := spawn(job{multihash.Multihash(result.ContextID), nil, locationJobType}); err != nil {
 						return err
 					}
-				}
-				switch typedProtocol := protocol.(type) {
-				case *metadata.EqualsClaimMetadata:
-					if string(typedProtocol.Equals.Hash()) != string(j.mh) {
-						// lookup was the content hash, queue the equals hash
-						if err := spawn(job{typedProtocol.Equals.Hash(), false, justLocation}); err != nil {
-							return err
-						}
-					} else {
-						// lookup was the equals hash, queue the content hash
-						if err := spawn(job{multihash.Multihash(result.ContextID), false, justLocation}); err != nil {
-							return err
-						}
 				}
+			case *metadata.IndexClaimMetadata:
+				// for an index claim, we follow by looking for a location claim for the index, and fetching the index
+				mh := j.mh
+				if err := spawn(job{typedProtocol.Index.Hash(), &mh, equalsOrLocationJobType}); err != nil {
+					return err
+				}
+			case *metadata.LocationCommitmentMetadata:
+				// for a location claim, we just store it, unless it's for an index CID, in which case get the full index
+				if j.indexForMh != nil {
+					// fetch (from URL or cache) the full index
+					shard := typedProtocol.Shard
+					if shard == nil {
+						c := cid.NewCidV1(cid.Raw, j.mh)
+						shard = &c
 					}
-				case *metadata.IndexClaimMetadata:
-					if err := spawn(job{typedProtocol.Index.Hash(), true, justLocation}); err != nil {
+					url := is.fetchRetrievalUrl(*result.Provider, *shard)
+					index, err := is.blobIndexLookup.Find(mhCtx, result.ContextID, url, typedProtocol.Range)
+					if err != nil {
 						return err
 					}
-				case *metadata.LocationCommitmentMetadata:
-					if j.isIndex {
-						shard := typedProtocol.Shard
-						if shard == nil {
-							c := cid.NewCidV1(cid.Raw, j.mh)
-							shard = &c
-						}
-						url := is.fetchRetrievalUrl(*result.Provider, *shard)
-						index, err := is.blobIndexLookup.Find(mhCtx, result.ContextID, url, typedProtocol.Range)
-						if err != nil {
-							return err
-						}
-						stateModifier(func(qr QueryResult) QueryResult {
-							qr.Indexes = append(qr.Indexes, index)
-							return qr
+					// Add the index to the query results, if we don't already have it
+					state.CmpSwap(
+						func(qs queryState) bool {
+							return !qs.qr.Indexes.Has(result.ContextID)
+						},
+						func(qs queryState) queryState {
+							qs.qr.Indexes.Set(result.ContextID, index)
+							return qs
 						})
-						shards := index.Shards().Iterator()
-						for shard := range shards {
-							if err := spawn(job{shard, false, equalsOrLocation}); err != nil {
+
+					// add location queries for all shards containing the original CID we're seeing an index for
+					shards := index.Shards().Iterator()
+					for shard, index := range shards {
+						if index.Has(*j.indexForMh) {
+							if err := spawn(job{shard, nil, equalsOrLocationJobType}); err != nil {
 								return err
 							}
 						}
@@ -172,8 +233,32 @@ func (is *IndexingService) Query(ctx context.Context, q Query) (QueryResult, err
 			}
 		}
 	}
-		return nil
-	}, defaultConcurrency)
+	return nil
+}
+
+// Query returns relevant content claims for the given query using the following steps
+// 1. Query the IPNIIndex for all matching records
+// 2. For any index records, query the IPNIIndex for any location claims for that index cid
+// 3. For any index claims, query the IPNIIndex for location claims for the index cid
+// 4. Query the BlobIndexLookup to get the full ShardedDagIndex for any index claims
+// 5. Query IPNIIndex for any location claims for any shards that contain the multihash based on the ShardedDagIndex
+// 6. Read the requisite claims from the ClaimLookup
+// 7. Return all discovered claims and sharded dag indexes
+func (is *IndexingService) Query(ctx context.Context, q Query) (QueryResult, error) {
+	initialJobs := make([]job, 0, len(q.Hashes))
+	for _, mh := range q.Hashes {
+		initialJobs = append(initialJobs, job{mh, nil, standardJobType})
+	}
+	qs, err := is.jobWalker(ctx, initialJobs, queryState{
+		q: &q,
+		qr: &QueryResult{
+			Claims:  make(map[cid.Cid]delegation.Delegation),
+			Indexes: bytemap.NewByteMap[types.EncodedContextID, blobindex.ShardedDagIndexView](-1),
+		},
+		visits: map[jobKey]struct{}{},
+	}, is.jobHandler)
+	return *qs.qr, err
 }
 
 func (is *IndexingService) fetchClaimUrl(provider peer.AddrInfo, claimCid cid.Cid) url.URL {
@@ -195,6 +280,7 @@ func (is *IndexingService) CacheClaim(ctx context.Context, claim delegation.Dele
 	return errors.New("not implemented")
 }
 
+// PublishClaim caches and publishes a content claim
 // I imagine publish claim to work as follows
 // For all claims except index, just use the publish API on IPNIIndex
 // For index claims, let's assume they fail if a location claim for the index car cid is not already published
@@ -203,3 +289,27 @@ func (is *IndexingService) CacheClaim(ctx context.Context, claim delegation.Dele
 func (is *IndexingService) PublishClaim(ctx context.Context, claim delegation.Delegation) error {
 	return errors.New("not implemented")
 }
+
+// Option configures an IndexingService
+type Option func(is *IndexingService)
+
+// WithConcurrency causes the indexing service to process find queries in parallel, with the given concurrency
+func WithConcurrency(concurrency int) Option {
+	return func(is *IndexingService) {
+		is.jobWalker = parallelwalk.NewParallelWalk[job, queryState](concurrency)
+	}
+}
+
+// NewIndexingService returns a new indexing service
+func NewIndexingService(blobIndexLookup BlobIndexLookup, claimLookup ClaimLookup, providerIndex ProviderIndex, options ...Option) *IndexingService {
+	is := &IndexingService{
+		blobIndexLookup: blobIndexLookup,
+		claimLookup:     claimLookup,
+		providerIndex:   providerIndex,
+		jobWalker:       singlewalk.SingleWalker[job, queryState],
+	}
+	for _, option := range options {
+		option(is)
+	}
+	return is
+}
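Finally, a rough sketch of wiring the reworked service together. blobs, claims, providers, ctx and hashes stand in for existing BlobIndexLookup, ClaimLookup and ProviderIndex implementations and caller-supplied values; they are assumptions for illustration, not part of this diff.

// Hypothetical wiring; by default Query walks jobs sequentially (SingleWalker),
// while WithConcurrency switches it to the parallel walker.
svc := service.NewIndexingService(blobs, claims, providers, service.WithConcurrency(10))

result, err := svc.Query(ctx, service.Query{Hashes: hashes})
if err != nil {
	return err
}
fmt.Println("claims found:", len(result.Claims))
fmt.Println("indexes found:", result.Indexes.Size())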