-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbot.py
176 lines (143 loc) · 6.07 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import asyncio
import json
import sys
import traceback
from datetime import datetime
import grpc.aio
from dbots import *
from dbots.cmd import *
from motor.motor_asyncio import AsyncIOMotorClient
from xenon.backups import backup_pb2_grpc
from xenon.mutations import service_pb2_grpc as mutation_pb2_grpc
import config
from util import PremiumLevel
class RpcCollection:
def __init__(self):
options = [('grpc.max_message_length', 256 * 1024 * 1024)]
backups_channel = grpc.aio.insecure_channel(config.BACKUPS_SERVICES, options=options)
self.backups = backup_pb2_grpc.BackupServiceStub(backups_channel)
mutations_channel = grpc.aio.insecure_channel(config.MUTATIONS_SERVICE, options=options)
self.mutations = mutation_pb2_grpc.MutationServiceStub(mutations_channel)
class Xenon(InteractionBot):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.mongo = None
self._invite = None
self._support_invite = None
self.rpc = None
self.component(self._delete_button, name="delete")
@property
def db(self):
return self.mongo.xenon
async def _delete_button(self, ctx):
ctx.defer()
await ctx.delete_response()
async def on_command_error(self, ctx, e):
if isinstance(e, asyncio.CancelledError):
raise e
else:
tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))
print("Command Error:\n", tb, file=sys.stderr)
error_id = unique_id()
name = None
args = None
if isinstance(ctx, CommandContext):
name = ctx.command.full_name
args = {arg.name: arg.value for arg in ctx.args}
elif isinstance(ctx, ComponentContext):
name = ctx.component.name
elif isinstance(ctx, ModalContext):
name = ctx.modal.name
await self.redis.setex(f"cmd:errors:{error_id}", 60 * 60 * 24, json.dumps({
"command": name,
"args": args,
"timestamp": datetime.utcnow().timestamp(),
"author": ctx.author.id,
"traceback": tb
}))
try:
await ctx.respond(**create_message(
"An unexpected error occurred. Please report this on the "
f"[Support Server](<{await ctx.bot.get_support_invite()}>).\n\n"
f"**Error Code**: `{error_id.upper()}`",
f=Format.ERROR
), ephemeral=True)
except rest.HTTPException:
pass
async def _set_user_entitlement_active(self, member):
await self.db.users.update_one(
{"_id": member.id},
{
"$set": {
"_id": member.id,
**member.to_dict()["user"],
"entitlement_active": True,
},
"$setOnInsert": {
"tier": 0,
"manual_tier": False
}
},
upsert=True
)
async def execute_component(self, component, payload, args):
premium_level = 0
user_doc = await self.db.users.find_one({"_id": payload.author.id})
if user_doc is not None:
premium_level = user_doc.get("tier", 0)
payload.premium_level = PremiumLevel(premium_level)
return await super().execute_component(component, payload, args)
async def execute_command(self, command, payload, remaining_options):
await self.redis.hincrby("cmd:commands", command.full_name, 1)
blacklist = await self.db.blacklist.find_one({"_id": payload.author.id})
if blacklist is None and payload.guild_id:
blacklist = await self.db.blacklist.find_one({"_id": payload.guild_id})
if blacklist is not None and command.full_name not in {"opt out", "opt in"}:
if blacklist.get("guild"):
return InteractionResponse.message(**create_message(
"This server is **no longer allowed to use this bot** for the following reason:"
f"```{blacklist['reason']}```",
f=Format.ERROR
), ephemeral=True)
else:
return InteractionResponse.message(**create_message(
"You are **no longer allowed to use this bot** for the following reason:"
f"```{blacklist['reason']}```",
f=Format.ERROR
), ephemeral=True)
if payload.entitlement_sku_ids:
await self._set_user_entitlement_active(payload.author)
premium_level = 0
user_doc = await self.db.users.find_one({"_id": payload.author.id})
if user_doc is not None:
premium_level = user_doc.get("tier", 0)
payload.premium_level = PremiumLevel(premium_level)
return await super().execute_command(command, payload, remaining_options)
async def get_invite(self):
if self._invite:
return self._invite
invite = "https://xenon.bot/invite"
while "discord.com" not in invite:
async with self.session.get(invite, allow_redirects=False) as resp:
if 400 > resp.status >= 300:
invite = resp.headers["Location"]
else:
break
self._invite = invite
return invite
async def get_support_invite(self):
if self._support_invite:
return self._support_invite
invite = "https://xenon.bot/discord"
while "discord.com" not in invite:
async with self.session.get(invite, allow_redirects=False) as resp:
if 400 > resp.status >= 300:
invite = resp.headers["Location"]
else:
break
self._support_invite = invite
return invite
async def setup(self, redis_url="redis://localhost"):
self.rpc = RpcCollection()
self.mongo = AsyncIOMotorClient(config.MONGO_URL)
await super().setup(redis_url)