diff --git a/scripts/integration/aws/compose.yaml b/scripts/integration/aws/compose.yaml index 07de953aaed52..c45cbe7f2e37b 100644 --- a/scripts/integration/aws/compose.yaml +++ b/scripts/integration/aws/compose.yaml @@ -6,9 +6,7 @@ services: mock-localstack: image: docker.io/localstack/localstack:3 environment: - - SERVICES=kinesis,s3,cloudwatch,es,firehose,sqs,sns - mock-watchlogs: - image: docker.io/luciofranco/mockwatchlogs:latest + - SERVICES=kinesis,s3,cloudwatch,es,firehose,sqs,sns,logs mock-ecs: image: docker.io/amazon/amazon-ecs-local-container-endpoints:latest volumes: diff --git a/scripts/integration/aws/test.yaml b/scripts/integration/aws/test.yaml index 706efe5ba6dd4..71ff65cae11f8 100644 --- a/scripts/integration/aws/test.yaml +++ b/scripts/integration/aws/test.yaml @@ -14,7 +14,6 @@ env: S3_ADDRESS: http://mock-localstack:4566 SQS_ADDRESS: http://mock-localstack:4566 SNS_ADDRESS: http://mock-localstack:4566 - WATCHLOGS_ADDRESS: http://mock-watchlogs:6000 matrix: version: [latest] diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index c87a7949c8d8a..4e80493adaf88 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -25,8 +25,8 @@ use crate::{ const GROUP_NAME: &str = "vector-cw"; -fn watchlogs_address() -> String { - std::env::var("WATCHLOGS_ADDRESS").unwrap_or_else(|_| "http://localhost:6000".into()) +fn cloudwatch_address() -> String { + std::env::var("CLOUDWATCH_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into()) } #[tokio::test] @@ -39,7 +39,7 @@ async fn cloudwatch_insert_log_event() { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()), + region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()), encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, @@ -57,7 +57,7 @@ async fn cloudwatch_insert_log_event() { let timestamp = chrono::Utc::now(); - let (input_lines, events) = random_lines_with_stream(100, 11, None); + let (mut input_lines, events) = random_lines_with_stream(100, 11, None); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; let response = create_client_test() @@ -72,12 +72,12 @@ async fn cloudwatch_insert_log_event() { let events = response.events.unwrap(); - let output_lines = events + let mut output_lines = events .into_iter() .map(|e| e.message.unwrap()) .collect::>(); - assert_eq!(output_lines, input_lines); + assert_eq!(output_lines.sort(), input_lines.sort()); } #[tokio::test] @@ -90,7 +90,7 @@ async fn cloudwatch_insert_log_events_sorted() { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()), + region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()), encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, @@ -145,7 +145,7 @@ async fn cloudwatch_insert_log_events_sorted() { let events = response.events.unwrap(); - let output_lines = events + let mut output_lines = events .into_iter() .map(|e| e.message.unwrap()) .collect::>(); @@ -153,7 +153,7 @@ async fn cloudwatch_insert_log_events_sorted() { // readjust input_lines in the same way we have readjusted timestamps. let first = input_lines.remove(0); input_lines.push(first); - assert_eq!(output_lines, input_lines); + assert_eq!(output_lines.sort(), input_lines.sort()); } #[tokio::test] @@ -166,7 +166,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()), + region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()), encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, @@ -225,12 +225,12 @@ async fn cloudwatch_insert_out_of_range_timestamp() { let events = response.events.unwrap(); - let output_lines = events + let mut output_lines = events .into_iter() .map(|e| e.message.unwrap()) .collect::>(); - assert_eq!(output_lines, lines); + assert_eq!(output_lines.sort(), lines.sort()); } #[tokio::test] @@ -243,7 +243,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(group_name.as_str()).unwrap(), - region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()), + region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()), encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, @@ -261,7 +261,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() { let timestamp = chrono::Utc::now(); - let (input_lines, events) = random_lines_with_stream(100, 11, None); + let (mut input_lines, events) = random_lines_with_stream(100, 11, None); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; let response = create_client_test() @@ -276,12 +276,12 @@ async fn cloudwatch_dynamic_group_and_stream_creation() { let events = response.events.unwrap(); - let output_lines = events + let mut output_lines = events .into_iter() .map(|e| e.message.unwrap()) .collect::>(); - assert_eq!(output_lines, input_lines); + assert_eq!(output_lines.sort(), input_lines.sort()); } #[tokio::test] @@ -299,7 +299,7 @@ async fn cloudwatch_insert_log_event_batched() { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(group_name.as_str()).unwrap(), - region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()), + region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()), encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, @@ -317,7 +317,7 @@ async fn cloudwatch_insert_log_event_batched() { let timestamp = chrono::Utc::now(); - let (input_lines, events) = random_lines_with_stream(100, 11, None); + let (mut input_lines, events) = random_lines_with_stream(100, 11, None); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; let response = create_client_test() @@ -332,12 +332,12 @@ async fn cloudwatch_insert_log_event_batched() { let events = response.events.unwrap(); - let output_lines = events + let mut output_lines = events .into_iter() .map(|e| e.message.unwrap()) .collect::>(); - assert_eq!(output_lines, input_lines); + assert_eq!(output_lines.sort(), input_lines.sort()); } #[tokio::test] @@ -350,7 +350,7 @@ async fn cloudwatch_insert_log_event_partitioned() { let config = CloudwatchLogsSinkConfig { group_name: Template::try_from(GROUP_NAME).unwrap(), stream_name: Template::try_from(format!("{}-{{{{key}}}}", stream_name)).unwrap(), - region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()), + region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()), encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, @@ -393,11 +393,11 @@ async fn cloudwatch_insert_log_event_partitioned() { .await .unwrap(); let events = response.events.unwrap(); - let output_lines = events + let mut output_lines = events .into_iter() .map(|e| e.message.unwrap()) .collect::>(); - let expected_output = input_lines + let mut expected_output = input_lines .clone() .into_iter() .enumerate() @@ -405,7 +405,7 @@ async fn cloudwatch_insert_log_event_partitioned() { .map(|(_, e)| e) .collect::>(); - assert_eq!(output_lines, expected_output); + assert_eq!(output_lines.sort(), expected_output.sort()); let response = create_client_test() .await @@ -418,11 +418,11 @@ async fn cloudwatch_insert_log_event_partitioned() { .unwrap(); let events = response.events.unwrap(); - let output_lines = events + let mut output_lines = events .into_iter() .map(|e| e.message.unwrap()) .collect::>(); - let expected_output = input_lines + let mut expected_output = input_lines .clone() .into_iter() .enumerate() @@ -430,7 +430,7 @@ async fn cloudwatch_insert_log_event_partitioned() { .map(|(_, e)| e) .collect::>(); - assert_eq!(output_lines, expected_output); + assert_eq!(output_lines.sort(), expected_output.sort()); } #[tokio::test] @@ -443,7 +443,7 @@ async fn cloudwatch_healthcheck() { let config = CloudwatchLogsSinkConfig { stream_name: Template::try_from("test-stream").unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), - region: RegionOrEndpoint::with_both("us-east-1", watchlogs_address().as_str()), + region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()), encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, @@ -464,7 +464,7 @@ async fn cloudwatch_healthcheck() { async fn create_client_test() -> CloudwatchLogsClient { let auth = AwsAuthentication::test_auth(); let region = Some(Region::new("us-east-1")); - let endpoint = Some(watchlogs_address()); + let endpoint = Some(cloudwatch_address()); let proxy = ProxyConfig::default(); create_client::(&auth, region, endpoint, &proxy, &None, &None)