Skip to content

Commit

Permalink
v0.12.0 (#1504)
Browse files Browse the repository at this point in the history
  • Loading branch information
JuArce authored Nov 27, 2024
2 parents 1125be8 + d3fb100 commit 562a6b2
Show file tree
Hide file tree
Showing 127 changed files with 1,711 additions and 6,509 deletions.
36 changes: 0 additions & 36 deletions .github/workflows/test-go-retries.yml

This file was deleted.

78 changes: 28 additions & 50 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ OS := $(shell uname -s)
CONFIG_FILE?=config-files/config.yaml
AGG_CONFIG_FILE?=config-files/config-aggregator.yaml

OPERATOR_VERSION=v0.11.2
OPERATOR_VERSION=v0.12.0

ifeq ($(OS),Linux)
BUILD_ALL_FFI = $(MAKE) build_all_ffi_linux
Expand Down Expand Up @@ -117,6 +117,7 @@ unpause_batcher_payment_service:
get_paused_state_batcher_payments_service:
@echo "Getting paused state of Batcher Payments Service contract..."
. contracts/scripts/get_paused_state_batcher_payments_service.sh

anvil_upgrade_initialize_disable_verifiers:
@echo "Initializing disabled verifiers..."
. contracts/scripts/anvil/upgrade_disabled_verifiers_in_service_manager.sh
Expand Down Expand Up @@ -229,19 +230,21 @@ operator_mint_mock_tokens:

operator_whitelist_devnet:
@echo "Whitelisting operator"
$(eval OPERATOR_ADDRESS = $(shell yq -r '.operator.address' $(CONFIG_FILE)))
@echo "Operator address: $(OPERATOR_ADDRESS)"
RPC_URL="http://localhost:8545" PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" OUTPUT_PATH=./script/output/devnet/alignedlayer_deployment_output.json ./contracts/scripts/whitelist_operator.sh $(OPERATOR_ADDRESS)
RPC_URL="http://localhost:8545" PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" OUTPUT_PATH=./script/output/devnet/alignedlayer_deployment_output.json ./contracts/scripts/operator_whitelist.sh $(OPERATOR_ADDRESS)

operator_remove_devnet:
operator_remove_from_whitelist_devnet:
@echo "Removing operator"
$(eval OPERATOR_ADDRESS = $(shell yq -r '.operator.address' $(CONFIG_FILE)))
@echo "Operator address: $(OPERATOR_ADDRESS)"
RPC_URL="http://localhost:8545" PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" OUTPUT_PATH=./script/output/devnet/alignedlayer_deployment_output.json ./contracts/scripts/remove_operator.sh $(OPERATOR_ADDRESS)
RPC_URL="http://localhost:8545" PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" OUTPUT_PATH=./script/output/devnet/alignedlayer_deployment_output.json ./contracts/scripts/operator_remove_from_whitelist.sh $(OPERATOR_ADDRESS)

operator_whitelist:
@echo "Whitelisting operator $(OPERATOR_ADDRESS)"
@. contracts/scripts/.env && . contracts/scripts/whitelist_operator.sh $(OPERATOR_ADDRESS)
@. contracts/scripts/.env && . contracts/scripts/operator_whitelist.sh $(OPERATOR_ADDRESS)

operator_remove_from_whitelist:
@echo "Removing operator $(OPERATOR_ADDRESS)"
@. contracts/scripts/.env && . contracts/scripts/operator_remove_from_whitelist.sh $(OPERATOR_ADDRESS)

operator_deposit_into_mock_strategy:
@echo "Depositing into mock strategy"
Expand Down Expand Up @@ -563,18 +566,18 @@ run_storage: ## Run storage using storage-docker-compose.yaml
@echo "Running storage..."
@docker compose -f storage-docker-compose.yaml up

__DEPLOYMENT__:
deploy_aligned_contracts: ## Deploy Aligned Contracts
@echo "Deploying Aligned Contracts..."
@. contracts/scripts/.env && . contracts/scripts/deploy_aligned_contracts.sh
__DEPLOYMENT__: ## ____
deploy_aligned_contracts: ## Deploy Aligned Contracts. Parameters: NETWORK=<mainnet|holesky|sepolia>
@echo "Deploying Aligned Contracts on $(NETWORK) network..."
@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/deploy_aligned_contracts.sh

deploy_pauser_registry: ## Deploy Pauser Registry
@echo "Deploying Pauser Registry..."
@. contracts/scripts/.env && . contracts/scripts/deploy_pauser_registry.sh

upgrade_aligned_contracts: ## Upgrade Aligned Contracts
@echo "Upgrading Aligned Contracts..."
@. contracts/scripts/.env && . contracts/scripts/upgrade_aligned_contracts.sh
upgrade_aligned_contracts: ## Upgrade Aligned Contracts. Parameters: NETWORK=<mainnet|holesky|sepolia>
@echo "Upgrading Aligned Contracts on $(NETWORK) network..."
@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/upgrade_aligned_contracts.sh

upgrade_pauser_aligned_contracts: ## Upgrade Aligned Contracts with Pauser initialization
@echo "Upgrading Aligned Contracts with Pauser initialization..."
Expand Down Expand Up @@ -608,13 +611,13 @@ deploy_verify_batch_inclusion_caller:
@echo "Deploying VerifyBatchInclusionCaller contract..."
@. examples/verify/.env && . examples/verify/scripts/deploy_verify_batch_inclusion_caller.sh

deploy_batcher_payment_service:
@echo "Deploying BatcherPayments contract..."
@. contracts/scripts/.env && . contracts/scripts/deploy_batcher_payment_service.sh
deploy_batcher_payment_service: ## Deploy BatcherPayments contract. Parameters: NETWORK=<mainnet|holesky|sepolia>
@echo "Deploying BatcherPayments contract on $(NETWORK) network..."
@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/deploy_batcher_payment_service.sh

upgrade_batcher_payment_service:
@echo "Upgrading BatcherPayments contract..."
@. contracts/scripts/.env && . contracts/scripts/upgrade_batcher_payment_service.sh
upgrade_batcher_payment_service: ## Upgrade BatcherPayments contract. Parameters: NETWORK=<mainnet|holesky|sepolia>
@echo "Upgrading BatcherPayments Contract on $(NETWORK) network..."
@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/upgrade_batcher_payment_service.sh

build_aligned_contracts:
@cd contracts/src/core && forge build
Expand Down Expand Up @@ -844,35 +847,6 @@ explorer_create_env:
@cd explorer && \
cp .env.dev .env

__TRACKER__:

tracker_devnet_start: tracker_run_db
@cd operator_tracker/ && \
cargo run -r -- --env-file .env.dev

tracker_install: tracker_build_db
cargo install --path ./operator_tracker

tracker_build_db:
@cd operator_tracker && \
docker build -t tracker-postgres-image .

tracker_run_db: tracker_build_db tracker_remove_db_container
@cd operator_tracker && \
docker run -d --name tracker-postgres-container -p 5433:5432 -v tracker-postgres-data:/var/lib/postgresql/data tracker-postgres-image

tracker_remove_db_container:
docker stop tracker-postgres-container || true && \
docker rm tracker-postgres-container || true

tracker_clean_db: tracker_remove_db_container
docker volume rm tracker-postgres-data || true

tracker_dump_db:
@cd operator_tracker && \
docker exec -t tracker-postgres-container pg_dumpall -c -U tracker_user > dump.$$(date +\%Y\%m\%d_\%H\%M\%S).sql
@echo "Dumped database successfully to /operator_tracker"

DOCKER_RPC_URL=http://anvil:8545
PROOF_GENERATOR_ADDRESS=0x66f9664f97F2b50F62D13eA064982f936dE76657

Expand Down Expand Up @@ -1093,7 +1067,7 @@ docker_logs_batcher:

__TELEMETRY__:
# Collector, Jaeger and Elixir API
telemetry_full_start: open_telemetry_start telemetry_start
telemetry_full_start: telemetry_compile_bls_verifier open_telemetry_start telemetry_start

# Collector and Jaeger
open_telemetry_start: ## Run open telemetry services using telemetry-docker-compose.yaml
Expand Down Expand Up @@ -1137,6 +1111,10 @@ telemetry_create_env:
@cd telemetry_api && \
cp .env.dev .env

telemetry_compile_bls_verifier:
@cd telemetry_api/priv && \
go build ../bls_verifier/bls_verify.go

setup_local_aligned_all:
tmux kill-session -t aligned_layer || true
tmux new-session -d -s aligned_layer
Expand Down
35 changes: 4 additions & 31 deletions aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ import (
"encoding/hex"
"fmt"
"math/big"
"strings"
"sync"
"time"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/prometheus/client_golang/prometheus"
retry "github.com/yetanotherco/aligned_layer/core"
"github.com/yetanotherco/aligned_layer/metrics"

sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
Expand Down Expand Up @@ -107,7 +105,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
// Telemetry
aggregatorTelemetry := NewTelemetry(aggregatorConfig.Aggregator.TelemetryIpPortAddress, logger)

avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig)
avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -136,9 +134,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
PromMetricsIpPortAddress: ":9090",
}

