From 56bb390df7221dacc9dbd472aa68cb928e219295 Mon Sep 17 00:00:00 2001 From: Brian Perrett Date: Sat, 12 Oct 2024 11:18:46 -0700 Subject: [PATCH] Maybe simpler query? --- match/tasks.py | 41 +++++++++++++++++++---------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/match/tasks.py b/match/tasks.py index 98dcdf66..4b559b10 100644 --- a/match/tasks.py +++ b/match/tasks.py @@ -4,7 +4,7 @@ from celery import group from django.conf import settings from django.db.utils import IntegrityError -from django.db.models import Count, Subquery, OuterRef +from django.db.models import Count, Exists, Subquery, OuterRef from django.db.models import Case, When, Sum from django.db.models import IntegerField, Q from django.utils import timezone @@ -394,19 +394,18 @@ def celery_task_pool(jobs, pool_size=10): @app.task(name="match.tasks.huge_match_import_task") def huge_match_import_task(hours_thresh=72, exclude_hours=24, break_early=True): thresh = timezone.now() - timedelta(hours=hours_thresh) - thresh_epoch_ms = thresh.timestamp() * 1000 - qs = Participant.objects.filter( - match__game_creation__gt=thresh_epoch_ms, - match__queue_id__in=[FLEX_QUEUE, SOLO_QUEUE], - match__platform_id="NA1", + + qs = Summoner.objects.filter( + Exists(Participant.objects.filter( + puuid=OuterRef("puuid"), + match__queue_id__in=[FLEX_QUEUE, SOLO_QUEUE], + match__game_creation_dt__gt=thresh, + )), puuid__isnull=False, + region="na", + ).exclude( + huge_match_import_at__gt=timezone.now() - timedelta(hours=exclude_hours) ) - qs = qs.exclude( - puuid__in=Summoner.objects.filter( - huge_match_import_at__gt=timezone.now() - timedelta(hours=exclude_hours), - ).values('puuid') - ) - qs = qs.select_related("match").order_by('puuid').distinct('puuid') count = qs.count() logger.info(f"Found {count} participants for huge_match_import_task.") @@ -418,26 +417,24 @@ def huge_match_import_task(hours_thresh=72, exclude_hours=24, break_early=True): elapsed_start = time.perf_counter() def get_jobs(qs, start_time): - for participant in qs: - if summoner := Summoner.objects.filter(puuid=participant.puuid).first(): - if break_early and summoner.huge_match_import_at and summoner.huge_match_import_at > thresh: - # only go back as far as we need to for this summoner - start_time = summoner.huge_match_import_at + for summoner in qs: + if break_early and summoner.huge_match_import_at and summoner.huge_match_import_at > thresh: + # only go back as far as we need to for this summoner + start_time = summoner.huge_match_import_at if not summoner: continue import_time = timezone.now() job = import_recent_matches.s( 0, 10_000, - participant.puuid, - participant.match.region, + summoner.puuid, + summoner.region, startTime=start_time, use_celery=False, ) yield job - if summoner: - summoner.huge_match_import_at = import_time - summoner.save(update_fields=["huge_match_import_at"]) + summoner.huge_match_import_at = import_time + summoner.save(update_fields=["huge_match_import_at"]) for _ in celery_task_pool(get_jobs(qs.iterator(2000), thresh), 10): i += 1