Skip to content

Commit

Permalink
feat(inx): use bee-inx (#470)
Browse files Browse the repository at this point in the history
* feat(inx): use `bee-inx`

* Fmt

* Small changes

* update patch

* waahhhhh

* Remove patch
  • Loading branch information
grtlr authored Jul 27, 2022
1 parent 8470cae commit 1426dc8
Show file tree
Hide file tree
Showing 44 changed files with 508 additions and 649 deletions.
467 changes: 63 additions & 404 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,22 @@ tower-http = { version = "0.3", default-features = false, features = ["cors", "c
zeroize = { version = "1.5", default-features = false, features = ["std"], optional = true }

# INX
inx = { git = "https://github.com/iotaledger/inx", version = "1.0.0-beta.1", default-features = false, features = ["types"], optional = true }
bee-inx = { version = "1.0.0-beta.2", default-features = false, optional = true }
tonic = { version = "0.7.2", default-features = false, optional = true }

# Metrics
bee-metrics = { git = "https://github.com/iotaledger/bee", branch = "mainnet-develop-0.4", default-features = false, features = ["sync"], optional = true }

# Stardust types
bee-api-types-stardust = { package = "bee-api-types", git = "https://github.com/iotaledger/bee.git", branch = "shimmer-develop", default-features = false, optional = true }
bee-block-stardust = { package = "bee-block", git = "https://github.com/iotaledger/bee.git", branch = "shimmer-develop", default-features = false, features = ["std", "serde", "dto"], optional = true }
bee-api-types-stardust = { package = "bee-api-types", version = "1.0.0-beta.4", default-features = false, optional = true }
bee-block-stardust = { package = "bee-block", version = "1.0.0-beta.4", default-features = false, features = [ "dto", "std", "serde", ], optional = true }

# Tokio Console
console-subscriber = { version = "0.1", default-features = false, optional = true }


[dev-dependencies]
bee-test = { package = "bee-test", git = "https://github.com/iotaledger/bee.git", branch = "shimmer-develop", default-features = false }
bee-block-stardust = { package = "bee-block", version = "1.0.0-beta.4", default-features = false, features = [ "dto", "rand", "std", "serde", ] }
packable = { version = "0.5", default-features = false }

[features]
Expand Down Expand Up @@ -111,7 +113,8 @@ console = [
"tokio/tracing",
]
inx = [
"dep:inx",
"dep:bee-inx",
"dep:tonic",
]
metrics = [
"dep:bee-metrics",
Expand Down
3 changes: 1 addition & 2 deletions bin/inx-chronicle/config.template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ public_routes = [
"api/info",

# Activated APIs.
"api/analytics/v2/*",
"api/core/v2/*",
"api/history/v2/*",
"api/indexer/v1/*",

"api/analytics/v2/addresses*",
]

[inx]
Expand Down
10 changes: 5 additions & 5 deletions bin/inx-chronicle/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,11 @@ impl HandleEvent<Report<super::stardust_inx::InxWorker>> for Launcher {
cx.abort().await;
}
},
InxError::Read(e) => match e.code() {
inx::tonic::Code::DeadlineExceeded
| inx::tonic::Code::ResourceExhausted
| inx::tonic::Code::Aborted
| inx::tonic::Code::Unavailable => {
InxError::BeeInx(bee_inx::Error::StatusCode(e)) => match e.code() {
tonic::Code::DeadlineExceeded
| tonic::Code::ResourceExhausted
| tonic::Code::Aborted
| tonic::Code::Unavailable => {
cx.spawn_child(report.actor).await;
}
_ => {
Expand Down
30 changes: 10 additions & 20 deletions bin/inx-chronicle/src/stardust_inx/cone_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,27 @@
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use bee_inx::client::Inx;
use chronicle::{
db::MongoDb,
runtime::{Actor, ActorContext, HandleEvent},
types::tangle::MilestoneIndex,
};
use inx::{
client::InxClient,
tonic::{Channel, Status},
BlockWithMetadata,
};

use super::InxError;

#[derive(Debug)]
pub struct ConeStream {
pub milestone_index: MilestoneIndex,
inx_client: InxClient<Channel>,
inx: Inx,
db: MongoDb,
}

impl ConeStream {
pub fn new(milestone_index: MilestoneIndex, inx_client: InxClient<Channel>, db: MongoDb) -> Self {
pub fn new(milestone_index: MilestoneIndex, inx: Inx, db: MongoDb) -> Self {
Self {
milestone_index,
inx_client,
inx,
db,
}
}
Expand All @@ -38,11 +34,7 @@ impl Actor for ConeStream {
type Error = InxError;

async fn init(&mut self, cx: &mut ActorContext<Self>) -> Result<Self::State, Self::Error> {
let cone_stream = self
.inx_client
.read_milestone_cone(inx::proto::MilestoneRequest::from_index(self.milestone_index.0))
.await?
.into_inner();
let cone_stream = self.inx.read_milestone_cone(self.milestone_index.0.into()).await?;
cx.add_stream(cone_stream);
Ok(())
}
Expand All @@ -69,21 +61,19 @@ impl Actor for ConeStream {
}

#[async_trait]
impl HandleEvent<Result<inx::proto::BlockWithMetadata, Status>> for ConeStream {
impl HandleEvent<Result<bee_inx::BlockWithMetadata, bee_inx::Error>> for ConeStream {
async fn handle_event(
&mut self,
_cx: &mut ActorContext<Self>,
block_metadata_result: Result<inx::proto::BlockWithMetadata, Status>,
block_metadata_result: Result<bee_inx::BlockWithMetadata, bee_inx::Error>,
_state: &mut Self::State,
) -> Result<(), Self::Error> {
log::trace!("Received Stardust block event");
let block_metadata = block_metadata_result?;
log::trace!("Block data: {:?}", block_metadata);
let inx_block_with_metadata: inx::BlockWithMetadata = block_metadata.try_into()?;
let BlockWithMetadata { metadata, block, raw } = inx_block_with_metadata;
let bee_inx::BlockWithMetadata { block, metadata } = block_metadata_result?;
log::trace!("Block id: {:?}", metadata.block_id);

self.db
.insert_block_with_metadata(block.into(), raw, metadata.into())
.insert_block_with_metadata(block.clone().inner()?.into(), block.data(), metadata.into())
.await?;

log::trace!("Inserted block into database.");
Expand Down
6 changes: 2 additions & 4 deletions bin/inx-chronicle/src/stardust_inx/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub enum InxError {
#[error("expected INX address with format `http://<address>:<port>`, but found `{0}`")]
InvalidAddress(String),
#[error("INX type conversion error: {0:?}")]
InxTypeConversion(#[from] inx::Error),
InxTypeConversion(#[from] bee_block_stardust::InxError),
#[error("missing milestone id for milestone index `{0}`")]
MissingMilestoneInfo(MilestoneIndex),
#[error(transparent)]
Expand All @@ -21,11 +21,9 @@ pub enum InxError {
#[error(transparent)]
ParsingAddressFailed(#[from] url::ParseError),
#[error(transparent)]
Read(#[from] inx::tonic::Status),
#[error(transparent)]
Runtime(#[from] chronicle::runtime::RuntimeError),
#[error(transparent)]
Tonic(#[from] inx::tonic::Error),
BeeInx(#[from] bee_inx::Error),
}

impl ErrorLevel for InxError {
Expand Down
52 changes: 29 additions & 23 deletions bin/inx-chronicle/src/stardust_inx/ledger_update_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,24 @@
use std::ops::RangeInclusive;

use async_trait::async_trait;
use bee_inx::client::Inx;
use chronicle::{
db::MongoDb,
runtime::{Actor, ActorContext, ActorError, HandleEvent, Report},
types::tangle::MilestoneIndex,
};
use inx::{
client::InxClient,
tonic::{Channel, Status},
types::tangle::{MilestoneIndex, ProtocolInfo, ProtocolParameters},
};

use super::{cone_stream::ConeStream, InxError};

#[derive(Debug)]
pub struct LedgerUpdateStream {
db: MongoDb,
inx: InxClient<Channel>,
inx: Inx,
range: RangeInclusive<MilestoneIndex>,
}

impl LedgerUpdateStream {
pub fn new(db: MongoDb, inx: InxClient<Channel>, range: RangeInclusive<MilestoneIndex>) -> Self {
pub fn new(db: MongoDb, inx: Inx, range: RangeInclusive<MilestoneIndex>) -> Self {
Self { db, inx, range }
}
}
Expand All @@ -38,12 +35,11 @@ impl Actor for LedgerUpdateStream {
let ledger_update_stream = self
.inx
.listen_to_ledger_updates(if *self.range.end() == u32::MAX {
inx::proto::MilestoneRangeRequest::from(*self.range.start()..)
(self.range.start().0..).into()
} else {
inx::proto::MilestoneRangeRequest::from(self.range.clone())
(self.range.start().0..self.range.end().0).into()
})
.await?
.into_inner();
.await?;
cx.add_stream(ledger_update_stream);
Ok(())
}
Expand Down Expand Up @@ -81,31 +77,41 @@ impl HandleEvent<Report<ConeStream>> for LedgerUpdateStream {
}

#[async_trait]
impl HandleEvent<Result<inx::proto::LedgerUpdate, Status>> for LedgerUpdateStream {
impl HandleEvent<Result<bee_inx::LedgerUpdate, bee_inx::Error>> for LedgerUpdateStream {
async fn handle_event(
&mut self,
cx: &mut ActorContext<Self>,
ledger_update_result: Result<inx::proto::LedgerUpdate, Status>,
ledger_update_result: Result<bee_inx::LedgerUpdate, bee_inx::Error>,
_state: &mut Self::State,
) -> Result<(), Self::Error> {
log::trace!("Received ledger update event {:#?}", ledger_update_result);

let ledger_update = inx::LedgerUpdate::try_from(ledger_update_result?)?;
let ledger_update = ledger_update_result?;

let output_updates_iter = Vec::from(ledger_update.created)
let output_updates = Vec::from(ledger_update.created)
.into_iter()
.map(Into::into)
.chain(Vec::from(ledger_update.consumed).into_iter().map(Into::into));
.map(TryInto::try_into)
.chain(Vec::from(ledger_update.consumed).into_iter().map(TryInto::try_into))
.collect::<Result<Vec<_>, _>>()?;

self.db.insert_ledger_updates(output_updates_iter).await?;
self.db.insert_ledger_updates(output_updates.into_iter()).await?;

let milestone_request = inx::proto::MilestoneRequest::from_index(ledger_update.milestone_index);

let milestone_proto = self.inx.read_milestone(milestone_request.clone()).await?.into_inner();
let milestone = self.inx.read_milestone(ledger_update.milestone_index.into()).await?;
let parameters: ProtocolParameters = self
.inx
.read_protocol_parameters(ledger_update.milestone_index.into())
.await?
.inner()?
.into();

log::trace!("Received milestone: `{:?}`", milestone_proto);
self.db
.set_protocol_parameters(ProtocolInfo {
parameters,
tangle_index: ledger_update.milestone_index.into(),
})
.await?;

let milestone: inx::Milestone = milestone_proto.try_into()?;
log::trace!("Received milestone: `{:?}`", milestone);

let milestone_index = milestone.milestone_info.milestone_index.into();
let milestone_timestamp = milestone.milestone_info.milestone_timestamp.into();
Expand Down
Loading

0 comments on commit 1426dc8

Please sign in to comment.