Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add missing parameter for percentiles #2157

Merged
merged 1 commit into from
Aug 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 143 additions & 8 deletions src/aggregation/metric/percentiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateMetricResult,
};
use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
use crate::aggregation::{f64_from_fastfield_u64, AggregationError};
use crate::aggregation::{f64_from_fastfield_u64, f64_to_fastfield_u64, AggregationError};
use crate::{DocId, TantivyError};

/// # Percentiles
Expand Down Expand Up @@ -134,6 +134,7 @@
pub(crate) percentiles: PercentilesCollector,
pub(crate) accessor_idx: usize,
val_cache: Vec<u64>,
missing: Option<u64>,
}

#[derive(Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -234,11 +235,16 @@
accessor_idx: usize,
) -> crate::Result<Self> {
req.validate()?;
let missing = req
.missing
.and_then(|val| f64_to_fastfield_u64(val, &field_type));

Ok(Self {
field_type,
percentiles: PercentilesCollector::new(),
accessor_idx,
val_cache: Default::default(),
missing,
})
}
#[inline]
Expand All @@ -247,9 +253,17 @@
docs: &[DocId],
agg_accessor: &mut AggregationWithAccessor,
) {
agg_accessor
.column_block_accessor
.fetch_block(docs, &agg_accessor.accessor);
if let Some(missing) = self.missing.as_ref() {
agg_accessor.column_block_accessor.fetch_block_with_missing(
docs,
&agg_accessor.accessor,
*missing,
);
} else {
agg_accessor
.column_block_accessor
.fetch_block(docs, &agg_accessor.accessor);
}

for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
Expand Down Expand Up @@ -284,9 +298,22 @@
) -> crate::Result<()> {
let field = &agg_with_accessor.aggs.values[self.accessor_idx].accessor;

for val in field.values_for_doc(doc) {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.percentiles.collect(val1);
if let Some(missing) = self.missing {
let mut has_val = false;
for val in field.values_for_doc(doc) {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.percentiles.collect(val1);
has_val = true;
}
if !has_val {
self.percentiles
.collect(f64_from_fastfield_u64(missing, &self.field_type));
}
} else {
for val in field.values_for_doc(doc) {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.percentiles.collect(val1);
}
}

Ok(())
Expand Down Expand Up @@ -316,10 +343,12 @@
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::agg_result::AggregationResults;
use crate::aggregation::tests::{
get_test_index_from_values, get_test_index_from_values_and_terms,
exec_request_with_query, get_test_index_from_values, get_test_index_from_values_and_terms,
};
use crate::aggregation::AggregationCollector;
use crate::query::AllQuery;
use crate::schema::{Schema, FAST};
use crate::Index;

#[test]
fn test_aggregation_percentiles_empty_index() -> crate::Result<()> {
Expand Down Expand Up @@ -470,7 +499,7 @@

fn test_aggregation_percentiles(merge_segments: bool) -> crate::Result<()> {
use rand_distr::Distribution;
let num_values_in_segment = vec![100, 30_000, 8000];

Check warning on line 502 in src/aggregation/metric/percentiles.rs

View workflow job for this annotation

GitHub Actions / clippy

useless use of `vec!`

warning: useless use of `vec!` --> src/aggregation/metric/percentiles.rs:502:37 | 502 | let num_values_in_segment = vec![100, 30_000, 8000]; | ^^^^^^^^^^^^^^^^^^^^^^^ help: you can use an array directly: `[100, 30_000, 8000]` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_vec = note: `#[warn(clippy::useless_vec)]` on by default
let lg_norm = rand_distr::LogNormal::new(2.996f64, 0.979f64).unwrap();
let mut rng = StdRng::from_seed([1u8; 32]);

Expand Down Expand Up @@ -552,4 +581,110 @@

Ok(())
}

#[test]
fn test_percentiles_missing_sub_agg() -> crate::Result<()> {
// This test verifies the `collect` method (in contrast to `collect_block`), which is
// called when the sub-aggregations are flushed.
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("texts", FAST);
let score_field_f64 = schema_builder.add_f64_field("score", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);

{
let mut index_writer = index.writer_for_tests()?;
// writing the segment
index_writer.add_document(doc!(
score_field_f64 => 10.0f64,
text_field => "a"
))?;
index_writer.add_document(doc!(
score_field_f64 => 10.0f64,
text_field => "a"
))?;

index_writer.add_document(doc!(text_field => "a"))?;

index_writer.commit()?;
}

let agg_req: Aggregations = {
serde_json::from_value(json!({
"range_with_stats": {
"terms": {
"field": "texts"
},
"aggs": {
"percentiles": {
"percentiles": {
"field": "score",
"missing": 5.0
}
}
}
}
}))
.unwrap()
};

let res = exec_request_with_query(agg_req, &index, None)?;
assert_eq!(res["range_with_stats"]["buckets"][0]["doc_count"], 3);

assert_eq!(
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["1.0"],
5.0028295751107414
);
assert_eq!(
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["99.0"],
10.07469668951144
);

Ok(())
}

#[test]
fn test_percentiles_missing() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("texts", FAST);
let score_field_f64 = schema_builder.add_f64_field("score", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);

{
let mut index_writer = index.writer_for_tests()?;
// writing the segment
index_writer.add_document(doc!(
score_field_f64 => 10.0f64,
text_field => "a"
))?;
index_writer.add_document(doc!(
score_field_f64 => 10.0f64,
text_field => "a"
))?;

index_writer.add_document(doc!(text_field => "a"))?;

index_writer.commit()?;
}

let agg_req: Aggregations = {
serde_json::from_value(json!({
"percentiles": {
"percentiles": {
"field": "score",
"missing": 5.0
}
}
}))
.unwrap()
};

let res = exec_request_with_query(agg_req, &index, None)?;

assert_eq!(res["percentiles"]["values"]["1.0"], 5.0028295751107414);
assert_eq!(res["percentiles"]["values"]["99.0"], 10.07469668951144);

Ok(())
}
}
Loading