diff --git a/queries/owned_media/lookup_by_sha256.sql b/queries/owned_media/lookup_by_sha256.sql new file mode 100644 index 0000000..8254198 --- /dev/null +++ b/queries/owned_media/lookup_by_sha256.sql @@ -0,0 +1,17 @@ +SELECT + id "id!", + owner_id "owner_id!", + perceptual_hash, + sha256_hash "sha256_hash!: Sha256Hash", + last_modified "last_modified!", + content_url, + content_size, + thumb_url, + event_count "event_count!", + last_event, + accounts "accounts: sqlx::types::Json>" +FROM + owned_media_item_accounts +WHERE + owner_id = $1 + AND sha256_hash = $2; diff --git a/src/common.rs b/src/common.rs index bfa8687..0b53f77 100644 --- a/src/common.rs +++ b/src/common.rs @@ -436,7 +436,7 @@ async fn import_add( let sha256_hash = sub.sha256.ok_or(Error::Missing)?; - let media_id = models::OwnedMediaItem::add_item( + let (media_id, is_new) = models::OwnedMediaItem::add_item( &ctx.conn, user_id, account_id, @@ -449,20 +449,22 @@ async fn import_add( ) .await?; - match download_image(ctx, &sub.content_url).await { - Ok(im) => { - models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, media_id, im) - .await?; + if is_new { + match download_image(ctx, &sub.content_url).await { + Ok(im) => { + models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, media_id, im) + .await?; + } + Err(err) => tracing::warn!("could not attach image: {}", err), } - Err(err) => tracing::warn!("could not attach image: {}", err), - } - ctx.producer - .enqueue_job( - jobs::SearchExistingSubmissionsJob { user_id, media_id } - .initiated_by(jobs::JobInitiator::user(user_id)), - ) - .await?; + ctx.producer + .enqueue_job( + jobs::SearchExistingSubmissionsJob { user_id, media_id } + .initiated_by(jobs::JobInitiator::user(user_id)), + ) + .await?; + } Ok(()) } @@ -556,7 +558,7 @@ pub async fn handle_multipart_upload( .await .map_err(|_err| Error::unknown_message("join error"))??; - let id = models::OwnedMediaItem::add_manual_item( + let (id, is_new) = models::OwnedMediaItem::add_manual_item( pool, user.id, i64::from_be_bytes(perceptual_hash), @@ -564,21 +566,23 @@ pub async fn handle_multipart_upload( ) .await?; - models::OwnedMediaItem::update_media(pool, s3, config, id, im).await?; - models::UserEvent::notify(pool, redis, user.id, "Uploaded image.").await?; ids.push(id); - faktory - .enqueue_job( - jobs::SearchExistingSubmissionsJob { - user_id: user.id, - media_id: id, - } - .initiated_by(jobs::JobInitiator::user(user.id)), - ) - .await?; + if is_new { + models::OwnedMediaItem::update_media(pool, s3, config, id, im).await?; + + faktory + .enqueue_job( + jobs::SearchExistingSubmissionsJob { + user_id: user.id, + media_id: id, + } + .initiated_by(jobs::JobInitiator::user(user.id)), + ) + .await?; + } } Ok(ids) diff --git a/src/models.rs b/src/models.rs index 5aae5e5..018c8e7 100644 --- a/src/models.rs +++ b/src/models.rs @@ -626,7 +626,18 @@ impl OwnedMediaItem { user_id: Uuid, perceptual_hash: i64, sha256_hash: [u8; 32], - ) -> Result { + ) -> Result<(Uuid, bool), Error> { + if let Some(media) = sqlx::query_file!( + "queries/owned_media/lookup_by_sha256.sql", + user_id, + sha256_hash.to_vec() + ) + .fetch_optional(conn) + .await? + { + return Ok((media.id, false)); + } + let item_id = sqlx::query_file_scalar!( "queries/owned_media/add_manual_item.sql", user_id, @@ -640,7 +651,7 @@ impl OwnedMediaItem { .fetch_one(conn) .await?; - Ok(item_id) + Ok((item_id, true)) } #[allow(clippy::too_many_arguments)] @@ -654,21 +665,34 @@ impl OwnedMediaItem { link: Option, title: Option, posted_at: Option>, - ) -> Result { + ) -> Result<(Uuid, bool), Error> { let mut tx = conn.begin().await?; - let item_id = sqlx::query_file_scalar!( - "queries/owned_media/add_item.sql", + let (id, is_new) = if let Some(media) = sqlx::query_file!( + "queries/owned_media/lookup_by_sha256.sql", user_id, - perceptual_hash.filter(|hash| *hash != 0), sha256_hash.to_vec() ) - .fetch_one(&mut tx) - .await?; + .fetch_optional(&mut tx) + .await? + { + (media.id, false) + } else { + let item_id = sqlx::query_file_scalar!( + "queries/owned_media/add_item.sql", + user_id, + perceptual_hash.filter(|hash| *hash != 0), + sha256_hash.to_vec() + ) + .fetch_one(&mut tx) + .await?; + + (item_id, true) + }; sqlx::query_file!( "queries/owned_media/link_site_account.sql", - item_id, + id, account_id, source_id.to_string(), link, @@ -680,7 +704,7 @@ impl OwnedMediaItem { tx.commit().await?; - Ok(item_id) + Ok((id, is_new)) } pub async fn update_media( diff --git a/src/site/bsky.rs b/src/site/bsky.rs index 59111c8..99a8243 100644 --- a/src/site/bsky.rs +++ b/src/site/bsky.rs @@ -152,7 +152,7 @@ async fn import_submission( (None, None) }; - let item_id = models::OwnedMediaItem::add_item( + let (item_id, is_new) = models::OwnedMediaItem::add_item( &ctx.conn, user_id, account_id, @@ -168,19 +168,21 @@ async fn import_submission( ) .await?; - if let Some(im) = im { - models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im) - .await?; + if is_new { + if let Some(im) = im { + models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im) + .await?; - ctx.producer - .enqueue_job( - SearchExistingSubmissionsJob { - user_id, - media_id: item_id, - } - .initiated_by(JobInitiator::user(user_id)), - ) - .await?; + ctx.producer + .enqueue_job( + SearchExistingSubmissionsJob { + user_id, + media_id: item_id, + } + .initiated_by(JobInitiator::user(user_id)), + ) + .await?; + } } } diff --git a/src/site/deviantart.rs b/src/site/deviantart.rs index 761dece..e4be5b0 100644 --- a/src/site/deviantart.rs +++ b/src/site/deviantart.rs @@ -320,7 +320,7 @@ async fn add_submission_deviantart( (None, None) }; - let item_id = models::OwnedMediaItem::add_item( + let (item_id, is_new) = models::OwnedMediaItem::add_item( &ctx.conn, user_id, account_id, @@ -333,18 +333,21 @@ async fn add_submission_deviantart( ) .await?; - if let Some(im) = im { - models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im).await?; + if is_new { + if let Some(im) = im { + models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im) + .await?; - ctx.producer - .enqueue_job( - SearchExistingSubmissionsJob { - user_id, - media_id: item_id, - } - .initiated_by(JobInitiator::user(user_id)), - ) - .await?; + ctx.producer + .enqueue_job( + SearchExistingSubmissionsJob { + user_id, + media_id: item_id, + } + .initiated_by(JobInitiator::user(user_id)), + ) + .await?; + } } if was_import { diff --git a/src/site/furaffinity.rs b/src/site/furaffinity.rs index e5e5af7..12a395f 100644 --- a/src/site/furaffinity.rs +++ b/src/site/furaffinity.rs @@ -274,7 +274,7 @@ async fn process_submission( .try_into() .expect("sha256 hash was wrong length"); - let item_id = models::OwnedMediaItem::add_item( + let (item_id, is_new) = models::OwnedMediaItem::add_item( &ctx.conn, user_id, account_id, @@ -287,18 +287,20 @@ async fn process_submission( ) .await?; - let im = image::load_from_memory(&sub.file.ok_or(Error::Missing)?)?; - models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im).await?; - - ctx.producer - .enqueue_job( - SearchExistingSubmissionsJob { - user_id, - media_id: item_id, - } - .initiated_by(JobInitiator::user(user_id)), - ) - .await?; + if is_new { + let im = image::load_from_memory(&sub.file.ok_or(Error::Missing)?)?; + models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im).await?; + + ctx.producer + .enqueue_job( + SearchExistingSubmissionsJob { + user_id, + media_id: item_id, + } + .initiated_by(JobInitiator::user(user_id)), + ) + .await?; + } Ok(()) } diff --git a/src/site/twitter.rs b/src/site/twitter.rs index 0a02ffa..086c973 100644 --- a/src/site/twitter.rs +++ b/src/site/twitter.rs @@ -487,7 +487,7 @@ async fn add_submission_twitter( (None, None) }; - let item_id = models::OwnedMediaItem::add_item( + let (item_id, is_new) = models::OwnedMediaItem::add_item( &ctx.conn, user_id, account_id, @@ -503,19 +503,21 @@ async fn add_submission_twitter( ) .await?; - if let Some(im) = im { - models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im) - .await?; + if is_new { + if let Some(im) = im { + models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im) + .await?; - ctx.producer - .enqueue_job( - SearchExistingSubmissionsJob { - user_id, - media_id: item_id, - } - .initiated_by(JobInitiator::user(user_id)), - ) - .await?; + ctx.producer + .enqueue_job( + SearchExistingSubmissionsJob { + user_id, + media_id: item_id, + } + .initiated_by(JobInitiator::user(user_id)), + ) + .await?; + } } } @@ -928,7 +930,7 @@ async fn add_tweet( return Ok(media.id); } - let item_id = models::OwnedMediaItem::add_item( + let (item_id, is_new) = models::OwnedMediaItem::add_item( &ctx.conn, user_id, account_id, @@ -944,20 +946,22 @@ async fn add_tweet( ) .await?; - if let Some(im) = tweet.im { - models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im) - .await - .unwrap(); + if is_new { + if let Some(im) = tweet.im { + models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im) + .await + .unwrap(); - ctx.producer - .enqueue_job( - SearchExistingSubmissionsJob { - user_id, - media_id: item_id, - } - .initiated_by(JobInitiator::user(user_id)), - ) - .await?; + ctx.producer + .enqueue_job( + SearchExistingSubmissionsJob { + user_id, + media_id: item_id, + } + .initiated_by(JobInitiator::user(user_id)), + ) + .await?; + } } Ok(item_id) diff --git a/src/site/weasyl.rs b/src/site/weasyl.rs index 6c0a8ef..df15b2e 100644 --- a/src/site/weasyl.rs +++ b/src/site/weasyl.rs @@ -244,7 +244,7 @@ async fn process_submission( (None, None) }; - let item_id = models::OwnedMediaItem::add_item( + let (item_id, is_new) = models::OwnedMediaItem::add_item( &ctx.conn, user_id, account_id, @@ -257,19 +257,21 @@ async fn process_submission( ) .await?; - if let Some(im) = im { - models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im) - .await?; - - ctx.producer - .enqueue_job( - SearchExistingSubmissionsJob { - user_id, - media_id: item_id, - } - .initiated_by(JobInitiator::user(user_id)), - ) - .await?; + if is_new { + if let Some(im) = im { + models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im) + .await?; + + ctx.producer + .enqueue_job( + SearchExistingSubmissionsJob { + user_id, + media_id: item_id, + } + .initiated_by(JobInitiator::user(user_id)), + ) + .await?; + } } } diff --git a/src/user.rs b/src/user.rs index 7a59fa2..02817bb 100644 --- a/src/user.rs +++ b/src/user.rs @@ -525,10 +525,7 @@ async fn account_remove( ) -> Result { models::LinkedAccount::remove(&conn, user.id, form.account_id).await?; - session.add_flash( - FlashStyle::Success, - "Removed account link.", - ); + session.add_flash(FlashStyle::Success, "Removed account link."); Ok(HttpResponse::Found() .insert_header(("Location", request.url_for_static("user_home")?.as_str()))