Merge pull request #533 from splitgraph/sync-metrics
Add sync metrics
gruuya authored Jun 21, 2024
2 parents b3fba9d + 380c79f commit f07c74f
Showing 3 changed files with 200 additions and 9 deletions.
119 changes: 119 additions & 0 deletions src/frontend/flight/sync/metrics.rs
@@ -0,0 +1,119 @@
use crate::frontend::flight::sync::writer::{Origin, SequenceNumber};
use metrics::{
    counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram,
    Counter, Gauge, Histogram,
};

const REQUEST_BYTES: &str = "seafowl_changeset_writer_request_bytes_total";
const REQUEST_ROWS: &str = "seafowl_changeset_writer_request_rows_total";
const IN_MEMORY_BYTES: &str = "seafowl_changeset_writer_in_memory_bytes_current";
const IN_MEMORY_ROWS: &str = "seafowl_changeset_writer_in_memory_rows_current";
const IN_MEMORY_OLDEST: &str =
    "seafowl_changeset_writer_in_memory_oldest_timestamp_seconds";
const COMPACTION_TIME: &str = "seafowl_changeset_writer_compaction_time_seconds";
const COMPACTED_BYTES: &str = "seafowl_changeset_writer_compacted_bytes_total";
const COMPACTED_ROWS: &str = "seafowl_changeset_writer_compacted_rows_total";
const FLUSH_TIME: &str = "seafowl_changeset_writer_flush_time_seconds";
const FLUSH_BYTES: &str = "seafowl_changeset_writer_flush_bytes_total";
const FLUSH_ROWS: &str = "seafowl_changeset_writer_flush_rows_total";
const FLUSH_LAST: &str =
    "seafowl_changeset_writer_last_successful_flush_timestamp_seconds_current";
const FLUSH_LAG: &str = "seafowl_changeset_writer_flush_lag_seconds";
const SEQUENCE_DURABLE: &str = "seafowl_changeset_writer_sequence_durable_bytes";
const SEQUENCE_MEMORY: &str = "seafowl_changeset_writer_sequence_memory_bytes";

#[derive(Clone)]
pub struct SyncMetrics {
    pub request_bytes: Counter,
    pub request_rows: Counter,
    pub in_memory_bytes: Gauge,
    pub in_memory_rows: Gauge,
    pub in_memory_oldest: Gauge,
    pub compaction_time: Histogram,
    pub compacted_bytes: Counter,
    pub compacted_rows: Counter,
    pub flush_time: Histogram,
    pub flush_bytes: Counter,
    pub flush_rows: Counter,
    pub flush_last: Gauge,
    pub flush_lag: Histogram,
}

impl Default for SyncMetrics {
    fn default() -> Self {
        Self::new()
    }
}

impl SyncMetrics {
    fn new() -> Self {
        describe_counter!(
            REQUEST_BYTES,
            "The total byte size of all batches in the sync message"
        );
        describe_counter!(
            REQUEST_ROWS,
            "The total row count of all batches in the sync message"
        );
        describe_gauge!(
            IN_MEMORY_BYTES,
            "The total byte size of all pending batches in memory"
        );
        describe_gauge!(
            IN_MEMORY_ROWS,
            "The total row count of all pending batches in memory"
        );
        describe_gauge!(
            IN_MEMORY_OLDEST,
            "The timestamp of the oldest pending change set in memory"
        );
        describe_histogram!(
            COMPACTION_TIME,
            "The time taken to compact a single sync message"
        );
        describe_counter!(
            COMPACTED_BYTES,
            "The reduction in byte size due to batch compaction"
        );
        describe_counter!(
            COMPACTED_ROWS,
            "The reduction in row count due to batch compaction"
        );
        describe_histogram!(FLUSH_TIME, "The time taken to flush a collection of syncs");
        describe_counter!(FLUSH_BYTES, "The total byte size flushed");
        describe_counter!(FLUSH_ROWS, "The total row count flushed");
        describe_gauge!(FLUSH_LAST, "The timestamp of the last successful flush");
        describe_histogram!(
            FLUSH_LAG,
            "The total time between the first queued change set and the flush"
        );
        describe_gauge!(SEQUENCE_DURABLE, "The durable sequence number per origin");
        describe_gauge!(SEQUENCE_MEMORY, "The memory sequence number per origin");

        Self {
            request_bytes: counter!(REQUEST_BYTES),
            request_rows: counter!(REQUEST_ROWS),
            in_memory_bytes: gauge!(IN_MEMORY_BYTES),
            in_memory_rows: gauge!(IN_MEMORY_ROWS),
            in_memory_oldest: gauge!(IN_MEMORY_OLDEST),
            compaction_time: histogram!(COMPACTION_TIME),
            compacted_bytes: counter!(COMPACTED_BYTES),
            compacted_rows: counter!(COMPACTED_ROWS),
            flush_time: histogram!(FLUSH_TIME),
            flush_bytes: counter!(FLUSH_BYTES),
            flush_rows: counter!(FLUSH_ROWS),
            flush_last: gauge!(FLUSH_LAST),
            flush_lag: histogram!(FLUSH_LAG),
        }
    }

    pub fn sequence_durable(&self, origin: &Origin, sequence: SequenceNumber) {
        let sequence_durable = gauge!(SEQUENCE_DURABLE, "origin" => origin.to_string());
        sequence_durable.set(sequence as f64);
    }

    pub fn sequence_memory(&self, origin: &Origin, sequence: SequenceNumber) {
        let sequence_memory = gauge!(SEQUENCE_MEMORY, "origin" => origin.to_string());
        sequence_memory.set(sequence as f64);
    }
}
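Note: the sequence_durable and sequence_memory helpers above register a labeled gauge per origin on each call. A minimal sketch of what such a call expands to (illustrative origin and value; with no metrics recorder installed, the calls are no-ops):

use metrics::gauge;

fn main() {
    let origin: u64 = 42;
    // Registers/updates seafowl_changeset_writer_sequence_durable_bytes{origin="42"}
    let g = gauge!(
        "seafowl_changeset_writer_sequence_durable_bytes",
        "origin" => origin.to_string()
    );
    g.set(1024.0);
}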
1 change: 1 addition & 0 deletions src/frontend/flight/sync/mod.rs
@@ -1,3 +1,4 @@
mod metrics;
pub mod schema;
mod utils;
pub(crate) mod writer;
89 changes: 80 additions & 9 deletions src/frontend/flight/sync/writer.rs
@@ -20,18 +20,19 @@ use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::ops::Not;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use tracing::{debug, info};

use crate::context::delta::plan_to_object_store;

use crate::context::SeafowlContext;
use crate::frontend::flight::handler::SEAFOWL_SYNC_DATA_SEQUENCE_NUMBER;
use crate::frontend::flight::sync::metrics::SyncMetrics;
use crate::frontend::flight::sync::schema::SyncSchema;
use crate::frontend::flight::sync::utils::{compact_batches, construct_qualifier};

pub(super) type Origin = u64;
pub(super) type SequenceNumber = u64;
const SYNC_REF: &str = "sync_data";
const SYNC_JOIN_COLUMN: &str = "__sync_join";

@@ -86,6 +87,8 @@ pub(crate) struct SeafowlDataSyncWriter {
    origin_memory: HashMap<Origin, SequenceNumber>,
    // Map of known durable sequence numbers per origin
    origin_durable: HashMap<Origin, SequenceNumber>,
    // Keep track of various metrics for observability
    metrics: SyncMetrics,
}

// A struct tracking relevant information about a single transaction/sequence from a single origin
@@ -104,6 +107,8 @@ struct DataSyncSequence {
pub(super) struct DataSyncCollection {
    // Total in-memory size of all the batches in all the items for this table
    size: usize,
    // Total in-memory row count of all the batches in all the items for this table
    rows: usize,
    // Unix epoch of the first sync command in this collection
    insertion_time: u64,
    // Table log store
@@ -134,6 +139,7 @@ impl SeafowlDataSyncWriter {
            size: 0,
            origin_memory: Default::default(),
            origin_durable: Default::default(),
            metrics: Default::default(),
}
}

@@ -183,8 +189,22 @@
            })
            .or_insert(IndexMap::from([(sequence_number, sequence)]));

        // Compact the batches and measure the time it took, as well as the
        // reduction in rows/size
        let (old_size, old_rows) = batches.iter().fold((0, 0), |(size, rows), batch| {
            (
                size + batch.get_array_memory_size(),
                rows + batch.num_rows(),
            )
        });
        self.metrics.request_bytes.increment(old_size as u64);
        self.metrics.request_rows.increment(old_rows as u64);
        let start = Instant::now();
        let batch = compact_batches(&sync_schema, batches)?;
        // Use fractional seconds so that sub-second compactions aren't recorded as zero
        let duration = start.elapsed().as_secs_f64();

        // Get new size and row count
        let size = batch.get_array_memory_size();
        let rows = batch.num_rows();

        // If there's no delta table at this location yet, create one first.
        if !log_store.is_delta_table_location().await? {
@@ -203,27 +223,43 @@
            .and_modify(|entry| {
                entry.syncs.push(item.clone());
                entry.size += size;
                entry.rows += rows;
            })
            .or_insert(DataSyncCollection {
                size,
                rows,
                insertion_time: now(),
                log_store,
                syncs: vec![item],
            });

// Update the total size
        // Update the total size and metrics
        self.size += size;
        self.metrics.in_memory_bytes.increment(size as f64);
        self.metrics.in_memory_rows.increment(rows as f64);
        self.metrics.compaction_time.record(duration);
        self.metrics
            .compacted_bytes
            .increment((old_size - size) as u64);
        self.metrics
            .compacted_rows
            .increment((old_rows - rows) as u64);
        self.metrics.in_memory_oldest.set(
            self.syncs
                .first()
                .map(|(_, v)| v.insertion_time as f64)
                .unwrap_or(0.0),
        );

        // Flag the sequence as volatile persisted for this origin if it is the last sync command
        if last {
            self.origin_memory.insert(origin, sequence_number);
        }
        self.metrics.sequence_memory(&origin, sequence_number);
        // TODO: (when) should we be removing the memory sequence number?

        self.flush().await?;

        Ok(self.stored_sequences(&origin))
}

Expand All @@ -250,6 +286,22 @@ impl SeafowlDataSyncWriter {
.await?)
}

    pub async fn flush(&mut self) -> Result<()> {
        while self.flush_ready() {
            // TODO: do out-of-band
            self.flush_syncs().await?;
        }

        self.metrics.in_memory_oldest.set(
            self.syncs
                .first()
                .map(|(_, v)| v.insertion_time as f64)
                .unwrap_or(0.0),
        );

        Ok(())
    }

    // Criteria for returning the cached entries ready to be persisted to storage.
    // First flush any entries that are explicitly beyond the configured max
    // lag, followed by further entries if we're still above the max cache size.
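Note: the criteria above can be sketched as a pure predicate; max_lag_secs and max_size_bytes are illustrative stand-ins for Seafowl's actual configuration values, which this diff does not show:

// Illustrative sketch only, not the actual implementation.
fn flush_ready_sketch(
    oldest_insertion_time: u64, // Unix epoch seconds of the oldest queued collection
    now: u64,                   // current Unix epoch seconds
    total_size: usize,          // total in-memory size of all queued collections
    max_lag_secs: u64,          // stand-in for the configured max replication lag
    max_size_bytes: usize,      // stand-in for the configured max cache size
) -> bool {
    // Flush when the oldest entry is older than the max lag, or when the
    // cache as a whole is over the size limit.
    now - oldest_insertion_time >= max_lag_secs || total_size >= max_size_bytes
}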
@@ -278,6 +330,10 @@
}
};

        let start = Instant::now();
        let insertion_time = entry.insertion_time;
        let rows = entry.rows;
        let size = entry.size;
        let url = url.clone();
        let log_store = entry.log_store.clone();

@@ -380,6 +436,19 @@
        self.remove_sync(&url);
        self.advance_durable();

        // Record flush metrics; the histogram is named in seconds, so record
        // fractional seconds rather than milliseconds
        let flush_duration = start.elapsed().as_secs_f64();
        self.metrics.flush_time.record(flush_duration);
        self.metrics.flush_bytes.increment(size as u64);
        self.metrics.flush_rows.increment(rows as u64);
        self.metrics.flush_last.set(now() as f64);
        self.metrics
            .flush_lag
            .record((now() - insertion_time) as f64);
        self.origin_durable.iter().for_each(|(origin, seq)| {
            self.metrics.sequence_durable(origin, *seq);
        });

        Ok(())
    }

@@ -577,6 +646,8 @@
    fn remove_sync(&mut self, url: &String) {
        if let Some(sync) = self.syncs.shift_remove(url) {
            self.size -= sync.size;
            self.metrics.in_memory_bytes.decrement(sync.size as f64);
            self.metrics.in_memory_rows.decrement(sync.rows as f64);
        }
    }
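Note: the metrics emitted above only leave the process if a global recorder is installed, which this PR does not show. A minimal wiring sketch, assuming the metrics-exporter-prometheus crate:

use metrics_exporter_prometheus::PrometheusBuilder;

fn init_metrics() {
    // Installs a global recorder and spawns a Prometheus scrape endpoint
    // (0.0.0.0:9000 by default). Must be called from within a Tokio runtime.
    PrometheusBuilder::new()
        .install()
        .expect("failed to install Prometheus metrics exporter");
}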

