Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENG-294] Volume awareness #2796

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d842590
volume awareness init
jamiepine Oct 17, 2024
5f77ad0
Merge branch 'eng-1828-migration-to-new-cloud-api-system' into eng-29…
jamiepine Oct 24, 2024
38d8836
Merge remote-tracking branch 'origin/eng-1828-migration-to-new-cloud-…
jamiepine Oct 24, 2024
678bcb9
Refactor volume handling to replace storage statistics with volume ma…
jamiepine Oct 24, 2024
e570862
Enhance volume management: add volume watcher, optimize volume APIs, …
jamiepine Nov 1, 2024
8a5befc
Implement volume tracking, unmounting, and event subscriptions; enhan…
jamiepine Nov 1, 2024
9a7fa46
Remove unused old volume management module files
jamiepine Nov 1, 2024
f60ff48
Add volume event logging and improve watcher integration with enhance…
jamiepine Nov 3, 2024
4c866e6
Refactor volume management to streamline event handling, state update…
jamiepine Nov 3, 2024
8db67dc
Enhance volume management by refining state handling, simplifying eve…
jamiepine Nov 3, 2024
bd00623
Implement volume initialization tailored for libraries, optimize volu…
jamiepine Nov 3, 2024
378edcc
Refactor volume management to include library awareness, enhance fing…
jamiepine Nov 3, 2024
fbeb6c9
Add unique constraint to volume model, refine volume fingerprint gene…
jamiepine Nov 3, 2024
e9ce227
Add unmount functionality using fingerprint, enhance error handling i…
jamiepine Nov 3, 2024
ad18bbe
Implement volume update, mount state change, and error handling in Vo…
jamiepine Nov 3, 2024
a50db68
reduce state write calls
jamiepine Nov 4, 2024
63757a3
Introduce VolumeFingerprint and related types, improving type safety …
jamiepine Nov 4, 2024
39112a5
Merge branch 'main' into eng-294-volume-awareness
jamiepine Nov 4, 2024
4a3c871
modify init flow of actor to match other core managers
jamiepine Nov 4, 2024
4bc2b68
Implement speed testing for volumes, enhancing event handling and sta…
jamiepine Nov 4, 2024
4a5c4ce
update fingerprint handling, and clean up unused code in volume manag…
jamiepine Nov 5, 2024
d160255
Merge branch 'main' into eng-294-volume-awareness
jamiepine Nov 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .cspell/project_words.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
akar
allred
alluxio
APFS
augusto
automount
benja
Expand All @@ -17,11 +18,13 @@ elon
encryptor
Exif
Flac
fsevent
graps
haden
haoyuan
haris
Iconoir
inotify
josephjacks
justinhoffman
Keychain
Expand All @@ -32,15 +35,18 @@ lesterlee
Loas
lütke
marietti
mbps
mehrzad
Mjpeg
Mmap
mpscrr
mytton
narkhede
naveen
neha
noco
Normalised
NTFS
OSSC
poonen
rauch
Expand All @@ -52,6 +58,7 @@ rspc
rspcws
rywalker
sanjay
sdvol
sharma
skippable
spacedrive
Expand All @@ -64,8 +71,10 @@ thumbstrips
tobiaslutke
tokio
typecheck
unwatch
uuid
vdfs
vijay
winapi
zacharysmith
zxcvbn
16 changes: 14 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ base91 = "0.1.0"
ctor = "0.2.8"
directories = "5.0"
flate2 = "1.0"
fsevent = "2.1.2"
hex = "0.4.3"
hostname = "0.4.0"
http-body = "1.0"
http-range = "0.1.5"
Expand Down
37 changes: 21 additions & 16 deletions core/crates/sync/src/backfill.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use sd_prisma::{
prisma::{
crdt_operation, device, exif_data, file_path, label, label_on_object, location, object,
storage_statistics, tag, tag_on_object, PrismaClient, SortOrder,
tag, tag_on_object, volume, PrismaClient, SortOrder,
},
prisma_sync,
};
Expand Down Expand Up @@ -47,7 +47,7 @@ pub async fn backfill_operations(sync: &SyncManager) -> Result<(), Error> {
backfill_device(&db, sync, local_device).await?;

(
backfill_storage_statistics(&db, sync, local_device_id),
backfill_volumes(&db, sync, local_device_id),
paginate_tags(&db, sync),
paginate_locations(&db, sync, local_device_id),
paginate_objects(&db, sync, local_device_id),
Expand Down Expand Up @@ -102,15 +102,15 @@ async fn backfill_device(
}

#[instrument(skip(db, sync), err)]
async fn backfill_storage_statistics(
async fn backfill_volumes(
db: &PrismaClient,
sync: &SyncManager,
device_id: device::id::Type,
) -> Result<(), Error> {
let Some(stats) = db
.storage_statistics()
.find_first(vec![storage_statistics::device_id::equals(Some(device_id))])
.include(storage_statistics::include!({device: select { pub_id }}))
let Some(volume) = db
.volume()
.find_first(vec![volume::device_id::equals(Some(device_id))])
.include(volume::include!({device: select { pub_id }}))
.exec()
.await?
else {
Expand All @@ -120,24 +120,29 @@ async fn backfill_storage_statistics(

db.crdt_operation()
.create_many(vec![crdt_op_unchecked_db(&sync.shared_create(
prisma_sync::storage_statistics::SyncId {
pub_id: stats.pub_id,
prisma_sync::volume::SyncId {
pub_id: volume.pub_id,
},
chain_optional_iter(
[
sync_entry!(stats.total_capacity, storage_statistics::total_capacity),
sync_entry!(
stats.available_capacity,
storage_statistics::available_capacity
),
sync_entry!(volume.name, volume::name),
sync_entry!(volume.mount_type, volume::mount_type),
sync_entry!(volume.mount_point, volume::mount_point),
sync_entry!(volume.is_mounted, volume::is_mounted),
sync_entry!(volume.disk_type, volume::disk_type),
sync_entry!(volume.file_system, volume::file_system),
sync_entry!(volume.read_only, volume::read_only),
sync_entry!(volume.error_status, volume::error_status),
sync_entry!(volume.total_bytes_capacity, volume::total_bytes_capacity),
sync_entry!(volume.total_bytes_available, volume::total_bytes_available),
],
[option_sync_entry!(
stats.device.map(|device| {
volume.device.map(|device| {
prisma_sync::device::SyncId {
pub_id: device.pub_id,
}
}),
storage_statistics::device
volume::device
)],
),
))?])
Expand Down
2 changes: 1 addition & 1 deletion core/crates/sync/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl Manager {
total_count += self.ingest_by_model(prisma_sync::device::MODEL_ID).await?;

total_count += [
self.ingest_by_model(prisma_sync::storage_statistics::MODEL_ID),
self.ingest_by_model(prisma_sync::volume::MODEL_ID),
self.ingest_by_model(prisma_sync::tag::MODEL_ID),
self.ingest_by_model(prisma_sync::location::MODEL_ID),
self.ingest_by_model(prisma_sync::object::MODEL_ID),
Expand Down
59 changes: 27 additions & 32 deletions core/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ model Device {
date_created DateTime? // Not actually NULLABLE, but we have to comply with current sync implementation BS
date_deleted DateTime?

StorageStatistics StorageStatistics?
Location Location[]
FilePath FilePath[]
Object Object[]
ExifData ExifData[]
TagOnObject TagOnObject[]
LabelOnObject LabelOnObject[]
Location Location[]
FilePath FilePath[]
Object Object[]
ExifData ExifData[]
TagOnObject TagOnObject[]
LabelOnObject LabelOnObject[]
Volume Volume[]

@@map("device")
}
Expand Down Expand Up @@ -138,19 +138,29 @@ model ObjectKindStatistics {
@@map("object_kind_statistics")
}

/// @local
/// @shared(id: pub_id, modelId: 13)
model Volume {
id Int @id @default(autoincrement())
name String
mount_point String
total_bytes_capacity String @default("0")
total_bytes_available String @default("0")
id Int @id @default(autoincrement())
pub_id Bytes @unique

name String?
mount_point String?
mount_type String?
total_bytes_capacity String?
total_bytes_available String?
disk_type String?
filesystem String?
is_system Boolean @default(false)
date_modified DateTime @default(now())
file_system String?
date_modified DateTime?
is_mounted Boolean?
read_speed_mbps BigInt?
write_speed_mbps BigInt?
read_only Boolean?
error_status String?

device_id Int?
device Device? @relation(fields: [device_id], references: [id], onDelete: Cascade)

@@unique([mount_point, name])
@@unique([device_id, mount_point, name, total_bytes_capacity, file_system])
@@map("volume")
}

Expand Down Expand Up @@ -599,21 +609,6 @@ model ObjectInSpace {
@@map("object_in_space")
}

//// StorageStatistics ////
/// @shared(id: pub_id, modelId: 11)
model StorageStatistics {
id Int @id @default(autoincrement())
pub_id Bytes @unique

total_capacity BigInt @default(0)
available_capacity BigInt @default(0)

device_id Int? @unique
device Device? @relation(fields: [device_id], references: [id], onDelete: Cascade)

@@map("storage_statistics")
}

//// Job ////

model Job {
Expand Down
1 change: 1 addition & 0 deletions core/src/api/cloud/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
invalidate_query,
library::LibraryManagerError,
node::{config::NodeConfig, HardwareModel},
Node,
Expand Down
16 changes: 8 additions & 8 deletions core/src/api/libraries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
api::CoreEvent,
invalidate_query,
library::{update_library_statistics, Library, LibraryConfig, LibraryName},
library::{Library, LibraryConfig, LibraryName},
location::{scan_location, LocationCreateArgs, ScanState},
util::MaybeUndefined,
Node,
Expand Down Expand Up @@ -644,13 +644,13 @@ async fn update_statistics_loop(
while let Some(msg) = msg_stream.next().await {
match msg {
Message::Tick => {
if last_received_at.elapsed() < FIVE_MINUTES {
if let Err(e) = update_library_statistics(&node, &library).await {
error!(?e, "Failed to update library statistics;");
} else {
invalidate_query!(&library, "library.statistics");
}
}
// if last_received_at.elapsed() < FIVE_MINUTES {
// if let Err(e) = update_library_statistics(&node, &library).await {
// error!(?e, "Failed to update library statistics;");
// } else {
// invalidate_query!(&library, "library.statistics");
// }
// }
}
Message::Requested(instant) => {
if instant - last_received_at > TWO_MINUTES {
Expand Down
Loading
Loading