-
Notifications
You must be signed in to change notification settings - Fork 39
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
394 additions
and
6 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
// This Source Code Form is subject to the terms of the Mozilla Public | ||
// License, v. 2.0. If a copy of the MPL was not distributed with this | ||
// file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||
|
||
use crate::EreportData; | ||
use omicron_common::api::external::Error; | ||
use omicron_common::api::external::Generation; | ||
use std::collections::VecDeque; | ||
use tokio::sync::{mpsc, oneshot}; | ||
use uuid::Uuid; | ||
|
||
pub(crate) enum ServerReq { | ||
TruncateTo { | ||
seq: Generation, | ||
tx: oneshot::Sender<Result<(), Error>>, | ||
}, | ||
List { | ||
start_seq: Option<Generation>, | ||
limit: usize, | ||
tx: oneshot::Sender<Vec<ereporter_api::Ereport>>, | ||
}, | ||
} | ||
|
||
pub(crate) struct Buffer { | ||
pub(crate) seq: Generation, | ||
pub(crate) buf: VecDeque<ereporter_api::Ereport>, | ||
pub(crate) log: slog::Logger, | ||
pub(crate) id: Uuid, | ||
pub(crate) ereports: mpsc::Receiver<EreportData>, | ||
pub(crate) requests: mpsc::Receiver<ServerReq>, | ||
} | ||
|
||
impl Buffer { | ||
pub(crate) async fn run(mut self) { | ||
while let Some(req) = self.requests.recv().await { | ||
match req { | ||
// Asked to list ereports! | ||
ServerReq::List { start_seq, limit, tx } => { | ||
// First, grab any new ereports and stick them in our cache. | ||
while let Ok(ereport) = self.ereports.try_recv() { | ||
self.push_ereport(ereport); | ||
} | ||
|
||
let mut list = { | ||
let cap = std::cmp::min(limit, self.buf.len()); | ||
Vec::with_capacity(cap) | ||
}; | ||
|
||
match start_seq { | ||
// Start at lowest sequence number. | ||
None => { | ||
list.extend( | ||
self.buf.iter().by_ref().take(limit).cloned(), | ||
); | ||
} | ||
Some(seq) => { | ||
todo!( | ||
"eliza: draw the rest of the pagination {seq}" | ||
) | ||
} | ||
} | ||
slog::info!( | ||
self.log, | ||
"produced ereport batch from {start_seq:?}"; | ||
"start" => ?start_seq, | ||
"len" => list.len(), | ||
"limit" => limit | ||
); | ||
if tx.send(list).is_err() { | ||
slog::warn!(self.log, "client canceled list request"); | ||
} | ||
} | ||
ServerReq::TruncateTo { seq, tx } if seq > self.seq => { | ||
if tx.send(Err(Error::invalid_value( | ||
"seq", | ||
"cannot truncate to a sequence number greater than the current maximum" | ||
))).is_err() { | ||
// If the receiver no longer cares about the response to | ||
// this request, no biggie. | ||
slog::warn!(self.log, "client canceled truncate request"); | ||
} | ||
} | ||
ServerReq::TruncateTo { seq, tx } => { | ||
let prev_len = self.buf.len(); | ||
self.buf.retain(|ereport| ereport.seq <= seq); | ||
|
||
slog::info!( | ||
self.log, | ||
"truncated ereports up to {seq}"; | ||
"seq" => ?seq, | ||
"dropped" => prev_len - self.buf.len(), | ||
"remaining" => self.buf.len(), | ||
); | ||
|
||
if tx.send(Ok(())).is_err() { | ||
// If the receiver no longer cares about the response to | ||
// this request, no biggie. | ||
slog::warn!( | ||
self.log, | ||
"client canceled truncate request" | ||
); | ||
} | ||
todo!() | ||
} | ||
} | ||
} | ||
|
||
slog::info!(self.log, "server requests channel closed, shutting down"); | ||
} | ||
|
||
fn push_ereport(&mut self, ereport: EreportData) { | ||
let EreportData { facts, class, time_created } = ereport; | ||
let seq = self.seq; | ||
self.buf.push_back(ereporter_api::Ereport { | ||
facts, | ||
class, | ||
time_created, | ||
reporter_id: self.id, | ||
seq, | ||
}); | ||
self.seq = self.seq.next(); | ||
slog::trace!( | ||
self.log, | ||
"recorded ereport"; | ||
"seq" => %seq, | ||
); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,83 @@ | ||
use std::collections::BTreeMap; | ||
// This Source Code Form is subject to the terms of the Mozilla Public | ||
// License, v. 2.0. If a copy of the MPL was not distributed with this | ||
// file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||
|
||
pub struct Ereport { | ||
class: String, | ||
facts: BTreeMap<String, String>, | ||
use chrono::DateTime; | ||
use chrono::Utc; | ||
use std::collections::HashMap; | ||
use tokio::sync::mpsc; | ||
|
||
mod buffer; | ||
pub mod server; | ||
|
||
#[must_use = "an `EreportBuilder` does nothing unless `submit()` is called"] | ||
pub struct EreportBuilder { | ||
data: EreportData, | ||
permit: mpsc::OwnedPermit<EreportData>, | ||
} | ||
|
||
impl EreportBuilder { | ||
/// Reserve capacity for at least `facts` facts. | ||
pub fn reserve(&mut self, facts: usize) { | ||
self.data.facts.reserve(facts); | ||
} | ||
|
||
pub fn time_created( | ||
&mut self, | ||
time_created: chrono::DateTime<Utc>, | ||
) -> &mut Self { | ||
self.data.time_created = time_created; | ||
self | ||
} | ||
|
||
pub fn fact( | ||
&mut self, | ||
name: impl ToString, | ||
value: impl ToString, | ||
) -> Option<String> { | ||
self.data.facts.insert(name.to_string(), value.to_string()) | ||
} | ||
|
||
pub fn facts<K, V>( | ||
&mut self, | ||
facts: impl Iterator<Item = (impl ToString, impl ToString)>, | ||
) { | ||
self.data | ||
.facts | ||
.extend(facts.map(|(k, v)| (k.to_string(), v.to_string()))) | ||
} | ||
|
||
pub fn submit(self) { | ||
self.permit.send(self.data); | ||
} | ||
} | ||
|
||
/// A reporter handle used to generate ereports. | ||
#[derive(Clone, Debug)] | ||
pub struct Reporter(pub(crate) tokio::sync::mpsc::Sender<EreportData>); | ||
|
||
impl Reporter { | ||
/// Begin constructing a new ereport, returning an [`EreportBuilder`]. | ||
pub async fn report( | ||
&mut self, | ||
class: impl ToString, | ||
) -> Result<EreportBuilder, ()> { | ||
let time_created = Utc::now(); | ||
let permit = self.0.clone().reserve_owned().await.map_err(|_| ())?; | ||
Ok(EreportBuilder { | ||
data: EreportData { | ||
class: class.to_string(), | ||
time_created, | ||
facts: HashMap::new(), | ||
}, | ||
permit, | ||
}) | ||
} | ||
} | ||
|
||
/// An ereport. | ||
pub(crate) struct EreportData { | ||
pub(crate) class: String, | ||
pub(crate) facts: HashMap<String, String>, | ||
pub(crate) time_created: DateTime<Utc>, | ||
} |
Oops, something went wrong.