diff --git a/crates/subspace-farmer-components/src/file_ext.rs b/crates/subspace-farmer-components/src/file_ext.rs index b0ef51da72..435e0c7ddb 100644 --- a/crates/subspace-farmer-components/src/file_ext.rs +++ b/crates/subspace-farmer-components/src/file_ext.rs @@ -1,7 +1,7 @@ //! File extension trait use std::fs::{File, OpenOptions}; -use std::io::{Result, Seek, SeekFrom}; +use std::io::Result; /// Extension convenience trait that allows setting some file opening options in cross-platform way pub trait OpenOptionsExt { @@ -79,10 +79,10 @@ impl OpenOptionsExt for OpenOptions { /// and doing cross-platform exact reads/writes pub trait FileExt { /// Get file size - fn size(&mut self) -> Result; + fn size(&self) -> Result; /// Make sure file has specified number of bytes allocated for it - fn preallocate(&mut self, len: u64) -> Result<()>; + fn preallocate(&self, len: u64) -> Result<()>; /// Advise OS/file system that file will use random access and read-ahead behavior is /// undesirable, on Windows this can only be set when file is opened, see [`OpenOptionsExt`] @@ -100,15 +100,11 @@ pub trait FileExt { } impl FileExt for File { - fn size(&mut self) -> Result { - self.seek(SeekFrom::End(0)) + fn size(&self) -> Result { + Ok(self.metadata()?.len()) } - fn preallocate(&mut self, len: u64) -> Result<()> { - // TODO: Hack due to bugs on Windows: https://github.com/al8n/fs4-rs/issues/13 - if self.size()? == len { - return Ok(()); - } + fn preallocate(&self, len: u64) -> Result<()> { fs2::FileExt::allocate(self, len) } diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index d3da270eca..83c3670ec6 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -1220,7 +1220,7 @@ impl SingleDiskFarm { let metadata_file_path = directory.join(Self::METADATA_FILE); #[cfg(not(windows))] - let mut metadata_file = OpenOptions::new() + let metadata_file = OpenOptions::new() .read(true) .write(true) .create(true) @@ -1231,7 +1231,7 @@ impl SingleDiskFarm { metadata_file.advise_random_access()?; #[cfg(windows)] - let mut metadata_file = UnbufferedIoFileWindows::open(&metadata_file_path)?; + let metadata_file = UnbufferedIoFileWindows::open(&metadata_file_path)?; let metadata_size = metadata_file.size()?; let expected_metadata_size = @@ -1323,7 +1323,7 @@ impl SingleDiskFarm { }; #[cfg(not(windows))] - let mut plot_file = OpenOptions::new() + let plot_file = OpenOptions::new() .read(true) .write(true) .create(true) @@ -1334,7 +1334,7 @@ impl SingleDiskFarm { plot_file.advise_random_access()?; #[cfg(windows)] - let mut plot_file = UnbufferedIoFileWindows::open(&directory.join(Self::PLOT_FILE))?; + let plot_file = UnbufferedIoFileWindows::open(&directory.join(Self::PLOT_FILE))?; if plot_file.size()? != plot_file_size { // Allocating the whole file (`set_len` below can create a sparse file, which will cause @@ -1344,12 +1344,6 @@ impl SingleDiskFarm { .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?; // Truncating file (if necessary) plot_file.set_len(plot_file_size)?; - - // TODO: Hack due to Windows bugs: - // https://learn.microsoft.com/en-us/answers/questions/1608540/getfileinformationbyhandle-followed-by-read-with-f - if cfg!(windows) { - warn!("Farm was resized, farmer restart is needed for optimal performance!") - } } let plot_file = Arc::new(plot_file); @@ -1399,13 +1393,12 @@ impl SingleDiskFarm { directory: &Path, ) -> io::Result> { #[cfg(not(windows))] - let mut metadata_file = OpenOptions::new() + let metadata_file = OpenOptions::new() .read(true) .open(directory.join(Self::METADATA_FILE))?; #[cfg(windows)] - let mut metadata_file = - UnbufferedIoFileWindows::open(&directory.join(Self::METADATA_FILE))?; + let metadata_file = UnbufferedIoFileWindows::open(&directory.join(Self::METADATA_FILE))?; let metadata_size = metadata_file.size()?; let sector_metadata_size = SectorMetadataChecksummed::encoded_size(); @@ -1671,7 +1664,7 @@ impl SingleDiskFarm { let (metadata_file, mut metadata_header) = { info!(path = %metadata_file_path.display(), "Checking metadata file"); - let mut metadata_file = match OpenOptions::new() + let metadata_file = match OpenOptions::new() .read(true) .write(true) .open(&metadata_file_path) @@ -1772,7 +1765,7 @@ impl SingleDiskFarm { let plot_file_path = directory.join(Self::PLOT_FILE); info!(path = %plot_file_path.display(), "Checking plot file"); - let mut plot_file = match OpenOptions::new() + let plot_file = match OpenOptions::new() .read(true) .write(true) .open(&plot_file_path) @@ -2048,7 +2041,7 @@ impl SingleDiskFarm { let file = directory.join(DiskPieceCache::FILE_NAME); info!(path = %file.display(), "Checking cache file"); - let mut cache_file = match OpenOptions::new().read(true).write(true).open(&file) { + let cache_file = match OpenOptions::new().read(true).write(true).open(&file) { Ok(plot_file) => plot_file, Err(error) => { return Err(if error.kind() == io::ErrorKind::NotFound { @@ -2211,17 +2204,15 @@ where // A lot simplified version of concurrent chunks { let start = Instant::now(); - (0..Record::NUM_S_BUCKETS) - .into_par_iter() - .try_for_each(|_| { - let offset = thread_rng().gen_range(0_usize..sector_size / Scalar::FULL_BYTES) - * Scalar::FULL_BYTES; - farming_plot.read_at(&mut [0; Scalar::FULL_BYTES], offset as u64) - })?; + (0..Record::NUM_CHUNKS).into_par_iter().try_for_each(|_| { + let offset = thread_rng().gen_range(0_usize..sector_size / Scalar::FULL_BYTES) + * Scalar::FULL_BYTES; + farming_plot.read_at(&mut [0; Scalar::FULL_BYTES], offset as u64) + })?; let elapsed = start.elapsed(); if elapsed >= INTERNAL_BENCHMARK_READ_TIMEOUT { - debug!( + info!( ?elapsed, "Proving method with chunks reading is too slow, using whole sector" ); diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs index 0154d1f953..2c69629283 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs @@ -73,7 +73,7 @@ impl DiskPieceCache { } #[cfg(not(windows))] - let mut file = OpenOptions::new() + let file = OpenOptions::new() .read(true) .write(true) .create(true) @@ -84,7 +84,7 @@ impl DiskPieceCache { file.advise_random_access()?; #[cfg(windows)] - let mut file = UnbufferedIoFileWindows::open(&directory.join(Self::FILE_NAME))?; + let file = UnbufferedIoFileWindows::open(&directory.join(Self::FILE_NAME))?; let expected_size = u64::from(Self::element_size()) * u64::from(capacity); // Align plot file size for disk sector size diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs index 0ed9b69a34..b2bce1df49 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs @@ -31,7 +31,7 @@ fn basic() { let tempdir = tempdir().unwrap(); #[cfg(not(windows))] - let mut file = OpenOptions::new() + let file = OpenOptions::new() .read(true) .write(true) .create(true) @@ -40,7 +40,7 @@ fn basic() { .unwrap(); #[cfg(windows)] - let mut file = UnbufferedIoFileWindows::open(&tempdir.path().join("plot.bin")).unwrap(); + let file = UnbufferedIoFileWindows::open(&tempdir.path().join("plot.bin")).unwrap(); // Align plot file size for disk sector size file.preallocate( diff --git a/crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs b/crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs index 8691244fd1..321681a9bf 100644 --- a/crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs +++ b/crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs @@ -1,9 +1,11 @@ use parking_lot::Mutex; +use static_assertions::const_assert_eq; use std::fs::{File, OpenOptions}; -use std::io::{Seek, SeekFrom}; +use std::io; use std::path::Path; -use std::{io, mem}; -use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; +use subspace_farmer_components::file_ext::FileExt; +#[cfg(windows)] +use subspace_farmer_components::file_ext::OpenOptionsExt; use subspace_farmer_components::ReadAtSync; /// 4096 is as a relatively safe size due to sector size on SSDs commonly being 512 or 4096 bytes @@ -11,14 +13,15 @@ pub const DISK_SECTOR_SIZE: usize = 4096; /// Restrict how much data to read from disk in a single call to avoid very large memory usage const MAX_READ_SIZE: usize = 1024 * 1024; +const_assert_eq!(MAX_READ_SIZE % DISK_SECTOR_SIZE, 0); + /// Wrapper data structure for unbuffered I/O on Windows. #[derive(Debug)] pub struct UnbufferedIoFileWindows { - read_file: File, - write_file: File, + file: File, physical_sector_size: usize, /// Scratch buffer of aligned memory for reads and writes - buffer: Mutex>, + scratch_buffer: Mutex>, } impl ReadAtSync for UnbufferedIoFileWindows { @@ -34,12 +37,12 @@ impl ReadAtSync for &UnbufferedIoFileWindows { } impl FileExt for UnbufferedIoFileWindows { - fn size(&mut self) -> io::Result { - self.write_file.seek(SeekFrom::End(0)) + fn size(&self) -> io::Result { + Ok(self.file.metadata()?.len()) } - fn preallocate(&mut self, len: u64) -> io::Result<()> { - self.write_file.preallocate(len) + fn preallocate(&self, len: u64) -> io::Result<()> { + self.file.preallocate(len) } fn advise_random_access(&self) -> io::Result<()> { @@ -53,40 +56,71 @@ impl FileExt for UnbufferedIoFileWindows { } fn read_exact_at(&self, buf: &mut [u8], mut offset: u64) -> io::Result<()> { - let mut buffer = self.buffer.lock(); + if buf.is_empty() { + return Ok(()); + } - // Read from disk in at most 1M chunks to avoid too high memory usage, account for offset - // that would cause extra bytes to be read from disk - for buf in buf.chunks_mut(MAX_READ_SIZE - (offset % mem::size_of::() as u64) as usize) { - // Make scratch buffer of a size that is necessary to read aligned memory, accounting - // for extra bytes at the beginning and the end that will be thrown away - let bytes_to_read = buf.len(); - let offset_in_buffer = (offset % DISK_SECTOR_SIZE as u64) as usize; - let desired_buffer_size = (bytes_to_read + offset_in_buffer).div_ceil(DISK_SECTOR_SIZE); - if buffer.len() < desired_buffer_size { - buffer.resize(desired_buffer_size, [0; DISK_SECTOR_SIZE]); - } + let mut scratch_buffer = self.scratch_buffer.lock(); - // While buffer above is allocated with granularity of `MAX_DISK_SECTOR_SIZE`, reads are - // done with granularity of physical sector size - let offset_in_buffer = (offset % self.physical_sector_size as u64) as usize; - self.read_file.read_at( - &mut buffer.flatten_mut()[..(bytes_to_read + offset_in_buffer) - .div_ceil(self.physical_sector_size) - * self.physical_sector_size], - offset / self.physical_sector_size as u64 * self.physical_sector_size as u64, - )?; + // First read up to `MAX_READ_SIZE - padding` + let padding = (offset % self.physical_sector_size as u64) as usize; + let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len()); + let (unaligned_start, buf) = buf.split_at_mut(first_unaligned_chunk_size); + { + let bytes_to_read = unaligned_start.len(); + unaligned_start.copy_from_slice(self.read_exact_at_internal( + &mut scratch_buffer, + bytes_to_read, + offset, + )?); + offset += unaligned_start.len() as u64; + } - buf.copy_from_slice(&buffer.flatten()[offset_in_buffer..][..bytes_to_read]); + if buf.is_empty() { + return Ok(()); + } + // Process the rest of the chunks, up to `MAX_READ_SIZE` at a time + for buf in buf.chunks_mut(MAX_READ_SIZE) { + let bytes_to_read = buf.len(); + buf.copy_from_slice(self.read_exact_at_internal( + &mut scratch_buffer, + bytes_to_read, + offset, + )?); offset += buf.len() as u64; } Ok(()) } - fn write_all_at(&self, buf: &[u8], offset: u64) -> io::Result<()> { - self.write_file.write_all_at(buf, offset) + fn write_all_at(&self, buf: &[u8], mut offset: u64) -> io::Result<()> { + if buf.is_empty() { + return Ok(()); + } + + let mut scratch_buffer = self.scratch_buffer.lock(); + + // First write up to `MAX_READ_SIZE - padding` + let padding = (offset % self.physical_sector_size as u64) as usize; + let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len()); + let (unaligned_start, buf) = buf.split_at(first_unaligned_chunk_size); + { + self.write_all_at_internal(&mut scratch_buffer, unaligned_start, offset)?; + offset += unaligned_start.len() as u64; + } + + if buf.is_empty() { + return Ok(()); + } + + // Process the rest of the chunks, up to `MAX_READ_SIZE` at a time + for buf in buf.chunks(MAX_READ_SIZE) { + self.write_all_at_internal(&mut scratch_buffer, buf, offset)?; + offset += buf.len() as u64; + } + + Ok(()) } } @@ -96,37 +130,97 @@ impl UnbufferedIoFileWindows { /// /// This abstraction is useless on other platforms and will just result in extra memory copies pub fn open(path: &Path) -> io::Result { - // Open file without unbuffered I/O for easier handling of writes - let write_file = OpenOptions::new() + let mut open_options = OpenOptions::new(); + #[cfg(windows)] + open_options.advise_unbuffered(); + let file = open_options + .read(true) .write(true) .create(true) .truncate(false) - .advise_random_access() .open(path)?; - let mut open_options = OpenOptions::new(); - #[cfg(windows)] - open_options.advise_unbuffered(); - let read_file = open_options.read(true).open(path)?; - // Physical sector size on many SSDs is smaller than 4096 and should improve performance - let physical_sector_size = if read_file.read_at(&mut [0; 512], 512).is_ok() { + let physical_sector_size = if file.read_at(&mut [0; 512], 512).is_ok() { 512 } else { DISK_SECTOR_SIZE }; Ok(Self { - read_file, - write_file, + file, physical_sector_size, - buffer: Mutex::default(), + // In many cases we'll want to read this much at once, so pre-allocate it right away + scratch_buffer: Mutex::new(vec![ + [0; DISK_SECTOR_SIZE]; + MAX_READ_SIZE / DISK_SECTOR_SIZE + ]), }) } /// Truncates or extends the underlying file, updating the size of this file to become `size`. pub fn set_len(&self, size: u64) -> io::Result<()> { - self.write_file.set_len(size) + self.file.set_len(size) + } + + fn read_exact_at_internal<'a>( + &self, + scratch_buffer: &'a mut Vec<[u8; DISK_SECTOR_SIZE]>, + bytes_to_read: usize, + offset: u64, + ) -> io::Result<&'a [u8]> { + // Make scratch buffer of a size that is necessary to read aligned memory, accounting + // for extra bytes at the beginning and the end that will be thrown away + let offset_in_buffer = (offset % DISK_SECTOR_SIZE as u64) as usize; + let desired_buffer_size = (bytes_to_read + offset_in_buffer).div_ceil(DISK_SECTOR_SIZE); + if scratch_buffer.len() < desired_buffer_size { + scratch_buffer.resize(desired_buffer_size, [0; DISK_SECTOR_SIZE]); + } + + // While buffer above is allocated with granularity of `MAX_DISK_SECTOR_SIZE`, reads are + // done with granularity of physical sector size + let offset_in_buffer = (offset % self.physical_sector_size as u64) as usize; + self.file.read_exact_at( + &mut scratch_buffer.flatten_mut()[..(bytes_to_read + offset_in_buffer) + .div_ceil(self.physical_sector_size) + * self.physical_sector_size], + offset / self.physical_sector_size as u64 * self.physical_sector_size as u64, + )?; + + Ok(&scratch_buffer.flatten()[offset_in_buffer..][..bytes_to_read]) + } + + /// Panics on writes over `MAX_READ_SIZE` (including padding on both ends) + fn write_all_at_internal( + &self, + scratch_buffer: &mut Vec<[u8; DISK_SECTOR_SIZE]>, + bytes_to_write: &[u8], + offset: u64, + ) -> io::Result<()> { + // This is guaranteed by `UnbufferedIoFileWindows::open()` + assert!(scratch_buffer.flatten().len() >= MAX_READ_SIZE); + + let aligned_offset = + offset / self.physical_sector_size as u64 * self.physical_sector_size as u64; + let padding = (offset - aligned_offset) as usize; + // Calculate the size of the read including padding on both ends + let bytes_to_read = (padding + bytes_to_write.len()).div_ceil(self.physical_sector_size) + * self.physical_sector_size; + + if padding == 0 && bytes_to_read == bytes_to_write.len() { + let scratch_buffer = &mut scratch_buffer.flatten_mut()[..bytes_to_read]; + scratch_buffer.copy_from_slice(bytes_to_write); + self.file.write_all_at(scratch_buffer, offset)?; + } else { + // Read whole pages where `bytes_to_write` will be written + self.read_exact_at_internal(scratch_buffer, bytes_to_read, aligned_offset)?; + let scratch_buffer = &mut scratch_buffer.flatten_mut()[..bytes_to_read]; + // Update contents of existing pages and write into the file + scratch_buffer[padding..][..bytes_to_write.len()].copy_from_slice(bytes_to_write); + self.file.write_all_at(scratch_buffer, aligned_offset)?; + } + + Ok(()) } } @@ -137,14 +231,14 @@ mod tests { }; use rand::prelude::*; use std::fs; - use subspace_farmer_components::ReadAtSync; + use subspace_farmer_components::file_ext::FileExt; use tempfile::tempdir; #[test] fn basic() { let tempdir = tempdir().unwrap(); let file_path = tempdir.as_ref().join("file.bin"); - let mut data = vec![0u8; MAX_READ_SIZE * 3]; + let mut data = vec![0u8; MAX_READ_SIZE * 5]; thread_rng().fill(data.as_mut_slice()); fs::write(&file_path, &data).unwrap(); @@ -157,16 +251,59 @@ mod tests { let mut buffer = Vec::new(); for (offset, size) in [ + (0_usize, 512_usize), (0_usize, 4096_usize), + (0, 500), (0, 4000), (5, 50), - (5, 4091), - (4091, 5), + (12, 500), + (96, 4000), + (4000, 96), (10000, 5), + (0, MAX_READ_SIZE), + (0, MAX_READ_SIZE * 2), + (5, MAX_READ_SIZE - 5), + (5, MAX_READ_SIZE * 2 - 5), + (5, MAX_READ_SIZE), (5, MAX_READ_SIZE * 2), + (MAX_READ_SIZE, MAX_READ_SIZE), + (MAX_READ_SIZE, MAX_READ_SIZE * 2), + (MAX_READ_SIZE + 5, MAX_READ_SIZE - 5), + (MAX_READ_SIZE + 5, MAX_READ_SIZE * 2 - 5), + (MAX_READ_SIZE + 5, MAX_READ_SIZE), + (MAX_READ_SIZE + 5, MAX_READ_SIZE * 2), ] { + let data = &mut data[offset..][..size]; buffer.resize(size, 0); - file.read_at(buffer.as_mut_slice(), offset as u64) + // Read contents + file.read_exact_at(buffer.as_mut_slice(), offset as u64) + .unwrap_or_else(|error| { + panic!( + "Offset {offset}, size {size}, override physical sector size \ + {override_physical_sector_size:?}: {error}" + ) + }); + + // Ensure it is correct + assert_eq!( + data, + buffer.as_slice(), + "Offset {offset}, size {size}, override physical sector size \ + {override_physical_sector_size:?}" + ); + + // Update data with random contents and write + thread_rng().fill(data); + file.write_all_at(data, offset as u64) + .unwrap_or_else(|error| { + panic!( + "Offset {offset}, size {size}, override physical sector size \ + {override_physical_sector_size:?}: {error}" + ) + }); + + // Read contents again + file.read_exact_at(buffer.as_mut_slice(), offset as u64) .unwrap_or_else(|error| { panic!( "Offset {offset}, size {size}, override physical sector size \ @@ -174,8 +311,9 @@ mod tests { ) }); + // Ensure it is correct too assert_eq!( - &data[offset..][..size], + data, buffer.as_slice(), "Offset {offset}, size {size}, override physical sector size \ {override_physical_sector_size:?}"