-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
84 lines (64 loc) · 2.18 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import sys
from asyncio import gather, create_task
from loguru import logger
from textual.color import Color
from nonechat.app import Frontend
from nonechat.backend import Backend
from nonechat.setting import ConsoleSetting
from nonechat.message import Text, ConsoleMessage
from nonechat.info import Event, Robot, MessageEvent
class ExampleBackend(Backend):
    """Example backend: reroutes loguru output and fans out message events.

    ``on_console_load`` replaces every loguru handler with one that writes to
    the frontend's ``_fake_output``; ``on_console_unmount`` removes that
    handler and adds one for the ``sys.stderr`` object captured at
    construction time. Callbacks added through :meth:`register` are awaited
    for each ``MessageEvent`` handed to :meth:`post_event`.
    """

    # Annotation only: the list is created per instance in ``__init__`` so
    # that separate backend instances never share one mutable registry.
    callbacks: list

    def __init__(self, frontend: "Frontend"):
        """Store the frontend (via the base class) and set up per-instance state."""
        super().__init__(frontend)
        # Per-instance callback registry (previously a shared class-level list).
        self.callbacks = []
        # Remember the stderr object in use now so it can be restored on unmount.
        self._stderr = sys.stderr
        # Id returned by ``logger.add`` for the console sink; None when not installed.
        self._logger_id = None

    def on_console_load(self) -> None:
        """Drop all existing loguru handlers and log to the frontend's fake output."""
        print("on_console_load")
        logger.remove()
        self._logger_id = logger.add(
            self.frontend._fake_output,
            level=0,
            diagnose=False,
        )

    def on_console_mount(self) -> None:
        """Log that the mount hook was called."""
        logger.info("on_console_mount")

    def on_console_unmount(self) -> None:
        """Remove the console log handler (if installed) and log to stderr again."""
        if self._logger_id is not None:
            logger.remove(self._logger_id)
            self._logger_id = None
        logger.add(
            self._stderr,
            backtrace=True,
            diagnose=True,
            colorize=True,
        )
        # logger.success("Console exit.")
        # logger.warning("Press Ctrl-C for Application exit")

    async def post_event(self, event: Event) -> None:
        """Run every registered callback concurrently for a ``MessageEvent``.

        Events of any other type are only logged. An exception raised by a
        callback propagates out of the ``gather`` call to the caller.
        """
        logger.info("post_event")
        if not isinstance(event, MessageEvent):
            return
        await gather(*[create_task(callback(event)) for callback in self.callbacks])

    def register(self):
        """Return a decorator that appends a function to this backend's callbacks.

        The decorated function is returned unchanged. ``post_event`` calls it
        with the event and wraps the result in ``create_task``, so it must be
        a coroutine function.
        """

        def wrapper(func):
            self.callbacks.append(func)
            return func

        return wrapper
# Module-level console application: a Frontend driven by ExampleBackend and
# configured with the demo's labels, glyphs and colors.
app = Frontend(
    ExampleBackend,
    ConsoleSetting(
        # Text labels.
        title="Test",
        sub_title="This is a test.",
        room_title="Room",
        bot_name="Nonebot",
        # Glyphs.
        icon="🤖",
        toolbar_exit="❌",
        # Colors (textual ``Color`` values).
        icon_color=Color.parse("#22b14c"),
        bg_color=Color(40, 44, 52),
        title_color=Color(229, 192, 123),
        header_color=Color(90, 99, 108, 0.6),
    ),
)
async def send_message(message: ConsoleMessage):
    """Send ``message`` through ``app.call("send_msg", ...)`` as ``Robot("robot")``."""
    payload = {"message": message, "info": Robot("robot")}
    await app.call("send_msg", payload)
@app.backend.register()
async def on_message(event: MessageEvent):
    """Reply with "pong!" when the message text is exactly "ping"."""
    incoming = str(event.message)
    if incoming != "ping":
        return
    reply = ConsoleMessage([Text("pong!")])
    await send_message(reply)
# Start the console application; this runs at import time as well as when the
# file is executed as a script, because there is no ``__main__`` guard.
# NOTE(review): presumably blocks until the console UI exits — confirm.
app.run()