diff --git a/boofilsic/settings.py b/boofilsic/settings.py index c080d123..2b19521d 100644 --- a/boofilsic/settings.py +++ b/boofilsic/settings.py @@ -99,7 +99,7 @@ "DB": _parsed_redis_url.path[1:], "DEFAULT_TIMEOUT": -1, } - for q in ["mastodon", "export", "import", "fetch", "crawl", "ap"] + for q in ["mastodon", "export", "import", "fetch", "crawl", "ap", "cron"] } _parsed_search_url = env.url("NEODB_SEARCH_URL") diff --git a/catalog/jobs/__init__.py b/catalog/jobs/__init__.py new file mode 100644 index 00000000..a2ebd698 --- /dev/null +++ b/catalog/jobs/__init__.py @@ -0,0 +1,2 @@ +from .discover import DiscoverGenerator +from .podcast import PodcastUpdater diff --git a/catalog/jobs/discover.py b/catalog/jobs/discover.py new file mode 100644 index 00000000..5abfe6cf --- /dev/null +++ b/catalog/jobs/discover.py @@ -0,0 +1,95 @@ +from datetime import timedelta + +from django.core.cache import cache +from django.db.models import Count, F +from django.utils import timezone +from loguru import logger + +from catalog.models import * +from common.models import BaseJob, JobManager +from journal.models import Comment, ShelfMember, q_item_in_category + +MAX_ITEMS_PER_PERIOD = 12 +MIN_MARKS = 1 +MAX_DAYS_FOR_PERIOD = 96 +MIN_DAYS_FOR_PERIOD = 6 + + +@JobManager.register +class DiscoverGenerator(BaseJob): + interval = timedelta(hours=3) + + def get_popular_marked_item_ids(self, category, days, exisiting_ids): + item_ids = [ + m["item_id"] + for m in ShelfMember.objects.filter(q_item_in_category(category)) + .filter(created_time__gt=timezone.now() - timedelta(days=days)) + .exclude(item_id__in=exisiting_ids) + .values("item_id") + .annotate(num=Count("item_id")) + .filter(num__gte=MIN_MARKS) + .order_by("-num")[:MAX_ITEMS_PER_PERIOD] + ] + return item_ids + + def get_popular_commented_podcast_ids(self, days, exisiting_ids): + return list( + Comment.objects.filter(q_item_in_category(ItemCategory.Podcast)) + .filter(created_time__gt=timezone.now() - timedelta(days=days)) + .annotate(p=F("item__podcastepisode__program")) + .filter(p__isnull=False) + .exclude(p__in=exisiting_ids) + .values("p") + .annotate(num=Count("p")) + .filter(num__gte=MIN_MARKS) + .order_by("-num") + .values_list("p", flat=True)[:MAX_ITEMS_PER_PERIOD] + ) + + def cleanup_shows(self, items): + seasons = [i for i in items if i.__class__ == TVSeason] + for season in seasons: + if season.show in items: + items.remove(season.show) + return items + + def run(self): + logger.info("Discover data update start.") + cache_key = "public_gallery" + gallery_categories = [ + ItemCategory.Book, + ItemCategory.Movie, + ItemCategory.TV, + ItemCategory.Game, + ItemCategory.Music, + ItemCategory.Podcast, + ] + gallery_list = [] + for category in gallery_categories: + days = MAX_DAYS_FOR_PERIOD + item_ids = [] + while days >= MIN_DAYS_FOR_PERIOD: + ids = self.get_popular_marked_item_ids(category, days, item_ids) + logger.info(f"Most marked {category} in last {days} days: {len(ids)}") + item_ids = ids + item_ids + days //= 2 + if category == ItemCategory.Podcast: + days = MAX_DAYS_FOR_PERIOD // 4 + extra_ids = self.get_popular_commented_podcast_ids(days, item_ids) + logger.info( + f"Most commented podcast in last {days} days: {len(extra_ids)}" + ) + item_ids = extra_ids + item_ids + items = [Item.objects.get(pk=i) for i in item_ids] + if category == ItemCategory.TV: + items = self.cleanup_shows(items) + gallery_list.append( + { + "name": "popular_" + category.value, + "title": "" + + (category.label if category != ItemCategory.Book else "图书"), + "items": items, + } + ) + cache.set(cache_key, gallery_list, timeout=None) + logger.info("Discover data updated.") diff --git a/catalog/jobs/podcast.py b/catalog/jobs/podcast.py new file mode 100644 index 00000000..3b464854 --- /dev/null +++ b/catalog/jobs/podcast.py @@ -0,0 +1,35 @@ +import pprint +from datetime import timedelta +from time import sleep + +from loguru import logger + +from catalog.common.models import IdType +from catalog.models import Podcast +from catalog.sites import RSS +from common.models import BaseJob, JobManager + + +@JobManager.register +class PodcastUpdater(BaseJob): + interval = timedelta(hours=2) + + def run(self): + logger.info("Podcasts update start.") + count = 0 + qs = Podcast.objects.filter( + is_deleted=False, merged_to_item__isnull=True + ).order_by("pk") + for p in qs: + if ( + p.primary_lookup_id_type == IdType.RSS + and p.primary_lookup_id_value is not None + ): + logger.info(f"updating {p}") + c = p.episodes.count() + site = RSS(p.feed_url) + site.scrape_additional_data() + c2 = p.episodes.count() + logger.info(f"updated {p}, {c2-c} new episodes.") + count += c2 - c + logger.info(f"Podcasts update finished, {count} new episodes total.") diff --git a/common/management/commands/cron.py b/common/management/commands/cron.py new file mode 100644 index 00000000..a4dd9e4e --- /dev/null +++ b/common/management/commands/cron.py @@ -0,0 +1,42 @@ +from django.core.management.base import BaseCommand +from loguru import logger + +from catalog.jobs import * # noqa +from common.models import JobManager + + +class Command(BaseCommand): + help = "Schedule timed jobs" + + def add_arguments(self, parser): + parser.add_argument( + "--cancel", + action="store_true", + ) + parser.add_argument( + "--schedule", + action="store_true", + ) + parser.add_argument( + "--list", + action="store_true", + ) + parser.add_argument( + "--runonce", + action="append", + ) + + def handle(self, *args, **options): + if options["cancel"]: + JobManager.cancel() + if options["schedule"]: + JobManager.cancel() # cancel previously scheduled jobs if any + JobManager.schedule() + if options["runonce"]: + for job_id in options["runonce"]: + run = JobManager.run(job_id) + if not run: + logger.error(f"Job not found: {job_id}") + if options["list"]: + jobs = JobManager.get_scheduled_job_ids() + logger.info(f"{len(jobs)} scheduled jobs: {jobs}") diff --git a/common/management/commands/delete_job.py b/common/management/commands/delete_job.py deleted file mode 100644 index 66f2690e..00000000 --- a/common/management/commands/delete_job.py +++ /dev/null @@ -1,20 +0,0 @@ -import pprint - -from django.core.management.base import BaseCommand -from redis import Redis -from rq import Queue -from rq.job import Job - - -class Command(BaseCommand): - help = "Delete a job" - - def add_arguments(self, parser): - parser.add_argument("job_id", type=str, help="Job ID") - - def handle(self, *args, **options): - redis = Redis() - job_id = str(options["job_id"]) - job = Job.fetch(job_id, connection=redis) - job.delete() - self.stdout.write(self.style.SUCCESS(f"Deleted {job}")) diff --git a/common/management/commands/jobs.py b/common/management/commands/jobs.py new file mode 100644 index 00000000..011c4129 --- /dev/null +++ b/common/management/commands/jobs.py @@ -0,0 +1,45 @@ +import pprint + +import django_rq +from django.conf import settings +from django.core.management.base import BaseCommand +from redis import Redis +from rq import Queue +from rq.job import Job + + +class Command(BaseCommand): + help = "Show jobs in queue" + + def add_arguments(self, parser): + parser.add_argument("--delete", action="append") + parser.add_argument("--list", action="store_true") + + def handle(self, *args, **options): + if options["delete"]: + for job_id in options["delete"]: + job = Job.fetch(job_id, connection=django_rq.get_connection("fetch")) + job.delete() + self.stdout.write(self.style.SUCCESS(f"Deleted {job}")) + if options["list"]: + queues = settings.RQ_QUEUES.keys() + for q in queues: + queue = django_rq.get_queue(q) + for registry in [ + queue.scheduled_job_registry, + queue.started_job_registry, + queue.deferred_job_registry, + queue.finished_job_registry, + queue.failed_job_registry, + queue.canceled_job_registry, + ]: + for job_id in registry.get_job_ids(): + try: + job = Job.fetch( + job_id, connection=django_rq.get_connection(q) + ) + self.stdout.write( + self.style.SUCCESS(f"{registry.key} {repr(job)}") + ) + except Exception as e: + print(f"Error fetching {registry.key} {job_id}") diff --git a/common/management/commands/list_jobs.py b/common/management/commands/list_jobs.py deleted file mode 100644 index 51f189c5..00000000 --- a/common/management/commands/list_jobs.py +++ /dev/null @@ -1,31 +0,0 @@ -import pprint - -from django.core.management.base import BaseCommand -from redis import Redis -from rq import Queue -from rq.job import Job - - -class Command(BaseCommand): - help = "Show jobs in queue" - - def add_arguments(self, parser): - parser.add_argument("queue", type=str, help="Queue") - - def handle(self, *args, **options): - redis = Redis() - queue = Queue(str(options["queue"]), connection=redis) - for registry in [ - queue.started_job_registry, - queue.deferred_job_registry, - queue.finished_job_registry, - queue.failed_job_registry, - queue.scheduled_job_registry, - ]: - self.stdout.write(self.style.SUCCESS(f"Registry {registry}")) - for job_id in registry.get_job_ids(): - try: - job = Job.fetch(job_id, connection=redis) - pprint.pp(job) - except Exception as e: - print(f"Error fetching {job_id}") diff --git a/common/models.py b/common/models.py index e69de29b..f76a1dee 100644 --- a/common/models.py +++ b/common/models.py @@ -0,0 +1,72 @@ +from datetime import timedelta + +import django_rq +from loguru import logger +from rq.job import Job +from rq.registry import ScheduledJobRegistry + + +class BaseJob: + interval = timedelta(seconds=1) + + @classmethod + def cancel(cls): + job_id = cls.__name__ + try: + job = Job.fetch(id=job_id, connection=django_rq.get_connection("cron")) + if job.get_status() in ["queued", "scheduled"]: + logger.info(f"Cancel queued job: {job_id}") + job.cancel() + registry = ScheduledJobRegistry(queue=django_rq.get_queue("cron")) + registry.remove(job) + except: + pass + + @classmethod + def schedule(cls): + job_id = cls.__name__ + logger.info(f"Scheduling job: {job_id}") + django_rq.get_queue("cron").enqueue_in( + cls.interval, cls._run, job_id=job_id, result_ttl=0, failure_ttl=0 + ) + + @classmethod + def _run(cls): + cls.schedule() # schedule next run + cls().run() + + def run(self): + pass + + +class JobManager: + registry = set() + + @classmethod + def register(cls, target): + cls.registry.add(target) + return target + + @classmethod + def schedule(cls): + for j in cls.registry: + j.schedule() + + @classmethod + def cancel(cls): + for j in cls.registry: + j.cancel() + + @classmethod + def run(cls, job_id): + for j in cls.registry: + if j.__name__ == job_id: + logger.info(f"Run job: {job_id}") + j().run() + return True + return False + + @classmethod + def get_scheduled_job_ids(cls): + registry = ScheduledJobRegistry(queue=django_rq.get_queue("cron")) + return registry.get_job_ids() diff --git a/compose.yml b/compose.yml index 50ef5dc0..bc815d04 100644 --- a/compose.yml +++ b/compose.yml @@ -166,14 +166,14 @@ services: neodb-worker: <<: *neodb-service - command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap + command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap cron depends_on: migration: condition: service_completed_successfully neodb-worker-extra: <<: *neodb-service - command: neodb-manage rqworker --with-scheduler fetch crawl ap + command: neodb-manage rqworker fetch crawl ap depends_on: migration: condition: service_completed_successfully @@ -231,7 +231,7 @@ services: dev-neodb-worker: <<: *dev-neodb-service - command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap + command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap cron dev-takahe-web: <<: *dev-neodb-service diff --git a/misc/bin/neodb-init b/misc/bin/neodb-init index 1d63c7c8..278d6a5e 100755 --- a/misc/bin/neodb-init +++ b/misc/bin/neodb-init @@ -9,5 +9,6 @@ echo NeoDB initializing... takahe-manage migrate || exit $? neodb-manage migrate || exit $? +neodb-manage cron --schedule || exit $? echo NeoDB initialization complete. diff --git a/requirements-dev.txt b/requirements-dev.txt index 7dc3b160..da3c30f8 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -6,4 +6,4 @@ djlint~=1.32.1 isort~=5.12.0 lxml-stubs pre-commit -pyright==1.1.327 +pyright==1.1.332