Skip to content

Commit

Permalink
feat(DA): simplify openDA startup (#1310)
Browse files Browse the repository at this point in the history
1. add test_internal_da_server_config_str

2. insert val from env for gcs

---------

Co-authored-by: popcnt-subodhi <subodhi@west>
  • Loading branch information
popcnt1 and popcnt-subodhi authored Jan 18, 2024
1 parent 99459cc commit 0997228
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 7 deletions.
73 changes: 69 additions & 4 deletions crates/rooch-config/src/da_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,27 @@ impl FromStr for InternalDAServerConfigType {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let v: Value = serde_json::from_str(s).map_err(|_| format!("Invalid JSON: {}", s))?;
let v: Value =
serde_json::from_str(s).map_err(|e| format!("Error parsing JSON: {}, {}", e, s))?;

if let Some(obj) = v.as_object() {
if let Some(celestia) = obj.get("celestia") {
let celestia_config: DAServerCelestiaConfig =
serde_json::from_value(celestia.clone())
.map_err(|_| format!("invalid celestia config: {}", celestia))?;
serde_json::from_value(celestia.clone()).map_err(|e| {
format!(
"invalid celestia config: {} error: {}, original: {}",
celestia, e, s
)
})?;
Ok(InternalDAServerConfigType::Celestia(celestia_config))
} else if let Some(openda) = obj.get("open-da") {
let openda_config: DAServerOpenDAConfig = serde_json::from_value(openda.clone())
.map_err(|_| format!("invalid open-da config: {}", openda))?;
.map_err(|e| {
format!(
"invalid open-da config: {}, error: {}, original: {}",
openda, e, s
)
})?;
Ok(InternalDAServerConfigType::OpenDA(openda_config))
} else {
Err(format!("Invalid value: {}", s))
Expand Down Expand Up @@ -431,4 +441,59 @@ mod tests {

assert!(output.is_err());
}

#[test]
fn test_internal_da_server_config_str() {
let celestia_config_str = r#"{"celestia": {"namespace": "test_namespace", "conn": "test_conn", "auth_token": "test_token", "max_segment_size": 2048}}"#;
let openda_config_str = r#"{"open-da": {"scheme": "gcs", "config": {"Param1": "value1", "param2": "Value2"}, "max_segment_size": 2048}}"#;
let invalid_config_str = r#"{"unknown": {...}}"#;

match InternalDAServerConfigType::from_str(celestia_config_str) {
Ok(InternalDAServerConfigType::Celestia(celestia_config)) => {
assert_eq!(
celestia_config,
DAServerCelestiaConfig {
namespace: Some("test_namespace".to_string()),
conn: Some("test_conn".to_string()),
auth_token: Some("test_token".to_string()),
max_segment_size: Some(2048),
}
);
}
Ok(_) => {
panic!("Expected Celestia Config");
}
Err(e) => {
panic!("Error parsing Celestia Config: {}", e)
}
}

let mut config: HashMap<String, String> = HashMap::new();
config.insert("Param1".to_string(), "value1".to_string());
config.insert("param2".to_string(), "Value2".to_string());

match InternalDAServerConfigType::from_str(openda_config_str) {
Ok(InternalDAServerConfigType::OpenDA(openda_config)) => {
assert_eq!(
openda_config,
DAServerOpenDAConfig {
scheme: OpenDAScheme::GCS,
config,
max_segment_size: Some(2048),
}
);
}
Ok(_) => {
panic!("Expected OpenDA Config");
}
Err(e) => {
panic!("Error parsing OpenDA Config: {}", e)
}
}

if let Err(_) = InternalDAServerConfigType::from_str(invalid_config_str) {
} else {
panic!("Expected Error for invalid config");
}
}
}
51 changes: 48 additions & 3 deletions crates/rooch-da/src/server/openda/actor/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use coerce::actor::context::ActorContext;
use coerce::actor::message::Handler;
use coerce::actor::Actor;
use opendal::{Operator, Scheme};
use std::collections::HashMap;

use rooch_config::da_config::{DAServerOpenDAConfig, OpenDAScheme};

Expand All @@ -29,15 +30,47 @@ impl Actor for DAServerOpenDAActor {}
// TODO add FEC get for SDC protection (wrong response attacks)
impl DAServerOpenDAActor {
pub async fn new(cfg: &DAServerOpenDAConfig) -> Result<DAServerOpenDAActor> {
let config = cfg.clone();
let mut config = cfg.clone();

let op: Operator = match config.scheme {
OpenDAScheme::S3 => Operator::via_map(Scheme::S3, config.config)?,
OpenDAScheme::GCS => Operator::via_map(Scheme::Gcs, config.config)?,
OpenDAScheme::GCS => {
// If certain keys don't exist in the map, set them from environment
if !config.config.contains_key("bucket") {
if let Ok(bucket) = std::env::var("OPENDA_GCS_BUCKET") {
config.config.insert("bucket".to_string(), bucket);
}
}
if !config.config.contains_key("root") {
if let Ok(root) = std::env::var("OPENDA_GCS_ROOT") {
config.config.insert("root".to_string(), root);
}
}
if !config.config.contains_key("credential") {
if let Ok(credential) = std::env::var("OPENDA_GCS_CREDENTIAL") {
config.config.insert("credential".to_string(), credential);
}
}
insert_default_from_env_or_const(
&mut config.config,
"predefined_acl",
"OPENDA_GCS_PREDEFINED_ACL",
"publicRead",
);
insert_default_from_env_or_const(
&mut config.config,
"default_storage_class",
"OPENDA_GCS_DEFAULT_STORAGE_CLASS",
"STANDARD",
);

// After setting defaults, proceed with creating Operator
Operator::via_map(Scheme::Gcs, config.config)?
}
};

Ok(Self {
max_segment_size: cfg.max_segment_size.unwrap() as usize,
max_segment_size: cfg.max_segment_size.unwrap_or(4 * 1024 * 1024) as usize,
operator: op,
})
}
Expand Down Expand Up @@ -78,6 +111,18 @@ impl DAServerOpenDAActor {
}
}

fn insert_default_from_env_or_const(
config: &mut HashMap<String, String>,
key: &str,
env_var: &str,
const_default: &str,
) {
if !config.contains_key(key) {
let value = std::env::var(env_var).unwrap_or(const_default.to_string());
config.insert(key.to_string(), value);
}
}

#[async_trait]
impl Handler<PutBatchMessage> for DAServerOpenDAActor {
async fn handle(
Expand Down

0 comments on commit 0997228

Please sign in to comment.