Skip to content

Commit

Permalink
refactor(service): cleanup design and add comments
Browse files Browse the repository at this point in the history
also adds various fixes, tweeks, and rearchitects to prepare for testing
  • Loading branch information
hannahhoward committed Oct 2, 2024
1 parent ece7ebe commit ec89bbd
Show file tree
Hide file tree
Showing 7 changed files with 461 additions and 229 deletions.
45 changes: 2 additions & 43 deletions pkg/blobindex/multihashmap.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,11 @@
package blobindex

import (
"iter"
"maps"

mh "github.com/multiformats/go-multihash"
"github.com/storacha-network/go-ucanto/core/iterable"
"github.com/storacha-network/indexing-service/pkg/internal/bytemap"
)

type multihashMap[T any] struct {
data map[string]T
}

// NewMultihashMap returns a new map of multihash to a data type
func NewMultihashMap[T any](sizeHint int) MultihashMap[T] {
var stringMap map[string]T
if sizeHint == -1 {
stringMap = make(map[string]T)
} else {
stringMap = make(map[string]T, sizeHint)
}
return &multihashMap[T]{stringMap}
}

func (mhm *multihashMap[T]) Get(mh mh.Multihash) T {
return mhm.data[string(mh)]
}

func (mhm *multihashMap[T]) Has(mh mh.Multihash) bool {
_, ok := mhm.data[string(mh)]
return ok
}

func (mhm *multihashMap[T]) Set(mh mh.Multihash, t T) {
mhm.data[string(mh)] = t
}

func (mhm *multihashMap[T]) Delete(mh mh.Multihash) bool {
_, ok := mhm.data[string(mh)]
delete(mhm.data, string(mh))
return ok
}

func (mhm *multihashMap[T]) Size() int {
return len(mhm.data)
}
func (mhm *multihashMap[T]) Iterator() iter.Seq2[mh.Multihash, T] {
return iterable.Map2(func(str string, t T) (mh.Multihash, T) {
return mh.Multihash(str), t
}, maps.All(mhm.data))
return bytemap.NewByteMap[mh.Multihash, T](sizeHint)
}
62 changes: 62 additions & 0 deletions pkg/internal/bytemap/bytemap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package bytemap

import (
"iter"
"maps"

"github.com/storacha-network/go-ucanto/core/iterable"
)

type byteMap[K ~[]byte, T any] struct {
data map[string]T
}

// ByteMap is a generic for mapping byte array like types to arbitrary data types
type ByteMap[K ~[]byte, T any] interface {
Get(K) T
Has(K) bool
Set(K, T)
Delete(K) bool
Size() int
Iterator() iter.Seq2[K, T]
}

// NewByteMap returns a new map of multihash to a data type
func NewByteMap[K ~[]byte, T any](sizeHint int) ByteMap[K, T] {
var stringMap map[string]T
if sizeHint == -1 {
stringMap = make(map[string]T)
} else {
stringMap = make(map[string]T, sizeHint)
}
return &byteMap[K, T]{stringMap}
}

func (bm *byteMap[K, T]) Get(b K) T {
return bm.data[string(b)]
}

func (bm *byteMap[K, T]) Has(b K) bool {
_, ok := bm.data[string(b)]
return ok
}

func (bm *byteMap[K, T]) Set(b K, t T) {
bm.data[string(b)] = t
}

func (bm *byteMap[K, T]) Delete(b K) bool {
_, ok := bm.data[string(b)]
delete(bm.data, string(b))
return ok
}

func (bm *byteMap[K, T]) Size() int {
return len(bm.data)
}

func (bm *byteMap[K, T]) Iterator() iter.Seq2[K, T] {
return iterable.Map2(func(str string, t T) (K, T) {
return K(str), t
}, maps.All(bm.data))
}
19 changes: 19 additions & 0 deletions pkg/internal/jobwalker/jobwalker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package jobwalker

import "context"

// WrappedState is a wrapper around any state to enable atomic access and modification
type WrappedState[State any] interface {
Access() State
Modify(func(State) State)
// CmpSwap calls the "willModify" function (potentially multiple times) and calls modify if it returns true
CmpSwap(willModify func(State) bool, modify func(State) State) bool
}

// JobHandler handles the specified job and uses the passed in function to spawn more
// jobs.
// The handler should stop processing if spawn errors, returning the error from spawn
type JobHandler[Job any, State any] func(ctx context.Context, j Job, spawn func(Job) error, state WrappedState[State]) error

// JobWalker processes a set of jobs that spawn other jobs, all while modifying a final state
type JobWalker[Job, State any] func(ctx context.Context, initial []Job, initialState State, handler JobHandler[Job, State]) (State, error)
141 changes: 141 additions & 0 deletions pkg/internal/jobwalker/parallelwalk/parallelwalk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package parallelwalk

