diff --git a/arclet/alconna/__init__.py b/arclet/alconna/__init__.py index 2d96ecec..4824b9ba 100644 --- a/arclet/alconna/__init__.py +++ b/arclet/alconna/__init__.py @@ -4,7 +4,7 @@ import re from .util import split_once, split from .component import Option, CommandInterface, Subcommand, Arpamar -from .types import NonTextElement, MessageChain, TAValue, Args +from .types import NonTextElement, MessageChain, TAValue, Args, AnyParam, AllParam from .exceptions import ParamsUnmatched, InvalidFormatMap, NullTextMessage, InvalidName _builtin_option = Option("-help") @@ -205,83 +205,70 @@ def _initialise_arguments(self): def _analyse_args( self, opt_args: Args, - may_args: str, sep: str, - rest_text: str ) -> Dict[str, Any]: """分析 Args 部分""" _option_dict: Dict[str, Any] = {} - may_element_index = self.result.raw_texts[self.result.current_index][1] + 1 for key, value, default in opt_args: + _may_arg = self.result.next_data(sep) if isinstance(value, str): - if sep != self.separator: - may_arg, may_args = split_once(may_args, sep) - else: - may_arg, rest_text = self.result.split_by(sep) - if not (_arg_find := re.findall('^' + value + '$', may_arg)): - if default is not None: - _arg_find = [default] - else: - raise ParamsUnmatched(f"param {may_arg} is incorrect") - if may_arg == value: + if not (_arg_find := re.findall('^' + value + '$', _may_arg)): + if default is None: + raise ParamsUnmatched(f"param {_may_arg} is incorrect") + _arg_find = [default] + if _may_arg == value: _arg_find[0] = Ellipsis _option_dict[key] = _arg_find[0] - if sep == self.separator: - self.result.raw_texts[self.result.current_index][0] = rest_text + elif value == AnyParam: + _option_dict[key] = _may_arg + elif value == AllParam: + _rest_data = self.recover_raw_data() + if not _rest_data: + _rest_data = [_may_arg] + elif isinstance(_rest_data[0], str): + _rest_data[0] = _may_arg + sep + _rest_data[0] + else: + _rest_data.insert(0, _may_arg) + _option_dict[key] = _rest_data[0] if self.result.is_str else _rest_data + return _option_dict else: - if self.result.elements[may_element_index].__class__ == value: - _option_dict[key] = self.result.elements.pop(may_element_index) - may_element_index += 1 + if _may_arg.__class__ == value: + _option_dict[key] = _may_arg elif default is not None: _option_dict[key] = default - may_element_index += 1 else: - raise ParamsUnmatched( - f"param type {self.result.elements[may_element_index].__class__} is incorrect") + raise ParamsUnmatched(f"param type {_may_arg.__class__} is incorrect") return _option_dict def _analyse_option( self, param: Option, - text: str, - rest_text: str ) -> Dict[str, Any]: """分析 Option 部分""" opt: str = param.name alias: str = param.alias args: Args = param.args sep: str = param.separator - name, may_args = split_once(text, sep) - if sep == self.separator: # 在sep等于separator的情况下name是被提前切出来的 - name = text + + name = self.result.next_data(sep) if (not re.match('^' + opt + '$', name)) and (not re.match('^' + alias + '$', name)): # 先匹配选项名称 raise ParamsUnmatched(f"{name} dose not matched with {opt}") - self.result.raw_texts[self.result.current_index][0] = rest_text name = name.lstrip("-") if not args.argument: return {name: Ellipsis} - return {name: self._analyse_args(args, may_args, sep, rest_text)} + return {name: self._analyse_args(args, sep)} def _analyse_subcommand( self, - param: Subcommand, - text: str, - rest_text: str + param: Subcommand ) -> Dict[str, Any]: """分析 Subcommand 部分""" command: str = param.name sep: str = param.separator sub_params: Dict = param.sub_params - name, may_text = split_once(text, sep) - if sep == self.separator: - name = text + name = self.result.next_data(sep) if not re.match('^' + command + '$', name): raise ParamsUnmatched(f"{name} dose not matched with {command}") - - self.result.raw_texts[self.result.current_index][0] = may_text - if sep == self.separator: - self.result.raw_texts[self.result.current_index][0] = rest_text - name = name.lstrip("-") if not param.args.argument and not param.options: return {name: Ellipsis} @@ -290,17 +277,18 @@ def _analyse_subcommand( _get_args = False for _ in range(len(sub_params)): try: - _text, _rest_text = self.result.split_by(sep) + _text = self.result.next_data(sep, pop=False) if not (sub_param := sub_params.get(_text)): sub_param = sub_params['sub_args'] - for sp in sub_params: - if _text.startswith(sp): - sub_param = sub_params.get(sp) - break + if isinstance(_text, str): + for sp in sub_params: + if _text.startswith(sp): + sub_param = sub_params.get(sp) + break if isinstance(sub_param, Option): - subcommand.update(self._analyse_option(sub_param, _text, _rest_text)) + subcommand.update(self._analyse_option(sub_param)) elif not _get_args: - if args := self._analyse_args(sub_param, _text, sep, _rest_text): + if args := self._analyse_args(sub_param, sep): _get_args = True subcommand.update(args) except (IndexError, KeyError): @@ -309,23 +297,32 @@ def _analyse_subcommand( if self.exception_in_time: raise break - - if sep != self.separator: - self.result.raw_texts[self.result.current_index][0] = rest_text return {name: subcommand} def _analyse_header(self) -> str: """分析命令头部""" - head_text, self.result.raw_texts[0][0] = self.result.split_by(self.separator) - for ch in self._command_headers: - if not (_head_find := re.findall('^' + ch + '$', head_text)): - continue - self.result.head_matched = True - if _head_find[0] != ch: - return _head_find[0] + head_text = self.result.next_data(self.separator) + if isinstance(head_text, str): + for ch in self._command_headers: + if not (_head_find := re.findall('^' + ch + '$', head_text)): + continue + self.result.head_matched = True + if _head_find[0] != ch: + return _head_find[0] if not self.result.head_matched: raise ParamsUnmatched(f"{head_text} does not matched") + def recover_raw_data(self) -> List[Union[str, NonTextElement]]: + """将处理过的命令数据大概还原""" + _result = [] + for v in self.result.raw_data.values(): + if isinstance(v, list): + _result.append(f'{self.separator}'.join(v)) + else: + _result.append(v) + self.result.raw_data.clear() + return _result + def analyse_message(self, message: Union[str, MessageChain]) -> Arpamar: """命令分析功能, 传入字符串或消息链, 返回一个特定的数据集合类""" if hasattr(self, "result"): @@ -337,15 +334,15 @@ def analyse_message(self, message: Union[str, MessageChain]) -> Arpamar: if isinstance(message, str): self.result.is_str = True - self.result.raw_texts.append([message, 0]) + self.result.raw_data.setdefault(0, split(message, self.separator)) else: for i, ele in enumerate(message): - if ele.__class__.__name__ not in ("Plain", "Source", "Quote", "File"): - self.result.elements[i] = ele - elif ele.__class__.__name__ == "Plain": - self.result.raw_texts.append([ele.text.lstrip(' '), i]) + if ele.__class__.__name__ not in ("Plain", "Text", "Source", "Quote", "File"): + self.result.raw_data[i] = ele + elif ele.__class__.__name__ in ("Plain", "Text"): + self.result.raw_data[i] = split(ele.text.lstrip(' '), self.separator) - if not self.result.raw_texts: + if not self.result.raw_data: if self.exception_in_time: raise NullTextMessage self.result.results.clear() @@ -358,23 +355,24 @@ def analyse_message(self, message: Union[str, MessageChain]) -> Arpamar: return self.result for i in range(len(self._params)): - if all([t[0] == "" for t in self.result.raw_texts]): + if not self.result.raw_data: break try: - _text, _rest_text = self.result.split_by(self.separator) + _text = self.result.next_data(self.separator, pop=False) _param = self._params.get(_text) if not _param: _param = self._params['main_args'] - for p, value in self._params.items(): - if _text.startswith(p) or _text.startswith(getattr(value, 'alias', p)): - _param = value - break + if isinstance(_text, str): + for p, value in self._params.items(): + if _text.startswith(p) or _text.startswith(getattr(value, 'alias', p)): + _param = value + break if isinstance(_param, Option): - self.result.results['options'].update(self._analyse_option(_param, _text, _rest_text)) + self.result.results['options'].update(self._analyse_option(_param)) elif isinstance(_param, Subcommand): - self.result.results['options'].update(self._analyse_subcommand(_param, _text, _rest_text)) + self.result.results['options'].update(self._analyse_subcommand(_param)) elif not self.result.results.get("main_args"): - self.result.results['main_args'] = self._analyse_args(_param, _text, self.separator, _rest_text) + self.result.results['main_args'] = self._analyse_args(_param, self.separator) except (IndexError, KeyError): pass except ParamsUnmatched: @@ -382,33 +380,13 @@ def analyse_message(self, message: Union[str, MessageChain]) -> Arpamar: raise break - try: - may_element_index = self.result.raw_texts[self.result.current_index][1] + 1 - for key, value, default in self.args: - if self.result.elements[may_element_index].__class__ == value: - self.result.results['main_args'][key] = self.result.elements.pop(may_element_index) - may_element_index += 1 - elif default is not None: - self.result.results['main_args'][key] = default - may_element_index += 1 - else: - raise ParamsUnmatched( - f"param element type {self.result.elements[may_element_index].__class__} is incorrect") - except (KeyError, IndexError): - pass - except ParamsUnmatched: - if self.exception_in_time: - raise - - if len(self.result.elements) == 0 and all( - [t[0] == "" for t in self.result.raw_texts] - ) and (not self.result.need_main_args or ( + if not self.result.raw_data and (not self.result.need_main_args or ( self.result.need_main_args and not self.result.has('help') and self.result.results.get('main_args') )): self.result.matched = True self.result.encapsulate_result() else: if self.exception_in_time: - raise ParamsUnmatched(", ".join([t[0] for t in self.result.raw_texts])) + raise ParamsUnmatched(", ".join([f"{v}" for v in self.result.raw_data.values()])) self.result.results.clear() return self.result