diff --git a/Cargo.lock b/Cargo.lock index d2297a1..ce7a3ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1220,20 +1220,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" -[[package]] -name = "combine" -version = "4.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" -dependencies = [ - "bytes", - "futures-core", - "memchr", - "pin-project-lite", - "tokio", - "tokio-util", -] - [[package]] name = "compact_jwt" version = "0.2.10" @@ -2465,8 +2451,6 @@ dependencies = [ "prometheus", "radix_fmt", "rand 0.8.5", - "redis 0.25.4", - "redlock", "regex", "reqwest 0.11.27", "roux", @@ -4756,60 +4740,6 @@ dependencies = [ "rand_core 0.3.1", ] -[[package]] -name = "redis" -version = "0.22.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa8455fa3621f6b41c514946de66ea0531f57ca017b2e6c7cc368035ea5b46df" -dependencies = [ - "async-trait", - "bytes", - "combine", - "futures-util", - "itoa 1.0.10", - "percent-encoding", - "pin-project-lite", - "ryu", - "sha1_smol", - "tokio", - "tokio-util", - "url", -] - -[[package]] -name = "redis" -version = "0.25.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" -dependencies = [ - "arc-swap", - "async-trait", - "bytes", - "combine", - "futures 0.3.30", - "futures-util", - "itoa 1.0.10", - "percent-encoding", - "pin-project-lite", - "ryu", - "sha1_smol", - "socket2", - "tokio", - "tokio-retry", - "tokio-util", - "url", -] - -[[package]] -name = "redlock" -version = "1.1.0" -source = "git+https://github.com/Syfaro/redlock-rs.git#ffbad68893194e11a43c6b0e1e6b82aaebd2b56f" -dependencies = [ - "rand 0.8.5", - "redis 0.22.3", - "tokio", -] - [[package]] name = "redox_syscall" version = "0.2.16" @@ -5717,12 +5647,6 @@ dependencies = [ "digest 0.10.7", ] -[[package]] -name = "sha1_smol" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" - [[package]] name = "sha2" version = "0.9.9" @@ -6362,17 +6286,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-retry" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" -dependencies = [ - "pin-project", - "rand 0.8.5", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/Cargo.toml b/Cargo.toml index 4830bf0..26d90ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,8 +54,6 @@ percent-encoding = "2.2" prometheus = { version = "0.13", features = ["process"] } radix_fmt = "1" rand = "0.8" -redis = { version = "0.25.4", features = ["tokio-comp", "aio", "connection-manager"] } -redlock = { git = "https://github.com/Syfaro/redlock-rs.git" } regex = "1" reqwest = { version = "0.11", features = ["json", "cookies"] } roux = "2" diff --git a/README.md b/README.md index 49479c3..71389bb 100644 --- a/README.md +++ b/README.md @@ -44,10 +44,9 @@ up and running. 
It has a few software dependencies to run: * PostgreSQL with the [bktree] extension as the primary backing datastore -* Redis to send events about user actions +* NATS for pubsub and distributing events * [Faktory] for managing background tasks * [faktory-cron] to run scheduled jobs (provided in jobs.yaml) -* NATS for publishing information on newly discovered public images It also requires credentials or app tokens for the following sites: @@ -60,7 +59,6 @@ It also requires credentials or app tokens for the following sites: * S3-like (endpoint, region, bucket, access, and secret key) * SMTP (host, username, and password) * Telegram (token and domain) -* Twitter (API token and secret) [bktree]: https://github.com/fake-name/pg-spgist_hamming [faktory]: https://github.com/contribsys/faktory diff --git a/migrations/20240601011418_loading_accounts.down.sql b/migrations/20240601011418_loading_accounts.down.sql new file mode 100644 index 0000000..d6c20a0 --- /dev/null +++ b/migrations/20240601011418_loading_accounts.down.sql @@ -0,0 +1 @@ +DROP TABLE linked_account_import; diff --git a/migrations/20240601011418_loading_accounts.up.sql b/migrations/20240601011418_loading_accounts.up.sql new file mode 100644 index 0000000..272dc9c --- /dev/null +++ b/migrations/20240601011418_loading_accounts.up.sql @@ -0,0 +1,10 @@ +CREATE TABLE linked_account_import ( + linked_account_id uuid PRIMARY KEY REFERENCES linked_account (id) ON DELETE CASCADE, + started_at timestamp with time zone NOT NULL DEFAULT current_timestamp, + completed_at timestamp with time zone, + expected_count integer NOT NULL, + expected_ids text[] NOT NULL, + loaded_ids text[] NOT NULL DEFAULT array[]::text[] +); + +CREATE INDEX linked_account_import_started_at_idx ON linked_account_import (started_at DESC); diff --git a/queries/linked_account/lookup_by_id_for_update.sql b/queries/linked_account/lookup_by_id_for_update.sql new file mode 100644 index 0000000..4b83c8c --- /dev/null +++ b/queries/linked_account/lookup_by_id_for_update.sql @@ -0,0 +1,7 @@ +SELECT + * +FROM + linked_account +WHERE + id = $1 +FOR UPDATE; diff --git a/queries/linked_account_import/admin_list.sql b/queries/linked_account_import/admin_list.sql new file mode 100644 index 0000000..05d5e98 --- /dev/null +++ b/queries/linked_account_import/admin_list.sql @@ -0,0 +1,14 @@ +SELECT + linked_account_id, + linked_account.source_site, + started_at, + completed_at, + expected_count, + cardinality(loaded_ids) loaded_count +FROM + linked_account_import + JOIN linked_account ON linked_account.id = linked_account_import.linked_account_id +ORDER BY + started_at DESC +LIMIT + 100; diff --git a/queries/linked_account_import/complete.sql b/queries/linked_account_import/complete.sql new file mode 100644 index 0000000..187d205 --- /dev/null +++ b/queries/linked_account_import/complete.sql @@ -0,0 +1,6 @@ +UPDATE + linked_account_import +SET + completed_at = current_timestamp +WHERE + linked_account_id = $1; diff --git a/queries/linked_account_import/loaded.sql b/queries/linked_account_import/loaded.sql new file mode 100644 index 0000000..9998f2e --- /dev/null +++ b/queries/linked_account_import/loaded.sql @@ -0,0 +1,9 @@ +UPDATE + linked_account_import +SET + loaded_ids = array_append(loaded_ids, $2) +WHERE + linked_account_id = $1 +RETURNING + expected_count, + cardinality(loaded_ids) loaded_count; diff --git a/queries/linked_account_import/start.sql b/queries/linked_account_import/start.sql new file mode 100644 index 0000000..86f7fcd --- /dev/null +++ 
b/queries/linked_account_import/start.sql @@ -0,0 +1,4 @@ +INSERT INTO + linked_account_import (linked_account_id, expected_count, expected_ids) +VALUES + ($1, $2, $3); diff --git a/sqlx-data.json b/sqlx-data.json index 4da8e56..62e2271 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -352,6 +352,18 @@ }, "query": "INSERT INTO\n user_setting (owner_id, setting, value)\nVALUES\n ($1, $2, $3) ON CONFLICT (owner_id, setting) DO\nUPDATE\nSET\n value = EXCLUDED.value;\n" }, + "0c81563c54ca51f345e0f101645e7d50979013e669c3159d05bf32271cf49f50": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Uuid" + ] + } + }, + "query": "UPDATE\n linked_account_import\nSET\n completed_at = current_timestamp\nWHERE\n linked_account_id = $1;\n" + }, "0e13aba704b352e16896a42ceb6983784d3331272b579036be40513afc5bb15d": { "describe": { "columns": [], @@ -2277,6 +2289,33 @@ }, "query": "INSERT INTO\n user_event (\n owner_id,\n related_to_media_item_id,\n message,\n event_name,\n data,\n created_at\n )\nVALUES\n ($1, $2, $3, $4, $5, $6) RETURNING id;\n" }, + "7bf78736eabe0813faffa0dd039a57921ccc52e3574c6199db0f294df1c8cb2d": { + "describe": { + "columns": [ + { + "name": "expected_count", + "ordinal": 0, + "type_info": "Int4" + }, + { + "name": "loaded_count", + "ordinal": 1, + "type_info": "Int4" + } + ], + "nullable": [ + false, + null + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + } + }, + "query": "UPDATE\n linked_account_import\nSET\n loaded_ids = array_append(loaded_ids, $2)\nWHERE\n linked_account_id = $1\nRETURNING\n expected_count,\n cardinality(loaded_ids) loaded_count;\n" + }, "7cc4d49aee17c2fe42d1cc6dea2daa468c9544f689b9cb994cf0ae3f735b5601": { "describe": { "columns": [], @@ -2504,6 +2543,54 @@ }, "query": "SELECT\n id \"id!\",\n owner_id \"owner_id!\",\n perceptual_hash,\n sha256_hash \"sha256_hash!: Sha256Hash\",\n last_modified \"last_modified!\",\n content_url,\n content_size,\n thumb_url,\n event_count \"event_count!\",\n last_event,\n accounts \"accounts: sqlx::types::Json>\"\nFROM\n owned_media_item_accounts\nWHERE\n owner_id = $1\n AND (\n $4::uuid IS NULL\n OR exists(\n SELECT\n 1\n FROM\n owned_media_item_account\n WHERE\n owned_media_item_account.owned_media_item_id = owned_media_item_accounts.id\n AND account_id = $4\n )\n )\nORDER BY\n last_event DESC NULLS LAST,\n last_modified DESC\nLIMIT\n $2 OFFSET ($3::integer * $2::integer);\n" }, + "84a7fbb33b578275eb259a0ae3667ff57805aebfa48e2bc3c8a8e5a029a0d253": { + "describe": { + "columns": [ + { + "name": "linked_account_id", + "ordinal": 0, + "type_info": "Uuid" + }, + { + "name": "source_site", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "started_at", + "ordinal": 2, + "type_info": "Timestamptz" + }, + { + "name": "completed_at", + "ordinal": 3, + "type_info": "Timestamptz" + }, + { + "name": "expected_count", + "ordinal": 4, + "type_info": "Int4" + }, + { + "name": "loaded_count", + "ordinal": 5, + "type_info": "Int4" + } + ], + "nullable": [ + false, + false, + false, + true, + false, + null + ], + "parameters": { + "Left": [] + } + }, + "query": "SELECT\n linked_account_id,\n linked_account.source_site,\n started_at,\n completed_at,\n expected_count,\n cardinality(loaded_ids) loaded_count\nFROM\n linked_account_import\n JOIN linked_account ON linked_account.id = linked_account_import.linked_account_id\nORDER BY\n started_at DESC\nLIMIT\n 100;\n" + }, "88bacfdae0ff1060188b606680fd60eac3e41ee8f11fc101f88885d6fd0cb420": { "describe": { "columns": [ @@ -2642,6 +2729,20 @@ }, 
"query": "INSERT INTO\n bluesky_image (\n repo,\n post_rkey,\n blob_cid,\n size,\n sha256,\n perceptual_hash\n )\nVALUES\n ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING;\n" }, + "8e9aeaeccff5b95e066c33392f85c22522109215ebdd2f5fefcc35b26121f795": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Uuid", + "Int4", + "TextArray" + ] + } + }, + "query": "INSERT INTO\n linked_account_import (linked_account_id, expected_count, expected_ids)\nVALUES\n ($1, $2, $3);\n" + }, "8ecca7bdf1355882cde2d9586363390532b68de510da9677f92fec09360ee037": { "describe": { "columns": [], @@ -3564,6 +3665,86 @@ }, "query": "UPDATE\n owned_media_item_account\nSET\n owned_media_item_id = $2\nWHERE\n owned_media_item_id = $1;\n" }, + "ddd2f183cb5f6d8da4ad7fc5cdb056427bb0dce4b177d14902e5a75f6e04a82f": { + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Uuid" + }, + { + "name": "owner_id", + "ordinal": 1, + "type_info": "Uuid" + }, + { + "name": "source_site", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "username", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "last_update", + "ordinal": 4, + "type_info": "Timestamptz" + }, + { + "name": "loading_state", + "ordinal": 5, + "type_info": "Jsonb" + }, + { + "name": "data", + "ordinal": 6, + "type_info": "Jsonb" + }, + { + "name": "disabled", + "ordinal": 7, + "type_info": "Bool" + }, + { + "name": "created_at", + "ordinal": 8, + "type_info": "Timestamptz" + }, + { + "name": "verification_key", + "ordinal": 9, + "type_info": "Text" + }, + { + "name": "verified_at", + "ordinal": 10, + "type_info": "Timestamptz" + } + ], + "nullable": [ + false, + false, + false, + false, + true, + true, + true, + false, + false, + true, + true + ], + "parameters": { + "Left": [ + "Uuid" + ] + } + }, + "query": "SELECT\n *\nFROM\n linked_account\nWHERE\n id = $1\nFOR UPDATE;\n" + }, "e12de042e0d40c9a18088e373341e3b8087bdce43c82317a274f4fec4e6c4e11": { "describe": { "columns": [ diff --git a/src/admin.rs b/src/admin.rs index 44682d9..5c1650d 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -18,6 +18,7 @@ pub fn service() -> Scope { web::scope("/admin").service(services![ admin_overview, admin_inject, + admin_imports, admin_sites_reddit, admin_sites_flist, inject_post, @@ -124,6 +125,32 @@ async fn admin_inject( Ok(HttpResponse::Ok().content_type("text/html").body(body)) } +#[derive(Template)] +#[template(path = "admin/imports.html")] +struct AdminImports { + imports: Vec, +} + +#[get("/imports", name = "admin_imports")] +async fn admin_imports( + request: actix_web::HttpRequest, + user: models::User, + conn: web::Data, +) -> Result { + if !user.is_admin { + return Err(actix_web::error::ErrorUnauthorized("Unauthorized").into()); + } + + let imports = models::LinkedAccountImport::admin_list(&conn).await?; + + let body = AdminImports { imports } + .wrap_admin(&request, &user) + .await + .render()?; + + Ok(HttpResponse::Ok().content_type("text/html").body(body)) +} + #[derive(Template)] #[template(path = "admin/sites/reddit.html")] struct AdminReddit { diff --git a/src/api.rs b/src/api.rs index eda09e9..8481b03 100644 --- a/src/api.rs +++ b/src/api.rs @@ -20,7 +20,7 @@ use uuid::Uuid; use crate::{ auth::FuzzySearchSessionToken, - common, + common::{self, NATS_PREFIX}, jobs::{JobInitiator, JobInitiatorExt, NewSubmissionJob}, models, Error, UrlUuid, }; @@ -104,16 +104,16 @@ impl StreamHandler> for UnauthorizedWsEve struct WsEventSession { user_id: Uuid, session_id: Uuid, - redis: redis::Client, + nats: 
async_nats::Client, hb: Instant, } impl WsEventSession { - fn new(user_id: Uuid, session_id: Uuid, redis: redis::Client) -> Self { + fn new(user_id: Uuid, session_id: Uuid, nats: async_nats::Client) -> Self { Self { user_id, session_id, - redis, + nats, hb: Instant::now(), } } @@ -133,12 +133,9 @@ impl WsEventSession { } fn keep_connected_to_events(&mut self) -> LocalBoxActorFuture { - Box::pin(self.attempt_redis_connection().then(|res, this, _| { + Box::pin(self.attempt_nats_connection().then(|res, this, _| { if let Err(err) = res { - tracing::warn!( - "could not connect to redis pubsub for user events: {:?}", - err - ); + tracing::warn!("could not connect to pubsub for user events: {:?}", err); futures::future::Either::Left( tokio::time::sleep(Duration::from_secs(1)) @@ -151,29 +148,26 @@ impl WsEventSession { })) } - fn attempt_redis_connection( - &mut self, - ) -> LocalBoxActorFuture> { - let redis = self.redis.clone(); + fn attempt_nats_connection(&mut self) -> LocalBoxActorFuture> { + let nats = self.nats.clone(); let user_id = self.user_id; - let (tx, rx) = futures::channel::mpsc::unbounded(); + let (mut tx, rx) = futures::channel::mpsc::channel::>(32); Box::pin( async move { - let mut pubsub = redis.get_async_pubsub().await?; - pubsub.subscribe(format!("user-events:{user_id}")).await?; - - Ok(pubsub) + let sub = nats + .subscribe(format!("{NATS_PREFIX}.user-events.{user_id}")) + .await + .map_err(Error::from_displayable)?; + Ok(sub) } .into_actor(self) - .map_ok(|mut pubsub, this, ctx| { + .map_ok(|mut sub, this, ctx| { ctx.spawn( async move { - let mut stream = pubsub.on_message(); - - while let Some(msg) = stream.next().await { - if let Err(err) = tx.unbounded_send(msg) { + while let Some(msg) = sub.next().await { + if let Err(err) = tx.try_send(msg.payload.to_vec()) { tracing::error!("could not send pubsub event: {:?}", err); break; } @@ -229,13 +223,12 @@ impl StreamHandler> for WsEventSession { } } -impl StreamHandler for WsEventSession { - fn handle(&mut self, item: redis::Msg, ctx: &mut Self::Context) { - tracing::debug!("got redis message for session"); +impl StreamHandler> for WsEventSession { + fn handle(&mut self, item: Vec, ctx: &mut Self::Context) { + tracing::debug!("got message for session"); - let payload = item.get_payload_bytes(); let event: EventMessage = - serde_json::from_slice(payload).expect("got invalid data from redis"); + serde_json::from_slice(&item).expect("got invalid data from user events"); ctx.notify(event); } @@ -268,7 +261,7 @@ async fn events( session: Session, req: HttpRequest, stream: web::Payload, - redis: web::Data, + nats: web::Data, ) -> Result { if let Some(user) = user { let session_token = session @@ -278,7 +271,7 @@ async fn events( let session = WsEventSession::new( user.id, session_token.session_id, - (*redis.into_inner()).clone(), + (*nats.into_inner()).clone(), ); ws::start(session, &req, stream).map_err(Into::into) @@ -372,7 +365,7 @@ async fn ingest_stats( #[post("/upload")] async fn upload( pool: web::Data, - redis: web::Data, + nats: web::Data, s3: web::Data, faktory: web::Data, config: web::Data, @@ -397,7 +390,7 @@ async fn upload( }; let ids = - common::handle_multipart_upload(&pool, &redis, &s3, &faktory, &config, &user, form).await?; + common::handle_multipart_upload(&pool, &nats, &s3, &faktory, &config, &user, form).await?; Ok(web::Json(ids)) } diff --git a/src/auth.rs b/src/auth.rs index f51df59..aba0ca7 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -351,10 +351,10 @@ async fn logout( request: actix_web::HttpRequest, 
session: Session, conn: web::Data, - redis: web::Data, + nats: web::Data, ) -> Result { if let Ok(Some(token)) = session.get_session_token() { - models::UserSession::destroy(&conn, &redis, token.session_id, token.user_id).await?; + models::UserSession::destroy(&conn, &nats, token.session_id, token.user_id).await?; } session.purge(); @@ -405,7 +405,7 @@ async fn sessions_remove( session: Session, user: models::User, conn: web::Data, - redis: web::Data, + nats: web::Data, form: web::Form, ) -> Result { let session_token = session @@ -416,7 +416,7 @@ async fn sessions_remove( return Err(Error::user_error("You cannot remove the current session.")); } - models::UserSession::destroy(&conn, &redis, form.session_id, user.id).await?; + models::UserSession::destroy(&conn, &nats, form.session_id, user.id).await?; Ok(HttpResponse::Found() .insert_header(( diff --git a/src/common.rs b/src/common.rs index 47a32ff..e4ea29c 100644 --- a/src/common.rs +++ b/src/common.rs @@ -18,6 +18,9 @@ use crate::{ /// Maximum permitted download size for an image. pub const MAX_DOWNLOAD_SIZE: usize = 50_000_000; +/// Prefix used for all NATS event subjects. +pub const NATS_PREFIX: &str = "fuzzysearch-owo"; + pub type SimilarAndPosted = (SimilarImage, Option>); /// Search a perceptual hash from all known data sources. @@ -194,7 +197,7 @@ async fn notify_found( let mut buffered = futures::stream::iter(owned_items.iter().map(|item| { models::UserEvent::similar_found( &ctx.conn, - &ctx.redis, + &ctx.nats, user.id, item.id, similar.clone(), @@ -490,7 +493,7 @@ async fn download_image(ctx: &JobContext, url: &str) -> Result( + user_id: Uuid, + nats: &async_nats::Client, + data: D, +) -> Result<(), Error> +where + D: serde::Serialize, +{ + let data = serde_json::to_vec(&data)?; + + let subject = format!("{NATS_PREFIX}.user-events.{user_id}"); + if let Err(err) = nats.publish(subject, data.into()).await { + tracing::error!("could not publish nats user event: {err}"); + } + + Ok(()) +} diff --git a/src/error.rs b/src/error.rs index 76370e9..9b5179a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -20,8 +20,6 @@ pub enum Error { Database(#[from] sqlx::Error), #[error("template error: {0}")] Template(#[from] askama::Error), - #[error("redis error: {0}")] - Redis(#[from] redis::RedisError), #[error("job error: {0}")] Job(#[from] foxlib::jobs::Error), #[error("actix error: {0}")] diff --git a/src/jobs.rs b/src/jobs.rs index d7da3fb..81b1128 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -7,7 +7,6 @@ use foxlib::jobs::{ use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use lettre::AsyncTransport; -use redis::AsyncCommands; use rusoto_s3::S3; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -509,8 +508,6 @@ impl Display for JobInitiator { pub struct JobContext { pub producer: FaktoryProducer, pub conn: sqlx::PgPool, - pub redis: redis::aio::ConnectionManager, - pub redlock: Arc, pub s3: rusoto_s3::S3Client, pub fuzzysearch: Arc, pub mailer: crate::Mailer, @@ -655,7 +652,7 @@ pub async fn start_job_processing(ctx: JobContext) -> Result<(), Error> { models::LinkedAccount::update_loading_state( &ctx.conn, - &ctx.redis, + &ctx.nats, user_id, account_id, models::LoadingState::DiscoveringItems, @@ -773,16 +770,15 @@ pub async fn start_job_processing(ctx: JobContext) -> Result<(), Error> { tracing::info!(account_was_verified, "checked verification"); - let mut redis = ctx.redis.clone(); - redis - .publish( - format!("user-events:{user_id}"), - serde_json::to_string(&api::EventMessage::AccountVerified { - 
account_id, - verified: account_was_verified, - })?, - ) - .await?; + common::send_user_event( + user_id, + &ctx.nats, + api::EventMessage::AccountVerified { + account_id, + verified: account_was_verified, + }, + ) + .await?; if account_was_verified { ctx.producer @@ -835,7 +831,7 @@ pub async fn start_job_processing(ctx: JobContext) -> Result<(), Error> { models::UserEvent::similar_found( &ctx.conn, - &ctx.redis, + &ctx.nats, user_id, media_id, similar_image, diff --git a/src/main.rs b/src/main.rs index 8854703..279a470 100644 --- a/src/main.rs +++ b/src/main.rs @@ -604,14 +604,6 @@ async fn main() { ); let s3 = rusoto_s3::S3Client::new_with(client, provider, region); - let redis_client = - redis::Client::open(config.redis_dsn.clone()).expect("could not create redis client"); - let redis_manager = redis::aio::ConnectionManager::new(redis_client.clone()) - .await - .expect("could not create redis connection manager"); - - let redlock = redlock::RedLock::new(vec![config.redis_dsn.as_ref()]); - let fuzzysearch = fuzzysearch::FuzzySearch::new_with_opts(fuzzysearch::FuzzySearchOpts { endpoint: Some(config.fuzzysearch_host.clone()), api_key: config.fuzzysearch_api_key.clone(), @@ -676,8 +668,6 @@ async fn main() { let ctx = jobs::JobContext { producer, conn: pool, - redis: redis_manager, - redlock: std::sync::Arc::new(redlock), s3, fuzzysearch: std::sync::Arc::new(fuzzysearch), mailer, @@ -748,8 +738,6 @@ async fn main() { .wrap(actix_web::middleware::Compress::default()) .app_data(web::Data::new(pool.clone())) .app_data(web::Data::new(s3.clone())) - .app_data(web::Data::new(redis_client.clone())) - .app_data(web::Data::new(redis_manager.clone())) .app_data(web::Data::new(config.clone())) .app_data(web::Data::new(telegram_login.clone())) .app_data(web::Data::new(producer.clone())) diff --git a/src/models.rs b/src/models.rs index 60846ff..cb4d5c8 100644 --- a/src/models.rs +++ b/src/models.rs @@ -7,7 +7,6 @@ use std::{ use argonautica::{Hasher, Verifier}; use futures::{StreamExt, TryStreamExt}; use image::GenericImageView; -use redis::AsyncCommands; use rusoto_s3::S3; use serde::{Deserialize, Serialize}; use serde_with::{DeserializeFromStr, SerializeDisplay}; @@ -15,8 +14,8 @@ use sqlx::types::Json; use uuid::Uuid; use webauthn_rs_proto::CredentialID; -use crate::site::SiteFromConfig; use crate::{api, site, Error}; +use crate::{common, site::SiteFromConfig}; pub struct User { pub id: Uuid, @@ -522,7 +521,7 @@ impl UserSession { pub async fn destroy( conn: &sqlx::PgPool, - redis: &redis::aio::ConnectionManager, + nats: &async_nats::Client, id: Uuid, user_id: Uuid, ) -> Result<(), Error> { @@ -530,13 +529,12 @@ impl UserSession { .execute(conn) .await?; - let mut redis = redis.clone(); - redis - .publish( - format!("user-events:{user_id}"), - serde_json::to_string(&api::EventMessage::SessionEnded { session_id: id })?, - ) - .await?; + common::send_user_event( + user_id, + nats, + api::EventMessage::SessionEnded { session_id: id }, + ) + .await?; Ok(()) } @@ -1211,6 +1209,30 @@ impl LinkedAccount { Ok(account) } + pub async fn lookup_by_id_for_update( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + id: Uuid, + ) -> Result, Error> { + let account = sqlx::query_file!("queries/linked_account/lookup_by_id_for_update.sql", id) + .map(|row| LinkedAccount { + id: row.id, + owner_id: row.owner_id, + source_site: row.source_site.parse().expect("unknown site in database"), + username: row.username, + last_update: row.last_update, + loading_state: row + .loading_state + .and_then(|loading_state| 
serde_json::from_value(loading_state).ok()), + data: row.data, + verification_key: row.verification_key, + verified_at: row.verified_at, + }) + .fetch_optional(tx) + .await?; + + Ok(account) + } + pub async fn lookup_by_site_id( conn: &sqlx::PgPool, user_id: Uuid, @@ -1289,7 +1311,7 @@ impl LinkedAccount { pub async fn update_loading_state( conn: &sqlx::PgPool, - redis: &redis::aio::ConnectionManager, + nats: &async_nats::Client, user_id: Uuid, account_id: Uuid, loading_state: LoadingState, @@ -1303,16 +1325,15 @@ impl LinkedAccount { .execute(conn) .await?; - let mut redis = redis.clone(); - redis - .publish( - format!("user-events:{user_id}"), - serde_json::to_string(&api::EventMessage::LoadingStateChange { - account_id, - loading_state: loading_state.message(), - })?, - ) - .await?; + common::send_user_event( + user_id, + nats, + api::EventMessage::LoadingStateChange { + account_id, + loading_state: loading_state.message(), + }, + ) + .await?; Ok(()) } @@ -1325,13 +1346,16 @@ impl LinkedAccount { Ok(()) } - pub async fn update_data( - conn: &sqlx::PgPool, + pub async fn update_data<'a, E>( + executor: E, account_id: Uuid, data: Option, - ) -> Result<(), Error> { + ) -> Result<(), Error> + where + E: sqlx::Executor<'a, Database = sqlx::Postgres>, + { sqlx::query_file!("queries/linked_account/update_data.sql", account_id, data) - .execute(conn) + .execute(executor) .await?; Ok(()) @@ -1404,6 +1428,70 @@ impl LinkedAccount { } } +#[derive(Debug)] +pub struct LinkedAccountImport { + pub linked_account_id: Uuid, + pub source_site: Site, + pub started_at: chrono::DateTime, + pub completed_at: Option>, + pub expected_count: i32, + pub loaded_count: i32, +} + +impl LinkedAccountImport { + pub async fn start( + conn: &sqlx::PgPool, + account_id: Uuid, + expected_ids: &[String], + ) -> Result<(), Error> { + sqlx::query_file!( + "queries/linked_account_import/start.sql", + account_id, + expected_ids.len() as i32, + expected_ids + ) + .execute(conn) + .await?; + + Ok(()) + } + + pub async fn loaded( + conn: &sqlx::PgPool, + account_id: Uuid, + id: &str, + ) -> Result<(i32, i32), Error> { + let row = sqlx::query_file!("queries/linked_account_import/loaded.sql", account_id, id) + .fetch_one(conn) + .await?; + + Ok((row.loaded_count.unwrap_or(0), row.expected_count)) + } + + pub async fn complete(conn: &sqlx::PgPool, account_id: Uuid) -> Result<(), Error> { + sqlx::query_file!("queries/linked_account_import/complete.sql", account_id) + .execute(conn) + .await?; + + Ok(()) + } + + pub async fn admin_list(conn: &sqlx::PgPool) -> Result, Error> { + sqlx::query_file!("queries/linked_account_import/admin_list.sql") + .map(|row| LinkedAccountImport { + linked_account_id: row.linked_account_id, + source_site: row.source_site.parse().expect("unknown site in database"), + started_at: row.started_at, + completed_at: row.completed_at, + expected_count: row.expected_count, + loaded_count: row.loaded_count.unwrap_or(0), + }) + .fetch_all(conn) + .await + .map_err(Into::into) + } +} + #[derive(Debug, Clone, Copy, DeserializeFromStr, SerializeDisplay, PartialEq, Eq, Hash)] pub enum Site { FurAffinity, @@ -1593,7 +1681,7 @@ pub struct EventAndRelatedMedia { impl UserEvent { pub async fn notify>( conn: &sqlx::PgPool, - redis: &redis::aio::ConnectionManager, + nats: &async_nats::Client, user_id: Uuid, message: M, ) -> Result { @@ -1606,23 +1694,22 @@ impl UserEvent { .fetch_one(conn) .await?; - let mut redis = redis.clone(); - redis - .publish( - format!("user-events:{user_id}"), - 
serde_json::to_string(&api::EventMessage::SimpleMessage { - id: notification_id, - message: message.as_ref().to_string(), - })?, - ) - .await?; + common::send_user_event( + user_id, + nats, + api::EventMessage::SimpleMessage { + id: notification_id, + message: message.as_ref().to_string(), + }, + ) + .await?; Ok(notification_id) } pub async fn similar_found( conn: &sqlx::PgPool, - redis: &redis::aio::ConnectionManager, + nats: &async_nats::Client, user_id: Uuid, media_id: Uuid, similar_image: SimilarImage, @@ -1644,13 +1731,12 @@ impl UserEvent { .await?; if created_at.is_none() { - let mut redis = redis.clone(); - redis - .publish( - format!("user-events:{user_id}"), - serde_json::to_string(&api::EventMessage::SimilarImage { media_id, link })?, - ) - .await?; + common::send_user_event( + user_id, + nats, + api::EventMessage::SimilarImage { media_id, link }, + ) + .await?; } Ok(notification_id) diff --git a/src/site/bsky.rs b/src/site/bsky.rs index fc564c5..97e9328 100644 --- a/src/site/bsky.rs +++ b/src/site/bsky.rs @@ -7,7 +7,6 @@ use askama::Template; use async_trait::async_trait; use foxlib::jobs::{FaktoryForge, FaktoryJob, FaktoryProducer, Job, JobExtra}; use futures::TryStreamExt; -use redis::AsyncCommands; use serde::{Deserialize, Serialize}; use sha2::Digest; use url::Url; @@ -15,6 +14,7 @@ use uuid::Uuid; use crate::{ api::ResolvedDidResult, + common, jobs::{ self, JobContext, JobInitiator, JobInitiatorExt, NatsNewImage, NewSubmissionJob, Queue, SearchExistingSubmissionsJob, @@ -186,8 +186,7 @@ async fn import_submission( } } - let mut redis = ctx.redis.clone(); - super::update_import_progress(&ctx.conn, &mut redis, user_id, account_id, post.uri).await?; + super::update_import_progress(&ctx.conn, &ctx.nats, user_id, account_id, post.uri).await?; Ok(()) } @@ -267,9 +266,7 @@ impl CollectedSite for BSky { tracing::info!("discovered {} submissions", posts.len()); let ids = posts.iter().map(|post| &post.uri); - let mut redis = ctx.redis.clone(); - - super::set_loading_submissions(&ctx.conn, &mut redis, account.owner_id, account.id, ids) + super::set_loading_submissions(&ctx.conn, &ctx.nats, account.owner_id, account.id, ids) .await?; super::queue_new_submissions( @@ -365,13 +362,12 @@ async fn resolve_did_impl( .await .map_err(|message| ResolvedDidResult::Error { message }); - let mut redis = ctx.redis.clone(); - redis - .publish( - format!("user-events:{user_id}"), - serde_json::to_string(&crate::api::EventMessage::ResolvedDid { did, result: event })?, - ) - .await?; + common::send_user_event( + user_id, + &ctx.nats, + crate::api::EventMessage::ResolvedDid { did, result: event }, + ) + .await?; Ok(()) } @@ -915,7 +911,7 @@ async fn auth_verify( session: None, })?; - models::LinkedAccount::update_data(&conn, account.id, Some(data.clone())).await?; + models::LinkedAccount::update_data(&**conn, account.id, Some(data.clone())).await?; account.data = Some(data); account } diff --git a/src/site/deviantart.rs b/src/site/deviantart.rs index d2dca6d..a23f06e 100644 --- a/src/site/deviantart.rs +++ b/src/site/deviantart.rs @@ -63,12 +63,11 @@ impl DeviantArt { async fn refresh_credentials( &self, conn: &sqlx::PgPool, - redlock: &redlock::RedLock, data: &types::DeviantArtData, account_id: Uuid, ) -> Result { super::refresh_credentials( - redlock, + conn, account_id, data, || self.get_oauth_client(), @@ -79,26 +78,26 @@ impl DeviantArt { item.expires_after, )) }, - || async { - let account = models::LinkedAccount::lookup_by_id(conn, account_id) + |mut tx| async move { + let account = 
models::LinkedAccount::lookup_by_id_for_update(&mut tx, account_id) .await? .ok_or(Error::Missing)?; let data = account.data.ok_or(Error::Missing)?; let data: types::DeviantArtData = serde_json::from_value(data)?; - Ok(data) + Ok((data, tx)) }, - |_data, access_token, refresh_token, expires_after| async move { + |mut tx, _data, access_token, refresh_token, expires_after| async move { let data = serde_json::to_value(types::DeviantArtData { access_token, refresh_token, expires_after, })?; - models::LinkedAccount::update_data(conn, account_id, Some(data)).await?; + models::LinkedAccount::update_data(&mut tx, account_id, Some(data)).await?; - Ok(()) + Ok(tx) }, ) .await @@ -136,7 +135,7 @@ impl CollectedSite for DeviantArt { ) -> Result<(), Error> { models::LinkedAccount::update_loading_state( &ctx.conn, - &ctx.redis, + &ctx.nats, account.owner_id, account.id, models::LoadingState::DiscoveringItems, @@ -150,7 +149,7 @@ impl CollectedSite for DeviantArt { )?; let token = self - .refresh_credentials(&ctx.conn, &ctx.redlock, &data, account.id) + .refresh_credentials(&ctx.conn, &data, account.id) .await?; let client = super::get_authenticated_client(&ctx.config, &token)?; @@ -159,11 +158,9 @@ impl CollectedSite for DeviantArt { let known = subs.len() as i32; tracing::info!("discovered {} submissions", known); - let mut redis = ctx.redis.clone(); - let ids = subs.iter().map(|sub| sub.deviationid); - super::set_loading_submissions(&ctx.conn, &mut redis, account.owner_id, account.id, ids) + super::set_loading_submissions(&ctx.conn, &ctx.nats, account.owner_id, account.id, ids) .await?; super::queue_new_submissions( @@ -285,10 +282,9 @@ async fn add_submission_deviantart( tracing::info!("submission had no content"); if was_import { - let mut redis = ctx.redis.clone(); super::update_import_progress( &ctx.conn, - &mut redis, + &ctx.nats, user_id, account_id, sub.deviationid, @@ -351,8 +347,7 @@ async fn add_submission_deviantart( } if was_import { - let mut redis = ctx.redis.clone(); - super::update_import_progress(&ctx.conn, &mut redis, user_id, account_id, sub.deviationid) + super::update_import_progress(&ctx.conn, &ctx.nats, user_id, account_id, sub.deviationid) .await?; } @@ -384,9 +379,7 @@ async fn update_account(ctx: JobContext, _job: FaktoryJob, account_id: Uuid) -> let da = DeviantArt::site_from_config(&ctx.config).await?; - let token = da - .refresh_credentials(&ctx.conn, &ctx.redlock, &data, account.id) - .await?; + let token = da.refresh_credentials(&ctx.conn, &data, account.id).await?; let client = super::get_authenticated_client(&ctx.config, &token)?; let subs = collect_gallery_items(&client, &account.username).await?; @@ -502,7 +495,7 @@ async fn callback( let id = match account { Some(account) => { tracing::info!("got existing account"); - models::LinkedAccount::update_data(&conn, account.id, Some(saved_data)).await?; + models::LinkedAccount::update_data(&**conn, account.id, Some(saved_data)).await?; account.id } diff --git a/src/site/furaffinity.rs b/src/site/furaffinity.rs index 12a395f..96f373d 100644 --- a/src/site/furaffinity.rs +++ b/src/site/furaffinity.rs @@ -123,7 +123,7 @@ impl CollectedSite for FurAffinity { async fn add_account(&self, ctx: &JobContext, account: LinkedAccount) -> Result<(), Error> { models::LinkedAccount::update_loading_state( &ctx.conn, - &ctx.redis, + &ctx.nats, account.owner_id, account.id, models::LoadingState::DiscoveringItems, @@ -135,11 +135,9 @@ impl CollectedSite for FurAffinity { let known = ids.len() as i32; tracing::info!("discovered {} 
submissions", known); - let mut redis = ctx.redis.clone(); - super::set_loading_submissions( &ctx.conn, - &mut redis, + &ctx.nats, account.owner_id, account.id, ids.iter(), @@ -222,8 +220,7 @@ async fn add_submission_furaffinity( } if was_import { - let mut redis = ctx.redis.clone(); - super::update_import_progress(&ctx.conn, &mut redis, user_id, account_id, sub_id).await?; + super::update_import_progress(&ctx.conn, &ctx.nats, user_id, account_id, sub_id).await?; } Ok(()) diff --git a/src/site/mod.rs b/src/site/mod.rs index cba4de2..df996a6 100644 --- a/src/site/mod.rs +++ b/src/site/mod.rs @@ -1,14 +1,12 @@ -use std::collections::HashSet; use std::future::Future; use async_trait::async_trait; use foxlib::jobs::{FaktoryForge, FaktoryProducer, Job}; use oauth2::{AccessToken, RefreshToken, TokenResponse}; -use redis::AsyncCommands; use uuid::Uuid; use crate::jobs::JobInitiatorExt; -use crate::Error; +use crate::{common, Error}; use crate::{jobs, models}; mod bsky; @@ -140,7 +138,7 @@ fn get_authenticated_client( /// to the user. async fn set_loading_submissions( conn: &sqlx::PgPool, - redis: &mut redis::aio::ConnectionManager, + nats: &async_nats::Client, user_id: Uuid, account_id: Uuid, ids: I, @@ -149,7 +147,7 @@ where S: ToString, I: Iterator, { - let ids: HashSet = ids.into_iter().map(|item| item.to_string()).collect(); + let ids: Vec = ids.into_iter().map(|item| item.to_string()).collect(); let len = ids.len(); if len == 0 { @@ -157,7 +155,7 @@ where models::LinkedAccount::update_loading_state( conn, - redis, + nats, user_id, account_id, models::LoadingState::Complete, @@ -166,13 +164,11 @@ where } else { tracing::info!("user has {} submissions to load", len); - let key = format!("account-import-ids:loading:{account_id}"); - redis.sadd::<_, _, ()>(&key, ids).await?; - redis.expire::<_, ()>(key, 60 * 60 * 24 * 7).await?; + models::LinkedAccountImport::start(conn, account_id, &ids).await?; models::LinkedAccount::update_loading_state( conn, - redis, + nats, user_id, account_id, models::LoadingState::LoadingItems { known: len as i32 }, @@ -209,45 +205,24 @@ where async fn update_import_progress( conn: &sqlx::PgPool, - redis: &mut redis::aio::ConnectionManager, + nats: &async_nats::Client, user_id: Uuid, account_id: Uuid, site_id: S, ) -> Result<(), Error> { - let loading_key = format!("account-import-ids:loading:{account_id}"); - let completed_key = format!("account-import-ids:completed:{account_id}"); + let (loaded, expected) = + models::LinkedAccountImport::loaded(conn, account_id, &site_id.to_string()).await?; - let site_id = site_id.to_string(); + tracing::debug!("submission was part of import, loaded {loaded} out of {expected} items"); - redis - .smove::<_, _, _, ()>(&loading_key, &completed_key, &site_id) - .await?; - - redis - .expire::<_, ()>(&loading_key, 60 * 60 * 24 * 7) - .await?; - redis - .expire::<_, ()>(&completed_key, 60 * 60 * 24 * 7) - .await?; - - let (remaining, completed): (i32, i32) = redis::pipe() - .atomic() - .scard(loading_key) - .scard(completed_key) - .query_async(redis) - .await?; - - tracing::debug!( - "submission was part of import, {} items remaining", - remaining - ); - - if remaining == 0 { + if expected - loaded == 0 { tracing::info!("marking account import complete"); + models::LinkedAccountImport::complete(conn, account_id).await?; + models::LinkedAccount::update_loading_state( conn, - redis, + nats, user_id, account_id, models::LoadingState::Complete, @@ -255,23 +230,25 @@ async fn update_import_progress( .await?; } - redis - .publish( - 
format!("user-events:{user_id}"), - serde_json::to_string(&crate::api::EventMessage::LoadingProgress { - account_id, - loaded: completed, - total: remaining + completed, - })?, - ) - .await?; + common::send_user_event( + user_id, + nats, + crate::api::EventMessage::LoadingProgress { + account_id, + loaded, + total: expected, + }, + ) + .await?; Ok(()) } -#[tracing::instrument(skip(redlock, data, client_fn, extract_fn, data_fn, update_fn))] +type Tx = sqlx::Transaction<'static, sqlx::Postgres>; + +#[tracing::instrument(skip(data, client_fn, extract_fn, data_fn, update_fn))] async fn refresh_credentials( - redlock: &redlock::RedLock, + conn: &sqlx::PgPool, account_id: Uuid, data: &Item, client_fn: C, @@ -282,10 +259,10 @@ async fn refresh_credentials( where C: FnOnce() -> oauth2::basic::BasicClient, E: Fn(&Item) -> Option<(String, String, chrono::DateTime)>, - D: FnOnce() -> DFut, - DFut: Future>, - U: FnOnce(Item, String, String, chrono::DateTime) -> UFut, - UFut: Future>, + D: FnOnce(Tx) -> DFut, + DFut: Future>, + U: FnOnce(Tx, Item, String, String, chrono::DateTime) -> UFut, + UFut: Future>, { tracing::debug!("checking oauth credentials"); @@ -297,22 +274,14 @@ where return Ok(AccessToken::new(initial_access_token)); } - let lock_key = format!("refresh-credentials:{account_id}"); - let lock_key = lock_key.as_bytes(); - let lock = loop { - if let Some(lock) = redlock.lock(lock_key, 10 * 1000).await { - tracing::trace!("locked credentials for update"); - break lock; - } - }; + let tx = conn.begin().await?; - let current_data = data_fn().await?; + let (current_data, tx) = data_fn(tx).await?; let (current_access_token, current_refresh_token, current_expires_at) = extract_fn(¤t_data).ok_or(Error::Missing)?; if current_expires_at >= chrono::Utc::now() { tracing::debug!("credentials were already updated"); - redlock.unlock(&lock).await; return Ok(AccessToken::new(current_access_token)); } @@ -340,8 +309,9 @@ where .and_then(|dur| chrono::Duration::from_std(dur).ok()) .unwrap_or_else(|| chrono::Duration::try_seconds(3600).unwrap()); - update_fn(current_data, access_token, refresh_token, expires_at).await?; - redlock.unlock(&lock).await; + let tx = update_fn(tx, current_data, access_token, refresh_token, expires_at).await?; + + tx.commit().await?; tracing::info!("credential refresh complete"); diff --git a/src/site/patreon.rs b/src/site/patreon.rs index 95fb730..5e6a314 100644 --- a/src/site/patreon.rs +++ b/src/site/patreon.rs @@ -59,12 +59,11 @@ impl Patreon { async fn refresh_credentials( &self, conn: &sqlx::PgPool, - redlock: &redlock::RedLock, data: &types::SavedPatreonData, account_id: Uuid, ) -> Result { super::refresh_credentials( - redlock, + conn, account_id, data, || self.get_oauth_client(), @@ -75,17 +74,17 @@ impl Patreon { item.credentials.expires_after, )) }, - || async { - let account = models::LinkedAccount::lookup_by_id(conn, account_id) + |mut tx| async move { + let account = models::LinkedAccount::lookup_by_id_for_update(&mut tx, account_id) .await? 
.ok_or(Error::Missing)?; let data = account.data.ok_or(Error::Missing)?; let data: types::SavedPatreonData = serde_json::from_value(data)?; - Ok(data) + Ok((data, tx)) }, - |data, access_token, refresh_token, expires_after| async move { + |mut tx, data, access_token, refresh_token, expires_after| async move { let data = serde_json::to_value(types::SavedPatreonData { credentials: types::PatreonCredentials { access_token, @@ -95,9 +94,9 @@ impl Patreon { ..data })?; - models::LinkedAccount::update_data(conn, account_id, Some(data)).await?; + models::LinkedAccount::update_data(&mut tx, account_id, Some(data)).await?; - Ok(()) + Ok(tx) }, ) .await @@ -127,7 +126,7 @@ impl CollectedSite for Patreon { async fn add_account(&self, ctx: &JobContext, account: LinkedAccount) -> Result<(), Error> { models::LinkedAccount::update_loading_state( &ctx.conn, - &ctx.redis, + &ctx.nats, account.owner_id, account.id, models::LoadingState::DiscoveringItems, @@ -138,7 +137,7 @@ impl CollectedSite for Patreon { serde_json::from_value(account.data.ok_or(Error::Missing)?)?; let token = self - .refresh_credentials(&ctx.conn, &ctx.redlock, &data, account.id) + .refresh_credentials(&ctx.conn, &data, account.id) .await?; let client = super::get_authenticated_client(&ctx.config, &token)?; @@ -163,7 +162,7 @@ impl CollectedSite for Patreon { models::LinkedAccount::update_loading_state( &ctx.conn, - &ctx.redis, + &ctx.nats, account.owner_id, account.id, models::LoadingState::Complete, @@ -388,7 +387,7 @@ async fn callback( tracing::warn!("database had outdated webhook information"); models::LinkedAccount::update_data( - &conn, + &**conn, linked_account.id, Some(serde_json::to_value(types::SavedPatreonData { site_id: patreon_campaign_id.to_string(), diff --git a/src/site/twitter.rs b/src/site/twitter.rs index 18d641a..0e2c37e 100644 --- a/src/site/twitter.rs +++ b/src/site/twitter.rs @@ -58,7 +58,7 @@ impl CollectedSite for Twitter { ) -> Result<(), Error> { models::LinkedAccount::update_loading_state( &ctx.conn, - &ctx.redis, + &ctx.nats, account.owner_id, account.id, models::LoadingState::DiscoveringItems, @@ -147,11 +147,9 @@ impl CollectedSite for Twitter { models::LinkedAccount::update_data(&ctx.conn, account.id, Some(data)).await?; - let mut redis = ctx.redis.clone(); - super::set_loading_submissions( &ctx.conn, - &mut redis, + &ctx.nats, account.owner_id, account.id, tweets.iter().map(|tweet| tweet.id.clone()), @@ -298,7 +296,7 @@ async fn callback( Some(Err(err)) => return Err(err.into()), None => saved_data, }; - models::LinkedAccount::update_data(&conn, account.id, Some(saved_data)).await?; + models::LinkedAccount::update_data(&**conn, account.id, Some(saved_data)).await?; account.id } @@ -345,7 +343,7 @@ struct TwitterArchiveForm { #[post("/archive")] async fn archive_post( conn: web::Data, - redis: web::Data, + nats: web::Data, faktory: web::Data, request: actix_web::HttpRequest, session: actix_session::Session, @@ -385,10 +383,10 @@ async fn archive_post( ..data })?; - models::LinkedAccount::update_data(&conn, account.id, Some(data)).await?; + models::LinkedAccount::update_data(&**conn, account.id, Some(data)).await?; models::LinkedAccount::update_loading_state( &conn, - &redis, + &nats, user.id, account.id, models::LoadingState::Custom { @@ -523,8 +521,7 @@ async fn add_submission_twitter( } if was_import { - let mut redis = ctx.redis.clone(); - super::update_import_progress(&ctx.conn, &mut redis, user_id, account_id, tweet.id).await?; + super::update_import_progress(&ctx.conn, &ctx.nats, user_id, 
account_id, tweet.id).await?; } Ok(()) @@ -902,7 +899,7 @@ async fn load_archive( models::LinkedAccount::update_loading_state( &ctx.conn, - &ctx.redis, + &ctx.nats, user_id, account_id, models::LoadingState::Complete, diff --git a/src/site/weasyl.rs b/src/site/weasyl.rs index df15b2e..258163c 100644 --- a/src/site/weasyl.rs +++ b/src/site/weasyl.rs @@ -116,7 +116,7 @@ impl CollectedSite for Weasyl { async fn add_account(&self, ctx: &JobContext, account: LinkedAccount) -> Result<(), Error> { models::LinkedAccount::update_loading_state( &ctx.conn, - &ctx.redis, + &ctx.nats, account.owner_id, account.id, models::LoadingState::DiscoveringItems, @@ -128,11 +128,9 @@ impl CollectedSite for Weasyl { let known = subs.len() as i32; tracing::info!("discovered {} submissions", known); - let mut redis = ctx.redis.clone(); - super::set_loading_submissions( &ctx.conn, - &mut redis, + &ctx.nats, account.owner_id, account.id, subs.iter().map(|sub| sub.id), @@ -210,8 +208,7 @@ async fn add_submission_weasyl( } if was_import { - let mut redis = ctx.redis.clone(); - super::update_import_progress(&ctx.conn, &mut redis, user_id, account_id, sub_id).await?; + super::update_import_progress(&ctx.conn, &ctx.nats, user_id, account_id, sub_id).await?; } Ok(()) diff --git a/src/user.rs b/src/user.rs index 6698396..d277a86 100644 --- a/src/user.rs +++ b/src/user.rs @@ -293,7 +293,7 @@ async fn events( #[allow(clippy::too_many_arguments)] async fn single( conn: web::Data, - redis: web::Data, + nats: web::Data, s3: web::Data, faktory: web::Data, config: web::Data, @@ -303,7 +303,7 @@ async fn single( form: actix_multipart::Multipart, ) -> Result { let _ids = - common::handle_multipart_upload(&conn, &redis, &s3, &faktory, &config, &user, form).await?; + common::handle_multipart_upload(&conn, &nats, &s3, &faktory, &config, &user, form).await?; session.add_flash(FlashStyle::Success, "Uploaded image."); diff --git a/templates/admin/_sidebar.html b/templates/admin/_sidebar.html index ad1f3b7..584a2ae 100644 --- a/templates/admin/_sidebar.html +++ b/templates/admin/_sidebar.html @@ -15,6 +15,7 @@
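// --- Illustrative sketch (not part of the patch above) ---
// The patch replaces the Redis `user-events:{user_id}` pubsub channels with NATS
// subjects of the form `{NATS_PREFIX}.user-events.{user_id}` (see NATS_PREFIX and
// common::send_user_event in src/common.rs). Below is a minimal, self-contained
// producer/consumer for that subject scheme, assuming the async-nats, futures,
// tokio, uuid, and serde_json crates; the server URL and payload shape are
// placeholders for illustration, not values taken from the patch.
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = async_nats::connect("nats://127.0.0.1:4222").await?;

    let user_id = uuid::Uuid::new_v4(); // hypothetical user id
    let subject = format!("fuzzysearch-owo.user-events.{user_id}");

    // Subscribe first so the published event is not missed (plain NATS pubsub is fire-and-forget).
    let mut sub = client.subscribe(subject.clone()).await?;

    // Publish a JSON payload the same way common::send_user_event does.
    let payload = serde_json::to_vec(&serde_json::json!({ "example": "event" }))?;
    client.publish(subject, payload.into()).await?;
    client.flush().await?;

    if let Some(msg) = sub.next().await {
        let event: serde_json::Value = serde_json::from_slice(&msg.payload)?;
        println!("got user event: {event}");
    }

    Ok(())
}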
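// --- Illustrative sketch (not part of the patch above) ---
// The patch drops redlock in favor of serializing OAuth credential refreshes on a
// Postgres row lock: refresh_credentials now runs inside a single transaction that
// first executes SELECT ... FOR UPDATE on the linked_account row
// (queries/linked_account/lookup_by_id_for_update.sql) and commits only after the
// new tokens are stored. The sketch below shows that shape with simplified,
// illustrative query strings and an assumed jsonb `data` column (the real code uses
// query_file! and LinkedAccount::update_data, and requires sqlx's uuid/json features).
use sqlx::PgPool;
use uuid::Uuid;

async fn refresh_with_row_lock(pool: &PgPool, account_id: Uuid) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Any concurrent refresh for the same account blocks here until this transaction ends.
    let _row = sqlx::query("SELECT * FROM linked_account WHERE id = $1 FOR UPDATE")
        .bind(account_id)
        .fetch_optional(&mut *tx)
        .await?;

    // ... re-check token expiry and call the OAuth token endpoint here ...

    sqlx::query("UPDATE linked_account SET data = $2 WHERE id = $1")
        .bind(account_id)
        .bind(serde_json::json!({ "example": "refreshed credentials" }))
        .execute(&mut *tx)
        .await?;

    // Committing persists the new credentials and releases the row lock.
    tx.commit().await?;
    Ok(())
}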