Skip to content

Commit

Permalink
chore(ci): Swap out mockwatchlogs for localstack (#21114)
Browse files Browse the repository at this point in the history
* chore(ci): Swap out mockwatchlogs for localstack

Now that localstack supports CloudWatch Logs as a service.

Signed-off-by: Jesse Szwedko <[email protected]>

* Enable logs

Signed-off-by: Jesse Szwedko <[email protected]>

* Fix port

Signed-off-by: Jesse Szwedko <[email protected]>

* sort arrays when comparing

Signed-off-by: balonik <[email protected]>

* clippy

Signed-off-by: Jesse Szwedko <[email protected]>

---------

Signed-off-by: Jesse Szwedko <[email protected]>
Signed-off-by: balonik <[email protected]>
Co-authored-by: balonik <[email protected]>
  • Loading branch information
jszwedko and balonik committed Aug 26, 2024
1 parent fe2cc26 commit 46fccce
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 33 deletions.
4 changes: 1 addition & 3 deletions scripts/integration/aws/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ services:
mock-localstack:
image: docker.io/localstack/localstack:3
environment:
- SERVICES=kinesis,s3,cloudwatch,es,firehose,sqs,sns
mock-watchlogs:
image: docker.io/luciofranco/mockwatchlogs:latest
- SERVICES=kinesis,s3,cloudwatch,es,firehose,sqs,sns,logs
mock-ecs:
image: docker.io/amazon/amazon-ecs-local-container-endpoints:latest
volumes:
Expand Down
1 change: 0 additions & 1 deletion scripts/integration/aws/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ env:
S3_ADDRESS: http://mock-localstack:4566
SQS_ADDRESS: http://mock-localstack:4566
SNS_ADDRESS: http://mock-localstack:4566
WATCHLOGS_ADDRESS: http://mock-watchlogs:6000

matrix:
version: [latest]
Expand Down
58 changes: 29 additions & 29 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use crate::{

const GROUP_NAME: &str = "vector-cw";

fn watchlogs_address() -> String {
std::env::var("WATCHLOGS_ADDRESS").unwrap_or_else(|_| "http://localhost:6000".into())
fn cloudwatch_address() -> String {
std::env::var("CLOUDWATCH_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
}

#[tokio::test]
Expand All @@ -39,7 +39,7 @@ async fn cloudwatch_insert_log_event() {
let config = CloudwatchLogsSinkConfig {
stream_name: Template::try_from(stream_name.as_str()).unwrap(),
group_name: Template::try_from(GROUP_NAME).unwrap(),
region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()),
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
Expand All @@ -57,7 +57,7 @@ async fn cloudwatch_insert_log_event() {

let timestamp = chrono::Utc::now();

let (input_lines, events) = random_lines_with_stream(100, 11, None);
let (mut input_lines, events) = random_lines_with_stream(100, 11, None);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;

let response = create_client_test()
Expand All @@ -72,12 +72,12 @@ async fn cloudwatch_insert_log_event() {

let events = response.events.unwrap();

let output_lines = events
let mut output_lines = events
.into_iter()
.map(|e| e.message.unwrap())
.collect::<Vec<_>>();

assert_eq!(output_lines, input_lines);
assert_eq!(output_lines.sort(), input_lines.sort());
}

#[tokio::test]
Expand All @@ -90,7 +90,7 @@ async fn cloudwatch_insert_log_events_sorted() {
let config = CloudwatchLogsSinkConfig {
stream_name: Template::try_from(stream_name.as_str()).unwrap(),
group_name: Template::try_from(GROUP_NAME).unwrap(),
region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()),
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
Expand Down Expand Up @@ -145,15 +145,15 @@ async fn cloudwatch_insert_log_events_sorted() {

let events = response.events.unwrap();

let output_lines = events
let mut output_lines = events
.into_iter()
.map(|e| e.message.unwrap())
.collect::<Vec<_>>();

// readjust input_lines in the same way we have readjusted timestamps.
let first = input_lines.remove(0);
input_lines.push(first);
assert_eq!(output_lines, input_lines);
assert_eq!(output_lines.sort(), input_lines.sort());
}

#[tokio::test]
Expand All @@ -166,7 +166,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
let config = CloudwatchLogsSinkConfig {
stream_name: Template::try_from(stream_name.as_str()).unwrap(),
group_name: Template::try_from(GROUP_NAME).unwrap(),
region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()),
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
Expand Down Expand Up @@ -225,12 +225,12 @@ async fn cloudwatch_insert_out_of_range_timestamp() {

let events = response.events.unwrap();

let output_lines = events
let mut output_lines = events
.into_iter()
.map(|e| e.message.unwrap())
.collect::<Vec<_>>();

assert_eq!(output_lines, lines);
assert_eq!(output_lines.sort(), lines.sort());
}

#[tokio::test]
Expand All @@ -243,7 +243,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
let config = CloudwatchLogsSinkConfig {
stream_name: Template::try_from(stream_name.as_str()).unwrap(),
group_name: Template::try_from(group_name.as_str()).unwrap(),
region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()),
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
Expand All @@ -261,7 +261,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {

let timestamp = chrono::Utc::now();

let (input_lines, events) = random_lines_with_stream(100, 11, None);
let (mut input_lines, events) = random_lines_with_stream(100, 11, None);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;

let response = create_client_test()
Expand All @@ -276,12 +276,12 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {

let events = response.events.unwrap();

let output_lines = events
let mut output_lines = events
.into_iter()
.map(|e| e.message.unwrap())
.collect::<Vec<_>>();

assert_eq!(output_lines, input_lines);
assert_eq!(output_lines.sort(), input_lines.sort());
}

#[tokio::test]
Expand All @@ -299,7 +299,7 @@ async fn cloudwatch_insert_log_event_batched() {
let config = CloudwatchLogsSinkConfig {
stream_name: Template::try_from(stream_name.as_str()).unwrap(),
group_name: Template::try_from(group_name.as_str()).unwrap(),
region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()),
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
Expand All @@ -317,7 +317,7 @@ async fn cloudwatch_insert_log_event_batched() {

let timestamp = chrono::Utc::now();

let (input_lines, events) = random_lines_with_stream(100, 11, None);
let (mut input_lines, events) = random_lines_with_stream(100, 11, None);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;

let response = create_client_test()
Expand All @@ -332,12 +332,12 @@ async fn cloudwatch_insert_log_event_batched() {

let events = response.events.unwrap();

let output_lines = events
let mut output_lines = events
.into_iter()
.map(|e| e.message.unwrap())
.collect::<Vec<_>>();

assert_eq!(output_lines, input_lines);
assert_eq!(output_lines.sort(), input_lines.sort());
}

#[tokio::test]
Expand All @@ -350,7 +350,7 @@ async fn cloudwatch_insert_log_event_partitioned() {
let config = CloudwatchLogsSinkConfig {
group_name: Template::try_from(GROUP_NAME).unwrap(),
stream_name: Template::try_from(format!("{}-{{{{key}}}}", stream_name)).unwrap(),
region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()),
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
Expand Down Expand Up @@ -393,19 +393,19 @@ async fn cloudwatch_insert_log_event_partitioned() {
.await
.unwrap();
let events = response.events.unwrap();
let output_lines = events
let mut output_lines = events
.into_iter()
.map(|e| e.message.unwrap())
.collect::<Vec<_>>();
let expected_output = input_lines
let mut expected_output = input_lines
.clone()
.into_iter()
.enumerate()
.filter(|(i, _)| i % 2 == 0)
.map(|(_, e)| e)
.collect::<Vec<_>>();

assert_eq!(output_lines, expected_output);
assert_eq!(output_lines.sort(), expected_output.sort());

let response = create_client_test()
.await
Expand All @@ -418,19 +418,19 @@ async fn cloudwatch_insert_log_event_partitioned() {
.unwrap();

let events = response.events.unwrap();
let output_lines = events
let mut output_lines = events
.into_iter()
.map(|e| e.message.unwrap())
.collect::<Vec<_>>();
let expected_output = input_lines
let mut expected_output = input_lines
.clone()
.into_iter()
.enumerate()
.filter(|(i, _)| i % 2 == 1)
.map(|(_, e)| e)
.collect::<Vec<_>>();

assert_eq!(output_lines, expected_output);
assert_eq!(output_lines.sort(), expected_output.sort());
}

#[tokio::test]
Expand All @@ -443,7 +443,7 @@ async fn cloudwatch_healthcheck() {
let config = CloudwatchLogsSinkConfig {
stream_name: Template::try_from("test-stream").unwrap(),
group_name: Template::try_from(GROUP_NAME).unwrap(),
region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()),
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
Expand All @@ -464,7 +464,7 @@ async fn cloudwatch_healthcheck() {
async fn create_client_test() -> CloudwatchLogsClient {
let auth = AwsAuthentication::test_auth();
let region = Some(Region::new("us-east-1"));
let endpoint = Some(watchlogs_address());
let endpoint = Some(cloudwatch_address());
let proxy = ProxyConfig::default();

create_client::<CloudwatchLogsClientBuilder>(&auth, region, endpoint, &proxy, &None, &None)
Expand Down

0 comments on commit 46fccce

Please sign in to comment.