Skip to content

Commit

Permalink
Add first implementation of Domain
Browse files Browse the repository at this point in the history
The domain defines what jobs are applicable for different routes in the
state and to which operations.

Change-type: minor
  • Loading branch information
pipex committed Nov 4, 2024
1 parent 4c4a006 commit 83f49c5
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ description = "An automated job orchestration library to build and execute dynam
[dependencies]
json-patch = "2.0.0"
jsonptr = "0.6.0"
matchit = "0.8.4"
serde = { version = "1.0.197" }
serde_json = "1.0.120"
thiserror = "1.0.63"
Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async fn main() {
let agent = Worker::new()
// The plus_one job is applicable to an UPDATE operation
// on any given counter
.job("/counters/:name", update(plus_one))
.job("/counters/{name}", update(plus_one))
// Initialize two counters "a" and "b" to 0
.with_state(State {counters: HashMap::from([("a".to_string(), 0), ("b".to_string(), 0)])})

Expand Down Expand Up @@ -194,7 +194,7 @@ As programmers, we want to be able to build code by composing simpler behaviors
```rust
use gustav::system::Context;

fn plus_two(counter: Update<State, i32>, tgt: Target<State, i32>) -> Vec<Task<i32>> {
fn plus_two(counter: Update<State, i32>, tgt: Target<State, i32>, Path(name): Path<String>) -> Vec<Task<i32>> {
if *tgt - *counter < 2 {
// Returning an empty result tells the planner
// the task is not applicable to reach the target
Expand All @@ -204,17 +204,17 @@ fn plus_two(counter: Update<State, i32>, tgt: Target<State, i32>) -> Vec<Task<i3
// A compound job returns a list of tasks that need to be executed
// to achieve a certain goal
vec![
plus_one.into_task(Context::from_target(*tgt)),
plus_one.into_task(Context::from_target(*tgt)),
plus_one.into_task(Context::new().with_target(*tgt).with_arg("name", &name)),
plus_one.into_task(Context::new().with_target(*tgt).with_arg("name", &name)),
]
}

#[tokio::main]
async fn main() {
// build our agent using the plus one job
let agent = Worker::new()
.job("/counters/:name", update(plus_one))
.job("/counters/:name", update(plus_two))
.job("/counters/{name}", update(plus_one))
.job("/counters/{name}", update(plus_two))
// Initialize two counters "a" and "b" to 0
.with_state(State {counters: HashMap::from([("a".to_string(), 0), ("b".to_string(), 0)])})

Expand Down
17 changes: 17 additions & 0 deletions src/path.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use jsonptr::Pointer;
use std::fmt::Display;
use std::sync::Arc;

#[derive(Clone, Default, PartialEq, Debug)]
pub struct Path(&'static Pointer);
Expand Down Expand Up @@ -32,6 +33,22 @@ impl AsRef<Pointer> for Path {
}
}

// Structure to store path arguments when matching
// against a lens
#[derive(Clone)]
pub(crate) struct PathArgs(pub Vec<(Arc<str>, String)>);

impl PathArgs {
pub fn new(params: matchit::Params) -> Self {
let params: Vec<(Arc<str>, String)> = params
.iter()
.map(|(k, v)| (Arc::from(k), String::from(v)))
.collect();

PathArgs(params)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 4 additions & 4 deletions src/task/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl<S> BoxedIntoTask<S> {
}))
}

pub fn into_task(self, id: String, context: Context<S>) -> Task<S> {
pub fn into_task(self, id: &str, context: Context<S>) -> Task<S> {
self.0.into_task(id, context)
}
}
Expand All @@ -43,12 +43,12 @@ impl<S> Clone for BoxedIntoTask<S> {
trait ErasedIntoTask<S> {
fn clone_box(&self) -> Box<dyn ErasedIntoTask<S>>;

fn into_task(self: Box<Self>, id: String, context: Context<S>) -> Task<S>;
fn into_task(self: Box<Self>, id: &str, context: Context<S>) -> Task<S>;
}

struct MakeIntoTask<H, S> {
pub(crate) handler: H,
pub(crate) into_task: fn(String, H, Context<S>) -> Task<S>,
pub(crate) into_task: fn(&str, H, Context<S>) -> Task<S>,
}

impl<S, H> ErasedIntoTask<S> for MakeIntoTask<H, S>
Expand All @@ -63,7 +63,7 @@ where
})
}

fn into_task(self: Box<Self>, id: String, context: Context<S>) -> Task<S> {
fn into_task(self: Box<Self>, id: &str, context: Context<S>) -> Task<S> {
(self.into_task)(id, self.handler, context)
}
}
222 changes: 222 additions & 0 deletions src/task/domain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
use matchit::Router;
use std::collections::{BTreeSet, HashMap};

use super::intent::{Intent, Operation};
use crate::path::PathArgs;

#[derive(Default)]
pub struct Domain<S> {
// The router stores a list of intents matching a route
router: Router<BTreeSet<Intent<S>>>,
// The index stores the reverse relation of job id to a route
index: HashMap<String, String>,
}

// Placeholder string to replace escaped parameters
// in a route
const PLACEHOLDER: &str = "__gustav_placeholder__";

impl<S> Domain<S> {
pub fn new() -> Self {
Self {
router: Router::new(),
index: HashMap::new(),
}
}

pub fn job(self, route: &str, intent: Intent<S>) -> Self {
let Self {
mut router,
mut index,
} = self;

let job_id = intent.job.id().clone();
let operation = intent.operation.clone();

// Remove the route from the router if it exists or create
// a new set if it doesn't
let mut queue = router.remove(route).unwrap_or_default();

// Do not allow the same job to be assigned to
// multiple operations. This could cause problems at
// runtime
if queue.iter().any(|i| i.job.id() == &job_id) {
panic!(
"cannot assign job '{}' to operation '{:?}', a previous assignment exists",
job_id, operation
)
}

// Insert the route to the queue
queue.insert(intent);

// (re)insert the queue to the router, we should not have
// conflicts here
router.insert(route, queue).expect("route should be valid");

// Only allow one assignment of a job to a route
if let Some(oldroute) = index.insert(job_id.clone(), String::from(route)) {
panic!(
"cannot assign job '{}' to route '{}', a previous assignment exists to '{}'",
job_id, route, oldroute
)
}

Self { router, index }
}

// This allows to find the path that a task relates to from the
// job it belongs to and the arguments given by the user as part
// of the context.
//
// This implementation is still missing a ton of edge cases but should
// work as a proof of concept
//
// This will no longer be dead code when the planner
// is implemented
#[allow(dead_code)]
pub(crate) fn get_path(&self, job_id: &String, args: PathArgs) -> Option<String> {
if let Some(route) = self.index.get(job_id) {
let mut route = route.clone();
let placeholder = PLACEHOLDER.to_string();

// for each key in path args look for a parameter
// in the route and replace it by the value
for (k, v) in args.0.iter() {
// look for double bracket versions first and replace
// by a placeholder
let escaped = format!("{{{{{}}}}}", k);
route = route.replace(&escaped, &placeholder);

let param = format!("{{{}}}", k);
route = route.replace(&param, v);

// Replace placeholder for its unescaped version
route = route.replace(&placeholder, &escaped);
}

// TODO: for each escaped value `{{param}}` we should replace it
// with `{param}`
// TODO: what about wildcards? `{*param}`

// QUESTION: Should be fail if there are still parameters?
return Some(route);
}

None
}

/// Find matches for the given path in the domain
/// the matches are sorted in order that they should be
/// tested
///
// This will no longer be dead code when the planner
// is implemented
#[allow(dead_code)]
pub(crate) fn at(&self, path: &str) -> Option<(PathArgs, impl Iterator<Item = &Intent<S>>)> {
self.router
.at(path)
.map(|matched| {
(
PathArgs::new(matched.params),
matched
.value
.iter()
.filter(|i| i.operation != Operation::None),
)
})
.ok()
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use crate::extract::{Target, Update};
use crate::path::PathArgs;
use crate::system::Context;
use crate::task::*;

fn plus_one(mut counter: Update<i32>, tgt: Target<i32>) -> Update<i32> {
if *counter < *tgt {
*counter += 1;
}

// Update implements IntoResult
counter
}

fn plus_two(counter: Update<i32>, tgt: Target<i32>) -> Vec<Task<i32>> {
if *tgt - *counter < 2 {
// Returning an empty result tells the planner
// the task is not applicable to reach the target
return vec![];
}

vec![
plus_one.into_task(Context::from_target(*tgt)),
plus_one.into_task(Context::from_target(*tgt)),
]
}

#[test]
fn it_finds_jobs_ordered_by_degree() {
let domain = Domain::new()
.job("/counters/{counter}", update(plus_one))
.job("/counters/{counter}", update(plus_two));

let jobs: Vec<&String> = domain
.at("/counters/{counter}")
.map(|(_, iter)| iter.map(|i| i.job.id()).collect())
.unwrap();

// It should return compound jobs first
assert_eq!(
jobs,
vec![plus_two.into_job().id(), plus_one.into_job().id()]
);
}

#[test]
fn it_ignores_none_jobs() {
let domain = Domain::new()
.job("/counters/{counter}", none(plus_one))
.job("/counters/{counter}", update(plus_two));

let jobs: Vec<&String> = domain
.at("/counters/{counter}")
.map(|(_, iter)| iter.map(|i| i.job.id()).collect())
.unwrap();

// It should not return jobs for None operations
assert_eq!(jobs, vec![plus_two.into_job().id()]);
}

#[test]
fn it_constructs_a_path_given_arguments() {
let domain = Domain::new()
.job("/counters/{counter}", none(plus_one))
.job("/counters/{counter}", update(plus_two));

let args = PathArgs(vec![(Arc::from("counter"), String::from("one"))]);
let path = domain.get_path(plus_one.into_job().id(), args).unwrap();
assert_eq!(path, String::from("/counters/one"))
}

#[test]
#[should_panic]
fn it_fails_if_assigning_the_same_job_to_multiple_ops() {
Domain::new()
.job("/counters/{counter}", update(plus_one))
.job("/counters/{counter}", update(plus_one));
}

#[test]
#[should_panic]
fn it_fails_if_assigning_the_same_job_to_multiple_routes() {
Domain::new()
.job("/counters/{counter}", update(plus_one))
.job("/numbers/{counter}", create(plus_one));
}
}
Loading

0 comments on commit 83f49c5

Please sign in to comment.