Skip to content

Commit

Permalink
Pyrogram2 (#291)
Browse files Browse the repository at this point in the history
* upgrade to pyrogram 2.x

#288

* reformat code

* update dependencies

* remove instagram cookies

* add cryptography

* upgrade to py3.11

* temp fix
  • Loading branch information
BennyThink authored Sep 10, 2023
1 parent 0607ae8 commit d901332
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 76 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
FROM python:3.10-alpine as builder
FROM python:3.11-alpine as builder

RUN apk update && apk add --no-cache tzdata alpine-sdk libffi-dev ca-certificates
ADD requirements.txt /tmp/
RUN pip3 install --user -r /tmp/requirements.txt && rm /tmp/requirements.txt


FROM python:3.10-alpine
FROM python:3.11-alpine
WORKDIR /ytdlbot/ytdlbot
ENV TZ=Europe/Stockholm

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:
- ./db_data:/var/lib/mysql
environment:
MYSQL_ROOT_PASSWORD: 'root'
command: --default-authentication-plugin=mysql_native_password
logging:
driver: none

Expand All @@ -33,7 +34,6 @@ services:
- socat
- redis
volumes:
- ./data/instagram.com_cookies.txt:/ytdlbot/ytdlbot/instagram.com_cookies.txt
- ./data/vnstat/:/var/lib/vnstat/
labels:
- "com.centurylinklabs.watchtower.enable=true"
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
pyrogram==1.4.16
pyrogram==2.0.106
tgcrypto==1.2.5
yt-dlp==2023.7.6
APScheduler==3.10.4
beautifultable==1.1.0
ffmpeg-python==0.2.0
PyMySQL==1.1.0
celery==5.3.1
celery==5.3.4
filetype==1.2.0
flower==2.0.1
psutil==5.9.5
Expand Down
4 changes: 1 addition & 3 deletions worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ services:
- env/ytdl.env
restart: always
command: [ "/usr/local/bin/supervisord", "-c" ,"/ytdlbot/conf/supervisor_worker.conf" ]
volumes:
- ./data/instagram.com_cookies.txt:/ytdlbot/ytdlbot/instagram.com_cookies.txt
# network_mode: "host"
# deploy:
# resources:
# limits:
# cpus: '0.3'
# memory: 1500M
# memory: 1500M
7 changes: 4 additions & 3 deletions ytdlbot/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ def __init__(self):
self.con = pymysql.connect(
host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASS, db="ytdl", charset="utf8mb4"
)
logging.debug("Used real MySQL connection.")
except pymysql.err.OperationalError:
logging.warning("Using fake MySQL connection.")
self.con = FakeMySQL()

self.con.ping(reconnect=True)
Expand All @@ -273,9 +275,8 @@ def __del__(self):
self.con.close()

def get_user_settings(self, user_id: int) -> tuple:
cur = self.con.cursor()
cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,))
data = cur.fetchone()
self.cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,))
data = self.cur.fetchone()
if data is None:
return 100, "high", "video", "Celery"
return data
Expand Down
11 changes: 9 additions & 2 deletions ytdlbot/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@
import yt_dlp as ytdl
from tqdm import tqdm

from config import AUDIO_FORMAT, ENABLE_ARIA2, ENABLE_FFMPEG, TG_MAX_SIZE, IPv6, SS_YOUTUBE
from config import (
AUDIO_FORMAT,
ENABLE_ARIA2,
ENABLE_FFMPEG,
SS_YOUTUBE,
TG_MAX_SIZE,
IPv6,
)
from limit import Payment
from utils import adjust_formats, apply_log_formatter, current_time, sizeof_fmt

Expand All @@ -35,7 +42,7 @@


def edit_text(bot_msg, text: str):
key = f"{bot_msg.chat.id}-{bot_msg.message_id}"
key = f"{bot_msg.chat.id}-{bot_msg.id}"
# if the key exists, we shouldn't send edit message
if not r.exists(key):
time.sleep(random.random())
Expand Down
2 changes: 1 addition & 1 deletion ytdlbot/flower_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@

from config import BROKER

app = Celery('tasks', broker=BROKER, timezone="Asia/Shanghai")
app = Celery("tasks", broker=BROKER, timezone="Asia/Shanghai")
94 changes: 52 additions & 42 deletions ytdlbot/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

__author__ = "Benny <[email protected]>"

import asyncio
import logging
import math
import os
Expand All @@ -15,6 +16,7 @@
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
Expand All @@ -30,7 +32,7 @@
from apscheduler.schedulers.background import BackgroundScheduler
from celery import Celery
from celery.worker.control import Panel
from pyrogram import Client, idle, types
from pyrogram import Client, enums, idle, types
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor

