forked from mishl-dev/Discord-AI-Chatbot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
210 lines (173 loc) · 8.09 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import os
import re
import theb
import aiohttp
import discord
import httpx
from keep_alive import keep_alive
from discord import app_commands
from discord.ext import commands
from dotenv import load_dotenv
# Populate the process environment (python-dotenv) before any of the
# os.getenv / os.environ reads further down.
load_dotenv()
# Set up the Discord bot
# Every gateway intent is requested; "!" is the prefix for text commands.
intents = discord.Intents.all()
bot = commands.Bot(command_prefix="!", intents=intents, heartbeat_timeout=60)
# Discord bot token from the environment; os.getenv yields None when unset.
TOKEN = os.getenv('DISCORD_TOKEN') # Loads Discord bot token from env
# Keep track of the channels where the bot should be active
# allow_dm: when True, on_message also replies in private (DM) channels.
allow_dm = True
# Channel ids in which on_message generates replies (see toggleactive).
active_channels = set()
@bot.event
async def on_ready():
    """Sync the command tree, set the presence and print an invite link.

    Runs when discord.py dispatches the ``ready`` event. The invite link
    printed to stdout requests the administrator permission together with
    the ``bot`` and ``applications.commands`` scopes.
    """
    await bot.tree.sync()

    presence = discord.Game(name="Coded by Mishal#1916")
    await bot.change_presence(activity=presence)
    print(f"{bot.user.name} has connected to Discord!")

    admin_permissions = discord.Permissions(administrator=True)
    invite_link = discord.utils.oauth_url(
        bot.user.id,
        permissions=admin_permissions,
        scopes=("bot", "applications.commands"),
    )
    print(f"Invite link: {invite_link}")
def generate_response(prompt):
    """Return the text model's complete reply to *prompt* as a single string.

    The value returned by ``theb.Completion.create`` is treated as an
    iterable of text chunks (presumably a token generator -- confirm against
    the ``theb`` package) and concatenated.

    Args:
        prompt: Full prompt text to send to the model.

    Returns:
        The concatenated reply, or a fixed apology message when the model
        produced no text at all.
    """
    fallback = "I couldn't generate a response. Please try again."
    chunks = theb.Completion.create(prompt)
    # Consume the chunks *before* testing for emptiness: a generator object
    # is truthy even when it yields nothing, so testing `chunks` itself would
    # never select the fallback and an empty reply could be returned.
    text = "".join(chunks) if chunks else ""
    return text or fallback
# Hugging Face Inference API settings used by query() for image captioning.
# os.environ[...] raises KeyError at import time if HUGGING_FACE_API is unset.
api_key = os.environ['HUGGING_FACE_API']
API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base" # Alternative if this model is slow: https://api-inference.huggingface.co/models/nlpconnect/vit-gpt2-image-captioning
# Bearer-token header sent with every captioning request.
headers = {"Authorization": f"Bearer {api_key}"}
async def query(filename):
    """Upload an image file to the captioning endpoint and return its JSON reply.

    Args:
        filename: Path of the image file whose bytes are posted to ``API_URL``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the endpoint answers with any status other than 200;
            the message carries the status code and response text.
    """
    with open(filename, "rb") as f:
        data = f.read()
    async with httpx.AsyncClient() as client:
        # Raw bytes belong in `content=`; httpx reserves `data=` for form
        # fields and emits a DeprecationWarning when it is handed bytes.
        response = await client.post(
            API_URL, headers=headers, content=data, timeout=30
        )
    if response.status_code != 200:
        raise Exception(f"API request failed with status code {response.status_code}: {response.text}")
    return response.json()
