Skip to content

Commit

Permalink
🍻 separate shortcut handle
Browse files Browse the repository at this point in the history
  • Loading branch information
RF-Tar-Railt committed Oct 13, 2024
1 parent 63ee1bb commit d0d32f4
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 189 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ dev = [
]

[tool.pdm.scripts]
test = "pytest -v -W ignore --ignore entry_test.py"
test = "pytest -v -W ignore --ignore entry_test.py --durations=0 -s"
benchmark = "python benchmark.py"
deps = "pydeps -o alconna.svg ./src/arclet/alconna --max-bacon=4 --cluster --keep-target-cluster --rmprefix alconna. "

Expand Down
86 changes: 22 additions & 64 deletions src/arclet/alconna/_internal/_analyser.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from __future__ import annotations

import re
from dataclasses import dataclass, field
from re import Match
from typing import TYPE_CHECKING, Any, Callable, Generic, Set
from typing import TYPE_CHECKING, Any, Callable, Set
from typing_extensions import Self, TypeAlias

from tarina import Empty, lang
Expand All @@ -25,11 +23,9 @@
from ..manager import command_manager
from ..model import HeadResult, OptionResult, SubcommandResult
from ..output import output_manager
from ..typing import TDC, InnerShortcutArgs
from ..typing import TDC
from ._handlers import (
HEAD_HANDLES,
_handle_shortcut_data,
_handle_shortcut_reg,
handle_head_fuzzy,
analyse_args,
analyse_param,
Expand All @@ -39,6 +35,7 @@
handle_shortcut,
prompt,
)
from ._shortcut import shortcut
from ._header import Header
from ._util import levenshtein

Expand Down Expand Up @@ -78,7 +75,7 @@ def default_compiler(analyser: SubAnalyser, pids: set[str]):


@dataclass
class SubAnalyser(Generic[TDC]):
class SubAnalyser:
"""子解析器, 用于子命令的解析"""

command: Subcommand
Expand All @@ -87,9 +84,9 @@ class SubAnalyser(Generic[TDC]):
"""命令是否只有主参数"""
need_main_args: bool = field(default=False)
"""是否需要主参数"""
compile_params: dict[str, Option | SubAnalyser[TDC]] = field(default_factory=dict)
compile_params: dict[str, Option | SubAnalyser] = field(default_factory=dict)
"""编译的节点"""
compact_params: list[Option | SubAnalyser[TDC]] = field(default_factory=list)
compact_params: list[Option | SubAnalyser] = field(default_factory=list)
"""可能紧凑的需要逐个解析的节点"""
self_args: Args = field(init=False)
"""命令自身参数"""
Expand Down Expand Up @@ -206,7 +203,7 @@ def analyse(self, argv: Argv[TDC]) -> Self:
)
return self

def get_sub_analyser(self, target: Subcommand) -> SubAnalyser[TDC] | None:
def get_sub_analyser(self, target: Subcommand) -> SubAnalyser | None:
"""获取子解析器
Args:
Expand All @@ -222,7 +219,7 @@ def get_sub_analyser(self, target: Subcommand) -> SubAnalyser[TDC] | None:
return param.get_sub_analyser(target)


class Analyser(SubAnalyser[TDC], Generic[TDC]):
class Analyser(SubAnalyser):
"""命令解析器"""

command: Alconna
Expand All @@ -234,11 +231,11 @@ class Analyser(SubAnalyser[TDC], Generic[TDC]):
header_handler: Callable[[Header, Argv], HeadResult]
"""头部处理器"""

def __init__(self, alconna: Alconna[TDC], compiler: TCompile | None = None):
def __init__(self, alconna: Alconna, compiler: TCompile | None = None):
"""初始化解析器
Args:
alconna (Alconna[TDC]): 命令实例
alconna (Alconna): 命令实例
compiler (TCompile | None, optional): 编译器方法
"""
super().__init__(alconna)
Expand All @@ -259,50 +256,6 @@ def _clr(self):
def __repr__(self):
return f"<{self.__class__.__name__} of {self.command.path}>"

def shortcut(
self, argv: Argv[TDC], data: list[Any], short: Arparma | InnerShortcutArgs, reg: Match | None = None
) -> Arparma[TDC] | None:
"""处理被触发的快捷命令
Args:
argv (Argv[TDC]): 命令行参数
data (list[Any]): 剩余参数
short (Arparma | InnerShortcutArgs): 快捷命令
reg (Match | None): 可能的正则匹配结果
Returns:
Arparma[TDC] | None: Arparma 解析结果
Raises:
ParamsUnmatched: 若不允许快捷命令后随其他参数,则抛出此异常
"""
self.reset()

if isinstance(short, Arparma):
return short

argv.build(short.command) # type: ignore
if not short.fuzzy and data:
exc = ParamsUnmatched(lang.require("analyser", "param_unmatched").format(target=data[0]))
if self.command.meta.raise_exception:
raise exc
return self.export(argv, True, exc)
argv.addon(short.args, merge_str=False)
data = _handle_shortcut_data(argv, data)
if not data and argv.raw_data and any(isinstance(i, str) and bool(re.search(r"\{%(\d+)|\*(.*?)\}", i)) for i in argv.raw_data):
exc = ArgumentMissing(lang.require("analyser", "param_missing"))
if self.command.meta.raise_exception:
raise exc
return self.export(argv, True, exc)
argv.addon(data, merge_str=False)
if reg:
data = _handle_shortcut_reg(argv, reg.groups(), reg.groupdict(), short.wrapper)
argv.raw_data.clear()
argv.ndata = 0
argv.current_index = 0
argv.addon(data)
return

def process(self, argv: Argv[TDC]) -> Arparma[TDC]:
"""主体解析函数, 应针对各种情况进行解析
Expand Down Expand Up @@ -337,16 +290,21 @@ def process(self, argv: Argv[TDC]) -> Arparma[TDC]:
if self.command.meta.raise_exception:
raise e from exc
return self.export(argv, True, e)
else:
argv.context[SHORTCUT_ARGS] = short
argv.context[SHORTCUT_REST] = rest
argv.context[SHORTCUT_REGEX_MATCH] = mat

