diff --git a/test/test_director.py b/test/test_director.py index 3954093..618565b 100644 --- a/test/test_director.py +++ b/test/test_director.py @@ -23,6 +23,21 @@ from pytest_httpserver import HTTPServer +listing_response = ('\n' + '\n
\n\n' + '\n\n' + 'Mode | Flags | Size | ' + 'Modified | Name |
---|---|---|---|---|
---r-- | 16 | ' + '24 | Wed, 20 Mar 2024 15:50:39 GMT | ' + 'file1 |
---r-- | 16 | 1116 | Wed, 20 Mar 2024 15:50:40 GMT | ' + 'file2/a> |
d--r-x | 19 | 4096 | Wed, 20 Mar 2024 15:50:40 GMT | ' + 'file3 |
Request by unknown.189071:38@[::ffff:128.104.153.58] ( [::ffff:128.104.153.58] )
\nPowered by XrdHTTP v5.6.8 (CERN IT-SDC)
\n') + @pytest.fixture(scope="session") def ca(): return trustme.CA() @@ -66,6 +81,215 @@ async def clientFactory(**kwargs): return clientFactory +def test_ls(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.ls("/foo/bar", detail=False) == ['/foo/bar/file1', '/foo/bar/file2', '/foo/bar/file3'] + +def test_glob(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar/*").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_oneshot_request("/foo/bar/").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.glob("/foo/bar/*") == ['/foo/bar/file1', '/foo/bar/file2', '/foo/bar/file3'] + +def test_find(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.find("/foo/bar") == ['/foo/bar/file1', '/foo/bar/file2', '/foo/bar/file3'] + +def test_info(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.info("/foo/bar") == {'name': '/foo/bar', 'size': 1425, 'mimetype': 'text/plain', 'url': '/foo/bar', 'type': 'file'} + +def test_du(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + httpserver.expect_request("/foo/bar/file1", method="HEAD").respond_with_data( + "file1", + status=307, + ) + httpserver.expect_request("/foo/bar/file2", method="HEAD").respond_with_data( + "file2!!!!", + status=307, + ) + httpserver.expect_request("/foo/bar/file3", method="HEAD").respond_with_data( + "file3-with-extra-characters-for-more-content", + status=307, + ) + + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.du("/foo/bar") == 58 + + +def test_isdir(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + foo_bar_file_url = httpserver.url_for("foo/bar/file1") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + httpserver.expect_oneshot_request("/foo/bar/file1").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_file_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar/file1", method="GET").respond_with_data( + "file1", + status=307, + ) + + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.isdir("/foo/bar") == True + assert pelfs.isdir("/foo/bar/file1") == False + +def test_isfile(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + foo_bar_file_url = httpserver.url_for("foo/bar/file1") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + httpserver.expect_oneshot_request("/foo/bar/file1").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_file_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar/file1", method="GET").respond_with_data( + "file1", + status=307, + ) + + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.isfile("/foo/bar") == False + assert pelfs.isfile("/foo/bar/file1") == True + + +def test_walk(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("foo/bar") + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + httpserver.expect_request("/foo/bar", method="GET").respond_with_data(listing_response) + + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + for root, dirnames, filenames in pelfs.walk("/foo/bar"): + assert root == "/foo/bar" + assert dirnames == [] + assert 'file1' in filenames + assert 'file2' in filenames + assert 'file3' in filenames + assert len(filenames) == 3 + def test_open(httpserver: HTTPServer, get_client): foo_bar_url = httpserver.url_for("/foo/bar") httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")})