Commit

use rq for cron tasks

Your Name authored and alphatownsman committed Oct 22, 2023
1 parent dc5e7fc commit cd8b46c
Showing 12 changed files with 297 additions and 56 deletions.
2 changes: 1 addition & 1 deletion boofilsic/settings.py
@@ -99,7 +99,7 @@
        "DB": _parsed_redis_url.path[1:],
        "DEFAULT_TIMEOUT": -1,
    }
-    for q in ["mastodon", "export", "import", "fetch", "crawl", "ap"]
+    for q in ["mastodon", "export", "import", "fetch", "crawl", "ap", "cron"]
}

_parsed_search_url = env.url("NEODB_SEARCH_URL")
2 changes: 2 additions & 0 deletions catalog/jobs/__init__.py
@@ -0,0 +1,2 @@
from .discover import DiscoverGenerator
from .podcast import PodcastUpdater
95 changes: 95 additions & 0 deletions catalog/jobs/discover.py
@@ -0,0 +1,95 @@
from datetime import timedelta

from django.core.cache import cache
from django.db.models import Count, F
from django.utils import timezone
from loguru import logger

from catalog.models import *
from common.models import BaseJob, JobManager
from journal.models import Comment, ShelfMember, q_item_in_category

MAX_ITEMS_PER_PERIOD = 12
MIN_MARKS = 1
MAX_DAYS_FOR_PERIOD = 96
MIN_DAYS_FOR_PERIOD = 6


@JobManager.register
class DiscoverGenerator(BaseJob):
    interval = timedelta(hours=3)

    def get_popular_marked_item_ids(self, category, days, existing_ids):
        item_ids = [
            m["item_id"]
            for m in ShelfMember.objects.filter(q_item_in_category(category))
            .filter(created_time__gt=timezone.now() - timedelta(days=days))
            .exclude(item_id__in=existing_ids)
            .values("item_id")
            .annotate(num=Count("item_id"))
            .filter(num__gte=MIN_MARKS)
            .order_by("-num")[:MAX_ITEMS_PER_PERIOD]
        ]
        return item_ids

    def get_popular_commented_podcast_ids(self, days, existing_ids):
        return list(
            Comment.objects.filter(q_item_in_category(ItemCategory.Podcast))
            .filter(created_time__gt=timezone.now() - timedelta(days=days))
            .annotate(p=F("item__podcastepisode__program"))
            .filter(p__isnull=False)
            .exclude(p__in=existing_ids)
            .values("p")
            .annotate(num=Count("p"))
            .filter(num__gte=MIN_MARKS)
            .order_by("-num")
            .values_list("p", flat=True)[:MAX_ITEMS_PER_PERIOD]
        )

    def cleanup_shows(self, items):
        # if a TV season made the list, drop its parent show to avoid duplicates
        seasons = [i for i in items if i.__class__ == TVSeason]
        for season in seasons:
            if season.show in items:
                items.remove(season.show)
        return items

    def run(self):
        logger.info("Discover data update start.")
        cache_key = "public_gallery"
        gallery_categories = [
            ItemCategory.Book,
            ItemCategory.Movie,
            ItemCategory.TV,
            ItemCategory.Game,
            ItemCategory.Music,
            ItemCategory.Podcast,
        ]
        gallery_list = []
        for category in gallery_categories:
            # start from the longest window and halve it each pass, so items
            # popular in recent windows are prepended ahead of long-term ones
            days = MAX_DAYS_FOR_PERIOD
            item_ids = []
            while days >= MIN_DAYS_FOR_PERIOD:
                ids = self.get_popular_marked_item_ids(category, days, item_ids)
                logger.info(f"Most marked {category} in last {days} days: {len(ids)}")
                item_ids = ids + item_ids
                days //= 2
            if category == ItemCategory.Podcast:
                days = MAX_DAYS_FOR_PERIOD // 4
                extra_ids = self.get_popular_commented_podcast_ids(days, item_ids)
                logger.info(
                    f"Most commented podcast in last {days} days: {len(extra_ids)}"
                )
                item_ids = extra_ids + item_ids
            items = [Item.objects.get(pk=i) for i in item_ids]
            if category == ItemCategory.TV:
                items = self.cleanup_shows(items)
            gallery_list.append(
                {
                    "name": "popular_" + category.value,
                    # "图书" is the Chinese label for "Books"
                    "title": category.label if category != ItemCategory.Book else "图书",
                    "items": items,
                }
            )
        cache.set(cache_key, gallery_list, timeout=None)
        logger.info("Discover data updated.")
35 changes: 35 additions & 0 deletions catalog/jobs/podcast.py
@@ -0,0 +1,35 @@
from datetime import timedelta

from loguru import logger

from catalog.common.models import IdType
from catalog.models import Podcast
from catalog.sites import RSS
from common.models import BaseJob, JobManager


@JobManager.register
class PodcastUpdater(BaseJob):
    interval = timedelta(hours=2)

    def run(self):
        logger.info("Podcasts update start.")
        count = 0
        qs = Podcast.objects.filter(
            is_deleted=False, merged_to_item__isnull=True
        ).order_by("pk")
        for p in qs:
            if (
                p.primary_lookup_id_type == IdType.RSS
                and p.primary_lookup_id_value is not None
            ):
                logger.info(f"updating {p}")
                c = p.episodes.count()
                site = RSS(p.feed_url)
                site.scrape_additional_data()
                c2 = p.episodes.count()
                logger.info(f"updated {p}, {c2 - c} new episodes.")
                count += c2 - c
        logger.info(f"Podcasts update finished, {count} new episodes total.")
42 changes: 42 additions & 0 deletions common/management/commands/cron.py
@@ -0,0 +1,42 @@
from django.core.management.base import BaseCommand
from loguru import logger

from catalog.jobs import *  # noqa
from common.models import JobManager


class Command(BaseCommand):
    help = "Schedule timed jobs"

    def add_arguments(self, parser):
        parser.add_argument(
            "--cancel",
            action="store_true",
        )
        parser.add_argument(
            "--schedule",
            action="store_true",
        )
        parser.add_argument(
            "--list",
            action="store_true",
        )
        parser.add_argument(
            "--runonce",
            action="append",
        )

    def handle(self, *args, **options):
        if options["cancel"]:
            JobManager.cancel()
        if options["schedule"]:
            JobManager.cancel()  # cancel previously scheduled jobs if any
            JobManager.schedule()
        if options["runonce"]:
            for job_id in options["runonce"]:
                run = JobManager.run(job_id)
                if not run:
                    logger.error(f"Job not found: {job_id}")
        if options["list"]:
            jobs = JobManager.get_scheduled_job_ids()
            logger.info(f"{len(jobs)} scheduled jobs: {jobs}")
20 changes: 0 additions & 20 deletions common/management/commands/delete_job.py

This file was deleted.

45 changes: 45 additions & 0 deletions common/management/commands/jobs.py
@@ -0,0 +1,45 @@
import django_rq
from django.conf import settings
from django.core.management.base import BaseCommand
from rq.job import Job


class Command(BaseCommand):
    help = "Show jobs in queue"

    def add_arguments(self, parser):
        parser.add_argument("--delete", action="append")
        parser.add_argument("--list", action="store_true")

    def handle(self, *args, **options):
        if options["delete"]:
            for job_id in options["delete"]:
                job = Job.fetch(job_id, connection=django_rq.get_connection("fetch"))
                job.delete()
                self.stdout.write(self.style.SUCCESS(f"Deleted {job}"))
        if options["list"]:
            queues = settings.RQ_QUEUES.keys()
            for q in queues:
                queue = django_rq.get_queue(q)
                for registry in [
                    queue.scheduled_job_registry,
                    queue.started_job_registry,
                    queue.deferred_job_registry,
                    queue.finished_job_registry,
                    queue.failed_job_registry,
                    queue.canceled_job_registry,
                ]:
                    for job_id in registry.get_job_ids():
                        try:
                            job = Job.fetch(
                                job_id, connection=django_rq.get_connection(q)
                            )
                            self.stdout.write(
                                self.style.SUCCESS(f"{registry.key} {repr(job)}")
                            )
                        except Exception as e:
                            self.stderr.write(
                                f"Error fetching {registry.key} {job_id}: {e}"
                            )
31 changes: 0 additions & 31 deletions common/management/commands/list_jobs.py

This file was deleted.

72 changes: 72 additions & 0 deletions common/models.py
@@ -0,0 +1,72 @@
from datetime import timedelta

import django_rq
from loguru import logger
from rq.job import Job
from rq.registry import ScheduledJobRegistry


class BaseJob:
    interval = timedelta(seconds=1)

    @classmethod
    def cancel(cls):
        job_id = cls.__name__
        try:
            job = Job.fetch(id=job_id, connection=django_rq.get_connection("cron"))
            if job.get_status() in ["queued", "scheduled"]:
                logger.info(f"Cancel queued job: {job_id}")
                job.cancel()
            registry = ScheduledJobRegistry(queue=django_rq.get_queue("cron"))
            registry.remove(job)
        except Exception:  # job not found, or already finished
            pass

    @classmethod
    def schedule(cls):
        job_id = cls.__name__
        logger.info(f"Scheduling job: {job_id}")
        django_rq.get_queue("cron").enqueue_in(
            cls.interval, cls._run, job_id=job_id, result_ttl=0, failure_ttl=0
        )

    @classmethod
    def _run(cls):
        cls.schedule()  # schedule next run
        cls().run()

    def run(self):
        pass


class JobManager:
    registry = set()

    @classmethod
    def register(cls, target):
        cls.registry.add(target)
        return target

    @classmethod
    def schedule(cls):
        for j in cls.registry:
            j.schedule()

    @classmethod
    def cancel(cls):
        for j in cls.registry:
            j.cancel()

    @classmethod
    def run(cls, job_id):
        for j in cls.registry:
            if j.__name__ == job_id:
                logger.info(f"Run job: {job_id}")
                j().run()
                return True
        return False

    @classmethod
    def get_scheduled_job_ids(cls):
        registry = ScheduledJobRegistry(queue=django_rq.get_queue("cron"))
        return registry.get_job_ids()
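BaseJob and JobManager together form a small self-rescheduling loop on top of rq's enqueue_in: when a job fires, it first enqueues its own next run, then executes. A minimal sketch of how a new job would plug in (NightlyCleanup and its interval are hypothetical, for illustration):

from datetime import timedelta

from common.models import BaseJob, JobManager


@JobManager.register
class NightlyCleanup(BaseJob):  # hypothetical example job
    interval = timedelta(hours=24)  # next run is enqueued this far in the future

    def run(self):
        ...  # actual work goes here; executes on a worker listening to "cron"

Using the class name as the rq job_id is what makes cancel() and --runonce work: there is at most one pending instance of each job at any time.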
6 changes: 3 additions & 3 deletions compose.yml
@@ -166,14 +166,14 @@ services:

  neodb-worker:
    <<: *neodb-service
-    command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap
+    command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap cron
    depends_on:
      migration:
        condition: service_completed_successfully

  neodb-worker-extra:
    <<: *neodb-service
-    command: neodb-manage rqworker --with-scheduler fetch crawl ap
+    command: neodb-manage rqworker fetch crawl ap
    depends_on:
      migration:
        condition: service_completed_successfully
@@ -231,7 +231,7 @@

  dev-neodb-worker:
    <<: *dev-neodb-service
-    command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap
+    command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap cron

  dev-takahe-web:
    <<: *dev-neodb-service
1 change: 1 addition & 0 deletions misc/bin/neodb-init
@@ -9,5 +9,6 @@ echo NeoDB initializing...

takahe-manage migrate || exit $?
neodb-manage migrate || exit $?
+neodb-manage cron --schedule || exit $?

echo NeoDB initialization complete.
2 changes: 1 addition & 1 deletion requirements-dev.txt
@@ -6,4 +6,4 @@ djlint~=1.32.1
isort~=5.12.0
lxml-stubs
pre-commit
-pyright==1.1.327
+pyright==1.1.332
