Large import task.
brianjp93 committed Oct 5, 2024
1 parent 6913de0 commit 701670b
Showing 13 changed files with 199 additions and 42 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
@@ -9,7 +9,8 @@ WORKDIR /app

COPY requirements.txt .
COPY release.sh /release.sh
RUN pip install -r requirements.txt
RUN pip install uv
RUN uv pip install -r requirements.txt --system

COPY . .
RUN tailwindcss -i ./lolsite/static/src/main.css -o ./lolsite/static/src/output.css --minify
3 changes: 2 additions & 1 deletion docker/local/Dockerfile
@@ -7,4 +7,5 @@ COPY requirements.txt .
RUN apk add --no-cache alpine-sdk gcc g++ python3-dev

RUN pip install --upgrade pip
RUN pip install -r requirements.txt
RUN pip install uv
RUN uv pip install -r requirements.txt --system
6 changes: 3 additions & 3 deletions lolsite/periodic_tasks.py
@@ -10,9 +10,9 @@
"task": "data.tasks.import_missing",
"schedule": crontab(minute="10"),
},
"pt-import-popular-accounts": {
"task": "match.tasks.import_matches_for_popular_accounts",
"schedule": crontab(minute="30"),
"mt-huge-match-import": {
"task": "match.tasks.huge_match_import_task",
"schedule": crontab(hour="1"),
}
}
app.conf.timezone = "America/Denver" # type: ignore
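
A note on reading the new schedule entry: celery's crontab defaults any field you do not pass to "*", so crontab(hour="1") matches every minute from 01:00 through 01:59 in app.conf.timezone, while the removed crontab(minute="30") entry matched once per hour. A minimal sketch of the difference; the once-daily variant below is a hypothetical alternative, not what the commit ships:

from celery.schedules import crontab

# As committed: minute defaults to "*", so this matches every
# minute during the 01:00 hour.
huge_import_schedule = crontab(hour="1")

# Hypothetical once-daily variant: pin the minute as well, so the
# task fires exactly once, at 01:00.
once_daily = crontab(minute="0", hour="1")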
10 changes: 7 additions & 3 deletions match/admin.py
@@ -10,7 +10,7 @@

@admin.register(Match)
class MatchAdmin(admin.ModelAdmin):
list_display = ("_id", "get_creation", "queue_id", "game_version")
list_display = ("_id", "creation", "queue_id", "game_version")
list_filter = ("platform_id", "major")
search_fields = (
"participants__summoner_name_simplified",
@@ -21,13 +21,17 @@ class MatchAdmin(admin.ModelAdmin):
list_per_page = 30
paginator = CachedCountPaginator

@admin.display(ordering="game_creation")
def creation(self, obj):
return obj.get_creation()


@admin.register(Participant)
class ParticipantAdmin(admin.ModelAdmin):
list_display = ("_id", "summoner_name_simplified", "champion_id", "team_position", "team_id")
list_display = ("_id", "riot_id_name", "champion_id", "team_position", "team_id")
raw_id_fields = ("match",)
list_filter = ('team_position',)
search_fields = ("summoner_name_simplified", "match___id")
search_fields = ("riot_id_name", "match___id")
show_full_result_count = False
list_per_page = 30
paginator = CachedCountPaginator
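For context on the creation column added above: @admin.display(ordering=...) is the Django pattern that keeps a computed changelist column sortable, by pointing it at a real database field. A minimal sketch of the pattern using the names from this commit:

from django.contrib import admin

class MatchAdminSketch(admin.ModelAdmin):
    list_display = ("_id", "creation")

    # The column renders a computed value, but clicking its header
    # sorts the changelist by the underlying game_creation column.
    @admin.display(ordering="game_creation")
    def creation(self, obj):
        return obj.get_creation()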
2 changes: 2 additions & 0 deletions match/models.py
@@ -379,6 +379,8 @@ def result(self):
return 'abort_unexpected'
case 'Abort_AntiCheatExit':
return 'abort_anticheat'
case 'Abort_TooFewPlayers':
return 'abort_too_few_players'
return 'normal'

def get_absolute_url(self, pname: str | None = None):
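Pieced together from the fragments above, result is a structural pattern match over Riot's endOfGameResult string, and the bare return 'normal' after the match block acts as the fall-through default. An approximate reconstruction; the attribute name on the model is assumed:

def result(self):
    match self.end_of_game_result:  # assumed field name
        case 'Abort_Unexpected':
            return 'abort_unexpected'
        case 'Abort_AntiCheatExit':
            return 'abort_anticheat'
        case 'Abort_TooFewPlayers':
            return 'abort_too_few_players'
    return 'normal'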
2 changes: 1 addition & 1 deletion match/parsers/match.py
@@ -322,7 +322,7 @@ class MatchModel(BaseModelWithLogger):
platformId: str
queueId: int
tournamentCode: str | None
endOfGameResult: Literal['GameComplete', 'Abort_Unexpected', 'Abort_AntiCheatExit'] | None = None
endOfGameResult: Literal['GameComplete', 'Abort_Unexpected', 'Abort_AntiCheatExit', 'Abort_TooFewPlayers'] | None = None

@model_validator(mode='before')
def game_duration_is_sometimes_not_right(cls, data):
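The reason this Literal union keeps growing: pydantic rejects payloads whose endOfGameResult is not one of the listed strings, so each new value Riot starts sending has to be added here before those matches parse again. A standalone toy model illustrating the behavior, not the repo's parser:

from typing import Literal

from pydantic import BaseModel, ValidationError

class ResultSketch(BaseModel):
    endOfGameResult: Literal[
        'GameComplete', 'Abort_Unexpected',
        'Abort_AntiCheatExit', 'Abort_TooFewPlayers',
    ] | None = None

ResultSketch(endOfGameResult='Abort_TooFewPlayers')  # accepted after this commit

try:
    ResultSketch(endOfGameResult='Abort_SomethingNew')  # hypothetical future value
except ValidationError:
    print('unknown endOfGameResult values fail validation')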
135 changes: 107 additions & 28 deletions match/tasks.py
@@ -1,5 +1,7 @@
"""match/tasks.py
"""
from itertools import batched
from celery import group
from django.conf import settings
from django.db.utils import IntegrityError
from django.db.models import Count, Subquery, OuterRef
@@ -8,7 +10,7 @@
from django.utils import timezone
from django.db import connections, transaction
from pydantic import ValidationError
from data.constants import ARENA_QUEUE
from data.constants import ARENA_QUEUE, FLEX_QUEUE, SOLO_QUEUE

from match.parsers.spectate import SpectateModel
from match.serializers import LlmMatchSerializer
@@ -79,16 +81,26 @@ def import_match(match_id, region, refresh=False):
"""
api = get_riot_api()
if api:
if not api:
return
retry_count = -1
match = None
while retry_count < 7:
retry_count += 1
r = api.match.get(match_id, region=region)
match = r.content

if r.status_code == 429:
return "throttled"
if r.status_code == 404:
if retry_count == 7:
return "throttled"
else:
time.sleep(2**retry_count)
continue
elif r.status_code == 404:
return "not found"

import_match_from_data(match, region, refresh=refresh)
else:
import_match_from_data(match, region, refresh=refresh)
return


def fetch_match_json(match_id: str, region: str, refresh=False):
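
The reworked import_match above swaps a single attempt for a capped retry loop: a 429 response now sleeps 2**retry_count seconds and retries, and "throttled" is only returned once the retries are exhausted. A self-contained sketch of that backoff shape, with generic names rather than the repo's API client:

import time

def call_with_backoff(call, max_retries=7):
    """Retry call() on HTTP 429 with exponential backoff.

    Sleeps 1, 2, 4, ... seconds between attempts, mirroring the
    loop above, and gives up after max_retries retries.
    """
    for retry_count in range(max_retries + 1):
        response = call()
        if response.status_code == 429:
            if retry_count == max_retries:
                return None  # exhausted; caller treats this as throttled
            time.sleep(2 ** retry_count)
            continue
        return response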
@@ -234,6 +246,8 @@ def import_recent_matches(
queue: Optional[int] = None,
startTime: Optional[timezone.datetime] = None,
endTime: Optional[timezone.datetime] = None,
break_on_match_found = False,
use_celery = False,
):
"""Import recent matches for a puuid.
Expand Down Expand Up @@ -265,6 +279,8 @@ def import_recent_matches(
while has_more and please_continue:
riot_match_request_time = time.time()

logger.info(f"Getting {start=} {size=}. {startTime=}")

apicall = partial(
api.match.filter,
puuid,
@@ -275,33 +291,46 @@
endTime=endTime,
queue=queue,
)
r = apicall()
logger.info('response: %s' % str(r))
riot_match_request_time = time.time() - riot_match_request_time
logger.info(
f"Riot API match filter request time : {riot_match_request_time}"
)
try:
if r.status_code == 404:
matches = []
else:
matches = r.json()
except Exception:
time.sleep(10)
retry_count = -1
matches = []
while retry_count < 7:
retry_count += 1
r = apicall()
if r.status_code == 404:
matches = []
else:
matches = r.json()
logger.debug('response: %s' % str(r))
riot_match_request_time = time.time() - riot_match_request_time
logger.debug(
f"Riot API match filter request time : {riot_match_request_time}"
)
try:
if r.status_code == 404:
matches = []
else:
matches = r.json()
break
except Exception:
time.sleep(2**retry_count)
return 0
if len(matches) > 0:
existing_ids = [x._id for x in Match.objects.filter(_id__in=matches)]
if existing_ids and break_on_match_found:
has_more = False
new_matches = list(set(matches) - set(existing_ids))
import_count += len(new_matches)
jobs = [(x, region) for x in new_matches]
with ThreadPool(processes=10) as pool:
start_time = time.perf_counter()
pool.starmap(pool_match_import, jobs)
logger.info(f'ThreadPool match import: {time.perf_counter() - start_time}')
start_time = time.perf_counter()
if use_celery:
jobs = []
for batch in batched(new_matches, 10):
for match_id in batch:
jobs.append(import_match.s(match_id, region))
result = group(jobs).apply_async()
while not result.ready():
time.sleep(1)
logger.info(f'Celery match import time: {time.perf_counter() - start_time}')
else:
jobs = [(x, region) for x in new_matches]
with ThreadPool(processes=10) as pool:
pool.starmap(pool_match_import, jobs)
logger.info(f'ThreadPool match import: {time.perf_counter() - start_time}')
else:
has_more = False
index += size
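
The new use_celery branch above replaces the local ThreadPool with a celery group: one import_match signature per new match ID, dispatched with apply_async() and polled until the whole group finishes (the itertools.batched wrapper in the commit only chunks the iteration; every ID still becomes one job). A minimal sketch of that dispatch-and-wait pattern, assuming the task import path from this commit:

import time

from celery import group

from match.tasks import import_match  # assumed import path

def import_ids_via_celery(match_ids, region):
    """Fan match imports out to workers and block until all finish."""
    jobs = [import_match.s(match_id, region) for match_id in match_ids]
    result = group(jobs).apply_async()
    # Same busy-wait as the committed code; result.get() also blocks,
    # and additionally propagates any task exceptions.
    while not result.ready():
        time.sleep(1)
    return result.get()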
@@ -337,6 +366,56 @@ def bulk_import(puuid: str, last_import_time_hours: int = 24, count=200, offset=
import_recent_matches(offset, offset + count, puuid, region=summoner.region)


@app.task(name="match.huge_match_import_task")
def huge_match_import_task(days=60, break_early=True):
thresh = timezone.now() - timedelta(days=days)
thresh_epoch_ms = thresh.timestamp() * 1000
qs = Participant.objects.filter(
match__game_creation__gt=thresh_epoch_ms,
match__queue_id__in=[FLEX_QUEUE, SOLO_QUEUE],
puuid__isnull=False,
).exclude(
puuid__in=Summoner.objects.filter(
huge_match_import_at__gt=timezone.now() - timedelta(days=1),
).values('puuid')
).select_related("match").order_by('puuid').distinct('puuid')
count = qs.count()
logger.info(f"Found {count} participants for huge_match_import_task.")
imported = 0
batch = 5
for a, participants in enumerate(batched(qs.iterator(), batch)):
jobs = []
summoners = []
for b, participant in enumerate(participants):
i = (a * batch) + b
start_time = thresh
if summoner := Summoner.objects.filter(puuid=participant.puuid).first():
if break_early and summoner.huge_match_import_at and summoner.huge_match_import_at > thresh:
# only go back as far as we need to for this summoner
start_time = summoner.huge_match_import_at
logger.info(f"Importing back to {start_time=}")
jobs.append(import_recent_matches.s(
0,
10_000,
participant.puuid,
participant.match.region,
startTime=start_time,
use_celery=True,
))
if summoner:
summoner.huge_match_import_at = timezone.now()
summoners.append(summoner)
if i % 100 == 0:
logger.info(f"Finished importing {i} of {count} summoner's games.")
logger.info(f"Imported {imported} new games.")
result = group(jobs).apply_async()
while not result.ready():
time.sleep(1)
imported += sum(result.get())
Summoner.objects.bulk_update(summoners, fields=["huge_match_import_at"])
logger.info(f"Imported {imported} total new games.")


def get_top_played_with(
summoner_id,
team=True,
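One queryset detail in huge_match_import_task worth spelling out: order_by('puuid').distinct('puuid') compiles to Postgres's SELECT DISTINCT ON (puuid), yielding a single participant row per puuid, and it is only valid when the leading order_by() fields match the distinct() fields. A toy sketch of the shape, with field names as in the commit:

from match.models import Participant  # assumed import path

# One row per puuid. The leading order_by() field must match the
# distinct() field, or Postgres rejects the query at execution time.
one_per_player = (
    Participant.objects
    .filter(puuid__isnull=False)
    .order_by('puuid')
    .distinct('puuid')
)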
18 changes: 18 additions & 0 deletions player/migrations/0051_summoner_huge_match_import_at.py
@@ -0,0 +1,18 @@
# Generated by Django 5.1.1 on 2024-10-05 19:23

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('player', '0050_follow'),
]

operations = [
migrations.AddField(
model_name='summoner',
name='huge_match_import_at',
field=models.DateTimeField(db_index=True, null=True),
),
]
14 changes: 14 additions & 0 deletions player/migrations/0052_merge_20241005_2113.py
@@ -0,0 +1,14 @@
# Generated by Django 5.1.1 on 2024-10-05 21:13

from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
('player', '0051_alter_summoner_simple_riot_id'),
('player', '0051_summoner_huge_match_import_at'),
]

operations = [
]
1 change: 1 addition & 0 deletions player/models.py
@@ -105,6 +105,7 @@ class Summoner(models.Model):
ranked_import_count = models.IntegerField(default=0, blank=True)

last_summoner_page_import = models.DateTimeField(null=True)
huge_match_import_at = models.DateTimeField(null=True, db_index=True)
created_date = models.DateTimeField(default=timezone.now, db_index=True)

def __str__(self):
3 changes: 2 additions & 1 deletion requirements.in
@@ -17,7 +17,8 @@ djangorestframework
factory_boy
gunicorn
hypothesis
lolwrapper @ git+https://github.com/brianjp93/lolapi.git@c6fd95185ad664eb01fed4f1367e1dc315459664
ipython
lolwrapper @ git+https://github.com/brianjp93/lolapi.git@a1d711ea453d27cc65a94a4c48287af976e94457
Pillow>=9.0.0
psycopg[binary]
python-decouple<=3.3