Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Update Op #915

Merged
merged 22 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "stdlib"]
path = stdlib
url = ../freenet-stdlib
url = https://github.com/freenet/freenet-stdlib
6 changes: 4 additions & 2 deletions crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,14 +391,16 @@ pub(crate) mod test {
}
}
val if (35..80).contains(&val) => {
let new_state = UpdateData::State(State::from(self.random_byte_vec()));
if let Some(contract) = self.choose(&state.existing_contracts) {
let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec()));
// TODO: It will be used when the delta updates are available
// let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec()));
if !for_this_peer {
continue;
}
let request = ContractRequest::Update {
key: contract.key().clone(),
data: delta,
data: new_state,
};
if state.owns_contracts.contains(&contract.key()) {
return Some(request.into());
Expand Down
30 changes: 30 additions & 0 deletions crates/core/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,36 @@ where
error
})?;
}
ContractHandlerEvent::UpdateQuery {
key,
state,
related_contracts,
} => {
let update_result = contract_handler
.executor()
.upsert_contract_state(
key.clone(),
Either::Left(state.clone()),
related_contracts,
None,
)
.instrument(tracing::info_span!("upsert_contract_state", %key))
.await;

contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::UpdateResponse {
new_value: update_result.map_err(Into::into),
},
)
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
}
_ => unreachable!(),
}
}
Expand Down
10 changes: 8 additions & 2 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,14 +399,20 @@ struct UpdateContract {
impl ComposeNetworkMessage<operations::update::UpdateOp> for UpdateContract {
fn initiate_op(self, op_manager: &OpManager) -> operations::update::UpdateOp {
let UpdateContract { key, new_state } = self;
operations::update::start_op(key, new_state, op_manager.ring.max_hops_to_live)
let related_contracts = RelatedContracts::default();
operations::update::start_op(
key,
new_state,
related_contracts,
op_manager.ring.max_hops_to_live,
)
}

async fn resume_op(
op: operations::update::UpdateOp,
op_manager: &OpManager,
) -> Result<(), OpError> {
operations::update::request_update(op_manager, op, None).await
operations::update::request_update(op_manager, op).await
}
}

Expand Down
17 changes: 9 additions & 8 deletions crates/core/src/contract/executor/mock_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,15 @@ impl ContractExecutor for Executor<MockRuntime> {
.map_err(ExecutorError::other)?;
return Ok(incoming_state);
}
// (Either::Left(_), None) => {
// return Err(ExecutorError::request(RequestError::from(
// StdContractError::Get {
// key: key.clone(),
// cause: "Missing contract or parameters".into(),
// },
// )));
// }
(Either::Left(incoming_state), None) => {
// update case

self.state_store
.update(&key, incoming_state.clone())
.await
.map_err(ExecutorError::other)?;
return Ok(incoming_state);
}
(update, contract) => unreachable!("{update:?}, {contract:?}"),
}
}
Expand Down
21 changes: 21 additions & 0 deletions crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,16 @@ pub(crate) enum ContractHandlerEvent {
key: ContractKey,
response: Result<StoreResponse, ExecutorError>,
},
/// Updates a supposedly existing contract in this node
UpdateQuery {
key: ContractKey,
state: WrappedState,
related_contracts: RelatedContracts<'static>,
},
/// The response to an update query
UpdateResponse {
new_value: Result<WrappedState, ExecutorError>,
},
}

impl std::fmt::Display for ContractHandlerEvent {
Expand Down Expand Up @@ -399,6 +409,17 @@ impl std::fmt::Display for ContractHandlerEvent {
write!(f, "get query failed {{ {key} }}",)
}
},
ContractHandlerEvent::UpdateQuery { key, .. } => {
write!(f, "update query {{ {key} }}")
}
ContractHandlerEvent::UpdateResponse { new_value } => match new_value {
Ok(v) => {
write!(f, "update query response {{ {v} }}",)
}
Err(e) => {
write!(f, "update query failed {{ {e} }}",)
}
},
}
}
}
Expand Down
53 changes: 47 additions & 6 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use std::{
};

