Skip to content

Commit

Permalink
Merge pull request #11 from tjacovich/scix_id
Browse files Browse the repository at this point in the history
Add module to generate SciX public identifiers given input integer.
  • Loading branch information
tjacovich authored Oct 10, 2024
2 parents 79d7d7c + dc70e89 commit a91bb4e
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 2 deletions.
10 changes: 10 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[flake8]
ignore = E203
exclude =
.git,
__pycache__,
docs/source/conf.py,
build,
dist
max-complexity = 10
max-line-length = 79
157 changes: 157 additions & 0 deletions SciXPipelineUtils/scix_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""
base32-crockford
================
A Python module implementing the alternate base32 encoding as described
by Douglas Crockford at: http://www.crockford.com/wrmg/base32.html.
He designed the encoding to:
* Be human and machine readable
* Be compact
* Be error resistant
* Be pronounceable
It uses a symbol set of 10 digits and 22 letters, excluding I, L, O and
U. Decoding is not case sensitive, and 'i' and 'l' are converted to '1'
and 'o' is converted to '0'. Encoding uses only upper-case characters.
Hyphens may be present in symbol strings to improve readability, and
are removed when decoding.
A check symbol can be appended to a symbol string to detect errors
within the string.
"""

import re

__all__ = ["encode", "decode", "normalize"]

# The encoded symbol space does not include I, L, O or U
symbols = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"
# These five symbols are exclusively for checksum values
check_symbols = "*~$=U"

# value -> symbol and symbol -> value tables over the full alphabet
# (the 32 data symbols first, followed by the 5 check-only symbols).
encode_symbols = dict(enumerate(symbols + check_symbols))
decode_symbols = {ch: i for i, ch in encode_symbols.items()}
# Letters that are easily confused with digits are mapped onto those digits.
normalize_symbols = str.maketrans("IiLlOo", "111100")
# One or more data symbols, optionally followed by a single check symbol.
# The pattern is anchored with \Z rather than $ because $ also matches just
# before a trailing newline, which would let "Z8\n" through validation.
valid_symbols = re.compile(
    r"^[%s]+[%s]?\Z" % (symbols, re.escape(check_symbols))
)

# Radix of the data symbols and modulus used for the check symbol.
base = len(symbols)
check_base = len(symbols + check_symbols)


def encode(number, checksum=True, split=4, string_length=12):
    """Encode a non-negative integer into a symbol string.

    Parameters:
        number: value to encode; anything accepted by ``int()``. Zero is
            allowed and encodes as "0" before padding.
        checksum: when true, a check symbol (the value modulo
            ``check_base``) is calculated and appended to the string.
        split: when non-zero, the data symbols are divided into clusters
            of that size separated by hyphens. The check symbol is
            appended directly after the last cluster, without a hyphen.
        string_length: minimum number of symbols in the result, not
            counting hyphens but counting the check symbol if one is
            requested. Shorter encodings are left-padded with "0"
            (e.g. "Z8" becomes "0000-0000-00Z8" with the defaults and
            ``checksum=False``). Longer encodings are never truncated.

    Returns:
        The encoded string.

    Raises:
        ValueError: if ``number`` or ``split`` is negative or cannot be
            converted to an integer.
    """
    number = int(number)
    if number < 0:
        raise ValueError("number '%d' is not a positive integer" % number)

    split = int(split)
    if split < 0:
        raise ValueError("split '%d' is not a positive integer" % split)

    # The check symbol must be computed before the loop consumes `number`.
    check_symbol = encode_symbols[number % check_base] if checksum else ""

    # Collect base-32 digits least-significant first, then reverse.
    digits = []
    while number > 0:
        number, remainder = divmod(number, base)
        digits.append(encode_symbols[remainder])
    # Zero produces no digits in the loop above; it is encoded as "0" so the
    # data part is never empty, even when no padding is applied.
    symbol_string = "".join(reversed(digits)) or "0"

    # Reserve one position of the requested length for the check symbol.
    pad_width = string_length - (1 if checksum else 0)
    symbol_string = symbol_string.zfill(pad_width)

    if split:
        symbol_string = "-".join(
            symbol_string[pos : pos + split]
            for pos in range(0, len(symbol_string), split)
        )

    return symbol_string + check_symbol


def decode(symbol_string, checksum=True, strict=False):
    """Decode an encoded symbol string back into an integer.

    Parameters:
        symbol_string: the encoded string; it is passed through
            ``normalize`` first, so hyphens, lower-case letters and the
            look-alike letters I, L and O are accepted unless ``strict``
            is set.
        checksum: when true, the last symbol is treated as a check symbol
            and validated against the decoded value. When false, every
            symbol is treated as data.
        strict: forwarded to ``normalize``; when true, a ValueError is
            raised if normalization has to change the string.

    Returns:
        The decoded integer.

    Raises:
        ValueError: if the string contains invalid characters, if a
            check-only symbol appears among the data symbols, or if the
            check symbol does not match the decoded value.
    """
    symbol_string = normalize(symbol_string, strict=strict)

    check_symbol = ""
    if checksum:
        symbol_string, check_symbol = symbol_string[:-1], symbol_string[-1]

    # normalize() tolerates one trailing check-only symbol. Those symbols
    # have values >= base, so treating one as a data digit (which happens
    # when checksum is False) would silently yield a wrong number.
    if any(symbol in check_symbols for symbol in symbol_string):
        raise ValueError(
            "string '%s' contains invalid characters" % symbol_string
        )

    number = 0
    for symbol in symbol_string:
        number = number * base + decode_symbols[symbol]

    if checksum and decode_symbols[check_symbol] != number % check_base:
        raise ValueError(
            "invalid check symbol '%s' for string '%s'" % (check_symbol, symbol_string)
        )

    return number


def normalize(symbol_string, strict=False):
    """Normalize an encoded symbol string.

    Normalization provides error correction and prepares the
    string for decoding. These transformations are applied:

    1. Hyphens are removed
    2. 'I', 'i', 'L' or 'l' are converted to '1'
    3. 'O' or 'o' are converted to '0'
    4. All characters are converted to uppercase

    Parameters:
        symbol_string: the string to normalize.
        strict: when true, a ValueError is raised if any of the above
            transformations changed the string. Hyphen removal counts
            as a change, so hyphenated strings are rejected in strict
            mode.

    Returns:
        The normalized string.

    Raises:
        TypeError: if ``symbol_string`` is not a ``str``.
        ValueError: if the normalized string contains invalid
            characters, or if ``strict`` is set and normalization was
            required.
    """
    # Fail with the documented TypeError instead of an AttributeError from
    # the first string method called on a non-string value.
    if not isinstance(symbol_string, str):
        raise TypeError(
            "string is of invalid type %s" % type(symbol_string).__name__
        )

    norm_string = symbol_string.replace("-", "").translate(normalize_symbols).upper()

    # fullmatch() requires the whole string to match; match() with a
    # "$"-anchored pattern would also accept a trailing newline.
    if not valid_symbols.fullmatch(norm_string):
        raise ValueError("string '%s' contains invalid characters" % norm_string)

    if strict and norm_string != symbol_string:
        raise ValueError("string '%s' requires normalization" % symbol_string)

    return norm_string
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "SciXPipelineUtils"
version = "0.0.2"
version = "0.0.3"
description = "A collection of utilities for the new NASA Science Explorer (NASA SciX) backoffice architecture"
authors = [{ name = "Taylor Jacovich", email = "[email protected]"}]
license = { text = "GPL-3.0" }
Expand Down Expand Up @@ -41,7 +41,7 @@ dev = [
'pytest-cov==4.0.0',
'moto==4.1.3',
'confluent-kafka==1.9.2',
'fastavro==1.7.2',
'fastavro==1.9.7',
]

[tool.pytest.ini_options]
Expand Down
17 changes: 17 additions & 0 deletions tests/test_scix_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from unittest import TestCase

import SciXPipelineUtils.scix_id as scixid


class TestSciXIDImplementation(TestCase):
    """Round-trip checks for SciX identifier encoding and decoding."""

    def test_generate_scixid(self):
        """Encode 1000 with a check symbol, then decode it back."""
        encoded = scixid.encode(1000)
        self.assertEqual(encoded, "0000-0000-0Z81")
        decoded = scixid.decode(encoded)
        self.assertEqual(decoded, 1000)

    def test_generate_scixid_no_checksum(self):
        """Encode 1000 without a check symbol, then decode it back."""
        encoded = scixid.encode(1000, checksum=False)
        self.assertEqual(encoded, "0000-0000-00Z8")
        decoded = scixid.decode(encoded, checksum=False)
        self.assertEqual(decoded, 1000)

0 comments on commit a91bb4e

Please sign in to comment.