Skip to content

Commit

Permalink
Refactor and create worker target struct
Browse files Browse the repository at this point in the history
Change-type: minor
  • Loading branch information
pipex committed Dec 8, 2024
1 parent 0939915 commit 939fa5c
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ edition = "2021"
description = "An automated job orchestration library to build and execute dynamic workflows"

[dependencies]
json-patch = "2.0.0"
json-patch = "3"
jsonptr = "0.6.0"
matchit = "0.8.4"
serde = { version = "1.0.197" }
serde_json = "1.0.120"
thiserror = "1.0.63"
thiserror = "2"

[dev-dependencies]
tokio = { version = "1.36.0", features = ["rt", "macros", "time"] }
4 changes: 0 additions & 4 deletions src/extract/path/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ impl fmt::Display for PathDeserializationError {
impl std::error::Error for PathDeserializationError {}

/// The kinds of errors that can happen when deserializing into a [`Path`].
///
/// This type is obtained through [`FailedToDeserializePathParams::kind`] or
/// [`FailedToDeserializePathParams::into_kind`] and is useful for building
/// more precise error messages.
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ErrorKind {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ pub mod error;
pub mod extract;
pub mod system;
pub mod task;
pub mod worker;
13 changes: 12 additions & 1 deletion src/system/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use json_patch::{patch, Patch};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
use std::fmt::{self, Display};
use std::{
fmt::{self, Display},
ops::Deref,
};

use crate::error::{Error, IntoError};

Expand Down Expand Up @@ -82,3 +85,11 @@ impl System {
Ok(s)
}
}

/// Read-only access to the JSON value held by a [`System`].
///
/// Dereferencing yields a shared reference to the `state` field, so the
/// `Value` API can be called directly on a `&System`.
impl Deref for System {
    type Target = Value;

    fn deref(&self) -> &Value {
        &self.state
    }
}
5 changes: 1 addition & 4 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
mod boxed;
mod context;
mod domain;
mod effect;
mod handler;
mod intent;
mod job;
mod result;

Expand All @@ -16,10 +14,8 @@ use crate::error::{Error, IntoError};
use crate::system::System;

pub use context::*;
pub use domain::*;
pub use effect::*;
pub use handler::*;
pub use intent::*;
pub use job::*;
pub(crate) use result::*;

Expand Down Expand Up @@ -133,6 +129,7 @@ impl<S> Task<S> {

/// Run every action in the task sequentially and return the
/// aggregate changes.
/// TODO: this should probably only have crate visibility
pub fn dry_run(self, system: &System) -> Result<Patch> {
match self {
Self::Atom {
Expand Down
2 changes: 2 additions & 0 deletions src/task/domain.rs → src/worker/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,13 @@ impl<S> Domain<S> {

#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;

use crate::extract::{Target, Update};
use crate::path::PathArgs;
use crate::task::*;
use crate::worker::*;

fn plus_one(mut counter: Update<i32>, tgt: Target<i32>) -> Update<i32> {
if *counter < *tgt {
Expand Down
7 changes: 3 additions & 4 deletions src/task/intent.rs → src/worker/intent.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::handler::Handler;
use super::job::Job;
use crate::task::{Handler, Job};
use std::cmp::Ordering;

#[derive(PartialEq, PartialOrd, Eq, Ord, Debug, Clone)]
pub(crate) enum Operation {
pub enum Operation {
Create,
Update,
Delete,
Expand Down Expand Up @@ -39,7 +38,7 @@ impl<S> Intent<S> {
}
}

pub(crate) fn with_operation(self, operation: Operation) -> Self {
fn with_operation(self, operation: Operation) -> Self {
let Intent { priority, job, .. } = self;
Intent {
operation,
Expand Down
7 changes: 7 additions & 0 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod domain;
mod intent;
mod target;

pub use domain::*;
pub use intent::*;
pub use target::*;
272 changes: 272 additions & 0 deletions src/worker/target.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
use json_patch::{diff, Patch, PatchOperation, RemoveOperation, ReplaceOperation};
use jsonptr::Pointer;
use serde::Serialize;
use serde_json::Value;
use std::{
cmp::Ordering,
collections::{BTreeSet, LinkedList},
fmt::{self, Display},
ops::Deref,
};

/// A set of [`Operation`]s.
///
/// The operations are stored in a [`BTreeSet`], so duplicates (as decided by
/// the `Ord` implementation of `Operation`) are collapsed and iteration
/// follows that same ordering.
pub(crate) struct Distance(BTreeSet<Operation>);

impl Distance {
fn new() -> Self {
Distance(BTreeSet::new())
}

fn insert(&mut self, o: Operation) {
self.0.insert(o);
}

pub(crate) fn iter(&self) -> impl Iterator<Item = &Operation> {
self.0.iter()
}

fn add_removes_for_children(&mut self, path: &Pointer, value: &Value) {
let mut queue = LinkedList::new();
queue.push_back((path.to_buf(), value));

while let Some((path, value)) = queue.pop_front() {
if value.is_object() {
let obj = value.as_object().unwrap();
for (k, v) in obj.iter() {
let path = path.concat(Pointer::parse(&format!("/{}", k)).unwrap());
// Insert a remove operation for each child
self.insert(Operation::from(PatchOperation::Remove(RemoveOperation {
path: path.clone(),
})));

// Append the value to the queue
queue.push_back((path, v));
}
}

if value.is_array() {
let obj = value.as_array().unwrap();
for (k, v) in obj.iter().enumerate() {
let path = path.concat(Pointer::parse(&format!("/{}", k)).unwrap());

// Insert a remove operation for each child
self.insert(Operation::from(PatchOperation::Remove(RemoveOperation {
path: path.clone(),
})));

// Append the value to the queue
queue.push_back((path, v));
}
}
}
}
}

/// A target state, stored as a JSON [`Value`].
pub struct Target(Value);

#[derive(Debug)]
pub enum TargetError {
SerializationFailed(serde_json::error::Error),
}

// Marks `TargetError` as a standard error type. All trait methods keep their
// default implementations.
impl std::error::Error for TargetError {}

/// Formats the error by delegating to the wrapped error's own `Display`
/// output; no extra text is added.
impl Display for TargetError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let TargetError::SerializationFailed(inner) = self;
        Display::fmt(inner, f)
    }
}

/// Wrapper around a [`PatchOperation`] that adds an ordering, so operations
/// can be stored in a sorted set.
#[derive(PartialEq, Eq, Debug, Clone)]
pub(crate) struct Operation(PatchOperation);

/// The partial order is the total order defined by the `Ord` implementation.
impl PartialOrd for Operation {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

/// Operations are ordered by the length of their path first, then by the
/// path itself.
///
/// NOTE(review): only the path takes part in the comparison, while the
/// derived `PartialEq` compares the whole operation. Two different operations
/// on the same path therefore compare as `Equal` here; confirm this is the
/// intended behaviour for sets of operations.
impl Ord for Operation {
    fn cmp(&self, other: &Self) -> Ordering {
        let (lhs, rhs) = (self.0.path(), other.0.path());

        // Shorter paths first; ties are broken by the pointer ordering
        match lhs.count().cmp(&rhs.count()) {
            Ordering::Equal => lhs.cmp(rhs),
            by_length => by_length,
        }
    }
}

impl From<PatchOperation> for Operation {
fn from(op: PatchOperation) -> Operation {
Operation(op)
}
}

impl Target {
    /// Build a target from anything that converts into a JSON [`Value`].
    pub fn new(target: impl Into<Value>) -> Self {
        Self(target.into())
    }

    /// Build a target by serializing `state` into a JSON [`Value`].
    ///
    /// # Errors
    ///
    /// Returns [`TargetError::SerializationFailed`] if `state` cannot be
    /// serialized.
    pub fn from<S: Serialize>(state: S) -> Result<Self, TargetError> {
        serde_json::to_value(state)
            .map_err(TargetError::SerializationFailed)
            .map(Self::new)
    }

    /// Calculate the distance between some state and this target.
    ///
    /// The distance is the set of operations that can be used to convert the
    /// state into the target, including alternate paths.
    ///
    /// For instance, if the target creates a new value at the pointer
    /// `/a/b/c`, that new value could come from a create operation on the
    /// given pointer, but also from an update operation on `/a/b`, `/a` or
    /// the root.
    ///
    /// Similarly, in order to delete a pointer `/a/b`, just removing the path
    /// could be enough, but it might be necessary to remove every child of
    /// `/a/b` before being able to remove the path itself.
    ///
    /// Concretely, for every operation in the diff from `state` to the target
    /// this inserts:
    /// - a `replace` for each ancestor of the operation path, up to the root,
    ///   carrying the target value at that ancestor,
    /// - for a `remove`, one `remove` per descendant of the removed value in
    ///   `state`,
    /// - the operation itself.
    ///
    /// # Panics
    ///
    /// Panics if an ancestor path cannot be resolved on the target, or if the
    /// path of a `remove` operation cannot be resolved on `state`. Both are
    /// treated as bugs.
    // NOTE(review): presumably kept because nothing outside of tests calls
    // this yet; confirm and drop the attribute once it is used.
    #[allow(dead_code)]
    pub(crate) fn distance(&self, state: &Value) -> Distance {
        let mut result = Distance::new();

        // Differences between the given state and the target
        let Patch(operations) = diff(state, &self.0);
        for operation in operations {
            let op_path = operation.path();

            // Walk every ancestor of the operation path, up to the root
            let mut cursor = op_path;
            while let Some(ancestor) = cursor.parent() {
                // No matter the operation, the parent of the changed path
                // should always exist on the target. If it does not, there is
                // a bug (probably in jsonptr)
                let ancestor_value = match ancestor.resolve(&self.0) {
                    Ok(found) => found,
                    Err(e) => panic!(
                        "[BUG] Path `{}` should be resolvable on the target, but got error: {}",
                        ancestor, e
                    ),
                };

                // Replacing the ancestor with its target value is an
                // alternate way of applying the change
                let replace = ReplaceOperation {
                    path: ancestor.to_buf(),
                    value: ancestor_value.clone(),
                };
                result.insert(Operation::from(PatchOperation::Replace(replace)));

                cursor = ancestor;
            }

            // For a removal of e.g. '/a/b/c', also add a 'remove' operation
            // for every node below the removed path
            if matches!(operation, PatchOperation::Remove(_)) {
                // By definition of the remove operation, the path should be
                // resolvable on the left side of the diff
                let removed = match op_path.resolve(state) {
                    Ok(found) => found,
                    Err(e) => panic!(
                        "[BUG] Path `{}` should be resolvable on the state, but got error: {}",
                        op_path, e
                    ),
                };

                result.add_removes_for_children(op_path, removed);
            }

            // Finally record the operation reported by the diff
            result.insert(Operation::from(operation));
        }

        result
    }
}

impl Deref for Target {
type Target = Value;

fn deref(&self) -> &Self::Target {
&self.0
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Assert that the distance from `src` to `tgt` serializes to exactly
    /// `result`, in iteration order.
    fn distance_eq(src: Value, tgt: Value, result: Vec<Value>) {
        let distance = Target::new(tgt).distance(&src);

        // Serialize the operations to make the comparison easier
        let mut ops: Vec<Value> = Vec::new();
        for Operation(op) in distance.iter() {
            ops.push(serde_json::to_value(op).unwrap());
        }

        assert_eq!(ops, result)
    }

    #[test]
    fn it_calculates_possible_changes_to_target() {
        // A scalar changes and a nested key is removed
        distance_eq(
            json!({"a": 1, "b": "one", "c": {"k": "v"}}),
            json!({"a": 2, "b": "one", "c": {}}),
            vec![
                json!({"op": "replace", "path": "", "value": {"a": 2, "b": "one", "c": {}}}),
                json!({"op": "replace", "path": "/a", "value": 2}),
                json!({"op": "replace", "path": "/c", "value": {}}),
                json!({"op": "remove", "path": "/c/k"}),
            ],
        );

        // Top-level keys are removed, including one with children
        distance_eq(
            json!({"a": 1, "b": "one", "c": {"k": "v"}}),
            json!({"a": 2}),
            vec![
                json!({"op": "replace", "path": "", "value": {"a": 2}}),
                json!({"op": "replace", "path": "/a", "value": 2}),
                json!({"op": "remove", "path": "/b"}),
                json!({"op": "remove", "path": "/c"}),
                json!({"op": "remove", "path": "/c/k"}),
            ],
        );

        // Only scalar values change
        distance_eq(
            json!({"a": 1, "b": "one", "c": {"k": "v"}}),
            json!({"a": 2, "b": "two", "c": {"k": "v"}}),
            vec![
                json!({"op": "replace", "path": "", "value": {"a": 2, "b": "two", "c": {"k": "v"}}}),
                json!({"op": "replace", "path": "/a", "value": 2}),
                json!({"op": "replace", "path": "/b", "value": "two"}),
            ],
        );

        // A deeply nested subtree is removed
        distance_eq(
            json!({"a": {"b": {"c": {"d": "e"}}}}),
            json!({"a": {"b": {}}}),
            vec![
                json!({"op": "replace", "path": "", "value": {"a": {"b": {}}}}),
                json!({"op": "replace", "path": "/a", "value": {"b": {}}}),
                json!({"op": "replace", "path": "/a/b", "value": {}}),
                json!({"op": "remove", "path": "/a/b/c"}),
                json!({"op": "remove", "path": "/a/b/c/d"}),
            ],
        );
    }

    #[test]
    fn it_allows_filtering_some_paths_from_calculation() {
        // TODO
        // Besides calculating the operations to the target, we need to be
        // able to ignore some elements from the difference. This is so the
        // same structure can be used for both the current and target state.
        // For instance, there might be a runtime value that encodes the
        // creation date of an entity, but we may not want to compare based on
        // creation date when calculating the distance
    }
}

0 comments on commit 939fa5c

Please sign in to comment.