-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
148 lines (111 loc) · 4.89 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python
import logging
from typing import Dict
from telegram import (Bot, ChatMember, ChatPermissions, InlineKeyboardButton,
InlineKeyboardMarkup, Message, ParseMode, Update, User)
from telegram.ext import (CallbackContext, CallbackQueryHandler,
CommandHandler, Filters, Job, MessageHandler,
Updater)
# Attributes of the bot's own ChatMember record that must be truthy before
# the bot starts challenging newcomers (checked in `start`).
NEEDED_PERMISSIONS_TO_OPERATE = ['can_restrict_members', 'can_delete_messages']

# Seconds a new member has to press a button before being treated as a bot.
TIMEOUT_SEC = 60
# Seconds the verdict message stays visible before it is deleted.
DELETE_SEC = 5

# Every permission flag toggled by the two presets below.
_PERMISSION_FLAGS = (
    'can_send_messages',
    'can_send_media_messages',
    'can_send_polls',
    'can_send_other_messages',
    'can_add_web_page_previews',
    'can_change_info',
    'can_invite_users',
    'can_pin_messages',
)

# Preset applied to a newcomer while the challenge is pending.
RESTRICT_ALL = ChatPermissions(**dict.fromkeys(_PERMISSION_FLAGS, False))
# Preset applied once the newcomer declares themselves human.
ALLOW_ALL = ChatPermissions(**dict.fromkeys(_PERMISSION_FLAGS, True))

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Shared job queue; stays None until `main` assigns the updater's queue.
job_queue = None
# Pending timeout jobs, keyed by chat id and then by member id.
jobs: Dict[int, Dict[int, Job]] = {}
def send_answer(message: Message, user: User, answer):
    """Rewrite the challenge message with the verdict and schedule its deletion.

    Args:
        message: The challenge message to edit in place.
        user: The member the verdict is about; mentioned in the new text.
        answer: Label inserted into the text (callers pass 'Bot' or 'Human').

    The edited message is deleted after DELETE_SEC seconds through the
    module-level ``job_queue``, which must already have been set by ``main``.
    """
    # MarkdownV2 requires parentheses to be backslash-escaped. The backslashes
    # are doubled so Python emits a literal backslash instead of relying on
    # the invalid escape sequences "\(" and "\)".
    verdict = (
        f"{user.mention_markdown_v2()} \\(user id: `{user.id}`\\) "
        f"treated as *{answer}*"
    )
    message.edit_text(text=verdict, parse_mode=ParseMode.MARKDOWN_V2)
    job_queue.run_once(lambda _context: message.delete(), DELETE_SEC)
def action_bot(message: Message, user: User):
    """Handle a member judged to be a bot.

    Announces the verdict on the challenge message, then kicks the user from
    the message's chat and immediately unbans them.

    Args:
        message: The challenge message shown to the user.
        user: The member being removed.
    """
    send_answer(message, user, 'Bot')
    api = message.bot
    chat_id = message.chat_id
    api.kick_chat_member(chat_id, user.id)
    # NOTE(review): the unban right after the kick presumably lets the user
    # rejoin later instead of staying banned - confirm intent.
    api.unban_chat_member(chat_id, user.id)
def action_human(message: Message, user: User):
    """Handle a member judged to be human.

    Announces the verdict on the challenge message and applies the ALLOW_ALL
    permission preset to the user in the message's chat.

    Args:
        message: The challenge message shown to the user.
        user: The member whose restrictions are lifted.
    """
    send_answer(message, user, 'Human')
    api = message.bot
    api.restrict_chat_member(message.chat_id, user.id, ALLOW_ALL)
