From a2b65203ae9874f53fe565942f2136ff8f5737aa Mon Sep 17 00:00:00 2001 From: AnonO6 <21ucs043@gmail.com> Date: Sun, 13 Jul 2025 17:24:57 +0000 Subject: [PATCH 1/2] feat:added get_auth_message --- src/lighthouseweb3/__init__.py | 17 ++++ src/lighthouseweb3/functions/config.py | 4 + .../functions/kavach/get_auth_message.py | 10 ++ src/lighthouseweb3/functions/kavach/util.py | 95 +++++++++++++++++++ tests/test_get_auth_message.py | 31 ++++++ 5 files changed, 157 insertions(+) create mode 100644 src/lighthouseweb3/functions/kavach/get_auth_message.py create mode 100644 src/lighthouseweb3/functions/kavach/util.py create mode 100644 tests/test_get_auth_message.py diff --git a/src/lighthouseweb3/__init__.py b/src/lighthouseweb3/__init__.py index b1d8d7c..1cb9924 100644 --- a/src/lighthouseweb3/__init__.py +++ b/src/lighthouseweb3/__init__.py @@ -17,6 +17,8 @@ create_wallet as createWallet ) +from .functions.kavach import get_auth_message as getAuthMessage + class Lighthouse: def __init__(self, token: str = ""): @@ -224,3 +226,18 @@ def getTagged(self, tag: str): except Exception as e: raise e +class Kavach: + + @staticmethod + def getAuthMessage(address: str): + """ + Retrieves an authentication message for a given address. + + :param address: str, The address for which to retrieve the authentication message. + :return: dict, A dictionary containing the authentication message. + """ + try: + return getAuthMessage.get_auth_message(address) + except Exception as e: + raise e + diff --git a/src/lighthouseweb3/functions/config.py b/src/lighthouseweb3/functions/config.py index 000c5ef..70988eb 100644 --- a/src/lighthouseweb3/functions/config.py +++ b/src/lighthouseweb3/functions/config.py @@ -9,3 +9,7 @@ class Config: lighthouse_node = "https://node.lighthouse.storage" lighthouse_bls_node = "https://encryption.lighthouse.storage" lighthouse_gateway = "https://gateway.lighthouse.storage/ipfs" + + + is_dev = False + lighthouse_bls_node_dev = "http://enctest.lighthouse.storage" \ No newline at end of file diff --git a/src/lighthouseweb3/functions/kavach/get_auth_message.py b/src/lighthouseweb3/functions/kavach/get_auth_message.py new file mode 100644 index 0000000..3f330f9 --- /dev/null +++ b/src/lighthouseweb3/functions/kavach/get_auth_message.py @@ -0,0 +1,10 @@ +from typing import Any +from .util import api_node_handler + + +async def get_auth_message(address: str) -> dict[str, Any]: + try: + response = await api_node_handler(f"/api/message/{address}", "GET") + return {'message': response[0]['message'], 'error': None} + except Exception as e: + return {'message': None, 'error':str(e)} diff --git a/src/lighthouseweb3/functions/kavach/util.py b/src/lighthouseweb3/functions/kavach/util.py new file mode 100644 index 0000000..f9cadd2 --- /dev/null +++ b/src/lighthouseweb3/functions/kavach/util.py @@ -0,0 +1,95 @@ +import re +import json +import asyncio +import httpx +from typing import Any, Optional, Union +from src.lighthouseweb3.functions.config import Config + +def is_cid_reg(cid: str) -> bool: + """Check if string is a valid CID (Content Identifier)""" + pattern = r'Qm[1-9A-HJ-NP-Za-km-z]{44}|b[A-Za-z2-7]{58}|B[A-Z2-7]{58}|z[1-9A-HJ-NP-Za-km-z]{48}|F[0-9A-F]{50}' + return bool(re.match(pattern, cid)) + +def is_equal(*objects: Any) -> bool: + """Check if all objects are equal by comparing their JSON representations""" + if not objects: + return True + + first_obj_json = json.dumps(objects[0], sort_keys=True) + return all(json.dumps(obj, sort_keys=True) == first_obj_json for obj in objects) + +async def api_node_handler( + endpoint: str, + verb: str, + auth_token: str = "", + body: Any = None, + retry_count: int = 3 +) -> Any: + """ + Handle API requests to node with retry logic + + Args: + endpoint: API endpoint path + verb: HTTP method (GET, POST, DELETE, PUT) + auth_token: Bearer token for authentication + body: Request body for POST/PUT/DELETE requests + retry_count: Number of retry attempts + + Returns: + JSON response from API + + Raises: + Exception: If request fails after all retries + """ + verb = verb.upper() + url = Config.lighthouse_bls_node if not Config.is_dev else Config.lighthouse_bls_node_dev + url += endpoint + + headers = { + "Content-Type": "application/json" + } + + if auth_token: + headers["Authorization"] = f"Bearer {auth_token}" + + json_data = body if verb in ["POST", "PUT", "DELETE"] and body is not None else None + + async with httpx.AsyncClient() as client: + for i in range(retry_count): + try: + response = await client.request( + method=verb, + url=url, + headers=headers, + json=json_data + ) + + if not response.is_success: + if response.status_code == 404: + raise Exception(json.dumps({ + "message": "fetch Error", + "statusCode": response.status_code + })) + + try: + error_body = response.json() + except: + error_body = {"message": "Unknown error"} + + raise Exception(json.dumps({ + **error_body, + "statusCode": response.status_code + })) + + return response.json() + + except Exception as error: + error_str = str(error) + if "fetch" not in error_str: + raise error + + if i == retry_count - 1: # Last attempt + raise error + + # Wait 1 second before retry + await asyncio.sleep(1) \ No newline at end of file diff --git a/tests/test_get_auth_message.py b/tests/test_get_auth_message.py new file mode 100644 index 0000000..80bc573 --- /dev/null +++ b/tests/test_get_auth_message.py @@ -0,0 +1,31 @@ +import unittest +import logging +from src.lighthouseweb3 import Kavach + +logger = logging.getLogger(__name__) + +class TestGetAuthMessage(unittest.IsolatedAsyncioTestCase): + """Test cases for the getAuthMessage function.""" + + async def test_get_auth_message_valid_address(self): + """Test getting auth message with a valid address.""" + address = 'h6gar47c9GxYda8Kkg5J9So3R9K3jhcWKbgrjKhqfst' + auth_message = await Kavach.getAuthMessage(address=address) + + self.assertIn( + "Please sign this message to prove you are owner of this account", + auth_message['message'], + "Should return a valid auth message" + ) + self.assertIsNone(auth_message['error']) + + async def test_get_auth_message_invalid_address(self): + """Test getting auth message with an invalid address.""" + auth_message = await Kavach.getAuthMessage(address="0x9a40b8EE3B8Fe7eB621cd142a651560Fa7") + + self.assertIsNone(auth_message['message']) + self.assertIsNotNone(auth_message['error']) + self.assertIn("invalid address", str(auth_message["error"]).lower()) + +if __name__ == '__main__': + unittest.main(verbosity=2) \ No newline at end of file From bb559b1bdae9c32623d7aee81031ce6ac268d9cc Mon Sep 17 00:00:00 2001 From: AnonO6 <21ucs043@gmail.com> Date: Sun, 13 Jul 2025 18:23:39 +0000 Subject: [PATCH 2/2] feat:added save shards func --- src/lighthouseweb3/__init__.py | 29 +++- .../functions/kavach/save_shards.py | 75 +++++++++ src/lighthouseweb3/functions/kavach/types.py | 156 ++++++++++++++++++ .../test_get_auth_message.py | 0 tests/tests_kavach/test_save_shards.py | 105 ++++++++++++ 5 files changed, 364 insertions(+), 1 deletion(-) create mode 100644 src/lighthouseweb3/functions/kavach/save_shards.py create mode 100644 src/lighthouseweb3/functions/kavach/types.py rename tests/{ => tests_kavach}/test_get_auth_message.py (100%) create mode 100644 tests/tests_kavach/test_save_shards.py diff --git a/src/lighthouseweb3/__init__.py b/src/lighthouseweb3/__init__.py index 1cb9924..3ec4202 100644 --- a/src/lighthouseweb3/__init__.py +++ b/src/lighthouseweb3/__init__.py @@ -17,7 +17,13 @@ create_wallet as createWallet ) -from .functions.kavach import get_auth_message as getAuthMessage +from .functions.kavach import( + get_auth_message as getAuthMessage, + save_shards as saveShards +) + +from typing import List, Dict, Any, Union +from .functions.kavach.types import AuthToken, KeyShard class Lighthouse: @@ -241,3 +247,24 @@ def getAuthMessage(address: str): except Exception as e: raise e + def saveShards( + address: str, + cid: str, + auth_token: AuthToken, + key_shards: List[KeyShard], + share_to: List[str] = [] + ) -> Dict[str, Union[bool, str, None]]: + """ + Save shards for a given address and CID. + + :param address: str, The address for which to save the shards. + :param cid: str, The content identifier for the data. + :param auth_token: AuthToken, The authentication token. + :param key_shards: List[KeyShard], The list of key shards to save. + :param share_to: List[str], The list of addresses to share the shards with (optional). + :return: dict, A dictionary containing the result of the operation. + """ + try: + return saveShards.save_shards(address, cid, auth_token, key_shards, share_to) + except Exception as e: + raise e \ No newline at end of file diff --git a/src/lighthouseweb3/functions/kavach/save_shards.py b/src/lighthouseweb3/functions/kavach/save_shards.py new file mode 100644 index 0000000..c587125 --- /dev/null +++ b/src/lighthouseweb3/functions/kavach/save_shards.py @@ -0,0 +1,75 @@ +import asyncio +from typing import List, Dict, Any, Union +from .util import api_node_handler, is_cid_reg, is_equal +from .types import AuthToken, KeyShard + +async def save_shards( + address: str, + cid: str, + auth_token: AuthToken, + key_shards: List[KeyShard], + share_to: List[str] = [] +) -> Dict[str, Union[bool, str, None]]: + + if not is_cid_reg(cid): + return { + "isSuccess": False, + "error": "Invalid CID" + } + + if not isinstance(key_shards, list) or len(key_shards) != 5: + return { + "isSuccess": False, + "error": "keyShards must be an array of 5 objects" + } + + try: + node_ids = [1, 2, 3, 4, 5] + node_urls = [f"/api/setSharedKey/{i}" for i in node_ids] + + async def request_data(url: str, index: int) -> Dict[str, Any]: + try: + payload = { + "address": address, + "cid": cid, + "payload": key_shards[index] + } + if share_to: + payload["sharedTo"] = share_to + + response = await api_node_handler(url, "POST", auth_token, payload) + return response + + except Exception as error: + return { + "error": error + } + + data = [] + for index, url in enumerate(node_urls): + response = await request_data(url, index) + if "error" in response: + try: + return { + "isSuccess": False, + "error": str(response["error"]) + } + except Exception: + return { + "isSuccess": False, + "error": "Unknown error" + } + await asyncio.sleep(1) + data.append(response) + + temp = [{**elem, "data": None} for elem in data] + return { + "isSuccess": is_equal(*temp) and data[0].get("message") == "success", + "error": None + } + + except Exception as err: + return { + "isSuccess": False, + "error": str(err) + } diff --git a/src/lighthouseweb3/functions/kavach/types.py b/src/lighthouseweb3/functions/kavach/types.py new file mode 100644 index 0000000..55aaf87 --- /dev/null +++ b/src/lighthouseweb3/functions/kavach/types.py @@ -0,0 +1,156 @@ +from typing import List, Dict, Union, Optional, Any, Literal +from dataclasses import dataclass +from enum import Enum + +ErrorValue = Union[str, List[str], int, bool, None, Dict[str, Any], Any] +SignedMessage = str +JWT = str +AuthToken = Union[SignedMessage, JWT] + +class ChainType(str, Enum): + EVM = "EVM" + EVM_LOWER = "evm" + SOLANA = "SOLANA" + SOLANA_LOWER = "solana" + +class DecryptionType(str, Enum): + ADDRESS = "ADDRESS" + ACCESS_CONDITIONS = "ACCESS_CONDITIONS" + +class StandardContractType(str, Enum): + ERC20 = "ERC20" + ERC721 = "ERC721" + ERC1155 = "ERC1155" + CUSTOM = "Custom" + EMPTY = "" + +class SolanaContractType(str, Enum): + SPL_TOKEN = "spl-token" + EMPTY = "" + +class Comparator(str, Enum): + EQUAL = "==" + GREATER_EQUAL = ">=" + LESS_EQUAL = "<=" + NOT_EQUAL = "!=" + GREATER = ">" + LESS = "<" + +# Data Classes +@dataclass +class KeyShard: + key: str + index: str + +@dataclass +class GeneratedKey: + master_key: Optional[str] + key_shards: List[KeyShard] + +@dataclass +class GenerateInput: + threshold: Optional[int] = None + key_count: Optional[int] = None + +@dataclass +class AuthMessage: + message: Optional[str] + error: Optional[ErrorValue] + +@dataclass +class RecoveredKey: + master_key: Optional[str] + error: Optional[ErrorValue] + +@dataclass +class RecoverShards: + shards: List[KeyShard] + error: ErrorValue + +@dataclass +class LightHouseSDKResponse: + is_success: bool + error: ErrorValue + +@dataclass +class ReturnValueTest: + comparator: Comparator + value: Union[int, str, List[Any]] + +@dataclass +class PDAInterface: + offset: Optional[int] = None + selector: Optional[str] = None + +@dataclass +class EVMCondition: + id: int + standard_contract_type: StandardContractType + chain: str + method: str + return_value_test: ReturnValueTest + contract_address: Optional[str] = None + parameters: Optional[List[Any]] = None + input_array_type: Optional[List[str]] = None + output_type: Optional[str] = None + +@dataclass +class SolanaCondition: + id: int + chain: str + method: str + standard_contract_type: SolanaContractType + pda_interface: PDAInterface + return_value_test: ReturnValueTest + contract_address: Optional[str] = None + parameters: Optional[List[Any]] = None + +# Union Type for Conditions +Condition = Union[EVMCondition, SolanaCondition] + +@dataclass +class UpdateConditionSchema: + chain_type: Literal["EVM", "SOLANA"] + conditions: List[Condition] + decryption_type: Literal["ADDRESS", "ACCESS_CONDITIONS"] + address: str + cid: str + aggregator: Optional[str] = None + +@dataclass +class AccessConditionSchema: + chain_type: Literal["EVM", "SOLANA"] + conditions: List[Condition] + decryption_type: Literal["ADDRESS", "ACCESS_CONDITIONS"] + address: str + cid: str + key_shards: List[Any] + aggregator: Optional[str] = None + +@dataclass +class IGetAccessCondition: + aggregator: str + owner: str + cid: str + conditions: Optional[List[Condition]] = None + conditions_solana: Optional[List[Any]] = None + shared_to: Optional[List[Any]] = None + +def is_jwt(token: str) -> bool: + """Check if token is a JWT (starts with 'jwt:')""" + return token.startswith('jwt:') + +def create_jwt(token: str) -> JWT: + """Create a JWT token with proper prefix""" + if not token.startswith('jwt:'): + return f'jwt:{token}' + return token + +# Type Guards +def is_evm_condition(condition: Condition) -> bool: + """Check if condition is an EVM condition""" + return isinstance(condition, EVMCondition) + +def is_solana_condition(condition: Condition) -> bool: + """Check if condition is a Solana condition""" + return isinstance(condition, SolanaCondition) diff --git a/tests/test_get_auth_message.py b/tests/tests_kavach/test_get_auth_message.py similarity index 100% rename from tests/test_get_auth_message.py rename to tests/tests_kavach/test_get_auth_message.py diff --git a/tests/tests_kavach/test_save_shards.py b/tests/tests_kavach/test_save_shards.py new file mode 100644 index 0000000..488766b --- /dev/null +++ b/tests/tests_kavach/test_save_shards.py @@ -0,0 +1,105 @@ +import unittest +import logging +from src.lighthouseweb3 import Kavach +from web3 import Web3 +from eth_account.messages import encode_defunct + +logger = logging.getLogger(__name__) + +class TestSaveShards(unittest.IsolatedAsyncioTestCase): + """Test cases for the saveShards function.""" + + def setUp(self): + self.private_key = "0x8218aa5dbf4dbec243142286b93e26af521b3e91219583595a06a7765abc9c8b" + self.signer_address = Web3().eth.account.from_key(self.private_key).address + + + async def test_invalid_signature(self): + """Test saveShards with invalid signature.""" + result = await Kavach.saveShards( + address=self.signer_address, + cid="QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH", + auth_token="signature", + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + {"key": "4", "index": "4"}, + {"key": "5", "index": "5"}, + ] + ) + + self.assertFalse(result['isSuccess']) + self.assertTrue(result['error'] is None or isinstance(result['error'], str)) + + async def test_save_key_success(self): + """Test successful key saving.""" + auth_message_result = await Kavach.getAuthMessage(address=self.signer_address) + self.assertIsNone(auth_message_result['error']) + message = auth_message_result['message'] + signature = f"0x{Web3().eth.account.sign_message( + encode_defunct(text=message), + private_key=self.private_key + ).signature.hex()}" + + result = await Kavach.saveShards( + address=self.signer_address, + cid="QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH", + auth_token=signature, + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + {"key": "4", "index": "4"}, + {"key": "5", "index": "5"}, + ] + ) + self.assertTrue(result['isSuccess']) + self.assertIsNone(result['error']) + + async def test_save_key_insufficient_shards(self): + """Test saving key with insufficient shards (should fail).""" + auth_message_result = await Kavach.getAuthMessage(address=self.signer_address) + self.assertIsNone(auth_message_result['error']) + auth_message = auth_message_result['message'] + signature = "0x" + Web3().eth.account.sign_message(encode_defunct(text=auth_message), private_key=self.private_key).signature.hex() + + result = await Kavach.saveShards( + address=self.signer_address, + cid="QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQJ", + auth_token=signature, + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + ] + ) + + self.assertFalse(result['isSuccess']) + self.assertRegex(str(result['error']).lower(), r'keyshards must be an array of 5 objects') + + async def test_invalid_cid(self): + """Test saving key with invalid CID.""" + auth_message_result = await Kavach.getAuthMessage(address=self.signer_address) + self.assertIsNone(auth_message_result['error']) + auth_message = auth_message_result['message'] + signature = "0x" + Web3().eth.account.sign_message(encode_defunct(text=auth_message), private_key=self.private_key).signature.hex() + + result = await Kavach.saveShards( + address=self.signer_address, + cid="cid", + auth_token=signature, + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + {"key": "4", "index": "4"}, + {"key": "5", "index": "5"}, + ] + ) + + self.assertFalse(result['isSuccess']) + self.assertRegex(str(result['error']).lower(), r'invalid cid') + +if __name__ == '__main__': + unittest.main(verbosity=2) \ No newline at end of file