use either::Either;
use freenet_stdlib::client_api::{ClientRequest, ContractRequest, ErrorKind};
use freenet_stdlib::{
client_api::{ClientRequest, ContractRequest, ErrorKind},
prelude::{RelatedContracts, State, WrappedState},
};
use libp2p::{identity, multiaddr::Protocol, Multiaddr, PeerId as Libp2pPeerId};
use serde::{Deserialize, Serialize};
use tracing::Instrument;
Expand All @@ -36,7 +39,7 @@ use crate::{
message::{NetMessage, NodeEvent, Transaction, TransactionType},
operations::{
connect::{self, ConnectOp},
get, put, subscribe, OpEnum, OpError, OpOutcome,
get, put, subscribe, update, OpEnum, OpError, OpOutcome,
},
ring::{Location, PeerKeyLocation},
router::{RouteEvent, RouteOutcome},
Expand Down Expand Up @@ -388,15 +391,38 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
tracing::error!("{}", err);
}
}
ContractRequest::Update {
key: _key,
data: _delta,
} => {
ContractRequest::Update { key, data } => {
// FIXME: perform updates
tracing::debug!(
this_peer = %op_manager.ring.peer_key,
"Received update from user event",
);
let state = match data {
freenet_stdlib::prelude::UpdateData::State(s) => s,
_ => {
unreachable!();
}
};

let wrapped_state = WrappedState::from(state.into_bytes());

let related_contracts = RelatedContracts::default();

let op = update::start_op(
key,
wrapped_state,
related_contracts,
op_manager.ring.max_hops_to_live,
);

let _ = op_manager
.ch_outbound
.waiting_for_transaction_result(op.id, client_id)
.await;

if let Err(err) = update::request_update(&op_manager, op).await {
tracing::error!("request update error {}", err)
}
}
ContractRequest::Get {
key,
Expand Down Expand Up @@ -694,6 +720,21 @@ async fn process_message<CB>(
)
.await;
}
NetMessage::Update(op) => {
let op_result =
handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op)
.await;
handle_op_not_available!(op_result);
break report_result(
tx,
op_result,
&op_manager,
executor_callback,
cli_req,
&mut *event_listener,
)
.await;
}
_ => break,
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/node/testing_impl/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl<ER> Builder<ER> {
contract::contract_handling(contract_handler)
.instrument(tracing::info_span!(parent: parent_span.clone(), "contract_handling")),
);

let mut config = super::RunnerConfig {
peer_key: self.peer_key,
gateways,
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ where
sender = s;
op.process_message(network_bridge, op_manager, msg).await
};

handle_op_result(op_manager, network_bridge, result, tx, sender).await
}

Expand All @@ -90,6 +91,7 @@ where
match result {
Err(OpError::StatePushed) => {
// do nothing and continue, the operation will just continue later on
tracing::debug!("entered in state pushed to continue with op");
return Ok(None);
}
Err(err) => {
Expand Down Expand Up @@ -134,6 +136,7 @@ where
}) => {
op_manager.completed(tx_id);
// finished the operation at this node, informing back

if let Some(target) = msg.target().cloned() {
network_bridge.send(&target.peer, msg).await?;
}
Expand Down
7 changes: 6 additions & 1 deletion crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ impl Operation for PutOp {
};

let broadcast_to = op_manager.get_broadcast_targets(&key, &sender.peer);

match try_to_broadcast(
*id,
last_hop,
Expand Down Expand Up @@ -563,7 +564,11 @@ impl Operation for PutOp {
}

impl OpManager {
fn get_broadcast_targets(&self, key: &ContractKey, sender: &PeerId) -> Vec<PeerKeyLocation> {
pub(crate) fn get_broadcast_targets(
&self,
key: &ContractKey,
sender: &PeerId,
) -> Vec<PeerKeyLocation> {
let mut subscribers = self
.ring
.subscribers_of(key)
Expand Down
Loading