Skip to content

Commit

Permalink
Move _remove_sig out of top level namespace
Browse files Browse the repository at this point in the history
Also sort imports with isort and use standard flake8 configuration
  • Loading branch information
kislyuk committed Aug 20, 2022
1 parent f783e93 commit 2ddaaf9
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 77 deletions.
1 change: 0 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
universal=1
[flake8]
max-line-length=120
ignore: E301, E302, E305, E401, E226, F841
[coverage:run]
omit =
signxml/__pyinstaller/*
Expand Down
84 changes: 36 additions & 48 deletions signxml/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
from base64 import b64encode, b64decode
from base64 import b64decode, b64encode
from collections import namedtuple
from enum import Enum

from lxml import etree
from lxml.etree import Element, SubElement

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa, utils
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.hashes import Hash, SHA1, SHA224, SHA256, SHA384, SHA512
from cryptography.hazmat.primitives.hashes import SHA1, SHA224, SHA256, SHA384, SHA512, Hash
from cryptography.hazmat.primitives.serialization import load_der_public_key
from cryptography.hazmat.backends import default_backend
from lxml import etree
from lxml.etree import Element, SubElement

from .exceptions import InvalidSignature, InvalidDigest, InvalidInput, InvalidCertificate # noqa
from .util import (bytes_to_long, long_to_bytes, strip_pem_header, add_pem_header, ensure_bytes, ensure_str, Namespace,
XMLProcessor, iterate_pem, verify_x509_cert_chain, bits_to_bytes_unit)
from collections import namedtuple
from .exceptions import InvalidCertificate, InvalidDigest, InvalidInput, InvalidSignature # noqa
from .util import (
Namespace,
XMLProcessor,
_remove_sig,
add_pem_header,
bits_to_bytes_unit,
bytes_to_long,
ensure_bytes,
ensure_str,
iterate_pem,
long_to_bytes,
strip_pem_header,
verify_x509_cert_chain,
)

methods = Enum("Methods", "enveloped enveloping detached")

Expand All @@ -27,46 +38,18 @@
xenc11="http://www.w3.org/2009/xmlenc11#"
)


def ds_tag(tag):
return "{" + namespaces.ds + "}" + tag


def dsig11_tag(tag):
return "{" + namespaces.dsig11 + "}" + tag


def ec_tag(tag):
return "{" + namespaces.ec + "}" + tag

def _remove_sig(signature, idempotent=False):
"""
Remove the signature node from its parent, keeping any tail element.
This is needed for eneveloped signatures.
:param signature: Signature to remove from payload
:type signature: XML ElementTree Element
:param idempotent:
If True, don't raise an error if signature is already detached from parent.
:type idempotent: boolean
"""
try:
signaturep = next(signature.iterancestors())
except StopIteration:
if idempotent:
return
raise ValueError("Can't remove the root signature node")
if signature.tail is not None:
try:
signatures = next(signature.itersiblings(preceding=True))
except StopIteration:
if signaturep.text is not None:
signaturep.text = signaturep.text + signature.tail
else:
signaturep.text = signature.tail
else:
if signatures.tail is not None:
signatures.tail = signatures.tail + signature.tail
else:
signatures.tail = signature.tail
signaturep.remove(signature)

class VerifyResult(namedtuple("VerifyResult", "signed_data signed_xml signature_xml")):
"""
Expand All @@ -84,6 +67,7 @@ class VerifyResult(namedtuple("VerifyResult", "signed_data signed_xml signature_
verified_data = signxml.XMLVerifier().verify(input_data).signed_xml
"""


class XMLSignatureProcessor(XMLProcessor):
schema_file = "xmldsig1-schema.xsd"

Expand Down Expand Up @@ -245,6 +229,7 @@ def _resolve_reference(self, doc_root, reference, uri_resolver=None):
raise InvalidInput("Unable to resolve reference URI: {}".format(uri))
return result


class XMLSigner(XMLSignatureProcessor):
"""
Create a new XML Signature Signer object, which can be used to hold configuration information and sign multiple
Expand Down Expand Up @@ -427,7 +412,7 @@ def sign(self, data, key=None, passphrase=None, cert=None, reference_uri=None, k
if isinstance(cert, (str, bytes)):
x509_certificate.text = strip_pem_header(cert)
else:
from OpenSSL.crypto import dump_certificate, FILETYPE_PEM
from OpenSSL.crypto import FILETYPE_PEM, dump_certificate
x509_certificate.text = strip_pem_header(dump_certificate(FILETYPE_PEM, cert))
else:
sig_root.append(key_info)
Expand Down Expand Up @@ -568,13 +553,14 @@ def _serialize_key_value(self, key, key_info_element):
e.text = ensure_str(b64encode(long_to_bytes(getattr(key_params, field))))
elif self.sign_alg.startswith("ecdsa-"):
ec_key_value = SubElement(key_value, dsig11_tag("ECKeyValue"), nsmap=dict(dsig11=namespaces.dsig11))
named_curve = SubElement(ec_key_value, dsig11_tag("NamedCurve"),
named_curve = SubElement(ec_key_value, dsig11_tag("NamedCurve"), # noqa:F841
URI=self.known_ecdsa_curve_oids[key.curve.name])
public_key = SubElement(ec_key_value, dsig11_tag("PublicKey"))
x = key.public_key().public_numbers().x
y = key.public_key().public_numbers().y
public_key.text = ensure_str(b64encode(long_to_bytes(4) + long_to_bytes(x) + long_to_bytes(y)))


class XMLVerifier(XMLSignatureProcessor):
"""
Create a new XML Signature Verifier object, which can be used to hold configuration information and verify multiple
Expand All @@ -596,8 +582,8 @@ def _verify_signature_with_pubkey(self, signed_info_c14n, raw_signature, key_val
named_curve = self._find(ec_key_value, "NamedCurve", namespace="dsig11")
public_key = self._find(ec_key_value, "PublicKey", namespace="dsig11")
key_data = b64decode(public_key.text)[1:]
x = bytes_to_long(key_data[:len(key_data)//2])
y = bytes_to_long(key_data[len(key_data)//2:])
x = bytes_to_long(key_data[:len(key_data) // 2])
y = bytes_to_long(key_data[len(key_data) // 2:])
curve_class = self.known_ecdsa_curves[named_curve.get("URI")]
key = ec.EllipticCurvePublicNumbers(x=x, y=y, curve=curve_class()).public_key(backend=default_backend())
elif not isinstance(key, ec.EllipticCurvePublicKey):
Expand Down Expand Up @@ -818,7 +804,9 @@ def verify(self, data, require_x509=True, x509_cert=None, cert_subject_name=None
inclusive_ns_prefixes=inclusive_ns_prefixes)

if x509_data is not None or self.require_x509:
from OpenSSL.crypto import load_certificate, X509, FILETYPE_PEM, verify, Error as OpenSSLCryptoError
from OpenSSL.crypto import FILETYPE_PEM, X509
from OpenSSL.crypto import Error as OpenSSLCryptoError
from OpenSSL.crypto import load_certificate, verify

if self.x509_cert is None:
if x509_data is None:
Expand Down Expand Up @@ -939,8 +927,8 @@ def check_key_value_matches_cert_public_key(self, key_value, public_key, signatu
named_curve = self._find(ec_key_value, "NamedCurve", namespace="dsig11")
public_key = self._find(ec_key_value, "PublicKey", namespace="dsig11")
key_data = b64decode(public_key.text)[1:]
x = bytes_to_long(key_data[:len(key_data)//2])
y = bytes_to_long(key_data[len(key_data)//2:])
x = bytes_to_long(key_data[:len(key_data) // 2])
y = bytes_to_long(key_data[len(key_data) // 2:])
curve_class = self.known_ecdsa_curves[named_curve.get("URI")]

pubk_curve = public_key.to_cryptography_key().public_numbers().curve
Expand Down
5 changes: 5 additions & 0 deletions signxml/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,28 @@

import cryptography.exceptions


class InvalidSignature(cryptography.exceptions.InvalidSignature):
"""
Raised when signature validation fails.
"""


class InvalidDigest(InvalidSignature):
"""
Raised when digest validation fails (causing the signature to be untrusted).
"""


class InvalidCertificate(InvalidSignature):
"""
Raised when certificate validation fails.
"""


class InvalidInput(ValueError):
pass


class RedundantCert(Exception):
pass
48 changes: 43 additions & 5 deletions signxml/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
"""

import math
import os, sys, re, struct, textwrap
import os
import re
import struct
import sys
import textwrap
from base64 import b64decode, b64encode
from xml.etree import ElementTree as stdlibElementTree
from base64 import b64encode, b64decode

from lxml import etree

from ..exceptions import RedundantCert, InvalidCertificate, InvalidInput
from ..exceptions import InvalidCertificate, InvalidInput, RedundantCert

USING_PYTHON2 = True if sys.version_info < (3, 0) else False

Expand Down Expand Up @@ -168,8 +172,8 @@ def get_root(self, data):


def hmac_sha1(key, message):
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac
hasher = hmac.HMAC(key, hashes.SHA1(), backend=default_backend())
hasher.update(message)
return hasher.finalize()
Expand Down Expand Up @@ -208,7 +212,8 @@ def p_sha1(client_b64_bytes, server_b64_bytes):


def _add_cert_to_store(store, cert):
from OpenSSL.crypto import X509StoreContext, X509StoreContextError, Error as OpenSSLCryptoError
from OpenSSL.crypto import Error as OpenSSLCryptoError
from OpenSSL.crypto import X509StoreContext, X509StoreContextError
try:
X509StoreContext(store, cert).verify_certificate()
except X509StoreContextError as e:
Expand Down Expand Up @@ -257,3 +262,36 @@ def verify_x509_cert_chain(cert_chain, ca_pem_file=None, ca_path=None):
else:
raise last_error
return end_of_chain


def _remove_sig(signature, idempotent=False):
"""
Remove the signature node from its parent, keeping any tail element.
This is needed for eneveloped signatures.
:param signature: Signature to remove from payload
:type signature: XML ElementTree Element
:param idempotent:
If True, don't raise an error if signature is already detached from parent.
:type idempotent: boolean
"""
try:
signaturep = next(signature.iterancestors())
except StopIteration:
if idempotent:
return
raise ValueError("Can't remove the root signature node")
if signature.tail is not None:
try:
signatures = next(signature.itersiblings(preceding=True))
except StopIteration:
if signaturep.text is not None:
signaturep.text = signaturep.text + signature.tail
else:
signaturep.text = signature.tail
else:
if signatures.tail is not None:
signatures.tail = signatures.tail + signature.tail
else:
signatures.tail = signature.tail
signaturep.remove(signature)
36 changes: 23 additions & 13 deletions test/generate_125_example.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import os.path
from base64 import b64encode

from lxml import etree
from lxml.etree import Element, SubElement

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.backends import default_backend

from signxml import XMLSignatureProcessor, InvalidInput, namespaces, iterate_pem, ds_tag, _remove_sig, \
ensure_str, long_to_bytes, strip_pem_header, dsig11_tag
from lxml import etree
from lxml.etree import Element, SubElement
from signxml import (
InvalidInput,
XMLSignatureProcessor,
_remove_sig,
ds_tag,
dsig11_tag,
ensure_str,
iterate_pem,
long_to_bytes,
namespaces,
strip_pem_header,
)


class XMLEnvelopedEnvelopingSigner(XMLSignatureProcessor):
Expand Down Expand Up @@ -96,7 +104,9 @@ def sign(self, data, key=None, passphrase=None, cert=None, key_name=None, key_in
sig_root.append(signature_value_element)
elif any(self.sign_alg.startswith(i) for i in ["dsa-", "rsa-", "ecdsa-"]):
if isinstance(key, (str, bytes)):
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key,
)
key = load_pem_private_key(key, password=passphrase, backend=default_backend())

hash_alg = self._get_signature_digest_method_by_tag(self.sign_alg)
Expand Down Expand Up @@ -133,7 +143,7 @@ def sign(self, data, key=None, passphrase=None, cert=None, key_name=None, key_in
if isinstance(cert, (str, bytes)):
x509_certificate.text = strip_pem_header(cert)
else:
from OpenSSL.crypto import dump_certificate, FILETYPE_PEM
from OpenSSL.crypto import FILETYPE_PEM, dump_certificate
x509_certificate.text = strip_pem_header(dump_certificate(FILETYPE_PEM, cert))
else:
sig_root.append(key_info)
Expand Down Expand Up @@ -191,19 +201,19 @@ def _unpack(self, data):

def _build_sig(self, sig_root, reference_uris, c14n_inputs):
signed_info = SubElement(sig_root, ds_tag("SignedInfo"), nsmap=self.namespaces)
c14n_method = SubElement(signed_info, ds_tag("CanonicalizationMethod"), Algorithm=self.c14n_alg)
c14n_method = SubElement(signed_info, ds_tag("CanonicalizationMethod"), Algorithm=self.c14n_alg) # noqa:F841
if self.sign_alg.startswith("hmac-"):
algorithm_id = self.known_hmac_digest_tags[self.sign_alg]
else:
algorithm_id = self.known_signature_digest_tags[self.sign_alg]
signature_method = SubElement(signed_info, ds_tag("SignatureMethod"), Algorithm=algorithm_id)
signature_method = SubElement(signed_info, ds_tag("SignatureMethod"), Algorithm=algorithm_id) # noqa:F841
for i, reference_uri in enumerate(reference_uris):
reference = SubElement(signed_info, ds_tag("Reference"), URI=reference_uri)
if i == 0:
transforms = SubElement(reference, ds_tag("Transforms"))
SubElement(transforms, ds_tag("Transform"), Algorithm=namespaces.ds + "enveloped-signature")
SubElement(transforms, ds_tag("Transform"), Algorithm=self.c14n_alg)
digest_method = SubElement(reference, ds_tag("DigestMethod"),
digest_method = SubElement(reference, ds_tag("DigestMethod"), # noqa:F841
Algorithm=self.known_digest_tags[self.digest_alg])
digest_value = SubElement(reference, ds_tag("DigestValue"))
payload_c14n = self._c14n(c14n_inputs[i], algorithm=self.c14n_alg)
Expand Down Expand Up @@ -233,7 +243,7 @@ def _serialize_key_value(self, key, key_info_element):
e.text = ensure_str(b64encode(long_to_bytes(getattr(key_params, field))))
elif self.sign_alg.startswith("ecdsa-"):
ec_key_value = SubElement(key_value, dsig11_tag("ECKeyValue"), nsmap=dict(dsig11=namespaces.dsig11))
named_curve = SubElement(ec_key_value, dsig11_tag("NamedCurve"),
named_curve = SubElement(ec_key_value, dsig11_tag("NamedCurve"), # noqa:F841
URI=self.known_ecdsa_curve_oids[key.curve.name])
public_key = SubElement(ec_key_value, dsig11_tag("PublicKey"))
x = key.public_key().public_numbers().x
Expand Down
Loading

0 comments on commit 2ddaaf9

Please sign in to comment.