Skip to content

Commit

Permalink
#678 Dont index unfiltered queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Polleps committed Oct 12, 2023
1 parent 5dc3bee commit 623b0e6
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 81 deletions.
159 changes: 114 additions & 45 deletions lib/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod val_prop_sub_index;
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
vec,
};

use tracing::{info, instrument};
Expand All @@ -19,7 +20,10 @@ use crate::{
agents::ForAgent,
atoms::IndexAtom,
commit::CommitResponse,
db::{query_index::NO_VALUE, val_prop_sub_index::find_in_val_prop_sub_index},
db::{
query_index::{requires_query_index, NO_VALUE},
val_prop_sub_index::find_in_val_prop_sub_index,
},
endpoints::{default_endpoints, Endpoint, HandleGetContext},
errors::{AtomicError, AtomicResult},
resources::PropVals,
Expand All @@ -35,8 +39,8 @@ use self::{
remove_atom_from_prop_val_sub_index,
},
query_index::{
check_if_atom_matches_watched_query_filters, query_indexed, update_indexed_member,
IndexIterator, QueryFilter,
check_if_atom_matches_watched_query_filters, query_sorted_indexed, should_include_resource,
update_indexed_member, IndexIterator, QueryFilter,
},
val_prop_sub_index::{add_atom_to_reference_index, remove_atom_from_reference_index},
};
Expand Down Expand Up @@ -209,6 +213,110 @@ impl Db {

Some(Resource::from_propvals(propvals, subject))
}

fn build_index_for_atom(
&self,
atom: &IndexAtom,
query_filter: &QueryFilter,
) -> AtomicResult<()> {
// Get the SortableValue either from the Atom or the Resource.
let sort_val: SortableValue = if let Some(sort) = &query_filter.sort_by {
if &atom.property == sort {
atom.sort_value.clone()
} else {
// Find the sort value in the store
match self.get_value(&atom.subject, sort) {
Ok(val) => val.to_sortable_string(),
// If we try sorting on a value that does not exist,
// we'll use an empty string as the sortable value.
Err(_) => NO_VALUE.to_string(),
}
}
} else {
atom.sort_value.clone()
};

update_indexed_member(self, query_filter, &atom.subject, &sort_val, false)?;
Ok(())
}

fn get_index_iterator_for_query(&self, q: &Query) -> IndexIterator {
match (&q.property, q.value.as_ref()) {
(Some(prop), val) => find_in_prop_val_sub_index(self, prop, val),
(None, None) => self.all_index_atoms(q.include_external),
(None, Some(val)) => find_in_val_prop_sub_index(self, val, None),
}
}

fn query_basic(&self, q: &Query) -> AtomicResult<QueryResult> {
let self_url = self
.get_self_url()
.ok_or("No self_url set, required for Queries")?;

let mut subjects: Vec<String> = vec![];
let mut resources: Vec<Resource> = vec![];
let mut total_count = 0;

let atoms = self.get_index_iterator_for_query(q);

for (i, atom_res) in atoms.enumerate() {
let atom = atom_res?;

if q.offset > i {
continue;
}

if !q.include_external && !atom.subject.starts_with(&self_url) {
continue;
}
if q.limit.is_none() || subjects.len() < q.limit.unwrap() {
if !should_include_resource(q) {
subjects.push(atom.subject.clone());
total_count += 1;
continue;
}

if let Ok(resource) = self.get_resource_extended(&atom.subject, true, &q.for_agent)
{
subjects.push(atom.subject.clone());
resources.push(resource);
}
}

total_count += 1;
}

Ok(QueryResult {
subjects,
resources,
count: total_count,
})
}

fn query_complex(&self, q: &Query) -> AtomicResult<QueryResult> {
let (mut subjects, mut resources, mut total_count) = query_sorted_indexed(self, q)?;
let q_filter: QueryFilter = q.into();

if total_count == 0 && !q_filter.is_watched(self) {
info!(filter = ?q_filter, "Building query index");
let atoms = self.get_index_iterator_for_query(q);
q_filter.watch(self)?;

// Build indexes
for atom in atoms.flatten() {
self.build_index_for_atom(&atom, &q_filter)?;
}

// Query through the new indexes.
(subjects, resources, total_count) = query_sorted_indexed(self, q)?;
}

Ok(QueryResult {
subjects,
resources,
count: total_count,
})
}
}

