Invoker #88
Conversation
Force-pushed from a7fad7b to 53bd2d4.
Thanks for creating this PR @slinkydeveloper. This looks like a really good piece of work :-)

My main question is around the unbounded channels within the invoker. I think this is something we should have an answer for (not necessarily addressing it right away) because it might affect stability.

Another question is around the handling of stored entry acks. I assume that this part is not fully implemented yet, right? I noticed that you expect an empty `Completion` for a `Custom` journal entry? So far I thought that the partition processor would send a stored entry ack for every journal entry, which could be used to signal that a `Custom` entry has been persisted (e.g. a side effect entry). This is probably something we need to align on.
src/invoker/src/lib.rs (outdated):

```rust
pub trait ServiceEndpointRegistry {
    fn resolve_endpoint(&self, service_name: &str) -> Option<EndpointMetadata>;
}
```
Wondering whether this eventually needs to become asynchronous, because in the distributed setup we will probably have to talk to the meta component.
For the time being I would keep it synchronous, as we still need to unfold this discussion: #91

Making this async opens up another can of worms, such as: do you block the whole invoker waiting on the meta component to give you the endpoint metadata? Also, in that case, would it make sense for the partition processor to directly resolve the `EndpointMetadata` and pass it along when creating invoke messages for the invoker?
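For illustration, a hypothetical async variant of the trait could return a boxed future to stay object-safe. This is only a sketch of the direction discussed here, not code from the PR; the use of the `futures` crate and the `EndpointMetadata` stub are assumptions:

```rust
use futures::future::BoxFuture;

// Stand-in for the EndpointMetadata type from src/invoker/src/lib.rs.
pub struct EndpointMetadata;

// Hypothetical async flavour of ServiceEndpointRegistry: resolving an
// endpoint may involve a round-trip to the meta component, so the lookup
// becomes a future. Boxing keeps the trait object-safe.
pub trait AsyncServiceEndpointRegistry {
    fn resolve_endpoint(&self, service_name: &str) -> BoxFuture<'_, Option<EndpointMetadata>>;
}
```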
src/invoker/src/invocation_task.rs (outdated):

```rust
response_res = &mut req_fut, if response.is_none() => {
    response = Some(response_res??);
},
Some(je) = journal_stream.next() => {
    self.write(http_stream_tx, ProtocolMessage::UnparsedEntry(je)).await?;
    self.next_journal_index += 1;
}
```
Nice, this parallelizes the request initiation and data sending if the underlying connection supports buffering, right?
This is not the only reason. In one of the iterations of the protocol I figured out that I could send the response header frame as soon as I get the request, without waiting for the journal to be sent over. This can provide faster feedback to the invoker, for example when the connection cannot be established, or when a service is not known. For the latter, we should move the status check from `run_internal` to the match arm of `response_res`.

Because the protocol spec does not mandate when the SDK writes the response headers, we simply don't make any assumption here.

Perhaps another point here is that hyper doesn't split "request buffer ready" and "response header frame received" into two separate futures, which requires a bit of playing around with spawning a new tokio task. Hopefully this will get better with hyper 1.0, where we'll most likely have more control over HTTP frames.
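To make the shape of that workaround concrete, here is a minimal sketch assuming hyper 0.14's `Body::channel` API; the function name and the dummy journal entries are made up, and error handling is simplified:

```rust
use futures::StreamExt;
use hyper::{Body, Client, Request, Uri};

async fn drive_invocation(uri: Uri) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let client = Client::new();
    let (mut http_stream_tx, body) = Body::channel();
    let req = Request::post(uri).body(body)?;

    // Spawning the ResponseFuture on its own task turns "response header
    // frame received" into a future we can select! on while still writing.
    let mut response_fut = tokio::spawn(client.request(req));
    let mut response = None;

    // Dummy entries standing in for the replayed journal stream.
    let mut journal = futures::stream::iter([&b"entry-1"[..], &b"entry-2"[..]]);

    loop {
        tokio::select! {
            res = &mut response_fut, if response.is_none() => {
                // Early feedback: connection/status errors surface here,
                // before the whole journal has been written out.
                response = Some(res??);
            }
            Some(entry) = journal.next() => {
                if http_stream_tx.send_data(entry.into()).await.is_err() {
                    // The peer closed the request channel (e.g. to suspend).
                    break;
                }
            }
            else => break, // journal replayed and response headers received
        }
    }
    Ok(())
}
```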
@tillrohrmann For the discussion about bounded/unbounded channels, I've opened an issue here: #92
Thanks for the update @slinkydeveloper. I think there are two bigger problems we have to tackle at some point:

- Make sure that `InvocationTask` does not leak resources
- Rethink propagation of the journal revision to the service endpoints in order to make it work (I guess this would require a protocol change)
src/invoker/src/invoker.rs (outdated):

```rust
// TODO we don't need this field right?
#[allow(dead_code)]
journal_revision: JournalRevision,
```
I think we don't need this field. I have removed it in #89.
```rust
use super::{EndpointMetadata, InvokeInputJournal, JournalMetadata, JournalReader, ProtocolType};

#[allow(clippy::declare_interior_mutable_const)]
```
Why is it a false positive? Is it because `Bytes` is classified wrongly and does not expose interior mutability?
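For context, my own illustration of why the lint fires (the constant name and value below are hypothetical, not from the PR): clippy flags any `const` whose type contains interior mutability, and `Bytes` embeds an atomic reference count, even though a constant built with `Bytes::from_static` never actually mutates it.

```rust
use bytes::Bytes;

// Hypothetical example: clippy's declare_interior_mutable_const fires because
// Bytes contains an atomic (interior mutability), even though this constant,
// created from a static slice, is never mutated in practice.
#[allow(clippy::declare_interior_mutable_const)]
const SERVICE_CONTENT_TYPE: Bytes = Bytes::from_static(b"application/restate");
```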
src/invoker/src/invocation_task.rs (outdated):

```rust
let buf = self.encoder.encode(msg);

if let Err(hyper_err) = http_stream_tx.send_data(buf).await {
    // is_closed() can happen only with sender's channel
```
What is `sender's channel`?
https://docs.rs/hyper/latest/hyper/struct.Error.html#method.is_closed — it's the body sender channel. Essentially, `is_closed()` means the request channel is closed. This is fine to ignore, as it simply means we cannot write the request anymore, because perhaps the SDK wants to suspend.
Ok, thanks for the clarification. Can you update the comment to reflect this? E.g. "is_closed is only true iff the request channel has been closed. This can happen if the service endpoint wants to suspend."
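Rendered as code, the suggested handling could look like this (a sketch continuing the snippet quoted above, with the same names):

```rust
let buf = self.encoder.encode(msg);

if let Err(hyper_err) = http_stream_tx.send_data(buf).await {
    // is_closed is only true iff the request channel has been closed.
    // This can happen if the service endpoint wants to suspend, so we
    // stop writing instead of treating it as a hard failure.
    if hyper_err.is_closed() {
        return Ok(());
    }
    return Err(hyper_err.into());
}
```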
Force-pushed from 843bfde to 05ff67d.
@tillrohrmann I rebased on main and addressed all your comments. This is ready for another pass. I also included a commit to remove …
Thanks for updating this PR @slinkydeveloper. The changes look good to me. +1 for merging after resolving my last minor comments.
```rust
use super::{EndpointMetadata, InvokeInputJournal, JournalMetadata, JournalReader, ProtocolType};

#[allow(clippy::declare_interior_mutable_const)]
```
Then let's add to the comment that we suspect it is because of `Bytes`.
src/invoker/src/invoker.rs (outdated):

```rust
            break;
        }
    }
}

// Wait for all the tasks to shutdown
if tokio::time::timeout(shutdown_timeout, invocation_tasks.shutdown())
```
Why are we pushing the timeout into the `invoker`? Wouldn't it be enough that someone who is waiting on the task to complete adds a timeout (like the owner of the invoker)?
src/worker/src/lib.rs (outdated):

```diff
@@ -27,6 +29,8 @@ type PartitionProcessor = partition::PartitionProcessor<
 >;
 type TargetedFsmCommand = Targeted<partition::Command>;

+const INVOKER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
```
The restate binary will add a timeout for the shutdown https://github.com/restatedev/restate/blob/main/src/restate/src/main.rs#L47. That's why I believe that we don't need this timeout here.
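A sketch of the suggested alternative, with hypothetical names (`shutdown_invoker`, `SHUTDOWN_TIMEOUT`, `invoker_handle`): the owner awaiting the invoker task applies the timeout, instead of the invoker enforcing a deadline on its own invocation tasks.

```rust
use std::time::Duration;

const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

// Hypothetical owner-side shutdown: wrap the invoker's join handle in a
// timeout rather than pushing the timeout into the invoker itself.
async fn shutdown_invoker(invoker_handle: tokio::task::JoinHandle<()>) {
    if tokio::time::timeout(SHUTDOWN_TIMEOUT, invoker_handle).await.is_err() {
        // The invoker did not finish in time; the owner decides how to react
        // (e.g. log and abort) instead of the invoker enforcing its own deadline.
        tracing::warn!("Invoker did not shut down within {SHUTDOWN_TIMEOUT:?}");
    }
}
```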
```diff
@@ -27,12 +27,12 @@ pub enum ErrorKind {
 macro_rules! match_decode {
     ($ty:expr, $buf:expr, { $($variant:ident),* }) => {
         match $ty {
-            $(EntryType::$variant => paste::paste! {
+            $(RawEntryHeader::$variant { .. } => paste::paste! {
```
That's a good simplification. Thanks for fixing it :-)
Define the Invoker interface with InvocationTask (tasks reading and writing single invocation streams).
Force-pushed from 05ff67d to 1e01c52.
Depends on #80. Fixes #82.

This PR implements the basic logic of the invoker:

- Every invocation has an `InvocationStateMachine` assigned to it, handling the different states of the invocation.
- For each invocation to execute, an `InvocationTask` is created, running in a separate tokio task. This is where we implement the protocol network logic:
  - The `InvocationTask` is composed of 3 loops: write only, when replaying the journal and waiting for response headers; read and write, when completions are sent from the invoker main loop and new entries are available from the service endpoint; and read only, when the completions channel is closed. In request-response mode, we will be able to reuse the same code by simply disabling the completions channel.
  - It uses the `Encoder`/`Decoder` from "Introduce invoker::message module" (#80).
  - The `InvocationTask` ends either after the HTTP stream(s) are closed gracefully or after a network failure, sending a signal back to the invoker that includes the last known journal revision and journal entry.
- The invoker main loop reacts to signals from the `InvocationTask`s, eventually applying state changes to the `InvocationStateMachine`, sending completions to single `InvocationTask`s, or cleaning up internal state. An `InvocationTask` might do that as well, when we reintroduce the `SuspensionMessage`.

This PR also includes:

- `test_utils` from the PoC runtime.
- A `Counter` example.