diff --git a/Cargo.toml b/Cargo.toml index d693da56644e9..2a1006847bced 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -710,6 +710,7 @@ sinks-logs = [ "sinks-humio", "sinks-influxdb", "sinks-kafka", + "sinks-keep", "sinks-loki", "sinks-mezmo", "sinks-mqtt", @@ -776,6 +777,7 @@ sinks-http = [] sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"] sinks-influxdb = [] sinks-kafka = ["dep:rdkafka"] +sinks-keep = [] sinks-mezmo = [] sinks-loki = ["loki-logproto"] sinks-mqtt = ["dep:rumqttc"] diff --git a/src/sinks/keep/config.rs b/src/sinks/keep/config.rs new file mode 100644 index 0000000000000..f6ca30888fa35 --- /dev/null +++ b/src/sinks/keep/config.rs @@ -0,0 +1,175 @@ +//! Configuration for the `keep` sink. + +use bytes::Bytes; +use futures::FutureExt; +use http::{Request, StatusCode, Uri}; +use vector_lib::configurable::configurable_component; +use vector_lib::sensitive_string::SensitiveString; +use vrl::value::Kind; + +use crate::{ + http::HttpClient, + sinks::{ + prelude::*, + util::{ + http::{http_response_retry_logic, HttpService}, + BatchConfig, BoxedRawValue, + }, + }, +}; + +use super::{ + encoder::KeepEncoder, request_builder::KeepRequestBuilder, service::KeepSvcRequestBuilder, + sink::KeepSink, +}; + +pub(super) const HTTP_HEADER_KEEP_API_KEY: &str = "x-api-key"; + +/// Configuration for the `keep` sink. +#[configurable_component(sink("keep", "Deliver log events to Keep."))] +#[derive(Clone, Debug)] +pub struct KeepConfig { + /// Keep's endpoint to send logs to + #[serde(default = "default_endpoint")] + #[configurable(metadata( + docs::examples = "https://backend.keep.com:8081/alerts/event/vectordev?provider_id=test", + ))] + #[configurable(validation(format = "uri"))] + pub(super) endpoint: String, + + /// The API key that is used to authenticate against Keep. + #[configurable(metadata(docs::examples = "${KEEP_API_KEY}"))] + #[configurable(metadata(docs::examples = "keepappkey"))] + api_key: SensitiveString, + + #[configurable(derived)] + #[serde(default)] + batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + request: TowerRequestConfig, + + #[configurable(derived)] + #[serde(default, skip_serializing_if = "crate::serde::is_default")] + encoding: Transformer, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::is_default" + )] + acknowledgements: AcknowledgementsConfig, +} + +fn default_endpoint() -> String { + "http://localhost:8080/alerts/event/vectordev?provider_id=test".to_string() +} + +#[derive(Clone, Copy, Debug, Default)] +struct KeepDefaultBatchSettings; + +impl SinkBatchSettings for KeepDefaultBatchSettings { + const MAX_EVENTS: Option = None; + const MAX_BYTES: Option = Some(100_000); + const TIMEOUT_SECS: f64 = 1.0; +} + +impl GenerateConfig for KeepConfig { + fn generate_config() -> toml::Value { + toml::from_str( + r#"api_key = "${KEEP_API_KEY}" + "#, + ) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "keep")] +impl SinkConfig for KeepConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let batch_settings = self.batch.validate()?.into_batcher_settings()?; + + let request_builder = KeepRequestBuilder { + encoder: KeepEncoder { + transformer: self.encoding.clone(), + }, + // TODO: add compression support + compression: Compression::None, + }; + + let uri: Uri = self.endpoint.clone().try_into()?; + let keep_service_request_builder = KeepSvcRequestBuilder { + uri: uri.clone(), + api_key: self.api_key.clone(), + }; + + let client = HttpClient::new(None, cx.proxy())?; + + let service = HttpService::new(client.clone(), keep_service_request_builder); + + let request_limits = self.request.into_settings(); + + let service = ServiceBuilder::new() + .settings(request_limits, http_response_retry_logic()) + .service(service); + + let sink = KeepSink::new(service, batch_settings, request_builder); + + let healthcheck = healthcheck(uri, self.api_key.clone(), client).boxed(); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); + + Input::log().with_schema_requirement(requirement) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> crate::Result<()> { + let request = Request::post(uri).header(HTTP_HEADER_KEEP_API_KEY, api_key.inner()); + let body = crate::serde::json::to_bytes(&Vec::::new()) + .unwrap() + .freeze(); + let req: Request = request.body(body)?; + let req = req.map(hyper::Body::from); + + let res = client.send(req).await?; + + let status = res.status(); + let body = hyper::body::to_bytes(res.into_body()).await?; + + match status { + StatusCode::OK => Ok(()), // Healthcheck passed + StatusCode::BAD_REQUEST => Ok(()), // Healthcheck failed due to client error but is still considered valid + StatusCode::ACCEPTED => Ok(()), // Consider healthcheck passed if server accepted request + StatusCode::UNAUTHORIZED => { + // Handle unauthorized errors + let json: serde_json::Value = serde_json::from_slice(&body[..])?; + let message = json + .as_object() + .and_then(|o| o.get("error")) + .and_then(|s| s.as_str()) + .unwrap_or("Token is not valid, 401 returned.") + .to_string(); + Err(message.into()) + } + _ => { + // Handle other unexpected statuses + let body = String::from_utf8_lossy(&body[..]); + Err(format!( + "Server returned unexpected error status: {} body: {}", + status, body + ) + .into()) + } + } +} diff --git a/src/sinks/keep/encoder.rs b/src/sinks/keep/encoder.rs new file mode 100644 index 0000000000000..95e44ea170c20 --- /dev/null +++ b/src/sinks/keep/encoder.rs @@ -0,0 +1,50 @@ +//! Encoding for the `keep` sink. + +use bytes::Bytes; +use serde_json::{json, to_vec}; +use std::io; + +use crate::sinks::{ + prelude::*, + util::encoding::{write_all, Encoder as SinkEncoder}, +}; + +pub(super) struct KeepEncoder { + pub(super) transformer: Transformer, +} + +impl SinkEncoder> for KeepEncoder { + fn encode_input( + &self, + events: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut byte_size = telemetry().create_request_count_byte_size(); + let n_events = events.len(); + let mut json_events: Vec = Vec::with_capacity(n_events); + + for mut event in events { + self.transformer.transform(&mut event); + + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + + let mut data = json!(event.as_log()); + if let Some(message) = data.get("message") { + if let Some(message_str) = message.as_str() { + // Parse the JSON string in `message` + let parsed_message: serde_json::Value = serde_json::from_str(message_str)?; + + // Reassign the parsed JSON back to `message` + data["message"] = parsed_message; + } + } + data["keep_source_type"] = json!(event.source_id()); + + json_events.push(data); + } + + let body = Bytes::from(to_vec(&serde_json::Value::Array(json_events))?); + + write_all(writer, n_events, body.as_ref()).map(|()| (body.len(), byte_size)) + } +} diff --git a/src/sinks/keep/mod.rs b/src/sinks/keep/mod.rs new file mode 100644 index 0000000000000..594b5fa963df2 --- /dev/null +++ b/src/sinks/keep/mod.rs @@ -0,0 +1,10 @@ +//! The Keep [`vector_lib::sink::VectorSink`]. +//! +//! This module contains the [`vector_lib::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_lib::event::Event`]s and forwarding them to the Keep service. + +mod config; +mod encoder; +mod request_builder; +mod service; +mod sink; diff --git a/src/sinks/keep/request_builder.rs b/src/sinks/keep/request_builder.rs new file mode 100644 index 0000000000000..5c58d3fee88fc --- /dev/null +++ b/src/sinks/keep/request_builder.rs @@ -0,0 +1,48 @@ +//! `RequestBuilder` implementation for the `keep` sink. + +use bytes::Bytes; +use std::io; + +use crate::sinks::{prelude::*, util::http::HttpRequest}; + +use super::encoder::KeepEncoder; + +pub(super) struct KeepRequestBuilder { + pub(super) encoder: KeepEncoder, + pub(super) compression: Compression, +} + +impl RequestBuilder> for KeepRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = KeepEncoder; + type Payload = Bytes; + type Request = HttpRequest<()>; + type Error = io::Error; + + fn compression(&self) -> Compression { + self.compression + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + HttpRequest::new(payload.into_payload(), metadata, request_metadata, ()) + } +} diff --git a/src/sinks/keep/service.rs b/src/sinks/keep/service.rs new file mode 100644 index 0000000000000..89b9ebaacf7c9 --- /dev/null +++ b/src/sinks/keep/service.rs @@ -0,0 +1,33 @@ +//! Service implementation for the `keep` sink. + +use bytes::Bytes; +use http::{Request, Uri}; +use vector_lib::sensitive_string::SensitiveString; + +use crate::sinks::{ + util::http::{HttpRequest, HttpServiceRequestBuilder}, + HTTPRequestBuilderSnafu, +}; +use snafu::ResultExt; + +use super::config::HTTP_HEADER_KEEP_API_KEY; + +#[derive(Debug, Clone)] +pub(super) struct KeepSvcRequestBuilder { + pub(super) uri: Uri, + pub(super) api_key: SensitiveString, +} + +impl HttpServiceRequestBuilder<()> for KeepSvcRequestBuilder { + fn build(&self, mut request: HttpRequest<()>) -> Result, crate::Error> { + let builder = + Request::post(&self.uri).header(HTTP_HEADER_KEEP_API_KEY, self.api_key.inner()); + + let builder = builder.header("Content-Type".to_string(), "application/json".to_string()); + + builder + .body(request.take_payload()) + .context(HTTPRequestBuilderSnafu) + .map_err(Into::into) + } +} diff --git a/src/sinks/keep/sink.rs b/src/sinks/keep/sink.rs new file mode 100644 index 0000000000000..0a990767f996f --- /dev/null +++ b/src/sinks/keep/sink.rs @@ -0,0 +1,77 @@ +//! Implementation of the `keep` sink. + +use crate::sinks::{ + prelude::*, + util::http::{HttpJsonBatchSizer, HttpRequest}, +}; + +use super::request_builder::KeepRequestBuilder; + +pub(super) struct KeepSink { + service: S, + batch_settings: BatcherSettings, + request_builder: KeepRequestBuilder, +} + +impl KeepSink +where + S: Service> + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + /// Creates a new `keep`. + pub(super) const fn new( + service: S, + batch_settings: BatcherSettings, + request_builder: KeepRequestBuilder, + ) -> Self { + Self { + service, + batch_settings, + request_builder, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + // Batch the input stream with size calculation based on the estimated encoded json size + .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer)) + // Build requests with default concurrency limit. + .request_builder( + default_request_builder_concurrency_limit(), + self.request_builder, + ) + // Filter out any errors that occurred in the request building. + .filter_map(|request| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + // Generate the driver that will send requests and handle retries, + // event finalization, and logging/internal metric reporting. + .into_driver(self.service) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for KeepSink +where + S: Service> + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index d3c1e1982a67b..3cf5613324bbe 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -70,6 +70,8 @@ pub mod humio; pub mod influxdb; #[cfg(feature = "sinks-kafka")] pub mod kafka; +#[cfg(feature = "sinks-keep")] +pub mod keep; #[cfg(feature = "sinks-loki")] pub mod loki; #[cfg(feature = "sinks-mezmo")]