Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new sink): Keep sink #22072

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ sinks-logs = [
"sinks-humio",
"sinks-influxdb",
"sinks-kafka",
"sinks-keep",
"sinks-loki",
"sinks-mezmo",
"sinks-mqtt",
Expand Down Expand Up @@ -776,6 +777,7 @@ sinks-http = []
sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"]
sinks-influxdb = []
sinks-kafka = ["dep:rdkafka"]
sinks-keep = []
sinks-mezmo = []
sinks-loki = ["loki-logproto"]
sinks-mqtt = ["dep:rumqttc"]
Expand Down
175 changes: 175 additions & 0 deletions src/sinks/keep/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
//! Configuration for the `keep` sink.

use bytes::Bytes;
use futures::FutureExt;
use http::{Request, StatusCode, Uri};
use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;
use vrl::value::Kind;

use crate::{
http::HttpClient,
sinks::{
prelude::*,
util::{
http::{http_response_retry_logic, HttpService},
BatchConfig, BoxedRawValue,
},
},
};

use super::{
encoder::KeepEncoder, request_builder::KeepRequestBuilder, service::KeepSvcRequestBuilder,
sink::KeepSink,
};

pub(super) const HTTP_HEADER_KEEP_API_KEY: &str = "x-api-key";

/// Configuration for the `keep` sink.
#[configurable_component(sink("keep", "Deliver log events to Keep."))]
#[derive(Clone, Debug)]
pub struct KeepConfig {
/// Keep's endpoint to send logs to
#[serde(default = "default_endpoint")]
#[configurable(metadata(
docs::examples = "https://backend.keep.com:8081/alerts/event/vectordev?provider_id=test",
))]
#[configurable(validation(format = "uri"))]
pub(super) endpoint: String,

/// The API key that is used to authenticate against Keep.
#[configurable(metadata(docs::examples = "${KEEP_API_KEY}"))]
#[configurable(metadata(docs::examples = "keepappkey"))]
api_key: SensitiveString,

#[configurable(derived)]
#[serde(default)]
batch: BatchConfig<KeepDefaultBatchSettings>,

#[configurable(derived)]
#[serde(default)]
request: TowerRequestConfig,

#[configurable(derived)]
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
encoding: Transformer,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,
}

fn default_endpoint() -> String {
"http://localhost:8080/alerts/event/vectordev?provider_id=test".to_string()
}

#[derive(Clone, Copy, Debug, Default)]
struct KeepDefaultBatchSettings;

impl SinkBatchSettings for KeepDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = None;
const MAX_BYTES: Option<usize> = Some(100_000);
const TIMEOUT_SECS: f64 = 1.0;
}

impl GenerateConfig for KeepConfig {
fn generate_config() -> toml::Value {
toml::from_str(
r#"api_key = "${KEEP_API_KEY}"
"#,
)
.unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "keep")]
impl SinkConfig for KeepConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let batch_settings = self.batch.validate()?.into_batcher_settings()?;

let request_builder = KeepRequestBuilder {
encoder: KeepEncoder {
transformer: self.encoding.clone(),
},
// TODO: add compression support
compression: Compression::None,
};

let uri: Uri = self.endpoint.clone().try_into()?;
let keep_service_request_builder = KeepSvcRequestBuilder {
uri: uri.clone(),
api_key: self.api_key.clone(),
};

let client = HttpClient::new(None, cx.proxy())?;

let service = HttpService::new(client.clone(), keep_service_request_builder);

let request_limits = self.request.into_settings();

let service = ServiceBuilder::new()
.settings(request_limits, http_response_retry_logic())
.service(service);

let sink = KeepSink::new(service, batch_settings, request_builder);

let healthcheck = healthcheck(uri, self.api_key.clone(), client).boxed();

Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}

fn input(&self) -> Input {
let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());

Input::log().with_schema_requirement(requirement)
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}

async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> crate::Result<()> {
let request = Request::post(uri).header(HTTP_HEADER_KEEP_API_KEY, api_key.inner());
let body = crate::serde::json::to_bytes(&Vec::<BoxedRawValue>::new())
.unwrap()
.freeze();
let req: Request<Bytes> = request.body(body)?;
let req = req.map(hyper::Body::from);

let res = client.send(req).await?;

let status = res.status();
let body = hyper::body::to_bytes(res.into_body()).await?;

match status {
StatusCode::OK => Ok(()), // Healthcheck passed
StatusCode::BAD_REQUEST => Ok(()), // Healthcheck failed due to client error but is still considered valid
StatusCode::ACCEPTED => Ok(()), // Consider healthcheck passed if server accepted request
StatusCode::UNAUTHORIZED => {
// Handle unauthorized errors
let json: serde_json::Value = serde_json::from_slice(&body[..])?;
let message = json
.as_object()
.and_then(|o| o.get("error"))
.and_then(|s| s.as_str())
.unwrap_or("Token is not valid, 401 returned.")
.to_string();
Err(message.into())
}
_ => {
// Handle other unexpected statuses
let body = String::from_utf8_lossy(&body[..]);
Err(format!(
"Server returned unexpected error status: {} body: {}",
status, body
)
.into())
}
}
}
50 changes: 50 additions & 0 deletions src/sinks/keep/encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! Encoding for the `keep` sink.

