Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optional async decoupling for secondary writes and reworked E2E metric assertions #182

Merged
merged 20 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ced2a95
chore: Better abstract secondary storage
epociask Oct 12, 2024
ab6b939
chore: Better abstract secondary storage - add channel stream for sec…
epociask Oct 12, 2024
a598791
chore: Better abstract secondary storage - add channel stream for sec…
epociask Oct 12, 2024
3c3271d
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 12, 2024
bb9b433
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 13, 2024
4b9b0e2
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 13, 2024
13f221b
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 13, 2024
95790f2
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 13, 2024
8ef8108
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 16, 2024
2fce490
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 17, 2024
89f8272
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 17, 2024
167df0e
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 17, 2024
ab57599
chore: Better abstract secondary storage - address PR feedback, add b…
epociask Oct 20, 2024
2bb21b6
chore: Better abstract secondary storage - refactor tests
epociask Oct 20, 2024
0c3a48c
chore: Better abstract secondary storage - more test clean ups
epociask Oct 20, 2024
346e47c
Merge branch 'main' of github.com:Layr-Labs/op-plasma-eigenda into ep…
epociask Oct 20, 2024
1d858c7
Merge branch 'main' of github.com:Layr-Labs/op-plasma-eigenda into ep…
epociask Oct 21, 2024
69187fb
Merge branch 'main' of github.com:Layr-Labs/op-plasma-eigenda into ep…
epociask Oct 21, 2024
9959633
chore: Better abstract secondary storage - update go mock ref
epociask Oct 24, 2024
2d3559f
chore: Better abstract secondary storage - address PR feedback
epociask Oct 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ install-lint:
@echo "Installing golangci-lint..."
@sh -c $(GET_LINT_CMD)

gosec:
@echo "Running security scan with gosec..."
gosec ./...

submodules:
git submodule update --init --recursive

Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ The `raw commitment` is an RLP-encoded [EigenDA certificate](https://github.com/

### Unit

Unit tests can be ran via invoking `make test`.
Unit tests can be ran via invoking `make test`. Please make sure to have all test containers downloaded locally before running via:
```
docker pull redis
docker pull minio
```
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this really needed? I would think testcontainer would still pull the images when attempting to run them if they are not present locally?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


### Holesky

Expand Down
16 changes: 14 additions & 2 deletions server/load_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,22 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.
return nil, err
}

// determine read fallbacks
// create secondary storage router
fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore)
caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore)
secondary, err := store.NewSecondaryRouter(log, caches, fallbacks)
if err != nil {
return nil, err
samlaf marked this conversation as resolved.
Show resolved Hide resolved
}

if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled
log.Debug("Starting secondary stream processing routines")

for i := 0; i < 10; i++ {
go secondary.StreamProcess(ctx)
}
samlaf marked this conversation as resolved.
Show resolved Hide resolved
}

log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
return store.NewRouter(eigenDA, s3Store, log, caches, fallbacks)
return store.NewRouter(eigenDA, s3Store, log, secondary)
}
134 changes: 22 additions & 112 deletions store/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
"context"
"errors"
"fmt"
"sync"

"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)

Expand All @@ -25,27 +23,22 @@

// Router ... storage backend routing layer
type Router struct {
log log.Logger
eigenda GeneratedKeyStore
s3 PrecomputedKeyStore
log log.Logger
// primary storage backends
eigenda GeneratedKeyStore // ALT DA commitment type for OP mode && simple commitment mode for standard /client
s3 PrecomputedKeyStore // OP commitment mode && keccak256 commitment type
samlaf marked this conversation as resolved.
Show resolved Hide resolved

caches []PrecomputedKeyStore
cacheLock sync.RWMutex

fallbacks []PrecomputedKeyStore
fallbackLock sync.RWMutex
// secondary storage backends (caching and fallbacks)
secondary ISecondary
}

func NewRouter(eigenda GeneratedKeyStore, s3 PrecomputedKeyStore, l log.Logger,
caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) (IRouter, error) {
secondary ISecondary) (IRouter, error) {
return &Router{
log: l,
eigenda: eigenda,
s3: s3,
caches: caches,
cacheLock: sync.RWMutex{},
fallbacks: fallbacks,
fallbackLock: sync.RWMutex{},
log: l,
eigenda: eigenda,
s3: s3,
secondary: secondary,
}, nil
}

