Skip to content

Commit

Permalink
request blob when server side as well
Browse files Browse the repository at this point in the history
  • Loading branch information
miraclx committed Nov 15, 2024
1 parent d552e9f commit 6e371a2
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 38 deletions.
19 changes: 16 additions & 3 deletions crates/node/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ impl Node {

match payload {
InitPayload::KeyShare => {
self.handle_key_share_request(context, our_identity, their_identity, stream)
self.handle_key_share_request(&context, our_identity, their_identity, stream)
.await?
}
InitPayload::BlobShare { blob_id } => {
self.handle_blob_share_request(
context,
&context,
our_identity,
their_identity,
blob_id,
Expand Down Expand Up @@ -234,8 +234,21 @@ impl Node {
context = updated;
}

if let Some(application) = self.ctx_manager.get_application(&application_id)? {
if !self.ctx_manager.has_blob_available(application.blob)? {
self.initiate_blob_share_process(
&context,
our_identity,
application.blob,
application.size,
stream,
)
.await?;
}
}

self.handle_state_sync_request(
context,
&mut context,
our_identity,
their_identity,
root_hash,
Expand Down
10 changes: 9 additions & 1 deletion crates/node/src/sync/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Node {

pub(super) async fn handle_blob_share_request(
&self,
context: Context,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
blob_id: BlobId,
Expand Down Expand Up @@ -186,6 +186,14 @@ impl Node {
)
.await?;

debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
blob_id=%blob_id,
"Blob share completed",
);

Ok(())
}
}
15 changes: 10 additions & 5 deletions crates/node/src/sync/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Node {

pub(super) async fn handle_key_share_request(
&self,
context: Context,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
stream: &mut Stream,
Expand All @@ -72,21 +72,19 @@ impl Node {
)
.await?;

let mut context = context;
self.bidirectional_key_sync(&mut context, our_identity, their_identity, stream)
self.bidirectional_key_sync(context, our_identity, their_identity, stream)
.await
}

async fn bidirectional_key_sync(
&self,
context: &mut Context,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
stream: &mut Stream,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
our_root_hash=%context.root_hash,
our_identity=%our_identity,
their_identity=%their_identity,
"Starting bidirectional key sync",
Expand Down Expand Up @@ -139,6 +137,13 @@ impl Node {
self.ctx_manager
.update_sender_key(&context.id, &their_identity, &sender_key)?;

debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
"Key sync completed",
);

Ok(())
}
}
93 changes: 64 additions & 29 deletions crates/node/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,63 @@ impl Node {
)
.await?;

let Some(ack) = recv(stream, self.sync_config.timeout, None).await? else {
bail!("connection closed while awaiting state sync handshake");
};
let mut pair = None;

let (root_hash, their_identity) = match ack {
StreamMessage::Init {
party_id,
payload:
InitPayload::StateSync {
root_hash,
application_id,
},
..
} => {
if application_id != context.application_id {
bail!(
"unexpected application id: expected {}, got {}",
context.application_id,
application_id
);
for _ in 1..=2 {
let Some(ack) = recv(stream, self.sync_config.timeout, None).await? else {
bail!("connection closed while awaiting state sync handshake");
};

let (root_hash, their_identity) = match ack {
StreamMessage::Init {
party_id,
payload:
InitPayload::StateSync {
root_hash,
application_id,
},
..
} => {
if application_id != context.application_id {
bail!(
"unexpected application id: expected {}, got {}",
context.application_id,
application_id
);
}

(root_hash, party_id)
}
StreamMessage::Init {
party_id: their_identity,
payload: InitPayload::BlobShare { blob_id },
..
} => {
self.handle_blob_share_request(
context,
our_identity,
their_identity,
blob_id,
stream,
)
.await?;

(root_hash, party_id)
}
unexpected @ (StreamMessage::Init { .. }
| StreamMessage::Message { .. }
| StreamMessage::OpaqueError) => {
bail!("unexpected message: {:?}", unexpected)
}
continue;
}
unexpected @ (StreamMessage::Init { .. }
| StreamMessage::Message { .. }
| StreamMessage::OpaqueError) => {
bail!("unexpected message: {:?}", unexpected)
}
};

pair = Some((root_hash, their_identity));

break;
}

let Some((root_hash, their_identity)) = pair else {
bail!("expected two state sync handshakes, got none");
};

if root_hash == context.root_hash {
Expand Down Expand Up @@ -105,7 +133,7 @@ impl Node {

pub(super) async fn handle_state_sync_request(
&self,
context: Context,
context: &mut Context,
our_identity: PublicKey,
their_identity: PublicKey,
root_hash: Hash,
Expand Down Expand Up @@ -148,9 +176,8 @@ impl Node {

let mut sqx_out = Sequencer::default();

let mut context = context;
self.bidirectional_sync(
&mut context,
context,
our_identity,
their_identity,
&mut sqx_out,
Expand Down Expand Up @@ -228,6 +255,14 @@ impl Node {
.await?;
}

debug!(
context_id=%context.id,
our_root_hash=%context.root_hash,
our_identity=%our_identity,
their_identity=%their_identity,
"State sync completed",
);

Ok(())
}
}

0 comments on commit 6e371a2

Please sign in to comment.