Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jscatena/vector clock types #42

Merged
merged 10 commits into from
Jul 8, 2024
32 changes: 16 additions & 16 deletions src/codec/meta/actor_settings.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use ecdsa::signature::rand_core::CryptoRngCore;
use futures::{AsyncWrite, AsyncWriteExt};
use winnow::binary::le_u8;
use winnow::token::take;
use winnow::Parser;
use winnow::{binary::le_u8, token::take, Parser};

use crate::codec::crypto::{
AccessKey, AsymLockedAccessKey, AsymLockedAccessKeyError, SigningKey, VerifyingKey,
use crate::codec::{
crypto::{AccessKey, AsymLockedAccessKey, AsymLockedAccessKeyError, SigningKey, VerifyingKey},
header::AccessMask,
meta::{UserAgent, VectorClockActor, VectorClockActorSnapshot},
ParserResult, Stream,
};
use crate::codec::header::AccessMask;
use crate::codec::meta::{UserAgent, VectorClock, VectorClockSnapshot};
use crate::codec::{ParserResult, Stream};

use super::ActorId;

const KEY_PRESENT_BIT: u8 = 0b0000_0001;

#[derive(Clone, Debug)]
pub struct ActorSettings {
verifying_key: VerifyingKey,
vector_clock: VectorClock,
vector_clock: VectorClockActor,
jscatena88 marked this conversation as resolved.
Show resolved Hide resolved

access_mask: AccessMask,
filesystem_key: Option<AsymLockedAccessKey>,
Expand Down Expand Up @@ -80,7 +80,7 @@ impl ActorSettings {
let mut written_bytes = 0;

written_bytes += self.verifying_key.encode(writer).await?;
written_bytes += self.vector_clock.encode(writer).await?;
written_bytes += self.vector_clock.to_snapshot().encode(writer).await?;
written_bytes += self.access_mask.encode(writer).await?;
written_bytes += self.user_agent().encode(writer).await?;

Expand Down Expand Up @@ -176,8 +176,8 @@ impl ActorSettings {
Ok(Some(open_key))
}

pub fn new(verifying_key: VerifyingKey, access_mask: AccessMask) -> Self {
let vector_clock = VectorClock::initialize();
pub fn new(verifying_key: VerifyingKey, access_mask: AccessMask, actor_id: ActorId) -> Self {
let vector_clock = VectorClockActor::initialize(actor_id);
let user_agent = UserAgent::current();

Self {
Expand All @@ -195,7 +195,7 @@ impl ActorSettings {

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, verifying_key) = VerifyingKey::parse(input)?;
let (input, vector_clock) = VectorClock::parse(input)?;
let (input, vector_clock) = VectorClockActor::parse(input)?;
let (input, access_mask) = AccessMask::parse(input)?;
let (input, user_agent) = UserAgent::parse(input)?;

Expand All @@ -220,7 +220,7 @@ impl ActorSettings {

pub const fn size() -> usize {
VerifyingKey::size()
+ VectorClock::size()
+ VectorClockActorSnapshot::size()
+ AccessMask::size()
+ UserAgent::size()
+ 3 * (1 + AsymLockedAccessKey::size())
Expand All @@ -234,8 +234,8 @@ impl ActorSettings {
self.user_agent.clone()
}

pub fn vector_clock(&self) -> VectorClockSnapshot {
VectorClockSnapshot::from(&self.vector_clock)
pub fn vector_clock(&self) -> VectorClockActorSnapshot {
(&self.vector_clock).into()
}

pub fn verifying_key(&self) -> VerifyingKey {
Expand Down
8 changes: 7 additions & 1 deletion src/codec/meta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@ pub use journal_checkpoint::JournalCheckpoint;
pub use meta_key::MetaKey;
pub use permanent_id::PermanentId;
pub use user_agent::UserAgent;
pub use vector_clock::{VectorClock, VectorClockSnapshot};
pub use vector_clock::{
Actor as VectorClockActor, ActorSnapshot as VectorClockActorSnapshot,
Filesystem as VectorClockFilesystem,
FilesystemActorSnapshot as VectorClockFilesystemActorSnapshot,
FilesystemSnapshot as VectorClockFilesystemSnapshot, Node as VectorClockNode,
NodeActorSnapshot as VectorClockNodeActorSnapshot, NodeSnapshot as VectorClockNodeSnapshot,
};
87 changes: 87 additions & 0 deletions src/codec/meta/vector_clock/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use super::clock_inner::{ClockInner, ClockInnerSnapshot};
use crate::codec::{ActorId, ParserResult, Stream};

use futures::AsyncWrite;

#[derive(Debug, PartialEq, Clone)]
pub struct Actor {
id: ActorId,
clock: ClockInner,
}

impl Actor {
fn new(id: ActorId, clock: ClockInner) -> Self {
Self { id, clock }
}

pub fn initialize(actor_id: ActorId) -> Self {
Self::new(actor_id, ClockInner::initialize())
}

pub fn to_snapshot(&self) -> ActorSnapshot {
self.into()
}

pub async fn encode<W: AsyncWrite + Unpin + Send>(
&self,
writer: &mut W,
) -> std::io::Result<usize> {
ActorSnapshot::from(self).encode(writer).await
}

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, snapshot) = ActorSnapshot::parse(input)?;
Ok((input, Self::new(snapshot.id, snapshot.clock.into())))
}
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct ActorSnapshot {
id: ActorId,
clock: ClockInnerSnapshot,
}

impl ActorSnapshot {
fn new(id: ActorId, clock: ClockInnerSnapshot) -> Self {
Self { id, clock }
}

pub const fn size() -> usize {
ActorId::size() + ClockInnerSnapshot::size()
}

pub async fn encode<W: AsyncWrite + Unpin + Send>(
&self,
writer: &mut W,
) -> std::io::Result<usize> {
self.id.encode(writer).await?;
self.clock.encode(writer).await
}

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, id) = ActorId::parse(input)?;
let (input, clock) = ClockInnerSnapshot::parse(input)?;
Ok((input, Self::new(id, clock)))
}
}

impl PartialOrd for ActorSnapshot {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ActorSnapshot {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.id.cmp(&other.id).then(self.clock.cmp(&other.clock))
}
}

impl From<&Actor> for ActorSnapshot {
fn from(value: &Actor) -> Self {
Self {
id: value.id,
clock: (&value.clock).into(),
}
}
}
197 changes: 197 additions & 0 deletions src/codec/meta/vector_clock/clock_inner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
use futures::{AsyncWrite, AsyncWriteExt};
use std::{
cmp::PartialEq,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use winnow::{binary::le_u64, Parser};

use crate::codec::{ParserResult, Stream};

/// Vector clocks are used as monotonic clocks for a particular actor or resource within the
/// filesystem and is used for providing strict ordering of events. The internal value is
/// initialized to a random value when a new one is initialized.
///
/// Internally this uses an atomic 64 unsigned integer for the ID, wrapping is an allowed behavior
/// and must be handled by all consumers. We consider a roll over valid if the last known ID was
/// within 262,144 (2^18) ticks of rolling over.

#[derive(Debug, Clone)]
pub struct ClockInner(Arc<AtomicU64>);

impl ClockInner {
pub fn initialize() -> Self {
// TODO: make this actually initialize to a random value as the docs above indicate
Self::from(ClockInnerSnapshot::from(0))
}

pub fn to_snapshot(&self) -> ClockInnerSnapshot {
ClockInnerSnapshot::from(self)
}
}

impl From<ClockInnerSnapshot> for ClockInner {
fn from(val: ClockInnerSnapshot) -> Self {
Self(Arc::new(AtomicU64::new(val.0)))
}
}

impl PartialEq for ClockInner {
fn eq(&self, other: &Self) -> bool {
self.0
.load(Ordering::Relaxed)
.eq(&other.0.load(Ordering::Relaxed))
}
}

const WRAP_THRESHOLD: u64 = 2 ^ 18;

/// A snapshot of a [`VectorClock`] at a specific value
///
/// These are what get stored to record the state of a vector
/// during specific operations
///
/// # Wrapping Behavior
/// These must functionally monotonically increase, but if we overflow
/// the underlying value we will wrap around. This is handled by
/// comparing the values with a threshold to determine if the
/// wrapped value is greater than the non-wrapped value.
/// The threshold is 2^18, or 262,144.
/// [`PartialOrd`] and [`Ord`] are implemented to handle this.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct ClockInnerSnapshot(pub(super) u64);

impl ClockInnerSnapshot {
pub async fn encode<W: AsyncWrite + Unpin + Send>(
&self,
writer: &mut W,
) -> std::io::Result<usize> {
let clock_bytes = self.0.to_le_bytes();

writer.write_all(&clock_bytes).await?;

Ok(clock_bytes.len())
}

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, value) = le_u64.parse_peek(input)?;
Ok((input, Self(value)))
}

pub const fn size() -> usize {
8
}
}

impl From<&ClockInner> for ClockInnerSnapshot {
fn from(value: &ClockInner) -> Self {
Self(value.0.load(Ordering::Relaxed))
}
}

impl From<u64> for ClockInnerSnapshot {
fn from(value: u64) -> Self {
Self(value)
}
}

impl PartialOrd for ClockInnerSnapshot {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ClockInnerSnapshot {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
if self.0 < WRAP_THRESHOLD || other.0 < WRAP_THRESHOLD {
sstelfox marked this conversation as resolved.
Show resolved Hide resolved
self.0
.wrapping_add(WRAP_THRESHOLD)
.cmp(&other.0.wrapping_add(WRAP_THRESHOLD))
} else {
self.0.cmp(&other.0)
}
}
}

#[cfg(test)]
mod test {
use super::*;
use winnow::Partial;

#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::*;

/// Test PartialOrd implementation
#[test]
fn test_partial_ord() {
// Basic ordering
assert!(ClockInnerSnapshot(1) < ClockInnerSnapshot(2));
assert!(ClockInnerSnapshot(2) > ClockInnerSnapshot(1));
assert!(ClockInnerSnapshot(1) == ClockInnerSnapshot(1));

// Wrapping
assert!(ClockInnerSnapshot(u64::MAX) < ClockInnerSnapshot(0));
assert!(ClockInnerSnapshot(0) > ClockInnerSnapshot(u64::MAX));
assert!(ClockInnerSnapshot(u64::MAX) < ClockInnerSnapshot(WRAP_THRESHOLD - 1));
assert!(ClockInnerSnapshot(u64::MAX) > ClockInnerSnapshot(WRAP_THRESHOLD));
}

/// Test Ord implementation
#[test]
fn test_ord() {
// Basic ordering
assert_eq!(
ClockInnerSnapshot(1).cmp(&ClockInnerSnapshot(2)),
std::cmp::Ordering::Less
);
assert_eq!(
ClockInnerSnapshot(2).cmp(&ClockInnerSnapshot(1)),
std::cmp::Ordering::Greater
);
assert_eq!(
ClockInnerSnapshot(1).cmp(&ClockInnerSnapshot(1)),
std::cmp::Ordering::Equal
);

// Wrapping
assert_eq!(
ClockInnerSnapshot(u64::MAX).cmp(&ClockInnerSnapshot(0)),
std::cmp::Ordering::Less
);
assert_eq!(
ClockInnerSnapshot(0).cmp(&ClockInnerSnapshot(u64::MAX)),
std::cmp::Ordering::Greater
);
assert_eq!(
ClockInnerSnapshot(u64::MAX).cmp(&ClockInnerSnapshot(WRAP_THRESHOLD - 1)),
std::cmp::Ordering::Less
);
assert_eq!(
ClockInnerSnapshot(u64::MAX).cmp(&ClockInnerSnapshot(WRAP_THRESHOLD)),
std::cmp::Ordering::Greater
);
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test(async))]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn test_user_agent_roundtrip() {
let checkpoint = ClockInner::initialize();

let mut buffer = Vec::with_capacity(ClockInnerSnapshot::size());
checkpoint
.to_snapshot()
.encode(&mut buffer)
.await
.expect("encoding success");

let partial = Partial::new(buffer.as_slice());
let (remaining, parsed) = ClockInnerSnapshot::parse(partial).expect("round trip");

let parsed = ClockInner::from(parsed);

assert!(remaining.is_empty());
assert_eq!(checkpoint, parsed);
}
}
Loading
Loading