Skip to content

Commit

Permalink
Stream lambda payloads, if that API is available.
Browse files Browse the repository at this point in the history
  • Loading branch information
rebeccacremona committed Aug 1, 2024
1 parent 40f8e72 commit 4057c27
Showing 1 changed file with 75 additions and 32 deletions.
107 changes: 75 additions & 32 deletions web/main/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,73 +665,116 @@ def export_via_aws_lambda(obj, html, file_type):
"docx_sections": True,
},
}

if export_settings.get("function_arn"):

#
# Communicate with AWS via API, using the boto3 library
#

lambda_client = boto3.client(
"lambda",
export_settings["function_region"],
aws_access_key_id=export_settings["access_key"],
aws_secret_access_key=export_settings["secret_key"],
config=Config(read_timeout=settings.AWS_LAMBDA_EXPORT_TIMEOUT),
)
raw_response = lambda_client.invoke(
response = lambda_client.invoke_with_response_stream(
FunctionName=export_settings["function_name"],
LogType="Tail",
Payload=bytes(json.dumps(lambda_event_config), "utf-8"),
)
response = {
"status_code": raw_response["ResponseMetadata"]["HTTPStatusCode"],
"headers": raw_response["ResponseMetadata"]["HTTPHeaders"],
"content": raw_response["Payload"],
"get_text": lambda: raw_response["Payload"].read(),
}
lambda_log_str = (
str(base64.b64decode(raw_response["LogResult"]), "utf-8")
.strip()
.replace("\n", "; ")
.replace("\t", ", ")
)
logger.info(f'{log_line_prefix}: Lambda logs "{lambda_log_str}"')

payload = b""
error_code = None
error_details = None
log_result = None
log_str = ""
status_code = response["StatusCode"]

# Consume the event stream to retrieve the payload and any logs or error messages
for event in response["EventStream"]:
if event.get("PayloadChunk"):
payload += event["PayloadChunk"]["Payload"]
elif event.get("InvokeComplete"):
error_code = event["InvokeComplete"].get("ErrorCode")
error_details = event["InvokeComplete"].get("ErrorDetails")
log_result = event["InvokeComplete"].get("LogResult")
else:
logger.error(f"Unexpected event from AWs Lambda: {event}.")

# Format the logs, if present
if log_result:
log_str = (
str(base64.b64decode(log_result), "utf-8")
.strip()
.replace("\n", "; ")
.replace("\t", ", ")
)
if log_str:
logger.info(f'{log_line_prefix}: Lambda logs "{log_str}"')
else:
logger.info(f"{log_line_prefix}: No Lambda logs")

# Check for apparent success
assert (
status_code == 200
), f"Status: {status_code}. Details: {error_code or 'no code'}; {error_details or 'no details'}; {log_str or 'no logs'}."
assert (
not error_code and not error_details
), f"Error: {error_code or 'no code'}; {error_details or 'no details'}; {log_str or 'no logs'}."
assert payload, f"No payload: {log_str or 'no logs'}."

# Looks like we have something to return!
content = payload

else:

#
# Communicate with AWS (or an emulator) using a lambda function URL
#

raw_response = requests.post(
export_settings["function_url"],
timeout=settings.AWS_LAMBDA_EXPORT_TIMEOUT,
json=lambda_event_config,
)

# format the response
response = {
"status_code": raw_response.status_code,
"headers": {k.lower(): v for k, v in raw_response.headers.items()},
"log": None,
"content": raw_response.content,
"get_text": lambda: raw_response.text,
}
assert (
response["status_code"] == 200
), f"Status: {response['status_code']}. Content: {response['get_text']()}"
if response["headers"].get("content-type", "") == "text/plain; charset=utf-8":
parsed_content = json.loads(response.get("content"))
error_type = parsed_content.get("errorType", "Unknown")
if error_type == "Function.ResponseSizeTooLarge":
raise LambdaExportTooLarge(
f"An HTML export of {len(html)} chars resulted in a {parsed_content.get('errorMessage')}"
)
assert not response["headers"].get("x-amz-function-error") and response["headers"].get(
"content-type", ""
) in [
"application/zip",
"application/octet-stream",
], f"x-amz-function-error: {response['headers'].get('x-amz-function-error')}, content-type:{response['headers'].get('content-type','unknown')}, {response['get_text']()}"

# check for apparent success
assert (
response["status_code"] == 200
), f"Status: {response['status_code']}. Content: {response['get_text']()}"
assert not response["headers"].get("x-amz-function-error") and response[
"headers"
].get("content-type", "") in [
"application/zip",
"application/octet-stream",
], f"x-amz-function-error: {response['headers'].get('x-amz-function-error')}, content-type:{response['headers'].get('content-type','unknown')}, {response['get_text']()}"

# Looks like we have something to return!
content = response["content"]

except (BotoCoreError, BotoClientError, requests.RequestException, AssertionError) as e:
if export_type == "Casebook":
obj.inc_export_fails()
raise Exception(f"AWS Lambda export failed: {str(e)}")

finally:
# remove the source html from s3
storage.delete(filename)

# return the docx to the user
if export_type == "Casebook" and obj.export_fails > 0:
obj.reset_export_fails()
return response["content"]
return content


class BadFiletypeError(Exception):
Expand Down

0 comments on commit 4057c27

Please sign in to comment.