Skip to content

Commit

Permalink
Correct best_hash for the frontier backend (#20)
Browse files Browse the repository at this point in the history
* Add debug info

* Adjust the kv db

* Try fix the compile

* Fix the compile

* Add sql impl

* Increase the `read_notification_timeout` time
  • Loading branch information
boundless-forest authored Apr 26, 2024
1 parent 6365fe9 commit eb643a2
Show file tree
Hide file tree
Showing 19 changed files with 130 additions and 87 deletions.
3 changes: 3 additions & 0 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub trait Backend<Block: BlockT>: Send + Sync {
fn is_indexed(&self) -> bool {
self.log_indexer().is_indexed()
}

/// Get the latest substrate block hash known to the backend (for the sql
/// backend this is read from the database; other backends may delegate to
/// the substrate client).
async fn best_hash(&self) -> Result<Block::Hash, String>;
}

#[derive(Debug, Eq, PartialEq)]
Expand Down
13 changes: 6 additions & 7 deletions client/cli/src/frontier_db_cmd/mapping_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,21 @@ pub enum MappingKey {
EthBlockOrTransactionHash(H256),
}

pub struct MappingDb<'a, C, B: BlockT> {
pub struct MappingDb<'a, B: BlockT, C: HeaderBackend<B>> {
cmd: &'a FrontierDbCmd,
client: Arc<C>,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
}

impl<'a, C, B: BlockT> MappingDb<'a, C, B>
impl<'a, B: BlockT, C> MappingDb<'a, B, C>
where
C: ProvideRuntimeApi<B>,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
C::Api: EthereumRuntimeRPCApi<B>,
C: HeaderBackend<B>,
{
pub fn new(
cmd: &'a FrontierDbCmd,
client: Arc<C>,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
) -> Self {
Self {
cmd,
Expand Down Expand Up @@ -176,4 +175,4 @@ where
}
}

impl<'a, C, B: BlockT> FrontierDbMessage for MappingDb<'a, C, B> {}
impl<'a, B: BlockT, C: HeaderBackend<B>> FrontierDbMessage for MappingDb<'a, B, C> {}
11 changes: 6 additions & 5 deletions client/cli/src/frontier_db_cmd/meta_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
use ethereum_types::H256;
use serde::Deserialize;
// Substrate
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::Block as BlockT;

use super::{utils::FrontierDbMessage, FrontierDbCmd, Operation};
Expand Down Expand Up @@ -57,13 +58,13 @@ impl FromStr for MetaKey {
}
}

pub struct MetaDb<'a, B: BlockT> {
pub struct MetaDb<'a, B: BlockT, C: HeaderBackend<B>> {
cmd: &'a FrontierDbCmd,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
}

