chore: Update compose with secondary store setup && minor refactors #270

Merged
76 changes: 64 additions & 12 deletions docker-compose.yaml
Collaborator

What's the goal for this docker-compose file? Is it just for manual testing?

It's not mentioned anywhere in the README, it's not in CI, and I don't think we use it for anything explicit right now. Might be good to document it and add it to CI if we want to keep it around?

Collaborator Author

Some ideas around this: we could add feature testing to ensure lightweight dispersal/retrieval in a more realistic environment than the one we codify for E2E testing. The idea is to have one script that does an E2E set/get assertion, parameterized by feature flags denoting a specific runtime behavior of the proxy, i.e.:

  • fallback_dest: [S3, redis, nil] w/ 3 variations
  • cache_dest: [S3, redis, nil] w/ 3 variations
  • cert_verification: [true, false] w/ 2 variations

3 * 3 * 2 = 18 permutations; but fewer, since `fallback_dest` and `cache_dest` may only share a value when both are `nil`, which rules out (2 * 2) combinations:

18 - 4 = 14 test case permutations

I think this would be good to have since it also provides a more infra-level overview of what wiring the proxy up with its dependencies looks like. This would definitely help devx when people have to spin up additional infra and wire it to the proxy.

@@ -1,22 +1,82 @@

```yaml
## The following is a proxy instance
## pointed to redis for storage caching and S3
## for storage failovers

services:
  ## Used as secondary read failover target
  minio:
    image: minio/minio:latest
    container_name: minio
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    ports:
      - "9000:9000"
      - "9001:9001"
    command: server /data
    volumes:
      - minio_data:/data

  minio-init:
    ## Seed test bucket
    image: minio/mc:latest
    depends_on:
      - minio
    entrypoint: ["/bin/sh", "-c", "/usr/bin/create-bucket.sh"]
    volumes:
      - ./scripts/create-test-s3-bucket.sh:/usr/bin/create-bucket.sh

  redis:
    image: redis:latest
    container_name: redis
    command: redis-server --requirepass redispassword
    environment:
      - REDIS_PASSWORD=redispassword
    ports:
      - "6379:6379"

  eigenda_proxy:
    depends_on:
      - minio-init
    build:
      context: .
      dockerfile: Dockerfile
    container_name: eigenda-proxy
    environment:
      - EIGENDA_PROXY_LOG_LEVEL=debug
      - EIGENDA_PROXY_ADDR=0.0.0.0
      - EIGENDA_PROXY_PORT=4242
      ## Turn this off to talk to actual eigenda network (was: false)
      - EIGENDA_PROXY_MEMSTORE_ENABLED=true
      - EIGENDA_PROXY_MEMSTORE_EXPIRATION=45m
      - EIGENDA_PROXY_EIGENDA_CERT_VERIFICATION_DISABLED=true
      - EIGENDA_PROXY_EIGENDA_SIGNER_PRIVATE_KEY_HEX=${PRIVATE_KEY}
      - EIGENDA_PROXY_EIGENDA_DISPERSER_RPC=disperser-holesky.eigenda.xyz:443
      - EIGENDA_PROXY_EIGENDA_SERVICE_MANAGER_ADDR=0xD4A7E1Bd8015057293f0D0A557088c286942e84b
      - EIGENDA_PROXY_EIGENDA_ETH_RPC=https://ethereum-holesky-rpc.publicnode.com
      - EIGENDA_PROXY_EIGENDA_ETH_CONFIRMATION_DEPTH=0
      - EIGENDA_PROXY_METRICS_ADDR=0.0.0.0
      - EIGENDA_PROXY_METRICS_ENABLED=true
      - EIGENDA_PROXY_METRICS_PORT=7300
      ## S3
      - EIGENDA_PROXY_S3_CREDENTIAL_TYPE=static
      - EIGENDA_PROXY_S3_ACCESS_KEY_ID=minioadmin
      - EIGENDA_PROXY_S3_ACCESS_KEY_SECRET=minioadmin
      - EIGENDA_PROXY_S3_BUCKET=eigenda-proxy-test
      - EIGENDA_PROXY_S3_PATH=""
      - EIGENDA_PROXY_S3_ENDPOINT=minio:9000
      - EIGENDA_PROXY_S3_ENABLE_TLS=false

      # Redis Configuration
      - EIGENDA_PROXY_REDIS_DB=0
      - EIGENDA_PROXY_REDIS_ENDPOINT=redis:6379
      - EIGENDA_PROXY_REDIS_PASSWORD=redispassword
      - EIGENDA_PROXY_REDIS_EVICTION=24h0m0s

      ## Secondary routing
      - EIGENDA_PROXY_STORAGE_FALLBACK_TARGETS=s3
      - EIGENDA_PROXY_STORAGE_CACHE_TARGETS=redis

    ports:
      - 4242:4242
      - 7300:7300
```
@@ -44,14 +104,6 @@ services:

```diff
     depends_on:
       - prometheus
 
-  traffic-generator:
-    image: alpine:latest
-    build: scripts/
-    container_name: traffic_generator
-    depends_on:
-      - eigenda_proxy
-    volumes:
-      - ./scripts/:/scripts/
-
 volumes:
   grafana-data:
   minio_data:
```
15 changes: 15 additions & 0 deletions scripts/create-test-s3-bucket.sh
Collaborator

We don't need to do this in this PR, but could we convert our Makefile into a Justfile?
Justfile recipes allow embedding shell scripts directly, so we wouldn't need to maintain a whole bunch of separate scripts.
I personally find it a much cleaner setup.

Collaborator Author

