From 972d08355c4f540c70825e544e345d99b147cba5 Mon Sep 17 00:00:00 2001 From: TomBaxter Date: Mon, 25 Sep 2017 15:54:07 -0400 Subject: [PATCH] Raise exception for copy/move replace folder that orphans itself. [#SVCS-479] Replace folder from inside folder deletes both Add replace_will_orphan(src_path,dest_path) Checks if dest_path overwrite will orphan src_path Call replace_will_orphan in copy/move defs in waterbutler.core.provider --- tests/core/test_provider.py | 57 +++++++++++++++++++++ tests/providers/osfstorage/test_provider.py | 1 - waterbutler/core/exceptions.py | 9 ++++ waterbutler/core/provider.py | 40 ++++++++++++++- 4 files changed, 104 insertions(+), 3 deletions(-) diff --git a/tests/core/test_provider.py b/tests/core/test_provider.py index d06386560..a41bea98a 100644 --- a/tests/core/test_provider.py +++ b/tests/core/test_provider.py @@ -4,6 +4,7 @@ from unittest import mock from waterbutler.core import metadata from waterbutler.core import exceptions +from waterbutler.core.path import WaterButlerPath @pytest.fixture @@ -37,6 +38,36 @@ def test_cant_intra_move(self, provider1): def test_can_intra_move(self, provider2): assert provider2.can_intra_move(provider2) is True + @pytest.mark.asyncio + async def test_will_orphan_dest_is_file(self, provider1): + src_path = WaterButlerPath('/folder1/folder1/', + _ids=['root', 'folder', 'Folder'], + folder=True) + dest_path = WaterButlerPath('/folder1/', + _ids=['root','folder'], + folder=False) + assert provider1.replace_will_orphan(src_path, dest_path) == False + + @pytest.mark.asyncio + async def test_will_orphan_dest_different_names(self, provider1): + src_path = WaterButlerPath('/folder1/folder1/', + _ids=['root', 'folder', 'Folder'], + folder=True) + dest_path = WaterButlerPath('/folder2/', + _ids=['root','folder'], + folder=True) + assert provider1.replace_will_orphan(src_path, dest_path) == False + + @pytest.mark.asyncio + async def test_will_orphan_dest_different_branch(self, provider1): + src_path = WaterButlerPath('/other_folder/folder1/', + _ids=['root', 'other_folder', 'Folder'], + folder=True) + dest_path = WaterButlerPath('/folder1/', + _ids=['root','folder'], + folder=True) + assert provider1.replace_will_orphan(src_path, dest_path) == False + @pytest.mark.asyncio async def test_exists(self, provider1): ret = await provider1.exists('somepath') @@ -309,6 +340,19 @@ async def test_calls_intra_copy(self, provider1): provider1.can_intra_copy.assert_called_once_with(provider1, src_path) provider1.intra_copy.assert_called_once_with(provider1, src_path, dest_path) + @pytest.mark.asyncio + @pytest.mark.aiohttpretty + async def test_intra_copy_folder_orphan(self, provider1): + src_path = await provider1.validate_path('/folder1/folder1/') + dest_path = await provider1.validate_path('/') + + provider1.can_intra_copy = mock.Mock(return_value=True) + + with pytest.raises(exceptions.OrphanSelfError) as exc: + await provider1.copy(provider1, src_path, dest_path) + assert exc.value.code == 400 + assert exc.typename == 'OrphanSelfError' + @pytest.mark.asyncio async def test_calls_folder_op_on_dir(self, provider1): src_path = await provider1.validate_path('/source/path/') @@ -435,6 +479,19 @@ async def test_calls_intra_move(self, provider1): provider1.can_intra_move.assert_called_once_with(provider1, src_path) provider1.intra_move.assert_called_once_with(provider1, src_path, dest_path) + @pytest.mark.asyncio + @pytest.mark.aiohttpretty + async def test_intra_move_folder_orphan(self, provider1): + src_path = await provider1.validate_path('/folder1/folder1/') + dest_path = await provider1.validate_path('/') + + provider1.can_intra_move = mock.Mock(return_value=True) + + with pytest.raises(exceptions.OrphanSelfError) as exc: + await provider1.move(provider1, src_path, dest_path) + assert exc.value.code == 400 + assert exc.typename == 'OrphanSelfError' + @pytest.mark.asyncio async def test_calls_folder_op_on_dir_and_delete(self, provider1): src_path = await provider1.validate_path('/source/path/') diff --git a/tests/providers/osfstorage/test_provider.py b/tests/providers/osfstorage/test_provider.py index 8cb3eed3f..748a1b832 100644 --- a/tests/providers/osfstorage/test_provider.py +++ b/tests/providers/osfstorage/test_provider.py @@ -327,7 +327,6 @@ async def test_intra_copy_folder(self, provider_and_mock, provider_and_mock2, src_mock._children_metadata.assert_not_called() src_mock.validate_v1_path.assert_not_called() - @pytest.mark.asyncio @pytest.mark.aiohttpretty async def test_intra_copy_file(self, provider_and_mock, provider_and_mock2, diff --git a/waterbutler/core/exceptions.py b/waterbutler/core/exceptions.py index 56f628a09..c84a10491 100644 --- a/waterbutler/core/exceptions.py +++ b/waterbutler/core/exceptions.py @@ -215,6 +215,15 @@ def __init__(self, path): 'folder onto itself is not supported.'.format(path)) +class OrphanSelfError(InvalidParameters): + """Error for conflict=replace sent to methods copy or move, when action would result + in removal of a directory containing the file or folder to be copied or moved. + """ + def __init__(self, path): + super().__init__('Unable to move or copy \'{}\'. Moving or copying a folder onto parent ' + 'folder of same name with "replace" is not supported.'.format(path)) + + class UnsupportedOperationError(ProviderError): def __init__(self, message, code=HTTPStatus.FORBIDDEN, is_user_error=True): if not message: diff --git a/waterbutler/core/provider.py b/waterbutler/core/provider.py index b2cbe7dea..c1d23e49c 100644 --- a/waterbutler/core/provider.py +++ b/waterbutler/core/provider.py @@ -207,12 +207,12 @@ async def move(self, Performs a copy and then a delete. Calls :func:`BaseProvider.intra_move` if possible. - :param dest_provider: ( :class:`.BaseProvider` ) The provider to move to + :param dest_provider: ( :class:`BaseProvider` ) The provider to move to :param src_path: ( :class:`.WaterButlerPath` ) Path to where the resource can be found :param dest_path: ( :class:`.WaterButlerPath` ) Path to where the resource will be moved :param rename: ( :class:`str` ) The desired name of the resulting path, may be incremented :param conflict: ( :class:`str` ) What to do in the event of a name conflict, ``replace`` or ``keep`` - :param handle_naming: ( :class:`bool` ) If a naming conflict is detected, should it be automatically handled? + :param handle_naming: ( :class:`bool` ) If true will be run through handle_naming method, and handled in accordance with conflict parameter. """ args = (dest_provider, src_path, dest_path) kwargs = {'rename': rename, 'conflict': conflict} @@ -252,6 +252,13 @@ async def move(self, ): raise exceptions.OverwriteSelfError(src_path) + if ( + self == dest_provider and + conflict == 'replace' and + dest_provider.replace_will_orphan(src_path, dest_path) + ): + raise exceptions.OrphanSelfError(src_path) + self.provider_metrics.add('move.can_intra_move', False) if self.can_intra_move(dest_provider, src_path): self.provider_metrics.add('move.can_intra_move', True) @@ -311,6 +318,13 @@ async def copy(self, ): raise exceptions.OverwriteSelfError(src_path) + if ( + self == dest_provider and + conflict == 'replace' and + dest_provider.replace_will_orphan(src_path, dest_path) + ): + raise exceptions.OrphanSelfError(src_path) + self.provider_metrics.add('copy.can_intra_copy', False) if self.can_intra_copy(dest_provider, src_path): self.provider_metrics.add('copy.can_intra_copy', True) @@ -523,6 +537,28 @@ async def intra_move(self, await self.delete(src_path) return data, created + def replace_will_orphan(self, + src_path: wb_path.WaterButlerPath, + dest_path: wb_path.WaterButlerPath) -> bool: + """Check if copy/move conflict=replace will orphan src_path. + + Assumes src_provider == dest_provider and conflict == 'replace' have already been checked. + This can be an expensive operation. Should be used as last resort. Should be overridden if + provider has a cheaper option. + """ + if not dest_path.kind == 'folder': + return False + if src_path.name != dest_path.name: + return False + + incr_path = src_path + while incr_path.materialized_path != '/': + incr_path = incr_path.parent + if incr_path.materialized_path == dest_path.materialized_path: + return True + + return False + async def exists(self, path: wb_path.WaterButlerPath, **kwargs) \ -> typing.Union[bool, wb_metadata.BaseMetadata, typing.List[wb_metadata.BaseMetadata]]: """Check for existence of WaterButlerPath