diff --git a/src/object_store/cache.rs b/src/object_store/cache.rs
index bcbf22c4..5f85f845 100644
--- a/src/object_store/cache.rs
+++ b/src/object_store/cache.rs
@@ -3,7 +3,7 @@
 /// with some additions to weigh it by the file size.
 use crate::config::schema::str_to_hex_hash;
 use async_trait::async_trait;
-use bytes::{Buf, BufMut, Bytes};
+use bytes::{Buf, Bytes, BytesMut};
 use futures::stream::BoxStream;
 use moka::future::{Cache, CacheBuilder, FutureExt};
 use moka::notification::RemovalCause;
@@ -18,7 +18,6 @@
 use std::fmt::{Debug, Formatter};
 use std::fs::remove_dir_all;
 use std::io;
-use std::io::ErrorKind;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 
@@ -41,20 +40,14 @@ impl CacheFileManager {
         Self { base_path }
     }
 
-    async fn write_file(
-        &self,
-        cache_key: &CacheKey,
-        data: Arc<Bytes>,
-    ) -> io::Result<PathBuf> {
+    async fn write_file(&self, cache_key: &CacheKey, data: Bytes) -> io::Result<PathBuf> {
        let mut path = self.base_path.to_path_buf();
         path.push(cache_key.as_filename());
 
-        // Should this happen normally?
+        // TODO: when does this happen?
         if path.exists() {
-            return Err(io::Error::new(
-                ErrorKind::Other,
-                "Internal error: cached file path already exists",
-            ));
+            debug!("{cache_key:?} file already exists, skipping write");
+            return Ok(path.clone());
         }
 
         tokio::fs::write(&path, data.as_ref()).await?;
@@ -88,12 +81,18 @@ impl Drop for CacheFileManager {
     }
 }
 
-#[derive(Debug, Clone, Hash, Eq, PartialEq)]
+#[derive(Clone, Hash, Eq, PartialEq)]
 pub struct CacheKey {
     path: object_store::path::Path,
     range: Range<usize>,
 }
 
+impl Debug for CacheKey {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}-{:?}", self.path, self.range)
+    }
+}
+
 impl CacheKey {
     fn as_filename(&self) -> String {
         format!(
@@ -108,7 +107,7 @@
 #[derive(Clone)]
 pub enum CacheValue {
     File(PathBuf, usize),
-    Memory(Arc<Bytes>),
+    Memory(Bytes),
 }
 
 impl CacheValue {
@@ -180,7 +179,6 @@
         // Clone the pointer since we can't pass the whole struct to the cache
         let eviction_file_manager = file_manager.clone();
 
-        // TODO: experiment with time-to-idle
         let cache: Cache<CacheKey, CacheValue> = CacheBuilder::new(max_cache_size)
             .weigher(|_, v: &CacheValue| v.size() as u32)
             .async_eviction_listener(move |k, v, cause| {
@@ -217,88 +215,145 @@
         }
     }
 
-    /// Get a certain range chunk, delineated in units of self.min_fetch_size. If the chunk
-    /// is cached, return it directly. Otherwise, fetch and return it.
+    /// Get a contiguous range of chunks, each delineated in units of self.min_fetch_size. The main
+    /// goal is to coalesce the fetching of any missing chunks into batches of maximum range, to
+    /// minimize the number of outgoing calls needed to satisfy the incoming call.
     ///
-    /// There is some nuance in the fetching process worth elaborating on. Namely, when there is a
-    /// cache miss:
-    /// - All concurrent calls for the same missing chunk are coalesced, thanks to `Cache::try_get_with`,
-    /// and so there is only one outbound request for the data.
-    /// - As soon as the data is fetched we:
-    /// a) spawn a task to persist the data to the disk
-    /// b) add a cache entry with the in-memory data (which is shared by the write task)
-    /// - Subsequently, all waiting calls are unblocked and will get the new cache value that is
-    /// either read from disk (if the write task finished quickly enough), or from memory (if the
-    /// write task is still running).
+    /// The algorithm works as follows:
+    /// - If a chunk is missing from the cache, add it to a batch of pending chunks to fetch at once.
+    /// - If the chunk is present in the cache, fetch the entire pending chunk batch (if any) and:
+    /// a) add a cache entry with the in-memory data (which is shared by the write task)
+    /// b) spawn a task to persist the data to the disk
+    /// - All subsequent calls will get the new cache value that is either read from disk (if the
+    /// write task finished quickly enough), or from memory (if the write task is still running).
     /// - Once the write task completes it will either replace the cache value with a file pointer,
     /// (if it completed successfully), or invalidate the memory entry (if it didn't).
-    async fn get_chunk(
+    ///
+    /// NB: This is a best-effort implementation, i.e. there are no synchronization primitives used,
+    /// so there is no guarantee that another thread won't duplicate some of the requests.
+    async fn get_chunk_range(
         &self,
-        path: &object_store::path::Path,
-        chunk: u32,
-    ) -> Result<Bytes, object_store::Error> {
-        let range = (chunk as usize * self.min_fetch_size as usize)
-            ..((chunk + 1) as usize * self.min_fetch_size as usize);
-
-        let key = CacheKey {
-            path: path.to_owned(),
-            range: range.clone(),
-        };
-
-        let value = self
-            .cache
-            .try_get_with::<_, object_store::Error>(key.clone(), async move {
-                // The Arc here is solely to avoid copying the data into the closure below, as the
-                // writing can be done through a reference as well.
-                debug!("Fetching data for {key:?}");
-                let data = Arc::new(self.inner.get_range(path, range).await?);
-
-                // Run the blocking write + cache value insert in a separate task
-                let cache = self.cache.clone();
-                let file_manager = self.file_manager.clone();
-                let size = data.len();
-                let data_to_write = data.clone();
-                tokio::task::spawn(async move {
-                    match file_manager.write_file(&key, data_to_write).await {
-                        Ok(path) => {
-                            // Write task completed successfully, replace the in-memory cache entry
-                            // with the file-pointer one.
-                            debug!("Upserting file pointer for {key:?} into the cache");
-                            let value = CacheValue::File(path, size);
-                            cache.insert(key, value).await;
+        location: &object_store::path::Path,
+        start_chunk: usize,
+        end_chunk: usize,
+    ) -> object_store::Result<Bytes> {
+        let mut result = BytesMut::with_capacity(
+            (end_chunk.saturating_sub(start_chunk) + 1) * self.min_fetch_size as usize,
+        );
+
+        let mut chunk_batch = vec![];
+        for chunk in start_chunk..=end_chunk {
+            let chunk_range = (chunk * self.min_fetch_size as usize)
+                ..((chunk + 1) * self.min_fetch_size as usize);
+
+            let key = CacheKey {
+                path: location.to_owned(),
+                range: chunk_range.clone(),
+            };
+
+            let chunk_data = match self.cache.get(&key).await {
+                // If the value is missing, add the chunk to the pending batch and continue
+                None => {
+                    chunk_batch.push(key);
+                    None
+                }
+                Some(value) => {
+                    // Now get the cache value for the current chunk
+                    match value {
+                        CacheValue::Memory(data) => {
+                            debug!("Cache value for {key:?} fetched from memory");
+                            Some(data.clone())
                         }
-                        Err(err) => {
-                            // Write task failed, remove the cache entry; we could also defer that to
-                            // TTL/LRU eviction, but then we risk ballooning the memory usage.
-                            warn!("Failed writing value for {key:?} to a file: {err}");
-                            warn!("Removing cache entry");
-                            cache.invalidate(&key).await;
+                        CacheValue::File(path, _) => {
+                            debug!("Cache value for {key:?} fetching from the file");
+                            match self.file_manager.read_file(path).await {
+                                Ok(data) => Some(data),
+                                Err(err) => {
+                                    warn!(
+                                        "Re-downloading cache value for {key:?}: {err}"
+                                    );
+                                    let data = self
+                                        .inner
+                                        .get_range(location, chunk_range.clone())
+                                        .await?;
+                                    self.cache_chunk_data(key, data.clone()).await;
+                                    Some(data)
+                                }
+                            }
+                        }
                     }
                 }
-            });
+            };
-
-                // While the write task runs cache the in-memory bytes and serve that to all calls
-                // prior to the write task completing.
-                debug!("Caching value for ({path:?}, {chunk}) in memory");
-                Ok(CacheValue::Memory(data))
-            })
-            .await
-            .map_err(|e| object_store::Error::Generic {
-                store: "cache_store",
-                source: Box::new(e),
-            })?;
-
-        match value {
-            CacheValue::File(path, _) => {
-                debug!("Cache value for ({path:?}, {chunk}) fetched from the file");
-                let data = self.file_manager.read_file(path).await.unwrap();
-                Ok(data)
+            if (chunk_data.is_some() || chunk == end_chunk) && !chunk_batch.is_empty() {
+                // We either got a value, or are at the last chunk, so first we need to resolve any
+                // outstanding coalesced chunk requests thus far.
+                let first = chunk_batch.first().unwrap();
+                let last = chunk_batch.last().unwrap();
+
+                let batch_range = first.range.start..last.range.end;
+                debug!("{location}-{batch_range:?} fetching");
+                let mut batch_data =
+                    self.inner.get_range(location, batch_range.clone()).await?;
+                debug!("{location}-{batch_range:?} fetched");
+                result.extend_from_slice(&batch_data);
+
+                for key in &chunk_batch {
+                    // Split the next chunk from the batch
+                    let data = if batch_data.len() < self.min_fetch_size as usize {
+                        batch_data.clone()
+                    } else {
+                        batch_data.split_to(self.min_fetch_size as usize)
+                    };
+
+                    self.cache_chunk_data(key.clone(), data).await;
+                }
+
+                chunk_batch = vec![];
             }
-            CacheValue::Memory(data) => {
-                debug!("Cache value for ({path:?}, {chunk}) fetched from memory");
-                Ok(data.as_ref().clone())
+
+            // Finally append the current chunk data (if not included in the batch above).
+            if let Some(data) = chunk_data {
+                result.extend_from_slice(&data);
             }
         }
+
+        Ok(result.into())
+    }
+
+    async fn cache_chunk_data(&self, key: CacheKey, data: Bytes) {
+        // Cache the memory value
+        let entry = self
+            .cache
+            .entry_by_ref(&key)
+            .or_insert(CacheValue::Memory(data.clone()))
+            .await;
+
+        // Finally trigger persisting to disk
+        if entry.is_fresh() {
+            let cache = self.cache.clone();
+            let file_manager = self.file_manager.clone();
+            tokio::spawn(async move {
+                // Run pending tasks to avert eviction races.
+                cache.run_pending_tasks().await;
+                let size = data.len();
+                match file_manager.write_file(&key, data).await {
+                    Ok(path) => {
+                        // Write task completed successfully, replace the in-memory cache entry
+                        // with the file-pointer one.
+                        debug!("Upserting file pointer for {key:?} into the cache");
+                        let value = CacheValue::File(path, size);
+                        cache.insert(key, value).await;
+                    }
+                    Err(err) => {
+                        // Write task failed, remove the cache entry; we could also defer that to
+                        // TTL/LRU eviction, but then we risk ballooning the memory usage.
+ warn!("Invalidating cache entry for {key:?}; failed writing to a file: {err}"); + cache.invalidate(&key).await; + } + } + }); + } } } @@ -362,6 +417,7 @@ impl ObjectStore for CachingObjectStore { location: &object_store::path::Path, range: Range, ) -> object_store::Result { + debug!("{location}-{range:?} get_range"); // Expand the range to the next max_fetch_size (+ alignment) let start_chunk = range.start / self.min_fetch_size as usize; // The final chunk to fetch (inclusively). E.g. with min_fetch_size = 16: @@ -369,43 +425,16 @@ impl ObjectStore for CachingObjectStore { // - range.end == 65 (get bytes 0..64 exclusive) -> final chunk is 64 / 16 = 4 (64..72 exclusive) let end_chunk = (range.end - 1) / self.min_fetch_size as usize; - let mut result = Vec::with_capacity(range.end - range.start); - - for chunk_num in start_chunk..(end_chunk + 1) { - let mut data = self.get_chunk(location, chunk_num as u32).await?; - let data_len = data.len(); - - let buf_start = if chunk_num == start_chunk { - let buf_start = range.start % self.min_fetch_size as usize; - data.advance(buf_start); - buf_start - } else { - 0usize - }; + let mut data = self + .get_chunk_range(location, start_chunk, end_chunk) + .await?; - let buf_end = if chunk_num == end_chunk { - let buf_end = range.end % self.min_fetch_size as usize; - - // if min_fetch_size = 16 and buf_end = 64, we want to load everything - // from the final buffer, instead of 0. - if buf_end != 0 { - buf_end - } else { - self.min_fetch_size as usize - } - } else { - data_len - }; - - debug!( - "Read {} bytes from the buffer for chunk {}, slicing out {}..{}", - data_len, chunk_num, buf_start, buf_end - ); - - result.put(data.take(buf_end - buf_start)); - } - - Ok(result.into()) + // Finally trim away the expanded range from the chunks that are outside the requested range + let offset = range.start - start_chunk * self.min_fetch_size as usize; + data.advance(offset); + data.truncate(range.end - range.start); + debug!("{location}-{range:?} return"); + Ok(data) } async fn head( @@ -614,8 +643,8 @@ mod tests { assert_eq!(bytes, body[25..64]); store.cache.run_pending_tasks().await; - // Mock has had 3 requests - assert_eq!(server.received_requests().await.unwrap().len(), 3); + // Mock has had 1 request that coalesced 3 chunks + assert_eq!(server.received_requests().await.unwrap().len(), 1); assert_eq!(store.cache.entry_count(), 3); let on_disk_keys = wait_all_ranges_on_disk(HashSet::new(), &store).await; @@ -630,7 +659,7 @@ mod tests { store.cache.run_pending_tasks().await; // No extra requests - assert_eq!(server.received_requests().await.unwrap().len(), 3); + assert_eq!(server.received_requests().await.unwrap().len(), 1); assert_eq!(store.cache.entry_count(), 3); assert_ranges_in_cache(&store.base_path, &url, vec![1, 2, 3]); @@ -643,7 +672,7 @@ mod tests { store.cache.run_pending_tasks().await; // One extra request to fetch chunk 4 - assert_eq!(server.received_requests().await.unwrap().len(), 4); + assert_eq!(server.received_requests().await.unwrap().len(), 2); assert_eq!(store.cache.entry_count(), 4); let mut on_disk_keys = wait_all_ranges_on_disk(on_disk_keys, &store).await; @@ -657,7 +686,7 @@ mod tests { assert_eq!(bytes, body[80..85]); store.cache.run_pending_tasks().await; - assert_eq!(server.received_requests().await.unwrap().len(), 5); + assert_eq!(server.received_requests().await.unwrap().len(), 3); assert_eq!(store.cache.entry_count(), 4); on_disk_keys.retain(|k| k.range.start >= 32); // The first chunk got LRU-evicted