Skip to content

Commit

Permalink
feat(new sink): Add possibility to use nats jetstream in nats sink (#…
Browse files Browse the repository at this point in the history
…20834)

* use nats jetstream as an option

* no need to manually flush messages

* fix jetstream option annotation + prettify code

* generate component docs

* add more precise field description + generate docs

* check nats stream existence in healthcheck

* do not check stream existance

* add new field to struct in tests

* add changelog

* add author to changelog

* remove example config from changelog

* flush core messages

* flush after each message
  • Loading branch information
whatcouldbepizza authored Jul 24, 2024
1 parent 787e1ec commit 56bd0dd
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 10 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20834_nats_jetstream_sink.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add possibility to use NATS JetStream in NATS sink. Can be turned on/off via `jetstream` option (default is false).

authors: whatcouldbepizza
63 changes: 63 additions & 0 deletions src/sinks/nats/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bytes::Bytes;
use futures_util::TryFutureExt;
use snafu::ResultExt;
use vector_lib::codecs::JsonSerializerConfig;
Expand Down Expand Up @@ -76,6 +77,14 @@ pub struct NatsSinkConfig {
#[configurable(derived)]
#[serde(default)]
pub(super) request: TowerRequestConfig<NatsTowerRequestConfigDefaults>,

/// Send messages using [Jetstream][jetstream].
///
/// If set, the `subject` must belong to an existing JetStream stream.
///
/// [jetstream]: https://docs.nats.io/nats-concepts/jetstream
#[serde(default)]
pub(super) jetstream: bool,
}

fn default_name() -> String {
Expand All @@ -93,6 +102,7 @@ impl GenerateConfig for NatsSinkConfig {
tls: None,
url: "nats://127.0.0.1:4222".into(),
request: Default::default(),
jetstream: Default::default(),
})
.unwrap()
}
Expand Down Expand Up @@ -130,8 +140,61 @@ impl NatsSinkConfig {

options.connect(&self.url).await.context(ConnectSnafu)
}

pub(super) async fn publisher(&self) -> Result<NatsPublisher, NatsError> {
let connection = self.connect().await?;

if self.jetstream {
Ok(NatsPublisher::JetStream(async_nats::jetstream::new(
connection,
)))
} else {
Ok(NatsPublisher::Core(connection))
}
}
}

async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> {
config.connect().map_ok(|_| ()).map_err(|e| e.into()).await
}

pub enum NatsPublisher {
Core(async_nats::Client),
JetStream(async_nats::jetstream::Context),
}

impl NatsPublisher {
pub(super) async fn publish<S: async_nats::subject::ToSubject>(
&self,
subject: S,
payload: Bytes,
) -> Result<(), NatsError> {
match self {
NatsPublisher::Core(client) => {
client
.publish(subject, payload)
.await
.map_err(|e| NatsError::PublishError {
source: Box::new(e),
})?;
client
.flush()
.map_ok(|_| ())
.map_err(|e| NatsError::PublishError {
source: Box::new(e),
})
.await
}
NatsPublisher::JetStream(jetstream) => {
let ack = jetstream.publish(subject, payload).await.map_err(|e| {
NatsError::PublishError {
source: Box::new(e),
}
})?;
ack.await.map(|_| ()).map_err(|e| NatsError::PublishError {
source: Box::new(e),
})
}
}
}
}
13 changes: 13 additions & 0 deletions src/sinks/nats/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ async fn nats_no_auth() {
tls: None,
auth: None,
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -110,6 +111,7 @@ async fn nats_userpass_auth_valid() {
},
}),
request: Default::default(),
jetstream: false,
};

publish_and_check(conf)
Expand Down Expand Up @@ -139,6 +141,7 @@ async fn nats_userpass_auth_invalid() {
},
}),
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -170,6 +173,7 @@ async fn nats_token_auth_valid() {
},
}),
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -201,6 +205,7 @@ async fn nats_token_auth_invalid() {
},
}),
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -233,6 +238,7 @@ async fn nats_nkey_auth_valid() {
},
}),
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -265,6 +271,7 @@ async fn nats_nkey_auth_invalid() {
},
}),
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -298,6 +305,7 @@ async fn nats_tls_valid() {
}),
auth: None,
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -325,6 +333,7 @@ async fn nats_tls_invalid() {
tls: None,
auth: None,
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -360,6 +369,7 @@ async fn nats_tls_client_cert_valid() {
}),
auth: None,
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -393,6 +403,7 @@ async fn nats_tls_client_cert_invalid() {
}),
auth: None,
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -430,6 +441,7 @@ async fn nats_tls_jwt_auth_valid() {
},
}),
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -467,6 +479,7 @@ async fn nats_tls_jwt_auth_invalid() {
},
}),
request: Default::default(),
jetstream: false,
};

