Commit 174ac3f

huge import changes

brianjp93 committed Oct 6, 2024
1 parent c3473aa commit 174ac3f
Showing 2 changed files with 112 additions and 105 deletions.
213 changes: 110 additions & 103 deletions match/tasks.py
@@ -57,6 +57,7 @@

ROLES = ["top", "jg", "mid", "adc", "sup"]
logger = logging.getLogger(__name__)
+api = get_riot_api()


class RateLimitError(Exception):
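Note: the client is now built once at import time and shared by every task in the module; the per-function get_riot_api() calls (and the `if not api: return` guard in import_match) are removed in the hunks below, so a misconfigured client surfaces at call time instead of silently skipping work. A minimal sketch of the shared-client pattern, with a stub standing in for the real client (names here are illustrative, not from this repo):

class StubRiotApi:
    """Stand-in for the client returned by the real get_riot_api()."""
    def __init__(self) -> None:
        self.calls = 0

def get_riot_api() -> StubRiotApi:
    # the real function presumably builds a client from an API key in settings
    return StubRiotApi()

api = get_riot_api()  # built once per process, shared by every task in the module

def import_match_stub():
    api.calls += 1  # uses the shared module-level client; no per-call lookup

def fetch_match_json_stub():
    api.calls += 1

import_match_stub()
fetch_match_json_stub()
assert api.calls == 2  # both stubs hit the same instance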
@@ -80,9 +81,6 @@ def import_match(match_id, region, refresh=False):
    None
    """
-    api = get_riot_api()
-    if not api:
-        return
    retry_count = -1
    match = None
    while retry_count < 7:
@@ -104,7 +102,6 @@


def fetch_match_json(match_id: str, region: str, refresh=False):
-    api = get_riot_api()
    r = api.match.get(match_id, region=region)
    match = r.content
    if r.status_code == 429:
@@ -268,73 +265,73 @@ def import_recent_matches(
"""
has_more = True
api = get_riot_api()
import_count = 0
if api:
index = start
size = 100
if index + size > end:
size = end - start
please_continue = True
while has_more and please_continue:
riot_match_request_time = time.time()

logger.info(f"Getting {start=} {size=}. {startTime=}")

apicall = partial(
api.match.filter,
puuid,
region=region,
start=index,
count=size,
startTime=startTime,
endTime=endTime,
queue=queue,
index = start
size = 100
if index + size > end:
size = end - start
please_continue = True
while has_more and please_continue:
riot_match_request_time = time.time()

logger.info(f"Getting {start=} {size=}. {startTime=}")

apicall = partial(
api.match.filter,
puuid,
region=region,
start=index,
count=size,
startTime=startTime,
endTime=endTime,
queue=queue,
)
retry_count = -1
matches = []
while retry_count < 7:
retry_count += 1
r = apicall()
logger.debug('response: %s' % str(r))
riot_match_request_time = time.time() - riot_match_request_time
logger.debug(
f"Riot API match filter request time : {riot_match_request_time}"
)
retry_count = -1
matches = []
while retry_count < 7:
retry_count += 1
r = apicall()
logger.debug('response: %s' % str(r))
riot_match_request_time = time.time() - riot_match_request_time
logger.debug(
f"Riot API match filter request time : {riot_match_request_time}"
)
try:
if r.status_code == 404:
matches = []
else:
matches = r.json()
break
except Exception:
time.sleep(2**retry_count)
if len(matches) > 0:
existing_ids = [x._id for x in Match.objects.filter(_id__in=matches)]
if existing_ids and break_on_match_found:
has_more = False
new_matches = list(set(matches) - set(existing_ids))
import_count += len(new_matches)
start_time = time.perf_counter()
if use_celery:
jobs = []
for batch in batched(new_matches, 10):
for match_id in batch:
jobs.append(import_match.s(match_id, region))
result = group(jobs).apply_async()
while not result.ready():
time.sleep(1)
logger.info(f'Celery match import time: {time.perf_counter() - start_time}')
try:
if r.status_code == 404:
matches = []
else:
jobs = [(x, region) for x in new_matches]
with ThreadPool(processes=10) as pool:
pool.starmap(pool_match_import, jobs)
logger.info(f'ThreadPool match import: {time.perf_counter() - start_time}')
matches = r.json()
break
except Exception:
time.sleep(2**retry_count)
if len(matches) > 0:
existing_ids = [x._id for x in Match.objects.filter(_id__in=matches)]
if existing_ids and break_on_match_found:
has_more = False
new_matches = list(set(matches) - set(existing_ids))
import_count += len(new_matches)
start_time = time.perf_counter()
if use_celery:
jobs = []
for batch in batched(new_matches, 10):
for match_id in batch:
jobs.append(import_match.s(match_id, region))
result = group(jobs).apply_async()
while not result.ready():
time.sleep(1)
logger.info(f'Celery match import time: {time.perf_counter() - start_time}')
else:
jobs = [(x, region) for x in new_matches]
with ThreadPool(processes=10) as pool:
pool.starmap(pool_match_import, jobs)
logger.info(f'ThreadPool match import: {time.perf_counter() - start_time}')
if len(matches) < size:
has_more = False
index += size
if index >= end:
please_continue = False
else:
has_more = False
index += size
if index >= end:
please_continue = False
return import_count
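Note: the dedented loop above pages through the match-id list in windows of at most 100 ids, and now stops on the first empty page (the new `else: has_more = False`) rather than the first short one, in addition to the existing break_on_match_found and index >= end exits. A self-contained sketch of just the windowing arithmetic (the function name is illustrative, not from the repo):

def page_windows(start: int, end: int, page_size: int = 100):
    """Yield (index, size) request windows covering [start, end)."""
    size = min(page_size, end - start)
    index = start
    while index < end:
        yield index, size
        index += size

# (0, 250) -> (0, 100), (100, 100), (200, 100); on top of this windowing, the
# loop above bails out as soon as a page comes back empty, or as soon as an
# already-imported match id is seen when break_on_match_found is set.
print(list(page_windows(0, 250)))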


@@ -365,51 +362,62 @@ def bulk_import(puuid: str, last_import_time_hours: int = 24, count=200, offset=
    import_recent_matches(offset, offset + count, puuid, region=summoner.region)


-@app.task(name="match.huge_match_import_task")
-def huge_match_import_task(days=7, break_early=True):
-    thresh = timezone.now() - timedelta(days=days)
+@app.task(name="match.tasks.huge_match_import_task")
+def huge_match_import_task(hours_thresh=24, exclude_hours=6, break_early=True):
+    thresh = timezone.now() - timedelta(hours=hours_thresh)
    thresh_epoch_ms = thresh.timestamp() * 1000
    qs = Participant.objects.filter(
        match__game_creation__gt=thresh_epoch_ms,
        match__queue_id__in=[FLEX_QUEUE, SOLO_QUEUE],
        match__platform_id="NA1",
        puuid__isnull=False,
    ).exclude(
-        puuid__in=Summoner.objects.filter(
-            huge_match_import_at__gt=timezone.now() - timedelta(hours=12),
-        ).values('puuid')
-    ).select_related("match").order_by('puuid').distinct('puuid')
+        puuid__in=Summoner.objects.all().values('puuid')
+    )
+    if exclude_hours:
+        qs = qs.exclude(
+            puuid__in=Summoner.objects.filter(
+                huge_match_import_at__gt=timezone.now() - timedelta(hours=exclude_hours),
+            ).values('puuid')
+        )
+    qs = qs.select_related("match").order_by('puuid').distinct('puuid')

    count = qs.count()
+    remaining_count = count
    logger.info(f"Found {count} participants for huge_match_import_task.")
    batch = 5
-    for a, participants in enumerate(batched(qs.iterator(), batch)):
-        jobs = []
-        summoners = []
-        for b, participant in enumerate(participants):
-            i = (a * batch) + b
-            start_time = thresh
-            if summoner := Summoner.objects.filter(puuid=participant.puuid).first():
-                if break_early and summoner.huge_match_import_at and summoner.huge_match_import_at > thresh:
-                    # only go back as far as we need to for this summoner
-                    start_time = summoner.huge_match_import_at
-            logger.info(f"Importing back to {start_time=}")
-            jobs.append(import_recent_matches.s(
-                0,
-                10_000,
-                participant.puuid,
-                participant.match.region,
-                startTime=start_time,
-                use_celery=True,
-            ))
-            if summoner:
-                summoner.huge_match_import_at = timezone.now()
-                summoners.append(summoner)
-            if i % 100 == 0:
-                logger.info(f"Finished importing {i} of {count} summoner's games.")
-        result = group(jobs).apply_async()
-        while not result.ready():
-            time.sleep(1)
-        Summoner.objects.bulk_update(summoners, fields=["huge_match_import_at"])
+    i = -1
+    while participants_query_batch := qs.all()[:100]:
+        remaining_count = qs.count()
+        logger.info(f"Found {remaining_count} more participants for import.")
+        for participants in batched(participants_query_batch, batch):
+            jobs = []
+            summoners = []
+            for participant in participants:
+                i += 1
+                start_time = thresh
+                if summoner := Summoner.objects.filter(puuid=participant.puuid).first():
+                    if break_early and summoner.huge_match_import_at and summoner.huge_match_import_at > thresh:
+                        # only go back as far as we need to for this summoner
+                        start_time = summoner.huge_match_import_at
+                logger.info(f"Importing back to {start_time=}")
+                import_time = timezone.now()
+                jobs.append(import_recent_matches.s(
+                    0,
+                    10_000,
+                    participant.puuid,
+                    participant.match.region,
+                    startTime=start_time,
+                    use_celery=True,
+                ))
+                if summoner:
+                    summoner.huge_match_import_at = import_time
+                    summoners.append(summoner)
+                if i % 100 == 0:
+                    logger.info(f"Finished importing {i} participants. About {remaining_count} remaining.")
+            result = group(jobs).apply_async()
+            while not result.ready():
+                time.sleep(1)
+            Summoner.objects.bulk_update(summoners, fields=["huge_match_import_at"])
+    logger.info("huge_match_import_task finished.")


@@ -522,7 +530,6 @@ def import_advanced_timeline(match_id: str, overwrite=False):
    if overwrite and hasattr(match, 'advancedtimeline_id'):
        assert match.advancedtimeline
        match.advancedtimeline.delete()
-    api = get_riot_api()
    region = match.platform_id.lower()
    logger.info(f"Requesting info for match {match.id} in region {region}")
    try:
4 changes: 2 additions & 2 deletions player/admin.py
@@ -23,8 +23,8 @@ def count(self):


class SummonerAdmin(admin.ModelAdmin):
-    list_display = ("simple_name", "region")
-    search_fields = ("simple_name",)
+    list_display = ("simple_riot_id", "region")
+    search_fields = ("simple_riot_id",)
    list_select_related = True
    raw_id_fields = ("user", "pro")
    show_full_result_count = False
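Note: the admin columns move from simple_name to simple_riot_id, in line with Riot's migration from summoner names to Riot IDs (game name plus tag line). A hypothetical normalizer for such a field (not taken from this repo) might look like:

def simplify_riot_id(game_name: str, tag_line: str) -> str:
    """Lowercased, space-free 'name#tag' form, convenient for lookups."""
    return f"{game_name}#{tag_line}".replace(" ", "").lower()

assert simplify_riot_id("Hide on Bush", "KR1") == "hideonbush#kr1"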