Skip to content

Commit

Permalink
0.4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
RF-Tar-Railt authored Jan 2, 2022
1 parent d0fa6eb commit 2b45421
Showing 1 changed file with 72 additions and 94 deletions.
166 changes: 72 additions & 94 deletions arclet/alconna/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re
from .util import split_once, split
from .component import Option, CommandInterface, Subcommand, Arpamar
from .types import NonTextElement, MessageChain, TAValue, Args
from .types import NonTextElement, MessageChain, TAValue, Args, AnyParam, AllParam
from .exceptions import ParamsUnmatched, InvalidFormatMap, NullTextMessage, InvalidName

_builtin_option = Option("-help")
Expand Down Expand Up @@ -205,83 +205,70 @@ def _initialise_arguments(self):
def _analyse_args(
self,
opt_args: Args,
may_args: str,
sep: str,
rest_text: str
) -> Dict[str, Any]:
"""分析 Args 部分"""
_option_dict: Dict[str, Any] = {}
may_element_index = self.result.raw_texts[self.result.current_index][1] + 1
for key, value, default in opt_args:
_may_arg = self.result.next_data(sep)
if isinstance(value, str):
if sep != self.separator:
may_arg, may_args = split_once(may_args, sep)
else:
may_arg, rest_text = self.result.split_by(sep)
if not (_arg_find := re.findall('^' + value + '$', may_arg)):
if default is not None:
_arg_find = [default]
else:
raise ParamsUnmatched(f"param {may_arg} is incorrect")
if may_arg == value:
if not (_arg_find := re.findall('^' + value + '$', _may_arg)):
if default is None:
raise ParamsUnmatched(f"param {_may_arg} is incorrect")
_arg_find = [default]
if _may_arg == value:
_arg_find[0] = Ellipsis
_option_dict[key] = _arg_find[0]
if sep == self.separator:
self.result.raw_texts[self.result.current_index][0] = rest_text
elif value == AnyParam:
_option_dict[key] = _may_arg
elif value == AllParam:
_rest_data = self.recover_raw_data()
if not _rest_data:
_rest_data = [_may_arg]
elif isinstance(_rest_data[0], str):
_rest_data[0] = _may_arg + sep + _rest_data[0]
else:
_rest_data.insert(0, _may_arg)
_option_dict[key] = _rest_data[0] if self.result.is_str else _rest_data
return _option_dict
else:
if self.result.elements[may_element_index].__class__ == value:
_option_dict[key] = self.result.elements.pop(may_element_index)
may_element_index += 1
if _may_arg.__class__ == value:
_option_dict[key] = _may_arg
elif default is not None:
_option_dict[key] = default
may_element_index += 1
else:
raise ParamsUnmatched(
f"param type {self.result.elements[may_element_index].__class__} is incorrect")
raise ParamsUnmatched(f"param type {_may_arg.__class__} is incorrect")
return _option_dict

def _analyse_option(
self,
param: Option,
text: str,
rest_text: str
) -> Dict[str, Any]:
"""分析 Option 部分"""
opt: str = param.name
alias: str = param.alias
args: Args = param.args
sep: str = param.separator
name, may_args = split_once(text, sep)
if sep == self.separator: # 在sep等于separator的情况下name是被提前切出来的
name = text

name = self.result.next_data(sep)
if (not re.match('^' + opt + '$', name)) and (not re.match('^' + alias + '$', name)): # 先匹配选项名称
raise ParamsUnmatched(f"{name} dose not matched with {opt}")
self.result.raw_texts[self.result.current_index][0] = rest_text
name = name.lstrip("-")
if not args.argument:
return {name: Ellipsis}
return {name: self._analyse_args(args, may_args, sep, rest_text)}
return {name: self._analyse_args(args, sep)}

def _analyse_subcommand(
self,
param: Subcommand,
text: str,
rest_text: str
param: Subcommand
) -> Dict[str, Any]:
"""分析 Subcommand 部分"""
command: str = param.name
sep: str = param.separator
sub_params: Dict = param.sub_params
name, may_text = split_once(text, sep)
if sep == self.separator:
name = text
name = self.result.next_data(sep)
if not re.match('^' + command + '$', name):
raise ParamsUnmatched(f"{name} dose not matched with {command}")

self.result.raw_texts[self.result.current_index][0] = may_text
if sep == self.separator:
self.result.raw_texts[self.result.current_index][0] = rest_text

