Skip to content

Commit

Permalink
tests: add test_session_token_expiration_flags
Browse files Browse the repository at this point in the history
To validate session token expiration flags - lifetime, expire_at

Signed-off-by: Evgeniy Zayats <[email protected]>
  • Loading branch information
Evgeniy Zayats committed Sep 1, 2023
1 parent cbab558 commit 3b5877a
Showing 1 changed file with 72 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import pytest
from cluster_test_base import ClusterTestBase
from common import WALLET_PASS
from epoch import get_epoch
from file_helper import generate_file
from grpc_responses import SESSION_NOT_FOUND
from grpc_responses import EXPIRED_SESSION_TOKEN, SESSION_NOT_FOUND
from neofs_testlib.utils.wallet import get_last_address_from_wallet
from python_keywords.container import create_container
from python_keywords.neofs_verbs import delete_object, put_object, put_object_to_random_node
Expand Down Expand Up @@ -144,3 +145,73 @@ def test_object_session_token(self, default_wallet, object_size):
endpoint=non_container_node.get_rpc_endpoint(),
session=session_token,
)

@allure.title("Verify session token expiration flags")
@pytest.mark.skip(reason="https://github.com/nspcc-dev/neofs-node/issues/2539")
@pytest.mark.nspcc_dev__neofs_node__issue_2539
@pytest.mark.parametrize("expiration_flag", ["lifetime", "expire_at"])
def test_session_token_expiration_flags(
self, default_wallet, simple_object_size, expiration_flag, cluster
):
rpc_endpoint = self.cluster.storage_nodes[0].get_rpc_endpoint()

with allure.step("Create Session Token with Lifetime param"):
current_epoch = get_epoch(self.shell, cluster)

session_token = create_session_token(
shell=self.shell,
owner=get_last_address_from_wallet(default_wallet, ""),
wallet_path=default_wallet,
wallet_password=WALLET_PASS,
rpc_endpoint=rpc_endpoint,
lifetime=1 if expiration_flag == "lifetime" else None,
expire_at=current_epoch + 1 if expiration_flag == "expire_at" else None,
)

with allure.step("Create Private Container"):
un_locode = self.cluster.storage_nodes[0].get_un_locode()
locode = "SPB" if un_locode == "RU LED" else un_locode.split()[1]
placement_policy = (
f"REP 1 IN LOC_{locode}_PLACE CBF 1 SELECT 1 FROM LOC_{locode} "
f'AS LOC_{locode}_PLACE FILTER "UN-LOCODE" '
f'EQ "{un_locode}" AS LOC_{locode}'
)
cid = create_container(
default_wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule=placement_policy,
)

with allure.step("Verify object operations with created session token are allowed"):
file_path = generate_file(simple_object_size)
oid = put_object(
wallet=default_wallet,
path=file_path,
cid=cid,
shell=self.shell,
endpoint=rpc_endpoint,
session=session_token,
)
delete_object(
wallet=default_wallet,
cid=cid,
oid=oid,
shell=self.shell,
endpoint=rpc_endpoint,
session=session_token,
)

self.tick_epochs(2)

with allure.step("Verify object operations with created session token are not allowed"):
file_path = generate_file(simple_object_size)
with pytest.raises(RuntimeError, match=EXPIRED_SESSION_TOKEN):
oid = put_object(
wallet=default_wallet,
path=file_path,
cid=cid,
shell=self.shell,
endpoint=rpc_endpoint,
session=session_token,
)

0 comments on commit 3b5877a

Please sign in to comment.