Expand All @@ -43,8 +45,8 @@
ENABLE_QUEUE,
ENABLE_VIP,
OWNER,
RCLONE_PATH,
RATE_LIMIT,
RCLONE_PATH,
WORKERS,
)
from constant import BotText
Expand All @@ -71,32 +73,31 @@
redis = Redis()
channel = Channel()

session = "ytdl-celery"
celery_client = create_app(session)
bot = create_app("ytdl-celery")


def get_messages(chat_id, message_id):
try:
return celery_client.get_messages(chat_id, message_id)
return bot.get_messages(chat_id, message_id)
except ConnectionError as e:
logging.critical("WTH!!! %s", e)
celery_client.start()
return celery_client.get_messages(chat_id, message_id)
bot.start()
return bot.get_messages(chat_id, message_id)


@app.task(rate_limit=f"{RATE_LIMIT}/m")
def ytdl_download_task(chat_id, message_id, url: str):
logging.info("YouTube celery tasks started for %s", url)
bot_msg = get_messages(chat_id, message_id)
ytdl_normal_download(celery_client, bot_msg, url)
ytdl_normal_download(bot, bot_msg, url)
logging.info("YouTube celery tasks ended.")


@app.task()
def audio_task(chat_id, message_id):
logging.info("Audio celery tasks started for %s-%s", chat_id, message_id)
bot_msg = get_messages(chat_id, message_id)
normal_audio(celery_client, bot_msg)
normal_audio(bot, bot_msg)
logging.info("Audio celery tasks ended.")


Expand All @@ -116,7 +117,7 @@ def get_unique_clink(original_url: str, user_id: int):
def direct_download_task(chat_id, message_id, url):
logging.info("Direct download celery tasks started for %s", url)
bot_msg = get_messages(chat_id, message_id)
direct_normal_download(celery_client, bot_msg, url)
direct_normal_download(bot, bot_msg, url)
logging.info("Direct download celery tasks ended.")


Expand Down Expand Up @@ -146,8 +147,8 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod
return
mode = mode or payment.get_user_settings(chat_id)[-1]
if ENABLE_CELERY and mode in [None, "Celery"]:
async_task(ytdl_download_task, chat_id, bot_msg.message_id, url)
# ytdl_download_task.delay(chat_id, bot_msg.message_id, url)
async_task(ytdl_download_task, chat_id, bot_msg.id, url)
# ytdl_download_task.delay(chat_id, bot_msg.id, url)
else:
ytdl_normal_download(client, bot_msg, url)
except Exception as e:
Expand All @@ -158,15 +159,15 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod
def direct_download_entrance(client: Client, bot_msg: typing.Union[types.Message, typing.Coroutine], url: str):
if ENABLE_CELERY:
direct_normal_download(client, bot_msg, url)
# direct_download_task.delay(bot_msg.chat.id, bot_msg.message_id, url)
# direct_download_task.delay(bot_msg.chat.id, bot_msg.id, url)
else:
direct_normal_download(client, bot_msg, url)


def audio_entrance(client, bot_msg):
if ENABLE_CELERY:
async_task(audio_task, bot_msg.chat.id, bot_msg.message_id)
# audio_task.delay(bot_msg.chat.id, bot_msg.message_id)
async_task(audio_task, bot_msg.chat.id, bot_msg.id)
# audio_task.delay(bot_msg.chat.id, bot_msg.id)
else:
normal_audio(client, bot_msg)