Expand Down Expand Up @@ -76,9 +69,9 @@
}

// 1 - read blob from cache if enabled
if r.cacheEnabled() {
if r.secondary.CachingEnabled() {
r.log.Debug("Retrieving data from cached backends")
data, err := r.multiSourceRead(ctx, key, false)
data, err := r.secondary.MultiSourceRead(ctx, key, false, r.eigenda.Verify)
if err == nil {
return data, nil
}
Expand All @@ -98,8 +91,8 @@
}

// 3 - read blob from fallbacks if enabled and data is non-retrievable from EigenDA
if r.fallbackEnabled() {
data, err = r.multiSourceRead(ctx, key, true)
if r.secondary.FallbackEnabled() {
data, err = r.secondary.MultiSourceRead(ctx, key, true, r.eigenda.Verify)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
r.log.Error("Failed to read from fallback targets", "err", err)
return nil, err
Expand Down Expand Up @@ -133,90 +126,15 @@
return nil, err
}

if r.cacheEnabled() || r.fallbackEnabled() {
err = r.handleRedundantWrites(ctx, commit, value)
if err != nil {
log.Error("Failed to write to redundant backends", "err", err)
}
}

return commit, nil
}

// handleRedundantWrites ... writes to both sets of backends (i.e, fallback, cache)
// and returns an error if NONE of them succeed
// NOTE: multi-target set writes are done at once to avoid re-invocation of the same write function at the same
// caller step for different target sets vs. reading which is done conditionally to segment between a cached read type
// vs a fallback read type
func (r *Router) handleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error {
r.cacheLock.RLock()
r.fallbackLock.RLock()

defer func() {
r.cacheLock.RUnlock()
r.fallbackLock.RUnlock()
}()
if r.secondary.Enabled() {

Check failure on line 129 in store/router.go

View workflow job for this annotation

GitHub Actions / Linter

unnecessary leading newline (whitespace)

sources := r.caches
sources = append(sources, r.fallbacks...)

key := crypto.Keccak256(commitment)
successes := 0

for _, src := range sources {
err := src.Put(ctx, key, value)
if err != nil {
r.log.Warn("Failed to write to redundant target", "backend", src.BackendType(), "err", err)
} else {
successes++
r.secondary.Ingress() <- PutNotif{
Commitment: commit,
Value: value,
}
}

if successes == 0 {
return errors.New("failed to write blob to any redundant targets")
}

return nil
}

// multiSourceRead ... reads from a set of backends and returns the first successfully read blob
func (r *Router) multiSourceRead(ctx context.Context, commitment []byte, fallback bool) ([]byte, error) {
var sources []PrecomputedKeyStore
if fallback {
r.fallbackLock.RLock()
defer r.fallbackLock.RUnlock()

sources = r.fallbacks
} else {
r.cacheLock.RLock()
defer r.cacheLock.RUnlock()

sources = r.caches
}

key := crypto.Keccak256(commitment)
for _, src := range sources {
data, err := src.Get(ctx, key)
if err != nil {
r.log.Warn("Failed to read from redundant target", "backend", src.BackendType(), "err", err)
continue
}

if data == nil {
r.log.Debug("No data found in redundant target", "backend", src.BackendType())
continue
}

// verify cert:data using EigenDA verification checks
err = r.eigenda.Verify(commitment, data)
if err != nil {
log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType())
continue
}

return data, nil
}
return nil, errors.New("no data found in any redundant backend")
return commit, nil
}

// putWithoutKey ... inserts a value into a storage backend that computes the key on-demand (i.e, EigenDA)
Expand All @@ -243,14 +161,6 @@
return key, r.s3.Put(ctx, key, value)
}

func (r *Router) fallbackEnabled() bool {
return len(r.fallbacks) > 0
}

func (r *Router) cacheEnabled() bool {
return len(r.caches) > 0
}

// GetEigenDAStore ...
func (r *Router) GetEigenDAStore() GeneratedKeyStore {
return r.eigenda
Expand All @@ -263,10 +173,10 @@

// Caches ...
func (r *Router) Caches() []PrecomputedKeyStore {
return r.caches
return r.secondary.Caches()
}

// Fallbacks ...
func (r *Router) Fallbacks() []PrecomputedKeyStore {
return r.fallbacks
return r.secondary.Fallbacks()
}
Loading
Loading