feat: Optional async decoupling for secondary writes and reworked E2E metric assertions #182

Merged
20 commits, merged Oct 26, 2024

Commits
ced2a95
chore: Better abstract secondary storage
epociask Oct 12, 2024
ab6b939
chore: Better abstract secondary storage - add channel stream for sec…
epociask Oct 12, 2024
a598791
chore: Better abstract secondary storage - add channel stream for sec…
epociask Oct 12, 2024
3c3271d
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 12, 2024
bb9b433
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 13, 2024
4b9b0e2
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 13, 2024
13f221b
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 13, 2024
95790f2
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 13, 2024
8ef8108
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 16, 2024
2fce490
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 17, 2024
89f8272
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 17, 2024
167df0e
chore: Better abstract secondary storage - observe secondary storage …
epociask Oct 17, 2024
ab57599
chore: Better abstract secondary storage - address PR feedback, add b…
epociask Oct 20, 2024
2bb21b6
chore: Better abstract secondary storage - refactor tests
epociask Oct 20, 2024
0c3a48c
chore: Better abstract secondary storage - more test clean ups
epociask Oct 20, 2024
346e47c
Merge branch 'main' of github.com:Layr-Labs/op-plasma-eigenda into ep…
epociask Oct 20, 2024
1d858c7
Merge branch 'main' of github.com:Layr-Labs/op-plasma-eigenda into ep…
epociask Oct 21, 2024
69187fb
Merge branch 'main' of github.com:Layr-Labs/op-plasma-eigenda into ep…
epociask Oct 21, 2024
9959633
chore: Better abstract secondary storage - update go mock ref
epociask Oct 24, 2024
2d3559f
chore: Better abstract secondary storage - address PR feedback
epociask Oct 26, 2024
4 changes: 0 additions & 4 deletions Makefile
@@ -85,10 +85,6 @@ install-lint:
@echo "Installing golangci-lint..."
@sh -c $(GET_LINT_CMD)

gosec:
@echo "Running security scan with gosec..."
gosec ./...

submodules:
git submodule update --init --recursive

10 changes: 9 additions & 1 deletion README.md
@@ -65,6 +65,7 @@ In order to disperse to the EigenDA network in production, or at high throughput
| `--s3.enable-tls` | | `$EIGENDA_PROXY_S3_ENABLE_TLS` | Enable TLS connection to S3 endpoint. |
| `--routing.fallback-targets` | `[]` | `$EIGENDA_PROXY_FALLBACK_TARGETS` | Fall back backend targets. Supports S3. | Backup storage locations to read from in the event of eigenda retrieval failure. |
| `--routing.cache-targets` | `[]` | `$EIGENDA_PROXY_CACHE_TARGETS` | Caching targets. Supports S3. | Caches data to backend targets after dispersing to DA, retrieved from before trying read from EigenDA. |
| `--routing.concurrent-write-threads` | `0` | `$EIGENDA_PROXY_CONCURRENT_WRITE_THREADS` | Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes. |
| `--s3.timeout` | `5s` | `$EIGENDA_PROXY_S3_TIMEOUT` | timeout for S3 storage operations (e.g. get, put) |
| `--redis.db` | `0` | `$EIGENDA_PROXY_REDIS_DB` | redis database to use after connecting to server |
| `--redis.endpoint` | `""` | `$EIGENDA_PROXY_REDIS_ENDPOINT` | redis endpoint url |
@@ -97,6 +98,9 @@ An optional `--eigenda-eth-confirmation-depth` flag can be provided to specify a

An ephemeral memory store backend can be used for faster feedback testing when testing rollup integrations. To target this feature, use the CLI flags `--memstore.enabled`, `--memstore.expiration`.

### Asynchronous Secondary Insertions
An optional `--routing.concurrent-write-routines` flag can be provided to enable asynchronous processing for secondary writes - allowing for more efficient dispersals in the presence of a hefty secondary routing layer. This flag specifies the number of write routines spun-up with supported thread counts in range `[1, 100)`.
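For illustration, the snippet below is a minimal, hypothetical Go sketch of the decoupled-write pattern this flag enables — a bounded pool of worker goroutines draining a channel of secondary insertions — not the proxy's actual implementation; the names and types are invented for the example.
```go
package main

import (
	"fmt"
	"sync"
)

// writeJob is a single secondary-storage insertion request.
type writeJob struct {
	key   []byte
	value []byte
}

// startWorkers launches n goroutines that drain jobs so that secondary
// writes no longer block the primary dispersal path.
func startWorkers(n int, jobs <-chan writeJob, wg *sync.WaitGroup) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				// a real worker would write to S3/Redis here and record a metric
				fmt.Printf("stored %d bytes under key %x\n", len(job.value), job.key)
			}
		}()
	}
}

func main() {
	jobs := make(chan writeJob, 100)
	var wg sync.WaitGroup

	startWorkers(4, jobs, &wg) // e.g. --routing.concurrent-write-routines=4

	jobs <- writeJob{key: []byte{0x01}, value: []byte("some blob")}
	close(jobs)
	wg.Wait()
}
```
With a routine count of zero or less, the same writes would simply run inline on the request path, matching the single-threaded default described in the flag table above.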

### Storage Fallback
An optional storage fallback CLI flag `--routing.fallback-targets` can be leveraged to ensure resiliency when **reading**. When enabled, a blob is persisted to a fallback target after being successfully dispersed. Fallback targets use the keccak256 hash of the existing EigenDA commitment as their key, for succinctness. In the event that blobs cannot be read from EigenDA, they will then be retrieved in linear order from the provided fallback targets.
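As a rough sketch of the keying scheme described above (assuming go-ethereum's `crypto` package; the real code path may differ):
```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// placeholder bytes standing in for an existing EigenDA commitment
	commitment := []byte{0x01, 0x02, 0x03}

	// fallback targets are keyed by the keccak256 hash of that commitment
	fallbackKey := crypto.Keccak256Hash(commitment)
	fmt.Println("fallback key:", fallbackKey.Hex())
}
```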

@@ -208,7 +212,11 @@ The `raw commitment` is an RLP-encoded [EigenDA certificate](https://github.com/

### Unit

Unit tests can be ran via invoking `make test`.
Unit tests can be ran via invoking `make test`. Please make sure to have all test containers downloaded locally before running via:
```
docker pull redis
docker pull minio
```
Collaborator

is this really needed? I would think testcontainer would still pull the images when attempting to run them if they are not present locally?

### Holesky

5 changes: 3 additions & 2 deletions cmd/server/entrypoint.go
@@ -29,14 +29,15 @@ func StartProxySvr(cliCtx *cli.Context) error {
return fmt.Errorf("failed to pretty print config: %w", err)
}

m := metrics.NewMetrics("default")

ctx, ctxCancel := context.WithCancel(cliCtx.Context)
defer ctxCancel()

daRouter, err := server.LoadStoreRouter(ctx, cfg, log)
daRouter, err := server.LoadStoreRouter(ctx, cfg, log, m)
if err != nil {
return fmt.Errorf("failed to create store: %w", err)
}
m := metrics.NewMetrics("default")
server := server.NewServer(cliCtx.String(flags.ListenAddrFlagName), cliCtx.Int(flags.PortFlagName), daRouter, log, m)

if err := server.Start(); err != nil {
2 changes: 1 addition & 1 deletion commitments/mode.go
@@ -8,7 +8,7 @@ import (
type CommitmentMeta struct {
Mode CommitmentMode
// CertVersion is shared for all modes and denotes version of the EigenDA certificate
CertVersion byte
CertVersion uint8
}

type CommitmentMode string
21 changes: 10 additions & 11 deletions e2e/optimism_test.go
@@ -1,8 +1,10 @@
package e2e_test

import (
"net/http"
"testing"

"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/e2e"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-e2e/actions"
@@ -166,11 +168,11 @@ func TestOptimismKeccak256Commitment(gt *testing.T) {
optimism.sequencer.ActL2PipelineFull(t)
optimism.ActL1Finalized(t)

// assert that EigenDA proxy's was written and read from
stat := proxyTS.Server.GetS3Stats()
// assert that EigenDA proxy was written and read from using op keccak256 commitment mode
readCount, err := proxyTS.Metrics.HTTPServerRequestsTotal.Find(http.MethodGet, "", string(commitments.OptimismKeccak), "0")
require.NoError(t, err)
require.True(t, readCount > 0)

require.Equal(t, 1, stat.Entries)
require.Equal(t, 1, stat.Reads)
}

func TestOptimismGenericCommitment(gt *testing.T) {
@@ -220,11 +222,8 @@ func TestOptimismGenericCommitment(gt *testing.T) {
optimism.sequencer.ActL2PipelineFull(t)
optimism.ActL1Finalized(t)

// assert that EigenDA proxy's was written and read from

if useMemory() {
stat := proxyTS.Server.GetEigenDAStats()
require.Equal(t, 1, stat.Entries)
require.Equal(t, 1, stat.Reads)
}
// assert that EigenDA proxy was written and read from using op generic commitment mode
readCount, err := proxyTS.Metrics.HTTPServerRequestsTotal.Find(http.MethodGet, "", string(commitments.OptimismGeneric), "0")
require.NoError(t, err)
require.True(t, readCount > 0)
}
24 changes: 12 additions & 12 deletions e2e/server_test.go
@@ -8,9 +8,9 @@ import (
"time"

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/store"

"github.com/Layr-Labs/eigenda-proxy/e2e"
"github.com/Layr-Labs/eigenda-proxy/store"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -350,9 +350,10 @@ func TestProxyServerCaching(t *testing.T) {
require.Equal(t, testPreimage, preimage)

// ensure that read was from cache
s3Stats := ts.Server.GetS3Stats()
require.Equal(t, 1, s3Stats.Reads)
require.Equal(t, 1, s3Stats.Entries)

count, err := ts.Metrics.SecondaryRequestsTotal.Find(store.S3BackendType.String(), http.MethodGet, "success")
require.NoError(t, err)
require.True(t, count > 0)

if useMemory() { // ensure that eigenda was not read from
memStats := ts.Server.GetEigenDAStats()
@@ -393,11 +394,9 @@ func TestProxyServerCachingWithRedis(t *testing.T) {
require.Equal(t, testPreimage, preimage)

// ensure that read was from cache
redStats, err := ts.Server.GetStoreStats(store.RedisBackendType)
readCount, err := ts.Metrics.SecondaryRequestsTotal.Find(store.RedisBackendType.String(), http.MethodGet, "success")
require.NoError(t, err)

require.Equal(t, 1, redStats.Reads)
require.Equal(t, 1, redStats.Entries)
require.True(t, readCount > 0)

if useMemory() { // ensure that eigenda was not read from
memStats := ts.Server.GetEigenDAStats()
@@ -420,6 +419,7 @@ func TestProxyServerReadFallback(t *testing.T) {

t.Parallel()

// setup server with S3 as a fallback option
testCfg := e2e.TestConfig(useMemory())
testCfg.UseS3Fallback = true
testCfg.Expiration = time.Millisecond * 1
@@ -447,11 +447,11 @@ func TestProxyServerReadFallback(t *testing.T) {
require.NoError(t, err)
require.Equal(t, testPreimage, preimage)

// ensure that read was from fallback target location (i.e, S3 for this test)
s3Stats := ts.Server.GetS3Stats()
require.Equal(t, 1, s3Stats.Reads)
require.Equal(t, 1, s3Stats.Entries)
count, err := ts.Metrics.SecondaryRequestsTotal.Find(store.S3BackendType.String(), http.MethodGet, "success")
require.NoError(t, err)
require.True(t, count > 0)

// TODO - remove this in favor of metrics sampling
if useMemory() { // ensure that an eigenda read was attempted with zero data available
memStats := ts.Server.GetEigenDAStats()
require.Equal(t, 1, memStats.Reads)
26 changes: 15 additions & 11 deletions e2e/setup.go
@@ -77,7 +77,6 @@ func createS3Config(eigendaCfg server.Config) server.CLIConfig {
createS3Bucket(bucketName)

eigendaCfg.S3Config = s3.Config{
Profiling: true,
Bucket: bucketName,
Path: "",
Endpoint: "localhost:4566",
@@ -175,9 +174,10 @@ func TestSuiteConfig(t *testing.T, testCfg *Cfg) server.CLIConfig {
}

type TestSuite struct {
Ctx context.Context
Log log.Logger
Server *server.Server
Ctx context.Context
Log log.Logger
Server *server.Server
Metrics *metrics.InMemoryMetricer
}

func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, func()) {
@@ -187,29 +187,33 @@ func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, fu
Color: true,
}).New("role", svcName)

m := metrics.NewInMemoryMetricer()
ctx := context.Background()
store, err := server.LoadStoreRouter(
ctx,
testSuiteCfg,
log,
m,
)

require.NoError(t, err)
server := server.NewServer(host, 0, store, log, metrics.NoopMetrics)
proxySvr := server.NewServer(host, 0, store, log, m)

t.Log("Starting proxy server...")
err = server.Start()
err = proxySvr.Start()
require.NoError(t, err)

kill := func() {
if err := server.Stop(); err != nil {
panic(err)
if err := proxySvr.Stop(); err != nil {
log.Error("failed to stop proxy server", "err", err)
}
}

return TestSuite{
Ctx: ctx,
Log: log,
Server: server,
Ctx: ctx,
Log: log,
Server: proxySvr,
Metrics: m,
}, kill
}

12 changes: 10 additions & 2 deletions flags/flags.go
@@ -28,8 +28,10 @@ const (
PortFlagName = "port"

// routing flags
// TODO: change "routing" --> "secondary"
FallbackTargetsFlagName = "routing.fallback-targets"
CacheTargetsFlagName = "routing.cache-targets"
ConcurrentWriteThreads = "routing.concurrent-write-routines"
)

const EnvVarPrefix = "EIGENDA_PROXY"
@@ -43,13 +45,13 @@ func CLIFlags() []cli.Flag {
flags := []cli.Flag{
&cli.StringFlag{
Name: ListenAddrFlagName,
Usage: "server listening address",
Usage: "Server listening address",
Value: "0.0.0.0",
EnvVars: prefixEnvVars("ADDR"),
},
&cli.IntFlag{
Name: PortFlagName,
Usage: "server listening port",
Usage: "Server listening port",
Value: 3100,
EnvVars: prefixEnvVars("PORT"),
},
@@ -65,6 +67,12 @@
Value: cli.NewStringSlice(),
EnvVars: prefixEnvVars("CACHE_TARGETS"),
},
&cli.IntFlag{
Name: ConcurrentWriteThreads,
Usage: "Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes.",
Value: 0,
EnvVars: prefixEnvVars("CONCURRENT_WRITE_THREADS"),
},
}

return flags
119 changes: 119 additions & 0 deletions metrics/memory.go
@@ -0,0 +1,119 @@
package metrics

import (
"fmt"
"sort"
"sync"

"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
)

func keyLabels(labels []string) (common.Hash, error) {
sort.Strings(labels) // in-place sort strings so keys are order agnostic

encodedBytes, err := rlp.EncodeToBytes(labels)
if err != nil {
return common.Hash{}, err
}

hash := crypto.Keccak256Hash(encodedBytes)

return hash, nil
}

type MetricCountMap struct {
m *sync.Map
}

func NewCountMap() *MetricCountMap {
return &MetricCountMap{
m: new(sync.Map),
}
}

func (mcm *MetricCountMap) insert(values ...string) error {
key, err := keyLabels(values)

if err != nil {
return err
}

// update or add count entry
value, exists := mcm.m.Load(key.Hex())
if !exists {
mcm.m.Store(key.Hex(), uint64(1))
return nil
}
uint64Val, ok := value.(uint64)
if !ok {
return fmt.Errorf("could not read uint64 from sync map")
}

mcm.m.Store(key.Hex(), uint64Val+uint64(1))
return nil
}

func (mcm *MetricCountMap) Find(values ...string) (uint64, error) {
key, err := keyLabels(values)

if err != nil {
return 0, err
}

val, exists := mcm.m.Load(key.Hex())
if !exists {
return 0, fmt.Errorf("value doesn't exist")
}
uint64Val, ok := val.(uint64)
if !ok {
return 0, fmt.Errorf("could not read uint64 from sync map")
}

return uint64Val, nil
}

type InMemoryMetricer struct {
HTTPServerRequestsTotal *MetricCountMap
// secondary metrics
SecondaryRequestsTotal *MetricCountMap
}

func NewInMemoryMetricer() *InMemoryMetricer {
return &InMemoryMetricer{
HTTPServerRequestsTotal: NewCountMap(),
SecondaryRequestsTotal: NewCountMap(),
}
}

var _ Metricer = NewInMemoryMetricer()

func (n *InMemoryMetricer) Document() []metrics.DocumentedMetric {
return nil
}

func (n *InMemoryMetricer) RecordInfo(_ string) {
}

func (n *InMemoryMetricer) RecordUp() {
}

func (n *InMemoryMetricer) RecordRPCServerRequest(endpoint string) func(status, mode, ver string) {
return func(x string, y string, z string) {
err := n.HTTPServerRequestsTotal.insert(endpoint, x, y, z)
if err != nil {
panic(err)
}
}
}

func (n *InMemoryMetricer) RecordSecondaryRequest(x string, y string) func(status string) {
return func(z string) {
err := n.SecondaryRequestsTotal.insert(x, y, z)
if err != nil {
panic(err)
}
}
}
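For reference, a small hypothetical test showing how this in-memory metricer might be exercised; the label strings are illustrative (the e2e tests above derive the backend label from `store.S3BackendType.String()`).
```go
package metrics_test

import (
	"net/http"
	"testing"

	"github.com/Layr-Labs/eigenda-proxy/metrics"
)

func TestSecondaryRequestCount(t *testing.T) {
	m := metrics.NewInMemoryMetricer()

	// record one successful secondary GET against an "S3"-labelled backend
	done := m.RecordSecondaryRequest("S3", http.MethodGet)
	done("success")

	// keyLabels sorts labels before hashing, so lookup order does not matter
	count, err := m.SecondaryRequestsTotal.Find("success", http.MethodGet, "S3")
	if err != nil || count != 1 {
		t.Fatalf("expected count of 1, got %d (err: %v)", count, err)
	}
}
```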