Skip to content

Commit

Permalink
feat: allow tilde paths in kafka properties (ssl.ca.location for in…
Browse files Browse the repository at this point in the history
…stance), update dependencies
  • Loading branch information
Mcdostone committed Dec 15, 2024
1 parent e9b2873 commit d3d1378
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 4 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ rdkafka = { version = "0.37.0", features = ["cmake-build"] }
async-trait = "0.1.83"
extism = "1.9.1"
url = { version = "2.5.4", features = ["serde"] }
resolve-path = "0.1.0"


[features]
Expand All @@ -31,4 +32,4 @@ ssl-vendored = [
]
gssapi-vendored = [
"rdkafka/gssapi-vendored"
]
]
31 changes: 29 additions & 2 deletions crates/app/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use indexmap::IndexMap;
use itertools::Itertools;
use lib::Error;
use resolve_path::PathResolveExt;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
Expand All @@ -14,7 +15,17 @@ use url::Url;

use crate::APPLICATION_NAME;

static EXAMPLE_PROMPTS: &[&str] = &[
/// List of kafka properties that are a file location.
const KAFKA_PROPERTIES_WITH_LOCATIONS: [&str; 6] = [
"ssl.ca.location",
"ssl.certificate.location",
"ssl.key.location",
"ssl.keystore.location",
"ssl.crl.location",
"ssl.engine.location",
];

const EXAMPLE_PROMPTS: &[&str] = &[
r#"timestamp between "2 hours ago" and "1 hour ago" limit 100 from beginning"#,
r#"offset > 100000 and value contains "music" limit 10"#,
r#"key == "ABC" and timestamp >= "2 days ago""#,
Expand Down Expand Up @@ -164,7 +175,6 @@ impl Config {
/// Returns the kafka properties for the given cluster.
pub fn kafka_config_of(&self, cluster: &str) -> Result<HashMap<String, String>, Error> {
let mut config = HashMap::new();

config.extend(self.default_kafka_config.clone());

if !self.clusters.contains_key(cluster) {
Expand All @@ -177,9 +187,26 @@ impl Config {

let env_config = self.clusters.get(cluster.trim()).unwrap();
config.extend(env_config.kafka.clone());
config = Self::normalize_path_locations(config);

Ok(config)
}

/// Returns the kafka properties for the given cluster.
fn normalize_path_locations(mut config: HashMap<String, String>) -> HashMap<String, String> {
for key in KAFKA_PROPERTIES_WITH_LOCATIONS {
if let Some(path) = config.get(key) {
let normalized_path = PathBuf::from(path)
.resolve()
.canonicalize()
.map(|d| d.display().to_string())
.unwrap_or(path.to_string());
config.insert(key.to_string(), normalized_path);
}
}
config
}

/// Returns the schema registry configuration for the given cluster.
pub fn schema_registry_config_of(&self, cluster: &str) -> Option<SchemaRegistryConfig> {
let cluster_config = self.clusters.get(cluster.trim()).unwrap();
Expand Down
1 change: 0 additions & 1 deletion crates/tui/src/component/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ impl Ui {
) -> Result<(), TuiError> {
let app = self.app.clone();
tokio::spawn(async move {
info!("Loading topics");
match app.topic_details(topics) {
Ok(details) => action_tx.send(Action::TopicDetails(details)).unwrap(),
Err(e) => action_tx
Expand Down

0 comments on commit d3d1378

Please sign in to comment.