Skip to content

Commit

Permalink
Monthly income report service completed. Clippy fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
kukabi committed Jun 23, 2024
1 parent cc11433 commit bedf564
Show file tree
Hide file tree
Showing 21 changed files with 292 additions and 23 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ anyhow = "1.0"
itertools = "0.12.0"
lazy_static = "1.4"
log = "0.4"
num-traits = "0.2"
thiserror = "1.0"
2 changes: 1 addition & 1 deletion subvt-network-status-updater/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl NetworkStatusUpdater {
))?;
let status_json_string = serde_json::to_string(status)?;
let mut redis_cmd_pipeline = Pipeline::new();
redis_cmd_pipeline
() = redis_cmd_pipeline
.cmd("SET")
.arg(format!("subvt:{}:network_status", CONFIG.substrate.chain))
.arg(status_json_string)
Expand Down
6 changes: 3 additions & 3 deletions subvt-notification-generator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Generates notifications according to the notification rules depending on three sources of data:
//! 1. Validator list updates from Redis, updated by `subvt-validator-list-updater`, and published
//! using the Redis notification (PUBLISH) support.
//! using the Redis notification (PUBLISH) support.
//! 2. Events and extrinsics in new blocks. Block are processed by `subvt-block-processor`, and the
//! finishing of the processing of a block is signalled by the processor by means of PostgreSQL
//! notifications.
//! finishing of the processing of a block is signalled by the processor by means of PostgreSQL
//! notifications.
//! 3. Regular Telemetry checks (this is work in progress still).
#![warn(clippy::disallowed_types)]
use async_trait::async_trait;
Expand Down
2 changes: 1 addition & 1 deletion subvt-persistence/src/postgres/app/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl PostgreSQLAppStorage {
.bind(&notification.validator_account_json)
.bind(&notification.notification_type_code)
.bind(notification.user_notification_channel_id as i32)
.bind(&notification.notification_channel.to_string())
.bind(notification.notification_channel.to_string())
.bind(&notification.notification_target)
.bind(&notification.data_json)
.bind(&notification.error_log)
Expand Down
4 changes: 2 additions & 2 deletions subvt-persistence/src/postgres/app/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl PostgreSQLAppStorage {
"#,
)
.bind(user_notification_channel.user_id as i32)
.bind(&user_notification_channel.channel.to_string())
.bind(user_notification_channel.channel.to_string())
.bind(&user_notification_channel.target)
.fetch_one(&self.connection_pool)
.await?;
Expand All @@ -237,7 +237,7 @@ impl PostgreSQLAppStorage {
"#,
)
.bind(user_notification_channel.user_id as i32)
.bind(&user_notification_channel.channel.to_string())
.bind(user_notification_channel.channel.to_string())
.bind(&user_notification_channel.target)
.fetch_one(&self.connection_pool)
.await?;
Expand Down
106 changes: 106 additions & 0 deletions subvt-persistence/src/postgres/network/kline/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use crate::postgres::network::PostgreSQLNetworkStorage;
use sqlx::types::BigDecimal;
use subvt_types::kline::KLine;

type DBKline = (
i32,
i64,
String,
String,
BigDecimal,
BigDecimal,
BigDecimal,
BigDecimal,
BigDecimal,
i64,
BigDecimal,
i32,
BigDecimal,
BigDecimal,
);

impl PostgreSQLNetworkStorage {
pub async fn kline_exists(
&self,
timestamp: i64,
source_ticker: &str,
target_ticker: &str,
) -> anyhow::Result<bool> {
let record_count: (i64,) = sqlx::query_as(
r#"
SELECT COUNT(id) FROM sub_kline_historical
WHERE open_time = $1 AND source_ticker = $2 AND target_ticker = $3
"#,
)
.bind(timestamp)
.bind(source_ticker)
.bind(target_ticker)
.fetch_one(&self.connection_pool)
.await?;
Ok(record_count.0 > 0)
}

pub async fn save_kline(&self, kline: &KLine) -> anyhow::Result<i32> {
let save_result: (i32,) = sqlx::query_as(
r#"
INSERT INTO sub_kline_historical (open_time, source_ticker, target_ticker, "open", high, low, "close", volume, close_time, quote_volume, "count", taker_buy_volume, taker_buy_quote_volume)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (open_time, source_ticker, target_ticker)
DO UPDATE SET taker_buy_quote_volume = EXCLUDED.taker_buy_quote_volume
RETURNING id
"#,
)
.bind(kline.open_time as i64)
.bind(kline.source_ticker.as_str())
.bind(kline.target_ticker.as_str())
.bind(&kline.open)
.bind(&kline.high)
.bind(&kline.low)
.bind(&kline.close)
.bind(&kline.volume)
.bind(kline.close_time as i64)
.bind(&kline.quote_volume)
.bind(kline.count as i32)
.bind(&kline.taker_buy_volume)
.bind(&kline.taker_buy_quote_volume)
.fetch_one(&self.connection_pool)
.await?;
Ok(save_result.0)
}

pub async fn get_kline_count(&self) -> anyhow::Result<u64> {
let record_count: (i64,) = sqlx::query_as("SELECT COUNT(id) FROM sub_kline_historical")
.fetch_one(&self.connection_pool)
.await?;
Ok(record_count.0 as u64)
}

pub async fn get_kline(&self, timestamp: u64) -> anyhow::Result<KLine> {
let db_kline: DBKline = sqlx::query_as(
r#"
SELECT id, open_time, source_ticker, target_ticker, "open", high, low, "close", volume, close_time, quote_volume, "count", taker_buy_volume, taker_buy_quote_volume
FROM sub_kline_historical
WHERE open_time = $1
"#,
)
.bind(timestamp as i64)
.fetch_one(&self.connection_pool)
.await?;
Ok(KLine {
id: db_kline.0 as u32,
open_time: db_kline.1 as u64,
source_ticker: db_kline.2,
target_ticker: db_kline.3,
open: db_kline.4,
high: db_kline.5,
low: db_kline.6,
close: db_kline.7,
volume: db_kline.8,
close_time: db_kline.9 as u64,
quote_volume: db_kline.10,
count: db_kline.11 as u32,
taker_buy_volume: db_kline.12,
taker_buy_quote_volume: db_kline.13,
})
}
}
1 change: 1 addition & 0 deletions subvt-persistence/src/postgres/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod era;
pub mod error_log;
pub mod event;
pub mod extrinsic;
pub mod kline;
pub mod nft;
pub mod notify;
pub mod onekv;
Expand Down
2 changes: 1 addition & 1 deletion subvt-persistence/src/postgres/network/onekv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl PostgreSQLNetworkStorage {
.bind(candidate.inclusion)
.bind(candidate.commission)
.bind(candidate.is_active)
.bind(&candidate.unclaimed_eras.as_ref().map(|v| v.iter().map(|i| *i as i64).collect::<Vec<i64>>()))
.bind(candidate.unclaimed_eras.as_ref().map(|v| v.iter().map(|i| *i as i64).collect::<Vec<i64>>()))
.bind(candidate.nominated_at.map(|last_valid| last_valid as i64))
.bind(candidate.offline_accumulated)
.bind(candidate.offline_since as i64)
Expand Down
39 changes: 38 additions & 1 deletion subvt-persistence/src/postgres/network/report/rewards.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::postgres::network::PostgreSQLNetworkStorage;
use std::str::FromStr;
use subvt_types::crypto::AccountId;
use subvt_types::report::ValidatorTotalReward;
use subvt_types::report::{Reward, ValidatorTotalReward};
use subvt_types::substrate::{Balance, Era};

impl PostgreSQLNetworkStorage {
Expand Down Expand Up @@ -70,4 +70,41 @@ impl PostgreSQLNetworkStorage {
}
Ok(result)
}

pub async fn get_rewards_in_time_range(
&self,
rewardee_account_id: &AccountId,
start_timestamp: u64,
end_timestamp: u64,
) -> anyhow::Result<Vec<Reward>> {
let db_rewards: Vec<(i32, String, i64, i64, i32, i32, String)> = sqlx::query_as(
r#"
SELECT E.id, E.block_hash, B.number, B.timestamp, E.extrinsic_index, E.event_index, E.amount
FROM sub_event_rewarded E
INNER JOIN sub_block B ON B.hash = E.block_hash
WHERE B.timestamp >= $1 AND B.timestamp < $2
AND E.rewardee_account_id = $3
ORDER BY B.timestamp ASC
"#,
)
.bind(start_timestamp as i64)
.bind(end_timestamp as i64)
.bind(rewardee_account_id.to_string())
.fetch_all(&self.connection_pool)
.await?;
let mut rewards = Vec::new();
for db_reward in db_rewards.iter() {
rewards.push(Reward {
id: db_reward.0 as u32,
block_hash: db_reward.1.clone(),
block_number: db_reward.2 as u64,
block_timestamp: db_reward.3 as u64,
extrinsic_index: db_reward.4 as u32,
event_index: db_reward.5 as u32,
rewardee_account_id: *rewardee_account_id,
amount: db_reward.6.parse()?,
})
}
Ok(rewards)
}
}
8 changes: 4 additions & 4 deletions subvt-persistence/src/postgres/network/telegram/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl PostgreSQLNetworkStorage {
)
.bind(app_user_id as i32)
.bind(telegram_chat_id)
.bind(&state.to_string())
.bind(state.to_string())
.execute(&self.connection_pool)
.await?;
Ok(())
Expand Down Expand Up @@ -284,7 +284,7 @@ impl PostgreSQLNetworkStorage {
"#,
)
.bind(telegram_chat_id)
.bind(&validator_account_id.to_string())
.bind(validator_account_id.to_string())
.fetch_one(&self.connection_pool)
.await?;
Ok(record_count.0 > 0)
Expand All @@ -307,7 +307,7 @@ impl PostgreSQLNetworkStorage {
"#,
)
.bind(telegram_chat_id)
.bind(&account_id.to_string())
.bind(account_id.to_string())
.bind(address)
.bind(display)
.fetch_one(&self.connection_pool)
Expand Down Expand Up @@ -348,7 +348,7 @@ impl PostgreSQLNetworkStorage {
WHERE telegram_chat_id = $2
"#,
)
.bind(&state.to_string())
.bind(state.to_string())
.bind(telegram_chat_id)
.execute(&self.connection_pool)
.await?;
Expand Down
8 changes: 4 additions & 4 deletions subvt-persistence/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Redis {
block_summary: &BlockSummary,
) -> anyhow::Result<()> {
let mut connection = self.client.get_multiplexed_async_connection().await?;
redis::cmd("MSET")
() = redis::cmd("MSET")
.arg(format!(
"subvt:{}:validators:finalized_block_number",
CONFIG.substrate.chain
Expand All @@ -55,7 +55,7 @@ impl Redis {
account_id: &AccountId,
) -> anyhow::Result<()> {
let mut connection = self.client.get_multiplexed_async_connection().await?;
redis::cmd("SADD")
() = redis::cmd("SADD")
.arg(format!(
"subvt:{}:validators:{finalized_block_number}:active:account_id_set",
CONFIG.substrate.chain,
Expand All @@ -73,7 +73,7 @@ impl Redis {
) -> anyhow::Result<()> {
let mut connection = self.client.get_multiplexed_async_connection().await?;
let validator_details_json = serde_json::to_string(validator_details)?;
redis::cmd("SET")
() = redis::cmd("SET")
.arg(format!(
"subvt:{}:validators:{finalized_block_number}:active:validator:{}",
CONFIG.substrate.chain, validator_details.account.id,
Expand Down Expand Up @@ -171,7 +171,7 @@ impl Redis {
pub async fn set_network_status(&self, network_status: &NetworkStatus) -> anyhow::Result<()> {
let mut connection = self.client.get_multiplexed_async_connection().await?;
let network_status_json = serde_json::to_string(network_status)?;
redis::cmd("SET")
() = redis::cmd("SET")
.arg(format!("subvt:{}:network_status", CONFIG.substrate.chain))
.arg(network_status_json)
.query_async(&mut connection)
Expand Down
2 changes: 2 additions & 0 deletions subvt-report-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rust-version = "1.67.0"
actix-web = "4.4"
anyhow = { workspace = true }
async-trait = "0.1"
chrono = "0.4"
futures-util = "0.3"
hex = "0.4"
lazy_static = { workspace = true }
Expand All @@ -17,6 +18,7 @@ redis = { version = "0.25", features = ["tokio-comp"] }
rustc-hash = "1.1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sqlx = { git = "https://github.com/helikon-labs/sqlx.git", branch = "helikon-increased-field-count", features = ["bigdecimal"] }
subvt-config = { path = "../subvt-config" }
subvt-metrics = { path = "../subvt-metrics" }
subvt-persistence = { path = "../subvt-persistence" }
Expand Down
Loading

0 comments on commit bedf564

Please sign in to comment.