Skip to content

Commit

Permalink
🍻 mount Argv to Analyser
Browse files Browse the repository at this point in the history
  • Loading branch information
RF-Tar-Railt committed Oct 20, 2024
1 parent dbeeb66 commit d792ed8
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 97 deletions.
22 changes: 11 additions & 11 deletions src/arclet/alconna/_internal/_analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ def reset(self):
self.value_result = None
self.header_result = None

def process(self, argv: Argv[TDC], name_validated: bool = True) -> Self:
def process(self, argv: Argv, name_validated: bool = True) -> Self:
"""处理传入的参数集合
Args:
argv (Argv[TDC]): 命令行参数
argv (Argv): 命令行参数
name_validated (bool, optional): 是否已经验证过名称. Defaults to True.
Returns:
Expand Down Expand Up @@ -187,31 +187,31 @@ class Analyser(SubAnalyser):

command: Alconna
"""命令实例"""
argv: Argv
"""命令行参数"""

def __init__(self, alconna: Alconna, compiler: TCompile | None = None):
def __init__(self, alconna: Alconna, argv: Argv, compiler: TCompile | None = None):
"""初始化解析器
Args:
alconna (Alconna): 命令实例
argv (Argv): 命令行参数
compiler (TCompile | None, optional): 编译器方法
"""
super().__init__(alconna)
self._compiler = compiler or default_compiler

def compile(self):
self.argv = argv
self.extra_allow = not self.command.meta.strict or not self.command.namespace_config.strict
self._compiler(self)
command_manager.resolve(self.command).stack_params.base = self.compile_params
return self
(compiler or default_compiler)(self)
self.argv.stack_params.base = self.compile_params

def __repr__(self):
return f"<{self.__class__.__name__} of {self.command.path}>"

def process(self, argv: Argv[TDC], name_validated: bool = True) -> Exception | None:
def process(self, argv: Argv, name_validated: bool = True) -> Exception | None:
"""主体解析函数, 应针对各种情况进行解析
Args:
argv (Argv[TDC]): 命令行参数
argv (Argv): 命令行参数
name_validated (bool, optional): 是否已经验证过名称. Defaults to True.
"""
if not self.header_result or not name_validated:
Expand Down
28 changes: 19 additions & 9 deletions src/arclet/alconna/_internal/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
from ..base import Option, Header
from ..config import config
from ..exceptions import (
AlconnaException,
AnalyseException,
ArgumentMissing,
FuzzyMatchSuccess,
InvalidHeader,
InvalidParam,
PauseTriggered, ParamsUnmatched,
PauseTriggered,
ParamsUnmatched,
)
from ..model import HeadResult, OptionResult
from ..typing import KWBool, MultiKeyWordVar, MultiVar, _AllParamPattern, _StrMulti
Expand Down Expand Up @@ -353,13 +354,12 @@ def analyse_option(analyser: SubAnalyser, argv: Argv, opt: Option, name_validate
analyser.options_result[opt_n] = handle_action(opt, analyser.options_result[opt_n], opt_v)


def analyse_compact_params(analyser: SubAnalyser, argv: Argv, prefix: str):
def analyse_compact_params(analyser: SubAnalyser, argv: Argv):
"""分析紧凑参数
Args:
analyser (SubAnalyser): 当前解析器
argv (Argv): 命令行参数
prefix (str): 参数前缀
"""
exc = None
for param in analyser.compact_params:
Expand All @@ -381,7 +381,7 @@ def analyse_compact_params(analyser: SubAnalyser, argv: Argv, prefix: str):
else:
analyser.subcommands_result[sparam.command.dest] = sparam.result()
raise
except AlconnaException:
except AnalyseException:
analyser.subcommands_result[sparam.command.dest] = sparam.result()
raise
else:
Expand Down Expand Up @@ -417,17 +417,22 @@ def analyse_param(analyser: SubAnalyser, argv: Argv, seps: str | None = None):
argv (Argv): 命令行参数
seps (str, optional): 指定的分隔符.
"""
# 每次调用都会尝试解析一个参数
_text, _str = argv.next(seps)
# analyser.compile_params 有命中,说明在当前子命令内有对应的选项/子命令
if _str and _text and (_param := analyser.compile_params.get(_text)):
if Option in _param.__class__.__mro__:
# Help 之类的选项是 Option 子类, 得加上 __base__ 判断
if _param.__class__ is Option or _param.__class__.__base__ is Option:
oparam: Option = _param # type: ignore
try:
# 因为 _text 已经被确定为选项名,所以 name_validated 为 True
analyse_option(analyser, argv, oparam, True)
except AlconnaException as e:
except AnalyseException as e:
if not argv.error:
argv.error = e
return True
sparam: SubAnalyser = _param # type: ignore
# 禁止子命令重复解析
if sparam.command.dest not in analyser.subcommands_result:
try:
sparam.process(argv)
Expand All @@ -441,26 +446,31 @@ def analyse_param(analyser: SubAnalyser, argv: Argv, seps: str | None = None):
analyser.subcommands_result[sparam.command.dest] = sparam.result()
if not argv.error:
argv.error = e
except AlconnaException as e1:
except AnalyseException as e1:
analyser.subcommands_result[sparam.command.dest] = sparam.result()
if not argv.error:
argv.error = e1
else:
analyser.subcommands_result[sparam.command.dest] = sparam.result()
return True
# 如果没有命中,则说明当前参数可能存在自定义分隔符,或者属于子命令的主参数,那么需要重新解析
argv.rollback(_text)
if _str and _text and analyser.compact_params and analyse_compact_params(analyser, argv, _text):
# 尝试以紧凑参数解析
if _str and _text and analyser.compact_params and analyse_compact_params(analyser, argv):
return True
# 主参数同样只允许解析一次
if analyser.command.nargs and not analyser.args_result:
analyser.args_result = analyse_args(argv, analyser.self_args)
if analyser.args_result:
return True
# 若参数属于该子命令的同级/上级选项或子命令,则终止解析
if _str and _text and _text in argv.stack_params.parents():
return False
if analyser.extra_allow:
analyser.args_result.setdefault("$extra", []).append(_text)
argv.next()
return True
# 给 Completion 打的洞,若此时 analyser 属于主命令, 则让其先解析完主命令
elif _str and _text and not argv.stack_params.stack:
if not argv.error:
argv.error = ParamsUnmatched(lang.require("analyser", "param_unmatched").format(target=_text))
Expand Down
6 changes: 0 additions & 6 deletions src/arclet/alconna/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,22 +417,16 @@ def add(self, opt: Option | Subcommand) -> Self:


