Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solver participation guard #3257

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5fe0dd6
Solver participation validator
squadgazzz Jan 29, 2025
5319945
Test
squadgazzz Jan 29, 2025
e65c328
Avoid rpc calls every time
squadgazzz Jan 29, 2025
fc3321b
Typo
squadgazzz Jan 29, 2025
0fbd61c
Docs
squadgazzz Jan 29, 2025
b1abfa0
Metrics
squadgazzz Jan 29, 2025
292dcff
Configurable validators
squadgazzz Jan 29, 2025
fe9ef5b
Fixed clap config
squadgazzz Jan 29, 2025
c5e3502
Refactoring
squadgazzz Jan 30, 2025
a9e6a3f
Config per solver
squadgazzz Jan 30, 2025
9a55fe2
Start using the new config
squadgazzz Jan 30, 2025
f9bdafd
Simplify to hashset
squadgazzz Jan 30, 2025
5fc831e
Nit
squadgazzz Jan 30, 2025
3154cd0
Use driver's name in metrics
squadgazzz Jan 31, 2025
47007c1
Nit
squadgazzz Jan 31, 2025
bb9059e
Send metrics about each found solver
squadgazzz Jan 31, 2025
6787d34
Cache only accepted solvers
squadgazzz Jan 31, 2025
a2710c6
Refactoring
squadgazzz Jan 31, 2025
1f43009
Fix the tests
squadgazzz Feb 3, 2025
0d50991
Merge branch 'main' into blacklist-failing-solvers
squadgazzz Feb 3, 2025
366611d
Nits
squadgazzz Feb 7, 2025
e9a70f5
Trigger updates on the proposed_solution table insert
squadgazzz Feb 11, 2025
e220eaf
Nit
squadgazzz Feb 11, 2025
51832d4
Formatting
squadgazzz Feb 11, 2025
17ee52c
infra::Persistence
squadgazzz Feb 12, 2025
cba693a
Naming
squadgazzz Feb 12, 2025
c3c9433
Comment
squadgazzz Feb 14, 2025
58d7de1
Merge branch 'main' into blacklist-failing-solvers
squadgazzz Feb 14, 2025
fdc3afe
Merge branch 'main' into blacklist-failing-solvers
squadgazzz Feb 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/autopilot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ chrono = { workspace = true }
clap = { workspace = true }
contracts = { path = "../contracts" }
cow-amm = { path = "../cow-amm" }
dashmap = { workspace = true }
database = { path = "../database" }
derive_more = { workspace = true }
ethcontract = { workspace = true }
Expand Down
51 changes: 51 additions & 0 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,51 @@ pub struct Arguments {
/// Archive node URL used to index CoW AMM
#[clap(long, env)]
pub archive_node_url: Option<Url>,

/// Configuration for the solver participation guard.
#[clap(flatten)]
pub solver_participation_guard: SolverParticipationGuardConfig,
}

#[derive(Debug, clap::Parser)]
pub struct SolverParticipationGuardConfig {
#[clap(flatten)]
pub db_based_validator: DbBasedValidatorConfig,

#[clap(flatten)]
pub onchain_based_validator: OnchainBasedValidatorConfig,
}

#[derive(Debug, clap::Parser)]
pub struct DbBasedValidatorConfig {
/// Enables or disables the solver participation guard
#[clap(
id = "db_enabled",
long = "db-based-solver-participation-guard-enabled",
env = "DB_BASED_SOLVER_PARTICIPATION_GUARD_ENABLED",
default_value = "true"
)]
pub enabled: bool,

/// The time-to-live for the solver participation blacklist cache.
#[clap(long, env, default_value = "5m", value_parser = humantime::parse_duration)]
pub solver_blacklist_cache_ttl: Duration,

/// The number of last auctions to check solver participation eligibility.
#[clap(long, env, default_value = "3")]
pub solver_last_auctions_participation_count: u32,
}

#[derive(Debug, Clone, clap::Parser)]
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
pub struct OnchainBasedValidatorConfig {
/// Enables or disables the solver participation guard
#[clap(
id = "onchain_enabled",
long = "onchain-based-solver-participation-guard-enabled",
env = "ONCHAIN_BASED_SOLVER_PARTICIPATION_GUARD_ENABLED",
default_value = "true"
)]
pub enabled: bool,
}

