Skip to content

Commit

Permalink
chore: Remove EventMetadata type (#490)
Browse files Browse the repository at this point in the history
* chore: Use references for Cids in more places

* chore: Remove EventMetadata type entirely
  • Loading branch information
stbrody authored Aug 19, 2024
1 parent e2fcf71 commit 7c7a7b7
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 115 deletions.
22 changes: 11 additions & 11 deletions api/src/server/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,36 @@ where
{
match event {
unvalidated::Event::Time(time_event) => {
let init_payload = get_init_event_payload_from_store(&time_event.id(), store).await?;
event_id_from_init_payload(event_cid, network, time_event.id(), &init_payload)
let init_payload = get_init_event_payload_from_store(time_event.id(), store).await?;
event_id_from_init_payload(&event_cid, network, time_event.id(), &init_payload)
}
unvalidated::Event::Signed(signed_event) => {
let payload = signed_event.payload();

match payload {
unvalidated::Payload::Init(init_payload) => event_id_from_init_payload(
event_cid,
&event_cid,
network,
signed_event.envelope_cid(),
init_payload,
),
unvalidated::Payload::Data(payload) => {
let init_cid = *payload.id();
let init_payload = get_init_event_payload_from_store(&init_cid, store).await?;
event_id_from_init_payload(event_cid, network, init_cid, &init_payload)
let init_cid = payload.id();
let init_payload = get_init_event_payload_from_store(init_cid, store).await?;
event_id_from_init_payload(&event_cid, network, init_cid, &init_payload)
}
}
}
unvalidated::Event::Unsigned(payload) => {
event_id_from_init_payload(event_cid, network, event_cid, &payload)
event_id_from_init_payload(&event_cid, network, &event_cid, &payload)
}
}
}

fn event_id_from_init_payload(
event_cid: Cid,
event_cid: &Cid,
network: Network,
init_cid: Cid,
init_cid: &Cid,
init_payload: &unvalidated::init::Payload<Ipld>,
) -> Result<EventId> {
let controller = init_payload
Expand All @@ -76,8 +76,8 @@ fn event_id_from_init_payload(
init_payload.header().sep(),
init_payload.header().model(),
controller,
&init_cid,
&event_cid,
init_cid,
event_cid,
))
}

Expand Down
29 changes: 21 additions & 8 deletions event/src/unvalidated/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,24 @@ where
}

