From 33b6bea4582eeb8e0f5ff73ad4f06f39c6c90554 Mon Sep 17 00:00:00 2001 From: supernovahs Date: Fri, 10 May 2024 19:00:54 +0530 Subject: [PATCH] clear all nits --- Cargo.lock | 16 ++++ Cargo.toml | 10 +-- .../chainio/clients/avsregistry/src/reader.rs | 75 ++++++---------- .../clients/avsregistry/src/subscriber.rs | 2 +- crates/metrics/collectors/economic/Cargo.toml | 1 + crates/metrics/collectors/economic/src/lib.rs | 2 +- crates/services/avsregistry/Cargo.toml | 4 +- .../services/avsregistry/src/chaincaller.rs | 27 +++--- crates/services/bls_aggregation/Cargo.toml | 3 +- .../services/bls_aggregation/src/bls_agg.rs | 46 +++++----- crates/services/operatorsinfo/Cargo.toml | 9 +- .../src/operatorsinfo_inmemory.rs | 86 +++++++++++-------- crates/types/src/avs.rs | 5 +- 13 files changed, 158 insertions(+), 128 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a50c09d1..0cdfdd76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -210,11 +210,13 @@ dependencies = [ "alloy-json-rpc", "alloy-network", "alloy-primitives", + "alloy-pubsub", "alloy-rpc-client", "alloy-rpc-types", "alloy-rpc-types-trace", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "async-stream", "async-trait", "auto_impl", @@ -275,8 +277,11 @@ version = "0.1.0" source = "git+https://github.com/alloy-rs/alloy?rev=af788af#af788afe934d4c54ec1fcb6bb4b16ce385f913ab" dependencies = [ "alloy-json-rpc", + "alloy-primitives", + "alloy-pubsub", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "futures", "pin-project", "reqwest 0.12.4", @@ -1420,6 +1425,7 @@ dependencies = [ name = "eigensdk-metrics-collectors-economic" version = "0.0.1-alpha" dependencies = [ + "alloy-primitives", "eigensdk-client-avsregistry", "eigensdk-client-elcontracts", "ethers", @@ -1436,6 +1442,8 @@ dependencies = [ name = "eigensdk-services-avsregistry" version = "0.0.1-alpha" dependencies = [ + "alloy-primitives", + "eigensdk-chainio-utils", "eigensdk-client-avsregistry", "eigensdk-contract-bindings", "eigensdk-crypto-bls", @@ -1451,6 +1459,7 @@ dependencies = [ name = "eigensdk-services-blsaggregation" version = "0.0.1-alpha" dependencies = [ + "alloy-primitives", "eigensdk-crypto-bls", "eigensdk-crypto-bn254", "eigensdk-services-avsregistry", @@ -1466,6 +1475,11 @@ dependencies = [ name = "eigensdk-services-operatorsinfo" version = "0.0.1-alpha" dependencies = [ + "alloy-contract", + "alloy-primitives", + "alloy-provider", + "alloy-sol-types", + "alloy-transport-ws", "eigensdk-client-avsregistry", "eigensdk-contract-bindings", "eigensdk-crypto-bls", @@ -1473,6 +1487,8 @@ dependencies = [ "eigensdk-types", "eth-keystore", "ethers", + "eyre", + "futures-util", "thiserror", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 2556bd0e..a448e105 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ rustdoc.all = "warn" [workspace.dependencies] ethers = "2.0.14" ark-ff = "0.4.0" +eyre = "0.6.12" reth = {git = "https://github.com/paradigmxyz/reth"} prometheus-client = "0.22.2" bn254 = {git = "https://github.com/sedaprotocol/bn254"} @@ -62,12 +63,10 @@ eigensdk-services-avsregistry = {path = "crates/services/avsregistry"} eigensdk-services-bls_aggregation = {path = "crates/services/bls_aggregation"} eigensdk-services-operatorsinfo = {path = "crates/services/operatorsinfo"} tokio = {version = "1.37.0" , features = ["test-util", "full","sync"] } - +futures-util = "0.3.30" thiserror = "1.0" tracing = "0.1.40" - - #misc parking_lot = "0.12" @@ -85,7 +84,7 @@ alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "af7 alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "af788af" } alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "af788af" } alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "af788af", default-features = false, features = [ - "reqwest", + "reqwest","ws" ] } alloy-transport-http = {git = "https://github.com/alloy-rs/alloy", rev = "af788af"} alloy-eips = { git = "https://github.com/alloy-rs/alloy", default-features = false, rev = "af788af" } @@ -95,4 +94,5 @@ alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "af788af" } alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "af788af" } alloy-contract = {git = "https://github.com/alloy-rs/alloy", rev = "af788af" } alloy-transport = {git = "https://github.com/alloy-rs/alloy", rev = "af788af"} -alloy-transport-ws = {git = "https://github.com/alloy-rs/alloy", rev = "af788af"} \ No newline at end of file +alloy-transport-ws = {git = "https://github.com/alloy-rs/alloy", rev = "af788af"} +alloy-rpc-client = {git = "https://github.com/alloy-rs/alloy", rev = "af788af"} \ No newline at end of file diff --git a/crates/chainio/clients/avsregistry/src/reader.rs b/crates/chainio/clients/avsregistry/src/reader.rs index 40118e9c..4963c0c0 100644 --- a/crates/chainio/clients/avsregistry/src/reader.rs +++ b/crates/chainio/clients/avsregistry/src/reader.rs @@ -1,15 +1,9 @@ -use crate::{error::AvsRegistryError, reader::BLSApkRegistry::NewPubkeyRegistration}; - -use alloy_contract::SolCallBuilder; -use alloy_eips::eip1898::BlockNumberOrTag; -use alloy_primitives::address; -use alloy_primitives::{ - hex::check, keccak256, Address, BlockNumber, Bytes, FixedBytes, B256, U256, U64, -}; +use crate::error::AvsRegistryError; +use alloy_primitives::{Address, Bytes, FixedBytes, B256, U256}; use alloy_provider::{Provider, ProviderBuilder}; -use alloy_rpc_types::{Filter, FilterBlockOption, FilterSet, Topic, ValueOrArray}; -use alloy_sol_types::{sol, SolConstructor, SolEventInterface}; -use eigensdk_types::operator::bitmap_to_quorum_ids; +use alloy_rpc_types::Filter; +use alloy_sol_types::sol; +use eigensdk_types::operator::{bitmap_to_quorum_ids, BLSApkRegistry, OperatorPubKeys}; use num_bigint::BigInt; use std::collections::HashMap; use std::fmt::Debug; @@ -42,20 +36,7 @@ sol!( "../../../../crates/contracts/bindings/utils/json/OperatorStateRetriever.json" ); -sol!( - #[allow(missing_docs)] - #[derive(Debug)] - #[sol(rpc)] - BLSApkRegistry, - "../../../../crates/contracts/bindings/utils/json/BLSApkRegistry.json" -); - -use self::RegistryCoordinator::RegistryCoordinatorInstance; use BLSApkRegistry::{G1Point, G2Point}; -use OperatorStateRetriever::OperatorStateRetrieverCalls; -use RegistryCoordinator::RegistryCoordinatorEvents; -use StakeRegistry::StakeRegistryEvents; - /// Avs Registry chainreader #[derive(Debug, Clone)] pub struct AvsRegistryChainReader { @@ -66,12 +47,6 @@ pub struct AvsRegistryChainReader { provider: String, } -#[derive(Debug, Clone)] -pub struct OperatorPubKeys { - pub g1_pub_key: G1Point, - pub g2_pub_key: G2Point, -} - trait AvsRegistryReader { fn get_quorum_count() -> Result; } @@ -452,7 +427,6 @@ impl AvsRegistryChainReader { .event("NewPubkeyRegistration(address,(uint256,uint256),(uint256[2],uint256[2]))") .address(self.bls_apk_registry_addr); - let contract_bls_apk_registry = BLSApkRegistry::new(self.bls_apk_registry_addr, &provider); let logs = provider.get_logs(&filter).await?; debug!(transactionLogs = ?logs, "avsRegistryChainReader.QueryExistingRegisteredOperatorPubKeys"); @@ -538,37 +512,36 @@ impl AvsRegistryChainReader { } } -#[test] -fn test_build_avs_registry_chain_reader() { - let provider = "http://localhost:8545"; - let instance = AvsRegistryChainReader::new( - Address::from_word(keccak256("registry")), - // Address::from_word(keccak256(("registry").into())), - Address::from_word(keccak256("blsapkregistry")), - Address::from_word(keccak256("operatorstateretriever")), - Address::from_word(keccak256("stakeregistry")), - provider.to_string(), - ); - let _ = AvsRegistryChainReader::build_avs_registry_chain_reader( - &instance, - Address::from_word(keccak256("registry")), - Address::from_word(keccak256("operator")), - Address::from_word(keccak256("stake")), - ); -} - #[cfg(test)] mod tests { use super::*; + use alloy_primitives::keccak256; use hex::FromHex; use std::str::FromStr; - const HOLESKY_REGISTRY_COORDINATOR: &str = "0x53012C69A189cfA2D9d29eb6F19B32e0A2EA3490"; const HOLESKY_OPERATOR_STATE_RETRIEVER: &str = "0xB4baAfee917fb4449f5ec64804217bccE9f46C67"; const HOLESKY_STAKE_REGISTRY: &str = "0xBDACD5998989Eec814ac7A0f0f6596088AA2a270"; const HOLESKY_BLS_APK_REGISTRY: &str = "0x066cF95c1bf0927124DFB8B02B401bc23A79730D"; + #[tokio::test] + async fn test_build_avs_registry_chain_reader() { + let provider = "http://localhost:8545"; + let instance = AvsRegistryChainReader::new( + Address::from_word(keccak256("registry")), + // Address::from_word(keccak256(("registry").into())), + Address::from_word(keccak256("blsapkregistry")), + Address::from_word(keccak256("operatorstateretriever")), + Address::from_word(keccak256("stakeregistry")), + provider.to_string(), + ); + let _ = AvsRegistryChainReader::build_avs_registry_chain_reader( + &instance, + Address::from_word(keccak256("registry")), + Address::from_word(keccak256("operator")), + Address::from_word(keccak256("stake")), + ); + } fn build_avs_registry_chain_reader() -> AvsRegistryChainReader { let holesky_registry_coordinator = Address::from_str(HOLESKY_REGISTRY_COORDINATOR).expect("failed to parse address"); diff --git a/crates/chainio/clients/avsregistry/src/subscriber.rs b/crates/chainio/clients/avsregistry/src/subscriber.rs index bfcf14cf..72870929 100644 --- a/crates/chainio/clients/avsregistry/src/subscriber.rs +++ b/crates/chainio/clients/avsregistry/src/subscriber.rs @@ -67,7 +67,7 @@ impl AvsRegistryChainSubscriber { .with_recommended_fillers() .on_builtin(&self.provider) .await?; - let current_block_number = provider.get_block_number().await.unwrap(); + let current_block_number = provider.get_block_number().await?; let filter = Filter::new() .event("NewPubkeyRegistration(address,(uint256,uint256),(uint256[2],uint256[2]))") diff --git a/crates/metrics/collectors/economic/Cargo.toml b/crates/metrics/collectors/economic/Cargo.toml index 13881ddd..4b96ecae 100644 --- a/crates/metrics/collectors/economic/Cargo.toml +++ b/crates/metrics/collectors/economic/Cargo.toml @@ -10,3 +10,4 @@ repository.workspace = true ethers.workspace = true eigensdk-client-elcontracts.workspace = true eigensdk-client-avsregistry.workspace = true +alloy-primitives.workspace = true \ No newline at end of file diff --git a/crates/metrics/collectors/economic/src/lib.rs b/crates/metrics/collectors/economic/src/lib.rs index 8913aba3..e776eafe 100644 --- a/crates/metrics/collectors/economic/src/lib.rs +++ b/crates/metrics/collectors/economic/src/lib.rs @@ -1,6 +1,6 @@ +use alloy_primitives::{Address, U256}; use eigensdk_client_avsregistry::reader::AvsRegistryChainReader; use eigensdk_client_elcontracts::reader::ELChainReader; -use ethers_core::types::{Address, U256}; use std::collections::HashMap; pub struct Collector { diff --git a/crates/services/avsregistry/Cargo.toml b/crates/services/avsregistry/Cargo.toml index efb21367..6b6ded29 100644 --- a/crates/services/avsregistry/Cargo.toml +++ b/crates/services/avsregistry/Cargo.toml @@ -17,4 +17,6 @@ eigensdk-services-operatorsinfo.workspace = true eigensdk-types.workspace = true eigensdk-contract-bindings.workspace = true eigensdk-crypto-bls.workspace = true -eigensdk-crypto-bn254.workspace = true \ No newline at end of file +eigensdk-crypto-bn254.workspace = true +alloy-primitives.workspace = true +eigensdk-chainio-utils.workspace = true \ No newline at end of file diff --git a/crates/services/avsregistry/src/chaincaller.rs b/crates/services/avsregistry/src/chaincaller.rs index b4160b53..3497eee7 100644 --- a/crates/services/avsregistry/src/chaincaller.rs +++ b/crates/services/avsregistry/src/chaincaller.rs @@ -1,15 +1,16 @@ +use alloy_primitives::{Bytes, FixedBytes, U256}; +use eigensdk_chainio_utils::convert_to_bn254_g1_point; use eigensdk_client_avsregistry::reader::AvsRegistryChainReader; -use eigensdk_contract_bindings::BLSApkRegistry::{G1Point, G2Point}; use eigensdk_crypto_bls::attestation::G1Point as BlsG1Point; use eigensdk_crypto_bn254::utils::u256_to_bigint256; use eigensdk_services_operatorsinfo::operatorsinfo_inmemory::OperatorInfoServiceInMemory; +use eigensdk_types::operator::BLSApkRegistry::{G1Point, G2Point}; use eigensdk_types::operator::{ self, OperatorAvsState, OperatorInfo, OperatorPubKeys, QuorumAvsState, }; -use ethers_core::types::{Bytes, U256, U64}; use std::collections::HashMap; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct AvsRegistryServiceChainCaller { avs_registry: AvsRegistryChainReader, operators_info_service: OperatorInfoServiceInMemory, @@ -34,8 +35,8 @@ impl AvsRegistryServiceChainCaller { &self, block_num: u32, quorum_nums: Bytes, - ) -> HashMap<[u8; 32], OperatorAvsState> { - let mut operators_avs_state: HashMap<[u8; 32], OperatorAvsState> = HashMap::new(); + ) -> HashMap, OperatorAvsState> { + let mut operators_avs_state: HashMap, OperatorAvsState> = HashMap::new(); let operators_stakes_in_quorums = self .avs_registry @@ -49,12 +50,12 @@ impl AvsRegistryServiceChainCaller { for (quorum_id, quorum_num) in quorum_nums.iter().enumerate() { for operator in &operators_stakes_in_quorums[quorum_id] { - let info = self.get_operator_info(operator.operator_id).await; + let info = self.get_operator_info(*operator.operatorId).await; let stake_per_quorum = HashMap::new(); let avs_state = operators_avs_state - .entry(operator.operator_id) + .entry(FixedBytes(*operator.operatorId)) .or_insert_with(|| OperatorAvsState { - operator_id: operator.operator_id, + operator_id: *operator.operatorId, operator_info: OperatorInfo { pub_keys: info }, stake_per_quorum: stake_per_quorum, block_num: block_num.into(), @@ -89,20 +90,24 @@ impl AvsRegistryServiceChainCaller { if !operator.stake_per_quorum[quorum_num].is_zero() { if let Some(pubkeys) = &operator.operator_info.pub_keys { let g1_point = BlsG1Point::new( - u256_to_bigint256(pubkeys.g1_pub_key.x), - u256_to_bigint256(pubkeys.g1_pub_key.y), + u256_to_bigint256(pubkeys.g1_pub_key.X), + u256_to_bigint256(pubkeys.g1_pub_key.Y), ); pub_key_g1.add(g1_point); total_stake += operator.stake_per_quorum[quorum_num]; } } } + let g1_point = convert_to_bn254_g1_point(pub_key_g1.point); quorums_avs_state.insert( *quorum_num, QuorumAvsState { quorum_num: *quorum_num, total_stake: total_stake, - agg_pub_key_g1: pub_key_g1, + agg_pub_key_g1: G1Point { + X: g1_point.X, + Y: g1_point.Y, + }, block_num: block_num, }, ); diff --git a/crates/services/bls_aggregation/Cargo.toml b/crates/services/bls_aggregation/Cargo.toml index 2f776074..67439d94 100644 --- a/crates/services/bls_aggregation/Cargo.toml +++ b/crates/services/bls_aggregation/Cargo.toml @@ -17,4 +17,5 @@ eigensdk-crypto-bls.workspace = true eigensdk-services-avsregistry.workspace = true parking_lot.workspace = true tokio = {workspace = true, features = ["full"]} -eigensdk-crypto-bn254.workspace = true \ No newline at end of file +eigensdk-crypto-bn254.workspace = true +alloy-primitives.workspace = true \ No newline at end of file diff --git a/crates/services/bls_aggregation/src/bls_agg.rs b/crates/services/bls_aggregation/src/bls_agg.rs index 95db2798..095a18fa 100644 --- a/crates/services/bls_aggregation/src/bls_agg.rs +++ b/crates/services/bls_aggregation/src/bls_agg.rs @@ -2,11 +2,13 @@ use eigensdk_crypto_bls::attestation::{G1Point, G2Point, Signature}; use eigensdk_services_avsregistry::chaincaller::AvsRegistryServiceChainCaller; use eigensdk_types::{ avs::{SignedTaskResponseDigest, TaskIndex, TaskResponseDigest}, + operator::BLSApkRegistry, operator::{OperatorAvsState, QuorumThresholdPercentage, QuorumThresholdPercentages}, }; +use alloy_primitives::{FixedBytes, U256}; use eigensdk_crypto_bn254::utils::u256_to_bigint256; -use ethers_core::types::U256; +use ethers::core::k256::FieldBytes; use std::collections::HashMap; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::time::{self, Duration}; @@ -35,7 +37,7 @@ pub struct AggregatedOperators { signers_total_stake_per_quorum: HashMap, - pub signers_operator_ids_set: HashMap<[u8; 32], bool>, + pub signers_operator_ids_set: HashMap, bool>, } impl AggregatedOperators {} @@ -110,7 +112,7 @@ impl BlsAggregatorService { task_index: TaskIndex, task_response_digest: TaskResponseDigest, bls_signature: Signature, - operator_id: [u8; 32], + operator_id: FixedBytes<32>, ) { let task_channel = self.read().await; @@ -160,14 +162,17 @@ impl BlsAggregatorService { // let quorum_apks_g1 for (_, quorum_number) in quorum_nums.iter().enumerate() { if let Some(val) = quorums_avs_stake.get(&quorum_number) { - quorum_apks_g1.push(val.agg_pub_key_g1.clone()); + quorum_apks_g1.push(G1Point::new( + u256_to_bigint256(val.agg_pub_key_g1.X), + u256_to_bigint256(val.agg_pub_key_g1.Y), + )); } } let task_expired_timer = time::sleep(time_to_expiry); tokio::pin!(task_expired_timer); - let mut aggregated_operators: HashMap<[u8; 32], AggregatedOperators> = HashMap::new(); + let mut aggregated_operators: HashMap, AggregatedOperators> = HashMap::new(); let sig = self .verify_signature(task_index, &signed_task_digest, &operator_state_avs) .await; @@ -186,13 +191,13 @@ impl BlsAggregatorService { aggregate_response.signers_agg_sig_g1.get_g1_point().add(signed_task_digest.bls_signature.get_g1_point()); if let Some(op_avs_state) = operator_state_avs.get_mut(&signed_task_digest.operator_id){ if let Some(pub_key) =&op_avs_state.operator_info.pub_keys { - let g2_pub_key = G2Point::new((u256_to_bigint256(pub_key.g2_pub_key.x[0]),u256_to_bigint256(pub_key.g2_pub_key.x[1])),(u256_to_bigint256(pub_key.g2_pub_key.y[0]),u256_to_bigint256(pub_key.g2_pub_key.y[1]))); + let g2_pub_key = G2Point::new((u256_to_bigint256(pub_key.g2_pub_key.X[0]),u256_to_bigint256(pub_key.g2_pub_key.X[1])),(u256_to_bigint256(pub_key.g2_pub_key.Y[0]),u256_to_bigint256(pub_key.g2_pub_key.Y[1]))); aggregate_response.signers_apk_g2.add(g2_pub_key); } } - aggregate_response.signers_operator_ids_set.insert(signed_task_digest.operator_id, true); + aggregate_response.signers_operator_ids_set.insert(FixedBytes(*signed_task_digest.operator_id), true); if let Some(state_avs) = operator_state_avs.get(&signed_task_digest.operator_id){ @@ -228,7 +233,7 @@ impl BlsAggregatorService { // check stake threshold if self.check_if_stake_thresholds_met(aggregate_response.signers_total_stake_per_quorum,total_stake_per_quorum.clone(),quorum_threshold_percentage_map.clone()){ - let mut non_signers_operators_ids: Vec<[u8;32]> = vec![]; + let mut non_signers_operators_ids: Vec> = vec![]; for (i,op_info) in &operator_state_avs{ if aggregate_response.signers_operator_ids_set.get(&op_info.operator_id).is_none(){ non_signers_operators_ids.push(*i); @@ -244,7 +249,7 @@ impl BlsAggregatorService { if let Some(operator) = operator_state_avs.get(operator_id){ if let Some(keys) = &operator.operator_info.pub_keys{ - let g1_key = G1Point::new(u256_to_bigint256(keys.g1_pub_key.x),u256_to_bigint256(keys.g1_pub_key.y)); + let g1_key = G1Point::new(u256_to_bigint256(keys.g1_pub_key.X),u256_to_bigint256(keys.g1_pub_key.Y)); non_signers_g1_pub_keys.push(g1_key); } } @@ -260,10 +265,10 @@ impl BlsAggregatorService { quorum_apks_g1: quorum_apks_g1.clone(), signers_apk_g2: aggregate_response.signers_apk_g2, signers_agg_sig_g1: aggregate_response.signers_agg_sig_g1, - non_signer_quorum_bitmap_indices: indices.clone().quorum_apk_indices, - quorum_apk_indices: indices.quorum_apk_indices, - total_stake_indices: indices.total_stake_indices, - non_signer_stake_indices: indices.non_signer_stake_indices + non_signer_quorum_bitmap_indices: indices.clone().quorumApkIndices, + quorum_apk_indices: indices.quorumApkIndices, + total_stake_indices: indices.totalStakeIndices, + non_signer_stake_indices: indices.nonSignerStakeIndices }; @@ -285,7 +290,7 @@ impl BlsAggregatorService { &self, task_index: TaskIndex, signed_task_response_digest: &SignedTaskResponseDigest, - operator_avs_state: &HashMap<[u8; 32], OperatorAvsState>, + operator_avs_state: &HashMap, OperatorAvsState>, ) { if let Some(operator_state) = operator_avs_state.get(&signed_task_response_digest.operator_id) @@ -293,12 +298,12 @@ impl BlsAggregatorService { if let Some(pub_keys) = &operator_state.operator_info.pub_keys { let g2_proj = G2Point::new( ( - u256_to_bigint256(pub_keys.g2_pub_key.x[0]), - u256_to_bigint256(pub_keys.g2_pub_key.x[1]), + u256_to_bigint256(pub_keys.g2_pub_key.X[0]), + u256_to_bigint256(pub_keys.g2_pub_key.X[1]), ), ( - u256_to_bigint256(pub_keys.g2_pub_key.y[0]), - u256_to_bigint256(pub_keys.g2_pub_key.y[1]), + u256_to_bigint256(pub_keys.g2_pub_key.Y[0]), + u256_to_bigint256(pub_keys.g2_pub_key.Y[1]), ), ) .point; @@ -322,8 +327,9 @@ impl BlsAggregatorService { if let Some(signed_stake_by_quorum) = signed_stake_per_quorum.get(&(quorum_num as u8)) { if let Some(total_stake_by_quorum) = total_stake_per_quorum.get(&(quorum_num as u8)) { - let signed_stake = *signed_stake_by_quorum * 100; - let threshold_stake = *total_stake_by_quorum * quorum_threshold_percentage; + let signed_stake = signed_stake_by_quorum * U256::from(100); + let threshold_stake = + *total_stake_by_quorum * U256::from(quorum_threshold_percentage); return signed_stake >= threshold_stake; } else { return false; diff --git a/crates/services/operatorsinfo/Cargo.toml b/crates/services/operatorsinfo/Cargo.toml index 7789f160..d4e9e34e 100644 --- a/crates/services/operatorsinfo/Cargo.toml +++ b/crates/services/operatorsinfo/Cargo.toml @@ -10,6 +10,7 @@ license-file.workspace = true [dependencies] ethers.workspace = true +eyre.workspace = true eth-keystore = "0.5.0" thiserror.workspace = true eigensdk-client-avsregistry.workspace = true @@ -17,4 +18,10 @@ eigensdk-types.workspace = true eigensdk-contract-bindings.workspace = true eigensdk-crypto-bls.workspace = true eigensdk-crypto-bn254.workspace = true -tokio.workspace = true \ No newline at end of file +alloy-sol-types.workspace = true +alloy-contract.workspace = true +alloy-primitives.workspace = true +alloy-provider.workspace = true +alloy-transport-ws.workspace = true +futures-util.workspace = true +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } \ No newline at end of file diff --git a/crates/services/operatorsinfo/src/operatorsinfo_inmemory.rs b/crates/services/operatorsinfo/src/operatorsinfo_inmemory.rs index 5f2c88fe..5c886458 100644 --- a/crates/services/operatorsinfo/src/operatorsinfo_inmemory.rs +++ b/crates/services/operatorsinfo/src/operatorsinfo_inmemory.rs @@ -1,25 +1,27 @@ +use alloy_sol_types::sol; use eigensdk_client_avsregistry::{ reader::AvsRegistryChainReader, subscriber::AvsRegistryChainSubscriber, }; -use eigensdk_contract_bindings::BLSApkRegistry::BLSApkRegistryEvents; + +// use eigensdk_types::{G1Point,G2Point}; +use alloy_primitives::Address; +use alloy_provider::{Provider, ProviderBuilder}; +use alloy_transport_ws::WsConnect; +use eigensdk_types::operator::BLSApkRegistry::{self, G1Point, G2Point}; use eigensdk_types::operator::{operator_id_from_g1_pub_key, OperatorPubKeys}; -use ethers::contract::EthLogDecode; -use ethers_core::{abi::RawLog, types::Address}; -use ethers_providers::StreamExt; -use ethers_providers::{Middleware, Provider, Ws}; +use eyre::Result; +use futures_util::{stream, StreamExt}; use std::collections::HashMap; -use std::sync::Arc; use tokio::sync::{ mpsc, mpsc::UnboundedSender, oneshot::{self, Sender}, }; - -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct OperatorInfoServiceInMemory { avs_registry_reader: AvsRegistryChainReader, avs_registry_subscriber: AvsRegistryChainSubscriber, - web_socket: Arc>, + ws: String, pub_keys: UnboundedSender, } @@ -34,7 +36,7 @@ impl OperatorInfoServiceInMemory { pub async fn new( avs_registry_subscriber: AvsRegistryChainSubscriber, avs_registry_chain_reader: AvsRegistryChainReader, - web_socket: Arc>, + web_socket: String, ) -> Self { let (pubkeys_tx, mut pubkeys_rx) = mpsc::unbounded_channel(); @@ -64,42 +66,59 @@ impl OperatorInfoServiceInMemory { Self { avs_registry_reader: avs_registry_chain_reader, avs_registry_subscriber: avs_registry_subscriber, - web_socket, + ws: web_socket, pub_keys: pubkeys_tx, } } - pub async fn start_service(&self) { + #[tokio::main] + pub async fn start_service(&self) -> Result<()> { // query past operator registrations self.query_past_registered_operator_events_and_fill_db() .await; - let filter = self + let filter_result = self .avs_registry_subscriber - .get_new_pub_key_registration_filter(self.web_socket.clone()) + .get_new_pub_key_registration_filter() .await; - let mut subcription_new_operator_registration_stream = - self.web_socket.subscribe_logs(&filter).await.unwrap(); - - while let Some(log) = subcription_new_operator_registration_stream.next().await { - let data = BLSApkRegistryEvents::decode_log(&RawLog::from(log)).unwrap(); - - match data { - BLSApkRegistryEvents::NewPubkeyRegistrationFilter(pubkeyreg) => { - let operator_pub_key = OperatorPubKeys { - g1_pub_key: pubkeyreg.pubkey_g1, - g2_pub_key: pubkeyreg.pubkey_g2, - }; - // send message - let _ = self.pub_keys.send(OperatorsInfoMessage::InsertOperatorInfo( - pubkeyreg.operator, - operator_pub_key, - )); + match filter_result { + Ok(filter) => { + let ws = WsConnect::new(&self.ws); + let provider = ProviderBuilder::new().on_ws(ws).await?; + + let mut subcription_new_operator_registration_stream = + provider.subscribe_logs(&filter).await?; + let mut stream = subcription_new_operator_registration_stream.into_stream(); + while let Some(log) = stream.next().await { + let data = log + .log_decode::() + .ok(); + + if let Some(new_pub_key_event) = data { + let event_data = new_pub_key_event.data(); + let operator_pub_key = OperatorPubKeys { + g1_pub_key: G1Point { + X: event_data.pubkeyG1.X, + Y: event_data.pubkeyG1.Y, + }, + g2_pub_key: G2Point { + X: event_data.pubkeyG2.X, + Y: event_data.pubkeyG2.Y, + }, + }; + // send message + let _ = self.pub_keys.send(OperatorsInfoMessage::InsertOperatorInfo( + event_data.operator, + operator_pub_key, + )); + } } - _ => {} } + Err(_) => {} } + + Ok(()) } pub async fn get_operator_info(&self, address: Address) -> Option { @@ -111,10 +130,9 @@ impl OperatorInfoServiceInMemory { } pub async fn query_past_registered_operator_events_and_fill_db(&self) { - // Assuming ethers rs fetches data from first block . Have to validate this . let (operator_address, operator_pub_keys) = self .avs_registry_reader - .query_existing_registered_operator_pub_keys(None, None) + .query_existing_registered_operator_pub_keys(0, 0) .await .unwrap(); diff --git a/crates/types/src/avs.rs b/crates/types/src/avs.rs index 405890f4..b36298b3 100644 --- a/crates/types/src/avs.rs +++ b/crates/types/src/avs.rs @@ -1,8 +1,9 @@ +use alloy_primitives::FixedBytes; use eigensdk_crypto_bls::attestation::Signature; pub type TaskIndex = u32; -pub type TaskResponseDigest = [u8; 32]; +pub type TaskResponseDigest = FixedBytes<32>; #[derive(Debug, Clone)] pub struct SignedTaskResponseDigest { @@ -10,5 +11,5 @@ pub struct SignedTaskResponseDigest { pub bls_signature: Signature, - pub operator_id: [u8; 32], + pub operator_id: FixedBytes<32>, }