Skip to content

Commit 92084f7

Browse files
authored
[BUG][wal3] Fix a double-load that leads to inconsistent views of the world. (#5781)
## Description of changes This fixes a bug with the following behavior: - Writer A creates empty log L_1 - Writer A calls copy(L_1) -> L_2 - Writer A reads the empty manifest in copy - Concurrenty, writer B appends to L_1 - Writer A re-reads the log and uses B's current offsets to populate its next-to-fill-in slots. The result is that when used in forking, the compaction cursor is now behind the next-to-fill-in-slot. Tests from Claude. ## Test plan CI + cargo test -p wal3 locally ## Migration plan N/A ## Observability plan N/A ## Documentation Changes N/A
1 parent 14bf86b commit 92084f7

File tree

4 files changed

+822
-13
lines changed

4 files changed

+822
-13
lines changed

rust/wal3/src/copy.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,18 @@ pub async fn copy(
1515
offset: LogPosition,
1616
target: String,
1717
) -> Result<(), Error> {
18-
let fragments = match reader.scan(offset, Limits::UNLIMITED).await {
19-
Ok(fragments) => fragments,
20-
Err(Error::UninitializedLog) => vec![],
21-
Err(err) => return Err(err),
22-
};
18+
let reference = reader
19+
.manifest()
20+
.await?
21+
.unwrap_or(Manifest::new_empty("zero-copy task"));
22+
let mut short_read = false;
23+
let fragments = reader
24+
.scan_with_cache(&reference, offset, Limits::UNLIMITED, &mut short_read)
25+
.await?;
26+
if short_read {
27+
tracing::error!("short_read in unlimited copy");
28+
return Err(Error::Internal);
29+
}
2330
if !fragments.is_empty() {
2431
let mut futures = vec![];
2532
for fragment in fragments.into_iter() {
@@ -63,10 +70,6 @@ pub async fn copy(
6370
};
6471
Manifest::initialize_from_manifest(options, storage, &target, manifest).await?;
6572
} else {
66-
let reference = reader
67-
.manifest()
68-
.await?
69-
.unwrap_or(Manifest::new_empty("zero-copy task"));
7073
let setsum = Setsum::default();
7174
let collected = Setsum::default();
7275
let acc_bytes = 0;

rust/wal3/src/reader.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,20 @@ impl LogReader {
132132
return Err(Error::UninitializedLog);
133133
};
134134
let mut short_read = false;
135-
self.scan_with_cache(manifest, from, limits, &mut short_read)
135+
self.scan_with_cache(&manifest, from, limits, &mut short_read)
136136
.await
137137
}
138138

139-
async fn scan_with_cache(
139+
/// Scan up to:
140+
/// 1. Up to, but not including, the offset of the log position. This makes it a half-open
141+
/// interval.
142+
/// 2. Up to, and including, the number of files to return.
143+
///
144+
/// This differs from scan in that it takes a loaded manifest.
145+
/// This differs from scan_from_manifest because it will load snapshots.
146+
pub async fn scan_with_cache(
140147
&self,
141-
manifest: Manifest,
148+
manifest: &Manifest,
142149
from: LogPosition,
143150
limits: Limits,
144151
short_read: &mut bool,
@@ -344,7 +351,7 @@ impl LogReader {
344351
let from = manifest.oldest_timestamp();
345352
let mut short_read = false;
346353
let fragments = self
347-
.scan_with_cache(manifest, from, limits, &mut short_read)
354+
.scan_with_cache(&manifest, from, limits, &mut short_read)
348355
.await
349356
.map_err(|x| vec![x])?;
350357
let futures = fragments

0 commit comments

Comments
 (0)