diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index 8f8a3df057..67cd401284 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -112,12 +112,24 @@ impl AggregationWithAccessor { fallback_type, )? } - Average(AverageAggregation { field: field_name }) - | Count(CountAggregation { field: field_name }) - | Max(MaxAggregation { field: field_name }) - | Min(MinAggregation { field: field_name }) - | Stats(StatsAggregation { field: field_name }) - | Sum(SumAggregation { field: field_name }) => { + Average(AverageAggregation { + field: field_name, .. + }) + | Count(CountAggregation { + field: field_name, .. + }) + | Max(MaxAggregation { + field: field_name, .. + }) + | Min(MinAggregation { + field: field_name, .. + }) + | Stats(StatsAggregation { + field: field_name, .. + }) + | Sum(SumAggregation { + field: field_name, .. + }) => { let (accessor, field_type) = get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?; diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 66db88f61e..5092d7d3a2 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -1455,6 +1455,47 @@ mod tests { Ok(()) } + #[test] + fn terms_empty_json() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + // => Segment with empty json + index_writer.add_document(doc!()).unwrap(); + index_writer.commit().unwrap(); + // => Segment with json, but no field partially_empty + index_writer + .add_document(doc!(json => json!({"different_field": "blue"}))) + .unwrap(); + index_writer.commit().unwrap(); + //// => Segment with field partially_empty + index_writer + .add_document(doc!(json => json!({"partially_empty": "blue"}))) + .unwrap(); + index_writer.add_document(doc!())?; + index_writer.commit().unwrap(); + + let agg_req: Aggregations = serde_json::from_value(json!({ + "my_texts": { + "terms": { + "field": "json.partially_empty" + }, + } + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + assert_eq!(res["my_texts"]["buckets"][0]["key"], "blue"); + assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 1); + assert_eq!(res["my_texts"]["buckets"][1], serde_json::Value::Null); + assert_eq!(res["my_texts"]["sum_other_doc_count"], 0); + assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0); + + Ok(()) + } #[test] fn terms_aggregation_bytes() -> crate::Result<()> { @@ -1492,6 +1533,7 @@ mod tests { Ok(()) } + #[test] fn terms_aggregation_missing_multi_value() -> crate::Result<()> { let mut schema_builder = Schema::builder(); diff --git a/src/aggregation/metric/average.rs b/src/aggregation/metric/average.rs index ca8770a09b..ed13c53fba 100644 --- a/src/aggregation/metric/average.rs +++ b/src/aggregation/metric/average.rs @@ -20,12 +20,21 @@ use super::{IntermediateStats, SegmentStatsCollector}; pub struct AverageAggregation { /// The field name to compute the average on. pub field: String, + /// The missing parameter defines how documents that are missing a value should be treated. + /// By default they will be ignored but it is also possible to treat them as if they had a + /// value. Examples in JSON format: + /// { "field": "my_numbers", "missing": "10.0" } + #[serde(skip_serializing_if = "Option::is_none", default)] + pub missing: Option, } impl AverageAggregation { /// Creates a new [`AverageAggregation`] instance from a field name. pub fn from_field_name(field_name: String) -> Self { - Self { field: field_name } + Self { + field: field_name, + missing: None, + } } /// Returns the field name the aggregation is computed on. pub fn field_name(&self) -> &str { diff --git a/src/aggregation/metric/count.rs b/src/aggregation/metric/count.rs index 7a1c2a9b5d..ca685d6670 100644 --- a/src/aggregation/metric/count.rs +++ b/src/aggregation/metric/count.rs @@ -20,12 +20,21 @@ use super::{IntermediateStats, SegmentStatsCollector}; pub struct CountAggregation { /// The field name to compute the count on. pub field: String, + /// The missing parameter defines how documents that are missing a value should be treated. + /// By default they will be ignored but it is also possible to treat them as if they had a + /// value. Examples in JSON format: + /// { "field": "my_numbers", "missing": "10.0" } + #[serde(skip_serializing_if = "Option::is_none", default)] + pub missing: Option, } impl CountAggregation { /// Creates a new [`CountAggregation`] instance from a field name. pub fn from_field_name(field_name: String) -> Self { - Self { field: field_name } + Self { + field: field_name, + missing: None, + } } /// Returns the field name the aggregation is computed on. pub fn field_name(&self) -> &str { diff --git a/src/aggregation/metric/max.rs b/src/aggregation/metric/max.rs index bf579b6535..674972f519 100644 --- a/src/aggregation/metric/max.rs +++ b/src/aggregation/metric/max.rs @@ -20,12 +20,21 @@ use super::{IntermediateStats, SegmentStatsCollector}; pub struct MaxAggregation { /// The field name to compute the maximum on. pub field: String, + /// The missing parameter defines how documents that are missing a value should be treated. + /// By default they will be ignored but it is also possible to treat them as if they had a + /// value. Examples in JSON format: + /// { "field": "my_numbers", "missing": "10.0" } + #[serde(skip_serializing_if = "Option::is_none", default)] + pub missing: Option, } impl MaxAggregation { /// Creates a new [`MaxAggregation`] instance from a field name. pub fn from_field_name(field_name: String) -> Self { - Self { field: field_name } + Self { + field: field_name, + missing: None, + } } /// Returns the field name the aggregation is computed on. pub fn field_name(&self) -> &str { @@ -56,3 +65,55 @@ impl IntermediateMax { self.stats.finalize().max } } + +#[cfg(test)] +mod tests { + use crate::aggregation::agg_req::Aggregations; + use crate::aggregation::tests::exec_request_with_query; + use crate::schema::{Schema, FAST}; + use crate::Index; + + #[test] + fn test_max_agg_with_missing() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + // => Segment with empty json + index_writer.add_document(doc!()).unwrap(); + index_writer.commit().unwrap(); + // => Segment with json, but no field partially_empty + index_writer + .add_document(doc!(json => json!({"different_field": "blue"}))) + .unwrap(); + index_writer.commit().unwrap(); + //// => Segment with field partially_empty + index_writer + .add_document(doc!(json => json!({"partially_empty": 10.0}))) + .unwrap(); + index_writer.add_document(doc!())?; + index_writer.commit().unwrap(); + + let agg_req: Aggregations = serde_json::from_value(json!({ + "my_stats": { + "max": { + "field": "json.partially_empty", + "missing": 100.0, + } + } + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + assert_eq!( + res["my_stats"], + json!({ + "value": 100.0, + }) + ); + + Ok(()) + } +} diff --git a/src/aggregation/metric/min.rs b/src/aggregation/metric/min.rs index 395c214f21..562e865d38 100644 --- a/src/aggregation/metric/min.rs +++ b/src/aggregation/metric/min.rs @@ -20,12 +20,21 @@ use super::{IntermediateStats, SegmentStatsCollector}; pub struct MinAggregation { /// The field name to compute the minimum on. pub field: String, + /// The missing parameter defines how documents that are missing a value should be treated. + /// By default they will be ignored but it is also possible to treat them as if they had a + /// value. Examples in JSON format: + /// { "field": "my_numbers", "missing": "10.0" } + #[serde(skip_serializing_if = "Option::is_none", default)] + pub missing: Option, } impl MinAggregation { /// Creates a new [`MinAggregation`] instance from a field name. pub fn from_field_name(field_name: String) -> Self { - Self { field: field_name } + Self { + field: field_name, + missing: None, + } } /// Returns the field name the aggregation is computed on. pub fn field_name(&self) -> &str { diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index 2011d5be2b..022e40ec4e 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -5,11 +5,11 @@ use super::*; use crate::aggregation::agg_req_with_accessor::{ AggregationWithAccessor, AggregationsWithAccessor, }; -use crate::aggregation::f64_from_fastfield_u64; use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResult, IntermediateAggregationResults, IntermediateMetricResult, }; use crate::aggregation::segment_agg_result::SegmentAggregationCollector; +use crate::aggregation::{f64_from_fastfield_u64, f64_to_fastfield_u64}; use crate::{DocId, TantivyError}; /// A multi-value metric aggregation that computes a collection of statistics on numeric values that @@ -29,12 +29,21 @@ use crate::{DocId, TantivyError}; pub struct StatsAggregation { /// The field name to compute the stats on. pub field: String, + /// The missing parameter defines how documents that are missing a value should be treated. + /// By default they will be ignored but it is also possible to treat them as if they had a + /// value. Examples in JSON format: + /// { "field": "my_numbers", "missing": "10.0" } + #[serde(skip_serializing_if = "Option::is_none", default)] + pub missing: Option, } impl StatsAggregation { /// Creates a new [`StatsAggregation`] instance from a field name. pub fn from_field_name(field_name: String) -> Self { - StatsAggregation { field: field_name } + StatsAggregation { + field: field_name, + missing: None, + } } /// Returns the field name the aggregation is computed on. pub fn field_name(&self) -> &str { @@ -153,6 +162,7 @@ pub(crate) enum SegmentStatsType { #[derive(Clone, Debug, PartialEq)] pub(crate) struct SegmentStatsCollector { + missing: Option, field_type: ColumnType, pub(crate) collecting_for: SegmentStatsType, pub(crate) stats: IntermediateStats, @@ -165,12 +175,15 @@ impl SegmentStatsCollector { field_type: ColumnType, collecting_for: SegmentStatsType, accessor_idx: usize, + missing: Option, ) -> Self { + let missing = missing.and_then(|val| f64_to_fastfield_u64(val, &field_type)); Self { field_type, collecting_for, stats: IntermediateStats::default(), accessor_idx, + missing, val_cache: Default::default(), } } @@ -180,10 +193,17 @@ impl SegmentStatsCollector { docs: &[DocId], agg_accessor: &mut AggregationWithAccessor, ) { - agg_accessor - .column_block_accessor - .fetch_block(docs, &agg_accessor.accessor); - + if let Some(missing) = self.missing.as_ref() { + agg_accessor.column_block_accessor.fetch_block_with_missing( + docs, + &agg_accessor.accessor, + *missing, + ); + } else { + agg_accessor + .column_block_accessor + .fetch_block(docs, &agg_accessor.accessor); + } for val in agg_accessor.column_block_accessor.iter_vals() { let val1 = f64_from_fastfield_u64(val, &self.field_type); self.stats.collect(val1); @@ -262,11 +282,13 @@ mod tests { use crate::aggregation::agg_req::{Aggregation, Aggregations}; use crate::aggregation::agg_result::AggregationResults; - use crate::aggregation::tests::{get_test_index_2_segments, get_test_index_from_values}; + use crate::aggregation::tests::{ + exec_request_with_query, get_test_index_2_segments, get_test_index_from_values, + }; use crate::aggregation::AggregationCollector; use crate::query::{AllQuery, TermQuery}; - use crate::schema::IndexRecordOption; - use crate::Term; + use crate::schema::{IndexRecordOption, Schema, FAST}; + use crate::{Index, Term}; #[test] fn test_aggregation_stats_empty_index() -> crate::Result<()> { @@ -453,4 +475,99 @@ mod tests { Ok(()) } + + #[test] + fn test_stats_json() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + // => Segment with empty json + index_writer.add_document(doc!()).unwrap(); + index_writer.commit().unwrap(); + // => Segment with json, but no field partially_empty + index_writer + .add_document(doc!(json => json!({"different_field": "blue"}))) + .unwrap(); + index_writer.commit().unwrap(); + //// => Segment with field partially_empty + index_writer + .add_document(doc!(json => json!({"partially_empty": 10.0}))) + .unwrap(); + index_writer.add_document(doc!())?; + index_writer.commit().unwrap(); + + let agg_req: Aggregations = serde_json::from_value(json!({ + "my_stats": { + "stats": { + "field": "json.partially_empty" + }, + } + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + assert_eq!( + res["my_stats"], + json!({ + "avg": 10.0, + "count": 1, + "max": 10.0, + "min": 10.0, + "sum": 10.0 + }) + ); + + Ok(()) + } + + #[test] + fn test_stats_json_missing() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + // => Segment with empty json + index_writer.add_document(doc!()).unwrap(); + index_writer.commit().unwrap(); + // => Segment with json, but no field partially_empty + index_writer + .add_document(doc!(json => json!({"different_field": "blue"}))) + .unwrap(); + index_writer.commit().unwrap(); + //// => Segment with field partially_empty + index_writer + .add_document(doc!(json => json!({"partially_empty": 10.0}))) + .unwrap(); + index_writer.add_document(doc!())?; + index_writer.commit().unwrap(); + + let agg_req: Aggregations = serde_json::from_value(json!({ + "my_stats": { + "stats": { + "field": "json.partially_empty", + "missing": 0.0 + }, + } + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + assert_eq!( + res["my_stats"], + json!({ + "avg": 2.5, + "count": 4, + "max": 10.0, + "min": 0.0, + "sum": 10.0 + }) + ); + + Ok(()) + } } diff --git a/src/aggregation/metric/sum.rs b/src/aggregation/metric/sum.rs index 91d58a2ec2..f63c144be8 100644 --- a/src/aggregation/metric/sum.rs +++ b/src/aggregation/metric/sum.rs @@ -20,12 +20,21 @@ use super::{IntermediateStats, SegmentStatsCollector}; pub struct SumAggregation { /// The field name to compute the minimum on. pub field: String, + /// The missing parameter defines how documents that are missing a value should be treated. + /// By default they will be ignored but it is also possible to treat them as if they had a + /// value. Examples in JSON format: + /// { "field": "my_numbers", "missing": "10.0" } + #[serde(skip_serializing_if = "Option::is_none", default)] + pub missing: Option, } impl SumAggregation { /// Creates a new [`SumAggregation`] instance from a field name. pub fn from_field_name(field_name: String) -> Self { - Self { field: field_name } + Self { + field: field_name, + missing: None, + } } /// Returns the field name the aggregation is computed on. pub fn field_name(&self) -> &str { diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index e5be68dc61..853779689f 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -105,35 +105,43 @@ pub(crate) fn build_single_agg_segment_collector( req.field_type, accessor_idx, )?)), - Average(AverageAggregation { .. }) => Ok(Box::new(SegmentStatsCollector::from_req( - req.field_type, - SegmentStatsType::Average, - accessor_idx, - ))), - Count(CountAggregation { .. }) => Ok(Box::new(SegmentStatsCollector::from_req( + Average(AverageAggregation { missing, .. }) => { + Ok(Box::new(SegmentStatsCollector::from_req( + req.field_type, + SegmentStatsType::Average, + accessor_idx, + *missing, + ))) + } + Count(CountAggregation { missing, .. }) => Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, SegmentStatsType::Count, accessor_idx, + *missing, ))), - Max(MaxAggregation { .. }) => Ok(Box::new(SegmentStatsCollector::from_req( + Max(MaxAggregation { missing, .. }) => Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, SegmentStatsType::Max, accessor_idx, + *missing, ))), - Min(MinAggregation { .. }) => Ok(Box::new(SegmentStatsCollector::from_req( + Min(MinAggregation { missing, .. }) => Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, SegmentStatsType::Min, accessor_idx, + *missing, ))), - Stats(StatsAggregation { .. }) => Ok(Box::new(SegmentStatsCollector::from_req( + Stats(StatsAggregation { missing, .. }) => Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, SegmentStatsType::Stats, accessor_idx, + *missing, ))), - Sum(SumAggregation { .. }) => Ok(Box::new(SegmentStatsCollector::from_req( + Sum(SumAggregation { missing, .. }) => Ok(Box::new(SegmentStatsCollector::from_req( req.field_type, SegmentStatsType::Sum, accessor_idx, + *missing, ))), Percentiles(percentiles_req) => Ok(Box::new( SegmentPercentilesCollector::from_req_and_validate(