-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgroup-bot.py
548 lines (487 loc) · 26.8 KB
/
group-bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
import os
import logging
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
CallbackQueryHandler
)
from telegram.error import BadRequest
# Настройка логгирования
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
# Загрузка токена
with open("bot_token.txt", "r") as f:
TOKEN = f.read().strip()
# Пути к файлам
USERS_FILE = "users.txt"
ADMINS_FILE = "admins.txt"
# Кэш состояний
states = {}
def load_groups(filename: str):
try:
with open(filename, "r") as f:
groups = [line.strip() for line in f if line.strip()]
if not groups:
logger.warning(f"Файл {filename} пуст")
return groups
except Exception as e:
logger.error(f"Ошибка чтения файла {filename}: {e}")
return []
def update_file(filename: str, chat_id: str, add: bool = True):
try:
with open(filename, "r+") as f:
ids = {line.strip() for line in f}
if add:
ids.add(chat_id)
else:
ids.discard(chat_id)
f.seek(0)
f.truncate()
f.write("\n".join(ids))
return True
except Exception as e:
logger.error(f"Error updating file {filename}: {e}")
return False
async def check_admin(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
Проверяет, является ли текущий чат админским, основываясь на списке групп из admins.txt.
"""
chat_id = str(update.effective_chat.id)
# Загружаем список админских групп
admin_chats = set(load_groups(ADMINS_FILE))
# Проверяем, входит ли текущий чат в список админских групп
return chat_id in admin_chats
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.message.chat.type == 'private':
return
await update.message.reply_text("🤖 Бот запущен и готов к работе!")
async def toggle_mode(update: Update, context: ContextTypes.DEFAULT_TYPE, mode: bool):
chat_id = str(update.effective_chat.id)
user_chats = set(load_groups(USERS_FILE))
admin_chats = set(load_groups(ADMINS_FILE))
is_user_chat = chat_id in user_chats
is_admin_chat = chat_id in admin_chats
if not (is_user_chat or is_admin_chat):
await update.message.reply_text("⚠️ Эта команда доступна только в разрешенных группах.")
return
# Определяем тип группы
group_type = "👨🏻💼 Пользовательский" if is_user_chat else "👨🏻🔧 Админский"
states[chat_id] = mode
status = "ВКЛЮЧЕН ✅" if mode else "ВЫКЛЮЧЕН ❌"
await update.message.reply_text(f"{group_type} режим прослушивания сообщений {status}")
async def on_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.message.chat.type == 'private':
return
await toggle_mode(update, context, True)
async def off_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.message.chat.type == 'private':
return
await toggle_mode(update, context, False)
async def all_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.message.chat.type == 'private':
return
if not await check_admin(update, context):
await update.message.reply_text("⚠️ Эта команда доступна только в группах администраторов.")
return
if not context.args:
await update.message.reply_text("💡 Использование: /all [текст сообщения]")
return
admin = update.message.from_user
# Заголовок, который будет отправлен отдельным сообщением
header = (
f"❗️ Важное сообщение от администратора @{escape_markdownv2(admin.username)}\n"
f"Группа: *{escape_markdownv2(update.effective_chat.title)}*\n"
f"ID группы: `{update.message.chat.id}`"
)
user_chats = load_groups(USERS_FILE)
for user_chat in user_chats:
try:
# Сначала отправляем заголовок
await context.bot.send_message(
chat_id=user_chat,
text=header,
parse_mode='MarkdownV2'
)
# Затем пересылаем (копируем) командное сообщение админа
await context.bot.copy_message(
chat_id=user_chat,
from_chat_id=update.message.chat.id,
message_id=update.message.message_id
)
await update.message.reply_text(f"✉️ Сообщение доставлено в группу {user_chat}.")
except BadRequest as e:
logger.error(f"BadRequest error while sending to {user_chat}: {e}")
await update.message.reply_text(f"⚠️ Ошибка при отправке сообщения в группу {user_chat}.")
except Exception as e:
logger.error(f"Unexpected error while sending to {user_chat}: {e}")
await update.message.reply_text(f"⚠️ Непредвиденная ошибка при отправке в группу {user_chat}.")
async def get_groups_id(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.message.chat.type == 'private':
return
if not await check_admin(update, context):
await update.message.reply_text("⚠️ Эта команда доступна только в группах администраторов.")
return
groups = []
for filename in [USERS_FILE, ADMINS_FILE]:
with open(filename, "r") as f:
for chat_id in f.readlines():
chat_id = chat_id.strip()
if chat_id:
try:
chat = await context.bot.get_chat(chat_id)
title = escape_markdownv2(chat.title)
group_type = '👨🏻💼 Пользовательская' if filename == USERS_FILE else '👨🏻🔧 Админская'
groups.append(f"{group_type} группа: *{title}* \nID: `{chat_id}`")
except Exception as e:
logger.error(f"⚠️ Ошибка получения информации о чате {chat_id}: {e}")
response = "Список групп:\n\n" + "\n\n".join(groups)
# Разделение сообщения на части, если оно слишком длинное
messages_to_send = split_message(response)
# Отправка сообщений по частям
for msg in messages_to_send:
await update.message.reply_text(msg, parse_mode='MarkdownV2')
async def handle_group_management(update: Update, context: ContextTypes.DEFAULT_TYPE, file: str, add: bool):
if update.message.chat.type == 'private':
return
if not await check_admin(update, context):
await update.message.reply_text("⚠️ Эта команда доступна только в группах администраторов.")
return
chat_id = update.message.text.split()[-1]
if not chat_id.startswith('-'):
await update.message.reply_text("⚠️ Неверный формат ID группы")
return
success = update_file(file, chat_id, add)
action = "добавлена ✅" if add else "удалена ❌"
if success:
await update.message.reply_text(f"Группа {chat_id} {action}")
else:
await update.message.reply_text("⚠️ Ошибка при обновлении списка групп")
def split_message(message_text: str, max_length: int = 4096):
"""
Разбивает сообщение на части, если его длина превышает максимальный лимит (4096 символов).
Возвращает список сообщений.
"""
# Если сообщение меньше лимита, возвращаем его как есть
if len(message_text) <= max_length:
return [message_text]
# Разбиваем сообщение на части по max_length символов
messages = []
while len(message_text) > max_length:
# Находим максимальную длину сообщения, не превышающую лимит
part = message_text[:max_length]
messages.append(part)
message_text = message_text[max_length:]
# Добавляем оставшуюся часть, если она есть
if message_text:
messages.append(message_text)
return messages
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.message.chat.type == 'private':
return
chat_id = str(update.effective_chat.id)
message = update.message
# Проверка принадлежности чата к разрешенным группам
user_chats = set(load_groups(USERS_FILE))
admin_chats = set(load_groups(ADMINS_FILE))
is_user_chat = chat_id in user_chats
is_admin_chat = chat_id in admin_chats
if not (is_user_chat or is_admin_chat):
return
# Обработка сообщений
if is_user_chat:
await handle_user_message(update, context)
elif is_admin_chat:
await handle_admin_message(update, context)
async def handle_user_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
message = update.message
chat_id = str(update.effective_chat.id)
# Запускаем обработку, если:
# - есть реплай, или
# - текст содержит упоминание бота, или включён режим /on.
if not ((message.text and f"@{context.bot.username}" in message.text) or
states.get(chat_id, False) or
message.reply_to_message):
return
admin_chats = load_groups(ADMINS_FILE)
user = message.from_user
chat = await context.bot.get_chat(chat_id)
header_text = (
f"✉️ Сообщение из группы *{escape_markdownv2(chat.title)}*\n"
f"От: @{escape_markdownv2(user.username)}\n"
f"ID группы: `{chat_id}`"
)
success = False
for admin_chat in admin_chats:
try:
# Отправляем заголовок с информацией
await context.bot.send_message(
chat_id=admin_chat,
text=header_text,
parse_mode='MarkdownV2'
)
if message.reply_to_message:
if message.reply_to_message.from_user.id == context.bot.id:
# Реплай на сообщение бота – старое поведение: пересылаем текущее сообщение
await context.bot.copy_message(
chat_id=admin_chat,
from_chat_id=chat_id,
message_id=message.message_id
)
else:
# Реплай на НЕ-ботовое сообщение – пересылаем два сообщения:
# сначала оригинальное (на которое дали реплай), затем само новое сообщение
await context.bot.copy_message(
chat_id=admin_chat,
from_chat_id=message.reply_to_message.chat.id,
message_id=message.reply_to_message.message_id
)
await context.bot.copy_message(
chat_id=admin_chat,
from_chat_id=chat_id,
message_id=message.message_id
)
else:
# Если реплая нет – пересылаем текущее сообщение
await context.bot.copy_message(
chat_id=admin_chat,
from_chat_id=chat_id,
message_id=message.message_id
)
success = True
except Exception as e:
logger.error(f"Ошибка при пересылке сообщения в админскую группу {admin_chat}: {e}")
if success:
await update.message.reply_text("✉️ Сообщение успешно доставлено администратору(ам).")
else:
await update.message.reply_text("⚠️ Не удалось доставить сообщение администраторам.")
async def handle_admin_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
message = update.message
chat_id = str(update.effective_chat.id)
# Проверяем, является ли сообщение реплаем
if message.reply_to_message:
# Проверяем, является ли реплай сообщением от бота и содержит ли оно заголовок
if message.reply_to_message.from_user.id == context.bot.id:
original_text = message.reply_to_message.text
if "✉️ Сообщение из группы" in original_text:
try:
# Извлекаем ID целевой группы из заголовка
parts = original_text.split('\n')
group_id = parts[2].split(': ')[-1].strip('`')
admin = message.from_user
header_text = (
f"✉️ Ответ из группы *{escape_markdownv2(update.effective_chat.title)}*\n"
f"От: @{escape_markdownv2(admin.username)}\n"
f"ID группы: `{update.effective_chat.id}`"
)
# Отправляем ответ в целевую группу
await context.bot.send_message(
chat_id=group_id,
text=header_text,
parse_mode='MarkdownV2'
)
await context.bot.copy_message(
chat_id=group_id,
from_chat_id=chat_id,
message_id=message.message_id
)
await update.message.reply_text(f"✉️ Сообщение доставлено в группу {group_id}.")
except Exception as e:
logger.error(f"⚠️ Ошибка отправки сообщения в чат: {e}")
await update.message.reply_text("⚠️ Произошла ошибка при отправке сообщения.")
else:
# Если реплай не содержит заголовка
await update.message.reply_text(
"⚠️ Пожалуйста, ответьте на сообщение с заголовком: ✉️ Сообщение из группы."
)
else:
# Реплай на НЕ-ботовое сообщение – новая логика (двойная пересылка)
user_chats = load_groups(USERS_FILE)
admin = message.from_user
header_text = (
f"✉️ Ответ из группы *{escape_markdownv2(update.effective_chat.title)}*\n"
f"От: @{escape_markdownv2(admin.username)}"
)
# Сохраняем информацию о reply_to_message в context.chat_data
context.chat_data["reply_to_message"] = message.reply_to_message
keyboard = []
keyboard.append([InlineKeyboardButton("*️⃣ Отправить всем", callback_data="send_to_all")])
for user_chat in user_chats:
try:
chat = await context.bot.get_chat(user_chat)
button_text = chat.title[:20]
keyboard.append([InlineKeyboardButton(button_text, callback_data=f"send_to_group_{user_chat}")])
except Exception as e:
logger.error(f"⚠️ Ошибка при получении информации о чате {user_chat}: {e}")
continue
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("❓ Выберите группу для отправки сообщения:", reply_markup=reply_markup)
else:
# Если это не реплай, но есть упоминание бота или включен режим /on
if (message.text and f"@{context.bot.username}" in message.text) or states.get(chat_id, False):
user_chats = load_groups(USERS_FILE)
if not user_chats:
await update.message.reply_text("⚠️ Список пользовательских групп пуст.")
return
keyboard = []
keyboard.append([InlineKeyboardButton("*️⃣ Отправить всем", callback_data="send_to_all")])
for user_chat in user_chats:
try:
chat = await context.bot.get_chat(user_chat)
button_text = chat.title[:20]
keyboard.append([InlineKeyboardButton(button_text, callback_data=f"send_to_group_{user_chat}")])
except Exception as e:
logger.error(f"⚠️ Ошибка при получении информации о чате {user_chat}: {e}")
continue
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("❓ Выберите группу для отправки сообщения:", reply_markup=reply_markup)
async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
action = query.data
admin_chat_id = str(query.message.chat.id)
if not await check_admin(update, context):
await query.edit_message_text("⚠️ Эта функция доступна только в группах администраторов.")
return
user_chats = load_groups(USERS_FILE)
if not user_chats:
await query.edit_message_text("⚠️ Список пользовательских групп пуст.")
return
# Получаем исходное сообщение
original_message = query.message.reply_to_message
if not original_message:
await query.edit_message_text("⚠️ Не удалось найти исходное сообщение.")
return
# Получаем reply_to_message из context.chat_data
reply_to_message = context.chat_data.get("reply_to_message")
admin = original_message.from_user
admin_chat = await context.bot.get_chat(admin_chat_id)
header_text = f"✉️ Сообщение из админской группы *{escape_markdownv2(admin_chat.title)}*\nОт: @{escape_markdownv2(admin.username)}"
# Логируем данные для отладки
logger.info(f"Action: {action}, Admin Chat ID: {admin_chat_id}, Original Message: {original_message.text}")
# Определяем, нужно ли выполнять двойную пересылку:
dual_forward = False
if reply_to_message and reply_to_message.from_user.id != context.bot.id:
dual_forward = True
logger.info("Dual forward is required.")
else:
logger.info("Single forward will be used.")
if action == "send_to_all":
for user_chat in user_chats:
try:
# Отправляем заголовок
await context.bot.send_message(
chat_id=user_chat,
text=header_text,
parse_mode='MarkdownV2'
)
# Если нужно выполнить двойную пересылку
if dual_forward:
# Пересылаем оригинальное сообщение (на которое дали реплай)
await context.bot.copy_message(
chat_id=user_chat,
from_chat_id=reply_to_message.chat.id,
message_id=reply_to_message.message_id
)
logger.info(f"Copied original message to {user_chat}.")
# Пересылаем само новое сообщение
await context.bot.copy_message(
chat_id=user_chat,
from_chat_id=original_message.chat.id,
message_id=original_message.message_id
)
logger.info(f"Copied new message to {user_chat}.")
else:
# Пересылаем только само сообщение
await context.bot.copy_message(
chat_id=user_chat,
from_chat_id=original_message.chat.id,
message_id=original_message.message_id
)
logger.info(f"Copied single message to {user_chat}.")
await context.bot.send_message(chat_id=admin_chat_id, text=f"✉️ Сообщение доставлено в группу {user_chat}.")
except Exception as e:
logger.error(f"Ошибка при пересылке сообщения в группу {user_chat}: {e}")
await context.bot.send_message(chat_id=admin_chat_id, text=f"⚠️ Ошибка при отправке сообщения в группу {user_chat}.")
# Очищаем reply_to_message после успешной отправки
context.chat_data.pop("reply_to_message", None)
await query.edit_message_text("✉️ Сообщение отправлено во все пользовательские группы.")
elif action.startswith("send_to_group_"):
group_id = action.split("_")[-1]
if group_id in user_chats:
try:
# Отправляем заголовок
await context.bot.send_message(
chat_id=group_id,
text=header_text,
parse_mode='MarkdownV2'
)
# Если нужно выполнить двойную пересылку
if dual_forward:
# Пересылаем оригинальное сообщение (на которое дали реплай)
await context.bot.copy_message(
chat_id=group_id,
from_chat_id=reply_to_message.chat.id,
message_id=reply_to_message.message_id
)
logger.info(f"Copied original message to {group_id}.")
# Пересылаем само новое сообщение
await context.bot.copy_message(
chat_id=group_id,
from_chat_id=original_message.chat.id,
message_id=original_message.message_id
)
logger.info(f"Copied new message to {group_id}.")
else:
# Пересылаем только само сообщение
await context.bot.copy_message(
chat_id=group_id,
from_chat_id=original_message.chat.id,
message_id=original_message.message_id
)
logger.info(f"Copied single message to {group_id}.")
await query.edit_message_text(f"✉️ Сообщение отправлено в группу {group_id}.")
await context.bot.send_message(chat_id=admin_chat_id, text=f"✉️ Сообщение доставлено в группу {group_id}.")
except Exception as e:
logger.error(f"Ошибка при пересылке сообщения в группу {group_id}: {e}")
await query.edit_message_text(f"⚠️ Ошибка при отправке сообщения в группу {group_id}.")
await context.bot.send_message(chat_id=admin_chat_id, text=f"⚠️ Ошибка при отправке сообщения в группу {group_id}.")
# Очищаем reply_to_message после успешной отправки
context.chat_data.pop("reply_to_message", None)
else:
await query.edit_message_text(f"⚠️ Группа {group_id} не найдена в списке пользовательских групп.")
def escape_markdownv2(text: str) -> str:
"""
Экранирует специальные символы для MarkdownV2.
"""
reserved_chars = r"_*[]()~`>#+-=|{}.!"
return "".join(f"\\{char}" if char in reserved_chars else char for char in text)
def main():
application = Application.builder().token(TOKEN).connection_pool_size(8).build()
# Регистрация обработчиков команд
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("on", on_command))
application.add_handler(CommandHandler("off", off_command))
application.add_handler(CommandHandler("all", all_command))
application.add_handler(CommandHandler("get_groups_id", get_groups_id))
# Обработчики управления группами
application.add_handler(CommandHandler("add_user_group",
lambda u,c: handle_group_management(u,c, USERS_FILE, True)))
application.add_handler(CommandHandler("del_user_group",
lambda u,c: handle_group_management(u,c, USERS_FILE, False)))
application.add_handler(CommandHandler("add_admin_group",
lambda u,c: handle_group_management(u,c, ADMINS_FILE, True)))
application.add_handler(CommandHandler("del_admin_group",
lambda u,c: handle_group_management(u,c, ADMINS_FILE, False)))
# Обработчик сообщений
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
# Обработчик кнопок
application.add_handler(CallbackQueryHandler(button_handler))
application.run_polling()
if __name__ == "__main__":
main()