diff --git a/pytest_tests/helpers/grpc_responses.py b/pytest_tests/helpers/grpc_responses.py index 5f44c4cda..abb065d81 100644 --- a/pytest_tests/helpers/grpc_responses.py +++ b/pytest_tests/helpers/grpc_responses.py @@ -27,6 +27,7 @@ INVALID_LENGTH_SPECIFIER = "invalid '{range}' range length specifier" NOT_CONTAINER_OWNER = "provided account differs with the container owner" +NOT_SESSION_CONTAINER_OWNER = "session issuer differs with the container owner" TIMED_OUT = "timed out after \\d+ seconds" def error_matches_status(error: Exception, status_pattern: str) -> bool: diff --git a/pytest_tests/testsuites/session_token/conftest.py b/pytest_tests/testsuites/session_token/conftest.py index a80aab538..94573a480 100644 --- a/pytest_tests/testsuites/session_token/conftest.py +++ b/pytest_tests/testsuites/session_token/conftest.py @@ -24,3 +24,11 @@ def stranger_wallet(wallet_factory: WalletFactory) -> WalletFile: Returns stranger wallet which should fail to obtain data """ return wallet_factory.create_wallet() + + +@pytest.fixture(scope="module") +def scammer_wallet(wallet_factory: WalletFactory) -> WalletFile: + """ + Returns stranger wallet which should fail to obtain data + """ + return wallet_factory.create_wallet() diff --git a/pytest_tests/testsuites/session_token/test_static_session_token_container.py b/pytest_tests/testsuites/session_token/test_static_session_token_container.py index 8e73137fe..e0cc9ef9f 100644 --- a/pytest_tests/testsuites/session_token/test_static_session_token_container.py +++ b/pytest_tests/testsuites/session_token/test_static_session_token_container.py @@ -1,6 +1,9 @@ +import logging + import allure import pytest from file_helper import generate_file +from grpc_responses import TIMED_OUT, NOT_SESSION_CONTAINER_OWNER from neofs_testlib.shell import Shell from python_keywords.acl import ( EACLAccess, @@ -38,6 +41,15 @@ def static_sessions( """ Returns dict with static session token file paths for all verbs with default lifetime """ + return self.static_session_token(owner_wallet, user_wallet, client_shell, temp_directory) + + def static_session_token( + self, + owner_wallet: WalletFile, + user_wallet: WalletFile, + client_shell: Shell, + temp_directory: str, + ) -> dict[ContainerVerb, str]: return { verb: get_container_signed_token( owner_wallet, user_wallet, verb, client_shell, temp_directory @@ -142,6 +154,95 @@ def test_static_session_token_container_delete( owner_wallet.path, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint ) + @allure.title("Not owner and not trusted party can NOT delete container") + def test_static_session_token_container_delete_only_trusted_party_proved_by_the_container_owner( + self, + owner_wallet: WalletFile, + user_wallet: WalletFile, + stranger_wallet: WalletFile, + scammer_wallet: WalletFile, + static_sessions: dict[ContainerVerb, str], + temp_directory: str, + not_owner_wallet, + ): + with allure.step("Create container"): + cid = create_container( + owner_wallet.path, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + user_token = self.static_session_token(owner_wallet, user_wallet, self.shell, temp_directory) + stranger_token = self.static_session_token(user_wallet, stranger_wallet, self.shell, temp_directory) + + with allure.step("Try to delete container using stranger token"): + with pytest.raises(RuntimeError): + delete_container( + wallet=user_wallet.path, + cid=cid, + session_token=stranger_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + ) + + with allure.step("Try to force delete container using stranger token"): + with pytest.raises(Exception, match=TIMED_OUT): + delete_container( + wallet=user_wallet.path, + cid=cid, + session_token=stranger_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + force=True, + ) + + with allure.step("Try to delete container using scammer token"): + with pytest.raises(Exception, match=TIMED_OUT): + delete_container( + wallet=scammer_wallet.path, + cid=cid, + session_token=user_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + ) + + with pytest.raises(RuntimeError, match=NOT_SESSION_CONTAINER_OWNER): + delete_container( + wallet=scammer_wallet.path, + cid=cid, + session_token=stranger_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + ) + + with allure.step("Try to force delete container using scammer token"): + with pytest.raises(Exception, match=TIMED_OUT): + delete_container( + wallet=scammer_wallet.path, + cid=cid, + session_token=user_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + force=True, + ) + + with pytest.raises(Exception, match=TIMED_OUT): + delete_container( + wallet=scammer_wallet.path, + cid=cid, + session_token=stranger_token[ContainerVerb.DELETE], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + await_mode=True, + force=True, + ) + + def test_static_session_token_container_set_eacl( self, owner_wallet: WalletFile, @@ -179,3 +280,61 @@ def test_static_session_token_container_set_eacl( wait_for_cache_expired() assert not can_put_object(stranger_wallet.path, cid, file_path, self.shell, self.cluster) + + @allure.title("Not owner and not trusted party can NOT set eacl") + def test_static_session_token_container_set_eacl_only_trusted_party_proved_by_the_container_owner( + self, + owner_wallet: WalletFile, + user_wallet: WalletFile, + stranger_wallet: WalletFile, + scammer_wallet: WalletFile, + static_sessions: dict[ContainerVerb, str], + temp_directory: str, + not_owner_wallet, + ): + with allure.step("Create container"): + cid = create_container( + owner_wallet.path, + basic_acl=PUBLIC_ACL, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + user_token = self.static_session_token(owner_wallet, user_wallet, self.shell, temp_directory) + stranger_token = self.static_session_token(user_wallet, stranger_wallet, self.shell, temp_directory) + + new_eacl = [ + EACLRule(access=EACLAccess.DENY, role=EACLRole.OTHERS, operation=op) + for op in EACLOperation + ] + + with allure.step(f"Try to deny all operations for other via eACL"): + with pytest.raises(Exception, match=TIMED_OUT): + set_eacl( + user_wallet.path, + cid, + create_eacl(cid, new_eacl, shell=self.shell), + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + session_token=stranger_token[ContainerVerb.SETEACL], + ) + + with allure.step(f"Try to deny all operations for other via eACL using scammer token"): + with pytest.raises(Exception, match=TIMED_OUT): + set_eacl( + scammer_wallet.path, + cid, + create_eacl(cid, new_eacl, shell=self.shell), + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + session_token=user_token[ContainerVerb.SETEACL], + ) + with pytest.raises(Exception, match=TIMED_OUT): + set_eacl( + scammer_wallet.path, + cid, + create_eacl(cid, new_eacl, shell=self.shell), + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + session_token=stranger_token[ContainerVerb.SETEACL], + )