Skip to content

Commit

Permalink
Deduplicate media uploads by SHA256 hash.
Browse files Browse the repository at this point in the history
  • Loading branch information
Syfaro committed Feb 7, 2024
1 parent e4e52bc commit 1906320
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 118 deletions.
17 changes: 17 additions & 0 deletions queries/owned_media/lookup_by_sha256.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
SELECT
id "id!",
owner_id "owner_id!",
perceptual_hash,
sha256_hash "sha256_hash!: Sha256Hash",
last_modified "last_modified!",
content_url,
content_size,
thumb_url,
event_count "event_count!",
last_event,
accounts "accounts: sqlx::types::Json<Vec<OwnedMediaItemAccount>>"
FROM
owned_media_item_accounts
WHERE
owner_id = $1
AND sha256_hash = $2;
54 changes: 29 additions & 25 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ async fn import_add(

let sha256_hash = sub.sha256.ok_or(Error::Missing)?;

let media_id = models::OwnedMediaItem::add_item(
let (media_id, is_new) = models::OwnedMediaItem::add_item(
&ctx.conn,
user_id,
account_id,
Expand All @@ -449,20 +449,22 @@ async fn import_add(
)
.await?;

match download_image(ctx, &sub.content_url).await {
Ok(im) => {
models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, media_id, im)
.await?;
if is_new {
match download_image(ctx, &sub.content_url).await {
Ok(im) => {
models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, media_id, im)
.await?;
}
Err(err) => tracing::warn!("could not attach image: {}", err),
}
Err(err) => tracing::warn!("could not attach image: {}", err),
}

ctx.producer
.enqueue_job(
jobs::SearchExistingSubmissionsJob { user_id, media_id }
.initiated_by(jobs::JobInitiator::user(user_id)),
)
.await?;
ctx.producer
.enqueue_job(
jobs::SearchExistingSubmissionsJob { user_id, media_id }
.initiated_by(jobs::JobInitiator::user(user_id)),
)
.await?;
}

Ok(())
}
Expand Down Expand Up @@ -556,29 +558,31 @@ pub async fn handle_multipart_upload(
.await
.map_err(|_err| Error::unknown_message("join error"))??;

let id = models::OwnedMediaItem::add_manual_item(
let (id, is_new) = models::OwnedMediaItem::add_manual_item(
pool,
user.id,
i64::from_be_bytes(perceptual_hash),
sha256_hash,
)
.await?;

models::OwnedMediaItem::update_media(pool, s3, config, id, im).await?;

models::UserEvent::notify(pool, redis, user.id, "Uploaded image.").await?;

ids.push(id);

faktory
.enqueue_job(
jobs::SearchExistingSubmissionsJob {
user_id: user.id,
media_id: id,
}
.initiated_by(jobs::JobInitiator::user(user.id)),
)
.await?;
if is_new {
models::OwnedMediaItem::update_media(pool, s3, config, id, im).await?;

faktory
.enqueue_job(
jobs::SearchExistingSubmissionsJob {
user_id: user.id,
media_id: id,
}
.initiated_by(jobs::JobInitiator::user(user.id)),
)
.await?;
}
}

Ok(ids)
Expand Down
44 changes: 34 additions & 10 deletions src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,18 @@ impl OwnedMediaItem {
user_id: Uuid,
perceptual_hash: i64,
sha256_hash: [u8; 32],
) -> Result<Uuid, Error> {
) -> Result<(Uuid, bool), Error> {
if let Some(media) = sqlx::query_file!(
"queries/owned_media/lookup_by_sha256.sql",
user_id,
sha256_hash.to_vec()
)
.fetch_optional(conn)
.await?
{
return Ok((media.id, false));
}

let item_id = sqlx::query_file_scalar!(
"queries/owned_media/add_manual_item.sql",
user_id,
Expand All @@ -640,7 +651,7 @@ impl OwnedMediaItem {
.fetch_one(conn)
.await?;

Ok(item_id)
Ok((item_id, true))
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -654,21 +665,34 @@ impl OwnedMediaItem {
link: Option<String>,
title: Option<String>,
posted_at: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<Uuid, Error> {
) -> Result<(Uuid, bool), Error> {
let mut tx = conn.begin().await?;

let item_id = sqlx::query_file_scalar!(
"queries/owned_media/add_item.sql",
let (id, is_new) = if let Some(media) = sqlx::query_file!(
"queries/owned_media/lookup_by_sha256.sql",
user_id,
perceptual_hash.filter(|hash| *hash != 0),
sha256_hash.to_vec()
)
.fetch_one(&mut tx)
.await?;
.fetch_optional(&mut tx)
.await?
{
(media.id, false)
} else {
let item_id = sqlx::query_file_scalar!(
"queries/owned_media/add_item.sql",
user_id,
perceptual_hash.filter(|hash| *hash != 0),
sha256_hash.to_vec()
)
.fetch_one(&mut tx)
.await?;

(item_id, true)
};

sqlx::query_file!(
"queries/owned_media/link_site_account.sql",
item_id,
id,
account_id,
source_id.to_string(),
link,
Expand All @@ -680,7 +704,7 @@ impl OwnedMediaItem {

tx.commit().await?;

Ok(item_id)
Ok((id, is_new))

Check failure on line 707 in src/models.rs

View workflow job for this annotation

GitHub Actions / clippy

mismatched types

error[E0308]: mismatched types --> src/models.rs:707:13 | 707 | Ok((id, is_new)) | ^^ expected `Uuid`, found `Option<_>` | = note: expected struct `uuid::Uuid` found enum `std::option::Option<_>` help: consider using `Option::expect` to unwrap the `std::option::Option<_>` value, panicking if the value is an `Option::None` | 707 | Ok((id.expect("REASON"), is_new)) | +++++++++++++++++
}

pub async fn update_media(
Expand Down
28 changes: 15 additions & 13 deletions src/site/bsky.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async fn import_submission(
(None, None)
};

let item_id = models::OwnedMediaItem::add_item(
let (item_id, is_new) = models::OwnedMediaItem::add_item(
&ctx.conn,
user_id,
account_id,
Expand All @@ -168,19 +168,21 @@ async fn import_submission(
)
.await?;

if let Some(im) = im {
models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im)
.await?;
if is_new {
if let Some(im) = im {
models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im)
.await?;

ctx.producer
.enqueue_job(
SearchExistingSubmissionsJob {
user_id,
media_id: item_id,
}
.initiated_by(JobInitiator::user(user_id)),
)
.await?;
ctx.producer
.enqueue_job(
SearchExistingSubmissionsJob {
user_id,
media_id: item_id,
}
.initiated_by(JobInitiator::user(user_id)),
)
.await?;
}
}
}

Expand Down
27 changes: 15 additions & 12 deletions src/site/deviantart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ async fn add_submission_deviantart(
(None, None)
};

let item_id = models::OwnedMediaItem::add_item(
let (item_id, is_new) = models::OwnedMediaItem::add_item(
&ctx.conn,
user_id,
account_id,
Expand All @@ -333,18 +333,21 @@ async fn add_submission_deviantart(
)
.await?;

if let Some(im) = im {
models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im).await?;
if is_new {
if let Some(im) = im {
models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im)
.await?;

ctx.producer
.enqueue_job(
SearchExistingSubmissionsJob {
user_id,
media_id: item_id,
}
.initiated_by(JobInitiator::user(user_id)),
)
.await?;
ctx.producer
.enqueue_job(
SearchExistingSubmissionsJob {
user_id,
media_id: item_id,
}
.initiated_by(JobInitiator::user(user_id)),
)
.await?;
}
}

if was_import {
Expand Down
28 changes: 15 additions & 13 deletions src/site/furaffinity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ async fn process_submission(
.try_into()
.expect("sha256 hash was wrong length");

let item_id = models::OwnedMediaItem::add_item(
let (item_id, is_new) = models::OwnedMediaItem::add_item(
&ctx.conn,
user_id,
account_id,
Expand All @@ -287,18 +287,20 @@ async fn process_submission(
)
.await?;

let im = image::load_from_memory(&sub.file.ok_or(Error::Missing)?)?;
models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im).await?;

ctx.producer
.enqueue_job(
SearchExistingSubmissionsJob {
user_id,
media_id: item_id,
}
.initiated_by(JobInitiator::user(user_id)),
)
.await?;
if is_new {
let im = image::load_from_memory(&sub.file.ok_or(Error::Missing)?)?;
models::OwnedMediaItem::update_media(&ctx.conn, &ctx.s3, &ctx.config, item_id, im).await?;

ctx.producer
.enqueue_job(
SearchExistingSubmissionsJob {
user_id,
media_id: item_id,
}
.initiated_by(JobInitiator::user(user_id)),
)
.await?;
}

Ok(())
}
Expand Down
Loading

0 comments on commit 1906320

Please sign in to comment.