Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement IBC Validator Sync sub-protocol #63

Merged
merged 6 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 69 additions & 14 deletions contracts/consumer/converter/src/ibc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
use cosmwasm_std::entry_point;

use cosmwasm_std::{
from_slice, DepsMut, Env, Ibc3ChannelOpenResponse, IbcBasicResponse, IbcChannel,
IbcChannelCloseMsg, IbcChannelConnectMsg, IbcChannelOpenMsg, IbcChannelOpenResponse,
IbcPacketAckMsg, IbcPacketReceiveMsg, IbcPacketTimeoutMsg, IbcReceiveResponse,
from_slice, to_binary, DepsMut, Env, Event, Ibc3ChannelOpenResponse, IbcBasicResponse,
IbcChannel, IbcChannelCloseMsg, IbcChannelConnectMsg, IbcChannelOpenMsg,
IbcChannelOpenResponse, IbcMsg, IbcPacketAckMsg, IbcPacketReceiveMsg, IbcPacketTimeoutMsg,
IbcReceiveResponse, IbcTimeout,
};
use cw_storage_plus::Item;

use mesh_apis::ibc::{validate_channel_order, ProtocolVersion, PROTOCOL_NAME};
use mesh_apis::ibc::{
validate_channel_order, AckWrapper, AddValidator, ConsumerPacket, ProtocolVersion,
PROTOCOL_NAME,
};

use crate::error::ContractError;

Expand All @@ -20,6 +24,16 @@ const MIN_IBC_PROTOCOL_VERSION: &str = "1.0.0";
// IBC specific state
const IBC_CHANNEL: Item<IbcChannel> = Item::new("ibc_channel");

// Let those validator syncs take a day...
const DEFAULT_TIMEOUT: u64 = 24 * 60 * 60;

fn packet_timeout(env: &Env) -> IbcTimeout {
// No idea about their blocktime, but 24 hours ahead of our view of the clock
// should be decently in the future.
let timeout = env.block.time.plus_seconds(DEFAULT_TIMEOUT);
IbcTimeout::with_timestamp(timeout)
}

