-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* add depseudo tools * update changelog --------- Co-authored-by: djkhl <[email protected]>
- Loading branch information
Showing
17 changed files
with
433 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
"""Command line tool to depseudonymize a string using the given keys.""" | ||
|
||
import click | ||
|
||
from logprep.util.pseudo.depseudonymizer.depseudonymizer import Depseudonymizer | ||
|
||
|
||
@click.command() | ||
@click.argument("analyst-key", type=str) | ||
@click.argument("depseudo-key", type=str) | ||
@click.argument("pseudo-string", type=str) | ||
def depseudonymize(analyst_key: str, depseudo_key: str, pseudo_string: str): | ||
"""depseudonymize a string using the given keys.""" | ||
depseudo = Depseudonymizer(pseudo_string) | ||
keys = {} | ||
for key_file_name in analyst_key, depseudo_key: | ||
with open(f"{key_file_name}.key", "r", encoding="utf8") as key_file: | ||
keys[key_file_name] = key_file.read() | ||
depseudo.depseudo_key = keys[depseudo_key] | ||
depseudo.analyst_key = keys[analyst_key] | ||
print(depseudo.depseudonymize()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
import click | ||
|
||
from logprep.util.pseudo.keygenerator import generate_rsa_key | ||
|
||
|
||
@click.command() | ||
@click.argument("key-length", default="1024", type=int) | ||
@click.option("-f", "--file") | ||
def generate(key_length: int, file: str): | ||
"""Generate RSA keys for pseudonymization.""" | ||
priv_key, pub_key = generate_rsa_key.generate_keys(key_length=key_length) | ||
if not file: | ||
print(priv_key.decode("utf8")) | ||
print(pub_key.decode("utf8")) | ||
else: | ||
with open(f"{file}.key", "w", encoding="utf8") as private_key_file: | ||
private_key_file.write(priv_key.decode("utf8")) | ||
with open(f"{file}.crt", "w", encoding="utf8") as public_key_file: | ||
public_key_file.write(pub_key.decode("utf8")) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import click | ||
|
||
from logprep.processor.pseudonymizer.encrypter import DualPKCS1HybridEncrypter | ||
|
||
|
||
@click.command() | ||
@click.argument("analyst-key", type=str) | ||
@click.argument("depseudo-key", type=str) | ||
@click.argument("string", type=str) | ||
def pseudonymize(analyst_key: str, depseudo_key: str, string: str): | ||
"""pseudonymize a string using the given keys.""" | ||
encrypter = DualPKCS1HybridEncrypter() | ||
encrypter.load_public_keys( | ||
keyfile_analyst=f"{analyst_key}.crt", | ||
keyfile_depseudo=f"{depseudo_key}.crt", | ||
) | ||
print(encrypter.encrypt(string)) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
"""module to depseudonymize""" | ||
|
||
import base64 | ||
from dataclasses import dataclass | ||
from Crypto.PublicKey import RSA | ||
from Crypto.Cipher.PKCS1_OAEP import PKCS1OAEP_Cipher | ||
from Crypto.Cipher import AES, PKCS1_OAEP | ||
|
||
|
||
class DepseudonymizeError(Exception): | ||
"""Depseudonymizer custom Exception""" | ||
|
||
|
||
@dataclass | ||
class Depseudonymizer: | ||
"""class to depseudonymize a pseudonymized string | ||
Parameters | ||
---------- | ||
pseudonymized_string: str | ||
The base64 encoded pseudonymized string. | ||
Base64 decoding is done in __post_init__ method | ||
""" | ||
|
||
pseudonymized_string: str | ||
"""the pseudonymized string""" | ||
|
||
_analyst_key: PKCS1OAEP_Cipher = None | ||
|
||
_depseudo_key: PKCS1OAEP_Cipher = None | ||
|
||
def __post_init__(self) -> None: | ||
self.pseudonymized_string = base64.b64decode(self.pseudonymized_string) | ||
|
||
@property | ||
def encrypted_session_key(self) -> bytes: | ||
"""the encrypted session key | ||
Returns | ||
------- | ||
bytes | ||
the first 16 bytes of the pseudonymized_string | ||
""" | ||
return self.pseudonymized_string[:256] | ||
|
||
@property | ||
def cipher_nonce(self) -> bytes: | ||
"""the cipher nonce | ||
Returns | ||
------- | ||
bytes | ||
The 2 bytes after the session key | ||
""" | ||
return self.pseudonymized_string[256:264] | ||
|
||
@property | ||
def ciphertext(self) -> bytes: | ||
"""the cipher text | ||
Returns | ||
------- | ||
bytes | ||
All bytes after the first 18 bytes | ||
""" | ||
return self.pseudonymized_string[264:] | ||
|
||
@property | ||
def depseudo_key(self) -> PKCS1OAEP_Cipher: | ||
"""getter for depseudo_key | ||
Returns | ||
------- | ||
PKCS1OAEP_Cipher | ||
returns a PKCS1OAEP_Cipher representation of the depseudo key | ||
""" | ||
return self._depseudo_key | ||
|
||
@depseudo_key.setter | ||
def depseudo_key(self, depseudo_key: str) -> None: | ||
"""setter for the depseudo_key | ||
saves the depseudo_key as PKCS1OAEP_Cipher in _depseudo_key | ||
Parameters | ||
---------- | ||
depseudo_key : str | ||
the depseudo privat key | ||
""" | ||
self._depseudo_key = RSA.import_key(depseudo_key) | ||
|
||
@property | ||
def analyst_key(self) -> PKCS1OAEP_Cipher: | ||
"""getter for analyst_key | ||
Returns | ||
------- | ||
PKCS1OAEP_Cipher | ||
returns a PKCS1OAEP_Cipher representation of the analyst key | ||
""" | ||
return self._analyst_key | ||
|
||
@analyst_key.setter | ||
def analyst_key(self, analyst_key: str) -> None: | ||
"""setter for the analyst_key | ||
saves the analyst_key as PKCS1OAEP_Cipher in _analyst_key | ||
Parameters | ||
---------- | ||
analyst_key : str | ||
the analyst privat key | ||
""" | ||
self._analyst_key = RSA.import_key(analyst_key) | ||
|
||
def depseudonymize(self) -> str: | ||
"""depseudonymizes after setting the depseudo and analyst keys | ||
Returns | ||
------- | ||
str | ||
the depseudonymized string | ||
Raises | ||
------ | ||
DepseudonymizeError | ||
if depseudo_key or analyst_key is not set | ||
""" | ||
if self._depseudo_key is None: | ||
raise DepseudonymizeError("No depseudo key") | ||
if self._analyst_key is None: | ||
raise DepseudonymizeError("No analyst key") | ||
cipher_rsa_depseudo = PKCS1_OAEP.new(self._depseudo_key) | ||
cipher_rsa_analyst = PKCS1_OAEP.new(self._analyst_key) | ||
depseudo_decrypted_session_key = cipher_rsa_depseudo.decrypt(self.encrypted_session_key) | ||
analyst_decrypted_session_key = cipher_rsa_analyst.decrypt(depseudo_decrypted_session_key) | ||
cipher_aes = AES.new(analyst_decrypted_session_key, AES.MODE_CTR, nonce=self.cipher_nonce) | ||
return cipher_aes.decrypt(self.ciphertext).decode("utf-8") |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from Crypto.PublicKey import RSA | ||
|
||
|
||
def generate_keys(key_length): | ||
key = RSA.generate(key_length) | ||
pv_key_string = key.exportKey() | ||
pb_key_string = key.publickey().exportKey() | ||
return pv_key_string, pb_key_string |
Empty file.
Oops, something went wrong.