diff --git a/Cargo.lock b/Cargo.lock index 4065a5064f..5a389fb387 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -265,6 +265,7 @@ dependencies = [ "build-list-for-env", "cdn-namespace-domain-create", "chirp-client", + "chirp-workflow", "chrono", "cloud-namespace-token-development-create", "cloud-namespace-token-public-create", @@ -313,9 +314,8 @@ dependencies = [ "upload-complete", "upload-get", "url", - "user-get", + "user", "user-identity-get", - "user-team-list", "uuid", ] @@ -360,11 +360,7 @@ dependencies = [ "tracing-subscriber", "url", "user", - "user-get", - "user-identity-create", "user-identity-get", - "user-resolve-email", - "user-token-create", "uuid", ] @@ -425,6 +421,7 @@ dependencies = [ "cdn-site-list-for-game", "cf-custom-hostname-list-for-namespace-id", "chirp-client", + "chirp-workflow", "chrono", "cloud-device-link-create", "cloud-game-config-create", @@ -511,10 +508,8 @@ dependencies = [ "upload-get", "upload-prepare", "url", - "user-get", + "user", "user-identity-create", - "user-identity-get", - "user-team-list", "uuid", ] @@ -530,6 +525,7 @@ dependencies = [ "build-list-for-env", "cdn-namespace-domain-create", "chirp-client", + "chirp-workflow", "chrono", "cloud-namespace-token-development-create", "cloud-namespace-token-public-create", @@ -572,9 +568,8 @@ dependencies = [ "upload-complete", "upload-get", "url", - "user-get", + "user", "user-identity-get", - "user-team-list", "uuid", ] @@ -585,6 +580,7 @@ dependencies = [ "api-helper", "async-trait", "chirp-client", + "chirp-workflow", "chrono", "faker-user", "futures-util", @@ -626,10 +622,7 @@ dependencies = [ "tracing-subscriber", "upload-prepare", "url", - "user-get", - "user-identity-get", - "user-team-list", - "user-token-create", + "user", "uuid", ] @@ -696,6 +689,7 @@ dependencies = [ "cdn-namespace-domain-create", "cdn-namespace-get", "chirp-client", + "chirp-workflow", "cloud-namespace-token-development-create", "faker-build", "faker-game", @@ -743,11 +737,7 @@ dependencies = [ "tracing-subscriber", "upload-prepare", "url", - "user-avatar-upload-complete", - "user-get", - "user-identity-get", - "user-pending-delete-toggle", - "user-profile-validate", + "user", "uuid", ] @@ -914,6 +904,7 @@ dependencies = [ "api-helper", "async-trait", "chirp-client", + "chirp-workflow", "chrono", "faker-user", "futures-util", @@ -946,9 +937,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", - "user-get", - "user-team-list", - "user-token-create", + "user", "uuid", ] @@ -5899,9 +5888,10 @@ version = "25.1.0-rc.1" dependencies = [ "chirp-client", "chirp-worker", + "chirp-workflow", "prost 0.10.4", "rivet-operation", - "user-get", + "user", ] [[package]] @@ -8340,6 +8330,7 @@ version = "25.1.0-rc.1" dependencies = [ "chirp-client", "chirp-worker", + "chirp-workflow", "faker-game", "faker-team", "reqwest 0.11.27", @@ -8353,7 +8344,7 @@ dependencies = [ "tracing", "tracing-logfmt", "tracing-subscriber", - "user-identity-create", + "user", ] [[package]] @@ -8448,6 +8439,7 @@ version = "25.1.0-rc.1" dependencies = [ "chirp-client", "chirp-worker", + "chirp-workflow", "faker-game", "faker-team", "reqwest 0.11.27", @@ -8461,7 +8453,7 @@ dependencies = [ "tracing", "tracing-logfmt", "tracing-subscriber", - "user-identity-create", + "user", ] [[package]] @@ -9177,7 +9169,7 @@ dependencies = [ "upload-complete", "upload-get", "upload-prepare", - "user-identity-create", + "user", ] [[package]] @@ -11704,6 +11696,7 @@ version = "25.1.0-rc.1" dependencies = [ "cdn-namespace-get", "chirp-client", + "chirp-workflow", "chrono", "cluster", "game-get", @@ -11734,9 +11727,7 @@ dependencies = [ "team-validate", "tier", "types-proto", - "user-get", - "user-identity-get", - "user-team-list", + "user", ] [[package]] @@ -12343,6 +12334,7 @@ version = "25.1.0-rc.1" dependencies = [ "bit-vec", "chirp-client", + "chirp-workflow", "heck 0.3.3", "http 0.2.12", "ip-info", @@ -12353,7 +12345,7 @@ dependencies = [ "serde", "serde_json", "strum 0.24.1", - "user-identity-get", + "user", "uuid", ] @@ -15888,9 +15880,21 @@ version = "25.1.0-rc.1" dependencies = [ "chirp-workflow", "cluster", + "email-address-parser", + "faker-user", "linode", + "rand", + "reqwest 0.11.27", "rivet-config", + "rivet-operation", + "serde", "server-spec", + "sqlx", + "token-create", + "upload-complete", + "upload-file-list", + "upload-get", + "upload-prepare", ] [[package]] @@ -16006,11 +16010,12 @@ version = "25.1.0-rc.1" dependencies = [ "chirp-client", "chirp-worker", + "chirp-workflow", "faker-user", "prost 0.10.4", "rivet-operation", "sqlx", - "user-get", + "user", ] [[package]] @@ -16056,6 +16061,7 @@ version = "25.1.0-rc.1" dependencies = [ "chirp-client", "chirp-worker", + "chirp-workflow", "chrono", "faker-user", "game-get", @@ -16071,10 +16077,7 @@ dependencies = [ "upload-get", "upload-list-for-user", "upload-prepare", - "user-get", - "user-identity-delete", - "user-profile-validate", - "user-team-list", + "user", ] [[package]] diff --git a/packages/api/actor/Cargo.toml b/packages/api/actor/Cargo.toml index 3a18e44134..98cfab5fbf 100644 --- a/packages/api/actor/Cargo.toml +++ b/packages/api/actor/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] api-helper.workspace = true chirp-client.workspace = true +chirp-workflow.workspace = true rivet-operation.workspace = true chrono = "0.4" http = "0.2" @@ -53,9 +54,8 @@ token-create.workspace = true token-revoke.workspace = true upload-complete.workspace = true upload-get.workspace = true -user-get.workspace = true +user.workspace = true user-identity-get.workspace = true -user-team-list.workspace = true [dev-dependencies] rivet-connection.workspace = true diff --git a/packages/api/actor/src/auth.rs b/packages/api/actor/src/auth.rs index 506f319ffc..fc38a9cea4 100644 --- a/packages/api/actor/src/auth.rs +++ b/packages/api/actor/src/auth.rs @@ -157,15 +157,21 @@ impl Auth { } else if let Ok(user_ent) = claims.as_user() { // Get the user let (user_res, game_res, team_list_res) = tokio::try_join!( - op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()], - }), + chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![user_ent.user_id], + }, + ), op!([ctx] game_get { game_ids: vec![game_id.into()], }), - op!([ctx] user_team_list { - user_ids: vec![user_ent.user_id.into()], - }), + chirp_workflow::compat::op( + &ctx, + user::ops::team_list::Input { + user_ids: vec![user_ent.user_id], + }, + ), )?; let Some(user) = user_res.users.first() else { bail_with!(TOKEN_REVOKED) @@ -186,8 +192,7 @@ impl Auth { let is_part_of_team = user_teams .teams .iter() - .filter_map(|x| x.team_id) - .any(|x| x.as_uuid() == dev_team_id); + .any(|x| x.team_id == dev_team_id); ensure_with!(is_part_of_team, GROUP_NOT_MEMBER); // Get team diff --git a/packages/api/auth/Cargo.toml b/packages/api/auth/Cargo.toml index 3578124a54..da6b9f76fb 100644 --- a/packages/api/auth/Cargo.toml +++ b/packages/api/auth/Cargo.toml @@ -38,10 +38,6 @@ tokio = { version = "1.40" } tracing = "0.1" tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] } url = "2.2.2" -user-get.workspace = true -user-identity-create.workspace = true -user-resolve-email.workspace = true -user-token-create.workspace = true user.workspace = true uuid = { version = "1", features = ["v4"] } @@ -50,7 +46,6 @@ rivet-auth.workspace = true rivet-connection.workspace = true faker-user.workspace = true -user-token-create.workspace = true debug-email-res.workspace = true user-identity-get.workspace = true diff --git a/packages/api/auth/src/auth.rs b/packages/api/auth/src/auth.rs index 30225b8df0..c82045d618 100644 --- a/packages/api/auth/src/auth.rs +++ b/packages/api/auth/src/auth.rs @@ -51,9 +51,12 @@ impl Auth { let claims = self.claims()?; let user_ent = claims.as_user()?; - let user_res = op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()], - }) + let user_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![user_ent.user_id], + }, + ) .await?; let Some(user) = user_res.users.first() else { bail_with!(TOKEN_REVOKED) diff --git a/packages/api/auth/src/route/identity.rs b/packages/api/auth/src/route/identity.rs index 89cca03fad..629a99edfc 100644 --- a/packages/api/auth/src/route/identity.rs +++ b/packages/api/auth/src/route/identity.rs @@ -97,7 +97,7 @@ pub async fn complete( return Ok(models::AuthIdentityCompleteEmailVerificationResponse { status }); } - let email_res = op!([ctx] user_resolve_email { + let email_res = (*ctx).op(::user::ops::resolve_email::Input { emails: vec![res.email.clone()], }) .await?; @@ -106,13 +106,13 @@ pub async fn complete( if let Some(new_user) = email_res.users.first() { tracing::info!(email = %new_user.email, "resolved email"); - let new_user_id = unwrap_ref!(new_user.user_id).as_uuid(); + let new_user_id = new_user.user_id; tracing::info!(old_user_id = %user_ent.user_id, %new_user_id, "identity found, switching user"); - let token_res = op!([ctx] user_token_create { - user_id: Some(new_user_id.into()), - client: Some(ctx.client_info()), + let token_res = (*ctx).op(::user::ops::token_create::Input { + user_id: new_user_id, + client: ctx.client_info(), }) .await?; @@ -130,15 +130,15 @@ pub async fn complete( else { tracing::info!(user_id = %user_ent.user_id, "creating new identity for guest"); - op!([ctx] user_identity_create { - user_id: Some(Into::into(user_ent.user_id)), - identity: Some(backend::user_identity::Identity { + (*ctx).op(::user::ops::identity::create::Input { + user_id: user_ent.user_id, + identity: backend::user_identity::Identity { kind: Some(backend::user_identity::identity::Kind::Email( backend::user_identity::identity::Email { email: res.email.clone(), } )) - }) + } }) .await?; diff --git a/packages/api/auth/src/route/tokens.rs b/packages/api/auth/src/route/tokens.rs index 87e58a1f9b..68a9dc0589 100644 --- a/packages/api/auth/src/route/tokens.rs +++ b/packages/api/auth/src/route/tokens.rs @@ -11,7 +11,7 @@ use crate::{ utils::{delete_refresh_token_header, refresh_token_header}, }; -// Also see user-token-create/src/main.rs +// Also see user/src/ops/token_create.rs pub const TOKEN_TTL: i64 = util::duration::minutes(15); pub const REFRESH_TOKEN_TTL: i64 = util::duration::days(90); @@ -137,8 +137,8 @@ pub async fn identity( // Verify user is not deleted if has_refresh_token { - let user_res = op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()], + let user_res = (*ctx).op(::user::ops::get::Input { + user_ids: vec![user_ent.user_id], }) .await?; let user = unwrap!(user_res.users.first()); @@ -205,10 +205,13 @@ async fn fallback_user( }; // Generate token - let token_res = op!([ctx] user_token_create { - user_id: Some(user_id.into()), - client: Some(client_info), - }) + let token_res = chirp_workflow::compat::op( + ctx, + ::user::ops::token_create::Input { + user_id: user_id, + client: client_info, + }, + ) .await?; Ok((token_res.token.clone(), token_res.refresh_token)) diff --git a/packages/api/cloud/Cargo.toml b/packages/api/cloud/Cargo.toml index 1b4748f45f..4f9bbb82df 100644 --- a/packages/api/cloud/Cargo.toml +++ b/packages/api/cloud/Cargo.toml @@ -11,6 +11,7 @@ async-trait = "0.1" base64 = "0.13" bytes = "1.0" chirp-client.workspace = true +chirp-workflow.workspace = true chrono = "0.4" futures-util = "0.3" http = "0.2" @@ -107,9 +108,7 @@ upload-complete.workspace = true upload-file-list.workspace = true upload-get.workspace = true upload-prepare.workspace = true -user-get.workspace = true -user-identity-get.workspace = true -user-team-list.workspace = true +user.workspace = true rivet-config.workspace = true rivet-env.workspace = true diff --git a/packages/api/cloud/src/assert.rs b/packages/api/cloud/src/assert.rs index 009b938784..aa5bfeaffd 100644 --- a/packages/api/cloud/src/assert.rs +++ b/packages/api/cloud/src/assert.rs @@ -7,9 +7,12 @@ use crate::auth::Auth; /// Validates that a given user ID is registered. pub async fn user_registered(ctx: &OperationContext<()>, user_id: Uuid) -> GlobalResult<()> { // If the user has at least one identity they are considered registered - let identity = op!([ctx] user_identity_get { - user_ids: vec![user_id.into()] - }) + let identity = chirp_workflow::compat::op( + &ctx, + ::user::ops::identity::get::Input { + user_ids: vec![user_id] + }, + ) .await?; let identities = &unwrap_ref!(identity.users.first()).identities; diff --git a/packages/api/cloud/src/auth.rs b/packages/api/cloud/src/auth.rs index 5b538f4874..772e35f94a 100644 --- a/packages/api/cloud/src/auth.rs +++ b/packages/api/cloud/src/auth.rs @@ -60,9 +60,12 @@ impl Auth { let claims = self.claims()?; let user_ent = claims.as_user()?; - let user_res = op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()], - }) + let user_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![user_ent.user_id], + }, + ) .await?; let Some(user) = user_res.users.first() else { bail_with!(TOKEN_REVOKED) @@ -94,16 +97,19 @@ impl Auth { let (user, user_ent) = self.user(ctx).await?; assert::user_registered(ctx, user_ent.user_id).await?; - let team_list_res = op!([ctx] user_team_list { - user_ids: vec![user_ent.user_id.into()], - }) + let team_list_res = chirp_workflow::compat::op( + &ctx, + user::ops::team_list::Input { + user_ids: vec![user_ent.user_id.into()], + }, + ) .await?; let user_teams = unwrap!(team_list_res.users.first()); let user_team_ids = user_teams .teams .iter() - .map(|t| Ok(unwrap_ref!(t.team_id).as_uuid())) + .map(|t| Ok(t.team_id)) .collect::>>()?; let has_teams = team_ids .iter() @@ -271,24 +277,26 @@ impl Auth { let (_, user_ent) = self.user(ctx).await?; // Fetch teams associated with user - let teams_res = op!([ctx] user_team_list { - user_ids: vec![user_ent.user_id.into()], - }) + let teams_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::team_list::Input { + user_ids: vec![user_ent.user_id.into()], + }, + ) .await?; let user = unwrap!(teams_res.users.first()); - let team_ids_proto = user + let team_ids = user .teams .iter() - .filter_map(|t| t.team_id) - .collect::>(); - let team_ids = team_ids_proto - .iter() - .map(common::Uuid::as_uuid) + .map(|t| t.team_id) .collect::>(); // Fetch games associated with teams let games_res = op!([ctx] game_list_for_team { - team_ids: team_ids_proto, + team_ids: team_ids + .iter() + .map(|id| (*id).into()) + .collect::>() }) .await?; diff --git a/packages/api/games/Cargo.toml b/packages/api/games/Cargo.toml index fe585e41af..77a274cdf1 100644 --- a/packages/api/games/Cargo.toml +++ b/packages/api/games/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] api-helper.workspace = true chirp-client.workspace = true +chirp-workflow.workspace = true rivet-operation.workspace = true chrono = "0.4" http = "0.2" @@ -45,9 +46,8 @@ token-revoke.workspace = true token-create.workspace = true upload-complete.workspace = true upload-get.workspace = true -user-get.workspace = true +user.workspace = true user-identity-get.workspace = true -user-team-list.workspace = true rivet-config.workspace = true rivet-env.workspace = true diff --git a/packages/api/games/src/auth.rs b/packages/api/games/src/auth.rs index e593eae80e..aeef00de9e 100644 --- a/packages/api/games/src/auth.rs +++ b/packages/api/games/src/auth.rs @@ -97,15 +97,21 @@ impl Auth { } else if let Ok(user_ent) = claims.as_user() { // Get the user let (user_res, game_res, team_list_res) = tokio::try_join!( - op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()], - }), + chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![user_ent.user_id], + }, + ), op!([ctx] game_get { game_ids: vec![game_id.into()], }), - op!([ctx] user_team_list { - user_ids: vec![user_ent.user_id.into()], - }), + chirp_workflow::compat::op( + &ctx, + ::user::ops::team_list::Input { + user_ids: vec![user_ent.user_id.into()], + }, + ) )?; let Some(user) = user_res.users.first() else { bail_with!(TOKEN_REVOKED) @@ -126,8 +132,7 @@ impl Auth { let is_part_of_team = user_teams .teams .iter() - .filter_map(|x| x.team_id) - .any(|x| x.as_uuid() == dev_team_id); + .any(|x| x.team_id == dev_team_id); ensure_with!(is_part_of_team, GROUP_NOT_MEMBER); // Get team diff --git a/packages/api/group/Cargo.toml b/packages/api/group/Cargo.toml index 8ad2bdcc35..911e6ff7e3 100644 --- a/packages/api/group/Cargo.toml +++ b/packages/api/group/Cargo.toml @@ -10,6 +10,7 @@ rivet-convert.workspace = true api-helper.workspace = true async-trait = "0.1" chirp-client.workspace = true +chirp-workflow.workspace = true rivet-operation.workspace = true chrono = "0.4" futures-util = "0.3" @@ -50,9 +51,7 @@ team-user-ban-get.workspace = true team-user-ban-list.workspace = true token-revoke.workspace = true upload-prepare.workspace = true -user-get.workspace = true -user-identity-get.workspace = true -user-team-list.workspace = true +user.workspace = true rivet-config.workspace = true rivet-env.workspace = true @@ -62,4 +61,3 @@ rivet-group.workspace = true rand = "0.8" faker-user.workspace = true -user-token-create.workspace = true diff --git a/packages/api/group/src/assert.rs b/packages/api/group/src/assert.rs index 96fa848992..0635d2afa9 100644 --- a/packages/api/group/src/assert.rs +++ b/packages/api/group/src/assert.rs @@ -6,8 +6,8 @@ use crate::auth::Auth; /// Validates that a given user ID is registered. pub async fn user_registered(ctx: &Ctx, user_id: Uuid) -> GlobalResult<()> { // If the user has at least one identity they are considered registered - let identity = op!([ctx] user_identity_get { - user_ids: vec![user_id.into()] + let identity = (*ctx).op(::user::ops::identity::get::Input { + user_ids: vec![user_id] }) .await?; diff --git a/packages/api/group/src/auth.rs b/packages/api/group/src/auth.rs index 83416b5431..2c17b3d507 100644 --- a/packages/api/group/src/auth.rs +++ b/packages/api/group/src/auth.rs @@ -56,9 +56,12 @@ impl Auth { let claims = self.claims()?; let user_ent = claims.as_user()?; - let user_res = op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()], - }) + let user_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![user_ent.user_id], + }, + ) .await?; let Some(user) = user_res.users.first() else { bail_with!(TOKEN_REVOKED) @@ -86,9 +89,12 @@ impl Auth { let (_, user_ent) = self.user(ctx).await?; // Get user - let user_res = op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()] - }) + let user_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![user_ent.user_id], + }, + ) .await?; let user = unwrap!(user_res.users.first(), "user not found"); diff --git a/packages/api/group/src/fetch/group.rs b/packages/api/group/src/fetch/group.rs index 8d879909a4..1069b81106 100644 --- a/packages/api/group/src/fetch/group.rs +++ b/packages/api/group/src/fetch/group.rs @@ -17,9 +17,11 @@ pub async fn summaries( // Fetch team metadata let (user_team_list_res, teams_res, team_member_count_res) = tokio::try_join!( - op!([ctx] user_team_list { - user_ids: vec![current_user_id.into()], - }), + (*ctx).op( + ::user::ops::team_list::Input { + user_ids: vec![current_user_id.into()], + } + ), op!([ctx] team_get { team_ids: group_ids_proto.clone(), }), @@ -34,7 +36,9 @@ pub async fn summaries( .teams .iter() .map(|team| { - let is_current_identity_member = user_teams.iter().any(|t| t.team_id == team.team_id); + let is_current_identity_member = user_teams + .iter() + .any(|t| Some(common::Uuid::from(t.team_id)) == team.team_id); convert::group::summary( ctx.config(), diff --git a/packages/api/group/src/fetch/identity.rs b/packages/api/group/src/fetch/identity.rs index 01e4a74714..0327e37c91 100644 --- a/packages/api/group/src/fetch/identity.rs +++ b/packages/api/group/src/fetch/identity.rs @@ -9,9 +9,9 @@ use crate::{auth::Auth, convert}; pub async fn users( ctx: &Ctx, - user_ids: Vec, -) -> GlobalResult { - op!([ctx] user_get { + user_ids: Vec, +) -> GlobalResult<::user::ops::get::Output> { + (*ctx).op(::user::ops::get::Input { user_ids: user_ids, }) .await diff --git a/packages/api/group/src/route/groups.rs b/packages/api/group/src/route/groups.rs index 789f477818..09704c25a5 100644 --- a/packages/api/group/src/route/groups.rs +++ b/packages/api/group/src/route/groups.rs @@ -194,16 +194,18 @@ pub async fn members( let (user, user_ent) = ctx.auth().user(ctx.op_ctx()).await?; // Check if user is a member of this team - let team_list_res = op!([ctx] user_team_list { - user_ids: vec![user_ent.user_id.into()], - }) + let team_list_res = (*ctx).op( + ::user::ops::team_list::Input { + user_ids: vec![user_ent.user_id.into()], + } + ) .await?; let user_team = unwrap!(team_list_res.users.first()); let user_team_ids = user_team .teams .iter() - .map(|t| Ok(unwrap_ref!(t.team_id).as_uuid())) + .map(|t| Ok(t.team_id)) .collect::>>()?; let has_team = user_team_ids.iter().any(|team_id| &group_id == team_id); @@ -285,7 +287,13 @@ pub async fn members( // NOTE: We don't use fetch::identities::handles here because the end model is `GroupMember` not `IdentityHandle` // Fetch team member and join request data - let users = fetch::identity::users(&ctx, user_ids.clone()).await?; + let users = fetch::identity::users( + &ctx, + user_ids + .iter() + .map(|u| u.as_uuid()) + .collect::>() + ).await?; let raw_user_ent_id = Into::::into(user_ent.user_id); let members = users @@ -422,7 +430,13 @@ pub async fn join_requests( // NOTE: We don't use fetch::identities::handles here because the end model is `GroupMember` not // `IdentityHandle` // Fetch team member and join request data - let users = fetch::identity::users(&ctx, user_ids.clone()).await?; + let users = fetch::identity::users( + &ctx, + user_ids + .iter() + .map(|u| u.as_uuid()) + .collect::>() + ).await?; let raw_user_ent_id = Into::::into(user_ent.user_id); let join_requests = users @@ -1246,7 +1260,13 @@ pub async fn bans( // NOTE: We don't use fetch::identities::handles here because the end model is `BannedIdentity` not // `IdentityHandle` // Fetch team member and ban data - let users = fetch::identity::users(&ctx, user_ids.clone()).await?; + let users = fetch::identity::users( + &ctx, + user_ids + .iter() + .map(|u| u.as_uuid()) + .collect::>() + ).await?; let raw_user_ent_id = Into::::into(user_ent.user_id); let banned_identities = users diff --git a/packages/api/group/tests/basic.rs b/packages/api/group/tests/basic.rs index 778d75d050..f039638697 100644 --- a/packages/api/group/tests/basic.rs +++ b/packages/api/group/tests/basic.rs @@ -59,13 +59,16 @@ impl Ctx { let user_res = op!([ctx] faker_user {}).await.unwrap(); let user_id = user_res.user_id.as_ref().unwrap().as_uuid(); - let token_res = op!([ctx] user_token_create { - user_id: user_res.user_id, - client: Some(backend::net::ClientInfo { - user_agent: Some(USER_AGENT.into()), - remote_address: Some(socket_addr().to_string()), - }) - }) + let token_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::token_create::Input { + user_id: user_id, + client: backend::net::ClientInfo { + user_agent: Some(USER_AGENT.into()), + remote_address: Some(socket_addr().to_string()), + } + } + ) .await .unwrap(); diff --git a/packages/api/identity/Cargo.toml b/packages/api/identity/Cargo.toml index be42386f2a..216a97bf22 100644 --- a/packages/api/identity/Cargo.toml +++ b/packages/api/identity/Cargo.toml @@ -9,6 +9,7 @@ edition.workspace = true rivet-convert.workspace = true api-helper.workspace = true async-trait = "0.1" +chirp-workflow.workspace = true chirp-client.workspace = true rivet-operation.workspace = true futures-util = "0.3" @@ -50,11 +51,7 @@ token-create.workspace = true token-get.workspace = true token-revoke.workspace = true upload-prepare.workspace = true -user-avatar-upload-complete.workspace = true -user-get.workspace = true -user-identity-get.workspace = true -user-pending-delete-toggle.workspace = true -user-profile-validate.workspace = true +user.workspace = true rivet-config.workspace = true rivet-env.workspace = true diff --git a/packages/api/identity/src/assert.rs b/packages/api/identity/src/assert.rs index 96fa848992..0635d2afa9 100644 --- a/packages/api/identity/src/assert.rs +++ b/packages/api/identity/src/assert.rs @@ -6,8 +6,8 @@ use crate::auth::Auth; /// Validates that a given user ID is registered. pub async fn user_registered(ctx: &Ctx, user_id: Uuid) -> GlobalResult<()> { // If the user has at least one identity they are considered registered - let identity = op!([ctx] user_identity_get { - user_ids: vec![user_id.into()] + let identity = (*ctx).op(::user::ops::identity::get::Input { + user_ids: vec![user_id] }) .await?; diff --git a/packages/api/identity/src/auth.rs b/packages/api/identity/src/auth.rs index 507117cd63..62308c24d6 100644 --- a/packages/api/identity/src/auth.rs +++ b/packages/api/identity/src/auth.rs @@ -116,9 +116,12 @@ impl Auth { let claims = self.claims()?; let user_ent = claims.as_user()?; - let user_res = op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()], - }) + let user_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![user_ent.user_id], + }, + ) .await?; let Some(user) = user_res.users.first() else { bail_with!(TOKEN_REVOKED) @@ -143,9 +146,12 @@ impl Auth { let user_ent = self.user(ctx).await?; // Get user - let user_res = op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()] - }) + let user_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![user_ent.user_id], + }, + ) .await?; let user = unwrap!(user_res.users.first(), "user not found"); diff --git a/packages/api/identity/src/route/identities.rs b/packages/api/identity/src/route/identities.rs index ddecbb5ec2..68092451a9 100644 --- a/packages/api/identity/src/route/identities.rs +++ b/packages/api/identity/src/route/identities.rs @@ -217,13 +217,12 @@ pub async fn validate_profile( "invalid parameter account_number`" ); - let res = op!([ctx] user_profile_validate { - user_id: Some(user_ent.user_id.into()), + let res = (*ctx).op(::user::ops::profile_validate::Input { + user_id: user_ent.user_id, display_name: body.display_name.clone(), - account_number: - body.account_number - .map(|n| n.api_try_into()) - .transpose()?, + account_number: body.account_number + .map(|n| n.api_try_into()) + .transpose()?, bio: body.bio.clone(), }) .await?; @@ -291,9 +290,9 @@ pub async fn complete_avatar_upload( ) -> GlobalResult { let user_ent = ctx.auth().user(ctx.op_ctx()).await?; - op!([ctx] user_avatar_upload_complete { - user_id: Some(user_ent.user_id.into()), - upload_id: Some(upload_id.into()), + (*ctx).op(::user::ops::avatar_upload_complete::Input { + user_id: user_ent.user_id, + upload_id: upload_id, }) .await?; @@ -307,8 +306,8 @@ pub async fn mark_deletion( ) -> GlobalResult { let user_ent = ctx.auth().user(ctx.op_ctx()).await?; - op!([ctx] user_pending_delete_toggle { - user_id: Some(user_ent.user_id.into()), + (*ctx).op(::user::ops::pending_delete_toggle::Input { + user_id: user_ent.user_id, active: true, }) .await?; @@ -320,8 +319,8 @@ pub async fn mark_deletion( pub async fn unmark_deletion(ctx: Ctx) -> GlobalResult { let user_ent = ctx.auth().user(ctx.op_ctx()).await?; - op!([ctx] user_pending_delete_toggle { - user_id: Some(user_ent.user_id.into()), + (*ctx).op(::user::ops::pending_delete_toggle::Input { + user_id: user_ent.user_id, active: false, }) .await?; diff --git a/packages/api/portal/Cargo.toml b/packages/api/portal/Cargo.toml index ccc62491fd..2559c9838d 100644 --- a/packages/api/portal/Cargo.toml +++ b/packages/api/portal/Cargo.toml @@ -10,6 +10,7 @@ rivet-convert.workspace = true api-helper.workspace = true async-trait = "0.1" chirp-client.workspace = true +chirp-workflow.workspace = true rivet-operation.workspace = true chrono = "0.4" futures-util = "0.3" @@ -41,8 +42,7 @@ game-resolve-name-id.workspace = true team-get.workspace = true team-member-count.workspace = true token-revoke.workspace = true -user-get.workspace = true -user-team-list.workspace = true +user.workspace = true rivet-config.workspace = true rivet-env.workspace = true @@ -52,4 +52,3 @@ rivet-portal.workspace = true regex = "1.4" faker-user.workspace = true -user-token-create.workspace = true diff --git a/packages/api/portal/src/auth.rs b/packages/api/portal/src/auth.rs index b52f74a370..84ff88dddf 100644 --- a/packages/api/portal/src/auth.rs +++ b/packages/api/portal/src/auth.rs @@ -53,9 +53,12 @@ impl Auth { let claims = self.claims()?; let user_ent = claims.as_user()?; - let user_res = op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()], - }) + let user_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![user_ent.user_id], + }, + ) .await?; let Some(user) = user_res.users.first() else { bail_with!(TOKEN_REVOKED) @@ -80,9 +83,12 @@ impl Auth { let user_ent = self.user(ctx).await?; // Get user - let user_res = op!([ctx] user_get { - user_ids: vec![user_ent.user_id.into()] - }) + let user_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![user_ent.user_id], + }, + ) .await?; let user = unwrap!(user_res.users.first(), "user not found"); diff --git a/packages/api/portal/src/build.rs b/packages/api/portal/src/build.rs index ffe5d7b519..bed56ac22b 100644 --- a/packages/api/portal/src/build.rs +++ b/packages/api/portal/src/build.rs @@ -13,9 +13,11 @@ pub async fn group_summaries( ) -> GlobalResult> { // Fetch team metadata let (user_team_list_res, teams_res, team_member_count_res) = tokio::try_join!( - op!([ctx] user_team_list { - user_ids: vec![current_user_id], - }), + (*ctx).op( + ::user::ops::team_list::Input { + user_ids: vec![current_user_id.into()], + } + ), op!([ctx] team_get { team_ids: group_ids.to_vec(), }), @@ -35,7 +37,7 @@ pub async fn group_summaries( let is_current_user_member = unwrap!(user_team_list_res.users.first()) .teams .iter() - .any(|team| team.team_id.as_ref() == Some(group_id)); + .any(|team| common::Uuid::from(team.team_id) == (*group_id)); let member_count = unwrap!(team_member_count_res .teams .iter() diff --git a/packages/api/portal/tests/basic.rs b/packages/api/portal/tests/basic.rs index f8657fc082..6e55f00144 100644 --- a/packages/api/portal/tests/basic.rs +++ b/packages/api/portal/tests/basic.rs @@ -58,13 +58,16 @@ impl Ctx { let user_res = op!([ctx] faker_user {}).await.unwrap(); let user_id = user_res.user_id.as_ref().unwrap().as_uuid(); - let token_res = op!([ctx] user_token_create { - user_id: user_res.user_id, - client: Some(backend::net::ClientInfo { - user_agent: Some(USER_AGENT.into()), - remote_address: Some(socket_addr().to_string()), - }) - }) + let token_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::token_create::Input { + user_id: user_id, + client: backend::net::ClientInfo { + user_agent: Some(USER_AGENT.into()), + remote_address: Some(socket_addr().to_string()), + } + } + ) .await .unwrap(); diff --git a/packages/common/convert/Cargo.toml b/packages/common/convert/Cargo.toml index 89f5ac4eb4..00bcbc66fe 100644 --- a/packages/common/convert/Cargo.toml +++ b/packages/common/convert/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true [dependencies] chirp-client.workspace = true +chirp-workflow.workspace = true rivet-operation.workspace = true serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -35,9 +36,7 @@ team-member-count.workspace = true team-profile-validate.workspace = true team-validate.workspace = true tier.workspace = true -user-get.workspace = true -user-identity-get.workspace = true -user-team-list.workspace = true +user.workspace = true rivet-api.workspace = true rivet-group-server.workspace = true diff --git a/packages/common/convert/src/convert/identity.rs b/packages/common/convert/src/convert/identity.rs index 1d3f276a16..48c3731a6a 100644 --- a/packages/common/convert/src/convert/identity.rs +++ b/packages/common/convert/src/convert/identity.rs @@ -1,6 +1,6 @@ use rivet_api::models; use rivet_operation::prelude::*; -use types_proto::rivet::backend::{self, pkg::*}; +use types_proto::rivet::backend::{self}; use crate::{convert, fetch, ApiTryInto}; @@ -26,7 +26,7 @@ pub fn handle( pub fn summary( config: &rivet_config::Config, - current_user_id: Uuid, + _current_user_id: Uuid, user: &backend::user::User, ) -> GlobalResult { let user_id_proto = unwrap!(user.user_id); @@ -51,7 +51,7 @@ pub fn summary( #[derive(Debug)] pub struct ProfileCtx<'a> { pub teams_ctx: &'a fetch::identity::TeamsCtx, - pub linked_accounts: &'a [user_identity::get::response::User], + pub linked_accounts: &'a [::user::ops::identity::get::User], pub self_is_game_linked: bool, } @@ -69,7 +69,7 @@ pub fn profile( let identities = unwrap!(pctx .linked_accounts .iter() - .find(|identity| identity.user_id == user.user_id)); + .find(|identity| identity.user_id == user_id)); let identities = &identities.identities; // If the user has at least one identity they are considered registered let is_registered = !identities.is_empty(); @@ -81,11 +81,11 @@ pub fn profile( .user_teams .users .iter() - .find(|u| u.user_id == user.user_id)); + .find(|u| Some(common::Uuid::from(u.user_id)) == user.user_id)); let team_ids = user .teams .iter() - .map(|t| Ok(unwrap!(t.team_id))) + .map(|t| Ok(common::Uuid::from(t.team_id))) .collect::>>()?; pctx.teams_ctx diff --git a/packages/common/convert/src/fetch/group.rs b/packages/common/convert/src/fetch/group.rs index 93111b0ab2..43c90fccfd 100644 --- a/packages/common/convert/src/fetch/group.rs +++ b/packages/common/convert/src/fetch/group.rs @@ -22,9 +22,12 @@ pub async fn summaries( let (user_teams, teams_res, team_member_count_res) = tokio::try_join!( async { if let Some(current_user_id) = current_user_id { - let user_team_list_res = op!([ctx] user_team_list { - user_ids: vec![current_user_id.into()], - }) + let user_team_list_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::team_list::Input { + user_ids: vec![current_user_id.into()], + }, + ) .await?; Ok(unwrap!(user_team_list_res.users.first()).teams.clone()) @@ -44,7 +47,9 @@ pub async fn summaries( .teams .iter() .map(|team| { - let is_current_identity_member = user_teams.iter().any(|t| t.team_id == team.team_id); + let is_current_identity_member = user_teams + .iter() + .any(|t| Some(common::Uuid::from(t.team_id)) == team.team_id); convert::group::summary( ctx.config(), diff --git a/packages/common/convert/src/fetch/identity.rs b/packages/common/convert/src/fetch/identity.rs index 2f92569430..9d4f69f4f3 100644 --- a/packages/common/convert/src/fetch/identity.rs +++ b/packages/common/convert/src/fetch/identity.rs @@ -1,5 +1,5 @@ use proto::{ - backend::{self, pkg::*}, + backend::{self}, common, }; use rivet_api::models; @@ -9,25 +9,19 @@ use crate::convert; #[derive(Debug)] pub struct TeamsCtx { - pub user_teams: user::team_list::Response, + pub user_teams: ::user::ops::team_list::Output, pub teams: Vec, } pub async fn handles( ctx: &OperationContext<()>, current_user_id: Uuid, - raw_user_ids: Vec, + user_ids: Vec, ) -> GlobalResult> { - if raw_user_ids.is_empty() { + if user_ids.is_empty() { return Ok(Vec::new()); } - let user_ids = raw_user_ids - .clone() - .into_iter() - .map(Into::into) - .collect::>(); - let users = users(ctx, user_ids.clone()).await?; // Convert all data @@ -41,18 +35,12 @@ pub async fn handles( pub async fn summaries( ctx: &OperationContext<()>, current_user_id: Uuid, - raw_user_ids: Vec, + user_ids: Vec, ) -> GlobalResult> { - if raw_user_ids.is_empty() { + if user_ids.is_empty() { return Ok(Vec::new()); } - let user_ids = raw_user_ids - .clone() - .into_iter() - .map(Into::into) - .collect::>(); - let users = users(ctx, user_ids.clone()).await?; // Convert all data @@ -79,7 +67,7 @@ pub async fn profiles( .collect::>(); let (users, teams_ctx, linked_accounts) = tokio::try_join!( - users(ctx, user_ids.clone()), + users(ctx, raw_user_ids.clone()), teams(ctx, user_ids.clone()), linked_accounts(ctx, user_ids.clone()), )?; @@ -105,18 +93,27 @@ pub async fn profiles( pub async fn users( ctx: &OperationContext<()>, - user_ids: Vec, -) -> GlobalResult { - op!([ctx] user_get { - user_ids: user_ids, - }) + user_ids: Vec, +) -> GlobalResult<::user::ops::get::Output> { + chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: user_ids, + }, + ) .await } async fn teams(ctx: &OperationContext<()>, user_ids: Vec) -> GlobalResult { - let user_teams_res = op!([ctx] user_team_list { - user_ids: user_ids, - }) + let user_teams_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::team_list::Input { + user_ids: user_ids + .iter() + .map(|x| (*x).into()) + .collect::>(), + }, + ) .await?; let team_ids = user_teams_res @@ -125,7 +122,7 @@ async fn teams(ctx: &OperationContext<()>, user_ids: Vec) -> Globa .map(|user| { user.teams .iter() - .map(|t| Ok(unwrap!(t.team_id))) + .map(|t| Ok(common::Uuid::from(t.team_id))) .collect::>>() }) .collect::>>()? @@ -150,9 +147,15 @@ async fn teams(ctx: &OperationContext<()>, user_ids: Vec) -> Globa async fn linked_accounts( ctx: &OperationContext<()>, user_ids: Vec, -) -> GlobalResult { - op!([ctx] user_identity_get { - user_ids: user_ids.clone(), - }) - .await +) -> GlobalResult<::user::ops::identity::get::Output> { + Ok(chirp_workflow::compat::op( + &ctx, + ::user::ops::identity::get::Input { + user_ids: user_ids + .iter() + .map(|i| i.as_uuid()) + .collect::>() + }, + ) + .await?) } diff --git a/packages/infra/server/src/run_config.rs b/packages/infra/server/src/run_config.rs index 2d0c403e1a..b957d1ad72 100644 --- a/packages/infra/server/src/run_config.rs +++ b/packages/infra/server/src/run_config.rs @@ -282,7 +282,7 @@ pub fn config(rivet_config: rivet_config::Config) -> Result { SqlService { kind: SqlServiceKind::CockroachDB, migrations: include_dir!( - "$CARGO_MANIFEST_DIR/../../services/user-identity/db/user-identity" + "$CARGO_MANIFEST_DIR/../../services/user/db/user-identity" ), db_name: "db_user_identity", }, diff --git a/packages/services/faker/ops/user/Cargo.toml b/packages/services/faker/ops/user/Cargo.toml index 67a500e360..9043aa7d3e 100644 --- a/packages/services/faker/ops/user/Cargo.toml +++ b/packages/services/faker/ops/user/Cargo.toml @@ -12,5 +12,5 @@ rivet-operation.workspace = true [dev-dependencies] chirp-worker.workspace = true - -user-get.workspace = true +chirp-workflow.workspace = true +user.workspace = true diff --git a/packages/services/faker/ops/user/tests/integration.rs b/packages/services/faker/ops/user/tests/integration.rs index 0aca4bdf8f..05c0c08fb2 100644 --- a/packages/services/faker/ops/user/tests/integration.rs +++ b/packages/services/faker/ops/user/tests/integration.rs @@ -4,8 +4,8 @@ use chirp_worker::prelude::*; async fn empty(ctx: TestCtx) { let res = op!([ctx] faker_user {}).await.unwrap(); - let get_res = op!([ctx] user_get { - user_ids: vec![res.user_id.unwrap()], + let get_res = (*ctx).op(::user::ops::get::Input { + user_ids: vec![res.user_id.unwrap().as_uuid()], }) .await .unwrap(); diff --git a/packages/services/load-test/standalone/api-cloud/Cargo.toml b/packages/services/load-test/standalone/api-cloud/Cargo.toml index 26c6bac6e0..81ef886927 100644 --- a/packages/services/load-test/standalone/api-cloud/Cargo.toml +++ b/packages/services/load-test/standalone/api-cloud/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true [dependencies] chirp-client.workspace = true +chirp-workflow.workspace = true rivet-operation.workspace = true rivet-connection.workspace = true rivet-runtime.workspace = true @@ -23,7 +24,7 @@ reqwest = "0.11" faker-game.workspace = true faker-team.workspace = true -user-identity-create.workspace = true +user.workspace = true token-create.workspace = true rivet-config.workspace = true diff --git a/packages/services/load-test/standalone/api-cloud/src/lib.rs b/packages/services/load-test/standalone/api-cloud/src/lib.rs index 15a685bee7..65ce64f882 100644 --- a/packages/services/load-test/standalone/api-cloud/src/lib.rs +++ b/packages/services/load-test/standalone/api-cloud/src/lib.rs @@ -44,14 +44,17 @@ pub async fn run_from_env( let primary_user_id = create_res.member_user_ids[0].as_uuid(); // Register user - op!([ctx] user_identity_create { - user_id: Some(primary_user_id.into()), - identity: Some(backend::user_identity::Identity { - kind: Some(backend::user_identity::identity::Kind::Email(backend::user_identity::identity::Email { - email: util::faker::email() - })) - }) - }) + chirp_workflow::compat::op( + &ctx, + ::user::ops::identity::create::Input { + user_id: primary_user_id, + identity: backend::user_identity::Identity { + kind: Some(backend::user_identity::identity::Kind::Email(backend::user_identity::identity::Email { + email: util::faker::email() + })) + } + } + ) .await?; (team_id, primary_user_id) diff --git a/packages/services/load-test/standalone/watch-requests/Cargo.toml b/packages/services/load-test/standalone/watch-requests/Cargo.toml index 123a94828f..5fb12c92a5 100644 --- a/packages/services/load-test/standalone/watch-requests/Cargo.toml +++ b/packages/services/load-test/standalone/watch-requests/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true [dependencies] chirp-client.workspace = true +chirp-workflow.workspace = true rivet-operation.workspace = true rivet-connection.workspace = true rivet-runtime.workspace = true @@ -19,7 +20,7 @@ reqwest = "0.11" faker-game.workspace = true faker-team.workspace = true -user-identity-create.workspace = true +user.workspace = true token-create.workspace = true rivet-config.workspace = true diff --git a/packages/services/load-test/standalone/watch-requests/src/lib.rs b/packages/services/load-test/standalone/watch-requests/src/lib.rs index c538e46ae7..ad7dca82b8 100644 --- a/packages/services/load-test/standalone/watch-requests/src/lib.rs +++ b/packages/services/load-test/standalone/watch-requests/src/lib.rs @@ -44,14 +44,17 @@ pub async fn run_from_env( let primary_user_id = create_res.member_user_ids[0].as_uuid(); // Register user - op!([ctx] user_identity_create { - user_id: Some(primary_user_id.into()), - identity: Some(backend::user_identity::Identity { - kind: Some(backend::user_identity::identity::Kind::Email(backend::user_identity::identity::Email { - email: util::faker::email() - })) - }) - }) + chirp_workflow::compat::op( + &ctx, + ::user::ops::identity::create::Input { + user_id: primary_user_id, + identity: backend::user_identity::Identity { + kind: Some(backend::user_identity::identity::Kind::Email(backend::user_identity::identity::Email { + email: util::faker::email() + })) + } + } + ) .await?; (team_id, primary_user_id) diff --git a/packages/services/mm/util/Cargo.toml b/packages/services/mm/util/Cargo.toml index 5a9b788353..83b76264a8 100644 --- a/packages/services/mm/util/Cargo.toml +++ b/packages/services/mm/util/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] bit-vec = "0.6" chirp-client.workspace = true +chirp-workflow.workspace = true heck = "0.3" http = "0.2" rivet-operation.workspace = true @@ -20,4 +21,4 @@ uuid = { version = "1", features = ["v4", "serde"] } ip-info.workspace = true mm-lobby-list-for-user-id.workspace = true region-get.workspace = true -user-identity-get.workspace = true +user.workspace = true diff --git a/packages/services/mm/util/src/verification.rs b/packages/services/mm/util/src/verification.rs index ae51d87f55..2df5828944 100644 --- a/packages/services/mm/util/src/verification.rs +++ b/packages/services/mm/util/src/verification.rs @@ -231,9 +231,12 @@ pub async fn verify_config( // Verify identity requirement match (highest_identity_requirement, opts.user_id) { (backend::matchmaker::IdentityRequirement::Registered, Some(user_id)) => { - let user_identities_res = op!([ctx] user_identity_get { - user_ids: vec![user_id.into()], - }) + let user_identities_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::identity::get::Input { + user_ids: vec![user_id] + }, + ) .await?; let user = unwrap!( user_identities_res.users.first(), diff --git a/packages/services/mm/worker/Cargo.toml b/packages/services/mm/worker/Cargo.toml index 0bdba0c5b1..613d6827ee 100644 --- a/packages/services/mm/worker/Cargo.toml +++ b/packages/services/mm/worker/Cargo.toml @@ -74,4 +74,4 @@ mm-config-version-get.workspace = true mm-lobby-get.workspace = true mm-lobby-player-count.workspace = true mm-player-count-for-namespace.workspace = true -user-identity-create.workspace = true +user.workspace = true diff --git a/packages/services/mm/worker/tests/lobby_find.rs b/packages/services/mm/worker/tests/lobby_find.rs index f014bad178..8f0592aefa 100644 --- a/packages/services/mm/worker/tests/lobby_find.rs +++ b/packages/services/mm/worker/tests/lobby_find.rs @@ -527,8 +527,8 @@ async fn registered_verification(ctx: TestCtx) { ); let email = util::faker::email(); - op!([ctx] user_identity_create { - user_id: user_res.user_id, + ctx.op(::user::ops::identity::create::Input { + user_id: user_id, identity: Some(backend::user_identity::Identity { kind: Some(backend::user_identity::identity::Kind::Email( backend::user_identity::identity::Email { diff --git a/packages/services/user/Cargo.toml b/packages/services/user/Cargo.toml index b3085dc82b..b1329396cd 100644 --- a/packages/services/user/Cargo.toml +++ b/packages/services/user/Cargo.toml @@ -6,9 +6,27 @@ license.workspace = true edition.workspace = true [dependencies] +serde = { version = "1.0.198", features = ["derive"] } chirp-workflow.workspace = true cluster.workspace = true +email-address-parser = "1.0.1" server-spec.workspace = true linode.workspace = true rivet-config.workspace = true +rivet-operation.workspace = true +token-create.workspace = true +upload-file-list.workspace = true +upload-get.workspace = true +upload-complete.workspace = true + +[dependencies.sqlx] +workspace = true +default-features = false + +[dev-dependencies] +faker-user.workspace = true +rand = "0.8" +reqwest = "0.11" +upload-get.workspace = true +upload-prepare.workspace = true \ No newline at end of file diff --git a/packages/services/user-identity/db/user-identity/migrations/20200101000000_init.down.sql b/packages/services/user/db/user-identity/migrations/20200101000000_init.down.sql similarity index 100% rename from packages/services/user-identity/db/user-identity/migrations/20200101000000_init.down.sql rename to packages/services/user/db/user-identity/migrations/20200101000000_init.down.sql diff --git a/packages/services/user-identity/db/user-identity/migrations/20200101000000_init.up.sql b/packages/services/user/db/user-identity/migrations/20200101000000_init.up.sql similarity index 100% rename from packages/services/user-identity/db/user-identity/migrations/20200101000000_init.up.sql rename to packages/services/user/db/user-identity/migrations/20200101000000_init.up.sql diff --git a/packages/services/user-identity/db/user-identity/migrations/20230101105612_index_user.down.sql b/packages/services/user/db/user-identity/migrations/20230101105612_index_user.down.sql similarity index 100% rename from packages/services/user-identity/db/user-identity/migrations/20230101105612_index_user.down.sql rename to packages/services/user/db/user-identity/migrations/20230101105612_index_user.down.sql diff --git a/packages/services/user-identity/db/user-identity/migrations/20230101105612_index_user.up.sql b/packages/services/user/db/user-identity/migrations/20230101105612_index_user.up.sql similarity index 100% rename from packages/services/user-identity/db/user-identity/migrations/20230101105612_index_user.up.sql rename to packages/services/user/db/user-identity/migrations/20230101105612_index_user.up.sql diff --git a/packages/services/user-identity/db/user-identity/migrations/20231116005712_access_tokens.down.sql b/packages/services/user/db/user-identity/migrations/20231116005712_access_tokens.down.sql similarity index 100% rename from packages/services/user-identity/db/user-identity/migrations/20231116005712_access_tokens.down.sql rename to packages/services/user/db/user-identity/migrations/20231116005712_access_tokens.down.sql diff --git a/packages/services/user-identity/db/user-identity/migrations/20231116005712_access_tokens.up.sql b/packages/services/user/db/user-identity/migrations/20231116005712_access_tokens.up.sql similarity index 100% rename from packages/services/user-identity/db/user-identity/migrations/20231116005712_access_tokens.up.sql rename to packages/services/user/db/user-identity/migrations/20231116005712_access_tokens.up.sql diff --git a/packages/services/user-identity/db/user-identity/migrations/20241104000750_drop_access_tokens.down.sql b/packages/services/user/db/user-identity/migrations/20241104000750_drop_access_tokens.down.sql similarity index 100% rename from packages/services/user-identity/db/user-identity/migrations/20241104000750_drop_access_tokens.down.sql rename to packages/services/user/db/user-identity/migrations/20241104000750_drop_access_tokens.down.sql diff --git a/packages/services/user-identity/db/user-identity/migrations/20241104000750_drop_access_tokens.up.sql b/packages/services/user/db/user-identity/migrations/20241104000750_drop_access_tokens.up.sql similarity index 100% rename from packages/services/user-identity/db/user-identity/migrations/20241104000750_drop_access_tokens.up.sql rename to packages/services/user/db/user-identity/migrations/20241104000750_drop_access_tokens.up.sql diff --git a/packages/services/user/ops/profile-validate/Cargo.toml b/packages/services/user/ops/profile-validate/Cargo.toml index a18293de50..10b62fa785 100644 --- a/packages/services/user/ops/profile-validate/Cargo.toml +++ b/packages/services/user/ops/profile-validate/Cargo.toml @@ -8,9 +8,10 @@ edition.workspace = true [dependencies] rivet-operation.workspace = true chirp-client.workspace = true +chirp-workflow.workspace = true prost = "0.10" -user-get.workspace = true +user.workspace = true [dependencies.sqlx] workspace = true diff --git a/packages/services/user/ops/profile-validate/src/lib.rs b/packages/services/user/ops/profile-validate/src/lib.rs index b0d53a690d..f5a8b683d7 100644 --- a/packages/services/user/ops/profile-validate/src/lib.rs +++ b/packages/services/user/ops/profile-validate/src/lib.rs @@ -45,9 +45,12 @@ async fn handle( if ctx.display_name.is_none() || ctx.account_number.is_none() { let user_id = unwrap_ref!(ctx.user_id); - let users_res = op!([ctx] user_get { - user_ids: vec![*user_id], - }) + let users_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::get::Input { + user_ids: vec![(*user_id).as_uuid()], + }, + ) .await?; let user = users_res.users.first(); diff --git a/packages/services/user/src/ops/avatar_upload_complete.rs b/packages/services/user/src/ops/avatar_upload_complete.rs new file mode 100644 index 0000000000..23913ff6d8 --- /dev/null +++ b/packages/services/user/src/ops/avatar_upload_complete.rs @@ -0,0 +1,63 @@ +use chirp_workflow::prelude::*; +use rivet_operation::prelude::proto; +use proto::backend::{pkg::*}; +use serde_json::json; + +#[derive(Debug)] +pub struct Input { + pub user_id: Uuid, + pub upload_id: Uuid, +} + +#[derive(Debug)] +pub struct Output {} + + +#[operation] +pub async fn avatar_upload_complete( + ctx: &OperationCtx, + input: &Input +) -> GlobalResult { + let user_id = input.user_id; + + op!([ctx] upload_complete { + upload_id: Some(input.upload_id.into()), + bucket: Some("bucket-user-avatar".into()), + }) + .await?; + + // Set avatar id + sql_execute!( + [ctx] + " + UPDATE db_user.users set profile_id = $2 + WHERE user_id = $1 + ", + user_id, + input.upload_id, + ) + .await?; + + ctx.cache().purge("user", [user_id]).await?; + + msg!([ctx] user::msg::update(user_id) { + user_id: Some(user_id.into()), + }) + .await?; + + msg!([ctx] analytics::msg::event_create() { + events: vec![ + analytics::msg::event_create::Event { + event_id: Some(Uuid::new_v4().into()), + name: "user.avatar_set".into(), + properties_json: Some(serde_json::to_string(&json!({ + "user_id": user_id, + }))?), + ..Default::default() + } + ], + }) + .await?; + + Ok(Output {}) +} \ No newline at end of file diff --git a/packages/services/user/src/ops/get.rs b/packages/services/user/src/ops/get.rs new file mode 100644 index 0000000000..221666902b --- /dev/null +++ b/packages/services/user/src/ops/get.rs @@ -0,0 +1,140 @@ +use chirp_workflow::prelude::*; +use rivet_operation::prelude::{common,proto}; +use proto::backend; + +#[derive(Debug)] +pub struct Input { + pub user_ids: Vec, +} + +#[derive(Debug)] +pub struct Output { + pub users: Vec +} + + +#[derive(Debug, Default, Serialize, Deserialize, sqlx::FromRow)] +struct CacheUser { + user_id: Uuid, + display_name: String, + account_number: i64, + avatar_id: String, + profile_id: Option, + join_ts: i64, + bio: String, + is_admin: bool, + delete_request_ts: Option, + delete_complete_ts: Option, +} + + +#[operation] +pub async fn get(ctx: &OperationCtx, input: &Input) -> GlobalResult { + let user_ids = input.user_ids.clone(); + + let users = ctx + .cache() + .fetch_all_json("user", user_ids, { + let ctx = ctx.clone(); + move |mut cache, user_ids| { + let ctx = ctx.clone(); + async move { + let users = sql_fetch_all!( + [ctx, CacheUser] + " + SELECT + user_id, + display_name, + account_number, + avatar_id, + profile_id, + join_ts, + bio, + is_admin, + delete_request_ts, + delete_complete_ts + FROM db_user.users + WHERE user_id = ANY($1) + ", + user_ids + ) + .await?; + + for row in users { + cache.resolve(&row.user_id.clone(), row); + } + + Ok(cache) + } + } + }) + .await?; + + let upload_ids = users + .iter() + .filter_map(|x| x.profile_id) + .map(Into::::into) + .collect::>(); + + let (upload_res, files_res) = tokio::try_join!( + op!([ctx] upload_get { + upload_ids: upload_ids.clone() + }), + op!([ctx] upload_file_list { + upload_ids: upload_ids.clone(), + }) + )?; + + Ok(Output { + users: users + .into_iter() + .map(|user| { + let profile_id = user.profile_id.map(Into::::into); + + // Fetch all information relating to the profile image + let (profile_upload_complete_ts, profile_file_name, profile_provider) = { + let upload = upload_res + .uploads + .iter() + .find(|upload| upload.upload_id == profile_id); + let file = files_res + .files + .iter() + .find(|file| file.upload_id == profile_id); + + if let (Some(upload), Some(file)) = (upload, file) { + // TODO: Why do we parse the file name here? Based on route.rs in utils shouldn't + // the entire path be present in the media url? + let profile_file_name = file + .path + .rsplit_once('/') + .map(|(_, file_name)| file_name.to_owned()) + .or(Some(file.path.clone())); + (upload.complete_ts, profile_file_name, Some(upload.provider)) + } else { + Default::default() + } + }; + + backend::user::User { + user_id: Some(user.user_id.into()), + display_name: user.display_name, + account_number: user.account_number as u32, + avatar_id: user.avatar_id, + profile_upload_id: if profile_upload_complete_ts.is_some() { + profile_id + } else { + None + }, + profile_file_name, + profile_provider, + join_ts: user.join_ts, + bio: user.bio, + is_admin: user.is_admin, + delete_request_ts: user.delete_request_ts, + delete_complete_ts: user.delete_complete_ts, + } + }) + .collect::>(), + }) +} \ No newline at end of file diff --git a/packages/services/user/src/ops/identity/create.rs b/packages/services/user/src/ops/identity/create.rs new file mode 100644 index 0000000000..92123e17b4 --- /dev/null +++ b/packages/services/user/src/ops/identity/create.rs @@ -0,0 +1,78 @@ +use chirp_workflow::prelude::*; +use email_address_parser::EmailAddress; +use rivet_operation::prelude::proto; +use proto::backend::{self,pkg::*}; +use serde_json::json; + +#[derive(Debug)] +pub struct Input { + pub user_id: Uuid, + pub identity: backend::user_identity::Identity, +} + +#[derive(Debug)] +pub struct Output {} + + +#[operation] +pub async fn create( + ctx: &OperationCtx, + input: &Input +) -> GlobalResult { + let user_id = input.user_id; + let identity = &input.identity; + let identity_kind = unwrap_ref!(identity.kind); + + match &identity_kind { + backend::user_identity::identity::Kind::Email(email) => { + ensure!(EmailAddress::is_valid(&email.email, None), "invalid email"); + + sql_execute!( + [ctx] + " + INSERT INTO db_user_identity.emails (email, user_id, create_ts) + VALUES ($1, $2, $3) + ", + &email.email, + user_id, + ctx.ts(), + ) + .await?; + + msg!([ctx] analytics::msg::event_create() { + events: vec![ + analytics::msg::event_create::Event { + event_id: Some(Uuid::new_v4().into()), + name: "user_identity.create".into(), + properties_json: Some(serde_json::to_string(&json!({ + "identity_email": email.email, + "user_id": user_id, + }))?), + ..Default::default() + } + ], + }) + .await?; + } + backend::user_identity::identity::Kind::DefaultUser(_) => { + bail!("cannot create default user identity") + } + } + + ctx.cache() + .purge("user_identity.identity", [user_id]) + .await?; + + msg!([ctx] user_identity::msg::create_complete(user_id) { + user_id: Some(user_id.into()), + identity: Some(identity.clone()), + }) + .await?; + + msg!([ctx] user::msg::update(user_id) { + user_id: Some(user_id.into()), + }) + .await?; + + Ok(Output {}) +} \ No newline at end of file diff --git a/packages/services/user/src/ops/identity/delete.rs b/packages/services/user/src/ops/identity/delete.rs new file mode 100644 index 0000000000..13254049d1 --- /dev/null +++ b/packages/services/user/src/ops/identity/delete.rs @@ -0,0 +1,32 @@ +use chirp_workflow::prelude::*; + +#[derive(Debug)] +pub struct Input { + pub user_ids: Vec +} + +#[derive(Debug)] +pub struct Output {} + + +#[operation] +pub async fn create( + ctx: &OperationCtx, + input: &Input +) -> GlobalResult { + sql_execute!( + [ctx] + " + DELETE FROM db_user_identity.emails + WHERE user_id = ANY($1) + ", + &input.user_ids, + ) + .await?; + + ctx.cache() + .purge("user_identity.identity", input.user_ids.clone()) + .await?; + + Ok(Output {}) +} \ No newline at end of file diff --git a/packages/services/user/src/ops/identity/get.rs b/packages/services/user/src/ops/identity/get.rs new file mode 100644 index 0000000000..ce352f9503 --- /dev/null +++ b/packages/services/user/src/ops/identity/get.rs @@ -0,0 +1,123 @@ +use chirp_workflow::prelude::*; +use proto::backend::{self}; +use rivet_operation::prelude::{proto}; + + +#[derive(Debug, Default, Serialize, Deserialize, sqlx::FromRow)] +struct CacheUserIdentity { + user_id: Uuid, + email: Option, +} + +#[derive(Debug)] +pub struct Input { + pub user_ids: Vec, +} + +#[derive(Debug)] +pub struct Output { + pub users: Vec +} + +#[derive(Debug)] +pub struct User { + pub user_id: Uuid, + pub identities: Vec +} + + +#[operation] +pub async fn get( + ctx: &OperationCtx, + input: &Input +) -> GlobalResult { + let is_development = ctx.config().server()?.rivet.auth.access_kind + == rivet_config::config::rivet::AccessKind::Development; + + let user_ids = &input.user_ids; + // Get the user display names + let users = ctx.op(crate::ops::get::Input { + user_ids: user_ids.clone(), + }) + .await?; + + // Fetch the identities + let identities = ctx + .cache() + .fetch_all_json("user_identity.identity", user_ids.clone(), { + let ctx = ctx.clone(); + move |mut cache, user_ids| { + let ctx = ctx.clone(); + async move { + let identity_rows = sql_fetch_all!( + [ctx, CacheUserIdentity] + " + SELECT e.user_id AS user_id, e.email + FROM db_user_identity.emails as e + WHERE e.user_id = ANY($1) + ", + &user_ids, + ) + .await?; + + for row in identity_rows { + cache.resolve( + &row.user_id.clone(), + row, + ); + } + + Ok(cache) + } + } + }) + .await?; + + let users = user_ids + .iter() + .filter_map(|user_id| { + // Find matching user + let Some(user) = users + .users + .iter() + .find(|x| x.user_id.map(|x| x.as_uuid()) == Some(*user_id)) + else { + return None; + }; + + // Find matching identities + let mut identities = identities + .iter() + .filter(|x| x.user_id == *user_id) + .flat_map(|x| { + IntoIterator::into_iter([x.email.as_ref().map(|email| { + backend::user_identity::Identity { + kind: Some(backend::user_identity::identity::Kind::Email( + backend::user_identity::identity::Email { + email: email.clone(), + }, + )), + } + })]) + .flatten() + }) + .collect::>(); + + // Inject identity for development users since they should behave like registered users. + if is_development && user.display_name == util::dev_defaults::USER_NAME { + identities.push(backend::user_identity::Identity { + kind: Some(backend::user_identity::identity::Kind::DefaultUser( + backend::user_identity::identity::DefaultUser {}, + )), + }) + } + + Some(User { + user_id: *user_id, + identities, + }) + }) + .collect::>(); + + Ok(Output { users }) +} diff --git a/packages/services/user/src/ops/identity/mod.rs b/packages/services/user/src/ops/identity/mod.rs new file mode 100644 index 0000000000..ca15fa0ec9 --- /dev/null +++ b/packages/services/user/src/ops/identity/mod.rs @@ -0,0 +1,3 @@ +pub mod create; +pub mod delete; +pub mod get; \ No newline at end of file diff --git a/packages/services/user/src/ops/mod.rs b/packages/services/user/src/ops/mod.rs index b081cc6b49..3dfc7ed2ae 100644 --- a/packages/services/user/src/ops/mod.rs +++ b/packages/services/user/src/ops/mod.rs @@ -1 +1,10 @@ +pub mod avatar_upload_complete; +pub mod get; +pub mod pending_delete_toggle; +pub mod profile_validate; pub mod resolve_display_name; +pub mod resolve_email; +pub mod team_list; +pub mod token_create; + +pub mod identity; \ No newline at end of file diff --git a/packages/services/user/src/ops/pending_delete_toggle.rs b/packages/services/user/src/ops/pending_delete_toggle.rs new file mode 100644 index 0000000000..ff762296b4 --- /dev/null +++ b/packages/services/user/src/ops/pending_delete_toggle.rs @@ -0,0 +1,47 @@ +use chirp_workflow::prelude::*; +use proto::backend::{pkg::*}; +use rivet_operation::prelude::proto; + +#[derive(Debug)] +pub struct Input { + pub user_id: Uuid, + pub active: bool +} + +#[derive(Debug)] +pub struct Output { +} + + +#[operation] +pub async fn pending_delete_toggle( + ctx: &OperationCtx, + input: &Input +) -> GlobalResult { + let user_id = input.user_id; + + // Verify the user is registered + let identity = ctx.op(crate::ops::identity::get::Input { + user_ids: vec![user_id] + }) + .await?; + let identities = &unwrap_ref!(identity.users.first()).identities; + ensure_with!(!identities.is_empty(), IDENTITY_NOT_REGISTERED); + + sql_execute!( + [ctx] + "UPDATE db_user.users SET delete_request_ts = $2 WHERE user_id = $1", + user_id, + input.active.then(util::timestamp::now), + ) + .await?; + + ctx.cache().purge("user", [user_id]).await?; + + msg!([ctx] user::msg::update(user_id) { + user_id: Some(user_id.into()), + }) + .await?; + + Ok(Output {}) +} \ No newline at end of file diff --git a/packages/services/user/src/ops/profile_validate.rs b/packages/services/user/src/ops/profile_validate.rs new file mode 100644 index 0000000000..9b0c1a2e02 --- /dev/null +++ b/packages/services/user/src/ops/profile_validate.rs @@ -0,0 +1,109 @@ +use chirp_workflow::prelude::*; +use rivet_operation::prelude::common; + +#[derive(Debug)] +pub struct Input { + pub user_id: Uuid, + pub display_name: Option, + pub account_number: Option, + pub bio: Option, +} + +#[derive(Debug)] +pub struct Output { + pub errors: Vec, +} + + +#[operation] +pub async fn profile_validate( + ctx: &OperationCtx, + input: &Input +) -> GlobalResult { + let mut errors = Vec::new(); + + // Validate display name + if let Some(display_name) = &input.display_name { + if display_name.is_empty() { + errors.push(util::err_path!["display-name", "too-short"]); + } else if display_name.len() > util::check::MAX_DISPLAY_NAME_LEN { + errors.push(util::err_path!["display-name", "too-long"]); + } + + if !util::check::display_name(display_name) { + errors.push(util::err_path!["display-name", "invalid"]); + } + } + + // Validate account number + if let Some(account_number) = &input.account_number { + if *account_number < 1 || *account_number > 9999 { + errors.push(util::err_path!["account-number-invalid"]); + } + } + + // Validate biography + if let Some(bio) = &input.bio { + if bio.len() > util::check::MAX_BIOGRAPHY_LEN { + errors.push(util::err_path!["bio", "too-long"]); + } + + if !util::check::biography(bio) { + errors.push(util::err_path!["bio", "invalid"]); + } + } + + // Only validate handle uniqueness if at least one of the two handle components is given + if input.display_name.is_some() || input.account_number.is_some() { + // If either the display name or account number are missing, fetch them from the given user + let (display_name, account_number) = + if input.display_name.is_none() || input.account_number.is_none() { + let users_res = (*ctx).op(crate::ops::get::Input { + user_ids: vec![input.user_id], + }) + .await?; + + let user = users_res.users.first(); + let user = unwrap_ref!(user, "user not found"); + + ( + input.display_name + .clone() + .unwrap_or(user.display_name.clone()), + input.account_number.unwrap_or(user.account_number), + ) + } else { + ( + unwrap_ref!(input.display_name).clone(), + *unwrap_ref!(input.account_number), + ) + }; + + // Find user by handle + let (user_exists,) = sql_fetch_one!( + [ctx, (bool,)] + " + SELECT EXISTS ( + SELECT 1 + FROM db_user.users + WHERE display_name = $1 and account_number = $2 + ) + ", + display_name, + account_number as i64, + ) + .await?; + + // Validate handle uniqueness + if user_exists { + errors.push(util::err_path!["handle-not-unique"]); + } + } + + Ok(Output { + errors: errors + .into_iter() + .map(|path| common::ValidationError { path }) + .collect::>(), + }) +} \ No newline at end of file diff --git a/packages/services/user/src/ops/resolve_email.rs b/packages/services/user/src/ops/resolve_email.rs new file mode 100644 index 0000000000..853e64a94f --- /dev/null +++ b/packages/services/user/src/ops/resolve_email.rs @@ -0,0 +1,42 @@ +use chirp_workflow::prelude::*; + +#[derive(Debug)] +pub struct Input { + pub emails: Vec, +} + +#[derive(Debug)] +pub struct Output { + pub users: Vec, +} + +#[derive(Debug, sqlx::FromRow)] +pub struct User { + pub email: String, + pub user_id: Uuid, +} + +#[operation] +pub async fn resolve_email( + ctx: &OperationCtx, + input: &Input +) -> GlobalResult { + let users = sql_fetch_all!( + [ctx, User] + " + SELECT email, user_id + FROM db_user_identity.emails + WHERE email = ANY($1) + ", + &input.emails, + ) + .await? + .into_iter() + .map(|row| User { + email: row.email, + user_id: row.user_id, + }) + .collect::>(); + + Ok(Output { users }) +} diff --git a/packages/services/user/src/ops/team_list.rs b/packages/services/user/src/ops/team_list.rs new file mode 100644 index 0000000000..b0e11b90a0 --- /dev/null +++ b/packages/services/user/src/ops/team_list.rs @@ -0,0 +1,67 @@ +use chirp_workflow::prelude::*; + +#[derive(Debug)] +pub struct Input { + pub user_ids: Vec, +} + +#[derive(Debug)] +pub struct Output { + pub users: Vec, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct UserTeams { + pub user_id: Uuid, + pub teams: Vec, +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct TeamMember { + pub team_id: Uuid, +} + +#[operation] +pub async fn team_list(ctx: &OperationCtx, input: &Input) -> GlobalResult { + let users = ctx + .cache() + .fetch_all_json("user_team_list", input.user_ids.clone(), { + let ctx = ctx.clone(); + move |mut cache, user_ids| { + let ctx = ctx.clone(); + async move { + let team_members = sql_fetch_all!( + [ctx, (Uuid, Uuid)] + " + SELECT user_id, team_id + FROM db_team.team_members + WHERE user_id = ANY($1) + ", + &user_ids, + ) + .await?; + + for user_id in user_ids { + // Aggregate user teams + let user_teams = UserTeams { + user_id: user_id, + teams: team_members + .iter() + .filter(|(team_user_id, _)| *team_user_id == user_id) + .map(|(_, team_id)| TeamMember { + team_id: *team_id, + }) + .collect(), + }; + + cache.resolve(&user_id.clone(), user_teams); + } + + Ok(cache) + } + } + }) + .await?; + + Ok(Output { users }) +} diff --git a/packages/services/user/src/ops/token_create.rs b/packages/services/user/src/ops/token_create.rs new file mode 100644 index 0000000000..535650c932 --- /dev/null +++ b/packages/services/user/src/ops/token_create.rs @@ -0,0 +1,68 @@ +use chirp_workflow::prelude::*; +use proto::backend::{self, pkg::*}; +use rivet_operation::prelude::proto; + +// Also see api/auth/src/route/tokens.rs +pub const TOKEN_TTL: i64 = util::duration::minutes(15); +pub const REFRESH_TOKEN_TTL: i64 = util::duration::days(90); + +#[derive(Debug)] +pub struct Input { + pub user_id: Uuid, + pub client: backend::net::ClientInfo, +} + +#[derive(Debug)] +pub struct Output { + pub token: String, + pub refresh_token: String +} + + +#[operation] +pub async fn token_create( + ctx: &OperationCtx, + input: &Input +) -> GlobalResult { + let user_id = input.user_id; + + let token_res = op!([ctx] token_create { + token_config: Some(token::create::request::TokenConfig { + ttl: TOKEN_TTL, + }), + refresh_token_config: Some(token::create::request::TokenConfig { + ttl: REFRESH_TOKEN_TTL, + }), + issuer: Self::NAME.to_owned(), + client: Some(input.client.clone()), + kind: Some(token::create::request::Kind::New( + token::create::request::KindNew { + entitlements: vec![proto::claims::Entitlement { + kind: Some(proto::claims::entitlement::Kind::User(proto::claims::entitlement::User { + user_id: Some(user_id.into()), + })), + }], + }, + )), + label: Some("usr".into()), + ..Default::default() + }) + .await?; + + let token = unwrap_ref!(token_res.token); + let refresh_token = unwrap_ref!(token_res.refresh_token); + let token_session_id = unwrap_ref!(token_res.session_id).as_uuid(); + + sql_execute!( + [ctx] + "INSERT INTO db_user.user_tokens (user_id, token_session_id) VALUES ($1, $2)", + user_id, + token_session_id, + ) + .await?; + + Ok(Output { + token: token.token.clone(), + refresh_token: refresh_token.token.clone(), + }) +} diff --git a/packages/services/user/tests/avatar_upload_complete.rs b/packages/services/user/tests/avatar_upload_complete.rs new file mode 100644 index 0000000000..37caf9abc8 --- /dev/null +++ b/packages/services/user/tests/avatar_upload_complete.rs @@ -0,0 +1,62 @@ +use chirp_workflow::prelude::*; +use rivet_operation::prelude::proto::backend; + +const TEST_BODY: &[u8] = b"test file"; + +#[workflow_test] +async fn empty(ctx: TestCtx) { + let user_res = op!([ctx] faker_user {}).await.unwrap(); + let user_id = user_res.user_id.unwrap().as_uuid(); + + // Create the upload + let upload_prepare_res = op!([ctx] upload_prepare { + bucket: "bucket-user-avatar".into(), + files: vec![ + backend::upload::PrepareFile { + path: "image.png".to_owned(), + mime: Some("image/png".into()), + content_length: TEST_BODY.len() as u64, + ..Default::default() + }, + ], + }) + .await + .unwrap(); + let upload_id = upload_prepare_res.upload_id.unwrap(); + let presigned_request = upload_prepare_res.presigned_requests.first().unwrap(); + + tracing::info!("writing test files"); + let res = reqwest::Client::new() + .put(&presigned_request.url) + .body(TEST_BODY.to_vec()) + .header("content-type", "image/png") + .send() + .await + .expect("failed to upload"); + if res.status().is_success() { + tracing::info!("uploaded successfully"); + } else { + panic!( + "failed to upload ({}): {:?}", + res.status(), + res.text().await + ); + } + + ctx.op(::user::ops::avatar_upload_complete::Input { + user_id: user_id, + upload_id: upload_id.as_uuid() + }) + .await + .unwrap(); + + let uploads_res = op!([ctx] upload_get { + upload_ids: vec![upload_id] + }) + .await + .unwrap(); + + let upload = uploads_res.uploads.first().unwrap(); + + assert!(upload.complete_ts.is_some(), "Upload did not complete"); +} diff --git a/packages/services/user/tests/get.rs b/packages/services/user/tests/get.rs new file mode 100644 index 0000000000..ab54e6d194 --- /dev/null +++ b/packages/services/user/tests/get.rs @@ -0,0 +1,72 @@ +use chirp_workflow::prelude::*; +use proto::backend::{pkg::*}; +use rivet_operation::prelude::proto; +use rand::Rng; + +#[workflow_test] +async fn empty(ctx: TestCtx) { + let res = ctx.op(::user::ops::get::Input { + user_ids: Vec::new(), + }) + .await + .unwrap(); + assert!(res.users.is_empty()); +} + +#[workflow_test] +async fn fetch(ctx: TestCtx) { + struct TestUser { + user_id: Option, + display_name: String, + account_number: i64, + bio: String, + } + + // Generate test users + let mut users = std::iter::repeat_with(|| TestUser { + user_id: None, + display_name: util::faker::display_name(), + account_number: rand::thread_rng().gen_range(1..10000), + bio: util::faker::ident(), + }) + .take(8) + .collect::>(); + + // Insert test users + for user in &mut users { + let user_res = op!([ctx] faker_user { }).await.unwrap(); + let user_id = user_res.user_id.unwrap().as_uuid(); + + msg!([ctx] user::msg::profile_set(user_id) -> user::msg::update { + user_id: Some(user_id.into()), + display_name: Some(user.display_name.clone()), + account_number: Some(user.account_number as u32), + bio: Some(user.bio.clone()), + }) + .await + .unwrap(); + + user.user_id = Some(user_id); + } + + // Fetch the users + let res = ctx.op(::user::ops::get::Input { + user_ids: users.iter().map(|u| u.user_id.unwrap()).collect(), + }) + .await + .unwrap(); + + // Validate the users + assert_eq!(users.len(), res.users.len()); + for user in &users { + let user_res = res + .users + .iter() + .find(|u| u.user_id.unwrap().as_uuid() == user.user_id.unwrap()) + .expect("user not returned"); + + assert_eq!(user.display_name, user_res.display_name); + assert_eq!(user.account_number, user_res.account_number as i64); + assert_eq!(user.bio, user_res.bio); + } +} diff --git a/packages/services/user/tests/identity_create.rs b/packages/services/user/tests/identity_create.rs new file mode 100644 index 0000000000..d133afb077 --- /dev/null +++ b/packages/services/user/tests/identity_create.rs @@ -0,0 +1,35 @@ +use chirp_workflow::prelude::*; +use rivet_operation::prelude::proto::backend; + +#[workflow_test] +async fn email(ctx: TestCtx) { + let user_res = op!([ctx] faker_user { + ..Default::default() + }) + .await + .unwrap(); + let user_id = user_res.user_id.unwrap().as_uuid(); + + let email = util::faker::email(); + ctx.op(::user::ops::identity::create::Input { + user_id: user_id, + identity: backend::user_identity::Identity { + kind: Some(backend::user_identity::identity::Kind::Email( + backend::user_identity::identity::Email { + email: email.clone(), + } + )), + }, + }) + .await + .unwrap(); + + let (sql_exists,) = sqlx::query_as::<_, (bool,)>( + "SELECT EXISTS (SELECT 1 FROM db_user_identity.emails WHERE email = $1)", + ) + .bind(&email) + .fetch_one(&ctx.crdb().await.unwrap()) + .await + .unwrap(); + assert!(sql_exists, "identity not created"); +} diff --git a/packages/services/user/tests/identity_delete.rs b/packages/services/user/tests/identity_delete.rs new file mode 100644 index 0000000000..fb60977b58 --- /dev/null +++ b/packages/services/user/tests/identity_delete.rs @@ -0,0 +1,41 @@ +use chirp_workflow::prelude::*; +use rivet_operation::prelude::proto::backend; + +#[workflow_test] +async fn empty(ctx: TestCtx) { + let user_res = op!([ctx] faker_user { + ..Default::default() + }) + .await + .unwrap(); + let user_id = user_res.user_id.unwrap().as_uuid(); + + let email = util::faker::email(); + ctx.op(::user::ops::identity::create::Input { + user_id: user_id, + identity: backend::user_identity::Identity { + kind: Some(backend::user_identity::identity::Kind::Email( + backend::user_identity::identity::Email { + email: email.clone() + } + )), + }, + }) + .await + .unwrap(); + + ctx.op(::user::ops::identity::delete::Input { + user_ids: vec![user_id], + }) + .await + .unwrap(); + + let (sql_exists,) = sqlx::query_as::<_, (bool,)>( + "SELECT EXISTS (SELECT 1 FROM db_user_identity.emails WHERE email = $1)", + ) + .bind(&email) + .fetch_one(&ctx.crdb().await.unwrap()) + .await + .unwrap(); + assert!(!sql_exists, "identity not deleted"); +} diff --git a/packages/services/user/tests/identity_get.rs b/packages/services/user/tests/identity_get.rs new file mode 100644 index 0000000000..cae9379a9c --- /dev/null +++ b/packages/services/user/tests/identity_get.rs @@ -0,0 +1,42 @@ +use chirp_workflow::prelude::*; +use rivet_operation::prelude::proto::backend; + +#[workflow_test] +async fn empty(ctx: TestCtx) { + let user_res = op!([ctx] faker_user { + ..Default::default() + }) + .await + .unwrap(); + let user_id = user_res.user_id.unwrap().as_uuid(); + + let email = util::faker::email(); + ctx.op(::user::ops::identity::create::Input { + user_id: user_id, + identity: backend::user_identity::Identity { + kind: Some(backend::user_identity::identity::Kind::Email( + backend::user_identity::identity::Email { + email: email.clone() + } + )), + }, + }) + .await + .unwrap(); + + let res = ctx.op(::user::ops::identity::get::Input { + user_ids: vec![user_id, Uuid::new_v4()], + }) + .await + .unwrap(); + assert_eq!(1, res.users.len()); + assert_eq!( + 1, + res.users + .iter() + .find(|u| u.user_id == user_id) + .unwrap() + .identities + .len() + ); +} diff --git a/packages/services/user/tests/pending_delete_toggle.rs b/packages/services/user/tests/pending_delete_toggle.rs new file mode 100644 index 0000000000..c992d388d0 --- /dev/null +++ b/packages/services/user/tests/pending_delete_toggle.rs @@ -0,0 +1,67 @@ +use chirp_workflow::prelude::*; +use rivet_operation::prelude::proto::backend; + +#[workflow_test] +async fn empty(ctx: TestCtx) { + let user_res = op!([ctx] faker_user {}).await.unwrap(); + let user_id = user_res.user_id.as_ref().unwrap().as_uuid(); + + // Register user + let email = util::faker::email(); + let _res = ctx.op(::user::ops::identity::create::Input { + user_id: user_id, + identity: backend::user_identity::Identity { + kind: Some(backend::user_identity::identity::Kind::Email( + backend::user_identity::identity::Email { + email: email.clone() + } + )), + }, + }) + .await + .unwrap(); + + ctx.op(::user::ops::pending_delete_toggle::Input { + user_id: user_id, + active: true, + }) + .await + .unwrap(); + + let (delete_request_ts,): (Option,) = sqlx::query_as(indoc!( + " + SELECT delete_request_ts + FROM db_user.users + WHERE + user_id = $1 + ", + )) + .bind(user_id) + .fetch_one(&ctx.crdb().await.unwrap()) + .await + .unwrap(); + + assert!(delete_request_ts.is_some()); + + ctx.op(::user::ops::pending_delete_toggle::Input { + user_id: user_id, + active: false, + }) + .await + .unwrap(); + + let (delete_request_ts,): (Option,) = sqlx::query_as(indoc!( + " + SELECT delete_request_ts + FROM db_user.users + WHERE + user_id = $1 + ", + )) + .bind(user_id) + .fetch_one(&ctx.crdb().await.unwrap()) + .await + .unwrap(); + + assert!(delete_request_ts.is_none()); +} diff --git a/packages/services/user/tests/profile_validate.rs b/packages/services/user/tests/profile_validate.rs new file mode 100644 index 0000000000..205039102d --- /dev/null +++ b/packages/services/user/tests/profile_validate.rs @@ -0,0 +1,18 @@ +use chirp_workflow::prelude::*; + +#[workflow_test] +async fn empty(ctx: TestCtx) { + let user_res = op!([ctx] faker_user {}).await.unwrap(); + let user_id = user_res.user_id.unwrap().as_uuid(); + + let res = ctx.op(::user::ops::profile_validate::Input { + user_id: user_id, + display_name: Some(" bad display name".to_owned()), + account_number: Some(10000), + bio: Some("bad\n\n\n\n\n\nbio".to_owned()) + }) + .await + .unwrap(); + + assert_eq!(res.errors.len(), 3, "validation failed"); +} diff --git a/packages/services/user/tests/resolve_email.rs b/packages/services/user/tests/resolve_email.rs new file mode 100644 index 0000000000..5a82efed89 --- /dev/null +++ b/packages/services/user/tests/resolve_email.rs @@ -0,0 +1,35 @@ +use chirp_workflow::prelude::*; +use rivet_operation::prelude::proto::backend; + +#[workflow_test] +async fn empty(ctx: TestCtx) { + let user_res = op!([ctx] faker_user { + ..Default::default() + }) + .await + .unwrap(); + let user_id = user_res.user_id.as_ref().unwrap().as_uuid(); + + let email = util::faker::email(); + ctx.op(::user::ops::identity::create::Input { + user_id: user_id, + identity: backend::user_identity::Identity { + kind: Some(backend::user_identity::identity::Kind::Email( + backend::user_identity::identity::Email { + email: email.clone() + } + )), + }, + }) + .await + .unwrap(); + + let res = ctx.op(::user::ops::resolve_email::Input { + emails: vec![email.clone(), util::faker::email()], + }) + .await + .unwrap(); + assert_eq!(1, res.users.len()); + let user = res.users.first().unwrap(); + assert_eq!(user_id, user.user_id); +} diff --git a/packages/services/user/tests/team_list.rs b/packages/services/user/tests/team_list.rs new file mode 100644 index 0000000000..00b18e682f --- /dev/null +++ b/packages/services/user/tests/team_list.rs @@ -0,0 +1,49 @@ +use std::collections::HashMap; + +use chirp_workflow::prelude::*; +use rivet_operation::prelude::proto; +use proto::backend::{pkg::*}; + +#[workflow_test] +async fn empty(ctx: TestCtx) { + let user_a = Uuid::new_v4(); + let user_b = Uuid::new_v4(); + let user_c = Uuid::new_v4(); + + let team_a = Uuid::new_v4(); + let team_b = Uuid::new_v4(); + let team_c = Uuid::new_v4(); + + let members = vec![ + (user_a, team_a), + (user_a, team_b), + (user_b, team_a), + (user_b, team_b), + (user_b, team_c), + ]; + for (user_id, team_id) in &members { + msg!([ctx] team::msg::member_create(team_id, user_id) -> team::msg::member_create_complete { + team_id: Some((*team_id).into()), + user_id: Some((*user_id).into()), + invitation: None, + }) + .await + .unwrap(); + } + + let res = ctx.op(::user::ops::team_list::Input { + user_ids: vec![user_a, user_b, user_c], + }) + .await + .unwrap(); + + assert_eq!(3, res.users.len()); + let users_map = res + .users + .iter() + .map(|u| (u.user_id, u.teams.len())) + .collect::>(); + assert_eq!(2, *users_map.get(&user_a).unwrap()); + assert_eq!(3, *users_map.get(&user_b).unwrap()); + assert_eq!(0, *users_map.get(&user_c).unwrap()); +} diff --git a/packages/services/user/tests/token_create.rs b/packages/services/user/tests/token_create.rs new file mode 100644 index 0000000000..f8bdb387a8 --- /dev/null +++ b/packages/services/user/tests/token_create.rs @@ -0,0 +1,18 @@ +use chirp_workflow::prelude::*; +use rivet_operation::prelude::proto::backend; + +#[workflow_test] +async fn empty(ctx: TestCtx) { + let user_res = op!([ctx] faker_user {}).await.unwrap(); + let user_id = user_res.user_id.as_ref().unwrap().as_uuid(); + + let res = ctx.op(::user::ops::token_create::Input { + user_id: user_id, + client: backend::net::ClientInfo::default() + }) + .await + .unwrap(); + + assert!(res.token.starts_with("usr")); + assert!(res.refresh_token.starts_with("usr_rf")); +} diff --git a/packages/services/user/worker/Cargo.toml b/packages/services/user/worker/Cargo.toml index 809a6c8254..d46c5d5b7c 100644 --- a/packages/services/user/worker/Cargo.toml +++ b/packages/services/user/worker/Cargo.toml @@ -8,22 +8,19 @@ edition.workspace = true [dependencies] rivet-convert.workspace = true chirp-client.workspace = true +chirp-workflow.workspace = true chirp-worker.workspace = true chrono = "0.4" lazy_static = "1.4.0" rivet-health-checks.workspace = true rivet-metrics.workspace = true rivet-runtime.workspace = true - game-get.workspace = true game-namespace-get.workspace = true team-get.workspace = true team-member-list.workspace = true upload-list-for-user.workspace = true -user-get.workspace = true -user-identity-delete.workspace = true -user-profile-validate.workspace = true -user-team-list.workspace = true +user.workspace = true rivet-config.workspace = true [dev-dependencies] diff --git a/packages/services/user/worker/src/workers/delete.rs b/packages/services/user/worker/src/workers/delete.rs index cac285c196..eec89d14ef 100644 --- a/packages/services/user/worker/src/workers/delete.rs +++ b/packages/services/user/worker/src/workers/delete.rs @@ -13,9 +13,12 @@ async fn worker(ctx: &OperationContext) -> GlobalRes // Delete user identities { - op!([ctx] user_identity_delete { - user_ids: vec![user_id.into()], - }) + chirp_workflow::compat::op( + &ctx, + ::user::ops::identity::delete::Input { + user_ids: vec![user_id] + } + ) .await?; } @@ -55,16 +58,19 @@ async fn worker(ctx: &OperationContext) -> GlobalRes { tracing::info!(?user_id, "removing teams"); - let user_teams_res = op!([ctx] user_team_list { - user_ids: vec![user_id.into()], - }) + let user_teams_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::team_list::Input { + user_ids: vec![user_id.into()], + }, + ) .await?; let user_teams = unwrap!(user_teams_res.users.first()); let teams_res = op!([ctx] team_get { team_ids: user_teams.teams .iter() - .map(|member| Ok(unwrap!(member.team_id))) + .map(|member| Ok(member.team_id.into())) .collect::>>()? }) .await?; diff --git a/packages/services/user/worker/src/workers/profile_set.rs b/packages/services/user/worker/src/workers/profile_set.rs index d2d64b0ced..14017206ca 100644 --- a/packages/services/user/worker/src/workers/profile_set.rs +++ b/packages/services/user/worker/src/workers/profile_set.rs @@ -29,12 +29,15 @@ async fn worker(ctx: &OperationContext) -> Glob ensure!(!query_components.is_empty()); // Validate profile - let validation_res = op!([ctx] user_profile_validate { - user_id: body.user_id, - display_name: display_name.clone(), - account_number: *account_number, - bio: bio.clone() - }) + let validation_res = chirp_workflow::compat::op( + &ctx, + ::user::ops::profile_validate::Input { + user_id: user_id.as_uuid(), + display_name: display_name.clone(), + account_number: *account_number, + bio: bio.clone() + }, + ) .await?; if !validation_res.errors.is_empty() { tracing::warn!(errors = ?validation_res.errors, "validation errors");