Skip to content

Commit

Permalink
fixes for kafka on macOS (#1090)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdheipSingh authored Jan 10, 2025
1 parent cc259d5 commit 7c2bb3e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 9 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ sysinfo = "0.31.4"
thread-priority = "1.0.0"
uptime_lib = "0.3.0"

# Kafka
rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }

# Utility Libraries
anyhow = { version = "1.0", features = ["backtrace"] }
bytes = "1.4"
Expand Down Expand Up @@ -156,3 +153,6 @@ codegen-units = 1
# adding rdkafka here because, for unsupported platforms, cargo skips other deps which come after this
[target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies]
rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }

[target.'cfg(all(target_os = "macos", target_arch = "aarch64"))'.dependencies]
rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
15 changes: 12 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ use crate::{
option::{validation, Compression, Mode},
};

#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
#[cfg(any(
all(target_os = "linux", target_arch = "x86_64"),
all(target_os = "macos", target_arch = "aarch64")
))]
use crate::kafka::SslProtocol as KafkaSslProtocol;

#[cfg(not(all(target_os = "linux", target_arch = "x86_64")))]
#[cfg(not(any(
all(target_os = "linux", target_arch = "x86_64"),
all(target_os = "macos", target_arch = "aarch64")
)))]
use std::string::String as KafkaSslProtocol;

#[derive(Debug, Default)]
Expand Down Expand Up @@ -507,7 +513,10 @@ impl FromArgMatches for Cli {
}

fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
#[cfg(any(
all(target_os = "linux", target_arch = "x86_64"),
all(target_os = "macos", target_arch = "aarch64")
))]
{
self.kafka_topics = m.get_one::<String>(Self::KAFKA_TOPICS).cloned();
self.kafka_security_protocol = m
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ pub mod correlation;
mod event;
pub mod handlers;
pub mod hottier;
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
#[cfg(any(
all(target_os = "linux", target_arch = "x86_64"),
all(target_os = "macos", target_arch = "aarch64")
))]
pub mod kafka;
mod livetail;
mod metadata;
Expand Down
10 changes: 8 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use parseable::{
};
use tracing_subscriber::EnvFilter;

#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
#[cfg(any(
all(target_os = "linux", target_arch = "x86_64"),
all(target_os = "macos", target_arch = "aarch64")
))]
use parseable::kafka;

#[actix_web::main]
Expand All @@ -49,7 +52,10 @@ async fn main() -> anyhow::Result<()> {
// keep metadata info in mem
metadata.set_global();

#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
#[cfg(any(
all(target_os = "linux", target_arch = "x86_64"),
all(target_os = "macos", target_arch = "aarch64")
))]
// load kafka server
if CONFIG.parseable.mode != Mode::Query {
tokio::task::spawn(kafka::setup_integration());
Expand Down

0 comments on commit 7c2bb3e

Please sign in to comment.