
Commit 9bda3dc

[TST] More benchmark queries for regex (#4910)
## Description of changes

_Summarize the changes made by this PR._

- Improvements & Bug fixes
  - This PR adds more regex patterns to the benchmark. The benchmark also serves as an integration test for regex, since it compares index results against brute-force evaluation.
  - Updates a few dependencies. Verified that there should be no breaking changes.
  - Updates some wal3 tests because the fragment size changed after the dependency update. The existing fragments should remain compatible and the manifest should still be valid.
- New functionality
  - N/A

## Test plan

_How are these changes tested?_

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes

_Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent 981a93c commit 9bda3dc
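The description notes that the benchmark doubles as an integration test because index results are compared against brute-force evaluation. Below is a minimal sketch of that comparison idea using the `regex` crate; the `brute_force_matches` helper and the sample documents are illustrative assumptions, not code from this PR.

```rust
use regex::Regex;

/// Brute-force reference: scan every document and return the offsets of
/// those the pattern matches. Index results can be checked against this set.
fn brute_force_matches(docs: &[String], pattern: &str) -> Vec<usize> {
    let re = Regex::new(pattern).expect("pattern should compile");
    docs.iter()
        .enumerate()
        .filter(|(_, doc)| re.is_match(doc))
        .map(|(offset, _)| offset)
        .collect()
}

fn main() {
    let docs = vec![
        "pub fn parse(input: &str) -> Result<u32, String> { todo!() }".to_string(),
        "// TODO: remove this hack".to_string(),
    ];
    // Any candidate set produced by an index for this pattern can be
    // validated against the brute-force result.
    let expected = brute_force_matches(&docs, r"(?m)^\s*pub\s+fn\s+\w+");
    assert_eq!(expected, vec![0]);
}
```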

File tree

11 files changed, +647 -178 lines changed


Cargo.lock

Lines changed: 290 additions & 76 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 3 additions & 3 deletions
@@ -35,15 +35,15 @@ members = [
 ]
 
 [workspace.dependencies]
-arrow = "52.2.0"
+arrow = "55.1"
 async-trait = "0.1"
 axum = { version = "0.8", features = ["macros"] }
 bytes = "1.10"
 chrono = { version = "0.4", features = ["serde"] }
 clap = { version = "4", features = ["derive"] }
 criterion = { version = "0.5", features = ["async_tokio"] }
 figment = { version = "0.10.12", features = ["env", "yaml", "test"] }
-flatbuffers = "24.3.25"
+flatbuffers = "25.2.10"
 futures = "0.3"
 futures-core = "0.3"
 http-body-util = "0.1.3"
@@ -56,7 +56,7 @@ opentelemetry-otlp = { version = "0.27", features = ["http-proto"] }
 opentelemetry-http = { version = "0.27", features = ["reqwest"] }
 opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
 parking_lot = { version = "0.12.3", features = ["serde"] }
-parquet = "52"
+parquet = { version = "55.1", features = ["async"] }
 pin-project = "1.1.10"
 prost = "0.13"
 prost-types = "0.13.5"

rust/benchmark/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -8,12 +8,14 @@ path = "src/lib.rs"
 
 [dependencies]
 anyhow = "1.0.93"
+arrow = { workspace = true }
 async-tempfile = "0.6.0"
 async-compression = { version = "0.4.18", features = ["tokio", "gzip", "bzip2"] }
 
 bincode = { workspace = true }
 criterion = { workspace = true }
 futures = { workspace = true }
+parquet = { workspace = true }
 rand = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
@@ -26,3 +28,4 @@ reqwest = { workspace = true, features = ["stream"] }
 tokio-stream = { version = "0.1.16", features = ["full"] }
 tokio-util = "0.7.12"
 bloom = "0.3.2"
+hf-hub = { version = "0.4.1", features = ["tokio"] }

rust/benchmark/src/datasets/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@ pub mod util;
 
 pub mod gist;
 pub mod ms_marco_queries;
+pub mod rust;
 pub mod scidocs;
 pub mod sift;
 pub mod wikipedia;
rust/benchmark/src/datasets/rust.rs

Lines changed: 45 additions & 11 deletions
@@ -1,18 +1,52 @@
-use anyhow::{anyhow, Result};
-use futures::io::BufReader;
-use tokio::{
-    fs::{File, ReadDir},
-    io::AsyncWriteExt,
-};
+use std::path::PathBuf;
 
-use super::util::get_or_populate_cached_dataset_file;
+use anyhow::{Context, Result};
+use arrow::array::AsArray;
+use futures::{stream, StreamExt, TryStreamExt};
+use hf_hub::api::tokio::Api;
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use tokio::fs::File;
 
-pub struct RustStack {
-    pub train: BufReader<File>,
+pub struct TheStackDedupRust {
+    pub shard_paths: Vec<PathBuf>,
 }
 
-impl RustStack {
+impl TheStackDedupRust {
     pub async fn init() -> Result<Self> {
-        let dir = read_dir("~/Desktop/rust-stack");
+        let mut shard_paths = Vec::new();
+        let api = Api::new()?;
+        let dataset = api.dataset("bigcode/the-stack-dedup".to_string());
+        for i in 0..21 {
+            let shard_path = format!("data/rust/data-{:05}-of-00021.parquet", i);
+            let local_path = dataset.get(&shard_path).await?;
+            shard_paths.push(local_path);
+        }
+
+        Ok(Self { shard_paths })
+    }
+
+    pub async fn documents(&self) -> Result<Vec<String>> {
+        let mut shard_streams = Vec::new();
+        for shard_path in &self.shard_paths {
+            let file = File::open(shard_path).await?;
+            let shard_stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
+            shard_streams.push(shard_stream);
+        }
+        let batches = stream::iter(shard_streams)
+            .flatten()
+            .try_collect::<Vec<_>>()
+            .await?;
+        let mut documents = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
+        for batch in batches {
+            documents.extend(
+                batch
+                    .column_by_name("content")
+                    .context("Inspecting content column")?
+                    .as_string::<i32>()
+                    .iter()
+                    .map(|doc| doc.unwrap_or_default().to_string()),
+            );
+        }
+        Ok(documents)
     }
 }
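For context, the benchmark later in this PR consumes the new dataset type roughly as follows. This is a hedged usage sketch (the `main` wrapper and printed message are assumptions for illustration); the actual call site is in `rust/index/benches/literal.rs`.

```rust
use chroma_benchmark::datasets::rust::TheStackDedupRust;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Resolve the 21 Rust parquet shards of bigcode/the-stack-dedup via hf-hub
    // (downloads are cached locally by hf-hub on subsequent runs).
    let dataset = TheStackDedupRust::init().await?;
    // Stream every shard and collect the `content` column as plain strings.
    let documents = dataset.documents().await?;
    println!("loaded {} Rust source files", documents.len());
    Ok(())
}
```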

rust/benchmark/src/datasets/util.rs

Lines changed: 4 additions & 4 deletions
@@ -25,17 +25,17 @@ async fn get_dataset_cache_path(
 
 /// Calls the populate callback to create a cached dataset file if it doesn't exist, and returns the path to the cached file.
 pub(crate) async fn get_or_populate_cached_dataset_file<F, Fut>(
-    dataset_name: &str,
-    file_name: &str,
+    dataset_name: impl AsRef<str>,
+    file_name: impl AsRef<str>,
     cache_dir: Option<PathBuf>,
     populate: F,
 ) -> Result<PathBuf>
 where
     F: FnOnce(Box<dyn AsyncWrite + Unpin + Send>) -> Fut,
     Fut: Future<Output = Result<()>>,
 {
-    let dataset_dir = get_dataset_cache_path(dataset_name, cache_dir).await?;
-    let file_path = dataset_dir.join(file_name);
+    let dataset_dir = get_dataset_cache_path(dataset_name.as_ref(), cache_dir).await?;
+    let file_path = dataset_dir.join(file_name.as_ref());
 
     if !file_path.exists() {
         // We assume that dataset creation was successful if the file exists, so we use a temporary file to avoid scenarios where the file is partially written and then the callback fails.
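The switch from `&str` to `impl AsRef<str>` lets callers pass either string slices or owned `String`s (for example, names built with `format!`) without an explicit borrow or `.as_str()`. A standalone sketch of the same pattern, using a made-up `cache_path` helper rather than the crate-private function above:

```rust
use std::path::{Path, PathBuf};

// Illustrates the signature style adopted above: `impl AsRef<str>` accepts
// both `&str` literals and `String`s produced at runtime.
fn cache_path(dataset_name: impl AsRef<str>, file_name: impl AsRef<str>) -> PathBuf {
    Path::new(dataset_name.as_ref()).join(file_name.as_ref())
}

fn main() {
    // A borrowed literal works as before...
    let fixed = cache_path("the-stack-dedup", "documents.bin");
    // ...and an owned String from format! now works without conversion.
    let shard = format!("data-{:05}-of-00021.parquet", 3);
    let dynamic = cache_path("the-stack-dedup", shard);
    println!("{} {}", fixed.display(), dynamic.display());
}
```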

rust/index/benches/literal.rs

Lines changed: 147 additions & 47 deletions
@@ -1,7 +1,7 @@
-use std::fs::{read_dir, File};
-
-use arrow::array::AsArray;
-use chroma_benchmark::benchmark::{bench_run, tokio_multi_thread};
+use chroma_benchmark::{
+    benchmark::{bench_run, tokio_multi_thread},
+    datasets::rust::TheStackDedupRust,
+};
 use chroma_blockstore::{
     arrow::provider::{ArrowBlockfileProvider, BlockfileReaderOptions},
     provider::BlockfileProvider,
@@ -12,56 +12,149 @@ use chroma_index::fulltext::types::{DocumentMutation, FullTextIndexReader, FullT
 use chroma_storage::{local::LocalStorage, Storage};
 use chroma_types::regex::{literal_expr::NgramLiteralProvider, ChromaRegex};
 use criterion::{criterion_group, criterion_main, Criterion};
-use indicatif::{ParallelProgressIterator, ProgressIterator};
-use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use tantivy::tokenizer::NgramTokenizer;
 use tempfile::tempdir;
 
 const BLOCK_SIZE: usize = 1 << 23;
-const MAX_CODE_LENGTH: usize = 1 << 12;
 
-const FTS_PATTERNS: &[&str] = &[r"unreachable", r"unsafe", r"use std::collections::HashMap;"];
+const FTS_PATTERNS: &[&str] = &[
+    r"std::ptr::",
+    r"env_logger::",
+    r"tracing::",
+    r"futures::",
+    r"tokio::",
+    r"async_std::",
+    r"crossbeam::",
+    r"atomic::",
+    r"mpsc::",
+    r"Some(",
+    r"Ok(",
+    r"Err(",
+    r"None",
+    r"unwrap()",
+    r"expect()",
+    r"clone()",
+    r"Box::new",
+    r"Rc::new",
+    r"RefCell::new",
+    r"debug!(",
+    r"error!(",
+    r"warn!(",
+    r"panic!(",
+    r"todo!(",
+    r"join!(",
+    r"select!(",
+    r"unimplemented!(",
+    r"std::mem::transmute",
+    r"std::ffi::",
+    r"thread::sleep",
+    r"std::fs::File::open",
+    r"std::net::TcpListener",
+    r"use serde::",
+    r"use rand::",
+    r"use tokio::",
+    r"use futures::",
+    r"use anyhow::",
+    r"use thiserror::",
+    r"use chrono::",
+    r"serde::Serialize",
+    r"serde::Deserialize",
+    r"regex::Regex::new",
+    r"chrono::DateTime",
+    r"uuid::Uuid::new_v4",
+    r"proc_macro::TokenStream",
+    r"assert_eq!(",
+    r"assert_ne!(",
+    r"#[allow(dead_code)]",
+    r"#[allow(unused)]",
+    r"#[allow(unused_variables)]",
+    r"#[allow(unused_mut)]",
+    r"#[allow",
+    r"#[deny",
+    r"#[warn",
+    r"#[cfg",
+    r"#[feature",
+    r"#[derive(",
+    r"#[proc_macro]",
+    r"#[proc_macro_derive(",
+    r"#[proc_macro_attribute]",
+    r"#[test]",
+    r"#[tokio::test]",
+    r"///",
+    r"//!",
+    r"test_",
+    r"_tmp",
+    r"_old",
+];
 const REGEX_PATTERNS: &[&str] = &[
-    r"\.collect::<.+>()",
-    r"(?i)(TODO|FIXME)",
-    r"!\[allow\(clippy::.+\)\]",
+    r"(?m)^\s*fn\s+\w+",
+    r"(?m)^\s*pub\s+fn\s+\w+",
+    r"(?m)^\s*async\s+fn\s+\w+",
+    r"(?m)^\s*pub\s+async\s+fn\s+\w+",
+    r"fn\s+\w+\s*\([^)]*\)\s*->\s*\w+",
+    r"fn\s+\w+\s*\([^)]*Result<[^>]+>",
+    r"fn\s+\w+\s*\([^)]*Option<[^>]+>",
+    r"(\w+)::(\w+)\(",
+    r"\w+\.\w+\(",
+    r"(?m)^\s*struct\s+\w+",
+    r"(?m)^\s*pub\s+struct\s+\w+",
+    r"(?m)^\s*enum\s+\w+",
+    r"(?m)^\s*pub\s+enum\s+\w+",
+    r"(?m)^\s*trait\s+\w+",
+    r"(?m)^\s*pub\s+trait\s+\w+",
+    r"impl\s+(\w+)\s+for\s+(\w+)",
+    r"impl\s+(\w+)",
+    r"impl\s*<.*>\s*\w+",
+    r"\bSelf::\w+\(",
+    r"(?m)^\s*unsafe\s+fn\s+",
+    r"(?m)^\s*unsafe\s+\{",
+    r"\bunsafe\b",
+    r"fn\s+\w+\s*<",
+    r"struct\s+\w+\s*<",
+    r"enum\s+\w+\s*<",
+    r"impl\s*<.*>",
+    r"<[A-Za-z, ]+>",
+    r"\b'\w+\b",
+    r"&'\w+",
+    r"<'\w+>",
+    r"for<'\w+>",
+    r"macro_rules!\s*\w+",
+    r"\w+!\s*\(",
+    r"\blog!\s*\(",
+    r"\bdbg!\s*\(",
+    r"\bprintln!\s*\(",
+    r"\bassert!\s*\(",
+    r"log::\w+\(",
+    r"Result<[^>]+>",
+    r"Option<[^>]+>",
+    r"match\s+\w+\s*\{",
+    r"mod\s+tests\s*\{",
+    r"async\s+fn\s+\w+",
+    r"await\s*;?",
+    r"std::thread::spawn",
+    r"tokio::spawn",
+    r"match\s+.+\s*\{",
+    r"if\s+let\s+Some\(",
+    r"while\s+let\s+Some\(",
+    r"//.*",
+    r"/\*.*?\*/",
+    r"//\s*TODO",
+    r"//\s*FIXME",
+    r"//\s*HACK",
+    r"unsafe\s*\{",
+    r"<'\w+,\s*'\w+>",
+    r"for<'\w+>",
+    r"&'\w+\s*\w+",
+    r"where\s+",
+    r"T:\s*\w+",
+    r"dyn\s+\w+",
+    r"Box<dyn\s+\w+>",
+    r"impl\s+Trait",
+    r"temp\w*",
+    r"foo|bar|baz",
+    r"let\s+mut\s+\w+",
 ];
 
-fn collect_rust_code() -> Vec<String> {
-    let files = read_dir("/Users/macronova/Desktop/rust-stack")
-        .expect("Directory should exist")
-        .collect::<Result<Vec<_>, _>>()
-        .expect("Files should be present");
-    let parquet_batches = files
-        .into_iter()
-        .progress()
-        .flat_map(|entry| {
-            ParquetRecordBatchReaderBuilder::try_new(
-                File::open(entry.path()).expect("File should be readable"),
-            )
-            .expect("Parquet file should be present")
-            .build()
-            .expect("Parquet file should be readable")
-        })
-        .collect::<Result<Vec<_>, _>>()
-        .expect("Parquet file should be valid");
-    parquet_batches
-        .into_par_iter()
-        .progress()
-        .flat_map(|batch| {
-            batch
-                .column_by_name("content")
-                .expect("Content column should be present")
-                .as_string::<i32>()
-                .iter()
-                .map(|os| os.unwrap_or_default().to_string())
-                .collect::<Vec<_>>()
-        })
-        .filter(|code| code.len() < MAX_CODE_LENGTH)
-        .collect()
-}
-
 async fn bench_fts_query((reader, pattern): (FullTextIndexReader<'_>, &str)) {
     reader
         .search(pattern)
@@ -78,7 +171,14 @@ async fn bench_literal_expr((reader, pattern): (FullTextIndexReader<'_>, ChromaR
 
 fn bench_literal(criterion: &mut Criterion) {
     let runtime = tokio_multi_thread();
-    let source_code_chunk = collect_rust_code();
+    let source_code_chunk = runtime.block_on(async {
+        TheStackDedupRust::init()
+            .await
+            .expect("the-stack-dedup-rust dataset should be initializable")
+            .documents()
+            .await
+            .expect("the dataset should contain documents")
+    });
 
     let temp_dir = tempdir().expect("Temporary directory should be creatable");
     let storage = Storage::Local(LocalStorage::new(
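Unlike the FTS_PATTERNS literals, many of the new REGEX_PATTERNS rely on multi-line anchors and character classes. A small hedged check of how two of them behave under the `regex` crate (this snippet is illustrative only and is not part of the benchmark):

```rust
use regex::Regex;

fn main() {
    let source = "\
impl<K, V> Cache<K, V> {
    pub async fn get(&self) -> Option<V> {
        unimplemented!()
    }
}
";
    // `(?m)` anchors `^` at every line start, so the indented declaration
    // `pub async fn get` is still found.
    let async_fn = Regex::new(r"(?m)^\s*pub\s+async\s+fn\s+\w+").unwrap();
    assert!(async_fn.is_match(source));

    // The generics pattern is a rough character-class approximation; here it
    // matches `<K, V>` twice and `<V>` once.
    let generics = Regex::new(r"<[A-Za-z, ]+>").unwrap();
    assert_eq!(generics.find_iter(source).count(), 3);
}
```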

rust/wal3/tests/test_k8s_integration_03_initialized_append_succeeds.rs

Lines changed: 2 additions & 2 deletions
@@ -47,12 +47,12 @@ async fn test_k8s_integration_03_initialized_append_succeeds() {
         seq_no: FragmentSeqNo(1),
         start: 1,
         limit: 2,
-        num_bytes: 1187,
+        num_bytes: 1044,
         data: vec![(position, vec![42, 43, 44, 45])],
     };
     let postconditions = [
         Condition::Manifest(ManifestCondition {
-            acc_bytes: 1187,
+            acc_bytes: 1044,
             writer: "test writer".to_string(),
             snapshots: vec![],
             fragments: vec![fragment1.clone()],
