Skip to content

Commit

Permalink
helpers, s3_gate: Fix acl tests
Browse files Browse the repository at this point in the history
The neofs-s3-gw repository has made changes to how acl works.
This commit changes the tests so that they work correctly with the new acl rules.

Signed-off-by: Oleg Kulachenko <[email protected]>
  • Loading branch information
vvarg229 committed Nov 1, 2023
1 parent f848939 commit 1b54d3e
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 49 deletions.
97 changes: 72 additions & 25 deletions pytest_tests/helpers/s3_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,37 @@
logger = logging.getLogger("NeoLogger")


ACL_TO_PERMISSION_MAP_BUCKET = {
"Group": {
"public-read-write": "FULL_CONTROL",
"grant-full-control": "FULL_CONTROL",
"public-read": "READ",
"grant-read": "READ",
"authenticated-read": "READ",
"grant-write": "WRITE",
},
"CanonicalUser": {
"public-read-write": "FULL_CONTROL",
"grant-full-control": "FULL_CONTROL",
"public-read": "FULL_CONTROL",
"grant-read": "FULL_CONTROL",
"authenticated-read": "FULL_CONTROL",
"grant-write": "FULL_CONTROL",
"private": "FULL_CONTROL",
},
}

ACL_TO_PERMISSION_MAP_OBJECT = {
"Group": {"public-read-write": "WRITE", "public-read": "READ", "grant-read": "READ"},
"CanonicalUser": {
"public-read-write": "FULL_CONTROL",
"public-read": "WRITE",
"grant-read": "WRITE",
"private": "WRITE",
},
}


@allure.step("Expected all objects are presented in the bucket")
def check_objects_in_bucket(
s3_client, bucket, expected_objects: list, unexpected_objects: Optional[list] = None
Expand Down Expand Up @@ -132,29 +163,45 @@ def assert_object_lock_mode(
).days == retain_period, f"Expected retention period is {retain_period} days"


def assert_s3_acl(acl_grants: list, permitted_users: str):
def check_permission(grantee_type: str, acl: str, actual_permission: str, acl_map: dict) -> str:
expected_permission = acl_map.get(grantee_type, {}).get(acl, "")
assert (
actual_permission == expected_permission
), f"{grantee_type} should have {expected_permission} but got {actual_permission}"
return expected_permission


def assert_s3_acl(acl_grants: list, permitted_users: str, acl: str, acl_map: dict):
grantees = {"AllUsers": 0, "CanonicalUser": 0}

for acl_grant in acl_grants:
grantee_type = acl_grant.get("Grantee", {}).get("Type")

if grantee_type == "Group" and permitted_users == "AllUsers":
uri = acl_grant.get("Grantee", {}).get("URI")
permission = acl_grant.get("Permission")
expected_permission = check_permission(grantee_type, acl, permission, acl_map)
assert (
uri == "http://acs.amazonaws.com/groups/global/AllUsers"
), f"All Groups should have {expected_permission} but got {permission}"
grantees["AllUsers"] += 1

elif grantee_type == "CanonicalUser":
permission = acl_grant.get("Permission")
check_permission(grantee_type, acl, permission, acl_map)
grantees["CanonicalUser"] += 1

if permitted_users == "AllUsers":
grantees = {"AllUsers": 0, "CanonicalUser": 0}
for acl_grant in acl_grants:
if acl_grant.get("Grantee", {}).get("Type") == "Group":
uri = acl_grant.get("Grantee", {}).get("URI")
permission = acl_grant.get("Permission")
assert (uri, permission) == (
"http://acs.amazonaws.com/groups/global/AllUsers",
"FULL_CONTROL",
), "All Groups should have FULL_CONTROL"
grantees["AllUsers"] += 1
if acl_grant.get("Grantee", {}).get("Type") == "CanonicalUser":
permission = acl_grant.get("Permission")
assert permission == "FULL_CONTROL", "Canonical User should have FULL_CONTROL"
grantees["CanonicalUser"] += 1
assert grantees["AllUsers"] >= 1, "All Users should have FULL_CONTROL"
assert grantees["CanonicalUser"] >= 1, "Canonical User should have FULL_CONTROL"

if permitted_users == "CanonicalUser":
for acl_grant in acl_grants:
if acl_grant.get("Grantee", {}).get("Type") == "CanonicalUser":
permission = acl_grant.get("Permission")
assert permission == "FULL_CONTROL", "Only CanonicalUser should have FULL_CONTROL"
else:
raise AssertionError("FULL_CONTROL is given to All Users")
for key in grantees:
assert grantees[key] >= 1, f"{key} should have permission but got none"

elif permitted_users == "CanonicalUser" and grantees["AllUsers"] > 0:
logger.error(f"Permission is given to All Users")


def assert_bucket_s3_acl(acl_grants: list, permitted_users: str, acl: str):
assert_s3_acl(acl_grants, permitted_users, acl, ACL_TO_PERMISSION_MAP_BUCKET)


def assert_object_s3_acl(acl_grants: list, permitted_users: str, acl: str):
assert_s3_acl(acl_grants, permitted_users, acl, ACL_TO_PERMISSION_MAP_OBJECT)
20 changes: 13 additions & 7 deletions pytest_tests/testsuites/services/s3_gate/test_s3_ACL.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import allure
import pytest
from file_helper import generate_file
from s3_helper import assert_s3_acl, object_key_from_file_path
from s3_helper import assert_object_s3_acl, object_key_from_file_path, assert_bucket_s3_acl

from steps import s3_gate_bucket, s3_gate_object
from steps.s3_gate_base import TestS3GateBase
Expand Down Expand Up @@ -29,12 +29,12 @@ def test_s3_object_ACL(self, bucket, simple_object_size):
with allure.step("Put object ACL = public-read"):
s3_gate_object.put_object_acl_s3(self.s3_client, bucket, file_name, "public-read")
obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name)
assert_s3_acl(acl_grants=obj_acl, permitted_users="AllUsers")
assert_object_s3_acl(acl_grants=obj_acl, permitted_users="AllUsers", acl="public-read")