aggregatorPrivateKey := aggregatorConfig.EcdsaConfig.PrivateKey

clients, err := sdkclients.BuildAll(chainioConfig, aggregatorPrivateKey, logger)
clients, err := sdkclients.BuildReadClients(chainioConfig, logger)
if err != nil {
logger.Errorf("Cannot create sdk clients", "err", err)
return nil, err
Expand Down Expand Up @@ -331,6 +327,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
nonSignerStakesAndSignature,
agg.AggregatorConfig.Aggregator.GasBaseBumpPercentage,
agg.AggregatorConfig.Aggregator.GasBumpIncrementalPercentage,
agg.AggregatorConfig.Aggregator.GasBumpPercentageLimit,
agg.AggregatorConfig.Aggregator.TimeToWaitBeforeBump,
onGasPriceBumped,
)
Expand Down Expand Up @@ -396,7 +393,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

err := agg.InitializeNewTaskRetryable(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, agg.AggregatorConfig.Aggregator.BlsServiceTaskTimeout)
err := agg.blsAggregationService.InitializeNewTaskWithWindow(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, agg.AggregatorConfig.Aggregator.BlsServiceTaskTimeout, 15*time.Second)
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
}
Expand All @@ -409,30 +406,6 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by

// |---RETRYABLE---|

