From d70dcdb1842903be57b53a00d5af719cd46f89db Mon Sep 17 00:00:00 2001 From: Beta <105949605+LevBeta@users.noreply.github.com> Date: Wed, 29 Jan 2025 15:13:33 +0000 Subject: [PATCH 01/19] Hotfix crossbar # Conflicts: # Cargo.lock # Cargo.toml # src/liquidator.rs # src/wrappers/liquidator_account.rs --- Cargo.lock | 32 ++++++++++++++++++++------- Cargo.toml | 2 +- src/crossbar.rs | 2 +- src/liquidator.rs | 35 +++++++++++++++++++++++------- src/marginfi_ixs.rs | 2 +- src/rebalancer.rs | 3 ++- src/wrappers/liquidator_account.rs | 7 ++++-- 7 files changed, 61 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3d5febc..a4bfd7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1639,6 +1639,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -4712,7 +4726,7 @@ checksum = "c5cc431df6cc1dd964134fa4ec7df765d3af3fae9c2148f96a3c4fb500290633" dependencies = [ "async-trait", "bincode", - "dashmap", + "dashmap 5.5.3", "futures", "futures-util", "indexmap 2.2.6", @@ -5952,9 +5966,9 @@ dependencies = [ [[package]] name = "switchboard-on-demand" -version = "0.1.15" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ddb8d623f0daa9be45ad8f44b7c23a85bb8c3743777b1db5d34a4fcfc478618" +checksum = "6bc728d0af2eefd2cb76a712413c41fe122c04d188442da0e13f6a1f48827342" dependencies = [ "arc-swap", "async-trait", @@ -5966,10 +5980,8 @@ dependencies = [ "lazy_static", "libsecp256k1 0.7.1", "log", - "num 0.4.3", "rust_decimal", "serde", - "serde_json", "sha2 0.10.8", "solana-address-lookup-table-program", "solana-program", @@ -5980,17 +5992,19 @@ dependencies = [ [[package]] name = "switchboard-on-demand-client" -version = "0.1.7" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8a4ee6334ce2e60bd5c4e0c9b0ba0269d31d17e76c7f546683be55622c16c0e" +checksum = "dfb24d22d91f03c7b13aa7da3cec1c11c8606a9fc0367abc0f150315f19f5e65" dependencies = [ "anyhow_ext", "arrayref", "base58", "base64 0.22.1", - "borsh 1.5.1", + "bincode", + "borsh 0.9.3", "bs58 0.4.0", "bytemuck", + "dashmap 6.1.0", "futures", "hex", "lazy_static", @@ -5999,10 +6013,12 @@ dependencies = [ "reqwest", "rust_decimal", "serde", + "serde_derive", "serde_json", "sha2 0.10.8", "solana-client", "solana-sdk", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7e524eb..ba4dd24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,7 @@ yellowstone-grpc-proto = { git = "https://github.com/mrgnlabs/yellowstone-grpc", jito-protos = { git = "https://github.com/mrgnlabs/jito-rs", branch = "1.18.17" } jito-searcher-client = { git = "https://github.com/mrgnlabs/jito-rs", branch = "1.18.17" } switchboard-on-demand = "0.1.15" -switchboard-on-demand-client = "0.1.7" +switchboard-on-demand-client = "0.2.9" chrono = "0.4.38" hex = "0.4.3" url = "2.5.2" diff --git a/src/crossbar.rs b/src/crossbar.rs index a257073..adabdec 100644 --- a/src/crossbar.rs +++ b/src/crossbar.rs @@ -16,7 +16,7 @@ pub(crate) struct CrossbarMaintainer { impl CrossbarMaintainer { /// Creates a new CrossbarMaintainer empty instance pub fn new() -> Self { - let crossbar_client = CrossbarClient::default(None); + let crossbar_client = CrossbarClient::default(); Self { crossbar_client } } diff --git a/src/liquidator.rs b/src/liquidator.rs index 6181f73..63df631 100644 --- a/src/liquidator.rs +++ b/src/liquidator.rs @@ -273,7 +273,10 @@ impl Liquidator { accounts.sort_by(|a, b| a.profit.cmp(&b.profit)); accounts.reverse(); for account in accounts { - info!("Liquidating account {:?}", account.liquidate_account.address); + info!( + "Liquidating account {:?}", + account.liquidate_account.address + ); if let Err(e) = self .liquidator_account .liquidate( @@ -331,13 +334,14 @@ impl Liquidator { let (deposit_shares, liabs_shares) = account.get_deposits_and_liabilities_shares(); - let deposit_values = self - .get_value_of_shares( - deposit_shares, - &BalanceSide::Assets, - RequirementType::Maintenance, - ) - .unwrap(); + let deposit_values = match self.get_value_of_shares( + deposit_shares, + &BalanceSide::Assets, + RequirementType::Maintenance, + ) { + Ok(values) => values, + Err(_) => return None, + }; if deposit_values .iter() @@ -756,6 +760,21 @@ impl Liquidator { .collect(); for (bank_address, bank) in banks.iter() { + if bank.config.oracle_setup == OracleSetup::StakedWithPythPush { + continue; + } + + let oracle_keys_excluded_default = bank + .config + .oracle_keys + .iter() + .filter(|key| *key != &Pubkey::default()) + .collect::>(); + + if oracle_keys_excluded_default.len() > 1 { + continue; + } + let (oracle_address, mut oracle_account) = { let oracle_addresses = find_oracle_keys(&bank.config); let mut oracle_account = None; diff --git a/src/marginfi_ixs.rs b/src/marginfi_ixs.rs index 2d63357..4f7fd9f 100644 --- a/src/marginfi_ixs.rs +++ b/src/marginfi_ixs.rs @@ -166,7 +166,7 @@ pub fn make_liquidate_ix( asset_amount: u64, ) -> Instruction { let mut accounts = marginfi::accounts::LendingAccountLiquidate { - marginfi_group, + group: marginfi_group, liquidator_marginfi_account: marginfi_account, signer, liquidatee_marginfi_account, diff --git a/src/rebalancer.rs b/src/rebalancer.rs index b8a3be5..9ff304a 100644 --- a/src/rebalancer.rs +++ b/src/rebalancer.rs @@ -48,8 +48,8 @@ use std::{ sync::{atomic::AtomicBool, Arc}, }; use switchboard_on_demand::PullFeedAccountData; -use switchboard_on_demand_client::QueueAccountData; use switchboard_on_demand_client::{FetchUpdateManyParams, Gateway, PullFeed}; +use switchboard_on_demand_client::{QueueAccountData, SbContext}; /// The rebalancer is responsible to keep the liquidator account /// "rebalanced" -> Document this better pub struct Rebalancer { @@ -358,6 +358,7 @@ impl Rebalancer { if !active_swb_oracles.is_empty() { if let Ok((ix, lut)) = PullFeed::fetch_update_many_ix( + SbContext::new(), &self.liquidator_account.non_blocking_rpc_client, FetchUpdateManyParams { feeds: active_swb_oracles, diff --git a/src/wrappers/liquidator_account.rs b/src/wrappers/liquidator_account.rs index aa08fda..230c90c 100644 --- a/src/wrappers/liquidator_account.rs +++ b/src/wrappers/liquidator_account.rs @@ -4,7 +4,6 @@ use crate::{ marginfi_ixs::{make_deposit_ix, make_liquidate_ix, make_repay_ix, make_withdraw_ix}, transaction_manager::{BatchTransactions, RawTransaction}, }; -use solana_sdk::commitment_config::CommitmentConfig; use anchor_spl::associated_token::spl_associated_token_account::instruction::create_associated_token_account; use crossbeam::channel::Sender; use marginfi::state::{marginfi_account::MarginfiAccount, marginfi_group::BankVaultType}; @@ -13,6 +12,7 @@ use solana_client::{ nonblocking::rpc_client::RpcClient as NonBlockingRpcClient, rpc_client::RpcClient, }; use solana_program::pubkey::Pubkey; +use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::{ instruction::Instruction, pubkey, @@ -21,7 +21,9 @@ use solana_sdk::{ system_instruction::transfer, }; use std::{collections::HashMap, str::FromStr, sync::Arc}; -use switchboard_on_demand_client::{FetchUpdateManyParams, Gateway, PullFeed, QueueAccountData}; +use switchboard_on_demand_client::{ + FetchUpdateManyParams, Gateway, PullFeed, QueueAccountData, SbContext, +}; /// Wraps the liquidator account into a dedicated strecture pub struct LiquidatorAccount { @@ -143,6 +145,7 @@ impl LiquidatorAccount { let crank_data = if !observation_swb_oracles.is_empty() { if let Ok((ix, luts)) = PullFeed::fetch_update_many_ix( + SbContext::new(), &self.non_blocking_rpc_client, FetchUpdateManyParams { feeds: observation_swb_oracles, From 29fef0be0f4429b7fe224f9ab35a3d5031e01beb Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Thu, 30 Jan 2025 13:40:03 +0100 Subject: [PATCH 02/19] cleanup # Conflicts: # src/crossbar.rs --- README.md | 23 +++++++++++++++++------ src/crossbar.rs | 1 + src/liquidator.rs | 29 ----------------------------- src/transaction_manager.rs | 5 ++++- 4 files changed, 22 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 258c187..ad37e46 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,12 @@ Marginfi liquidator ## Deployment Guide ### Installing dependencies -Ubuntu +Ubuntu: ```bash sudo apt install build-essential libssl-dev pkg-config unzip ``` +All systems: Follow this instructions https://grpc.io/docs/protoc-installation/#install-pre-compiled-binaries-any-os; @@ -19,7 +20,13 @@ Follow this instructions To initiate the creation of a new configuration file for the liquidator, execute the following command in your terminal: ```bash - eva01 setup + ./eva01 setup +``` + +or through cargo: + +```bash + cargo run -- setup ``` This command launches a Mini CLI wizard that guides you through setting up a base configuration for the liquidator. During this process, it will also check if you have a MarginfiAccount initialized. If not, it will prompt you to create one. At this stage, the setup will only request the essential variables. For adjusting settings like `Minimum Profit`, you'll need to manually edit the configuration file afterward. @@ -30,7 +37,13 @@ Now, lets start the liquidator Once your configuration file is ready, you can start the liquidator by running: ```bash - eva01 run + ./eva01 run +``` + +or: + +```bash + cargo run -- run ``` Replace `` with the path to your newly created configuration file. After initiating this command, Eva begins its operation. Please note that it might take a few minutes for Eva to load all the marginfi accounts, including English support, and to be fully operational. @@ -38,6 +51,7 @@ Replace `` with the path to your newly created configuration file. ### Initial Loading Time The initial loading phase can take some time, depending on your RPC. Eva will load everything needed into the state, including all Marginfi Accounts. Expect the loading time to be between 1-3 minutes depending on the RPC. + ## Eva01 Configuration To run eva you need to add configuration variables first. @@ -51,6 +65,3 @@ The following are mandatory to run Eva - `KEYPAIR_PATH` The wallet keypair for the liquidator. It is a string that is the path of the file containing the Keypair object. - `SIGNER_PUBKEY` The pubkey corresponding to the keypair - `LIQUIDATOR_ACCOUNT` The marginfi account corresponding to the `SIGNER_PUBKEY` - - - diff --git a/src/crossbar.rs b/src/crossbar.rs index adabdec..ace550f 100644 --- a/src/crossbar.rs +++ b/src/crossbar.rs @@ -1,3 +1,4 @@ +use log::error; use solana_sdk::pubkey::Pubkey; use std::{ collections::HashMap, diff --git a/src/liquidator.rs b/src/liquidator.rs index 63df631..8e09d56 100644 --- a/src/liquidator.rs +++ b/src/liquidator.rs @@ -56,7 +56,6 @@ pub struct Liquidator { general_config: GeneralConfig, config: LiquidatorCfg, geyser_receiver: Receiver, - transaction_sender: Sender, marginfi_accounts: HashMap, banks: HashMap, oracle_to_bank: HashMap, @@ -65,33 +64,6 @@ pub struct Liquidator { cache_oracle_needed_accounts: HashMap, } -#[derive(Clone)] -pub struct LiquidatableAccount<'a> { - account: &'a MarginfiAccountWrapper, - asset_bank_pk: Pubkey, - liab_bank_pk: Pubkey, - max_liquidation_amount: I80F48, - profit: I80F48, -} - -impl<'a> LiquidatableAccount<'a> { - pub fn new( - account: &'a MarginfiAccountWrapper, - asset_bank_pk: Pubkey, - liab_bank_pk: Pubkey, - max_liquidation_amount: I80F48, - profit: I80F48, - ) -> LiquidatableAccount { - Self { - account, - asset_bank_pk, - liab_bank_pk, - max_liquidation_amount, - profit, - } - } -} - pub struct PreparedLiquidatableAccount { liquidate_account: MarginfiAccountWrapper, asset_bank: BankWrapper, @@ -123,7 +95,6 @@ impl Liquidator { general_config, config: liquidator_config, geyser_receiver, - transaction_sender, marginfi_accounts: HashMap::new(), banks: HashMap::new(), liquidator_account, diff --git a/src/transaction_manager.rs b/src/transaction_manager.rs index 7f11a3e..62b4d78 100644 --- a/src/transaction_manager.rs +++ b/src/transaction_manager.rs @@ -80,7 +80,10 @@ impl RawTransaction { impl TransactionManager { /// Creates a new transaction manager pub async fn new(rx: Receiver, config: GeneralConfig) -> Self { - let keypair = read_keypair_file(&config.keypair_path).unwrap(); + let keypair = read_keypair_file(&config.keypair_path).map_err(|e| { + error!("Failed to read keypair file: {:?}", e); + e + }).unwrap(); let mut searcher_client = get_searcher_client_no_auth(&config.block_engine_url) .await .unwrap(); From 1b5c72efe6d9c138b1149466d2d0184c354a479b Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Mon, 3 Feb 2025 21:10:30 +0100 Subject: [PATCH 03/19] fix after rebase --- Cargo.lock | 2 +- src/liquidator.rs | 29 +++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index a4bfd7d..7c1e74d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2924,7 +2924,7 @@ dependencies = [ [[package]] name = "marginfi" version = "0.1.0" -source = "git+https://github.com/mrgnlabs/marginfi-v2#bd7efdadb0bdf9cf0f9e4299f9225dcdadc17635" +source = "git+https://github.com/mrgnlabs/marginfi-v2#933e11ee9b49fbb2d98ec12c6f48189c43168dd6" dependencies = [ "anchor-lang 0.29.0", "anchor-lang 0.30.1", diff --git a/src/liquidator.rs b/src/liquidator.rs index 8e09d56..63df631 100644 --- a/src/liquidator.rs +++ b/src/liquidator.rs @@ -56,6 +56,7 @@ pub struct Liquidator { general_config: GeneralConfig, config: LiquidatorCfg, geyser_receiver: Receiver, + transaction_sender: Sender, marginfi_accounts: HashMap, banks: HashMap, oracle_to_bank: HashMap, @@ -64,6 +65,33 @@ pub struct Liquidator { cache_oracle_needed_accounts: HashMap, } +#[derive(Clone)] +pub struct LiquidatableAccount<'a> { + account: &'a MarginfiAccountWrapper, + asset_bank_pk: Pubkey, + liab_bank_pk: Pubkey, + max_liquidation_amount: I80F48, + profit: I80F48, +} + +impl<'a> LiquidatableAccount<'a> { + pub fn new( + account: &'a MarginfiAccountWrapper, + asset_bank_pk: Pubkey, + liab_bank_pk: Pubkey, + max_liquidation_amount: I80F48, + profit: I80F48, + ) -> LiquidatableAccount { + Self { + account, + asset_bank_pk, + liab_bank_pk, + max_liquidation_amount, + profit, + } + } +} + pub struct PreparedLiquidatableAccount { liquidate_account: MarginfiAccountWrapper, asset_bank: BankWrapper, @@ -95,6 +123,7 @@ impl Liquidator { general_config, config: liquidator_config, geyser_receiver, + transaction_sender, marginfi_accounts: HashMap::new(), banks: HashMap::new(), liquidator_account, From b394921bef99b5ea27175853435da60328b69b7d Mon Sep 17 00:00:00 2001 From: Jakob Date: Sun, 8 Sep 2024 16:51:33 +0000 Subject: [PATCH 04/19] fix: build and deploy docker image in ci --- .github/workflows/docker-build-gcp.yml | 48 ++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 .github/workflows/docker-build-gcp.yml diff --git a/.github/workflows/docker-build-gcp.yml b/.github/workflows/docker-build-gcp.yml new file mode 100644 index 0000000..b67dd55 --- /dev/null +++ b/.github/workflows/docker-build-gcp.yml @@ -0,0 +1,48 @@ +name: Build and Push Docker Image to GCP Artifact Registry + +# Trigger the workflow only when a git tag following SemVer is pushed +on: + push: + tags: + - 'v[0-9]+.[0-9]+.[0-9]+' + +jobs: + build: + runs-on: ubuntu-latest + + steps: + # Checkout the repository + - name: Checkout code + uses: actions/checkout@v3 + + # Set up Google Cloud authentication + - name: Set up GCP authentication + uses: google-github-actions/auth@v1 + with: + credentials_json: '${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}' + + # Configure Docker to use gcloud as a credential helper + - name: Configure Docker for Google Artifact Registry + run: | + gcloud auth configure-docker us-east1-docker.pkg.dev + + # Extract version tag without the 'v' prefix + - name: Extract version without 'v' prefix + run: echo "VERSION=${GITHUB_REF#refs/tags/v}" >> $GITHUB_ENV + + # Build the Docker image using the version without the 'v' and tag it as 'latest' as well + - name: Build Docker image + run: | + docker build -t us-east1-docker.pkg.dev/marginfi-dev/main/eva01:${{ env.VERSION }} \ + -t us-east1-docker.pkg.dev/marginfi-dev/main/eva01:latest . + + # Push the Docker image to Artifact Registry with the version tag + - name: Push Docker image with version tag to Artifact Registry + run: | + docker push us-east1-docker.pkg.dev/marginfi-dev/main/eva01:${{ env.VERSION }} + + # Push the Docker image to Artifact Registry with the 'latest' tag + - name: Push Docker image with 'latest' tag to Artifact Registry + run: | + docker push us-east1-docker.pkg.dev/marginfi-dev/main/eva01:latest + From 4a48f370971c44ebbaa095f6aeb6cd6ea0b1a067 Mon Sep 17 00:00:00 2001 From: Jakob Date: Tue, 10 Sep 2024 06:28:35 +0000 Subject: [PATCH 05/19] fix: restart deployment --- .github/workflows/docker-build-gcp.yml | 27 ++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/.github/workflows/docker-build-gcp.yml b/.github/workflows/docker-build-gcp.yml index b67dd55..78e1561 100644 --- a/.github/workflows/docker-build-gcp.yml +++ b/.github/workflows/docker-build-gcp.yml @@ -45,4 +45,31 @@ jobs: - name: Push Docker image with 'latest' tag to Artifact Registry run: | docker push us-east1-docker.pkg.dev/marginfi-dev/main/eva01:latest + deploy: + runs-on: ubuntu-latest + needs: build + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set up kubectl + uses: azure/setup-kubectl@v3 + with: + version: 'latest' + + - name: Set up Kubeconfig + env: + KUBECONFIG: ${{ secrets.KUBECONFIG }} + run: | + echo "$KUBECONFIG" > ~/.kube/config + + - name: Get all eva01-* deployments + id: get-deployments + run: | + kubectl get deployments --no-headers -o custom-columns=":metadata.name" | grep '^eva01-' > deployments.txt + + - name: Restart all eva01-* deployments + run: | + cat deployments.txt | xargs -I {} kubectl rollout restart deployment {} From f7e1fdbddc4d160bccd1911f21edb5c34d211fa7 Mon Sep 17 00:00:00 2001 From: Jakob Date: Tue, 10 Sep 2024 06:52:28 +0000 Subject: [PATCH 06/19] fix: deployment --- .github/workflows/docker-build-gcp.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/docker-build-gcp.yml b/.github/workflows/docker-build-gcp.yml index 78e1561..855a339 100644 --- a/.github/workflows/docker-build-gcp.yml +++ b/.github/workflows/docker-build-gcp.yml @@ -62,6 +62,7 @@ jobs: env: KUBECONFIG: ${{ secrets.KUBECONFIG }} run: | + mkdir -p ~/.kube echo "$KUBECONFIG" > ~/.kube/config - name: Get all eva01-* deployments From b7496798b6db7a76389956764dbc07e87971daae Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Tue, 4 Feb 2025 14:57:07 +0100 Subject: [PATCH 07/19] fix warnings and clean up --- .gitignore | 2 +- .vscode/launch.json | 2 +- src/cli/setup/initialize.rs | 28 ------------------- src/cli/setup/mod.rs | 3 --- src/config.rs | 7 ----- src/crossbar.rs | 6 +---- src/liquidator.rs | 43 +++++------------------------- src/rebalancer.rs | 17 +++++------- src/transaction_manager.rs | 10 ++++--- src/utils.rs | 2 +- src/wrappers/liquidator_account.rs | 5 ++-- 11 files changed, 26 insertions(+), 99 deletions(-) delete mode 100644 src/cli/setup/initialize.rs diff --git a/.gitignore b/.gitignore index 4972af9..c400614 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ /target .idea/ .env -cfg.toml \ No newline at end of file +config.toml \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 5a1755d..4d4d8e8 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -19,7 +19,7 @@ "kind": "bin" } }, - "args": ["run", "cfg.toml"], + "args": ["run", "config.toml"], "cwd": "${workspaceFolder}" }, { diff --git a/src/cli/setup/initialize.rs b/src/cli/setup/initialize.rs deleted file mode 100644 index 702397d..0000000 --- a/src/cli/setup/initialize.rs +++ /dev/null @@ -1,28 +0,0 @@ -use crate::{ - marginfi_ixs::make_initialize_ix, - sender::{SenderCfg, TransactionSender}, -}; -use solana_client::rpc_client::RpcClient; -use solana_sdk::{ - pubkey::Pubkey, - signature::Signature, - signature::{Keypair, Signer}, -}; -use std::sync::Arc; - -pub fn initialize_marginfi_account( - rpc_client: Arc, - signer: Arc, - marginfi_program_id: Pubkey, - marginfi_group_id: Pubkey, - send_cfg: SenderCfg, -) -> anyhow::Result { - let signer_pk = signer.pubkey(); - - let initialize_ix = make_initialize_ix(marginfi_program_id, marginfi_group_id, signer_pk); - - let sig = TransactionSender::send_ix(rpc_client, initialize_ix, signer, None, send_cfg) - .map_err(|e| anyhow::anyhow!("Coulnd't send the transaction: {}", e))?; - - Ok(sig) -} diff --git a/src/cli/setup/mod.rs b/src/cli/setup/mod.rs index 30df4f6..ab24921 100644 --- a/src/cli/setup/mod.rs +++ b/src/cli/setup/mod.rs @@ -17,9 +17,6 @@ use solana_program::pubkey::Pubkey; use solana_sdk::signature::{read_keypair_file, Signer}; use std::{ops::Not, path::PathBuf, str::FromStr}; -/// Helper for initializing Marginfi Account -pub mod initialize; - lazy_static! { static ref DEFAULT_CONFIG_PATH: PathBuf = { let mut path = dirs::home_dir().expect("Couldn't find the config directory"); diff --git a/src/config.rs b/src/config.rs index 59d6114..564ad07 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,7 +4,6 @@ use crate::{ fixed_from_float, fixed_to_float, from_option_vec_pubkey_string, from_pubkey_string, from_vec_str_to_pubkey, pubkey_to_str, vec_pubkey_to_option_vec_str, vec_pubkey_to_str, }, - wrappers::marginfi_account::TxConfig, }; use fixed::types::I80F48; use fixed_macro::types::I80F48; @@ -160,12 +159,6 @@ impl GeneralConfig { pubkey!("5FuKF7C1tJji2mXZuJ14U9oDb37is5mmvYLf4KwojoF1"), ] } - - pub fn get_tx_config(&self) -> TxConfig { - TxConfig { - compute_unit_price_micro_lamports: self.compute_unit_price_micro_lamports, - } - } } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] diff --git a/src/crossbar.rs b/src/crossbar.rs index ace550f..b68d442 100644 --- a/src/crossbar.rs +++ b/src/crossbar.rs @@ -1,9 +1,5 @@ -use log::error; use solana_sdk::pubkey::Pubkey; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; +use std::collections::HashMap; use switchboard_on_demand_client::CrossbarClient; const CHUNK_SIZE: usize = 20; diff --git a/src/liquidator.rs b/src/liquidator.rs index 63df631..2b46bbc 100644 --- a/src/liquidator.rs +++ b/src/liquidator.rs @@ -56,7 +56,6 @@ pub struct Liquidator { general_config: GeneralConfig, config: LiquidatorCfg, geyser_receiver: Receiver, - transaction_sender: Sender, marginfi_accounts: HashMap, banks: HashMap, oracle_to_bank: HashMap, @@ -65,35 +64,8 @@ pub struct Liquidator { cache_oracle_needed_accounts: HashMap, } -#[derive(Clone)] -pub struct LiquidatableAccount<'a> { - account: &'a MarginfiAccountWrapper, - asset_bank_pk: Pubkey, - liab_bank_pk: Pubkey, - max_liquidation_amount: I80F48, - profit: I80F48, -} - -impl<'a> LiquidatableAccount<'a> { - pub fn new( - account: &'a MarginfiAccountWrapper, - asset_bank_pk: Pubkey, - liab_bank_pk: Pubkey, - max_liquidation_amount: I80F48, - profit: I80F48, - ) -> LiquidatableAccount { - Self { - account, - asset_bank_pk, - liab_bank_pk, - max_liquidation_amount, - profit, - } - } -} - pub struct PreparedLiquidatableAccount { - liquidate_account: MarginfiAccountWrapper, + liquidatee_account: MarginfiAccountWrapper, asset_bank: BankWrapper, liab_bank: BankWrapper, asset_amount: u64, @@ -113,7 +85,7 @@ impl Liquidator { let liquidator_account = LiquidatorAccount::new( RpcClient::new(general_config.rpc_url.clone()), general_config.liquidator_account, - transaction_sender.clone(), + transaction_sender, general_config.clone(), ) .await @@ -123,7 +95,6 @@ impl Liquidator { general_config, config: liquidator_config, geyser_receiver, - transaction_sender, marginfi_accounts: HashMap::new(), banks: HashMap::new(), liquidator_account, @@ -275,12 +246,12 @@ impl Liquidator { for account in accounts { info!( "Liquidating account {:?}", - account.liquidate_account.address + account.liquidatee_account.address ); if let Err(e) = self .liquidator_account .liquidate( - &account.liquidate_account, + &account.liquidatee_account, &account.asset_bank, &account.liab_bank, account.asset_amount, @@ -290,7 +261,7 @@ impl Liquidator { { info!( "Failed to liquidate account {:?}, error: {:?}", - account.liquidate_account.address, e + account.liquidatee_account.address, e ); } } @@ -332,7 +303,7 @@ impl Liquidator { return None; } - let (deposit_shares, liabs_shares) = account.get_deposits_and_liabilities_shares(); + let (deposit_shares, _) = account.get_deposits_and_liabilities_shares(); let deposit_values = match self.get_value_of_shares( deposit_shares, @@ -396,7 +367,7 @@ impl Liquidator { let slippage_adjusted_asset_amount = asset_amount_to_liquidate * I80F48!(0.95); Some(PreparedLiquidatableAccount { - liquidate_account: account.clone(), + liquidatee_account: account.clone(), asset_bank: asset_bank.clone(), liab_bank: liab_bank.clone(), asset_amount: slippage_adjusted_asset_amount.to_num(), diff --git a/src/rebalancer.rs b/src/rebalancer.rs index 9ff304a..73a8cde 100644 --- a/src/rebalancer.rs +++ b/src/rebalancer.rs @@ -32,9 +32,7 @@ use marginfi::{ price::{OraclePriceFeedAdapter, OracleSetup, PriceBias, SwitchboardPullPriceFeed}, }, }; -use solana_client::{ - nonblocking::rpc_client::RpcClient as NonBlockingRpcClient, rpc_client::RpcClient, -}; +use solana_client::rpc_client::RpcClient; use solana_program::pubkey::Pubkey; use solana_sdk::{ account::Account, account_info::IntoAccountInfo, clock::Clock, @@ -44,12 +42,11 @@ use solana_sdk::{ use std::{ cmp::min, collections::{HashMap, HashSet}, - str::FromStr, sync::{atomic::AtomicBool, Arc}, }; use switchboard_on_demand::PullFeedAccountData; -use switchboard_on_demand_client::{FetchUpdateManyParams, Gateway, PullFeed}; -use switchboard_on_demand_client::{QueueAccountData, SbContext}; +use switchboard_on_demand_client::SbContext; +use switchboard_on_demand_client::{FetchUpdateManyParams, PullFeed}; /// The rebalancer is responsible to keep the liquidator account /// "rebalanced" -> Document this better pub struct Rebalancer { @@ -377,10 +374,10 @@ impl Rebalancer { } } debug!("Rebalancing accounts"); - //self.sell_non_preferred_deposits().await?; - //self.repay_liabilities().await?; - //self.handle_tokens_in_token_accounts().await?; - //self.deposit_preferred_tokens().await?; + self.sell_non_preferred_deposits().await?; + self.repay_liabilities().await?; + self.handle_tokens_in_token_accounts().await?; + self.deposit_preferred_tokens().await?; Ok(()) } diff --git a/src/transaction_manager.rs b/src/transaction_manager.rs index 62b4d78..fc0bfea 100644 --- a/src/transaction_manager.rs +++ b/src/transaction_manager.rs @@ -80,10 +80,12 @@ impl RawTransaction { impl TransactionManager { /// Creates a new transaction manager pub async fn new(rx: Receiver, config: GeneralConfig) -> Self { - let keypair = read_keypair_file(&config.keypair_path).map_err(|e| { - error!("Failed to read keypair file: {:?}", e); - e - }).unwrap(); + let keypair = read_keypair_file(&config.keypair_path) + .map_err(|e| { + error!("Failed to read keypair file: {:?}", e); + e + }) + .unwrap(); let mut searcher_client = get_searcher_client_no_auth(&config.block_engine_url) .await .unwrap(); diff --git a/src/utils.rs b/src/utils.rs index 1d0f974..e718fc1 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -286,7 +286,7 @@ impl<'a> BankAccountWithPriceFeedEva<'a> { pub fn load( lending_account: &'a LendingAccount, banks: HashMap, - ) -> anyhow::Result> { + ) -> anyhow::Result>> { let active_balances = lending_account .balances .iter() diff --git a/src/wrappers/liquidator_account.rs b/src/wrappers/liquidator_account.rs index 230c90c..0013ce5 100644 --- a/src/wrappers/liquidator_account.rs +++ b/src/wrappers/liquidator_account.rs @@ -4,16 +4,15 @@ use crate::{ marginfi_ixs::{make_deposit_ix, make_liquidate_ix, make_repay_ix, make_withdraw_ix}, transaction_manager::{BatchTransactions, RawTransaction}, }; -use anchor_spl::associated_token::spl_associated_token_account::instruction::create_associated_token_account; use crossbeam::channel::Sender; use marginfi::state::{marginfi_account::MarginfiAccount, marginfi_group::BankVaultType}; -use solana_client::rpc_config::RpcSendTransactionConfig; use solana_client::{ nonblocking::rpc_client::RpcClient as NonBlockingRpcClient, rpc_client::RpcClient, + rpc_config::RpcSendTransactionConfig, }; use solana_program::pubkey::Pubkey; -use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::{ + commitment_config::CommitmentConfig, instruction::Instruction, pubkey, signature::{read_keypair_file, Keypair}, From 52124014159b980cb02a3fcda929373dbb6ab0b2 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Tue, 4 Feb 2025 15:54:41 +0100 Subject: [PATCH 08/19] more cleanup --- src/geyser.rs | 174 +++++++++++++++---------------- src/marginfi_ixs.rs | 29 +----- src/sender.rs | 102 +----------------- src/transaction_manager.rs | 79 +------------- src/utils.rs | 100 +----------------- src/wrappers/bank.rs | 23 +--- src/wrappers/marginfi_account.rs | 15 --- src/wrappers/oracle.rs | 3 - src/wrappers/token_account.rs | 2 +- 9 files changed, 100 insertions(+), 427 deletions(-) diff --git a/src/geyser.rs b/src/geyser.rs index b702cd5..a03719a 100644 --- a/src/geyser.rs +++ b/src/geyser.rs @@ -71,107 +71,103 @@ impl GeyserService { let (_, mut stream) = client.subscribe_with_request(Some(sub_req)).await?; while let Some(msg) = stream.next().await { - match msg { - Ok(msg) => { - if let Some(update_oneof) = msg.update_oneof { - if let subscribe_update::UpdateOneof::Account(account) = update_oneof { - if let Some(update_account) = &account.account { - if let Ok(address) = - Pubkey::try_from(update_account.pubkey.clone()) - { - if let Ok(account) = - account_update_to_account(update_account) - { - if let Ok(account_owner_pk) = - Pubkey::try_from(account.owner) - { - if account_owner_pk == marginfi_program_id - && update_account.data.len() - == MARGIN_ACCOUNT_SIZE - { - let marginfi_account = - MarginfiAccount::try_deserialize( - &mut account.data.as_slice(), - ); - - match marginfi_account { - Err(_) => { - error!("Error deserializing marginfi account"); - continue; - } - Ok(marginfi_account) => { - if marginfi_account.group - != marginfi_group_pk - { - continue; - } - } - } - - let update = GeyserUpdate { - account_type: AccountType::MarginfiAccount, - address, - account: account.clone(), - }; - if let Err(e) = - liquidator_sender.send(update.clone()) - { - error!("Error sending update to the liquidator sender: {:?}", e); - } - if let Err(e) = - rebalancer_sender.send(update.clone()) - { - error!("Error sending update to the rebalancer sender: {:?}", e); - } - } - } - if let Some(account_type) = - tracked_accounts.get(&address) - { - let update = GeyserUpdate { - account_type: account_type.clone(), - address, - account: account.clone(), - }; - - match account_type { - AccountType::OracleAccount => { - if let Err(e) = - liquidator_sender.send(update.clone()) - { - error!("Error sending update to the liquidator sender: {:?}", e); - } - if let Err(e) = - rebalancer_sender.send(update.clone()) - { - error!("Error sending update to the rebalancer sender: {:?}", e); - } - } - AccountType::TokenAccount => { - if let Err(e) = - rebalancer_sender.send(update.clone()) - { - error!("Error sending update to the rebalancer sender: {:?}", e); - } - } - _ => {} - } + if let Err(e) = msg { + error!("Error receiving message from geyser {:?}", e); + break; + } + + if let Some(subscribe_update::UpdateOneof::Account(account)) = + msg.unwrap().update_oneof + { + if let Some(update_account) = &account.account { + if let Ok(address) = Pubkey::try_from(update_account.pubkey.clone()) { + if let Ok(account) = account_update_to_account(update_account) { + if account.owner == marginfi_program_id + && update_account.data.len() == MARGIN_ACCOUNT_SIZE + { + let marginfi_account = MarginfiAccount::try_deserialize( + &mut account.data.as_slice(), + ); + + match marginfi_account { + Err(_) => { + error!("Error deserializing marginfi account"); + continue; + } + Ok(marginfi_account) => { + if marginfi_account.group != marginfi_group_pk { + continue; } } } + + let update = GeyserUpdate { + account_type: AccountType::MarginfiAccount, + address, + account: account.clone(), + }; + if let Err(e) = liquidator_sender.send(update.clone()) { + error!( + "Error sending update to the liquidator sender: {:?}", + e + ); + } + if let Err(e) = rebalancer_sender.send(update.clone()) { + error!( + "Error sending update to the rebalancer sender: {:?}", + e + ); + } + } + + if let Some(account_type) = tracked_accounts.get(&address) { + Self::send_update( + &liquidator_sender, + &rebalancer_sender, + account_type.clone(), + address, + &account, + ); } } } } - Err(e) => { - error!("Error receiving message from geyser {:?}", e); - break; - } } } } } + fn send_update( + liquidator_sender: &Sender, + rebalancer_sender: &Sender, + account_type: AccountType, + address: Pubkey, + account: &Account, + ) { + let update = GeyserUpdate { + account_type, + address, + account: account.clone(), + }; + + match update.account_type { + AccountType::OracleAccount => { + if let Err(e) = liquidator_sender.send(update.clone()) { + error!("Error sending update to the liquidator sender: {:?}", e); + } + if let Err(e) = rebalancer_sender.send(update.clone()) { + error!("Error sending update to the rebalancer sender: {:?}", e); + } + } + AccountType::TokenAccount => { + if let Err(e) = rebalancer_sender.send(update.clone()) { + error!("Error sending update to the rebalancer sender: {:?}", e); + } + } + _ => {} + } + } + /// Builds a geyser subscription request payload fn build_geyser_subscribe_request( tracked_accounts: &[Pubkey], diff --git a/src/marginfi_ixs.rs b/src/marginfi_ixs.rs index 4f7fd9f..79b2539 100644 --- a/src/marginfi_ixs.rs +++ b/src/marginfi_ixs.rs @@ -1,34 +1,9 @@ -use anchor_lang::{system_program, InstructionData, Key, ToAccountMetas}; +use anchor_lang::{InstructionData, Key, ToAccountMetas}; use anchor_spl::token_2022; use log::trace; use solana_sdk::instruction::AccountMeta; -use solana_sdk::{ - instruction::Instruction, - pubkey::Pubkey, - signature::{Keypair, Signer}, -}; - -pub fn make_initialize_ix( - marginfi_program_id: Pubkey, - marginfi_group: Pubkey, - signer: Pubkey, -) -> Instruction { - let marginfi_account_key = Keypair::new(); - - Instruction { - program_id: marginfi_program_id, - accounts: marginfi::accounts::MarginfiAccountInitialize { - marginfi_group, - marginfi_account: marginfi_account_key.pubkey(), - system_program: system_program::ID, - authority: signer, - fee_payer: signer, - } - .to_account_metas(Some(true)), - data: marginfi::instruction::MarginfiAccountInitialize.data(), - } -} +use solana_sdk::{instruction::Instruction, pubkey::Pubkey}; pub fn make_deposit_ix( marginfi_program_id: Pubkey, diff --git a/src/sender.rs b/src/sender.rs index b12864a..a3328f0 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -1,16 +1,9 @@ -use crate::wrappers::marginfi_account::TxConfig; use log::{error, info}; use serde::Deserialize; use solana_client::rpc_client::{RpcClient, SerializableTransaction}; use solana_client::rpc_config::{RpcSendTransactionConfig, RpcSimulateTransactionConfig}; +use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::signature::Signature; -use solana_sdk::{ - commitment_config::CommitmentConfig, - compute_budget::ComputeBudgetInstruction, - instruction::Instruction, - signature::{Keypair, Signer}, - transaction::Transaction, -}; use std::time::Duration; use std::{error::Error, sync::Arc}; @@ -22,8 +15,6 @@ pub struct SenderCfg { skip_preflight: bool, #[serde(default = "SenderCfg::default_timeout")] timeout: Duration, - #[serde(default = "SenderCfg::default_transaction_type")] - transaction_type: TransactionType, } impl SenderCfg { @@ -31,14 +22,6 @@ impl SenderCfg { spam_times: 12, skip_preflight: false, timeout: Duration::from_secs(45), - transaction_type: TransactionType::Aggressive, - }; - - pub const PASSIVE: SenderCfg = SenderCfg { - spam_times: 0, // In passive mode no transaction is spammed - skip_preflight: false, - timeout: Duration::from_secs(45), - transaction_type: TransactionType::Passive, }; pub const fn default_spam_times() -> u64 { @@ -52,94 +35,11 @@ impl SenderCfg { const fn default_timeout() -> Duration { Self::DEFAULT.timeout } - - const fn default_transaction_type() -> TransactionType { - TransactionType::Aggressive - } } pub struct TransactionSender; -#[derive(Debug, Deserialize)] -pub enum TransactionType { - Aggressive, - Passive, -} - impl TransactionSender { - pub fn send_ix( - rpc_client: Arc, - ix: Instruction, - signer: Arc, - tx_config: Option, - cfg: SenderCfg, - ) -> Result> { - let recent_blockhash = rpc_client.get_latest_blockhash()?; - - let mut ixs = vec![ix]; - - if let Some(config) = tx_config { - let mut compute_budget_price_ix = - ComputeBudgetInstruction::set_compute_unit_price(1000); - - if let Some(price) = config.compute_unit_price_micro_lamports { - compute_budget_price_ix = ComputeBudgetInstruction::set_compute_unit_price(price); - } - - ixs.push(compute_budget_price_ix); - } - - let compute_budget_price_ix = ComputeBudgetInstruction::set_compute_unit_limit(500000); - ixs.push(compute_budget_price_ix); - - let tx = Transaction::new_signed_with_payer( - &ixs, - Some(&signer.pubkey()), - &[signer.as_ref()], - recent_blockhash, - ); - - match cfg.transaction_type { - TransactionType::Passive => Self::passive_send_tx(rpc_client, &tx, cfg), - TransactionType::Aggressive => Self::passive_send_tx(rpc_client, &tx, cfg), - } - } - - pub fn passive_send_tx( - rpc: Arc, - transaction: &impl SerializableTransaction, - cfg: SenderCfg, - ) -> Result> { - let signature = *transaction.get_signature(); - - info!("Sending transaction: {}", signature.to_string()); - - if !cfg.skip_preflight { - let res = rpc.simulate_transaction_with_config( - transaction, - RpcSimulateTransactionConfig { - commitment: Some(CommitmentConfig::processed()), - ..Default::default() - }, - )?; - - if res.value.err.is_some() { - error!("Failed to simulate transaction: {:#?}", res.value); - return Err("Transaction simulation failed".into()); - } - } - - rpc.send_transaction(transaction)?; - - let blockhash = transaction.get_recent_blockhash(); - - rpc.confirm_transaction_with_spinner(&signature, blockhash, CommitmentConfig::confirmed())?; - - info!("Confirmed transaction: {}", signature.to_string()); - - Ok(signature) - } - pub fn aggressive_send_tx( rpc: Arc, transaction: &impl SerializableTransaction, diff --git a/src/transaction_manager.rs b/src/transaction_manager.rs index fc0bfea..a10c3ff 100644 --- a/src/transaction_manager.rs +++ b/src/transaction_manager.rs @@ -7,10 +7,7 @@ use jito_protos::searcher::{ use jito_searcher_client::{get_searcher_client_no_auth, send_bundle_with_confirmation}; use log::{debug, error}; use solana_address_lookup_table_program::state::AddressLookupTable; -use solana_client::{ - nonblocking::rpc_client::RpcClient, rpc_client::RpcClient as NonBlockRpc, - rpc_client::SerializableTransaction, rpc_config::RpcSimulateTransactionConfig, -}; +use solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::RpcClient as NonBlockRpc}; use solana_sdk::{ address_lookup_table_account::AddressLookupTableAccount, commitment_config::CommitmentConfig, @@ -18,15 +15,12 @@ use solana_sdk::{ instruction::Instruction, message::{v0, VersionedMessage}, pubkey::Pubkey, - signature::{read_keypair_file, Keypair, Signature, Signer}, + signature::{read_keypair_file, Keypair, Signer}, system_instruction::transfer, transaction::VersionedTransaction, }; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; -use std::{error::Error, str::FromStr}; +use std::str::FromStr; +use std::sync::{atomic::AtomicBool, Arc}; use tonic::transport::Channel; /// The leadership threshold related to the jito block engine @@ -195,53 +189,6 @@ impl TransactionManager { Ok(()) } - /// Implements a alternative solution to jito transactions - /// Sends a transaction to the network and waits for confirmation (non-jito) - fn send_agressive_tx(&self, mut ixs: Vec) -> Result> { - let recent_blockhash = self.non_block_rpc.get_latest_blockhash()?; - - ixs.push(ComputeBudgetInstruction::set_compute_unit_limit(500_000)); - - let transaction = VersionedTransaction::try_new( - VersionedMessage::V0(v0::Message::try_compile( - &self.keypair.pubkey(), - &ixs, - &self.lookup_tables, - recent_blockhash, - )?), - &[&self.keypair], - )?; - - let signature = *transaction.get_signature(); - - let simulation = self.non_block_rpc.simulate_transaction_with_config( - &transaction, - RpcSimulateTransactionConfig { - commitment: Some(CommitmentConfig::processed()), - ..Default::default() - }, - )?; - - if simulation.value.err.is_some() { - return Err(format!("Failed to simulate transaction {:?}", simulation.value).into()); - } - - (0..12).try_for_each(|_| { - self.non_block_rpc.send_transaction(&transaction)?; - Ok::<_, Box>(()) - })?; - - let blockhash = transaction.get_recent_blockhash(); - - self.non_block_rpc.confirm_transaction_with_spinner( - &signature, - blockhash, - CommitmentConfig::confirmed(), - )?; - - Ok(signature) - } - /// Configures the instructions /// Adds the compute budget instruction to each instruction /// and compiles the instructions into transactions @@ -253,7 +200,7 @@ impl TransactionManager { let blockhash = self.rpc.get_latest_blockhash().await?; let mut txs = Vec::new(); - for mut raw_transaction in instructions { + for raw_transaction in instructions { let mut ixs = raw_transaction.instructions; ixs.push(ComputeBudgetInstruction::set_compute_unit_limit(1_000_000)); ixs.push(transfer( @@ -279,22 +226,6 @@ impl TransactionManager { Ok(txs) } - /// Listen for the next leader and update the AtomicBool accordingly - async fn listen_for_leader(&mut self) -> anyhow::Result<()> { - loop { - let next_leader = self - .searcher_client - .get_next_scheduled_leader(NextScheduledLeaderRequest {}) - .await? - .into_inner(); - - let num_slots = next_leader.next_leader_slot - next_leader.current_slot; - - self.is_jito_leader - .store(num_slots <= LEADERSHIP_THRESHOLD, Ordering::Relaxed); - } - } - async fn get_tip_accounts( searcher_client: &mut SearcherServiceClient, ) -> anyhow::Result> { diff --git a/src/utils.rs b/src/utils.rs index e718fc1..c44b03c 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result}; use backoff::ExponentialBackoff; use fixed::types::I80F48; use marginfi::{ - bank_authority_seed, bank_seed, + bank_authority_seed, constants::{PYTH_PUSH_MARGINFI_SPONSORED_SHARD_ID, PYTH_PUSH_PYTH_SPONSORED_SHARD_ID}, prelude::MarginfiResult, state::{ @@ -18,7 +18,6 @@ use solana_client::rpc_config::RpcAccountInfoConfig; use solana_program::pubkey::Pubkey; use solana_sdk::{ account::Account, - account_info::AccountInfo, signature::{read_keypair_file, Keypair}, }; use std::{ @@ -26,7 +25,7 @@ use std::{ io::Write, path::PathBuf, str::FromStr, - sync::{atomic::AtomicUsize, Arc, RwLock}, + sync::{atomic::AtomicUsize, Arc}, }; use switchboard_on_demand::PullFeedAccountData; use url::Url; @@ -138,7 +137,7 @@ pub fn batch_get_multiple_accounts( } // Field parsers to save compute. All account validation is assumed to be done -// outside of these methods. +// outside these methods. pub mod accessor { use super::*; @@ -153,12 +152,6 @@ pub mod accessor { mint_bytes.copy_from_slice(&bytes[..32]); Pubkey::new_from_array(mint_bytes) } - - pub fn authority(bytes: &[u8]) -> Pubkey { - let mut owner_bytes = [0u8; 32]; - owner_bytes.copy_from_slice(&bytes[32..64]); - Pubkey::new_from_array(owner_bytes) - } } pub fn account_update_to_account(account_update: &SubscribeUpdateAccountInfo) -> Result { @@ -171,7 +164,8 @@ pub fn account_update_to_account(account_update: &SubscribeUpdateAccountInfo) -> .. } = account_update; - let owner = Pubkey::try_from(owner.clone()).expect("Invalid pubkey"); + let owner = Pubkey::try_from(owner.clone()) + .map_err(|e| anyhow!("Invalid pubkey: {:?}, error: {:?}", owner, e))?; let account = Account { lamports: *lamports, @@ -399,14 +393,6 @@ impl<'a> BankAccountWithPriceFeedEva<'a> { } } -pub fn find_bank_vault_pda( - bank_pk: &Pubkey, - vault_type: BankVaultType, - program_id: &Pubkey, -) -> (Pubkey, u8) { - Pubkey::find_program_address(bank_seed!(vault_type, bank_pk), program_id) -} - pub fn find_bank_vault_authority_pda( bank_pk: &Pubkey, vault_type: BankVaultType, @@ -454,46 +440,6 @@ pub fn calc_weighted_assets_new( )?) } -pub fn calc_weighted_assets( - bank_rw_lock: Arc>, - amount: I80F48, - requirement_type: RequirementType, -) -> anyhow::Result { - let bank_wrapper_ref = bank_rw_lock.read().unwrap(); - let oracle_adapter = &bank_wrapper_ref.oracle_adapter; - let mut asset_weight = bank_wrapper_ref - .bank - .config - .get_weight(requirement_type, BalanceSide::Assets); - - let price_bias = if matches!(requirement_type, RequirementType::Equity) { - None - } else { - Some(PriceBias::Low) - }; - - let lower_price = - oracle_adapter.get_price_of_type(requirement_type.get_oracle_price_type(), price_bias)?; - - if matches!(requirement_type, RequirementType::Initial) { - if let Some(discount) = bank_wrapper_ref - .bank - .maybe_get_asset_weight_init_discount(lower_price)? - { - asset_weight = asset_weight - .checked_mul(discount) - .ok_or_else(|| anyhow!("math error"))?; - } - } - - Ok(calc_value( - amount, - lower_price, - bank_wrapper_ref.bank.mint_decimals, - Some(asset_weight), - )?) -} - #[inline(always)] pub fn calc_weighted_liabs_new( bank: &BankWrapper, @@ -523,36 +469,6 @@ pub fn calc_weighted_liabs_new( )?) } -#[inline(always)] -pub fn calc_weighted_liabs( - bank_rw_lock: Arc>, - amount: I80F48, - requirement_type: RequirementType, -) -> anyhow::Result { - let bank_wrapper_ref = bank_rw_lock.read().unwrap(); - let bank = &bank_wrapper_ref.bank; - let oracle_adapter = &bank_wrapper_ref.oracle_adapter; - let liability_weight = bank - .config - .get_weight(requirement_type, BalanceSide::Liabilities); - - let price_bias = if matches!(requirement_type, RequirementType::Equity) { - None - } else { - Some(PriceBias::High) - }; - - let higher_price = - oracle_adapter.get_price_of_type(requirement_type.get_oracle_price_type(), price_bias)?; - - Ok(calc_value( - amount, - higher_price, - bank.mint_decimals, - Some(liability_weight), - )?) -} - pub fn find_oracle_keys(bank_config: &BankConfig) -> Vec { match bank_config.oracle_setup { marginfi::state::price::OracleSetup::PythPushOracle => { @@ -597,12 +513,6 @@ pub fn find_oracle_keys(bank_config: &BankConfig) -> Vec { } } -pub fn load_swb_pull_account(account_info: &AccountInfo) -> anyhow::Result { - let bytes = &account_info.data.borrow().to_vec()[8..std::mem::size_of::()]; - - Ok(load_swb_pull_account_from_bytes(bytes)?) -} - pub fn load_swb_pull_account_from_bytes(bytes: &[u8]) -> anyhow::Result { if bytes .as_ptr() diff --git a/src/wrappers/bank.rs b/src/wrappers/bank.rs index edbeb15..caa8c90 100644 --- a/src/wrappers/bank.rs +++ b/src/wrappers/bank.rs @@ -3,7 +3,7 @@ use fixed::types::I80F48; use marginfi::state::{ marginfi_account::{calc_amount, calc_value, BalanceSide, RequirementType}, marginfi_group::Bank, - price::{OraclePriceType, PriceAdapter, PriceBias}, + price::{OraclePriceType, PriceBias}, }; use solana_program::pubkey::Pubkey; @@ -84,25 +84,4 @@ impl BankWrapper { Ok(calc_value(amount, price, self.bank.mint_decimals, None)?) } - - pub fn calc_weighted_value( - &self, - amount: I80F48, - side: BalanceSide, - requirement_type: RequirementType, - ) -> anyhow::Result { - let (weight, price_bias, oracle_type) = self.get_pricing_params(side, requirement_type); - - let price = self - .oracle_adapter - .get_price_of_type(oracle_type, price_bias) - .unwrap(); - - Ok(calc_value( - amount, - price, - self.bank.mint_decimals, - Some(weight), - )?) - } } diff --git a/src/wrappers/marginfi_account.rs b/src/wrappers/marginfi_account.rs index 44f0c6e..0e81ab5 100644 --- a/src/wrappers/marginfi_account.rs +++ b/src/wrappers/marginfi_account.rs @@ -4,11 +4,6 @@ use marginfi::state::marginfi_account::{BalanceSide, MarginfiAccount}; use solana_program::pubkey::Pubkey; use std::collections::HashMap; -#[derive(Clone)] -pub struct TxConfig { - pub compute_unit_price_micro_lamports: Option, -} - #[derive(Clone)] pub struct MarginfiAccountWrapper { pub address: Pubkey, @@ -97,16 +92,6 @@ impl MarginfiAccountWrapper { Ok(balance) } - pub fn get_deposits_shares(&self) -> Vec<(I80F48, Pubkey)> { - self.account - .lending_account - .balances - .iter() - .filter(|b| matches!(b.get_side(), Some(BalanceSide::Assets)) & b.active) - .map(|b| (b.asset_shares.into(), b.bank_pk)) - .collect::>() - } - pub fn get_deposits_and_liabilities_shares( &self, ) -> (Vec<(I80F48, Pubkey)>, Vec<(I80F48, Pubkey)>) { diff --git a/src/wrappers/oracle.rs b/src/wrappers/oracle.rs index 58f0d96..efddb49 100644 --- a/src/wrappers/oracle.rs +++ b/src/wrappers/oracle.rs @@ -1,9 +1,6 @@ -use std::sync::Arc; - use fixed::types::I80F48; use marginfi::state::price::{OraclePriceFeedAdapter, OraclePriceType, PriceAdapter, PriceBias}; use solana_program::pubkey::Pubkey; -use tokio::sync::Mutex; #[derive(Clone)] pub struct OracleWrapper { diff --git a/src/wrappers/token_account.rs b/src/wrappers/token_account.rs index 25dbd2d..9d58430 100644 --- a/src/wrappers/token_account.rs +++ b/src/wrappers/token_account.rs @@ -1,6 +1,6 @@ use super::bank::BankWrapper; use fixed::types::I80F48; -use marginfi::{constants::EXP_10_I80F48, state::price::PriceAdapter}; +use marginfi::constants::EXP_10_I80F48; use solana_program::pubkey::Pubkey; pub struct TokenAccountWrapper { From af51309b01ccb8a88c756d017f59e66d01a922f6 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Tue, 4 Feb 2025 22:29:49 +0100 Subject: [PATCH 09/19] satisfy clippy and introduce a convenience VS Code task --- .github/workflows/docker-build-gcp.yml | 3 +- .vscode/tasks.json | 15 ++++++ src/cli/app.rs | 1 + src/crossbar.rs | 29 ++++++----- src/geyser.rs | 12 ++--- src/liquidator.rs | 20 ++++---- src/marginfi_ixs.rs | 70 +++++++++++--------------- src/rebalancer.rs | 17 +++---- src/token_account_manager.rs | 3 +- src/utils.rs | 18 ++++--- src/wrappers/liquidator_account.rs | 35 ++++--------- src/wrappers/marginfi_account.rs | 6 +-- 12 files changed, 110 insertions(+), 119 deletions(-) create mode 100644 .vscode/tasks.json diff --git a/.github/workflows/docker-build-gcp.yml b/.github/workflows/docker-build-gcp.yml index 855a339..9de0bcd 100644 --- a/.github/workflows/docker-build-gcp.yml +++ b/.github/workflows/docker-build-gcp.yml @@ -58,12 +58,13 @@ jobs: with: version: 'latest' - - name: Set up Kubeconfig + - name: Set up & Validate Kubeconfig env: KUBECONFIG: ${{ secrets.KUBECONFIG }} run: | mkdir -p ~/.kube echo "$KUBECONFIG" > ~/.kube/config + kubectl config view --kubeconfig=~/.kube/config > /dev/null - name: Get all eva01-* deployments id: get-deployments diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..bcacef4 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,15 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "label": "Run (with all checks)", + "type": "shell", + "command": "cargo fmt && cargo clippy -- -D warnings && cargo build && cargo run -- run config.toml", + "problemMatcher": [], + "group": { + "kind": "build", + "isDefault": true + } + } + ] +} diff --git a/src/cli/app.rs b/src/cli/app.rs index dc773d2..7c8c0f1 100644 --- a/src/cli/app.rs +++ b/src/cli/app.rs @@ -10,6 +10,7 @@ pub struct Args { pub cmd: Commands, } +#[allow(clippy::large_enum_variant)] #[derive(Subcommand, Debug)] pub enum Commands { #[command(about = "Run the liquidator, by the given configuration file")] diff --git a/src/crossbar.rs b/src/crossbar.rs index b68d442..dbc848e 100644 --- a/src/crossbar.rs +++ b/src/crossbar.rs @@ -25,7 +25,7 @@ impl CrossbarMaintainer { // Create a fast lookup map from feed hash to oracle hash let feed_hash_to_oracle_hash_map: HashMap = feeds .iter() - .map(|(address, feed_hash)| (feed_hash.clone(), address.clone())) + .map(|(address, feed_hash)| (feed_hash.clone(), *address)) .collect(); let feed_hashes: Vec = feeds @@ -56,21 +56,20 @@ impl CrossbarMaintainer { let mut prices = Vec::new(); - for result in chunk_results { - if let Ok(chunk_result) = result { - for simulated_response in chunk_result { - if let Some(price) = calculate_price(simulated_response.results) { - prices.push(( - feed_hash_to_oracle_hash_map - .get(&simulated_response.feedHash) - .unwrap() - .clone(), - price, - )); - } + chunk_results + .into_iter() + .flatten() + .flatten() + .for_each(|simulated_response| { + if let Some(price) = calculate_price(simulated_response.results) { + prices.push(( + *feed_hash_to_oracle_hash_map + .get(&simulated_response.feedHash) + .unwrap(), + price, + )); } - } - } + }); prices } diff --git a/src/geyser.rs b/src/geyser.rs index a03719a..7e55645 100644 --- a/src/geyser.rs +++ b/src/geyser.rs @@ -28,9 +28,9 @@ pub struct GeyserUpdate { /// TokenAccount -> Rebalancer #[derive(Clone, Debug)] pub enum AccountType { - OracleAccount, - MarginfiAccount, - TokenAccount, + Oracle, + Marginfi, + Token, } pub struct GeyserServiceConfig { @@ -102,7 +102,7 @@ impl GeyserService { } let update = GeyserUpdate { - account_type: AccountType::MarginfiAccount, + account_type: AccountType::Marginfi, address, account: account.clone(), }; @@ -151,7 +151,7 @@ impl GeyserService { }; match update.account_type { - AccountType::OracleAccount => { + AccountType::Oracle => { if let Err(e) = liquidator_sender.send(update.clone()) { error!("Error sending update to the liquidator sender: {:?}", e); } @@ -159,7 +159,7 @@ impl GeyserService { error!("Error sending update to the rebalancer sender: {:?}", e); } } - AccountType::TokenAccount => { + AccountType::Token => { if let Err(e) = rebalancer_sender.send(update.clone()) { error!("Error sending update to the rebalancer sender: {:?}", e); } diff --git a/src/liquidator.rs b/src/liquidator.rs index 2b46bbc..65a425c 100644 --- a/src/liquidator.rs +++ b/src/liquidator.rs @@ -141,7 +141,7 @@ impl Liquidator { while let Ok(mut msg) = self.geyser_receiver.recv() { debug!("Received message {:?}", msg); match msg.account_type { - AccountType::OracleAccount => { + AccountType::Oracle => { if let Some(bank_to_update_pk) = self.oracle_to_bank.get(&msg.address) { let bank_to_update: &mut BankWrapper = self.banks.get_mut(bank_to_update_pk).unwrap(); @@ -217,7 +217,7 @@ impl Liquidator { bank_to_update.oracle_adapter.price_adapter = oracle_price_adapter; } } - AccountType::MarginfiAccount => { + AccountType::Marginfi => { let marginfi_account = bytemuck::from_bytes::(&msg.account.data[8..]); self.marginfi_accounts @@ -280,11 +280,10 @@ impl Liquidator { .banks .values() .filter_map(|bank| { - if let Some(feed_hash) = &bank.oracle_adapter.swb_feed_hash { - Some((bank.address, feed_hash.clone())) - } else { - None - } + bank.oracle_adapter + .swb_feed_hash + .as_ref() + .map(|feed_hash| (bank.address, feed_hash.clone())) }) .collect::>(); @@ -717,8 +716,7 @@ impl Liquidator { let oracle_keys = banks .iter() - .map(|(_, bank)| find_oracle_keys(&bank.config)) - .flatten() + .flat_map(|(_, bank)| find_oracle_keys(&bank.config)) .collect::>(); let mut oracle_accounts = @@ -752,7 +750,7 @@ impl Liquidator { let mut oracle_address = None; for address in oracle_addresses.iter() { - if let Some(Some(account)) = oracle_map.get(&address) { + if let Some(Some(account)) = oracle_map.get(address) { oracle_account = Some(account.clone()); oracle_address = Some(*address); break; @@ -867,7 +865,7 @@ impl Liquidator { let mut tracked_accounts: HashMap = HashMap::new(); for bank in self.banks.values() { - tracked_accounts.insert(bank.oracle_adapter.address, AccountType::OracleAccount); + tracked_accounts.insert(bank.oracle_adapter.address, AccountType::Oracle); } tracked_accounts diff --git a/src/marginfi_ixs.rs b/src/marginfi_ixs.rs index 79b2539..1b76744 100644 --- a/src/marginfi_ixs.rs +++ b/src/marginfi_ixs.rs @@ -5,30 +5,31 @@ use log::trace; use solana_sdk::instruction::AccountMeta; use solana_sdk::{instruction::Instruction, pubkey::Pubkey}; +use crate::wrappers::bank::BankWrapper; + +#[allow(clippy::too_many_arguments)] pub fn make_deposit_ix( marginfi_program_id: Pubkey, marginfi_group: Pubkey, marginfi_account: Pubkey, signer: Pubkey, - bank: Pubkey, + bank: &BankWrapper, signer_token_account: Pubkey, - bank_liquidity_vault: Pubkey, token_program: Pubkey, - mint: Pubkey, amount: u64, ) -> Instruction { let mut accounts = marginfi::accounts::LendingAccountDeposit { marginfi_group, marginfi_account, signer, - bank, + bank: bank.address, signer_token_account, - bank_liquidity_vault, + bank_liquidity_vault: bank.bank.liquidity_vault, token_program, } .to_account_metas(Some(true)); - maybe_add_bank_mint(&mut accounts, mint, &token_program); + maybe_add_bank_mint(&mut accounts, bank.bank.mint, &token_program); Instruction { program_id: marginfi_program_id, @@ -37,16 +38,15 @@ pub fn make_deposit_ix( } } +#[allow(clippy::too_many_arguments)] pub fn make_repay_ix( marginfi_program_id: Pubkey, marginfi_group: Pubkey, marginfi_account: Pubkey, signer: Pubkey, - bank: Pubkey, + bank: &BankWrapper, signer_token_account: Pubkey, - bank_liquidity_vault: Pubkey, token_program: Pubkey, - mint: Pubkey, amount: u64, repay_all: Option, ) -> Instruction { @@ -54,14 +54,14 @@ pub fn make_repay_ix( marginfi_group, marginfi_account, signer, - bank, + bank: bank.address, signer_token_account, - bank_liquidity_vault, + bank_liquidity_vault: bank.bank.liquidity_vault, token_program, } .to_account_metas(Some(true)); - maybe_add_bank_mint(&mut accounts, mint, &token_program); + maybe_add_bank_mint(&mut accounts, bank.bank.mint, &token_program); Instruction { program_id: marginfi_program_id, @@ -70,18 +70,17 @@ pub fn make_repay_ix( } } +#[allow(clippy::too_many_arguments)] pub fn make_withdraw_ix( marginfi_program_id: Pubkey, marginfi_group: Pubkey, marginfi_account: Pubkey, signer: Pubkey, - bank: Pubkey, + bank: &BankWrapper, destination_token_account: Pubkey, bank_liquidity_vault_authority: Pubkey, - bank_liquidity_vault: Pubkey, token_program: Pubkey, observation_accounts: Vec, - mint: Pubkey, amount: u64, withdraw_all: Option, ) -> Instruction { @@ -89,15 +88,15 @@ pub fn make_withdraw_ix( marginfi_group, marginfi_account, signer, - bank, + bank: bank.address, destination_token_account, bank_liquidity_vault_authority, - bank_liquidity_vault, + bank_liquidity_vault: bank.bank.liquidity_vault, token_program, } .to_account_metas(Some(true)); - maybe_add_bank_mint(&mut accounts, mint, &token_program); + maybe_add_bank_mint(&mut accounts, bank.bank.mint, &token_program); trace!( "make_withdraw_ix: observation_accounts: {:?}", @@ -121,23 +120,18 @@ pub fn make_withdraw_ix( } } +#[allow(clippy::too_many_arguments)] pub fn make_liquidate_ix( marginfi_program_id: Pubkey, marginfi_group: Pubkey, marginfi_account: Pubkey, - asset_bank: Pubkey, - liab_bank: Pubkey, + asset_bank: &BankWrapper, + liab_bank: &BankWrapper, signer: Pubkey, liquidatee_marginfi_account: Pubkey, bank_liquidity_vault_authority: Pubkey, - bank_liquidity_vault: Pubkey, - bank_insurance_vault: Pubkey, token_program: Pubkey, - liquidator_observation_accounts: Vec, - liquidatee_observation_accounts: Vec, - asset_bank_oracle: Pubkey, - liab_bank_oracle: Pubkey, - liab_mint: Pubkey, + observation_accounts: Vec, asset_amount: u64, ) -> Instruction { let mut accounts = marginfi::accounts::LendingAccountLiquidate { @@ -146,29 +140,23 @@ pub fn make_liquidate_ix( signer, liquidatee_marginfi_account, bank_liquidity_vault_authority, - bank_liquidity_vault, - bank_insurance_vault, + bank_liquidity_vault: liab_bank.bank.liquidity_vault, + bank_insurance_vault: liab_bank.bank.insurance_vault, token_program, - asset_bank, - liab_bank, + asset_bank: asset_bank.address, + liab_bank: liab_bank.address, } .to_account_metas(Some(true)); - maybe_add_bank_mint(&mut accounts, liab_mint, &token_program); + maybe_add_bank_mint(&mut accounts, liab_bank.bank.mint, &token_program); accounts.extend([ - AccountMeta::new_readonly(asset_bank_oracle, false), - AccountMeta::new_readonly(liab_bank_oracle, false), + AccountMeta::new_readonly(asset_bank.oracle_adapter.address, false), + AccountMeta::new_readonly(liab_bank.oracle_adapter.address, false), ]); accounts.extend( - liquidator_observation_accounts - .iter() - .map(|a| AccountMeta::new_readonly(a.key(), false)), - ); - - accounts.extend( - liquidatee_observation_accounts + observation_accounts .iter() .map(|a| AccountMeta::new_readonly(a.key(), false)), ); diff --git a/src/rebalancer.rs b/src/rebalancer.rs index 73a8cde..8007cee 100644 --- a/src/rebalancer.rs +++ b/src/rebalancer.rs @@ -205,7 +205,7 @@ impl Rebalancer { while let Ok(mut msg) = self.geyser_receiver.recv() { debug!("Received message {:?}", msg); match msg.account_type { - AccountType::OracleAccount => { + AccountType::Oracle => { if let Some(bank_to_update_pk) = self.oracle_to_bank.get(&msg.address) { let bank_to_update: &mut BankWrapper = self.banks.get_mut(bank_to_update_pk).unwrap(); @@ -281,7 +281,7 @@ impl Rebalancer { bank_to_update.oracle_adapter.price_adapter = oracle_price_adapter; } } - AccountType::MarginfiAccount => { + AccountType::Marginfi => { if msg.address == self.general_config.liquidator_account { let marginfi_account = bytemuck::from_bytes::(&msg.account.data[8..]); @@ -289,7 +289,7 @@ impl Rebalancer { self.liquidator_account.account_wrapper.account = *marginfi_account; } } - AccountType::TokenAccount => { + AccountType::Token => { let mint = accessor::mint(&msg.account.data); let balance = accessor::amount(&msg.account.data); @@ -315,11 +315,10 @@ impl Rebalancer { .banks .values() .filter_map(|bank| { - if let Some(feed_hash) = &bank.oracle_adapter.swb_feed_hash { - Some((bank.address, feed_hash.clone())) - } else { - None - } + bank.oracle_adapter + .swb_feed_hash + .as_ref() + .map(|feed_hash| (bank.address, feed_hash.clone())) }) .collect::>(); @@ -413,7 +412,7 @@ impl Rebalancer { let mut tracked_accounts: HashMap = HashMap::new(); for token_account in self.token_accounts.values() { - tracked_accounts.insert(token_account.address, AccountType::TokenAccount); + tracked_accounts.insert(token_account.address, AccountType::Token); } tracked_accounts diff --git a/src/token_account_manager.rs b/src/token_account_manager.rs index dfad31d..f3aa53f 100644 --- a/src/token_account_manager.rs +++ b/src/token_account_manager.rs @@ -121,8 +121,7 @@ impl TokenAccountManager { { let addresses = tas .iter() - .map(|(mint, address)| vec![*mint, *address]) - .flatten() + .flat_map(|(mint, address)| vec![*mint, *address]) .collect::>(); let res = batch_get_multiple_accounts( diff --git a/src/utils.rs b/src/utils.rs index c44b03c..4bd19c0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -23,6 +23,7 @@ use solana_sdk::{ use std::{ collections::HashMap, io::Write, + mem::MaybeUninit, path::PathBuf, str::FromStr, sync::{atomic::AtomicUsize, Arc}, @@ -494,8 +495,8 @@ pub fn find_oracle_keys(bank_config: &BankConfig) -> Vec { feed_id, ) .0, - bank_config.oracle_keys[1].clone(), - bank_config.oracle_keys[2].clone(), + bank_config.oracle_keys[1], + bank_config.oracle_keys[2], ]; oracle_addresses } @@ -504,7 +505,7 @@ pub fn find_oracle_keys(bank_config: &BankConfig) -> Vec { .iter() .filter_map(|key| { if *key != Pubkey::default() { - Some(key.clone()) + Some(*key) } else { None } @@ -523,7 +524,7 @@ pub fn load_swb_pull_account_from_bytes(bytes: &[u8]) -> anyhow::Result(); - let mut vec: Vec = Vec::with_capacity(num); + let mut vec: Vec> = Vec::with_capacity(num); unsafe { vec.set_len(num); @@ -532,9 +533,14 @@ pub fn load_swb_pull_account_from_bytes(bytes: &[u8]) -> anyhow::Result = std::mem::transmute::< + Vec>, + Vec, + >(vec); + + Ok(vec[0]) + } } pub fn expand_tilde(path: &str) -> PathBuf { diff --git a/src/wrappers/liquidator_account.rs b/src/wrappers/liquidator_account.rs index 0013ce5..1cf279e 100644 --- a/src/wrappers/liquidator_account.rs +++ b/src/wrappers/liquidator_account.rs @@ -96,14 +96,14 @@ impl LiquidatorAccount { pub async fn liquidate( &mut self, - liquidate_account: &MarginfiAccountWrapper, + liquidatee_account: &MarginfiAccountWrapper, asset_bank: &BankWrapper, liab_bank: &BankWrapper, asset_amount: u64, banks: &HashMap, ) -> anyhow::Result<()> { let liquidator_account_address = self.account_wrapper.address; - let liquidatee_account_address = liquidate_account.address; + let liquidatee_account_address = liquidatee_account.address; let signer_pk = self.signer_keypair.pubkey(); let liab_mint = liab_bank.bank.mint; @@ -113,15 +113,12 @@ impl LiquidatorAccount { &self.program_id, ); - let bank_liquidaity_vault = liab_bank.bank.liquidity_vault; - let bank_insurante_vault = liab_bank.bank.insurance_vault; - let liquidator_observation_accounts = self.account_wrapper .get_observation_accounts(&[], &[], banks); let liquidatee_observation_accounts = - liquidate_account.get_observation_accounts(&[], &[], banks); + liquidatee_account.get_observation_accounts(&[], &[], banks); let joined_observation_accounts = liquidator_observation_accounts .iter() @@ -168,19 +165,13 @@ impl LiquidatorAccount { self.program_id, self.group, liquidator_account_address, - asset_bank.address, - liab_bank.address, + asset_bank, + liab_bank, signer_pk, liquidatee_account_address, bank_liquidaity_vault_authority, - bank_liquidaity_vault, - bank_insurante_vault, *self.token_program_per_mint.get(&liab_mint).unwrap(), - liquidator_observation_accounts, - liquidatee_observation_accounts, - asset_bank.oracle_adapter.address, - liab_bank.oracle_adapter.address, - liab_mint, + joined_observation_accounts, asset_amount, ); @@ -217,7 +208,7 @@ impl LiquidatorAccount { .await; println!( "Transaction sent without preflight check {:?} for address {:?}", - res, liquidate_account.address + res, liquidatee_account.address ); bundle.push(RawTransaction::new(vec![liquidate_ix])); @@ -257,7 +248,7 @@ impl LiquidatorAccount { self.group, marginfi_account, signer_pk, - bank.address, + bank, token_account, crate::utils::find_bank_vault_authority_pda( &bank.address, @@ -265,10 +256,8 @@ impl LiquidatorAccount { &self.program_id, ) .0, - bank.bank.liquidity_vault, token_program, observation_accounts, - mint, amount, withdraw_all, ); @@ -298,11 +287,9 @@ impl LiquidatorAccount { self.group, marginfi_account, signer_pk, - bank.address, + bank, *token_account, - bank.bank.liquidity_vault, token_program, - mint, amount, repay_all, ); @@ -338,11 +325,9 @@ impl LiquidatorAccount { self.group, marginfi_account, signer_pk, - bank.address, + bank, token_account, - bank.bank.liquidity_vault, token_program, - mint, amount, ); diff --git a/src/wrappers/marginfi_account.rs b/src/wrappers/marginfi_account.rs index 0e81ab5..5dee3cc 100644 --- a/src/wrappers/marginfi_account.rs +++ b/src/wrappers/marginfi_account.rs @@ -10,6 +10,8 @@ pub struct MarginfiAccountWrapper { pub account: MarginfiAccount, } +type Shares = Vec<(I80F48, Pubkey)>; + impl MarginfiAccountWrapper { pub fn new(address: Pubkey, account: MarginfiAccount) -> Self { MarginfiAccountWrapper { address, account } @@ -92,9 +94,7 @@ impl MarginfiAccountWrapper { Ok(balance) } - pub fn get_deposits_and_liabilities_shares( - &self, - ) -> (Vec<(I80F48, Pubkey)>, Vec<(I80F48, Pubkey)>) { + pub fn get_deposits_and_liabilities_shares(&self) -> (Shares, Shares) { let mut liabilities = Vec::new(); let mut deposits = Vec::new(); From ee66189a055f3d8013ef74da9d7a335aed896d69 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Wed, 5 Feb 2025 20:05:20 +0100 Subject: [PATCH 10/19] better error reporting --- src/cli/entrypoints.rs | 2 +- src/transaction_manager.rs | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/cli/entrypoints.rs b/src/cli/entrypoints.rs index 74bc3e5..430935d 100644 --- a/src/cli/entrypoints.rs +++ b/src/cli/entrypoints.rs @@ -31,7 +31,7 @@ pub async fn run_liquidator(config: Eva01Config) -> anyhow::Result<()> { // a channel is shared between the liquidator/rebalancer // and the transaction manager let mut transaction_manager = - TransactionManager::new(transaction_rx, config.general_config.clone()).await; + TransactionManager::new(transaction_rx, config.general_config.clone()).await?; // Create the liquidator let mut liquidator = Liquidator::new( diff --git a/src/transaction_manager.rs b/src/transaction_manager.rs index a10c3ff..b90472c 100644 --- a/src/transaction_manager.rs +++ b/src/transaction_manager.rs @@ -73,7 +73,10 @@ impl RawTransaction { impl TransactionManager { /// Creates a new transaction manager - pub async fn new(rx: Receiver, config: GeneralConfig) -> Self { + pub async fn new( + rx: Receiver, + config: GeneralConfig, + ) -> anyhow::Result { let keypair = read_keypair_file(&config.keypair_path) .map_err(|e| { error!("Failed to read keypair file: {:?}", e); @@ -94,8 +97,8 @@ impl TransactionManager { // Loads the Address Lookup Table's accounts let mut lookup_tables = vec![]; for table_address in &config.address_lookup_tables { - let raw_account = rpc.get_account(table_address).await.unwrap(); - let address_lookup_table = AddressLookupTable::deserialize(&raw_account.data).unwrap(); + let raw_account = rpc.get_account(table_address).await?; + let address_lookup_table = AddressLookupTable::deserialize(&raw_account.data)?; let lookup_table = AddressLookupTableAccount { key: *table_address, addresses: address_lookup_table.addresses.to_vec(), @@ -105,7 +108,7 @@ impl TransactionManager { let tip_accounts = Self::get_tip_accounts(&mut searcher_client).await.unwrap(); - Self { + Ok(Self { rx, keypair, rpc, @@ -114,7 +117,7 @@ impl TransactionManager { is_jito_leader: AtomicBool::new(false), tip_accounts, lookup_tables, - } + }) } /// Starts the transaction manager From b5608298f7750e09b5d164dc077cec4a032b0c69 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Wed, 12 Feb 2025 14:41:51 +0100 Subject: [PATCH 11/19] test build to publish metrics --- Cargo.lock | 184 ++++++++++++++++++++++++++++++++++++++++--- Cargo.toml | 2 + Dockerfile | 1 + service.yaml | 14 ++++ service_monitor.yaml | 13 +++ src/cli/mod.rs | 34 ++++++++ 6 files changed, 235 insertions(+), 13 deletions(-) create mode 100644 service.yaml create mode 100644 service_monitor.yaml diff --git a/Cargo.lock b/Cargo.lock index 7c1e74d..f3c49a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "Inflector" @@ -834,7 +834,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", + "http 0.2.12", "http-body", "hyper", "itoa", @@ -860,7 +860,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", + "http 0.2.12", "http-body", "mime", "rustversion", @@ -1985,6 +1985,7 @@ dependencies = [ "log", "marginfi", "num-traits", + "prometheus", "rayon", "serde", "serde_json", @@ -2005,6 +2006,7 @@ dependencies = [ "tonic", "tonic-health", "url", + "warp", "yellowstone-grpc-client", "yellowstone-grpc-proto", ] @@ -2299,7 +2301,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.2.6", "slab", "tokio", @@ -2359,6 +2361,30 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 0.2.12", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http 0.2.12", +] + [[package]] name = "heck" version = "0.3.3" @@ -2445,6 +2471,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -2452,7 +2489,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", "pin-project-lite", ] @@ -2485,7 +2522,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.12", "http-body", "httparse", "httpdate", @@ -2505,7 +2542,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", + "http 0.2.12", "hyper", "rustls", "tokio", @@ -3014,6 +3051,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -3040,6 +3087,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.12", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "multimap" version = "0.10.0" @@ -3640,6 +3705,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + [[package]] name = "prost" version = "0.12.6" @@ -3716,6 +3796,12 @@ dependencies = [ "prost 0.12.6", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "protobuf-src" version = "1.1.0+21.5" @@ -4076,7 +4162,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.12", "http-body", "hyper", "hyper-rustls", @@ -4336,6 +4422,12 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -5014,8 +5106,8 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-tungstenite", - "tungstenite", + "tokio-tungstenite 0.20.1", + "tungstenite 0.20.1", "url", ] @@ -6316,10 +6408,22 @@ dependencies = [ "rustls", "tokio", "tokio-rustls", - "tungstenite", + "tungstenite 0.20.1", "webpki-roots 0.25.4", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -6410,7 +6514,7 @@ dependencies = [ "base64 0.21.7", "bytes", "h2", - "http", + "http 0.2.12", "http-body", "hyper", "hyper-timeout", @@ -6535,7 +6639,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.12", "httparse", "log", "rand 0.8.5", @@ -6547,6 +6651,25 @@ dependencies = [ "webpki-roots 0.24.0", ] +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.2.0", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "type-layout" version = "0.2.0" @@ -6580,6 +6703,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -6758,6 +6887,35 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http 0.2.12", + "hyper", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tungstenite 0.21.0", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index ba4dd24..4250f47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,8 @@ switchboard-on-demand-client = "0.2.9" chrono = "0.4.38" hex = "0.4.3" url = "2.5.2" +warp = "0.3.7" +prometheus = "0.13.4" [profile.release] opt-level = 3 diff --git a/Dockerfile b/Dockerfile index 860b30a..4f39ae5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -28,6 +28,7 @@ WORKDIR /app COPY --from=builder /usr/src/app/target/release/eva01 . ENV RUST_LOG=eva01=info +ENV PRETTY_LOGS=1 # Set the startup command CMD ["./eva01", "run", "/config/config.toml"] diff --git a/service.yaml b/service.yaml new file mode 100644 index 0000000..8e6c631 --- /dev/null +++ b/service.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Service +metadata: + name: eva01-metrics + namespace: default + labels: + app: eva01 +spec: + selector: + app: eva01 + ports: + - name: metrics + port: 8080 + targetPort: 8080 diff --git a/service_monitor.yaml b/service_monitor.yaml new file mode 100644 index 0000000..3bb2474 --- /dev/null +++ b/service_monitor.yaml @@ -0,0 +1,13 @@ +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: eva01-monitor + namespace: monitoring +spec: + selector: + matchLabels: + app: eva01 + endpoints: + - port: metrics + path: /metrics + interval: 10s diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 90c1eaa..2b2a4a9 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -11,10 +11,44 @@ pub mod entrypoints; /// A wizard like setup menu for creating the liquidator configuration pub mod setup; +use prometheus::{Encoder, TextEncoder, Counter, Registry}; +use warp::Filter; +use lazy_static::lazy_static; +use std::sync::Arc; + +lazy_static! { + static ref REQUEST_COUNTER: Counter = Counter::new( + "eva01_requests_total", + "Total number of requests received" + ).unwrap(); +} + /// Main entrypoint for the Eva pub async fn main_entry() -> anyhow::Result<()> { let args = app::Args::parse(); + let registry = Registry::new(); + registry.register(Box::new(REQUEST_COUNTER.clone())).unwrap(); + + let metrics_route = warp::path("metrics").map(move || { + let encoder = TextEncoder::new(); + let mut buffer = Vec::new(); + let metric_families = registry.gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + let response = String::from_utf8(buffer).unwrap(); + warp::reply::with_header(response, "Content-Type", "text/plain; version=0.0.4") + }); + + let hello = warp::path!("hello").map(|| { + REQUEST_COUNTER.inc(); + "Hello, world!" + }); + + let routes = metrics_route.or(hello); + + println!("Starting eva01 metrics server on port 8080..."); + warp::serve(routes).run(([0, 0, 0, 0], 8080)).await; + match args.cmd { app::Commands::Run { path } => { let config = Eva01Config::try_load_from_file(path)?; From e17e9afd9cd19980b285babee438c1306e5541f9 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Fri, 14 Feb 2025 13:48:26 +0100 Subject: [PATCH 12/19] progress --- src/cli/mod.rs | 15 +++++++-------- src/liquidator.rs | 3 +++ src/rebalancer.rs | 1 + 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 2b2a4a9..5146ff6 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -11,16 +11,13 @@ pub mod entrypoints; /// A wizard like setup menu for creating the liquidator configuration pub mod setup; -use prometheus::{Encoder, TextEncoder, Counter, Registry}; -use warp::Filter; use lazy_static::lazy_static; -use std::sync::Arc; +use prometheus::{Counter, Encoder, Registry, TextEncoder}; +use warp::Filter; lazy_static! { - static ref REQUEST_COUNTER: Counter = Counter::new( - "eva01_requests_total", - "Total number of requests received" - ).unwrap(); + static ref REQUEST_COUNTER: Counter = + Counter::new("eva01_requests_total", "Total number of requests received").unwrap(); } /// Main entrypoint for the Eva @@ -28,7 +25,9 @@ pub async fn main_entry() -> anyhow::Result<()> { let args = app::Args::parse(); let registry = Registry::new(); - registry.register(Box::new(REQUEST_COUNTER.clone())).unwrap(); + registry + .register(Box::new(REQUEST_COUNTER.clone())) + .unwrap(); let metrics_route = warp::path("metrics").map(move || { let encoder = TextEncoder::new(); diff --git a/src/liquidator.rs b/src/liquidator.rs index 65a425c..11b4bfd 100644 --- a/src/liquidator.rs +++ b/src/liquidator.rs @@ -243,6 +243,7 @@ impl Liquidator { // Accounts are sorted from the highest profit to the lowest accounts.sort_by(|a, b| a.profit.cmp(&b.profit)); accounts.reverse(); + info!("Accounts to liquidate: {:?}", accounts.len()); for account in accounts { info!( "Liquidating account {:?}", @@ -263,7 +264,9 @@ impl Liquidator { "Failed to liquidate account {:?}, error: {:?}", account.liquidatee_account.address, e ); + // TODO: publish update to Grafana - how many accounts are NOT liquidated } + // TODO: publish update to Grafana - how many accounts are liquidated } } break; diff --git a/src/rebalancer.rs b/src/rebalancer.rs index 8007cee..4471a0e 100644 --- a/src/rebalancer.rs +++ b/src/rebalancer.rs @@ -296,6 +296,7 @@ impl Rebalancer { let token_to_update = self.token_accounts.get_mut(&mint).unwrap(); token_to_update.balance = balance; + // TODO: publish update to Grafana } } From f83832c6b9356c26a751dcfb40ea14c9edd3af1d Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Fri, 14 Feb 2025 13:51:34 +0100 Subject: [PATCH 13/19] spawn a separate thread for the metrics server --- src/cli/mod.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 5146ff6..eeddf26 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -8,22 +8,21 @@ pub mod app; /// Entrypoints for the Eva pub mod entrypoints; -/// A wizard like setup menu for creating the liquidator configuration +/// A wizard-like setup menu for creating the liquidator configuration pub mod setup; use lazy_static::lazy_static; use prometheus::{Counter, Encoder, Registry, TextEncoder}; use warp::Filter; +use tokio::task; lazy_static! { static ref REQUEST_COUNTER: Counter = Counter::new("eva01_requests_total", "Total number of requests received").unwrap(); } -/// Main entrypoint for the Eva -pub async fn main_entry() -> anyhow::Result<()> { - let args = app::Args::parse(); - +/// Starts the metrics server asynchronously +async fn start_metrics_server() { let registry = Registry::new(); registry .register(Box::new(REQUEST_COUNTER.clone())) @@ -47,7 +46,16 @@ pub async fn main_entry() -> anyhow::Result<()> { println!("Starting eva01 metrics server on port 8080..."); warp::serve(routes).run(([0, 0, 0, 0], 8080)).await; +} + +/// Main entrypoint for Eva +pub async fn main_entry() -> anyhow::Result<()> { + let args = app::Args::parse(); + + // Start the metrics server in a separate task + task::spawn(start_metrics_server()); + // Proceed with the main program match args.cmd { app::Commands::Run { path } => { let config = Eva01Config::try_load_from_file(path)?; From 10c0ea4e5f1c2d1f5c656099b59ec7317fd68b9c Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Sat, 15 Feb 2025 13:17:17 +0100 Subject: [PATCH 14/19] add new (sensible) metrics --- src/cli/mod.rs | 80 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 28 deletions(-) diff --git a/src/cli/mod.rs b/src/cli/mod.rs index eeddf26..b94c158 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use crate::config::Eva01Config; use clap::Parser; use setup::setup_from_cfg; @@ -12,48 +14,70 @@ pub mod entrypoints; pub mod setup; use lazy_static::lazy_static; -use prometheus::{Counter, Encoder, Registry, TextEncoder}; +use prometheus::{Counter, Encoder, Gauge, Histogram, HistogramOpts, Registry, TextEncoder}; +use tokio::sync::Mutex; use warp::Filter; -use tokio::task; lazy_static! { - static ref REQUEST_COUNTER: Counter = - Counter::new("eva01_requests_total", "Total number of requests received").unwrap(); + static ref REGISTRY: Registry = Registry::new(); + static ref SUCCESSFUL_LIQUIDATIONS: Counter = Counter::new( + "eva01_successful_liquidations_total", + "Total number of successful liquidations" + ) + .unwrap(); + static ref FAILED_LIQUIDATIONS: Counter = Counter::new( + "eva01_failed_liquidations_total", + "Total number of failed liquidation attempts" + ) + .unwrap(); + static ref ERROR_COUNT: Counter = + Counter::new("eva01_errors_total", "Total number of errors encountered").unwrap(); + static ref LIQUIDATION_LATENCY: Histogram = Histogram::with_opts(HistogramOpts::new( + "eva01_liquidation_latency_seconds", + "Time taken for liquidations in seconds" + )) + .unwrap(); + static ref BALANCES: Mutex> = Mutex::new(HashMap::new()); } -/// Starts the metrics server asynchronously -async fn start_metrics_server() { - let registry = Registry::new(); - registry - .register(Box::new(REQUEST_COUNTER.clone())) +fn register_metrics() { + REGISTRY + .register(Box::new(SUCCESSFUL_LIQUIDATIONS.clone())) .unwrap(); + REGISTRY + .register(Box::new(FAILED_LIQUIDATIONS.clone())) + .unwrap(); + REGISTRY.register(Box::new(ERROR_COUNT.clone())).unwrap(); + REGISTRY + .register(Box::new(LIQUIDATION_LATENCY.clone())) + .unwrap(); +} - let metrics_route = warp::path("metrics").map(move || { - let encoder = TextEncoder::new(); - let mut buffer = Vec::new(); - let metric_families = registry.gather(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - let response = String::from_utf8(buffer).unwrap(); - warp::reply::with_header(response, "Content-Type", "text/plain; version=0.0.4") - }); - - let hello = warp::path!("hello").map(|| { - REQUEST_COUNTER.inc(); - "Hello, world!" - }); - - let routes = metrics_route.or(hello); - - println!("Starting eva01 metrics server on port 8080..."); - warp::serve(routes).run(([0, 0, 0, 0], 8080)).await; +fn metrics_handler() -> String { + let encoder = TextEncoder::new(); + let mut buffer = Vec::new(); + let metric_families = REGISTRY.gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() } /// Main entrypoint for Eva pub async fn main_entry() -> anyhow::Result<()> { let args = app::Args::parse(); + register_metrics(); + + let metrics_route = warp::path("metrics").map(move || { + warp::reply::with_header( + metrics_handler(), + "Content-Type", + "text/plain; version=0.0.4", + ) + }); // Start the metrics server in a separate task - task::spawn(start_metrics_server()); + tokio::spawn(async move { + warp::serve(metrics_route).run(([0, 0, 0, 0], 8080)).await; + }); // Proceed with the main program match args.cmd { From 322098a5807f86fef6714112d563a8c117a268bd Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Sat, 15 Feb 2025 13:29:12 +0100 Subject: [PATCH 15/19] reposition metrics in the source tree --- src/cli/entrypoints.rs | 17 +++++++++++ src/cli/mod.rs | 65 ------------------------------------------ src/main.rs | 3 ++ src/metrics.rs | 47 ++++++++++++++++++++++++++++++ 4 files changed, 67 insertions(+), 65 deletions(-) create mode 100644 src/metrics.rs diff --git a/src/cli/entrypoints.rs b/src/cli/entrypoints.rs index 430935d..532a4cb 100644 --- a/src/cli/entrypoints.rs +++ b/src/cli/entrypoints.rs @@ -2,6 +2,7 @@ use crate::{ config::Eva01Config, geyser::{GeyserService, GeyserUpdate}, liquidator::Liquidator, + metrics::{metrics_handler, register_metrics}, rebalancer::Rebalancer, transaction_manager::{BatchTransactions, TransactionManager}, }; @@ -10,8 +11,24 @@ use std::{ collections::HashMap, sync::{atomic::AtomicBool, Arc}, }; +use warp::Filter; pub async fn run_liquidator(config: Eva01Config) -> anyhow::Result<()> { + register_metrics(); + + let metrics_route = warp::path("metrics").map(move || { + warp::reply::with_header( + metrics_handler(), + "Content-Type", + "text/plain; version=0.0.4", + ) + }); + + // Start the metrics server in a separate task + tokio::spawn(async move { + warp::serve(metrics_route).run(([0, 0, 0, 0], 8080)).await; + }); + info!("Starting eva01 liquidator! {:#?}", &config); // Create two channels diff --git a/src/cli/mod.rs b/src/cli/mod.rs index b94c158..3d5a9d4 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use crate::config::Eva01Config; use clap::Parser; use setup::setup_from_cfg; @@ -13,73 +11,10 @@ pub mod entrypoints; /// A wizard-like setup menu for creating the liquidator configuration pub mod setup; -use lazy_static::lazy_static; -use prometheus::{Counter, Encoder, Gauge, Histogram, HistogramOpts, Registry, TextEncoder}; -use tokio::sync::Mutex; -use warp::Filter; - -lazy_static! { - static ref REGISTRY: Registry = Registry::new(); - static ref SUCCESSFUL_LIQUIDATIONS: Counter = Counter::new( - "eva01_successful_liquidations_total", - "Total number of successful liquidations" - ) - .unwrap(); - static ref FAILED_LIQUIDATIONS: Counter = Counter::new( - "eva01_failed_liquidations_total", - "Total number of failed liquidation attempts" - ) - .unwrap(); - static ref ERROR_COUNT: Counter = - Counter::new("eva01_errors_total", "Total number of errors encountered").unwrap(); - static ref LIQUIDATION_LATENCY: Histogram = Histogram::with_opts(HistogramOpts::new( - "eva01_liquidation_latency_seconds", - "Time taken for liquidations in seconds" - )) - .unwrap(); - static ref BALANCES: Mutex> = Mutex::new(HashMap::new()); -} - -fn register_metrics() { - REGISTRY - .register(Box::new(SUCCESSFUL_LIQUIDATIONS.clone())) - .unwrap(); - REGISTRY - .register(Box::new(FAILED_LIQUIDATIONS.clone())) - .unwrap(); - REGISTRY.register(Box::new(ERROR_COUNT.clone())).unwrap(); - REGISTRY - .register(Box::new(LIQUIDATION_LATENCY.clone())) - .unwrap(); -} - -fn metrics_handler() -> String { - let encoder = TextEncoder::new(); - let mut buffer = Vec::new(); - let metric_families = REGISTRY.gather(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - String::from_utf8(buffer).unwrap() -} - /// Main entrypoint for Eva pub async fn main_entry() -> anyhow::Result<()> { let args = app::Args::parse(); - register_metrics(); - - let metrics_route = warp::path("metrics").map(move || { - warp::reply::with_header( - metrics_handler(), - "Content-Type", - "text/plain; version=0.0.4", - ) - }); - - // Start the metrics server in a separate task - tokio::spawn(async move { - warp::serve(metrics_route).run(([0, 0, 0, 0], 8080)).await; - }); - // Proceed with the main program match args.cmd { app::Commands::Run { path } => { let config = Eva01Config::try_load_from_file(path)?; diff --git a/src/main.rs b/src/main.rs index a0c70bd..8bbc681 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,9 @@ use env_logger::Builder; use std::{backtrace::Backtrace, error::Error}; +/// Prometheus metrics +mod metrics; + /// Geyser service mod geyser; diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..388588f --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,47 @@ +use lazy_static::lazy_static; +use prometheus::{Counter, Encoder, Gauge, Histogram, HistogramOpts, Registry, TextEncoder}; +use std::collections::HashMap; +use tokio::sync::Mutex; + +lazy_static! { + pub static ref REGISTRY: Registry = Registry::new(); + pub static ref SUCCESSFUL_LIQUIDATIONS: Counter = Counter::new( + "eva01_successful_liquidations_total", + "Total number of successful liquidations" + ) + .unwrap(); + pub static ref FAILED_LIQUIDATIONS: Counter = Counter::new( + "eva01_failed_liquidations_total", + "Total number of failed liquidation attempts" + ) + .unwrap(); + pub static ref ERROR_COUNT: Counter = + Counter::new("eva01_errors_total", "Total number of errors encountered").unwrap(); + pub static ref LIQUIDATION_LATENCY: Histogram = Histogram::with_opts(HistogramOpts::new( + "eva01_liquidation_latency_seconds", + "Time taken for liquidations in seconds" + )) + .unwrap(); + pub static ref BALANCES: Mutex> = Mutex::new(HashMap::new()); +} + +pub fn register_metrics() { + REGISTRY + .register(Box::new(SUCCESSFUL_LIQUIDATIONS.clone())) + .unwrap(); + REGISTRY + .register(Box::new(FAILED_LIQUIDATIONS.clone())) + .unwrap(); + REGISTRY.register(Box::new(ERROR_COUNT.clone())).unwrap(); + REGISTRY + .register(Box::new(LIQUIDATION_LATENCY.clone())) + .unwrap(); +} + +pub fn metrics_handler() -> String { + let encoder = TextEncoder::new(); + let mut buffer = Vec::new(); + let metric_families = REGISTRY.gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() +} From 09bb6a8a0b7d5845e16083575774494cfa02d4b1 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Sat, 15 Feb 2025 14:00:51 +0100 Subject: [PATCH 16/19] Metrics! Metrics everywhere! --- src/liquidator.rs | 15 ++++++++++++--- src/metrics.rs | 28 ++++++++++++++++++++++++---- src/rebalancer.rs | 3 ++- src/transaction_manager.rs | 5 ++++- 4 files changed, 42 insertions(+), 9 deletions(-) diff --git a/src/liquidator.rs b/src/liquidator.rs index 11b4bfd..7a0dfad 100644 --- a/src/liquidator.rs +++ b/src/liquidator.rs @@ -2,6 +2,7 @@ use crate::{ config::{GeneralConfig, LiquidatorCfg}, crossbar::CrossbarMaintainer, geyser::{AccountType, GeyserUpdate}, + metrics::{ERROR_COUNT, FAILED_LIQUIDATIONS, LIQUIDATION_ATTEMPTS, LIQUIDATION_LATENCY}, transaction_manager::BatchTransactions, utils::{ batch_get_multiple_accounts, find_oracle_keys, BankAccountWithPriceFeedEva, @@ -45,6 +46,7 @@ use std::{ cmp::min, collections::HashMap, sync::{atomic::AtomicBool, Arc}, + time::Instant, }; use switchboard_on_demand::PullFeedAccountData; @@ -243,12 +245,14 @@ impl Liquidator { // Accounts are sorted from the highest profit to the lowest accounts.sort_by(|a, b| a.profit.cmp(&b.profit)); accounts.reverse(); - info!("Accounts to liquidate: {:?}", accounts.len()); for account in accounts { + LIQUIDATION_ATTEMPTS.inc(); info!( "Liquidating account {:?}", account.liquidatee_account.address ); + + let start = Instant::now(); if let Err(e) = self .liquidator_account .liquidate( @@ -264,9 +268,11 @@ impl Liquidator { "Failed to liquidate account {:?}, error: {:?}", account.liquidatee_account.address, e ); - // TODO: publish update to Grafana - how many accounts are NOT liquidated + FAILED_LIQUIDATIONS.inc(); + ERROR_COUNT.inc(); } - // TODO: publish update to Grafana - how many accounts are liquidated + let duration = start.elapsed().as_secs_f64(); + LIQUIDATION_LATENCY.observe(duration); } } break; @@ -331,6 +337,7 @@ impl Liquidator { Ok(None) => return None, Err(e) => { error!("Error finding liquidation bank candidates: {:?}", e); + ERROR_COUNT.inc(); return None; } }; @@ -343,6 +350,7 @@ impl Liquidator { ) .map_err(|e| { error!("Error computing max liquidatable asset amount: {:?}", e); + ERROR_COUNT.inc(); }) .ok()?; @@ -890,6 +898,7 @@ impl Liquidator { let bank = match self.banks.get(&share.1) { Some(bank) => bank, None => { + ERROR_COUNT.inc(); return Err(anyhow::anyhow!("Bank with pubkey {} not found", share.1)); } }; diff --git a/src/metrics.rs b/src/metrics.rs index 388588f..ed56afa 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -5,9 +5,9 @@ use tokio::sync::Mutex; lazy_static! { pub static ref REGISTRY: Registry = Registry::new(); - pub static ref SUCCESSFUL_LIQUIDATIONS: Counter = Counter::new( - "eva01_successful_liquidations_total", - "Total number of successful liquidations" + pub static ref LIQUIDATION_ATTEMPTS: Counter = Counter::new( + "eva01_liquidation_attempts_total", + "Total number of liquidation attempts" ) .unwrap(); pub static ref FAILED_LIQUIDATIONS: Counter = Counter::new( @@ -27,7 +27,7 @@ lazy_static! { pub fn register_metrics() { REGISTRY - .register(Box::new(SUCCESSFUL_LIQUIDATIONS.clone())) + .register(Box::new(LIQUIDATION_ATTEMPTS.clone())) .unwrap(); REGISTRY .register(Box::new(FAILED_LIQUIDATIONS.clone())) @@ -45,3 +45,23 @@ pub fn metrics_handler() -> String { encoder.encode(&metric_families, &mut buffer).unwrap(); String::from_utf8(buffer).unwrap() } + +pub async fn update_balance(coin: &str, new_balance: f64) { + let mut balances = BALANCES.lock().await; + + if let Some(gauge) = balances.get(coin) { + gauge.set(new_balance); + } else { + // If the coin is not already being tracked, create a new Gauge + let gauge = Gauge::new( + format!("eva01_balance_{}", coin), + format!("Balance of {}", coin), + ) + .unwrap(); + gauge.set(new_balance); + + // Insert it into the registry and the HashMap + balances.insert(coin.to_string(), gauge.clone()); + crate::metrics::REGISTRY.register(Box::new(gauge)).unwrap(); + } +} diff --git a/src/rebalancer.rs b/src/rebalancer.rs index 4471a0e..5c5d0d5 100644 --- a/src/rebalancer.rs +++ b/src/rebalancer.rs @@ -2,6 +2,7 @@ use crate::{ config::{GeneralConfig, RebalancerCfg}, crossbar::CrossbarMaintainer, geyser::{AccountType, GeyserUpdate}, + metrics::update_balance, sender::{SenderCfg, TransactionSender}, token_account_manager::TokenAccountManager, transaction_manager::{BatchTransactions, RawTransaction}, @@ -296,7 +297,7 @@ impl Rebalancer { let token_to_update = self.token_accounts.get_mut(&mint).unwrap(); token_to_update.balance = balance; - // TODO: publish update to Grafana + update_balance(&token_to_update.mint.to_string(), balance as f64).await; } } diff --git a/src/transaction_manager.rs b/src/transaction_manager.rs index b90472c..757dcfd 100644 --- a/src/transaction_manager.rs +++ b/src/transaction_manager.rs @@ -1,4 +1,4 @@ -use crate::config::GeneralConfig; +use crate::{config::GeneralConfig, metrics::ERROR_COUNT}; use crossbeam::channel::Receiver; use jito_protos::searcher::{ searcher_service_client::SearcherServiceClient, GetTipAccountsRequest, @@ -126,6 +126,7 @@ impl TransactionManager { let transactions = match self.configure_instructions(instructions).await { Ok(txs) => txs, Err(e) => { + ERROR_COUNT.inc(); error!("Failed to configure instructions: {:?}", e); continue; } @@ -139,6 +140,7 @@ impl TransactionManager { { Ok(response) => response.into_inner(), Err(e) => { + ERROR_COUNT.inc(); error!("Failed to get next scheduled leader: {:?}", e); continue; } @@ -160,6 +162,7 @@ impl TransactionManager { ); tokio::spawn(async move { if let Err(e) = transaction.await { + ERROR_COUNT.inc(); error!("Failed to send transaction: {:?}", e); } }); From fd06365a7e4efa5a4386b54005f3acd70d26ad41 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Sun, 16 Feb 2025 13:58:20 +0100 Subject: [PATCH 17/19] fix rate limit --- src/transaction_manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/transaction_manager.rs b/src/transaction_manager.rs index 757dcfd..83a42e6 100644 --- a/src/transaction_manager.rs +++ b/src/transaction_manager.rs @@ -28,7 +28,7 @@ const LEADERSHIP_THRESHOLD: u64 = 2; /// The sleep duration for the transaction manager /// to wait before checking for the next leader -const SLEEP_DURATION: std::time::Duration = std::time::Duration::from_millis(500); +const SLEEP_DURATION: std::time::Duration = std::time::Duration::from_millis(1000); /// Manages transactions for the liquidator and rebalancer #[allow(dead_code)] @@ -142,6 +142,7 @@ impl TransactionManager { Err(e) => { ERROR_COUNT.inc(); error!("Failed to get next scheduled leader: {:?}", e); + tokio::time::sleep(SLEEP_DURATION).await; continue; } }; From 210f7cbaf0da16946c69cc88db3c187fe45be131 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Sun, 16 Feb 2025 14:30:50 +0100 Subject: [PATCH 18/19] another fix for rate limit --- src/transaction_manager.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/transaction_manager.rs b/src/transaction_manager.rs index 83a42e6..45c8e08 100644 --- a/src/transaction_manager.rs +++ b/src/transaction_manager.rs @@ -19,8 +19,8 @@ use solana_sdk::{ system_instruction::transfer, transaction::VersionedTransaction, }; -use std::str::FromStr; use std::sync::{atomic::AtomicBool, Arc}; +use std::{ops::Mul, str::FromStr}; use tonic::transport::Channel; /// The leadership threshold related to the jito block engine @@ -132,6 +132,7 @@ impl TransactionManager { } }; debug!("Waiting for Jito leader..."); + let mut multiplier = 1u32; loop { let next_leader = match self .searcher_client @@ -142,7 +143,17 @@ impl TransactionManager { Err(e) => { ERROR_COUNT.inc(); error!("Failed to get next scheduled leader: {:?}", e); - tokio::time::sleep(SLEEP_DURATION).await; + if e.code() == tonic::Code::ResourceExhausted { + let sleep_for = SLEEP_DURATION.mul(multiplier); + debug!( + "Resource exhausted, sleeping for {} seconds", + sleep_for.as_secs() + ); + tokio::time::sleep(sleep_for).await; + if multiplier < 128 { + multiplier *= 2; + } + } continue; } }; From 874b2a197b5e21b826547c4d27d0d0dcbdf53ac7 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Mon, 17 Feb 2025 14:32:43 +0100 Subject: [PATCH 19/19] change log level --- src/transaction_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transaction_manager.rs b/src/transaction_manager.rs index 45c8e08..e8d6f8c 100644 --- a/src/transaction_manager.rs +++ b/src/transaction_manager.rs @@ -145,7 +145,7 @@ impl TransactionManager { error!("Failed to get next scheduled leader: {:?}", e); if e.code() == tonic::Code::ResourceExhausted { let sleep_for = SLEEP_DURATION.mul(multiplier); - debug!( + error!( "Resource exhausted, sleeping for {} seconds", sleep_for.as_secs() );