Skip to content

feat: added kavach funcs and tests for methods- accessControl and get_auth_message #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ charset-normalizer==3.1.0
idna==3.4
requests==2.31.0
urllib3==2.0.2
eth-account==0.13.7
eth-account==0.13.7
httpx==0.28.1
web3==7.12.0
pydantic==2.11.7
91 changes: 90 additions & 1 deletion src/lighthouseweb3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
import io
from typing import List, Dict, Any, Optional
from .functions import (
upload as d,
deal_status,
Expand All @@ -16,7 +17,15 @@
remove_ipns_record as removeIpnsRecord,
create_wallet as createWallet
)

from .functions.kavach import (
generate,
recover_key as recoverKey,
shard_key as shardKey,
get_auth_message as getAuthMessage,
types as kavach_types
)
from .functions.kavach.access_control import main as accessControl
from .functions.kavach.types import AuthToken, Condition, ChainType, DecryptionType, KeyShard

class Lighthouse:
def __init__(self, token: str = ""):
Expand Down Expand Up @@ -224,3 +233,83 @@ def getTagged(self, tag: str):
except Exception as e:
raise e

class Kavach:
"""
Kavach is a threshold secret sharing library using shamir secret sharing.
"""

@staticmethod
def generate(threshold: int, keyCount: int) -> Dict[str, Any]:
"""
Generate a master key and sharded the key into key shards

:param threshold: int, number of shards required to recover the key
:param keyCount: int, number of key shards to generate
:return: dict, A dict with master key and key shards
"""
try:
return generate.generate(threshold, keyCount)
except Exception as e:
raise e


@staticmethod
def recoverKey(keyShards: List[Dict[str, Any]]) -> int:
"""
Recover the master key from the given key shards

:param keyShards: List[Dict[str, Any]], A list of key shards
:return: int, The recovered master key
"""
try:
return recoverKey.recover_key(keyShards)
except Exception as e:
raise e

@staticmethod
def shardKey(masterKey: int, threshold: int, keyCount: int) -> Dict[str, Any]:
"""
Shard the given master key into key shards

:param masterKey: int, The master key to be sharded
:param threshold: int, number of shards required to recover the key
:param keyCount: int, number of key shards to generate
:return: dict, A dict with key shards
"""
try:
return shardKey.shard_key(masterKey, threshold, keyCount)
except Exception as e:
raise e

@staticmethod
def accessControl(address: str, cid: str, auth_token: AuthToken, conditions: List[Condition], aggregator: Optional[str] = None, chain_type: ChainType = "evm", key_shards: List[KeyShard] = [], decryption_type: DecryptionType = "ADDRESS"):
"""
Create a new Kavach Access Control Record

:param address: str, The public key of the user
:param cid: str, The cid of the data
:param auth_token: AuthToken, The authorization token
:param conditions: List[Condition], The conditions for access control
:param aggregator: str, The aggregator address
:param chain_type: ChainType, The type of chain
:param key_shards: List[KeyShard], The key shards for access control
:param decryption_type: DecryptionType, The decryption type
:return: dict, A dict with the access control record
"""
try:
return accessControl.access_control(address, cid, auth_token, conditions, aggregator, chain_type, key_shards, decryption_type)
except Exception as e:
raise e


@staticmethod
def getAuthMessage(address: str) -> dict[str, Any]:
"""
Get Authentication message from the server
:param address: str, The public key of the user
:return: dict, A dict with authentication message or error
"""
try:
return getAuthMessage.get_auth_message(address)
except Exception as e:
raise e
3 changes: 3 additions & 0 deletions src/lighthouseweb3/functions/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ class Config:
lighthouse_node = "https://node.lighthouse.storage"
lighthouse_bls_node = "https://encryption.lighthouse.storage"
lighthouse_gateway = "https://gateway.lighthouse.storage/ipfs"

is_dev = False
lighthouse_bls_node_dev = "http://enctest.lighthouse.storage"
Empty file.
130 changes: 130 additions & 0 deletions src/lighthouseweb3/functions/kavach/access_control/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from .validator import UpdateConditionSchema as update_condition_schema, AccessConditionSchema as access_condition_schema
from ..types import (AuthToken,
Condition,
DecryptionType,
KeyShard,
ChainType,
LightHouseSDKResponse)


from src.lighthouseweb3.functions.config import Config
from src.lighthouseweb3.functions.kavach.util import is_equal, is_cid_reg, api_node_handler
from typing import List, Optional
import json
import time


async def access_control(
address: str,
cid: str,
auth_token: AuthToken,
conditions: List[Condition],
aggregator: Optional[str] = None,
chain_type: ChainType = "evm",
key_shards: List[KeyShard] = [],
decryption_type: DecryptionType = "ADDRESS"
) -> LightHouseSDKResponse:
try:
if not isinstance(key_shards, list) or (
len(key_shards) != 5 and len(key_shards) != 0
):
raise ValueError("keyShards must be an array of 5 objects")


if not is_cid_reg(cid):
raise ValueError("Invalid CID")


try:
if len(key_shards) == 5:
access_condition_schema.model_validate({
"address": address,
"cid": cid,
"conditions": conditions,
"aggregator": aggregator,
"decryptionType": decryption_type,
"chainType": chain_type,
"keyShards": key_shards
})
else:
update_condition_schema.model_validate({
"address": address,
"cid": cid,
"conditions": conditions,
"aggregator": aggregator,
"chainType": chain_type
})
except ValueError as e:
raise ValueError(f"Condition validation error: {str(e)}")


node_ids = [1, 2, 3, 4, 5]
node_urls = [
f":900{id}/api/fileAccessConditions/{id}" if Config.is_dev else f"/api/fileAccessConditions/{id}"
for id in node_ids
]


data = []


for index, url in enumerate(node_urls):
try:
if len(key_shards) == 5:
response = await api_node_handler(
url, "POST", auth_token, {
"address": address,
"cid": cid,
"conditions": conditions,
"aggregator": aggregator,
"decryptionType": decryption_type,
"chainType": chain_type,
"payload": key_shards[index]
}
)
else:
response = await api_node_handler(
url, "PUT", auth_token, {
"address": address,
"cid": cid,
"conditions": conditions,
"aggregator": aggregator,
"chainType": chain_type
}
)
except Exception as e:
try:
error_data = json.loads(str(e))
except Exception:
error_data = {"message": str(e)}
response = {"isSuccess": False, "error": error_data}


if response.get("error"):
time.sleep(1)


data.append(response)


success = (
is_equal(*(resp.get("message") for resp in data)) and
data[0].get("message") == "success"
)


return {"isSuccess": success, "error": None}



except Exception as e:
if isinstance(e, ValueError):
return {"isSuccess": False, "error": str(e)}
try:
err = json.loads(str(e))
if isinstance(err, dict) and "message" in err:
return {"isSuccess": False, "error": err["message"]}
return {"isSuccess": False, "error": str(e)}
except Exception:
return {"isSuccess": False, "error": str(e)}

Loading