Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update quic-rpc and cloud services schema #2795

Merged
merged 3 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ rust-version = "1.81"

[workspace.dependencies]
# First party dependencies
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "bbc69c5cb2" }
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "99d59e8fab" }

# Third party dependencies used by one or more of our crates
async-channel = "2.3"
Expand Down
8 changes: 4 additions & 4 deletions apps/mobile/modules/sd-core/ios/build-rust.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ mkdir -p "$TARGET_DIRECTORY"
TARGET_DIRECTORY="$(CDPATH='' cd -- "$TARGET_DIRECTORY" && pwd -P)"

TARGET_CONFIG=debug
# if [ "${CONFIGURATION:-}" = "Release" ]; then
# set -- --release
# TARGET_CONFIG=release
# fi
if [ "${CONFIGURATION:-}" = "Release" ]; then
set -- --release
TARGET_CONFIG=release
fi

trap 'if [ -e "${CARGO_CONFIG}.bak" ]; then mv "${CARGO_CONFIG}.bak" "$CARGO_CONFIG"; fi' EXIT

Expand Down
2 changes: 1 addition & 1 deletion core/crates/cloud-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ anyhow = "1.0.86"
dashmap = "6.1.0"
iroh-net = { version = "0.27", features = ["discovery-local-network", "iroh-relay"] }
paste = "=1.0.15"
quic-rpc = { version = "0.12.1", features = ["quinn-transport"] }
quic-rpc = { version = "0.13.0", features = ["quinn-transport"] }
quinn = { package = "iroh-quinn", version = "0.11" }
# Using whatever version of reqwest that reqwest-middleware uses, just putting here to enable some features
reqwest = { version = "0.12", features = ["json", "native-tls-vendored", "stream"] }
Expand Down
21 changes: 9 additions & 12 deletions core/crates/cloud-services/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::p2p::{NotifyUser, UserResponse};

use sd_cloud_schema::{Client, Service, ServicesALPN};
use sd_cloud_schema::{Client, Request, Response, ServicesALPN};

use std::{net::SocketAddr, sync::Arc, time::Duration};

use futures::Stream;
use iroh_net::relay::RelayUrl;
use quic_rpc::{transport::quinn::QuinnConnection, RpcClient};
use quic_rpc::{transport::quinn::QuinnConnection, RpcClient, RpcMessage};
use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint};
use reqwest::{IntoUrl, Url};
use reqwest_middleware::{reqwest, ClientBuilder, ClientWithMiddleware};
Expand All @@ -19,10 +19,10 @@ use super::{
};

