Skip to content

Commit

Permalink
Maybe simpler query?
Browse files Browse the repository at this point in the history
  • Loading branch information
brianjp93 committed Oct 12, 2024
1 parent 95403e4 commit 56bb390
Showing 1 changed file with 19 additions and 22 deletions.
41 changes: 19 additions & 22 deletions match/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from celery import group
from django.conf import settings
from django.db.utils import IntegrityError
from django.db.models import Count, Subquery, OuterRef
from django.db.models import Count, Exists, Subquery, OuterRef
from django.db.models import Case, When, Sum
from django.db.models import IntegerField, Q
from django.utils import timezone
Expand Down Expand Up @@ -394,19 +394,18 @@ def celery_task_pool(jobs, pool_size=10):
@app.task(name="match.tasks.huge_match_import_task")
def huge_match_import_task(hours_thresh=72, exclude_hours=24, break_early=True):
thresh = timezone.now() - timedelta(hours=hours_thresh)
thresh_epoch_ms = thresh.timestamp() * 1000
qs = Participant.objects.filter(
match__game_creation__gt=thresh_epoch_ms,
match__queue_id__in=[FLEX_QUEUE, SOLO_QUEUE],
match__platform_id="NA1",

qs = Summoner.objects.filter(
Exists(Participant.objects.filter(
puuid=OuterRef("puuid"),
match__queue_id__in=[FLEX_QUEUE, SOLO_QUEUE],
match__game_creation_dt__gt=thresh,
)),
puuid__isnull=False,
region="na",
).exclude(
huge_match_import_at__gt=timezone.now() - timedelta(hours=exclude_hours)
)
qs = qs.exclude(
puuid__in=Summoner.objects.filter(
huge_match_import_at__gt=timezone.now() - timedelta(hours=exclude_hours),
).values('puuid')
)
qs = qs.select_related("match").order_by('puuid').distinct('puuid')

count = qs.count()
logger.info(f"Found {count} participants for huge_match_import_task.")
Expand All @@ -418,26 +417,24 @@ def huge_match_import_task(hours_thresh=72, exclude_hours=24, break_early=True):
elapsed_start = time.perf_counter()

def get_jobs(qs, start_time):
for participant in qs:
if summoner := Summoner.objects.filter(puuid=participant.puuid).first():
if break_early and summoner.huge_match_import_at and summoner.huge_match_import_at > thresh:
# only go back as far as we need to for this summoner
start_time = summoner.huge_match_import_at
for summoner in qs:
if break_early and summoner.huge_match_import_at and summoner.huge_match_import_at > thresh:
# only go back as far as we need to for this summoner
start_time = summoner.huge_match_import_at
if not summoner:
continue
import_time = timezone.now()
job = import_recent_matches.s(
0,
10_000,
participant.puuid,
participant.match.region,
summoner.puuid,
summoner.region,
startTime=start_time,
use_celery=False,
)
yield job
if summoner:
summoner.huge_match_import_at = import_time
summoner.save(update_fields=["huge_match_import_at"])
summoner.huge_match_import_at = import_time
summoner.save(update_fields=["huge_match_import_at"])

for _ in celery_task_pool(get_jobs(qs.iterator(2000), thresh), 10):
i += 1
Expand Down

0 comments on commit 56bb390

Please sign in to comment.