def start(update: Update, context: CallbackContext) -> None:
    """Challenge each newly joined non-bot member to say whether they are human.

    Registered in ``main`` for "new chat members" status updates. If the bot
    lacks any permission listed in NEEDED_PERMISSIONS_TO_OPERATE, it posts a
    checklist of those permissions and does nothing else. Otherwise it deletes
    the join message and, for every new member that is not a bot account:

    * sends a welcome message with "bot" / "human" buttons,
    * schedules ``action_bot`` to run after TIMEOUT_SEC seconds,
    * records that job in ``jobs[chat_id][member_id]`` so ``button`` can
      cancel it,
    * applies the RESTRICT_ALL preset to the member.

    Args:
        update: The incoming update carrying the join message.
        context: Handler context (unused).
    """
    chat = update.effective_chat
    bot: Bot = update.message.bot
    bot_in_group: ChatMember = bot.get_chat_member(chat.id, bot.id)

    if not all(getattr(bot_in_group, perm) for perm in NEEDED_PERMISSIONS_TO_OPERATE):
        # Map a human-readable permission name to whether it is granted.
        perms = {
            perm.replace("can_", "").replace("_", " "): getattr(bot_in_group, perm)
            for perm in NEEDED_PERMISSIONS_TO_OPERATE
        }
        formatted_permissions = '\n'.join(
            f'{"Y" if can else "X"} {perm}' for perm, can in perms.items())
        chat.send_message(
            f'Please give me admin and make sure I have the folowing permissions so I can work properly :)\n\n{formatted_permissions}')
        return

    update.message.delete()

    for member in update.message.new_chat_members:
        if member.is_bot:
            continue

        # callback_data is "<member id>,<answer>"; `button` parses this format.
        keyboard = [
            [
                InlineKeyboardButton(
                    "I'm a bot!", callback_data=f'{member.id},bot'),
                InlineKeyboardButton(
                    "I'm a human!", callback_data=f'{member.id},human'),
            ]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        # Backslashes are doubled so the MarkdownV2 escapes reach Telegram
        # without relying on invalid Python escape sequences.
        msg = chat.send_message(
            f'Welcome {member.mention_markdown_v2()} \\(user id: `{member.id}`\\) to our group\\!\n\n'
            f'Please state if you are *human* or *bot*\n\n'
            f'hurry up, you only have {TIMEOUT_SEC} seconds',
            reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN_V2)

        # Bind msg/member as defaults: a plain closure would see the values of
        # the LAST loop iteration by the time the job fires.
        job = job_queue.run_once(
            lambda _context, msg=msg, member=member: action_bot(msg, member),
            TIMEOUT_SEC)
        # Add to the chat's mapping instead of replacing it, so jobs of other
        # members still waiting in the same chat are not lost.
        jobs.setdefault(chat.id, {})[member.id] = job

        bot.restrict_chat_member(chat.id, member.id, RESTRICT_ALL)
def button(update: Update, context: CallbackContext) -> None:
    """Handle a press on one of the challenge buttons.

    The callback data has the form "<member id>,<answer>". Only a press by the
    member the challenge was issued to has an effect: the pending timeout job
    (if any is still recorded) is cancelled and forgotten, then the member is
    handled by ``action_human`` when the answer is 'human' and by
    ``action_bot`` for any other answer.

    Args:
        update: The incoming update carrying the callback query.
        context: Handler context (unused).
    """
    query = update.callback_query
    # Acknowledge every press, including presses by other users, so the
    # client stops showing its progress indicator.
    query.answer()

    member_id, answer = query.data.split(',')
    user = update.effective_user
    if user.id != int(member_id):
        return

    # The entry may be missing (e.g. after a restart); pop with defaults so a
    # missing entry does not raise, and so handled entries do not accumulate.
    pending = jobs.get(update.effective_chat.id, {}).pop(user.id, None)
    if pending is not None:
        pending.schedule_removal()

    if answer == 'human':
        action_human(query.message, user)
    else:
        action_bot(query.message, user)
def main(token):
    """Build the updater, register the handlers and poll until interrupted.

    Args:
        token: Bot token passed to the Updater.

    Side effect: stores the updater's job queue in the module-level
    ``job_queue`` used by the handlers.
    """
    global job_queue

    updater = Updater(token, use_context=True)
    job_queue = updater.job_queue

    dispatcher = updater.dispatcher
    # Button presses on challenge messages.
    dispatcher.add_handler(CallbackQueryHandler(button))
    # Join notifications that trigger a new challenge.
    dispatcher.add_handler(
        MessageHandler(Filters.status_update.new_chat_members, start))

    # Start the Bot
    updater.start_polling()

    # Block until the user presses Ctrl-C or the process receives SIGINT,
    # SIGTERM or SIGABRT
    updater.idle()
if __name__ == '__main__':
    # The bot token is read from a file named TOKEN in the working directory;
    # surrounding whitespace (such as a trailing newline) is stripped.
    with open('TOKEN') as token_file:
        TOKEN = token_file.read().strip()
    main(TOKEN)