[release/v1.0.0-rc1]: Merge commit 'b9881d1' into release/v1.0.0-rc1
bfops committed Nov 5, 2024
2 parents be63a47 + b9881d1 commit e00ac50
Showing 13 changed files with 277 additions and 226 deletions.
12 changes: 8 additions & 4 deletions crates/bindings/src/rt.rs
@@ -437,17 +437,21 @@ extern "C" fn __describe_module__(description: BytesSink) {
/// when the `sender` calls the reducer identified by `id` at `timestamp` with `args`.
///
/// The `sender_{0-3}` are the pieces of a `[u8; 32]` (`u256`) representing the sender's `Identity`.
- /// They are encoded as follows (assuming `identity.identity_bytes: [u8; 32]`):
+ /// They are encoded as follows (assuming `identity.to_byte_array(): [u8; 32]`):
/// - `sender_0` contains bytes `[0 ..8 ]`.
/// - `sender_1` contains bytes `[8 ..16]`.
/// - `sender_2` contains bytes `[16..24]`.
/// - `sender_3` contains bytes `[24..32]`.
///
+ /// Note that `to_byte_array` uses LITTLE-ENDIAN order! This matches most host systems.
+ ///
/// The `address_{0-1}` are the pieces of a `[u8; 16]` (`u128`) representing the caller's `Address`.
- /// They are encoded as follows (assuming `address.__address__: u128`):
+ /// They are encoded as follows (assuming `address.as_byte_array(): [u8; 16]`):
/// - `address_0` contains bytes `[0 ..8 ]`.
/// - `address_1` contains bytes `[8 ..16]`.
///
+ /// Again, note that `to_byte_array` uses LITTLE-ENDIAN order! This matches most host systems.
+ ///
/// The `args` is a `BytesSource`, registered on the host side,
/// which can be read with `bytes_source_read`.
/// The contents of the buffer are the BSATN-encoding of the arguments to the reducer.
@@ -475,13 +479,13 @@ extern "C" fn __call_reducer__(
// Piece together `sender_i` into an `Identity`.
let sender = [sender_0, sender_1, sender_2, sender_3];
let sender: [u8; 32] = bytemuck::must_cast(sender);
- let sender = Identity::from_byte_array(sender);
+ let sender = Identity::from_byte_array(sender); // The LITTLE-ENDIAN constructor.

// Piece together `address_i` into an `Address`.
// The all-zeros `address` (`Address::__DUMMY`) is interpreted as `None`.
let address = [address_0, address_1];
let address: [u8; 16] = bytemuck::must_cast(address);
- let address = Address::from_byte_array(address);
+ let address = Address::from_byte_array(address); // The LITTLE-ENDIAN constructor.
let address = (address != Address::__DUMMY).then_some(address);

// Assemble the `ReducerContext`.
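A note on the hunk above: the `must_cast` is a plain byte-level reinterpretation of `[u64; 4]` as `[u8; 32]`. The following stand-alone sketch (hypothetical helper name, no `bytemuck` dependency) produces the same layout on a little-endian host:

/// Hypothetical helper mirroring the `bytemuck::must_cast` above: it
/// concatenates the four `u64` pieces into the 32-byte little-endian
/// layout that `Identity::from_byte_array` expects per the docs.
fn sender_bytes(sender_0: u64, sender_1: u64, sender_2: u64, sender_3: u64) -> [u8; 32] {
    let mut bytes = [0u8; 32];
    // `sender_0` fills bytes [0..8], `sender_1` fills [8..16], and so on.
    for (i, piece) in [sender_0, sender_1, sender_2, sender_3].into_iter().enumerate() {
        bytes[i * 8..(i + 1) * 8].copy_from_slice(&piece.to_le_bytes());
    }
    bytes
}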
2 changes: 1 addition & 1 deletion crates/client-api/src/routes/identity.rs
@@ -58,7 +58,7 @@ impl IdentityForUrl {

impl<'de> serde::Deserialize<'de> for IdentityForUrl {
fn deserialize<D: serde::Deserializer<'de>>(de: D) -> Result<Self, D::Error> {
- <_>::deserialize(de).map(|DeserializeWrapper(b)| IdentityForUrl(Identity::from_byte_array(b)))
+ <_>::deserialize(de).map(|DeserializeWrapper(b)| IdentityForUrl(Identity::from_be_byte_array(b)))
}
}

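Why this change matters: per the `rt.rs` docs above, `from_byte_array` is the little-endian constructor, whereas an identity spelled out in a URL presumably reads most-significant-byte first (an inference from this change, not stated in the diff). A self-contained sketch of the distinction, using `u128` as a stand-in for the 256-bit identity:

fn main() {
    // A value as a human writes it in hex, most significant digit first.
    let id: u128 = 0x00112233_44556677_8899aabb_ccddeeff;
    let be = id.to_be_bytes(); // byte order as the hex string reads
    let le = id.to_le_bytes(); // byte order of a little-endian `to_byte_array`
    // The same value round-trips only when each byte order is paired
    // with its matching constructor.
    assert_eq!(u128::from_be_bytes(be), u128::from_le_bytes(le));
    assert_eq!(be[0], 0x00); // most significant byte first
    assert_eq!(le[0], 0xff); // least significant byte first
}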
@@ -654,37 +654,6 @@ impl<'a> CommittedIndexIter<'a> {
}
}

- // TODO(shub): this runs in parallel for subscriptions, leading to lock contention.
- // commenting until we find a way to batch them without a lock.
- // impl Drop for CommittedIndexIter<'_> {
- // fn drop(&mut self) {
- // let mut metrics = self.ctx.metrics.write();
- // let get_table_name = || {
- // self.committed_state
- // .get_schema(&self.table_id)
- // .map(|table| &*table.table_name)
- // .unwrap_or_default()
- // .to_string()
- // };

- // metrics.inc_by(self.table_id, MetricType::IndexSeeks, 1, get_table_name);
- // // Increment number of index keys scanned
- // metrics.inc_by(
- // self.table_id,
- // MetricType::KeysScanned,
- // self.committed_rows.num_pointers_yielded(),
- // get_table_name,
- // );
- // // Increment number of rows fetched
- // metrics.inc_by(
- // self.table_id,
- // MetricType::RowsFetched,
- // self.num_committed_rows_fetched,
- // get_table_name,
- // );
- // }
- // }

impl<'a> Iterator for CommittedIndexIter<'a> {
type Item = RowRef<'a>;

@@ -604,7 +604,7 @@ impl MutTxDatastore for Locking {

/// This utility is responsible for recording all transaction metrics.
pub(super) fn record_metrics(
- ctx: ExecutionContext,
+ ctx: &ExecutionContext,
tx_timer: Instant,
lock_wait_time: Duration,
committed: bool,
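Taking the context by reference rather than by value is what lets the `mut_tx.rs` call sites below keep using `self.ctx` after recording metrics. A stand-alone sketch of that borrow-checker reasoning, with hypothetical types:

// Hypothetical stand-in: `Ctx` plays the role of `ExecutionContext`.
struct Ctx {
    workload: u32,
}

// Borrowing (`&Ctx`) leaves ownership of the context with the caller.
fn record_metrics(_ctx: &Ctx) {
    // read labels off the context, bump counters, etc.
}

fn commit_downgrade(mut ctx: Ctx) -> Ctx {
    record_metrics(&ctx); // only borrows `ctx`...
    ctx.workload = 1;     // ...so the caller can still mutate it...
    ctx                   // ...and hand it to the downgraded transaction
}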
22 changes: 12 additions & 10 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
@@ -1028,7 +1028,7 @@ impl MutTxId {
// Record metrics for the transaction at the very end,
// right before we drop and release the lock.
record_metrics(
- self.ctx,
+ &self.ctx,
self.timer,
self.lock_wait_time,
true,
@@ -1038,49 +1038,51 @@
tx_data
}

- pub fn commit_downgrade(self, workload: Workload) -> (TxData, TxId) {
+ pub fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxId) {
let Self {
mut committed_state_write_lock,
tx_state,
..
} = self;
let tx_data = committed_state_write_lock.merge(tx_state, &self.ctx);
- let database_identity = self.ctx.database_identity();
// Record metrics for the transaction at the very end,
// right before we drop and release the lock.
record_metrics(
- self.ctx,
+ &self.ctx,
self.timer,
self.lock_wait_time,
true,
Some(&tx_data),
Some(&committed_state_write_lock),
);
+ // Update the workload type of the execution context
+ self.ctx.workload = workload.into();
let tx = TxId {
committed_state_shared_lock: SharedWriteGuard::downgrade(committed_state_write_lock),
lock_wait_time: Duration::ZERO,
timer: Instant::now(),
- ctx: ExecutionContext::with_workload(database_identity, workload),
+ ctx: self.ctx,
};
(tx_data, tx)
}

pub fn rollback(self) {
// Record metrics for the transaction at the very end,
// right before we drop and release the lock.
- record_metrics(self.ctx, self.timer, self.lock_wait_time, false, None, None);
+ record_metrics(&self.ctx, self.timer, self.lock_wait_time, false, None, None);
}

- pub fn rollback_downgrade(self, workload: Workload) -> TxId {
+ pub fn rollback_downgrade(mut self, workload: Workload) -> TxId {
// Record metrics for the transaction at the very end,
// right before we drop and release the lock.
- let database_identity = self.ctx.database_identity();
- record_metrics(self.ctx, self.timer, self.lock_wait_time, false, None, None);
+ record_metrics(&self.ctx, self.timer, self.lock_wait_time, false, None, None);
+ // Update the workload type of the execution context
+ self.ctx.workload = workload.into();
TxId {
committed_state_shared_lock: SharedWriteGuard::downgrade(self.committed_state_write_lock),
lock_wait_time: Duration::ZERO,
timer: Instant::now(),
- ctx: ExecutionContext::with_workload(database_identity, workload),
+ ctx: self.ctx,
}
}
}
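The downgrade pattern above depends on converting the exclusive write lock into a shared read lock without ever releasing it. Assuming `SharedWriteGuard::downgrade` behaves like parking_lot's `RwLockWriteGuard::downgrade` (which the crate imports), a minimal sketch:

use parking_lot::{RwLock, RwLockWriteGuard};

fn main() {
    let committed_state = RwLock::new(vec![1u32, 2, 3]);

    let mut write_guard = committed_state.write();
    write_guard.push(4); // conceptually: merge `tx_state` into committed state

    // Atomically downgrade to a read lock: no other writer can sneak in
    // between the mutation above and the reads below.
    let read_guard = RwLockWriteGuard::downgrade(write_guard);
    assert_eq!(read_guard.len(), 4);
}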
48 changes: 0 additions & 48 deletions crates/core/src/db/datastore/locking_tx_datastore/state_view.rs
@@ -154,19 +154,6 @@ pub struct Iter<'a> {
num_committed_rows_fetched: u64,
}

- // impl Drop for Iter<'_> {
- // fn drop(&mut self) {
- // let mut metrics = self.ctx.metrics.write();
- // // Increment number of rows fetched
- // metrics.inc_by(
- // self.table_id,
- // MetricType::RowsFetched,
- // self.num_committed_rows_fetched,
- // || self.table_name.to_string(),
- // );
- // }
- // }

impl<'a> Iter<'a> {
pub(super) fn new(
table_id: TableId,
@@ -294,41 +281,6 @@ pub struct IndexSeekIterMutTxId<'a> {
pub(super) num_committed_rows_fetched: u64,
}

- // impl Drop for IndexSeekIterMutTxId<'_> {
- // fn drop(&mut self) {
- // let mut metrics = self.ctx.metrics.write();
- // let get_table_name = || {
- // self.committed_state
- // .get_schema(&self.table_id)
- // .map(|table| &*table.table_name)
- // .unwrap_or_default()
- // .to_string()
- // };

- // let num_pointers_yielded = self
- // .committed_rows
- // .as_ref()
- // .map_or(0, |iter| iter.num_pointers_yielded());

- // // Increment number of index seeks
- // metrics.inc_by(self.table_id, MetricType::IndexSeeks, 1, get_table_name);
- // // Increment number of index keys scanned
- // metrics.inc_by(
- // self.table_id,
- // MetricType::KeysScanned,
- // num_pointers_yielded,
- // get_table_name,
- // );
- // // Increment number of rows fetched
- // metrics.inc_by(
- // self.table_id,
- // MetricType::RowsFetched,
- // self.num_committed_rows_fetched,
- // get_table_name,
- // );
- // }
- // }

impl<'a> Iterator for IndexSeekIterMutTxId<'a> {
type Item = RowRef<'a>;

2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/locking_tx_datastore/tx.rs
@@ -60,7 +60,7 @@ impl StateView for TxId {

impl TxId {
pub(super) fn release(self) {
- record_metrics(self.ctx, self.timer, self.lock_wait_time, true, None, None);
+ record_metrics(&self.ctx, self.timer, self.lock_wait_time, true, None, None);
}

/// The Number of Distinct Values (NDV) for a column or list of columns,
121 changes: 17 additions & 104 deletions crates/core/src/execution_context.rs
@@ -1,114 +1,23 @@
- use std::sync::Arc;

- use crate::db::db_metrics::DB_METRICS;
use bytes::Bytes;
use derive_more::Display;
- use parking_lot::RwLock;
use spacetimedb_client_api_messages::timestamp::Timestamp;
use spacetimedb_commitlog::{payload::txdata, Varchar};
use spacetimedb_lib::{Address, Identity};
use spacetimedb_primitives::TableId;
use spacetimedb_sats::bsatn;

- pub enum MetricType {
- IndexSeeks,
- KeysScanned,
- RowsFetched,
- }

- #[derive(Default, Clone)]
- struct BufferMetric {
- pub table_id: TableId,
- pub index_seeks: u64,
- pub keys_scanned: u64,
- pub rows_fetched: u64,
- pub cache_table_name: String,
- }

- impl BufferMetric {
- pub fn inc_by(&mut self, ty: MetricType, val: u64) {
- match ty {
- MetricType::IndexSeeks => {
- self.index_seeks += val;
- }
- MetricType::KeysScanned => {
- self.keys_scanned += val;
- }
- MetricType::RowsFetched => {
- self.rows_fetched += val;
- }
- }
- }
- }

- impl BufferMetric {
- pub fn new(table_id: TableId, table_name: String) -> Self {
- Self {
- table_id,
- cache_table_name: table_name,
- ..Default::default()
- }
- }
- }

- #[derive(Default, Clone)]
- pub struct Metrics(Vec<BufferMetric>);

- impl Metrics {
- pub fn inc_by<F: FnOnce() -> String>(&mut self, table_id: TableId, ty: MetricType, val: u64, get_table_name: F) {
- if let Some(metric) = self.0.iter_mut().find(|x| x.table_id == table_id) {
- metric.inc_by(ty, val);
- } else {
- let table_name = get_table_name();
- let mut metric = BufferMetric::new(table_id, table_name);
- metric.inc_by(ty, val);
- self.0.push(metric);
- }
- }

- pub fn table_exists(&self, table_id: TableId) -> bool {
- self.0.iter().any(|x| x.table_id == table_id)
- }

- #[allow(dead_code)]
- fn flush(&mut self, workload: &WorkloadType, database_identity: &Identity, reducer: &str) {
- macro_rules! flush_metric {
- ($db_metric:expr, $metric:expr, $metric_field:ident) => {
- if $metric.$metric_field > 0 {
- $db_metric
- .with_label_values(
- workload,
- database_identity,
- reducer,
- &$metric.table_id.0,
- &$metric.cache_table_name,
- )
- .inc_by($metric.$metric_field);
- }
- };
- }

- self.0.iter().for_each(|metric| {
- flush_metric!(DB_METRICS.rdb_num_index_seeks, metric, index_seeks);
- flush_metric!(DB_METRICS.rdb_num_keys_scanned, metric, keys_scanned);
- flush_metric!(DB_METRICS.rdb_num_rows_fetched, metric, rows_fetched);
- });
- }
- }

/// Represents the context under which a database runtime method is executed.
/// In particular it provides details about the currently executing txn to runtime operations.
/// More generally it acts as a container for information that database operations may require to function correctly.
#[derive(Default, Clone)]
pub struct ExecutionContext {
/// The identity of the database on which a transaction is being executed.
- database_identity: Identity,
+ pub database_identity: Identity,
/// The reducer from which the current transaction originated.
- reducer: Option<ReducerContext>,
+ pub reducer: Option<ReducerContext>,
/// The type of workload that is being executed.
- workload: WorkloadType,
- /// The Metrics to be reported for this transaction.
- pub metrics: Arc<RwLock<Metrics>>,
+ pub workload: WorkloadType,
}

/// If an [`ExecutionContext`] is a reducer context, describes the reducer.
@@ -208,6 +117,20 @@ pub enum WorkloadType {
Internal,
}

+ impl From<Workload> for WorkloadType {
+ fn from(value: Workload) -> Self {
+ match value {
+ #[cfg(test)]
+ Workload::ForTests => Self::Internal,
+ Workload::Reducer(_) => Self::Reducer,
+ Workload::Sql => Self::Sql,
+ Workload::Subscribe => Self::Subscribe,
+ Workload::Update => Self::Update,
+ Workload::Internal => Self::Internal,
+ }
+ }
+ }

impl Default for WorkloadType {
fn default() -> Self {
Self::Internal
@@ -221,7 +144,6 @@ impl ExecutionContext {
database_identity,
reducer,
workload,
- metrics: <_>::default(),
}
}

@@ -293,12 +215,3 @@ impl ExecutionContext {
self.workload
}
}

- impl Drop for ExecutionContext {
- fn drop(&mut self) {
- let workload = self.workload;
- let database = self.database_identity;
- let reducer = self.reducer_name();
- self.metrics.write().flush(&workload, &database, reducer);
- }
- }
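The `From<Workload>` impl added above is what the `mut_tx.rs` call sites rely on when they write `self.ctx.workload = workload.into()`. A self-contained sketch of the same pattern, with trimmed-down stand-in enums in place of the real `Workload` and `WorkloadType`:

// Hypothetical stand-ins, trimmed to three variants each.
enum Workload {
    Reducer(String), // carries per-call data...
    Sql,
    Internal,
}

#[derive(Debug, PartialEq)]
enum WorkloadType {
    Reducer, // ...which the label-only type drops
    Sql,
    Internal,
}

impl From<Workload> for WorkloadType {
    fn from(value: Workload) -> Self {
        match value {
            Workload::Reducer(_) => Self::Reducer,
            Workload::Sql => Self::Sql,
            Workload::Internal => Self::Internal,
        }
    }
}

fn main() {
    let workload = Workload::Reducer("send_message".into());
    // The `.into()` call used at the `mut_tx.rs` call sites:
    let ty: WorkloadType = workload.into();
    assert_eq!(ty, WorkloadType::Reducer);
}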