Skip to content

Commit

Permalink
add an explicit timeout for executor heartbeats (#613)
Browse files Browse the repository at this point in the history
Add an explicit timeout to detect improperly functioning executor which
fails to send heartbeats regularly but keeps rpc connection.
  • Loading branch information
maxkozlovsky authored May 22, 2024
1 parent 2f3eb50 commit 83bdfda
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions src/coordinator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
Arc,
},
task::{Context, Poll},
time::Duration,
};

use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -91,11 +92,12 @@ use tokio::{
watch::{self, Receiver, Sender},
},
task::JoinHandle,
time::timeout,
};
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tonic::{body::BoxBody, Request, Response, Status, Streaming};
use tower::{Layer, Service, ServiceBuilder};
use tracing::{error, info, Instrument};
use tracing::{error, info, warn, Instrument};

use crate::{
api::IndexifyAPIError,
Expand Down Expand Up @@ -133,6 +135,9 @@ impl<'a> Extractor for MetadataMap<'a> {
}
}

// How often we expect the executor to send us heartbeats.
const EXECUTOR_HEARTBEAT_PERIOD: Duration = Duration::new(5, 0);

impl CoordinatorServiceServer {
fn create_extraction_policies_for_graph(
&self,
Expand Down Expand Up @@ -572,7 +577,9 @@ impl CoordinatorService for CoordinatorServiceServer {
info!("shutting down server, stopping heartbeats from executor: {:?}", executor_id);
break;
}
frame = in_stream.next() => {
result = timeout(EXECUTOR_HEARTBEAT_PERIOD * 3, in_stream.next()) => {
match result {
Ok(frame) => {
// Ensure the frame has something
if frame.as_ref().is_none() {
break;
Expand Down Expand Up @@ -610,7 +617,12 @@ impl CoordinatorService for CoordinatorServiceServer {
}
}
}

}
Err(_) => {
warn!("heartbeat timed out, stopping executor: {:?}", executor_id);
break;
}
}
}
}
}
Expand Down

0 comments on commit 83bdfda

Please sign in to comment.