Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Report fatal error from the server to client properly. #19

Merged
merged 3 commits into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[profile.default]
slow-timeout = { period = "15s", terminate-after = 1 }
10 changes: 8 additions & 2 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ jobs:
- name: Build
run: cargo check --all-targets

- name: Install cargo-nextest
uses: baptiste0928/cargo-install@v3
with:
crate: cargo-nextest
locked: true

- name: Run tests
run: cargo test --all-targets
run: cargo nextest run --hide-progress-bar

linting:
needs: build
Expand Down Expand Up @@ -78,4 +84,4 @@ jobs:
run: cargo clippy --all-features -- -D warnings

- name: Format
run: cargo fmt -- --check
run: cargo fmt -- --check
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
target/
.idea/
proptest-regressions/
mutants.out/
2 changes: 0 additions & 2 deletions geth-client-tests/src/delete_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::u64;

use bytes::Bytes;
use fake::{faker::name::en::Name, Fake};
use geth_client::GrpcClient;
Expand Down
2 changes: 1 addition & 1 deletion geth-client-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod tests {

#[ctor::ctor]
fn test_init() {
let _ = tracing_subscriber::fmt::fmt()
tracing_subscriber::fmt::fmt()
.with_env_filter(EnvFilter::new("geth_engine=debug"))
// .with_max_level(tracing::Level::DEBUG)
.with_file(true)
Expand Down
14 changes: 13 additions & 1 deletion geth-client/src/next/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use eyre::bail;
use uuid::Uuid;

use geth_common::generated::next::protocol;
use geth_common::{EndPoint, Operation, OperationIn, OperationOut};
use geth_common::{EndPoint, Operation, OperationIn, OperationOut, Reply};

use crate::next::{connect_to_node, Command, ConnErr, Connection, Mailbox};

Expand Down Expand Up @@ -104,6 +104,18 @@ impl Driver {
tracing::warn!("received an event that is not related to any command");
}

pub fn handle_disconnect(&mut self) {
tracing::warn!("connection was closed by the server");
self.connection = None;

for (correlation, cmd) in self.registry.drain() {
let _ = cmd.resp.send(OperationOut {
correlation,
reply: Reply::ServerDisconnected,
});
}
}

/// We might consider implementing a retry logic here.
async fn connect(&mut self) -> eyre::Result<()> {
let uri = format!("http://{}:{}", self.endpoint.host, self.endpoint.port)
Expand Down
9 changes: 8 additions & 1 deletion geth-client/src/next/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,14 @@ impl Client for GrpcClient {
if let Some(out) = task.recv().await? {
match out {
Reply::AppendStreamCompleted(resp) => Ok(resp),
_ => eyre::bail!("unexpected reply when appending events to '{}'", stream_id),
Reply::Error(e) => {
eyre::bail!("server error when writing to stream '{}': {}", stream_id, e)
}
x => eyre::bail!(
"unexpected reply when appending events to '{}' = {:?}",
stream_id,
x
),
}
} else {
eyre::bail!(
Expand Down
14 changes: 11 additions & 3 deletions geth-client/src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod grpc;
pub enum Msg {
Command(Command),
Event(OperationOut),
Disconnected,
}

#[derive(Clone)]
Expand Down Expand Up @@ -48,18 +49,21 @@ pub(crate) async fn connect_to_node(uri: &Uri, mailbox: Mailbox) -> Result<Conne
while let Some(reply) = stream_response.next().await {
match reply {
Err(e) => {
tracing::error!("Error receiving response: {:?}", e);
break;
tracing::error!("error receiving response: {:?}", e);
// TODO - Needs find a way to handle unexpected errors so we avoid
// stalled operations.
}

Ok(out) => {
if mailbox.send(Msg::Event(out.into())).is_err() {
tracing::warn!("seems main connection is closed");
break;
return;
}
}
}
}

let _ = mailbox.send(Msg::Disconnected);
});

Ok(connection)
Expand All @@ -75,6 +79,10 @@ pub(crate) async fn multiplex_loop(mut driver: Driver, mut receiver: UnboundedRe
}

Msg::Event(event) => driver.handle_event(event),

Msg::Disconnected => {
driver.handle_disconnect();
}
}
}
}
2 changes: 2 additions & 0 deletions geth-common/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ use crate::{
SubscriptionConfirmation,
};

#[derive(Debug)]
pub enum SubscriptionEvent {
EventAppeared(Record),
Confirmed(SubscriptionConfirmation),
CaughtUp,
Unsubscribed(UnsubscribeReason),
}

#[derive(Debug)]
pub enum UnsubscribeReason {
User,
Server,
Expand Down
17 changes: 16 additions & 1 deletion geth-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl From<next::protocol::Ident> for Uuid {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct EndPoint {
pub host: String,
pub port: u16,
Expand Down Expand Up @@ -113,6 +113,7 @@ impl From<next::protocol::OperationIn> for OperationIn {
}
}

#[derive(Debug)]
pub enum Reply {
AppendStreamCompleted(AppendStreamCompleted),
StreamRead(StreamRead),
Expand All @@ -121,6 +122,8 @@ pub enum Reply {
ProgramsListed(ProgramListed),
ProgramKilled(ProgramKilled),
ProgramObtained(ProgramObtained),
ServerDisconnected,
Error(String),
}

pub struct OperationOut {
Expand Down Expand Up @@ -157,6 +160,7 @@ impl From<next::protocol::OperationOut> for OperationOut {
operation_out::Operation::ProgramKilled(resp) => Reply::ProgramKilled(resp.into()),

operation_out::Operation::ProgramGot(resp) => Reply::ProgramObtained(resp.into()),
operation_out::Operation::Error(e) => Reply::Error(e),
};

Self { correlation, reply }
Expand All @@ -165,6 +169,7 @@ impl From<next::protocol::OperationOut> for OperationOut {

impl TryFrom<OperationOut> for next::protocol::OperationOut {
type Error = eyre::Report;

fn try_from(value: OperationOut) -> eyre::Result<Self> {
let correlation = Some(value.correlation.into());
let operation = match value.reply {
Expand All @@ -183,6 +188,12 @@ impl TryFrom<OperationOut> for next::protocol::OperationOut {
Reply::ProgramKilled(resp) => operation_out::Operation::ProgramKilled(resp.into()),

Reply::ProgramObtained(resp) => operation_out::Operation::ProgramGot(resp.into()),

Reply::ServerDisconnected => {
eyre::bail!("not supposed to send server disconnected message to the server");
}

Reply::Error(e) => operation_out::Operation::Error(e),
};

Ok(Self {
Expand Down Expand Up @@ -1074,6 +1085,7 @@ impl<A> ReadStreamCompleted<A> {
}
}

#[derive(Debug)]
pub enum StreamRead {
EndOfStream,
EventAppeared(Record),
Expand Down Expand Up @@ -1116,6 +1128,7 @@ impl TryFrom<StreamRead> for operation_out::StreamRead {
}
}

#[derive(Debug)]
pub enum SubscriptionConfirmation {
StreamName(String),
ProcessId(Uuid),
Expand Down Expand Up @@ -1205,6 +1218,7 @@ impl From<operation_out::subscription_event::Error> for SubscriptionError {
}
}

#[derive(Debug)]
pub enum DeleteStreamCompleted {
Success(WriteResult),
Error(DeleteError),
Expand All @@ -1220,6 +1234,7 @@ impl DeleteStreamCompleted {
}
}

#[derive(Debug)]
pub enum DeleteError {
StreamDeleted,
WrongExpectedRevision(WrongExpectedRevisionError),
Expand Down
5 changes: 2 additions & 3 deletions geth-consensus/src/tests/storage/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ impl PersistentStorage for InMemStorage {

self.inner
.iter()
.find(|e| e.index == entry_id.index && e.term == entry_id.term)
.is_some()
.any(|e| e.index == entry_id.index && e.term == entry_id.term)
}
}

Expand All @@ -81,7 +80,7 @@ struct InMemIter<'a> {
init: bool,
}

impl<'a> IterateEntries for InMemIter<'a> {
impl IterateEntries for InMemIter<'_> {
fn next(&mut self) -> std::io::Result<Option<Entry>> {
if self.count >= self.limit {
return Ok(None);
Expand Down
10 changes: 6 additions & 4 deletions geth-domain/src/index/tests/fs/lsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ fn test_fs_lsm_mem_table_scan() -> io::Result<()> {
/// When scanning, it will access to sstables.
#[test]
fn test_fs_lsm_sync() -> io::Result<()> {
let mut setts = LsmSettings::default();
setts.mem_table_max_size = MEM_TABLE_ENTRY_SIZE;
let setts = LsmSettings {
mem_table_max_size: MEM_TABLE_ENTRY_SIZE,
..Default::default()
};

let temp = TempDir::default();
let root = PathBuf::from(temp.as_ref());
Expand Down Expand Up @@ -135,8 +137,8 @@ fn test_fs_lsm_serialization() -> io::Result<()> {

assert_eq!(lsm.logical_position, actual.logical_position);

let actual_table_1 = actual.levels.get(&0).unwrap().front().clone().unwrap();
let actual_table_2 = actual.levels.get(&1).unwrap().front().clone().unwrap();
let actual_table_1 = actual.levels.get(&0).unwrap().front().unwrap();
let actual_table_2 = actual.levels.get(&1).unwrap().front().unwrap();

assert_eq!(table1.id, actual_table_1.id);
assert_eq!(table1.metas, actual_table_1.metas);
Expand Down
2 changes: 1 addition & 1 deletion geth-domain/src/index/tests/fs/ss_table.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::io;
use std::path::PathBuf;
use std::{io, u64};

use temp_testdir::TempDir;

Expand Down
2 changes: 1 addition & 1 deletion geth-domain/src/index/tests/in_mem/block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{io, u64};
use std::io;

use geth_mikoshi::InMemoryStorage;

Expand Down
8 changes: 5 additions & 3 deletions geth-domain/src/index/tests/in_mem/lsm.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{io, u64};
use std::io;

use geth_common::IteratorIO;
use geth_mikoshi::InMemoryStorage;
Expand Down Expand Up @@ -107,8 +107,10 @@ fn test_in_mem_lsm_mem_table_scan_backward() -> io::Result<()> {
/// When scanning, it will access to in-mem sstables.
#[test]
fn test_in_mem_lsm_sync() -> io::Result<()> {
let mut setts = LsmSettings::default();
setts.mem_table_max_size = MEM_TABLE_ENTRY_SIZE;
let setts = LsmSettings {
mem_table_max_size: MEM_TABLE_ENTRY_SIZE,
..Default::default()
};

let mut lsm = Lsm::new(setts, InMemoryStorage::new());

Expand Down
2 changes: 1 addition & 1 deletion geth-domain/src/index/tests/in_mem/mem_table.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{io, u64};
use std::io;

use geth_common::IteratorIO;
use geth_mikoshi::InMemoryStorage;
Expand Down
2 changes: 1 addition & 1 deletion geth-domain/src/index/tests/in_mem/ss_table.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{io, u64};
use std::io;

use geth_common::IteratorIO;
use geth_mikoshi::InMemoryStorage;
Expand Down
Loading
Loading