Skip to content

Commit

Permalink
Add SyncStateChanged to GRPC core
Browse files Browse the repository at this point in the history
Added SyncStateChanged handling in various files including kaspad.rs, message.rs and notification.rs in the GRPC core, which allows the system to notify and handle changes in the sync state.
  • Loading branch information
biryukovmaxim committed Aug 26, 2023
1 parent dd08356 commit e05e3b6
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 25 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rpc/grpc/core/src/convert/kaspad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ pub mod kaspad_response_convert {
impl_into_kaspad_notify_response!(NotifyVirtualDaaScoreChanged);
impl_into_kaspad_notify_response!(NotifyVirtualChainChanged);
impl_into_kaspad_notify_response!(NotifySinkBlueScoreChanged);
impl_into_kaspad_notify_response!(NotifySyncStateChanged);

macro_rules! impl_into_kaspad_response {
($name:tt) => {
Expand Down
12 changes: 12 additions & 0 deletions rpc/grpc/core/src/convert/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,12 @@ from!(item: &kaspa_rpc_core::NotifyVirtualDaaScoreChangedRequest, protowire::Not
});
from!(RpcResult<&kaspa_rpc_core::NotifyVirtualDaaScoreChangedResponse>, protowire::NotifyVirtualDaaScoreChangedResponseMessage);

from!(item: &kaspa_rpc_core::NotifySyncStateChangedRequest, protowire::NotifySyncStateChangedRequestMessage, {
Self { command: item.command.into() }
});

from!(RpcResult<&kaspa_rpc_core::NotifySyncStateChangedResponse>, protowire::NotifySyncStateChangedResponseMessage);

from!(item: &kaspa_rpc_core::NotifyVirtualChainChangedRequest, protowire::NotifyVirtualChainChangedRequestMessage, {
Self { include_accepted_transaction_ids: item.include_accepted_transaction_ids, command: item.command.into() }
});
Expand Down Expand Up @@ -715,6 +721,12 @@ try_from!(item: &protowire::NotifyVirtualDaaScoreChangedRequestMessage, kaspa_rp
});
try_from!(&protowire::NotifyVirtualDaaScoreChangedResponseMessage, RpcResult<kaspa_rpc_core::NotifyVirtualDaaScoreChangedResponse>);

try_from!(item: &protowire::NotifySyncStateChangedRequestMessage, kaspa_rpc_core::NotifySyncStateChangedRequest, {
Self { command: item.command.into() }
});

try_from!(&protowire::NotifySyncStateChangedResponseMessage, RpcResult<kaspa_rpc_core::NotifySyncStateChangedResponse>);

try_from!(item: &protowire::NotifyVirtualChainChangedRequestMessage, kaspa_rpc_core::NotifyVirtualChainChangedRequest, {
Self { include_accepted_transaction_ids: item.include_accepted_transaction_ids, command: item.command.into() }
});
Expand Down
60 changes: 48 additions & 12 deletions rpc/grpc/core/src/convert/notification.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::protowire::{
kaspad_response::Payload, sync_state_changed_notification_message, BlockAddedNotificationMessage, KaspadResponse,
NewBlockTemplateNotificationMessage, RpcNotifyCommand, SyncStateChangedNotificationMessage,
use crate::{
from,
protowire::{
kaspad_response::Payload, sync_state_changed_notification_message, sync_state_changed_notification_message::SyncState,
BlockAddedNotificationMessage, BlocksState, FinalityConflictNotificationMessage, FinalityConflictResolvedNotificationMessage,
HeadersState, KaspadResponse, NewBlockTemplateNotificationMessage, NotifyPruningPointUtxoSetOverrideRequestMessage,
NotifyPruningPointUtxoSetOverrideResponseMessage, NotifyUtxosChangedRequestMessage, NotifyUtxosChangedResponseMessage,
ProofState, PruningPointUtxoSetOverrideNotificationMessage, RpcNotifyCommand, SinkBlueScoreChangedNotificationMessage,
StopNotifyingPruningPointUtxoSetOverrideRequestMessage, StopNotifyingPruningPointUtxoSetOverrideResponseMessage,
StopNotifyingUtxosChangedRequestMessage, StopNotifyingUtxosChangedResponseMessage, SyncStateChangedNotificationMessage,
TrustSyncState, UtxoSyncState, UtxosChangedNotificationMessage, VirtualChainChangedNotificationMessage,
VirtualDaaScoreChangedNotificationMessage,
},
try_from,
};
use crate::protowire::{
FinalityConflictNotificationMessage, FinalityConflictResolvedNotificationMessage, NotifyPruningPointUtxoSetOverrideRequestMessage,
NotifyPruningPointUtxoSetOverrideResponseMessage, NotifyUtxosChangedRequestMessage, NotifyUtxosChangedResponseMessage,
PruningPointUtxoSetOverrideNotificationMessage, SinkBlueScoreChangedNotificationMessage,
StopNotifyingPruningPointUtxoSetOverrideRequestMessage, StopNotifyingPruningPointUtxoSetOverrideResponseMessage,
StopNotifyingUtxosChangedRequestMessage, StopNotifyingUtxosChangedResponseMessage, UtxosChangedNotificationMessage,
VirtualChainChangedNotificationMessage, VirtualDaaScoreChangedNotificationMessage,
};
use crate::{from, try_from};
use kaspa_notify::subscription::Command;
use kaspa_rpc_core::{Notification, RpcError, RpcHash, SyncStateChangedNotification};
use std::str::FromStr;
Expand Down Expand Up @@ -160,6 +162,9 @@ try_from!(item: &Payload, kaspa_rpc_core::Notification, {
Payload::PruningPointUtxoSetOverrideNotification(ref notification) => {
Notification::PruningPointUtxoSetOverride(notification.try_into()?)
}
Payload::SyncStateChangedNotification(ref notification) => {
Notification::SyncStateChanged(notification.try_into()?)
}
_ => Err(RpcError::UnsupportedFeature)?,
}
});
Expand Down Expand Up @@ -226,3 +231,34 @@ from!(item: NotifyUtxosChangedResponseMessage, StopNotifyingUtxosChangedResponse
from!(item: NotifyPruningPointUtxoSetOverrideResponseMessage, StopNotifyingPruningPointUtxoSetOverrideResponseMessage, {
Self { error: item.error }
});

impl TryFrom<&SyncStateChangedNotificationMessage> for kaspa_rpc_core::SyncStateChangedNotification {
type Error = RpcError;

fn try_from(value: &SyncStateChangedNotificationMessage) -> Result<Self, Self::Error> {
let sync_state = value
.sync_state
.as_ref()
.ok_or(RpcError::MissingRpcFieldError(String::from("SyncStateChangedNotificationMessage"), String::from("syncState")))?;
let notification = match sync_state {
SyncState::Proof(ProofState { current, max }) => {
kaspa_rpc_core::SyncStateChangedNotification::Proof { current: *current as u8, max: *max as u8 }
}
SyncState::Headers(HeadersState { headers, progress }) => {
kaspa_rpc_core::SyncStateChangedNotification::Headers { headers: *headers, progress: *progress }
}
SyncState::Blocks(BlocksState { blocks, progress }) => {
kaspa_rpc_core::SyncStateChangedNotification::Blocks { blocks: *blocks, progress: *progress }
}
SyncState::UtxoResync(_) => kaspa_rpc_core::SyncStateChangedNotification::UtxoResync,
SyncState::UtxoSync(UtxoSyncState { chunks, total }) => {
kaspa_rpc_core::SyncStateChangedNotification::UtxoSync { chunks: *chunks, total: *total }
}
SyncState::TrustSync(TrustSyncState { processed, total }) => {
kaspa_rpc_core::SyncStateChangedNotification::TrustSync { processed: *processed, total: *total }
}
SyncState::Synced(_) => kaspa_rpc_core::SyncStateChangedNotification::Synced,
};
Ok(notification)
}
}
27 changes: 14 additions & 13 deletions rpc/grpc/core/src/ext/kaspad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl kaspad_request::Payload {
| Payload::NotifyNewBlockTemplateRequest(_)
| Payload::StopNotifyingUtxosChangedRequest(_)
| Payload::StopNotifyingPruningPointUtxoSetOverrideRequest(_)
| Payload::NotifySyncStateChangedRequest(_)
)
}
}
Expand All @@ -99,21 +100,21 @@ impl KaspadResponse {
}
}