name = name.lstrip("-")
if not param.args.argument and not param.options:
return {name: Ellipsis}
Expand All @@ -290,17 +277,18 @@ def _analyse_subcommand(
_get_args = False
for _ in range(len(sub_params)):
try:
_text, _rest_text = self.result.split_by(sep)
_text = self.result.next_data(sep, pop=False)
if not (sub_param := sub_params.get(_text)):
sub_param = sub_params['sub_args']
for sp in sub_params:
if _text.startswith(sp):
sub_param = sub_params.get(sp)
break
if isinstance(_text, str):
for sp in sub_params:
if _text.startswith(sp):
sub_param = sub_params.get(sp)
break
if isinstance(sub_param, Option):
subcommand.update(self._analyse_option(sub_param, _text, _rest_text))
subcommand.update(self._analyse_option(sub_param))
elif not _get_args:
if args := self._analyse_args(sub_param, _text, sep, _rest_text):
if args := self._analyse_args(sub_param, sep):
_get_args = True
subcommand.update(args)
except (IndexError, KeyError):
Expand All @@ -309,23 +297,32 @@ def _analyse_subcommand(
if self.exception_in_time:
raise
break

if sep != self.separator:
self.result.raw_texts[self.result.current_index][0] = rest_text
return {name: subcommand}

def _analyse_header(self) -> str:
"""分析命令头部"""
head_text, self.result.raw_texts[0][0] = self.result.split_by(self.separator)
for ch in self._command_headers:
if not (_head_find := re.findall('^' + ch + '$', head_text)):
continue
self.result.head_matched = True
if _head_find[0] != ch:
return _head_find[0]
head_text = self.result.next_data(self.separator)
if isinstance(head_text, str):
for ch in self._command_headers:
if not (_head_find := re.findall('^' + ch + '$', head_text)):
continue
self.result.head_matched = True
if _head_find[0] != ch:
return _head_find[0]
if not self.result.head_matched:
raise ParamsUnmatched(f"{head_text} does not matched")

def recover_raw_data(self) -> List[Union[str, NonTextElement]]:
"""将处理过的命令数据大概还原"""
_result = []
for v in self.result.raw_data.values():
if isinstance(v, list):
_result.append(f'{self.separator}'.join(v))
else:
_result.append(v)
self.result.raw_data.clear()
return _result

def analyse_message(self, message: Union[str, MessageChain]) -> Arpamar:
"""命令分析功能, 传入字符串或消息链, 返回一个特定的数据集合类"""
if hasattr(self, "result"):
Expand All @@ -337,15 +334,15 @@ def analyse_message(self, message: Union[str, MessageChain]) -> Arpamar:

if isinstance(message, str):
self.result.is_str = True
self.result.raw_texts.append([message, 0])
self.result.raw_data.setdefault(0, split(message, self.separator))
else:
for i, ele in enumerate(message):
if ele.__class__.__name__ not in ("Plain", "Source", "Quote", "File"):
self.result.elements[i] = ele
elif ele.__class__.__name__ == "Plain":
self.result.raw_texts.append([ele.text.lstrip(' '), i])
if ele.__class__.__name__ not in ("Plain", "Text", "Source", "Quote", "File"):
self.result.raw_data[i] = ele
elif ele.__class__.__name__ in ("Plain", "Text"):
self.result.raw_data[i] = split(ele.text.lstrip(' '), self.separator)

if not self.result.raw_texts:
if not self.result.raw_data:
if self.exception_in_time:
raise NullTextMessage
self.result.results.clear()
Expand All @@ -358,57 +355,38 @@ def analyse_message(self, message: Union[str, MessageChain]) -> Arpamar:
return self.result

for i in range(len(self._params)):
if all([t[0] == "" for t in self.result.raw_texts]):
if not self.result.raw_data:
break
try:
_text, _rest_text = self.result.split_by(self.separator)
_text = self.result.next_data(self.separator, pop=False)
_param = self._params.get(_text)
if not _param:
_param = self._params['main_args']
for p, value in self._params.items():
if _text.startswith(p) or _text.startswith(getattr(value, 'alias', p)):
_param = value
break
if isinstance(_text, str):
for p, value in self._params.items():
if _text.startswith(p) or _text.startswith(getattr(value, 'alias', p)):
_param = value
break
if isinstance(_param, Option):
self.result.results['options'].update(self._analyse_option(_param, _text, _rest_text))
self.result.results['options'].update(self._analyse_option(_param))
elif isinstance(_param, Subcommand):
self.result.results['options'].update(self._analyse_subcommand(_param, _text, _rest_text))
self.result.results['options'].update(self._analyse_subcommand(_param))
elif not self.result.results.get("main_args"):
self.result.results['main_args'] = self._analyse_args(_param, _text, self.separator, _rest_text)
self.result.results['main_args'] = self._analyse_args(_param, self.separator)
except (IndexError, KeyError):
pass
except ParamsUnmatched:
if self.exception_in_time:
raise
break

try:
may_element_index = self.result.raw_texts[self.result.current_index][1] + 1
for key, value, default in self.args:
if self.result.elements[may_element_index].__class__ == value:
self.result.results['main_args'][key] = self.result.elements.pop(may_element_index)
may_element_index += 1
elif default is not None:
self.result.results['main_args'][key] = default
may_element_index += 1
else:
raise ParamsUnmatched(
f"param element type {self.result.elements[may_element_index].__class__} is incorrect")
except (KeyError, IndexError):
pass
except ParamsUnmatched:
if self.exception_in_time:
raise

if len(self.result.elements) == 0 and all(
[t[0] == "" for t in self.result.raw_texts]
) and (not self.result.need_main_args or (
if not self.result.raw_data and (not self.result.need_main_args or (
self.result.need_main_args and not self.result.has('help') and self.result.results.get('main_args')
)):
self.result.matched = True
self.result.encapsulate_result()
else:
if self.exception_in_time:
raise ParamsUnmatched(", ".join([t[0] for t in self.result.raw_texts]))
raise ParamsUnmatched(", ".join([f"{v}" for v in self.result.raw_data.values()]))
self.result.results.clear()
return self.result

0 comments on commit 2b45421

Please sign in to comment.