Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Platov <[email protected]>
  • Loading branch information
aplatoff committed Jan 8, 2025
1 parent e1e1368 commit c8440d0
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 53 deletions.
22 changes: 15 additions & 7 deletions huly/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
//

use crate::id::{AccId, OrgId};
use crate::membership::{Membership, MembershipRequest, MembershipRequestType};
use crate::membership::{
Empty, Membership, MembershipRequest, MembershipRequestType, ServeMeRequestType,
};
use crate::message::{Message, SignedMessage, SignedMessageType};
use anyhow::Result;
use iroh::{Endpoint, NodeId, SecretKey};
use tokio::io::AsyncWriteExt;

pub async fn request_membership(
secret_key: &SecretKey,
Expand All @@ -20,16 +23,21 @@ pub async fn request_membership(
let encoded = MembershipRequestType::encode(&request)?;
let signed = SignedMessage::sign(secret_key, encoded)?;
let encoded = SignedMessageType::encode(&signed)?;

encoded.write_async(&mut send).await?;
println!("sent membership request: {:?}", request);

let response = Message::read_async(&mut recv).await?;

// if let Some(encoded) = encoded {
// let message = Message::decode(encoded.as_ref())?;
println!("got membership response: {:?}", response);
// } else {
// println!("unexpected end of stream?")
// }

let request = ServeMeRequestType::encode(&Empty {})?;
request.write_async(&mut send).await?;
println!("sent serve me request");

let response = Message::read_async(&mut recv).await?;
println!("got serve me response: {:?}", response);

tokio::signal::ctrl_c().await?;

send.finish()?;
send.stopped().await?;
Expand Down
60 changes: 39 additions & 21 deletions huly/src/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,38 +79,56 @@ impl ProtocolHandler for Membership {

// fetch account's organizations

let topic = TopicId::from_bytes(account_id.into());

println!("subscribing");
let (sender, receiver) = this.gossip.subscribe_and_join(topic, vec![]).await?.split();
let (mut send, mut recv) = connection.accept_bi().await?;

println!("spawning account loop");
let x = tokio::spawn(account_loop(sender, receiver));
println!("accepted connection");

println!("spawned");
let (mut send, mut recv) = connection.accept_bi().await?;
loop {
let message = Message::read_async(&mut recv).await?;
match message.get_type_unwrap() {
MembershipRequestType::TAG => {
println!("got message");
match message.get_type() {
SignedMessageType::TAG => {
println!("got signed message");

let signed = SignedMessageType::decode(&message)?;
if signed.verify()? != device_id.into() {
anyhow::bail!("message must be signed by the device");
}

// let message = Message::decode(signed.get_payload().as_bytes())?;
let request = MembershipRequestType::decode(signed.get_message())?;
let device = request.device_ownership.device;
let account = request.device_ownership.account;
match signed.get_message().get_type() {
MembershipRequestType::TAG => {
let request = MembershipRequestType::decode(signed.get_message())?;
let device = request.device_ownership.device;
let account = request.device_ownership.account;

this.db.insert_device_account(device, account)?;
println!("added device `{}` to account `{}`", device, account);

let response = MembershipResponse::new(true, None);
let encoded = MembershipResponseType::encode(&response)?;
let signed =
SignedMessage::sign(&this.endpoint.secret_key(), encoded)?;
let encoded = SignedMessageType::encode(&signed)?;
encoded.write_async(&mut send).await?;
}
_ => anyhow::bail!("unknown message type"),
}
}
ServeMeRequestType::TAG => {
println!("got serve me request");
let topic = TopicId::from_bytes(account_id.into());

println!("subscribing");
let (sender, receiver) =
this.gossip.subscribe_and_join(topic, vec![]).await?.split();

this.db.insert_device_account(device, account)?;
println!("added device `{}` to account `{}`", device, account);
println!("spawning account loop");
let x = tokio::spawn(account_loop(sender, receiver));
println!("spawned");

let response = MembershipResponse::new(true, None);
let encoded = MembershipResponseType::encode(&response)?;
let signed = SignedMessage::sign(&this.endpoint.secret_key(), encoded)?;
let message = SignedMessageType::encode(&signed)?;
message.write_async(&mut send).await?;
let response = Empty {};
let encoded = ServeMeResponseType::encode(&response)?;
encoded.write_async(&mut send).await?;
}
_ => anyhow::bail!("unknown message type"),
}
Expand Down
31 changes: 6 additions & 25 deletions huly/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,9 @@ impl Data {
}
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum TypeParams {
Zero(),
One(Tag),
Two(Tag, Tag),
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Message {
message_type: Tag,
type_params: TypeParams,
data_format: Format,
data: Data,
}
Expand All @@ -69,18 +61,20 @@ impl Message {
}

pub async fn read_async(mut reader: impl AsyncRead + Unpin) -> Result<Self> {
let mut buffer = BytesMut::new();
let size = reader.read_u32().await?;
if size > Self::MAX_MESSAGE_SIZE as u32 {
anyhow::bail!("Incoming message exceeds the maximum message size");
}
let size = usize::try_from(size).context("frame larger than usize")?;
buffer.reserve(size);
loop {
let mut buffer = BytesMut::with_capacity(size);
let mut remaining = size;

while remaining > 0 {
let r = reader.read_buf(&mut buffer).await?;
if r == 0 {
break;
anyhow::bail!("Unexpected EOF");
}
remaining = remaining.saturating_sub(r);
}
Self::decode(&buffer)
}
Expand All @@ -101,18 +95,6 @@ impl Message {
self.message_type
}

pub fn get_type_unwrap(&self) -> Tag {
match self.type_params {
TypeParams::Zero() => self.message_type,
TypeParams::One(tag) => tag,
TypeParams::Two(_, _) => self.message_type,
}
}

pub fn get_type_params(&self) -> TypeParams {
self.type_params
}

pub fn get_payload<T>(&self) -> Result<T>
where
T: DeserializeOwned,
Expand Down Expand Up @@ -152,7 +134,6 @@ where
pub fn encode(message: &T) -> Result<Message> {
Ok(Message {
message_type: Self::TAG,
type_params: TypeParams::Zero(),
data_format: POSTCARD_FORMAT,
data: Data::Inline(postcard::to_stdvec(message)?.into()),
})
Expand Down

0 comments on commit c8440d0

Please sign in to comment.