diff --git a/anchor/models/base.py b/anchor/models/base.py index dd7b8f8..950ec5c 100644 --- a/anchor/models/base.py +++ b/anchor/models/base.py @@ -1,24 +1,34 @@ import uuid from django.db import models +from django.utils import timezone from django.utils.translation import gettext_lazy as _ -from anchor.support.base58 import b58encode +from anchor.support import base58 + +SHORT_UUID_ALPHABET: bytes = base58.BITCOIN_ALPHABET +SHORT_UUID_LENGTH = 22 def _gen_short_uuid(): - return b58encode(uuid.uuid4().bytes).decode("ascii") + return ( + base58.b58encode(uuid.uuid4().bytes, alphabet=SHORT_UUID_ALPHABET) + .decode("ascii") + .ljust(SHORT_UUID_LENGTH, chr(SHORT_UUID_ALPHABET[0])) + ) class BaseModel(models.Model): id = models.CharField( primary_key=True, - max_length=22, + max_length=SHORT_UUID_LENGTH, verbose_name="ID", editable=False, default=_gen_short_uuid, ) - created_at = models.DateTimeField(auto_now_add=True, verbose_name=_("created at")) + created_at = models.DateTimeField( + default=timezone.now, verbose_name=_("created at") + ) updated_at = models.DateTimeField(auto_now=True, verbose_name=_("updated at")) class Meta: diff --git a/anchor/models/blob/blob.py b/anchor/models/blob/blob.py index 26516a4..3bff7ec 100644 --- a/anchor/models/blob/blob.py +++ b/anchor/models/blob/blob.py @@ -102,7 +102,7 @@ def __init__(self, *args, prefix=None, backend=None, **kwargs): @property def signed_id(self): - return type(self)._get_signer().sign(self.key) + return self.get_signed_id() def get_signed_id(self, purpose: str = None): return type(self)._get_signer().sign(self.key, purpose) @@ -145,8 +145,8 @@ def unfurl(self, file: DjangoFile | Any): self.filename = self.service.get_valid_name(os.path.basename(file.name)) else: self.filename = None - except TypeError: - pass + except TypeError: # pragma: no cover + self.filename = None extension = self.extension_with_dot if extension and not self.key.endswith(extension): self.key = f"{self.key}{extension}" @@ -216,5 +216,5 @@ def custom_metadata(self): @custom_metadata.setter def custom_metadata(self, value): if self.metadata is None: - self.metadata = {} + self.metadata = dict() self.metadata["custom"] = value diff --git a/anchor/services/transformers/image.py b/anchor/services/transformers/image.py index 2682138..fe96aa3 100644 --- a/anchor/services/transformers/image.py +++ b/anchor/services/transformers/image.py @@ -1,3 +1,5 @@ +from functools import cached_property + from django.utils.module_loading import import_string from anchor.services.processors.base import BaseProcessor @@ -7,14 +9,17 @@ class ImageTransformer(BaseTransformer): + def __init__(self, *args, processor_class: type[BaseProcessor] = None, **kwargs): + super().__init__(*args, **kwargs) + self.processor_class = processor_class + def process(self, file, format: str): - processor: BaseProcessor = self.get_processor() - processor.source(file) + self.processor.source(file) for key, args in self.transformations.items(): - processor = self.apply_transformation(processor, key, args) + self.processor = self.apply_transformation(self.processor, key, args) temp = self._get_temporary_file(format) - processor.save(temp, format) + self.processor.save(temp, format) return temp def apply_transformation(self, processor, key, args): @@ -32,5 +37,11 @@ def apply_transformation(self, processor, key, args): return processor def get_processor(self): - processor_class = import_string(anchor_settings.IMAGE_PROCESSOR) + processor_class = self.processor_class or import_string( + anchor_settings.IMAGE_PROCESSOR + ) return processor_class() + + @cached_property + def processor(self): + return self.get_processor() diff --git a/tests/models/test_base.py b/tests/models/test_base.py new file mode 100644 index 0000000..58c8b97 --- /dev/null +++ b/tests/models/test_base.py @@ -0,0 +1,12 @@ +from django.test import SimpleTestCase + +from anchor.models.base import SHORT_UUID_ALPHABET, SHORT_UUID_LENGTH, _gen_short_uuid + + +class TestGenShortUuid(SimpleTestCase): + def test_gen_short_uuid(self): + for _ in range(1000): + self.assertEqual(len(_gen_short_uuid()), SHORT_UUID_LENGTH) + self.assertTrue( + all(c.encode("ascii") in SHORT_UUID_ALPHABET for c in _gen_short_uuid()) + ) diff --git a/tests/models/test_blob.py b/tests/models/test_blob.py index 7609174..ef0caa7 100644 --- a/tests/models/test_blob.py +++ b/tests/models/test_blob.py @@ -6,9 +6,12 @@ import requests from django.conf import settings from django.core.files import File +from django.core.files.base import ContentFile from django.test import SimpleTestCase, TestCase +from django.utils import timezone -from anchor.models import Blob +from anchor.models import Attachment, Blob +from anchor.settings import anchor_settings GARLIC_PNG = os.path.join(settings.BASE_DIR, "fixtures", "garlic.png") @@ -50,6 +53,11 @@ def test_mime_type_is_guessed_if_filename_is_not_available(self): blob.unfurl(File(BytesIO(b"test"))) self.assertEqual(blob.mime_type, "application/octet-stream") + def test_unknown_extensions_return_default_mime_type(self): + blob = Blob() + blob.unfurl(File(BytesIO(b"test"), name="test.unknown")) + self.assertEqual(blob.mime_type, anchor_settings.DEFAULT_MIME_TYPE) + def test_byte_size_is_extracted(self): self.assertEqual(self.blob.byte_size, 8707) @@ -58,6 +66,27 @@ def test_checksum_is_extracted(self): b64_sum = base64.urlsafe_b64encode(bytes.fromhex(hex_sum)).decode("utf-8") self.assertEqual(self.blob.checksum, b64_sum) + def test_checksum_is_extracted_from_text_files(self): + blob = Blob() + blob.unfurl(ContentFile("test")) + self.assertEqual(blob.checksum, "CY9rzUYh03PK3k6DJie09g==") + + def test_non_django_file_is_unfurled(self): + blob = Blob() + blob.unfurl(BytesIO(b"test")) + self.assertIsNone(blob.filename) + self.assertEqual(blob.mime_type, anchor_settings.DEFAULT_MIME_TYPE) + self.assertEqual(blob.byte_size, 4) + self.assertIsNotNone(blob.checksum) + + def test_is_image(self): + text = Blob() + text.upload(ContentFile(b"test", name="test.txt")) + self.assertFalse(text.is_image) + + image = self.blob + self.assertTrue(image.is_image) + class TestBlobKeys(SimpleTestCase): def test_prefix_with_no_prefix(self): @@ -96,8 +125,12 @@ def test_key_and_prefix_can_be_set(self): blob = Blob(key="test", prefix="test2") self.assertEqual(blob.key, "test2/test") + def test_str(self): + blob = Blob(key="test") + self.assertEqual(str(blob), blob.pk) -class TestBlobUploads(SimpleTestCase): + +class TestBlobsBehaveLikeFiles(SimpleTestCase): def test_upload_file(self): blob = Blob() blob.upload(File(BytesIO(b"test"), name="text.txt")) @@ -120,6 +153,18 @@ def test_upload_image_to_r2(self): blob.upload(File(f, name="image.png")) self.assertTrue(blob.key.startswith("test/")) + def test_open(self): + blob = Blob() + blob.upload(ContentFile(b"test", name="test.txt")) + with blob.open() as f: + self.assertEqual(f.read(), b"test") + + def test_purge(self): + blob = Blob() + blob.upload(ContentFile(b"test", name="test.txt")) + blob.purge() + self.assertFalse(blob.service.exists(blob.key)) + class TestBlobURLs(SimpleTestCase): def test_urls_are_generated(self): @@ -136,3 +181,80 @@ def test_urls_are_generated_for_r2(self): self.assertIsNotNone(url) response = requests.get(url) self.assertEqual(response.status_code, 200) + + +class TestBlobQuerySet(TestCase): + def test_get_signed(self): + blob = Blob.objects.create(filename="test.png") + signed_id = blob.signed_id + self.assertEqual(blob, Blob.objects.get_signed(signed_id)) + + def test_unattached_returns_all_unattached_blobs(self): + blob = Blob.objects.create(filename="unattached_blob.png") + attached_blob = Blob.objects.create(filename="attached_blob.png") + Attachment.objects.create(blob=attached_blob, content_object=blob, name="test") + self.assertEqual(blob.attachments.count(), 0) + self.assertEqual(attached_blob.attachments.count(), 1) + self.assertListEqual(list(Blob.objects.unattached()), [blob]) + + def test_unattached_is_chainable(self): + blob = Blob.objects.create(filename="test.png") + old_blob = Blob.objects.create( + filename="test2.png", + created_at=timezone.now() - timezone.timedelta(days=10), + ) + + attached_blob = Blob.objects.create( + filename="test3.png", + created_at=timezone.now(), + ) + Attachment.objects.create(blob=attached_blob, content_object=blob, name="test") + self.assertEqual(blob.attachments.count(), 0) + self.assertEqual(old_blob.attachments.count(), 0) + self.assertEqual(attached_blob.attachments.count(), 1) + + self.assertListEqual(list(Blob.objects.unattached()), [blob, old_blob]) + + self.assertLess( + old_blob.created_at, timezone.now() - timezone.timedelta(days=1) + ) + self.assertListEqual( + list( + Blob.objects.unattached().filter( + created_at__gt=timezone.now() - timezone.timedelta(days=1) + ) + ), + [blob], + ) + + def test_create(self): + blob = Blob.objects.create(filename="test.png") + self.assertIsNotNone(blob.key) + self.assertIsNotNone(blob.created_at) + self.assertIsNotNone(blob.updated_at) + + def test_create_with_file(self): + blob = Blob.objects.create(filename="test.png", file=ContentFile(b"test")) + self.assertIsNotNone(blob.key) + self.assertIsNotNone(blob.created_at) + self.assertIsNotNone(blob.updated_at) + + +class TestBlobCustomMetadata(SimpleTestCase): + def test_custom_metadata_works(self): + blob = Blob() + blob.custom_metadata = {"test": "test"} + self.assertEqual(blob.custom_metadata, {"test": "test"}) + + blob.metadata = None + self.assertEqual(blob.custom_metadata, dict()) + + blob.custom_metadata = {"hello": "world"} + self.assertEqual(blob.custom_metadata, {"hello": "world"}) + + def test_custom_metadata_does_not_overwrite_existing_metadata(self): + blob = Blob() + blob.metadata = {"test": "hello"} + blob.custom_metadata = {"test": "world"} + self.assertEqual(blob.metadata["test"], "hello") + self.assertEqual(blob.custom_metadata["test"], "world") diff --git a/tests/models/test_variant.py b/tests/models/test_variant.py index c9c57d6..d02c9bc 100644 --- a/tests/models/test_variant.py +++ b/tests/models/test_variant.py @@ -21,6 +21,10 @@ def test_service(self): v = Variant(self.blob, {"format": "png"}) self.assertEqual(type(v.service), type(self.blob.service)) + def test_url(self): + v = Variant(self.blob, {"format": "png"}) + self.assertEqual(v.url, self.blob.service.url(v.key)) + def test_delete_does_not_fail_if_variant_does_not_exist(self): v = Variant(self.blob, {"format": "png"}) v.delete() diff --git a/tests/services/transformers/test_image_transformer.py b/tests/services/transformers/test_image_transformer.py index c320de6..b51d344 100644 --- a/tests/services/transformers/test_image_transformer.py +++ b/tests/services/transformers/test_image_transformer.py @@ -1,8 +1,24 @@ from django.test import SimpleTestCase +from anchor.services.processors.base import BaseProcessor from anchor.services.transformers.image import ImageTransformer +class DummyProcessor(BaseProcessor): + def __init__(self): + self.dummy_call_count = 0 + + def source(self, *args, **kwargs): + pass + + def save(self, *args, **kwargs): + pass + + def dummy(self, arg1, arg2=10): + self.dummy_call_count += 1 + return self + + class TestImageTransformer(SimpleTestCase): def setUp(self): self.image = open("tests/fixtures/garlic.png", mode="rb") @@ -21,3 +37,22 @@ def test_transform(self): transformer = ImageTransformer({}) with transformer.transform(self.image, "png") as temp: self.assertGreater(len(temp.read()), 0) + + def test_missing_transformation(self): + transformer = ImageTransformer({"missing_transform": 10}) + with self.assertRaises(ValueError): + transformer.process(self.image, "png") + + def test_transformation_with_args(self): + transformer = ImageTransformer( + {"dummy": (1, 2)}, processor_class=DummyProcessor + ) + transformer.process(self.image, "png") + self.assertEqual(transformer.processor.dummy_call_count, 1) + + def test_transformation_with_kwargs(self): + transformer = ImageTransformer( + {"dummy": {"arg1": 1, "arg2": 2}}, processor_class=DummyProcessor + ) + transformer.process(self.image, "png") + self.assertEqual(transformer.processor.dummy_call_count, 1) diff --git a/tests/support/test_base58.py b/tests/support/test_base58.py new file mode 100644 index 0000000..e1ea926 --- /dev/null +++ b/tests/support/test_base58.py @@ -0,0 +1,12 @@ +from django.test import SimpleTestCase + +from anchor.support.base58 import b58encode, b58encode_int + + +class Base58Test(SimpleTestCase): + def test_b58encode_int(self): + self.assertEqual(b58encode_int(0), b"1") + self.assertEqual(b58encode_int(0, default_one=False), b"") + + def test_b58encode(self): + self.assertEqual(b58encode(b"hello"), b"Cn8eVZg") diff --git a/tests/views/test_file_system.py b/tests/views/test_file_system.py index adbd088..82cdd31 100644 --- a/tests/views/test_file_system.py +++ b/tests/views/test_file_system.py @@ -20,3 +20,11 @@ def test_get(self): response = self.client.get(self.blob.url) self.assertEqual(response.status_code, 200) self.assertEqual(response.getvalue(), b"test") + + def test_get_with_missing_file(self): + deleted_blob = Blob.objects.create( + file=ContentFile("deleted", name="deleted.txt") + ) + deleted_blob.purge() + response = self.client.get(deleted_blob.url) + self.assertEqual(response.status_code, 404)