/*
InitializeNewTaskRetryable
Initialize a new task in the BLS Aggregation service
- Errors:
Permanent:
- TaskAlreadyInitializedError (Permanent): Task is already initialized in the BLS Aggregation service (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L27).
Transient:
- All others.
- Retry times (3 retries): 1 sec, 2 sec, 4 sec
*/
func (agg *Aggregator) InitializeNewTaskRetryable(batchIndex uint32, taskCreatedBlock uint32, quorumNums eigentypes.QuorumNums, quorumThresholdPercentages eigentypes.QuorumThresholdPercentages, timeToExpiry time.Duration) error {
initializeNewTask_func := func() error {
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, timeToExpiry)
if err != nil {
// Task is already initialized
if strings.Contains(err.Error(), "already initialized") {
err = retry.PermanentError{Inner: err}
}
}
return err
}
return retry.Retry(initializeNewTask_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
}

// Long-lived goroutine that periodically checks and removes old Tasks from stored Maps
// It runs every GarbageCollectorPeriod and removes all tasks older than GarbageCollectorTasksAge
// This was added because each task occupies memory in the maps, and we need to free it to avoid a memory leak
Expand Down
16 changes: 13 additions & 3 deletions aggregator/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
taskIndex := uint32(0)

taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
// The Aggregator may receive the Task Identifier after the operators.
// If that's the case, we won't know about the task at this point
// so we make GetTaskIndex retryable, waiting for some seconds,
// before trying to fetch the task again from the map.
taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash, retry.NetworkRetryParams())

if err != nil {
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
Expand Down Expand Up @@ -106,7 +110,13 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {
return nil
}

func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) {
/*
Checks Internal mapping for Signed Task Response, returns its TaskIndex.
- All errors are considered Transient Errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec
TODO: We should refactor the retry duration considering extending it to a larger time or number of retries, at least somewhere between 1 and 2 blocks
*/
func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte, config *retry.RetryParams) (uint32, error) {
getTaskIndex_func := func() (uint32, error) {
agg.taskMutex.Lock()
taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
Expand All @@ -118,5 +128,5 @@ func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error
}
}

