Skip to content

Commit

Permalink
PValues construction
Browse files Browse the repository at this point in the history
  • Loading branch information
magbak committed Jan 27, 2024
1 parent 29c7300 commit 1e5f81e
Show file tree
Hide file tree
Showing 31 changed files with 543 additions and 217 deletions.
33 changes: 6 additions & 27 deletions maplib/src/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::path::Path;
use std::time::Instant;
use triplestore::constants::{OBJECT_COL_NAME, SUBJECT_COL_NAME, VERB_COL_NAME};
use triplestore::{TriplesToAdd, Triplestore};
use uuid::Uuid;
use triplestore::constants::{OBJECT_COL_NAME, SUBJECT_COL_NAME, VERB_COL_NAME};

pub struct Mapping {
pub template_dataset: TemplateDataset,
Expand Down Expand Up @@ -185,8 +185,7 @@ impl Mapping {
let target_template = self.resolve_template(template)?.clone();
let target_template_name = target_template.signature.template_name.as_str().to_string();

let columns =
self.validate_infer_dataframe_columns(&target_template.signature, &df)?;
let columns = self.validate_infer_dataframe_columns(&target_template.signature, &df)?;
let ExpandOptions {
unique_subsets: unique_subsets_opt,
} = options;
Expand Down Expand Up @@ -368,14 +367,7 @@ impl Mapping {
ok_triples.push(t?);
}
let mut all_triples_to_add = vec![];
for (
mut df,
subj_rdf_node_type,
obj_rdf_node_type,
verb,
has_unique_subset,
) in ok_triples
{
for (mut df, subj_rdf_node_type, obj_rdf_node_type, verb, has_unique_subset) in ok_triples {
let mut coltypes_names = vec![
(&subj_rdf_node_type, SUBJECT_COL_NAME),
(&obj_rdf_node_type, OBJECT_COL_NAME),
Expand Down Expand Up @@ -448,16 +440,7 @@ fn get_term_names<'a>(out_vars: &mut Vec<&'a String>, term: &'a StottrTerm) {

fn create_triples(
i: OTTRTripleInstance,
) -> Result<
(
DataFrame,
RDFNodeType,
RDFNodeType,
Option<NamedNode>,
bool,
),
MappingError,
> {
) -> Result<(DataFrame, RDFNodeType, RDFNodeType, Option<NamedNode>, bool), MappingError> {
let OTTRTripleInstance {
mut df,
mut dynamic_columns,
Expand Down Expand Up @@ -524,9 +507,7 @@ fn create_dynamic_expression_from_static(
ptype: &Option<PType>,
) -> Result<(Expr, PrimitiveColumn), MappingError> {
let (mut expr, _, rdf_node_type) = constant_to_expr(constant_term, ptype)?;
let mapped_column = PrimitiveColumn {
rdf_node_type,
};
let mapped_column = PrimitiveColumn { rdf_node_type };
expr = expr.alias(column_name);
Ok((expr, mapped_column))
}
Expand All @@ -547,9 +528,7 @@ fn create_series_from_blank_node_constant(
n_rows,
)?;
series.rename(column_name);
let mapped_column = PrimitiveColumn {
rdf_node_type,
};
let mapped_column = PrimitiveColumn { rdf_node_type };
Ok((series, mapped_column))
}

Expand Down
10 changes: 3 additions & 7 deletions maplib/tests/test_stottr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use polars::series::Series;
use polars_core::prelude::{AnyValue, TimeUnit};
use rstest::*;
use serial_test::serial;
use std::collections::{HashSet};
use std::collections::HashSet;
use std::fs::File;
use std::path::PathBuf;

Expand Down Expand Up @@ -151,16 +151,12 @@ fn test_string_language_tag_cases() {
Triple {
subject: Subject::NamedNode(NamedNode::new_unchecked("http://example.net/ns#anObject")),
predicate: NamedNode::new_unchecked("http://example.net/ns#hasString"),
object: Term::Literal(Literal::new_simple_literal(
"one",
)),
object: Term::Literal(Literal::new_simple_literal("one")),
},
Triple {
subject: Subject::NamedNode(NamedNode::new_unchecked("http://example.net/ns#anObject")),
predicate: NamedNode::new_unchecked("http://example.net/ns#hasString"),
object: Term::Literal(Literal::new_simple_literal(
"two"
)),
object: Term::Literal(Literal::new_simple_literal("two")),
},
Triple {
subject: Subject::NamedNode(NamedNode::new_unchecked(
Expand Down
14 changes: 10 additions & 4 deletions py_maplib/python/maplib/_maplib.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Union, List
from typing import Union, List, Dict

from polars import DataFrame
from .semantic_dataframe import SemanticDataFrame
Expand Down Expand Up @@ -82,7 +82,8 @@ class Mapping:
:return: The generated template
"""

def query(self, query: str) -> Union[SemanticDataFrame, List[SemanticDataFrame], None]:
def query(self, query: str, parameters: Dict[str, DataFrame] = None) -> Union[
SemanticDataFrame, List[SemanticDataFrame], None]:
"""
Query the contained knowledge graph using SPARQL
Currently, SELECT, CONSTRUCT and INSERT are supported.
Expand All @@ -97,10 +98,11 @@ class Mapping:
... print(res.types)
:param query: The SPARQL query string
:param parameters: PVALUES Parameters, a DataFrame containing the value bindings in the custom PVALUES construction.
:return: DataFrame (Select), list of DataFrames (Construct) containing results, or None for Insert-queries
"""

def insert(self, query: str):
def insert(self, query: str, parameters: Dict[str, DataFrame] = None, transient: bool = False):
"""
Insert the results of a Construct query in the graph.
Useful for being able to use the same query for inspecting what will be inserted and actually inserting.
Expand All @@ -119,10 +121,12 @@ class Mapping:
... m.insert(hpizzas)
:param query: The SPARQL Insert query string
:param parameters: PVALUES parameters: a dict mapping each parameter name to a DataFrame containing the value bindings for the custom PVALUES construction.
:param transient: Should the inserted triples be included in exports?
:return: None
"""

def insert_sprout(self, query: str):
def insert_sprout(self, query: str, parameters: Dict[str, DataFrame] = None, transient: bool = False):
"""
Insert the results of a Construct query in the sprouted graph.
Useful for being able to use the same query for inspecting what will be inserted and actually inserting.
Expand All @@ -142,6 +146,8 @@ class Mapping:
... m.insert_sprout(hpizzas)
:param query: The SPARQL Insert query string
:param parameters: PVALUES parameters: a dict mapping each parameter name to a DataFrame containing the value bindings for the custom PVALUES construction.
:param transient: Should the inserted triples be included in exports?
:return: None
"""

Expand Down
65 changes: 59 additions & 6 deletions py_maplib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ use triplestore::sparql::{QueryResult as SparqlQueryResult, QueryResult};
// SOFTWARE.
#[cfg(target_os = "linux")]
use jemallocator::Jemalloc;
use oxrdf::vocab::xsd;
use polars_core::frame::DataFrame;
use polars_lazy::frame::IntoLazy;
use pyo3::types::PyList;
use representation::multitype::multi_col_to_string_col;
use representation::polars_to_sparql::primitive_polars_type_to_literal_type;
use representation::solution_mapping::EagerSolutionMappings;
use representation::RDFNodeType;

#[cfg(not(target_os = "linux"))]
Expand Down Expand Up @@ -219,11 +222,17 @@ impl Mapping {
return Ok(format!("{}", tmpl));
}

fn query(&mut self, py: Python<'_>, query: String) -> PyResult<PyObject> {
fn query(
&mut self,
py: Python<'_>,
query: String,
parameters: Option<HashMap<String, &PyAny>>,
) -> PyResult<PyObject> {
let mapped_parameters = map_parameters(parameters)?;
let res = self
.inner
.triplestore
.query(&query)
.query(&query, &mapped_parameters)
.map_err(PyMaplibError::from)?;
query_to_result(res, py)
}
Expand All @@ -241,25 +250,27 @@ impl Mapping {
Ok(ValidationReport { conforms, report })
}

fn insert(&mut self, query: String, transient: Option<bool>) -> PyResult<()> {
fn insert(&mut self, query: String, parameters: Option<HashMap<String, &PyAny>>, transient: Option<bool>) -> PyResult<()> {
let mapped_parameters = map_parameters(parameters)?;
self.inner
.triplestore
.insert(&query, transient.unwrap_or(false))
.insert(&query, &mapped_parameters, transient.unwrap_or(false))
.map_err(PyMaplibError::from)?;
if self.sprout.is_some() {
self.sprout.as_mut().unwrap().blank_node_counter = self.inner.blank_node_counter;
}
Ok(())
}

fn insert_sprout(&mut self, query: String, transient: Option<bool>) -> PyResult<()> {
fn insert_sprout(&mut self, query: String, parameters: Option<HashMap<String, &PyAny>>, transient: Option<bool>) -> PyResult<()> {
let mapped_parameters = map_parameters(parameters)?;
if self.sprout.is_none() {
self.create_sprout()?;
}
let res = self
.inner
.triplestore
.query(&query)
.query(&query, &mapped_parameters)
.map_err(PyMaplibError::from)?;
if let QueryResult::Construct(dfs_and_dts) = res {
self.sprout
Expand Down Expand Up @@ -352,3 +363,45 @@ fn query_to_result(res: SparqlQueryResult, py: Python<'_>) -> PyResult<PyObject>
fn dtypes_map(map: HashMap<String, RDFNodeType>) -> HashMap<String, String> {
map.into_iter().map(|(x, y)| (x, y.to_string())).collect()
}

/// Converts the optional Python-side `parameters` dict (name -> polars DataFrame,
/// received as `&PyAny`) into `EagerSolutionMappings` keyed by parameter name,
/// inferring an `RDFNodeType` for every column of every DataFrame.
///
/// Type inference per column:
/// - the polars dtype is mapped to an XSD literal type via
///   `primitive_polars_type_to_literal_type`;
/// - for `xsd:string` columns, the first non-null value is inspected: if it
///   starts with `"<"` the whole column is treated as `RDFNodeType::IRI`
///   (NOTE(review): only one value is sampled, so a mixed IRI/literal column
///   would be silently mis-typed — confirm columns are homogeneous by contract);
/// - otherwise the column becomes `RDFNodeType::Literal` of the mapped dtype.
///
/// Returns `Ok(None)` when no parameters were supplied.
///
/// NOTE(review): the `.unwrap()` calls below panic (rather than raise a Python
/// error) on unsupported column dtypes or unexpected polars states — consider
/// surfacing these as `PyResult` errors; TODO confirm intended behavior.
fn map_parameters(
    parameters: Option<HashMap<String, &PyAny>>,
) -> PyResult<Option<HashMap<String, EagerSolutionMappings>>> {
    if let Some(parameters) = parameters {
        let mut mapped_parameters = HashMap::new();
        for (k, pydf) in parameters {
            let mut rdf_node_types = HashMap::new();
            // Convert the Python polars DataFrame into a Rust-side DataFrame;
            // conversion errors propagate to Python via `?`.
            let df = polars_df_to_rust_df(pydf)?;
            let names = df.get_column_names();
            for c in df.columns(names).unwrap() {
                // Panics if the column's dtype has no literal-type mapping.
                let dt = primitive_polars_type_to_literal_type(c.dtype()).unwrap();

                let mut rdf_node_type = None;

                if dt == xsd::STRING {
                    let ch = c.utf8().unwrap();
                    // Sample the first non-null string to distinguish IRI
                    // columns (values written as "<...>") from plain literals.
                    if let Some(s) = ch.first_non_null() {
                        let f = ch.get(s).unwrap();
                        if f.starts_with("<") {
                            rdf_node_type = Some(RDFNodeType::IRI);
                        }
                    }
                }

                // Fallback: any column not identified as IRI is a literal of
                // the dtype-derived XSD type.
                if rdf_node_type.is_none() {
                    rdf_node_type = Some(RDFNodeType::Literal(dt.into_owned()))
                }
                rdf_node_types.insert(c.name().to_string(), rdf_node_type.unwrap());
            }
            let m = EagerSolutionMappings {
                mappings: df,
                rdf_node_types,
            };
            mapped_parameters.insert(k, m);
        }

        Ok(Some(mapped_parameters))
    } else {
        Ok(None)
    }
}
2 changes: 1 addition & 1 deletion triplestore/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ pub const XSD_DATE_WITHOUT_TZ_FORMAT: &str = "%Y-%m-%d";

pub const VERB_COL_NAME: &str = "verb";
pub const OBJECT_COL_NAME: &str = "object";
pub const SUBJECT_COL_NAME: &str = "subject";
pub const SUBJECT_COL_NAME: &str = "subject";
5 changes: 3 additions & 2 deletions triplestore/src/export_triples.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::Triplestore;

use crate::constants::{OBJECT_COL_NAME, SUBJECT_COL_NAME};
use crate::conversion::convert_to_string;
use crate::errors::TriplestoreError;
use oxrdf::{Literal, NamedNode, Subject, Term, Triple};
Expand All @@ -9,7 +10,6 @@ use representation::{
literal_iri_to_namednode, RDFNodeType, TripleType, LANG_STRING_LANG_FIELD,
LANG_STRING_VALUE_FIELD,
};
use crate::constants::{OBJECT_COL_NAME, SUBJECT_COL_NAME};

impl Triplestore {
pub fn object_property_triples<F, T>(
Expand Down Expand Up @@ -135,7 +135,8 @@ impl Triplestore {
return Ok(());
}
let mut subject_iterator = df.column(SUBJECT_COL_NAME).unwrap().iter();
let data_as_strings = convert_to_string(df.column(OBJECT_COL_NAME).unwrap());
let data_as_strings =
convert_to_string(df.column(OBJECT_COL_NAME).unwrap());
if let Some(s) = data_as_strings {
let mut data_iterator = s.iter();
for _ in 0..df.height() {
Expand Down
6 changes: 2 additions & 4 deletions triplestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod rdfs_inferencing;
pub mod sparql;
pub mod triples_read;

use crate::constants::{OBJECT_COL_NAME, SUBJECT_COL_NAME, VERB_COL_NAME};
use crate::errors::TriplestoreError;
use crate::io_funcs::{create_folder_if_not_exists, delete_tmp_parquets_in_caching_folder};
use crate::sparql::lazy_graph_patterns::load_tt::multiple_tt_to_lf;
Expand All @@ -26,16 +27,13 @@ use polars_core::frame::{DataFrame, UniqueKeepStrategy};
use polars_core::utils::concat_df;
use rayon::iter::ParallelIterator;
use rayon::iter::{IntoParallelRefIterator, ParallelDrainRange};
use representation::{
literal_iri_to_namednode, RDFNodeType,
};
use representation::{literal_iri_to_namednode, RDFNodeType};
use std::collections::HashMap;
use std::fs::remove_file;
use std::io;
use std::path::Path;
use std::time::Instant;
use uuid::Uuid;
use crate::constants::{OBJECT_COL_NAME, SUBJECT_COL_NAME, VERB_COL_NAME};

pub struct Triplestore {
deduplicated: bool,
Expand Down
2 changes: 1 addition & 1 deletion triplestore/src/ntriples_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
use super::Triplestore;
use crate::constants::OBJECT_COL_NAME;
use crate::conversion::convert_to_string;
use crate::errors::TriplestoreError;
use oxrdf::NamedNode;
Expand All @@ -32,7 +33,6 @@ use polars_core::POOL;
use polars_utils::contention_pool::LowContentionPool;
use representation::{RDFNodeType, TripleType};
use std::io::Write;
use crate::constants::OBJECT_COL_NAME;

/// Utility to write to `&mut Vec<u8>` buffer
struct StringWrap<'a>(pub &'a mut Vec<u8>);
Expand Down
2 changes: 1 addition & 1 deletion triplestore/src/rdfs_inferencing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ WHERE {

impl Triplestore {
    /// Materializes rdfs subclass inheritance by executing the
    /// `SUBCLASS_INFERENCING` SPARQL INSERT against this store.
    ///
    /// Any underlying insert error is wrapped as
    /// `TriplestoreError::RDFSClassInheritanceError`.
    pub fn rdfs_class_inheritance(&mut self) -> Result<(), TriplestoreError> {
        // `&None`: no PVALUES parameter bindings for this query.
        // `true`: insert as transient triples (NOTE(review): presumably
        // excluded from exports, per the Python API docs — confirm).
        self.insert(SUBCLASS_INFERENCING, &None, true)
            .map_err(|x| TriplestoreError::RDFSClassInheritanceError(x.to_string()))?;
        Ok(())
    }
}
Expand Down
Loading

0 comments on commit 1e5f81e

Please sign in to comment.