From 6de062c0368ffd495a86c39e1d4348e36db30f10 Mon Sep 17 00:00:00 2001 From: Arondondon Date: Fri, 11 Oct 2024 13:03:20 +0300 Subject: [PATCH 01/10] Start of implementation of channel caching --- snet/cli/arguments.py | 109 ++++++++--------------- snet/cli/commands/mpe_channel.py | 144 ++++++++++++++++++++++++++----- 2 files changed, 156 insertions(+), 97 deletions(-) diff --git a/snet/cli/arguments.py b/snet/cli/arguments.py index e9ce4b93..af55bd96 100644 --- a/snet/cli/arguments.py +++ b/snet/cli/arguments.py @@ -662,14 +662,6 @@ def add_p_open_channel_basic(p): p.add_argument("--open-new-anyway", action="store_true", help="Skip check that channel already exists and open new channel anyway") - add_p_from_block(p) - - -def add_p_from_block(p): - p.add_argument("--from-block", - type=int, - default=0, - help="Start searching from this block (for channel searching)") def add_mpe_channel_options(parser): @@ -677,38 +669,17 @@ def add_mpe_channel_options(parser): subparsers = parser.add_subparsers(title="Commands", metavar="COMMAND") subparsers.required = True - p = subparsers.add_parser("init", - help="Initialize channel taking org metadata from Registry") - p.set_defaults(fn="init_channel_from_registry") - - add_p_org_id(p) - add_group_name(p) - add_p_registry_address_opt(p) - add_p_mpe_address_opt(p) - add_p_channel_id(p) - - p = subparsers.add_parser("init-metadata", - help="Initialize channel using organization metadata") - p.set_defaults(fn="init_channel_from_metadata") - add_p_org_id(p) - add_group_name(p) - add_p_registry_address_opt(p) - add_p_organization_metadata_file_opt(p) - add_p_mpe_address_opt(p) - add_p_channel_id(p) - add_eth_call_arguments(p) - - p = subparsers.add_parser("open-init", - help="Open and initialize channel using organization metadata from Registry") - p.set_defaults(fn="open_init_channel_from_registry") + p = subparsers.add_parser("open", + help="Open channel using organization metadata from Registry") + p.set_defaults(fn="open_channel_from_registry") add_p_org_id(p) add_p_registry_address_opt(p) add_p_open_channel_basic(p) - p = subparsers.add_parser("open-init-metadata", - help="Open and initialize channel using organization metadata") - p.set_defaults(fn="open_init_channel_from_metadata") + p = subparsers.add_parser("open-from-metadata", + help="Open channel using existing organization metadata") + p.set_defaults(fn="open_channel_from_metadata") add_p_org_id(p) add_p_registry_address_opt(p) add_p_open_channel_basic(p) @@ -724,14 +695,13 @@ def add_p_set_for_extend_add(_p): add_p_mpe_address_opt(p) add_transaction_arguments(p) - p = subparsers.add_parser( - "extend-add", help="Set new expiration for the channel and add funds") + p = subparsers.add_parser("extend-add", help="Set new expiration for the channel and add funds") p.set_defaults(fn="channel_extend_and_add_funds") add_p_channel_id(p) add_p_set_for_extend_add(p) p = subparsers.add_parser("extend-add-for-org", - help="Set new expiration and add funds for the channel for the given service") + help="Set new expiration and add funds for the channel for the given service (organization and group name)") p.set_defaults(fn="channel_extend_and_add_funds_for_org") add_p_org_id(p) add_group_name(p) @@ -739,7 +709,6 @@ def add_p_set_for_extend_add(_p): add_p_set_for_extend_add(p) add_p_group_name(p) add_p_channel_id_opt(p) - add_p_from_block(p) p = subparsers.add_parser("block-number", help="Print the last ethereum block number") @@ -767,70 +736,63 @@ def add_p_sender(_p): default=None, help="Account to set as sender (by default we use the current identity)") - p = subparsers.add_parser("print-initialized", - help="Print initialized channels.") - p.set_defaults(fn="print_initialized_channels") - add_p_only_id(p) - add_p_only_sender_signer(p) - add_p_mpe_address_opt(p) - add_eth_call_arguments(p) - add_p_registry_address_opt(p) - - p = subparsers.add_parser("print-initialized-filter-org", - help="Print initialized channels for the given org (all payment group).") - p.set_defaults(fn="print_initialized_channels_filter_org") - add_p_org_id(p) - add_group_name(p) - add_p_registry_address_opt(p) - add_p_only_id(p) - add_p_only_sender_signer(p) - add_p_mpe_address_opt(p) - add_eth_call_arguments(p) + def add_p_dont_sync_channels(_p): + _p.add_argument("--do-not-sync", "-ds", + action='store_true', + help="Print channels without synchronizing their state") - p = subparsers.add_parser("print-all-filter-sender", + p = subparsers.add_parser("print-filter-sender", help="Print all channels for the given sender.") - p.set_defaults(fn="print_all_channels_filter_sender") + p.set_defaults(fn="print_channels_filter_sender") add_p_only_id(p) add_p_mpe_address_opt(p) - add_p_from_block(p) add_eth_call_arguments(p) add_p_sender(p) + add_p_dont_sync_channels(p) - p = subparsers.add_parser("print-all-filter-recipient", + p = subparsers.add_parser("print-filter-recipient", help="Print all channels for the given recipient.") - p.set_defaults(fn="print_all_channels_filter_recipient") + p.set_defaults(fn="print_channels_filter_recipient") add_p_only_id(p) add_p_mpe_address_opt(p) - add_p_from_block(p) add_eth_call_arguments(p) p.add_argument("--recipient", default=None, help="Account to set as recipient (by default we use the current identity)") + add_p_dont_sync_channels(p) - p = subparsers.add_parser("print-all-filter-group", + p = subparsers.add_parser("print-filter-group", help="Print all channels for the given service.") - p.set_defaults(fn="print_all_channels_filter_group") - + p.set_defaults(fn="print_channels_filter_group") add_p_org_id(p) add_group_name(p) add_p_registry_address_opt(p) add_p_only_id(p) add_p_mpe_address_opt(p) - add_p_from_block(p) add_eth_call_arguments(p) + add_p_dont_sync_channels(p) - p = subparsers.add_parser("print-all-filter-group-sender", + p = subparsers.add_parser("print-filter-group-sender", help="Print all channels for the given group and sender.") - p.set_defaults(fn="print_all_channels_filter_group_sender") - + p.set_defaults(fn="print_channels_filter_group_sender") add_p_org_id(p) add_group_name(p) add_p_registry_address_opt(p) add_p_only_id(p) add_p_mpe_address_opt(p) - add_p_from_block(p) add_eth_call_arguments(p) add_p_sender(p) + add_p_dont_sync_channels(p) + + p = subparsers.add_parser("print-all", + help="Print all channels.") + p.set_defaults(fn="print_all_channels") + add_p_registry_address_opt(p) + add_p_only_id(p) + add_p_only_sender_signer(p) + add_p_mpe_address_opt(p) + add_eth_call_arguments(p) + add_p_dont_sync_channels(p) p = subparsers.add_parser("claim-timeout", help="Claim timeout of the channel") @@ -844,7 +806,6 @@ def add_p_sender(_p): p.set_defaults(fn="channel_claim_timeout_all") add_p_mpe_address_opt(p) add_transaction_arguments(p) - add_p_from_block(p) def add_mpe_client_options(parser): @@ -888,9 +849,7 @@ def add_p_set1_for_call(_p): add_p_org_id_service_id(p) add_group_name(p) add_p_set1_for_call(p) - add_p_channel_id_opt(p) - add_p_from_block(p) p.add_argument("--yes", "-y", action="store_true", help="Skip interactive confirmation of call price", diff --git a/snet/cli/commands/mpe_channel.py b/snet/cli/commands/mpe_channel.py index 0c467c82..73bf48b0 100644 --- a/snet/cli/commands/mpe_channel.py +++ b/snet/cli/commands/mpe_channel.py @@ -9,7 +9,7 @@ from eth_abi.codec import ABICodec from web3._utils.encoding import pad_hex from web3._utils.events import get_event_data -from snet.contracts import get_contract_def +from snet.contracts import get_contract_def, get_contract_deployment_block from snet.cli.commands.commands import OrganizationCommand from snet.cli.metadata.service import mpe_service_metadata_from_json, load_mpe_service_metadata @@ -29,6 +29,52 @@ def _get_persistent_mpe_dir(self): registry_address = self.get_registry_address().lower() return Path.home().joinpath(".snet", "mpe_client", "%s_%s" % (mpe_address, registry_address)) + def _get_channels_cache_file(self): + channels_dir = Path.home().joinpath(".snet", "cache", "mpe") + mpe_address = self.get_mpe_address().lower() + channels_file = channels_dir.joinpath(str(mpe_address), "channels.pickle") + return channels_file + + def _update_channels_cache(self): + channels = [] + last_read_block = get_contract_deployment_block(self.ident.w3, "MultiPartyEscrow") + channels_file = self._get_channels_cache_file() + + if not channels_file.exists(): + self._printout(f"Channels cache is empty. Caching may take some time when first accessing channels.\nCaching in progress...") + channels_file.parent.mkdir(parents=True, exist_ok=True) + with open(channels_file, "wb") as f: + empty_dict = { + "last_read_block": last_read_block, + "channels": channels + } + pickle.dump(empty_dict, f) + else: + with open(channels_file, "rb") as f: + load_dict = pickle.load(f) + last_read_block = load_dict["last_read_block"] + channels = load_dict["channels"] + + current_block_number = self.ident.w3.eth.block_number + + if last_read_block < current_block_number: + new_channels = self._get_all_opened_channels_from_blockchain(last_read_block, current_block_number) + channels = channels + new_channels + last_read_block = current_block_number + + with open(channels_file, "wb") as f: + dict_to_save = { + "last_read_block": last_read_block, + "channels": channels + } + pickle.dump(dict_to_save, f) + + def _get_channels_from_cache(self): + self._update_channels_cache() + with open(self._get_channels_cache_file(), "rb") as f: + load_dict = pickle.load(f) + return load_dict["channels"] + def _get_service_base_dir(self, org_id, service_id): """ get persistent storage for the given service (~/.snet/mpe_client/_///) """ return self._get_persistent_mpe_dir().joinpath(org_id, service_id) @@ -182,6 +228,7 @@ def _check_channel_is_mine(self, channel): "(address=%s sender=%s signer=%s)" % (self.ident.address.lower(), channel["sender"].lower(), channel["signer"].lower())) def _init_channel_from_metadata(self, metadata, org_registration): + # TODO: delete channel_id = self.args.channel_id channel = self._get_channel_state_from_blockchain(channel_id) self._check_channel_is_mine(channel) @@ -196,10 +243,14 @@ def _init_channel_from_metadata(self, metadata, org_registration): self._add_channel_to_initialized(self.args.org_id, channel) def init_channel_from_metadata(self): + # TODO: "channel init-metadata" command + # TODO: delete metadata = OrganizationMetadata.from_file(self.args.metadata_file) self._init_channel_from_metadata(metadata, {}) def init_channel_from_registry(self): + # TODO: "channel init" command + # TODO: delete metadata = self._get_organization_metadata_from_registry( self.args.org_id) org_registration = self._get_organization_registration( @@ -288,6 +339,7 @@ def _initialize_already_opened_channel(self, metadata, sender, signer): return None def _open_init_channel_from_metadata(self, metadata, org_registration): + # TODO: change to not init self._init_or_update_org_if_needed(metadata, org_registration) # Before open new channel we try to find already opened channel @@ -307,11 +359,13 @@ def _open_init_channel_from_metadata(self, metadata, org_registration): # initialize channel self._add_channel_to_initialized(self.args.org_id, channel) - def open_init_channel_from_metadata(self): + def open_channel_from_metadata(self): + # TODO: "channel open-init-metadata" command metadata = OrganizationMetadata.from_file(self.args.metadata_file) self._open_init_channel_from_metadata(metadata, {}) - def open_init_channel_from_registry(self): + def open_channel_from_registry(self): + # TODO: "channel open-init" command metadata = self._get_organization_metadata_from_registry( self.args.org_id) org_registration = self._get_organization_registration( @@ -363,21 +417,21 @@ def check_new_expiration_from_blockchain(self, channel_id, new_expiration): raise Exception("New expiration (%i) is smaller then old one (%i)" % ( new_expiration, channel["expiration"])) - def _smart_get_initialized_channel_for_org(self, metadata, filter_by, is_try_initailize=True): + def _smart_get_channel_for_org(self, metadata, filter_by, is_try_initailize=True): + # TODO: change to not take from initialized ''' - filter_by can be sender or signer ''' channels = self._get_initialized_channels_for_org(self.args.org_id) - group_id = base64.b64decode( - metadata.get_group_id_by_group_name(self.args.group_name)) - channels = [c for c in channels if c[filter_by].lower( - ) == self.ident.address.lower() and c["groupId"] == group_id] + group_id = base64.b64decode(metadata.get_group_id_by_group_name(self.args.group_name)) + channels = [c for c in channels + if c[filter_by].lower() == self.ident.address.lower() and c["groupId"] == group_id] if len(channels) == 0 and is_try_initailize: # this will work only in simple case where signer == sender self._initialize_already_opened_channel( metadata, self.ident.address, self.ident.address) - return self._smart_get_initialized_channel_for_org(metadata, filter_by, is_try_initailize=False) + return self._smart_get_channel_for_org(metadata, filter_by, is_try_initailize=False) if len(channels) == 0: raise Exception("Cannot find initialized channel for service with org_id=%s service_id=%s and signer=%s" % ( @@ -394,16 +448,14 @@ def _smart_get_initialized_channel_for_org(self, metadata, filter_by, is_try_ini raise Exception( "Channel %i has not been initialized or your are not the sender/signer of it" % self.args.channel_id) - def _smart_get_channel_for_org(self): + def channel_extend_and_add_funds_for_org(self): self._init_or_update_registered_org_if_needed() metadata = self._read_metadata_for_org(self.args.org_id) - return self._smart_get_initialized_channel_for_org(metadata, "sender") - - def channel_extend_and_add_funds_for_org(self): - channel_id = self._smart_get_channel_for_org()["channelId"] + channel_id = self._smart_get_channel_for_org(metadata, "sender")["channelId"] self._channel_extend_add_funds_with_channel_id(channel_id) def _get_all_initialized_channels(self): + # TODO: delete """ return dict of lists rez[(, )] = [(channel_id, channel_info)] """ channels_dict = defaultdict(list) for service_base_dir in self._get_persistent_mpe_dir().glob("*/*"): @@ -490,14 +542,53 @@ def _filter_channels_sender_or_signer(self, channels): return good_channels def print_initialized_channels(self): + # TODO: "print-initialized" command + # TODO: delete channels_dict = self._get_all_initialized_channels() self._print_channels_dict_from_blockchain(channels_dict) - def print_initialized_channels_filter_org(self): - channels = self._get_initialized_channels_for_org(self.args.org_id) - self._print_channels_dict_from_blockchain({self.args.org_id: channels}) + def _event_data_args_to_dict(self, event_data): + return { + "channel_id": event_data["channelId"], + "sender": event_data["sender"], + "signer": event_data["signer"], + "recipient": event_data["recipient"], + "group_id": event_data["groupId"], + } + + def _get_all_opened_channels_from_blockchain(self, starting_block_number, to_block_number): + mpe_address = self.get_mpe_address() + event_topics = self.ident.w3.keccak( + text="ChannelOpen(uint256,uint256,address,address,address,bytes32,uint256,uint256)").hex() + blocks_per_batch = 5000 + codec: ABICodec = self.ident.w3.codec + + logs = [] + from_block = starting_block_number + while from_block <= to_block_number: + to_block = min(from_block + blocks_per_batch, to_block_number) + logs += self.ident.w3.eth.get_logs({"fromBlock": from_block, + "toBlock": to_block, + "address": mpe_address, + "topics": event_topics}) + from_block = to_block + 1 + + abi = get_contract_def("MultiPartyEscrow") + event_abi = abi_get_element_by_name(abi, "ChannelOpen") + + event_data_list = [get_event_data(codec, event_abi, l)["args"] for l in logs] + channels_opened = list(map(self._event_data_args_to_dict, event_data_list)) + + return channels_opened + + def _get_filtered_channels(self, **kwargs): + channels = self._get_channels_from_cache() + for key, value in kwargs.items(): + channels = [c for c in channels if c[key] == value] + return channels def _get_all_filtered_channels(self, topics_without_signature): + # TODO: delete """ get all filtered chanels from blockchain logs """ mpe_address = self.get_mpe_address() event_signature = self.ident.w3.keccak( @@ -505,7 +596,7 @@ def _get_all_filtered_channels(self, topics_without_signature): codec: ABICodec = self.ident.w3.codec topics = [event_signature] + topics_without_signature logs = self.ident.w3.eth.get_logs( - {"fromBlock": self.args.from_block, "address": mpe_address, "topics": topics}) + {"fromBlock": get_contract_deployment_block(self.ident.w3, "MultiPartyEscrow"), "address": mpe_address, "topics": topics}) abi = get_contract_def("MultiPartyEscrow") event_abi = abi_get_element_by_name(abi, "ChannelOpen") channels_ids = [get_event_data(codec, event_abi, l)[ @@ -517,18 +608,24 @@ def get_address_from_arg_or_ident(self, arg): return arg return self.ident.address - def print_all_channels_filter_sender(self): + def print_channels_filter_org(self): + # TODO: "print-initialized-filter-org" command + # TODO: change to "print-filter_org" + channels = self._get_initialized_channels_for_org(self.args.org_id) + self._print_channels_dict_from_blockchain({self.args.org_id: channels}) + + def print_channels_filter_sender(self): address = self.get_address_from_arg_or_ident(self.args.sender) channels_ids = self._get_all_channels_filter_sender(address) self._print_channels_from_blockchain(channels_ids) - def print_all_channels_filter_recipient(self): + def print_channels_filter_recipient(self): address = self.get_address_from_arg_or_ident(self.args.recipient) address_padded = pad_hex(address.lower(), 256) channels_ids = self._get_all_filtered_channels([None, address_padded]) self._print_channels_from_blockchain(channels_ids) - def print_all_channels_filter_group(self): + def print_channels_filter_group(self): metadata = self._get_organization_metadata_from_registry( self.args.org_id) group_id = base64.b64decode( @@ -538,7 +635,7 @@ def print_all_channels_filter_group(self): [None, None, group_id_hex]) self._print_channels_from_blockchain(channels_ids) - def print_all_channels_filter_group_sender(self): + def print_channels_filter_group_sender(self): address = self.get_address_from_arg_or_ident(self.args.sender) address_padded = pad_hex(address.lower(), 256) metadata = self._get_organization_metadata_from_registry( @@ -550,6 +647,9 @@ def print_all_channels_filter_group_sender(self): [address_padded, None, group_id_hex]) self._print_channels_from_blockchain(channels_ids) + def print_all_channels(self): + pass + def _get_all_channels_filter_sender(self, sender): sender_padded = pad_hex(sender.lower(), 256) channels_ids = self._get_all_filtered_channels([sender_padded]) From f036279ddf87cc5b3e9caf028b8a874873aadfa6 Mon Sep 17 00:00:00 2001 From: Arondondon Date: Mon, 14 Oct 2024 16:11:51 +0300 Subject: [PATCH 02/10] Implemented cache updating and loading to output channels info. --- snet/cli/arguments.py | 19 +-- snet/cli/commands/mpe_channel.py | 241 ++++++++++++++++++------------- 2 files changed, 144 insertions(+), 116 deletions(-) diff --git a/snet/cli/arguments.py b/snet/cli/arguments.py index af55bd96..ff394aba 100644 --- a/snet/cli/arguments.py +++ b/snet/cli/arguments.py @@ -717,19 +717,8 @@ def add_p_set_for_extend_add(_p): def add_p_only_id(_p): _p.add_argument("--only-id", action='store_true', - help="Print only id of channels") - - def add_p_only_sender_signer(_p): - pm = _p.add_mutually_exclusive_group(required=False) - pm.add_argument("--filter-sender", - action='store_true', - help="Print only channels in which current identity is sender") - pm.add_argument("--filter-signer", - action='store_true', - help="Print only channels in which current identity is signer") - pm.add_argument("--filter-my", - action='store_true', - help="Print only channels in which current identity is sender or signer") + help="Print only id of channels", + default=False) def add_p_sender(_p): _p.add_argument("--sender", @@ -739,7 +728,8 @@ def add_p_sender(_p): def add_p_dont_sync_channels(_p): _p.add_argument("--do-not-sync", "-ds", action='store_true', - help="Print channels without synchronizing their state") + help="Print channels without synchronizing their state", + default=False) p = subparsers.add_parser("print-filter-sender", help="Print all channels for the given sender.") @@ -789,7 +779,6 @@ def add_p_dont_sync_channels(_p): p.set_defaults(fn="print_all_channels") add_p_registry_address_opt(p) add_p_only_id(p) - add_p_only_sender_signer(p) add_p_mpe_address_opt(p) add_eth_call_arguments(p) add_p_dont_sync_channels(p) diff --git a/snet/cli/commands/mpe_channel.py b/snet/cli/commands/mpe_channel.py index 73bf48b0..0b85456b 100644 --- a/snet/cli/commands/mpe_channel.py +++ b/snet/cli/commands/mpe_channel.py @@ -4,6 +4,7 @@ import shutil import tempfile from collections import defaultdict +from distutils.command.check import check from pathlib import Path from eth_abi.codec import ABICodec @@ -75,6 +76,68 @@ def _get_channels_from_cache(self): load_dict = pickle.load(f) return load_dict["channels"] + def _event_data_args_to_dict(self, event_data): + return { + "channel_id": event_data["channelId"], + "sender": event_data["sender"], + "signer": event_data["signer"], + "recipient": event_data["recipient"], + "group_id": event_data["groupId"], + } + + def _get_all_opened_channels_from_blockchain(self, starting_block_number, to_block_number): + mpe_address = self.get_mpe_address() + event_topics = [self.ident.w3.keccak( + text="ChannelOpen(uint256,uint256,address,address,address,bytes32,uint256,uint256)").hex()] + blocks_per_batch = 5000 + codec: ABICodec = self.ident.w3.codec + + logs = [] + from_block = starting_block_number + while from_block <= to_block_number: + to_block = min(from_block + blocks_per_batch, to_block_number) + logs += self.ident.w3.eth.get_logs({"fromBlock": from_block, + "toBlock": to_block, + "address": mpe_address, + "topics": event_topics}) + from_block = to_block + 1 + + abi = get_contract_def("MultiPartyEscrow") + event_abi = abi_get_element_by_name(abi, "ChannelOpen") + + event_data_list = [get_event_data(codec, event_abi, l)["args"] for l in logs] + channels_opened = list(map(self._event_data_args_to_dict, event_data_list)) + + return channels_opened + + def _get_filtered_channels(self, return_only_id=False, **kwargs): + channels = self._get_channels_from_cache() + for key, value in kwargs.items(): + if key == "group_id": + check_ch = lambda c: base64.b64encode(c[key]).decode("utf-8") == value + else: + check_ch = lambda c: c[key] == value + channels = [c for c in channels if check_ch(c)] + if return_only_id: + return [c["channel_id"] for c in channels] + return channels + + def _get_all_filtered_channels(self, topics_without_signature): + # TODO: delete + """ get all filtered chanels from blockchain logs """ + mpe_address = self.get_mpe_address() + event_signature = self.ident.w3.keccak( + text="ChannelOpen(uint256,uint256,address,address,address,bytes32,uint256,uint256)").hex() + codec: ABICodec = self.ident.w3.codec + topics = [event_signature] + topics_without_signature + logs = self.ident.w3.eth.get_logs( + {"fromBlock": get_contract_deployment_block(self.ident.w3, "MultiPartyEscrow"), "address": mpe_address, "topics": topics}) + abi = get_contract_def("MultiPartyEscrow") + event_abi = abi_get_element_by_name(abi, "ChannelOpen") + channels_ids = [get_event_data(codec, event_abi, l)[ + "args"]["channelId"] for l in logs] + return channels_ids + def _get_service_base_dir(self, org_id, service_id): """ get persistent storage for the given service (~/.snet/mpe_client/_///) """ return self._get_persistent_mpe_dir().joinpath(org_id, service_id) @@ -322,12 +385,9 @@ def _open_channel_for_org(self, metadata): def _initialize_already_opened_channel(self, metadata, sender, signer): - group_id = base64.b64decode( - metadata.get_group_id_by_group_name(self.args.group_name)) - recipient = metadata.get_payment_address_for_group( - self.args.group_name) - channels_ids = self._get_all_channels_filter_sender_recipient_group( - sender, recipient, group_id) + group_id = base64.b64decode(metadata.get_group_id_by_group_name(self.args.group_name)) + recipient = metadata.get_payment_address_for_group(self.args.group_name) + channels_ids = self._get_all_channels_filter_sender_recipient_group(sender, recipient, group_id) for i in sorted(channels_ids): channel = self._get_channel_state_from_blockchain(i) if channel["signer"].lower() == signer.lower(): @@ -360,12 +420,12 @@ def _open_init_channel_from_metadata(self, metadata, org_registration): self._add_channel_to_initialized(self.args.org_id, channel) def open_channel_from_metadata(self): - # TODO: "channel open-init-metadata" command + # TODO: "channel open-metadata" command metadata = OrganizationMetadata.from_file(self.args.metadata_file) self._open_init_channel_from_metadata(metadata, {}) def open_channel_from_registry(self): - # TODO: "channel open-init" command + # TODO: "channel open" command metadata = self._get_organization_metadata_from_registry( self.args.org_id) org_registration = self._get_organization_registration( @@ -481,21 +541,55 @@ def _read_metadata_for_org(self, org_id): "Service with org_id=%s is not initialized" % (org_id)) return OrganizationMetadata.from_file(sdir.joinpath("organization_metadata.json")) - def _print_channels_from_blockchain(self, channels_ids): - channels_ids = sorted(channels_ids) + def _convert_channel_dict_to_str(self, channel, filters=None): + if filters is None: + filters = [] + for key, value in channel.items(): + channel[key] = str(value) + # converting to string not using " ".join() to always have the same order + channel_as_str = "" + channel_as_str += channel["channel_id"] + channel_as_str += " " + channel["nonce"] if "nonce" in channel else "" + channel_as_str += " " + channel["sender"] if "sender" not in filters else "" + channel_as_str += " " + channel["signer"] + channel_as_str += " " + channel["recipient"] if "recipient" not in filters else "" + channel_as_str += " " + channel["group_id"] if "group_id" not in filters else "" + channel_as_str += " " + channel["value"] if "value" in channel else "" + channel_as_str += " " + channel["expiration"] if "expiration" in channel else "" + return channel_as_str + + + def _print_channels(self, channels, filters: list[str] = None): + if filters is None: + filters = [] + if self.args.only_id: - self._printout("#channelId") - [self._printout(str(i)) for i in channels_ids] + self._printout("#channel_id") + [self._printout(ch_id) for ch_id in channels] return - self._printout( - "#channelId nonce recipient groupId(base64) value(AGIX) expiration(blocks)") - for i in channels_ids: - channel = self._get_channel_state_from_blockchain(i) - value_agi = cogs2stragix(channel["value"]) - group_id_base64 = base64.b64encode( - channel["groupId"]).decode("ascii") - self._printout("%i %i %s %s %s %i" % (i, channel["nonce"], channel["recipient"], group_id_base64, - value_agi, channel["expiration"])) + + titles = ["channel_id", "nonce", "sender", "signer", "recipient", "group_id", "value", "expiration"] + for channel_filter in filters: + titles.remove(channel_filter) + + if self.args.do_not_sync: + titles.remove("nonce") + titles.remove("value") + titles.remove("expiration") + self._printout("#" + " ".join(titles)) + for channel in channels: + channel["group_id"] = base64.b64encode(channel["group_id"]).decode("ascii") + self._printout(self._convert_channel_dict_to_str(channel, filters)) + return + + channels_ids = sorted(channels) + self._printout("#" + " ".join(titles)) + for channel_id in channels_ids: + channel = self._get_channel_state_from_blockchain(channel_id) + channel["channel_id"] = channel_id + channel["value"] = cogs2stragix(channel["value"]) + channel["group_id"] = base64.b64encode(channel["groupId"]).decode("ascii") + self._printout(self._convert_channel_dict_to_str(channel, filters)) def _print_channels_dict_from_blockchain(self, channels_dict): # print only caption @@ -547,62 +641,6 @@ def print_initialized_channels(self): channels_dict = self._get_all_initialized_channels() self._print_channels_dict_from_blockchain(channels_dict) - def _event_data_args_to_dict(self, event_data): - return { - "channel_id": event_data["channelId"], - "sender": event_data["sender"], - "signer": event_data["signer"], - "recipient": event_data["recipient"], - "group_id": event_data["groupId"], - } - - def _get_all_opened_channels_from_blockchain(self, starting_block_number, to_block_number): - mpe_address = self.get_mpe_address() - event_topics = self.ident.w3.keccak( - text="ChannelOpen(uint256,uint256,address,address,address,bytes32,uint256,uint256)").hex() - blocks_per_batch = 5000 - codec: ABICodec = self.ident.w3.codec - - logs = [] - from_block = starting_block_number - while from_block <= to_block_number: - to_block = min(from_block + blocks_per_batch, to_block_number) - logs += self.ident.w3.eth.get_logs({"fromBlock": from_block, - "toBlock": to_block, - "address": mpe_address, - "topics": event_topics}) - from_block = to_block + 1 - - abi = get_contract_def("MultiPartyEscrow") - event_abi = abi_get_element_by_name(abi, "ChannelOpen") - - event_data_list = [get_event_data(codec, event_abi, l)["args"] for l in logs] - channels_opened = list(map(self._event_data_args_to_dict, event_data_list)) - - return channels_opened - - def _get_filtered_channels(self, **kwargs): - channels = self._get_channels_from_cache() - for key, value in kwargs.items(): - channels = [c for c in channels if c[key] == value] - return channels - - def _get_all_filtered_channels(self, topics_without_signature): - # TODO: delete - """ get all filtered chanels from blockchain logs """ - mpe_address = self.get_mpe_address() - event_signature = self.ident.w3.keccak( - text="ChannelOpen(uint256,uint256,address,address,address,bytes32,uint256,uint256)").hex() - codec: ABICodec = self.ident.w3.codec - topics = [event_signature] + topics_without_signature - logs = self.ident.w3.eth.get_logs( - {"fromBlock": get_contract_deployment_block(self.ident.w3, "MultiPartyEscrow"), "address": mpe_address, "topics": topics}) - abi = get_contract_def("MultiPartyEscrow") - event_abi = abi_get_element_by_name(abi, "ChannelOpen") - channels_ids = [get_event_data(codec, event_abi, l)[ - "args"]["channelId"] for l in logs] - return channels_ids - def get_address_from_arg_or_ident(self, arg): if arg: return arg @@ -610,45 +648,46 @@ def get_address_from_arg_or_ident(self, arg): def print_channels_filter_org(self): # TODO: "print-initialized-filter-org" command - # TODO: change to "print-filter_org" + # TODO: delete channels = self._get_initialized_channels_for_org(self.args.org_id) self._print_channels_dict_from_blockchain({self.args.org_id: channels}) def print_channels_filter_sender(self): + # we don't need to return other channel fields if we only need channel_id or if we'll sync channels state + return_only_id = self.args.only_id or not self.args.do_not_sync address = self.get_address_from_arg_or_ident(self.args.sender) - channels_ids = self._get_all_channels_filter_sender(address) - self._print_channels_from_blockchain(channels_ids) + channels = self._get_filtered_channels(return_only_id=return_only_id, sender=address) + self._print_channels(channels, ["sender"]) def print_channels_filter_recipient(self): + # we don't need to return other channel fields if we only need channel_id or if we'll sync channels state + return_only_id = self.args.only_id or not self.args.do_not_sync address = self.get_address_from_arg_or_ident(self.args.recipient) - address_padded = pad_hex(address.lower(), 256) - channels_ids = self._get_all_filtered_channels([None, address_padded]) - self._print_channels_from_blockchain(channels_ids) + channels = self._get_filtered_channels(return_only_id=return_only_id, recipient=address) + self._print_channels(channels, ["recipient"]) def print_channels_filter_group(self): - metadata = self._get_organization_metadata_from_registry( - self.args.org_id) - group_id = base64.b64decode( - metadata.get_group_id_by_group_name(self.args.group_name)) - group_id_hex = "0x" + group_id.hex() - channels_ids = self._get_all_filtered_channels( - [None, None, group_id_hex]) - self._print_channels_from_blockchain(channels_ids) + # we don't need to return other channel fields if we only need channel_id or if we'll sync channels state + return_only_id = self.args.only_id or not self.args.do_not_sync + metadata = self._get_organization_metadata_from_registry(self.args.org_id) + group_id = metadata.get_group_id_by_group_name(self.args.group_name) + channels = self._get_filtered_channels(return_only_id=return_only_id, group_id=group_id) + self._print_channels(channels, ["group_id"]) def print_channels_filter_group_sender(self): + # we don't need to return other channel fields if we only need channel_id or if we'll sync channels state + return_only_id = self.args.only_id or not self.args.do_not_sync address = self.get_address_from_arg_or_ident(self.args.sender) - address_padded = pad_hex(address.lower(), 256) - metadata = self._get_organization_metadata_from_registry( - self.args.org_id) - group_id = base64.b64decode( - metadata.get_group_id_by_group_name(self.args.group_name)) - group_id_hex = "0x" + group_id.hex() - channels_ids = self._get_all_filtered_channels( - [address_padded, None, group_id_hex]) - self._print_channels_from_blockchain(channels_ids) + metadata = self._get_organization_metadata_from_registry(self.args.org_id) + group_id = metadata.get_group_id_by_group_name(self.args.group_name) + channels = self._get_filtered_channels(return_only_id=return_only_id, sender=address, group_id=group_id) + self._print_channels(channels, ["sender", "group_id"]) def print_all_channels(self): - pass + # we don't need to return other channel fields if we only need channel_id or if we'll sync channels state + return_only_id = self.args.only_id or not self.args.do_not_sync + channels = self._get_filtered_channels(return_only_id=return_only_id) + self._print_channels(channels) def _get_all_channels_filter_sender(self, sender): sender_padded = pad_hex(sender.lower(), 256) From bd00e33ab8ba8f9ae45ae91a0785d85de16e6118 Mon Sep 17 00:00:00 2001 From: Arondondon Date: Wed, 16 Oct 2024 13:18:34 +0300 Subject: [PATCH 03/10] Finished changes in "snet channel" section. --- snet/cli/arguments.py | 5 +- snet/cli/commands/mpe_channel.py | 101 +++++++++++++++++-------------- 2 files changed, 59 insertions(+), 47 deletions(-) diff --git a/snet/cli/arguments.py b/snet/cli/arguments.py index d9542dbb..5a731e4c 100644 --- a/snet/cli/arguments.py +++ b/snet/cli/arguments.py @@ -1162,7 +1162,6 @@ def add_p_publish_params(_p): _p.add_argument("--update-mpe-address", action='store_true', help="Update mpe_address in metadata before publishing them") - add_p_storage_param(p) add_p_mpe_address_opt(_p) p = subparsers.add_parser("publish", @@ -1171,7 +1170,8 @@ def add_p_publish_params(_p): add_p_publish_params(p) add_p_service_in_registry(p) add_transaction_arguments(p) - + add_p_storage_param(p) + p = subparsers.add_parser("publish-in-ipfs", help="Publish metadata only in IPFS, without publishing in Registry") p.set_defaults(fn="publish_metadata_in_ipfs") @@ -1190,6 +1190,7 @@ def add_p_publish_params(_p): add_p_publish_params(p) add_p_service_in_registry(p) add_transaction_arguments(p) + add_p_storage_param(p) p = subparsers.add_parser("update-add-tags", help="Add tags to existed service registration") diff --git a/snet/cli/commands/mpe_channel.py b/snet/cli/commands/mpe_channel.py index 53d934e9..eb5c5412 100644 --- a/snet/cli/commands/mpe_channel.py +++ b/snet/cli/commands/mpe_channel.py @@ -117,7 +117,7 @@ def _get_filtered_channels(self, return_only_id=False, **kwargs): check_ch = lambda c: base64.b64encode(c[key]).decode("utf-8") == value else: check_ch = lambda c: c[key] == value - channels = [c for c in channels if check_ch(c)] + channels = [ch for ch in channels if check_ch(ch)] if return_only_id: return [c["channel_id"] for c in channels] return channels @@ -351,28 +351,26 @@ def _get_expiration_from_args(self): return rez def _open_channel_for_org(self, metadata): + # TODO: needed mpe_cogs = self.call_contract_command( "MultiPartyEscrow", "balances", [self.ident.address]) if mpe_cogs < self.args.amount: raise Exception( "insufficient funds. You MPE balance is %s AGIX " % cogs2stragix(mpe_cogs)) - group_id = base64.b64decode( - metadata.get_group_id_by_group_name(self.args.group_name)) + group_id = base64.b64decode(metadata.get_group_id_by_group_name(self.args.group_name)) if not group_id: - raise Exception( - "group %s is associated with organization", self.args.group_name) + raise Exception("group %s is associated with organization", self.args.group_name) - recipient = metadata.get_payment_address_for_group( - self.args.group_name) + recipient = metadata.get_payment_address_for_group(self.args.group_name) signer = self.get_address_from_arg_or_ident(self.args.signer) channel_info = {"sender": self.ident.address, "signer": signer, - "recipient": recipient, "groupId": group_id} + "recipient": recipient, "group_id": group_id} expiration = self._get_expiration_from_args() params = [channel_info["signer"], channel_info["recipient"], - channel_info["groupId"], self.args.amount, expiration] + channel_info["group_id"], self.args.amount, expiration] rez = self.transact_contract_command( "MultiPartyEscrow", "openChannel", params) @@ -380,11 +378,11 @@ def _open_channel_for_org(self, metadata): raise Exception( "We've expected only one ChannelOpen event after openChannel. Make sure that you use correct MultiPartyEscrow address") - channel_info["channelId"] = rez[1][0]["args"]["channelId"] + channel_info["channel_id"] = rez[1][0]["args"]["channelId"] return channel_info def _initialize_already_opened_channel(self, metadata, sender, signer): - + # TODO: delete group_id = base64.b64decode(metadata.get_group_id_by_group_name(self.args.group_name)) recipient = metadata.get_payment_address_for_group(self.args.group_name) channels_ids = self._get_all_channels_filter_sender_recipient_group(sender, recipient, group_id) @@ -398,31 +396,42 @@ def _initialize_already_opened_channel(self, metadata, sender, signer): return channel return None - def _open_init_channel_from_metadata(self, metadata, org_registration): - # TODO: change to not init + def _check_already_opened_channel(self, metadata, sender, signer): + group_id = metadata.get_group_id_by_group_name(self.args.group_name) + recipient = metadata.get_payment_address_for_group(self.args.group_name) + channels = self._get_filtered_channels(sender=sender, recipient=recipient, group_id=group_id) + + for i in channels: + channel = self._get_channel_state_from_blockchain(i) + if channel["signer"].lower() == signer.lower(): + self._printerr( + "# Channel with given sender, signer and group_id is already exists. (channel_id = %i)" + % channel["channel_id"]) + return channel + + return None + + def _open_channel_from_metadata(self, metadata, org_registration): + # TODO: needed self._init_or_update_org_if_needed(metadata, org_registration) # Before open new channel we try to find already opened channel if not self.args.open_new_anyway: sender = self.ident.address signer = self.get_address_from_arg_or_ident(self.args.signer) - channel = self._initialize_already_opened_channel( - metadata, sender, signer) + channel = self._check_already_opened_channel(metadata, sender, signer) if channel is not None: return # open payment channel channel = self._open_channel_for_org(metadata) self._printout("#channel_id") - self._printout(channel["channelId"]) - - # initialize channel - self._add_channel_to_initialized(self.args.org_id, channel) + self._printout(channel["channel_id"]) def open_channel_from_metadata(self): - # TODO: "channel open-metadata" command + # TODO: "channel open-from-metadata" command metadata = OrganizationMetadata.from_file(self.args.metadata_file) - self._open_init_channel_from_metadata(metadata, {}) + self._open_channel_from_metadata(metadata, {}) def open_channel_from_registry(self): # TODO: "channel open" command @@ -430,17 +439,19 @@ def open_channel_from_registry(self): self.args.org_id) org_registration = self._get_organization_registration( self.args.org_id) - self._open_init_channel_from_metadata(metadata, org_registration) + self._open_channel_from_metadata(metadata, org_registration) def channel_claim_timeout(self): rez = self._get_channel_state_from_blockchain(self.args.channel_id) - if rez["value"] == 0: + if rez["expiration"] >= self.ident.w3.eth.block_number: + raise Exception("Channel is not expired yet") + elif rez["value"] == 0: raise Exception("Channel has 0 value. There is nothing to claim") self.transact_contract_command( "MultiPartyEscrow", "channelClaimTimeout", [self.args.channel_id]) def channel_claim_timeout_all(self): - channels_ids = self._get_all_channels_filter_sender(self.ident.address) + channels_ids = self._get_filtered_channels(return_only_id=True, sender=self.ident.address) for channel_id in channels_ids: response = self._get_channel_state_from_blockchain(channel_id) if response["value"] > 0 and response["expiration"] < self.ident.w3.eth.block_number: @@ -457,7 +468,7 @@ def _channel_extend_add_funds_with_channel_id(self, channel_id): return expiration = self._get_expiration_from_args() - self.check_new_expiration_from_blockchain(channel_id, expiration) + self._check_new_expiration_from_blockchain(channel_id, expiration) # only extend channel (if --amount hasn't been specified) if self.args.amount is None: self.transact_contract_command("MultiPartyEscrow", "channelExtend", [ @@ -471,42 +482,40 @@ def _channel_extend_add_funds_with_channel_id(self, channel_id): def channel_extend_and_add_funds(self): self._channel_extend_add_funds_with_channel_id(self.args.channel_id) - def check_new_expiration_from_blockchain(self, channel_id, new_expiration): + def _check_new_expiration_from_blockchain(self, channel_id, new_expiration): channel = self._get_channel_state_from_blockchain(channel_id) if new_expiration < channel["expiration"]: raise Exception("New expiration (%i) is smaller then old one (%i)" % ( new_expiration, channel["expiration"])) - def _smart_get_channel_for_org(self, metadata, filter_by, is_try_initailize=True): - # TODO: change to not take from initialized + def _smart_get_channel_for_org(self, metadata, filter_by): + # TODO: needed ''' - filter_by can be sender or signer ''' - channels = self._get_initialized_channels_for_org(self.args.org_id) - group_id = base64.b64decode(metadata.get_group_id_by_group_name(self.args.group_name)) - channels = [c for c in channels - if c[filter_by].lower() == self.ident.address.lower() and c["groupId"] == group_id] - - if len(channels) == 0 and is_try_initailize: - # this will work only in simple case where signer == sender - self._initialize_already_opened_channel( - metadata, self.ident.address, self.ident.address) - return self._smart_get_channel_for_org(metadata, filter_by, is_try_initailize=False) + recipient = metadata.get_payment_address_for_group(self.args.group_name) + group_id = metadata.get_group_id_by_group_name(self.args.group_name) + channels = self._get_filtered_channels(return_only_id=False, recipient=recipient, group_id=group_id) + channels = [c for c in channels if c[filter_by].lower() == self.ident.address.lower()] if len(channels) == 0: - raise Exception("Cannot find initialized channel for service with org_id=%s service_id=%s and signer=%s" % ( - self.args.org_id, self.args.service_id, self.ident.address)) + if self.args.service_id: + raise Exception("Cannot find channel for service with org_id=%s service_id=%s group_name=%s and signer=%s" % ( + self.args.org_id, self.args.service_id, self.args.group_name, self.ident.address)) + else: + raise Exception("Cannot find channel for org_id=%s group_name=%s and signer=%s" % ( + self.args.org_id, self.args.group_name, self.ident.address)) if self.args.channel_id is None: if len(channels) > 1: - channel_ids = [channel["channelId"] for channel in channels] + channel_ids = [channel["channel_id"] for channel in channels] raise Exception( - "We have several initialized channel: %s. You should use --channel-id to select one" % str(channel_ids)) + "We have several channels: %s. You should use --channel-id to select one" % str(channel_ids)) return channels[0] for channel in channels: - if channel["channelId"] == self.args.channel_id: + if channel["channel_id"] == self.args.channel_id: return channel raise Exception( - "Channel %i has not been initialized or your are not the sender/signer of it" % self.args.channel_id) + "Channel %i has not been opened or you are not the sender/signer of it" % self.args.channel_id) def channel_extend_and_add_funds_for_org(self): self._init_or_update_registered_org_if_needed() @@ -690,11 +699,13 @@ def print_all_channels(self): self._print_channels(channels) def _get_all_channels_filter_sender(self, sender): + # TODO: delete sender_padded = pad_hex(sender.lower(), 256) - channels_ids = self._get_all_filtered_channels([sender_padded]) + channels_ids = self._get_filtered_channels(return_only_id=True, sender=sender_padded) return channels_ids def _get_all_channels_filter_sender_recipient_group(self, sender, recipient, group_id): + # TODO: delete sender_padded = pad_hex(sender.lower(), 256) recipient_padded = pad_hex(recipient.lower(), 256) group_id_hex = "0x" + group_id.hex() From d2db5f4220e8a05136f34ee1c16398ffd825b56f Mon Sep 17 00:00:00 2001 From: Arondondon Date: Wed, 16 Oct 2024 13:53:38 +0300 Subject: [PATCH 04/10] Changed "client call" command. Deleted unnecessary methods in mpe_channel.py. --- snet/cli/commands/mpe_channel.py | 170 +---------------------------- snet/cli/commands/mpe_client.py | 7 +- snet/cli/commands/mpe_treasurer.py | 3 +- 3 files changed, 4 insertions(+), 176 deletions(-) diff --git a/snet/cli/commands/mpe_channel.py b/snet/cli/commands/mpe_channel.py index eb5c5412..2ec326e7 100644 --- a/snet/cli/commands/mpe_channel.py +++ b/snet/cli/commands/mpe_channel.py @@ -122,22 +122,6 @@ def _get_filtered_channels(self, return_only_id=False, **kwargs): return [c["channel_id"] for c in channels] return channels - def _get_all_filtered_channels(self, topics_without_signature): - # TODO: delete - """ get all filtered chanels from blockchain logs """ - mpe_address = self.get_mpe_address() - event_signature = self.ident.w3.keccak( - text="ChannelOpen(uint256,uint256,address,address,address,bytes32,uint256,uint256)").hex() - codec: ABICodec = self.ident.w3.codec - topics = [event_signature] + topics_without_signature - logs = self.ident.w3.eth.get_logs( - {"fromBlock": get_contract_deployment_block(self.ident.w3, "MultiPartyEscrow"), "address": mpe_address, "topics": topics}) - abi = get_contract_def("MultiPartyEscrow") - event_abi = abi_get_element_by_name(abi, "ChannelOpen") - channels_ids = [get_event_data(codec, event_abi, l)[ - "args"]["channelId"] for l in logs] - return channels_ids - def _get_service_base_dir(self, org_id, service_id): """ get persistent storage for the given service (~/.snet/mpe_client/_///) """ return self._get_persistent_mpe_dir().joinpath(org_id, service_id) @@ -163,7 +147,6 @@ def is_service_initialized(self): def _get_org_base_dir(self, org_id): """ get persistent storage for the given service (~/.snet/mpe_client/_//) """ return self._get_persistent_mpe_dir().joinpath(org_id) - # def get_org_spec_dir(self, org_id): """ get persistent storage for the given service (~/.snet/mpe_client///) """ @@ -179,28 +162,6 @@ def _get_service_metadata(self): os.path.join(service_dir, "service_metadata.json")) return service_metadata - def _add_channel_to_initialized(self, org_id, channel): - channels_dict = self._get_initialized_channels_dict_for_org(org_id) - channels_dict[channel["channelId"]] = channel - - # replace old file atomically (os.rename is more or less atomic) - tmp = tempfile.NamedTemporaryFile(delete=False) - pickle.dump(channels_dict, open(tmp.name, "wb")) - shutil.move(tmp.name, self._get_channels_info_file(org_id)) - - def _get_initialized_channels_dict_for_org(self, org_id): - '''return {channel_id: channel}''' - fn = self._get_channels_info_file(org_id) - if os.path.isfile(fn): - return pickle.load(open(fn, "rb")) - else: - return {} - - def _get_initialized_channels_for_org(self, org_id): - '''return [channel]''' - channels_dict = self._get_initialized_channels_dict_for_org(org_id) - return list(channels_dict.values()) - def _get_org_info_file(self, org_id): return os.path.join(self._get_org_base_dir(org_id), "org_info.pickle") @@ -290,36 +251,6 @@ def _check_channel_is_mine(self, channel): raise Exception("Channel does not correspond to the current Ethereum identity " + "(address=%s sender=%s signer=%s)" % (self.ident.address.lower(), channel["sender"].lower(), channel["signer"].lower())) - def _init_channel_from_metadata(self, metadata, org_registration): - # TODO: delete - channel_id = self.args.channel_id - channel = self._get_channel_state_from_blockchain(channel_id) - self._check_channel_is_mine(channel) - group_id = metadata.get_group_id_by_group_name(self.args.group_name) - if group_id is None: - raise Exception("Channel %i does not correspond to the given metadata.\n" % channel_id + - "We can't find the following group_id in metadata: " + self.args.group_name) - - self._printout("#group_name") - # self._printout(group.group_name) - self._init_or_update_org_if_needed(metadata, org_registration) - self._add_channel_to_initialized(self.args.org_id, channel) - - def init_channel_from_metadata(self): - # TODO: "channel init-metadata" command - # TODO: delete - metadata = OrganizationMetadata.from_file(self.args.metadata_file) - self._init_channel_from_metadata(metadata, {}) - - def init_channel_from_registry(self): - # TODO: "channel init" command - # TODO: delete - metadata = self._get_organization_metadata_from_registry( - self.args.org_id) - org_registration = self._get_organization_registration( - self.args.org_id) - self._init_channel_from_metadata(metadata, org_registration) - def _expiration_str_to_blocks(self, expiration_str, current_block): s = expiration_str if s.startswith("+") and s.endswith("days"): @@ -351,7 +282,7 @@ def _get_expiration_from_args(self): return rez def _open_channel_for_org(self, metadata): - # TODO: needed + mpe_cogs = self.call_contract_command( "MultiPartyEscrow", "balances", [self.ident.address]) if mpe_cogs < self.args.amount: @@ -381,21 +312,6 @@ def _open_channel_for_org(self, metadata): channel_info["channel_id"] = rez[1][0]["args"]["channelId"] return channel_info - def _initialize_already_opened_channel(self, metadata, sender, signer): - # TODO: delete - group_id = base64.b64decode(metadata.get_group_id_by_group_name(self.args.group_name)) - recipient = metadata.get_payment_address_for_group(self.args.group_name) - channels_ids = self._get_all_channels_filter_sender_recipient_group(sender, recipient, group_id) - for i in sorted(channels_ids): - channel = self._get_channel_state_from_blockchain(i) - if channel["signer"].lower() == signer.lower(): - self._printerr( - "# Channel with given sender, signer and group_id is already exists we simply initialize it (channel_id = %i)" % channel["channelId"]) -# self._printerr("# Please run 'snet channel extend-add %i --expiration --amount ' if necessary"%channel["channelId"]) - self._add_channel_to_initialized(self.args.org_id, channel) - return channel - return None - def _check_already_opened_channel(self, metadata, sender, signer): group_id = metadata.get_group_id_by_group_name(self.args.group_name) recipient = metadata.get_payment_address_for_group(self.args.group_name) @@ -412,7 +328,6 @@ def _check_already_opened_channel(self, metadata, sender, signer): return None def _open_channel_from_metadata(self, metadata, org_registration): - # TODO: needed self._init_or_update_org_if_needed(metadata, org_registration) # Before open new channel we try to find already opened channel @@ -429,12 +344,10 @@ def _open_channel_from_metadata(self, metadata, org_registration): self._printout(channel["channel_id"]) def open_channel_from_metadata(self): - # TODO: "channel open-from-metadata" command metadata = OrganizationMetadata.from_file(self.args.metadata_file) self._open_channel_from_metadata(metadata, {}) def open_channel_from_registry(self): - # TODO: "channel open" command metadata = self._get_organization_metadata_from_registry( self.args.org_id) org_registration = self._get_organization_registration( @@ -489,7 +402,6 @@ def _check_new_expiration_from_blockchain(self, channel_id, new_expiration): new_expiration, channel["expiration"])) def _smart_get_channel_for_org(self, metadata, filter_by): - # TODO: needed ''' - filter_by can be sender or signer ''' @@ -523,17 +435,6 @@ def channel_extend_and_add_funds_for_org(self): channel_id = self._smart_get_channel_for_org(metadata, "sender")["channelId"] self._channel_extend_add_funds_with_channel_id(channel_id) - def _get_all_initialized_channels(self): - # TODO: delete - """ return dict of lists rez[(, )] = [(channel_id, channel_info)] """ - channels_dict = defaultdict(list) - for service_base_dir in self._get_persistent_mpe_dir().glob("*/*"): - org_id = service_base_dir.parent.name - channels = self._get_initialized_channels_for_org(org_id) - if channels: - channels_dict[org_id] = channels - return channels_dict - def _get_channel_state_from_blockchain(self, channel_id): abi = get_contract_def("MultiPartyEscrow") channel_abi = abi_get_element_by_name(abi, "channels") @@ -600,67 +501,11 @@ def _print_channels(self, channels, filters: list[str] = None): channel["group_id"] = base64.b64encode(channel["groupId"]).decode("ascii") self._printout(self._convert_channel_dict_to_str(channel, filters)) - def _print_channels_dict_from_blockchain(self, channels_dict): - # print only caption - if self.args.only_id: - self._printout("#organization_id service_id channelId") - else: - self._printout( - "#organization_id service_id group_name channel_id nonce value(AGIX) expiration(blocks)") - for org_id in channels_dict: - channels = self._filter_channels_sender_or_signer( - channels_dict[org_id]) - metadata = self._read_metadata_for_org(org_id) - for channel in channels: - channel_id = channel["channelId"] - group_id_base64 = base64.b64encode( - channel["groupId"]).decode('ascii') - group = metadata.get_group_by_group_id(group_id_base64) - if group is None: - group_name = "UNDEFINED" - else: - group_name = group.group_name - if self.args.only_id: - self._printout("%s %s %i" % - (org_id, group_name, channel_id)) - else: - channel_blockchain = self._get_channel_state_from_blockchain( - channel_id) - value_agi = cogs2stragix(channel_blockchain["value"]) - self._printout("%s %s %i %i %s %i" % (org_id, group_name, channel_id, - channel_blockchain["nonce"], value_agi, channel_blockchain["expiration"])) - - def _filter_channels_sender_or_signer(self, channels): - good_channels = [] - for channel in channels: - not_sender = channel["sender"] != self.ident.address - not_signer = channel["signer"] != self.ident.address - if self.args.filter_sender and not_sender: - continue - if self.args.filter_signer and not_signer: - continue - if self.args.filter_my and not_sender and not_signer: - continue - good_channels.append(channel) - return good_channels - - def print_initialized_channels(self): - # TODO: "print-initialized" command - # TODO: delete - channels_dict = self._get_all_initialized_channels() - self._print_channels_dict_from_blockchain(channels_dict) - def get_address_from_arg_or_ident(self, arg): if arg: return arg return self.ident.address - def print_channels_filter_org(self): - # TODO: "print-initialized-filter-org" command - # TODO: delete - channels = self._get_initialized_channels_for_org(self.args.org_id) - self._print_channels_dict_from_blockchain({self.args.org_id: channels}) - def print_channels_filter_sender(self): # we don't need to return other channel fields if we only need channel_id or if we'll sync channels state return_only_id = self.args.only_id or not self.args.do_not_sync @@ -698,19 +543,6 @@ def print_all_channels(self): channels = self._get_filtered_channels(return_only_id=return_only_id) self._print_channels(channels) - def _get_all_channels_filter_sender(self, sender): - # TODO: delete - sender_padded = pad_hex(sender.lower(), 256) - channels_ids = self._get_filtered_channels(return_only_id=True, sender=sender_padded) - return channels_ids - - def _get_all_channels_filter_sender_recipient_group(self, sender, recipient, group_id): - # TODO: delete - sender_padded = pad_hex(sender.lower(), 256) - recipient_padded = pad_hex(recipient.lower(), 256) - group_id_hex = "0x" + group_id.hex() - return self._get_all_filtered_channels([sender_padded, recipient_padded, group_id_hex]) - # Auxilary functions def print_block_number(self): self._printout(self.ident.w3.eth.block_number) diff --git a/snet/cli/commands/mpe_client.py b/snet/cli/commands/mpe_client.py index 8a508835..64b9de66 100644 --- a/snet/cli/commands/mpe_client.py +++ b/snet/cli/commands/mpe_client.py @@ -295,11 +295,8 @@ def call_server_statelessly_with_params(self, params, group_name): grpc_channel = open_grpc_channel(endpoint) # if channel was not initilized we will try to initailize it (it will work only in simple case of signer == sender) - channel = self._smart_get_initialized_channel_for_org( - org_metadata, - filter_by="signer" - ) - channel_id = channel["channelId"] + channel = self._smart_get_channel_for_org(org_metadata, filter_by="signer") + channel_id = channel["channel_id"] price = self._get_price_from_metadata(service_metadata, group_name) server_state = self._get_channel_state_from_server(grpc_channel, channel_id) diff --git a/snet/cli/commands/mpe_treasurer.py b/snet/cli/commands/mpe_treasurer.py index 9e173414..db1d959b 100644 --- a/snet/cli/commands/mpe_treasurer.py +++ b/snet/cli/commands/mpe_treasurer.py @@ -151,8 +151,7 @@ def _claim_in_progress_and_claim_channels(self, grpc_channel, channels): def claim_channels(self): grpc_channel = open_grpc_channel(self.args.endpoint) - self._claim_in_progress_and_claim_channels( - grpc_channel, self.args.channels) + self._claim_in_progress_and_claim_channels(grpc_channel, self.args.channels) def claim_all_channels(self): grpc_channel = open_grpc_channel(self.args.endpoint) From 8541f4d6132ede458768ebb3e9d8237b97cbe222 Mon Sep 17 00:00:00 2001 From: Arondondon Date: Wed, 16 Oct 2024 18:21:42 +0300 Subject: [PATCH 05/10] Fixed "print" commands in "channel" section. --- snet/cli/commands/mpe_channel.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/snet/cli/commands/mpe_channel.py b/snet/cli/commands/mpe_channel.py index 2ec326e7..507534a6 100644 --- a/snet/cli/commands/mpe_channel.py +++ b/snet/cli/commands/mpe_channel.py @@ -511,6 +511,7 @@ def print_channels_filter_sender(self): return_only_id = self.args.only_id or not self.args.do_not_sync address = self.get_address_from_arg_or_ident(self.args.sender) channels = self._get_filtered_channels(return_only_id=return_only_id, sender=address) + self._printout("Channels for sender: %s" % address) self._print_channels(channels, ["sender"]) def print_channels_filter_recipient(self): @@ -518,24 +519,29 @@ def print_channels_filter_recipient(self): return_only_id = self.args.only_id or not self.args.do_not_sync address = self.get_address_from_arg_or_ident(self.args.recipient) channels = self._get_filtered_channels(return_only_id=return_only_id, recipient=address) + self._printout("Channels for recipient: %s" % address) self._print_channels(channels, ["recipient"]) def print_channels_filter_group(self): # we don't need to return other channel fields if we only need channel_id or if we'll sync channels state return_only_id = self.args.only_id or not self.args.do_not_sync metadata = self._get_organization_metadata_from_registry(self.args.org_id) + recipient = metadata.get_payment_address_for_group(self.args.group_name) group_id = metadata.get_group_id_by_group_name(self.args.group_name) channels = self._get_filtered_channels(return_only_id=return_only_id, group_id=group_id) - self._print_channels(channels, ["group_id"]) + self._printout("Channels for group_id: %s and recipient: %s" % (group_id, recipient)) + self._print_channels(channels, ["group_id", "recipient"]) def print_channels_filter_group_sender(self): # we don't need to return other channel fields if we only need channel_id or if we'll sync channels state return_only_id = self.args.only_id or not self.args.do_not_sync - address = self.get_address_from_arg_or_ident(self.args.sender) + sender = self.get_address_from_arg_or_ident(self.args.sender) metadata = self._get_organization_metadata_from_registry(self.args.org_id) group_id = metadata.get_group_id_by_group_name(self.args.group_name) - channels = self._get_filtered_channels(return_only_id=return_only_id, sender=address, group_id=group_id) - self._print_channels(channels, ["sender", "group_id"]) + recipient = metadata.get_payment_address_for_group(self.args.group_name) + channels = self._get_filtered_channels(return_only_id=return_only_id, sender=sender, group_id=group_id) + self._printout("Channels for group_id: %s, sender: %s and recipient: %s" % (group_id, sender, recipient)) + self._print_channels(channels, ["sender", "group_id", "recipient"]) def print_all_channels(self): # we don't need to return other channel fields if we only need channel_id or if we'll sync channels state From f694f6aa42b3b9d7b793874a65817dce9627f254 Mon Sep 17 00:00:00 2001 From: Arondondon Date: Tue, 22 Oct 2024 19:57:22 +0300 Subject: [PATCH 06/10] Removed deprecated functionality --- snet/cli/arguments.py | 14 +++---------- snet/cli/commands/sdk_command.py | 5 ++--- snet/cli/resources/proto/merckledag.proto | 17 --------------- snet/cli/resources/proto/unixfs.proto | 25 ----------------------- snet/cli/utils/utils.py | 25 +++++------------------ 5 files changed, 10 insertions(+), 76 deletions(-) delete mode 100644 snet/cli/resources/proto/merckledag.proto delete mode 100644 snet/cli/resources/proto/unixfs.proto diff --git a/snet/cli/arguments.py b/snet/cli/arguments.py index 5a731e4c..4ac9e909 100644 --- a/snet/cli/arguments.py +++ b/snet/cli/arguments.py @@ -185,9 +185,6 @@ def add_network_options(parser, config): help="Name of network to create") p.add_argument("eth_rpc_endpoint", help="Ethereum rpc endpoint") - p.add_argument("--default-gas-price", - default="medium", - help="Default gas price (in wei) or gas price strategy ('fast' ~1min, 'medium' ~5min or 'slow' ~60min), default is 'medium'") p.add_argument("--skip-check", action="store_true", help="Skip check that eth_rpc_endpoint is valid") @@ -236,6 +233,8 @@ def add_contract_options(parser): contracts = get_all_abi_contract_files() for path in contracts: + if "TokenConversionManager" in str(path) or "TokenStake" in str(path): + continue contract_name = re.search( r"([^.]*)\.json", os.path.basename(path)).group(1) contract_p = subparsers.add_parser( @@ -1323,16 +1322,9 @@ def add_sdk_options(parser): subparsers = parser.add_subparsers(title="Commands", metavar="COMMAND") subparsers.required = True - supported_languages = ["python", "nodejs"] - p = subparsers.add_parser("generate-client-library", - help="Generate compiled client libraries to call services using your language of choice") + help="Generate compiled client libraries to call services using Python") p.set_defaults(fn="generate_client_library") - p.add_argument("language", - choices=supported_languages, - help="Choose target language for the generated client library from {}".format( - supported_languages), - metavar="LANGUAGE") add_p_service_in_registry(p) p.add_argument("protodir", nargs="?", diff --git a/snet/cli/commands/sdk_command.py b/snet/cli/commands/sdk_command.py index 2a6c5cbd..fae953ac 100644 --- a/snet/cli/commands/sdk_command.py +++ b/snet/cli/commands/sdk_command.py @@ -17,11 +17,10 @@ def generate_client_library(self): os.makedirs(client_libraries_base_dir_path, exist_ok=True) # Create service client libraries path - library_language = self.args.language library_org_id = self.args.org_id library_service_id = self.args.service_id - library_dir_path = client_libraries_base_dir_path.joinpath(library_org_id, library_service_id, library_language) + library_dir_path = client_libraries_base_dir_path.joinpath(library_org_id, library_service_id, "python") metadata = self._get_service_metadata_from_registry() service_api_source = metadata.get("service_api_source") or metadata.get("model_ipfs_hash") @@ -30,7 +29,7 @@ def generate_client_library(self): download_and_safe_extract_proto(service_api_source, library_dir_path, self._get_ipfs_client()) # Compile proto files - compile_proto(Path(library_dir_path), library_dir_path, target_language=self.args.language) + compile_proto(Path(library_dir_path), library_dir_path) self._printout( 'client libraries for service with id "{}" in org with id "{}" generated at {}'.format(library_service_id, diff --git a/snet/cli/resources/proto/merckledag.proto b/snet/cli/resources/proto/merckledag.proto deleted file mode 100644 index 5af078a5..00000000 --- a/snet/cli/resources/proto/merckledag.proto +++ /dev/null @@ -1,17 +0,0 @@ -syntax = "proto2"; -// An IPFS MerkleDAG Link -message MerkleLink { - required bytes Hash = 1; // multihash of the target object - required string Name = 2; // utf string name - required uint64 Tsize = 3; // cumulative size of target object - - // user extensions start at 50 -} - -// An IPFS MerkleDAG Node -message MerkleNode { - repeated MerkleLink Links = 2; // refs to other objects - required bytes Data = 1; // opaque user data - - // user extensions start at 50 -} \ No newline at end of file diff --git a/snet/cli/resources/proto/unixfs.proto b/snet/cli/resources/proto/unixfs.proto deleted file mode 100644 index c190079a..00000000 --- a/snet/cli/resources/proto/unixfs.proto +++ /dev/null @@ -1,25 +0,0 @@ -syntax = "proto2"; -package unixfs.pb; - -message Data { - enum DataType { - Raw = 0; - Directory = 1; - File = 2; - Metadata = 3; - Symlink = 4; - HAMTShard = 5; - } - - required DataType Type = 1; - optional bytes Data = 2; - optional uint64 filesize = 3; - repeated uint64 blocksizes = 4; - - optional uint64 hashType = 5; - optional uint64 fanout = 6; -} - -message Metadata { - optional string MimeType = 1; -} \ No newline at end of file diff --git a/snet/cli/utils/utils.py b/snet/cli/utils/utils.py index b69e60ba..b2b38479 100644 --- a/snet/cli/utils/utils.py +++ b/snet/cli/utils/utils.py @@ -146,7 +146,7 @@ def get_cli_version(): return distribution("snet.cli").version -def compile_proto(entry_path, codegen_dir, proto_file=None, target_language="python"): +def compile_proto(entry_path, codegen_dir, proto_file=None): try: if not os.path.exists(codegen_dir): os.makedirs(codegen_dir) @@ -157,25 +157,10 @@ def compile_proto(entry_path, codegen_dir, proto_file=None, target_language="pyt "-I{}".format(proto_include) ] - if target_language == "python": - compiler_args.insert(0, "protoc") - compiler_args.append("--python_out={}".format(codegen_dir)) - compiler_args.append("--grpc_python_out={}".format(codegen_dir)) - compiler = protoc - elif target_language == "nodejs": - protoc_node_compiler_path = Path( - RESOURCES_PATH.joinpath("node_modules").joinpath("grpc-tools").joinpath("bin").joinpath( - "protoc.js")).absolute() - grpc_node_plugin_path = Path( - RESOURCES_PATH.joinpath("node_modules").joinpath("grpc-tools").joinpath("bin").joinpath( - "grpc_node_plugin")).resolve() - if not os.path.isfile(protoc_node_compiler_path) or not os.path.isfile(grpc_node_plugin_path): - print("Missing required node.js protoc compiler. Retrieving from npm...") - subprocess.run(["npm", "install"], cwd=RESOURCES_PATH) - compiler_args.append("--js_out=import_style=commonjs,binary:{}".format(codegen_dir)) - compiler_args.append("--grpc_out={}".format(codegen_dir)) - compiler_args.append("--plugin=protoc-gen-grpc={}".format(grpc_node_plugin_path)) - compiler = lambda args: subprocess.run([str(protoc_node_compiler_path)] + args) + compiler_args.insert(0, "protoc") + compiler_args.append("--python_out={}".format(codegen_dir)) + compiler_args.append("--grpc_python_out={}".format(codegen_dir)) + compiler = protoc if proto_file: compiler_args.append(str(proto_file)) From 50a4b1d5638387ebc13e9f1c91e7b89aaf476137 Mon Sep 17 00:00:00 2001 From: Arondondon Date: Wed, 23 Oct 2024 17:50:24 +0300 Subject: [PATCH 07/10] Fixed "snet channel metadata-init-utility" command with wrong "encoding" field in metadata. --- snet/cli/metadata/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snet/cli/metadata/service.py b/snet/cli/metadata/service.py index 895df1dc..739ae035 100644 --- a/snet/cli/metadata/service.py +++ b/snet/cli/metadata/service.py @@ -65,7 +65,7 @@ class MPEServiceMetadata: def __init__(self): self.m = {"version": 1, "display_name": "", - "encoding": "grpc", # grpc by default + "encoding": "proto", # grpc by default "service_type": "grpc", # grpc by default # one week by default (15 sec block, 24*60*60*7/15) "service_api_source": "", From 2d4f47bab507b990c2d002eee1be327bd303cc46 Mon Sep 17 00:00:00 2001 From: Arondondon Date: Thu, 24 Oct 2024 14:23:34 +0300 Subject: [PATCH 08/10] Updated README.md for building documentation --- README.md | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 254bd0e4..cb21c43f 100644 --- a/README.md +++ b/README.md @@ -127,28 +127,49 @@ Backward compatibility for other Python versions is not guaranteed. $ git clone https://github.com/singnet/snet-cli.git $ cd snet-cli/packages/snet_cli ``` + +* * Install the package in development/editable mode + ```bash $ pip3 install -e . ``` -#### Building the Documentation +### Building the Documentation in Markdown files + +* Clone repository and install dependencies -* Install sphinx, sphinx-argparse and the rtd theme ```bash +$ git clone https://github.com/singnet/snet-cli.git +$ cd snet-cli/packages/snet_cli $ pip install sphinx $ pip install sphinx-argparse $ pip install sphinx-rtd-theme -``` +$ pip install bs4 +$ pip install html2text +``` + +#### On Linux * Run the build-docs.sh in the docs directory + ```bash $ cd docs $ sh build-docs.sh ``` -The documentation is generated under the docs/build/html folder +#### On Windows + +* Install `make` utility and run the build-docs.ps1 in the docs directory + +```powershell +choco install make # install choco if it is not already installed +cd docs +powershell -file build-docs.ps1 +``` + +The documentation is generated under the docs/build/markdown folder ### Release From 9ca7975a9ee2f90433d1461cab283ee7ae05b3ee Mon Sep 17 00:00:00 2001 From: Arondondon Date: Thu, 24 Oct 2024 15:03:29 +0300 Subject: [PATCH 09/10] Got rid of the "goerli" network in the default config. --- snet/cli/config.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/snet/cli/config.py b/snet/cli/config.py index 95dd3b31..79a002cf 100644 --- a/snet/cli/config.py +++ b/snet/cli/config.py @@ -200,9 +200,6 @@ def create_default_config(self): self["network.mainnet"] = { "default_eth_rpc_endpoint": "https://mainnet.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5" } - self["network.goerli"] = { - "default_eth_rpc_endpoint": "https://goerli.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", - } self["network.sepolia"] = { "default_eth_rpc_endpoint": "https://sepolia.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5", } From c2d04c38a3f297705e0d057eec0754485077d276 Mon Sep 17 00:00:00 2001 From: Arondondon Date: Fri, 25 Oct 2024 13:45:34 +0300 Subject: [PATCH 10/10] Fixed contract names in "snet contract" command. --- snet/cli/arguments.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/snet/cli/arguments.py b/snet/cli/arguments.py index 4ac9e909..0336a9c7 100644 --- a/snet/cli/arguments.py +++ b/snet/cli/arguments.py @@ -233,13 +233,12 @@ def add_contract_options(parser): contracts = get_all_abi_contract_files() for path in contracts: - if "TokenConversionManager" in str(path) or "TokenStake" in str(path): - continue - contract_name = re.search( - r"([^.]*)\.json", os.path.basename(path)).group(1) - contract_p = subparsers.add_parser( - contract_name, help="{} contract".format(contract_name)) - add_contract_function_options(contract_p, contract_name) + if "MultiPartyEscrow" in str(path) or "Registry" in str(path) or "SingularityNetToken" in str(path): + contract_name = re.search( + r"([^.]*)\.json", os.path.basename(path)).group(1) + contract_p = subparsers.add_parser( + contract_name, help="{} contract".format(contract_name)) + add_contract_function_options(contract_p, contract_name) def add_organization_arguments(parser):