-
-
Notifications
You must be signed in to change notification settings - Fork 398
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
108 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,40 +1,114 @@ | ||
import random | ||
import time | ||
import requests | ||
import asyncio,base64,mimetypes,os | ||
from pyrogram import filters, types as t | ||
from lexica import AsyncClient | ||
from DAXXMUSIC import app | ||
from config import BOT_USERNAME | ||
from lexica.constants import languageModels | ||
|
||
from pyrogram.enums import ChatAction, ParseMode | ||
from pyrogram import filters | ||
|
||
@app.on_message(filters.command(["chatgpt","ai","ask","gpt","solve"], prefixes=["+", ".", "/", "-", "", "$","#","&"])) | ||
async def chat_gpt(bot, message): | ||
|
||
async def ChatCompletion(prompt,model) -> tuple | str : | ||
try: | ||
start_time = time.time() | ||
await bot.send_chat_action(message.chat.id, ChatAction.TYPING) | ||
modelInfo = getattr(languageModels,model) | ||
client = AsyncClient() | ||
output = await client.ChatCompletion(prompt,modelInfo) | ||
if model == "bard": | ||
return output['content'], output['images'] | ||
return output['content'] | ||
except Exception as E: | ||
raise Exception(f"API error: {E}",) | ||
|
||
if len(message.command) < 2: | ||
await message.reply_text( | ||
"Example:\n\n/chatgpt Where is TajMahal?" | ||
) | ||
|
||
|
||
async def geminiVision(prompt,model,images) -> tuple | str : | ||
imageInfo = [] | ||
for image in images: | ||
with open(image,"rb") as imageFile: | ||
data = base64.b64encode(imageFile.read()).decode("utf-8") | ||
mime_type,_= mimetypes.guess_type(image) | ||
imageInfo.append({ | ||
"data": data, | ||
"mime_type": mime_type | ||
}) | ||
os.remove(image) | ||
payload = { | ||
"images":imageInfo | ||
} | ||
modelInfo = getattr(languageModels,model) | ||
client = AsyncClient() | ||
output = await client.ChatCompletion(prompt,modelInfo,json=payload) | ||
return output['content']['parts'][0]['text'] | ||
|
||
|
||
|
||
def getMedia(message): | ||
"""Extract Media""" | ||
media = message.media if message.media else message.reply_to_message.media if message.reply_to_message else None | ||
if message.media: | ||
if message.photo: | ||
media = message.photo | ||
elif message.document and message.document.mime_type in ['image/png','image/jpg','image/jpeg'] and message.document.file_size < 5242880: | ||
media = message.document | ||
else: | ||
a = message.text.split(' ', 1)[1] | ||
response = requests.get(f'https://chatgpt.apinepdev.workers.dev/?question={a}') | ||
|
||
try: | ||
# Check if "results" key is present in the JSON response | ||
if "answer" in response.json(): | ||
x = response.json()["answer"] | ||
end_time = time.time() | ||
telegram_ping = str(round((end_time - start_time) * 1000, 3)) + " ms" | ||
await message.reply_text( | ||
f" {x} α΄Ι΄sα΄‘α΄ΚΙͺΙ΄Ι’ ΚΚ β @NexikoBot", | ||
parse_mode=ParseMode.MARKDOWN | ||
) | ||
else: | ||
await message.reply_text("No 'results' key found in the response.") | ||
except KeyError: | ||
# Handle any other KeyError that might occur | ||
await message.reply_text("Error accessing the response.") | ||
except Exception as e: | ||
await message.reply_text(f"**Ñ´β‘Γβ¬Γβ¬Γ‘Β΄ΒΓβ¬: {e} ") | ||
media = None | ||
elif message.reply_to_message and message.reply_to_message.media: | ||
if message.reply_to_message.photo: | ||
media = message.reply_to_message.photo | ||
elif message.reply_to_message.document and message.reply_to_message.document.mime_type in ['image/png','image/jpg','image/jpeg'] and message.reply_to_message.document.file_size < 5242880: | ||
media = message.reply_to_message.document | ||
else: | ||
media = None | ||
else: | ||
media = None | ||
return media | ||
|
||
|
||
def getText(message): | ||
"""Extract Text From Commands""" | ||
text_to_return = message.text | ||
if message.text is None: | ||
return None | ||
if " " in text_to_return: | ||
try: | ||
return message.text.split(None, 1)[1] | ||
except IndexError: | ||
return None | ||
else: | ||
return None | ||
|
||
|
||
|
||
|
||
@app.on_message(filters.command(["gpt","bard","llama","mistral","palm","gemini"])) | ||
async def chatbots(_,m: t.Message): | ||
prompt = getText(m) | ||
media = getMedia(m) | ||
if media is not None: | ||
return await askAboutImage(_,m,[media],prompt) | ||
if prompt is None: | ||
return await m.reply_text("Hello, How can i assist you today?") | ||
model = m.command[0].lower() | ||
output = await ChatCompletion(prompt,model) | ||
if model == "bard": | ||
output, images = output | ||
if len(images) == 0: | ||
return await m.reply_text(output) | ||
media = [] | ||
for i in images: | ||
media.append(t.InputMediaPhoto(i)) | ||
media[0] = t.InputMediaPhoto(images[0],caption=output) | ||
await _.send_media_group( | ||
m.chat.id, | ||
media, | ||
reply_to_message_id=m.id | ||
) | ||
return | ||
await m.reply_text(output['parts'][0]['text'] if model=="gemini" else output) | ||
|
||
|
||
async def askAboutImage(_,m:t.Message,mediaFiles: list,prompt:str): | ||
images = [] | ||
for media in mediaFiles: | ||
image = await _.download_media(media.file_id,file_name=f'./downloads/{m.from_user.id}_ask.jpg') | ||
images.append(image) | ||
output = await geminiVision(prompt if prompt else "whats this?","geminiVision",images) | ||
await m.reply_text(output) |