Skip to content

Commit

Permalink
add and multiply
Browse files Browse the repository at this point in the history
  • Loading branch information
serengil committed Dec 20, 2023
1 parent a15c820 commit 378da76
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 26 deletions.
9 changes: 3 additions & 6 deletions lightphe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,10 @@ def __decrypt_tensors(self, encrypted_tensor: EncryptedTensor) -> Union[List[int
raise ValueError("Ciphertext items must be type of Fraction")

sign = c.sign
dividend = self.cs.decrypt(ciphertext=c.dividend)
abs_dividend = self.cs.decrypt(ciphertext=c.abs_dividend)
# dividend = self.cs.decrypt(ciphertext=c.dividend)
divisor = self.cs.decrypt(ciphertext=c.divisor)
if c.abs_dividend is not None and sign == -1:
abs_dividend = self.cs.decrypt(ciphertext=c.abs_dividend)
m = -1 * abs_dividend / divisor
else:
m = dividend / divisor
m = sign * abs_dividend / divisor

plain_tensor.append(m)
return plain_tensor
Expand Down
46 changes: 38 additions & 8 deletions lightphe/models/Tensor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union, List, Optional
from typing import Union, List
from lightphe.models.Homomorphic import Homomorphic
from lightphe.commons import phe_utils

Expand All @@ -12,9 +12,9 @@ class Fraction:
def __init__(
self,
dividend: Union[int, tuple, list],
abs_dividend: Union[int, tuple, list],
divisor: Union[int, tuple, list],
sign: int = 1,
abs_dividend: Optional[Union[int, tuple, list]] = None,
):
self.dividend = dividend
self.divisor = divisor
Expand All @@ -25,7 +25,8 @@ def __str__(self):
"""
Print Fraction Class Object
"""
return f"Fraction({self.dividend} / {self.divisor})"
sign = "-" if self.sign == -1 else "+"
return f"Fraction({sign}{self.abs_dividend} / {self.divisor})"

def __repr__(self):
"""
Expand Down Expand Up @@ -66,7 +67,8 @@ def __repr__(self):

def __mul__(self, other: Union["EncryptedTensor", int, float]) -> "EncryptedTensor":
"""
Perform homomorphic multipliction on tensors or multiplication of an encrypted tensor with a constant
Perform homomorphic element-wise multipliction on tensors
or multiplication of an encrypted tensor with a constant
Args:
other: encrypted tensor or constant
Returns:
Expand All @@ -84,19 +86,27 @@ def __mul__(self, other: Union["EncryptedTensor", int, float]) -> "EncryptedTens
ciphertext1=alpha_tensor.dividend, ciphertext2=beta_tensor.dividend
)

current_abs_dividend = self.cs.multiply(
ciphertext1=alpha_tensor.abs_dividend, ciphertext2=beta_tensor.abs_dividend
)

current_divisor = self.cs.multiply(
ciphertext1=alpha_tensor.divisor, ciphertext2=beta_tensor.divisor
)

fraction = Fraction(
dividend=current_dividend,
abs_dividend=current_abs_dividend,
divisor=current_divisor,
sign=alpha_tensor.sign * beta_tensor.sign,
)

fractions.append(fraction)

return EncryptedTensor(fractions=fractions, cs=self.cs)
elif isinstance(other, (int, float)):
constant_sign = 1 if other >= 0 else -1
other = abs(other)
if isinstance(other, float):
other = phe_utils.parse_int(value=other, modulo=self.cs.plaintext_modulo)

Expand All @@ -105,10 +115,15 @@ def __mul__(self, other: Union["EncryptedTensor", int, float]) -> "EncryptedTens
dividend = self.cs.multiply_by_contant(
ciphertext=alpha_tensor.dividend, constant=other
)
abs_dividend = self.cs.multiply_by_contant(
ciphertext=alpha_tensor.abs_dividend, constant=other
)
# notice that divisor is alpha tensor's divisor instead of addition
fraction = Fraction(
dividend=dividend,
abs_dividend=abs_dividend,
divisor=alpha_tensor.divisor,
sign=constant_sign * alpha_tensor.sign,
)
fractions.append(fraction)
return EncryptedTensor(fractions=fractions, cs=self.cs)
Expand Down Expand Up @@ -145,11 +160,26 @@ def __add__(self, other: "EncryptedTensor") -> "EncryptedTensor":
current_dividend = self.cs.add(
ciphertext1=alpha_tensor.dividend, ciphertext2=beta_tensor.dividend
)
# notice that divisor is alpha tensor's divisor instead of addition
current_tensor = Fraction(
dividend=current_dividend,
divisor=alpha_tensor.divisor,
current_abs_dividend = self.cs.add(
ciphertext1=alpha_tensor.abs_dividend, ciphertext2=beta_tensor.abs_dividend
)
# notice that divisor is alpha tensor's divisor instead of addition
if alpha_tensor.sign == -1 and beta_tensor.sign == -1:
current_tensor = Fraction(
dividend=current_dividend,
abs_dividend=current_abs_dividend,
divisor=alpha_tensor.divisor,
sign=-1,
)
else:
# if one is positive and one is negative, then i cannot know
# the result is positive or negative. trust mod calculations.
current_tensor = Fraction(
dividend=current_dividend,
abs_dividend=current_dividend,
divisor=alpha_tensor.divisor,
sign=1,
)

current_tensors.append(current_tensor)

Expand Down
59 changes: 47 additions & 12 deletions tests/test_tensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

logger = Logger(module="tests/test_tensors.py")

THRESHOLD = 0.5
THRESHOLD = 1


def convert_negative_float_to_int(value: float, modulo: int) -> float:
Expand Down Expand Up @@ -50,8 +50,7 @@ def test_homomorphic_multiplication():
restored_tensors = cs.decrypt(c3)

for i, restored_tensor in enumerate(restored_tensors):
if t1[i] > 0 and t2[i] > 0:
assert abs((t1[i] * t2[i]) - restored_tensor) < THRESHOLD
assert abs((t1[i] * t2[i]) - restored_tensor) < THRESHOLD

with pytest.raises(ValueError):
_ = c1 + c2
Expand All @@ -62,7 +61,7 @@ def test_homomorphic_multiplication():
logger.info("✅ Homomorphic multiplication tests succeeded")


def test_homomorphic_multiply_by_a_constant():
def test_homomorphic_multiply_by_a_positive_constant():
cs = LightPHE(algorithm_name="Paillier")

t1 = [5, 6.2, 7.002, 7.002, 8.02]
Expand All @@ -76,7 +75,24 @@ def test_homomorphic_multiply_by_a_constant():
for i, restored_tensor in enumerate(t2):
assert abs((t1[i] * constant) - restored_tensor) < THRESHOLD

logger.info("✅ Homomorphic multiplication by a constant tests succeeded")
logger.info("✅ Homomorphic multiplication by a positive constant tests succeeded")


def test_homomorphic_multiply_by_a_negative_constant():
cs = LightPHE(algorithm_name="Paillier")

t1 = [5, 6.2, 7.002, 7.002, 8.02]
constant = -2
c1: EncryptedTensor = cs.encrypt(t1)

c2 = c1 * constant

t2 = cs.decrypt(c2)

for i, restored_tensor in enumerate(t2):
assert abs((t1[i] * constant) - restored_tensor) < THRESHOLD

logger.info("✅ Homomorphic multiplication by a negative constant tests succeeded")


def test_homomorphic_multiply_with_int_constant():
Expand All @@ -96,7 +112,7 @@ def test_homomorphic_multiply_with_int_constant():
logger.info("✅ Homomorphic multiplication with an integer constant tests succeeded")


def test_homomorphic_multiply_with_float_constant():
def test_homomorphic_multiply_with_positive_float_constant():
cs = LightPHE(algorithm_name="Paillier")

t1 = [10000.0, 15000, 20000]
Expand All @@ -110,14 +126,31 @@ def test_homomorphic_multiply_with_float_constant():
for i, restored_tensor in enumerate(t2):
assert abs((t1[i] * constant) - restored_tensor) < THRESHOLD

logger.info("✅ Homomorphic multiplication with a float constant tests succeeded")
logger.info("✅ Homomorphic multiplication with a positive float constant tests succeeded")


def test_homomorphic_multiply_with_negative_float_constant():
cs = LightPHE(algorithm_name="Paillier")

t1 = [10000.0, 15000, 20000]
constant = -1.05
c1: EncryptedTensor = cs.encrypt(t1)

c2 = constant * c1

t2 = cs.decrypt(c2)

for i, restored_tensor in enumerate(t2):
assert abs((t1[i] * constant) - restored_tensor) < THRESHOLD

logger.info("✅ Homomorphic multiplication with a positive float constant tests succeeded")


def test_homomorphic_addition():
cs = LightPHE(algorithm_name="Paillier", key_size=30)

t1 = [1.005, 2.05, 3.6, -4, -3.5]
t2 = [5, 6.2, -7.5, 8.02, -4.5]
t1 = [1.005, 2.05, 3.6, -4, 4.02, -3.5]
t2 = [5, 6.2, -7.5, 8.02, -8.02, -4.5]

c1: EncryptedTensor = cs.encrypt(t1)
c2: EncryptedTensor = cs.encrypt(t2)
Expand All @@ -127,11 +160,13 @@ def test_homomorphic_addition():
restored_tensors = cs.decrypt(c3)

for i, restored_tensor in enumerate(restored_tensors):
if t1[i] + t2[i] >= 0:
if (t1[i] >= 0 and t2[i] >= 0) or (t1[i] < 0 and t2[i] < 0) or (t1[i] + t2[i] >= 0):
assert abs((t1[i] + t2[i]) - restored_tensor) < THRESHOLD
else:
elif t1[i] + t2[i] < 0:
expected = convert_negative_float_to_int(t1[i] + t2[i], cs.cs.plaintext_modulo)
assert abs(expected - restored_tensor) < THRESHOLD * 2
assert abs(expected - restored_tensor) < THRESHOLD
else:
raise ValueError("else must not be called at all")

with pytest.raises(ValueError):
_ = c1 * c2
Expand Down

0 comments on commit 378da76

Please sign in to comment.