diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 33f9fa3..f650c2e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -37,6 +37,7 @@ jobs: - name: Run Integration Tests env: - TEST_CREDENTIAL: ${{ secrets.TEST_CREDENTIAL }} + PASSWORD: ${{ secrets.PASSWORD }} + CLIENT_SECRET: ${{ secrets.CLIENT_SECRET }} run: ./test/expect diff --git a/.sqlx/query-933ade6d1ecef66bf6633f89c237a4f277800a52983f1931fe4960d5bdd53797.json b/.sqlx/query-933ade6d1ecef66bf6633f89c237a4f277800a52983f1931fe4960d5bdd53797.json new file mode 100644 index 0000000..6d1a63a --- /dev/null +++ b/.sqlx/query-933ade6d1ecef66bf6633f89c237a4f277800a52983f1931fe4960d5bdd53797.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO project (\n id,\n namespace,\n name,\n owner,\n status,\n billing_provider,\n billing_provider_id,\n billing_subscription_id,\n created_at,\n updated_at\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 10 + }, + "nullable": [] + }, + "hash": "933ade6d1ecef66bf6633f89c237a4f277800a52983f1931fe4960d5bdd53797" +} diff --git a/.sqlx/query-9a8cb6764b1166d31b7a24a348a4852cffaba306a490a0e67d5eab5fcbcd10c9.json b/.sqlx/query-9a8cb6764b1166d31b7a24a348a4852cffaba306a490a0e67d5eab5fcbcd10c9.json deleted file mode 100644 index 798fbe7..0000000 --- a/.sqlx/query-9a8cb6764b1166d31b7a24a348a4852cffaba306a490a0e67d5eab5fcbcd10c9.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO project (\n id,\n namespace,\n name,\n owner,\n status,\n created_at,\n updated_at\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 7 - }, - "nullable": [] - }, - "hash": "9a8cb6764b1166d31b7a24a348a4852cffaba306a490a0e67d5eab5fcbcd10c9" -} diff --git a/Cargo.lock b/Cargo.lock index e8a8d7f..fcb3aa1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -479,7 +479,7 @@ dependencies = [ [[package]] name = "dmtri" version = "0.1.0" -source = "git+https://github.com/demeter-run/specs.git#2696af304ee6d7c83b9193a4f60f635b0d8f4384" +source = "git+https://github.com/demeter-run/specs.git#2c1636f131c4ec45df40d4697f8bc148280c1f7c" dependencies = [ "bytes", "pbjson", diff --git a/examples/config/rpc.toml b/examples/config/rpc.toml index a0884d5..7a641db 100644 --- a/examples/config/rpc.toml +++ b/examples/config/rpc.toml @@ -13,3 +13,7 @@ topic="events" [auth] url="https://txpipe.us.auth0.com" + +[stripe] +url = "https://api.stripe.com/v1" +api_key = "" diff --git a/src/bin/rpc.rs b/src/bin/rpc.rs index 5b9f83d..75a9277 100644 --- a/src/bin/rpc.rs +++ b/src/bin/rpc.rs @@ -37,11 +37,17 @@ struct Auth { url: String, } #[derive(Debug, Clone, Deserialize)] +struct Stripe { + url: String, + api_key: String, +} +#[derive(Debug, Clone, Deserialize)] struct Config { addr: String, db_path: String, crds_path: PathBuf, auth: Auth, + stripe: Stripe, secret: String, topic: String, kafka_producer: HashMap, @@ -69,6 +75,8 @@ impl From for GrpcConfig { db_path: value.db_path, crds_path: value.crds_path, auth_url: value.auth.url, + stripe_url: value.stripe.url, + stripe_api_key: value.stripe.api_key, secret: value.secret, kafka: value.kafka_producer, topic: value.topic, diff --git a/src/domain/auth/mod.rs b/src/domain/auth/mod.rs index 180be5a..91fc5d2 100644 --- a/src/domain/auth/mod.rs +++ b/src/domain/auth/mod.rs @@ -4,12 +4,19 @@ use super::{error::Error, project::cache::ProjectDrivenCache}; use crate::domain::Result; +#[async_trait::async_trait] +pub trait Auth0Driven: Send + Sync { + fn verify(&self, token: &str) -> Result; + async fn find_info(&self, token: &str) -> Result<(String, String)>; +} + pub type UserId = String; +pub type Token = String; pub type SecretId = String; #[derive(Debug, Clone)] pub enum Credential { - Auth0(UserId), + Auth0(UserId, Token), ApiKey(SecretId), } @@ -19,7 +26,7 @@ pub async fn assert_project_permission( project_id: &str, ) -> Result<()> { match credential { - Credential::Auth0(user_id) => { + Credential::Auth0(user_id, _) => { let result = project_cache .find_user_permission(user_id, project_id) .await?; diff --git a/src/domain/event/mod.rs b/src/domain/event/mod.rs index d672870..164816f 100644 --- a/src/domain/event/mod.rs +++ b/src/domain/event/mod.rs @@ -22,6 +22,10 @@ pub struct ProjectCreated { pub namespace: String, pub owner: String, pub status: String, + pub billing_provider: String, + pub billing_provider_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub billing_subscription_id: Option, pub created_at: DateTime, pub updated_at: DateTime, } @@ -30,7 +34,9 @@ into_event!(ProjectCreated); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProjectUpdated { pub id: String, + #[serde(skip_serializing_if = "Option::is_none")] pub name: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub status: Option, pub updated_at: DateTime, } @@ -176,6 +182,9 @@ mod tests { namespace: "sonic-vegas".into(), owner: "user id".into(), status: ProjectStatus::Active.to_string(), + billing_provider: "stripe".into(), + billing_provider_id: "stripe id".into(), + billing_subscription_id: None, created_at: Utc::now(), updated_at: Utc::now(), } diff --git a/src/domain/project/command.rs b/src/domain/project/command.rs index 686c1f8..74d69c5 100644 --- a/src/domain/project/command.rs +++ b/src/domain/project/command.rs @@ -11,7 +11,7 @@ use tracing::{error, info}; use uuid::Uuid; use crate::domain::{ - auth::{Credential, UserId}, + auth::{Auth0Driven, Credential, Token, UserId}, error::Error, event::{ EventDrivenBridge, ProjectCreated, ProjectDeleted, ProjectSecretCreated, ProjectUpdated, @@ -20,10 +20,10 @@ use crate::domain::{ utils, Result, MAX_SECRET, PAGE_SIZE_DEFAULT, PAGE_SIZE_MAX, }; -use super::{cache::ProjectDrivenCache, Project, ProjectSecret}; +use super::{cache::ProjectDrivenCache, Project, ProjectSecret, StripeDriven}; pub async fn fetch(cache: Arc, cmd: FetchCmd) -> Result> { - let user_id = assert_credential(&cmd.credential)?; + let (user_id, _) = assert_credential(&cmd.credential)?; cache.find(&user_id, &cmd.page, &cmd.page_size).await } @@ -31,20 +31,28 @@ pub async fn fetch(cache: Arc, cmd: FetchCmd) -> Result< pub async fn create( cache: Arc, event: Arc, + auth0: Arc, + stripe: Arc, cmd: CreateCmd, ) -> Result<()> { - let user_id = assert_credential(&cmd.credential)?; + let (user_id, token) = assert_credential(&cmd.credential)?; if cache.find_by_namespace(&cmd.namespace).await?.is_some() { return Err(Error::CommandMalformed("invalid project namespace".into())); } + let (name, email) = auth0.find_info(&token).await?; + let billing_provider_id = stripe.create_customer(&name, &email).await?; + let evt = ProjectCreated { id: cmd.id, namespace: cmd.namespace.clone(), name: cmd.name, owner: user_id, status: ProjectStatus::Active.to_string(), + billing_provider: "stripe".into(), + billing_provider_id, + billing_subscription_id: None, created_at: Utc::now(), updated_at: Utc::now(), }; @@ -207,9 +215,9 @@ pub async fn verify_secret( Ok(secret) } -fn assert_credential(credential: &Credential) -> Result { +fn assert_credential(credential: &Credential) -> Result<(UserId, Token)> { match credential { - Credential::Auth0(user_id) => Ok(user_id.into()), + Credential::Auth0(user_id, token) => Ok((user_id.into(), token.into())), Credential::ApiKey(_) => Err(Error::Unauthorized( "project rpc doesnt support secret".into(), )), @@ -221,7 +229,7 @@ async fn assert_permission( project_id: &str, ) -> Result<()> { match credential { - Credential::Auth0(user_id) => { + Credential::Auth0(user_id, _) => { let result = cache.find_user_permission(user_id, project_id).await?; if result.is_none() { return Err(Error::Unauthorized("user doesnt have permission".into())); @@ -371,10 +379,29 @@ mod tests { } } + mock! { + pub FakeAuth0Driven { } + + #[async_trait::async_trait] + impl Auth0Driven for FakeAuth0Driven { + fn verify(&self, token: &str) -> Result; + async fn find_info(&self, token: &str) -> Result<(String, String)>; + } + } + + mock! { + pub FakeStripeDriven { } + + #[async_trait::async_trait] + impl StripeDriven for FakeStripeDriven { + async fn create_customer(&self, name: &str, email: &str) -> Result; + } + } + impl Default for FetchCmd { fn default() -> Self { Self { - credential: Credential::Auth0("user id".into()), + credential: Credential::Auth0("user id".into(), "token".into()), page: 1, page_size: 12, } @@ -383,7 +410,7 @@ mod tests { impl Default for CreateCmd { fn default() -> Self { Self { - credential: Credential::Auth0("user id".into()), + credential: Credential::Auth0("user id".into(), "token".into()), id: Uuid::new_v4().to_string(), name: "New Project".into(), namespace: "sonic-vegas".into(), @@ -393,7 +420,7 @@ mod tests { impl Default for UpdateCmd { fn default() -> Self { Self { - credential: Credential::Auth0("user id".into()), + credential: Credential::Auth0("user id".into(), "token".into()), id: Uuid::new_v4().to_string(), name: "Other name".into(), } @@ -402,7 +429,7 @@ mod tests { impl Default for CreateSecretCmd { fn default() -> Self { Self { - credential: Credential::Auth0("user id".into()), + credential: Credential::Auth0("user id".into(), "token".into()), id: Uuid::new_v4().to_string(), project_id: Uuid::new_v4().to_string(), name: "Key 1".into(), @@ -437,34 +464,30 @@ mod tests { let mut cache = MockFakeProjectDrivenCache::new(); cache.expect_find_by_namespace().return_once(|_| Ok(None)); - let mut event = MockFakeEventDrivenBridge::new(); - event.expect_dispatch().return_once(|_| Ok(())); - - let cmd = CreateCmd::default(); - - let result = create(Arc::new(cache), Arc::new(event), cmd).await; - assert!(result.is_ok()); - } + let mut auth0 = MockFakeAuth0Driven::new(); + auth0 + .expect_find_info() + .return_once(|_| Ok(("user name".into(), "user email".into()))); - #[tokio::test] - async fn it_should_update_project() { - let mut cache = MockFakeProjectDrivenCache::new(); - cache - .expect_find_user_permission() - .return_once(|_, _| Ok(Some(ProjectUser::default()))); - cache - .expect_find_by_id() - .return_once(|_| Ok(Some(Project::default()))); - cache - .expect_find_secret_by_project_id() - .return_once(|_| Ok(Vec::new())); + let mut stripe = MockFakeStripeDriven::new(); + stripe + .expect_create_customer() + .return_once(|_, _| Ok("stripe id".into())); let mut event = MockFakeEventDrivenBridge::new(); event.expect_dispatch().return_once(|_| Ok(())); - let cmd = UpdateCmd::default(); + let cmd = CreateCmd::default(); + + let result = create( + Arc::new(cache), + Arc::new(event), + Arc::new(auth0), + Arc::new(stripe), + cmd, + ) + .await; - let result = update(Arc::new(cache), Arc::new(event), cmd).await; assert!(result.is_ok()); } @@ -475,16 +498,28 @@ mod tests { .expect_find_by_namespace() .return_once(|_| Ok(Some(Project::default()))); + let auth0 = MockFakeAuth0Driven::new(); + let stripe = MockFakeStripeDriven::new(); let event = MockFakeEventDrivenBridge::new(); let cmd = CreateCmd::default(); - let result = create(Arc::new(cache), Arc::new(event), cmd).await; + let result = create( + Arc::new(cache), + Arc::new(event), + Arc::new(auth0), + Arc::new(stripe), + cmd, + ) + .await; + assert!(result.is_err()); } #[tokio::test] async fn it_should_fail_create_project_when_invalid_permission() { let cache = MockFakeProjectDrivenCache::new(); + let auth0 = MockFakeAuth0Driven::new(); + let stripe = MockFakeStripeDriven::new(); let event = MockFakeEventDrivenBridge::new(); let cmd = CreateCmd { @@ -492,10 +527,40 @@ mod tests { ..Default::default() }; - let result = create(Arc::new(cache), Arc::new(event), cmd).await; + let result = create( + Arc::new(cache), + Arc::new(event), + Arc::new(auth0), + Arc::new(stripe), + cmd, + ) + .await; + assert!(result.is_err()); } + #[tokio::test] + async fn it_should_update_project() { + let mut cache = MockFakeProjectDrivenCache::new(); + cache + .expect_find_user_permission() + .return_once(|_, _| Ok(Some(ProjectUser::default()))); + cache + .expect_find_by_id() + .return_once(|_| Ok(Some(Project::default()))); + cache + .expect_find_secret_by_project_id() + .return_once(|_| Ok(Vec::new())); + + let mut event = MockFakeEventDrivenBridge::new(); + event.expect_dispatch().return_once(|_| Ok(())); + + let cmd = UpdateCmd::default(); + + let result = update(Arc::new(cache), Arc::new(event), cmd).await; + assert!(result.is_ok()); + } + #[tokio::test] async fn it_should_create_project_secret() { let mut cache = MockFakeProjectDrivenCache::new(); diff --git a/src/domain/project/mod.rs b/src/domain/project/mod.rs index 1645ea7..96406c7 100644 --- a/src/domain/project/mod.rs +++ b/src/domain/project/mod.rs @@ -5,12 +5,18 @@ use chrono::{DateTime, Utc}; use super::{ error::Error, event::{ProjectCreated, ProjectSecretCreated, ProjectUpdated}, + Result, }; pub mod cache; pub mod cluster; pub mod command; +#[async_trait::async_trait] +pub trait StripeDriven: Send + Sync { + async fn create_customer(&self, name: &str, email: &str) -> Result; +} + #[derive(Debug, Clone)] pub struct Project { pub id: String, @@ -18,6 +24,9 @@ pub struct Project { pub namespace: String, pub owner: String, pub status: ProjectStatus, + pub billing_provider: String, + pub billing_provider_id: String, + pub billing_subscription_id: Option, pub created_at: DateTime, pub updated_at: DateTime, } @@ -31,6 +40,9 @@ impl TryFrom for Project { name: value.name, owner: value.owner, status: value.status.parse()?, + billing_provider: value.billing_provider, + billing_provider_id: value.billing_provider_id, + billing_subscription_id: value.billing_subscription_id, created_at: value.created_at, updated_at: value.updated_at, }) @@ -137,6 +149,9 @@ mod tests { namespace: "sonic-vegas".into(), owner: "user id".into(), status: ProjectStatus::Active, + billing_provider: "stripe".into(), + billing_provider_id: "stripe id".into(), + billing_subscription_id: None, created_at: Utc::now(), updated_at: Utc::now(), } diff --git a/src/domain/resource/command.rs b/src/domain/resource/command.rs index e58cb50..a6ee776 100644 --- a/src/domain/resource/command.rs +++ b/src/domain/resource/command.rs @@ -355,7 +355,7 @@ mod tests { impl Default for FetchCmd { fn default() -> Self { Self { - credential: Credential::Auth0("user id".into()), + credential: Credential::Auth0("user id".into(), "token".into()), project_id: Uuid::new_v4().to_string(), page: 1, page_size: 12, @@ -365,7 +365,7 @@ mod tests { impl Default for FetchByIdCmd { fn default() -> Self { Self { - credential: Credential::Auth0("user id".into()), + credential: Credential::Auth0("user id".into(), "token".into()), project_id: Uuid::new_v4().to_string(), resource_id: Uuid::new_v4().to_string(), } @@ -374,7 +374,7 @@ mod tests { impl Default for CreateCmd { fn default() -> Self { Self { - credential: Credential::Auth0("user id".into()), + credential: Credential::Auth0("user id".into(), "token".into()), id: Uuid::new_v4().to_string(), project_id: Uuid::new_v4().to_string(), kind: "CardanoNodePort".into(), @@ -385,7 +385,7 @@ mod tests { impl Default for DeleteCmd { fn default() -> Self { Self { - credential: Credential::Auth0("user id".into()), + credential: Credential::Auth0("user id".into(), "token".into()), resource_id: Uuid::new_v4().to_string(), project_id: Uuid::new_v4().to_string(), } diff --git a/src/domain/usage/command.rs b/src/domain/usage/command.rs index f24b524..a3be63e 100644 --- a/src/domain/usage/command.rs +++ b/src/domain/usage/command.rs @@ -95,7 +95,7 @@ mod tests { impl Default for FetchCmd { fn default() -> Self { Self { - credential: Credential::Auth0("user id".into()), + credential: Credential::Auth0("user id".into(), "token".into()), project_id: Uuid::new_v4().to_string(), page: 1, page_size: 12, diff --git a/src/driven/auth/mod.rs b/src/driven/auth/mod.rs deleted file mode 100644 index e315dc0..0000000 --- a/src/driven/auth/mod.rs +++ /dev/null @@ -1,55 +0,0 @@ -use anyhow::{bail, Result}; -use jsonwebtoken::jwk::{AlgorithmParameters, JwkSet}; -use jsonwebtoken::{decode, decode_header, DecodingKey, Validation}; -use serde::Deserialize; - -pub struct Auth0Provider { - jwks: JwkSet, -} -impl Auth0Provider { - pub async fn try_new(url: &str) -> Result { - let client = reqwest::Client::new(); - let jwks_request = client - .get(format!("{}/.well-known/jwks.json", url)) - .build()?; - - let jwks_response = client.execute(jwks_request).await?; - let jwks = jwks_response.json().await?; - - let auth_provider = Self { jwks }; - - Ok(auth_provider) - } - - pub fn verify(&self, token: &str) -> Result { - let header = decode_header(token)?; - - let Some(kid) = header.kid else { - bail!("token doesnt have a `kid` header field"); - }; - let Some(jwk) = self.jwks.find(&kid) else { - bail!("no matching jwk found for the given kid"); - }; - - let decoding_key = match &jwk.algorithm { - AlgorithmParameters::RSA(rsa) => DecodingKey::from_rsa_components(&rsa.n, &rsa.e)?, - _ => bail!("algorithm should be a RSA"), - }; - - let validation = { - let mut validation = Validation::new(header.alg); - validation.set_audience(&["demeter-api"]); - validation.validate_exp = true; - validation - }; - - let decoded_token = decode::(token, &decoding_key, &validation)?; - - Ok(decoded_token.claims.sub) - } -} - -#[derive(Deserialize)] -struct Claims { - sub: String, -} diff --git a/src/driven/auth0/mod.rs b/src/driven/auth0/mod.rs new file mode 100644 index 0000000..cd46cc8 --- /dev/null +++ b/src/driven/auth0/mod.rs @@ -0,0 +1,106 @@ +use anyhow::Result as AnyhowResult; +use jsonwebtoken::jwk::{AlgorithmParameters, JwkSet}; +use jsonwebtoken::{decode, decode_header, DecodingKey, Validation}; +use reqwest::header::{HeaderValue, AUTHORIZATION}; +use reqwest::Client; +use serde::Deserialize; +use tracing::error; + +use crate::domain::error::Error; +use crate::domain::{auth::Auth0Driven, Result}; + +pub struct Auth0DrivenImpl { + client: Client, + url: String, + jwks: JwkSet, +} +impl Auth0DrivenImpl { + pub async fn try_new(url: &str) -> AnyhowResult { + let client = Client::new(); + let url = url.to_string(); + + let jwks_request = client + .get(format!("{}/.well-known/jwks.json", url)) + .build()?; + + let jwks_response = client.execute(jwks_request).await?; + let jwks = jwks_response.json().await?; + + Ok(Self { client, url, jwks }) + } +} + +#[async_trait::async_trait] +impl Auth0Driven for Auth0DrivenImpl { + fn verify(&self, token: &str) -> Result { + let header = decode_header(token).map_err(|err| Error::Unexpected(err.to_string()))?; + + let Some(kid) = header.kid else { + return Err(Error::Unexpected( + "token doesnt have a `kid` header field".into(), + )); + }; + let Some(jwk) = self.jwks.find(&kid) else { + return Err(Error::Unexpected( + "no matching jwk found for the given kid".into(), + )); + }; + + let decoding_key = match &jwk.algorithm { + AlgorithmParameters::RSA(rsa) => DecodingKey::from_rsa_components(&rsa.n, &rsa.e) + .map_err(|err| Error::Unexpected(err.to_string())), + _ => Err(Error::Unexpected("algorithm should be a RSA".into())), + }?; + + let validation = { + let mut validation = Validation::new(header.alg); + validation.set_audience(&["demeter-api"]); + validation.validate_exp = true; + validation + }; + + let decoded_token = decode::(token, &decoding_key, &validation) + .map_err(|err| Error::Unexpected(err.to_string()))?; + + Ok(decoded_token.claims.sub) + } + + async fn find_info(&self, token: &str) -> Result<(String, String)> { + let response = self + .client + .get(format!("{}/userinfo", &self.url)) + .header( + AUTHORIZATION, + HeaderValue::from_str(&format!("Bearer {token}")).unwrap(), + ) + .send() + .await?; + + let status = response.status(); + if status.is_client_error() || status.is_server_error() { + error!( + status = status.to_string(), + "request status code fail to get auth0 user info" + ); + return Err(Error::Unexpected(format!( + "Auth0 request error to get user info. Status: {}", + status + ))); + } + + let profile: Profile = response.json().await?; + + Ok((profile.name, profile.email)) + } +} + +#[derive(Deserialize)] +struct Claims { + sub: String, +} + +#[derive(Deserialize)] +struct Profile { + name: String, + email: String, +} diff --git a/src/driven/cache/migrations/20240606_tables.sql b/src/driven/cache/migrations/20240606_tables.sql index f673adf..5639d3a 100644 --- a/src/driven/cache/migrations/20240606_tables.sql +++ b/src/driven/cache/migrations/20240606_tables.sql @@ -4,6 +4,9 @@ CREATE TABLE IF NOT EXISTS project ( name TEXT NOT NULL, owner TEXT NOT NULL, status TEXT NOT NULL, + billing_provider TEXT NOT NULL, + billing_provider_id TEXT NOT NULL, + billing_subscription_id TEXT, created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL ); @@ -36,3 +39,13 @@ CREATE TABLE IF NOT EXISTS project_secret ( created_at DATETIME NOT NULL, FOREIGN KEY(project_id) REFERENCES project(id) ); + +CREATE TABLE IF NOT EXISTS usage ( + id TEXT PRIMARY KEY NOT NULL, + event_id TEXT NOT NULL, + resource_id TEXT NOT NULL, + units INT NOT NULL, + tier TEXT NOT NULL, + created_at DATETIME NOT NULL, + FOREIGN KEY(resource_id) REFERENCES resource(id) +); diff --git a/src/driven/cache/migrations/20240821_table_usage.sql b/src/driven/cache/migrations/20240821_table_usage.sql deleted file mode 100644 index 61be093..0000000 --- a/src/driven/cache/migrations/20240821_table_usage.sql +++ /dev/null @@ -1,9 +0,0 @@ -CREATE TABLE IF NOT EXISTS usage ( - id TEXT PRIMARY KEY NOT NULL, - event_id TEXT NOT NULL, - resource_id TEXT NOT NULL, - units INT NOT NULL, - tier TEXT NOT NULL, - created_at DATETIME NOT NULL, - FOREIGN KEY(resource_id) REFERENCES resource(id) -); diff --git a/src/driven/cache/project.rs b/src/driven/cache/project.rs index 95577c4..ff4d48a 100644 --- a/src/driven/cache/project.rs +++ b/src/driven/cache/project.rs @@ -35,6 +35,9 @@ impl ProjectDrivenCache for SqliteProjectDrivenCache { p.name, p.owner, p.status, + p.billing_provider, + p.billing_provider_id, + p.billing_subscription_id, p.created_at, p.updated_at FROM project_user pu @@ -63,6 +66,9 @@ impl ProjectDrivenCache for SqliteProjectDrivenCache { p.name, p.owner, p.status, + p.billing_provider, + p.billing_provider_id, + p.billing_subscription_id, p.created_at, p.updated_at FROM project p @@ -85,6 +91,9 @@ impl ProjectDrivenCache for SqliteProjectDrivenCache { p.name, p.owner, p.status, + p.billing_provider, + p.billing_provider_id, + p.billing_subscription_id, p.created_at, p.updated_at FROM project p @@ -112,16 +121,22 @@ impl ProjectDrivenCache for SqliteProjectDrivenCache { name, owner, status, + billing_provider, + billing_provider_id, + billing_subscription_id, created_at, updated_at ) - VALUES ($1, $2, $3, $4, $5, $6, $7) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) "#, project.id, project.namespace, project.name, project.owner, status, + project.billing_provider, + project.billing_provider_id, + project.billing_subscription_id, project.created_at, project.updated_at ) @@ -322,6 +337,9 @@ impl FromRow<'_, SqliteRow> for Project { status: status .parse() .map_err(|err: Error| sqlx::Error::Decode(err.into()))?, + billing_provider: row.try_get("billing_provider")?, + billing_provider_id: row.try_get("billing_provider_id")?, + billing_subscription_id: row.try_get("billing_subscription_id")?, created_at: row.try_get("created_at")?, updated_at: row.try_get("updated_at")?, }) diff --git a/src/driven/mod.rs b/src/driven/mod.rs index a164a49..f36d993 100644 --- a/src/driven/mod.rs +++ b/src/driven/mod.rs @@ -1,6 +1,7 @@ -pub mod auth; +pub mod auth0; pub mod cache; pub mod k8s; pub mod kafka; pub mod metadata; pub mod prometheus; +pub mod stripe; diff --git a/src/driven/prometheus/mod.rs b/src/driven/prometheus/mod.rs index 6595b08..eeb77f8 100644 --- a/src/driven/prometheus/mod.rs +++ b/src/driven/prometheus/mod.rs @@ -1,4 +1,3 @@ -use anyhow::Result as AnyhowResult; use reqwest::Client; use serde::{Deserialize, Deserializer}; @@ -11,11 +10,11 @@ pub struct PrometheusUsageDriven { url: String, } impl PrometheusUsageDriven { - pub async fn new(url: &str) -> AnyhowResult { + pub fn new(url: &str) -> Self { let client = Client::new(); let url = url.to_string(); - Ok(Self { client, url }) + Self { client, url } } } diff --git a/src/driven/stripe/mod.rs b/src/driven/stripe/mod.rs new file mode 100644 index 0000000..016e08f --- /dev/null +++ b/src/driven/stripe/mod.rs @@ -0,0 +1,64 @@ +use std::collections::HashMap; + +use reqwest::Client; +use serde::Deserialize; +use tracing::error; + +use crate::domain::{error::Error, project::StripeDriven, Result}; + +pub struct StripeDrivenImpl { + client: Client, + url: String, + api_key: String, +} +impl StripeDrivenImpl { + pub fn new(url: &str, api_key: &str) -> Self { + let client = Client::new(); + let url = url.to_string(); + let api_key = api_key.to_string(); + + Self { + client, + url, + api_key, + } + } +} + +#[async_trait::async_trait] +impl StripeDriven for StripeDrivenImpl { + async fn create_customer(&self, name: &str, email: &str) -> Result { + let mut params = HashMap::new(); + params.insert("name", name); + params.insert("email", email); + + let response = self + .client + .post(format!("{}/customers", &self.url)) + .basic_auth(&self.api_key, Some("")) + .form(¶ms) + .send() + .await?; + + let status = response.status(); + if status.is_client_error() || status.is_server_error() { + error!( + status = status.to_string(), + "request status code fail to create stripe customer" + ); + return Err(Error::Unexpected(format!( + "stripe create customer request error. Status: {}", + status + ))); + } + + let customer: StripeCustomer = response.json().await?; + + Ok(customer.id) + } +} + +#[derive(Deserialize)] +struct StripeCustomer { + id: String, +} diff --git a/src/drivers/grpc/middlewares/auth.rs b/src/drivers/grpc/middlewares/auth.rs index 210032f..d829687 100644 --- a/src/drivers/grpc/middlewares/auth.rs +++ b/src/drivers/grpc/middlewares/auth.rs @@ -1,20 +1,17 @@ use std::sync::Arc; -use crate::{ - domain::{ - auth::Credential, - project::{self, cache::ProjectDrivenCache}, - }, - driven::auth::Auth0Provider, +use crate::domain::{ + auth::{Auth0Driven, Credential}, + project::{self, cache::ProjectDrivenCache}, }; #[derive(Clone)] pub struct AuthenticatorImpl { - auth0: Arc, + auth0: Arc, cache: Arc, } impl AuthenticatorImpl { - pub fn new(auth0: Arc, cache: Arc) -> Self { + pub fn new(auth0: Arc, cache: Arc) -> Self { Self { auth0, cache } } } @@ -37,7 +34,7 @@ impl tonic::service::Interceptor for AuthenticatorImpl { let token = token.replace("Bearer ", ""); return match self.auth0.verify(&token) { Ok(user_id) => { - let credential = Credential::Auth0(user_id); + let credential = Credential::Auth0(user_id, token); request.extensions_mut().insert(credential); Ok(request) } diff --git a/src/drivers/grpc/mod.rs b/src/drivers/grpc/mod.rs index d590871..b443470 100644 --- a/src/drivers/grpc/mod.rs +++ b/src/drivers/grpc/mod.rs @@ -15,13 +15,14 @@ use tracing::{error, info}; use dmtri::demeter::ops::v1alpha::project_service_server::ProjectServiceServer; use crate::domain::error::Error; -use crate::driven::auth::Auth0Provider; +use crate::driven::auth0::Auth0DrivenImpl; use crate::driven::cache::project::SqliteProjectDrivenCache; use crate::driven::cache::resource::SqliteResourceDrivenCache; use crate::driven::cache::usage::SqliteUsageDrivenCache; use crate::driven::cache::SqliteCache; use crate::driven::kafka::KafkaProducer; use crate::driven::metadata::MetadataCrd; +use crate::driven::stripe::StripeDrivenImpl; mod metadata; mod middlewares; @@ -39,23 +40,29 @@ pub async fn server(config: GrpcConfig) -> Result<()> { let metadata = Arc::new(MetadataCrd::new(&config.crds_path)?); + let auth0 = Arc::new(Auth0DrivenImpl::try_new(&config.auth_url).await?); + let stripe = Arc::new(StripeDrivenImpl::new( + &config.stripe_url, + &config.stripe_api_key, + )); + let reflection = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(dmtri::demeter::ops::v1alpha::FILE_DESCRIPTOR_SET) .register_encoded_file_descriptor_set(protoc_wkt::google::protobuf::FILE_DESCRIPTOR_SET) .build() .unwrap(); - let auth = AuthenticatorImpl::new( - Arc::new(Auth0Provider::try_new(&config.auth_url).await?), - project_cache.clone(), - ); + let auth_interceptor = AuthenticatorImpl::new(auth0.clone(), project_cache.clone()); let project_inner = project::ProjectServiceImpl::new( project_cache.clone(), event_bridge.clone(), + auth0.clone(), + stripe.clone(), config.secret.clone(), ); - let project_service = ProjectServiceServer::with_interceptor(project_inner, auth.clone()); + let project_service = + ProjectServiceServer::with_interceptor(project_inner, auth_interceptor.clone()); let resource_inner = resource::ResourceServiceImpl::new( project_cache.clone(), @@ -63,13 +70,14 @@ pub async fn server(config: GrpcConfig) -> Result<()> { event_bridge.clone(), metadata.clone(), ); - let resource_service = ResourceServiceServer::with_interceptor(resource_inner, auth.clone()); + let resource_service = + ResourceServiceServer::with_interceptor(resource_inner, auth_interceptor.clone()); let metadata_inner = metadata::MetadataServiceImpl::new(metadata.clone()); let metadata_service = MetadataServiceServer::new(metadata_inner); let usage_inner = usage::UsageServiceImpl::new(project_cache.clone(), usage_cache.clone()); - let usage_service = UsageServiceServer::with_interceptor(usage_inner, auth.clone()); + let usage_service = UsageServiceServer::with_interceptor(usage_inner, auth_interceptor.clone()); let address = SocketAddr::from_str(&config.addr)?; @@ -92,6 +100,8 @@ pub struct GrpcConfig { pub db_path: String, pub crds_path: PathBuf, pub auth_url: String, + pub stripe_url: String, + pub stripe_api_key: String, pub secret: String, pub topic: String, pub kafka: HashMap, diff --git a/src/drivers/grpc/project.rs b/src/drivers/grpc/project.rs index 2984a33..80f4b7b 100644 --- a/src/drivers/grpc/project.rs +++ b/src/drivers/grpc/project.rs @@ -5,14 +5,16 @@ use tracing::error; use uuid::Uuid; use crate::domain::{ - auth::Credential, + auth::{Auth0Driven, Credential}, event::EventDrivenBridge, - project::{self, cache::ProjectDrivenCache, Project}, + project::{self, cache::ProjectDrivenCache, Project, StripeDriven}, }; pub struct ProjectServiceImpl { pub cache: Arc, pub event: Arc, + pub auth0: Arc, + pub stripe: Arc, pub secret: String, } @@ -20,11 +22,15 @@ impl ProjectServiceImpl { pub fn new( cache: Arc, event: Arc, + auth0: Arc, + stripe: Arc, secret: String, ) -> Self { Self { cache, event, + auth0, + stripe, secret, } } @@ -66,7 +72,14 @@ impl proto::project_service_server::ProjectService for ProjectServiceImpl { let cmd = project::command::CreateCmd::new(credential, req.name); - project::command::create(self.cache.clone(), self.event.clone(), cmd.clone()).await?; + project::command::create( + self.cache.clone(), + self.event.clone(), + self.auth0.clone(), + self.stripe.clone(), + cmd.clone(), + ) + .await?; let message = proto::CreateProjectResponse { id: cmd.id, @@ -178,52 +191,6 @@ impl proto::project_service_server::ProjectService for ProjectServiceImpl { Ok(tonic::Response::new(message)) } - async fn create_project_payment( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let _credential = match request.extensions().get::() { - Some(credential) => credential.clone(), - None => return Err(Status::unauthenticated("invalid credential")), - }; - - let req = request.into_inner(); - - let message = proto::CreateProjectPaymentResponse { - id: Uuid::new_v4().to_string(), - project_id: req.project_id, - provider: "stripe".into(), - provider_id: "provider id".into(), - subscription_id: Some("subscription id".into()), - }; - - Ok(tonic::Response::new(message)) - } - async fn fetch_project_payment( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let _credential = match request.extensions().get::() { - Some(credential) => credential.clone(), - None => return Err(Status::unauthenticated("invalid credential")), - }; - - let req = request.into_inner(); - - let message = proto::FetchProjectPaymentResponse { - records: vec![proto::ProjectPayment { - id: Uuid::new_v4().to_string(), - project_id: req.project_id, - provider: "stripe".into(), - provider_id: "provider id".into(), - subscription_id: Some("subscription id".into()), - ..Default::default() - }], - }; - - Ok(tonic::Response::new(message)) - } - async fn create_project_invite( &self, request: tonic::Request, @@ -271,6 +238,9 @@ impl From for proto::Project { name: value.name, status: value.status.to_string(), namespace: value.namespace, + billing_provider: value.billing_provider, + billing_provider_id: value.billing_provider_id, + billing_subscription_id: value.billing_subscription_id, created_at: value.created_at.to_rfc3339(), updated_at: value.updated_at.to_rfc3339(), } diff --git a/src/drivers/usage/mod.rs b/src/drivers/usage/mod.rs index a4a64f8..5a251b3 100644 --- a/src/drivers/usage/mod.rs +++ b/src/drivers/usage/mod.rs @@ -11,7 +11,7 @@ use crate::{ }; pub async fn schedule(config: UsageConfig) -> Result<()> { - let prometheus_driven = Arc::new(PrometheusUsageDriven::new(&config.prometheus_url).await?); + let prometheus_driven = Arc::new(PrometheusUsageDriven::new(&config.prometheus_url)); let event_bridge = Arc::new(KafkaProducer::new(&config.topic, &config.kafka)?); let mut cursor = Utc::now(); diff --git a/test/kafka.manifest.yaml b/test/dependences.manifest.yaml similarity index 71% rename from test/kafka.manifest.yaml rename to test/dependences.manifest.yaml index 2682937..12acaac 100644 --- a/test/kafka.manifest.yaml +++ b/test/dependences.manifest.yaml @@ -73,3 +73,52 @@ spec: - name: REDPANDA_BROKERS value: "redpanda.demeter-kafka.svc.cluster.local:19092" restartPolicy: OnFailure +--- +# Fake API +apiVersion: v1 +kind: Namespace +metadata: + name: demeter-mock +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: fake-api + namespace: demeter-mock + labels: + app: fake-api +spec: + selector: + matchLabels: + app: fake-api + strategy: + type: Recreate + template: + metadata: + labels: + app: fake-api + spec: + containers: + - name: fake-api + image: paulobressan/fake-api:latest + ports: + - containerPort: 80 + env: + - name: PORT + value: "80" +--- +apiVersion: v1 +kind: Service +metadata: + name: api + namespace: demeter-mock + labels: + app: api +spec: + selector: + app: fake-api + type: ClusterIP + ports: + - port: 80 + targetPort: 80 + protocol: TCP diff --git a/test/expect b/test/expect index c8a1a71..2c62928 100755 --- a/test/expect +++ b/test/expect @@ -3,6 +3,7 @@ RPC_IMAGE="rpc:1.0" DAEMON_IMAGE="daemon:1.0" CLUSTER_NAME="k8scluster" +MOCK_NAMESPACE="demeter-mock" KAFKA_NAMESPACE="demeter-kafka" FABRIC_NAMESPACE="demeter-rpc" DAEMON_NAMESPACE="demeter-daemon" @@ -53,9 +54,10 @@ wait_for_pods() { build_and_load_image $RPC_IMAGE docker/dockerfile.rpc build_and_load_image $DAEMON_IMAGE docker/dockerfile.daemon -# Apply Kafka manifest -kubectl apply -f ./test/kafka.manifest.yaml +# Apply Dependences manifest +kubectl apply -f ./test/dependences.manifest.yaml wait_for_pods $KAFKA_NAMESPACE "Kafka" +wait_for_pods $MOCK_NAMESPACE "Mock" # Apply Fabric manifest kubectl apply -f ./test/fabric.manifest.yaml @@ -68,7 +70,16 @@ tar -zxvf "./grpcurl_${GRPCURL_VERSION}_linux_x86_64.tar.gz" grpcurl # Get Auth0 access token echo "Getting Auth0 access token" -TOKEN=$(curl --silent --request POST --url $AUTH0_URL --header 'content-type: application/json' --data "$TEST_CREDENTIAL" | jq -r '.access_token') +TOKEN=$(curl --location $AUTH0_URL \ +--header 'content-type: application/x-www-form-urlencoded' \ +--data-urlencode "grant_type=password" \ +--data-urlencode "username=githubci@txpipe.io" \ +--data-urlencode "audience=demeter-api" \ +--data-urlencode "scope=profile openid email" \ +--data-urlencode "client_id=f6y19wTU92tkVAasM5VubeEOsDSES56X" \ +--data-urlencode "password=$PASSWORD" \ +--data-urlencode "client_secret=$CLIENT_SECRET" | jq -r '.access_token') + if [ -z "$TOKEN" ]; then echo "Error: Failed to get Auth0 access token" exit 1 diff --git a/test/fabric.manifest.yaml b/test/fabric.manifest.yaml index 4aa4317..057f2a6 100644 --- a/test/fabric.manifest.yaml +++ b/test/fabric.manifest.yaml @@ -111,6 +111,10 @@ data: [auth] url="https://dev-dflg0ssi.us.auth0.com" + + [stripe] + url = "http://api.demeter-mock.svc.cluster.local/stripe" + api_key = "test" --- apiVersion: v1 kind: ConfigMap