Skip to content

Commit

Permalink
[Bifrost] Sequencer gives priority to GetSequencerState requests
Browse files Browse the repository at this point in the history
Those requests are really important to be handled quickly, we are doing two changes in this PR:
- Making expensive to run GetSequencerState operations not block the network loop
- Give priority to handling those request vs appends

```
// intentionally empty
```
  • Loading branch information
AhmedSoliman committed Feb 8, 2025
1 parent 9ebcceb commit 494c7f8
Showing 1 changed file with 47 additions and 36 deletions.
83 changes: 47 additions & 36 deletions crates/bifrost/src/providers/replicated_loglet/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl RequestPump {
router_builder: &mut MessageRouterBuilder,
) -> Self {
// todo(asoli) read from opts
let queue_length = 10;
let queue_length = 128;
let append_stream = router_builder.subscribe_to_stream(queue_length);
let get_sequencer_state_stream = router_builder.subscribe_to_stream(queue_length);
Self {
Expand All @@ -104,15 +104,16 @@ impl RequestPump {
let mut cancel = std::pin::pin!(cancellation_watcher());
loop {
tokio::select! {
biased;
_ = &mut cancel => {
break;
}
Some(append) = self.append_stream.next() => {
self.handle_append(&provider, append).await;
}
Some(get_sequencer_state) = self.get_sequencer_state_stream.next() => {
self.handle_get_sequencer_state(&provider, get_sequencer_state).await;
}
Some(append) = self.append_stream.next() => {
self.handle_append(&provider, append).await;
}
}
}

Expand Down Expand Up @@ -158,41 +159,51 @@ impl RequestPump {
return;
}

let tail = if msg.force_seal_check {
match loglet
.find_tail_inner(FindTailOptions::ForceSealCheck)
.await
{
Ok(tail) => tail,
Err(err) => {
let failure = SequencerState {
header: CommonResponseHeader {
known_global_tail: None,
sealed: None,
status: SequencerStatus::Error {
retryable: true,
message: err.to_string(),
if msg.force_seal_check {
let _ = TaskCenter::spawn(TaskKind::Disposable, "remote-check-seal", async move {
match loglet
.find_tail_inner(FindTailOptions::ForceSealCheck)
.await
{
Ok(tail) => {
let sequencer_state = SequencerState {
header: CommonResponseHeader {
known_global_tail: Some(tail.offset()),
sealed: Some(tail.is_sealed()),
status: SequencerStatus::Ok,
},
};
let _ = reciprocal.prepare(sequencer_state).try_send();
}
Err(err) => {
let failure = SequencerState {
header: CommonResponseHeader {
known_global_tail: None,
sealed: None,
status: SequencerStatus::Error {
retryable: true,
message: err.to_string(),
},
},
},
};
let _ = reciprocal.prepare(failure).try_send();
return;
};
let _ = reciprocal.prepare(failure).try_send();
}
}
}
Ok(())
});
} else {
// if we are not forced to check the seal, we can just return the last known tail from the
// sequencer's view
loglet.last_known_global_tail()
};

let sequencer_state = SequencerState {
header: CommonResponseHeader {
known_global_tail: Some(tail.offset()),
sealed: Some(tail.is_sealed()),
status: SequencerStatus::Ok,
},
};
let _ = reciprocal.prepare(sequencer_state).try_send();
let tail = loglet.last_known_global_tail();
let sequencer_state = SequencerState {
header: CommonResponseHeader {
known_global_tail: Some(tail.offset()),
sealed: Some(tail.is_sealed()),
status: SequencerStatus::Ok,
},
};
let _ = reciprocal.prepare(sequencer_state).try_send();
}
}

/// Infallible handle_append method
Expand Down Expand Up @@ -265,7 +276,7 @@ impl RequestPump {
return Err(SequencerStatus::LogletIdMismatch);
}

match self.create_loglet(provider, header).await {
match self.create_loglet(provider, header) {
Ok(loglet) => return Ok(loglet),
Err(SequencerStatus::UnknownLogId | SequencerStatus::UnknownSegmentIndex) => {
// possible outdated metadata
Expand Down Expand Up @@ -315,7 +326,7 @@ impl RequestPump {
}
}

async fn create_loglet<T: TransportConnect>(
fn create_loglet<T: TransportConnect>(
&self,
provider: &ReplicatedLogletProvider<T>,
header: &CommonRequestHeader,
Expand Down

0 comments on commit 494c7f8

Please sign in to comment.