diff --git a/.gitignore b/.gitignore index 40c8c4e..1f961bb 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ pyvenv.cfg *.pyc *~ dist/ +.vscode/ diff --git a/magic/compat.py b/magic/compat.py index 07fad45..6ab9400 100644 --- a/magic/compat.py +++ b/magic/compat.py @@ -4,14 +4,12 @@ Python bindings for libmagic ''' -import ctypes - +import threading from collections import namedtuple from ctypes import * from ctypes.util import find_library - from . import loader _libraries = {} @@ -45,13 +43,19 @@ MAGIC_NO_CHECK_BUILTIN = NO_CHECK_BUILTIN = 4173824 +MAGIC_PARAM_INDIR_MAX = PARAM_INDIR_MAX = 0 +MAGIC_PARAM_NAME_MAX = PARAM_NAME_MAX = 1 +MAGIC_PARAM_ELF_PHNUM_MAX = PARAM_ELF_PHNUM_MAX = 2 +MAGIC_PARAM_ELF_SHNUM_MAX = PARAM_ELF_SHNUM_MAX = 3 +MAGIC_PARAM_ELF_NOTES_MAX = PARAM_ELF_NOTES_MAX = 4 +MAGIC_PARAM_REGEX_MAX = PARAM_REGEX_MAX = 5 +MAGIC_PARAM_BYTES_MAX = PARAM_BYTES_MAX = 6 + FileMagic = namedtuple('FileMagic', ('mime_type', 'encoding', 'name')) class magic_set(Structure): pass - - magic_set._fields_ = [] magic_t = POINTER(magic_set) @@ -103,6 +107,14 @@ class magic_set(Structure): _errno.restype = c_int _errno.argtypes = [magic_t] +_getparam = _libraries['magic'].magic_getparam +_getparam.restype = c_int +_getparam.argtypes = [magic_t, c_int, c_void_p] + +_setparam = _libraries['magic'].magic_setparam +_setparam.restype = c_int +_setparam.argtypes = [magic_t, c_int, c_void_p] + class Magic(object): def __init__(self, ms): @@ -228,29 +240,81 @@ def errno(self): """ return _errno(self._magic_t) + def getparam(self, param): + """ + Returns the param value if successful and -1 if the parameter + was unknown. + """ + v = c_int() + i = _getparam(self._magic_t, param, byref(v)) + if i == -1: + return -1 + return v.value + + def setparam(self, param, value): + """ + Returns 0 if successful and -1 if the parameter was unknown. + """ + v = c_int(value) + return _setparam(self._magic_t, param, byref(v)) + def open(flags): """ Returns a magic object on success and None on failure. Flags argument as for setflags. """ - return Magic(_open(flags)) + magic_t = _open(flags) + if magic_t is None: + return None + return Magic(magic_t) # Objects used by `detect_from_` functions -mime_magic = Magic(_open(MAGIC_MIME)) -mime_magic.load() -none_magic = Magic(_open(MAGIC_NONE)) -none_magic.load() +class error(Exception): + pass +class MagicDetect(object): + def __init__(self): + self.mime_magic = open(MAGIC_MIME) + if self.mime_magic is None: + raise error + if self.mime_magic.load() == -1: + self.mime_magic.close() + self.mime_magic = None + raise error + self.none_magic = open(MAGIC_NONE) + if self.none_magic is None: + self.mime_magic.close() + self.mime_magic = None + raise error + if self.none_magic.load() == -1: + self.none_magic.close() + self.none_magic = None + self.mime_magic.close() + self.mime_magic = None + raise error + + def __del__(self): + if self.mime_magic is not None: + self.mime_magic.close() + if self.none_magic is not None: + self.none_magic.close() + +threadlocal = threading.local() + +def _detect_make(): + v = getattr(threadlocal, "magic_instance", None) + if v is None: + v = MagicDetect() + setattr(threadlocal, "magic_instance", v) + return v def _create_filemagic(mime_detected, type_detected): - splat = mime_detected.split('; ') - mime_type = splat[0] - if len(splat) == 2: - mime_encoding = splat[1] - else: - mime_encoding = '' + try: + mime_type, mime_encoding = mime_detected.split('; ') + except ValueError: + raise ValueError(mime_detected) return FileMagic(name=type_detected, mime_type=mime_type, encoding=mime_encoding.replace('charset=', '')) @@ -261,9 +325,9 @@ def detect_from_filename(filename): Returns a `FileMagic` namedtuple. ''' - - return _create_filemagic(mime_magic.file(filename), - none_magic.file(filename)) + x = _detect_make() + return _create_filemagic(x.mime_magic.file(filename), + x.none_magic.file(filename)) def detect_from_fobj(fobj): @@ -273,8 +337,9 @@ def detect_from_fobj(fobj): ''' file_descriptor = fobj.fileno() - return _create_filemagic(mime_magic.descriptor(file_descriptor), - none_magic.descriptor(file_descriptor)) + x = _detect_make() + return _create_filemagic(x.mime_magic.descriptor(file_descriptor), + x.none_magic.descriptor(file_descriptor)) def detect_from_content(byte_content): @@ -283,5 +348,6 @@ def detect_from_content(byte_content): Returns a `FileMagic` namedtuple. ''' - return _create_filemagic(mime_magic.buffer(byte_content), - none_magic.buffer(byte_content)) + x = _detect_make() + return _create_filemagic(x.mime_magic.buffer(byte_content), + x.none_magic.buffer(byte_content)) diff --git a/test/python_magic_test.py b/test/python_magic_test.py index 7ad15c8..b557762 100755 --- a/test/python_magic_test.py +++ b/test/python_magic_test.py @@ -1,3 +1,5 @@ +from dataclasses import dataclass +from enum import Enum import os import os.path import shutil @@ -17,11 +19,140 @@ import magic +@dataclass +class TestFile: + file_name: str + mime_results: list[str] + text_results: list[str] + no_check_elf_results: list[str] | None + buf_equals_file: bool = True # magic_descriptor is broken (?) in centos 7, so don't run those tests SKIP_FROM_DESCRIPTOR = bool(os.environ.get("SKIP_FROM_DESCRIPTOR")) +COMMON_PLAIN = [ + {}, + {"check_soft": True}, + {"check_soft": False}, + {"check_json": True}, + {"check_json": False}, +] + +NO_SOFT = {"check_soft": False} + +COMMON_MIME = [{"mime": True, **k} for k in COMMON_PLAIN] + +CASES = { + "magic._pyc_": [ + (COMMON_MIME, [ + "application/octet-stream", + "text/x-bytecode.python", + "application/x-bytecode.python", + ]), + (COMMON_PLAIN, ["python 2.4 byte-compiled"]), + (NO_SOFT, ["data"]), + ], + "test.pdf": [ + (COMMON_MIME, ["application/pdf"]), + (COMMON_PLAIN, [ + "PDF document, version 1.2", + "PDF document, version 1.2, 2 pages", + "PDF document, version 1.2, 2 page(s)", + ]), + (NO_SOFT, ["ASCII text"]), + ], + "test.gz": [ + (COMMON_MIME, ["application/gzip", "application/x-gzip"]), + (COMMON_PLAIN, [ + 'gzip compressed data, was "test", from Unix, last modified: Sun Jun 29 01:32:52 2008', + 'gzip compressed data, was "test", last modified: Sun Jun 29 01:32:52 2008, from Unix', + 'gzip compressed data, was "test", last modified: Sun Jun 29 01:32:52 2008, from Unix, original size 15', + 'gzip compressed data, was "test", last modified: Sun Jun 29 01:32:52 2008, from Unix, original size modulo 2^32 15', + 'gzip compressed data, was "test", last modified: Sun Jun 29 01:32:52 2008, from Unix, truncated', + ]), + ({"extension": True}, [ + # some versions return '' for the extensions of a gz file, + # including w/ the command line. Who knows... + "gz/tgz/tpz/zabw/svgz/adz/kmy/xcfgz", + "gz/tgz/tpz/zabw/svgz", + "", + "???", + ]), + (NO_SOFT, ["data"]), + ], + "test.snappy.parquet": [ + (COMMON_MIME, ["application/octet-stream"]), + (COMMON_PLAIN, ["Apache Parquet", "Par archive data"]), + (NO_SOFT, ["data"]), + ], + "test.json": [ + # TODO: soft, no_json + (COMMON_MIME, ["application/json"]), + (COMMON_PLAIN, ["JSON text data"]), + ({"mime": True, "check_json": False}, [ + "data", + ]), + (NO_SOFT, ["JSON text data"]) + ], + "elf-NetBSD-x86_64-echo": [ + # TODO: soft, no elf + (COMMON_PLAIN, [ + "ELF 64-bit LSB shared object, x86-64, version 1 (SYSV)", + "ELF 64-bit LSB pie executable, x86-64, version 1 (SYSV), dynamically linked, interpreter /libexec/ld.elf_so, for NetBSD 8.0, not stripped", + ]), + (COMMON_MIME, [ + "application/x-pie-executable", + "application/x-sharedlib", + ]), + ({"check_elf": False}, [ + "ELF 64-bit LSB shared object, x86-64, version 1 (SYSV)", + ]), + # TODO: sometimes + # "ELF 64-bit LSB pie executable, x86-64, version 1 (SYSV), dynamically linked, interpreter /libexec/ld.elf_so, for NetBSD 8.0, not stripped", + + (NO_SOFT, ["data"]), + ], + "test.txt": [ + (COMMON_MIME, ["text/plain"]), + (COMMON_PLAIN, ["ASCII text"]), + ({"mime_encoding": True}, [ + "us-ascii", + ]), + (NO_SOFT, ["ASCII text"]), + ], + "text-iso8859-1.txt": [ + ({"mime_encoding": True}, [ + "iso-8859-1", + ]), + ], + b"\xce\xbb": [ + (COMMON_MIME, ["text/plain"]), + ], + "b\xce\xbb".decode("utf-8"): [ + (COMMON_MIME, ["text/plain"]), + ], + "name_use.jpg": [ + ({"extension": True}, [ + "jpeg/jpg/jpe/jfif" + ]), + ], + "keep-going.jpg": [ + (COMMON_MIME, [ + "image/jpeg" + ]), + ({"mime": True, "keep_going": True}, [ + "image/jpeg\\012- application/octet-stream", + ]) + ], + "test.py": [ + (COMMON_MIME, [ + "text/x-python", + "text/x-script.python", + ]) + ] +} + class MagicTest(unittest.TestCase): TESTDATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "testdata")) @@ -34,26 +165,6 @@ def test_version(self): def test_fs_encoding(self): self.assertEqual("utf-8", sys.getfilesystemencoding().lower()) - def assert_values(self, m, expected_values, buf_equals_file=True): - for filename, expected_value in expected_values.items(): - try: - filename = os.path.join(self.TESTDATA_DIR, filename) - except TypeError: - filename = os.path.join(self.TESTDATA_DIR.encode("utf-8"), filename) - - if type(expected_value) is not tuple: - expected_value = (expected_value,) - - with open(filename, "rb") as f: - buf_value = m.from_buffer(f.read()) - - file_value = m.from_file(filename) - - if buf_equals_file: - self.assertEqual(buf_value, file_value) - - for value in (buf_value, file_value): - self.assertIn(value, expected_value) def test_from_file_str_and_bytes(self): filename = os.path.join(self.TESTDATA_DIR, "test.pdf") @@ -63,203 +174,34 @@ def test_from_file_str_and_bytes(self): "application/pdf", magic.from_file(filename.encode("utf-8"), mime=True) ) - def test_from_descriptor_str_and_bytes(self): - if SKIP_FROM_DESCRIPTOR: - self.skipTest("magic_descriptor is broken in this version of libmagic") - - filename = os.path.join(self.TESTDATA_DIR, "test.pdf") - with open(filename) as f: - self.assertEqual( - "application/pdf", magic.from_descriptor(f.fileno(), mime=True) - ) - self.assertEqual( - "application/pdf", magic.from_descriptor(f.fileno(), mime=True) - ) - - def test_from_buffer_str_and_bytes(self): - if SKIP_FROM_DESCRIPTOR: - self.skipTest("magic_descriptor is broken in this version of libmagic") - m = magic.Magic(mime=True) - - self.assertTrue( - m.from_buffer('#!/usr/bin/env python\nprint("foo")') - in ("text/x-python", "text/x-script.python") - ) - self.assertTrue( - m.from_buffer(b'#!/usr/bin/env python\nprint("foo")') - in ("text/x-python", "text/x-script.python") - ) - def test_mime_types(self): + def test_all_cases(self): + # TODO: + # * MAGIC_EXTENSION not supported + # * keep_going not supported + # * buffer checks dest = os.path.join(MagicTest.TESTDATA_DIR, b"\xce\xbb".decode("utf-8")) shutil.copyfile(os.path.join(MagicTest.TESTDATA_DIR, "lambda"), dest) + os.environ["TZ"] = "UTC" try: - m = magic.Magic(mime=True) - self.assert_values( - m, - { - "elf-NetBSD-x86_64-echo": ( - "application/x-pie-executable", - "application/x-sharedlib", - ), - "magic._pyc_": ( - "application/octet-stream", - "text/x-bytecode.python", - "application/x-bytecode.python", - ), - "test.pdf": "application/pdf", - "test.gz": ("application/gzip", "application/x-gzip"), - "test.snappy.parquet": "application/octet-stream", - "text.txt": "text/plain", - b"\xce\xbb".decode("utf-8"): "text/plain", - b"\xce\xbb": "text/plain", - "test.json": "application/json", - }, - buf_equals_file=False, - ) - finally: - os.unlink(dest) + for file_name, cases in CASES: + filename = os.path.join(self.TESTDATA_DIR, file_name) + for flags, outputs in cases: + m = magic.Magic(**flags) + with open(filename) as f: + self.assertIn(m.from_descriptor(f.fileno()), outputs) - # TODO: Fix this failing test on Ubuntu - @pytest.mark.skipif(sys.platform == "linux", reason="'JSON data' not found") - def test_descriptions(self): - m = magic.Magic() - os.environ["TZ"] = "UTC" # To get last modified date of test.gz in UTC - try: - self.assert_values( - m, - { - "elf-NetBSD-x86_64-echo": ( - "ELF 64-bit LSB shared object, x86-64, version 1 (SYSV)", - "ELF 64-bit LSB pie executable, x86-64, version 1 (SYSV), dynamically linked, interpreter /libexec/ld.elf_so, for NetBSD 8.0, not stripped", - ), - "magic._pyc_": "python 2.4 byte-compiled", - "test.pdf": ( - "PDF document, version 1.2", - "PDF document, version 1.2, 2 pages", - "PDF document, version 1.2, 2 page(s)", - ), - "test.gz": ( - 'gzip compressed data, was "test", from Unix, last ' - "modified: Sun Jun 29 01:32:52 2008", - 'gzip compressed data, was "test", last modified' - ": Sun Jun 29 01:32:52 2008, from Unix", - 'gzip compressed data, was "test", last modified' - ": Sun Jun 29 01:32:52 2008, from Unix, original size 15", - 'gzip compressed data, was "test", ' - "last modified: Sun Jun 29 01:32:52 2008, " - "from Unix, original size modulo 2^32 15", - 'gzip compressed data, was "test", last modified' - ": Sun Jun 29 01:32:52 2008, from Unix, truncated", - ), - "text.txt": "ASCII text", - "test.snappy.parquet": ("Apache Parquet", "Par archive data"), - "test.json": "JSON text data", - }, - buf_equals_file=False, - ) - finally: - del os.environ["TZ"] - - # TODO: Fix this failing test on Ubuntu - @pytest.mark.skipif(sys.platform == "linux", reason="'JSON data' not found") - def test_descriptions_no_soft(self): - m = magic.Magic(check_soft=False) - self.assert_values( - m, - { - "elf-NetBSD-x86_64-echo": ( - "data", - "ELF 64-bit LSB pie executable, x86-64, version 1 (SYSV), dynamically linked, interpreter /libexec/ld.elf_so, for NetBSD 8.0, not stripped", - ), - "magic._pyc_": "data", - "test.pdf": "ASCII text", - "test.gz": "data", - "text.txt": "ASCII text", - "test.snappy.parquet": "data", - "test.json": "JSON text data", - }, - buf_equals_file=False, - ) + self.assertIn(m.from_file(filename), outputs) - def test_descriptions_no_elf(self): - m = magic.Magic(check_elf=False) - self.assert_values( - m, - { - "elf-NetBSD-x86_64-echo": "ELF 64-bit LSB shared object, x86-64, version 1 (SYSV)", - }, - buf_equals_file=True, - ) - - def test_descriptions_no_json(self): - m = magic.Magic(check_elf=False) - self.assert_values( - m, - { - "test.json": "data", - }, - buf_equals_file=True, - ) + fname_bytes = filename.encode("utf-8") + self.assertIn(m.from_file(fname_bytes), outputs) - def test_descriptions_no_json_unchanged(self): - # verify non-json results are unchanged - m = magic.Magic(check_json=False) - os.environ["TZ"] = "UTC" # To get last modified date of test.gz in UTC - try: - self.assert_values( - m, - { - "elf-NetBSD-x86_64-echo": ( - "ELF 64-bit LSB shared object, x86-64, version 1 (SYSV)", - "ELF 64-bit LSB pie executable, x86-64, version 1 (SYSV), dynamically linked, interpreter /libexec/ld.elf_so, for NetBSD 8.0, not stripped", - ), - "magic._pyc_": "python 2.4 byte-compiled", - "test.pdf": ( - "PDF document, version 1.2", - "PDF document, version 1.2, 2 pages", - "PDF document, version 1.2, 2 page(s)", - ), - "test.gz": ( - 'gzip compressed data, was "test", from Unix, last ' - "modified: Sun Jun 29 01:32:52 2008", - 'gzip compressed data, was "test", last modified' - ": Sun Jun 29 01:32:52 2008, from Unix", - 'gzip compressed data, was "test", last modified' - ": Sun Jun 29 01:32:52 2008, from Unix, original size 15", - 'gzip compressed data, was "test", ' - "last modified: Sun Jun 29 01:32:52 2008, " - "from Unix, original size modulo 2^32 15", - 'gzip compressed data, was "test", last modified' - ": Sun Jun 29 01:32:52 2008, from Unix, truncated", - ), - "text.txt": "ASCII text", - "test.snappy.parquet": ("Apache Parquet", "Par archive data"), - }, - buf_equals_file=False, - ) + with open(file_name, "rb") as f: + buf_result = m.from_buffer(f.read(1024)) + self.assertIn(buf_result, outputs) finally: del os.environ["TZ"] - - def test_extension(self): - try: - m = magic.Magic(extension=True) - self.assert_values( - m, - { - # some versions return '' for the extensions of a gz file, - # including w/ the command line. Who knows... - "test.gz": ( - "gz/tgz/tpz/zabw/svgz/adz/kmy/xcfgz", - "gz/tgz/tpz/zabw/svgz", - "", - "???", - ), - "name_use.jpg": "jpeg/jpg/jpe/jfif", - }, - ) - except NotImplementedError: - self.skipTest("MAGIC_EXTENSION not supported in this version") + os.unlink(dest) def test_unicode_result_nonraw(self): m = magic.Magic(raw=False) @@ -280,15 +222,6 @@ def test_unicode_result_raw(self): else: raise unittest.SkipTest("Magic file doesn't return expected type.") - def test_mime_encodings(self): - m = magic.Magic(mime_encoding=True) - self.assert_values( - m, - { - "text-iso8859-1.txt": "iso-8859-1", - "text.txt": "us-ascii", - }, - ) def test_errors(self): m = magic.Magic() @@ -300,22 +233,6 @@ def test_errors(self): finally: del os.environ["MAGIC"] - def test_keep_going(self): - filename = os.path.join(self.TESTDATA_DIR, "keep-going.jpg") - - m = magic.Magic(mime=True) - self.assertEqual(m.from_file(filename), "image/jpeg") - - try: - # this will throw if you have an "old" version of the library - # I'm otherwise not sure how to query if keep_going is supported - magic.version() - m = magic.Magic(mime=True, keep_going=True) - self.assertEqual( - m.from_file(filename), "image/jpeg\\012- application/octet-stream" - ) - except NotImplementedError: - pass def test_rethrow(self): old = magic.magic_buffer