-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
200 lines (153 loc) · 6.41 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import asyncio
import logging
import sys
from io import BytesIO
from os import getenv
import aiofiles
from aiogram import Bot, Dispatcher, html
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.types import FSInputFile
from schemas.cell import Cell
from schemas.user import User, UserStatus
from aiogram.filters import CommandStart, Command, CommandObject
from depends import get_database_user_service, get_database_map_service
from aiogram.types import Message
from dotenv import load_dotenv
import random
import datetime as dt
from aiogram.utils.formatting import (
Bold, as_list, as_marked_section, as_key_value, HashTag, TextLink, html_decoration
)
import string
from PIL import Image, ImageDraw
load_dotenv()
TOKEN = getenv("BOT_TOKEN")
SYMBOLS_FOR_SALT = "ABC"
dp = Dispatcher()
async def save_image(path: str, image: memoryview) -> None:
async with aiofiles.open(path, "wb") as file:
await file.write(image)
@dp.message(CommandStart())
async def command_start_handler(message: Message) -> None:
text = f"""<b>Привет 👋)</b>
это бот для регистрации и привязке аккаунта на сайте <a href='https://artemki77.ru'>artemki77.ru</a>
для регистрации вы можете воспользоваться командой /setpass, инструкция ниже
<b>доступные команды:</b>
1. /setpass
{html_decoration.pre('Команда для того чтобы задать или сменить пароль, формат: /setpass "ваш пароль"')}
2. /info
{html_decoration.pre('Сосояние сервера, а так же выводит карту')}
3. /me, /stats
{html_decoration.pre('выводит ваш статус, а так же важу статистику')}
"""
username = message.from_user.username.lower()
db_users = get_database_user_service()
users_from_db = await db_users.get_user_by_username(username)
if not users_from_db:
user = User(
username=username,
last_click=dt.datetime.now(),
)
await db_users.add_user(user)
await message.answer(text)
@dp.message(Command("test", "admin"))
async def test(message: Message) -> None:
await message.answer("и чё?")
@dp.message(Command("info"))
async def command_info_handler(message: Message) -> None:
KEY_INFO_DB = "INFO_IMG"
db_map = get_database_map_service()
result = await db_map.get_by_key(KEY_INFO_DB)
count_click = str(await db_map.get_count_clicks())
count_click = 1
if result != count_click:
all_map: list[Cell] = await db_map.get_all_map()
if not all_map:
await message.answer("карта пока пуста(")
return
img = Image.new('RGBA', (2000, 2000), 'white')
buffer = BytesIO()
idraw = ImageDraw.Draw(img)
for cell in all_map:
size_cell = 40 # px
cell_cords = list(map(int, cell.cords.split()))
cell_cords = list(map(lambda x: x * 40, cell_cords))
cell_cords += list(map(lambda cord: cord + size_cell, cell_cords))
idraw.rectangle(cell_cords, fill=cell.color)
img.save(buffer, format="PNG")
await save_image("info.png", buffer.getbuffer())
await db_map.set_by_key(KEY_INFO_DB, count_click)
await message.answer_photo(FSInputFile("info.png"))
else:
await message.answer_photo(FSInputFile("info.png"))
@dp.message(Command("setpass"))
async def command_setpass_handler(message: Message, command: CommandObject) -> None:
if command.args is None:
text = f"""
{html_decoration.bold('❗️Вы не ввели пароль:')}
{html_decoration.pre('/setpass "ваш пароль"')}
"""
await message.answer(text)
return
username = message.from_user.username.lower()
db_users = get_database_user_service()
users_from_db = await db_users.get_user_by_username(username)
if not users_from_db:
user = User(
username=username,
last_click=dt.datetime.now(),
)
await db_users.add_user(user)
else:
user = users_from_db[0]
new_password = command.args.strip()
new_salt = "".join(random.sample(string.ascii_lowercase, 5))
new_hash = db_users.hash_pass(new_password, new_salt)
user.password_salt = new_salt
user.password_hash = new_hash
old_status = user.status
user.status = UserStatus.activated
await db_users.add_user(user)
if old_status == UserStatus.at_registration:
text = f"""
{html_decoration.bold('✅Вы зарегистрировались')}
теперь вы можете войти на сайте <a href='https://artemki77.ru'>artemki77.ru</a>
логин: {username}
пароль: {new_password}
{html_decoration.italic('сайт и бот сохраняют только хэши паролей')}
"""
await message.answer(text)
else:
text = f"""
✅Вы поменяли пароль
теперь вы можете войти на сайте <a href='https://artemki77.ru'>artemki77.ru</a>
логин: {username}
пароль: {new_password}
{html_decoration.italic('сайт и бот сохраняют только хэши паролей')}
"""
await message.answer(text)
@dp.message(Command("me", "stats"))
async def command_stats_handler(message: Message) -> None:
username = message.from_user.username.lower()
db_users = get_database_user_service()
users = await db_users.get_user_by_username(username)
if not users:
text = html_decoration.bold("❗Вас нету в базе данных или вы заблокированны:")
await message.answer(text)
return
user = users[0]
text = f"""
👤{user.username}
{html_decoration.bold('роль:')} {user.role.value}
{html_decoration.bold('статус:')} {user.status.value}
{html_decoration.bold('последний клик:')} {user.last_click.isoformat()}
{html_decoration.bold('количество кликов:')} {user.count_click}
"""
await message.answer(text)
async def main() -> None:
bot = Bot(token=TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
await dp.start_polling(bot)
if __name__ == "__main__":
# logging.basicConfig(level=logging.INFO, stream=sys.stdout)
asyncio.run(main())