async def download_image(image_url, save_as):
    """Download *image_url* and write the response body to *save_as*.

    Args:
        image_url: URL to fetch.
        save_as: Path of the file the downloaded bytes are written to.

    Raises:
        httpx.HTTPStatusError: If the server does not answer successfully.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(image_url)
    # Fail here instead of saving an error page to disk as if it were the
    # image; nothing is written when the download did not succeed.
    response.raise_for_status()
    with open(save_as, "wb") as f:
        f.write(response.content)
async def process_image_link(image_url):
    """Download the image at *image_url*, caption it and return the result.

    Args:
        image_url: URL of the image to caption.

    Returns:
        Whatever ``query`` returns for the downloaded file (the decoded JSON
        reply of the captioning endpoint).

    Raises:
        Any exception raised by ``download_image`` or ``query``; the
        temporary file is removed in every case.
    """
    import tempfile

    # A unique file per call: a fixed name would let two overlapping calls
    # overwrite or delete each other's download.
    fd, temp_image = tempfile.mkstemp(suffix=".jpg")
    os.close(fd)  # download_image reopens the path itself
    try:
        await download_image(image_url, temp_image)
        return await query(temp_image)
    finally:
        # Clean up even when the download or the captioning request fails.
        os.remove(temp_image)
# System prompt that on_message places in every request to the text model.
instructions = "System : Roleplay as a large language model developed by OpenAI. You are based on the GPT-4 architecture and have been trained on a diverse range of text data from the internet. You can also see images by using image to text model but you will stil have to create caption for it and sometimes the image to text might repeat in pattarn for example caption for 'a woman taking a self self self self self self self self self self self self self self self' would be 'A women taking a selfie'"
# A different persona prompt (e.g. a "DAN" prompt) can be substituted above.
# Per-author chat history: author id (as str) -> list of message contents.
message_history = {}
# Number of most recent messages kept per author; edit as needed.
MAX_HISTORY = 5 ######## EDIT THIS IF YOU NEED #########
# True while on_message is producing a reply.
is_busy = False
@bot.event
async def on_message(message):
    """Record the message, reply with generated text where enabled, run commands.

    Each message's content is appended to its author's entry in
    ``message_history`` (every bot author shares the entry keyed by this
    bot's id) and the entry is trimmed to the last ``MAX_HISTORY`` items.
    A reply is generated only for non-bot authors writing in an active
    channel or, when ``allow_dm`` is set, in a private channel. The first
    attachment whose filename ends in an image extension is captioned and
    the caption is added to the prompt.

    While a reply is being produced (``is_busy``), further messages are not
    recorded or answered, but prefix commands are still dispatched.

    Args:
        message: The incoming message delivered by discord.py.
    """
    global is_busy
    if is_busy:
        # Skip the chat pipeline, but do not swallow prefix commands.
        await bot.process_commands(message)
        return

    if message.author.bot:
        author_id = str(bot.user.id)
    else:
        author_id = str(message.author.id)

    # Keep a local reference to the trimmed history so the prompt can still be
    # built if message_history is cleared (e.g. by the bonk command) while
    # this handler is suspended in an await below.
    history = message_history.setdefault(author_id, [])
    history.append(message.content)
    history = history[-MAX_HISTORY:]
    message_history[author_id] = history

    in_scope = message.channel.id in active_channels or (
        allow_dm and message.channel.type == discord.ChannelType.private
    )
    if in_scope and not message.author.bot:
        image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp')
        is_busy = True
        try:
            image_caption = ""
            bot_prompt = f"{instructions}"
            for attachment in message.attachments:
                if attachment.filename.lower().endswith(image_extensions):
                    # The busy flag stays set across this await so that no
                    # second reply is started while captioning is in flight.
                    caption = await process_image_link(attachment.url)
                    image_caption = f"[Here is the image context for the image user has sent : {caption}]"
                    bot_prompt = f"{instructions}\n[System : Image context will be provided. Generate an caption with a response for it]"
                    print(caption)
                    break  # only the first image attachment is captioned

            # NOTE(review): the current message is already the last history
            # entry, so it appears twice in the prompt; left unchanged to keep
            # the prompt format as it was.
            user_prompt = "\n".join(history)
            prompt = f"{user_prompt}\n{bot_prompt}{message.author.name}: {message.content}\n{image_caption}\n{bot.user.name}:"
            async with message.channel.typing():
                # NOTE(review): generate_response is a plain synchronous call,
                # so the event loop is blocked until it returns.
                response = generate_response(prompt)
        finally:
            # Always release the flag: if captioning or generation raises, a
            # flag left at True would make the bot ignore every later message.
            is_busy = False
        await message.reply(response)

    await bot.process_commands(message)
@bot.hybrid_command(name="pfp", description="Change pfp")
async def pfp(ctx, attachment_url=None):
    """Change the bot's avatar to an image given by URL or by attachment.

    Args:
        ctx: Invocation context of the hybrid command.
        attachment_url: Optional URL of the new avatar image. When omitted,
            the first attachment of the invoking message is used.

    The outcome (usage hint, download failure, edit failure or success) is
    always reported back through ``ctx.send`` so that a slash invocation
    never ends without a response.
    """
    if attachment_url is None and not ctx.message.attachments:
        return await ctx.send(
            "Please provide an image URL or attach an image with the command"
        )
    if attachment_url is None:
        attachment_url = ctx.message.attachments[0].url

    # Slash invocations must be acknowledged within a few seconds; defer so
    # the download and the profile edit below cannot let the interaction expire.
    await ctx.defer()

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(attachment_url) as response:
                response.raise_for_status()
                image_bytes = await response.read()
    except aiohttp.ClientError as e:
        # Covers invalid URLs, connection problems and HTTP error statuses.
        return await ctx.send(f"Could not download the image: {e}")

    try:
        await bot.user.edit(avatar=image_bytes)
    except (discord.HTTPException, ValueError) as e:
        # edit() raises ValueError for an unsupported image format and
        # HTTPException when Discord rejects the change.
        return await ctx.send(f"Could not change the profile picture: {e}")

    await ctx.send("Profile picture updated.")
@bot.hybrid_command(name="ping", description="PONG")
async def ping(ctx):
    """Reply with ``bot.latency`` scaled by 1000 and shown to two decimals."""
    latency_ms = 1000 * bot.latency
    reply = f"Pong! Latency: {latency_ms:.2f} ms"
    await ctx.send(reply)
@bot.hybrid_command(name="changeusr", description="Change bot's actual username")
async def changeusr(ctx, new_username):
    """Change the bot account's username and report the outcome.

    Args:
        ctx: Invocation context of the hybrid command.
        new_username: The username to switch to.

    The request is refused when the name is empty or when a member visible
    to the bot already uses it (case-insensitive comparison). Errors raised
    by Discord while editing the profile are relayed to the invoker.
    """
    if new_username == "":
        await ctx.send("Please send the new username as well!")
        return

    wanted = new_username.lower()
    # any() stops at the first match instead of materialising every name.
    if any(user.name.lower() == wanted for user in bot.get_all_members()):
        await ctx.send(f"Sorry, the username '{new_username}' is already taken.")
        return

    try:
        await bot.user.edit(username=new_username)
    except discord.errors.HTTPException as e:
        # Prefer the part after the first ':' as before, but fall back to the
        # full text when there is none: sending an empty message would itself
        # fail and hide the real error.
        detail = "".join(e.text.split(":")[1:])
        if not detail.strip():
            detail = e.text or "Failed to change the username."
        await ctx.send(detail)
    else:
        # Confirm success so a slash invocation does not end unanswered.
        await ctx.send(f"Username changed to '{new_username}'.")
@bot.hybrid_command(name="toggledm", description="Toggle dm for chatting")
async def toggledm(ctx):
    """Flip the module-level ``allow_dm`` flag and announce the new state."""
    global allow_dm
    if allow_dm:
        allow_dm = False
        state = "disallowed"
    else:
        allow_dm = True
        state = "allowed"
    await ctx.send(f"DMs are now {state} for active channels.")
@bot.hybrid_command(name="toggleactive", description="Toggle active channels")
async def toggleactive(ctx):
    """Add the invoking channel to, or remove it from, the active channels.

    The in-memory ``active_channels`` set and the ``channels.txt`` file (one
    channel id per line) are updated together: adding appends a line,
    removing rewrites the file from the remaining ids.
    """
    current = ctx.channel.id

    if current not in active_channels:
        active_channels.add(current)
        with open("channels.txt", "a") as channels_file:
            channels_file.write(f"{current}\n")
        await ctx.send(
            f"{ctx.channel.mention} has been added to the list of active channels.")
        return

    active_channels.remove(current)
    with open("channels.txt", "w") as channels_file:
        channels_file.writelines(f"{remaining}\n" for remaining in active_channels)
    await ctx.send(
        f"{ctx.channel.mention} has been removed from the list of active channels."
    )
# Read the active channels from channels.txt on startup.
# The file holds one channel id per line, as written by toggleactive.
if os.path.exists("channels.txt"):
    with open("channels.txt", "r") as f:
        for line in f:
            line = line.strip()
            # Skip blank lines (e.g. a trailing newline or a hand-edited
            # file); int('') would raise ValueError and abort startup.
            if not line:
                continue
            channel_id = int(line)
            active_channels.add(channel_id)
@bot.hybrid_command(name="bonk", description="Clear bot's memory")
async def bonk(ctx):
    """Erase every stored conversation history and acknowledge the command."""
    # clear() empties the shared dict in place, so the module-level name is
    # never rebound here.
    message_history.clear()
    await ctx.send("What did you just say? Baby Yoda?")
# Unregister the existing "help" command so the one defined below can be
# registered under the same name.
bot.remove_command("help")


@bot.hybrid_command(name="help", description="Get all other commands!")
async def help(ctx):
    """Send an embed that lists the bot's commands with a short description."""
    command_help = (
        ("!pfp [image_url]", "Change the bot's profile picture"),
        ("!bonk", "Clears history of the bot"),
        ("!changeusr [new_username]", "Change the bot's username"),
        ("!ping", "Pong"),
        ("!toggleactive", "Toggle the current channel to the list of active channels"),
        ("!toggledm", "Toggle if DM should be active or not"),
    )

    embed = discord.Embed(title="Bot Commands", color=0x00ff00)
    for usage, summary in command_help:
        embed.add_field(name=usage, value=summary, inline=False)
    embed.set_footer(text="Created by Mishal#1916")

    await ctx.send(embed=embed)
# Call the helper imported from the local keep_alive module
# (presumably starts a small web server so the host stays awake -- TODO confirm).
keep_alive()
# Log in with the token read from DISCORD_TOKEN and run the bot; TOKEN is
# None when that variable is unset.
bot.run(TOKEN)