#[derive(Debug, Default, Clone)]
enum ClientState {
enum ClientState<In: RpcMessage, Out: RpcMessage> {
#[default]
NotConnected,
Connected(Client<QuinnConnection<Service>, Service>),
Connected(Client<QuinnConnection<In, Out>>),
}

/// Cloud services are a optional feature that allows you to interact with the cloud services
Expand All @@ -35,7 +35,7 @@ enum ClientState {
/// that core can always operate without the cloud services.
#[derive(Debug)]
pub struct CloudServices {
client_state: Arc<RwLock<ClientState>>,
client_state: Arc<RwLock<ClientState<Response, Request>>>,
get_cloud_api_address: Url,
http_client: ClientWithMiddleware,
domain_name: String,
Expand Down Expand Up @@ -157,7 +157,7 @@ impl CloudServices {
http_client: &ClientWithMiddleware,
get_cloud_api_address: Url,
domain_name: String,
) -> Result<Client<QuinnConnection<Service>, Service>, Error> {
) -> Result<Client<QuinnConnection<Response, Request>>, Error> {
let cloud_api_address = http_client
.get(get_cloud_api_address)
.send()
Expand Down Expand Up @@ -256,9 +256,6 @@ impl CloudServices {
.map_err(Error::FailedToCreateEndpoint)?;
endpoint.set_default_client_config(client_config);

// TODO(@fogodev): It's possible that we can't keep the connection alive all the time,
// and need to use single shot connections. I will only be sure when we have
// actually battle-tested the cloud services in core.
Ok(Client::new(RpcClient::new(QuinnConnection::new(
endpoint,
cloud_api_address,
Expand All @@ -271,9 +268,9 @@ impl CloudServices {
/// If the client is not connected, it will try to connect to the cloud services.
/// Available routes documented in
/// [`sd_cloud_schema::Service`](https://github.com/spacedriveapp/cloud-services-schema).
pub async fn client(&self) -> Result<Client<QuinnConnection<Service>, Service>, Error> {
if let ClientState::Connected(client) = { self.client_state.read().await.clone() } {
return Ok(client);
pub async fn client(&self) -> Result<Client<QuinnConnection<Response, Request>>, Error> {
if let ClientState::Connected(client) = { &*self.client_state.read().await } {
return Ok(client.clone());
}

// If we're not connected, we need to try to connect.
Expand Down
16 changes: 9 additions & 7 deletions core/crates/cloud-services/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use sd_cloud_schema::{cloud_p2p, sync::groups, Service};
use sd_cloud_schema::{cloud_p2p, sync::groups, Request, Response};
use sd_utils::error::FileIOError;

use std::{io, net::AddrParseError};
Expand Down Expand Up @@ -68,7 +68,9 @@ pub enum Error {
#[error("Failed to connect to Cloud P2P node: {0}")]
ConnectToCloudP2PNode(anyhow::Error),
#[error("Communication error with Cloud P2P node: {0}")]
CloudP2PRpcCommunication(#[from] rpc::Error<QuinnConnection<cloud_p2p::Service>>),
CloudP2PRpcCommunication(
#[from] rpc::Error<QuinnConnection<cloud_p2p::Response, cloud_p2p::Request>>,
),
#[error("Cloud P2P not initialized")]
CloudP2PNotInitialized,
#[error("Failed to initialize LocalSwarmDiscovery: {0}")]
Expand All @@ -78,15 +80,15 @@ pub enum Error {

// Communication errors
#[error("Failed to communicate with RPC backend: {0}")]
RpcCommunication(#[from] rpc::Error<QuinnConnection<Service>>),
RpcCommunication(#[from] rpc::Error<QuinnConnection<Response, Request>>),
#[error("Failed to communicate with Server Streaming RPC backend: {0}")]
ServerStreamCommunication(#[from] server_streaming::Error<QuinnConnection<Service>>),
ServerStreamCommunication(#[from] server_streaming::Error<QuinnConnection<Response, Request>>),
#[error("Failed to receive next response from Server Streaming RPC backend: {0}")]
ServerStreamRecv(#[from] server_streaming::ItemError<QuinnConnection<Service>>),
ServerStreamRecv(#[from] server_streaming::ItemError<QuinnConnection<Response, Request>>),
#[error("Failed to communicate with Bidi Streaming RPC backend: {0}")]
BidiStreamCommunication(#[from] bidi_streaming::Error<QuinnConnection<Service>>),
BidiStreamCommunication(#[from] bidi_streaming::Error<QuinnConnection<Response, Request>>),
#[error("Failed to receive next response from Bidi Streaming RPC backend: {0}")]
BidiStreamRecv(#[from] bidi_streaming::ItemError<QuinnConnection<Service>>),
BidiStreamRecv(#[from] bidi_streaming::ItemError<QuinnConnection<Response, Request>>),
#[error("Error from backend: {0}")]
Backend(#[from] sd_cloud_schema::Error),
#[error("Failed to get access token from refresher: {0}")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{token_refresher::TokenRefresher, Error};

use sd_cloud_schema::{
cloud_p2p::{Client, CloudP2PALPN, Service},
cloud_p2p::{Client, CloudP2PALPN},
devices,
sync::groups,
};
Expand All @@ -24,8 +24,7 @@ pub async fn dispatch_notifier(
devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>,
msgs_tx: flume::Sender<Message>,
cloud_services: sd_cloud_schema::Client<
QuinnConnection<sd_cloud_schema::Service>,
sd_cloud_schema::Service,
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
>,
token_refresher: TokenRefresher,
endpoint: Endpoint,
Expand Down Expand Up @@ -64,8 +63,7 @@ async fn notify_peers(
device_pub_id: devices::PubId,
devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>,
cloud_services: sd_cloud_schema::Client<
QuinnConnection<sd_cloud_schema::Service>,
sd_cloud_schema::Service,
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
>,
token_refresher: TokenRefresher,
endpoint: Endpoint,
Expand Down Expand Up @@ -130,7 +128,7 @@ async fn connect_and_send_notification(
connection_id: &NodeId,
endpoint: &Endpoint,
) -> Result<(), Error> {
let client = Client::new(RpcClient::new(QuinnConnection::<Service>::from_connection(
let client = Client::new(RpcClient::new(QuinnConnection::from_connection(
endpoint
.connect(*connection_id, CloudP2PALPN::LATEST)
.await
Expand Down
26 changes: 11 additions & 15 deletions core/crates/cloud-services/src/p2p/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ pub struct Runner {
current_device_pub_id: devices::PubId,
token_refresher: TokenRefresher,
cloud_services: sd_cloud_schema::Client<
QuinnConnection<sd_cloud_schema::Service>,
sd_cloud_schema::Service,
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
>,
msgs_tx: flume::Sender<Message>,
endpoint: Endpoint,
Expand Down Expand Up @@ -112,12 +111,14 @@ impl Clone for Runner {
}

struct PendingSyncGroupJoin {
channel: RpcChannel<Service, QuinnServerEndpoint<Service>>,
channel: RpcChannel<Service, QuinnServerEndpoint<cloud_p2p::Request, cloud_p2p::Response>>,
request: authorize_new_device_in_sync_group::Request,
this_device: Device,
since: Instant,
}

type P2PServerEndpoint = QuinnServerEndpoint<cloud_p2p::Request, cloud_p2p::Response>;

impl Runner {
pub async fn new(
current_device_pub_id: devices::PubId,
Expand Down Expand Up @@ -152,10 +153,7 @@ impl Runner {
#[allow(clippy::large_enum_variant)]
enum StreamMessage {
AcceptResult(
Result<
Accepting<Service, QuinnServerEndpoint<Service>>,
RpcServerError<QuinnServerEndpoint<Service>>,
>,
Result<Accepting<Service, P2PServerEndpoint>, RpcServerError<P2PServerEndpoint>>,
),
Message(Message),
UserResponse(UserResponse),
Expand Down Expand Up @@ -361,7 +359,7 @@ impl Runner {
async fn handle_request(
&self,
request: cloud_p2p::Request,
channel: RpcChannel<Service, QuinnServerEndpoint<Service>>,
channel: RpcChannel<Service, P2PServerEndpoint>,
) {
match request {
cloud_p2p::Request::AuthorizeNewDeviceInSyncGroup(
Expand Down Expand Up @@ -598,7 +596,7 @@ impl Runner {
async fn connect_to_first_available_client(
endpoint: &Endpoint,
devices_in_group: &[(devices::PubId, NodeId)],
) -> Result<Client<QuinnConnection<Service>, Service>, CloudP2PError> {
) -> Result<Client<QuinnConnection<cloud_p2p::Response, cloud_p2p::Request>>, CloudP2PError> {
for (device_pub_id, device_connection_id) in devices_in_group {
if let Ok(connection) = endpoint
.connect(*device_connection_id, CloudP2PALPN::LATEST)
Expand All @@ -607,8 +605,9 @@ async fn connect_to_first_available_client(
|e| error!(?e, %device_pub_id, "Failed to connect to authorizor device candidate"),
) {
debug!(%device_pub_id, "Connected to authorizor device candidate");

return Ok(Client::new(RpcClient::new(
QuinnConnection::<Service>::from_connection(connection),
QuinnConnection::from_connection(connection),
)));
}
}
Expand All @@ -618,10 +617,7 @@ async fn connect_to_first_available_client(

fn setup_server_endpoint(
endpoint: Endpoint,
) -> (
RpcServer<Service, QuinnServerEndpoint<Service>>,
JoinHandle<()>,
) {
) -> (RpcServer<Service, P2PServerEndpoint>, JoinHandle<()>) {
let local_addr = {
let (ipv4_addr, maybe_ipv6_addr) = endpoint.bound_sockets();
// Trying to give preference to IPv6 addresses because it's 2024
Expand All @@ -631,7 +627,7 @@ fn setup_server_endpoint(
let (connections_tx, connections_rx) = flume::bounded(16);

(
RpcServer::new(QuinnServerEndpoint::<Service>::handle_connections(
RpcServer::new(QuinnServerEndpoint::handle_connections(
connections_rx,
local_addr,
)),
Expand Down
4 changes: 2 additions & 2 deletions core/crates/cloud-services/src/sync/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sd_cloud_schema::{
groups,
messages::{pull, MessagesCollection},
},
Client, Service,
Client, Request, Response,
};
use sd_core_sync::{
cloud_crdt_op_db, CRDTOperation, CompressedCRDTOperationsPerModel, SyncManager,
Expand Down Expand Up @@ -49,7 +49,7 @@ pub struct Receiver {
sync_group_pub_id: groups::PubId,
device_pub_id: devices::PubId,
cloud_services: Arc<CloudServices>,
cloud_client: Client<QuinnConnection<Service>>,
cloud_client: Client<QuinnConnection<Response, Request>>,
key_manager: Arc<KeyManager>,
sync: SyncManager,
notifiers: Arc<ReceiveAndIngestNotifiers>,
Expand Down
4 changes: 2 additions & 2 deletions core/crates/cloud-services/src/sync/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use sd_cloud_schema::{
devices,
error::{ClientSideError, NotFoundError},
sync::{groups, messages},
Client, Service,
Client, Request, Response,
};
use sd_crypto::{
cloud::{OneShotEncryption, SecretKey, StreamEncryption},
Expand Down Expand Up @@ -60,7 +60,7 @@ pub struct Sender {
sync_group_pub_id: groups::PubId,
sync: SyncManager,
cloud_services: Arc<CloudServices>,
cloud_client: Client<QuinnConnection<Service>>,
cloud_client: Client<QuinnConnection<Response, Request>>,
key_manager: Arc<KeyManager>,
is_active: Arc<AtomicBool>,
state_notify: Arc<Notify>,
Expand Down
6 changes: 3 additions & 3 deletions core/src/api/cloud/devices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sd_cloud_schema::{
ClientRegistration, ClientRegistrationFinishParameters, ClientRegistrationFinishResult,
ClientRegistrationStartResult,
},
Client, NodeId, Service, SpacedriveCipherSuite,
Client, NodeId, Request, Response, SpacedriveCipherSuite,
};
use sd_crypto::{cloud::secret_key::SecretKey, CryptoRng};

Expand Down Expand Up @@ -149,7 +149,7 @@ pub fn mount() -> AlphaRouter<Ctx> {
}

pub async fn hello(
client: &Client<QuinnConnection<Service>, Service>,
client: &Client<QuinnConnection<Response, Request>>,
access_token: AccessToken,
device_pub_id: PubId,
hashed_pub_id: Hash,
Expand Down Expand Up @@ -270,7 +270,7 @@ pub struct DeviceRegisterData {
}

pub async fn register(
client: &Client<QuinnConnection<Service>, Service>,
client: &Client<QuinnConnection<Response, Request>>,
access_token: AccessToken,
DeviceRegisterData {
pub_id,
Expand Down
12 changes: 9 additions & 3 deletions core/src/api/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sd_cloud_schema::{
auth,
error::{ClientSideError, Error},
sync::groups,
users, Client, SecretKey as IrohSecretKey, Service,
users, Client, Request, Response, SecretKey as IrohSecretKey,
};
use sd_crypto::{CryptoRng, SeedableRng};
use sd_utils::error::report_error;
Expand All @@ -32,7 +32,7 @@ mod sync_groups;

async fn try_get_cloud_services_client(
node: &Node,
) -> Result<Client<QuinnConnection<Service>, Service>, sd_core_cloud_services::Error> {
) -> Result<Client<QuinnConnection<Response, Request>>, sd_core_cloud_services::Error> {
node.cloud_services
.client()
.await
Expand Down Expand Up @@ -302,7 +302,13 @@ async fn initialize_cloud_sync(

async fn get_client_and_access_token(
node: &Node,
) -> Result<(Client<QuinnConnection<Service>, Service>, auth::AccessToken), rspc::Error> {
) -> Result<
(
Client<QuinnConnection<Response, Request>>,
auth::AccessToken,
),
rspc::Error,
> {
(
try_get_cloud_services_client(node),
node.cloud_services
Expand Down
Loading