impl Storelike for Db {
Expand Down Expand Up @@ -467,50 +575,11 @@ impl Storelike for Db {
/// Tries `query_cache`, which you should implement yourself.
#[instrument(skip(self))]
fn query(&self, q: &Query) -> AtomicResult<QueryResult> {
let q_filter: QueryFilter = q.into();
if let Ok(res) = query_indexed(self, q) {
if res.count > 0 || q_filter.is_watched(self) {
// Yay, we have a cache hit!
// We don't have to create the indexes, so we can return early.
return Ok(res);
}
}

// Maybe make this optional?
q_filter.watch(self)?;

info!(filter = ?q_filter, "Building query index");

let atoms: IndexIterator = match (&q.property, q.value.as_ref()) {
(Some(prop), val) => find_in_prop_val_sub_index(self, prop, val),
(None, None) => self.all_index_atoms(q.include_external),
(None, Some(val)) => find_in_val_prop_sub_index(self, val, None),
};

for a in atoms {
let atom = a?;
// Get the SortableValue either from the Atom or the Resource.
let sort_val: SortableValue = if let Some(sort) = &q_filter.sort_by {
if &atom.property == sort {
atom.sort_value
} else {
// Find the sort value in the store
match self.get_value(&atom.subject, sort) {
Ok(val) => val.to_sortable_string(),
// If we try sorting on a value that does not exist,
// we'll use an empty string as the sortable value.
Err(_) => NO_VALUE.to_string(),
}
}
} else {
atom.sort_value
};

update_indexed_member(self, &q_filter, &atom.subject, &sort_val, false)?;
if requires_query_index(q) {
return self.query_complex(q);
}

// Retry the same query!
query_indexed(self, q)
self.query_basic(q)
}

#[instrument(skip(self))]
Expand Down
58 changes: 23 additions & 35 deletions lib/src/db/query_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ pub const NO_VALUE: &str = "";

#[tracing::instrument(skip(store))]
/// Performs a query on the `query_index` Tree, which is a lexicographic sorted list of all hits for QueryFilters.
pub fn query_indexed(store: &Db, q: &Query) -> AtomicResult<QueryResult> {
pub fn query_sorted_indexed(
store: &Db,
q: &Query,
) -> AtomicResult<(Vec<String>, Vec<Resource>, usize)> {
// When there is no explicit start / end value passed, we use the very first and last
// lexicographic characters in existence to make the range practically encompass all values.
let start = if let Some(val) = &q.start_val {
Expand All @@ -97,18 +100,14 @@ pub fn query_indexed(store: &Db, q: &Query) -> AtomicResult<QueryResult> {
};

let mut subjects: Vec<String> = vec![];
let mut resources = Vec::new();
let mut resources: Vec<Resource> = vec![];
let mut count = 0;

let self_url = store
.get_self_url()
.ok_or("No self_url set, required for Queries")?;

let limit = if let Some(limit) = q.limit {
limit
} else {
std::usize::MAX
};
let limit = q.limit.unwrap_or(std::usize::MAX);

for (i, kv) in iter.enumerate() {
// The user's maximum amount of results has not yet been reached
Expand All @@ -118,49 +117,30 @@ pub fn query_indexed(store: &Db, q: &Query) -> AtomicResult<QueryResult> {
if in_selection {
let (k, _v) = kv.map_err(|_e| "Unable to parse query_cached")?;
let (_q_filter, _val, subject) = parse_collection_members_key(&k)?;

// If no external resources should be included, skip this one if it's an external resource
if !q.include_external && !subject.starts_with(&self_url) {
continue;
}

// When an agent is defined, we must perform authorization checks
// WARNING: EXPENSIVE!
// TODO: Make async
if q.include_nested || q.for_agent != ForAgent::Sudo {
match store.get_resource_extended(subject, true, &q.for_agent) {
Ok(resource) => {
resources.push(resource);
subjects.push(subject.into())
}
Err(e) => match &e.error_type {
crate::AtomicErrorType::NotFoundError => {}
crate::AtomicErrorType::UnauthorizedError => {}
_other => {
return Err(format!(
"Error when getting resource in collection: {}",
&e
)
.into());
}
},
if should_include_resource(q) {
if let Ok(resource) = store.get_resource_extended(subject, true, &q.for_agent) {
resources.push(resource);
subjects.push(subject.into());
}
} else {
// If there is no need for nested resources, and no auth checks, we can skip the expensive part!
subjects.push(subject.into())
subjects.push(subject.into());
}
}

// We iterate over every single resource, even if we don't perform any computation on the items.
// This helps with pagination, but it comes at a serious performance cost. We might need to change how this works later on.
// Also, this count does not take into account the `include_external` filter.
count += 1;
// https://github.com/atomicdata-dev/atomic-server/issues/290
count = i + 1;
}

Ok(QueryResult {
count,
resources,
subjects,
})
Ok((subjects, resources, count))
}

/// Checks if the resource will match with a QueryFilter.
Expand Down Expand Up @@ -390,6 +370,14 @@ pub fn parse_collection_members_key(bytes: &[u8]) -> AtomicResult<(QueryFilter,
Ok((q_filter, value, subject))
}

pub fn requires_query_index(query: &Query) -> bool {
query.sort_by.is_some() || query.start_val.is_some() || query.end_val.is_some()
}

pub fn should_include_resource(query: &Query) -> bool {
query.include_nested || query.for_agent != ForAgent::Sudo
}

#[cfg(test)]
pub mod test {
use crate::urls;
Expand Down
2 changes: 1 addition & 1 deletion lib/src/db/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ fn queries() {
assert_eq!(res.resources.len(), limit, "nested resources");

q.sort_by = Some(sort_by.into());
println!("!!!!!!! !!!!!!!! SORT STUFF");
let mut res = store.query(&q).unwrap();
assert!(!res.resources.is_empty(), "resources should be returned");
let mut prev_resource = res.resources[0].clone();
// For one resource, we will change the order by changing its value
let mut resource_changed_order_opt = None;
Expand Down

0 comments on commit 623b0e6

Please sign in to comment.