Skip to content

Commit

Permalink
Rename SignerTickerImporter to SignerImporter
Browse files Browse the repository at this point in the history
This is more cardano agnostic and say better what it realy does (it does
not just import the ticker but also create a signer in the db if it does
not exist).
  • Loading branch information
Alenar committed Oct 4, 2023
1 parent 32bfc9f commit 183d97f
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 65 deletions.
11 changes: 5 additions & 6 deletions mithril-aggregator/src/commands/serve_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,10 @@ impl ServeCommand {
Ok(())
});

// Create a [SignerTickersImporter] only if the `cexplorer_pools_url` is provided in the
// config.
// Create a SignersImporter only if the `cexplorer_pools_url` is provided in the config.
if let Some(cexplorer_pools_url) = config.cexplorer_pools_url {
match dependencies_builder
.create_signer_ticker_importer(&cexplorer_pools_url)
.create_signer_importer(&cexplorer_pools_url)
.await
{
Ok(service) => {
Expand All @@ -148,16 +147,16 @@ impl ServeCommand {
tokio::time::sleep(Duration::from_secs(5)).await;
service
.run_forever(Duration::from_secs(
// Signer Ticker Interval are in minutes
config.signer_ticker_run_interval * 60,
// Import interval are in minutes
config.signer_importer_run_interval * 60,
))
.await;
Ok(())
});
}
Err(error) => {
warn!(
"Failed to build the `SignerTickersImporter` fetching url `{}`. Error: {:?}",
"Failed to build the `SignersImporter`:\n url to import `{}`\n Error: {:?}",
cexplorer_pools_url, error
);
}
Expand Down
19 changes: 9 additions & 10 deletions mithril-aggregator/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,11 @@ pub struct Configuration {
/// is set to [zstandard][CompressionAlgorithm::Zstandard].
pub zstandard_parameters: Option<ZstandardCompressionParameters>,

/// Url to CExplorer list of pools that will be pulled to import the pools as signer in
/// the database (including their pool ticker).
/// Url to CExplorer list of pools to import as signer in the database.
pub cexplorer_pools_url: Option<String>,

/// Time interval at which the [Self::cexplorer_pools_url] will be fetch (in minutes).
pub signer_ticker_run_interval: u64,
/// Time interval at which the signers in [Self::cexplorer_pools_url] will be imported (in minutes).
pub signer_importer_run_interval: u64,
}

/// Uploader needed to copy the snapshot once computed.
Expand Down Expand Up @@ -196,7 +195,7 @@ impl Configuration {
snapshot_compression_algorithm: CompressionAlgorithm::Zstandard,
zstandard_parameters: Some(ZstandardCompressionParameters::default()),
cexplorer_pools_url: None,
signer_ticker_run_interval: 1,
signer_importer_run_interval: 1,
}
}

Expand Down Expand Up @@ -272,8 +271,8 @@ pub struct DefaultConfiguration {
/// Use CDN domain to construct snapshot urls default setting (if snapshot_uploader_type is Gcp)
pub snapshot_use_cdn_domain: String,

/// Signer ticker run interval default setting
pub signer_ticker_run_interval: u64,
/// Signer importer run interval default setting
pub signer_importer_run_interval: u64,
}

impl Default for DefaultConfiguration {
Expand All @@ -291,7 +290,7 @@ impl Default for DefaultConfiguration {
disable_digests_cache: "false".to_string(),
snapshot_compression_algorithm: "zstandard".to_string(),
snapshot_use_cdn_domain: "false".to_string(),
signer_ticker_run_interval: 720,
signer_importer_run_interval: 720,
}
}
}
Expand Down Expand Up @@ -384,10 +383,10 @@ impl Source for DefaultConfiguration {
),
);
result.insert(
"signer_ticker_run_interval".to_string(),
"signer_importer_run_interval".to_string(),
Value::new(
Some(&namespace),
ValueKind::from(myself.signer_ticker_run_interval),
ValueKind::from(myself.signer_importer_run_interval),
),
);

Expand Down
24 changes: 12 additions & 12 deletions mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ use crate::{
StakeDistributionService, TickerService,
},
tools::{
CExplorerSignerTickerRetriever, GcpFileUploader, GenesisToolsDependency,
SignerTickersImporter, SignerTickersPersister,
CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignerImporterPersister,
SignersImporter,
},
AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore,
CompressedArchiveSnapshotter, Configuration, DependencyContainer, DumbSnapshotUploader,
Expand Down Expand Up @@ -872,8 +872,10 @@ impl DependenciesBuilder {
Ok(self.signer_store.as_ref().cloned().unwrap())
}

