Python - CloudWatch Logs - Large query implementation #6141

Merged 4 commits on Feb 21, 2024
27 changes: 27 additions & 0 deletions .doc_gen/metadata/cloudwatch-logs_metadata.yaml
@@ -315,6 +315,14 @@ cloudwatch-logs_GetQueryResults:
- description:
snippet_tags:
- javascript.v3.cloudwatch-logs.actions.GetQueryResults
Python:
versions:
- sdk_version: 3
github: python/example_code/cloudwatch-logs
excerpts:
- description:
snippet_tags:
- python.example_code.cloudwatch_logs.get_query_results
services:
cloudwatch-logs: {GetQueryResults}
cloudwatch-logs_StartQuery:
@@ -331,6 +339,14 @@ cloudwatch-logs_StartQuery:
- description:
snippet_tags:
- javascript.v3.cloudwatch-logs.actions.StartQuery
Python:
versions:
- sdk_version: 3
github: python/example_code/cloudwatch-logs
excerpts:
- description:
snippet_tags:
- python.example_code.cloudwatch_logs.start_query
services:
cloudwatch-logs: {StartQuery}
cloudwatch-logs_Scenario_BigQuery:
@@ -350,5 +366,16 @@ cloudwatch-logs_Scenario_BigQuery:
- description: This is a class that splits queries into multiple steps if necessary.
snippet_files:
- javascriptv3/example_code/cloudwatch-logs/scenarios/large-query/cloud-watch-query.js
Python:
versions:
- sdk_version: 3
github: python/example_code/cloudwatch-logs/scenarios/large-query
excerpts:
- description: This file invokes an example module for managing CloudWatch queries exceeding 10,000 results.
snippet_files:
- python/example_code/cloudwatch-logs/scenarios/large-query/exec.py
- description: This module processes CloudWatch queries exceeding 10,000 results.
snippet_files:
- python/example_code/cloudwatch-logs/scenarios/large-query/cloudwatch_query.py
services:
cloudwatch-logs: {StartQuery, GetQueryResults}
@@ -0,0 +1,50 @@
# CloudWatch Logs large query

## Overview

This example shows how to use AWS SDKs to run a query on Amazon CloudWatch Logs that returns more than the maximum of 10,000 log records.

The CloudWatch Logs API is capped at 10,000 records for requests that [read](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_GetLogEvents.html) or [write](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html). GetLogEvents returns tokens for pagination, but [GetQueryResults](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_GetQueryResults.html) does not. This example breaks one query into multiple smaller queries whenever a query returns the maximum number of records.
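The split-and-requery strategy can be sketched independently of CloudWatch. In the hypothetical sketch below, `capped_query` stands in for an API that caps result counts, and integer timestamps stand in for log timestamps; whenever a window returns exactly the cap, the window remaining after the newest returned record is halved and each half is queried recursively:

```python
# Hypothetical sketch of the window-splitting strategy; `capped_query`
# stands in for an API (like GetQueryResults) that caps result counts.

LIMIT = 3  # the real CloudWatch Logs cap is 10,000


def capped_query(records, start, end, limit=LIMIT):
    """Return up to `limit` record timestamps with start <= t < end, sorted."""
    return sorted(t for t in records if start <= t < end)[:limit]


def query_all(records, start, end):
    """Recursively split the window until every record is retrieved."""
    batch = capped_query(records, start, end)
    results = list(batch)
    if len(batch) == LIMIT:  # cap hit: more records may remain after the newest
        newest = batch[-1]
        mid = (newest + 1 + end) // 2
        results += query_all(records, newest + 1, mid)  # first half of remainder
        results += query_all(records, mid, end)  # second half of remainder
    return results


print(query_all(list(range(10)), 0, 10))  # all ten records, despite the cap of 3
```

The real implementation below follows the same shape, but splits on the `@timestamp` of the most recent log in the batch and runs the two halves on separate threads.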

The following components are used in this example:

- [Amazon CloudWatch Logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/WhatIsCloudWatchLogs.html) hosts the logs that are queried using the [Amazon CloudWatch Logs API](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/Welcome.html).

## ⚠ Important

- Running this code might result in charges to your AWS account.
- Running the tests might result in charges to your AWS account.
- We recommend that you grant your code least privilege. At most, grant only the minimum permissions required to perform the task. For more information, see [Grant least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege).
- This code is not tested in every AWS Region. For more information, see [AWS Regional Services](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services).

## Scenario

### Prerequisites

For general prerequisites, see the [README](../../../../README.md) in the `python` folder.

To run this example, you need a CloudWatch log group that contains over 10,000 logs. You can [create one yourself](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html), or you can follow the steps in the [Infrastructure and data](#infrastructure-and-data) section. These steps require you to [install or update the latest version of the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html).

### Infrastructure and data

Use the following steps to create the necessary resources in AWS CloudFormation and use the AWS CLI to upload the necessary logs.

1. In your local terminal, change directories to [resources](../../../../../workflows/cloudwatch_logs_large_query/resources/).
1. Run `aws cloudformation deploy --template-file stack.yaml --stack-name CloudWatchLargeQuery`
1. Run `./make-log-files.sh`. This will output two timestamps for use in the following step.
1. Run `export QUERY_START_DATE=<QUERY_START_DATE>`. Replace `<QUERY_START_DATE>` with the output from the previous step. Repeat this for `QUERY_END_DATE`.
1. Run `./put-log-events.sh`.
1. Wait five minutes for the logs to settle and to make sure you're not querying for logs with timestamps in the future.

### Run the scenario

1. `python exec.py`

## Additional reading

- [CloudWatch Logs Insights query syntax](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html)
- [CloudWatch Logs billing and cost](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/LogsBillingDetails.html)

---

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: Apache-2.0
@@ -0,0 +1,222 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import time
from datetime import datetime
import threading
import boto3

from date_utilities import DateUtilities


class DateOutOfBoundsError(Exception):
"""Exception raised when the date range for a query is out of bounds."""

pass


class CloudWatchQuery:
"""
A class to query AWS CloudWatch logs within a specified date range.

:ivar date_range: Start and end datetime for the query.
:vartype date_range: tuple
:ivar limit: Maximum number of log entries to return.
:vartype limit: int
"""

def __init__(self, date_range):
self.lock = threading.Lock()
self.log_groups = "/workflows/cloudwatch-logs/large-query"
self.query_results = []
self.date_range = date_range
self.query_duration = None
self.datetime_format = "%Y-%m-%d %H:%M:%S.%f"
self.date_utilities = DateUtilities()
self.limit = 10000

    def query_logs(self, date_range):
        """
        Executes a CloudWatch logs query for a specified date range and calculates
        the execution time of the query. Results are accumulated in `self.query_results`.

        :param date_range: Start and end timestamps for the query.
        :type date_range: tuple
        """
start_time = datetime.now()

start_date, end_date = self.date_utilities.normalize_date_range_format(
date_range, from_format="unix_timestamp", to_format="datetime"
)

logging.info(
f"Original query:"
f"\n START: {start_date}"
f"\n END: {end_date}"
)
self.recursive_query((start_date, end_date))
end_time = datetime.now()
self.query_duration = (end_time - start_time).total_seconds()

def recursive_query(self, date_range):
"""
Processes logs within a given date range, fetching batches of logs recursively if necessary.

:param date_range: The date range to fetch logs for, specified as a tuple (start_timestamp, end_timestamp).
:type date_range: tuple
:return: None if the recursive fetching is continued or stops when the final batch of logs is processed.
Although it doesn't explicitly return the query results, this method accumulates all fetched logs
in the `self.query_results` attribute.
:rtype: None
"""
batch_of_logs = self.perform_query(date_range)
# Add the batch to the accumulated logs
with self.lock:
self.query_results.extend(batch_of_logs)
if len(batch_of_logs) == self.limit:
logging.info(f"Fetched {self.limit}, checking for more...")
most_recent_log = self.find_most_recent_log(batch_of_logs)
most_recent_log_timestamp = next(
item["value"]
for item in most_recent_log
if item["field"] == "@timestamp"
)
new_range = (most_recent_log_timestamp, date_range[1])
midpoint = self.date_utilities.find_middle_time(new_range)

first_half_thread = threading.Thread(
target=self.recursive_query,
args=((most_recent_log_timestamp, midpoint),),
)
second_half_thread = threading.Thread(
target=self.recursive_query, args=((midpoint, date_range[1]),)
)

first_half_thread.start()
second_half_thread.start()

first_half_thread.join()
second_half_thread.join()

    def find_most_recent_log(self, logs):
        """
        Search a list of log items and return the most recent log entry.

        :param logs: A list of logs to analyze.
        :return: The most recent log item, as a list of field/value dictionaries.
        :rtype: list
        """
most_recent_log = None
most_recent_date = "1970-01-01 00:00:00.000"

for log in logs:
for item in log:
if item["field"] == "@timestamp":
logging.debug(f"Compared: {item['value']} to {most_recent_date}")
if (
self.date_utilities.compare_dates(
item["value"], most_recent_date
)
== item["value"]
):
logging.debug(f"New most recent: {item['value']}")
most_recent_date = item["value"]
most_recent_log = log
logging.info(f"Most recent log date of batch: {most_recent_date}")
return most_recent_log

# snippet-start:[python.example_code.cloudwatch_logs.start_query]
    def perform_query(self, date_range):
        """
        Performs the actual CloudWatch log query.

        :param date_range: A tuple representing the start and end datetime for the query.
        :type date_range: tuple
        :return: A list containing the query results.
        :rtype: list
        """
        client = boto3.client("logs")
        try:
            query_id = self._initiate_query(client, date_range, self.limit)
            return self._wait_for_query_results(client, query_id)
        except DateOutOfBoundsError:
            return []

def _initiate_query(self, client, date_range, max_logs):
"""
Initiates the CloudWatch logs query.

:param date_range: A tuple representing the start and end datetime for the query.
:type date_range: tuple
:param max_logs: The maximum number of logs to retrieve.
:type max_logs: int
:return: The query ID as a string.
:rtype: str
"""
try:
start_time = round(
self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[0])
)
end_time = round(
self.date_utilities.convert_iso8601_to_unix_timestamp(date_range[1])
)
response = client.start_query(
logGroupName=self.log_groups,
startTime=start_time,
endTime=end_time,
queryString="fields @timestamp, @message | sort @timestamp asc",
limit=max_logs,
)
return response["queryId"]
except client.exceptions.ResourceNotFoundException as e:
            raise DateOutOfBoundsError(f"Resource not found: {e}") from e

# snippet-end:[python.example_code.cloudwatch_logs.start_query]

# snippet-start:[python.example_code.cloudwatch_logs.get_query_results]
def _wait_for_query_results(self, client, query_id):
"""
Waits for the query to complete and retrieves the results.

:param query_id: The ID of the initiated query.
:type query_id: str
:return: A list containing the results of the query.
:rtype: list
"""
while True:
time.sleep(1)
results = client.get_query_results(queryId=query_id)
if results["status"] in [
"Complete",
"Failed",
"Cancelled",
"Timeout",
"Unknown",
]:
return results.get("results", [])

# snippet-end:[python.example_code.cloudwatch_logs.get_query_results]
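The `DateUtilities` module imported at the top of this file ships alongside the example but is not part of this diff. As a rough, hypothetical sketch of what two of the helpers used above (`compare_dates` and `find_middle_time`) might do, assuming the `"%Y-%m-%d %H:%M:%S.%f"` timestamp format the class declares (the real module's signatures and behavior may differ):

```python
# Hypothetical sketch of two DateUtilities helpers; the real module is
# part of the example but not shown in this diff.
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"  # matches self.datetime_format above


def compare_dates(a, b):
    """Return whichever of two timestamp strings is more recent."""
    return a if datetime.strptime(a, FMT) >= datetime.strptime(b, FMT) else b


def find_middle_time(date_range):
    """Return the timestamp halfway between the start and end of a range."""
    start = datetime.strptime(date_range[0], FMT)
    end = datetime.strptime(date_range[1], FMT)
    return (start + (end - start) / 2).strftime(FMT)


print(compare_dates("2024-02-21 12:00:00.000", "2024-02-20 12:00:00.000"))
# 2024-02-21 12:00:00.000
print(find_middle_time(("2024-02-21 00:00:00.000", "2024-02-21 02:00:00.000")))
# 2024-02-21 01:00:00.000000
```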