Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented signature verification for UIC 2.0 tickets #19

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 45 additions & 21 deletions onlineticket.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

try: # pip install pycryptodome
from Cryptodome.Hash import SHA1
from Cryptodome.Hash import SHA224
from Cryptodome.Hash import SHA256
from Cryptodome.PublicKey import DSA
from Cryptodome.Signature import DSS
from Cryptodome.Math.Numbers import Integer
Expand All @@ -26,12 +28,12 @@
exit(1)
except:
logger.warning('signature verification is disabled due to missing pycryptodome package.')
SHA1, DSA, DSS, Integer = None, None, None, None
SHA1, SHA224, SHA256, DSA, DSS, Integer = None, None, None, None, None, None

try: # pip install pyasn1
import pyasn1.codec.der.decoder as asn1
except:
logger.info('signature verification is disabled due to missing pyasn1 package.')
logger.info('UIC 1.0 signature verification is disabled due to missing pyasn1 package.')
asn1 = None

#utils
Expand Down Expand Up @@ -433,19 +435,30 @@ def get_pubkey(issuer, keyid):

get_pubkey.certs = None

def verifysig(message, signature, pubkey):
if DSS is None or asn1 is None: # pycryptodome package is missing
raise SignatureVerificationError('Signature verification disabled')
def verifysig(message, version, signature, pubkey):
if DSS is None:
raise SignatureVerificationError('Signature verification disabled (pycryptodome package is missing)')
if not signature:
raise SignatureVerificationError('Signature asn1 parsing error.')
raise SignatureVerificationError('Signature parsing error.')

r, s = signature

rbytes = Integer(r).to_bytes()
sbytes = Integer(s).to_bytes()
if version <= 1:
if asn1 is None:
raise SignatureVerificationError('Signature verification disabled (pyasn1 package is missing)')
rbytes = int.to_bytes(r, 20, byteorder='big')
sbytes = int.to_bytes(s, 20, byteorder='big')
h = SHA1.new(message)
else:
rbytes = int.to_bytes(r, 32, byteorder='little')
sbytes = int.to_bytes(s, 32, byteorder='little')
# TODO: According to this document, the payload can be SHA224 or SHA256. So do we really need to verify the signature twice?!
# https://www.kcd-nrw.de/fileadmin/user_upload/01_Ergebnisdokument_Deutschlandticket_UIC__V1.02.pdf
# Hardcoded to SHA256 for now, because the October 2023 and December 2023 Deutsche Bahn tickets all have SHA256
#h = SHA224.new(message)
h = SHA256.new(message)

verifykey = DSA.import_key(pubkey)
h = SHA1.new(message)
verifier = DSS.new(verifykey, 'fips-186-3')

try:
Expand All @@ -456,15 +469,26 @@ def verifysig(message, signature, pubkey):

class OT(DataBlock):
def signature_decode(self, res):
'''Parses the asn1 signature and extracts the (r,s) tuple.'''
if not asn1: return None
signature_length = 50 if int(res['version']) <= 1 else 64
signature_bytes = self.read(signature_length)
try:
decoded = asn1.decode(signature_bytes)[0]
except Exception as e:
return (repr(e), signature_bytes)
return (int(decoded[0]), int(decoded[1]))
'''Extracts the signature (r,s) tuple.'''
if int(res['version']) <= 1:
# UIC 1.0: (r,s) are stored in an ASN 1.0 structure
# TODO: Is this code correct? How can we know that the ASN.1 structure will be exactly 50 bytes, if (r,s) can have different lengths? Or is somewhere specified that there is a padding after the ASN.1 structure?
if not asn1: return None
signature_length = 50
signature_bytes = self.read(signature_length)
try:
decoded = asn1.decode(signature_bytes)[0]
except Exception as e:
return (repr(e), signature_bytes)
return (int(decoded[0]), int(decoded[1]))
else:
# "Die Werte bei Version 2 müssen zwingend 32 Byte groß sein und nötigenfalls mit vorangestellten Nullbytes aufgefüllt werden."
decoded = [0, 0]
decoded[0] = self.read(32)
decoded[0] = int.from_bytes(decoded[0], byteorder='little', signed=False)
decoded[1] = self.read(32)
decoded[1] = int.from_bytes(decoded[1], byteorder='little', signed=False)
return (int(decoded[0]), int(decoded[1]))

def signature_validity(self, res):
if len(self.stream) - self.offset - res['data_length'] > 0:
Expand All @@ -476,7 +500,7 @@ def signature_validity(self, res):
try:
pubkey = get_pubkey(issuer=res['carrier'],
keyid=res['key_id'])
result = verifysig(self.stream[self.offset:], res['signature'], pubkey)
result = verifysig(self.stream[self.offset:], int(res['version']), res['signature'], pubkey)
except SignatureVerificationError as e:
return str(e)

Expand All @@ -499,6 +523,7 @@ def signature_validity(self, res):


def read_block(data, offset):
# TODO: Decode UIC 2.0 U_FLEX (encoded in ASN.1 U-PER). ASN.1 Definitions here: https://github.com/UnionInternationalCheminsdeFer/UIC-barcode/blob/master/misc/uicRailTicketData_v1.3.4.asn
block_types = {b'U_HEAD': OT_U_HEAD,
b'U_TLAY': OT_U_TLAY,
b'0080ID': OT_0080ID,
Expand Down Expand Up @@ -578,7 +603,7 @@ def fix_zxing(data):
'preprocessing. Rerun the script with the --zxing (or --auto-zxing) flag.\n', file=sys.stderr)
if not ot or not args.auto_zxing:
raise

print(ot)
ots.setdefault(ticket, []).append(ot)

Expand All @@ -587,4 +612,3 @@ def fix_zxing(data):
#tickets = reduce(list.__add__, ots.values())
#tickets.sort(lambda a, b: cmp(a.data['ticket'][0].data['creation_date'], b.data['ticket'][0].data['creation_date']))
#print(list_str(tickets))