Skip to content

Commit

Permalink
Fix overlapping blank node ids when reading multiple files
Browse files Browse the repository at this point in the history
  • Loading branch information
magbak committed Dec 26, 2023
1 parent ced7c65 commit 1dd4e51
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 36 deletions.
3 changes: 2 additions & 1 deletion arrow_python_utils/src/to_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ pub fn polars_df_to_rust_df(df: &PyAny) -> PyResult<DataFrame> {
}

pub fn array_to_rust_df(rb: &[&PyAny]) -> PyResult<DataFrame> {
let schema = rb.first()
let schema = rb
.first()
.ok_or_else(|| ToRustError::Other("empty table".into()))?
.getattr("schema")?;
let names = schema.getattr("names")?.extract::<Vec<String>>()?;
Expand Down
3 changes: 1 addition & 2 deletions maplib/src/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ impl Mapping {
base_iri: Option<String>,
transient: bool,
) -> Result<(), MappingError> {
self
.triplestore
self.triplestore
.read_triples(p, base_iri, transient)
.map_err(MappingError::TriplestoreError)
}
Expand Down
2 changes: 2 additions & 0 deletions triplestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct Triplestore {
pub(crate) caching_folder: Option<String>,
df_map: HashMap<NamedNode, HashMap<(RDFNodeType, RDFNodeType), TripleTable>>,
transient_df_map: HashMap<NamedNode, HashMap<(RDFNodeType, RDFNodeType), TripleTable>>,
parser_call: usize,
}

pub struct TripleTable {
Expand Down Expand Up @@ -131,6 +132,7 @@ impl Triplestore {
transient_df_map: HashMap::new(),
deduplicated: true,
caching_folder,
parser_call: 0,
})
}

