diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..e151dac
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,10 @@
+[flake8]
+ignore = E203
+exclude =
+    .git,
+    __pycache__,
+    docs/source/conf.py,
+    build,
+    dist
+max-complexity = 10
+max-line-length = 79
diff --git a/SciXPipelineUtils/scix_id.py b/SciXPipelineUtils/scix_id.py
new file mode 100644
index 0000000..57084b6
--- /dev/null
+++ b/SciXPipelineUtils/scix_id.py
@@ -0,0 +1,174 @@
+"""
+base32-crockford
+================
+
+A Python module implementing the alternate base32 encoding as described
+by Douglas Crockford at: http://www.crockford.com/wrmg/base32.html.
+
+He designed the encoding to:
+
+    * Be human and machine readable
+    * Be compact
+    * Be error resistant
+    * Be pronounceable
+
+It uses a symbol set of 10 digits and 22 letters, excluding I, L, O and
+U. Decoding is not case sensitive, and 'i' and 'l' are converted to '1'
+and 'o' is converted to '0'. Encoding uses only upper-case characters.
+
+Hyphens may be present in symbol strings to improve readability, and
+are removed when decoding.
+
+A check symbol can be appended to a symbol string to detect errors
+within the string.
+
+"""
+
+import re
+
+__all__ = ["encode", "decode", "normalize"]
+
+# The encoded symbol space does not include I, L, O or U
+symbols = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"
+# These five symbols are exclusively for checksum values
+check_symbols = "*~$=U"
+
+encode_symbols = dict(enumerate(symbols + check_symbols))
+decode_symbols = {ch: i for i, ch in encode_symbols.items()}
+normalize_symbols = str.maketrans("IiLlOo", "111100")
+valid_symbols = re.compile("^[%s]+[%s]?$" % (symbols, re.escape(check_symbols)))
+
+base = len(symbols)
+check_base = len(symbols + check_symbols)
+
+
+def encode(number, checksum=True, split=4, string_length=12):
+    """Encode an integer into a symbol string.
+
+    A ValueError is raised on invalid input.
+
+    If checksum is set to True, a check symbol will be
+    calculated and appended to the string.
+
+    If split is specified, the string will be divided into
+    clusters of that size separated by hyphens. A split of 0
+    disables clustering.
+
+    The param string_length causes the returned value to be padded
+    with 0s if the returned string is shorter than the requested
+    length (e.g. 01 becomes 000000000001 for the default string
+    length). This includes the checksum if specified, but not the
+    hyphens added by split.
+
+    At least one symbol is always produced, so 0 encodes to "0"
+    even when string_length requests no padding.
+
+    The encoded string is returned.
+    """
+    number = int(number)
+    if number < 0:
+        raise ValueError("number '%d' is not a positive integer" % number)
+
+    split = int(split)
+    if split < 0:
+        raise ValueError("split '%d' is not a positive integer" % split)
+
+    # The check symbol must be computed before the loop consumes number.
+    check_symbol = encode_symbols[number % check_base] if checksum else ""
+
+    symbol_string = ""
+    while number > 0:
+        number, remainder = divmod(number, base)
+        symbol_string = encode_symbols[remainder] + symbol_string
+
+    # Zero yields no digits in the loop above; emit an explicit "0" so the
+    # result is never empty, even when string_length requests no padding.
+    if not symbol_string:
+        symbol_string = "0"
+    symbol_string = symbol_string.zfill(string_length - len(check_symbol))
+
+    if split:
+        symbol_string = "-".join(
+            symbol_string[pos : pos + split]
+            for pos in range(0, len(symbol_string), split)
+        )
+
+    return symbol_string + check_symbol
+
+
+def decode(symbol_string, checksum=True, strict=False):
+    """Decode an encoded symbol string.
+
+    If checksum is set to True, the string is assumed to have a
+    trailing check symbol which will be validated. If the
+    checksum validation fails, a ValueError is raised.
+
+    If checksum is set to False, a ValueError is raised if the
+    string ends with one of the checksum-only symbols, since
+    those do not represent a digit.
+
+    If strict is set to True, a ValueError is raised if the
+    normalization step requires changes to the string.
+
+    The decoded integer is returned.
+    """
+    symbol_string = normalize(symbol_string, strict=strict)
+    if checksum:
+        symbol_string, check_symbol = symbol_string[:-1], symbol_string[-1]
+    elif symbol_string[-1] in check_symbols:
+        raise ValueError(
+            "unexpected check symbol '%s' in string '%s'"
+            % (symbol_string[-1], symbol_string)
+        )
+
+    number = 0
+    for symbol in symbol_string:
+        number = number * base + decode_symbols[symbol]
+
+    if checksum:
+        check_value = decode_symbols[check_symbol]
+        modulo = number % check_base
+        if check_value != modulo:
+            raise ValueError(
+                "invalid check symbol '%s' for string '%s'" % (check_symbol, symbol_string)
+            )
+
+    return number
+
+
+def normalize(symbol_string, strict=False):
+    """Normalize an encoded symbol string.
+
+    Normalization provides error correction and prepares the
+    string for decoding. These transformations are applied:
+
+    1. Hyphens are removed
+    2. 'I', 'i', 'L' or 'l' are converted to '1'
+    3. 'O' or 'o' are converted to '0'
+    4. All characters are converted to uppercase
+
+    A TypeError is raised if an invalid string type is provided.
+
+    A ValueError is raised if the normalized string contains
+    invalid characters.
+
+    If the strict parameter is set to True, a ValueError is raised
+    if any of the above transformations are applied.
+
+    The normalized string is returned.
+    """
+    if not isinstance(symbol_string, str):
+        raise TypeError(
+            "string is of invalid type %s" % type(symbol_string).__name__
+        )
+
+    norm_string = symbol_string.replace("-", "").translate(normalize_symbols).upper()
+
+    # fullmatch rejects a trailing newline, which "$" alone would let through.
+    if not valid_symbols.fullmatch(norm_string):
+        raise ValueError("string '%s' contains invalid characters" % norm_string)
+
+    if strict and norm_string != symbol_string:
+        raise ValueError("string '%s' requires normalization" % symbol_string)
+
+    return norm_string
diff --git a/pyproject.toml b/pyproject.toml
index c9f732c..afa4159 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "SciXPipelineUtils"
-version = "0.0.2"
+version = "0.0.3"
 description = "A collection of utilities for the new NASA Science Explorer (NASA SciX) backoffice architecture"
 authors = [{ name = "Taylor Jacovich", email = "tjacovich@cfa.harvard.edu"}]
 license = { text = "GPL-3.0" }
@@ -41,7 +41,7 @@ dev = [
     'pytest-cov==4.0.0',
     'moto==4.1.3',
     'confluent-kafka==1.9.2',
-    'fastavro==1.7.2',
+    'fastavro==1.9.7',
 ]
 
 [tool.pytest.ini_options]
diff --git a/tests/test_scix_id.py b/tests/test_scix_id.py
new file mode 100644
index 0000000..80e140d
--- /dev/null
+++ b/tests/test_scix_id.py
@@ -0,0 +1,17 @@
+from unittest import TestCase
+
+import SciXPipelineUtils.scix_id as scixid
+
+
+class TestSciXIDImplementation(TestCase):
+    def test_generate_scixid(self):
+        test_id = scixid.encode(1000)
+        self.assertEqual(test_id, "0000-0000-0Z81")
+        test_int = scixid.decode(test_id)
+        self.assertEqual(test_int, 1000)
+
+    def test_generate_scixid_no_checksum(self):
+        test_id = scixid.encode(1000, checksum=False)
+        self.assertEqual(test_id, "0000-0000-00Z8")
+        test_int = scixid.decode(test_id, checksum=False)
+        self.assertEqual(test_int, 1000)