Skip to content

Commit

Permalink
Implement DAA score timestamp estimation (#268)
Browse files Browse the repository at this point in the history
* Add GetDaaEstimateTimestamp RPC API

Basic implementation, just return the value back. Commit is focused on
adding the API rather than functionality.

* Implement DAA score timestamp estimation

Input = array of daa_scores
Output = array of timestamps, index matched with input

* Use info for daa TS estimate as opposed to println

* Factor in BPS in the last header estimation

* - use compact headers
- acquire prune lock
- use tighter indexing logic
- ensure sink is included

* eof new line

* add todos

* - bug fix: use f64, otherwise the fraction is always zero
- avoid assuming that timestamps are strictly monotonic

* renames

* translate score -> miliisecs directly using target time per block

* note

* Add last data point from jupyter notebook

It's the same daa_score as checkpoint genesis, but timestamp is
different to improve accuracy for queries right before and right after
this daa_score

* use an array to avoid prealloc inaccuracies

---------

Co-authored-by: Michael Sutton <[email protected]>
  • Loading branch information
coderofstuff and michaelsutton authored Nov 23, 2023
1 parent f674ad5 commit f536fe7
Show file tree
Hide file tree
Showing 22 changed files with 298 additions and 2 deletions.
17 changes: 17 additions & 0 deletions cli/src/modules/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,23 @@ impl Rpc {
let result = rpc.get_coin_supply_call(GetCoinSupplyRequest {}).await?;
self.println(&ctx, result);
}
RpcApiOps::GetDaaScoreTimestampEstimate => {
if argv.is_empty() {
return Err(Error::custom("Please specify a daa_score"));
}
let daa_score_result = argv.iter().map(|s| s.parse::<u64>()).collect::<std::result::Result<Vec<_>, _>>();

match daa_score_result {
Ok(daa_scores) => {
let result =
rpc.get_daa_score_timestamp_estimate_call(GetDaaScoreTimestampEstimateRequest { daa_scores }).await?;
self.println(&ctx, result);
}
Err(_err) => {
return Err(Error::custom("Could not parse daa_scores to u64"));
}
}
}
_ => {
tprintln!(ctx, "rpc method exists but is not supported by the cli: '{op_str}'\r\n");
return Ok(());
Expand Down
5 changes: 5 additions & 0 deletions components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use kaspa_consensus_core::{
block::Block,
block_count::BlockCount,
blockstatus::BlockStatus,
daa_score_timestamp::DaaScoreTimestamp,
errors::consensus::ConsensusResult,
header::Header,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList},
Expand Down Expand Up @@ -250,6 +251,10 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(|c| c.get_headers_selected_tip()).await
}

pub async fn async_get_chain_block_samples(&self) -> Vec<DaaScoreTimestamp> {
self.clone().spawn_blocking(|c| c.get_chain_block_samples()).await
}

/// Returns the antipast of block `hash` from the POV of `context`, i.e. `antipast(hash) ∩ past(context)`.
/// Since this might be an expensive operation for deep blocks, we allow the caller to specify a limit
/// `max_traversal_allowed` on the maximum amount of blocks to traverse for obtaining the answer
Expand Down
5 changes: 5 additions & 0 deletions consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
block_count::BlockCount,
blockstatus::BlockStatus,
coinbase::MinerData,
daa_score_timestamp::DaaScoreTimestamp,
errors::{
block::{BlockProcessResult, RuleError},
coinbase::CoinbaseResult,
Expand Down Expand Up @@ -136,6 +137,10 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

fn get_chain_block_samples(&self) -> Vec<DaaScoreTimestamp> {
unimplemented!()
}

fn get_virtual_parents(&self) -> BlockHashSet {
unimplemented!()
}
Expand Down
23 changes: 23 additions & 0 deletions consensus/core/src/daa_score_timestamp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::sync::Arc;

use serde::{Deserialize, Serialize};

use crate::header::Header;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaaScoreTimestamp {
pub daa_score: u64,
pub timestamp: u64,
}

impl From<Header> for DaaScoreTimestamp {
fn from(header: Header) -> DaaScoreTimestamp {
DaaScoreTimestamp { daa_score: header.daa_score, timestamp: header.timestamp }
}
}

impl From<Arc<Header>> for DaaScoreTimestamp {
fn from(header: Arc<Header>) -> DaaScoreTimestamp {
DaaScoreTimestamp { daa_score: header.daa_score, timestamp: header.timestamp }
}
}
1 change: 1 addition & 0 deletions consensus/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod blockstatus;
pub mod coinbase;
pub mod config;
pub mod constants;
pub mod daa_score_timestamp;
pub mod errors;
pub mod hashing;
pub mod header;
Expand Down
85 changes: 84 additions & 1 deletion consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
acceptance_data::AcceptanceDataStoreReader,
block_transactions::BlockTransactionsStoreReader,
ghostdag::{GhostdagData, GhostdagStoreReader},
headers::HeaderStoreReader,
headers::{CompactHeaderData, HeaderStoreReader},
headers_selected_tip::HeadersSelectedTipStoreReader,
past_pruning_points::PastPruningPointsStoreReader,
pruning::PruningStoreReader,
Expand Down Expand Up @@ -46,6 +46,7 @@ use kaspa_consensus_core::{
blockhash::BlockHashExtensions,
blockstatus::BlockStatus,
coinbase::MinerData,
daa_score_timestamp::DaaScoreTimestamp,
errors::{
coinbase::CoinbaseResult,
consensus::{ConsensusError, ConsensusResult},
Expand All @@ -54,6 +55,7 @@ use kaspa_consensus_core::{
errors::{difficulty::DifficultyError, pruning::PruningImportError},
header::Header,
muhash::MuHashExtensions,
network::NetworkType,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
Expand Down Expand Up @@ -83,6 +85,10 @@ use tokio::sync::oneshot;

use self::{services::ConsensusServices, storage::ConsensusStorage};

use crate::model::stores::selected_chain::SelectedChainStoreReader;

use std::cmp;

pub struct Consensus {
// DB
db: Arc<DB>,
Expand Down Expand Up @@ -358,6 +364,16 @@ impl Consensus {
};
Ok(self.services.window_manager.estimate_network_hashes_per_second(window)?)
}

fn pruning_point_compact_headers(&self) -> Vec<(Hash, CompactHeaderData)> {
// PRUNE SAFETY: index is monotonic and past pruning point headers are expected permanently
let current_pp_info = self.pruning_point_store.read().get().unwrap();
(0..current_pp_info.index)
.map(|index| self.past_pruning_points_store.get(index).unwrap())
.chain(once(current_pp_info.pruning_point))
.map(|hash| (hash, self.headers_store.get_compact_header_data(hash).unwrap()))
.collect_vec()
}
}

impl ConsensusApi for Consensus {
Expand Down Expand Up @@ -479,6 +495,73 @@ impl ConsensusApi for Consensus {
Ok(self.services.dag_traversal_manager.calculate_chain_path(hash, self.get_sink()))
}

/// Returns a Vec of header samples since genesis
/// ordered by ascending daa_score, first entry is genesis
fn get_chain_block_samples(&self) -> Vec<DaaScoreTimestamp> {
// We need consistency between the past pruning points, selected chain and header store reads
let _guard = self.pruning_lock.blocking_read();

// Sorted from genesis to latest pruning_point_headers
let pp_headers = self.pruning_point_compact_headers();
let step_divisor: usize = 3; // The number of extra samples we'll get from blocks after last pp header
let prealloc_len = pp_headers.len() + step_divisor + 1;

let mut sample_headers;

// Part 1: Add samples from pruning point headers:
if self.config.net.network_type == NetworkType::Mainnet {
// For mainnet, we add extra data (16 pp headers) from before checkpoint genesis.
// Source: https://github.com/kaspagang/kaspad-py-explorer/blob/main/src/tx_timestamp_estimation.ipynb
// For context see also: https://github.com/kaspagang/kaspad-py-explorer/blob/main/src/genesis_proof.ipynb
const POINTS: &[DaaScoreTimestamp] = &[
DaaScoreTimestamp { daa_score: 0, timestamp: 1636298787842 },
DaaScoreTimestamp { daa_score: 87133, timestamp: 1636386662010 },
DaaScoreTimestamp { daa_score: 176797, timestamp: 1636473700804 },
DaaScoreTimestamp { daa_score: 264837, timestamp: 1636560706885 },
DaaScoreTimestamp { daa_score: 355974, timestamp: 1636650005662 },
DaaScoreTimestamp { daa_score: 445152, timestamp: 1636737841327 },
DaaScoreTimestamp { daa_score: 536709, timestamp: 1636828600930 },
DaaScoreTimestamp { daa_score: 624635, timestamp: 1636912614350 },
DaaScoreTimestamp { daa_score: 712234, timestamp: 1636999362832 },
DaaScoreTimestamp { daa_score: 801831, timestamp: 1637088292662 },
DaaScoreTimestamp { daa_score: 890716, timestamp: 1637174890675 },
DaaScoreTimestamp { daa_score: 978396, timestamp: 1637260956454 },
DaaScoreTimestamp { daa_score: 1068387, timestamp: 1637349078269 },
DaaScoreTimestamp { daa_score: 1139626, timestamp: 1637418723538 },
DaaScoreTimestamp { daa_score: 1218320, timestamp: 1637495941516 },
DaaScoreTimestamp { daa_score: 1312860, timestamp: 1637609671037 },
];
sample_headers = Vec::<DaaScoreTimestamp>::with_capacity(prealloc_len + POINTS.len());
sample_headers.extend_from_slice(POINTS);
} else {
sample_headers = Vec::<DaaScoreTimestamp>::with_capacity(prealloc_len);
}

for header in pp_headers.iter() {
sample_headers.push(DaaScoreTimestamp { daa_score: header.1.daa_score, timestamp: header.1.timestamp });
}

// Part 2: Add samples from recent chain blocks
let sc_read = self.storage.selected_chain_store.read();
let high_index = sc_read.get_tip().unwrap().0;
// The last pruning point is always expected in the selected chain store. However if due to some reason
// this is not the case, we prefer not crashing but rather avoid sampling (hence set low index to high index)
let low_index = sc_read.get_by_hash(pp_headers.last().unwrap().0).unwrap_option().unwrap_or(high_index);
let step_size = cmp::max((high_index - low_index) / (step_divisor as u64), 1);

// We chain `high_index` to make sure we sample sink, and dedup to avoid sampling it twice
for index in (low_index + step_size..=high_index).step_by(step_size as usize).chain(once(high_index)).dedup() {
let compact = self
.storage
.headers_store
.get_compact_header_data(sc_read.get_by_index(index).expect("store lock is acquired"))
.unwrap();
sample_headers.push(DaaScoreTimestamp { daa_score: compact.daa_score, timestamp: compact.timestamp });
}

sample_headers
}

fn get_virtual_parents(&self) -> BlockHashSet {
self.virtual_stores.read().state.get().unwrap().parents.iter().copied().collect()
}
Expand Down
2 changes: 2 additions & 0 deletions rpc/core/src/api/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ pub enum RpcApiOps {
GetMempoolEntriesByAddresses,
/// Get current issuance supply
GetCoinSupply,
/// Get DAA Score timestamp estimate
GetDaaScoreTimestampEstimate,

// Subscription commands for starting/stopping notifications
NotifyBlockAdded,
Expand Down
8 changes: 8 additions & 0 deletions rpc/core/src/api/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,14 @@ pub trait RpcApi: Sync + Send + AnySync {
}
async fn get_coin_supply_call(&self, request: GetCoinSupplyRequest) -> RpcResult<GetCoinSupplyResponse>;

async fn get_daa_score_timestamp_estimate(&self, daa_scores: Vec<u64>) -> RpcResult<GetDaaScoreTimestampEstimateResponse> {
self.get_daa_score_timestamp_estimate_call(GetDaaScoreTimestampEstimateRequest { daa_scores }).await
}
async fn get_daa_score_timestamp_estimate_call(
&self,
request: GetDaaScoreTimestampEstimateRequest,
) -> RpcResult<GetDaaScoreTimestampEstimateResponse>;

// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Notification API

Expand Down
24 changes: 24 additions & 0 deletions rpc/core/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,30 @@ pub struct GetSyncStatusResponse {
pub is_synced: bool,
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[serde(rename_all = "camelCase")]
pub struct GetDaaScoreTimestampEstimateRequest {
pub daa_scores: Vec<u64>,
}

impl GetDaaScoreTimestampEstimateRequest {
pub fn new(daa_scores: Vec<u64>) -> Self {
Self { daa_scores }
}
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[serde(rename_all = "camelCase")]
pub struct GetDaaScoreTimestampEstimateResponse {
pub timestamps: Vec<u64>,
}

impl GetDaaScoreTimestampEstimateResponse {
pub fn new(timestamps: Vec<u64>) -> Self {
Self { timestamps }
}
}

// ----------------------------------------------------------------------------
// Subscriptions & notifications
// ----------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions rpc/grpc/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ impl RpcApi for GrpcClient {
route!(estimate_network_hashes_per_second_call, EstimateNetworkHashesPerSecond);
route!(get_mempool_entries_by_addresses_call, GetMempoolEntriesByAddresses);
route!(get_coin_supply_call, GetCoinSupply);
route!(get_daa_score_timestamp_estimate_call, GetDaaScoreTimestampEstimate);

// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Notification API
Expand Down
2 changes: 2 additions & 0 deletions rpc/grpc/core/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ message KaspadRequest {
GetMetricsRequestMessage getMetricsRequest = 1090;
GetServerInfoRequestMessage getServerInfoRequest = 1092;
GetSyncStatusRequestMessage getSyncStatusRequest = 1094;
GetDaaScoreTimestampEstimateRequestMessage GetDaaScoreTimestampEstimateRequest = 1096;
}
}

Expand Down Expand Up @@ -116,6 +117,7 @@ message KaspadResponse {
GetMetricsResponseMessage getMetricsResponse= 1091;
GetServerInfoResponseMessage getServerInfoResponse = 1093;
GetSyncStatusResponseMessage getSyncStatusResponse = 1095;
GetDaaScoreTimestampEstimateResponseMessage GetDaaScoreTimestampEstimateResponse = 1097;
}
}

Expand Down
11 changes: 10 additions & 1 deletion rpc/grpc/core/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -812,4 +812,13 @@ message GetSyncStatusRequestMessage{
message GetSyncStatusResponseMessage{
bool isSynced = 1;
RPCError error = 1000;
}
}

message GetDaaScoreTimestampEstimateRequestMessage {
repeated uint64 daa_scores = 1;
}

message GetDaaScoreTimestampEstimateResponseMessage{
repeated uint64 timestamps = 1;
RPCError error = 1000;
}
2 changes: 2 additions & 0 deletions rpc/grpc/core/src/convert/kaspad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub mod kaspad_request_convert {
impl_into_kaspad_request!(GetMetrics);
impl_into_kaspad_request!(GetServerInfo);
impl_into_kaspad_request!(GetSyncStatus);
impl_into_kaspad_request!(GetDaaScoreTimestampEstimate);

impl_into_kaspad_request!(NotifyBlockAdded);
impl_into_kaspad_request!(NotifyNewBlockTemplate);
Expand Down Expand Up @@ -186,6 +187,7 @@ pub mod kaspad_response_convert {
impl_into_kaspad_response!(GetMetrics);
impl_into_kaspad_response!(GetServerInfo);
impl_into_kaspad_response!(GetSyncStatus);
impl_into_kaspad_response!(GetDaaScoreTimestampEstimate);

impl_into_kaspad_notify_response!(NotifyBlockAdded);
impl_into_kaspad_notify_response!(NotifyNewBlockTemplate);
Expand Down
18 changes: 18 additions & 0 deletions rpc/grpc/core/src/convert/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,15 @@ from!(item: RpcResult<&kaspa_rpc_core::GetCoinSupplyResponse>, protowire::GetCoi
Self { max_sompi: item.max_sompi, circulating_sompi: item.circulating_sompi, error: None }
});

from!(item: &kaspa_rpc_core::GetDaaScoreTimestampEstimateRequest, protowire::GetDaaScoreTimestampEstimateRequestMessage, {
Self {
daa_scores: item.daa_scores.clone()
}
});
from!(item: RpcResult<&kaspa_rpc_core::GetDaaScoreTimestampEstimateResponse>, protowire::GetDaaScoreTimestampEstimateResponseMessage, {
Self { timestamps: item.timestamps.clone(), error: None }
});

from!(&kaspa_rpc_core::PingRequest, protowire::PingRequestMessage);
from!(RpcResult<&kaspa_rpc_core::PingResponse>, protowire::PingResponseMessage);

Expand Down Expand Up @@ -715,6 +724,15 @@ try_from!(item: &protowire::GetCoinSupplyResponseMessage, RpcResult<kaspa_rpc_co
Self { max_sompi: item.max_sompi, circulating_sompi: item.circulating_sompi }
});

try_from!(item: &protowire::GetDaaScoreTimestampEstimateRequestMessage, kaspa_rpc_core::GetDaaScoreTimestampEstimateRequest , {
Self {
daa_scores: item.daa_scores.clone()
}
});
try_from!(item: &protowire::GetDaaScoreTimestampEstimateResponseMessage, RpcResult<kaspa_rpc_core::GetDaaScoreTimestampEstimateResponse>, {
Self { timestamps: item.timestamps.clone() }
});

try_from!(&protowire::PingRequestMessage, kaspa_rpc_core::PingRequest);
try_from!(&protowire::PingResponseMessage, RpcResult<kaspa_rpc_core::PingResponse>);

Expand Down
1 change: 1 addition & 0 deletions rpc/grpc/core/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub enum KaspadPayloadOps {
GetMetrics,
GetServerInfo,
GetSyncStatus,
GetDaaScoreTimestampEstimate,

// Subscription commands for starting/stopping notifications
NotifyBlockAdded,
Expand Down
1 change: 1 addition & 0 deletions rpc/grpc/server/src/request_handler/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl Factory {
GetMetrics,
GetServerInfo,
GetSyncStatus,
GetDaaScoreTimestampEstimate,
NotifyBlockAdded,
NotifyNewBlockTemplate,
NotifyFinalityConflict,
Expand Down
7 changes: 7 additions & 0 deletions rpc/grpc/server/src/tests/rpc_core_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ impl RpcApi for RpcCoreMock {
Err(RpcError::NotImplemented)
}

async fn get_daa_score_timestamp_estimate_call(
&self,
_request: GetDaaScoreTimestampEstimateRequest,
) -> RpcResult<GetDaaScoreTimestampEstimateResponse> {
Err(RpcError::NotImplemented)
}

// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Notification API

Expand Down
Loading

0 comments on commit f536fe7

Please sign in to comment.