Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: add checks for ContainerID in bearer token, fixes #546 #642

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions pytest_tests/testsuites/acl/test_bearer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import os
import uuid
from collections import namedtuple

import allure
import pytest
from cluster import Cluster
from cluster_test_base import ClusterTestBase
from common import ASSETS_DIR, TEST_FILES_DIR, WALLET_PASS
from epoch import get_epoch
from neofs_testlib.shell import Shell
from neofs_testlib.utils.wallet import get_last_address_from_wallet
from python_keywords.acl import (
EACLAccess,
Expand All @@ -19,11 +22,16 @@
sign_bearer,
wait_for_cache_expired,
)
from python_keywords.container import create_container
from python_keywords.container_access import (
check_custom_access_to_container,
check_full_access_to_container,
check_no_access_to_container,
)
from python_keywords.neofs_verbs import put_object_to_random_node
from wellknown_acl import PUBLIC_ACL

ContainerTuple = namedtuple("ContainerTuple", ["cid", "objects_oids"])


@pytest.mark.acl
Expand Down Expand Up @@ -308,3 +316,147 @@ def test_bearer_token_expiration(self, wallets, eacl_container_with_objects, exp
shell=self.shell,
cluster=self.cluster,
)

@allure.title("Check bearer token with ContainerID specified")
def test_bearer_token_with_container_id(
self, wallets, client_shell: Shell, cluster: Cluster, file_path: str
):
user_wallet = wallets.get_wallet()
container1, container2 = self._create_containers_with_objects(
containers_count=2,
objects_count=1,
user_wallet=user_wallet,
client_shell=client_shell,
cluster=cluster,
file_path=file_path,
)

with allure.step(
f"Create bearer token with all operations allowed for cid: {container1.cid}"
):
bearer = form_bearertoken_file(
user_wallet.wallet_path,
container1.cid,
[
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER)
for op in EACLOperation
],
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)

with allure.step(
f"Check {EACLRole.USER.value} with token has access to all operations with container {container1.cid}"
):
check_full_access_to_container(
user_wallet.wallet_path,
container1.cid,
container1.objects_oids.pop(),
file_path,
bearer=bearer,
wallet_config=user_wallet.config_path,
shell=self.shell,
cluster=self.cluster,
)

with allure.step(
f"Check {EACLRole.USER.value} has no access to all operations with another container {container2.cid}"
):
check_no_access_to_container(
user_wallet.wallet_path,
container2.cid,
container2.objects_oids.pop(),
file_path,
bearer=bearer,
wallet_config=user_wallet.config_path,
shell=self.shell,
cluster=self.cluster,
)

@allure.title("Check bearer token without ContainerID specified")
def test_bearer_token_without_container_id(
self, wallets, client_shell: Shell, cluster: Cluster, file_path: str
):
user_wallet = wallets.get_wallet()
container1, container2 = self._create_containers_with_objects(
containers_count=2,
objects_count=1,
user_wallet=user_wallet,
client_shell=client_shell,
cluster=cluster,
file_path=file_path,
)

with allure.step(f"Create bearer token with all operations allowed for all containers"):
bearer = form_bearertoken_file(
user_wallet.wallet_path,
None,
[
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER)
for op in EACLOperation
],
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)

with allure.step(
f"Check {EACLRole.USER.value} with token has access to all operations with container {container1.cid}"
):
check_full_access_to_container(
user_wallet.wallet_path,
container1.cid,
container1.objects_oids.pop(),
file_path,
bearer=bearer,
wallet_config=user_wallet.config_path,
shell=self.shell,
cluster=self.cluster,
)

with allure.step(
f"Check {EACLRole.USER.value} with token has access to all operations with container {container2.cid}"
):
check_full_access_to_container(
user_wallet.wallet_path,
container2.cid,
container2.objects_oids.pop(),
file_path,
bearer=bearer,
wallet_config=user_wallet.config_path,
shell=self.shell,
cluster=self.cluster,
)

def _create_containers_with_objects(
self,
containers_count: int,
objects_count: int,
user_wallet,
client_shell: Shell,
cluster: Cluster,
file_path: str,
) -> list[ContainerTuple]:
result = []
for _ in range(containers_count):
with allure.step("Create eACL public container"):
cid = create_container(
user_wallet.wallet_path,
basic_acl=PUBLIC_ACL,
shell=client_shell,
endpoint=cluster.default_rpc_endpoint,
)

with allure.step("Add test objects to container"):
objects_oids = [
put_object_to_random_node(
user_wallet.wallet_path,
file_path,
cid,
attributes={"key1": "val1", "key": val, "key2": "abc"},
shell=client_shell,
cluster=cluster,
)
for val in range(objects_count)
]
result.append(ContainerTuple(cid=cid, objects_oids=objects_oids))
return result
Loading