From f227c1fe2e1a1e2e09fe86c765520c289e62bda0 Mon Sep 17 00:00:00 2001 From: sifnoc Date: Wed, 6 Dec 2023 17:37:04 +0000 Subject: [PATCH] chore: added description as in-line comments --- src/executor/mod.rs | 10 ++++ src/executor/spawner.rs | 105 ++++++++++++++++++++++------------------ src/orchestrator/mod.rs | 27 ++++++++--- 3 files changed, 88 insertions(+), 54 deletions(-) diff --git a/src/executor/mod.rs b/src/executor/mod.rs index cd986cd..e0e7870 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -17,6 +17,16 @@ use tokio::time::{sleep, Duration}; use crate::json_mst::{JsonEntry, JsonMerkleSumTree}; use summa_backend::merkle_sum_tree::MerkleSumTree; +/// Executor role and functionality. +/// Acts as an intermediary between the Orchestrator and Workers, facilitating the data processing workflow. +/// Each Executor operates in a one-to-one relationship with a Worker, processing entry data into `mini-tree`. +/// +/// Key aspects of the Executor's role include: +/// - **Spawning and Connection**: Executors connect with Workers to execute tasks, enhancing the system's scalability. +/// - **Data Handling and Task Distribution**: Executors manage and distribute data entries, ensuring smooth workflow +/// - **Communication Bridge**: They facilitate communication within the data pipeline, relaying 'mini-tree' from Workers to the Orchestrator. +/// +/// Executors are dynamically spawned and connected to Workers for task execution. #[derive(Clone)] pub struct Executor { client: Client, diff --git a/src/executor/spawner.rs b/src/executor/spawner.rs index fb5a38a..64d5939 100644 --- a/src/executor/spawner.rs +++ b/src/executor/spawner.rs @@ -2,58 +2,69 @@ use std::{future::Future, pin::Pin}; use crate::executor::Executor; +/// ExecutorSpawner responsibility and types. +/// +/// Responsible for initializing and terminating Executors, serving as a management point for creating Executor instances and Workers. +/// +/// Types include: +/// - MockSpawner: For testing, runs `mini-tree-server` locally. +/// - LocalSpawner: Initializes Executors and Workers in local Docker environments. +/// - CloudSpawner: Optimized for cloud resources and Docker Swarm, manages containers as services for scalability. +/// +/// Note: ExecutorSpawner is a trait with key methods `spawn_executor` and `terminate_executor`. +/// pub trait ExecutorSpawner { - // Spawns an executor asynchronously. - // - // This method initializes an Executor and returns a Future that resolves to the Executor. - // - // To achieve this asynchronously (outside of an async trait function), we use a one-time channel ('oneshot`) to deliver the variables to the Future. - // - // Internally, it performs the following codelines: - // - // 1. Uses a 'oneshot'channel for sending the variables from the spawned async task. - // ``` - // let (tx, rx) = oneshot::channel(); - // ``` - // 2. Clones necessary variables (url, name and so on) to move them into the async block. - // ``` - // let url = self.url.clone(); - // ``` - // 3. Spawns an asynchronous task (`tokio::spawn`) that asynchronously creates a worker and sends back its information. - // ``` - // tokio::spawn(async move { - // if let Ok(worker_info) = - // Spawner::create_worker(url).await - // { - // let _ = tx.send(worker_info); - // } - // }); - // Note that, the "create_worker" is typically declared in the "Spawner" struct that has "ExecutorSpawner"trait. - // 4. Returns a Future that, upon completion, provides an Executor connected to the newly spawned worker. - // ``` - // Box::pin(async move { - // let url = rx.await.expect("Failed to receive worker URL"); - // Executor::new(url, None); - // }); - // ``` + /// Spawns an executor asynchronously. // + /// This method initializes an Executor and returns a Future that resolves to the Executor. + /// + /// To achieve this asynchronously (outside of an async trait function), we use a one-time channel ('oneshot`) to deliver the variables to the Future. + /// + /// Internally, it performs the following codelines: + /// + /// 1. Uses a 'oneshot'channel for sending the variables from the spawned async task. + /// ```ignore + /// let (tx, rx) = oneshot::channel(); + /// ``` + /// 2. Clones necessary variables (url, name and so on) to move them into the async block. + /// ```ignore + /// let url = self.url.clone(); + /// ``` + /// 3. Spawns an asynchronous task (`tokio::spawn`) that asynchronously creates a worker and sends back its information. + /// ```ignore + /// tokio::spawn(async move { + /// if let Ok(worker_info) = + /// Spawner::create_worker(url).await + /// { + /// let _ = tx.send(worker_info); + /// } + /// }); + /// Note that, the "create_worker" is typically declared in the "Spawner" struct that has "ExecutorSpawner"trait. + /// 4. Returns a Future that, upon completion, provides an Executor connected to the newly spawned worker. + /// ```ignore + /// Box::pin(async move { + /// let url = rx.await.expect("Failed to receive worker URL"); + /// Executor::new(url, None); + /// }); + /// ``` + /// + /// // Returns: - // - `Pin + Send>>`: A Future that, when awaited, yields an Executor instance and spawns a worker. - // + // - "Pin + Send>>": A Future that, when awaited, yields an Executor instance and spawns a worker. fn spawn_executor(&self) -> Pin + Send>>; - // Terminates all spawned executors (and/or workers) asynchronously. - // - // This method is responsible for gracefully shutting down all active executors (and/or workers) by calling - // To do this, the "Spawner"may needs some fields for storing some accessing point to the workers, which are spawned with the executors. - // For deliver variables to Future results, use a channel like the pattern at 'spawn_executor'. - // - // The termination process typically involves: - // - Iterating through all active Executors and workers. - // - Invoking kind of 'shutdown'on each executors and workers to initiate their shutdown. - // - Awaiting the completion of all shutdown operations. - // + /// Terminates all spawned executors (and/or workers) asynchronously. + /// + /// This method is responsible for gracefully shutting down all active executors (and/or workers) by calling + /// To do this, the "Spawner"may needs some fields for storing some accessing point to the workers, which are spawned with the executors. + /// For deliver variables to Future results, use a channel like the pattern at 'spawn_executor'. + /// + /// The termination process typically involves: + /// - Iterating through all active Executors and workers. + /// - Invoking kind of 'shutdown'on each executors and workers to initiate their shutdown. + /// - Awaiting the completion of all shutdown operations. + /// // Returns: - // - `Pin + Send>>`: A Future that, when awaited, indicates that all executors (and/or workers) have been terminated. + // - "Pin + Send>>": A Future that, when awaited, indicates that all executors (and/or workers) have been terminated. fn terminate_executors(&self) -> Pin + Send>>; } diff --git a/src/orchestrator/mod.rs b/src/orchestrator/mod.rs index 5720e84..e10daa2 100644 --- a/src/orchestrator/mod.rs +++ b/src/orchestrator/mod.rs @@ -10,6 +10,14 @@ use crate::aggregation_merkle_sum_tree::AggregationMerkleSumTree; use crate::executor::ExecutorSpawner; use crate::json_mst::JsonEntry; +/// The Orchestrator in Summa Aggregation +/// +/// It serves as the central management component, coordinating data processing activities +/// between Executors and Workers, thereby improving the efficiency of building the Merkle sum tree. +/// +/// Functions include dynamically spawning Executors, managing task distribution, +/// handling errors and pipeline control, and building the `AggregationMerkleSumTree` +/// by aggregating mini-trees constructed by the Workers. pub struct Orchestrator { executor_spawner: Box, entry_csvs: Vec, @@ -23,12 +31,13 @@ impl Orchestrator Orchestrator