with allure.step("Put object ACL = private"):
s3_gate_object.put_object_acl_s3(self.s3_client, bucket, file_name, "private")
obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name)
assert_s3_acl(acl_grants=obj_acl, permitted_users="CanonicalUser")
assert_object_s3_acl(acl_grants=obj_acl, permitted_users="CanonicalUser", acl="private")

with allure.step(
"Put object with grant-read uri=http://acs.amazonaws.com/groups/global/AllUsers"
Expand All @@ -46,19 +46,23 @@ def test_s3_object_ACL(self, bucket, simple_object_size):
grant_read="uri=http://acs.amazonaws.com/groups/global/AllUsers",
)
obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name)
assert_s3_acl(acl_grants=obj_acl, permitted_users="AllUsers")
assert_object_s3_acl(acl_grants=obj_acl, permitted_users="AllUsers", acl="grant-read")

@allure.title("Test S3: Bucket ACL")
def test_s3_bucket_ACL(self):
with allure.step("Create bucket with ACL = public-read-write"):
bucket = s3_gate_bucket.create_bucket_s3(self.s3_client, True, acl="public-read-write")
bucket_acl = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket)
assert_s3_acl(acl_grants=bucket_acl, permitted_users="AllUsers")
assert_bucket_s3_acl(
acl_grants=bucket_acl, permitted_users="AllUsers", acl="public-read-write"
)

with allure.step("Change bucket ACL to private"):
s3_gate_bucket.put_bucket_acl_s3(self.s3_client, bucket, acl="private")
bucket_acl = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket)
assert_s3_acl(acl_grants=bucket_acl, permitted_users="CanonicalUser")
assert_bucket_s3_acl(
acl_grants=bucket_acl, permitted_users="CanonicalUser", acl="private"
)

with allure.step(
"Change bucket acl to --grant-write uri=http://acs.amazonaws.com/groups/global/AllUsers"
Expand All @@ -69,4 +73,6 @@ def test_s3_bucket_ACL(self):
grant_write="uri=http://acs.amazonaws.com/groups/global/AllUsers",
)
bucket_acl = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket)
assert_s3_acl(acl_grants=bucket_acl, permitted_users="AllUsers")
assert_bucket_s3_acl(
acl_grants=bucket_acl, permitted_users="AllUsers", acl="grant-write"
)
33 changes: 25 additions & 8 deletions pytest_tests/testsuites/services/s3_gate/test_s3_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from file_helper import generate_file
from s3_helper import (
assert_object_lock_mode,
assert_s3_acl,
assert_object_s3_acl,
check_objects_in_bucket,
object_key_from_file_path,
assert_bucket_s3_acl,
)

from steps import s3_gate_bucket, s3_gate_object
Expand All @@ -22,34 +23,44 @@ def pytest_generate_tests(metafunc):
@pytest.mark.s3_gate
@pytest.mark.s3_gate_bucket
class TestS3GateBucket(TestS3GateBase):
@pytest.mark.acl
@pytest.mark.sanity
@allure.title("Test S3: Create Bucket with different ACL")
def test_s3_create_bucket_with_ACL(self):

