Skip to content

Commit

Permalink
feat(inclusion_monitor): rewrite query and parse
Browse files Browse the repository at this point in the history
Rewrites how we query and parse the result from Loki. This version is
more reliable and can expect only a single log line result.
  • Loading branch information
alextes committed Oct 10, 2023
1 parent aee1976 commit cb27301
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 284 deletions.
78 changes: 41 additions & 37 deletions src/phoenix/inclusion_monitor/loki_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,46 @@ pub struct PayloadLogStats {
pub request_download_duration_ms: i64,
}

fn date_time_from_timestamp(
request_finished_log: &serde_json::Value,
key: &str,
) -> anyhow::Result<DateTime<Utc>> {
request_finished_log[key]
.as_str()
fn extract_date_time(json_map: &serde_json::Value, key: &str) -> anyhow::Result<DateTime<Utc>> {
json_map
.get(key)
.and_then(|timestamp| timestamp.as_str())
.and_then(|timestamp| timestamp.parse::<i64>().ok())
.and_then(|timestamp| Utc.timestamp_millis_opt(timestamp).single())
.with_context(|| format!("failed to parse {key} as timestamp from payload log"))
.with_context(|| {
format!(
"failed to parse {} as timestamp from payload published log",
key
)
})
}

/// Takes the payload logs response and extracts the first log line. We rely on the query being
/// crafted in such a way that the oldest matching result is the one we're looking for.
fn parse_log_response(text: &str) -> anyhow::Result<Option<PayloadLogStats>> {
let request_finished_log: serde_json::Value = {
let log_data: serde_json::Value = serde_json::from_str(text)
let log_response_json: serde_json::Value = serde_json::from_str(text)
.context("failed to parse payload log request body as JSON")?;

// This is the array of parsed log lines and their raw values.
let results = log_data["data"]["result"]
.as_array()
.context("expected at least one log line in payload logs response")?;

let log_data = results
.iter()
.find(|result| {
let stream = &result["stream"];
let msg = stream["msg"].as_str().unwrap_or("");
msg.contains("request finished")
})
.map(|result| &result["stream"])
.cloned();

match log_data {
Some(log_data) => log_data,
// If there are no logs, we stop here.
None => return Ok(None),
}
// See the tests for an example of the data.
log_response_json
// The rest is response metadata
.get("data")
// These are the lines, the rest is metadata about the lines
.and_then(|data| data.get("result"))
.and_then(|result| result.as_array())
.and_then(|lines| lines.iter().next())
// Each of these lines is expected to contain parsed JSON values in the `stream` field and
// raw values in the `values` field.
.and_then(|line| line.get("stream"))
.context("failed to get stream from payload published log")
.cloned()?
};

let received_at = date_time_from_timestamp(&request_finished_log, "timestampRequestStart")?;
let decoded_at = date_time_from_timestamp(&request_finished_log, "timestampAfterDecode")?;
let pre_publish_at =
date_time_from_timestamp(&request_finished_log, "timestampBeforePublishing")?;
let post_publish_at =
date_time_from_timestamp(&request_finished_log, "timestampAfterPublishing")?;
let received_at = extract_date_time(&request_finished_log, "timestampRequestStart")?;
let decoded_at = extract_date_time(&request_finished_log, "timestampAfterDecode")?;
let pre_publish_at = extract_date_time(&request_finished_log, "timestampBeforePublishing")?;
let post_publish_at = extract_date_time(&request_finished_log, "timestampAfterPublishing")?;
let decoded_at_slot_age_ms = request_finished_log["msIntoSlot"]
.as_str()
.and_then(|s| s.parse::<i64>().ok())
Expand Down Expand Up @@ -98,12 +94,20 @@ impl LokiClient {
}

pub async fn payload_logs(&self, slot: &i32) -> anyhow::Result<Option<PayloadLogStats>> {
let query = format!(r#"{{app="proposer-api"}} |= `"slot":{slot}` | json"#);
let query = format!(
r#"{{app="proposer-api"}} |= `"slot":{slot}` |= "block published through beacon node" | json"#
);
let since = "24h";

let url = format!("{}/loki/api/v1/query_range", self.server_url);
let url_with_params =
Url::parse_with_params(&url, &[("query", query.as_str()), ("since", since)])?;
let url_with_params = Url::parse_with_params(
&url,
&[
("direction", "forward"),
("query", query.as_str()),
("since", since),
],
)?;

let response = self.client.get(url_with_params).send().await?;
let body = response.text().await?;
Expand Down
Loading

0 comments on commit cb27301

Please sign in to comment.