#[allow(clippy::match_like_matches_macro)]
impl kaspad_response::Payload {
pub fn is_notification(&self) -> bool {
use crate::protowire::kaspad_response::Payload;
match self {
Payload::BlockAddedNotification(_) => true,
Payload::VirtualChainChangedNotification(_) => true,
Payload::FinalityConflictNotification(_) => true,
Payload::FinalityConflictResolvedNotification(_) => true,
Payload::UtxosChangedNotification(_) => true,
Payload::SinkBlueScoreChangedNotification(_) => true,
Payload::VirtualDaaScoreChangedNotification(_) => true,
Payload::PruningPointUtxoSetOverrideNotification(_) => true,
Payload::NewBlockTemplateNotification(_) => true,
_ => false,
}
matches!(
self,
Payload::BlockAddedNotification(_)
| Payload::VirtualChainChangedNotification(_)
| Payload::FinalityConflictNotification(_)
| Payload::FinalityConflictResolvedNotification(_)
| Payload::UtxosChangedNotification(_)
| Payload::SinkBlueScoreChangedNotification(_)
| Payload::VirtualDaaScoreChangedNotification(_)
| Payload::PruningPointUtxoSetOverrideNotification(_)
| Payload::NewBlockTemplateNotification(_)
| Payload::SyncStateChangedNotification(_)
)
}
}
13 changes: 13 additions & 0 deletions rpc/grpc/server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,19 @@ impl Connection {
}
}

Payload::NotifySyncStateChangedRequest(ref request) => {
match kaspa_rpc_core::NotifySyncStateChangedRequest::try_from(request) {
Ok(request) => {
let listener_id = listener_id;
let result = notifier
.clone()
.execute_subscribe_command(listener_id, Scope::SyncStateChanged(Default::default()), request.command)
.await;
NotifySyncStateChangedResponseMessage::from(result).into()
}
Err(err) => NotifySyncStateChangedResponseMessage::from(err).into(),
}
}
_ => {
return Err(GrpcServerError::InvalidSubscriptionPayload);
}
Expand Down

0 comments on commit e05e3b6

Please sign in to comment.