Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Orchestrator thread keepalive #524

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
32 changes: 0 additions & 32 deletions orchestrator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ description = """
"""

[workspace]
default-members = ["gorc", "orchestrator", "test_runner"]
default-members = ["gorc", "orchestrator"]
members = [
"orchestrator",
"cosmos_gravity",
"ethereum_gravity",
"gravity_utils",
"gravity_proto_build",
"test_runner",
"gravity_proto",
"relayer",
"register_delegate_keys",
Expand All @@ -34,7 +33,6 @@ cosmos_gravity = { path = "./cosmos_gravity" }
ethereum_gravity = { path = "./ethereum_gravity" }
gravity_utils = { path = "./gravity_utils" }
gravity_proto_build = { path = "./gravity_proto_build" }
test_runner = { path = "./test_runner" }
gravity_proto = { path = "./gravity_proto" }
register_delegate_keys = { path = "./register_delegate_keys" }
gorc = { path = "./gorc" }
Expand Down
9 changes: 2 additions & 7 deletions orchestrator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ This is to build the relayer logic (i.e. cosmos to ethereum) as a seperate binar

Supporting bash scripts for this library

### test_runner/

A binary which runs tests against a cosmos chain


## CLI

Expand All @@ -67,7 +63,6 @@ client deploy-erc20-representation --cosmos-grpc=<url> --cosmos-prefix=<prefix>
orchestrator --cosmos-phrase=<key> --ethereum-key=<key> --cosmos-grpc=<url> --address-prefix=<prefix> --ethereum-rpc=<url> --fees=<denom> --contract-address=<addr>
register-delegate-key --validator-phrase=<key> --address-prefix=<prefix> [--cosmos-phrase=<key>] [--ethereum-key=<key>] --cosmos-grpc=<url> --fees=<denom>
relayer --ethereum-key=<key> --cosmos-grpc=<url> --address-prefix=<prefix> --ethereum-rpc=<url> --contract-address=<addr> --cosmos-grpc=<gurl>
test_runner
```

## PROPOSED
Expand Down Expand Up @@ -105,7 +100,7 @@ gorc
update [name] [new-name]
list
show [name]
cosmos
cosmos
add [name]
import [name] [mnemnoic]
delete [name]
Expand All @@ -117,7 +112,7 @@ gorc
```json
[gravity]
contract = "0x6b175474e89094c44da98b954eedeac495271d0f"

[ethereum]
key = "testkey"
rpc = "http://localhost:8545"
Expand Down
27 changes: 6 additions & 21 deletions orchestrator/cosmos_gravity/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use deep_space::Contact;
use deep_space::Msg;
use deep_space::{Address as CosmosAddress, Msg};
use ethereum_gravity::types::EthClient;
use ethers::prelude::*;
use ethers::utils::keccak256;
Expand All @@ -14,20 +13,16 @@ use lazy_static::lazy_static;
use regex::Regex;
use std::collections::BTreeMap;

use crate::crypto::PrivateKey as CosmosPrivateKey;

lazy_static! {
static ref DENOM_REGEX: Regex = Regex::new("^[a-zA-Z][a-zA-Z0-9/-]{2,127}$").unwrap();
}

pub async fn signer_set_tx_confirmation_messages(
contact: &Contact,
cosmos_address: CosmosAddress,
eth_client: EthClient,
valsets: Vec<Valset>,
cosmos_key: CosmosPrivateKey,
gravity_id: String,
) -> Vec<Msg> {
let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap();
let ethereum_address = eth_client.address();

let mut msgs = Vec::new();
Expand All @@ -52,13 +47,11 @@ pub async fn signer_set_tx_confirmation_messages(
}

pub async fn batch_tx_confirmation_messages(
contact: &Contact,
cosmos_address: CosmosAddress,
eth_client: EthClient,
batches: Vec<TransactionBatch>,
cosmos_key: CosmosPrivateKey,
gravity_id: String,
) -> Vec<Msg> {
let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap();
let ethereum_address = eth_client.address();

let mut msgs = Vec::new();
Expand All @@ -84,13 +77,11 @@ pub async fn batch_tx_confirmation_messages(
}

pub async fn contract_call_tx_confirmation_messages(
contact: &Contact,
cosmos_address: CosmosAddress,
eth_client: EthClient,
logic_calls: Vec<LogicCall>,
cosmos_key: CosmosPrivateKey,
gravity_id: String,
) -> Vec<Msg> {
let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap();
let ethereum_address = eth_client.address();

let mut msgs = Vec::new();
Expand All @@ -117,12 +108,9 @@ pub async fn contract_call_tx_confirmation_messages(
}

pub async fn ethereum_vote_height_messages(
contact: &Contact,
cosmos_key: CosmosPrivateKey,
cosmos_address: CosmosAddress,
ethereum_height: U64,
) -> Vec<Msg> {
let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap();

let msg = proto::MsgEthereumHeightVote {
ethereum_height: ethereum_height.as_u64(),
signer: cosmos_address.to_string(),
Expand All @@ -136,16 +124,13 @@ pub async fn ethereum_vote_height_messages(
}

pub fn ethereum_event_messages(
contact: &Contact,
cosmos_key: CosmosPrivateKey,
cosmos_address: CosmosAddress,
deposits: Vec<SendToCosmosEvent>,
batches: Vec<TransactionBatchExecutedEvent>,
erc20_deploys: Vec<Erc20DeployedEvent>,
logic_calls: Vec<LogicCallExecutedEvent>,
valsets: Vec<ValsetUpdatedEvent>,
) -> Vec<Msg> {
let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap();

// This sorts oracle messages by event nonce before submitting them. It's not a pretty implementation because
// we're missing an intermediary layer of abstraction. We could implement 'EventTrait' and then implement sort
// for it, but then when we go to transform 'EventTrait' objects into GravityMsg enum values we'll have all sorts
Expand Down
53 changes: 42 additions & 11 deletions orchestrator/cosmos_gravity/src/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ use gravity_proto::gravity as proto;
use gravity_utils::error::GravityError;
use gravity_utils::ethereum::format_eth_address;
use prost::Message;
use tokio::sync::Mutex;
use std::cmp;
use std::collections::HashSet;
use std::sync::Arc;
use std::{result::Result, time::Duration};

use crate::crypto::PrivateKey as CosmosPrivateKey;
Expand Down Expand Up @@ -67,7 +69,7 @@ pub async fn update_gravity_delegate_addresses(
};
let msg = Msg::new("/gravity.v1.MsgDelegateKeys", msg);

send_messages(contact, cosmos_key, gas_price, vec![msg], gas_adjustment).await
send_messages(contact.to_owned(), cosmos_key, gas_price, vec![msg], gas_adjustment).await
}

/// Sends tokens from Cosmos to Ethereum. These tokens will not be sent immediately instead
Expand All @@ -78,7 +80,7 @@ pub async fn send_to_eth(
amount: Coin,
bridge_fee: Coin,
gas_price: (f64, String),
contact: &Contact,
contact: Contact,
gas_adjustment: f64,
) -> Result<TxResponse, GravityError> {
if amount.denom != bridge_fee.denom {
Expand All @@ -90,7 +92,7 @@ pub async fn send_to_eth(
)));
}

let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap();
let cosmos_address = cosmos_key.to_address(&contact.get_prefix())?;

let msg = proto::MsgSendToEthereum {
sender: cosmos_address.to_string(),
Expand All @@ -103,13 +105,13 @@ pub async fn send_to_eth(
}

pub async fn send_messages(
contact: &Contact,
contact: Contact,
cosmos_key: CosmosPrivateKey,
gas_price: (f64, String),
messages: Vec<Msg>,
gas_adjustment: f64,
) -> Result<TxResponse, GravityError> {
let cosmos_address = cosmos_key.to_address(&contact.get_prefix()).unwrap();
let cosmos_address = cosmos_key.to_address(&contact.get_prefix())?;

let fee_amount = Coin {
denom: gas_price.1.clone(),
Expand Down Expand Up @@ -149,19 +151,19 @@ pub async fn send_messages(
Ok(contact.wait_for_tx(response, TIMEOUT).await?)
}

pub async fn send_main_loop(
contact: &Contact,
pub async fn run_sender(
contact: Contact,
cosmos_key: CosmosPrivateKey,
gas_price: (f64, String),
mut rx: tokio::sync::mpsc::Receiver<Vec<Msg>>,
rx: Arc<Mutex<tokio::sync::mpsc::Receiver<Vec<Msg>>>>,
gas_adjustment: f64,
msg_batch_size: usize,
) {
while let Some(messages) = rx.recv().await {
while let Some(messages) = rx.lock().await.recv().await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we creating a lock around the tokio receiver? The async channels should be able to be filled and drained without having to have synchronous locking.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It comes down to satisfying the borrow checker while passing in the receiver to run_sender inside of the loop inside of send_main_loop.

To get the compiler to let me pass the receiver into each loop iteration (each cosmos sender thread restart) I have to wrap the receiver in an atomic reference counter so that I can clone without making a duplicate of the receiver. However, Arc<T> doesn't have the DerefMut trait, and the recv() method requires that the value be a mutable reference. To get a mutable reference of the contained object in the Arc, I have to guarantee that one and only one mutable reference exists at a time, which is where the lock comes in.

for msg_chunk in messages.chunks(msg_batch_size) {
let batch = msg_chunk.to_vec();
match send_messages(
contact,
contact.clone(),
cosmos_key,
gas_price.to_owned(),
msg_chunk.to_vec(),
Expand All @@ -180,7 +182,7 @@ pub async fn send_main_loop(
for msg in batch {
let msg_vec = vec![msg];
match send_messages(
contact,
contact.clone(),
cosmos_key,
gas_price.to_owned(),
msg_vec.clone(),
Expand All @@ -198,6 +200,35 @@ pub async fn send_main_loop(
}
}

/// manages the cosmos message sender task. `run_sender` should never return, but in case there are system-caused
/// panics, we want to make sure we attempt to restart it. if this function ever returns, the receiver will be dropped
/// and all tasks containing senders will panic.
pub async fn send_main_loop(
contact: Contact,
cosmos_key: CosmosPrivateKey,
gas_price: (f64, String),
rx: tokio::sync::mpsc::Receiver<Vec<Msg>>,
gas_adjustment: f64,
msg_batch_size: usize,
) {
let rx = Arc::new(Mutex::new(rx));
loop {
info!("starting cosmos sender");
if let Err(err) = tokio::task::spawn(
run_sender(
contact.clone(),
cosmos_key,
gas_price.to_owned(),
rx.clone(),
gas_adjustment,
msg_batch_size,
)
).await {
error!("cosmos sender failed with: {err:?}");
}
}
}

fn log_send_error(messages: &Vec<Msg>, err: GravityError) {
let msg_types = messages
.iter()
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/gorc/src/commands/cosmos_to_eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Runnable for CosmosToEthCmd {
amount.clone(),
bridge_fee.clone(),
config.cosmos.gas_price.as_tuple(),
&contact,
contact,
1.0
)
.await;
Expand Down
6 changes: 5 additions & 1 deletion orchestrator/gorc/src/commands/orchestrator/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ impl Runnable for StartCommand {
self.orchestrator_only,
config.cosmos.msg_batch_size,
)
.await;
.await
.unwrap_or_else(|e| {
error!("orchestrator exited with error: {}", e);
std::process::exit(1);
});
})
.unwrap_or_else(|e| {
status_err!("executor exited with error: {}", e);
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/gorc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub struct CosmosSection {
pub grpc: String,
pub prefix: String,
pub gas_adjustment: f64,
pub msg_batch_size: u32,
pub msg_batch_size: usize,
pub gas_price: GasPrice,
}

Expand Down
Loading
Loading