Skip to content

Commit

Permalink
Add annotations to ensure that readers are Sync. (#56)
Browse files Browse the repository at this point in the history
* Add annotations to ensure that readers are Sync.
* Update lz4 version constraint.
  • Loading branch information
macklin-10x authored Jan 7, 2025
1 parent 54f2453 commit f891dfd
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ full-test = []
[dependencies]
bincode = "1.3"
byteorder = "1.3.0"
lz4 = "1"
lz4 = ">=1.28.1"
anyhow = "1"
min-max-heap = "1.2.2"
serde = { version = "1.0", features = ["derive"] }
Expand Down
12 changes: 11 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,16 @@ where
}
}

/// Ensure that readers are Sync.
#[allow(dead_code)]
const fn assert_readers_are_sync() {
const fn takes_sync<T: Sync>() {}
takes_sync::<ShardReader<usize, DefaultSort>>();
takes_sync::<RangeIter<'static, usize, DefaultSort>>();
takes_sync::<ShardIter<'static, usize, DefaultSort>>();
takes_sync::<MergeIterator<'static, usize, DefaultSort>>();
}

#[cfg(test)]
mod shard_tests {
use super::*;
Expand Down Expand Up @@ -1658,7 +1668,7 @@ mod shard_tests {
where
T: 'static + Serialize + DeserializeOwned + Clone + Send + Eq + Debug + Hash,
S: SortKey<T>,
<S as SortKey<T>>::Key: 'static + Send + Serialize + DeserializeOwned,
<S as SortKey<T>>::Key: 'static + Send + Sync + Serialize + DeserializeOwned,
{
let mut files = Vec::new();

Expand Down
38 changes: 17 additions & 21 deletions src/unsorted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ where
{
/// Iterator over the shard files in the set, opening readers.
shard_reader_iter:
Box<dyn Iterator<Item = Result<UnsortedShardFileReader<T, S>, Error>> + Send>,
Box<dyn Iterator<Item = Result<UnsortedShardFileReader<T, S>, Error>> + Send + Sync>,

/// Iterator over the current shard file.
active_shard_reader: Option<UnsortedShardFileReader<T, S>>,
Expand All @@ -42,22 +42,16 @@ where
impl<T, S> UnsortedShardReader<T, S>
where
T: DeserializeOwned,
<S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned + Send,
<S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned + Send + Sync + 'static,
S: SortKey<T>,
{
/// Open a single shard file.
pub fn open<P: AsRef<Path>>(shard_file: P) -> Self
where
<S as SortKey<T>>::Key: 'static,
{
pub fn open<P: AsRef<Path>>(shard_file: P) -> Self {
Self::open_set(&[shard_file])
}

/// Open a set of shard files.
pub fn open_set<P: AsRef<Path>>(shard_files: &[P]) -> Self
where
<S as SortKey<T>>::Key: 'static,
{
pub fn open_set<P: AsRef<Path>>(shard_files: &[P]) -> Self {
let reader_iter = shard_files
.iter()
.map(|f| f.as_ref().into())
Expand All @@ -71,10 +65,7 @@ where
}

/// Compute the total number of elements in this set of shard files.
pub fn len<P: AsRef<Path>>(shard_files: &[P]) -> Result<usize, Error>
where
<S as SortKey<T>>::Key: 'static,
{
pub fn len<P: AsRef<Path>>(shard_files: &[P]) -> Result<usize, Error> {
// Create a set reader, and consume all the files, just getting counts.
let files_reader = Self::open_set(shard_files);
let mut count = 0;
Expand Down Expand Up @@ -138,7 +129,7 @@ where
impl<T, S> Iterator for UnsortedShardReader<T, S>
where
T: DeserializeOwned,
<S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned + Send,
<S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned + Send + Sync + 'static,
S: SortKey<T>,
{
type Item = Result<T, Error>;
Expand Down Expand Up @@ -178,23 +169,20 @@ where
S: SortKey<T>,
{
count: usize,
file_index_iter: Box<dyn Iterator<Item = KeylessShardRecord> + Send>,
file_index_iter: Box<dyn Iterator<Item = KeylessShardRecord> + Send + Sync>,
shard_iter: Option<UnsortedShardIter<T>>,
phantom: PhantomData<S>,
}

impl<T, S> UnsortedShardFileReader<T, S>
where
T: DeserializeOwned,
<S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned + Send,
<S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned + Send + Sync + 'static,
S: SortKey<T>,
{
/// Create a unsorted reader for a single shard file.
/// Return Ok(None) if the specified shard file is empty.
pub fn new(path: &Path) -> Result<Option<Self>, Error>
where
<S as SortKey<T>>::Key: 'static,
{
pub fn new(path: &Path) -> Result<Option<Self>, Error> {
let reader = ShardReaderSingle::<T, S>::open(path)?;
let count = reader.len();
let mut file_index_iter = reader.index.into_iter().map(|r| KeylessShardRecord {
Expand Down Expand Up @@ -348,3 +336,11 @@ struct SkipResult {
skipped: usize,
exhausted: bool,
}

/// Ensure that readers are Sync.
#[allow(dead_code)]
const fn assert_readers_are_sync() {
const fn takes_sync<T: Sync>() {}
takes_sync::<UnsortedShardFileReader<usize, DefaultSort>>();
takes_sync::<UnsortedShardReader<usize, DefaultSort>>();
}

0 comments on commit f891dfd

Please sign in to comment.