feat : added snos worker implementation and unit tests #16

Merged · 6 commits · Jun 17, 2024
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
@@ -26,7 +26,7 @@ jobs:

- name: Run llvm-cov
run: |
-          cargo llvm-cov nextest --release --lcov --output-path lcov.info
+          cargo llvm-cov nextest --release --lcov --output-path lcov.info --test-threads=1

- name: Upload coverage to codecov.io
uses: codecov/codecov-action@v3
10 changes: 9 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -42,5 +42,6 @@ uuid = { version = "1.7.0" }
num-bigint = { version = "0.4.4" }
httpmock = { version = "0.7.0" }
utils = { path = "crates/utils" }
arc-swap = { version = "1.7.1" }
num-traits = "0.2"
lazy_static = "1.4.0"
3 changes: 2 additions & 1 deletion crates/orchestrator/Cargo.toml
@@ -12,7 +12,8 @@ name = "orchestrator"
path = "src/main.rs"

[dependencies]

arc-swap = { workspace = true }
async-std = "1.12.0"
async-trait = { workspace = true }
axum = { workspace = true, features = ["macros"] }
axum-macros = { workspace = true }
25 changes: 22 additions & 3 deletions crates/orchestrator/src/config.rs
@@ -4,6 +4,7 @@ use crate::database::{Database, DatabaseConfig};
use crate::queue::sqs::SqsQueue;
use crate::queue::QueueProvider;
use crate::utils::env_utils::get_env_var_or_panic;
use arc_swap::{ArcSwap, Guard};
use da_client_interface::DaClient;
use da_client_interface::DaConfig;
use dotenvy::dotenv;
@@ -79,11 +80,29 @@ impl Config {

/// The app config. It can be accessed from anywhere inside the service.
/// It's initialized only once.
-pub static CONFIG: OnceCell<Config> = OnceCell::const_new();
+/// We are using `ArcSwap` as it allows us to replace the stored `Config` with
+/// a new one, which is required when running test cases. This approach was
+/// inspired by https://github.com/matklad/once_cell/issues/127
+pub static CONFIG: OnceCell<ArcSwap<Config>> = OnceCell::const_new();

/// Returns the app config. Initializes if not already done.
-pub async fn config() -> &'static Config {
-    CONFIG.get_or_init(init_config).await
+pub async fn config() -> Guard<Arc<Config>> {
+    let cfg = CONFIG.get_or_init(|| async { ArcSwap::from_pointee(init_config().await) }).await;
+    cfg.load()
}

/// OnceCell only allows us to initialize the config once, and that's how it should be in production.
/// However, when running tests, we often want to reinitialize because we want to clear the DB and
/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already
/// stored config inside `ArcSwap` with the new configuration and pool settings.
#[cfg(test)]
pub async fn config_force_init(config: Config) {

Contributor: let's only add this code if the test flag is enabled

Contributor: I wonder if it could work without the global config? Apart from the need to carry it through all methods, are there any reasons to use static initialization?

Contributor: Yeah, that's the only advantage. Otherwise we need to pass it everywhere, although that does make testing easier. Do you think we should do the latter?

Contributor: To be fair, yes :) that would be much easier to test, and you'd be able to pass only the necessary parts of the config instead of the entire struct.

match CONFIG.get() {
Some(arc) => arc.store(Arc::new(config)),
None => {
CONFIG.get_or_init(|| async { ArcSwap::from_pointee(config) }).await;
}
}
}

/// Builds the DA client based on the environment variable DA_LAYER
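
Note on the pattern above: here is a minimal, self-contained sketch of the `OnceCell<ArcSwap<_>>` approach, with a placeholder `Config` (the field and URLs below are invented for illustration; they are not the orchestrator's real ones):

```rust
use std::sync::Arc;

use arc_swap::{ArcSwap, Guard};
use tokio::sync::OnceCell;

// Placeholder config; the real one holds the DB, queue and DA clients.
struct Config {
    rpc_url: String,
}

static CONFIG: OnceCell<ArcSwap<Config>> = OnceCell::const_new();

// First caller initializes the cell; every caller gets a cheap read guard.
async fn config() -> Guard<Arc<Config>> {
    let cfg = CONFIG
        .get_or_init(|| async { ArcSwap::from_pointee(Config { rpc_url: "http://localhost:9944".into() }) })
        .await;
    cfg.load()
}

// Tests can atomically swap in a fresh Config without recreating the cell,
// which a plain `OnceCell<Config>` would not allow.
async fn force_init(new_config: Config) {
    match CONFIG.get() {
        Some(cell) => cell.store(Arc::new(new_config)),
        None => {
            CONFIG.get_or_init(|| async { ArcSwap::from_pointee(new_config) }).await;
        }
    }
}

#[tokio::main]
async fn main() {
    println!("rpc url: {}", config().await.rpc_url);
    force_init(Config { rpc_url: "http://localhost:9945".into() }).await;
    println!("rpc url: {}", config().await.rpc_url);
}
```
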
1 change: 1 addition & 0 deletions crates/orchestrator/src/database/mod.rs
@@ -33,6 +33,7 @@ pub trait Database: Send + Sync {
) -> Result<()>;

async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;
}

pub trait DatabaseConfig {
14 changes: 13 additions & 1 deletion crates/orchestrator/src/database/mongodb/mod.rs
@@ -5,7 +5,7 @@ use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::Document;
-use mongodb::options::UpdateOptions;
+use mongodb::options::{FindOneOptions, UpdateOptions};
use mongodb::{
bson::doc,
options::{ClientOptions, ServerApi, ServerApiVersion},
@@ -115,4 +115,16 @@ impl Database for MongoDb {
self.update_job_optimistically(job, update).await?;
Ok(())
}

async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>> {
let filter = doc! {
"job_type": mongodb::bson::to_bson(&job_type)?,
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();

unstark (Contributor, Jun 13, 2024): AFAICS internal ID is a string without additional restrictions (https://github.com/unstark/madara-orchestrator/blob/3fe37331183cf3291aaef9d1e2664e54e86f0648/crates/orchestrator/src/jobs/types.rs#L104), so it's not necessarily a unique incrementing integer. We might need an additional field, e.g. created_at in microseconds, to achieve what you want.

Contributor: So internal_id would actually be the block number, but as a String. The reason it was made a string is that jobs use this column to uniquely identify a job based on the type, like "I want the SNOS run of block 6". Jobs can have any sort of internal id (number, UUID, etc.), so a string param was used here as the generic option. I'm not sure this is the best approach, but here -1 should work because it's a number represented as a string?

Contributor: Ok, never mind, this is incorrect. This function can be used by any job, so if some job has a different form of internal id, this will break. However, created_at might not be the best check either: if we create jobs in parallel somewhere, or after receiving something from a queue, there's no guarantee that the jobs would be created sequentially.

Contributor: For now, we can explicitly rename this function to get_latest_job_by_internal_id so the purpose of the function is clear. Later we can add another get_latest_by_created_at if some job is OK using that. wdyt?

Member (Author): function name changed

Contributor: That would work (renaming); as for created_at, even if there is a collision, the function would still do its job :)

Ok(self
.get_job_collection()
.find_one(filter, find_options)
.await
.expect("Failed to fetch latest job by given job type"))
}
}
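
A note on the thread above: a descending sort on a string `internal_id` is fragile because string fields compare lexicographically by default (MongoDB can be told otherwise via a collation with `numericOrdering: true`, though this PR does not use one). A minimal sketch of the pitfall, in plain Rust with no MongoDB involved; the values are hypothetical block numbers:

```rust
fn main() {
    // Block numbers stored as strings, as in JobItem.internal_id.
    let mut ids = vec!["9".to_string(), "10".to_string(), "100".to_string()];

    // Lexicographic order, which is what sorting a string field gives you:
    ids.sort();
    assert_eq!(ids, ["10", "100", "9"]); // "9" sorts after "100"

    // So with sort(internal_id: -1), block "9" would be returned as the
    // "latest" job even though block 100 is newer.

    // Numeric order, which is what "latest block" actually needs:
    ids.sort_by_key(|s| s.parse::<u64>().expect("not a number"));
    assert_eq!(ids, ["9", "10", "100"]);
}
```
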
7 changes: 4 additions & 3 deletions crates/orchestrator/src/jobs/mod.rs
@@ -61,7 +61,7 @@ pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMa
}

let job_handler = get_job_handler(&job_type);
-    let job_item = job_handler.create_job(config, internal_id, metadata).await?;
+    let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?;
config.database().create_job(job_item.clone()).await?;

add_job_to_process_queue(job_item.id).await?;
@@ -90,7 +90,7 @@ pub async fn process_job(id: Uuid) -> Result<()> {
config.database().update_job_status(&job, JobStatus::LockedForProcessing).await?;

let job_handler = get_job_handler(&job.job_type);
-    let external_id = job_handler.process_job(config, &job).await?;
+    let external_id = job_handler.process_job(config.as_ref(), &job).await?;

let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
config
@@ -122,7 +122,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
}

let job_handler = get_job_handler(&job.job_type);
-    let verification_status = job_handler.verify_job(config, &job).await?;
+    let verification_status = job_handler.verify_job(config.as_ref(), &job).await?;

match verification_status {
JobVerificationStatus::Verified => {
@@ -170,6 +170,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
match job_type {
JobType::DataSubmission => Box::new(da_job::DaJob),
JobType::SnosRun => Box::new(snos_job::SnosJob),
_ => unimplemented!("Job type not implemented yet."),
}
}
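
For context, the `Job` trait that `get_job_handler` boxes is not part of this diff. The following is a sketch inferred purely from the call sites above (`create_job`, `process_job`, `verify_job`); the actual signatures in the repository may differ:

```rust
use std::collections::HashMap;

use async_trait::async_trait;
use color_eyre::Result;

use crate::config::Config;
use crate::jobs::types::{JobItem, JobVerificationStatus};

// Inferred sketch, not the verbatim trait from the codebase.
#[async_trait]
pub trait Job: Send + Sync {
    /// Build the JobItem that will be persisted in the database.
    async fn create_job(
        &self,
        config: &Config,
        internal_id: String,
        metadata: HashMap<String, String>,
    ) -> Result<JobItem>;

    /// Do the work and return an external id that can be polled later.
    async fn process_job(&self, config: &Config, job: &JobItem) -> Result<String>;

    /// Ask the external system whether the job has finished.
    async fn verify_job(&self, config: &Config, job: &JobItem) -> Result<JobVerificationStatus>;
}
```
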
4 changes: 2 additions & 2 deletions crates/orchestrator/src/jobs/types.rs
@@ -7,7 +7,7 @@ use std::collections::HashMap;
use uuid::Uuid;

/// An external id.
-#[derive(Serialize, Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(untagged)]
pub enum ExternalId {
/// A string.
@@ -98,7 +98,7 @@ pub enum JobStatus {
VerificationFailed,
}

-#[derive(Serialize, Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct JobItem {
/// an uuid to identify a job
#[serde(with = "uuid_1_as_binary")]
4 changes: 2 additions & 2 deletions crates/orchestrator/src/main.rs
@@ -26,7 +26,7 @@ async fn main() {
// init consumer
init_consumers().await.expect("Failed to init consumers");

-    // spawn a thread for each worker
+    // spawn a thread for each workers
// changes in rollup mode - sovereign, validity, validiums etc.
// will likely involve changes in these workers as well
tokio::spawn(start_cron(Box::new(SnosWorker), 60));
@@ -40,7 +40,7 @@

async fn start_cron(worker: Box<dyn Worker>, interval: u64) {
loop {
-        worker.run_worker().await;
+        worker.run_worker().await.expect("Error in running the worker.");
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
}
}
4 changes: 2 additions & 2 deletions crates/orchestrator/src/queue/job_queue.rs
@@ -14,8 +14,8 @@ const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";

#[derive(Debug, Serialize, Deserialize)]
-struct JobQueueMessage {
-    id: Uuid,
+pub struct JobQueueMessage {
+    pub(crate) id: Uuid,
}

pub async fn add_job_to_process_queue(id: Uuid) -> Result<()> {
1 change: 1 addition & 0 deletions crates/orchestrator/src/tests/mod.rs
@@ -7,3 +7,4 @@ pub mod server;
pub mod queue;

pub mod common;
mod workers;
99 changes: 99 additions & 0 deletions crates/orchestrator/src/tests/workers/mod.rs
@@ -0,0 +1,99 @@
use crate::config::config_force_init;
use crate::database::MockDatabase;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::queue::MockQueueProvider;
use crate::tests::common::init_config;
use crate::workers::snos::SnosWorker;
use crate::workers::Worker;
use da_client_interface::MockDaClient;
use httpmock::MockServer;
use mockall::predicate::eq;
use rstest::rstest;
use serde_json::json;
use std::collections::HashMap;
use std::error::Error;
use uuid::Uuid;

#[rstest]
#[case(false)]
#[case(true)]
#[tokio::test]
async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {
let server = MockServer::start();
let da_client = MockDaClient::new();
let mut db = MockDatabase::new();
let mut queue = MockQueueProvider::new();
let start_job_index;
let block;

const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";

// Mocking db function expectations
if !db_val {
db.expect_get_latest_job_by_type_and_internal_id().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
start_job_index = 1;
block = 5;
} else {
let uuid_temp = Uuid::new_v4();

db.expect_get_latest_job_by_type_and_internal_id()
.with(eq(JobType::SnosRun))
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
block = 6;
start_job_index = 2;
}

for i in start_job_index..block + 1 {
// Expectations for the existing-job lookups
db.expect_get_job_by_internal_id_and_type()
.times(1)
.with(eq(i.clone().to_string()), eq(JobType::SnosRun))
.returning(|_, _| Ok(None));

let uuid = Uuid::new_v4();

// Expectations for the create_job calls
db.expect_create_job()
.times(1)
.withf(move |item| item.internal_id == i.clone().to_string())
.returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid)));
}

// Queue function call simulations
queue
.expect_send_message_to_queue()
.returning(|_, _, _| Ok(()))
.withf(|queue, _payload, _delay| queue == JOB_PROCESSING_QUEUE);

// mocked madara block number returned by the RPC
let rpc_response_block_number = block;
let response = json!({ "id": 1,"jsonrpc":"2.0","result": rpc_response_block_number });
let config =
init_config(Some(format!("http://localhost:{}", server.port())), Some(db), Some(queue), Some(da_client)).await;
config_force_init(config).await;

// mocking block call
let rpc_block_call_mock = server.mock(|when, then| {
when.path("/").body_contains("starknet_blockNumber");
then.status(200).body(serde_json::to_vec(&response).unwrap());
});

let snos_worker = SnosWorker {};
snos_worker.run_worker().await?;

rpc_block_call_mock.assert();

Ok(())
}

fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem {
JobItem {
id: uuid,
internal_id: id.clone(),
job_type: JobType::SnosRun,
status: JobStatus::Created,
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
}
}
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/mod.rs
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use std::error::Error;

pub mod proof_registration;
pub mod proving;
@@ -7,5 +8,5 @@ pub mod update_state;

#[async_trait]
pub trait Worker: Send + Sync {
-    async fn run_worker(&self);
+    async fn run_worker(&self) -> Result<(), Box<dyn Error>>;
}
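
With `run_worker` now fallible, a worker implementation and the cron loop compose as in the sketch below. The error-logging loop is an alternative to the `expect` in this PR's `main.rs` (where a worker error panics the spawned task), not what the PR itself does:

```rust
use async_trait::async_trait;
use std::error::Error;

// A do-nothing worker, only to illustrate the new signature.
pub struct NoopWorker;

#[async_trait]
impl Worker for NoopWorker {
    async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
        Ok(())
    }
}

// Variant of start_cron that logs failures and keeps ticking
// instead of panicking on the first error.
pub async fn start_cron_logging(worker: Box<dyn Worker>, interval_secs: u64) {
    loop {
        if let Err(e) = worker.run_worker().await {
            eprintln!("worker run failed: {e}");
        }
        tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;
    }
}
```
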
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/proof_registration.rs
@@ -1,5 +1,6 @@
use crate::workers::Worker;
use async_trait::async_trait;
use std::error::Error;

pub struct ProofRegistrationWorker;

@@ -8,7 +9,7 @@ impl Worker for ProofRegistrationWorker {
/// 1. Fetch all blocks with a successful proving job run
/// 2. Group blocks that have the same proof
/// 3. For each group, create a proof registration job with from and to block in metadata
-    async fn run_worker(&self) {
+    async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
todo!()
}
}
3 changes: 2 additions & 1 deletion crates/orchestrator/src/workers/proving.rs
@@ -1,13 +1,14 @@
use crate::workers::Worker;
use async_trait::async_trait;
use std::error::Error;

pub struct ProvingWorker;

#[async_trait]
impl Worker for ProvingWorker {
/// 1. Fetch all successful SNOS job runs that don't have a proving job
/// 2. Create a proving job for each SNOS job run
-    async fn run_worker(&self) {
+    async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
todo!()
}
}