Expand Down
3 changes: 2 additions & 1 deletion triplestore/src/sparql/lazy_graph_patterns/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ impl Triplestore {

let mut join_on: Vec<_> = left_solution_mappings
.columns
.intersection(&right_columns).cloned()
.intersection(&right_columns)
.cloned()
.collect();
join_on.sort();

Expand Down
8 changes: 2 additions & 6 deletions triplestore/src/sparql/lazy_graph_patterns/left_join.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use super::Triplestore;
use crate::sparql::errors::SparqlError;
use crate::sparql::multitype::{
create_join_compatible_solution_mappings, join_workaround,
};
use crate::sparql::multitype::{create_join_compatible_solution_mappings, join_workaround};
use crate::sparql::query_context::{Context, PathEntry};
use crate::sparql::solution_mapping::{is_string_col, SolutionMappings};
use log::debug;
Expand Down Expand Up @@ -51,9 +49,7 @@ impl Triplestore {
rdf_node_types: left_datatypes,
} = left_solution_mappings;

let mut join_on: Vec<_> = left_columns
.intersection(&right_columns).cloned()
.collect();
let mut join_on: Vec<_> = left_columns.intersection(&right_columns).cloned().collect();
join_on.sort();

let (mut left_mappings, mut left_datatypes, mut right_mappings, right_datatypes) =
Expand Down
4 changes: 3 additions & 1 deletion triplestore/src/sparql/lazy_graph_patterns/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,9 @@ fn sparse_path(
dt_obj: dt_obj_left,
})
}
} else { res_right }
} else {
res_right
}
}
PropertyPathExpression::ZeroOrMore(inner) => {
if let Some(SparsePathReturn {
Expand Down
12 changes: 4 additions & 8 deletions triplestore/src/sparql/lazy_graph_patterns/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl Triplestore {
{
let overlap: Vec<_> = colnames
.iter()
.filter(|x| columns.contains(*x)).cloned()
.filter(|x| columns.contains(*x))
.cloned()
.collect();
if height_0 {
// Important that overlapping cols are dropped from mappings and not from lf,
Expand Down Expand Up @@ -162,13 +163,8 @@ impl Triplestore {
mappings = mappings.with_column(col(c).cast(DataType::Categorical(None)));
}

mappings = join_workaround(
mappings,
&rdf_node_types,
lf,
&dts,
JoinType::Inner.into(),
);
mappings =
join_workaround(mappings, &rdf_node_types, lf, &dts, JoinType::Inner.into());
} else {
mappings = mappings.join(lf, [], [], JoinType::Cross.into());
}
Expand Down
9 changes: 6 additions & 3 deletions triplestore/src/sparql/multitype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub const MULTI_BLANK_DT: &str = "B";
pub const MULTI_PLACEHOLDER_LANG: &str = "?";

pub fn convert_lf_col_to_multitype(lf: LazyFrame, c: &str, dt: &RDFNodeType) -> LazyFrame {

match dt {
RDFNodeType::IRI => lf.with_column(
as_struct(vec![
Expand Down Expand Up @@ -420,8 +419,12 @@ pub fn split_df_multicols(
}

/// Debug helper: materializes `lf` through `lf_destruct` and prints the
/// resulting `DataFrame` to stdout, prefixed with `DF: `.
///
/// # Panics
/// `lf_destruct` calls `collect().unwrap()` on a clone of the frame, so this
/// panics if the lazy query cannot be collected.
pub fn lf_printer(lf: &LazyFrame) {
let df = lf_destruct(lf);
println!("DF: {}", df);
}

pub fn lf_destruct(lf: &LazyFrame) -> DataFrame {
let df = lf.clone().collect().unwrap();
println!("DF datatypes {:?}", df.dtypes());
let colnames: Vec<_> = df
.get_column_names()
.iter()
Expand Down Expand Up @@ -466,7 +469,7 @@ pub fn lf_printer(lf: &LazyFrame) {
series_vec.push(ser.clone());
}
}
println!("DF: {}", DataFrame::new(series_vec).unwrap());
DataFrame::new(series_vec).unwrap()
}

pub fn join_workaround(
Expand Down
4 changes: 1 addition & 3 deletions triplestore/src/sparql/solution_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ pub fn is_string_col(rdf_node_type: &RDFNodeType) -> bool {
match rdf_node_type {
RDFNodeType::IRI => true,
RDFNodeType::BlankNode => true,
RDFNodeType::Literal(lit) => {
lit.as_ref() == xsd::STRING
}
RDFNodeType::Literal(lit) => lit.as_ref() == xsd::STRING,
RDFNodeType::MultiType => false,
RDFNodeType::None => false,
}
Expand Down
25 changes: 14 additions & 11 deletions triplestore/src/triples_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ impl Triplestore {
) -> Result<(), TriplestoreError> {
//Copied from the documentation of rio_turtle
let mut predicate_map = HashMap::new();
let parser_call = self.parser_call.to_string();
let parse_func = &mut |t: rio_api::model::Triple| {
let verb_key = t.predicate.iri;
if !predicate_map.contains_key(verb_key) {
Expand All @@ -39,8 +40,8 @@ impl Triplestore {
}

let (subjects, objects) = type_map.get_mut(&(types_tuple)).unwrap();
subjects.push(rio_subject_to_oxrdf_subject(&t.subject));
objects.push(rio_term_to_oxrdf_term(&t.object));
subjects.push(rio_subject_to_oxrdf_subject(&t.subject, &parser_call));
objects.push(rio_term_to_oxrdf_term(&t.object, &parser_call));
Ok(())
};

Expand All @@ -55,9 +56,7 @@ impl Triplestore {

if path.extension() == Some("ttl".as_ref()) {
let mut tparser = TurtleParser::new(
BufReader::new(
File::open(path).map_err(TriplestoreError::ReadTriplesFileError)?,
),
BufReader::new(File::open(path).map_err(TriplestoreError::ReadTriplesFileError)?),
base_iri,
);
tparser
Expand Down Expand Up @@ -121,18 +120,19 @@ impl Triplestore {
});
}
}
self.parser_call += 1;
self.add_triples_vec(triples_to_add, &uuid::Uuid::new_v4().to_string(), transient)?;
Ok(())
}
}

fn rio_term_to_oxrdf_term(t: &rio_api::model::Term) -> oxrdf::Term {
fn rio_term_to_oxrdf_term(t: &rio_api::model::Term, parser_call: &str) -> oxrdf::Term {
match t {
rio_api::model::Term::NamedNode(nn) => {
oxrdf::Term::NamedNode(rio_named_node_to_oxrdf_named_node(nn))
}
rio_api::model::Term::BlankNode(bn) => {
oxrdf::Term::BlankNode(rio_blank_node_to_oxrdf_blank_node(bn))
oxrdf::Term::BlankNode(rio_blank_node_to_oxrdf_blank_node(bn, parser_call))
}
rio_api::model::Term::Literal(l) => oxrdf::Term::Literal(rio_literal_to_oxrdf_literal(l)),
rio_api::model::Term::Triple(_) => {
Expand All @@ -153,22 +153,25 @@ fn rio_literal_to_oxrdf_literal(l: &rio_api::model::Literal) -> oxrdf::Literal {
}
}

fn rio_subject_to_oxrdf_subject(s: &rio_api::model::Subject) -> oxrdf::Subject {
fn rio_subject_to_oxrdf_subject(s: &rio_api::model::Subject, parser_call: &str) -> oxrdf::Subject {
match s {
rio_api::model::Subject::NamedNode(nn) => {
oxrdf::Subject::NamedNode(rio_named_node_to_oxrdf_named_node(nn))
}
rio_api::model::Subject::BlankNode(bn) => {
oxrdf::Subject::BlankNode(rio_blank_node_to_oxrdf_blank_node(bn))
oxrdf::Subject::BlankNode(rio_blank_node_to_oxrdf_blank_node(bn, parser_call))
}
rio_api::model::Subject::Triple(_) => {
todo!()
}
}
}

fn rio_blank_node_to_oxrdf_blank_node(bn: &rio_api::model::BlankNode) -> oxrdf::BlankNode {
oxrdf::BlankNode::new_unchecked(bn.id)
/// Converts a rio blank node into an `oxrdf::BlankNode` whose id is the
/// original id followed by `_` and `parser_call`.
///
/// `parser_call` is the string form of the triplestore's `parser_call`
/// counter, which is incremented after each read. Suffixing the id with it
/// keeps blank nodes that carry the same label in different reads from
/// overlapping in the store.
fn rio_blank_node_to_oxrdf_blank_node(
bn: &rio_api::model::BlankNode,
parser_call: &str,
) -> oxrdf::BlankNode {
// NOTE(review): `new_unchecked` presumably skips id validation; this assumes
// `<rio id>_<counter>` is always a valid blank node label - confirm.
oxrdf::BlankNode::new_unchecked(format!("{}_{}", bn.id, parser_call))
}

fn rio_named_node_to_oxrdf_named_node(nn: &rio_api::model::NamedNode) -> oxrdf::NamedNode {
Expand Down

0 comments on commit 1dd4e51

Please sign in to comment.