return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
return retry.RetryWithData(getTaskIndex_func, config)
}
20 changes: 10 additions & 10 deletions aggregator/pkg/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (t *Telemetry) InitNewTrace(batchMerkleRoot [32]byte) {
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
}
if err := t.sendTelemetryMessage("/api/initTaskTrace", body); err != nil {
t.logger.Error("[Telemetry] Error in InitNewTrace", "error", err)
t.logger.Warn("[Telemetry] Error in InitNewTrace", "error", err)
}
}

Expand All @@ -78,7 +78,7 @@ func (t *Telemetry) LogOperatorResponse(batchMerkleRoot [32]byte, operatorId [32
OperatorId: fmt.Sprintf("0x%s", hex.EncodeToString(operatorId[:])),
}
if err := t.sendTelemetryMessage("/api/operatorResponse", body); err != nil {
t.logger.Error("[Telemetry] Error in LogOperatorResponse", "error", err)
t.logger.Warn("[Telemetry] Error in LogOperatorResponse", "error", err)
}
}

Expand All @@ -87,7 +87,7 @@ func (t *Telemetry) LogQuorumReached(batchMerkleRoot [32]byte) {
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
}
if err := t.sendTelemetryMessage("/api/quorumReached", body); err != nil {
t.logger.Error("[Telemetry] Error in LogQuorumReached", "error", err)
t.logger.Warn("[Telemetry] Error in LogQuorumReached", "error", err)
}
}

Expand All @@ -97,7 +97,7 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
TaskError: taskError.Error(),
}
if err := t.sendTelemetryMessage("/api/taskError", body); err != nil {
t.logger.Error("[Telemetry] Error in LogTaskError", "error", err)
t.logger.Warn("[Telemetry] Error in LogTaskError", "error", err)
}
}

Expand All @@ -107,7 +107,7 @@ func (t *Telemetry) BumpedTaskGasPrice(batchMerkleRoot [32]byte, bumpedGasPrice
BumpedGasPrice: bumpedGasPrice,
}
if err := t.sendTelemetryMessage("/api/aggregatorTaskGasPriceBump", body); err != nil {
t.logger.Error("[Telemetry] Error in LogOperatorResponse", "error", err)
t.logger.Warn("[Telemetry] Error in LogOperatorResponse", "error", err)
}
}

Expand All @@ -117,7 +117,7 @@ func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string)
TxHash: txHash,
}
if err := t.sendTelemetryMessage("/api/aggregatorTaskSent", body); err != nil {
t.logger.Error("[Telemetry] Error in TaskSentToEthereum", "error", err)
t.logger.Warn("[Telemetry] Error in TaskSentToEthereum", "error", err)
}
}

Expand All @@ -129,15 +129,15 @@ func (t *Telemetry) FinishTrace(batchMerkleRoot [32]byte) {
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
}
if err := t.sendTelemetryMessage("/api/finishTaskTrace", body); err != nil {
t.logger.Error("[Telemetry] Error in FinishTrace", "error", err)
t.logger.Warn("[Telemetry] Error in FinishTrace", "error", err)
}
}()
}

func (t *Telemetry) sendTelemetryMessage(endpoint string, message interface{}) error {
encodedBody, err := json.Marshal(message)
if err != nil {
t.logger.Error("[Telemetry] Error marshalling JSON", "error", err)
t.logger.Warn("[Telemetry] Error marshalling JSON", "error", err)
return fmt.Errorf("error marshalling JSON: %w", err)
}

Expand All @@ -147,14 +147,14 @@ func (t *Telemetry) sendTelemetryMessage(endpoint string, message interface{}) e

resp, err := t.client.Post(fullURL.String(), "application/json", bytes.NewBuffer(encodedBody))
if err != nil {
t.logger.Error("[Telemetry] Error sending POST request", "error", err)
t.logger.Warn("[Telemetry] Error sending POST request", "error", err)
return fmt.Errorf("error making POST request: %w", err)
}
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
t.logger.Error("[Telemetry] Error reading response body", "error", err)
t.logger.Warn("[Telemetry] Error reading response body", "error", err)
return fmt.Errorf("error reading response body: %w", err)
}

Expand Down
Loading

0 comments on commit 562a6b2

Please sign in to comment.