Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/pip/tomlkit-0.11.8
Browse files Browse the repository at this point in the history
  • Loading branch information
samupl authored Dec 13, 2023
2 parents 768faa6 + 5ec59bc commit 65b4ce9
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 126 deletions.
51 changes: 23 additions & 28 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ repos:
- repo: meta
hooks:
- id: check-useless-excludes
- repo: https://github.com/MarcoGorelli/absolufy-imports
rev: v0.3.1
hooks:
- id: absolufy-imports
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.9.0
hooks:
Expand All @@ -14,6 +18,17 @@ repos:
- id: rst-backticks
- id: rst-directive-colons
- id: rst-inline-touching-normal
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 23.12.0
hooks:
- id: black
language_version: python3.8
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.7
hooks:
- id: ruff
args: [ "--fix", "--fixable=I001,ERA001,F401,F841,T201,T203" ]
- id: ruff-format
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
hooks:
Expand Down Expand Up @@ -42,34 +57,13 @@ repos:
- id: no-commit-to-branch
- id: trailing-whitespace
types: [python]
- repo: local
- repo: https://github.com/commitizen-tools/commitizen
rev: v1.17.0
hooks:
- id: commitizen
name: commitizen
entry: cz check
args: [--commit-msg-file]
require_serial: true
language: system
stages: [commit-msg]
- id: absolufy-imports
name: absolufy-imports
entry: absolufy-imports
require_serial: true
language: system
types: [python]
- id: ruff
name: ruff
entry: ruff
args: ["--fix", "--fixable=I001,ERA001,F401,F841,T201,T203"]
require_serial: true
language: system
types: [python]
- id: black
name: black
entry: black
require_serial: true
language: system
types: [python]
stages: [ commit-msg ]
- repo: local
hooks:
- id: shellcheck
name: shellcheck
entry: shellcheck
Expand All @@ -90,6 +84,7 @@ repos:
pass_filenames: false
- id: mypy
name: mypy
entry: mypy
entry: poetry run mypy
require_serial: true
language: system
types: [python]
types: [ python ]
208 changes: 122 additions & 86 deletions poetry.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ version_files = ["pyproject.toml:version"]
pydantic = "^1.10.5"
python = "^3.8"
requests = "^2.28.2"
cryptography = "^41.0.7"

[tool.poetry.group.test.dependencies] # https://python-poetry.org/docs/master/managing-dependencies/
absolufy-imports = "^0.3.1"
Expand Down Expand Up @@ -71,12 +72,16 @@ plugins = "pydantic.mypy"
strict = true
disallow_subclassing_any = false
disallow_untyped_decorators = false
disallow_untyped_calls = false
ignore_missing_imports = true
pretty = true
pretty = false
show_column_numbers = true
show_error_codes = true
show_error_context = true
warn_unreachable = true
exclude = [
"^tests/*"
]

