-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Alexander Vasin
committed
May 6, 2024
1 parent
7f55e23
commit 4005881
Showing
1 changed file
with
140 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
import gzip | ||
import sys | ||
from http import HTTPStatus | ||
from io import BytesIO | ||
from pathlib import Path | ||
|
||
import pytest | ||
|
||
from aiohttp_s3_client import S3Client | ||
|
||
|
||
@pytest.mark.parametrize("object_name", ("test/test", "/test/test")) | ||
async def test_put_str(s3_client: S3Client, object_name): | ||
data = "hello, world" | ||
resp = await s3_client.put(object_name, data) | ||
assert resp.status == HTTPStatus.OK | ||
|
||
resp = await s3_client.get(object_name) | ||
result = await resp.text() | ||
assert result == data | ||
|
||
|
||
@pytest.mark.parametrize("object_name", ("test/test", "/test/test")) | ||
async def test_put_bytes(s3_client: S3Client, s3_read, object_name): | ||
data = b"hello, world" | ||
resp = await s3_client.put(object_name, data) | ||
assert resp.status == HTTPStatus.OK | ||
assert (await s3_read()) == data | ||
|
||
|
||
@pytest.mark.parametrize("object_name", ("test/test", "/test/test")) | ||
async def test_put_async_iterable(s3_client: S3Client, s3_read, object_name): | ||
async def async_iterable(iterable: bytes): | ||
for i in iterable: | ||
yield i.to_bytes(1, sys.byteorder) | ||
|
||
data = b"hello, world" | ||
resp = await s3_client.put(object_name, async_iterable(data)) | ||
assert resp.status == HTTPStatus.OK | ||
|
||
assert (await s3_read()) == data | ||
|
||
|
||
async def test_put_file(s3_client: S3Client, s3_read, tmp_path): | ||
data = b"hello, world" | ||
|
||
with (tmp_path / "hello.txt").open("wb") as f: | ||
f.write(data) | ||
f.flush() | ||
|
||
# Test upload by file str path | ||
resp = await s3_client.put_file("/test/test", f.name) | ||
assert resp.status == HTTPStatus.OK | ||
|
||
assert (await s3_read("/test/test")) == data | ||
|
||
# Test upload by file Path | ||
resp = await s3_client.put_file("/test/test2", Path(f.name)) | ||
assert resp.status == HTTPStatus.OK | ||
|
||
assert (await s3_read("/test/test2")) == data | ||
|
||
|
||
async def test_list_objects_v2(s3_client: S3Client, s3_read, tmp_path): | ||
data = b"hello, world" | ||
|
||
with (tmp_path / "hello.txt").open("wb") as f: | ||
f.write(data) | ||
f.flush() | ||
|
||
resp = await s3_client.put_file("/test/list/test1", f.name) | ||
assert resp.status == HTTPStatus.OK | ||
|
||
resp = await s3_client.put_file("/test/list/test2", f.name) | ||
assert resp.status == HTTPStatus.OK | ||
|
||
# Test list file | ||
batch = 0 | ||
async for result, prefixes in s3_client.list_objects_v2( | ||
prefix="test/list/", | ||
delimiter="/", | ||
max_keys=1, | ||
): | ||
batch += 1 | ||
assert result[0].key == f"test/list/test{batch}" | ||
assert result[0].size == len(data) | ||
|
||
|
||
async def test_list_objects_v2_prefix(s3_client: S3Client, s3_read, tmp_path): | ||
data = b"hello, world" | ||
|
||
with (tmp_path / "hello.txt").open("wb") as f: | ||
f.write(data) | ||
f.flush() | ||
|
||
resp = await s3_client.put_file("/test2/list1/test1", f.name) | ||
assert resp.status == HTTPStatus.OK | ||
|
||
resp = await s3_client.put_file("/test2/list2/test2", f.name) | ||
assert resp.status == HTTPStatus.OK | ||
|
||
# Test list file | ||
batch = 0 | ||
|
||
async for result, prefixes in s3_client.list_objects_v2( | ||
prefix="test2/", | ||
delimiter="/", | ||
): | ||
batch += 1 | ||
assert len(result) == 0 | ||
assert prefixes[0] == "test2/list1/" | ||
assert prefixes[1] == "test2/list2/" | ||
|
||
|
||
async def test_url_path_with_colon(s3_client: S3Client, s3_read): | ||
data = b"hello, world" | ||
key = "/some-path:with-colon.txt" | ||
resp = await s3_client.put(key, data) | ||
assert resp.status == HTTPStatus.OK | ||
|
||
assert (await s3_read(key)) == data | ||
|
||
|
||
@pytest.mark.parametrize("object_name", ("test/test", "/test/test")) | ||
async def test_put_compression(s3_client: S3Client, s3_read, object_name): | ||
async def async_iterable(iterable: bytes): | ||
for i in iterable: | ||
yield i.to_bytes(1, sys.byteorder) | ||
|
||
data = b"hello, world" | ||
resp = await s3_client.put( | ||
object_name, async_iterable(data), compress="gzip", | ||
) | ||
assert resp.status == HTTPStatus.OK | ||
|
||
result = await s3_read() | ||
# assert resp.headers[hdrs.CONTENT_ENCODING] == "gzip" | ||
# FIXME: uncomment after update fakes3 image | ||
actual = gzip.GzipFile(fileobj=BytesIO(result)).read() | ||
assert actual == data |