Skip to content

Commit

Permalink
wip: multiple tasks to order streams
Browse files Browse the repository at this point in the history
  • Loading branch information
dav1do committed Dec 4, 2024
1 parent d8b7d8f commit f389ea2
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 61 deletions.
163 changes: 103 additions & 60 deletions event-svc/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ceramic_event::unvalidated;
use ceramic_sql::sqlite::SqliteConnection;
use cid::Cid;
use ipld_core::ipld::Ipld;
use itertools::Itertools;
use tokio::sync::mpsc::Sender;
use tracing::{debug, error, info, trace, warn};

Expand All @@ -31,42 +32,56 @@ pub struct DeliverableTask {
pub struct OrderingTask {}

impl OrderingTask {
/// Discover all undelivered events in the database and mark them deliverable if possible.
// TODO
/// Kept around to avoid updating all the tests
#[allow(dead_code)]
async fn process_all_undelivered_events(
event_access: Arc<EventAccess>,
max_iterations: usize,
batch_size: u32,
) -> Result<usize> {
Self::process_all_undelivered_events_with_tasks(event_access, max_iterations, batch_size, 1)
.await
}

/// Discover all undelivered events in the database and mark them deliverable if possible.
/// Returns the number of events marked deliverable.
pub async fn process_all_undelivered_events_with_tasks(
event_access: Arc<EventAccess>,
max_iterations: usize,
batch_size: u32,
number_tasks: u32,
) -> Result<usize> {

let (tx, rx_inserted) = tokio::sync::mpsc::channel::<DiscoveredEvent>(10_000);

let event_access_cln = event_access.clone();
let writer_handle =
tokio::task::spawn(async move { Self::run_loop(event_access_cln, rx_inserted).await });
let handle = tokio::runtime::Handle::current();
let writer_handle = std::thread::spawn(move || {
handle.block_on(Self::run_loop(event_access_cln, rx_inserted))
});

let cnt = match OrderingState::process_all_undelivered_events(
event_access,
max_iterations,
batch_size,
tx,
number_tasks
number_tasks,
)
.await
{
Ok(cnt) => cnt,
Err(e) => {
error!("encountered error processing undelivered events: {}", e);
writer_handle.abort();
writer_handle.join().unwrap(); // TODO: this needs to abort via channel
return Err(Error::new_fatal(anyhow!(
"failed to process undelivered events: {}",
e
)));
}
};
info!("Waiting for {cnt} undelivered events to finish ordering...");
if let Err(e) = writer_handle.await {
error!("event ordering task failed to complete: {}", e);
if let Err(e) = writer_handle.join() {
error!("event ordering task failed to complete: {:?}", e);
}
Ok(cnt)
}
Expand All @@ -83,15 +98,6 @@ impl OrderingTask {
}
}

/// Kept around to avoid updating all the tests
async fn process_all_undelivered_events(
event_access: Arc<EventAccess>,
max_iterations: usize,
batch_size: u32,
) -> Result<usize> {
Self::process_all_undelivered_events_with_tasks(event_access, max_iterations, batch_size, 1).await
}

async fn run_loop(
event_access: Arc<EventAccess>,
mut rx_inserted: tokio::sync::mpsc::Receiver<DiscoveredEvent>,
Expand All @@ -105,17 +111,30 @@ impl OrderingTask {
if rx_inserted.recv_many(&mut recon_events, 1000).await > 0 {
trace!(?recon_events, "new events discovered!");
state.add_inserted_events(recon_events);
// read out all the events we can since they might be useful to us while processing
if !rx_inserted.is_empty() {
let to_take = rx_inserted.len(); //.min(1000);
let mut recon_events = Vec::with_capacity(to_take);
// are we going to get behind by yielding again?
rx_inserted.recv_many(&mut recon_events, to_take).await;
state.add_inserted_events(recon_events);
}

if let Err(should_exit) = state
state = match state
.process_streams(Arc::clone(&event_access))
.await
.map_err(Self::log_error)
{
if should_exit {
error!("Ordering task exiting due to fatal error");
return;
Ok(s) => s,
Err(should_exit) => {
if should_exit {
error!("Ordering task exiting due to fatal error");
return;
} else {
panic!("handle this")
}
}
}
};
}
}
// read and process everything that's still in the channel
Expand Down Expand Up @@ -170,18 +189,11 @@ impl StreamEvent {
}

/// Builds a stream event from the database if it exists.
async fn load_by_cid(
event_access: &Arc<EventAccess>,
conn: &mut SqliteConnection,
cid: EventCid,
) -> Result<Option<Self>> {
async fn load_by_cid(conn: &mut SqliteConnection, cid: EventCid) -> Result<Option<Self>> {
// TODO: Condense the multiple DB queries happening here into a single query
let (exists, deliverable) = event_access
.deliverable_by_cid_with_conn(&cid, conn)
.await?;
let (exists, deliverable) = EventAccess::deliverable_by_cid_with_conn(&cid, conn).await?;
if exists {
let data = event_access
.value_by_cid_with_conn(&cid, conn)
let data = EventAccess::value_by_cid_with_conn(&cid, conn)
.await?
.ok_or_else(|| {
Error::new_app(anyhow!(
Expand Down Expand Up @@ -387,7 +399,7 @@ impl StreamEvents {
}
}

async fn order_events(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
async fn order_events(&mut self, conn: &mut SqliteConnection) -> Result<()> {
// We collect everything we can into memory and then order things.
// If our prev is deliverable then we can mark ourselves as deliverable. If our prev wasn't deliverable yet,
// we track it and repeat (i.e. add it to our state and the set we're iterating to attempt to load its prev).
Expand All @@ -405,8 +417,6 @@ impl StreamEvents {

trace!(count=%undelivered_q.len(), "undelivered events to process");

// use one connection for all reads in the loop. will close when dropped
let mut conn = event_access.detach_ro_connection().await?;
while let Some(StreamEventMetadata {
cid: undelivered_cid,
prev: desired_prev,
Expand All @@ -429,7 +439,7 @@ impl StreamEvents {
// nothing to do until it arrives on the channel
}
} else if let Some(discovered_prev) =
StreamEvent::load_by_cid(&event_access, &mut conn, desired_prev).await?
StreamEvent::load_by_cid(conn, desired_prev).await?
{
match &discovered_prev {
// we found our prev in the database and it's deliverable, so we're deliverable now
Expand Down Expand Up @@ -519,35 +529,71 @@ impl OrderingState {

/// Process every stream we know about that has undelivered events that should be "unlocked" now. This could be adjusted to commit things in batches,
/// but for now it assumes it can process all the streams and events in one go. It should be idempotent, so if it fails, it can be retried.
async fn process_streams(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
for (_stream_cid, stream_events) in self.pending_by_stream.iter_mut() {
if stream_events.should_process {
stream_events
.order_events(Arc::clone(&event_access))
.await?;
self.deliverable
async fn process_streams(mut self, event_access: Arc<EventAccess>) -> Result<Self> {
let mut task_set = tokio::task::JoinSet::new();

let mut to_process = HashMap::new();
self.pending_by_stream.retain(|k, s| {
if s.should_process {
let v = std::mem::take(s);
to_process.insert(*k, v);
false
} else {
true
}
});
let chunk_size = (to_process.len() / 16).max(50); // 16 tasks for now or 1 if small
let mut resp = self;

let chunks = &to_process.into_iter().chunks(chunk_size);
let mut vec_cuz_chunks_not_sync_sadness = Vec::with_capacity(8);

for chunk in chunks {
let streams: Vec<(Cid, StreamEvents)> = chunk.collect();
vec_cuz_chunks_not_sync_sadness.push(streams);
}

for mut streams in vec_cuz_chunks_not_sync_sadness.into_iter() {
let mut conn = event_access.detach_ro_connection().await?;
task_set.spawn(async move {
for (_, stream_events) in streams.iter_mut() {
stream_events.order_events(&mut conn).await.unwrap();
}
streams
});
}

let mut all_streams = Vec::new();
while let Some(res) = task_set.join_next().await {
let streams = res.unwrap();
for (_, stream_events) in &streams {
resp.deliverable
.extend(stream_events.new_deliverable.iter());
}
all_streams.extend(streams);
}

match self.persist_ready_events(Arc::clone(&event_access)).await {
match resp.persist_ready_events(Arc::clone(&event_access)).await {
Ok(_) => {}
Err(err) => {
// Clear the queue as we'll rediscover it on the next run, rather than try to double update everything.
// We will no-op the updates so it doesn't really hurt but it's unnecessary.
// The StreamEvents in our pending_by_stream map all have their state updated in memory so we can pick up where we left off.
self.deliverable.clear();
resp.deliverable.clear();
return Err(err);
}
}
// keep things that still have missing history but don't process them again until we get something new
self.pending_by_stream
.retain(|_, stream_events| !stream_events.processing_completed());
for (cid, mut stream) in all_streams.into_iter() {
if !stream.processing_completed() {
resp.pending_by_stream.insert(cid, stream);
}
}

debug!(remaining_streams=%self.pending_by_stream.len(), "Finished processing streams");
trace!(stream_state=?self, "Finished processing streams");
debug!(remaining_streams=%resp.pending_by_stream.len(), "Finished processing streams");
trace!(stream_state=?resp, "Finished processing streams");

Ok(())
Ok(resp)
}

/// Process all undelivered events in the database. This is a blocking operation that could take a long time.
Expand Down Expand Up @@ -603,7 +649,7 @@ impl OrderingState {
if iter_cnt > max_iterations {
warn!(%batch_size, iterations=%iter_cnt, %task_id, "Exceeded max iterations for finding undelivered events!");
}

info!(%task_id, "Finished processing undelivered events");
Ok(())
});
Expand All @@ -612,26 +658,23 @@ impl OrderingState {
drop(tx);

let mut event_cnt = 0;
while let Some((number_processed, new_hw, task_id) ) = rx.recv().await {
while let Some((number_processed, new_hw, task_id)) = rx.recv().await {
event_cnt += number_processed;
if event_cnt % LOG_EVERY_N_ENTRIES < number_processed {
info!(count=%event_cnt, highwater=%new_hw, %task_id, "Processed undelivered events");
}
}
while let Some(res) = tasks.join_next().await {
match res {
Ok(v) => {
match v {
Ok(_) => {},
Err(error) => {
warn!(?error,"event ordering task failed while processing");
},
Ok(v) => match v {
Ok(_) => {}
Err(error) => {
warn!(?error, "event ordering task failed while processing");
}
},
Err(error) => {
warn!(?error, "event ordering task failed with JoinError");

},
}
}
}
Ok(event_cnt)
Expand Down
2 changes: 1 addition & 1 deletion event-svc/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl EventService {
Arc::clone(&self.event_access),
MAX_ITERATIONS,
DELIVERABLE_EVENTS_BATCH_SIZE,
8,
16,
)
.await
}
Expand Down

0 comments on commit f389ea2

Please sign in to comment.