with allure.step("Create bucket with ACL private"):
bucket = s3_gate_bucket.create_bucket_s3(self.s3_client, True, acl="private")
bucket_acl = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket)
assert_s3_acl(acl_grants=bucket_acl, permitted_users="CanonicalUser")
assert_bucket_s3_acl(
acl_grants=bucket_acl, permitted_users="CanonicalUser", acl="private"
)

with allure.step("Create bucket with ACL = public-read"):
bucket_1 = s3_gate_bucket.create_bucket_s3(self.s3_client, True, acl="public-read")
bucket_acl_1 = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket_1)
assert_s3_acl(acl_grants=bucket_acl_1, permitted_users="AllUsers")
assert_bucket_s3_acl(
acl_grants=bucket_acl_1, permitted_users="AllUsers", acl="public-read"
)

with allure.step("Create bucket with ACL public-read-write"):
bucket_2 = s3_gate_bucket.create_bucket_s3(
self.s3_client, True, acl="public-read-write"
)
bucket_acl_2 = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket_2)
assert_s3_acl(acl_grants=bucket_acl_2, permitted_users="AllUsers")
assert_bucket_s3_acl(
acl_grants=bucket_acl_2, permitted_users="AllUsers", acl="public-read-write"
)

with allure.step("Create bucket with ACL = authenticated-read"):
bucket_3 = s3_gate_bucket.create_bucket_s3(
self.s3_client, True, acl="authenticated-read"
)
bucket_acl_3 = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket_3)
assert_s3_acl(acl_grants=bucket_acl_3, permitted_users="AllUsers")
assert_bucket_s3_acl(
acl_grants=bucket_acl_3, permitted_users="AllUsers", acl="authenticated-read"
)

@pytest.mark.acl
@allure.title("Test S3: Create Bucket with different ACL by grand")
def test_s3_create_bucket_with_grands(self):

Expand All @@ -60,7 +71,9 @@ def test_s3_create_bucket_with_grands(self):
grant_read="uri=http://acs.amazonaws.com/groups/global/AllUsers",
)
bucket_acl = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket)
assert_s3_acl(acl_grants=bucket_acl, permitted_users="AllUsers")
assert_bucket_s3_acl(
acl_grants=bucket_acl, permitted_users="AllUsers", acl="grant-read"
)

with allure.step("Create bucket with --grant-wtite"):
bucket_1 = s3_gate_bucket.create_bucket_s3(
Expand All @@ -69,7 +82,9 @@ def test_s3_create_bucket_with_grands(self):
grant_write="uri=http://acs.amazonaws.com/groups/global/AllUsers",
)
bucket_acl_1 = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket_1)
assert_s3_acl(acl_grants=bucket_acl_1, permitted_users="AllUsers")
assert_bucket_s3_acl(
acl_grants=bucket_acl_1, permitted_users="AllUsers", acl="grant-write"
)

with allure.step("Create bucket with --grant-full-control"):
bucket_2 = s3_gate_bucket.create_bucket_s3(
Expand All @@ -78,7 +93,9 @@ def test_s3_create_bucket_with_grands(self):
grant_full_control="uri=http://acs.amazonaws.com/groups/global/AllUsers",
)
bucket_acl_2 = s3_gate_bucket.get_bucket_acl(self.s3_client, bucket_2)
assert_s3_acl(acl_grants=bucket_acl_2, permitted_users="AllUsers")
assert_bucket_s3_acl(
acl_grants=bucket_acl_2, permitted_users="AllUsers", acl="grant-full-control"
)

@allure.title("Test S3: create bucket with object lock")
def test_s3_bucket_object_lock(self, simple_object_size):
Expand Down
25 changes: 16 additions & 9 deletions pytest_tests/testsuites/services/s3_gate/test_s3_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from python_keywords.payment_neogo import deposit_gas, transfer_gas
from s3_helper import (
assert_object_lock_mode,
assert_s3_acl,
assert_object_s3_acl,
check_objects_in_bucket,
set_bucket_versioning,
)
Expand Down Expand Up @@ -119,6 +119,7 @@ def test_s3_copy_version_object(self, two_buckets, simple_object_size):
with pytest.raises(Exception):
s3_gate_object.copy_object_s3(self.s3_client, bucket_1, obj_key)