let r = publish_and_check(conf).await;
Expand Down
2 changes: 2 additions & 0 deletions src/sinks/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ enum NatsError {
Connect { source: async_nats::ConnectError },
#[snafu(display("NATS Server Error: {}", source))]
ServerError { source: async_nats::Error },
#[snafu(display("NATS Publish Error: {}", source))]
PublishError { source: async_nats::Error },
}
9 changes: 4 additions & 5 deletions src/sinks/nats/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use futures_util::TryFutureExt;

use crate::sinks::prelude::*;

use super::{request_builder::NatsRequest, NatsError};
use super::{config::NatsPublisher, request_builder::NatsRequest, NatsError};

#[derive(Clone)]
pub(super) struct NatsService {
pub(super) connection: Arc<async_nats::Client>,
pub(super) publisher: Arc<NatsPublisher>,
}

pub(super) struct NatsResponse {
Expand Down Expand Up @@ -44,13 +44,12 @@ impl Service<NatsRequest> for NatsService {
}

fn call(&mut self, req: NatsRequest) -> Self::Future {
let connection = Arc::clone(&self.connection);
let publisher = Arc::clone(&self.publisher);

Box::pin(async move {
match connection
match publisher
.publish(req.subject, req.bytes)
.map_err(async_nats::Error::from)
.and_then(|_| connection.flush().map_err(Into::into))
.await
{
Err(error) => Err(NatsError::ServerError { source: error }),
Expand Down
10 changes: 5 additions & 5 deletions src/sinks/nats/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use snafu::ResultExt;
use crate::sinks::prelude::*;

use super::{
config::{NatsSinkConfig, NatsTowerRequestConfigDefaults},
config::{NatsPublisher, NatsSinkConfig, NatsTowerRequestConfigDefaults},
request_builder::{NatsEncoder, NatsRequestBuilder},
service::{NatsResponse, NatsService},
EncodingSnafu, NatsError,
Expand All @@ -20,7 +20,7 @@ pub(super) struct NatsSink {
request: TowerRequestConfig<NatsTowerRequestConfigDefaults>,
transformer: Transformer,
encoder: Encoder<()>,
connection: Arc<async_nats::Client>,
publisher: Arc<NatsPublisher>,
subject: Template,
}

Expand All @@ -42,7 +42,7 @@ impl NatsSink {
}

pub(super) async fn new(config: NatsSinkConfig) -> Result<Self, NatsError> {
let connection = Arc::new(config.connect().await?);
let publisher = Arc::new(config.publisher().await?);
let transformer = config.encoding.transformer();
let serializer = config.encoding.build().context(EncodingSnafu)?;
let encoder = Encoder::<()>::new(serializer);
Expand All @@ -51,9 +51,9 @@ impl NatsSink {

Ok(NatsSink {
request,
connection,
transformer,
encoder,
publisher,
subject,
})
}
Expand All @@ -71,7 +71,7 @@ impl NatsSink {
let service = ServiceBuilder::new()
.settings(request, NatsRetryLogic)
.service(NatsService {
connection: Arc::clone(&self.connection),
publisher: Arc::clone(&self.publisher),
});

input
Expand Down
11 changes: 11 additions & 0 deletions website/cue/reference/components/sinks/base/nats.cue
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,17 @@ base: components: sinks: nats: configuration: {
}
}
}
jetstream: {
description: """
Send messages using [Jetstream][jetstream].
If set, the `subject` must belong to an existing JetStream stream.
[jetstream]: https://docs.nats.io/nats-concepts/jetstream
"""
required: false
type: bool: default: false
}
request: {
description: """
Middleware settings for outbound requests.
Expand Down

0 comments on commit 56bd0dd

Please sign in to comment.