Expand Down Expand Up @@ -205,7 +206,7 @@ def direct_normal_download(client: Client, bot_msg: typing.Union[types.Message,
logging.info("Downloaded file %s", filename)
st_size = os.stat(filepath).st_size

client.send_chat_action(chat_id, "upload_document")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT)
client.send_document(
bot_msg.chat.id,
filepath,
Expand All @@ -224,11 +225,11 @@ def normal_audio(client: Client, bot_msg: typing.Union[types.Message, typing.Cor
)
orig_url: str = re.findall(r"https?://.*", bot_msg.caption)[0]
with tempfile.TemporaryDirectory(prefix="ytdl-") as tmp:
client.send_chat_action(chat_id, "record_audio")
client.send_chat_action(chat_id, enums.ChatAction.RECORD_AUDIO)
# just try to download the audio using yt-dlp
filepath = ytdl_download(orig_url, tmp, status_msg, hijack="bestaudio[ext=m4a]")
status_msg.edit_text("Sending audio now...")
client.send_chat_action(chat_id, "upload_audio")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_AUDIO)
for f in filepath:
client.send_audio(chat_id, f)
status_msg.edit_text("✅ Conversion complete.")
Expand Down Expand Up @@ -264,18 +265,18 @@ def ytdl_normal_download(client: Client, bot_msg: typing.Union[types.Message, ty

video_paths = ytdl_download(url, temp_dir.name, bot_msg)
logging.info("Download complete.")
client.send_chat_action(chat_id, "upload_document")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT)
bot_msg.edit_text("Download complete. Sending now...")
try:
upload_processor(client, bot_msg, url, video_paths)
except pyrogram.errors.Flood as e:
except pyrogram.errors.FloodWait as e:
logging.critical("FloodWait from Telegram: %s", e)
client.send_message(
chat_id,
f"I'm being rate limited by Telegram. Your video will come after {e.x} seconds. Please wait patiently.",
f"I'm being rate limited by Telegram. Your video will come after {e.value} seconds. Please wait patiently.",
)
flood_owner_message(client, e)
time.sleep(e.x)
time.sleep(e.value)
upload_processor(client, bot_msg, url, video_paths)

bot_msg.edit_text("Download success!✅")
Expand Down Expand Up @@ -407,7 +408,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
redis.add_send_cache(unique, getattr(obj, "file_id", None))
redis.update_metrics("video_success")
if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path):
client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.message_id)
client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.id)
return res_msg


Expand Down Expand Up @@ -449,14 +450,8 @@ def gen_cap(bm, url, video_path):

def gen_video_markup():
markup = InlineKeyboardMarkup(
[
[ # First row
InlineKeyboardButton( # Generates a callback query when pressed
"convert to audio", callback_data="convert"
)
]
]
)
[[InlineKeyboardButton("convert to audio", callback_data="convert")]]
) # First row # Generates a callback query when pressed
return markup


Expand Down Expand Up @@ -506,28 +501,43 @@ def async_task(task_name, *args):
task_name.apply_async(args=args, queue=destination)


def run_celery():
worker_name = os.getenv("WORKER_NAME", "")
argv = ["-A", "tasks", "worker", "--loglevel=info", "--pool=threads", f"--concurrency={WORKERS}", "-n", worker_name]
if ENABLE_QUEUE:
argv.extend(["-Q", worker_name])
app.worker_main(argv)


def purge_tasks():
count = app.control.purge()
return f"purged {count} tasks."


def run_celery():
# 创建一个新的事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
worker_name = os.getenv("WORKER_NAME", "")
argv = [
"-A",
"tasks",
"worker",
"--loglevel=info",
"--pool=threads",
f"--concurrency={WORKERS}",
"-n",
worker_name,
]
if ENABLE_QUEUE:
argv.extend(["-Q", worker_name])
app.worker_main(argv)
except:
logging.warning("Celery worker failed to start.")
sys.exit(1)


if __name__ == "__main__":
# celery_client.start()
print("Bootstrapping Celery worker now.....")
time.sleep(5)
threading.Thread(target=run_celery, daemon=True).start()

scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
scheduler.add_job(auto_restart, "interval", seconds=900)
scheduler.add_job(auto_restart, "interval", seconds=120)
scheduler.start()

idle()
celery_client.stop()
bot.stop()
17 changes: 17 additions & 0 deletions ytdlbot/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import logging
import os
import pathlib
import re
import shutil
import subprocess
import tempfile
import time
import uuid
from datetime import datetime

import coloredlogs
import ffmpeg
Expand Down Expand Up @@ -189,6 +191,21 @@ def next_salt_detector(self):
# logging.warning("Potential crash detected by %s, it's time to commit suicide...", self.func_name())
# return True

def fail_connect_detector(self):
# TODO: don't know why sometimes it stops connected to DC
last_line = self.logs.strip().split("\n")[-1]
try:
log_time_str = re.findall(r"\[(.*),", last_line)[0]
log_time = datetime.strptime(log_time_str, "%Y-%m-%d %H:%M:%S")
except Exception:
return

time_difference = (datetime.now() - log_time).total_seconds()

if ("Sending as video" in last_line or "PingTask started" in last_line) and time_difference > 60:
logging.warning("Can't connect to Telegram DC")
return True


def auto_restart():
log_path = "/var/log/ytdl.log"
Expand Down
Loading

2 comments on commit d901332

@SanujaNS
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pyrogram2 update ❤️‍🔥❤️‍🔥❤️‍🔥
👏👏👏

@BennyThink
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to revert it because it stopped working...

Please sign in to comment.