class Help(Option):
soft_keyword = False

def _calc_hash(self):
return hash("$ALCONNA_BUILTIN_OPTION_HELP")


class Shortcut(Option):
soft_keyword = False

def _calc_hash(self):
return hash("$ALCONNA_BUILTIN_OPTION_SHORTCUT")


class Completion(Option):
soft_keyword = False

def _calc_hash(self):
return hash("$ALCONNA_BUILTIN_OPTION_COMPLETION")

Expand Down
36 changes: 13 additions & 23 deletions src/arclet/alconna/completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def enter(self, content: list | None = None) -> EnterResult:
Raises:
ValueError: 当前没有可用的补全选项, 或者当前补全选项不可用。
"""
argv = command_manager.resolve(self.source.command)
argv = command_manager.require(self.source.command).argv
argv.raw_data = self.raw_data.copy()
argv.bak_data = self.bak_data.copy()
argv.current_index = self.current_index
Expand Down Expand Up @@ -226,17 +226,6 @@ def fresh(self, exc: PauseTriggered):
comp_ctx: ContextModel[CompSession] = ContextModel("comp_ctx")


def _prompt_unit(command: Alconna, argv: Argv, trig: Arg):
if not (comp := trig.field.get_completion()):
return [Prompt(command.formatter.param(trig), False)]
if isinstance(comp, str):
return [Prompt(f"{trig.name}: {comp}", False)]
releases = argv.release(recover=True)
target = str(releases[-1]) or str(releases[-2])
o = list(filter(lambda x: target in x, comp)) or comp
return [Prompt(f"{trig.name}: {i}", False, target) for i in o]


def _prompt_none(command: Alconna, args_got: list[str], opts_got: list[str]):
res: list[Prompt] = []
if unit := next((arg for arg in command.args if arg.name not in args_got), None):
Expand All @@ -256,21 +245,22 @@ def _prompt_none(command: Alconna, args_got: list[str], opts_got: list[str]):

def prompt(command: Alconna, argv: Argv, args_got: list[str], opts_got: list[str], trigger: str | Arg | Subcommand | None = None):
"""获取补全列表"""
if isinstance(trigger, Arg):
return _prompt_unit(command, argv, trigger)
elif isinstance(trigger, Subcommand):
return [Prompt(i) for i in argv.stack_params.stack[-1]]
elif isinstance(trigger, str):
res = list(filter(lambda x: trigger in x, argv.stack_params.base))
if not res:
return []
out = [i for i in res if i not in opts_got]
return [Prompt(i, True, trigger) for i in (out or res)]
releases = argv.release(recover=True)
target = str(releases[-1])
if isinstance(releases[-1], str) and releases[-1] in command.namespace_config.builtin_option_name["completion"]:
target = str(releases[-2])
if _res := list(filter(lambda x: target in x and target != x, argv.stack_params.base)):
if isinstance(trigger, Arg):
if not (comp := trigger.field.get_completion()):
return [Prompt(command.formatter.param(trigger), False)]
if isinstance(comp, str):
return [Prompt(f"{trigger.name}: {comp}", False)]
o = list(filter(lambda x: target in x, comp)) or comp
return [Prompt(f"{trigger.name}: {i}", False, target) for i in o]
elif isinstance(trigger, Subcommand):
return [Prompt(i) for i in argv.stack_params.stack[-1]]
if isinstance(trigger, str):
target = trigger
if _res := list(filter(lambda x: target in x, argv.stack_params.base)):
out = [i for i in _res if i not in opts_got]
return [Prompt(i, True, target) for i in (out or _res)]
return _prompt_none(command, args_got, opts_got)
21 changes: 14 additions & 7 deletions src/arclet/alconna/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Generic, Literal, Sequence, TypeVar, cast, overload
from typing import Any, Callable, Generic, Literal, Sequence, TypeVar, cast, overload, TYPE_CHECKING
from typing_extensions import Self
from weakref import WeakSet

Expand All @@ -16,6 +16,7 @@
from ._internal._handlers import handle_head_fuzzy, analyse_header
from ._internal._shortcut import shortcut as _shortcut
from .args import Arg, Args
from .argv import Argv, __argv_type__
from .arparma import Arparma, ArparmaBehavior, requirement_handler
from .base import Completion, Help, Option, Shortcut, Subcommand, Header, SPECIAL_OPTIONS
from .config import Namespace, config
Expand Down Expand Up @@ -48,11 +49,11 @@ def handle_argv():

def add_builtin_options(options: list[Option | Subcommand], cmd: Alconna, ns: Namespace) -> None:
if "help" not in ns.disable_builtin_options:
options.append(Help("|".join(ns.builtin_option_name["help"]), dest="$help", help_text=lang.require("builtin", "option_help"))) # noqa: E501
options.append(Help("|".join(ns.builtin_option_name["help"]), dest="$help", help_text=lang.require("builtin", "option_help"), soft_keyword=False)) # noqa: E501

@cmd.route("$help")
def _(command: Alconna, arp: Arparma):
argv = command_manager.resolve(cmd)
argv = command_manager.require(cmd).argv
_help_param = [str(i) for i in argv.release(recover=True) if str(i) not in ns.builtin_option_name["help"]]
arp.output = command.formatter.format_node(_help_param)
return True
Expand All @@ -64,6 +65,7 @@ def _(command: Alconna, arp: Arparma):
Args["action?", "delete|list"]["name?", str]["command?", str],
dest="$shortcut",
help_text=lang.require("builtin", "option_shortcut"),
soft_keyword=False,
)
)

Expand All @@ -84,11 +86,11 @@ def _(command: Alconna, arp: Arparma):
return True

if "completion" not in ns.disable_builtin_options:
options.append(Completion("|".join(ns.builtin_option_name["completion"]), dest="$completion", help_text=lang.require("builtin", "option_completion"))) # noqa: E501
options.append(Completion("|".join(ns.builtin_option_name["completion"]), dest="$completion", help_text=lang.require("builtin", "option_completion"), soft_keyword=False)) # noqa: E501

@cmd.route("$completion")
def _(command: Alconna, arp: Arparma):
argv = command_manager.resolve(cmd)
argv = command_manager.require(cmd).argv
rest = argv.release()
trigger = None
if rest and isinstance(rest[-1], str) and rest[-1] in ns.builtin_option_name["completion"]:
Expand Down Expand Up @@ -193,7 +195,12 @@ class Alconna(Subcommand):

def compile(self, compiler: TCompile | None = None) -> Analyser:
"""编译 `Alconna` 为对应的解析器"""
return Analyser(self, compiler).compile()
if TYPE_CHECKING:
argv_type = Argv
else:
argv_type: type[Argv] = __argv_type__.get()
argv = argv_type(self.meta, self.namespace_config, self.separators)
return Analyser(self, argv, compiler)

def __init__(
self,
Expand Down Expand Up @@ -424,7 +431,7 @@ def _parse(self, message: TDC, ctx: dict[str, Any] | None = None) -> Arparma[TDC
if (res := alc._parse(message, ctx)).matched:
return res
analyser = command_manager.require(self)
argv = command_manager.resolve(self)
argv = analyser.argv
argv.enter(ctx).build(message)
if argv.message_cache and (res := command_manager.get_record(argv.token)):
return res
Expand Down
Loading

0 comments on commit d792ed8

Please sign in to comment.