/// [SignerTickersPersister] service
pub async fn get_signer_ticker_persister(&mut self) -> Result<Arc<dyn SignerTickersPersister>> {
/// [SignerImporterPersister] service
pub async fn get_signer_ticker_persister(
&mut self,
) -> Result<Arc<dyn SignerImporterPersister>> {
if self.signer_store.is_none() {
self.signer_store = Some(self.build_signer_store().await?);
}
Expand Down Expand Up @@ -1103,18 +1105,16 @@ impl DependenciesBuilder {
Ok(dependencies)
}

/// Create a [SignerTickersImporter] instance.
pub async fn create_signer_ticker_importer(
/// Create a [SignersImporter] instance.
pub async fn create_signer_importer(
&mut self,
cexplorer_pools_url: &str,
) -> Result<SignerTickersImporter> {
let retriever = CExplorerSignerTickerRetriever::new(
cexplorer_pools_url,
Some(Duration::from_secs(30)),
)?;
) -> Result<SignersImporter> {
let retriever =
CExplorerSignerRetriever::new(cexplorer_pools_url, Some(Duration::from_secs(30)))?;
let persister = self.get_signer_ticker_persister().await?;

Ok(SignerTickersImporter::new(Arc::new(retriever), persister))
Ok(SignersImporter::new(Arc::new(retriever), persister))
}

/// Create [TickerService] instance.
Expand Down
3 changes: 3 additions & 0 deletions mithril-aggregator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ pub use store::{
CertificatePendingStore, ProtocolParametersStore, ProtocolParametersStorer,
VerificationKeyStore, VerificationKeyStorer,
};
pub use tools::{
CExplorerSignerRetriever, SignerImporterPersister, SignerImporterRetriever, SignersImporter,
};

#[cfg(test)]
pub use dependency_injection::tests::initialize_dependencies;
7 changes: 3 additions & 4 deletions mithril-aggregator/src/tools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ mod digest_helpers;
mod era;
mod genesis;
mod remote_file_uploader;
mod signer_tickers_importer;
mod signer_importer;

pub use certificates_hash_migrator::CertificatesHashMigrator;
pub use digest_helpers::extract_digest_from_path;
pub use era::EraTools;
pub use genesis::{GenesisTools, GenesisToolsDependency};
pub use remote_file_uploader::{GcpFileUploader, RemoteFileUploader};
pub use signer_tickers_importer::{
CExplorerSignerTickerRetriever, SignerTickersImporter, SignerTickersPersister,
SignerTickersRetriever,
pub use signer_importer::{
CExplorerSignerRetriever, SignerImporterPersister, SignerImporterRetriever, SignersImporter,
};

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ use slog_scope::{info, warn};

pub type PoolTicker = String;

/// Tool that can import a list of Signer including their Pool Tickers
pub struct SignerTickersImporter {
retriever: Arc<dyn SignerTickersRetriever>,
persister: Arc<dyn SignerTickersPersister>,
/// Tool that can import a list of Signer
pub struct SignersImporter {
retriever: Arc<dyn SignerImporterRetriever>,
persister: Arc<dyn SignerImporterPersister>,
}

impl SignerTickersImporter {
/// [SignerTickersImporter] factory
impl SignersImporter {
/// [SignersImporter] factory
pub fn new(
retriever: Arc<dyn SignerTickersRetriever>,
persister: Arc<dyn SignerTickersPersister>,
retriever: Arc<dyn SignerImporterRetriever>,
persister: Arc<dyn SignerImporterPersister>,
) -> Self {
Self {
retriever,
Expand All @@ -36,7 +36,7 @@ impl SignerTickersImporter {

/// Import and persist the signers
pub async fn run(&self) -> StdResult<()> {
info!("🔧 Signer Ticker Importer: starting");
info!("🔧 Signer Importer: starting");
let items = self
.retriever
.retrieve()
Expand All @@ -55,35 +55,37 @@ impl SignerTickersImporter {
loop {
interval.tick().await;
if let Err(error) = self.run().await {
warn!("Signer ticker retriever failed: Error: «{:?}».", error);
warn!("Signer retriever failed: Error: «{:?}».", error);
}
info!(
"🔧 Signer Ticker Importer: Cycle finished, Sleeping for {} min",
"🔧 Signer Importer: Cycle finished, Sleeping for {} min",
run_interval.as_secs() / 60
);
}
}
}

/// Trait that define how a [SignersImporter] retrieve the signers to import.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait SignerTickersRetriever: Sync + Send {
pub trait SignerImporterRetriever: Sync + Send {
/// Retrieve the signers list.
async fn retrieve(&self) -> StdResult<HashMap<PartyId, Option<PoolTicker>>>;
}

/// Trait that define how a [SignersImporter] persist the retrieved signers.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait SignerTickersPersister: Sync + Send {
pub trait SignerImporterPersister: Sync + Send {
/// Persist the given list of signers.
async fn persist(&self, signers: HashMap<PartyId, Option<PoolTicker>>) -> StdResult<()>;
}

#[async_trait]
impl SignerTickersPersister for SignerStore {
impl SignerImporterPersister for SignerStore {
async fn persist(&self, signers: HashMap<PartyId, Option<PoolTicker>>) -> StdResult<()> {
info!(
"🔧 Signer Ticker Importer: persisting retrieved data in the database";
"🔧 Signer Importer: persisting retrieved data in the database";
"number_of_signer_to_insert" => signers.len()
);
self.import_many_signers(signers).await?;
Expand All @@ -92,15 +94,15 @@ impl SignerTickersPersister for SignerStore {
}
}

/// A [SignerTickersRetriever] fetching signers data from CExplorer.
pub struct CExplorerSignerTickerRetriever {
/// A [SignerImporterRetriever] fetching signers data from CExplorer.
pub struct CExplorerSignerRetriever {
/// Url from which a SPO list using the CExplorer format will be fetch.
source_url: Url,
client: reqwest::Client,
}

impl CExplorerSignerTickerRetriever {
/// Create a new [CExplorerSignerTickerRetriever] that will fetch data from the given url.
impl CExplorerSignerRetriever {
/// Create a new [CExplorerSignerRetriever] that will fetch data from the given url.
pub(crate) fn new<T: IntoUrl>(source_url: T, timeout: Option<Duration>) -> StdResult<Self> {
let source_url = source_url
.into_url()
Expand All @@ -118,10 +120,10 @@ impl CExplorerSignerTickerRetriever {
}

#[async_trait]
impl SignerTickersRetriever for CExplorerSignerTickerRetriever {
impl SignerImporterRetriever for CExplorerSignerRetriever {
async fn retrieve(&self) -> StdResult<HashMap<PartyId, Option<PoolTicker>>> {
info!(
"🔧 Signer Ticker Importer: retrieving data from source";
"🔧 Signer Importer: retrieving data from source";
"source_url" => &self.source_url.as_str()
);
let response = self
Expand Down Expand Up @@ -167,7 +169,7 @@ impl SPOItem {
}

/// Consume this item to convert it to a result ready to be yield by a
/// [SignerTickersRetriever::retrieve] implementation.
/// [SignerImporterRetriever::retrieve] implementation.
fn extract(self) -> (PartyId, Option<PoolTicker>) {
let is_name_empty = self.is_name_empty();
let (pool_id, name) = (self.pool_id, self.name);
Expand Down Expand Up @@ -282,7 +284,7 @@ mod tests {
}));

let retriever =
CExplorerSignerTickerRetriever::new(format!("{}/list", server.url()), None).unwrap();
CExplorerSignerRetriever::new(format!("{}/list", server.url()), None).unwrap();
let result = retriever
.retrieve()
.await
Expand All @@ -305,7 +307,7 @@ mod tests {
);

let retriever =
CExplorerSignerTickerRetriever::new(format!("{}/list", server.url()), None).unwrap();
CExplorerSignerRetriever::new(format!("{}/list", server.url()), None).unwrap();
retriever
.retrieve()
.await
Expand All @@ -317,7 +319,7 @@ mod tests {
let server = test_http_server(warp::path("list").map(|| r#"{ "data": [ {"pool_" ] }"#));

let retriever =
CExplorerSignerTickerRetriever::new(format!("{}/list", server.url()), None).unwrap();
CExplorerSignerRetriever::new(format!("{}/list", server.url()), None).unwrap();
retriever
.retrieve()
.await
Expand All @@ -331,7 +333,7 @@ mod tests {
Ok::<&str, Infallible>(r#"{"data":[]}"#)
}));

let retriever = CExplorerSignerTickerRetriever::new(
let retriever = CExplorerSignerRetriever::new(
format!("{}/list", server.url()),
Some(Duration::from_millis(10)),
)
Expand All @@ -345,15 +347,15 @@ mod tests {
#[tokio::test]
async fn persist_list_of_two_signers_one_with_ticker_the_other_without() {
let connection = Arc::new(Mutex::new(connection_without_foreign_key_support()));
let mut retriever = MockSignerTickersRetriever::new();
let mut retriever = MockSignerImporterRetriever::new();
retriever.expect_retrieve().returning(|| {
Ok(HashMap::from([
("pool1".to_string(), Some("[Pool name test]".to_string())),
("pool2".to_string(), None),
]))
});

let importer = SignerTickersImporter::new(
let importer = SignersImporter::new(
Arc::new(retriever),
Arc::new(SignerStore::new(connection.clone())),
);
Expand Down Expand Up @@ -386,7 +388,7 @@ mod tests {
)
.await
.unwrap();
let mut retriever = MockSignerTickersRetriever::new();
let mut retriever = MockSignerImporterRetriever::new();
retriever.expect_retrieve().returning(|| {
Ok(HashMap::from([
("pool1".to_string(), Some("[Updated Pool name]".to_string())),
Expand All @@ -397,7 +399,7 @@ mod tests {
]))
});

let importer = SignerTickersImporter::new(
let importer = SignersImporter::new(
Arc::new(retriever),
Arc::new(SignerStore::new(connection.clone())),
);
Expand Down Expand Up @@ -446,10 +448,9 @@ mod tests {
}"#
}));

let importer = SignerTickersImporter::new(
let importer = SignersImporter::new(
Arc::new(
CExplorerSignerTickerRetriever::new(format!("{}/list", server.url()), None)
.unwrap(),
CExplorerSignerRetriever::new(format!("{}/list", server.url()), None).unwrap(),
),
Arc::new(SignerStore::new(connection.clone())),
);
Expand Down

0 comments on commit 183d97f

Please sign in to comment.