diff --git a/pytest_tests/helpers/grpc_responses.py b/pytest_tests/helpers/grpc_responses.py index 1e9fdc355..05a42a443 100644 --- a/pytest_tests/helpers/grpc_responses.py +++ b/pytest_tests/helpers/grpc_responses.py @@ -26,6 +26,11 @@ INVALID_OFFSET_SPECIFIER = "invalid '{range}' range offset specifier" INVALID_LENGTH_SPECIFIER = "invalid '{range}' range length specifier" +NOT_CONTAINER_OWNER = "provided account differs with the container owner" +NOT_SESSION_CONTAINER_OWNER = "session issuer differs with the container owner" +TIMED_OUT = "timed out after \\d+ seconds" +CONTAINER_DELETION_TIMED_OUT = "container deletion: await timeout expired" +EACL_TIMED_OUT = "eACL setting: await timeout expired" def error_matches_status(error: Exception, status_pattern: str) -> bool: """ diff --git a/pytest_tests/testsuites/acl/test_eacl.py b/pytest_tests/testsuites/acl/test_eacl.py index eb6ceaeea..04882bd3d 100644 --- a/pytest_tests/testsuites/acl/test_eacl.py +++ b/pytest_tests/testsuites/acl/test_eacl.py @@ -2,6 +2,7 @@ import pytest from cluster_test_base import ClusterTestBase from failover_utils import wait_object_replication +from grpc_responses import NOT_CONTAINER_OWNER from neofs_testlib.shell import Shell from python_keywords.acl import ( EACLAccess, @@ -28,6 +29,7 @@ can_put_object, can_search_object, ) +from testsuites.acl.conftest import Wallets from wellknown_acl import PUBLIC_ACL @@ -35,7 +37,7 @@ @pytest.mark.acl_extended class TestEACLContainer(ClusterTestBase): @pytest.fixture(scope="function") - def eacl_full_placement_container_with_object(self, wallets, file_path) -> str: + def eacl_full_placement_container_with_object(self, wallets, file_path) -> tuple[str, str, str]: user_wallet = wallets.get_wallet() storage_nodes = self.cluster.storage_nodes node_count = len(storage_nodes) @@ -679,3 +681,28 @@ def test_extended_actions_system(self, wallets, eacl_container_with_objects): endpoint=endpoint, wallet_config=storage_wallet.config_path, ) + + @pytest.mark.trusted_party_proved + @allure.title("Not owner and not trusted party can NOT set eacl") + def test_only_owner_can_set_eacl( + self, + wallets: Wallets, + eacl_full_placement_container_with_object: tuple[str, str, str], + not_owner_wallet: str + ): + cid, oid, file_path = eacl_full_placement_container_with_object + + eacl = [ + EACLRule(access=EACLAccess.DENY, role=EACLRole.USER, operation=op) + for op in EACLOperation + ] + + with allure.step("Try to change EACL"): + with pytest.raises(RuntimeError, match=NOT_CONTAINER_OWNER): + set_eacl( + wallet_path=not_owner_wallet, + cid=cid, + eacl_table_path=create_eacl(cid, eacl, shell=self.shell), + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) diff --git a/pytest_tests/testsuites/conftest.py b/pytest_tests/testsuites/conftest.py index fc41002d0..98fa2d028 100644 --- a/pytest_tests/testsuites/conftest.py +++ b/pytest_tests/testsuites/conftest.py @@ -9,6 +9,7 @@ import allure import pytest import yaml +from typing import Optional from binary_version_helper import get_local_binaries_versions, get_remote_binaries_versions from cluster import Cluster from common import ( @@ -304,33 +305,18 @@ def background_grpc_load(client_shell: Shell, hosting: Hosting): k6_verify_instance.wait_until_finished(BACKGROUND_LOAD_MAX_TIME) -@pytest.fixture(scope="session") -@allure.title("Prepare wallet and deposit") -def default_wallet(client_shell: Shell, temp_directory: str, cluster: Cluster): - wallet_path = os.path.join(os.getcwd(), ASSETS_DIR, f"{str(uuid.uuid4())}.json") - init_wallet(wallet_path, WALLET_PASS) - allure.attach.file(wallet_path, os.path.basename(wallet_path), allure.attachment_type.JSON) - - if not FREE_STORAGE: - main_chain = cluster.main_chain_nodes[0] - deposit = 30 - transfer_gas( - shell=client_shell, - amount=deposit + 1, - main_chain=main_chain, - wallet_to_path=wallet_path, - wallet_to_password=WALLET_PASS, - ) - deposit_gas( - shell=client_shell, - main_chain=main_chain, - amount=deposit, - wallet_from_path=wallet_path, - wallet_from_password=WALLET_PASS, - ) +@pytest.fixture(scope="function") +@allure.title("Prepare not owner wallet and deposit") +def not_owner_wallet(client_shell: Shell, temp_directory: str, cluster: Cluster) -> str: + wallet_path = create_wallet(client_shell, temp_directory, cluster, 'not_owner_wallet') + yield wallet_path + os.remove(wallet_path) - return wallet_path +@pytest.fixture(scope="session") +@allure.title("Prepare default wallet and deposit") +def default_wallet(client_shell: Shell, temp_directory: str, cluster: Cluster): + return create_wallet(client_shell, temp_directory, cluster) @allure.title("Check logs for OOM and PANIC entries in {logs_dir}") def check_logs(logs_dir: str): @@ -387,4 +373,36 @@ def create_dir(dir_path: str) -> None: def remove_dir(dir_path: str) -> None: with allure.step("Remove directory"): - shutil.rmtree(dir_path, ignore_errors=True) \ No newline at end of file + shutil.rmtree(dir_path, ignore_errors=True) + + +@allure.title("Prepare wallet and deposit") +def create_wallet(client_shell: Shell, temp_directory: str, cluster: Cluster, name: Optional[str] = None) -> str: + if name is None: + wallet_name = f'{str(uuid.uuid4())}.json' + else: + wallet_name = f'{name}.json' + + wallet_path = os.path.join(os.getcwd(), ASSETS_DIR, wallet_name) + init_wallet(wallet_path, WALLET_PASS) + allure.attach.file(wallet_path, os.path.basename(wallet_path), allure.attachment_type.JSON) + + if not FREE_STORAGE: + main_chain = cluster.main_chain_nodes[0] + deposit = 30 + transfer_gas( + shell=client_shell, + amount=deposit + 1, + main_chain=main_chain, + wallet_to_path=wallet_path, + wallet_to_password=WALLET_PASS, + ) + deposit_gas( + shell=client_shell, + main_chain=main_chain, + amount=deposit, + wallet_from_path=wallet_path, + wallet_from_password=WALLET_PASS, + ) + + return wallet_path diff --git a/pytest_tests/testsuites/container/test_container.py b/pytest_tests/testsuites/container/test_container.py index 88a45e143..31ecf5376 100644 --- a/pytest_tests/testsuites/container/test_container.py +++ b/pytest_tests/testsuites/container/test_container.py @@ -3,6 +3,7 @@ import allure import pytest from epoch import tick_epoch +from grpc_responses import NOT_CONTAINER_OWNER, CONTAINER_DELETION_TIMED_OUT from python_keywords.container import ( create_container, delete_container, @@ -11,6 +12,7 @@ wait_for_container_creation, wait_for_container_deletion, ) +from wallet import WalletFile from utility import placement_policy_from_container from wellknown_acl import PRIVATE_ACL_F @@ -86,6 +88,40 @@ def test_container_creation(self, default_wallet, name): wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint ) + @pytest.mark.trusted_party_proved + @allure.title("Not owner and not trusted party can NOT delete container") + def test_only_owner_can_delete_container( + self, + not_owner_wallet: WalletFile, + default_wallet: str + ): + cid = create_container( + wallet=default_wallet, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + with allure.step("Try to delete container"): + with pytest.raises(RuntimeError, match=NOT_CONTAINER_OWNER): + delete_container( + wallet=not_owner_wallet, + cid=cid, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + ) + + with allure.step("Try to force delete container"): + with pytest.raises(RuntimeError, match=CONTAINER_DELETION_TIMED_OUT): + delete_container( + wallet=not_owner_wallet, + cid=cid, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + force=True, + ) + @allure.title("Parallel container creation and deletion") def test_container_creation_deletion_parallel(self, default_wallet): containers_count = 3 diff --git a/pytest_tests/testsuites/session_token/conftest.py b/pytest_tests/testsuites/session_token/conftest.py index a80aab538..94573a480 100644 --- a/pytest_tests/testsuites/session_token/conftest.py +++ b/pytest_tests/testsuites/session_token/conftest.py @@ -24,3 +24,11 @@ def stranger_wallet(wallet_factory: WalletFactory) -> WalletFile: Returns stranger wallet which should fail to obtain data """ return wallet_factory.create_wallet() + + +@pytest.fixture(scope="module") +def scammer_wallet(wallet_factory: WalletFactory) -> WalletFile: + """ + Returns stranger wallet which should fail to obtain data + """ + return wallet_factory.create_wallet() diff --git a/pytest_tests/testsuites/session_token/test_static_session_token_container.py b/pytest_tests/testsuites/session_token/test_static_session_token_container.py index 8e73137fe..20bd72b0c 100644 --- a/pytest_tests/testsuites/session_token/test_static_session_token_container.py +++ b/pytest_tests/testsuites/session_token/test_static_session_token_container.py @@ -1,6 +1,7 @@ import allure import pytest from file_helper import generate_file +from grpc_responses import NOT_SESSION_CONTAINER_OWNER, CONTAINER_DELETION_TIMED_OUT, EACL_TIMED_OUT from neofs_testlib.shell import Shell from python_keywords.acl import ( EACLAccess, @@ -38,6 +39,15 @@ def static_sessions( """ Returns dict with static session token file paths for all verbs with default lifetime """ + return self.static_session_token(owner_wallet, user_wallet, client_shell, temp_directory) + + def static_session_token( + self, + owner_wallet: WalletFile, + user_wallet: WalletFile, + client_shell: Shell, + temp_directory: str, + ) -> dict[ContainerVerb, str]: return { verb: get_container_signed_token( owner_wallet, user_wallet, verb, client_shell, temp_directory @@ -142,6 +152,118 @@ def test_static_session_token_container_delete( owner_wallet.path, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint ) + @pytest.mark.trusted_party_proved + @allure.title("Not owner user can NOT delete container") + def test_not_owner_user_can_not_delete_container( + self, + owner_wallet: WalletFile, + user_wallet: WalletFile, + stranger_wallet: WalletFile, + scammer_wallet: WalletFile, + static_sessions: dict[ContainerVerb, str], + temp_directory: str, + not_owner_wallet, + ): + with allure.step("Create container"): + cid = create_container( + owner_wallet.path, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + user_token = self.static_session_token(owner_wallet, user_wallet, self.shell, temp_directory) + stranger_token = self.static_session_token(user_wallet, stranger_wallet, self.shell, temp_directory) + + with allure.step("Try to delete container using stranger token"): + with pytest.raises(RuntimeError, match=NOT_SESSION_CONTAINER_OWNER): + delete_container( + wallet=user_wallet.path, + cid=cid, + session_token=stranger_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + ) + + with allure.step("Try to force delete container using stranger token"): + with pytest.raises(RuntimeError, match=CONTAINER_DELETION_TIMED_OUT): + delete_container( + wallet=user_wallet.path, + cid=cid, + session_token=stranger_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + force=True, + ) + + @pytest.mark.trusted_party_proved + @allure.title("Not trusted party user can NOT delete container") + def test_not_trusted_party_user_can_not_delete_container( + self, + owner_wallet: WalletFile, + user_wallet: WalletFile, + stranger_wallet: WalletFile, + scammer_wallet: WalletFile, + static_sessions: dict[ContainerVerb, str], + temp_directory: str, + not_owner_wallet, + ): + with allure.step("Create container"): + cid = create_container( + owner_wallet.path, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + user_token = self.static_session_token(owner_wallet, user_wallet, self.shell, temp_directory) + stranger_token = self.static_session_token(user_wallet, stranger_wallet, self.shell, temp_directory) + + with allure.step("Try to delete container using scammer token"): + with pytest.raises(RuntimeError, match=CONTAINER_DELETION_TIMED_OUT): + delete_container( + wallet=scammer_wallet.path, + cid=cid, + session_token=user_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + ) + + with pytest.raises(RuntimeError, match=NOT_SESSION_CONTAINER_OWNER): + delete_container( + wallet=scammer_wallet.path, + cid=cid, + session_token=stranger_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + ) + + with allure.step("Try to force delete container using scammer token"): + with pytest.raises(RuntimeError, match=CONTAINER_DELETION_TIMED_OUT): + delete_container( + wallet=scammer_wallet.path, + cid=cid, + session_token=user_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + force=True, + ) + + with pytest.raises(RuntimeError, match=CONTAINER_DELETION_TIMED_OUT): + delete_container( + wallet=scammer_wallet.path, + cid=cid, + session_token=stranger_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + force=True, + ) + + def test_static_session_token_container_set_eacl( self, owner_wallet: WalletFile, @@ -179,3 +301,64 @@ def test_static_session_token_container_set_eacl( wait_for_cache_expired() assert not can_put_object(stranger_wallet.path, cid, file_path, self.shell, self.cluster) + + @pytest.mark.trusted_party_proved + @allure.title("Not owner and not trusted party can NOT set eacl") + def test_static_session_token_container_set_eacl_only_trusted_party_proved_by_the_container_owner( + self, + owner_wallet: WalletFile, + user_wallet: WalletFile, + stranger_wallet: WalletFile, + scammer_wallet: WalletFile, + static_sessions: dict[ContainerVerb, str], + temp_directory: str, + not_owner_wallet, + ): + with allure.step("Create container"): + cid = create_container( + owner_wallet.path, + basic_acl=PUBLIC_ACL, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + user_token = self.static_session_token(owner_wallet, user_wallet, self.shell, temp_directory) + stranger_token = self.static_session_token(user_wallet, stranger_wallet, self.shell, temp_directory) + + new_eacl = [ + EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op) + for op in EACLOperation + ] + + with allure.step(f"Try to deny all operations for other via eACL"): + with pytest.raises(RuntimeError, match=NOT_SESSION_CONTAINER_OWNER): + set_eacl( + user_wallet.path, + cid, + create_eacl(cid, new_eacl, shell=self.shell), + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + session_token=stranger_token[ContainerVerb.SETEACL], + ) + + with allure.step(f"Try to deny all operations for other via eACL using scammer wallet"): + with allure.step(f"Using user token"): + with pytest.raises(RuntimeError, match=EACL_TIMED_OUT): + set_eacl( + scammer_wallet.path, + cid, + create_eacl(cid, new_eacl, shell=self.shell), + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + session_token=user_token[ContainerVerb.SETEACL], + ) + with allure.step(f"Using scammer token"): + with pytest.raises(RuntimeError, match=NOT_SESSION_CONTAINER_OWNER): + set_eacl( + scammer_wallet.path, + cid, + create_eacl(cid, new_eacl, shell=self.shell), + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + session_token=stranger_token[ContainerVerb.SETEACL], + )