Yeah, I'm down to use it - let's file an issue.

@@ -0,0 +1,15 @@
```shell
#!/bin/sh

# Wait 2 seconds to ensure minio is finished bootstrapping
# TODO: Update this to do event-based polling on the minio server directly vs a semi-arbitrary timeout
sleep 2s

# Configure MinIO client (mc)
echo "Configuring MinIO client..."
mc alias set local http://minio:9000 minioadmin minioadmin
```
Collaborator

What is mc? I don't have it on my system. Is this an external dependency we're adding? We probably want to document it somewhere. Eventually we'll probably want to have a mise file like op (or go all out and use nix).

Collaborator Author

Oh, probably worth noting that this script is only runnable inside the minio-init service, since it references the Docker network from within.


```shell
# Ensure the bucket exists
echo "Creating bucket: eigenda-proxy-test..."
mc mb local/eigenda-proxy-test || echo "Bucket already exists."

echo "Bucket setup complete."
```
8 changes: 6 additions & 2 deletions server/load_store.go
```diff
@@ -121,7 +121,7 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr
 		return nil, err
 	}
 
-	// create secondary storage router
+	// create secondary storage manager
 	fallbacks := populateTargets(cfg.EigenDAConfig.StorageConfig.FallbackTargets, s3Store, redisStore)
 	caches := populateTargets(cfg.EigenDAConfig.StorageConfig.CacheTargets, s3Store, redisStore)
 	secondary := store.NewSecondaryManager(log, m, caches, fallbacks)
@@ -135,6 +135,10 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr
 		}
 	}
 
-	log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
+	log.Info("Created storage backends",
+		"eigenda", eigenDA != nil,
+		"s3", s3Store != nil,
+		"redis", redisStore != nil,
+	)
 	return store.NewManager(eigenDA, s3Store, log, secondary)
 }
```
3 changes: 1 addition & 2 deletions store/generated_key/memstore/memstore.go
```diff
@@ -83,7 +83,6 @@ func (e *MemStore) pruningLoop(ctx context.Context) {
 			return
 
 		case <-timer.C:
-			e.l.Debug("pruning expired blobs")
 			e.pruneExpired()
 		}
 	}
@@ -99,7 +98,7 @@ func (e *MemStore) pruneExpired() {
 			delete(e.keyStarts, commit)
 			delete(e.store, commit)
 
-			e.l.Info("blob pruned", "commit", commit)
+			e.l.Debug("blob pruned", "commit", commit)
 		}
 	}
 }
```
2 changes: 2 additions & 0 deletions store/manager.go
```diff
@@ -129,11 +129,13 @@ func (m *Manager) Put(ctx context.Context, cm commitments.CommitmentMode, key, v
 
 	// 2 - Put blob into secondary storage backends
 	if m.secondary.Enabled() && m.secondary.AsyncWriteEntry() { // publish put notification to secondary's subscription on PutNotify topic
+		m.log.Debug("Publishing data to async secondary stores")
 		m.secondary.Topic() <- PutNotify{
 			Commitment: commit,
 			Value:      value,
 		}
 	} else if m.secondary.Enabled() && !m.secondary.AsyncWriteEntry() { // secondary is available only for synchronous writes
+		m.log.Debug("Publishing data to single threaded secondary stores")
 		err := m.secondary.HandleRedundantWrites(ctx, commit, value)
 		if err != nil {
 			m.log.Error("Secondary insertions failed", "error", err.Error())
```
9 changes: 5 additions & 4 deletions store/secondary.go
```diff
@@ -33,7 +33,7 @@ type ISecondary interface {
 	WriteSubscriptionLoop(ctx context.Context)
 }
 
-// PutNotify ... notification received by primary router to perform insertion across
+// PutNotify ... notification received by primary manager to perform insertion across
 // secondary storage backends
 type PutNotify struct {
 	Commitment []byte
@@ -53,7 +53,7 @@ type SecondaryManager struct {
 	concurrentWrites bool
}
 
-// NewSecondaryManager ... creates a new secondary storage router
+// NewSecondaryManager ... creates a new secondary storage manager
 func NewSecondaryManager(log log.Logger, m metrics.Metricer, caches []common.PrecomputedKeyStore, fallbacks []common.PrecomputedKeyStore) ISecondary {
 	return &SecondaryManager{
 		topic: make(chan PutNotify), // channel is un-buffered which dispersing consumption across routines helps alleviate
@@ -92,6 +92,7 @@ func (sm *SecondaryManager) HandleRedundantWrites(ctx context.Context, commitmen
 	successes := 0
 
 	for _, src := range sources {
+		sm.log.Debug("Attempting to write to secondary storage", "backend", src.BackendType())
 		cb := sm.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodPut)
 
 		// for added safety - we retry the insertion 5x using a default exponential backoff
@@ -115,12 +116,12 @@ func (sm *SecondaryManager) HandleRedundantWrites(ctx context.Context, commitmen
 	return nil
 }
 
-// AsyncWriteEntry ... subscribes to put notifications posted to shared topic with primary router
+// AsyncWriteEntry ... subscribes to put notifications posted to shared topic with primary manager
 func (sm *SecondaryManager) AsyncWriteEntry() bool {
 	return sm.concurrentWrites
 }
 
-// WriteSubscriptionLoop ... subscribes to put notifications posted to shared topic with primary router
+// WriteSubscriptionLoop ... subscribes to put notifications posted to shared topic with primary manager
 func (sm *SecondaryManager) WriteSubscriptionLoop(ctx context.Context) {
 	sm.concurrentWrites = true
```