[tool.pydantic-mypy] # https://pydantic-docs.helpmanual.io/mypy_plugin/#configuring-the-plugin
init_forbid_extra = true
Expand Down
145 changes: 145 additions & 0 deletions src/ksef/auth/init_session_token_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""Builder for InitSessionTokenRequest xml document."""
from typing import cast
from xml.dom import minidom

from ksef.models.responses.authorization_challenge import AuthorizationChallenge


class InitSessionTokenRequestBuilder:
"""Builder used to construct a XML request for initializing a session token."""

NS = "http://ksef.mf.gov.pl/schema/gtw/svc/online/types/2021/10/01/0001"
NS2 = "http://ksef.mf.gov.pl/schema/gtw/svc/types/2021/10/01/0001"
NS3 = "http://ksef.mf.gov.pl/schema/gtw/svc/online/auth/request/2021/10/01/0001"
XSI = "http://www.w3.org/2001/XMLSchema-instance"

def __init__(
self, authorization_challenge: AuthorizationChallenge, nip: str, encrypted_token: str
):
self.authorization_challenge = authorization_challenge
self.nip = nip
self.encrypted_token = encrypted_token

@staticmethod
def _build_document_type_element(root: minidom.Document) -> minidom.Element:
document_type = root.createElement("DocumentType")
service = root.createElement("ns2:Service")
service.appendChild(root.createTextNode("KSeF"))
document_type.appendChild(service)

form_code = root.createElement("ns2:FormCode")
document_type.appendChild(form_code)

system_code = root.createElement("ns2:SystemCode")
system_code.appendChild(root.createTextNode("FA (1)"))
form_code.appendChild(system_code)
schema_version = root.createElement("ns2:SchemaVersion")
schema_version.appendChild(root.createTextNode("1-0E"))
form_code.appendChild(schema_version)
target_namespace = root.createElement("ns2:TargetNamespace")
target_namespace.appendChild(
root.createTextNode("http://crd.gov.pl/wzor/2021/11/29/11089/")
)
form_code.appendChild(target_namespace)
value = root.createElement("ns2:Value")
value.appendChild(root.createTextNode("FA"))
form_code.appendChild(value)

return document_type

def _build_token_element(self, root: minidom.Document) -> minidom.Element:
token = root.createElement("Token")
token.appendChild(root.createTextNode(self.encrypted_token))
return token

def _build_context_element(self, root: minidom.Document) -> minidom.Element:
context = root.createElement("ns3:Context")

challenge = root.createElement("Challenge")
challenge.appendChild(root.createTextNode(self.authorization_challenge.challenge))
context.appendChild(challenge)

identifier = root.createElement("Identifier")
identifier.setAttribute("xmlns:xsi", self.XSI)
identifier.setAttribute("xsi:type", "ns2:SubjectIdentifierByCompanyType")

identifier_inner = root.createElement("ns2:Identifier")
identifier_inner.appendChild(root.createTextNode(self.nip))
identifier.appendChild(identifier_inner)
context.appendChild(identifier)

context.appendChild(self._build_document_type_element(root=root))
context.appendChild(self._build_token_element(root=root))

return context

def _build_signature_element(self, root: minidom.Document) -> minidom.Element:
signature = root.createElement("Signature")
signature.setAttribute("xmlns", "http://www.w3.org/2000/09/xmldsig#")

signed_info = root.createElement("SignedInfo")

canonicalization_method = root.createElement("CanonicalizationMethod")
canonicalization_method.setAttribute(
"Algorithm", "http://www.w3.org/TR/2001/REC-xml-c14n-20010315"
)
signed_info.appendChild(canonicalization_method)

signature_method = root.createElement("SignatureMethod")
signature_method.setAttribute("Algorithm", "http://www.w3.org/2000/09/xmldsig#rsa-sha1")
signed_info.appendChild(signature_method)

reference = root.createElement("Reference")
reference.setAttribute("URI", "")
signed_info.appendChild(reference)

transforms = root.createElement("Transforms")
reference.appendChild(transforms)

transform = root.createElement("Transform")
transform.setAttribute("Algorithm", "http://www.w3.org/2000/09/xmldsig#enveloped-signature")
transforms.appendChild(transform)

digest_method = root.createElement("DigestMethod")
digest_method.setAttribute("Algorithm", "http://www.w3.org/2000/09/xmldsig#sha1")
reference.appendChild(digest_method)

digest_value = root.createElement("DigestValue")
calculated_digest_value = "XXX" # TODO
digest_value.appendChild(root.createTextNode(calculated_digest_value))
reference.appendChild(digest_value)

signature.appendChild(signed_info)

calculated_signature_value = "XXX" # TODO
signature_value = root.createElement("SignatureValue")
signature_value.appendChild(root.createTextNode(calculated_signature_value))

signature.appendChild(signature_value)

key_info = root.createElement("KeyInfo")
signature.appendChild(key_info)

x509_data = root.createElement("X509Data")
key_info.appendChild(x509_data)

calculated_x509_certificate = "XXX" # TODO
x509_certificate = root.createElement("X509Certificate")
x509_certificate.appendChild(root.createTextNode(calculated_x509_certificate))
x509_data.appendChild(x509_certificate)
return signature

def build_xml(self) -> str:
"""Build and return an XML string representing a request to initialize a session token."""
root = minidom.Document()

document = root.createElement("ns3:InitSessionTokenRequest")
document.setAttribute("xmlns", self.NS)
document.setAttribute("xmlns:ns2", self.NS2)
document.setAttribute("xmlns:ns3", self.NS3)
root.appendChild(document)

context = self._build_context_element(root=root)
document.appendChild(context)

return cast(str, root.toprettyxml(indent=" ", encoding="UTF-8").decode("utf-8"))
68 changes: 64 additions & 4 deletions src/ksef/auth/token.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
"""Simple token-based authorization implementation."""
import base64
import copy
from typing import Mapping
from datetime import datetime, timezone
from typing import Mapping, cast
from urllib.parse import urljoin

import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from requests import Request

from ksef.auth.base import Authorization
from ksef.constants import BASE_URL, DEFAULT_HEADERS, TIMEOUT, URL_SESSION_CHALLENGE
from ksef.auth.init_session_token_request import InitSessionTokenRequestBuilder
from ksef.constants import (
BASE_URL,
DEFAULT_HEADERS,
TIMEOUT,
URL_AUTH_CHALLENGE,
URL_AUTH_INIT_TOKEN,
)
from ksef.models.responses.authorization_challenge import AuthorizationChallenge
from ksef.models.responses.authorization_token import AuthorizationToken


class TokenAuthorization(Authorization):
"""Simple token-based authorization."""

def __init__(self, token: str, base_url: str = BASE_URL):
def __init__(
self, token: str, public_key: str, base_url: str = BASE_URL, timeout: int = TIMEOUT
):
self.base_url = base_url
self.public_key = public_key
self.token = token
self.timeout = timeout

def modify_request(self, request: Request) -> Request:
"""Enrich requests with authorization headers.
Expand All @@ -25,6 +41,8 @@ def modify_request(self, request: Request) -> Request:
"""
request.prepare()
request.headers["Authorization"] = self.token # TODO: This is just a stub implementaion
headers = self.build_headers()
request.headers.update(headers)
return request

def build_url(self, url: str) -> str:
Expand All @@ -45,7 +63,7 @@ def build_headers(**optional: str) -> Mapping[str, str]:
def get_authorization_challenge(self, nip: str) -> AuthorizationChallenge:
"""Get the token flow authorization challenge."""
response = requests.post(
url=self.build_url(URL_SESSION_CHALLENGE),
url=self.build_url(URL_AUTH_CHALLENGE),
headers=self.build_headers(),
json={
"contextIdentifier": {
Expand All @@ -59,3 +77,45 @@ def get_authorization_challenge(self, nip: str) -> AuthorizationChallenge:
return AuthorizationChallenge(
timestamp=challenge["timestamp"], challenge=challenge["challenge"]
)

def _encrypt_token(self, authorization_challenge: AuthorizationChallenge) -> str:
public_key = serialization.load_pem_public_key(self.public_key.encode())
public_key = cast(rsa.RSAPublicKey, public_key)

timestamp = (
int(
datetime.strptime(authorization_challenge.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
.replace(tzinfo=timezone.utc)
.timestamp()
)
* 1000
)
message = self.token.encode() + b"|" + str(timestamp).encode()
encrypted_message = public_key.encrypt(plaintext=message, padding=padding.PKCS1v15())
return base64.b64encode(encrypted_message).decode("utf-8")

def _build_init_token_xml(
self, nip: str, authorization_challenge: AuthorizationChallenge
) -> str:
encrypted_token = self._encrypt_token(authorization_challenge=authorization_challenge)

request_builder = InitSessionTokenRequestBuilder(
authorization_challenge=authorization_challenge,
nip=nip,
encrypted_token=encrypted_token,
)
return request_builder.build_xml()

def init_token(
self, authorization_challenge: AuthorizationChallenge, nip: str
) -> AuthorizationToken:
"""Initialize the session."""
document_xml = self._build_init_token_xml(
nip=nip, authorization_challenge=authorization_challenge
)
response = requests.post(
url=self.build_url(URL_AUTH_INIT_TOKEN),
data=document_xml,
timeout=self.timeout,
)
return AuthorizationToken.from_dict(response.json())
Loading

0 comments on commit 65b4ce9

Please sign in to comment.