Skip to content

Commit

Permalink
feat(aws_cloudwatch_logs sink): allow setting type of log class to cr…
Browse files Browse the repository at this point in the history
…eate

Problem: Prior to this commit it was not possible to specify the log
group's class type. The method prior always created a Standard type.

------------------------------------------------------------

Solution: Allow specifying the log group class type via a new field,
`group_class` which takes over the `create_missing_group`.

deprecation: `create_missing_group`

Initial Issue Report: vectordotdev#22008

Closes vectordotdev#22008
  • Loading branch information
PriceHiller committed Dec 13, 2024
1 parent 0b801eb commit 7758e8f
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
This deprecates the `create_missing_group` field within the AWS Cloudwatch logs. Use the
`group_class` field to explicitly specify the correct log class group to use if you need to auto
create a missing group.

authors: PriceHiller
3 changes: 3 additions & 0 deletions changelog.d/22008-specify-cloudwatch-log-class.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The Cloudwatch Logs Sink now supports specifying the type of log class to create.

authors: PriceHiller
39 changes: 39 additions & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,30 @@ where
}
}

/// Defines the log class to create if missing
///
/// See https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub enum LogGroupClassDef {
/// Logs that require real-time monitoring or frequently accessed logs
#[default]
Standard,
/// Log class that can be used to cost-effectively consolidate logs
InfrequentAccess,
}

impl From<LogGroupClassDef> for aws_sdk_cloudwatchlogs::types::LogGroupClass {
fn from(value: LogGroupClassDef) -> Self {
match value {
LogGroupClassDef::Standard => aws_sdk_cloudwatchlogs::types::LogGroupClass::Standard,
LogGroupClassDef::InfrequentAccess => {
aws_sdk_cloudwatchlogs::types::LogGroupClass::InfrequentAccess
}
}
}
}