if arp := self.shortcut(argv, rest, short, mat):
argv.context[SHORTCUT_ARGS] = short
argv.context[SHORTCUT_REST] = rest
argv.context[SHORTCUT_REGEX_MATCH] = mat
self.reset()
try:
if arp := shortcut(argv, rest, short, mat):
return arp
except Exception as e1:
if self.command.meta.raise_exception:
raise
return self.export(argv, True, e1)

self.header_result = self.header_handler(self.command_header, argv)
self.header_result.origin = _next
self.header_result = self.header_handler(self.command_header, argv)
self.header_result.origin = _next

except RuntimeError as e:
exc = InvalidParam(lang.require("header", "error").format(target=argv.release(recover=True)[0]))
Expand Down
110 changes: 0 additions & 110 deletions src/arclet/alconna/_internal/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,116 +591,6 @@ def handle_shortcut(analyser: Analyser, argv: Argv):
return analyser.export(argv, True, SpecialOptionTriggered("shortcut"))


INDEX_SLOT = re.compile(r"\{%(\d+)\}")
WILDCARD_SLOT = re.compile(r"\{\*(.*)\}", re.DOTALL)


def _gen_extend(data: list, sep: str):
extend = []
for slot in data:
if isinstance(slot, str) and extend and isinstance(extend[-1], str):
extend[-1] += sep + slot
else:
extend.append(slot)
return extend


def _handle_multi_slot(argv: Argv, unit: str, data: list, index: int, current: int, offset: int):
slot = data[index]
if not isinstance(slot, str):
left, right = unit.split(f"{{%{index}}}", 1)
if left.strip():
argv.raw_data[current] = left.strip()
argv.raw_data.insert(current + 1, slot)
if right.strip():
argv.raw_data[current + 2] = right.strip()
offset += 1
else:
argv.raw_data[current + offset] = unescape(unit.replace(f"{{%{index}}}", slot))
return offset


def _handle_shortcut_data(argv: Argv, data: list):
data_len = len(data)
record = set()
offset = 0
for i, unit in enumerate(argv.raw_data.copy()):
if not isinstance(unit, str):
continue
unit = escape(unit)
if mat := INDEX_SLOT.fullmatch(unit):
index = int(mat[1])
if index >= data_len:
continue
argv.raw_data[i + offset] = data[index]
record.add(index)
elif res := INDEX_SLOT.findall(unit):
for index in map(int, res):
if index >= data_len:
continue
offset = _handle_multi_slot(argv, unit, data, index, i, offset)
record.add(index)
elif mat := WILDCARD_SLOT.search(unit):
extend = _gen_extend(data, mat[1] or " ")
if unit == f"{{*{mat[1]}}}":
argv.raw_data.extend(extend)
else:
argv.raw_data[i + offset] = unescape(unit.replace(f"{{*{mat[1]}}}", "".join(map(str, extend))))
data.clear()
break

def recover_quote(_unit):
if isinstance(_unit, str) and any(_unit.count(sep) for sep in argv.separators) and not (_unit[0] in ('"', "'") and _unit[0] == _unit[-1]):
return f'"{_unit}"'
return _unit

return [recover_quote(unit) for i, unit in enumerate(data) if i not in record]


INDEX_REG_SLOT = re.compile(r"\{(\d+)\}")
KEY_REG_SLOT = re.compile(r"\{(\w+)\}")


def _handle_shortcut_reg(argv: Argv, groups: tuple[str, ...], gdict: dict[str, str], wrapper: _ShortcutRegWrapper):
data = []
for unit in argv.raw_data:
if not isinstance(unit, str):
data.append(unit)
continue
unit = escape(unit)
if mat := INDEX_REG_SLOT.fullmatch(unit):
index = int(mat[1])
if index >= len(groups):
continue
slot = groups[index]
data.append(wrapper(index, slot, argv.context))
continue
if mat := KEY_REG_SLOT.fullmatch(unit):
key = mat[1]
if key not in gdict:
continue
slot = gdict[key]
data.append(wrapper(key, slot, argv.context))
continue
if mat := INDEX_REG_SLOT.findall(unit):
for index in map(int, mat):
if index >= len(groups):
unit = unit.replace(f"{{{index}}}", "")
continue
slot = groups[index]
unit = unit.replace(f"{{{index}}}", str(wrapper(index, slot, argv.context) or ""))
if mat := KEY_REG_SLOT.findall(unit):
for key in mat:
if key not in gdict:
unit = unit.replace(f"{{{key}}}", "")
continue
slot = gdict[key]
unit = unit.replace(f"{{{key}}}", str(wrapper(key, slot, argv.context) or ""))
if unit:
data.append(unescape(unit))
return data


def _prompt_unit(analyser: Analyser, argv: Argv, trig: Arg):
if not (comp := trig.field.get_completion()):
return [Prompt(analyser.command.formatter.param(trig), False)]
Expand Down
2 changes: 0 additions & 2 deletions src/arclet/alconna/_internal/_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ def __repr__(self):
if not self.origin[1]:
return self.origin[0]
return f"[{'│'.join(self.origin[1])}]{self.origin[0]}"
if isinstance(self.content, list):
return "│".join(map(str, self.content))
return str(self.content)

@classmethod
Expand Down
Loading

0 comments on commit d0d32f4

Please sign in to comment.