Commit

Well.. not working quite yet

magbak committed Dec 22, 2023
1 parent 00b0c07 commit 1c96b85
Showing 19 changed files with 383 additions and 881 deletions.
10 changes: 8 additions & 2 deletions maplib/src/mapping.rs
@@ -123,7 +123,12 @@ impl Mapping {
Mapping::new(&dataset, caching_folder)
}

pub fn read_triples(&mut self, p: &Path, base_iri: Option<String>, transient:bool) -> Result<(), MappingError> {
pub fn read_triples(
&mut self,
p: &Path,
base_iri: Option<String>,
transient: bool,
) -> Result<(), MappingError> {
Ok(self
.triplestore
.read_triples(p, base_iri, transient)
@@ -625,7 +630,8 @@ fn create_remapped(
new_series.push(series);
new_dynamic_columns.insert(target_colname.clone(), primitive_column);
new_dynamic_from_constant.push(target_colname);
out_blank_node_counter = max(out_blank_node_counter, blank_node_counter+input_df_height);
out_blank_node_counter =
max(out_blank_node_counter, blank_node_counter + input_df_height);
} else if original.list_expand {
let (expr, primitive_column) =
create_dynamic_expression_from_static(target_colname, ct, &target.ptype)?;
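The reformatted `read_triples` signature threads the `transient` flag through to the underlying triplestore. A minimal call-site sketch, assuming the module path below and assuming `MappingError` implements `std::error::Error`; the file name and base IRI are hypothetical:

```rust
use std::path::Path;
use maplib::mapping::Mapping; // module path assumed from the file location in this diff

// Load an N-Triples file into the mapping's triplestore (a sketch, not the
// crate's documented API). `transient = false` is assumed to mean the triples
// are kept persistently rather than for the current session only.
fn load_triples(mapping: &mut Mapping) -> Result<(), Box<dyn std::error::Error>> {
    mapping.read_triples(
        Path::new("data/example.nt"),                // hypothetical file
        Some("http://example.com/base".to_string()), // optional base IRI
        false,                                       // transient
    )?;
    Ok(())
}
```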
4 changes: 3 additions & 1 deletion py_maplib/Cargo.toml
@@ -9,12 +9,14 @@ edition = "2021"
pyo3 = {version = "0.19.2", features = ["extension-module"]}
maplib = {path="../maplib"}
triplestore = {path="../triplestore"}
representation = {path = "../representation"}
shacl = {path="../shacl"}

oxrdf = {version="0.1.7"}
arrow_python_utils = {path="../arrow_python_utils"}
thiserror="1.0.31"
polars-core = {version="0.34.2", features=["object", "dtype-array", "dtype-categorical", "dtype-date", "dtype-datetime",
polars-lazy = "0.34.2"
polars-core = {version="0.34.2", features=["dtype-array", "dtype-categorical", "dtype-date", "dtype-datetime",
"dtype-decimal", "dtype-duration", "dtype-i8", "dtype-i16", "dtype-struct", "dtype-time", "dtype-u8", "dtype-u16"]}
log ="0.4.19"

26 changes: 11 additions & 15 deletions py_maplib/src/lib.rs
@@ -47,7 +47,9 @@ use triplestore::sparql::QueryResult;
use jemallocator::Jemalloc;
use polars_core::frame::DataFrame;
use polars_core::prelude::{DataType, NamedFrom};
use triplestore::sparql::multitype::{multi_series_to_string_series, MULTI_TYPE_NAME};
use polars_lazy::frame::IntoLazy;
use representation::RDFNodeType;
use triplestore::sparql::multitype::{multi_col_to_string_col};

#[cfg(not(target_os = "linux"))]
use mimalloc::MiMalloc;
@@ -342,13 +344,13 @@ impl Mapping {
.map_err(PyMaplibError::from)?;
match res {
QueryResult::Select(mut df, datatypes) => {
df = fix_multicolumns(df);
df = fix_multicolumns(df, &datatypes);
df_to_py_df(df, py)
}
QueryResult::Construct(dfs) => {
let dfs = dfs
.into_iter()
.map(|(df, subj_type, obj_type)| fix_multicolumns(df))
.map(|(df, subj_type, obj_type)| fix_multicolumns(df, &[("subject".to_string(), subj_type), ("object".to_string(), obj_type)].into()))
.collect();
Ok(df_vec_to_py_df_list(dfs, py)?.into())
}
@@ -555,18 +557,12 @@ fn is_blank_node(s: &str) -> bool {
s.starts_with("_:")
}

fn fix_multicolumns(mut df: DataFrame) -> DataFrame {
let columns: Vec<_> = df
.get_column_names()
.iter()
.map(|x| x.to_string())
.collect();
for c in columns {
if df.column(&c).unwrap().dtype() == &DataType::Object(MULTI_TYPE_NAME) {
let ser = df.column(&c).unwrap();
let new_ser = multi_series_to_string_series(ser);
df.with_column(new_ser).unwrap();
fn fix_multicolumns(mut df: DataFrame, dts:&HashMap<String, RDFNodeType>) -> DataFrame {
let mut lf = df.lazy();
for (c, v) in dts {
if v == &RDFNodeType::MultiType {
lf = multi_col_to_string_col(lf, c);
}
}
df
lf.collect().unwrap()
}
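The rewritten `fix_multicolumns` takes the query's datatype map and rewrites multi-typed columns in one lazy pass, instead of scanning for Polars `Object` columns. The pattern in isolation, with a plain `Utf8` cast standing in for `multi_col_to_string_col`; the map of boolean flags is illustrative:

```rust
use polars::prelude::*;
use std::collections::HashMap;

// Walk a column -> is-multi-typed map and lazily stringify flagged columns.
// The real code calls multi_col_to_string_col; a Utf8 cast stands in here.
fn stringify_flagged(df: DataFrame, flagged: &HashMap<String, bool>) -> DataFrame {
    let mut lf = df.lazy();
    for (c, is_multi) in flagged {
        if *is_multi {
            lf = lf.with_column(col(c.as_str()).cast(DataType::Utf8));
        }
    }
    lf.collect().unwrap()
}
```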
1 change: 1 addition & 0 deletions py_maplib/tests/test_blank_nodes_multi.py
@@ -82,6 +82,7 @@ def test_multi_datatype_query_no_error(blank_person_mapping):
}
""")
by = ["s","v","o"]
print(df)
df = df.sort(by=by)
filename = TESTDATA_PATH / "multi_datatype_query.csv"
#df.write_csv(filename)
30 changes: 17 additions & 13 deletions representation/src/lib.rs
@@ -1,6 +1,5 @@
pub mod literals;

use std::fmt::Display;
use oxrdf::vocab::{rdf, xsd};
use oxrdf::{BlankNode, NamedNode, NamedNodeRef, NamedOrBlankNode, Term};
use polars_core::prelude::{DataType, TimeUnit};
@@ -39,10 +38,10 @@ impl RDFNodeType {
TermPattern::NamedNode(_) => Some(RDFNodeType::IRI),
TermPattern::BlankNode(_) => None,
TermPattern::Literal(l) => Some(RDFNodeType::Literal(l.datatype().into_owned())),
TermPattern::Variable(_v) => None,
_ => {
unimplemented!()
}
TermPattern::Variable(_v) => None,
}
}

@@ -92,18 +91,23 @@ impl RDFNodeType {
}

pub fn find_triple_type(&self) -> TripleType {
if matches!(self, RDFNodeType::IRI | RDFNodeType::BlankNode) {
TripleType::ObjectProperty
} else if let RDFNodeType::Literal(lit) = self {
if lit.as_ref() == xsd::STRING {
TripleType::StringProperty
} else if lit.as_ref() == rdf::LANG_STRING {
TripleType::LangStringProperty
} else {
TripleType::NonStringProperty
match self {
RDFNodeType::IRI | RDFNodeType::BlankNode => TripleType::ObjectProperty,
RDFNodeType::Literal(lit) => {
if lit.as_ref() == xsd::STRING {
TripleType::StringProperty
} else if lit.as_ref() == rdf::LANG_STRING {
TripleType::LangStringProperty
} else {
TripleType::NonStringProperty
}
}
RDFNodeType::None => {
panic!()
}
RDFNodeType::MultiType => {
panic!()
}
} else {
todo!("Triple type {:?} not supported", self)
}
}

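Note that the rewritten `find_triple_type` panics on `RDFNodeType::None` and `RDFNodeType::MultiType`, so callers are expected to resolve multi-typed columns before asking for a triple type. A guard a caller might add (a hypothetical helper, not part of this commit; module paths assumed from this diff):

```rust
use representation::{RDFNodeType, TripleType}; // paths assumed

// Return None instead of panicking for the unsupported variants.
fn triple_type_checked(t: &RDFNodeType) -> Option<TripleType> {
    match t {
        RDFNodeType::None | RDFNodeType::MultiType => None,
        concrete => Some(concrete.find_triple_type()),
    }
}
```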
2 changes: 1 addition & 1 deletion triplestore/Cargo.toml
@@ -11,7 +11,7 @@ rayon = "1.6.0"
sprs = {version="0.11.0", features=["rayon"]}
spargebra = "0.2.2"
oxrdf = {version="0.1.7"}
polars = {version="0.34.2", features=["object", "performant", "semi_anti_join","abs", "round_series", "lazy", "concat_str", "is_in", "dtype-full", "strings", "horizontal_concat", "rows", "timezones", "polars-time", "temporal", "list_eval", "partition_by", "parquet", "diagonal_concat", "cross_join", "cum_agg"] }
polars = {version="0.34.2", features=["zip_with","performant", "semi_anti_join","abs", "round_series", "lazy", "concat_str", "is_in", "dtype-full", "strings", "horizontal_concat", "rows", "timezones", "polars-time", "temporal", "list_eval", "partition_by", "parquet", "diagonal_concat", "cross_join", "cum_agg"] }
log="0.4.19"
rio_turtle = "0.8.4"
rio_api = "0.8.4"
6 changes: 4 additions & 2 deletions triplestore/src/conversion.rs
@@ -57,11 +57,13 @@ pub fn convert_to_string(series: &Series) -> Option<Series> {
(lit("\"")
+ col(series.name())
.struct_()
.field_by_name(LANG_STRING_VALUE_FIELD).cast(DataType::Utf8)
.field_by_name(LANG_STRING_VALUE_FIELD)
.cast(DataType::Utf8)
+ lit("\"@")
+ col(series.name())
.struct_()
.field_by_name(LANG_STRING_LANG_FIELD)).cast(DataType::Utf8)
.field_by_name(LANG_STRING_LANG_FIELD))
.cast(DataType::Utf8)
.alias(series.name()),
)
.collect()
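The reflowed expression renders a language-tagged literal as `"value"@lang` by concatenating struct fields. A self-contained sketch of the same expression shape; the field names `v` and `l` are stand-ins for `LANG_STRING_VALUE_FIELD` and `LANG_STRING_LANG_FIELD`:

```rust
use polars::prelude::*;

// Build "value"@lang from a struct column; string concatenation via `+`
// mirrors the expression in the diff (requires the "concat_str" and
// "dtype-struct" features).
fn lang_string_expr(name: &str) -> Expr {
    (lit("\"")
        + col(name).struct_().field_by_name("v").cast(DataType::Utf8)
        + lit("\"@")
        + col(name).struct_().field_by_name("l"))
    .cast(DataType::Utf8)
    .alias(name)
}
```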
14 changes: 9 additions & 5 deletions triplestore/src/lib.rs
@@ -15,7 +15,7 @@ use crate::errors::TriplestoreError;
use crate::io_funcs::{create_folder_if_not_exists, delete_tmp_parquets_in_caching_folder};
use crate::sparql::lazy_graph_patterns::load_tt::multiple_tt_to_lf;
use log::debug;
use oxrdf::vocab::{rdf, xsd};
use oxrdf::vocab::xsd;
use oxrdf::NamedNode;
use parquet_io::{
property_to_filename, read_parquet, split_write_tmp_df, write_parquet, ParquetIOError,
@@ -134,6 +134,10 @@ impl Triplestore {
})
}

pub fn is_deduplicated(&self) -> bool {
self.deduplicated
}

pub fn deduplicate(&mut self) -> Result<(), TriplestoreError> {
let now = Instant::now();
deduplicate_map(&mut self.df_map, &self.caching_folder)?;
@@ -559,9 +563,9 @@ fn flatten<T>(nested: Vec<Vec<T>>) -> Vec<T> {
nested.into_iter().flatten().collect()
}


fn deduplicate_map(df_map:&mut HashMap<NamedNode, HashMap<(RDFNodeType, RDFNodeType), TripleTable>>,
caching_folder: &Option<String>,
fn deduplicate_map(
df_map: &mut HashMap<NamedNode, HashMap<(RDFNodeType, RDFNodeType), TripleTable>>,
caching_folder: &Option<String>,
) -> Result<(), TriplestoreError> {
for (predicate, map) in df_map {
for v in map.values_mut() {
@@ -599,7 +603,7 @@ fn deduplicate_map(df_map:&mut HashMap<NamedNode, HashMap<(RDFNodeType, RDFNodeT
unique_df,
predicate.as_str(),
)
.map_err(TriplestoreError::ParquetIOError)?;
.map_err(TriplestoreError::ParquetIOError)?;
v.df_paths = Some(paths);
v.unique = true;
} else {
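The new `is_deduplicated` accessor exposes the flag that the reworked query path in `sparql.rs` (next file) checks internally. A sketch of the intended flow, assuming both error types implement `std::error::Error`:

```rust
use triplestore::Triplestore; // path assumed

// Deduplicate once, then query through the immutable (&self) path.
fn prepare_then_query(store: &mut Triplestore, q: &str) -> Result<(), Box<dyn std::error::Error>> {
    if !store.is_deduplicated() {
        store.deduplicate()?;
    }
    let _result = store.query_deduplicated(q)?; // &self: no further mutation needed
    Ok(())
}
```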
15 changes: 12 additions & 3 deletions triplestore/src/sparql.rs
@@ -34,16 +34,25 @@ pub enum QueryResult {
}

impl Triplestore {
pub fn query_deduplicated(&self, query: &str) -> Result<QueryResult, SparqlError> {
let query = Query::parse(query, None).map_err(SparqlError::ParseError)?;
self.query_deduplicated_impl(&query)
}

pub fn query(&mut self, query: &str) -> Result<QueryResult, SparqlError> {
let query = Query::parse(query, None).map_err(SparqlError::ParseError)?;
self.query_parsed(&query)
self.query_impl(&query)
}

fn query_parsed(&mut self, query: &Query) -> Result<QueryResult, SparqlError> {
fn query_impl(&mut self, query: &Query) -> Result<QueryResult, SparqlError> {
if !self.deduplicated {
self.deduplicate()
.map_err(SparqlError::DeduplicationError)?;
}
self.query_deduplicated_impl(query)
}

fn query_deduplicated_impl(&self, query: &Query) -> Result<QueryResult, SparqlError> {
enable_string_cache();
let context = Context::new();
match query {
@@ -89,7 +98,7 @@
let call_uuid = Uuid::new_v4().to_string();
let query = Query::parse(query, None).map_err(SparqlError::ParseError)?;
if let Query::Construct { .. } = &query {
let res = self.query_parsed(&query)?;
let res = self.query_impl(&query)?;
match res {
QueryResult::Select(_, _) => {
panic!("Should never happen")
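With the split above, `query` deduplicates lazily and delegates to `query_deduplicated_impl`, while the new public `query_deduplicated` offers a `&self` path for stores known to be deduplicated. A dispatch sketch over the two result kinds; the result shapes are inferred from this diff and the field types are assumptions:

```rust
use triplestore::Triplestore;
use triplestore::sparql::QueryResult; // paths assumed

fn run(store: &mut Triplestore, q: &str) -> Result<(), Box<dyn std::error::Error>> {
    match store.query(q)? {
        QueryResult::Select(df, datatypes) => {
            // One DataFrame plus a column -> RDFNodeType map (inferred).
            println!("select: {} rows, {} typed columns", df.height(), datatypes.len());
        }
        QueryResult::Construct(dfs) => {
            // One (DataFrame, subject type, object type) entry per template (inferred).
            println!("construct: {} tables", dfs.len());
        }
    }
    Ok(())
}
```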
5 changes: 0 additions & 5 deletions triplestore/src/sparql/lazy_graph_patterns.rs
@@ -16,12 +16,9 @@ mod values;

use super::Triplestore;
use crate::sparql::errors::SparqlError;
use crate::sparql::multitype::MultiType;
use crate::sparql::query_context::{Context, PathEntry};
use crate::sparql::solution_mapping::SolutionMappings;
use log::{debug, info};
use oxrdf::Literal;
use polars_core::prelude::ObjectChunked;
use spargebra::algebra::GraphPattern;

impl Triplestore {
@@ -35,8 +32,6 @@ impl Triplestore {

match graph_pattern {
GraphPattern::Bgp { patterns } => {
let v = vec![MultiType::Literal(Literal::new_simple_literal("abc"))];
ObjectChunked::new_from_vec("chunky", v);
let mut updated_solution_mappings = solution_mappings;
let bgp_context = context.extension_with(PathEntry::Bgp);
for tp in patterns {
20 changes: 2 additions & 18 deletions triplestore/src/sparql/lazy_graph_patterns/join.rs
@@ -1,10 +1,7 @@
use super::Triplestore;
use crate::sparql::errors::SparqlError;
use crate::sparql::lazy_graph_patterns::ordering::{decide_order, Order};
use crate::sparql::multitype::{
clean_up_after_join_workaround, create_compatible_solution_mappings,
create_join_compatible_solution_mappings, helper_cols_join_workaround_polars_object_series_bug,
};
use crate::sparql::multitype::create_join_compatible_solution_mappings;
use crate::sparql::query_context::{Context, PathEntry};
use crate::sparql::solution_mapping::{is_string_col, SolutionMappings};
use log::debug;
@@ -70,7 +67,7 @@ impl Triplestore {
rdf_node_types: left_datatypes,
} = left_solution_mappings;

let (left_mappings, mut left_datatypes, right_mappings, right_datatypes) =
let (left_mappings, mut left_datatypes, mut right_mappings, right_datatypes) =
create_join_compatible_solution_mappings(
left_mappings,
left_datatypes,
@@ -83,13 +80,6 @@
left_datatypes.insert(k.clone(), v.clone());
}
}
let (left_mappings, mut right_mappings, left_original_map, right_original_map) =
helper_cols_join_workaround_polars_object_series_bug(
left_mappings,
right_mappings,
&join_on,
&left_datatypes,
);

let mut left_solution_mappings = SolutionMappings {
mappings: left_mappings,
@@ -127,12 +117,6 @@
left_solution_mappings.columns.insert(c);
}

left_solution_mappings.mappings = clean_up_after_join_workaround(
left_solution_mappings.mappings,
left_original_map,
right_original_map,
);

Ok(left_solution_mappings)
}
}
21 changes: 2 additions & 19 deletions triplestore/src/sparql/lazy_graph_patterns/left_join.rs
@@ -1,9 +1,6 @@
use super::Triplestore;
use crate::sparql::errors::SparqlError;
use crate::sparql::multitype::{
clean_up_after_join_workaround, create_compatible_solution_mappings,
create_join_compatible_solution_mappings, helper_cols_join_workaround_polars_object_series_bug,
};
use crate::sparql::multitype::create_join_compatible_solution_mappings;
use crate::sparql::query_context::{Context, PathEntry};
use crate::sparql::solution_mapping::{is_string_col, SolutionMappings};
use log::debug;
@@ -58,7 +55,7 @@ impl Triplestore {
.collect();
join_on.sort();

let (left_mappings, mut left_datatypes, right_mappings, right_datatypes) =
let (left_mappings, mut left_datatypes, mut right_mappings, right_datatypes) =
create_join_compatible_solution_mappings(
left_mappings,
left_datatypes,
@@ -67,14 +64,6 @@
false,
);

let (left_mappings, mut right_mappings, left_original_map, right_original_map) =
helper_cols_join_workaround_polars_object_series_bug(
left_mappings,
right_mappings,
&join_on,
&left_datatypes,
);

for (k, v) in &right_datatypes {
if !left_datatypes.contains_key(k) {
left_datatypes.insert(k.clone(), v.clone());
@@ -119,12 +108,6 @@
left_solution_mappings.columns.insert(c);
}

left_solution_mappings.mappings = clean_up_after_join_workaround(
left_solution_mappings.mappings,
left_original_map,
right_original_map,
);

Ok(left_solution_mappings)
}
}
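Both `join.rs` and `left_join.rs` drop the helper-column workaround for the Polars `Object`-series join bug: once multi-typed columns are ordinary (non-`Object`) columns, the mappings can be joined directly after `create_join_compatible_solution_mappings` aligns their types. The general shape of the direct join, as a simplified sketch unrelated to the crate's own helpers:

```rust
use polars::prelude::*;

// Inner-join two lazy frames on shared key columns; with plain column types,
// no helper columns or post-join cleanup are needed.
fn join_on_keys(left: LazyFrame, right: LazyFrame, keys: &[&str]) -> LazyFrame {
    let on: Vec<Expr> = keys.iter().map(|k| col(k)).collect();
    left.join(right, on.clone(), on, JoinArgs::new(JoinType::Inner))
}
```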
