diff --git a/src/arclet/alconna/_internal/_analyser.py b/src/arclet/alconna/_internal/_analyser.py index 4c0aa87..916d6aa 100644 --- a/src/arclet/alconna/_internal/_analyser.py +++ b/src/arclet/alconna/_internal/_analyser.py @@ -139,11 +139,11 @@ def reset(self): self.value_result = None self.header_result = None - def process(self, argv: Argv[TDC], name_validated: bool = True) -> Self: + def process(self, argv: Argv, name_validated: bool = True) -> Self: """处理传入的参数集合 Args: - argv (Argv[TDC]): 命令行参数 + argv (Argv): 命令行参数 name_validated (bool, optional): 是否已经验证过名称. Defaults to True. Returns: @@ -187,31 +187,31 @@ class Analyser(SubAnalyser): command: Alconna """命令实例""" + argv: Argv + """命令行参数""" - def __init__(self, alconna: Alconna, compiler: TCompile | None = None): + def __init__(self, alconna: Alconna, argv: Argv, compiler: TCompile | None = None): """初始化解析器 Args: alconna (Alconna): 命令实例 + argv (Argv): 命令行参数 compiler (TCompile | None, optional): 编译器方法 """ super().__init__(alconna) - self._compiler = compiler or default_compiler - - def compile(self): + self.argv = argv self.extra_allow = not self.command.meta.strict or not self.command.namespace_config.strict - self._compiler(self) - command_manager.resolve(self.command).stack_params.base = self.compile_params - return self + (compiler or default_compiler)(self) + self.argv.stack_params.base = self.compile_params def __repr__(self): return f"<{self.__class__.__name__} of {self.command.path}>" - def process(self, argv: Argv[TDC], name_validated: bool = True) -> Exception | None: + def process(self, argv: Argv, name_validated: bool = True) -> Exception | None: """主体解析函数, 应针对各种情况进行解析 Args: - argv (Argv[TDC]): 命令行参数 + argv (Argv): 命令行参数 name_validated (bool, optional): 是否已经验证过名称. Defaults to True. """ if not self.header_result or not name_validated: diff --git a/src/arclet/alconna/_internal/_handlers.py b/src/arclet/alconna/_internal/_handlers.py index 8889ed4..46f2c38 100644 --- a/src/arclet/alconna/_internal/_handlers.py +++ b/src/arclet/alconna/_internal/_handlers.py @@ -11,12 +11,13 @@ from ..base import Option, Header from ..config import config from ..exceptions import ( - AlconnaException, + AnalyseException, ArgumentMissing, FuzzyMatchSuccess, InvalidHeader, InvalidParam, - PauseTriggered, ParamsUnmatched, + PauseTriggered, + ParamsUnmatched, ) from ..model import HeadResult, OptionResult from ..typing import KWBool, MultiKeyWordVar, MultiVar, _AllParamPattern, _StrMulti @@ -353,13 +354,12 @@ def analyse_option(analyser: SubAnalyser, argv: Argv, opt: Option, name_validate analyser.options_result[opt_n] = handle_action(opt, analyser.options_result[opt_n], opt_v) -def analyse_compact_params(analyser: SubAnalyser, argv: Argv, prefix: str): +def analyse_compact_params(analyser: SubAnalyser, argv: Argv): """分析紧凑参数 Args: analyser (SubAnalyser): 当前解析器 argv (Argv): 命令行参数 - prefix (str): 参数前缀 """ exc = None for param in analyser.compact_params: @@ -381,7 +381,7 @@ def analyse_compact_params(analyser: SubAnalyser, argv: Argv, prefix: str): else: analyser.subcommands_result[sparam.command.dest] = sparam.result() raise - except AlconnaException: + except AnalyseException: analyser.subcommands_result[sparam.command.dest] = sparam.result() raise else: @@ -417,17 +417,22 @@ def analyse_param(analyser: SubAnalyser, argv: Argv, seps: str | None = None): argv (Argv): 命令行参数 seps (str, optional): 指定的分隔符. """ + # 每次调用都会尝试解析一个参数 _text, _str = argv.next(seps) + # analyser.compile_params 有命中,说明在当前子命令内有对应的选项/子命令 if _str and _text and (_param := analyser.compile_params.get(_text)): - if Option in _param.__class__.__mro__: + # Help 之类的选项是 Option 子类, 得加上 __base__ 判断 + if _param.__class__ is Option or _param.__class__.__base__ is Option: oparam: Option = _param # type: ignore try: + # 因为 _text 已经被确定为选项名,所以 name_validated 为 True analyse_option(analyser, argv, oparam, True) - except AlconnaException as e: + except AnalyseException as e: if not argv.error: argv.error = e return True sparam: SubAnalyser = _param # type: ignore + # 禁止子命令重复解析 if sparam.command.dest not in analyser.subcommands_result: try: sparam.process(argv) @@ -441,26 +446,31 @@ def analyse_param(analyser: SubAnalyser, argv: Argv, seps: str | None = None): analyser.subcommands_result[sparam.command.dest] = sparam.result() if not argv.error: argv.error = e - except AlconnaException as e1: + except AnalyseException as e1: analyser.subcommands_result[sparam.command.dest] = sparam.result() if not argv.error: argv.error = e1 else: analyser.subcommands_result[sparam.command.dest] = sparam.result() return True + # 如果没有命中,则说明当前参数可能存在自定义分隔符,或者属于子命令的主参数,那么需要重新解析 argv.rollback(_text) - if _str and _text and analyser.compact_params and analyse_compact_params(analyser, argv, _text): + # 尝试以紧凑参数解析 + if _str and _text and analyser.compact_params and analyse_compact_params(analyser, argv): return True + # 主参数同样只允许解析一次 if analyser.command.nargs and not analyser.args_result: analyser.args_result = analyse_args(argv, analyser.self_args) if analyser.args_result: return True + # 若参数属于该子命令的同级/上级选项或子命令,则终止解析 if _str and _text and _text in argv.stack_params.parents(): return False if analyser.extra_allow: analyser.args_result.setdefault("$extra", []).append(_text) argv.next() return True + # 给 Completion 打的洞,若此时 analyser 属于主命令, 则让其先解析完主命令 elif _str and _text and not argv.stack_params.stack: if not argv.error: argv.error = ParamsUnmatched(lang.require("analyser", "param_unmatched").format(target=_text)) diff --git a/src/arclet/alconna/base.py b/src/arclet/alconna/base.py index 453c2bb..45ac0a4 100644 --- a/src/arclet/alconna/base.py +++ b/src/arclet/alconna/base.py @@ -417,22 +417,16 @@ def add(self, opt: Option | Subcommand) -> Self: class Help(Option): - soft_keyword = False - def _calc_hash(self): return hash("$ALCONNA_BUILTIN_OPTION_HELP") class Shortcut(Option): - soft_keyword = False - def _calc_hash(self): return hash("$ALCONNA_BUILTIN_OPTION_SHORTCUT") class Completion(Option): - soft_keyword = False - def _calc_hash(self): return hash("$ALCONNA_BUILTIN_OPTION_COMPLETION") diff --git a/src/arclet/alconna/completion.py b/src/arclet/alconna/completion.py index 5b25706..cbdd77c 100644 --- a/src/arclet/alconna/completion.py +++ b/src/arclet/alconna/completion.py @@ -113,7 +113,7 @@ def enter(self, content: list | None = None) -> EnterResult: Raises: ValueError: 当前没有可用的补全选项, 或者当前补全选项不可用。 """ - argv = command_manager.resolve(self.source.command) + argv = command_manager.require(self.source.command).argv argv.raw_data = self.raw_data.copy() argv.bak_data = self.bak_data.copy() argv.current_index = self.current_index @@ -226,17 +226,6 @@ def fresh(self, exc: PauseTriggered): comp_ctx: ContextModel[CompSession] = ContextModel("comp_ctx") -def _prompt_unit(command: Alconna, argv: Argv, trig: Arg): - if not (comp := trig.field.get_completion()): - return [Prompt(command.formatter.param(trig), False)] - if isinstance(comp, str): - return [Prompt(f"{trig.name}: {comp}", False)] - releases = argv.release(recover=True) - target = str(releases[-1]) or str(releases[-2]) - o = list(filter(lambda x: target in x, comp)) or comp - return [Prompt(f"{trig.name}: {i}", False, target) for i in o] - - def _prompt_none(command: Alconna, args_got: list[str], opts_got: list[str]): res: list[Prompt] = [] if unit := next((arg for arg in command.args if arg.name not in args_got), None): @@ -256,21 +245,22 @@ def _prompt_none(command: Alconna, args_got: list[str], opts_got: list[str]): def prompt(command: Alconna, argv: Argv, args_got: list[str], opts_got: list[str], trigger: str | Arg | Subcommand | None = None): """获取补全列表""" - if isinstance(trigger, Arg): - return _prompt_unit(command, argv, trigger) - elif isinstance(trigger, Subcommand): - return [Prompt(i) for i in argv.stack_params.stack[-1]] - elif isinstance(trigger, str): - res = list(filter(lambda x: trigger in x, argv.stack_params.base)) - if not res: - return [] - out = [i for i in res if i not in opts_got] - return [Prompt(i, True, trigger) for i in (out or res)] releases = argv.release(recover=True) target = str(releases[-1]) if isinstance(releases[-1], str) and releases[-1] in command.namespace_config.builtin_option_name["completion"]: target = str(releases[-2]) - if _res := list(filter(lambda x: target in x and target != x, argv.stack_params.base)): + if isinstance(trigger, Arg): + if not (comp := trigger.field.get_completion()): + return [Prompt(command.formatter.param(trigger), False)] + if isinstance(comp, str): + return [Prompt(f"{trigger.name}: {comp}", False)] + o = list(filter(lambda x: target in x, comp)) or comp + return [Prompt(f"{trigger.name}: {i}", False, target) for i in o] + elif isinstance(trigger, Subcommand): + return [Prompt(i) for i in argv.stack_params.stack[-1]] + if isinstance(trigger, str): + target = trigger + if _res := list(filter(lambda x: target in x, argv.stack_params.base)): out = [i for i in _res if i not in opts_got] return [Prompt(i, True, target) for i in (out or _res)] return _prompt_none(command, args_got, opts_got) diff --git a/src/arclet/alconna/core.py b/src/arclet/alconna/core.py index d9ec3ac..d9ba7af 100644 --- a/src/arclet/alconna/core.py +++ b/src/arclet/alconna/core.py @@ -4,7 +4,7 @@ import sys from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Callable, Generic, Literal, Sequence, TypeVar, cast, overload +from typing import Any, Callable, Generic, Literal, Sequence, TypeVar, cast, overload, TYPE_CHECKING from typing_extensions import Self from weakref import WeakSet @@ -16,6 +16,7 @@ from ._internal._handlers import handle_head_fuzzy, analyse_header from ._internal._shortcut import shortcut as _shortcut from .args import Arg, Args +from .argv import Argv, __argv_type__ from .arparma import Arparma, ArparmaBehavior, requirement_handler from .base import Completion, Help, Option, Shortcut, Subcommand, Header, SPECIAL_OPTIONS from .config import Namespace, config @@ -48,11 +49,11 @@ def handle_argv(): def add_builtin_options(options: list[Option | Subcommand], cmd: Alconna, ns: Namespace) -> None: if "help" not in ns.disable_builtin_options: - options.append(Help("|".join(ns.builtin_option_name["help"]), dest="$help", help_text=lang.require("builtin", "option_help"))) # noqa: E501 + options.append(Help("|".join(ns.builtin_option_name["help"]), dest="$help", help_text=lang.require("builtin", "option_help"), soft_keyword=False)) # noqa: E501 @cmd.route("$help") def _(command: Alconna, arp: Arparma): - argv = command_manager.resolve(cmd) + argv = command_manager.require(cmd).argv _help_param = [str(i) for i in argv.release(recover=True) if str(i) not in ns.builtin_option_name["help"]] arp.output = command.formatter.format_node(_help_param) return True @@ -64,6 +65,7 @@ def _(command: Alconna, arp: Arparma): Args["action?", "delete|list"]["name?", str]["command?", str], dest="$shortcut", help_text=lang.require("builtin", "option_shortcut"), + soft_keyword=False, ) ) @@ -84,11 +86,11 @@ def _(command: Alconna, arp: Arparma): return True if "completion" not in ns.disable_builtin_options: - options.append(Completion("|".join(ns.builtin_option_name["completion"]), dest="$completion", help_text=lang.require("builtin", "option_completion"))) # noqa: E501 + options.append(Completion("|".join(ns.builtin_option_name["completion"]), dest="$completion", help_text=lang.require("builtin", "option_completion"), soft_keyword=False)) # noqa: E501 @cmd.route("$completion") def _(command: Alconna, arp: Arparma): - argv = command_manager.resolve(cmd) + argv = command_manager.require(cmd).argv rest = argv.release() trigger = None if rest and isinstance(rest[-1], str) and rest[-1] in ns.builtin_option_name["completion"]: @@ -193,7 +195,12 @@ class Alconna(Subcommand): def compile(self, compiler: TCompile | None = None) -> Analyser: """编译 `Alconna` 为对应的解析器""" - return Analyser(self, compiler).compile() + if TYPE_CHECKING: + argv_type = Argv + else: + argv_type: type[Argv] = __argv_type__.get() + argv = argv_type(self.meta, self.namespace_config, self.separators) + return Analyser(self, argv, compiler) def __init__( self, @@ -424,7 +431,7 @@ def _parse(self, message: TDC, ctx: dict[str, Any] | None = None) -> Arparma[TDC if (res := alc._parse(message, ctx)).matched: return res analyser = command_manager.require(self) - argv = command_manager.resolve(self) + argv = analyser.argv argv.enter(ctx).build(message) if argv.message_cache and (res := command_manager.get_record(argv.token)): return res diff --git a/src/arclet/alconna/manager.py b/src/arclet/alconna/manager.py index 667b256..253b7be 100644 --- a/src/arclet/alconna/manager.py +++ b/src/arclet/alconna/manager.py @@ -17,6 +17,7 @@ from .argv import Argv, __argv_type__ from .arparma import Arparma +from .base import Header from .config import Namespace, config from .exceptions import ExceedMaxCount from .typing import TDC, CommandMeta, DataCollection, InnerShortcutArgs, ShortcutArgs @@ -41,7 +42,6 @@ def max_count(self) -> int: return config.command_max_count __analysers: dict[int, Analyser] - __argv: dict[int, Argv] __abandons: list[int] __record: LRU[int, Arparma] __shortcuts: dict[str, tuple[dict[str, InnerShortcutArgs], dict[str, InnerShortcutArgs]]] @@ -50,7 +50,6 @@ def __init__(self): self.sign = "ALCONNA::" self.current_count = 0 - self.__argv = {} self.__analysers = {} self.__abandons = [] self.__shortcuts = {} @@ -124,23 +123,12 @@ def register(self, command: Alconna) -> None: if self.current_count >= self.max_count: raise ExceedMaxCount cmd_hash = command._hash - self.__argv.pop(cmd_hash, None) - self.__argv[cmd_hash] = __argv_type__.get()(command.meta, command.namespace_config, command.separators) # type: ignore self.__analysers.pop(cmd_hash, None) self.__analysers[cmd_hash] = command.compile() def _resolve(self, cmd_hash: int) -> Alconna: return self.__analysers[cmd_hash].command - def resolve(self, command: Alconna) -> Argv: - """获取命令解析器的参数解析器""" - cmd_hash = command._hash - try: - return self.__argv[cmd_hash] - except KeyError as e: - namespace, name = self._command_part(command.path) - raise ValueError(lang.require("manager", "undefined_command").format(target=f"{namespace}.{name}")) from e - def require(self, command: Alconna) -> Analyser: """获取命令解析器""" cmd_hash = command._hash @@ -155,7 +143,6 @@ def delete(self, command: Alconna) -> None: cmd_hash = command._hash try: command.formatter.remove(command) - del self.__argv[cmd_hash] del self.__analysers[cmd_hash] self.current_count -= 1 except KeyError: @@ -165,22 +152,19 @@ def delete(self, command: Alconna) -> None: def update(self, command: Alconna): """同步命令更改""" cmd_hash = command._hash - if cmd_hash not in self.__argv: + if cmd_hash not in self.__analysers: raise ValueError(lang.require("manager", "undefined_command").format(target=command.path)) self.clear_result(command) command.formatter.remove(command) - argv = self.__argv.pop(cmd_hash) - analyser = self.__analysers.pop(cmd_hash) + del self.__analysers[cmd_hash] yield + command._header = Header.generate(command.command, command.prefixes, command.meta.compact) name = next(iter(command._header.content), command.command or command.prefixes[0]) command.path = f"{command.namespace}::{name}" + command.dest = command.name = name + command.aliases = frozenset(command._header.content) cmd_hash = command._hash = command._calc_hash() - argv.namespace = command.namespace_config - argv.separators = command.separators - argv.__post_init__(command.meta) - self.__argv[cmd_hash] = argv - analyser.compile() - self.__analysers[cmd_hash] = analyser + self.__analysers[cmd_hash] = command.compile() command.formatter.add(command) def is_disable(self, command: Alconna) -> bool: @@ -205,7 +189,7 @@ def add_shortcut(self, target: Alconna, key: str | TPattern, source: ShortcutArg source (ShortcutArgs): 快捷命令的参数 """ namespace, name = self._command_part(target.path) - argv = self.resolve(target) + argv = self.require(target).argv _shortcut = self.__shortcuts.setdefault(f"{namespace}.{name}", ({}, {})) if isinstance(key, str): _key = key diff --git a/tests/core_test.py b/tests/core_test.py index e517877..7048ec5 100644 --- a/tests/core_test.py +++ b/tests/core_test.py @@ -15,7 +15,7 @@ MultiVar, Option, Subcommand, - namespace, + namespace, command_manager, ) @@ -660,22 +660,6 @@ def test_completion(): * foo\ """ ) -# assert alc20.parse("core20 f --comp").output == """以下是建议的输入: -# * fool -# * foo -# * off""" -# assert alc20.parse("core20 fo --comp").output == """以下是建议的输入: -# * fool -# * foo""" -# assert alc20.parse("core20 foo --comp").output == """以下是建议的输入: -# * bar: choose a, b or c""" -# assert alc20.parse("core20 fool --comp").output == """以下是建议的输入: -# * foo -# * off""" -# assert alc20.parse("core20 off b --comp").output == """以下是建议的输入: -# * baz: use aaa -# * baz: use aab -# * baz: use abc""" alc20_1 = Alconna("core20_1", Args["foo", int], Option("bar")) res = alc20_1.parse("core20_1 -cp") @@ -1015,5 +999,19 @@ def test_extra_allow(): assert res.all_matched_args.get("$extra", []) == ["--baz", "--qux"] +def test_update(): + core30 = Alconna("core30", Option("--foo", Args["bar", str]), Option("--baz", Args["qux", str])) + res = core30.parse("core30 --foo bar --baz qux") + assert res.matched + assert res.query[str]("foo.bar") == "bar" + + with command_manager.update(core30): + core30.command = "core30_1" + + res1 = core30.parse("core30_1 --foo bar --baz qux") + assert res1.matched + assert res1.query[str]("foo.bar") == "bar" + + if __name__ == "__main__": pytest.main([__file__, "-vs"])