diff --git a/Cargo.lock b/Cargo.lock index 2fbdb7d..ce7a3ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1220,20 +1220,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" -[[package]] -name = "combine" -version = "4.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" -dependencies = [ - "bytes", - "futures-core", - "memchr", - "pin-project-lite", - "tokio", - "tokio-util", -] - [[package]] name = "compact_jwt" version = "0.2.10" @@ -2465,7 +2451,6 @@ dependencies = [ "prometheus", "radix_fmt", "rand 0.8.5", - "redis", "regex", "reqwest 0.11.27", "roux", @@ -4755,30 +4740,6 @@ dependencies = [ "rand_core 0.3.1", ] -[[package]] -name = "redis" -version = "0.25.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" -dependencies = [ - "arc-swap", - "async-trait", - "bytes", - "combine", - "futures 0.3.30", - "futures-util", - "itoa 1.0.10", - "percent-encoding", - "pin-project-lite", - "ryu", - "sha1_smol", - "socket2", - "tokio", - "tokio-retry", - "tokio-util", - "url", -] - [[package]] name = "redox_syscall" version = "0.2.16" @@ -5686,12 +5647,6 @@ dependencies = [ "digest 0.10.7", ] -[[package]] -name = "sha1_smol" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" - [[package]] name = "sha2" version = "0.9.9" @@ -6331,17 +6286,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-retry" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" -dependencies = [ - "pin-project", - "rand 0.8.5", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/Cargo.toml b/Cargo.toml index 2156c5a..26d90ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,6 @@ percent-encoding = "2.2" prometheus = { version = "0.13", features = ["process"] } radix_fmt = "1" rand = "0.8" -redis = { version = "0.25.4", features = ["tokio-comp", "aio", "connection-manager"] } regex = "1" reqwest = { version = "0.11", features = ["json", "cookies"] } roux = "2" diff --git a/README.md b/README.md index 49479c3..71389bb 100644 --- a/README.md +++ b/README.md @@ -44,10 +44,9 @@ up and running. It has a few software dependencies to run: * PostgreSQL with the [bktree] extension as the primary backing datastore -* Redis to send events about user actions +* NATS for pubsub and distributing events * [Faktory] for managing background tasks * [faktory-cron] to run scheduled jobs (provided in jobs.yaml) -* NATS for publishing information on newly discovered public images It also requires credentials or app tokens for the following sites: @@ -60,7 +59,6 @@ It also requires credentials or app tokens for the following sites: * S3-like (endpoint, region, bucket, access, and secret key) * SMTP (host, username, and password) * Telegram (token and domain) -* Twitter (API token and secret) [bktree]: https://github.com/fake-name/pg-spgist_hamming [faktory]: https://github.com/contribsys/faktory diff --git a/migrations/20240601011418_loading_accounts.down.sql b/migrations/20240601011418_loading_accounts.down.sql new file mode 100644 index 0000000..d6c20a0 --- /dev/null +++ b/migrations/20240601011418_loading_accounts.down.sql @@ -0,0 +1 @@ +DROP TABLE linked_account_import; diff --git a/migrations/20240601011418_loading_accounts.up.sql b/migrations/20240601011418_loading_accounts.up.sql new file mode 100644 index 0000000..7a9d909 --- /dev/null +++ b/migrations/20240601011418_loading_accounts.up.sql @@ -0,0 +1,8 @@ +CREATE TABLE linked_account_import ( + linked_account_id uuid PRIMARY KEY REFERENCES linked_account (id) ON DELETE CASCADE, + started_at timestamp with time zone NOT NULL DEFAULT current_timestamp, + completed_at timestamp with time zone, + expected_count integer NOT NULL, + expected_ids text[] NOT NULL, + loaded_ids text[] NOT NULL DEFAULT array[]::text[] +); diff --git a/queries/linked_account_import/complete.sql b/queries/linked_account_import/complete.sql new file mode 100644 index 0000000..187d205 --- /dev/null +++ b/queries/linked_account_import/complete.sql @@ -0,0 +1,6 @@ +UPDATE + linked_account_import +SET + completed_at = current_timestamp +WHERE + linked_account_id = $1; diff --git a/queries/linked_account_import/loaded.sql b/queries/linked_account_import/loaded.sql new file mode 100644 index 0000000..9998f2e --- /dev/null +++ b/queries/linked_account_import/loaded.sql @@ -0,0 +1,9 @@ +UPDATE + linked_account_import +SET + loaded_ids = array_append(loaded_ids, $2) +WHERE + linked_account_id = $1 +RETURNING + expected_count, + cardinality(loaded_ids) loaded_count; diff --git a/queries/linked_account_import/start.sql b/queries/linked_account_import/start.sql new file mode 100644 index 0000000..86f7fcd --- /dev/null +++ b/queries/linked_account_import/start.sql @@ -0,0 +1,4 @@ +INSERT INTO + linked_account_import (linked_account_id, expected_count, expected_ids) +VALUES + ($1, $2, $3); diff --git a/sqlx-data.json b/sqlx-data.json index 4da8e56..8971625 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -352,6 +352,18 @@ }, "query": "INSERT INTO\n user_setting (owner_id, setting, value)\nVALUES\n ($1, $2, $3) ON CONFLICT (owner_id, setting) DO\nUPDATE\nSET\n value = EXCLUDED.value;\n" }, + "0c81563c54ca51f345e0f101645e7d50979013e669c3159d05bf32271cf49f50": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Uuid" + ] + } + }, + "query": "UPDATE\n linked_account_import\nSET\n completed_at = current_timestamp\nWHERE\n linked_account_id = $1;\n" + }, "0e13aba704b352e16896a42ceb6983784d3331272b579036be40513afc5bb15d": { "describe": { "columns": [], @@ -2277,6 +2289,33 @@ }, "query": "INSERT INTO\n user_event (\n owner_id,\n related_to_media_item_id,\n message,\n event_name,\n data,\n created_at\n )\nVALUES\n ($1, $2, $3, $4, $5, $6) RETURNING id;\n" }, + "7bf78736eabe0813faffa0dd039a57921ccc52e3574c6199db0f294df1c8cb2d": { + "describe": { + "columns": [ + { + "name": "expected_count", + "ordinal": 0, + "type_info": "Int4" + }, + { + "name": "loaded_count", + "ordinal": 1, + "type_info": "Int4" + } + ], + "nullable": [ + false, + null + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + } + }, + "query": "UPDATE\n linked_account_import\nSET\n loaded_ids = array_append(loaded_ids, $2)\nWHERE\n linked_account_id = $1\nRETURNING\n expected_count,\n cardinality(loaded_ids) loaded_count;\n" + }, "7cc4d49aee17c2fe42d1cc6dea2daa468c9544f689b9cb994cf0ae3f735b5601": { "describe": { "columns": [], @@ -2642,6 +2681,20 @@ }, "query": "INSERT INTO\n bluesky_image (\n repo,\n post_rkey,\n blob_cid,\n size,\n sha256,\n perceptual_hash\n )\nVALUES\n ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING;\n" }, + "8e9aeaeccff5b95e066c33392f85c22522109215ebdd2f5fefcc35b26121f795": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Uuid", + "Int4", + "TextArray" + ] + } + }, + "query": "INSERT INTO\n linked_account_import (linked_account_id, expected_count, expected_ids)\nVALUES\n ($1, $2, $3);\n" + }, "8ecca7bdf1355882cde2d9586363390532b68de510da9677f92fec09360ee037": { "describe": { "columns": [], @@ -3564,6 +3617,86 @@ }, "query": "UPDATE\n owned_media_item_account\nSET\n owned_media_item_id = $2\nWHERE\n owned_media_item_id = $1;\n" }, + "ddd2f183cb5f6d8da4ad7fc5cdb056427bb0dce4b177d14902e5a75f6e04a82f": { + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Uuid" + }, + { + "name": "owner_id", + "ordinal": 1, + "type_info": "Uuid" + }, + { + "name": "source_site", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "username", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "last_update", + "ordinal": 4, + "type_info": "Timestamptz" + }, + { + "name": "loading_state", + "ordinal": 5, + "type_info": "Jsonb" + }, + { + "name": "data", + "ordinal": 6, + "type_info": "Jsonb" + }, + { + "name": "disabled", + "ordinal": 7, + "type_info": "Bool" + }, + { + "name": "created_at", + "ordinal": 8, + "type_info": "Timestamptz" + }, + { + "name": "verification_key", + "ordinal": 9, + "type_info": "Text" + }, + { + "name": "verified_at", + "ordinal": 10, + "type_info": "Timestamptz" + } + ], + "nullable": [ + false, + false, + false, + false, + true, + true, + true, + false, + false, + true, + true + ], + "parameters": { + "Left": [ + "Uuid" + ] + } + }, + "query": "SELECT\n *\nFROM\n linked_account\nWHERE\n id = $1\nFOR UPDATE;\n" + }, "e12de042e0d40c9a18088e373341e3b8087bdce43c82317a274f4fec4e6c4e11": { "describe": { "columns": [ diff --git a/src/error.rs b/src/error.rs index 76370e9..9b5179a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -20,8 +20,6 @@ pub enum Error { Database(#[from] sqlx::Error), #[error("template error: {0}")] Template(#[from] askama::Error), - #[error("redis error: {0}")] - Redis(#[from] redis::RedisError), #[error("job error: {0}")] Job(#[from] foxlib::jobs::Error), #[error("actix error: {0}")] diff --git a/src/jobs.rs b/src/jobs.rs index 51fd96b..81b1128 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -508,7 +508,6 @@ impl Display for JobInitiator { pub struct JobContext { pub producer: FaktoryProducer, pub conn: sqlx::PgPool, - pub redis: redis::aio::ConnectionManager, pub s3: rusoto_s3::S3Client, pub fuzzysearch: Arc, pub mailer: crate::Mailer, diff --git a/src/main.rs b/src/main.rs index 6a51aa3..279a470 100644 --- a/src/main.rs +++ b/src/main.rs @@ -604,12 +604,6 @@ async fn main() { ); let s3 = rusoto_s3::S3Client::new_with(client, provider, region); - let redis_client = - redis::Client::open(config.redis_dsn.clone()).expect("could not create redis client"); - let redis_manager = redis::aio::ConnectionManager::new(redis_client.clone()) - .await - .expect("could not create redis connection manager"); - let fuzzysearch = fuzzysearch::FuzzySearch::new_with_opts(fuzzysearch::FuzzySearchOpts { endpoint: Some(config.fuzzysearch_host.clone()), api_key: config.fuzzysearch_api_key.clone(), @@ -674,7 +668,6 @@ async fn main() { let ctx = jobs::JobContext { producer, conn: pool, - redis: redis_manager, s3, fuzzysearch: std::sync::Arc::new(fuzzysearch), mailer, @@ -745,8 +738,6 @@ async fn main() { .wrap(actix_web::middleware::Compress::default()) .app_data(web::Data::new(pool.clone())) .app_data(web::Data::new(s3.clone())) - .app_data(web::Data::new(redis_client.clone())) - .app_data(web::Data::new(redis_manager.clone())) .app_data(web::Data::new(config.clone())) .app_data(web::Data::new(telegram_login.clone())) .app_data(web::Data::new(producer.clone())) diff --git a/src/models.rs b/src/models.rs index e23236d..df54a68 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1428,6 +1428,55 @@ impl LinkedAccount { } } +#[derive(Debug)] +pub struct LinkedAccountImport { + pub linked_account_id: Uuid, + pub started_at: chrono::DateTime, + pub completed_at: Option>, + pub expected_count: i32, + pub expected_ids: Vec, + pub loaded_ids: Vec, +} + +impl LinkedAccountImport { + pub async fn start( + conn: &sqlx::PgPool, + account_id: Uuid, + expected_ids: &[String], + ) -> Result<(), Error> { + sqlx::query_file!( + "queries/linked_account_import/start.sql", + account_id, + expected_ids.len() as i32, + expected_ids + ) + .execute(conn) + .await?; + + Ok(()) + } + + pub async fn loaded( + conn: &sqlx::PgPool, + account_id: Uuid, + id: &str, + ) -> Result<(i32, i32), Error> { + let row = sqlx::query_file!("queries/linked_account_import/loaded.sql", account_id, id) + .fetch_one(conn) + .await?; + + Ok((row.loaded_count.unwrap_or(0), row.expected_count)) + } + + pub async fn complete(conn: &sqlx::PgPool, account_id: Uuid) -> Result<(), Error> { + sqlx::query_file!("queries/linked_account_import/complete.sql", account_id) + .execute(conn) + .await?; + + Ok(()) + } +} + #[derive(Debug, Clone, Copy, DeserializeFromStr, SerializeDisplay, PartialEq, Eq, Hash)] pub enum Site { FurAffinity, diff --git a/src/site/bsky.rs b/src/site/bsky.rs index bbf2c63..97e9328 100644 --- a/src/site/bsky.rs +++ b/src/site/bsky.rs @@ -186,11 +186,7 @@ async fn import_submission( } } - let mut redis = ctx.redis.clone(); - super::update_import_progress( - &ctx.conn, &mut redis, &ctx.nats, user_id, account_id, post.uri, - ) - .await?; + super::update_import_progress(&ctx.conn, &ctx.nats, user_id, account_id, post.uri).await?; Ok(()) } @@ -270,17 +266,8 @@ impl CollectedSite for BSky { tracing::info!("discovered {} submissions", posts.len()); let ids = posts.iter().map(|post| &post.uri); - let mut redis = ctx.redis.clone(); - - super::set_loading_submissions( - &ctx.conn, - &mut redis, - &ctx.nats, - account.owner_id, - account.id, - ids, - ) - .await?; + super::set_loading_submissions(&ctx.conn, &ctx.nats, account.owner_id, account.id, ids) + .await?; super::queue_new_submissions( &ctx.producer, diff --git a/src/site/deviantart.rs b/src/site/deviantart.rs index 4be1fdf..a23f06e 100644 --- a/src/site/deviantart.rs +++ b/src/site/deviantart.rs @@ -158,19 +158,10 @@ impl CollectedSite for DeviantArt { let known = subs.len() as i32; tracing::info!("discovered {} submissions", known); - let mut redis = ctx.redis.clone(); - let ids = subs.iter().map(|sub| sub.deviationid); - super::set_loading_submissions( - &ctx.conn, - &mut redis, - &ctx.nats, - account.owner_id, - account.id, - ids, - ) - .await?; + super::set_loading_submissions(&ctx.conn, &ctx.nats, account.owner_id, account.id, ids) + .await?; super::queue_new_submissions( &ctx.producer, @@ -291,10 +282,8 @@ async fn add_submission_deviantart( tracing::info!("submission had no content"); if was_import { - let mut redis = ctx.redis.clone(); super::update_import_progress( &ctx.conn, - &mut redis, &ctx.nats, user_id, account_id, @@ -358,16 +347,8 @@ async fn add_submission_deviantart( } if was_import { - let mut redis = ctx.redis.clone(); - super::update_import_progress( - &ctx.conn, - &mut redis, - &ctx.nats, - user_id, - account_id, - sub.deviationid, - ) - .await?; + super::update_import_progress(&ctx.conn, &ctx.nats, user_id, account_id, sub.deviationid) + .await?; } Ok(()) diff --git a/src/site/furaffinity.rs b/src/site/furaffinity.rs index aacc181..96f373d 100644 --- a/src/site/furaffinity.rs +++ b/src/site/furaffinity.rs @@ -135,11 +135,8 @@ impl CollectedSite for FurAffinity { let known = ids.len() as i32; tracing::info!("discovered {} submissions", known); - let mut redis = ctx.redis.clone(); - super::set_loading_submissions( &ctx.conn, - &mut redis, &ctx.nats, account.owner_id, account.id, @@ -223,11 +220,7 @@ async fn add_submission_furaffinity( } if was_import { - let mut redis = ctx.redis.clone(); - super::update_import_progress( - &ctx.conn, &mut redis, &ctx.nats, user_id, account_id, sub_id, - ) - .await?; + super::update_import_progress(&ctx.conn, &ctx.nats, user_id, account_id, sub_id).await?; } Ok(()) diff --git a/src/site/mod.rs b/src/site/mod.rs index e4243e9..df996a6 100644 --- a/src/site/mod.rs +++ b/src/site/mod.rs @@ -1,10 +1,8 @@ -use std::collections::HashSet; use std::future::Future; use async_trait::async_trait; use foxlib::jobs::{FaktoryForge, FaktoryProducer, Job}; use oauth2::{AccessToken, RefreshToken, TokenResponse}; -use redis::AsyncCommands; use uuid::Uuid; use crate::jobs::JobInitiatorExt; @@ -140,7 +138,6 @@ fn get_authenticated_client( /// to the user. async fn set_loading_submissions( conn: &sqlx::PgPool, - redis: &mut redis::aio::ConnectionManager, nats: &async_nats::Client, user_id: Uuid, account_id: Uuid, @@ -150,7 +147,7 @@ where S: ToString, I: Iterator, { - let ids: HashSet = ids.into_iter().map(|item| item.to_string()).collect(); + let ids: Vec = ids.into_iter().map(|item| item.to_string()).collect(); let len = ids.len(); if len == 0 { @@ -167,9 +164,7 @@ where } else { tracing::info!("user has {} submissions to load", len); - let key = format!("account-import-ids:loading:{account_id}"); - redis.sadd::<_, _, ()>(&key, ids).await?; - redis.expire::<_, ()>(key, 60 * 60 * 24 * 7).await?; + models::LinkedAccountImport::start(conn, account_id, &ids).await?; models::LinkedAccount::update_loading_state( conn, @@ -210,43 +205,21 @@ where async fn update_import_progress( conn: &sqlx::PgPool, - redis: &mut redis::aio::ConnectionManager, nats: &async_nats::Client, user_id: Uuid, account_id: Uuid, site_id: S, ) -> Result<(), Error> { - let loading_key = format!("account-import-ids:loading:{account_id}"); - let completed_key = format!("account-import-ids:completed:{account_id}"); + let (loaded, expected) = + models::LinkedAccountImport::loaded(conn, account_id, &site_id.to_string()).await?; - let site_id = site_id.to_string(); + tracing::debug!("submission was part of import, loaded {loaded} out of {expected} items"); - redis - .smove::<_, _, _, ()>(&loading_key, &completed_key, &site_id) - .await?; - - redis - .expire::<_, ()>(&loading_key, 60 * 60 * 24 * 7) - .await?; - redis - .expire::<_, ()>(&completed_key, 60 * 60 * 24 * 7) - .await?; - - let (remaining, completed): (i32, i32) = redis::pipe() - .atomic() - .scard(loading_key) - .scard(completed_key) - .query_async(redis) - .await?; - - tracing::debug!( - "submission was part of import, {} items remaining", - remaining - ); - - if remaining == 0 { + if expected - loaded == 0 { tracing::info!("marking account import complete"); + models::LinkedAccountImport::complete(conn, account_id).await?; + models::LinkedAccount::update_loading_state( conn, nats, @@ -262,8 +235,8 @@ async fn update_import_progress( nats, crate::api::EventMessage::LoadingProgress { account_id, - loaded: completed, - total: remaining + completed, + loaded, + total: expected, }, ) .await?; diff --git a/src/site/twitter.rs b/src/site/twitter.rs index 7e4ccaa..0e2c37e 100644 --- a/src/site/twitter.rs +++ b/src/site/twitter.rs @@ -147,11 +147,8 @@ impl CollectedSite for Twitter { models::LinkedAccount::update_data(&ctx.conn, account.id, Some(data)).await?; - let mut redis = ctx.redis.clone(); - super::set_loading_submissions( &ctx.conn, - &mut redis, &ctx.nats, account.owner_id, account.id, @@ -524,11 +521,7 @@ async fn add_submission_twitter( } if was_import { - let mut redis = ctx.redis.clone(); - super::update_import_progress( - &ctx.conn, &mut redis, &ctx.nats, user_id, account_id, tweet.id, - ) - .await?; + super::update_import_progress(&ctx.conn, &ctx.nats, user_id, account_id, tweet.id).await?; } Ok(()) diff --git a/src/site/weasyl.rs b/src/site/weasyl.rs index 4758648..258163c 100644 --- a/src/site/weasyl.rs +++ b/src/site/weasyl.rs @@ -128,11 +128,8 @@ impl CollectedSite for Weasyl { let known = subs.len() as i32; tracing::info!("discovered {} submissions", known); - let mut redis = ctx.redis.clone(); - super::set_loading_submissions( &ctx.conn, - &mut redis, &ctx.nats, account.owner_id, account.id, @@ -211,11 +208,7 @@ async fn add_submission_weasyl( } if was_import { - let mut redis = ctx.redis.clone(); - super::update_import_progress( - &ctx.conn, &mut redis, &ctx.nats, user_id, account_id, sub_id, - ) - .await?; + super::update_import_progress(&ctx.conn, &ctx.nats, user_id, account_id, sub_id).await?; } Ok(())