/// Returns the prev CID (or None if the event is an init event)
pub fn prev(&self) -> Option<Cid> {
pub fn prev(&self) -> Option<&Cid> {
match self {
Event::Time(t) => Some(t.prev()),
Event::Signed(event) => match event.payload() {
Payload::Data(d) => Some(*d.prev()),
Payload::Data(d) => Some(d.prev()),
Payload::Init(_) => None,
},
Event::Unsigned(_) => None,
}
}

/// Returns the 'id' field of the event, which is the Cid of the stream's init event.
/// If this event *is* the init event, then it doesn't know its own Cid and returns None.
pub fn id(&self) -> Option<&Cid> {
match self {
Event::Time(t) => Some(t.id()),
Event::Signed(event) => match event.payload() {
Payload::Data(d) => Some(d.id()),
Payload::Init(_) => None,
},
Event::Unsigned(_) => None,
Expand Down Expand Up @@ -277,18 +290,18 @@ impl TimeEvent {
}

/// Get the id
pub fn id(&self) -> Cid {
self.event.id
pub fn id(&self) -> &Cid {
&self.event.id
}

/// Get the prev
pub fn prev(&self) -> Cid {
self.event.prev
pub fn prev(&self) -> &Cid {
&self.event.prev
}

/// Get the proof
pub fn proof(&self) -> Cid {
self.event.proof
pub fn proof(&self) -> &Cid {
&self.event.proof
}

/// Get the path
Expand Down
12 changes: 6 additions & 6 deletions event/src/unvalidated/signed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,18 @@ impl<D: serde::Serialize> Event<D> {
}

/// Get the CID of the signature envelope
pub fn envelope_cid(&self) -> Cid {
self.envelope_cid
pub fn envelope_cid(&self) -> &Cid {
&self.envelope_cid
}

/// Get the CID of the payload
pub fn payload_cid(&self) -> Cid {
self.payload_cid
pub fn payload_cid(&self) -> &Cid {
&self.payload_cid
}

/// Get the CID of the capability
pub fn capability_cid(&self) -> Option<Cid> {
self.capability.as_ref().map(|c| c.0)
pub fn capability_cid(&self) -> Option<&Cid> {
self.capability.as_ref().map(|c| &c.0)
}

/// Encodes the full signed event into a CAR file.
Expand Down
14 changes: 7 additions & 7 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl OrderEvents {
let mut new_cids: HashMap<Cid, bool> =
HashMap::from_iter(candidate_events.into_iter().map(|mut e| {
// all init events are deliverable so we mark them as such before we do anything else
let cid = e.cid();
let cid = *e.cid();
let deliverable = if e.event().is_init() {
e.set_deliverable(true);
deliverable.push(e);
Expand Down Expand Up @@ -70,20 +70,20 @@ impl OrderEvents {
unreachable!("Init events should have been filtered out since they're always deliverable");
}
Some(prev) => {
if let Some(in_mem_is_deliverable) = new_cids.get(&prev) {
if let Some(in_mem_is_deliverable) = new_cids.get(prev) {
if *in_mem_is_deliverable {
event.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
*new_cids.get_mut(event.cid()).expect("CID must exist") = true;
deliverable.push(event);
} else {
undelivered_prevs_in_memory.push_back(event);
}
} else {
let (_exists, prev_deliverable) =
CeramicOneEvent::deliverable_by_cid(pool, &prev).await?;
CeramicOneEvent::deliverable_by_cid(pool, prev).await?;
if prev_deliverable {
event.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
*new_cids.get_mut(event.cid()).expect("CID must exist") = true;
deliverable.push(event);
} else {
missing_history.push(event);
Expand All @@ -107,8 +107,8 @@ impl OrderEvents {
unreachable!("Init events should have been filtered out of the in memory set");
}
Some(prev) => {
if new_cids.get(&prev).map_or(false, |v| *v) {
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
if new_cids.get(prev).map_or(false, |v| *v) {
*new_cids.get_mut(event.cid()).expect("CID must exist") = true;
event.set_deliverable(true);
deliverable.push(event);
// reset the iteration count since we made changes. once it doesn't change for a loop through the queue we're done
Expand Down
45 changes: 22 additions & 23 deletions service/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::{debug, error, info, trace, warn};

use crate::{Error, Result};

use super::service::{DiscoveredEvent, EventMetadata};
use super::service::DiscoveredEvent;

type StreamCid = Cid;
type EventCid = Cid;
Expand Down Expand Up @@ -130,18 +130,16 @@ impl StreamEvent {
let (_cid, parsed) = unvalidated::Event::<Ipld>::decode_car(data.as_slice(), false)
.map_err(Error::new_app)?;

let metadata = EventMetadata::from(&parsed);

let known_prev = match &metadata {
EventMetadata::Init => {
let known_prev = match parsed.prev() {
None => {
assert!(
deliverable,
"Init event must always be deliverable. Found undelivered CID: {}",
cid
);
StreamEvent::InitEvent(cid)
}
EventMetadata::Data { prev, .. } | EventMetadata::Time { prev, .. } => {
Some(prev) => {
if deliverable {
trace!(%cid, "Found deliverable event in database");
StreamEvent::KnownDeliverable(StreamEventMetadata::new(cid, *prev))
Expand All @@ -161,9 +159,9 @@ impl StreamEvent {

impl From<DiscoveredEvent> for StreamEvent {
fn from(ev: DiscoveredEvent) -> Self {
match ev.metadata {
EventMetadata::Init => StreamEvent::InitEvent(ev.cid),
EventMetadata::Data { prev, .. } | EventMetadata::Time { prev, .. } => {
match ev.prev {
None => StreamEvent::InitEvent(ev.cid),
Some(prev) => {
let meta = StreamEventMetadata::new(ev.cid, prev);
if ev.known_deliverable {
StreamEvent::KnownDeliverable(meta)
Expand Down Expand Up @@ -540,22 +538,23 @@ impl OrderingState {
let mut event_cnt = 0;
let mut discovered_inits = Vec::new();
for (cid, parsed_event) in event_data {
let metadata = EventMetadata::from(&parsed_event);

let (stream_cid, loaded) = match &metadata {
EventMetadata::Init => {
discovered_inits.push(cid);
continue;
}
EventMetadata::Data { stream_cid, prev }
| EventMetadata::Time { stream_cid, prev } => (
stream_cid,
StreamEvent::Undelivered(StreamEventMetadata::new(cid, *prev)),
),
};
if parsed_event.is_init() {
discovered_inits.push(cid);
continue;
}

event_cnt += 1;
self.add_stream_event(*stream_cid, loaded);
let stream_cid = parsed_event
.id()
.expect("id must exist for non-init events");
let prev = parsed_event
.id()
.expect("prev must exist for non-init events");

self.add_stream_event(
*stream_cid,
StreamEvent::Undelivered(StreamEventMetadata::new(cid, *prev)),
);
}
// while undelivered init events should be unreachable, we can fix the state if it happens so we won't panic in release mode
// and simply correct things in the database. We could make this fatal in the future, but for now it's just a warning to
Expand Down
69 changes: 17 additions & 52 deletions service/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,12 @@ impl CeramicEventService {
.iter()
.chain(ordered.missing_history().iter())
{
let metadata = EventMetadata::from(ev.event());

if new.contains(&ev.cid()) {
if new.contains(ev.cid()) {
self.send_discovered_event(DiscoveredEvent {
cid: ev.cid(),
cid: *ev.cid(),
prev: ev.event().prev().copied(),
id: ev.event().id().copied(),
known_deliverable: ev.deliverable(),
metadata: metadata.to_owned(),
})
.await?;
}
Expand Down Expand Up @@ -248,56 +247,22 @@ pub struct InsertResult {
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiscoveredEvent {
pub cid: ceramic_core::Cid,
pub known_deliverable: bool,
pub metadata: EventMetadata,
pub(crate) struct DiscoveredEvent {
/// The Cid of this event.
pub(crate) cid: Cid,
/// The prev event that this event builds on.
pub(crate) prev: Option<Cid>,
/// The Cid of the init event that identifies the stream this event belongs to.
pub(crate) id: Option<Cid>,
/// Whether this event is known to already be deliverable.
pub(crate) known_deliverable: bool,
}

impl DiscoveredEvent {
pub(crate) fn stream_cid(&self) -> ceramic_core::Cid {
match self.metadata {
EventMetadata::Init => self.cid,
EventMetadata::Data { stream_cid, .. } | EventMetadata::Time { stream_cid, .. } => {
stream_cid
}
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
/// An event header wrapper for use in the store crate.
/// TODO: replace this with something from the event crate
pub(crate) enum EventMetadata {
/// The init CID and stream CID are the same
Init,
Data {
stream_cid: ceramic_core::Cid,
prev: ceramic_core::Cid,
},
Time {
stream_cid: ceramic_core::Cid,
prev: ceramic_core::Cid,
},
}

impl From<&unvalidated::Event<Ipld>> for EventMetadata {
// TODO(AES-312): can we remove EventMetadata entirely?
fn from(value: &unvalidated::Event<Ipld>) -> Self {
match value {
unvalidated::Event::Time(t) => EventMetadata::Time {
stream_cid: t.id(),
prev: t.prev(),
},

unvalidated::Event::Signed(signed) => match signed.payload() {
unvalidated::Payload::Data(d) => EventMetadata::Data {
stream_cid: *d.id(),
prev: *d.prev(),
},
unvalidated::Payload::Init(_init) => EventMetadata::Init,
},
unvalidated::Event::Unsigned(_init) => EventMetadata::Init,
pub(crate) fn stream_cid(&self) -> Cid {
match self.id {
None => self.cid, // init event
Some(id) => id,
}
}
}
6 changes: 3 additions & 3 deletions service/src/tests/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ async fn random_signed_data_event() -> Vec<unvalidated::Event<Ipld>> {
let init = random_signed_init_event().await;
let data = ipld_core::ipld!({"key": thread_rng().gen::<u32>()});
let payload = unvalidated::Builder::data()
.with_id(init.envelope_cid())
.with_prev(init.envelope_cid())
.with_id(*init.envelope_cid())
.with_prev(*init.envelope_cid())
.with_data(data)
.build();
vec![
Expand Down Expand Up @@ -169,7 +169,7 @@ async fn random_unsigned_init_time_event() -> Vec<unvalidated::Event<Ipld>> {
async fn random_signed_init_time_event() -> Vec<unvalidated::Event<Ipld>> {
let init = random_signed_init_event().await;

let time_event = random_time_event(init.envelope_cid()).await.into();
let time_event = random_time_event(*init.envelope_cid()).await.into();
vec![init.into(), time_event]
}

Expand Down
4 changes: 2 additions & 2 deletions service/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ async fn build_event_fixed_model(model: StreamId) -> TestEventInfo {
blocks: vec![
Block::new(
signed.encode_envelope().unwrap().into(),
signed.envelope_cid(),
*signed.envelope_cid(),
),
Block::new(
signed.encode_payload().unwrap().into(),
signed.payload_cid(),
*signed.payload_cid(),
),
],
car,
Expand Down
2 changes: 1 addition & 1 deletion store/src/sql/access/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl CeramicOneEvent {
}
// the item already existed so we didn't mark it as deliverable on insert
if !new_key && item.deliverable() {
Self::mark_ready_to_deliver(&mut tx, &item.cid()).await?;
Self::mark_ready_to_deliver(&mut tx, item.cid()).await?;
}
}
tx.commit().await.map_err(Error::from)?;
Expand Down
Loading

0 comments on commit 7c7a7b7

Please sign in to comment.