Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(openobserve transform): Add OpenObserve as an officially supported sink #21531

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ Nextbook
Nextcloud
OVH
Odys
openobserve
Openpeak
Oppo
Ovi
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ sinks-logs = [
"sinks-new_relic",
"sinks-papertrail",
"sinks-pulsar",
"sinks-openobserve",
pront marked this conversation as resolved.
Show resolved Hide resolved
"sinks-redis",
"sinks-sematext",
"sinks-socket",
Expand Down Expand Up @@ -772,6 +773,7 @@ sinks-mqtt = ["dep:rumqttc"]
sinks-nats = ["dep:async-nats", "dep:nkeys"]
sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-openobserve = ["sinks-http"]
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"]
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/21531_add_openobserve_as_new_sink.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added a new `openobserve` sink to Vector, built on top of the `http`, sink allowing users to send logs, metrics, and traces directly to OpenObserve. This integration provides enhanced support for monitoring and observability by enabling seamless data ingestion into OpenObserve.

authors: chaitanya-sistla, prabhatsharma, taimingl
2 changes: 2 additions & 0 deletions src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ pub mod nats;
pub mod new_relic;
#[cfg(feature = "sinks-webhdfs")]
pub mod opendal_common;
#[cfg(feature = "sinks-openobserve")]
pub mod openobserve;
#[cfg(feature = "sinks-papertrail")]
pub mod papertrail;
#[cfg(feature = "sinks-prometheus")]
Expand Down
135 changes: 135 additions & 0 deletions src/sinks/openobserve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use http::Uri;
use vector_lib::codecs::encoding::{FramingConfig, JsonSerializerConfig, SerializerConfig};
use vector_lib::configurable::configurable_component;

use crate::{
codecs::{EncodingConfig, EncodingConfigWithFraming},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
http::{Auth, MaybeAuth},
sinks::{
http::config::{HttpMethod, HttpSinkConfig},
util::{
http::RequestConfig, BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
UriSerde,
},
Healthcheck, VectorSink,
},
tls::TlsConfig,
};

/// Configuration for the `openobserve` sink.
#[configurable_component(sink("openobserve", "Deliver log events to OpenObserve."))]
#[derive(Clone, Debug)]
pub struct OpenObserveConfig {
/// The OpenObserve endpoint to send data to.
#[serde(default = "default_endpoint")]
#[configurable(metadata(docs::examples = "http://localhost:5080/api/default/default/_json"))]
uri: UriSerde,

/// Authentication configuration for HTTP requests.
#[configurable(derived)]
auth: Option<Auth>,

/// Outbound HTTP request settings.
#[configurable(derived)]
prabhatsharma marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default)]
request: RequestConfig,

/// The compression algorithm to use.
#[configurable(derived)]
#[serde(default = "Compression::gzip_default")]
compression: Compression,

/// Encoding configuration.
#[configurable(derived)]
prabhatsharma marked this conversation as resolved.
Show resolved Hide resolved
encoding: EncodingConfig,

/// The batch settings for the sink.
#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

/// Controls how acknowledgements are handled for this sink.
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,

/// The TLS settings for the connection.
///
/// Optional, constrains TLS settings for this sink.
#[configurable(derived)]
tls: Option<TlsConfig>,
}

impl GenerateConfig for OpenObserveConfig {
fn generate_config() -> toml::Value {
toml::from_str(
r#"
uri = "http://localhost:5080/api/default/default/_json"
Auth = "user: [email protected], password: your_ingestion_password"
encoding.codec = "json"
"#,
)
.unwrap()
}
}

fn default_endpoint() -> UriSerde {
UriSerde {
uri: Uri::from_static("http://localhost:5080/api/default/default/_json"),
auth: None,
}
}

/// This sink wraps the Vector HTTP sink to provide official support for OpenObserve's
/// native HTTP ingest endpoint. By doing so, it maintains a distinct configuration for
/// the OpenObserve sink, separate from the Vector HTTP sink. This approach ensures
/// that future changes to OpenObserve's interface can be accommodated without impacting
/// the underlying Vector HTTP sink.
#[async_trait::async_trait]
#[typetag::serde(name = "openobserve")]
impl SinkConfig for OpenObserveConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let request = self.request.clone();
let http_sink_config = HttpSinkConfig {
uri: self.uri.clone(),
compression: self.compression,
auth: self.auth.choose_one(&self.uri.auth)?,
method: HttpMethod::Post,
tls: self.tls.clone(),
request,
acknowledgements: self.acknowledgements,
batch: self.batch,
headers: None,
encoding: EncodingConfigWithFraming::new(
Some(FramingConfig::Bytes),
SerializerConfig::Json(JsonSerializerConfig::default()),
self.encoding.transformer(),
),
payload_prefix: "".into(), // Always newline delimited JSON
payload_suffix: "".into(), // Always newline delimited JSON
};

http_sink_config.build(cx).await
}

fn input(&self) -> Input {
Input::new(self.encoding.config().input_type() & DataType::Log)
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}

#[cfg(test)]
mod test {
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<super::OpenObserveConfig>();
}
}
Loading
Loading