import (
"context"
"errors"
"sync"

"github.com/storacha-network/indexing-service/pkg/internal/jobwalker"
)

type threadSafeState[State any] struct {
state State
lk sync.RWMutex
}

func (ts *threadSafeState[State]) Access() State {
ts.lk.RLock()
defer ts.lk.RUnlock()
return ts.state
}

func (ts *threadSafeState[State]) Modify(modify func(State) State) {
ts.lk.Lock()
defer ts.lk.Unlock()
ts.modify(modify)
}

func (ts *threadSafeState[State]) modify(modify func(State) State) {
ts.state = modify(ts.state)
}

func (ts *threadSafeState[State]) CmpSwap(willModify func(State) bool, modify func(State) State) bool {
if !willModify(ts.Access()) {
return false
}
ts.lk.Lock()
defer ts.lk.Unlock()
if !willModify(ts.state) {
return false
}
ts.modify(modify)
return true
}

// NewParallelWalk generates a function to handle a series of jobs that may spawn more jobs
// It will execute jobs in parallel, with the specified concurrency until all initial jobs
// and all spawned jobs (recursively) are handled, or a job errors
// This code is adapted from https://github.com/ipfs/go-merkledag/blob/master/merkledag.go#L464C6-L584
func NewParallelWalk[Job, State any](concurrency int) jobwalker.JobWalker[Job, State] {
return func(ctx context.Context, initial []Job, initialState State, handler jobwalker.JobHandler[Job, State]) (State, error) {
if len(initial) == 0 {
return initialState, errors.New("must provide at least one initial job")
}
jobFeed := make(chan Job)
spawnedJobs := make(chan Job)
jobFinishes := make(chan struct{})

state := &threadSafeState[State]{
state: initialState,
}
var wg sync.WaitGroup

errChan := make(chan error)

jobFeedCtx, cancel := context.WithCancel(ctx)

defer wg.Wait()
defer cancel()
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobFeed {

err := handler(jobFeedCtx, job, func(next Job) error {
select {
case spawnedJobs <- next:
return nil
case <-jobFeedCtx.Done():
return jobFeedCtx.Err()
}
}, state)

if err != nil {
select {
case errChan <- err:
case <-jobFeedCtx.Done():
}
return
}

select {
case jobFinishes <- struct{}{}:
case <-jobFeedCtx.Done():
}
}
}()
}
defer close(jobFeed)

jobProcessor := jobFeed
nextJob, queuedJobs := initial[0], initial[1:]

var inProgress int

hasNextJob := func() bool {
return jobProcessor != nil
}

for {
select {
case jobProcessor <- nextJob:
inProgress++
if len(queuedJobs) > 0 {
nextJob = queuedJobs[0]
queuedJobs = queuedJobs[1:]
} else {
var empty Job
nextJob = empty
jobProcessor = nil
}
case <-jobFinishes:
inProgress--
if inProgress == 0 && !hasNextJob() {
return state.Access(), nil
}
case queued := <-spawnedJobs:
if !hasNextJob() {
nextJob = queued
jobProcessor = jobFeed
} else {
queuedJobs = append(queuedJobs, queued)
}
case err := <-errChan:
return state.Access(), err
case <-ctx.Done():
return state.Access(), ctx.Err()
}
}
}
}
52 changes: 52 additions & 0 deletions pkg/internal/jobwalker/singlewalk/singlewalk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package singlewalk

import (
"context"

"github.com/storacha-network/indexing-service/pkg/internal/jobwalker"
)

type singleState[State any] struct {
m State
}

// Access implements jobwalker.WrappedState.
func (s *singleState[State]) Access() State {
return s.m
}

// CmpSwap implements jobwalker.WrappedState.
func (s *singleState[State]) CmpSwap(willModify func(State) bool, modify func(State) State) bool {
if !willModify(s.m) {
return false
}
s.m = modify(s.m)
return true
}

// Modify implements jobwalker.WrappedState.
func (s *singleState[State]) Modify(modify func(State) State) {
s.m = modify(s.m)
}

var _ jobwalker.WrappedState[any] = &singleState[any]{}

// SingleWalker processes jobs that span more jobs, sequentially depth first in a single thread
func SingleWalker[Job, State any](ctx context.Context, initial []Job, initialState State, handler jobwalker.JobHandler[Job, State]) (State, error) {
stack := initial
state := &singleState[State]{initialState}
for len(stack) > 0 {
select {
case <-ctx.Done():
return state.Access(), ctx.Err()
default:
}
next := initial[len(initial)-1]
initial = initial[:len(initial)-1]
handler(ctx, next, func(j Job) error {
stack = append(stack, j)
return nil
}, state)
}
return state.Access(), nil
}
Loading

0 comments on commit ec89bbd

Please sign in to comment.