Skip to content

Commit

Permalink
fix(providers/amazon): handle ClientError raised after key is missing…
Browse files Browse the repository at this point in the history
… during table.get_item (apache#42408)
  • Loading branch information
Lee-W authored and ellisms committed Nov 13, 2024
1 parent 60f5009 commit d534cc0
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 8 deletions.
30 changes: 22 additions & 8 deletions airflow/providers/amazon/aws/sensors/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from typing import TYPE_CHECKING, Any, Iterable, Sequence

from botocore.exceptions import ClientError

from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
Expand Down Expand Up @@ -102,14 +104,26 @@ def poke(self, context: Context) -> bool:
table = self.hook.conn.Table(self.table_name)
self.log.info("Table: %s", table)
self.log.info("Key: %s", key)
response = table.get_item(Key=key)

try:
item_attribute_value = response["Item"][self.attribute_name]
self.log.info("Response: %s", response)
self.log.info("Want: %s = %s", self.attribute_name, self.attribute_value)
self.log.info("Got: {response['Item'][self.attribute_name]} = %s", item_attribute_value)
return item_attribute_value in (
[self.attribute_value] if isinstance(self.attribute_value, str) else self.attribute_value
response = table.get_item(Key=key)
except ClientError as err:
self.log.error(
"Couldn't get %s from table %s.\nError Code: %s\nError Message: %s",
key,
self.table_name,
err.response["Error"]["Code"],
err.response["Error"]["Message"],
)
except KeyError:
return False
else:
try:
item_attribute_value = response["Item"][self.attribute_name]
self.log.info("Response: %s", response)
self.log.info("Want: %s = %s", self.attribute_name, self.attribute_value)
self.log.info("Got: {response['Item'][self.attribute_name]} = %s", item_attribute_value)
return item_attribute_value in (
[self.attribute_value] if isinstance(self.attribute_value, str) else self.attribute_value
)
except KeyError:
return False
17 changes: 17 additions & 0 deletions tests/providers/amazon/aws/sensors/test_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ def test_sensor_with_pk_and_sk(self):

assert self.sensor_pk_sk.poke(None)

@mock_aws
def test_sensor_with_client_error(self):
hook = DynamoDBHook(table_name=self.table_name, table_keys=[self.pk_name])

hook.conn.create_table(
TableName=self.table_name,
KeySchema=[{"AttributeName": self.pk_name, "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": self.pk_name, "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
)

items = [{self.pk_name: self.pk_value, self.attribute_name: self.attribute_value}]
hook.write_batch_data(items)

self.sensor_pk.partition_key_name = "no such key"
assert self.sensor_pk.poke(None) is False


class TestDynamoDBMultipleValuesSensor:
def setup_method(self):
Expand Down

0 comments on commit d534cc0

Please sign in to comment.