feat: Decrypt input secrets if there are some #45
Changes from 7 commits
80ce441
b4b23d8
1408ea7
184c963
1442fda
3f49bd2
ad19f3c
73f642d
5566410
dbbef6a
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
import base64 | ||
import secrets | ||
from typing import Any | ||
|
||
from cryptography.exceptions import InvalidTag as InvalidTagException | ||
from cryptography.hazmat.primitives import hashes, serialization | ||
from cryptography.hazmat.primitives.asymmetric import padding, rsa | ||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | ||
|
||
from .consts import ENCRYPTED_INPUT_VALUE_REGEXP | ||
|
||
ENCRYPTION_KEY_LENGTH = 32 | ||
ENCRYPTION_IV_LENGTH = 16 | ||
ENCRYPTION_AUTH_TAG_LENGTH = 16 | ||
|
||
|
||
def public_encrypt(value: str, *, public_key: rsa.RSAPublicKey) -> dict: | ||
"""Encrypts the given value using an AES cipher, and encrypts the password used for that encryption with the public key. | ||
|
||
The encryption password is the concatenation of the encryption key and the initialization vector used for the cipher. | ||
Returns the encrypted password and the encrypted value, both base64-encoded. | ||
|
||
Args: | ||
public_key (RSAPublicKey): Public key to use for encryption. | ||
value (str): Value to encrypt. | ||
|
||
Returns: | ||
dict: Encrypted password and value. | ||
""" | ||
key_bytes = _crypto_random_object_id(ENCRYPTION_KEY_LENGTH).encode('utf-8') | ||
initialized_vector_bytes = _crypto_random_object_id(ENCRYPTION_IV_LENGTH).encode('utf-8') | ||
value_bytes = value.encode('utf-8') | ||
|
||
password_bytes = key_bytes + initialized_vector_bytes | ||
|
||
# NOTE: The auth tag is appended to the end of the encrypted data; it is 16 bytes long and ensures the integrity of the data. | ||
cipher = Cipher(algorithms.AES(key_bytes), modes.GCM(initialized_vector_bytes, min_tag_length=ENCRYPTION_AUTH_TAG_LENGTH)) | ||
encryptor = cipher.encryptor() | ||
encrypted_value_bytes = encryptor.update(value_bytes) + encryptor.finalize() | ||
encrypted_password_bytes = public_key.encrypt( | ||
password_bytes, | ||
padding.OAEP( | ||
mgf=padding.MGF1(algorithm=hashes.SHA1()), | ||
algorithm=hashes.SHA1(), | ||
Review comment: SHA-1 is a deprecated hash algorithm that has practical known collision attacks. You are strongly discouraged from using it. Existing applications should strongly consider moving away.
Reply: You are right that it is deprecated, but in this case we use SHA-1 only for the padding, where it is not an issue. In short, SHA-1 generates a hash that fills out the message block before encryption. It is quite complex, but the message together with the hash is then encrypted using the RSA public key. There is a nice picture and explanation of how OAEP works. |
||
label=None, | ||
), | ||
) | ||
return { | ||
Review comment: Why do you
Reply: Yes, I need the value as a string. The function is aligned with its JS counterpart for better accountability. We have not used this function in Python yet; it is there mainly for testing and possibly for future use.
Reply: ok |
||
'encrypted_value': base64.b64encode(encrypted_value_bytes + encryptor.tag).decode('utf-8'), | ||
'encrypted_password': base64.b64encode(encrypted_password_bytes).decode('utf-8'), | ||
} | ||
|
||
|
||
def private_decrypt( | ||
encrypted_password: str, | ||
encrypted_value: str, | ||
*, | ||
private_key: rsa.RSAPrivateKey, | ||
) -> str: | ||
"""Decrypts the given encrypted value using the private key and password. | ||
|
||
Args: | ||
private_key (RSAPrivateKey): Private key to use for decryption. | ||
encrypted_password (str): Encrypted password that was used to encrypt the value, as a base64 string. | ||
encrypted_value (str): Encrypted value to decrypt as base64 string. | ||
|
||
Returns: | ||
str: Decrypted value. | ||
""" | ||
encrypted_password_bytes = base64.b64decode(encrypted_password.encode('utf-8')) | ||
Review comment: I think if you didn't decode in encrypt, you could omit the encode here?
Reply: That is true. But the encrypted secret (password and value) is stored in the input JSON, and a base64 string is easier to handle because it uses a limited set of characters, so we can easily match it. |
||
encrypted_value_bytes = base64.b64decode(encrypted_value.encode('utf-8')) | ||
|
||
# Decrypt the password | ||
password_bytes = private_key.decrypt( | ||
encrypted_password_bytes, | ||
padding.OAEP( | ||
mgf=padding.MGF1(algorithm=hashes.SHA1()), | ||
jirimoravcik marked this conversation as resolved.
|
||
algorithm=hashes.SHA1(), | ||
label=None, | ||
), | ||
) | ||
|
||
if len(password_bytes) != ENCRYPTION_KEY_LENGTH + ENCRYPTION_IV_LENGTH: | ||
raise ValueError('Decryption failed, invalid password length!') | ||
|
||
# Slice the encrypted value into the ciphertext and the authentication tag | ||
authentication_tag_bytes = encrypted_value_bytes[-ENCRYPTION_AUTH_TAG_LENGTH:] | ||
encrypted_data_bytes = encrypted_value_bytes[:len(encrypted_value_bytes) - ENCRYPTION_AUTH_TAG_LENGTH] | ||
encryption_key_bytes = password_bytes[:ENCRYPTION_KEY_LENGTH] | ||
initialization_vector_bytes = password_bytes[ENCRYPTION_KEY_LENGTH:] | ||
|
||
try: | ||
cipher = Cipher(algorithms.AES(encryption_key_bytes), modes.GCM(initialization_vector_bytes, authentication_tag_bytes)) | ||
decryptor = cipher.decryptor() | ||
decipher_bytes = decryptor.update(encrypted_data_bytes) + decryptor.finalize() | ||
except InvalidTagException: | ||
raise ValueError('Decryption failed, malformed encrypted value or password.') | ||
except Exception as err: | ||
raise err | ||
|
||
return decipher_bytes.decode('utf-8') | ||
|
||
|
||
def _load_private_key(private_key_file_base64: str, private_key_password: str) -> rsa.RSAPrivateKey: | ||
private_key = serialization.load_pem_private_key(base64.b64decode( | ||
private_key_file_base64.encode('utf-8')), password=private_key_password.encode('utf-8')) | ||
if not isinstance(private_key, rsa.RSAPrivateKey): | ||
raise ValueError('Invalid private key.') | ||
|
||
return private_key | ||
|
||
|
||
def _load_public_key(public_key_file_base64: str) -> rsa.RSAPublicKey: | ||
public_key = serialization.load_pem_public_key(base64.b64decode(public_key_file_base64.encode('utf-8'))) | ||
if not isinstance(public_key, rsa.RSAPublicKey): | ||
raise ValueError('Invalid public key.') | ||
|
||
return public_key | ||
|
||
|
||
def _crypto_random_object_id(length: int = 17) -> str: | ||
"""Python reimplementation of cryptoRandomObjectId from `@apify/utilities`.""" | ||
chars = 'abcdefghijklmnopqrstuvwxyzABCEDFGHIJKLMNOPQRSTUVWXYZ0123456789' | ||
return ''.join(secrets.choice(chars) for _ in range(length)) | ||
|
||
|
||
def _decrypt_input_secrets(private_key: rsa.RSAPrivateKey, input: Any) -> Any: | ||
"""Decrypt input secrets.""" | ||
if not isinstance(input, dict): | ||
return input | ||
|
||
for key, value in input.items(): | ||
if isinstance(value, str): | ||
match = ENCRYPTED_INPUT_VALUE_REGEXP.fullmatch(value) | ||
if match: | ||
encrypted_password = match.group(1) | ||
encrypted_value = match.group(2) | ||
input[key] = private_decrypt( | ||
encrypted_password, | ||
encrypted_value, | ||
private_key=private_key, | ||
) | ||
|
||
return input |
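
The matching step in `_decrypt_input_secrets` depends on `ENCRYPTED_INPUT_VALUE_REGEXP`, which is imported from `consts` and not shown in this diff. The following is a minimal sketch of that step under stated assumptions: the pattern `ASSUMED_ENCRYPTED_VALUE_REGEXP` (a prefix followed by two base64 groups) is a guess at the shape, and `fake_decrypt` is a hypothetical stand-in for `private_decrypt` that performs no real cryptography. It only illustrates why storing both parts as base64 strings makes them easy to match.

```python
import base64
import re
from typing import Any, Callable

# ASSUMPTION: the real ENCRYPTED_INPUT_VALUE_REGEXP is defined in consts and is
# not part of this diff. This pattern only illustrates the idea of two base64 groups.
ASSUMED_ENCRYPTED_VALUE_REGEXP = re.compile(
    r'ENCRYPTED_VALUE:([A-Za-z0-9+/=]+):([A-Za-z0-9+/=]+)'
)


def decrypt_matching_values(input_data: Any, decrypt: Callable[[str, str], str]) -> Any:
    """Replace each top-level string that fully matches the pattern with its decrypted form."""
    if not isinstance(input_data, dict):
        return input_data
    for key, value in input_data.items():
        if isinstance(value, str):
            match = ASSUMED_ENCRYPTED_VALUE_REGEXP.fullmatch(value)
            if match:
                # group(1) is the encrypted password, group(2) the encrypted value.
                input_data[key] = decrypt(match.group(1), match.group(2))
    return input_data


def fake_decrypt(encrypted_password: str, encrypted_value: str) -> str:
    # Hypothetical stand-in for private_decrypt: it only base64-decodes the value.
    return base64.b64decode(encrypted_value).decode('utf-8')


password_b64 = base64.b64encode(b'not-a-real-password').decode('utf-8')
value_b64 = base64.b64encode(b'my-secret').decode('utf-8')
raw_input = {
    'token': f'ENCRYPTED_VALUE:{password_b64}:{value_b64}',
    'plain': 'hello',
    'count': 3,
}
result = decrypt_matching_values(raw_input, fake_decrypt)
```

Because `fullmatch` is used, a string that merely contains an encrypted-looking fragment is left untouched, and non-string values such as `count` are skipped entirely.
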
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import base64 | ||
|
||
import pytest | ||
|
||
from apify._crypto import _crypto_random_object_id, _load_private_key, _load_public_key, private_decrypt, public_encrypt | ||
|
||
# NOTE: Uses the same keys as in: | ||
# https://github.com/apify/apify-shared-js/blob/master/test/crypto.test.ts | ||
PRIVATE_KEY_PEM_BASE64 = 'LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpQcm9jLVR5cGU6IDQsRU5DUllQVEVECkRFSy1JbmZvOiBERVMtRURFMy1DQkMsNTM1QURERjIzNUQ4QkFGOQoKMXFWUzl0S0FhdkVhVUVFMktESnpjM3plMk1lZkc1dmVEd2o1UVJ0ZkRaMXdWNS9VZmIvcU5sVThTSjlNaGhKaQp6RFdrWExueUUzSW0vcEtITVZkS0czYWZkcFRtcis2TmtidXptd0dVMk0vSWpzRjRJZlpad0lGbGJoY09jUnp4CmZmWVIvTlVyaHNrS1RpNGhGV0lBUDlLb3Z6VDhPSzNZY3h6eVZQWUxYNGVWbWt3UmZzeWkwUU5Xb0tGT3d0ZC8KNm9HYzFnd2piRjI5ZDNnUThZQjFGWmRLa1AyMTJGbkt1cTIrUWgvbE1zTUZrTHlTQTRLTGJ3ZG1RSXExbE1QUwpjbUNtZnppV3J1MlBtNEZoM0dmWlQyaE1JWHlIRFdEVzlDTkxKaERodExOZ2RRamFBUFpVT1E4V2hwSkE5MS9vCjJLZzZ3MDd5Z2RCcVd5dTZrc0pXcjNpZ1JpUEJ5QmVNWEpEZU5HY3NhaUZ3Q2c5eFlja1VORXR3NS90WlRsTjIKSEdZV0NpVU5Ed0F2WllMUHR1SHpIOFRFMGxsZm5HR0VuVC9QQlp1UHV4andlZlRleE1mdzFpbGJRU3lkcy9HMgpOOUlKKzkydms0N0ZXR2NOdGh1Q3lCbklva0NpZ0c1ZlBlV2IwQTdpdjk0UGtwRTRJZ3plc0hGQ0ZFQWoxWldLCnpQdFRBQlkwZlJrUzBNc3UwMHYxOXloTTUrdFUwYkVCZWo2eWpzWHRoYzlwS01hcUNIZWlQTC9TSHRkaWsxNVMKQmU4Sml4dVJxZitUeGlYWWVuNTg2aDlzTFpEYzA3cGpkUGp2NVNYRnBYQjhIMlVxQ0tZY2p4R3RvQWpTV0pjWApMNHc3RHNEby80bVg1N0htR09iamlCN1ZyOGhVWEJDdFh2V0dmQXlmcEFZNS9vOXowdm4zREcxaDc1NVVwdDluCkF2MFZrbm9qcmJVYjM1ZlJuU1lYTVltS01LSnpNRlMrdmFvRlpwV0ZjTG10cFRWSWNzc0JGUEYyZEo3V1c0WHMKK0d2Vkl2eFl3S2wyZzFPTE1TTXRZa09vekdlblBXTzdIdU0yMUVKVGIvbHNEZ25GaTkrYWRGZHBLY3R2cm0zdgpmbW1HeG5pRmhLU05GU0xtNms5YStHL2pjK3NVQVBhb2FZNEQ3NHVGajh0WGp0eThFUHdRRGxVUGRVZld3SE9PClF3bVgyMys1REh4V0VoQy91Tm8yNHNNY2ZkQzFGZUpBV281bUNuVU5vUVVmMStNRDVhMzNJdDhhMmlrNUkxUWoKeSs1WGpRaG0xd3RBMWhWTWE4aUxBR0toT09lcFRuK1VBZHpyS0hvNjVtYzNKbGgvSFJDUXJabnVxWkErK0F2WgpjeWU0dWZGWC8xdmRQSTdLb2Q0MEdDM2dlQnhweFFNYnp1OFNUcGpOcElJRkJvRVc5dFRhemUzeHZXWnV6dDc0CnFjZS8xWURuUHBLeW5lM0xGMk94VWoyYWVYUW5YQkpYcGhTZTBVTGJMcWJtUll4bjJKWkl1d09RNHV5dm94NjUKdG9TWGNac054dUs4QTErZXNXR3JSN3pVc0djdU9QQTFERE9Ja2JjcGtmRUxMNjk4RTJRckdqTU9JWnhrcWdxZQoySE5VNktWRmV2NzdZeEJDbm1VcVdXZEhYMjcyU2NPMUYzdWpUdFVnRVBNWGN0aEdBckYzTWxEaUw1Q0k0RkhqCnhHc3pVemxzalRQTmpiY2MzdUE2MjVZS3VVZEI2c1h1Rk5NUHk5UDgwTzBpRWJGTXl3MWxmN2VpdFhvaUUxWVoKc3NhMDVxTUx4M3pPUXZTLzFDdF
pqaFp4cVJMRW5pQ3NWa2JVRlVYclpodEU4dG94bGpWSUtpQ25qbitORmtqdwo2bTZ1anpBSytZZHd2Nk5WMFB4S0gwUk5NYVhwb1lmQk1oUmZ3dGlaS3V3Y2hyRFB5UEhBQ2J3WXNZOXdtUE9rCnpwdDNxWi9JdDVYTmVqNDI0RzAzcGpMbk1sd1B1T1VzYmFQUWQ2VHU4TFhsckZReUVjTXJDNHdjUTA1SzFVN3kKM1NNN3RFaTlnbjV3RjY1YVI5eEFBR0grTUtMMk5WNnQrUmlTazJVaWs1clNmeDE4Mk9wYmpSQ2grdmQ4UXhJdwotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQo=' # noqa: E501 | ||
PRIVATE_KEY_PASSWORD = 'pwd1234' | ||
PUBLIC_KEY_PEM_BASE64 = 'LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0dis3NlNXbklhOFFKWC94RUQxRQpYdnBBQmE3ajBnQnVYenJNUU5adjhtTW1RU0t2VUF0TmpOL2xacUZpQ0haZUQxU2VDcGV1MnFHTm5XbGRxNkhUCnh5cXJpTVZEbFNKaFBNT09QSENISVNVdFI4Tk5lR1Y1MU0wYkxJcENabHcyTU9GUjdqdENWejVqZFRpZ1NvYTIKQWxrRUlRZWQ4UVlDKzk1aGJoOHk5bGcwQ0JxdEdWN1FvMFZQR2xKQ0hGaWNuaWxLVFFZay9MZzkwWVFnUElPbwozbUppeFl5bWFGNmlMZTVXNzg1M0VHWUVFVWdlWmNaZFNjaGVBMEdBMGpRSFVTdnYvMEZjay9adkZNZURJOTVsCmJVQ0JoQjFDbFg4OG4wZUhzUmdWZE5vK0NLMDI4T2IvZTZTK1JLK09VaHlFRVdPTi90alVMdGhJdTJkQWtGcmkKOFFJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==' # noqa: E501 | ||
PRIVATE_KEY = _load_private_key( | ||
PRIVATE_KEY_PEM_BASE64, | ||
PRIVATE_KEY_PASSWORD, | ||
) | ||
PUBLIC_KEY = _load_public_key(PUBLIC_KEY_PEM_BASE64) | ||
|
||
|
||
class TestCrypto(): | ||
|
||
def test_encrypt_decrypt_various_string(self) -> None: | ||
for value in [_crypto_random_object_id(10), '👍', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '-', '_', '=', '+', '[', ']', '{', '}', '|', ';', ':', '"', "'", ',', '.', '<', '>', '?', '/', '~']: # noqa: E501 | ||
encrypted = public_encrypt(value, public_key=PUBLIC_KEY) | ||
decrypted_value = private_decrypt(**encrypted, private_key=PRIVATE_KEY) | ||
assert decrypted_value == value | ||
|
||
def test_throw_if_password_is_not_valid(self) -> None: | ||
test_value = 'test' | ||
encrypted = public_encrypt(test_value, public_key=PUBLIC_KEY) | ||
encrypted['encrypted_password'] = base64.b64encode(b'invalid_password').decode('utf-8') | ||
|
||
with pytest.raises(ValueError): | ||
private_decrypt(**encrypted, private_key=PRIVATE_KEY) | ||
|
||
def test_throw_error_if_cipher_is_manipulated(self) -> None: | ||
test_value = 'test2' | ||
encrypted = public_encrypt(test_value, public_key=PUBLIC_KEY) | ||
encrypted['encrypted_value'] = base64.b64encode( | ||
b'invalid_cipher' + base64.b64decode(encrypted['encrypted_value'].encode('utf-8'))).decode('utf-8') | ||
|
||
with pytest.raises(ValueError): | ||
private_decrypt(**encrypted, private_key=PRIVATE_KEY) | ||
|
||
def test_same_encrypted_value_should_return_different_cipher(self) -> None: | ||
test_value = 'test3' | ||
encrypted1 = public_encrypt(test_value, public_key=PUBLIC_KEY) | ||
encrypted2 = public_encrypt(test_value, public_key=PUBLIC_KEY) | ||
assert encrypted1['encrypted_value'] != encrypted2['encrypted_value'] | ||
|
||
# Check if the method is compatible with js version of the same method in: | ||
# https://github.com/apify/apify-shared-js/blob/master/packages/utilities/src/crypto.ts | ||
def test_private_encrypt_node_js_encrypted_value(self) -> None: | ||
value = 'encrypted_with_node_js' | ||
# This was encrypted with nodejs version of the same method. | ||
encrypted_value_with_node_js = { | ||
'encrypted_password': 'lw0ez64/T1UcCQMLfhucZ6VIfMcf/TKni7PmXlL/ZRA4nmdGYz7/YQUzGWzKbLChrpqbG21DHxPIubUIQFDFE1ASkLvoSd0Ks8/wjKHMyhp+hsg5aSh9EZK6pBFpp6FeHoinV80+UURTvJuSVbWd1Orw5Frl41taP6RK3uNJlXikmgs8Xc7mShFEENgkz6y9+Pbe7jpcKkaJ2U/h7FN0eNON189kNFYVuAE1n2N6C3Q7dFnjl2e1btqErvg5Vu7ZS4BbX3wgC2qLYySGnqI3BNI5VGhAnncnQcjHb+85qG+LKoPekgY9I0s0kGMxiz/bmy1mYm9O+Lj1mbVUr7BDjQ==', # noqa: E501 | ||
'encrypted_value': 'k8nkZDCi0hRfBc0RRefxeSHeGV0X60N03VCrhRhENKXBjrF/tEg=', | ||
} | ||
decrypted_value = private_decrypt( | ||
**encrypted_value_with_node_js, | ||
private_key=PRIVATE_KEY, | ||
) | ||
|
||
assert decrypted_value == value | ||
|
||
def test__crypto_random_object_id(self) -> None: | ||
assert len(_crypto_random_object_id()) == 17 | ||
assert len(_crypto_random_object_id(5)) == 5 | ||
long_random_object_id = _crypto_random_object_id(1000) | ||
for char in long_random_object_id: | ||
assert char in 'abcdefghijklmnopqrstuvwxyzABCEDFGHIJKLMNOPQRSTUVWXYZ0123456789' |
Review comment: "min_tag_length (int) – The minimum length tag must be. By default this is 16, meaning tag truncation is not allowed. Allowing tag truncation is strongly discouraged for most applications." min_tag_length is 16 by default, why override with 16?
Reply: Just to be sure that decryption still works if the default changes. We strictly set the tag length to 16 bytes, the same as in the Node.js version of this method.
Reply: ok, fair enough
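
The fixed 16-byte tag matters because `private_decrypt` recovers the tag purely by position. Below is a minimal, stdlib-only sketch of the byte layout used in the diff: the password is the key followed by the IV, and the encrypted value is the ciphertext followed by the tag. The helper names `split_password` and `split_encrypted_value` are hypothetical and do not appear in the PR; no actual decryption is performed here.

```python
import base64
from typing import Tuple

KEY_LENGTH = 32       # mirrors ENCRYPTION_KEY_LENGTH
IV_LENGTH = 16        # mirrors ENCRYPTION_IV_LENGTH
AUTH_TAG_LENGTH = 16  # mirrors ENCRYPTION_AUTH_TAG_LENGTH


def split_password(password_bytes: bytes) -> Tuple[bytes, bytes]:
    """Split the decrypted password into (key, iv), rejecting unexpected lengths."""
    if len(password_bytes) != KEY_LENGTH + IV_LENGTH:
        raise ValueError('Decryption failed, invalid password length!')
    return password_bytes[:KEY_LENGTH], password_bytes[KEY_LENGTH:]


def split_encrypted_value(encrypted_value_b64: str) -> Tuple[bytes, bytes]:
    """Split the base64 payload into (ciphertext, tag); the tag is the last 16 bytes."""
    raw = base64.b64decode(encrypted_value_b64)
    if len(raw) < AUTH_TAG_LENGTH:
        raise ValueError('Encrypted value is shorter than the authentication tag.')
    return raw[:-AUTH_TAG_LENGTH], raw[-AUTH_TAG_LENGTH:]


# The Node.js-encrypted value from the test file in this PR.
ciphertext, tag = split_encrypted_value('k8nkZDCi0hRfBc0RRefxeSHeGV0X60N03VCrhRhENKXBjrF/tEg=')

# GCM ciphertext has the same length as the plaintext, so the 22-character
# plaintext 'encrypted_with_node_js' yields 22 ciphertext bytes plus the 16-byte tag.
lengths = (len(ciphertext), len(tag))

# Synthetic 48-byte password, used only to show the key/IV split.
key, iv = split_password(b'k' * KEY_LENGTH + b'i' * IV_LENGTH)
```

If either side ever produced a truncated tag, the positional slice would cut the payload in the wrong place, which is one reason to pin the length explicitly on both the Python and Node.js sides.
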