#[cfg_attr(not(feature = "library"), entry_point)]
/// enforces ordering and versioning constraints
pub fn ibc_channel_open(
Expand Down Expand Up @@ -63,7 +77,7 @@ pub fn ibc_channel_open(
/// once it's established, we store data
pub fn ibc_channel_connect(
deps: DepsMut,
_env: Env,
env: Env,
msg: IbcChannelConnectMsg,
) -> Result<IbcBasicResponse, ContractError> {
// ensure we have no channel yet
Expand All @@ -89,8 +103,27 @@ pub fn ibc_channel_connect(
// store the channel
IBC_CHANNEL.save(deps.storage, &channel)?;

// FIXME: later we start with sending the validator sync packets
Ok(IbcBasicResponse::default())
// Send a validator sync packet to arrive with the newly established channel
let validators = deps.querier.query_all_validators()?;
let updates = validators
.into_iter()
.map(|v| AddValidator {
valoper: v.address,
// TODO: not yet available in CosmWasm APIs
pub_key: "TODO".to_string(),
// Use current height/time as start height/time (no slashing before mesh starts)
start_height: env.block.height,
start_time: env.block.time.seconds(),
})
.collect();
let packet = ConsumerPacket::AddValidators(updates);
let msg = IbcMsg::SendPacket {
channel_id: channel.endpoint.channel_id,
data: to_binary(&packet)?,
timeout: packet_timeout(&env),
};

Ok(IbcBasicResponse::new().add_message(msg))
}

#[cfg_attr(not(feature = "library"), entry_point)]
Expand All @@ -117,21 +150,43 @@ pub fn ibc_packet_receive(
}

#[cfg_attr(not(feature = "library"), entry_point)]
/// never should be called as we do not send packets
/// We get acks on sync state without much to do.
/// If it succeeded, take no action. If it errored, we can't do anything else and let it go.
ethanfrey marked this conversation as resolved.
Show resolved Hide resolved
/// We just log the error cases so they can be detected.
pub fn ibc_packet_ack(
_deps: DepsMut,
_env: Env,
_msg: IbcPacketAckMsg,
msg: IbcPacketAckMsg,
) -> Result<IbcBasicResponse, ContractError> {
Ok(IbcBasicResponse::new().add_attribute("action", "ibc_packet_ack"))
let ack: AckWrapper = from_slice(&msg.acknowledgement.data)?;
let mut res = IbcBasicResponse::new();
match ack {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏼 Taking shape.

AckWrapper::Result(_) => {}
AckWrapper::Error(e) => {
// The wasmd framework will label this with the contract_addr, which helps us find the port and issue.
// Provide info to find the actual packet.
let event = Event::new("mesh_ibc_error")
.add_attribute("error", e)
.add_attribute("channel", msg.original_packet.src.channel_id)
.add_attribute("sequence", msg.original_packet.sequence.to_string());
res = res.add_event(event);
}
}
Ok(res)
}

#[cfg_attr(not(feature = "library"), entry_point)]
/// never should be called as we do not send packets
/// The most we can do here is retry the packet, hoping it will eventually arrive.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏼 Perhaps we can introduce number of retries in the future.

pub fn ibc_packet_timeout(
_deps: DepsMut,
_env: Env,
_msg: IbcPacketTimeoutMsg,
env: Env,
msg: IbcPacketTimeoutMsg,
) -> Result<IbcBasicResponse, ContractError> {
Ok(IbcBasicResponse::new().add_attribute("action", "ibc_packet_timeout"))
// Play it again, Sam.
let msg = IbcMsg::SendPacket {
channel_id: msg.packet.src.channel_id,
data: msg.packet.data,
timeout: packet_timeout(&env),
};
Ok(IbcBasicResponse::new().add_message(msg))
}
183 changes: 183 additions & 0 deletions contracts/provider/external-staking/src/crdt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use cosmwasm_schema::cw_serde;
use cosmwasm_std::{Order, StdError, StdResult, Storage};
use cw_storage_plus::{Bound, Map};

// Question: Do we need to add more info here if we want to keep historical info for slashing.
// Would we ever need the pubkeys for a Tombstoned validator? Or do we consider it already slashed and therefore unslashable?
#[cw_serde]
pub enum ValidatorState {
Active(ActiveState),
Tombstoned {},
}

impl ValidatorState {
pub fn is_active(&self) -> bool {
matches!(self, ValidatorState::Active(_))
}
}

#[cw_serde]
/// Active state maintains a sorted list of updates with no duplicates.
/// The first one is the one with the highest start_height.
pub struct ActiveState(Vec<ValUpdate>);
ethanfrey marked this conversation as resolved.
Show resolved Hide resolved

impl ActiveState {
/// Add one more element to this list, maintaining the constraints
pub fn insert_unique(&mut self, update: ValUpdate) {
self.0.push(update);
self.0.sort_by(|a, b| b.start_height.cmp(&a.start_height));
self.0.dedup();
}
}

#[cw_serde]
pub struct ValUpdate {
pub pub_key: String,
pub start_height: u64,
pub start_time: u64,
}

impl ValUpdate {
pub fn new(pub_key: impl Into<String>, start_height: u64, start_time: u64) -> Self {
ValUpdate {
pub_key: pub_key.into(),
start_height,
start_time,
}
}
}

/// This holds all CRDT related state and logic (related to validators)
pub struct CrdtState<'a> {
validators: Map<'a, &'a str, ValidatorState>,
}

impl<'a> CrdtState<'a> {
ethanfrey marked this conversation as resolved.
Show resolved Hide resolved
pub const fn new() -> Self {
CrdtState {
validators: Map::new("crdt.validators"),
}
}

pub fn add_validator(
&self,
storage: &mut dyn Storage,
valoper: &str,
update: ValUpdate,
) -> Result<(), StdError> {
let mut state = self
.validators
.may_load(storage, valoper)?
.unwrap_or_else(|| ValidatorState::Active(ActiveState(vec![])));

match &mut state {
ValidatorState::Active(active) => {
// add to the set, ensuring there are no duplicates
active.insert_unique(update);
}
ValidatorState::Tombstoned {} => {
// we just silently ignore it here
}
}

self.validators.save(storage, valoper, &state)
}

pub fn remove_validator(
&self,
storage: &mut dyn Storage,
valoper: &str,
) -> Result<(), StdError> {
let state = ValidatorState::Tombstoned {};
self.validators.save(storage, valoper, &state)
}

pub fn is_active_validator(&self, storage: &dyn Storage, valoper: &str) -> StdResult<bool> {
let active = self
.validators
.may_load(storage, valoper)?
.map(|s| s.is_active())
.unwrap_or(false);
Ok(active)
}

/// This returns the valoper address of all active validators
pub fn list_active_validators(
&self,
storage: &dyn Storage,
start_after: Option<&str>,
limit: usize,
) -> StdResult<Vec<String>> {
let start = start_after.map(Bound::exclusive);
self.validators
.range(storage, start, None, Order::Ascending)
.filter_map(|r| match r {
Ok((valoper, ValidatorState::Active(_))) => Some(Ok(valoper)),
Ok((_, ValidatorState::Tombstoned {})) => None,
Err(e) => Some(Err(e)),
})
.take(limit)
.collect()
}
}

#[cfg(test)]
mod tests {
use super::*;

use cosmwasm_std::MemoryStorage;

fn mock_update(start_height: u64) -> ValUpdate {
ValUpdate {
pub_key: "TODO".to_string(),
start_height,
start_time: 1687339542,
}
}

// We add three new validators, and remove one
#[test]
fn happy_path() {
let mut storage = MemoryStorage::new();
let crdt = CrdtState::new();

crdt.add_validator(&mut storage, "alice", mock_update(123))
.unwrap();
crdt.add_validator(&mut storage, "bob", mock_update(200))
.unwrap();
crdt.add_validator(&mut storage, "carl", mock_update(303))
.unwrap();
crdt.remove_validator(&mut storage, "bob").unwrap();

assert!(crdt.is_active_validator(&storage, "alice").unwrap());
assert!(!crdt.is_active_validator(&storage, "bob").unwrap());
assert!(crdt.is_active_validator(&storage, "carl").unwrap());

let active = crdt.list_active_validators(&storage, None, 10).unwrap();
assert_eq!(active, vec!["alice".to_string(), "carl".to_string()]);
}

// Like happy path, but we remove bob before he was ever added
#[test]
fn remove_before_add_works() {
let mut storage = MemoryStorage::new();
let crdt = CrdtState::new();

crdt.remove_validator(&mut storage, "bob").unwrap();
crdt.add_validator(&mut storage, "alice", mock_update(123))
.unwrap();
crdt.add_validator(&mut storage, "bob", mock_update(200))
.unwrap();
crdt.add_validator(&mut storage, "carl", mock_update(303))
.unwrap();

assert!(crdt.is_active_validator(&storage, "alice").unwrap());
assert!(!crdt.is_active_validator(&storage, "bob").unwrap());
assert!(crdt.is_active_validator(&storage, "carl").unwrap());

let active = crdt.list_active_validators(&storage, None, 10).unwrap();
assert_eq!(active, vec!["alice".to_string(), "carl".to_string()]);
}

// TODO: test key rotation later
}
55 changes: 45 additions & 10 deletions contracts/provider/external-staking/src/ibc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ use cosmwasm_std::{
IbcPacketAckMsg, IbcPacketReceiveMsg, IbcPacketTimeoutMsg, IbcReceiveResponse,
};
use cw_storage_plus::Item;
use mesh_apis::ibc::{validate_channel_order, ProtocolVersion};
use mesh_apis::ibc::{
ack_success, validate_channel_order, AddValidator, AddValidatorsAck, ConsumerPacket,
ProtocolVersion, RemoveValidatorsAck,
};

use crate::{error::ContractError, msg::AuthorizedEndpoint};
use crate::{
crdt::{CrdtState, ValUpdate},
error::ContractError,
msg::AuthorizedEndpoint,
};

/// This is the maximum version of the Mesh Security protocol that we support
const SUPPORTED_IBC_PROTOCOL_VERSION: &str = "1.0.0";
Expand All @@ -22,6 +29,8 @@ pub const AUTH_ENDPOINT: Item<AuthorizedEndpoint> = Item::new("auth_endpoint");
// TODO: expected endpoint
pub const IBC_CHANNEL: Item<IbcChannel> = Item::new("ibc_channel");

pub const VAL_CRDT: CrdtState = CrdtState::new();

#[cfg_attr(not(feature = "library"), entry_point)]
/// enforces ordering and versioning constraints
pub fn ibc_channel_open(
Expand Down Expand Up @@ -89,8 +98,6 @@ pub fn ibc_channel_connect(
}

#[cfg_attr(not(feature = "library"), entry_point)]
/// On closed channel, we take all tokens from reflect contract to this contract.
/// We also delete the channel entry from accounts.
pub fn ibc_channel_close(
_deps: DepsMut,
_env: Env,
Expand All @@ -100,15 +107,43 @@ pub fn ibc_channel_close(
}

#[cfg_attr(not(feature = "library"), entry_point)]
/// we look for a the proper reflect contract to relay to and send the message
/// We cannot return any meaningful response value as we do not know the response value
/// of execution. We just return ok if we dispatched, error if we failed to dispatch
// this accepts validator sync packets and updates the crdt state
pub fn ibc_packet_receive(
_deps: DepsMut,
deps: DepsMut,
_env: Env,
_msg: IbcPacketReceiveMsg,
msg: IbcPacketReceiveMsg,
) -> Result<IbcReceiveResponse, ContractError> {
todo!();
// There is only one channel, so we don't need to switch.
// We also don't care about packet sequence as this is fully commutative.
let packet: ConsumerPacket = from_slice(&msg.packet.data)?;
let ack = match packet {
ConsumerPacket::AddValidators(to_add) => {
for AddValidator {
valoper,
pub_key,
start_height,
start_time,
} in to_add
{
let update = ValUpdate {
pub_key,
start_height,
start_time,
};
VAL_CRDT.add_validator(deps.storage, &valoper, update)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we capture / map the error(s) here, and return ack_fail instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done in wasmd.

If contract panics, it aborts the whole tx (no ack written)
If contract returns error, it wraps the error message in this "standard" struct, which is what I use now as AckWrapper.

}
ack_success(&AddValidatorsAck {})?
}
ConsumerPacket::RemoveValidators(to_remove) => {
for valoper in to_remove {
VAL_CRDT.remove_validator(deps.storage, &valoper)?;
}
ack_success(&RemoveValidatorsAck {})?
}
};

// return empty success ack
Ok(IbcReceiveResponse::new().set_ack(ack))
}

#[cfg_attr(not(feature = "library"), entry_point)]
Expand Down
Loading