@pytest.mark.acl
@allure.title("Test S3: Checking copy with acl")
def test_s3_copy_acl(self, bucket, simple_object_size):
version_1_content = "Version 1"
Expand All @@ -136,7 +137,7 @@ def test_s3_copy_acl(self, bucket, simple_object_size):
self.s3_client, bucket, obj_key, ACL="private"
)
obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, copy_obj_path)
assert_s3_acl(acl_grants=obj_acl, permitted_users="CanonicalUser")
assert_object_s3_acl(acl_grants=obj_acl, permitted_users="CanonicalUser", acl="private")

@allure.title("Test S3: Copy object with metadata")
def test_s3_copy_metadate(self, bucket, simple_object_size):
Expand Down Expand Up @@ -706,15 +707,15 @@ def test_s3_put_object_acl(
with allure.step("Put object with acl private"):
s3_gate_object.put_object_s3(self.s3_client, bucket, file_path_1, ACL="private")
obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name)
assert_s3_acl(acl_grants=obj_acl, permitted_users="CanonicalUser")
assert_object_s3_acl(acl_grants=obj_acl, permitted_users="CanonicalUser", acl="private")
object_1 = s3_gate_object.get_object_s3(self.s3_client, bucket, file_name)
assert get_file_hash(file_path_1) == get_file_hash(object_1), "Hashes must be the same"

with allure.step("Put object with acl public-read"):
file_path_2 = generate_file_with_content(simple_object_size, file_path=file_path_1)
s3_gate_object.put_object_s3(self.s3_client, bucket, file_path_2, ACL="public-read")
s3_gate_object.put_object_s3(self.s3_client, bucket, file_path_2, ACL="private")
obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name)
assert_s3_acl(acl_grants=obj_acl, permitted_users="AllUsers")
assert_object_s3_acl(acl_grants=obj_acl, permitted_users="AllUsers", acl="private")
object_2 = s3_gate_object.get_object_s3(self.s3_client, bucket, file_name)
assert get_file_hash(file_path_2) == get_file_hash(object_2), "Hashes must be the same"

Expand All @@ -724,7 +725,9 @@ def test_s3_put_object_acl(
self.s3_client, bucket, file_path_3, ACL="public-read-write"
)
obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name)
assert_s3_acl(acl_grants=obj_acl, permitted_users="AllUsers")
assert_object_s3_acl(
acl_grants=obj_acl, permitted_users="AllUsers", acl="public-read-write"
)
object_3 = s3_gate_object.get_object_s3(self.s3_client, bucket, file_name)
assert get_file_hash(file_path_3) == get_file_hash(object_3), "Hashes must be the same"

Expand All @@ -734,7 +737,9 @@ def test_s3_put_object_acl(
self.s3_client, bucket, file_path_4, ACL="authenticated-read"
)
obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name)
assert_s3_acl(acl_grants=obj_acl, permitted_users="AllUsers")
assert_object_s3_acl(
acl_grants=obj_acl, permitted_users="AllUsers", acl="authenticated-read"
)
object_4 = s3_gate_object.get_object_s3(self.s3_client, bucket, file_name)
assert get_file_hash(file_path_4) == get_file_hash(object_4), "Hashes must be the same"

Expand All @@ -750,7 +755,9 @@ def test_s3_put_object_acl(
GrantFullControl=f"id={self.other_public_key}",
)
obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name_5)
assert_s3_acl(acl_grants=obj_acl, permitted_users="CanonicalUser")
assert_object_s3_acl(
acl_grants=obj_acl, permitted_users="CanonicalUser", acl="grant-full-control"
)
object_4 = s3_gate_object.get_object_s3(self.s3_client, bucket, file_name_5)
assert get_file_hash(file_path_5) == get_file_hash(object_4), "Hashes must be the same"

Expand All @@ -765,7 +772,7 @@ def test_s3_put_object_acl(
GrantRead="uri=http://acs.amazonaws.com/groups/global/AllUsers",
)
obj_acl = s3_gate_object.get_object_acl_s3(self.s3_client, bucket, file_name_5)
assert_s3_acl(acl_grants=obj_acl, permitted_users="AllUsers")
assert_object_s3_acl(acl_grants=obj_acl, permitted_users="AllUsers", acl="grant-read")
object_7 = s3_gate_object.get_object_s3(self.s3_client, bucket, file_name_5)
assert get_file_hash(file_path_7) == get_file_hash(object_7), "Hashes must be the same"

Expand Down

0 comments on commit 1b54d3e

Please sign in to comment.