Skip to content

Commit

Permalink
refactor: rename structs with Meta instead of Header
Browse files Browse the repository at this point in the history
  • Loading branch information
dav1do committed Jun 19, 2024
1 parent ac95bc2 commit e6b034b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 36 deletions.
8 changes: 4 additions & 4 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use cid::Cid;

use crate::Result;

use super::service::EventHeader;
use super::service::EventMetadata;

pub(crate) struct OrderEvents {
pub(crate) deliverable: Vec<(EventInsertable, EventHeader)>,
pub(crate) missing_history: Vec<(EventInsertable, EventHeader)>,
pub(crate) deliverable: Vec<(EventInsertable, EventMetadata)>,
pub(crate) missing_history: Vec<(EventInsertable, EventMetadata)>,
}

impl OrderEvents {
Expand All @@ -18,7 +18,7 @@ impl OrderEvents {
/// Will look up the prev from the database if needed to check if it's deliverable (could possibly change this for recon and allow the ordering task to handle it?)
pub async fn try_new(
pool: &SqlitePool,
mut candidate_events: Vec<(EventInsertable, EventHeader)>,
mut candidate_events: Vec<(EventInsertable, EventMetadata)>,
) -> Result<Self> {
let mut new_cids: HashMap<Cid, bool> = HashMap::from_iter(
candidate_events
Expand Down
26 changes: 13 additions & 13 deletions service/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tracing::{debug, error, info, trace, warn};

use crate::{CeramicEventService, Error, Result};

use super::service::{EventHeader, InsertableBodyWithHeader};
use super::service::{EventMetadata, InsertableBodyWithMeta};

/// How many events to select at once to see if they've become deliverable when we have downtime
/// Used at startup and occassionally in case we ever dropped something
Expand All @@ -27,7 +27,7 @@ pub struct DeliverableTask {
pub(crate) _handle: tokio::task::JoinHandle<()>,
/// Currently events discovered over recon that are out of order and need to be marked ready (deliverable)
/// when their prev chain is discovered and complete (i.e. my prev is deliverable then I am deliverable).
pub(crate) tx_inserted: tokio::sync::mpsc::Sender<InsertableBodyWithHeader>,
pub(crate) tx_inserted: tokio::sync::mpsc::Sender<InsertableBodyWithMeta>,
}

#[derive(Debug)]
Expand All @@ -36,7 +36,7 @@ pub struct OrderingTask {}
impl OrderingTask {
pub async fn run(pool: SqlitePool, q_depth: usize, load_delivered: bool) -> DeliverableTask {
let (tx_inserted, rx_inserted) =
tokio::sync::mpsc::channel::<InsertableBodyWithHeader>(q_depth);
tokio::sync::mpsc::channel::<InsertableBodyWithMeta>(q_depth);

let handle =
tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx_inserted).await });
Expand All @@ -50,7 +50,7 @@ impl OrderingTask {
async fn run_loop(
pool: SqlitePool,
load_undelivered: bool,
mut rx_inserted: tokio::sync::mpsc::Receiver<InsertableBodyWithHeader>,
mut rx_inserted: tokio::sync::mpsc::Receiver<InsertableBodyWithMeta>,
) {
// before starting, make sure we've updated any events in the database we missed
// this could take a long time. possibly we want to put it in another task so we can start processing events immediately
Expand Down Expand Up @@ -146,7 +146,7 @@ impl StreamEvent {
};

let known_prev = match &parsed_body.header {
EventHeader::Init { cid, .. } => {
EventMetadata::Init { cid, .. } => {
if !deliverable {
warn!(%cid,"Found init event in database that wasn't previously marked as deliverable. Updating now...");
let mut tx = pool.begin_tx().await?;
Expand All @@ -155,7 +155,7 @@ impl StreamEvent {
}
StreamEvent::InitEvent(*cid)
}
EventHeader::Data { prev, .. } | EventHeader::Time { prev, .. } => {
EventMetadata::Data { prev, .. } | EventMetadata::Time { prev, .. } => {
if deliverable {
trace!(%cid, "Found deliverable event in database");
StreamEvent::KnownDeliverable(StreamEventMetadata::new(cid, *prev))
Expand All @@ -173,11 +173,11 @@ impl StreamEvent {
}
}

impl From<InsertableBodyWithHeader> for StreamEvent {
fn from(ev: InsertableBodyWithHeader) -> Self {
impl From<InsertableBodyWithMeta> for StreamEvent {
fn from(ev: InsertableBodyWithMeta) -> Self {
match ev.header {
EventHeader::Init { cid, .. } => StreamEvent::InitEvent(cid),
EventHeader::Data { cid, prev, .. } | EventHeader::Time { cid, prev, .. } => {
EventMetadata::Init { cid, .. } => StreamEvent::InitEvent(cid),
EventMetadata::Data { cid, prev, .. } | EventMetadata::Time { cid, prev, .. } => {
let meta = StreamEventMetadata::new(cid, prev);
if ev.body.deliverable() {
StreamEvent::KnownDeliverable(meta)
Expand Down Expand Up @@ -388,7 +388,7 @@ impl OrderingState {
/// Add a stream to the list of streams to process.
/// We ignore delivered events for streams we're not tracking as we can look them up later if we need them.
/// We will get lots of init events we can ignore unless we need them, otherwise they'll be stuck in memory for a long time.
fn add_inserted_events(&mut self, events: Vec<InsertableBodyWithHeader>) {
fn add_inserted_events(&mut self, events: Vec<InsertableBodyWithMeta>) {
for ev in events {
let stream_cid = ev.header.stream_cid();
let event = ev.into();
Expand Down Expand Up @@ -484,12 +484,12 @@ impl OrderingState {
let loaded = CeramicEventService::parse_event_carfile_cid(event_cid, &carfile).await?;

let event = match &loaded.header {
EventHeader::Init { cid, .. } => {
EventMetadata::Init { cid, .. } => {
warn!(%cid,"Found init event in database that wasn't previously marked as deliverable. Updating now...");
to_store_asap.push(*cid);
StreamEvent::InitEvent(*cid)
}
EventHeader::Data { cid, prev, .. } | EventHeader::Time { cid, prev, .. } => {
EventMetadata::Data { cid, prev, .. } | EventMetadata::Time { cid, prev, .. } => {
StreamEvent::Undelivered(StreamEventMetadata::new(*cid, *prev))
}
};
Expand Down
38 changes: 19 additions & 19 deletions service/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl CeramicEventService {
pub(crate) async fn parse_event_carfile_order_key(
event_id: EventId,
carfile: &[u8],
) -> Result<(EventInsertable, EventHeader)> {
) -> Result<(EventInsertable, EventMetadata)> {
let mut insertable = EventInsertable::try_from_carfile(event_id, carfile).await?;

let header = Self::parse_event_body(&mut insertable.body).await?;
Expand All @@ -82,14 +82,14 @@ impl CeramicEventService {
pub(crate) async fn parse_event_carfile_cid(
cid: ceramic_core::Cid,
carfile: &[u8],
) -> Result<InsertableBodyWithHeader> {
) -> Result<InsertableBodyWithMeta> {
let mut body = EventInsertableBody::try_from_carfile(cid, carfile).await?;

let header = Self::parse_event_body(&mut body).await?;
Ok(InsertableBodyWithHeader { body, header })
Ok(InsertableBodyWithMeta { body, header })
}

pub(crate) async fn parse_event_body(body: &mut EventInsertableBody) -> Result<EventHeader> {
pub(crate) async fn parse_event_body(body: &mut EventInsertableBody) -> Result<EventMetadata> {
let cid = body.cid(); // purely for convenience writing out the match
let ev_block = body.block_for_cid(&cid)?;

Expand All @@ -104,7 +104,7 @@ impl CeramicEventService {
let (deliverable, header) = match event_ipld {
unvalidated::RawEvent::Time(t) => (
false,
EventHeader::Time {
EventMetadata::Time {
cid,
stream_cid: t.id(),
prev: t.prev(),
Expand All @@ -131,16 +131,16 @@ impl CeramicEventService {
match payload {
unvalidated::Payload::Data(d) => (
false,
EventHeader::Data {
EventMetadata::Data {
cid,
stream_cid: *d.id(),
prev: *d.prev(),
},
),
unvalidated::Payload::Init(_init) => (true, EventHeader::Init { cid }),
unvalidated::Payload::Init(_init) => (true, EventMetadata::Init { cid }),
}
}
unvalidated::RawEvent::Unsigned(_init) => (true, EventHeader::Init { cid }),
unvalidated::RawEvent::Unsigned(_init) => (true, EventMetadata::Init { cid }),
};
body.set_deliverable(deliverable);
Ok(header)
Expand Down Expand Up @@ -239,7 +239,7 @@ impl CeramicEventService {
.iter()
.find(|(i, _)| i.order_key == ev.order_key)
{
let new = InsertableBodyWithHeader {
let new = InsertableBodyWithMeta {
body: ev.body.clone(),
header: header.to_owned(),
};
Expand Down Expand Up @@ -274,7 +274,7 @@ impl CeramicEventService {
pub(crate) async fn load_by_cid(
pool: &SqlitePool,
cid: ceramic_core::Cid,
) -> Result<Option<InsertableBodyWithHeader>> {
) -> Result<Option<InsertableBodyWithMeta>> {
let data = if let Some(ev) = CeramicOneEvent::value_by_cid(pool, &cid).await? {
ev
} else {
Expand All @@ -283,7 +283,7 @@ impl CeramicEventService {

let mut body = EventInsertableBody::try_from_carfile(cid, &data).await?;
let header = Self::parse_event_body(&mut body).await?;
Ok(Some(InsertableBodyWithHeader { body, header }))
Ok(Some(InsertableBodyWithMeta { body, header }))
}
}

Expand Down Expand Up @@ -311,15 +311,15 @@ impl From<InsertResult> for Vec<ceramic_api::EventInsertResult> {
}

#[derive(Debug, Clone)]
pub(crate) struct InsertableBodyWithHeader {
pub(crate) struct InsertableBodyWithMeta {
pub(crate) body: EventInsertableBody,
pub(crate) header: EventHeader,
pub(crate) header: EventMetadata,
}

#[derive(Debug, Clone, PartialEq, Eq)]
/// An event header wrapper for use in the store crate.
/// TODO: replace this with something from the event crate
pub(crate) enum EventHeader {
pub(crate) enum EventMetadata {
Init {
cid: ceramic_core::Cid,
},
Expand All @@ -335,21 +335,21 @@ pub(crate) enum EventHeader {
},
}

impl EventHeader {
impl EventMetadata {
/// Returns the stream CID of the event
pub(crate) fn stream_cid(&self) -> ceramic_core::Cid {
match self {
EventHeader::Init { cid, .. } => *cid,
EventHeader::Data { stream_cid, .. } | EventHeader::Time { stream_cid, .. } => {
EventMetadata::Init { cid, .. } => *cid,
EventMetadata::Data { stream_cid, .. } | EventMetadata::Time { stream_cid, .. } => {
*stream_cid
}
}
}

pub(crate) fn prev(&self) -> Option<ceramic_core::Cid> {
match self {
EventHeader::Init { .. } => None,
EventHeader::Data { prev, .. } | EventHeader::Time { prev, .. } => Some(*prev),
EventMetadata::Init { .. } => None,
EventMetadata::Data { prev, .. } | EventMetadata::Time { prev, .. } => Some(*prev),
}
}
}

0 comments on commit e6b034b

Please sign in to comment.