impl std::fmt::Display for Arguments {
Expand Down Expand Up @@ -287,6 +332,7 @@ impl std::fmt::Display for Arguments {
max_winners_per_auction,
archive_node_url,
max_solutions_per_solver,
solver_participation_guard,
} = self;

write!(f, "{}", shared)?;
Expand Down Expand Up @@ -370,6 +416,11 @@ impl std::fmt::Display for Arguments {
"max_solutions_per_solver: {:?}",
max_solutions_per_solver
)?;
writeln!(
f,
"solver_participation_guard: {:?}",
solver_participation_guard
)?;
Ok(())
}
}
Expand Down
24 changes: 24 additions & 0 deletions crates/autopilot/src/database/competition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,28 @@ impl super::Postgres {

Ok(())
}

/// Finds solvers that won `last_auctions_count` consecutive auctions but
/// never settled any of them. The current block is used to prevent
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
/// selecting auctions with deadline after the current block since they
/// still can be settled.
pub async fn find_non_settling_solvers(
&self,
last_auctions_count: u32,
current_block: u64,
) -> anyhow::Result<Vec<Address>> {
let mut ex = self.pool.acquire().await.context("acquire")?;
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["find_non_settling_solvers"])
.start_timer();

database::solver_competition::find_non_settling_solvers(
&mut ex,
last_auctions_count,
current_block,
)
.await
.context("solver_competition::find_non_settling_solvers")
}
}
6 changes: 5 additions & 1 deletion crates/autopilot/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ use {
};

mod participant;
mod solver_participation_guard;

pub use participant::{Participant, Ranked, Unranked};
pub use {
participant::{Participant, Ranked, Unranked},
solver_participation_guard::{DatabaseSolverParticipationValidator, SolverParticipationGuard},
};

type SolutionId = u64;

Expand Down
190 changes: 190 additions & 0 deletions crates/autopilot/src/domain/competition/solver_participation_guard.rs
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use {
crate::{
arguments::SolverParticipationGuardConfig,
database::Postgres,
domain::{eth, Metrics},
infra::Ethereum,
},
ethrpc::block_stream::CurrentBlockWatcher,
std::{
sync::Arc,
time::{Duration, Instant},
},
};

/// This struct checks whether a solver can participate in the competition by
/// using different validators.
#[derive(Clone)]
pub struct SolverParticipationGuard(Arc<Inner>);

struct Inner {
/// Stores the validators in order they will be called.
validators: Vec<Box<dyn SolverParticipationValidator + Send + Sync>>,
}

impl SolverParticipationGuard {
pub fn new(
eth: Ethereum,
db: Postgres,
settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
config: SolverParticipationGuardConfig,
) -> Self {
let mut validators: Vec<Box<dyn SolverParticipationValidator + Send + Sync>> = Vec::new();

if config.db_based_validator.enabled {
let current_block = eth.current_block().clone();
let database_solver_participation_validator = DatabaseSolverParticipationValidator::new(
db,
current_block,
settlement_updates_receiver,
config.db_based_validator.solver_blacklist_cache_ttl,
config
.db_based_validator
.solver_last_auctions_participation_count,
);
validators.push(Box::new(database_solver_participation_validator));
}

if config.onchain_based_validator.enabled {
let onchain_solver_participation_validator =
OnchainSolverParticipationValidator { eth };
validators.push(Box::new(onchain_solver_participation_validator));
}

Self(Arc::new(Inner { validators }))
}

/// Checks if a solver can participate in the competition.
/// Sequentially asks internal validators to avoid redundant RPC calls in
/// the following order:
/// 1. DatabaseSolverParticipationValidator - operates fast since it uses
/// in-memory cache.
/// 2. OnchainSolverParticipationValidator - only then calls the
/// Authenticator contract.
pub async fn can_participate(&self, solver: &eth::Address) -> anyhow::Result<bool> {
for validator in &self.0.validators {
if !validator.is_allowed(solver).await? {
return Ok(false);
}
}

Ok(true)
}
}

#[async_trait::async_trait]
trait SolverParticipationValidator: Send + Sync {
async fn is_allowed(&self, solver: &eth::Address) -> anyhow::Result<bool>;
}

/// Checks the DB by searching for solvers that won N last consecutive auctions
/// but never settled any of them.
#[derive(Clone)]
pub struct DatabaseSolverParticipationValidator(Arc<DatabaseSolverParticipationValidatorInner>);

struct DatabaseSolverParticipationValidatorInner {
db: Postgres,
banned_solvers: dashmap::DashMap<eth::Address, Instant>,
ttl: Duration,
last_auctions_count: u32,
}

