diff --git a/Cargo.lock b/Cargo.lock index 10c3d90..4a970e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3036,7 +3036,7 @@ dependencies = [ [[package]] name = "pydf_io" version = "0.7.6" -source = "git+https://github.com/DataTreehouse/maplib?rev=d262865ba5bcd3f3612a1ae0709999f6ce002d62#d262865ba5bcd3f3612a1ae0709999f6ce002d62" +source = "git+https://github.com/DataTreehouse/maplib?rev=0b0c67e767cc1c67156bde273a1a717b93c30a42#0b0c67e767cc1c67156bde273a1a717b93c30a42" dependencies = [ "polars", "polars-core", @@ -3113,7 +3113,7 @@ dependencies = [ [[package]] name = "query_processing" version = "0.3.12" -source = "git+https://github.com/DataTreehouse/maplib?rev=d262865ba5bcd3f3612a1ae0709999f6ce002d62#d262865ba5bcd3f3612a1ae0709999f6ce002d62" +source = "git+https://github.com/DataTreehouse/maplib?rev=0b0c67e767cc1c67156bde273a1a717b93c30a42#0b0c67e767cc1c67156bde273a1a717b93c30a42" dependencies = [ "log", "oxrdf", @@ -3341,7 +3341,7 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "representation" version = "0.6.10" -source = "git+https://github.com/DataTreehouse/maplib?rev=d262865ba5bcd3f3612a1ae0709999f6ce002d62#d262865ba5bcd3f3612a1ae0709999f6ce002d62" +source = "git+https://github.com/DataTreehouse/maplib?rev=0b0c67e767cc1c67156bde273a1a717b93c30a42#0b0c67e767cc1c67156bde273a1a717b93c30a42" dependencies = [ "chrono", "chrono-tz 0.10.0", @@ -3746,7 +3746,7 @@ dependencies = [ [[package]] name = "spargebra" version = "0.3.0-alpha.5-parparse" -source = "git+https://github.com/DataTreehouse/maplib?rev=d262865ba5bcd3f3612a1ae0709999f6ce002d62#d262865ba5bcd3f3612a1ae0709999f6ce002d62" +source = "git+https://github.com/DataTreehouse/maplib?rev=0b0c67e767cc1c67156bde273a1a717b93c30a42#0b0c67e767cc1c67156bde273a1a717b93c30a42" dependencies = [ "chrono", "fundu", @@ -3967,7 +3967,7 @@ dependencies = [ [[package]] name = "templates" version = "0.1.0" -source = "git+https://github.com/DataTreehouse/maplib?rev=d262865ba5bcd3f3612a1ae0709999f6ce002d62#d262865ba5bcd3f3612a1ae0709999f6ce002d62" +source = "git+https://github.com/DataTreehouse/maplib?rev=0b0c67e767cc1c67156bde273a1a717b93c30a42#0b0c67e767cc1c67156bde273a1a717b93c30a42" dependencies = [ "log", "nom", diff --git a/Cargo.toml b/Cargo.toml index 4edcbc3..02730db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,11 +16,11 @@ members = [ #pydf_io = { path = "../maplib/lib/pydf_io"} #representation = { path = "../maplib/lib/representation", features = ["rdf-star"]} #templates = { path = "../maplib/lib/templates"} -spargebra = { git = "https://github.com/DataTreehouse/maplib", rev="d262865ba5bcd3f3612a1ae0709999f6ce002d62", features = ["rdf-star"]} -query_processing = { git = "https://github.com/DataTreehouse/maplib", rev="d262865ba5bcd3f3612a1ae0709999f6ce002d62" } -pydf_io = { git = "https://github.com/DataTreehouse/maplib", rev="d262865ba5bcd3f3612a1ae0709999f6ce002d62" } -representation = { git = "https://github.com/DataTreehouse/maplib", rev="d262865ba5bcd3f3612a1ae0709999f6ce002d62", features = ["rdf-star"] } -templates = { git = "https://github.com/DataTreehouse/maplib", rev="d262865ba5bcd3f3612a1ae0709999f6ce002d62" } +spargebra = { git = "https://github.com/DataTreehouse/maplib", rev="0b0c67e767cc1c67156bde273a1a717b93c30a42", features = ["rdf-star"]} +query_processing = { git = "https://github.com/DataTreehouse/maplib", rev="0b0c67e767cc1c67156bde273a1a717b93c30a42" } +pydf_io = { git = "https://github.com/DataTreehouse/maplib", rev="0b0c67e767cc1c67156bde273a1a717b93c30a42" } +representation = { git = "https://github.com/DataTreehouse/maplib", rev="0b0c67e767cc1c67156bde273a1a717b93c30a42", features = ["rdf-star"] } +templates = { git = "https://github.com/DataTreehouse/maplib", rev="0b0c67e767cc1c67156bde273a1a717b93c30a42" } sparesults = { version = "0.2.3", features = ["rdf-star"] } diff --git a/lib/bigquery-polars/src/querying.rs b/lib/bigquery-polars/src/querying.rs index bc083d7..fa4a21b 100644 --- a/lib/bigquery-polars/src/querying.rs +++ b/lib/bigquery-polars/src/querying.rs @@ -24,7 +24,6 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -use std::sync::Arc; use crate::errors::BigQueryExecutorError; use gcp_bigquery_client::error::BQError; use gcp_bigquery_client::job::JobApi; @@ -34,12 +33,16 @@ use gcp_bigquery_client::model::get_query_results_response::GetQueryResultsRespo use gcp_bigquery_client::model::query_request::QueryRequest; use gcp_bigquery_client::model::table_cell::TableCell; use gcp_bigquery_client::Client; -use polars::prelude::{concat, AnyValue, Column, DataFrame, DataType, IntoColumn, IntoLazy, LazyFrame, PlSmallStr, TimeUnit}; +use polars::prelude::{ + concat, AnyValue, Column, DataFrame, DataType, IntoColumn, IntoLazy, LazyFrame, PlSmallStr, + TimeUnit, +}; use polars::series::Series; use rayon::iter::IndexedParallelIterator; use rayon::iter::IntoParallelRefIterator; use rayon::iter::ParallelIterator; use rayon::prelude::IntoParallelIterator; +use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; @@ -139,7 +142,9 @@ impl BigQueryExecutor { .into_par_iter() .zip(names.par_iter()) .map(|(any_value_vec, name)| { - Series::from_any_values(name.into(), any_value_vec.as_slice(), false).unwrap().into_column() + Series::from_any_values(name.into(), any_value_vec.as_slice(), false) + .unwrap() + .into_column() }) .collect(); all_lfs.push(DataFrame::new(columns_vec).unwrap().lazy()) @@ -181,8 +186,7 @@ impl BigQueryExecutor { start_index: None, timeout_ms: None, }; - job - .get_query_results(self.project_id.as_str(), job_id, params.clone()) + job.get_query_results(self.project_id.as_str(), job_id, params.clone()) .await .map_err(map_bqerr) } @@ -215,7 +219,9 @@ fn table_cell_to_any<'a>( AnyValue::Boolean(value_as_ref.as_str().unwrap().parse::().unwrap()) } FieldType::Timestamp => { - let some_utc = some_utc.as_ref().map(|tz| Arc::new(PlSmallStr::from_str(tz))); + let some_utc = some_utc + .as_ref() + .map(|tz| Arc::new(PlSmallStr::from_str(tz))); let ts_str = value_as_ref.as_str().unwrap(); let timestamp_ns = (ts_str.parse::().unwrap() * (1e9f64)) as i64; AnyValue::DatetimeOwned(timestamp_ns, TimeUnit::Nanoseconds, some_utc) diff --git a/lib/chrontext/src/combiner/lazy_expressions.rs b/lib/chrontext/src/combiner/lazy_expressions.rs index 760abd0..e17a27a 100644 --- a/lib/chrontext/src/combiner/lazy_expressions.rs +++ b/lib/chrontext/src/combiner/lazy_expressions.rs @@ -630,10 +630,8 @@ impl Combiner { let mut output_solution_mappings = solution_mappings; for i in 0..inner.len() { let inner_context = inner_contexts.get(i).unwrap(); - let inner_prepared_virtualized_queries = split_virtualized_queries( - &mut prepared_virtualized_queries, - inner_context, - ); + let inner_prepared_virtualized_queries = + split_virtualized_queries(&mut prepared_virtualized_queries, inner_context); let inner_static_query_map = split_static_queries_opt(&mut static_query_map, inner_context); output_solution_mappings = self @@ -669,13 +667,7 @@ impl Combiner { .await?; args_contexts.insert(i, arg_context); } - func_expression( - output_solution_mappings, - func, - args, - args_contexts, - context, - )? + func_expression(output_solution_mappings, func, args, args_contexts, context)? } }; Ok(output_solution_mappings) diff --git a/lib/chrontext/src/combiner/lazy_graph_patterns.rs b/lib/chrontext/src/combiner/lazy_graph_patterns.rs index 386ac40..5e038b3 100644 --- a/lib/chrontext/src/combiner/lazy_graph_patterns.rs +++ b/lib/chrontext/src/combiner/lazy_graph_patterns.rs @@ -9,6 +9,7 @@ mod order_by; mod project; mod slice; mod union; +mod values; use super::Combiner; use crate::combiner::CombinerError; @@ -195,9 +196,9 @@ impl Combiner { .await } GraphPattern::Values { - variables: _, - bindings: _, - } => Ok(updated_solution_mappings.unwrap()), + variables, + bindings, + } => self.lazy_values(updated_solution_mappings, variables, bindings), GraphPattern::OrderBy { inner, expression } => { self.lazy_order_by( inner, diff --git a/lib/chrontext/src/combiner/lazy_graph_patterns/values.rs b/lib/chrontext/src/combiner/lazy_graph_patterns/values.rs new file mode 100644 index 0000000..a615af4 --- /dev/null +++ b/lib/chrontext/src/combiner/lazy_graph_patterns/values.rs @@ -0,0 +1,24 @@ +use super::Combiner; +use crate::combiner::CombinerError; +use oxrdf::Variable; +use polars::prelude::JoinType; +use query_processing::graph_patterns::{join, values_pattern}; +use representation::solution_mapping::SolutionMappings; +use spargebra::term::GroundTerm; + +impl Combiner { + pub(crate) fn lazy_values( + &mut self, + solution_mappings: Option, + variables: &[Variable], + bindings: &[Vec>], + ) -> Result { + let sm = values_pattern(variables, bindings); + if let Some(mut mappings) = solution_mappings { + mappings = join(mappings, sm, JoinType::Inner)?; + Ok(mappings) + } else { + Ok(sm) + } + } +} diff --git a/lib/chrontext/src/combiner/static_subqueries.rs b/lib/chrontext/src/combiner/static_subqueries.rs index e8c65cb..d047919 100644 --- a/lib/chrontext/src/combiner/static_subqueries.rs +++ b/lib/chrontext/src/combiner/static_subqueries.rs @@ -42,8 +42,9 @@ impl Combiner { &mut self.prepper.basic_virtualized_queries, )?; let (df, datatypes) = create_static_query_dataframe(&use_query, solutions); + let df_height = df.height(); debug!("Static query results:\n {}", df); - let mut out_solution_mappings = SolutionMappings::new(df.lazy(), datatypes); + let mut out_solution_mappings = SolutionMappings::new(df.lazy(), datatypes, df_height); if let Some(use_solution_mappings) = use_solution_mappings { out_solution_mappings = join( out_solution_mappings, @@ -77,7 +78,9 @@ pub(crate) fn split_static_queries_opt( static_queries: &mut Option>, context: &Context, ) -> Option> { - static_queries.as_mut().map(|static_queries| split_static_queries(static_queries, context)) + static_queries + .as_mut() + .map(|static_queries| split_static_queries(static_queries, context)) } fn constrain_query( @@ -119,15 +122,15 @@ fn constrain_query( x.into_iter() .map(|y: Option| { y.map(|y| match y { - Term::NamedNode(nn) => GroundTerm::NamedNode(nn), - Term::BlankNode(_) => { - panic!() - } - Term::Literal(l) => GroundTerm::Literal(l), - Term::Triple(_) => { - todo!() - } - }) + Term::NamedNode(nn) => GroundTerm::NamedNode(nn), + Term::BlankNode(_) => { + panic!() + } + Term::Literal(l) => GroundTerm::Literal(l), + Term::Triple(_) => { + todo!() + } + }) }) .collect() }) diff --git a/lib/chrontext/src/combiner/virtualized_queries.rs b/lib/chrontext/src/combiner/virtualized_queries.rs index 2f0900e..652d45b 100644 --- a/lib/chrontext/src/combiner/virtualized_queries.rs +++ b/lib/chrontext/src/combiner/virtualized_queries.rs @@ -4,7 +4,10 @@ use crate::preparing::grouping_col_type; use log::debug; use oxrdf::vocab::xsd; use oxrdf::Term; -use polars::prelude::{col, CategoricalOrdering, Column, DataFrame, DataType, Expr, IntoLazy, JoinArgs, JoinType, SortMultipleOptions}; +use polars::prelude::{ + col, CategoricalOrdering, Column, DataFrame, DataType, Expr, IntoLazy, JoinArgs, JoinType, + SortMultipleOptions, +}; use representation::polars_to_rdf::polars_type_to_literal_type; use representation::query_context::Context; use representation::solution_mapping::{EagerSolutionMappings, SolutionMappings}; diff --git a/lib/chrontext/src/engine.rs b/lib/chrontext/src/engine.rs index e2bf997..8d3e179 100644 --- a/lib/chrontext/src/engine.rs +++ b/lib/chrontext/src/engine.rs @@ -7,6 +7,7 @@ use crate::splitter::parse_sparql_select_query; use log::debug; use polars::enable_string_cache; use polars::frame::DataFrame; +use polars::prelude::PlSmallStr; use representation::query_context::Context; use representation::solution_mapping::SolutionMappings; use representation::RDFNodeType; @@ -15,7 +16,6 @@ use sparql_database::endpoint::SparqlEndpoint; use sparql_database::SparqlQueryable; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use polars::prelude::PlSmallStr; use virtualization::{Virtualization, VirtualizedDatabase}; use virtualized_query::pushdown_setting::PushdownSetting; @@ -118,9 +118,11 @@ impl Engine { .map_err(ChrontextError::CombinerError)?; for (original, renamed) in rename_map { if let Some(dt) = solution_mappings.rdf_node_types.remove(&renamed) { - solution_mappings.mappings = solution_mappings - .mappings - .rename(&[PlSmallStr::from_string(renamed)], &[PlSmallStr::from_str(&original)], true); + solution_mappings.mappings = solution_mappings.mappings.rename( + &[PlSmallStr::from_string(renamed)], + &[PlSmallStr::from_str(&original)], + true, + ); solution_mappings.rdf_node_types.insert(original, dt); } } @@ -128,6 +130,7 @@ impl Engine { let SolutionMappings { mappings, rdf_node_types, + .. } = solution_mappings; Ok(( diff --git a/lib/chrontext/src/preparing/expressions.rs b/lib/chrontext/src/preparing/expressions.rs index c3c3c77..189f8d1 100644 --- a/lib/chrontext/src/preparing/expressions.rs +++ b/lib/chrontext/src/preparing/expressions.rs @@ -88,14 +88,8 @@ impl TimeseriesQueryPrepper { context: &Context, ) -> EXPrepReturn { match expression { - Expression::NamedNode(..) => { - - EXPrepReturn::new(HashMap::new()) - } - Expression::Literal(..) => { - - EXPrepReturn::new(HashMap::new()) - } + Expression::NamedNode(..) => EXPrepReturn::new(HashMap::new()), + Expression::Literal(..) => EXPrepReturn::new(HashMap::new()), Expression::Variable(..) => EXPrepReturn::new(HashMap::new()), Expression::Or(left, right) => self.prepare_or_expression( left, diff --git a/lib/chrontext/src/preparing/expressions/not_expression.rs b/lib/chrontext/src/preparing/expressions/not_expression.rs index ad5a829..7ca9459 100644 --- a/lib/chrontext/src/preparing/expressions/not_expression.rs +++ b/lib/chrontext/src/preparing/expressions/not_expression.rs @@ -12,7 +12,6 @@ impl TimeseriesQueryPrepper { solution_mappings: &mut SolutionMappings, context: &Context, ) -> EXPrepReturn { - self.prepare_expression( wrapped, try_groupby_complex_query, diff --git a/lib/chrontext/src/preparing/expressions/unary_ordinary_expression.rs b/lib/chrontext/src/preparing/expressions/unary_ordinary_expression.rs index df30c20..f1f4bf3 100644 --- a/lib/chrontext/src/preparing/expressions/unary_ordinary_expression.rs +++ b/lib/chrontext/src/preparing/expressions/unary_ordinary_expression.rs @@ -22,7 +22,7 @@ impl TimeseriesQueryPrepper { UnaryOrdinaryOperator::UnaryPlus => PathEntry::UnaryPlus, UnaryOrdinaryOperator::UnaryMinus => PathEntry::UnaryMinus, }; - + self.prepare_expression( wrapped, try_groupby_complex_query, diff --git a/lib/chrontext/src/preparing/graph_patterns/distinct_pattern.rs b/lib/chrontext/src/preparing/graph_patterns/distinct_pattern.rs index 1a0723c..2ef362e 100644 --- a/lib/chrontext/src/preparing/graph_patterns/distinct_pattern.rs +++ b/lib/chrontext/src/preparing/graph_patterns/distinct_pattern.rs @@ -20,7 +20,7 @@ impl TimeseriesQueryPrepper { ); return GPPrepReturn::fail_groupby_complex_query(); } - + self.prepare_graph_pattern( inner, try_groupby_complex_query, diff --git a/lib/chrontext/src/preparing/graph_patterns/expression_rewrites.rs b/lib/chrontext/src/preparing/graph_patterns/expression_rewrites.rs index bb819c0..aabd115 100644 --- a/lib/chrontext/src/preparing/graph_patterns/expression_rewrites.rs +++ b/lib/chrontext/src/preparing/graph_patterns/expression_rewrites.rs @@ -112,13 +112,11 @@ pub(crate) fn try_recursive_rewrite_expression( } match &expression { - Expression::Literal(lit) => { - RecursiveRewriteReturn::new( - Some(Expression::Literal(lit.clone())), - Some(ChangeType::NoChange), - false, - ) - } + Expression::Literal(lit) => RecursiveRewriteReturn::new( + Some(Expression::Literal(lit.clone())), + Some(ChangeType::NoChange), + false, + ), Expression::Variable(v) => { if vq.has_equivalent_variable(v, context) { RecursiveRewriteReturn::new( @@ -239,8 +237,12 @@ pub(crate) fn try_recursive_rewrite_expression( use_lost_value, ); } - } else if left_rewrite.expression.is_some() && right_rewrite.expression.is_none() && (left_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange || left_rewrite.change_type.as_ref().unwrap() - == &ChangeType::Constrained) { + } else if left_rewrite.expression.is_some() + && right_rewrite.expression.is_none() + && (left_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange + || left_rewrite.change_type.as_ref().unwrap() + == &ChangeType::Constrained) + { return RecursiveRewriteReturn::new( Some(left_rewrite.expression.as_ref().unwrap().clone()), Some(ChangeType::Constrained), @@ -362,7 +364,11 @@ pub(crate) fn try_recursive_rewrite_expression( use_lost_value, ); } - } else if left_rewrite.expression.is_some() && right_rewrite.expression.is_none() && (left_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange || left_rewrite.change_type.as_ref().unwrap() == &ChangeType::Relaxed) { + } else if left_rewrite.expression.is_some() + && right_rewrite.expression.is_none() + && (left_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange + || left_rewrite.change_type.as_ref().unwrap() == &ChangeType::Relaxed) + { return RecursiveRewriteReturn::new( Some(left_rewrite.expression.as_ref().unwrap().clone()), Some(ChangeType::Relaxed), @@ -933,9 +939,11 @@ pub(crate) fn try_recursive_rewrite_expression( }) .collect::>(); let use_lost_value = or_lost_value(inner_rewrites.iter().collect()); - if inner_rewrites.iter().all(|x| x.expression.is_some()) && inner_rewrites + if inner_rewrites.iter().all(|x| x.expression.is_some()) + && inner_rewrites .iter() - .all(|x| x.change_type.as_ref().unwrap() == &ChangeType::NoChange) { + .all(|x| x.change_type.as_ref().unwrap() == &ChangeType::NoChange) + { return RecursiveRewriteReturn::new( Some(Expression::Coalesce( inner_rewrites @@ -965,11 +973,12 @@ pub(crate) fn try_recursive_rewrite_expression( }) .collect::>(); let use_lost_value = or_lost_value(right_rewrites.iter().collect()); - if right_rewrites.iter().all(|x| x.expression.is_some()) && right_rewrites + if right_rewrites.iter().all(|x| x.expression.is_some()) + && right_rewrites .iter() - .all(|x| x.change_type.as_ref().unwrap() == &ChangeType::NoChange) { - let use_lost_value = - right_rewrites.iter().any(|x| x.lost_value); + .all(|x| x.change_type.as_ref().unwrap() == &ChangeType::NoChange) + { + let use_lost_value = right_rewrites.iter().any(|x| x.lost_value); return RecursiveRewriteReturn::new( Some(Expression::FunctionCall( left.clone(), diff --git a/lib/chrontext/src/preparing/graph_patterns/graph_pattern.rs b/lib/chrontext/src/preparing/graph_patterns/graph_pattern.rs index 1e8fc03..aef22dc 100644 --- a/lib/chrontext/src/preparing/graph_patterns/graph_pattern.rs +++ b/lib/chrontext/src/preparing/graph_patterns/graph_pattern.rs @@ -18,7 +18,6 @@ impl TimeseriesQueryPrepper { debug!("Encountered graph inside groupby, not supported for complex groupby pushdown"); GPPrepReturn::fail_groupby_complex_query() } else { - self.prepare_graph_pattern( inner, try_groupby_complex_query, diff --git a/lib/chrontext/src/preparing/graph_patterns/group_pattern.rs b/lib/chrontext/src/preparing/graph_patterns/group_pattern.rs index d021be2..3e21b4b 100644 --- a/lib/chrontext/src/preparing/graph_patterns/group_pattern.rs +++ b/lib/chrontext/src/preparing/graph_patterns/group_pattern.rs @@ -7,7 +7,9 @@ use crate::constants::GROUPING_COL; use crate::preparing::graph_patterns::GPPrepReturn; use crate::preparing::grouping_col_type; use oxrdf::Variable; -use polars::prelude::{col, DataFrameJoinOps, IntoLazy, JoinArgs, JoinType, PlSmallStr, Series, UniqueKeepStrategy}; +use polars::prelude::{ + col, DataFrameJoinOps, IntoLazy, JoinArgs, JoinType, PlSmallStr, Series, UniqueKeepStrategy, +}; use query_processing::find_query_variables::find_all_used_variables_in_aggregate_expression; use representation::solution_mapping::SolutionMappings; use spargebra::algebra::{AggregateExpression, GraphPattern}; @@ -34,7 +36,10 @@ impl TimeseriesQueryPrepper { let inner_context = &context.extension_with(PathEntry::GroupInner); let mut try_graph_pattern_prepare = self.prepare_graph_pattern(graph_pattern, true, solution_mappings, inner_context); - if !try_graph_pattern_prepare.fail_groupby_complex_query && self.pushdown_settings.contains(&PushdownSetting::GroupBy) && try_graph_pattern_prepare.virtualized_queries.len() == 1 { + if !try_graph_pattern_prepare.fail_groupby_complex_query + && self.pushdown_settings.contains(&PushdownSetting::GroupBy) + && try_graph_pattern_prepare.virtualized_queries.len() == 1 + { let (_c, mut vqs) = try_graph_pattern_prepare .virtualized_queries .drain() @@ -42,8 +47,7 @@ impl TimeseriesQueryPrepper { .unwrap(); if vqs.len() == 1 { let mut vq = vqs.remove(0); - let in_scope = - check_aggregations_are_in_scope(&vq, inner_context, aggregations); + let in_scope = check_aggregations_are_in_scope(&vq, inner_context, aggregations); if in_scope { let grouping_col = self.add_grouping_col(solution_mappings, by); @@ -89,7 +93,8 @@ impl TimeseriesQueryPrepper { .mappings .clone() .select(by_names.iter().map(col).collect::>()) - .unique(None, UniqueKeepStrategy::First).collect() + .unique(None, UniqueKeepStrategy::First) + .collect() .unwrap(); let mut series = Series::from_iter(0..(df.height() as i64)); series.rename(PlSmallStr::from_str(&grouping_col)); diff --git a/lib/chrontext/src/preparing/graph_patterns/project_pattern.rs b/lib/chrontext/src/preparing/graph_patterns/project_pattern.rs index 4def477..7795d7e 100644 --- a/lib/chrontext/src/preparing/graph_patterns/project_pattern.rs +++ b/lib/chrontext/src/preparing/graph_patterns/project_pattern.rs @@ -15,7 +15,7 @@ impl TimeseriesQueryPrepper { context: &Context, ) -> GPPrepReturn { let inner_context = context.extension_with(PathEntry::ProjectInner); - + self.prepare_graph_pattern( inner, try_groupby_complex_query, diff --git a/lib/chrontext/src/preparing/graph_patterns/reduced_pattern.rs b/lib/chrontext/src/preparing/graph_patterns/reduced_pattern.rs index fc1bcef..df468b3 100644 --- a/lib/chrontext/src/preparing/graph_patterns/reduced_pattern.rs +++ b/lib/chrontext/src/preparing/graph_patterns/reduced_pattern.rs @@ -17,7 +17,6 @@ impl TimeseriesQueryPrepper { debug!("Encountered graph inside reduced, not supported for complex groupby pushdown"); GPPrepReturn::fail_groupby_complex_query() } else { - self.prepare_graph_pattern( inner, try_groupby_complex_query, diff --git a/lib/chrontext/src/preparing/graph_patterns/slice_pattern.rs b/lib/chrontext/src/preparing/graph_patterns/slice_pattern.rs index 4798e65..8ea2aef 100644 --- a/lib/chrontext/src/preparing/graph_patterns/slice_pattern.rs +++ b/lib/chrontext/src/preparing/graph_patterns/slice_pattern.rs @@ -37,8 +37,7 @@ impl TimeseriesQueryPrepper { } } if !found_noncompatible && vqs.len() == 1 { - let vq = - VirtualizedQuery::Sliced(Box::new(vqs.remove(0)), start, length); + let vq = VirtualizedQuery::Sliced(Box::new(vqs.remove(0)), start, length); *vqs = vec![vq]; } } diff --git a/lib/chrontext/src/preprocessing.rs b/lib/chrontext/src/preprocessing.rs index ff4c425..b02789b 100644 --- a/lib/chrontext/src/preprocessing.rs +++ b/lib/chrontext/src/preprocessing.rs @@ -101,10 +101,12 @@ impl Preprocessor { right, &context.extension_with(PathEntry::LeftJoinRightSide), ); - let preprocessed_expression = expression.as_ref().map(|e| self.preprocess_expression( + let preprocessed_expression = expression.as_ref().map(|e| { + self.preprocess_expression( e, &context.extension_with(PathEntry::LeftJoinExpression), - )); + ) + }); GraphPattern::LeftJoin { left: Box::new(left), right: Box::new(right), @@ -161,7 +163,9 @@ impl Preprocessor { find_all_used_variables_in_expression(expression, &mut used_vars, true, true); for v in used_vars.drain() { if let Some(ctr) = self.variable_constraints.get_constraint(&v, context) { - if (ctr == &Constraint::External || ctr == &Constraint::ExternallyDerived) && !self.variable_constraints.contains(variable, context) { + if (ctr == &Constraint::External || ctr == &Constraint::ExternallyDerived) + && !self.variable_constraints.contains(variable, context) + { self.variable_constraints.insert( variable.clone(), context.clone(), diff --git a/lib/chrontext/src/rewriting/expressions/and_expression.rs b/lib/chrontext/src/rewriting/expressions/and_expression.rs index 39e4531..eb641f4 100644 --- a/lib/chrontext/src/rewriting/expressions/and_expression.rs +++ b/lib/chrontext/src/rewriting/expressions/and_expression.rs @@ -87,7 +87,10 @@ impl StaticQueryRewriter { return exr; } } - } else if right_rewrite.expression.is_some() && (right_rewrite.change_type.as_ref().unwrap() == &ChangeType::Relaxed || right_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange) { + } else if right_rewrite.expression.is_some() + && (right_rewrite.change_type.as_ref().unwrap() == &ChangeType::Relaxed + || right_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange) + { let right_expression_rewrite = right_rewrite.expression.take().unwrap(); exr.with_expression(right_expression_rewrite) .with_change_type(ChangeType::Relaxed); @@ -95,12 +98,15 @@ impl StaticQueryRewriter { } } ChangeType::Constrained => { - if left_rewrite.expression.is_some() && right_rewrite.expression.is_some() && (left_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange + if left_rewrite.expression.is_some() + && right_rewrite.expression.is_some() + && (left_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange || left_rewrite.change_type.as_ref().unwrap() - == &ChangeType::Constrained) && (right_rewrite.change_type.as_ref().unwrap() - == &ChangeType::NoChange - || right_rewrite.change_type.as_ref().unwrap() - == &ChangeType::Constrained) { + == &ChangeType::Constrained) + && (right_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange + || right_rewrite.change_type.as_ref().unwrap() + == &ChangeType::Constrained) + { let left_expression_rewrite = left_rewrite.expression.take().unwrap(); let right_expression_rewrite = right_rewrite.expression.take().unwrap(); diff --git a/lib/chrontext/src/rewriting/expressions/or_expression.rs b/lib/chrontext/src/rewriting/expressions/or_expression.rs index 0e01c8e..62dc8f3 100644 --- a/lib/chrontext/src/rewriting/expressions/or_expression.rs +++ b/lib/chrontext/src/rewriting/expressions/or_expression.rs @@ -49,11 +49,13 @@ impl StaticQueryRewriter { } else { match required_change_direction { ChangeType::Relaxed => { - if left_rewrite.expression.is_some() && right_rewrite.expression.is_some() && (left_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange - || left_rewrite.change_type.as_ref().unwrap() == &ChangeType::Relaxed) && (right_rewrite.change_type.as_ref().unwrap() - == &ChangeType::NoChange - || right_rewrite.change_type.as_ref().unwrap() - == &ChangeType::Relaxed) { + if left_rewrite.expression.is_some() + && right_rewrite.expression.is_some() + && (left_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange + || left_rewrite.change_type.as_ref().unwrap() == &ChangeType::Relaxed) + && (right_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange + || right_rewrite.change_type.as_ref().unwrap() == &ChangeType::Relaxed) + { let left_expression_rewrite = left_rewrite.expression.take().unwrap(); let right_expression_rewrite = right_rewrite.expression.take().unwrap(); exr.with_expression(Expression::Or( @@ -100,7 +102,10 @@ impl StaticQueryRewriter { return exr; } } - } else if right_rewrite.expression.is_some() && (right_rewrite.change_type.as_ref().unwrap() == &ChangeType::Constrained || right_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange) { + } else if right_rewrite.expression.is_some() + && (right_rewrite.change_type.as_ref().unwrap() == &ChangeType::Constrained + || right_rewrite.change_type.as_ref().unwrap() == &ChangeType::NoChange) + { let right_expression_rewrite = right_rewrite.expression.take().unwrap(); exr.with_expression(right_expression_rewrite) .with_change_type(ChangeType::Constrained); diff --git a/lib/chrontext/src/rewriting/graph_patterns/bgp_pattern.rs b/lib/chrontext/src/rewriting/graph_patterns/bgp_pattern.rs index 42b18e7..a0e4960 100644 --- a/lib/chrontext/src/rewriting/graph_patterns/bgp_pattern.rs +++ b/lib/chrontext/src/rewriting/graph_patterns/bgp_pattern.rs @@ -80,7 +80,6 @@ impl StaticQueryRewriter { } } - let rewritten = !dynamic_triples.is_empty(); self.basic_virtualized_queries.extend(new_basic_vqs); @@ -104,7 +103,6 @@ impl StaticQueryRewriter { } } - GPReturn::new( GraphPattern::Bgp { patterns: new_triples, diff --git a/lib/chrontext/src/rewriting/graph_patterns/path_pattern.rs b/lib/chrontext/src/rewriting/graph_patterns/path_pattern.rs index 950255d..4e32ae1 100644 --- a/lib/chrontext/src/rewriting/graph_patterns/path_pattern.rs +++ b/lib/chrontext/src/rewriting/graph_patterns/path_pattern.rs @@ -21,7 +21,6 @@ impl StaticQueryRewriter { variables_in_scope.insert(o.clone()); } - GPReturn::new( GraphPattern::Path { subject: subject.clone(), diff --git a/lib/chrontext/src/rewriting/subqueries.rs b/lib/chrontext/src/rewriting/subqueries.rs index 308dca3..59bc8fc 100644 --- a/lib/chrontext/src/rewriting/subqueries.rs +++ b/lib/chrontext/src/rewriting/subqueries.rs @@ -12,10 +12,8 @@ impl StaticQueryRewriter { if is_gp { self.add_subquery(context, gpreturn.graph_pattern.unwrap()); } else { - let mut variables: Vec = gpreturn - .variables_in_scope - .iter().cloned() - .collect(); + let mut variables: Vec = + gpreturn.variables_in_scope.iter().cloned().collect(); variables.sort_by_key(|x| x.as_str().to_string()); let projection = self.create_projection_graph_pattern(&gpreturn, context, &variables); diff --git a/lib/chrontext/src/sparql_result_to_polars.rs b/lib/chrontext/src/sparql_result_to_polars.rs index 9281aaa..bfe7b8a 100644 --- a/lib/chrontext/src/sparql_result_to_polars.rs +++ b/lib/chrontext/src/sparql_result_to_polars.rs @@ -1,8 +1,7 @@ use oxrdf::{NamedNode, Term, Variable}; -use polars::prelude::{as_struct, col, lit, DataFrame, Expr, IntoColumn, IntoLazy, LiteralValue}; +use polars::prelude::{as_struct, col, DataFrame, IntoColumn, IntoLazy, LiteralValue}; use representation::multitype::{ - all_multi_cols, multi_has_this_type_column, non_multi_type_string, MULTI_BLANK_DT, - MULTI_IRI_DT, MULTI_NONE_DT, + all_multi_cols, base_col_name, MULTI_BLANK_DT, MULTI_IRI_DT, MULTI_NONE_DT, }; use representation::rdf_to_polars::{ polars_literal_values_to_series, rdf_blank_node_to_polars_literal_value, @@ -94,7 +93,7 @@ pub(crate) fn create_static_query_dataframe( let mut types = vec![]; for (t, v) in m { let name = if mlen > 1 { - non_multi_type_string(&t) + base_col_name(&t) } else { c.clone() }; @@ -105,13 +104,15 @@ pub(crate) fn create_static_query_dataframe( ser.struct_() .unwrap() .field_by_name(LANG_STRING_VALUE_FIELD) - .unwrap().into_column() + .unwrap() + .into_column(), ); columns.push( ser.struct_() .unwrap() .field_by_name(LANG_STRING_LANG_FIELD) - .unwrap().into_column() + .unwrap() + .into_column(), ); } else if matches!(t, BaseRDFNodeType::None) { columns.push(ser.cast(&t.polars_data_type()).unwrap().into_column()); @@ -129,33 +130,6 @@ pub(crate) fn create_static_query_dataframe( for c in all_multi_cols(&types) { struct_exprs.push(col(&c)); } - let mut is_exprs: Vec = vec![]; - let mut need_none = false; - for t in &types { - if &BaseRDFNodeType::None == t { - need_none = true; - } else { - is_exprs.push( - col(non_multi_type_string(t)) - .is_null() - .alias(multi_has_this_type_column(t)), - ); - } - } - if need_none { - let mut is_iter = is_exprs.iter(); - let mut e = if let Some(e) = is_iter.next() { - e.clone() - } else { - lit(true) - }; - for other_e in is_iter { - e = e.and(other_e.clone().not()) - } - e = e.alias(multi_has_this_type_column(&BaseRDFNodeType::None)); - is_exprs.push(e); - } - struct_exprs.extend(is_exprs); lf = lf .with_column(as_struct(struct_exprs).alias(&c)) .select([col(&c)]); diff --git a/lib/virtualization/src/bigquery.rs b/lib/virtualization/src/bigquery.rs index 053e806..5c06f47 100644 --- a/lib/virtualization/src/bigquery.rs +++ b/lib/virtualization/src/bigquery.rs @@ -3,6 +3,7 @@ use crate::get_datatype_map; use crate::python::translate_sql; use bigquery_polars::{BigQueryExecutor, Client}; use oxrdf::Variable; +use polars::prelude::PlSmallStr; use pyo3::types::PyDict; use pyo3::Py; use representation::solution_mapping::EagerSolutionMappings; @@ -10,7 +11,6 @@ use reqwest::Url; use spargebra::algebra::{AggregateExpression, Expression, OrderExpression}; use spargebra::term::TermPattern; use std::collections::{HashMap, HashSet}; -use polars::prelude::PlSmallStr; use virtualized_query::pushdown_setting::{all_pushdowns, PushdownSetting}; use virtualized_query::{GroupedVirtualizedQuery, VirtualizedQuery}; diff --git a/lib/virtualization/src/opcua.rs b/lib/virtualization/src/opcua.rs index f9013e3..1fb297e 100644 --- a/lib/virtualization/src/opcua.rs +++ b/lib/virtualization/src/opcua.rs @@ -12,7 +12,10 @@ use opcua::sync::RwLock; use oxrdf::vocab::xsd; use oxrdf::{Literal, Variable}; use polars::export::chrono::{DateTime as ChronoDateTime, Duration, TimeZone, Utc}; -use polars::prelude::{concat, AnyValue, Column, DataFrame, DataType, IntoColumn, IntoLazy, NamedFrom, Series, UnionArgs}; +use polars::prelude::{ + concat, AnyValue, Column, DataFrame, DataType, IntoColumn, IntoLazy, NamedFrom, Series, + UnionArgs, +}; use query_processing::constants::DATETIME_AS_SECONDS; use representation::query_context::Context; use representation::solution_mapping::EagerSolutionMappings; @@ -112,10 +115,15 @@ impl VirtualizedOPCUADatabase { .get(0) .unwrap() .as_str(); - let mut id_iter = mapping_df.column(identifier_var).unwrap().as_materialized_series().iter(); + let mut id_iter = mapping_df + .column(identifier_var) + .unwrap() + .as_materialized_series() + .iter(); let mut grouping_col_iter = mapping_df .column(grouping_col_name.as_ref().unwrap()) - .unwrap().as_materialized_series() + .unwrap() + .as_materialized_series() .iter(); for _ in 0..mapping_df.height() { let id_value = match id_iter.next().unwrap() { @@ -231,7 +239,11 @@ impl VirtualizedOPCUADatabase { Column::new_empty((*grouping_col).into(), &DataType::Int64) } else { Column::new_empty( - vq.get_identifier_variables().get(0).unwrap().as_str().into(), + vq.get_identifier_variables() + .get(0) + .unwrap() + .as_str() + .into(), &DataType::String, ) }; diff --git a/lib/virtualized_query/src/lib.rs b/lib/virtualized_query/src/lib.rs index 2d7929a..43951e8 100644 --- a/lib/virtualized_query/src/lib.rs +++ b/lib/virtualized_query/src/lib.rs @@ -108,7 +108,6 @@ impl BasicVirtualizedQuery { let id_var = Variable::new_unchecked(ID_VARIABLE_NAME); let mut queue = vec![(&self.query_source_variable, &id_var)]; while let Some((current_query_var, current_template_var)) = queue.pop() { - if !visited_query_vars.contains(¤t_query_var) { visited_query_vars.insert(current_query_var); for p in patterns { @@ -291,7 +290,11 @@ impl VirtualizedQuery { pub fn validate(&self, df: &DataFrame) -> Result<(), VirtualizedResultValidationError> { let expected_columns = self.expected_columns(); - let df_columns: HashSet<&str> = df.get_column_names().into_iter().map(|x|x.as_str()).collect(); + let df_columns: HashSet<&str> = df + .get_column_names() + .into_iter() + .map(|x| x.as_str()) + .collect(); if expected_columns != df_columns { let err = VirtualizedResultValidationError { missing_columns: expected_columns diff --git a/lib/virtualized_query/src/python.rs b/lib/virtualized_query/src/python.rs index 6095bfd..873cdad 100644 --- a/lib/virtualized_query/src/python.rs +++ b/lib/virtualized_query/src/python.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use crate::VirtualizedQuery; use polars::prelude::AnyValue; use pyo3::prelude::*; @@ -7,6 +6,7 @@ use spargebra::algebra::{ AggregateExpression, AggregateFunction, Expression, Function, OrderExpression, }; use spargebra::term::TermPattern; +use std::collections::HashMap; #[derive(Clone)] #[pyclass(name = "VirtualizedQuery")] @@ -104,9 +104,7 @@ impl PyVirtualizedQuery { match self { PyVirtualizedQuery::Basic { id_grouping_tuples, .. - } => { - id_grouping_tuples.clone() - } + } => id_grouping_tuples.clone(), _ => None, } } @@ -116,9 +114,7 @@ impl PyVirtualizedQuery { PyVirtualizedQuery::Basic { grouping_column_name, .. - } => { - grouping_column_name.clone() - } + } => grouping_column_name.clone(), _ => None, } } @@ -185,9 +181,7 @@ impl PyVirtualizedQuery { #[getter] fn limit(&self) -> Option { match self { - PyVirtualizedQuery::Sliced { limit, .. } => { - limit.as_ref().map(|limit| *limit) - } + PyVirtualizedQuery::Sliced { limit, .. } => limit.as_ref().map(|limit| *limit), _ => None, } } @@ -215,12 +209,14 @@ impl PyVirtualizedQuery { let mut id_grouping_tuples = vec![]; let id_iter = df .column(basic.identifier_variable.as_str()) - .unwrap().as_materialized_series() + .unwrap() + .as_materialized_series() .iter(); let group_iter = df .column(basic.grouping_col.as_ref().unwrap()) .unwrap() - .as_materialized_series().iter(); + .as_materialized_series() + .iter(); for (id, group) in id_iter.zip(group_iter) { if let (AnyValue::String(id), AnyValue::Int64(group)) = (id, group) { id_grouping_tuples.push((id.to_string(), group)); diff --git a/py_chrontext/src/lib.rs b/py_chrontext/src/lib.rs index 73da466..4855ca6 100644 --- a/py_chrontext/src/lib.rs +++ b/py_chrontext/src/lib.rs @@ -189,9 +189,12 @@ impl PyEngine { }; let sparql_endpoint = self.sparql_endpoint.clone(); - let sparql_oxigraph_config = self.sparql_embedded_oxigraph.as_ref().map(|store| EmbeddedOxigraph { - store: store.clone(), - }); + let sparql_oxigraph_config = + self.sparql_embedded_oxigraph + .as_ref() + .map(|store| EmbeddedOxigraph { + store: store.clone(), + }); let mut virtualization_map = HashMap::new(); for (k, v) in &self.resources {