From 1ba09de2b9dbac3a1506e88fc6ce5a1c0cb16ccb Mon Sep 17 00:00:00 2001 From: Karim Iskakov Date: Wed, 15 Mar 2023 19:20:43 +0300 Subject: [PATCH] Add message streaming * Add message streaming * Update README.md --- README.md | 37 ++++---- bot/bot.py | 181 +++++++++++++++++++++++++------------- bot/config.py | 1 + bot/openai_utils.py | 78 ++++++++++++++++ config/config.example.yml | 2 + requirements.txt | 3 +- 6 files changed, 227 insertions(+), 75 deletions(-) diff --git a/README.md b/README.md index b066e6fc..ab20f69f 100644 --- a/README.md +++ b/README.md @@ -20,14 +20,10 @@ This repo is ChatGPT re-created with GPT-3.5 LLM as Telegram Bot. **And it works You can deploy your own bot, or use mine: [@chatgpt_karfly_bot](https://t.me/chatgpt_karfly_bot) -## News -- *9 Mar 2023*: Now you can easily create your own Chat Modes by editing `config/chat_modes.yml` -- *8 Mar 2023*: Added voice message recognition with [OpenAI Whisper API](https://openai.com/blog/introducing-chatgpt-and-whisper-apis). Record a voice message and ChatGPT will answer you! -- *2 Mar 2023*: Added support of [ChatGPT API](https://platform.openai.com/docs/guides/chat/introduction). It's enabled by default and can be disabled with `use_chatgpt_api` option in config. Don't forget to **rebuild** you docker image (`--build`). - ## Features - Low latency replies (it usually takes about 3-5 seconds) - No request limits +- Message streaming (watch demo) - Voice message recognition - Code highlighting - Special chat modes: 👩🏼‍🎓 Assistant, 👩🏼‍💻 Code Assistant, 📝 Text Improver and 🎬 Movie Expert. You can easily create your own chat modes by editing `config/chat_modes.yml` @@ -35,6 +31,18 @@ You can deploy your own bot, or use mine: [@chatgpt_karfly_bot](https://t.me/cha - List of allowed Telegram users - Track $ balance spent on OpenAI API +

+ +

+ +--- + +## News +- *15 Mar 2023*: Added message streaming. Now you don't have to wait until the whole message is ready, it's streamed to Telegram part-by-part (watch demo) +- *9 Mar 2023*: Now you can easily create your own Chat Modes by editing `config/chat_modes.yml` +- *8 Mar 2023*: Added voice message recognition with [OpenAI Whisper API](https://openai.com/blog/introducing-chatgpt-and-whisper-apis). Record a voice message and ChatGPT will answer you! +- *2 Mar 2023*: Added support of [ChatGPT API](https://platform.openai.com/docs/guides/chat/introduction). It's enabled by default and can be disabled with `use_chatgpt_api` option in config. Don't forget to **rebuild** you docker image (`--build`). + ## Bot commands - `/retry` – Regenerate last bot answer - `/new` – Start new dialog @@ -48,16 +56,15 @@ You can deploy your own bot, or use mine: [@chatgpt_karfly_bot](https://t.me/cha 2. Get your Telegram bot token from [@BotFather](https://t.me/BotFather) 3. Edit `config/config.example.yml` to set your tokens and run 2 commands below (*if you're advanced user, you can also edit* `config/config.example.env`): -```bash -mv config/config.example.yml config/config.yml -mv config/config.example.env config/config.env -``` - -🔥 And now **run**: - -```bash -docker-compose --env-file config/config.env up --build -``` + ```bash + mv config/config.example.yml config/config.yml + mv config/config.example.env config/config.env + ``` + +4. 🔥 And now **run**: + ```bash + docker-compose --env-file config/config.env up --build + ``` ## ❤️ Top donations You can be in this list: diff --git a/bot/bot.py b/bot/bot.py index 75dcc64f..37ecb853 100644 --- a/bot/bot.py +++ b/bot/bot.py @@ -1,5 +1,6 @@ import os import logging +import asyncio import traceback import html import json @@ -23,6 +24,7 @@ CommandHandler, MessageHandler, CallbackQueryHandler, + AIORateLimiter, filters ) from telegram.constants import ParseMode, ChatAction @@ -35,6 +37,7 @@ # setup db = database.Database() logger = logging.getLogger(__name__) +user_semaphores = {} HELP_MESSAGE = """Commands: ⚪ /retry – Regenerate last bot answer @@ -64,6 +67,9 @@ async def register_user_if_not_exists(update: Update, context: CallbackContext, if db.get_user_attribute(user.id, "current_dialog_id") is None: db.start_new_dialog(user.id) + if user.id not in user_semaphores: + user_semaphores[user.id] = asyncio.Semaphore(1) + async def start_handle(update: Update, context: CallbackContext): await register_user_if_not_exists(update, context, update.message.from_user) @@ -89,6 +95,8 @@ async def help_handle(update: Update, context: CallbackContext): async def retry_handle(update: Update, context: CallbackContext): await register_user_if_not_exists(update, context, update.message.from_user) + if await is_previous_message_not_answered_yet(update, context): return + user_id = update.message.from_user.id db.set_user_attribute(user_id, "last_interaction", datetime.now()) @@ -110,72 +118,127 @@ async def message_handle(update: Update, context: CallbackContext, message=None, return await register_user_if_not_exists(update, context, update.message.from_user) + if await is_previous_message_not_answered_yet(update, context): return + user_id = update.message.from_user.id chat_mode = db.get_user_attribute(user_id, "current_chat_mode") + + async with user_semaphores[user_id]: + # new dialog timeout + if use_new_dialog_timeout: + if (datetime.now() - db.get_user_attribute(user_id, "last_interaction")).seconds > config.new_dialog_timeout and len(db.get_dialog_messages(user_id)) > 0: + db.start_new_dialog(user_id) + await update.message.reply_text(f"Starting new dialog due to timeout ({openai_utils.CHAT_MODES[chat_mode]['name']} mode) ✅", parse_mode=ParseMode.HTML) + db.set_user_attribute(user_id, "last_interaction", datetime.now()) - # new dialog timeout - if use_new_dialog_timeout: - if (datetime.now() - db.get_user_attribute(user_id, "last_interaction")).seconds > config.new_dialog_timeout and len(db.get_dialog_messages(user_id)) > 0: - db.start_new_dialog(user_id) - await update.message.reply_text(f"Starting new dialog due to timeout ({openai_utils.CHAT_MODES[chat_mode]['name']} mode) ✅", parse_mode=ParseMode.HTML) - db.set_user_attribute(user_id, "last_interaction", datetime.now()) - - # send typing action - await update.message.chat.send_action(action="typing") - - try: - message = message or update.message.text - - dialog_messages = db.get_dialog_messages(user_id, dialog_id=None) - chat_mode = db.get_user_attribute(user_id, "current_chat_mode") - - chatgpt_instance = openai_utils.ChatGPT(use_chatgpt_api=config.use_chatgpt_api) - answer, n_used_tokens, n_first_dialog_messages_removed = await chatgpt_instance.send_message( - message, - dialog_messages=dialog_messages, - chat_mode=chat_mode - ) - - # update user data - new_dialog_message = {"user": message, "bot": answer, "date": datetime.now()} - db.set_dialog_messages( - user_id, - db.get_dialog_messages(user_id, dialog_id=None) + [new_dialog_message], - dialog_id=None - ) - - db.set_user_attribute(user_id, "n_used_tokens", n_used_tokens + db.get_user_attribute(user_id, "n_used_tokens")) - - except Exception as e: - error_text = f"Something went wrong during completion. Reason: {e}" - logger.error(error_text) - await update.message.reply_text(error_text) - return - - # send message if some messages were removed from the context - if n_first_dialog_messages_removed > 0: - if n_first_dialog_messages_removed == 1: - text = "✍️ Note: Your current dialog is too long, so your first message was removed from the context.\n Send /new command to start new dialog" - else: - text = f"✍️ Note: Your current dialog is too long, so {n_first_dialog_messages_removed} first messages were removed from the context.\n Send /new command to start new dialog" - await update.message.reply_text(text, parse_mode=ParseMode.HTML) + # send typing action + await update.message.chat.send_action(action="typing") - # split answer into multiple messages due to 4096 character limit - for answer_chunk in split_text_into_chunks(answer, 4000): try: + message = message or update.message.text + + dialog_messages = db.get_dialog_messages(user_id, dialog_id=None) parse_mode = { "html": ParseMode.HTML, "markdown": ParseMode.MARKDOWN }[openai_utils.CHAT_MODES[chat_mode]["parse_mode"]] - - await update.message.reply_text(answer_chunk, parse_mode=parse_mode) - except telegram.error.BadRequest: - # answer has invalid characters, so we send it without parse_mode - await update.message.reply_text(answer_chunk) + + chatgpt_instance = openai_utils.ChatGPT(use_chatgpt_api=config.use_chatgpt_api) + if config.enable_message_streaming: + gen = chatgpt_instance.send_message_stream(message, dialog_messages=dialog_messages, chat_mode=chat_mode) + else: + answer, n_used_tokens, n_first_dialog_messages_removed = await chatgpt_instance.send_message( + message, + dialog_messages=dialog_messages, + chat_mode=chat_mode + ) + + async def fake_gen(): + yield "finished", answer, n_used_tokens, n_first_dialog_messages_removed + + gen = fake_gen() + + # send message to user + prev_answer = "" + i = -1 + async for gen_item in gen: + i += 1 + + status = gen_item[0] + if status == "not_finished": + status, answer = gen_item + elif status == "finished": + status, answer, n_used_tokens, n_first_dialog_messages_removed = gen_item + else: + raise ValueError(f"Streaming status {status} is unknown") + + answer = answer[:4096] # telegram message limit + if i == 0: # send first message (then it'll be edited if message streaming is enabled) + try: + sent_message = await update.message.reply_text(answer, parse_mode=parse_mode) + except telegram.error.BadRequest as e: + if str(e).startswith("Message must be non-empty"): # first answer chunk from openai was empty + i = -1 # try again to send first message + continue + else: + sent_message = await update.message.reply_text(answer) + else: # edit sent message + # update only when 100 new symbols are ready + if abs(len(answer) - len(prev_answer)) < 100 and status != "finished": + continue + + try: + await context.bot.edit_message_text(answer, chat_id=sent_message.chat_id, message_id=sent_message.message_id, parse_mode=parse_mode) + except telegram.error.BadRequest as e: + if str(e).startswith("Message is not modified"): + continue + else: + await context.bot.edit_message_text(answer, chat_id=sent_message.chat_id, message_id=sent_message.message_id) + + await asyncio.sleep(0.01) # wait a bit to avoid flooding + + prev_answer = answer + + # update user data + new_dialog_message = {"user": message, "bot": answer, "date": datetime.now()} + db.set_dialog_messages( + user_id, + db.get_dialog_messages(user_id, dialog_id=None) + [new_dialog_message], + dialog_id=None + ) + + db.set_user_attribute(user_id, "n_used_tokens", n_used_tokens + db.get_user_attribute(user_id, "n_used_tokens")) + except Exception as e: + error_text = f"Something went wrong during completion. Reason: {e}" + logger.error(error_text) + await update.message.reply_text(error_text) + return + + # send message if some messages were removed from the context + if n_first_dialog_messages_removed > 0: + if n_first_dialog_messages_removed == 1: + text = "✍️ Note: Your current dialog is too long, so your first message was removed from the context.\n Send /new command to start new dialog" + else: + text = f"✍️ Note: Your current dialog is too long, so {n_first_dialog_messages_removed} first messages were removed from the context.\n Send /new command to start new dialog" + await update.message.reply_text(text, parse_mode=ParseMode.HTML) + + +async def is_previous_message_not_answered_yet(update: Update, context: CallbackContext): + await register_user_if_not_exists(update, context, update.message.from_user) + + user_id = update.message.from_user.id + if user_semaphores[user_id].locked(): + text = "⏳ Please wait for a reply to the previous message" + await update.message.reply_text(text, reply_to_message_id=update.message.id, parse_mode=ParseMode.HTML) + return True + else: + return False async def voice_message_handle(update: Update, context: CallbackContext): await register_user_if_not_exists(update, context, update.message.from_user) + if await is_previous_message_not_answered_yet(update, context): return + user_id = update.message.from_user.id db.set_user_attribute(user_id, "last_interaction", datetime.now()) @@ -212,6 +275,8 @@ async def voice_message_handle(update: Update, context: CallbackContext): async def new_dialog_handle(update: Update, context: CallbackContext): await register_user_if_not_exists(update, context, update.message.from_user) + if await is_previous_message_not_answered_yet(update, context): return + user_id = update.message.from_user.id db.set_user_attribute(user_id, "last_interaction", datetime.now()) @@ -224,6 +289,8 @@ async def new_dialog_handle(update: Update, context: CallbackContext): async def show_chat_modes_handle(update: Update, context: CallbackContext): await register_user_if_not_exists(update, context, update.message.from_user) + if await is_previous_message_not_answered_yet(update, context): return + user_id = update.message.from_user.id db.set_user_attribute(user_id, "last_interaction", datetime.now()) @@ -247,11 +314,6 @@ async def set_chat_mode_handle(update: Update, context: CallbackContext): db.set_user_attribute(user_id, "current_chat_mode", chat_mode) db.start_new_dialog(user_id) - await query.edit_message_text( - f"{openai_utils.CHAT_MODES[chat_mode]['name']} chat mode is set", - parse_mode=ParseMode.HTML - ) - await query.edit_message_text(f"{openai_utils.CHAT_MODES[chat_mode]['welcome_message']}", parse_mode=ParseMode.HTML) @@ -297,7 +359,7 @@ async def error_handle(update: Update, context: CallbackContext) -> None: ) # split text into multiple messages due to 4096 character limit - for message_chunk in split_text_into_chunks(message, 4000): + for message_chunk in split_text_into_chunks(message, 4096): try: await context.bot.send_message(update.effective_chat.id, message_chunk, parse_mode=ParseMode.HTML) except telegram.error.BadRequest: @@ -320,6 +382,7 @@ def run_bot() -> None: ApplicationBuilder() .token(config.telegram_token) .concurrent_updates(True) + .rate_limiter(AIORateLimiter(max_retries=5)) .post_init(post_init) .build() ) diff --git a/bot/config.py b/bot/config.py index 41ba3639..2cc96aa4 100644 --- a/bot/config.py +++ b/bot/config.py @@ -17,6 +17,7 @@ use_chatgpt_api = config_yaml.get("use_chatgpt_api", True) allowed_telegram_usernames = config_yaml["allowed_telegram_usernames"] new_dialog_timeout = config_yaml["new_dialog_timeout"] +enable_message_streaming = config_yaml.get("enable_message_streaming", True) mongodb_uri = f"mongodb://mongo:{config_env['MONGODB_PORT']}" # chat_modes diff --git a/bot/openai_utils.py b/bot/openai_utils.py index 2b94b6b5..032f90c5 100644 --- a/bot/openai_utils.py +++ b/bot/openai_utils.py @@ -1,5 +1,6 @@ import config +import tiktoken import openai openai.api_key = config.openai_api_key @@ -58,6 +59,60 @@ async def send_message(self, message, dialog_messages=[], chat_mode="assistant") return answer, n_used_tokens, n_first_dialog_messages_removed + async def send_message_stream(self, message, dialog_messages=[], chat_mode="assistant"): + if chat_mode not in CHAT_MODES.keys(): + raise ValueError(f"Chat mode {chat_mode} is not supported") + + n_dialog_messages_before = len(dialog_messages) + answer = None + while answer is None: + try: + if self.use_chatgpt_api: + messages = self._generate_prompt_messages_for_chatgpt_api(message, dialog_messages, chat_mode) + r_gen = await openai.ChatCompletion.acreate( + model="gpt-3.5-turbo", + messages=messages, + stream=True, + **OPENAI_COMPLETION_OPTIONS + ) + + answer = "" + async for r_item in r_gen: + delta = r_item.choices[0].delta + if "content" in delta: + answer += delta.content + yield "not_finished", answer + + n_used_tokens = self._count_tokens_for_chatgpt(messages, answer, model="gpt-3.5-turbo") + else: + prompt = self._generate_prompt(message, dialog_messages, chat_mode) + r_gen = await openai.Completion.acreate( + engine="text-davinci-003", + prompt=prompt, + stream=True, + **OPENAI_COMPLETION_OPTIONS + ) + + answer = "" + async for r_item in r_gen: + answer += r_item.choices[0].text + yield "not_finished", answer + + n_used_tokens = self._count_tokens_for_gpt(prompt, answer, model="text-davinci-003") + + answer = self._postprocess_answer(answer) + + except openai.error.InvalidRequestError as e: # too many tokens + if len(dialog_messages) == 0: + raise ValueError("Dialog messages is reduced to zero, but still has too many tokens to make completion") from e + + # forget first message in dialog_messages + dialog_messages = dialog_messages[1:] + + n_first_dialog_messages_removed = n_dialog_messages_before - len(dialog_messages) + + yield "finished", answer, n_used_tokens, n_first_dialog_messages_removed # sending final answer + def _generate_prompt(self, message, dialog_messages, chat_mode): prompt = CHAT_MODES[chat_mode]["prompt_start"] prompt += "\n\n" @@ -90,6 +145,29 @@ def _postprocess_answer(self, answer): answer = answer.strip() return answer + def _count_tokens_for_chatgpt(self, prompt_messages, answer, model="gpt-3.5-turbo"): + prompt_messages += [{"role": "assistant", "content": answer}] + + encoding = tiktoken.encoding_for_model(model) + n_tokens = 0 + for message in prompt_messages: + n_tokens += 4 # every message follows "{role/name}\n{content}\n" + for key, value in message.items(): + if key == "role": + n_tokens += 1 + elif key == "content": + n_tokens += len(encoding.encode(value)) + else: + raise ValueError(f"Unknown key in message: {key}") + + n_tokens -= 1 # remove 1 "" token + return n_tokens + + def _count_tokens_for_gpt(self, prompt, answer, model="text-davinci-003"): + encoding = tiktoken.encoding_for_model(model) + n_tokens = len(encoding.encode(prompt)) + len(encoding.encode(answer)) + 1 + return n_tokens + async def transcribe_audio(audio_file): r = await openai.Audio.atranscribe("whisper-1", audio_file) diff --git a/config/config.example.yml b/config/config.example.yml index 08b17b74..fe6f1898 100644 --- a/config/config.example.yml +++ b/config/config.example.yml @@ -3,7 +3,9 @@ openai_api_key: "" use_chatgpt_api: true allowed_telegram_usernames: [] # if empty, the bot is available to anyone. pass a username string to allow it and/or user ids as integers new_dialog_timeout: 600 # new dialog starts after timeout (in seconds) +enable_message_streaming: true # if set, messages will be shown to user word-by-word +# prices chatgpt_price_per_1000_tokens: 0.002 gpt_price_per_1000_tokens: 0.02 whisper_price_per_1_min: 0.006 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index c6d4c3d5..354063b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ -python-telegram-bot==20.1 +python-telegram-bot[rate-limiter]==20.1 openai>=0.27.0 +tiktoken>=0.3.0 PyYAML==6.0 pymongo==4.3.3 python-dotenv==0.21.0