Skip to content

Commit

Permalink
feat: grpc driver project proto implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Jun 12, 2024
1 parent f3b202d commit abae63f
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 23 deletions.
134 changes: 134 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ tracing = "0.1.40"
tracing-subscriber = {version = "0.3.18", features = ["env-filter"]}
rand = "0.8.5"
dotenv = "0.15.0"
prost = "0.12.6"
tonic-reflection = "0.11.0"
futures = "0.3.30"

[[bin]]
name = "daemon"
Expand All @@ -31,3 +34,5 @@ path = "src/lib.rs"
[dev-dependencies]
mockall = "0.12.1"

[build-dependencies]
tonic-build = "0.11.0"
4 changes: 4 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/v1alpha/project.proto")?;
Ok(())
}
16 changes: 16 additions & 0 deletions proto/v1alpha/project.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package fabric.project.v1alpha;

message CreateProjectRequest {
string name = 1;
}

message CreateProjectResponse {
string name = 1;
string slug = 2;
}

service ProjectService {
rpc CreateProject(CreateProjectRequest) returns (CreateProjectResponse);
}
9 changes: 6 additions & 3 deletions src/bin/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use dotenv::dotenv;
use tracing::Level;
use tracing::{info, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

#[tokio::main]
Expand All @@ -17,9 +17,12 @@ async fn main() -> Result<()> {
.with(env_filter)
.init();

fabric::drivers::grpc::server().await?;
let grpc_driver = tokio::spawn(async { fabric::drivers::grpc::server().await });

fabric::drivers::event::subscribe().await?;
let event_driver = tokio::spawn(async { fabric::drivers::event::subscribe().await });

info!("rpc services running");
let _result = futures::future::join(grpc_driver, event_driver).await;

Ok(())
}
2 changes: 1 addition & 1 deletion src/domain/management/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ pub enum Event {
}

#[async_trait::async_trait]
pub trait EventBridge {
pub trait EventBridge: Send + Sync {
async fn dispatch(&self, event: Event) -> Result<()>;
}
2 changes: 1 addition & 1 deletion src/domain/management/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl From<Namespace> for Project {
}

#[async_trait::async_trait]
pub trait ProjectCache {
pub trait ProjectCache: Send + Sync {
async fn create(&self, project: &Project) -> Result<()>;
async fn find_by_slug(&self, slug: &str) -> Result<Option<Project>>;
}
Expand Down
3 changes: 0 additions & 3 deletions src/drivers/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use kafka::{
consumer::Consumer,
};
use std::{path::Path, sync::Arc};
use tracing::info;

use crate::{
domain::management::{events::Event, project::create_cache},
Expand All @@ -27,8 +26,6 @@ pub async fn subscribe() -> Result<()> {
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
.create()?;

info!("Event Driver started listening");

loop {
let mss = consumer.poll()?;
if mss.is_empty() {
Expand Down
33 changes: 19 additions & 14 deletions src/drivers/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
use anyhow::Result;
use rand::distributions::Alphanumeric;
use rand::Rng;
use std::net::SocketAddr;
use std::str::FromStr;
use std::{path::Path, sync::Arc};
use tonic::transport::Server;

use crate::domain::management;
use crate::domain::management::project::Project;
use crate::driven::cache::{project::SqliteProjectCache, SqliteCache};
use crate::driven::kafka::KafkaEventBridge;

mod project;

pub mod proto {
pub mod project {
tonic::include_proto!("fabric.project.v1alpha");
}
}

pub async fn server() -> Result<()> {
let sqlite_cache = Arc::new(SqliteCache::new(Path::new("dev.db")).await?);
let project_cache = Arc::new(SqliteProjectCache::new(sqlite_cache));

let event_bridge = Arc::new(KafkaEventBridge::new(&["localhost:9092".into()], "events")?);

let slug: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(7)
.map(char::from)
.collect();
let project_inner = project::ProjectServiceImpl::new(project_cache, event_bridge);
let project_service =
proto::project::project_service_server::ProjectServiceServer::new(project_inner);

let project = Project {
name: format!("test name {slug}"),
slug,
};
let address = SocketAddr::from_str("0.0.0.0:5000")?;

management::project::create(project_cache, event_bridge, project).await?;
Server::builder()
.add_service(project_service)
.serve(address)
.await?;

Ok(())
}
Loading

0 comments on commit abae63f

Please sign in to comment.