impl DatabaseSolverParticipationValidator {
pub fn new(
db: Postgres,
current_block: CurrentBlockWatcher,
settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
ttl: Duration,
last_auctions_count: u32,
) -> Self {
let self_ = Self(Arc::new(DatabaseSolverParticipationValidatorInner {
db,
banned_solvers: Default::default(),
ttl,
last_auctions_count,
}));

self_.start_maintenance(settlement_updates_receiver, current_block);

self_
}

/// Update the internal cache only once the settlement table is updated to
/// avoid redundant DB queries.
fn start_maintenance(
&self,
mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
current_block: CurrentBlockWatcher,
) {
let self_ = self.0.clone();
tokio::spawn(async move {
while settlement_updates_receiver.recv().await.is_some() {
let current_block = current_block.borrow().number;
match self_
.db
.find_non_settling_solvers(self_.last_auctions_count, current_block)
.await
{
Ok(non_settling_solvers) => {
let non_settling_solvers = non_settling_solvers
.into_iter()
.map(|solver| {
let address = eth::Address(solver.0.into());

Metrics::get()
.non_settling_solver
.with_label_values(&[&format!("{:#x}", address.0)]);

address
})
.collect::<Vec<_>>();

tracing::debug!(?non_settling_solvers, "found non-settling solvers");

let now = Instant::now();
for solver in non_settling_solvers {
self_.banned_solvers.insert(solver, now);
}
}
Err(err) => {
tracing::warn!(?err, "error while searching for non-settling solvers")
}
}
}
});
}
}

#[async_trait::async_trait]
impl SolverParticipationValidator for DatabaseSolverParticipationValidator {
async fn is_allowed(&self, solver: &eth::Address) -> anyhow::Result<bool> {
if let Some(entry) = self.0.banned_solvers.get(solver) {
if Instant::now().duration_since(*entry.value()) < self.0.ttl {
return Ok(false);
} else {
self.0.banned_solvers.remove(solver);
}
}

Ok(true)
}
}

/// Calls Authenticator contract to check if a solver has a sufficient
/// permission.
struct OnchainSolverParticipationValidator {
eth: Ethereum,
}

#[async_trait::async_trait]
impl SolverParticipationValidator for OnchainSolverParticipationValidator {
async fn is_allowed(&self, solver: &eth::Address) -> anyhow::Result<bool> {
Ok(self
.eth
.contracts()
.authenticator()
.is_solver(solver.0)
.call()
.await?)
}
}
15 changes: 15 additions & 0 deletions crates/autopilot/src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,18 @@ pub use {
fee::ProtocolFees,
quote::Quote,
};

#[derive(prometheus_metric_storage::MetricStorage)]
#[metric(subsystem = "domain")]
pub struct Metrics {
/// How many times the solver marked as non-settling based on the database
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
/// statistics.
#[metric(labels("solver"))]
pub non_settling_solver: prometheus::IntCounterVec,
}

impl Metrics {
fn get() -> &'static Self {
Metrics::instance(observe::metrics::get_storage_registry()).unwrap()
}
}
21 changes: 19 additions & 2 deletions crates/autopilot/src/domain/settlement/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,34 @@ use {
pub struct Observer {
eth: infra::Ethereum,
persistence: infra::Persistence,
settlement_updates_sender: tokio::sync::mpsc::UnboundedSender<()>,
}

impl Observer {
/// Creates a new Observer and asynchronously schedules the first update
/// run.
pub fn new(eth: infra::Ethereum, persistence: infra::Persistence) -> Self {
Self { eth, persistence }
pub fn new(
eth: infra::Ethereum,
persistence: infra::Persistence,
settlement_updates_sender: tokio::sync::mpsc::UnboundedSender<()>,
) -> Self {
Self {
eth,
persistence,
settlement_updates_sender,
}
}

/// Fetches all the available missing data needed for bookkeeping.
/// This needs to get called after indexing a new settlement event
/// since this code needs that data to already be present in the DB.
pub async fn update(&self) {
let mut updated = false;
loop {
match self.single_update().await {
Ok(true) => {
tracing::debug!("on settlement event updater ran and processed event");
updated = true;
// There might be more pending updates, continue immediately.
continue;
}
Expand All @@ -49,6 +60,12 @@ impl Observer {
}
}
}
if updated {
// Notify the solver participation guard that a settlement has been updated.
if let Err(err) = self.settlement_updates_sender.send(()) {
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
tracing::error!(?err, "failed to notify solver participation guard");
}
}
}

/// Update database for settlement events that have not been processed yet.
Expand Down
Loading
Loading