-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(service): cleanup design and add comments
also adds various fixes, tweaks, and rearchitecting to prepare for testing
- Loading branch information
1 parent
e2621dc
commit 8573141
Showing
7 changed files
with
461 additions
and
229 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,52 +1,11 @@ | ||
package blobindex | ||
|
||
import ( | ||
"iter" | ||
"maps" | ||
|
||
mh "github.com/multiformats/go-multihash" | ||
"github.com/storacha-network/go-ucanto/core/iterable" | ||
"github.com/storacha-network/indexing-service/pkg/internal/bytemap" | ||
) | ||
|
||
type multihashMap[T any] struct { | ||
data map[string]T | ||
} | ||
|
||
// NewMultihashMap returns a new map of multihash to a data type | ||
func NewMultihashMap[T any](sizeHint int) MultihashMap[T] { | ||
var stringMap map[string]T | ||
if sizeHint == -1 { | ||
stringMap = make(map[string]T) | ||
} else { | ||
stringMap = make(map[string]T, sizeHint) | ||
} | ||
return &multihashMap[T]{stringMap} | ||
} | ||
|
||
func (mhm *multihashMap[T]) Get(mh mh.Multihash) T { | ||
return mhm.data[string(mh)] | ||
} | ||
|
||
func (mhm *multihashMap[T]) Has(mh mh.Multihash) bool { | ||
_, ok := mhm.data[string(mh)] | ||
return ok | ||
} | ||
|
||
func (mhm *multihashMap[T]) Set(mh mh.Multihash, t T) { | ||
mhm.data[string(mh)] = t | ||
} | ||
|
||
func (mhm *multihashMap[T]) Delete(mh mh.Multihash) bool { | ||
_, ok := mhm.data[string(mh)] | ||
delete(mhm.data, string(mh)) | ||
return ok | ||
} | ||
|
||
func (mhm *multihashMap[T]) Size() int { | ||
return len(mhm.data) | ||
} | ||
func (mhm *multihashMap[T]) Iterator() iter.Seq2[mh.Multihash, T] { | ||
return iterable.Map2(func(str string, t T) (mh.Multihash, T) { | ||
return mh.Multihash(str), t | ||
}, maps.All(mhm.data)) | ||
return bytemap.NewByteMap[mh.Multihash, T](sizeHint) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package bytemap | ||
|
||
import ( | ||
"iter" | ||
"maps" | ||
|
||
"github.com/storacha-network/go-ucanto/core/iterable" | ||
) | ||
|
||
// byteMap is the ByteMap implementation returned by NewByteMap. Byte slices
// are not comparable and so cannot be Go map keys; each key is therefore
// converted to a string before it is used to index the underlying map.
type byteMap[K ~[]byte, T any] struct {
	// data holds every entry, keyed by the string conversion of the key bytes.
	data map[string]T
}
|
||
// ByteMap is a generic for mapping byte array like types to arbitrary data types | ||
type ByteMap[K ~[]byte, T any] interface { | ||
Get(K) T | ||
Has(K) bool | ||
Set(K, T) | ||
Delete(K) bool | ||
Size() int | ||
Iterator() iter.Seq2[K, T] | ||
} | ||
|
||
// NewByteMap returns a new map of multihash to a data type | ||
func NewByteMap[K ~[]byte, T any](sizeHint int) ByteMap[K, T] { | ||
var stringMap map[string]T | ||
if sizeHint == -1 { | ||
stringMap = make(map[string]T) | ||
} else { | ||
stringMap = make(map[string]T, sizeHint) | ||
} | ||
return &byteMap[K, T]{stringMap} | ||
} | ||
|
||
func (bm *byteMap[K, T]) Get(b K) T { | ||
return bm.data[string(b)] | ||
} | ||
|
||
func (bm *byteMap[K, T]) Has(b K) bool { | ||
_, ok := bm.data[string(b)] | ||
return ok | ||
} | ||
|
||
func (bm *byteMap[K, T]) Set(b K, t T) { | ||
bm.data[string(b)] = t | ||
} | ||
|
||
func (bm *byteMap[K, T]) Delete(b K) bool { | ||
_, ok := bm.data[string(b)] | ||
delete(bm.data, string(b)) | ||
return ok | ||
} | ||
|
||
func (bm *byteMap[K, T]) Size() int { | ||
return len(bm.data) | ||
} | ||
|
||
func (bm *byteMap[K, T]) Iterator() iter.Seq2[K, T] { | ||
return iterable.Map2(func(str string, t T) (K, T) { | ||
return K(str), t | ||
}, maps.All(bm.data)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package jobwalker | ||
|
||
import "context" | ||
|
||
// WrappedState wraps a State value so that it can be read and updated
// atomically.
type WrappedState[State any] interface {
	// Access returns the current state.
	Access() State
	// Modify replaces the state with the result of applying the given
	// function to the current state.
	Modify(func(State) State)
	// CmpSwap is a conditional Modify. It calls willModify (potentially more
	// than once) with the current state and applies modify only if willModify
	// returns true. The result reports whether modify was applied.
	CmpSwap(willModify func(State) bool, modify func(State) State) bool
}
|
||
// JobHandler handles the specified job and uses the passed in function to spawn more | ||
// jobs. | ||
// The handler should stop processing if spawn errors, returning the error from spawn | ||
type JobHandler[Job any, State any] func(ctx context.Context, j Job, spawn func(Job) error, state WrappedState[State]) error | ||
|
||
// JobWalker processes a set of jobs that spawn other jobs, all while modifying a final state | ||
type JobWalker[Job, State any] func(ctx context.Context, initial []Job, initialState State, handler JobHandler[Job, State]) (State, error) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
package parallelwalk | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"sync" | ||
|
||
"github.com/storacha-network/indexing-service/pkg/internal/jobwalker" | ||
) | ||
|
||
// threadSafeState holds a State value behind a read/write mutex so that it
// can be shared between goroutines.
type threadSafeState[State any] struct {
	state State
	lk    sync.RWMutex
}

// Access returns the current state while holding the read lock.
func (ts *threadSafeState[State]) Access() State {
	ts.lk.RLock()
	current := ts.state
	ts.lk.RUnlock()
	return current
}

// Modify replaces the state with modify(state) while holding the write lock.
func (ts *threadSafeState[State]) Modify(modify func(State) State) {
	ts.lk.Lock()
	defer ts.lk.Unlock()
	ts.modify(modify)
}

// modify applies fn to the state without taking any lock; the caller must
// already hold the write lock.
func (ts *threadSafeState[State]) modify(fn func(State) State) {
	updated := fn(ts.state)
	ts.state = updated
}

// CmpSwap applies modify only when willModify approves the current state, and
// reports whether it did. willModify is consulted first under the read lock,
// so that calls which would not modify never take the write lock, and then
// again under the write lock, because the state may have changed in between.
func (ts *threadSafeState[State]) CmpSwap(willModify func(State) bool, modify func(State) State) bool {
	if willModify(ts.Access()) {
		ts.lk.Lock()
		defer ts.lk.Unlock()
		if willModify(ts.state) {
			ts.modify(modify)
			return true
		}
	}
	return false
}
|
||
// NewParallelWalk generates a function to handle a series of jobs that may spawn more jobs | ||
// It will execute jobs in parallel, with the specified concurrency until all initial jobs | ||
// and all spawned jobs (recursively) are handled, or a job errors | ||
// This code is adapted from https://github.com/ipfs/go-merkledag/blob/master/merkledag.go#L464C6-L584 | ||
func NewParallelWalk[Job, State any](concurrency int) jobwalker.JobWalker[Job, State] { | ||
return func(ctx context.Context, initial []Job, initialState State, handler jobwalker.JobHandler[Job, State]) (State, error) { | ||
if len(initial) == 0 { | ||
return initialState, errors.New("must provide at least one initial job") | ||
} | ||
jobFeed := make(chan Job) | ||
spawnedJobs := make(chan Job) | ||
jobFinishes := make(chan struct{}) | ||
|
||
state := &threadSafeState[State]{ | ||
state: initialState, | ||
} | ||
var wg sync.WaitGroup | ||
|
||
errChan := make(chan error) | ||
|
||
jobFeedCtx, cancel := context.WithCancel(ctx) | ||
|
||
defer wg.Wait() | ||
defer cancel() | ||
for i := 0; i < concurrency; i++ { | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
for job := range jobFeed { | ||
|
||
err := handler(jobFeedCtx, job, func(next Job) error { | ||
select { | ||
case spawnedJobs <- next: | ||
return nil | ||
case <-jobFeedCtx.Done(): | ||
return jobFeedCtx.Err() | ||
} | ||
}, state) | ||
|
||
if err != nil { | ||
select { | ||
case errChan <- err: | ||
case <-jobFeedCtx.Done(): | ||
} | ||
return | ||
} | ||
|
||
select { | ||
case jobFinishes <- struct{}{}: | ||
case <-jobFeedCtx.Done(): | ||
} | ||
} | ||
}() | ||
} | ||
defer close(jobFeed) | ||
|
||
jobProcessor := jobFeed | ||
nextJob, queuedJobs := initial[0], initial[1:] | ||
|
||
var inProgress int | ||
|
||
hasNextJob := func() bool { | ||
return jobProcessor != nil | ||
} | ||
|
||
for { | ||
select { | ||
case jobProcessor <- nextJob: | ||
inProgress++ | ||
if len(queuedJobs) > 0 { | ||
nextJob = queuedJobs[0] | ||
queuedJobs = queuedJobs[1:] | ||
} else { | ||
var empty Job | ||
nextJob = empty | ||
jobProcessor = nil | ||
} | ||
case <-jobFinishes: | ||
inProgress-- | ||
if inProgress == 0 && !hasNextJob() { | ||
return state.Access(), nil | ||
} | ||
case queued := <-spawnedJobs: | ||
if !hasNextJob() { | ||
nextJob = queued | ||
jobProcessor = jobFeed | ||
} else { | ||
queuedJobs = append(queuedJobs, queued) | ||
} | ||
case err := <-errChan: | ||
return state.Access(), err | ||
case <-ctx.Done(): | ||
return state.Access(), ctx.Err() | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package singlewalk | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/storacha-network/indexing-service/pkg/internal/jobwalker" | ||
) | ||
|
||
// singleState keeps a State value in a plain field with no locking. It is
// meant for use from a single goroutine.
type singleState[State any] struct {
	m State
}

// Access implements jobwalker.WrappedState by returning the current state.
func (s *singleState[State]) Access() State {
	current := s.m
	return current
}

// CmpSwap implements jobwalker.WrappedState. It applies modify only when
// willModify approves the current state, and reports whether it did.
func (s *singleState[State]) CmpSwap(willModify func(State) bool, modify func(State) State) bool {
	if willModify(s.m) {
		s.Modify(modify)
		return true
	}
	return false
}

// Modify implements jobwalker.WrappedState by replacing the state with
// modify(state).
func (s *singleState[State]) Modify(modify func(State) State) {
	updated := modify(s.m)
	s.m = updated
}
|
||
var _ jobwalker.WrappedState[any] = &singleState[any]{} | ||
|
||
// SingleWalker processes jobs that span more jobs, sequentially depth first in a single thread | ||
func SingleWalker[Job, State any](ctx context.Context, initial []Job, initialState State, handler jobwalker.JobHandler[Job, State]) (State, error) { | ||
stack := initial | ||
state := &singleState[State]{initialState} | ||
for len(stack) > 0 { | ||
select { | ||
case <-ctx.Done(): | ||
return state.Access(), ctx.Err() | ||
default: | ||
} | ||
next := initial[len(initial)-1] | ||
initial = initial[:len(initial)-1] | ||
handler(ctx, next, func(j Job) error { | ||
stack = append(stack, j) | ||
return nil | ||
}, state) | ||
} | ||
return state.Access(), nil | ||
} |
Oops, something went wrong.