Skip to content

Commit

Permalink
Implement DAA score timestamp estimation
Browse files Browse the repository at this point in the history
Input = array of daa_scores
Output = array of timestamps, index matched with input
  • Loading branch information
coderofstuff committed Nov 9, 2023
1 parent 881f04d commit c50cc42
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 20 deletions.
8 changes: 4 additions & 4 deletions cli/src/modules/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,16 @@ impl Rpc {
if argv.is_empty() {
return Err(Error::custom("Please specify a daa_score"));
}
let daa_score_result = argv.remove(0).parse::<u64>();
let daa_score_result = argv.iter().map(|s| s.parse::<u64>()).collect::<std::result::Result<Vec<_>, _>>();

match daa_score_result {
Ok(daa_score) => {
Ok(daa_scores) => {
let result =
rpc.get_daa_score_timestamp_estimate_call(GetDaaScoreTimestampEstimateRequest { daa_score }).await?;
rpc.get_daa_score_timestamp_estimate_call(GetDaaScoreTimestampEstimateRequest { daa_scores }).await?;
self.println(&ctx, result);
}
Err(_err) => {
return Err(Error::custom("Could not parse daa_score to u64"));
return Err(Error::custom("Could not parse daa_scores to u64"));
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use kaspa_consensus_core::{
block::Block,
block_count::BlockCount,
blockstatus::BlockStatus,
daa_score_timestamp::DaaScoreTimestamp,
errors::consensus::ConsensusResult,
header::Header,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList},
Expand Down Expand Up @@ -250,6 +251,10 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(|c| c.get_headers_selected_tip()).await
}

pub async fn async_get_chain_block_samples(&self) -> Vec<DaaScoreTimestamp> {
self.clone().spawn_blocking(|c| c.get_chain_block_samples()).await
}

/// Returns the anticone of block `hash` from the POV of `context`, i.e. `anticone(hash) ∩ past(context)`.
/// Since this might be an expensive operation for deep blocks, we allow the caller to specify a limit
/// `max_traversal_allowed` on the maximum amount of blocks to traverse for obtaining the answer
Expand Down
5 changes: 5 additions & 0 deletions consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
block_count::BlockCount,
blockstatus::BlockStatus,
coinbase::MinerData,
daa_score_timestamp::DaaScoreTimestamp,
errors::{
block::{BlockProcessResult, RuleError},
coinbase::CoinbaseResult,
Expand Down Expand Up @@ -136,6 +137,10 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

fn get_chain_block_samples(&self) -> Vec<DaaScoreTimestamp> {
unimplemented!()
}

fn get_virtual_parents(&self) -> BlockHashSet {
unimplemented!()
}
Expand Down
23 changes: 23 additions & 0 deletions consensus/core/src/daa_score_timestamp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::sync::Arc;

use serde::{Deserialize, Serialize};

use crate::header::Header;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaaScoreTimestamp {
pub daa_score: u64,
pub timestamp: u64,
}

impl From<Header> for DaaScoreTimestamp {
fn from(header: Header) -> DaaScoreTimestamp {
DaaScoreTimestamp { daa_score: header.daa_score, timestamp: header.timestamp }
}
}

impl From<Arc<Header>> for DaaScoreTimestamp {
fn from(header: Arc<Header>) -> DaaScoreTimestamp {
DaaScoreTimestamp { daa_score: header.daa_score, timestamp: header.timestamp }
}
}
1 change: 1 addition & 0 deletions consensus/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod blockstatus;
pub mod coinbase;
pub mod config;
pub mod constants;
pub mod daa_score_timestamp;
pub mod errors;
pub mod hashing;
pub mod header;
Expand Down
60 changes: 60 additions & 0 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use kaspa_consensus_core::{
blockhash::BlockHashExtensions,
blockstatus::BlockStatus,
coinbase::MinerData,
daa_score_timestamp::DaaScoreTimestamp,
errors::{
coinbase::CoinbaseResult,
consensus::{ConsensusError, ConsensusResult},
Expand All @@ -54,6 +55,7 @@ use kaspa_consensus_core::{
errors::{difficulty::DifficultyError, pruning::PruningImportError},
header::Header,
muhash::MuHashExtensions,
network::NetworkType,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
Expand Down Expand Up @@ -83,6 +85,10 @@ use tokio::sync::oneshot;

use self::{services::ConsensusServices, storage::ConsensusStorage};

use crate::model::stores::selected_chain::SelectedChainStoreReader;

use std::cmp;

pub struct Consensus {
// DB
db: Arc<DB>,
Expand Down Expand Up @@ -479,6 +485,60 @@ impl ConsensusApi for Consensus {
Ok(self.services.dag_traversal_manager.calculate_chain_path(hash, self.get_sink()))
}

/// Returns a Vec of header samples since genesis
/// Ordered ascending by daa_score, first entry is genesis
fn get_chain_block_samples(&self) -> Vec<DaaScoreTimestamp> {
// Sorted from genesis to latest pruning_point_headers
let pp_headers = self.pruning_point_headers();
let step_divisor: usize = 3; // The number of extra samples we'll get from blocks after last pp header
let prealloc_len = pp_headers.len() + step_divisor + 1;

let mut sample_headers;

// Part 1: Add samples from pruning point headers:
if self.config.net.network_type == NetworkType::Mainnet {
sample_headers = Vec::<DaaScoreTimestamp>::with_capacity(prealloc_len + 15);
// For mainnet, we add extra data (15 pp headers) from before checkpoint genesis:
sample_headers.push(DaaScoreTimestamp { daa_score: 0, timestamp: 1636298787842 });
sample_headers.push(DaaScoreTimestamp { daa_score: 87133, timestamp: 1636386662010 });
sample_headers.push(DaaScoreTimestamp { daa_score: 176797, timestamp: 1636473700804 });
sample_headers.push(DaaScoreTimestamp { daa_score: 264837, timestamp: 1636560706885 });
sample_headers.push(DaaScoreTimestamp { daa_score: 355974, timestamp: 1636650005662 });
sample_headers.push(DaaScoreTimestamp { daa_score: 445152, timestamp: 1636737841327 });
sample_headers.push(DaaScoreTimestamp { daa_score: 536709, timestamp: 1636828600930 });
sample_headers.push(DaaScoreTimestamp { daa_score: 624635, timestamp: 1636912614350 });
sample_headers.push(DaaScoreTimestamp { daa_score: 712234, timestamp: 1636999362832 });
sample_headers.push(DaaScoreTimestamp { daa_score: 801831, timestamp: 1637088292662 });
sample_headers.push(DaaScoreTimestamp { daa_score: 890716, timestamp: 1637174890675 });
sample_headers.push(DaaScoreTimestamp { daa_score: 978396, timestamp: 1637260956454 });
sample_headers.push(DaaScoreTimestamp { daa_score: 1068387, timestamp: 1637349078269 });
sample_headers.push(DaaScoreTimestamp { daa_score: 1139626, timestamp: 1637418723538 });
sample_headers.push(DaaScoreTimestamp { daa_score: 1218320, timestamp: 1637495941516 });
} else {
sample_headers = Vec::<DaaScoreTimestamp>::with_capacity(prealloc_len);
}

for header in pp_headers.iter() {
sample_headers.push(DaaScoreTimestamp { daa_score: header.daa_score, timestamp: header.timestamp });
}

// Part 2: Add samples from recent chain blocks
let sc_read = self.storage.selected_chain_store.read();
let low = pp_headers.last().unwrap().hash;
let high = sc_read.get_tip().unwrap().1;

let low_index = sc_read.get_by_hash(low).unwrap_option().unwrap_or(0);
let high_index = sc_read.get_by_hash(high).unwrap_option().unwrap_or(0);
let step_size = cmp::max((high_index - low_index) / (step_divisor as u64), 1);

for index in (low_index + step_size..=high_index).step_by(step_size as usize) {
let chain_block_header = self.storage.headers_store.get_header(sc_read.get_by_index(index).unwrap()).unwrap();
sample_headers.push(DaaScoreTimestamp::from(chain_block_header));
}

sample_headers
}

fn get_virtual_parents(&self) -> BlockHashSet {
self.virtual_stores.read().state.get().unwrap().parents.iter().copied().collect()
}
Expand Down
4 changes: 2 additions & 2 deletions rpc/core/src/api/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ pub trait RpcApi: Sync + Send + AnySync {
}
async fn get_coin_supply_call(&self, request: GetCoinSupplyRequest) -> RpcResult<GetCoinSupplyResponse>;

async fn get_daa_score_timestamp_estimate(&self, daa_score: u64) -> RpcResult<GetDaaScoreTimestampEstimateResponse> {
self.get_daa_score_timestamp_estimate_call(GetDaaScoreTimestampEstimateRequest { daa_score }).await
async fn get_daa_score_timestamp_estimate(&self, daa_scores: Vec<u64>) -> RpcResult<GetDaaScoreTimestampEstimateResponse> {
self.get_daa_score_timestamp_estimate_call(GetDaaScoreTimestampEstimateRequest { daa_scores }).await
}
async fn get_daa_score_timestamp_estimate_call(
&self,
Expand Down
12 changes: 6 additions & 6 deletions rpc/core/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,24 +760,24 @@ pub struct GetSyncStatusResponse {
#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[serde(rename_all = "camelCase")]
pub struct GetDaaScoreTimestampEstimateRequest {
pub daa_score: u64,
pub daa_scores: Vec<u64>,
}

impl GetDaaScoreTimestampEstimateRequest {
pub fn new(daa_score: u64) -> Self {
Self { daa_score }
pub fn new(daa_scores: Vec<u64>) -> Self {
Self { daa_scores }
}
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[serde(rename_all = "camelCase")]
pub struct GetDaaScoreTimestampEstimateResponse {
pub timestamp: u64,
pub timestamps: Vec<u64>,
}

impl GetDaaScoreTimestampEstimateResponse {
pub fn new(timestamp: u64) -> Self {
Self { timestamp }
pub fn new(timestamps: Vec<u64>) -> Self {
Self { timestamps }
}
}

Expand Down
4 changes: 2 additions & 2 deletions rpc/grpc/core/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -815,10 +815,10 @@ message GetSyncStatusResponseMessage{
}

message GetDaaScoreTimestampEstimateRequestMessage {
uint64 daa_score = 1;
repeated uint64 daa_scores = 1;
}

message GetDaaScoreTimestampEstimateResponseMessage{
uint64 timestamp = 1;
repeated uint64 timestamps = 1;
RPCError error = 1000;
}
8 changes: 4 additions & 4 deletions rpc/grpc/core/src/convert/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,11 @@ from!(item: RpcResult<&kaspa_rpc_core::GetCoinSupplyResponse>, protowire::GetCoi

from!(item: &kaspa_rpc_core::GetDaaScoreTimestampEstimateRequest, protowire::GetDaaScoreTimestampEstimateRequestMessage, {
Self {
daa_score: item.daa_score
daa_scores: item.daa_scores.clone()
}
});
from!(item: RpcResult<&kaspa_rpc_core::GetDaaScoreTimestampEstimateResponse>, protowire::GetDaaScoreTimestampEstimateResponseMessage, {
Self { timestamp: item.timestamp, error: None }
Self { timestamps: item.timestamps.clone(), error: None }
});

from!(&kaspa_rpc_core::PingRequest, protowire::PingRequestMessage);
Expand Down Expand Up @@ -722,11 +722,11 @@ try_from!(item: &protowire::GetCoinSupplyResponseMessage, RpcResult<kaspa_rpc_co

try_from!(item: &protowire::GetDaaScoreTimestampEstimateRequestMessage, kaspa_rpc_core::GetDaaScoreTimestampEstimateRequest , {
Self {
daa_score: item.daa_score
daa_scores: item.daa_scores.clone()
}
});
try_from!(item: &protowire::GetDaaScoreTimestampEstimateResponseMessage, RpcResult<kaspa_rpc_core::GetDaaScoreTimestampEstimateResponse>, {
Self { timestamp: item.timestamp }
Self { timestamps: item.timestamps.clone() }
});

try_from!(&protowire::PingRequestMessage, kaspa_rpc_core::PingRequest);
Expand Down
54 changes: 52 additions & 2 deletions rpc/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use kaspa_utils::{channel::Channel, triggers::SingleTrigger};
use kaspa_utxoindex::api::UtxoIndexProxy;
use kaspa_wrpc_core::ServerCounters as WrpcServerCounters;
use std::{
collections::HashMap,
iter::once,
sync::{atomic::Ordering, Arc},
vec,
Expand Down Expand Up @@ -526,8 +527,57 @@ impl RpcApi for RpcCoreService {
&self,
request: GetDaaScoreTimestampEstimateRequest,
) -> RpcResult<GetDaaScoreTimestampEstimateResponse> {
// TODO: Add the logic here
Ok(GetDaaScoreTimestampEstimateResponse::new(request.daa_score))
let session = self.consensus_manager.consensus().session().await;
let mut headers = session.async_get_chain_block_samples().await;
let mut requested_daa_scores = request.daa_scores.clone();
let mut daa_score_timestamp_map = HashMap::<u64, u64>::new();

headers.reverse();
requested_daa_scores.sort_by(|a, b| b.cmp(a));

let mut header_idx = 0;
let mut req_idx = 0;

// Loop runs at O(n + m) where n = # pp headers, m = # requested daa_scores
// Loop will always end because in the worst case the last header with daa_score = 0 (the genesis)
// will cause every remaining requested daa_score to be "found in range"
while header_idx < headers.len() && req_idx < request.daa_scores.len() {
let header = headers.get(header_idx).unwrap();
let curr_daa_score = requested_daa_scores[req_idx];

// Found daa_score in range
if header.daa_score <= curr_daa_score {
// For the last header, we estimate excess time as seconds
let time_adjustment = if header_idx == 0 {
// Assume difference in daa_score corresponds to seconds since the basis header
(curr_daa_score - header.daa_score).checked_mul(1000).unwrap_or(u64::MAX)
} else {
// "next" header is the one that we processed last iteration
let next_header = &headers[header_idx - 1];
let time_between_now_and_next = next_header.timestamp - header.timestamp;
let score_between_now_and_request = curr_daa_score - header.daa_score;
let score_between_now_and_next = next_header.daa_score - header.daa_score;

(time_between_now_and_next)
.checked_mul(score_between_now_and_request / score_between_now_and_next)
.unwrap_or(u64::MAX)
};

// Use higher types to catch overflows. Cast to lower type later on when confirmed within u64 range
let daa_score_timestamp = header.timestamp.checked_add(time_adjustment).unwrap_or(u64::MAX);

daa_score_timestamp_map.insert(curr_daa_score, daa_score_timestamp);

// Process the next daa score that's <= than current one (at earlier idx)
req_idx += 1;
} else {
header_idx += 1;
}
}

let timestamps = request.daa_scores.iter().map(|curr_daa_score| daa_score_timestamp_map[curr_daa_score]).collect();

Ok(GetDaaScoreTimestampEstimateResponse::new(timestamps))
}

async fn ping_call(&self, _: PingRequest) -> RpcResult<PingResponse> {
Expand Down

0 comments on commit c50cc42

Please sign in to comment.