impl<'a, B: BlockT> MetaDb<'a, B> {
pub fn new(cmd: &'a FrontierDbCmd, backend: Arc<fc_db::kv::Backend<B>>) -> Self {
impl<'a, B: BlockT, C: HeaderBackend<B>> MetaDb<'a, B, C> {
pub fn new(cmd: &'a FrontierDbCmd, backend: Arc<fc_db::kv::Backend<B, C>>) -> Self {
Self { cmd, backend }
}

Expand Down Expand Up @@ -151,4 +152,4 @@ impl<'a, B: BlockT> MetaDb<'a, B> {
}
}

impl<'a, B: BlockT> FrontierDbMessage for MetaDb<'a, B> {}
impl<'a, B: BlockT, C: HeaderBackend<B>> FrontierDbMessage for MetaDb<'a, B, C> {}
7 changes: 3 additions & 4 deletions client/cli/src/frontier_db_cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,14 @@ pub enum DbValue<H> {
}

impl FrontierDbCmd {
pub fn run<C, B: BlockT>(
pub fn run<B: BlockT, C>(
&self,
client: Arc<C>,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
) -> sc_cli::Result<()>
where
C: ProvideRuntimeApi<B>,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
C::Api: fp_rpc::EthereumRuntimeRPCApi<B>,
C: HeaderBackend<B>,
{
match self.column {
Column::Meta => {
Expand Down
4 changes: 2 additions & 2 deletions client/cli/src/frontier_db_cmd/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type OpaqueBlock =
pub fn open_frontier_backend<Block: BlockT, C: HeaderBackend<Block>>(
client: Arc<C>,
path: PathBuf,
) -> Result<Arc<fc_db::kv::Backend<Block>>, String> {
Ok(Arc::new(fc_db::kv::Backend::<Block>::new(
) -> Result<Arc<fc_db::kv::Backend<Block, C>>, String> {
Ok(Arc::new(fc_db::kv::Backend::<Block, C>::new(
client,
&fc_db::kv::DatabaseSettings {
source: sc_client_db::DatabaseSource::RocksDb {
Expand Down
22 changes: 13 additions & 9 deletions client/db/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use sp_blockchain::HeaderBackend;
use sp_core::{H160, H256};
pub use sp_database::Database;
use sp_runtime::traits::Block as BlockT;

// Frontier
use fc_api::{FilteredLog, TransactionMetadata};
use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA_CACHE};
Expand Down Expand Up @@ -62,14 +63,15 @@ pub mod static_keys {
}

#[derive(Clone)]
pub struct Backend<Block: BlockT> {
pub struct Backend<Block: BlockT, C: HeaderBackend<Block>> {
client: Arc<C>,
meta: Arc<MetaDb<Block>>,
mapping: Arc<MappingDb<Block>>,
log_indexer: LogIndexerBackend<Block>,
}

#[async_trait::async_trait]
impl<Block: BlockT> fc_api::Backend<Block> for Backend<Block> {
impl<Block: BlockT, C: HeaderBackend<Block>> fc_api::Backend<Block> for Backend<Block, C> {
async fn block_hash(
&self,
ethereum_block_hash: &H256,
Expand All @@ -88,6 +90,10 @@ impl<Block: BlockT> fc_api::Backend<Block> for Backend<Block> {
fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend<Block> {
&self.log_indexer
}

/// Best (latest) substrate block hash, as seen by this backend.
async fn best_hash(&self) -> Result<Block::Hash, String> {
	// The key-value backend keeps no best-hash record of its own: it
	// simply reports the substrate client's current chain info.
	// `HeaderBackend::info()` is infallible, so this always returns `Ok`.
	let chain_info = self.client.info();
	Ok(chain_info.best_hash)
}
}

#[derive(Clone, Default)]
Expand Down Expand Up @@ -115,8 +121,8 @@ pub fn frontier_database_dir(db_config_dir: &Path, db_path: &str) -> PathBuf {
db_config_dir.join("frontier").join(db_path)
}

impl<Block: BlockT> Backend<Block> {
pub fn open<C: HeaderBackend<Block>>(
impl<Block: BlockT, C: HeaderBackend<Block>> Backend<Block, C> {
pub fn open(
client: Arc<C>,
database: &DatabaseSource,
db_config_dir: &Path,
Expand Down Expand Up @@ -148,13 +154,11 @@ impl<Block: BlockT> Backend<Block> {
)
}

pub fn new<C: HeaderBackend<Block>>(
client: Arc<C>,
config: &DatabaseSettings,
) -> Result<Self, String> {
let db = utils::open_database::<Block, C>(client, config)?;
pub fn new(client: Arc<C>, config: &DatabaseSettings) -> Result<Self, String> {
let db = utils::open_database::<Block, C>(client.clone(), config)?;

Ok(Self {
client,
mapping: Arc::new(MappingDb {
db: db.clone(),
write_lock: Arc::new(Mutex::new(())),
Expand Down
6 changes: 4 additions & 2 deletions client/db/src/kv/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,10 @@ mod tests {
pub fn open_frontier_backend<Block: BlockT, C: HeaderBackend<Block>>(
client: Arc<C>,
setting: &crate::kv::DatabaseSettings,
) -> Result<Arc<crate::kv::Backend<Block>>, String> {
Ok(Arc::new(crate::kv::Backend::<Block>::new(client, setting)?))
) -> Result<Arc<crate::kv::Backend<Block, C>>, String> {
Ok(Arc::new(crate::kv::Backend::<Block, C>::new(
client, setting,
)?))
}

#[cfg_attr(not(feature = "rocksdb"), ignore)]
Expand Down
11 changes: 7 additions & 4 deletions client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

#![deny(unused_crate_dependencies)]
// #![deny(unused_crate_dependencies)]

use std::sync::Arc;

// Substrate
pub use sc_client_db::DatabaseSource;
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::Block as BlockT;

pub mod kv;
#[cfg(feature = "sql")]
pub mod sql;

#[derive(Clone)]
pub enum Backend<Block: BlockT> {
KeyValue(kv::Backend<Block>),
pub enum Backend<Block: BlockT, C: HeaderBackend<Block>> {
KeyValue(Arc<kv::Backend<Block, C>>),
#[cfg(feature = "sql")]
Sql(sql::Backend<Block>),
Sql(Arc<sql::Backend<Block>>),
}
35 changes: 30 additions & 5 deletions client/db/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,8 @@ pub enum BackendConfig<'a> {
pub struct Backend<Block: BlockT> {
/// The Sqlite connection.
pool: SqlitePool,

/// The additional overrides for the logs handler.
overrides: Arc<OverrideHandle<Block>>,

/// The number of allowed operations for the Sqlite filter call.
/// A value of `0` disables the timeout.
num_ops_timeout: i32,
Expand Down Expand Up @@ -239,6 +237,7 @@ where
let block_number = 0i32;
let is_canon = 1i32;

let mut tx = self.pool().begin().await?;
let _ = sqlx::query(
"INSERT OR IGNORE INTO blocks(
ethereum_block_hash,
Expand All @@ -253,8 +252,20 @@ where
.bind(block_number)
.bind(schema)
.bind(is_canon)
.execute(self.pool())
.execute(&mut *tx)
.await?;

sqlx::query("INSERT INTO sync_status(substrate_block_hash) VALUES (?)")
.bind(substrate_block_hash)
.execute(&mut *tx)
.await?;
sqlx::query("UPDATE sync_status SET status = 1 WHERE substrate_block_hash = ?")
.bind(substrate_block_hash)
.execute(&mut *tx)
.await?;

tx.commit().await?;
log::debug!(target: "frontier-sql", "The genesis block information has been submitted.");
}
Some(substrate_genesis_hash)
} else {
Expand Down Expand Up @@ -509,7 +520,6 @@ where
});
// https://www.sqlite.org/pragma.html#pragma_optimize
let _ = sqlx::query("PRAGMA optimize").execute(&pool).await;
log::debug!(target: "frontier-sql", "Batch committed");
}

fn get_logs<Client, BE>(
Expand Down Expand Up @@ -686,7 +696,7 @@ where
}

/// Retrieve the block hash for the last indexed canon block.
pub async fn get_last_indexed_canon_block(&self) -> Result<H256, Error> {
pub async fn last_indexed_canon_block(&self) -> Result<H256, Error> {
let row = sqlx::query(
"SELECT b.substrate_block_hash FROM blocks AS b
INNER JOIN sync_status AS s
Expand Down Expand Up @@ -853,6 +863,21 @@ impl<Block: BlockT<Hash = H256>> fc_api::Backend<Block> for Backend<Block> {
fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend<Block> {
self
}

async fn best_hash(&self) -> Result<Block::Hash, String> {
	// Retrieve the substrate hash of the most recently indexed block,
	// i.e. the highest `block_number` whose `sync_status.status` is 1
	// (marked indexed — see the genesis-sync path that sets status = 1).
	// NOTE: the query does not filter on `blocks.is_canon`, so the
	// returned block may not be on the canonical chain.
	sqlx::query(
		"SELECT b.substrate_block_hash FROM blocks AS b
		INNER JOIN sync_status AS s
		ON s.substrate_block_hash = b.substrate_block_hash
		WHERE s.status = 1
		ORDER BY b.block_number DESC LIMIT 1",
	)
	.fetch_one(self.pool())
	.await
	// Column 0 holds the raw 32-byte substrate block hash.
	.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
	// `fetch_one` fails if nothing has been indexed yet; surface that as a String error.
	.map_err(|e| format!("Failed to fetch best hash: {}", e))
}
}

#[async_trait::async_trait]
Expand Down
15 changes: 8 additions & 7 deletions client/mapping-sync/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrat
pub fn sync_block<Block: BlockT, C, BE>(
client: &C,
overrides: Arc<OverrideHandle<Block>>,
backend: &fc_db::kv::Backend<Block>,
backend: &fc_db::kv::Backend<Block, C>,
header: &Block::Header,
) -> Result<(), String>
where
Expand Down Expand Up @@ -111,11 +111,11 @@ where

pub fn sync_genesis_block<Block: BlockT, C>(
client: &C,
backend: &fc_db::kv::Backend<Block>,
backend: &fc_db::kv::Backend<Block, C>,
header: &Block::Header,
) -> Result<(), String>
where
C: ProvideRuntimeApi<Block>,
C: HeaderBackend<Block> + ProvideRuntimeApi<Block>,
C::Api: EthereumRuntimeRPCApi<Block>,
{
let substrate_block_hash = header.hash();
Expand Down Expand Up @@ -159,7 +159,7 @@ pub fn sync_one_block<Block: BlockT, C, BE>(
client: &C,
substrate_backend: &BE,
overrides: Arc<OverrideHandle<Block>>,
frontier_backend: &fc_db::kv::Backend<Block>,
frontier_backend: &fc_db::kv::Backend<Block, C>,
sync_from: <Block::Header as HeaderT>::Number,
strategy: SyncStrategy,
sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
Expand Down Expand Up @@ -248,7 +248,7 @@ pub fn sync_blocks<Block: BlockT, C, BE>(
client: &C,
substrate_backend: &BE,
overrides: Arc<OverrideHandle<Block>>,
frontier_backend: &fc_db::kv::Backend<Block>,
frontier_backend: &fc_db::kv::Backend<Block, C>,
limit: usize,
sync_from: <Block::Header as HeaderT>::Number,
strategy: SyncStrategy,
Expand Down Expand Up @@ -282,13 +282,14 @@ where
Ok(synced_any)
}

pub fn fetch_header<Block: BlockT, BE>(
pub fn fetch_header<Block: BlockT, C, BE>(
substrate_backend: &BE,
frontier_backend: &fc_db::kv::Backend<Block>,
frontier_backend: &fc_db::kv::Backend<Block, C>,
checking_tip: Block::Hash,
sync_from: <Block::Header as HeaderT>::Number,
) -> Result<Option<Block::Header>, String>
where
C: HeaderBackend<Block>,
BE: HeaderBackend<Block>,
{
if frontier_backend.mapping().is_synced(&checking_tip)? {
Expand Down
Loading

0 comments on commit eb643a2

Please sign in to comment.