diff --git a/Cargo.lock b/Cargo.lock index b5838d3..8be03bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -379,14 +379,18 @@ dependencies = [ "anyhow", "async-trait", "dotenv", + "futures", "kafka", "mockall", + "prost", "rand", "serde", "serde_json", "sqlx", "tokio", "tonic", + "tonic-build", + "tonic-reflection", "tracing", "tracing-subscriber", ] @@ -397,6 +401,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.30" @@ -454,6 +464,21 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -498,6 +523,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -516,8 +552,10 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -923,6 +961,12 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "nom" version = "7.1.3" @@ -1103,6 +1147,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.2.6", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -1194,6 +1248,16 @@ dependencies = [ "termtree", ] +[[package]] +name = "prettyplease" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" +dependencies = [ + "proc-macro2", + "syn 2.0.66", +] + [[package]] name = "proc-macro2" version = "1.0.85" @@ -1210,6 +1274,50 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" dependencies = [ "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +dependencies = [ + "bytes", + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.66", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.66", +] + +[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost", ] [[package]] @@ -1969,6 +2077,32 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic-build" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.66", +] + +[[package]] +name = "tonic-reflection" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "548c227bd5c0fae5925812c4ec6c66ffcfced23ea370cb823f4d18f0fc1cb6a7" +dependencies = [ + "prost", + "prost-types", + "tokio", + "tokio-stream", + "tonic", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index 159eab0..87a571e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,9 @@ tracing = "0.1.40" tracing-subscriber = {version = "0.3.18", features = ["env-filter"]} rand = "0.8.5" dotenv = "0.15.0" +prost = "0.12.6" +tonic-reflection = "0.11.0" +futures = "0.3.30" [[bin]] name = "daemon" @@ -31,3 +34,5 @@ path = "src/lib.rs" [dev-dependencies] mockall = "0.12.1" +[build-dependencies] +tonic-build = "0.11.0" diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..a3c7c0e --- /dev/null +++ b/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/v1alpha/project.proto")?; + Ok(()) +} diff --git a/proto/v1alpha/project.proto b/proto/v1alpha/project.proto new file mode 100644 index 0000000..de80e2f --- /dev/null +++ b/proto/v1alpha/project.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package fabric.project.v1alpha; + +message CreateProjectRequest { + string name = 1; +} + +message CreateProjectResponse { + string name = 1; + string slug = 2; +} + +service ProjectService { + rpc CreateProject(CreateProjectRequest) returns (CreateProjectResponse); +} diff --git a/src/bin/rpc.rs b/src/bin/rpc.rs index ae779ab..8b90df1 100644 --- a/src/bin/rpc.rs +++ b/src/bin/rpc.rs @@ -1,6 +1,6 @@ use anyhow::Result; use dotenv::dotenv; -use tracing::Level; +use tracing::{info, Level}; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; #[tokio::main] @@ -17,9 +17,12 @@ async fn main() -> Result<()> { .with(env_filter) .init(); - fabric::drivers::grpc::server().await?; + let grpc_driver = tokio::spawn(async { fabric::drivers::grpc::server().await }); - fabric::drivers::event::subscribe().await?; + let event_driver = tokio::spawn(async { fabric::drivers::event::subscribe().await }); + + info!("rpc services running"); + let _result = futures::future::join(grpc_driver, event_driver).await; Ok(()) } diff --git a/src/domain/management/events.rs b/src/domain/management/events.rs index ddb78a0..887f564 100644 --- a/src/domain/management/events.rs +++ b/src/domain/management/events.rs @@ -14,6 +14,6 @@ pub enum Event { } #[async_trait::async_trait] -pub trait EventBridge { +pub trait EventBridge: Send + Sync { async fn dispatch(&self, event: Event) -> Result<()>; } diff --git a/src/domain/management/project.rs b/src/domain/management/project.rs index fd65ee1..06c4ca0 100644 --- a/src/domain/management/project.rs +++ b/src/domain/management/project.rs @@ -43,7 +43,7 @@ impl From for Project { } #[async_trait::async_trait] -pub trait ProjectCache { +pub trait ProjectCache: Send + Sync { async fn create(&self, project: &Project) -> Result<()>; async fn find_by_slug(&self, slug: &str) -> Result>; } diff --git a/src/drivers/event/mod.rs b/src/drivers/event/mod.rs index 977a0f9..5fef0f6 100644 --- a/src/drivers/event/mod.rs +++ b/src/drivers/event/mod.rs @@ -4,7 +4,6 @@ use kafka::{ consumer::Consumer, }; use std::{path::Path, sync::Arc}; -use tracing::info; use crate::{ domain::management::{events::Event, project::create_cache}, @@ -27,8 +26,6 @@ pub async fn subscribe() -> Result<()> { .with_offset_storage(Some(GroupOffsetStorage::Kafka)) .create()?; - info!("Event Driver started listening"); - loop { let mss = consumer.poll()?; if mss.is_empty() { diff --git a/src/drivers/grpc/mod.rs b/src/drivers/grpc/mod.rs index b2093a6..29537e5 100644 --- a/src/drivers/grpc/mod.rs +++ b/src/drivers/grpc/mod.rs @@ -1,31 +1,36 @@ use anyhow::Result; -use rand::distributions::Alphanumeric; -use rand::Rng; +use std::net::SocketAddr; +use std::str::FromStr; use std::{path::Path, sync::Arc}; +use tonic::transport::Server; -use crate::domain::management; -use crate::domain::management::project::Project; use crate::driven::cache::{project::SqliteProjectCache, SqliteCache}; use crate::driven::kafka::KafkaEventBridge; +mod project; + +pub mod proto { + pub mod project { + tonic::include_proto!("fabric.project.v1alpha"); + } +} + pub async fn server() -> Result<()> { let sqlite_cache = Arc::new(SqliteCache::new(Path::new("dev.db")).await?); let project_cache = Arc::new(SqliteProjectCache::new(sqlite_cache)); let event_bridge = Arc::new(KafkaEventBridge::new(&["localhost:9092".into()], "events")?); - let slug: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect(); + let project_inner = project::ProjectServiceImpl::new(project_cache, event_bridge); + let project_service = + proto::project::project_service_server::ProjectServiceServer::new(project_inner); - let project = Project { - name: format!("test name {slug}"), - slug, - }; + let address = SocketAddr::from_str("0.0.0.0:5000")?; - management::project::create(project_cache, event_bridge, project).await?; + Server::builder() + .add_service(project_service) + .serve(address) + .await?; Ok(()) } diff --git a/src/drivers/grpc/project.rs b/src/drivers/grpc/project.rs new file mode 100644 index 0000000..6fd1ff7 --- /dev/null +++ b/src/drivers/grpc/project.rs @@ -0,0 +1,59 @@ +use rand::{distributions::Alphanumeric, Rng}; +use std::sync::Arc; +use tonic::{async_trait, Request, Response, Status}; + +use crate::domain::management::{ + self, + events::EventBridge, + project::{Project, ProjectCache}, +}; + +use super::proto::project::{ + project_service_server::ProjectService, CreateProjectRequest, CreateProjectResponse, +}; + +pub struct ProjectServiceImpl { + pub cache: Arc, + pub event: Arc, +} + +impl ProjectServiceImpl { + pub fn new(cache: Arc, event: Arc) -> Self { + Self { cache, event } + } +} + +#[async_trait] +impl ProjectService for ProjectServiceImpl { + async fn create_project( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let slug: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); + + let project = Project { + name: req.name, + slug, + }; + + let result = + management::project::create(self.cache.clone(), self.event.clone(), project.clone()) + .await; + + if let Err(err) = result { + return Err(Status::failed_precondition(err.to_string())); + } + + let message = CreateProjectResponse { + name: project.name, + slug: project.slug, + }; + Ok(Response::new(message)) + } +} diff --git a/src/drivers/mod.rs b/src/drivers/mod.rs index eff9694..29d9af3 100644 --- a/src/drivers/mod.rs +++ b/src/drivers/mod.rs @@ -1,2 +1,2 @@ -pub mod grpc; pub mod event; +pub mod grpc;