Commit f754595: Rustfmt
magbak committed Dec 30, 2024
1 parent d0cba99 commit f754595
Showing 31 changed files with 194 additions and 160 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock


10 changes: 5 additions & 5 deletions Cargo.toml
@@ -16,11 +16,11 @@ members = [
#pydf_io = { path = "../maplib/lib/pydf_io"}
#representation = { path = "../maplib/lib/representation", features = ["rdf-star"]}
#templates = { path = "../maplib/lib/templates"}
-spargebra = { git = "https://github.com/DataTreehouse/maplib", rev="d262865ba5bcd3f3612a1ae0709999f6ce002d62", features = ["rdf-star"]}
-query_processing = { git = "https://github.com/DataTreehouse/maplib", rev="d262865ba5bcd3f3612a1ae0709999f6ce002d62" }
-pydf_io = { git = "https://github.com/DataTreehouse/maplib", rev="d262865ba5bcd3f3612a1ae0709999f6ce002d62" }
-representation = { git = "https://github.com/DataTreehouse/maplib", rev="d262865ba5bcd3f3612a1ae0709999f6ce002d62", features = ["rdf-star"] }
-templates = { git = "https://github.com/DataTreehouse/maplib", rev="d262865ba5bcd3f3612a1ae0709999f6ce002d62" }
+spargebra = { git = "https://github.com/DataTreehouse/maplib", rev="0b0c67e767cc1c67156bde273a1a717b93c30a42", features = ["rdf-star"]}
+query_processing = { git = "https://github.com/DataTreehouse/maplib", rev="0b0c67e767cc1c67156bde273a1a717b93c30a42" }
+pydf_io = { git = "https://github.com/DataTreehouse/maplib", rev="0b0c67e767cc1c67156bde273a1a717b93c30a42" }
+representation = { git = "https://github.com/DataTreehouse/maplib", rev="0b0c67e767cc1c67156bde273a1a717b93c30a42", features = ["rdf-star"] }
+templates = { git = "https://github.com/DataTreehouse/maplib", rev="0b0c67e767cc1c67156bde273a1a717b93c30a42" }


sparesults = { version = "0.2.3", features = ["rdf-star"] }
18 changes: 12 additions & 6 deletions lib/bigquery-polars/src/querying.rs
@@ -24,7 +24,6 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

-use std::sync::Arc;
use crate::errors::BigQueryExecutorError;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::job::JobApi;
@@ -34,12 +33,16 @@ use gcp_bigquery_client::model::get_query_results_response::GetQueryResultsRespo
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table_cell::TableCell;
use gcp_bigquery_client::Client;
-use polars::prelude::{concat, AnyValue, Column, DataFrame, DataType, IntoColumn, IntoLazy, LazyFrame, PlSmallStr, TimeUnit};
+use polars::prelude::{
+    concat, AnyValue, Column, DataFrame, DataType, IntoColumn, IntoLazy, LazyFrame, PlSmallStr,
+    TimeUnit,
+};
use polars::series::Series;
use rayon::iter::IndexedParallelIterator;
use rayon::iter::IntoParallelRefIterator;
use rayon::iter::ParallelIterator;
use rayon::prelude::IntoParallelIterator;
+use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

Expand Down Expand Up @@ -139,7 +142,9 @@ impl BigQueryExecutor {
.into_par_iter()
.zip(names.par_iter())
.map(|(any_value_vec, name)| {
-                Series::from_any_values(name.into(), any_value_vec.as_slice(), false).unwrap().into_column()
+                Series::from_any_values(name.into(), any_value_vec.as_slice(), false)
+                    .unwrap()
+                    .into_column()
})
.collect();
all_lfs.push(DataFrame::new(columns_vec).unwrap().lazy())
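For reference, the chained call above converts loosely typed BigQuery cells into a typed Polars column. A minimal sketch of the same construction, assuming a recent Polars API (names and values here are illustrative, not taken from the diff):

```rust
use polars::prelude::*;

fn build_column() -> PolarsResult<DataFrame> {
    // Dynamically typed cells, as produced when decoding BigQuery rows.
    let vals = vec![AnyValue::Int64(1), AnyValue::Int64(2)];
    // `strict = false` lets Polars infer a common supertype across values.
    let series = Series::from_any_values("x".into(), vals.as_slice(), false)?;
    DataFrame::new(vec![series.into_column()])
}
```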
@@ -181,8 +186,7 @@ impl BigQueryExecutor {
start_index: None,
timeout_ms: None,
};
-        job
-            .get_query_results(self.project_id.as_str(), job_id, params.clone())
+        job.get_query_results(self.project_id.as_str(), job_id, params.clone())
.await
.map_err(map_bqerr)
}
@@ -215,7 +219,9 @@ fn table_cell_to_any<'a>(
AnyValue::Boolean(value_as_ref.as_str().unwrap().parse::<bool>().unwrap())
}
FieldType::Timestamp => {
-            let some_utc = some_utc.as_ref().map(|tz| Arc::new(PlSmallStr::from_str(tz)));
+            let some_utc = some_utc
+                .as_ref()
+                .map(|tz| Arc::new(PlSmallStr::from_str(tz)));
let ts_str = value_as_ref.as_str().unwrap();
let timestamp_ns = (ts_str.parse::<f64>().unwrap() * (1e9f64)) as i64;
AnyValue::DatetimeOwned(timestamp_ns, TimeUnit::Nanoseconds, some_utc)
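The `FieldType::Timestamp` branch above assumes BigQuery returns timestamps as a decimal string of seconds since the Unix epoch, which the code scales to nanoseconds. A small illustration of that conversion (the input value is made up; note that `f64` precision limits sub-microsecond accuracy for present-day epochs):

```rust
fn seconds_str_to_ns(ts_str: &str) -> i64 {
    // "1.5" seconds since the epoch -> 1_500_000_000 ns
    (ts_str.parse::<f64>().unwrap() * 1e9f64) as i64
}

fn main() {
    assert_eq!(seconds_str_to_ns("1.5"), 1_500_000_000);
}
```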
14 changes: 3 additions & 11 deletions lib/chrontext/src/combiner/lazy_expressions.rs
@@ -630,10 +630,8 @@ impl Combiner {
let mut output_solution_mappings = solution_mappings;
for i in 0..inner.len() {
let inner_context = inner_contexts.get(i).unwrap();
-            let inner_prepared_virtualized_queries = split_virtualized_queries(
-                &mut prepared_virtualized_queries,
-                inner_context,
-            );
+            let inner_prepared_virtualized_queries =
+                split_virtualized_queries(&mut prepared_virtualized_queries, inner_context);
let inner_static_query_map =
split_static_queries_opt(&mut static_query_map, inner_context);
output_solution_mappings = self
@@ -669,13 +667,7 @@ impl Combiner {
.await?;
args_contexts.insert(i, arg_context);
}
-            func_expression(
-                output_solution_mappings,
-                func,
-                args,
-                args_contexts,
-                context,
-            )?
+            func_expression(output_solution_mappings, func, args, args_contexts, context)?
}
};
Ok(output_solution_mappings)
7 changes: 4 additions & 3 deletions lib/chrontext/src/combiner/lazy_graph_patterns.rs
@@ -9,6 +9,7 @@ mod order_by;
mod project;
mod slice;
mod union;
+mod values;

use super::Combiner;
use crate::combiner::CombinerError;
@@ -195,9 +196,9 @@ impl Combiner {
.await
}
GraphPattern::Values {
-                variables: _,
-                bindings: _,
-            } => Ok(updated_solution_mappings.unwrap()),
+                variables,
+                bindings,
+            } => self.lazy_values(updated_solution_mappings, variables, bindings),
GraphPattern::OrderBy { inner, expression } => {
self.lazy_order_by(
inner,
24 changes: 24 additions & 0 deletions lib/chrontext/src/combiner/lazy_graph_patterns/values.rs
@@ -0,0 +1,24 @@
+use super::Combiner;
+use crate::combiner::CombinerError;
+use oxrdf::Variable;
+use polars::prelude::JoinType;
+use query_processing::graph_patterns::{join, values_pattern};
+use representation::solution_mapping::SolutionMappings;
+use spargebra::term::GroundTerm;
+
+impl Combiner {
+    pub(crate) fn lazy_values(
+        &mut self,
+        solution_mappings: Option<SolutionMappings>,
+        variables: &[Variable],
+        bindings: &[Vec<Option<GroundTerm>>],
+    ) -> Result<SolutionMappings, CombinerError> {
+        let sm = values_pattern(variables, bindings);
+        if let Some(mut mappings) = solution_mappings {
+            mappings = join(mappings, sm, JoinType::Inner)?;
+            Ok(mappings)
+        } else {
+            Ok(sm)
+        }
+    }
+}
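This new module wires the SPARQL `VALUES` pattern into the lazy combiner: `values_pattern` materializes the inline bindings as solution mappings, which are then inner-joined with any mappings already produced. A hypothetical query exercising this path (identifiers are illustrative):

```rust
// Inline bindings restrict ?s to two named nodes; lazy_values joins them
// against the solutions produced by the triple pattern.
let query = r#"
SELECT ?s ?label WHERE {
    ?s <http://example.org/hasLabel> ?label .
    VALUES ?s { <http://example.org/a> <http://example.org/b> }
}"#;
```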
25 changes: 14 additions & 11 deletions lib/chrontext/src/combiner/static_subqueries.rs
@@ -42,8 +42,9 @@ impl Combiner {
&mut self.prepper.basic_virtualized_queries,
)?;
let (df, datatypes) = create_static_query_dataframe(&use_query, solutions);
+        let df_height = df.height();
debug!("Static query results:\n {}", df);
-        let mut out_solution_mappings = SolutionMappings::new(df.lazy(), datatypes);
+        let mut out_solution_mappings = SolutionMappings::new(df.lazy(), datatypes, df_height);
if let Some(use_solution_mappings) = use_solution_mappings {
out_solution_mappings = join(
out_solution_mappings,
@@ -77,7 +78,9 @@ pub(crate) fn split_static_queries_opt(
static_queries: &mut Option<HashMap<Context, Query>>,
context: &Context,
) -> Option<HashMap<Context, Query>> {
-    static_queries.as_mut().map(|static_queries| split_static_queries(static_queries, context))
+    static_queries
+        .as_mut()
+        .map(|static_queries| split_static_queries(static_queries, context))
}

fn constrain_query(
@@ -119,15 +122,15 @@ fn constrain_query(
x.into_iter()
.map(|y: Option<Term>| {
y.map(|y| match y {
-                            Term::NamedNode(nn) => GroundTerm::NamedNode(nn),
-                            Term::BlankNode(_) => {
-                                panic!()
-                            }
-                            Term::Literal(l) => GroundTerm::Literal(l),
-                            Term::Triple(_) => {
-                                todo!()
-                            }
-                        })
+                        Term::NamedNode(nn) => GroundTerm::NamedNode(nn),
+                        Term::BlankNode(_) => {
+                            panic!()
+                        }
+                        Term::Literal(l) => GroundTerm::Literal(l),
+                        Term::Triple(_) => {
+                            todo!()
+                        }
+                    })
})
.collect()
})
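The reindented match above narrows `Term` to `GroundTerm` so that static query solutions can be re-injected as a constraining `VALUES` block; `GroundTerm` has no blank-node variant, so blank nodes panic and embedded triples are left as `todo!()`. A fallible version of the same narrowing, sketched for illustration only:

```rust
use oxrdf::Term;
use spargebra::term::GroundTerm;

// Returns None for terms that cannot appear as ground terms in VALUES.
fn to_ground_term(term: Term) -> Option<GroundTerm> {
    match term {
        Term::NamedNode(nn) => Some(GroundTerm::NamedNode(nn)),
        Term::Literal(l) => Some(GroundTerm::Literal(l)),
        _ => None, // blank nodes (and RDF-star triples, if enabled)
    }
}
```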
5 changes: 4 additions & 1 deletion lib/chrontext/src/combiner/virtualized_queries.rs
@@ -4,7 +4,10 @@ use crate::preparing::grouping_col_type;
use log::debug;
use oxrdf::vocab::xsd;
use oxrdf::Term;
-use polars::prelude::{col, CategoricalOrdering, Column, DataFrame, DataType, Expr, IntoLazy, JoinArgs, JoinType, SortMultipleOptions};
+use polars::prelude::{
+    col, CategoricalOrdering, Column, DataFrame, DataType, Expr, IntoLazy, JoinArgs, JoinType,
+    SortMultipleOptions,
+};
use representation::polars_to_rdf::polars_type_to_literal_type;
use representation::query_context::Context;
use representation::solution_mapping::{EagerSolutionMappings, SolutionMappings};
11 changes: 7 additions & 4 deletions lib/chrontext/src/engine.rs
@@ -7,6 +7,7 @@ use crate::splitter::parse_sparql_select_query;
use log::debug;
use polars::enable_string_cache;
use polars::frame::DataFrame;
+use polars::prelude::PlSmallStr;
use representation::query_context::Context;
use representation::solution_mapping::SolutionMappings;
use representation::RDFNodeType;
@@ -15,7 +16,6 @@ use sparql_database::endpoint::SparqlEndpoint;
use sparql_database::SparqlQueryable;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
-use polars::prelude::PlSmallStr;
use virtualization::{Virtualization, VirtualizedDatabase};
use virtualized_query::pushdown_setting::PushdownSetting;

@@ -118,16 +118,19 @@ impl Engine {
.map_err(ChrontextError::CombinerError)?;
for (original, renamed) in rename_map {
if let Some(dt) = solution_mappings.rdf_node_types.remove(&renamed) {
-                solution_mappings.mappings = solution_mappings
-                    .mappings
-                    .rename(&[PlSmallStr::from_string(renamed)], &[PlSmallStr::from_str(&original)], true);
+                solution_mappings.mappings = solution_mappings.mappings.rename(
+                    &[PlSmallStr::from_string(renamed)],
+                    &[PlSmallStr::from_str(&original)],
+                    true,
+                );
solution_mappings.rdf_node_types.insert(original, dt);
}
}

let SolutionMappings {
mappings,
rdf_node_types,
+            ..
} = solution_mappings;

Ok((
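The reflowed `rename` call above maps internal (renamed) columns back to the user's original variable names. A minimal sketch of the same call, assuming a recent Polars `LazyFrame::rename` signature in which the third argument makes missing columns an error (column names here are hypothetical):

```rust
use polars::prelude::*;

fn rename_back() -> PolarsResult<DataFrame> {
    let lf = df!("renamed_0" => [1i64, 2])?.lazy();
    // strict = true: fail if "renamed_0" does not exist.
    lf.rename(["renamed_0"], ["original"], true).collect()
}
```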
10 changes: 2 additions & 8 deletions lib/chrontext/src/preparing/expressions.rs
@@ -88,14 +88,8 @@ impl TimeseriesQueryPrepper {
context: &Context,
) -> EXPrepReturn {
match expression {
-            Expression::NamedNode(..) => {
-
-                EXPrepReturn::new(HashMap::new())
-            }
-            Expression::Literal(..) => {
-
-                EXPrepReturn::new(HashMap::new())
-            }
+            Expression::NamedNode(..) => EXPrepReturn::new(HashMap::new()),
+            Expression::Literal(..) => EXPrepReturn::new(HashMap::new()),
Expression::Variable(..) => EXPrepReturn::new(HashMap::new()),
Expression::Or(left, right) => self.prepare_or_expression(
left,
1 change: 0 additions & 1 deletion lib/chrontext/src/preparing/expressions/not_expression.rs
@@ -12,7 +12,6 @@ impl TimeseriesQueryPrepper {
solution_mappings: &mut SolutionMappings,
context: &Context,
) -> EXPrepReturn {
-
self.prepare_expression(
wrapped,
try_groupby_complex_query,
@@ -22,7 +22,7 @@ impl TimeseriesQueryPrepper {
UnaryOrdinaryOperator::UnaryPlus => PathEntry::UnaryPlus,
UnaryOrdinaryOperator::UnaryMinus => PathEntry::UnaryMinus,
};

self.prepare_expression(
wrapped,
try_groupby_complex_query,
@@ -20,7 +20,7 @@ impl TimeseriesQueryPrepper {
);
return GPPrepReturn::fail_groupby_complex_query();
}

self.prepare_graph_pattern(
inner,
try_groupby_complex_query,