From 91e5cd26670312478ef2bc1e2fafa821d04da938 Mon Sep 17 00:00:00 2001 From: YoEight Date: Fri, 10 Jan 2025 00:30:16 -0500 Subject: [PATCH 1/3] fix: Report fatal error from the server to client properly. --- .config/nextest.toml | 2 ++ .gitignore | 1 + geth-client-tests/src/delete_tests.rs | 2 -- geth-engine/src/process.rs | 28 ++++++++++++++++++++-- geth-engine/src/process/messages.rs | 1 + geth-engine/src/process/writing/entries.rs | 9 +++---- geth-mikoshi/src/wal/log_writer.rs | 9 +++++++ 7 files changed, 44 insertions(+), 8 deletions(-) create mode 100644 .config/nextest.toml diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 0000000..8cce1e8 --- /dev/null +++ b/.config/nextest.toml @@ -0,0 +1,2 @@ +[profile.default] +slow-timeout = { period = "15s", terminate-after = 1 } diff --git a/.gitignore b/.gitignore index 7f5d37f..a29f9cf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ target/ .idea/ proptest-regressions/ +mutants.out/ diff --git a/geth-client-tests/src/delete_tests.rs b/geth-client-tests/src/delete_tests.rs index 8ecd124..1b9bd3b 100644 --- a/geth-client-tests/src/delete_tests.rs +++ b/geth-client-tests/src/delete_tests.rs @@ -1,5 +1,3 @@ -use std::u64; - use bytes::Bytes; use fake::{faker::name::en::Name, Fake}; use geth_client::GrpcClient; diff --git a/geth-engine/src/process.rs b/geth-engine/src/process.rs index f5886e6..60d273e 100644 --- a/geth-engine/src/process.rs +++ b/geth-engine/src/process.rs @@ -2,7 +2,7 @@ use bb8::Pool; use geth_mikoshi::storage::Storage; use geth_mikoshi::wal::chunks::ChunkContainer; use geth_mikoshi::{FileSystemStorage, InMemoryStorage}; -use messages::Messages; +use messages::{Messages, Responses}; use resource::{create_buffer_pool, BufferManager}; use std::collections::HashMap; use std::future::Future; @@ -79,6 +79,7 @@ enum Topology { struct RunningProc { id: ProcId, proc: Proc, + last_received_request: Uuid, mailbox: Mailbox, } @@ -123,6 +124,7 @@ impl Catalog { } fn terminate(&mut self, proc_id: ProcId) -> Option { + tracing::debug!("looking up terminated process {} runtime info...", proc_id); if let Some(running) = self.monitor.remove(&proc_id) { if let Some(mut topology) = self.inner.get_mut(&running.proc) { match &mut topology { @@ -132,9 +134,11 @@ impl Catalog { } } + tracing::debug!("process {} runtime info was found", proc_id); return Some(running); } + tracing::debug!("no running info was found for process {}", proc_id); None } @@ -216,9 +220,12 @@ where Item::Mail(mail) => { if let Some(resp) = self.requests.remove(&mail.correlation) { let _ = resp.send(mail); - } else if let Some(proc) = self.catalog.monitor.get(&dest) { + } else if let Some(proc) = self.catalog.monitor.get_mut(&dest) { if let Some(resp) = resp { self.requests.insert(mail.correlation, resp); + proc.last_received_request = mail.correlation; + } else { + proc.last_received_request = Uuid::nil(); } let _ = proc.mailbox.send(Item::Mail(mail)); @@ -290,6 +297,21 @@ where } else { tracing::info!("process {:?}:{} terminated", running.proc, id); } + + if let Some(resp) = self.requests.remove(&running.last_received_request) { + tracing::warn!( + "process {:?}:{} terminated with pending request", + running.proc, + id + ); + + let _ = resp.send(Mail { + origin: running.id, + correlation: running.last_received_request, + payload: Messages::Responses(Responses::FatalError), + created: Instant::now(), + }); + } } if self.closing { @@ -693,6 +715,7 @@ where id, proc, mailbox: Mailbox::Raw(proc_sender), + last_received_request: Uuid::nil(), } } @@ -724,5 +747,6 @@ where id, proc, mailbox: Mailbox::Tokio(proc_sender), + last_received_request: Uuid::nil(), } } diff --git a/geth-engine/src/process/messages.rs b/geth-engine/src/process/messages.rs index 6b893dc..68bd184 100644 --- a/geth-engine/src/process/messages.rs +++ b/geth-engine/src/process/messages.rs @@ -253,6 +253,7 @@ pub enum Responses { Subscribe(SubscribeResponses), Write(WriteResponses), TestSink(TestSinkResponses), + FatalError, } #[derive(Debug)] diff --git a/geth-engine/src/process/writing/entries.rs b/geth-engine/src/process/writing/entries.rs index 21fa8e3..3dd6a43 100644 --- a/geth-engine/src/process/writing/entries.rs +++ b/geth-engine/src/process/writing/entries.rs @@ -46,10 +46,11 @@ impl LogEntries for ProposeEntries { } fn current_entry_size(&self) -> usize { - size_of::() // revision - + size_of::() // stream name length - + self.ident.len() // stream name - + propose_estimate_size(self.current.as_ref().unwrap()) + 1 + // size_of::() // revision + // + size_of::() // stream name length + // + self.ident.len() // stream name + // + propose_estimate_size(self.current.as_ref().unwrap()) } fn write_current_entry(&mut self, buffer: &mut BytesMut, position: u64) { diff --git a/geth-mikoshi/src/wal/log_writer.rs b/geth-mikoshi/src/wal/log_writer.rs index 0425429..bb34cdc 100644 --- a/geth-mikoshi/src/wal/log_writer.rs +++ b/geth-mikoshi/src/wal/log_writer.rs @@ -73,6 +73,15 @@ where let mut payload_buffer = self.buffer.split_off(ENTRY_PREFIX_SIZE); entries.write_current_entry(&mut payload_buffer, position); + + if payload_buffer.len() != entry_size { + eyre::bail!( + "payload size mismatch: expected {}, got {}", + entry_size, + payload_buffer.len() + ); + } + payload_buffer.put_u32_le(reported_size); self.buffer.unsplit(payload_buffer); let record = self.buffer.split().freeze(); From 5b91c20f63f66aa4cc936f96243d860c065187d1 Mon Sep 17 00:00:00 2001 From: YoEight Date: Fri, 10 Jan 2025 20:25:47 -0500 Subject: [PATCH 2/3] pass down server runtime errors to the grpc client. --- .github/workflows/pr.yml | 10 +++++++-- geth-client-tests/src/lib.rs | 2 +- geth-client/src/next/driver.rs | 14 +++++++++++- geth-client/src/next/grpc.rs | 9 +++++++- geth-client/src/next/mod.rs | 14 +++++++++--- geth-common/src/client.rs | 2 ++ geth-common/src/lib.rs | 17 +++++++++++++- geth-consensus/src/tests/storage/in_mem.rs | 5 ++--- geth-domain/src/index/tests/fs/lsm.rs | 10 +++++---- geth-domain/src/index/tests/fs/ss_table.rs | 2 +- geth-domain/src/index/tests/in_mem/block.rs | 2 +- geth-domain/src/index/tests/in_mem/lsm.rs | 8 ++++--- .../src/index/tests/in_mem/mem_table.rs | 2 +- .../src/index/tests/in_mem/ss_table.rs | 2 +- geth-engine/src/process/grpc/protocol.rs | 15 ++++++++++++- geth-engine/src/process/sink.rs | 5 ++--- geth-engine/src/process/tests/mod.rs | 2 +- geth-engine/src/process/tests/reading.rs | 5 ++--- geth-engine/src/process/tests/subscribing.rs | 5 ++--- geth-engine/src/process/writing/entries.rs | 9 ++++---- geth-mikoshi/src/wal/log_reader.rs | 13 ++++++----- geth-mikoshi/src/wal/mod.rs | 22 +++++++++++++++++++ protos/protocol.proto | 1 + 23 files changed, 132 insertions(+), 44 deletions(-) diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 7fec6fa..9c13f3d 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -44,8 +44,14 @@ jobs: - name: Build run: cargo check --all-targets + - name: Install cargo-nextest + uses: baptiste0928/cargo-install@v3 + with: + crate: cargo-nextest + locked: true + - name: Run tests - run: cargo test --all-targets + run: cargo nextest run --hide-progress-bar linting: needs: build @@ -78,4 +84,4 @@ jobs: run: cargo clippy --all-features -- -D warnings - name: Format - run: cargo fmt -- --check \ No newline at end of file + run: cargo fmt -- --check diff --git a/geth-client-tests/src/lib.rs b/geth-client-tests/src/lib.rs index 864f3fd..f75da94 100644 --- a/geth-client-tests/src/lib.rs +++ b/geth-client-tests/src/lib.rs @@ -16,7 +16,7 @@ pub mod tests { #[ctor::ctor] fn test_init() { - let _ = tracing_subscriber::fmt::fmt() + tracing_subscriber::fmt::fmt() .with_env_filter(EnvFilter::new("geth_engine=debug")) // .with_max_level(tracing::Level::DEBUG) .with_file(true) diff --git a/geth-client/src/next/driver.rs b/geth-client/src/next/driver.rs index a7382c4..ddd6cf8 100644 --- a/geth-client/src/next/driver.rs +++ b/geth-client/src/next/driver.rs @@ -5,7 +5,7 @@ use eyre::bail; use uuid::Uuid; use geth_common::generated::next::protocol; -use geth_common::{EndPoint, Operation, OperationIn, OperationOut}; +use geth_common::{EndPoint, Operation, OperationIn, OperationOut, Reply}; use crate::next::{connect_to_node, Command, ConnErr, Connection, Mailbox}; @@ -104,6 +104,18 @@ impl Driver { tracing::warn!("received an event that is not related to any command"); } + pub fn handle_disconnect(&mut self) { + tracing::warn!("connection was closed by the server"); + self.connection = None; + + for (correlation, cmd) in self.registry.drain() { + let _ = cmd.resp.send(OperationOut { + correlation, + reply: Reply::ServerDisconnected, + }); + } + } + /// We might consider implementing a retry logic here. async fn connect(&mut self) -> eyre::Result<()> { let uri = format!("http://{}:{}", self.endpoint.host, self.endpoint.port) diff --git a/geth-client/src/next/grpc.rs b/geth-client/src/next/grpc.rs index 7d66ebf..93e2d9f 100644 --- a/geth-client/src/next/grpc.rs +++ b/geth-client/src/next/grpc.rs @@ -103,7 +103,14 @@ impl Client for GrpcClient { if let Some(out) = task.recv().await? { match out { Reply::AppendStreamCompleted(resp) => Ok(resp), - _ => eyre::bail!("unexpected reply when appending events to '{}'", stream_id), + Reply::Error(e) => { + eyre::bail!("server error when writing to stream '{}': {}", stream_id, e) + } + x => eyre::bail!( + "unexpected reply when appending events to '{}' = {:?}", + stream_id, + x + ), } } else { eyre::bail!( diff --git a/geth-client/src/next/mod.rs b/geth-client/src/next/mod.rs index 3cfbe22..c3ebfba 100644 --- a/geth-client/src/next/mod.rs +++ b/geth-client/src/next/mod.rs @@ -16,6 +16,7 @@ pub mod grpc; pub enum Msg { Command(Command), Event(OperationOut), + Disconnected, } #[derive(Clone)] @@ -48,18 +49,21 @@ pub(crate) async fn connect_to_node(uri: &Uri, mailbox: Mailbox) -> Result { - tracing::error!("Error receiving response: {:?}", e); - break; + tracing::error!("error receiving response: {:?}", e); + // TODO - Needs find a way to handle unexpected errors so we avoid + // stalled operations. } Ok(out) => { if mailbox.send(Msg::Event(out.into())).is_err() { tracing::warn!("seems main connection is closed"); - break; + return; } } } } + + let _ = mailbox.send(Msg::Disconnected); }); Ok(connection) @@ -75,6 +79,10 @@ pub(crate) async fn multiplex_loop(mut driver: Driver, mut receiver: UnboundedRe } Msg::Event(event) => driver.handle_event(event), + + Msg::Disconnected => { + driver.handle_disconnect(); + } } } } diff --git a/geth-common/src/client.rs b/geth-common/src/client.rs index 2275466..de24f39 100644 --- a/geth-common/src/client.rs +++ b/geth-common/src/client.rs @@ -11,6 +11,7 @@ use crate::{ SubscriptionConfirmation, }; +#[derive(Debug)] pub enum SubscriptionEvent { EventAppeared(Record), Confirmed(SubscriptionConfirmation), @@ -18,6 +19,7 @@ pub enum SubscriptionEvent { Unsubscribed(UnsubscribeReason), } +#[derive(Debug)] pub enum UnsubscribeReason { User, Server, diff --git a/geth-common/src/lib.rs b/geth-common/src/lib.rs index 4d5167c..773a8b6 100644 --- a/geth-common/src/lib.rs +++ b/geth-common/src/lib.rs @@ -36,7 +36,7 @@ impl From for Uuid { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct EndPoint { pub host: String, pub port: u16, @@ -113,6 +113,7 @@ impl From for OperationIn { } } +#[derive(Debug)] pub enum Reply { AppendStreamCompleted(AppendStreamCompleted), StreamRead(StreamRead), @@ -121,6 +122,8 @@ pub enum Reply { ProgramsListed(ProgramListed), ProgramKilled(ProgramKilled), ProgramObtained(ProgramObtained), + ServerDisconnected, + Error(String), } pub struct OperationOut { @@ -157,6 +160,7 @@ impl From for OperationOut { operation_out::Operation::ProgramKilled(resp) => Reply::ProgramKilled(resp.into()), operation_out::Operation::ProgramGot(resp) => Reply::ProgramObtained(resp.into()), + operation_out::Operation::Error(e) => Reply::Error(e), }; Self { correlation, reply } @@ -165,6 +169,7 @@ impl From for OperationOut { impl TryFrom for next::protocol::OperationOut { type Error = eyre::Report; + fn try_from(value: OperationOut) -> eyre::Result { let correlation = Some(value.correlation.into()); let operation = match value.reply { @@ -183,6 +188,12 @@ impl TryFrom for next::protocol::OperationOut { Reply::ProgramKilled(resp) => operation_out::Operation::ProgramKilled(resp.into()), Reply::ProgramObtained(resp) => operation_out::Operation::ProgramGot(resp.into()), + + Reply::ServerDisconnected => { + eyre::bail!("not supposed to send server disconnected message to the server"); + } + + Reply::Error(e) => operation_out::Operation::Error(e), }; Ok(Self { @@ -1074,6 +1085,7 @@ impl ReadStreamCompleted { } } +#[derive(Debug)] pub enum StreamRead { EndOfStream, EventAppeared(Record), @@ -1116,6 +1128,7 @@ impl TryFrom for operation_out::StreamRead { } } +#[derive(Debug)] pub enum SubscriptionConfirmation { StreamName(String), ProcessId(Uuid), @@ -1205,6 +1218,7 @@ impl From for SubscriptionError { } } +#[derive(Debug)] pub enum DeleteStreamCompleted { Success(WriteResult), Error(DeleteError), @@ -1220,6 +1234,7 @@ impl DeleteStreamCompleted { } } +#[derive(Debug)] pub enum DeleteError { StreamDeleted, WrongExpectedRevision(WrongExpectedRevisionError), diff --git a/geth-consensus/src/tests/storage/in_mem.rs b/geth-consensus/src/tests/storage/in_mem.rs index f46e891..c311309 100644 --- a/geth-consensus/src/tests/storage/in_mem.rs +++ b/geth-consensus/src/tests/storage/in_mem.rs @@ -67,8 +67,7 @@ impl PersistentStorage for InMemStorage { self.inner .iter() - .find(|e| e.index == entry_id.index && e.term == entry_id.term) - .is_some() + .any(|e| e.index == entry_id.index && e.term == entry_id.term) } } @@ -81,7 +80,7 @@ struct InMemIter<'a> { init: bool, } -impl<'a> IterateEntries for InMemIter<'a> { +impl IterateEntries for InMemIter<'_> { fn next(&mut self) -> std::io::Result> { if self.count >= self.limit { return Ok(None); diff --git a/geth-domain/src/index/tests/fs/lsm.rs b/geth-domain/src/index/tests/fs/lsm.rs index 28a6cf8..17d6226 100644 --- a/geth-domain/src/index/tests/fs/lsm.rs +++ b/geth-domain/src/index/tests/fs/lsm.rs @@ -75,8 +75,10 @@ fn test_fs_lsm_mem_table_scan() -> io::Result<()> { /// When scanning, it will access to sstables. #[test] fn test_fs_lsm_sync() -> io::Result<()> { - let mut setts = LsmSettings::default(); - setts.mem_table_max_size = MEM_TABLE_ENTRY_SIZE; + let setts = LsmSettings { + mem_table_max_size: MEM_TABLE_ENTRY_SIZE, + ..Default::default() + }; let temp = TempDir::default(); let root = PathBuf::from(temp.as_ref()); @@ -135,8 +137,8 @@ fn test_fs_lsm_serialization() -> io::Result<()> { assert_eq!(lsm.logical_position, actual.logical_position); - let actual_table_1 = actual.levels.get(&0).unwrap().front().clone().unwrap(); - let actual_table_2 = actual.levels.get(&1).unwrap().front().clone().unwrap(); + let actual_table_1 = actual.levels.get(&0).unwrap().front().unwrap(); + let actual_table_2 = actual.levels.get(&1).unwrap().front().unwrap(); assert_eq!(table1.id, actual_table_1.id); assert_eq!(table1.metas, actual_table_1.metas); diff --git a/geth-domain/src/index/tests/fs/ss_table.rs b/geth-domain/src/index/tests/fs/ss_table.rs index 72bf473..1bb0c71 100644 --- a/geth-domain/src/index/tests/fs/ss_table.rs +++ b/geth-domain/src/index/tests/fs/ss_table.rs @@ -1,5 +1,5 @@ +use std::io; use std::path::PathBuf; -use std::{io, u64}; use temp_testdir::TempDir; diff --git a/geth-domain/src/index/tests/in_mem/block.rs b/geth-domain/src/index/tests/in_mem/block.rs index b22c7e8..6de8697 100644 --- a/geth-domain/src/index/tests/in_mem/block.rs +++ b/geth-domain/src/index/tests/in_mem/block.rs @@ -1,4 +1,4 @@ -use std::{io, u64}; +use std::io; use geth_mikoshi::InMemoryStorage; diff --git a/geth-domain/src/index/tests/in_mem/lsm.rs b/geth-domain/src/index/tests/in_mem/lsm.rs index b31bdc2..c4c6c6a 100644 --- a/geth-domain/src/index/tests/in_mem/lsm.rs +++ b/geth-domain/src/index/tests/in_mem/lsm.rs @@ -1,4 +1,4 @@ -use std::{io, u64}; +use std::io; use geth_common::IteratorIO; use geth_mikoshi::InMemoryStorage; @@ -107,8 +107,10 @@ fn test_in_mem_lsm_mem_table_scan_backward() -> io::Result<()> { /// When scanning, it will access to in-mem sstables. #[test] fn test_in_mem_lsm_sync() -> io::Result<()> { - let mut setts = LsmSettings::default(); - setts.mem_table_max_size = MEM_TABLE_ENTRY_SIZE; + let setts = LsmSettings { + mem_table_max_size: MEM_TABLE_ENTRY_SIZE, + ..Default::default() + }; let mut lsm = Lsm::new(setts, InMemoryStorage::new()); diff --git a/geth-domain/src/index/tests/in_mem/mem_table.rs b/geth-domain/src/index/tests/in_mem/mem_table.rs index 39030ad..88efffc 100644 --- a/geth-domain/src/index/tests/in_mem/mem_table.rs +++ b/geth-domain/src/index/tests/in_mem/mem_table.rs @@ -1,4 +1,4 @@ -use std::{io, u64}; +use std::io; use geth_common::IteratorIO; use geth_mikoshi::InMemoryStorage; diff --git a/geth-domain/src/index/tests/in_mem/ss_table.rs b/geth-domain/src/index/tests/in_mem/ss_table.rs index f0b3c46..1101a97 100644 --- a/geth-domain/src/index/tests/in_mem/ss_table.rs +++ b/geth-domain/src/index/tests/in_mem/ss_table.rs @@ -1,4 +1,4 @@ -use std::{io, u64}; +use std::io; use geth_common::IteratorIO; use geth_mikoshi::InMemoryStorage; diff --git a/geth-engine/src/process/grpc/protocol.rs b/geth-engine/src/process/grpc/protocol.rs index 69fdb16..f72a093 100644 --- a/geth-engine/src/process/grpc/protocol.rs +++ b/geth-engine/src/process/grpc/protocol.rs @@ -200,7 +200,20 @@ async fn execute_operation( let correlation = input.correlation; match input.operation { Operation::AppendStream(params) => { - let completed = internal.writer.append(params.stream_name, params.expected_revision, params.events).await?; + let outcome = internal.writer.append(params.stream_name, params.expected_revision, params.events).await; + + let completed = match outcome { + Err(e) => { + yield OperationOut { + correlation, + reply: Reply::Error(e.to_string()), + }; + + return; + }, + + Ok(c) => c, + }; yield OperationOut { correlation, diff --git a/geth-engine/src/process/sink.rs b/geth-engine/src/process/sink.rs index 4f6f3fa..ac53b0d 100644 --- a/geth-engine/src/process/sink.rs +++ b/geth-engine/src/process/sink.rs @@ -6,8 +6,7 @@ use super::messages::{Messages, TestSinkRequests, TestSinkResponses}; pub async fn run(mut env: ProcessEnv) -> eyre::Result<()> { while let Some(item) = env.queue.recv().await { if let Item::Stream(stream) = item { - if let Some(TestSinkRequests::StreamFrom { low, high }) = stream.payload.try_into().ok() - { + if let Ok(TestSinkRequests::StreamFrom { low, high }) = stream.payload.try_into() { for num in low..high { if stream .sender @@ -57,7 +56,7 @@ pub struct Streaming { impl Streaming { pub async fn next(&mut self) -> eyre::Result> { if let Some(resp) = self.inner.recv().await { - if let Some(TestSinkResponses::Stream(value)) = resp.try_into().ok() { + if let Ok(TestSinkResponses::Stream(value)) = resp.try_into() { return Ok(Some(value)); } diff --git a/geth-engine/src/process/tests/mod.rs b/geth-engine/src/process/tests/mod.rs index 6d7d815..4c67795 100644 --- a/geth-engine/src/process/tests/mod.rs +++ b/geth-engine/src/process/tests/mod.rs @@ -6,7 +6,7 @@ mod writing; #[ctor::ctor] fn test_init() { - let _ = tracing_subscriber::fmt::fmt() + tracing_subscriber::fmt::fmt() .with_max_level(tracing::Level::DEBUG) .with_file(true) .with_line_number(true) diff --git a/geth-engine/src/process/tests/reading.rs b/geth-engine/src/process/tests/reading.rs index 78b7d9c..29bdfe8 100644 --- a/geth-engine/src/process/tests/reading.rs +++ b/geth-engine/src/process/tests/reading.rs @@ -2,7 +2,7 @@ use crate::process::reading::ReaderClient; use crate::process::writing::WriterClient; use crate::process::{start_process_manager, Proc}; use crate::Options; -use geth_common::{Direction, ExpectedRevision, Propose, Record, Revision}; +use geth_common::{Direction, ExpectedRevision, Propose, Revision}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -42,8 +42,7 @@ async fn test_reader_proc_simple() -> eyre::Result<()> { .success()?; let mut count = 0; - while let Some(entry) = stream.next().await? { - let record: Record = entry.clone().into(); + while let Some(record) = stream.next().await? { let foo = record.as_value::()?; assert_eq!(count, record.revision); assert_eq!(stream_name, record.stream_name); diff --git a/geth-engine/src/process/tests/subscribing.rs b/geth-engine/src/process/tests/subscribing.rs index 24ea860..73bc36c 100644 --- a/geth-engine/src/process/tests/subscribing.rs +++ b/geth-engine/src/process/tests/subscribing.rs @@ -2,7 +2,7 @@ use crate::process::subscription::SubscriptionClient; use crate::process::writing::WriterClient; use crate::process::{start_process_manager, Proc}; use crate::Options; -use geth_common::{ExpectedRevision, Propose, Record}; +use geth_common::{ExpectedRevision, Propose}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -33,9 +33,8 @@ async fn test_pubsub_proc_simple() -> eyre::Result<()> { .success()?; let mut count = 0; - while let Some(entry) = stream.next().await? { + while let Some(record) = stream.next().await? { tracing::debug!("received entry {}/10", count + 1); - let record: Record = entry.clone().into(); let foo = record.as_value::()?; assert_eq!(count, record.revision); diff --git a/geth-engine/src/process/writing/entries.rs b/geth-engine/src/process/writing/entries.rs index 3dd6a43..860f2dc 100644 --- a/geth-engine/src/process/writing/entries.rs +++ b/geth-engine/src/process/writing/entries.rs @@ -46,11 +46,10 @@ impl LogEntries for ProposeEntries { } fn current_entry_size(&self) -> usize { - 1 - // size_of::() // revision - // + size_of::() // stream name length - // + self.ident.len() // stream name - // + propose_estimate_size(self.current.as_ref().unwrap()) + size_of::() // revision + + size_of::() // stream name length + + self.ident.len() // stream name + + propose_estimate_size(self.current.as_ref().unwrap()) } fn write_current_entry(&mut self, buffer: &mut BytesMut, position: u64) { diff --git a/geth-mikoshi/src/wal/log_reader.rs b/geth-mikoshi/src/wal/log_reader.rs index c3777a8..11adbc3 100644 --- a/geth-mikoshi/src/wal/log_reader.rs +++ b/geth-mikoshi/src/wal/log_reader.rs @@ -67,12 +67,15 @@ where )? .get_u32_le() as usize; - debug_assert_eq!( - record_size, post_record_size, - "pre and post record size don't match!" - ); + if record_size != post_record_size { + eyre::bail!( + "pre and post record size don't match! {} != {}", + record_size, + post_record_size + ); + } - Ok(LogEntry::get(record_bytes)) + LogEntry::try_from(record_bytes) } } diff --git a/geth-mikoshi/src/wal/mod.rs b/geth-mikoshi/src/wal/mod.rs index b349e33..40c7c64 100644 --- a/geth-mikoshi/src/wal/mod.rs +++ b/geth-mikoshi/src/wal/mod.rs @@ -7,6 +7,8 @@ mod log_writer; pub use log_reader::LogReader; pub use log_writer::LogWriter; +pub const LOG_ENTRY_HEADER_SIZE: usize = size_of::() + size_of::(); // position and type + pub trait LogEntries { fn move_next(&mut self) -> bool; fn current_entry_size(&self) -> usize; @@ -49,6 +51,26 @@ impl LogEntry { } } +impl TryFrom for LogEntry { + type Error = eyre::Report; + + fn try_from(mut src: Bytes) -> eyre::Result { + if src.remaining() < LOG_ENTRY_HEADER_SIZE { + eyre::bail!("bytes buffer is too short to contain a valid log entry"); + } + + let position = src.get_u64_le(); + let r#type = src.get_u8(); + let payload = src; + + Ok(Self { + position, + r#type, + payload, + }) + } +} + pub struct LogReceipt { pub start_position: u64, pub next_position: u64, diff --git a/protos/protocol.proto b/protos/protocol.proto index c565771..372bfea 100644 --- a/protos/protocol.proto +++ b/protos/protocol.proto @@ -137,6 +137,7 @@ message OperationOut { ProgramsListed programs_listed = 6; ProgramKilled program_killed = 7; ProgramObtained program_got = 8; + string error = 9; } message AppendStreamCompleted { From eca2ef2870d043cb796cad436fb15dd213360926 Mon Sep 17 00:00:00 2001 From: YoEight Date: Fri, 10 Jan 2025 21:34:18 -0500 Subject: [PATCH 3/3] prevent the index process from crashing when receiving an empty entry vector. --- geth-engine/src/process/indexing/proc.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/geth-engine/src/process/indexing/proc.rs b/geth-engine/src/process/indexing/proc.rs index 82c68b0..7d8ceef 100644 --- a/geth-engine/src/process/indexing/proc.rs +++ b/geth-engine/src/process/indexing/proc.rs @@ -48,6 +48,18 @@ where if let Ok(req) = mail.payload.try_into() { match req { IndexRequests::Store { entries } => { + if entries.is_empty() { + tracing::warn!("empty entries vector received"); + + let _ = env.client.reply( + mail.origin, + mail.correlation, + IndexResponses::Committed.into(), + ); + + continue; + } + let last = entries.last().copied().unwrap(); if let Err(e) = store_entries(&lsm, entries) { tracing::error!("error when storing index entries: {}", e);