/// Configuration for the `aws_cloudwatch_logs` sink.
#[configurable_component(sink(
"aws_cloudwatch_logs",
Expand Down Expand Up @@ -115,9 +139,22 @@ pub struct CloudwatchLogsSinkConfig {
/// the first stream.
///
/// [log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
#[configurable(deprecated, metadata(docs::hidden))]
#[serde(default = "crate::serde::default_true")]
pub create_missing_group: bool,

/// Dynamically create a [log group][log_group] if it does not already exist with the specified
/// [group class][group_class].
///
/// This ignores `create_missing_stream` directly after creating the group and creates
/// the first stream.
///
/// [log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
/// [group_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html
#[configurable(derived)]
#[serde(default)]
pub group_class: Option<LogGroupClassDef>,

/// Dynamically create a [log stream][log_stream] if it does not already exist.
///
/// [log_stream]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
Expand Down Expand Up @@ -232,12 +269,14 @@ impl GenerateConfig for CloudwatchLogsSinkConfig {
}

fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
#[allow(deprecated)]
CloudwatchLogsSinkConfig {
encoding,
group_name: Default::default(),
stream_name: Default::default(),
region: Default::default(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_cloudwatch_logs/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub async fn healthcheck(
if config.group_name.is_dynamic() {
info!("Skipping healthcheck log group check: `group_name` is dynamic.");
Ok(())
} else if config.create_missing_group {
} else if config.group_class.is_some() {
info!("Skipping healthcheck log group check: `group_name` will be created if missing.");
Ok(())
} else {
Expand Down
7 changes: 7 additions & 0 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async fn cloudwatch_insert_log_event() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down Expand Up @@ -94,6 +95,7 @@ async fn cloudwatch_insert_log_events_sorted() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down Expand Up @@ -170,6 +172,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down Expand Up @@ -247,6 +250,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down Expand Up @@ -303,6 +307,7 @@ async fn cloudwatch_insert_log_event_batched() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch,
Expand Down Expand Up @@ -354,6 +359,7 @@ async fn cloudwatch_insert_log_event_partitioned() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down Expand Up @@ -447,6 +453,7 @@ async fn cloudwatch_healthcheck() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
group_class: Default::default(),
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
Expand Down
20 changes: 13 additions & 7 deletions src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use aws_sdk_cloudwatchlogs::{
put_log_events::{PutLogEventsError, PutLogEventsOutput},
put_retention_policy::PutRetentionPolicyError,
},
types::InputLogEvent,
types::{InputLogEvent, LogGroupClass},
Client as CloudwatchLogsClient,
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
Expand All @@ -38,6 +38,7 @@ struct Client {
client: CloudwatchLogsClient,
stream_name: String,
group_name: String,
group_class: Option<LogGroupClass>,
headers: IndexMap<HeaderName, HeaderValue>,
retention_days: u32,
}
Expand All @@ -60,18 +61,20 @@ impl CloudwatchFuture {
headers: IndexMap<HeaderName, HeaderValue>,
stream_name: String,
group_name: String,
create_missing_group: bool,
group_class: Option<LogGroupClass>,
create_missing_stream: bool,
retention: Retention,
mut events: Vec<Vec<InputLogEvent>>,
token: Option<String>,
token_tx: oneshot::Sender<Option<String>>,
) -> Self {
let retention_days = retention.days;
let create_missing_group = group_class.is_some();
let client = Client {
client,
stream_name,
group_name,
group_class,
headers,
retention_days,
};
Expand Down Expand Up @@ -288,12 +291,15 @@ impl Client {
pub fn create_log_group(&self) -> ClientResult<(), CreateLogGroupError> {
let client = self.client.clone();
let group_name = self.group_name.clone();
let group_class = self.group_class.clone();

Box::pin(async move {
client
.create_log_group()
.log_group_name(group_name)
.send()
.await?;
let mut client_log_group_builder = client.create_log_group().log_group_name(group_name);
client_log_group_builder = match group_class {
Some(class) => client_log_group_builder.log_group_class(class),
None => client_log_group_builder,
};
client_log_group_builder.send().await?;
Ok(())
})
}
Expand Down
13 changes: 8 additions & 5 deletions src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aws_sdk_cloudwatchlogs::{
describe_log_streams::DescribeLogStreamsError, put_log_events::PutLogEventsError,
put_retention_policy::PutRetentionPolicyError,
},
types::InputLogEvent,
types::{InputLogEvent, LogGroupClass},
Client as CloudwatchLogsClient,
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
Expand Down Expand Up @@ -235,7 +235,10 @@ impl CloudwatchLogsSvc {
let group_name = key.group.clone();
let stream_name = key.stream.clone();

let create_missing_group = config.create_missing_group;
let group_class = match config.group_class {
Some(class) => Some(LogGroupClass::from(class)),
None => None,
};
let create_missing_stream = config.create_missing_stream;

let retention = config.retention.clone();
Expand All @@ -245,7 +248,7 @@ impl CloudwatchLogsSvc {
client,
stream_name,
group_name,
create_missing_group,
group_class,
create_missing_stream,
retention,
token: None,
Expand Down Expand Up @@ -319,7 +322,7 @@ impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
self.headers.clone(),
self.stream_name.clone(),
self.group_name.clone(),
self.create_missing_group,
self.group_class.clone(),
self.create_missing_stream,
self.retention.clone(),
event_batches,
Expand All @@ -337,7 +340,7 @@ pub struct CloudwatchLogsSvc {
headers: IndexMap<HeaderName, HeaderValue>,
stream_name: String,
group_name: String,
create_missing_group: bool,
group_class: Option<LogGroupClass>,
create_missing_stream: bool,
retention: Retention,
token: Option<String>,
Expand Down
29 changes: 17 additions & 12 deletions website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,6 @@ base: components: sinks: aws_cloudwatch_logs: configuration: {
}
}
}
create_missing_group: {
description: """
Dynamically create a [log group][log_group] if it does not already exist.
This ignores `create_missing_stream` directly after creating the group and creates
the first stream.
[log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
"""
required: false
type: bool: default: true
}
create_missing_stream: {
description: """
Dynamically create a [log stream][log_stream] if it does not already exist.
Expand Down Expand Up @@ -566,6 +554,23 @@ base: components: sinks: aws_cloudwatch_logs: configuration: {
required: false
type: string: examples: ["http://127.0.0.0:5000/path/to/service"]
}
group_class: {
description: """
Dynamically create a [log group][log_group] if it does not already exist with the specified
[group class][group_class].
This ignores `create_missing_stream` directly after creating the group and creates
the first stream.
[log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
[group_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html
"""
required: false
type: string: enum: {
InfrequentAccess: "Log class that can be used to cost-effectively consolidate logs"
Standard: "Logs that require real-time monitoring or frequently accessed logs"
}
}
group_name: {
description: """
The [group name][group_name] of the target CloudWatch Logs stream.
Expand Down

0 comments on commit 7758e8f

Please sign in to comment.