use bytes::Bytes;
use serde_json::{json, to_vec};
use std::io;

use crate::sinks::{
prelude::*,
util::encoding::{write_all, Encoder as SinkEncoder},
};

pub(super) struct KeepEncoder {
pub(super) transformer: Transformer,
}

impl SinkEncoder<Vec<Event>> for KeepEncoder {
fn encode_input(
&self,
events: Vec<Event>,
writer: &mut dyn io::Write,
) -> io::Result<(usize, GroupedCountByteSize)> {
let mut byte_size = telemetry().create_request_count_byte_size();
let n_events = events.len();
let mut json_events: Vec<serde_json::Value> = Vec::with_capacity(n_events);

for mut event in events {
self.transformer.transform(&mut event);

byte_size.add_event(&event, event.estimated_json_encoded_size_of());

let mut data = json!(event.as_log());
if let Some(message) = data.get("message") {
if let Some(message_str) = message.as_str() {
// Parse the JSON string in `message`
let parsed_message: serde_json::Value = serde_json::from_str(message_str)?;

// Reassign the parsed JSON back to `message`
data["message"] = parsed_message;
}
}
data["keep_source_type"] = json!(event.source_id());

json_events.push(data);
}

let body = Bytes::from(to_vec(&serde_json::Value::Array(json_events))?);

write_all(writer, n_events, body.as_ref()).map(|()| (body.len(), byte_size))
}
}
10 changes: 10 additions & 0 deletions src/sinks/keep/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//! The Keep [`vector_lib::sink::VectorSink`].
//!
//! This module contains the [`vector_lib::sink::VectorSink`] instance that is responsible for
//! taking a stream of [`vector_lib::event::Event`]s and forwarding them to the Keep service.

mod config;
mod encoder;
mod request_builder;
mod service;
mod sink;
48 changes: 48 additions & 0 deletions src/sinks/keep/request_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! `RequestBuilder` implementation for the `keep` sink.

use bytes::Bytes;
use std::io;

use crate::sinks::{prelude::*, util::http::HttpRequest};

use super::encoder::KeepEncoder;

pub(super) struct KeepRequestBuilder {
pub(super) encoder: KeepEncoder,
pub(super) compression: Compression,
}

impl RequestBuilder<Vec<Event>> for KeepRequestBuilder {
type Metadata = EventFinalizers;
type Events = Vec<Event>;
type Encoder = KeepEncoder;
type Payload = Bytes;
type Request = HttpRequest<()>;
type Error = io::Error;

fn compression(&self) -> Compression {
self.compression
}

fn encoder(&self) -> &Self::Encoder {
&self.encoder
}

fn split_input(
&self,
mut events: Vec<Event>,
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let finalizers = events.take_finalizers();
let builder = RequestMetadataBuilder::from_events(&events);
(finalizers, builder, events)
}

fn build_request(
&self,
metadata: Self::Metadata,
request_metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
HttpRequest::new(payload.into_payload(), metadata, request_metadata, ())
}
}
33 changes: 33 additions & 0 deletions src/sinks/keep/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//! Service implementation for the `keep` sink.

use bytes::Bytes;
use http::{Request, Uri};
use vector_lib::sensitive_string::SensitiveString;

use crate::sinks::{
util::http::{HttpRequest, HttpServiceRequestBuilder},
HTTPRequestBuilderSnafu,
};
use snafu::ResultExt;

use super::config::HTTP_HEADER_KEEP_API_KEY;

#[derive(Debug, Clone)]
pub(super) struct KeepSvcRequestBuilder {
pub(super) uri: Uri,
pub(super) api_key: SensitiveString,
}

impl HttpServiceRequestBuilder<()> for KeepSvcRequestBuilder {
fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
let builder =
Request::post(&self.uri).header(HTTP_HEADER_KEEP_API_KEY, self.api_key.inner());

let builder = builder.header("Content-Type".to_string(), "application/json".to_string());

builder
.body(request.take_payload())
.context(HTTPRequestBuilderSnafu)
.map_err(Into::into)
}
}
Loading