Commit 0ed0720

task changes

brianjp93 committed Oct 7, 2024
1 parent 99ac658 commit 0ed0720

Showing 2 changed files with 17 additions and 18 deletions.
fly.toml (2 changes: 1 addition & 1 deletion)

@@ -22,7 +22,7 @@ kill_timeout = "5s"
 [processes]
 beat = "celery -A lolsite beat -l INFO"
 web = "gunicorn --bind :8000 --workers 10 lolsite.wsgi:application"
-worker = "celery -A lolsite worker --concurrency=10 --loglevel=INFO"
+worker = "celery -A lolsite worker --concurrency=12 --loglevel=INFO"

 [[services]]
 protocol = "tcp"
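The --concurrency flag sets how many prefork child processes each Celery worker runs. As a minimal sketch (not from this repo), the same value could be set on the app object in a standard Celery setup; the CLI flag takes precedence:

from celery import Celery

app = Celery("lolsite")
# Mirrors `--concurrency=12` from the worker command above; with the default
# prefork pool this is the number of child worker processes.
app.conf.worker_concurrency = 12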
match/tasks.py (33 changes: 16 additions & 17 deletions)

@@ -274,8 +274,6 @@ def import_recent_matches(
     while has_more and please_continue:
         riot_match_request_time = time.time()

-        logger.info(f"Getting {start=} {size=}. {startTime=}")
-
         apicall = partial(
             api.match.filter,
             puuid,
@@ -324,7 +322,9 @@ def import_recent_matches(
             jobs = [(x, region) for x in new_matches]
             with ThreadPool(processes=10) as pool:
                 pool.starmap(pool_match_import, jobs)
-                logger.info(f'ThreadPool match import: {time.perf_counter() - start_time}')
+                pool.close()
+                pool.join()
+            logger.info(f'ThreadPool match import: {time.perf_counter() - start_time}')
             if len(matches) < size:
                 has_more = False
             else:
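Note that exiting a multiprocessing.pool.ThreadPool context manager calls terminate(), not close(), so the added close()/join() pair explicitly drains the pool and waits for its worker threads before the elapsed time is logged. A minimal self-contained sketch of the pattern, with an illustrative work function standing in for pool_match_import:

from multiprocessing.pool import ThreadPool
import time

def work(x, y):  # stand-in for pool_match_import
    time.sleep(0.01)
    return x + y

start_time = time.perf_counter()
jobs = [(i, i) for i in range(100)]
with ThreadPool(processes=10) as pool:
    pool.starmap(work, jobs)  # blocks until every job has returned
    pool.close()              # refuse any new work
    pool.join()               # wait for worker threads to exit
# __exit__ still calls terminate(), which is harmless after close()/join()
print(f"elapsed: {time.perf_counter() - start_time:.3f}s")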
@@ -363,7 +363,7 @@ def bulk_import(puuid: str, last_import_time_hours: int = 24, count=200, offset=


 @app.task(name="match.tasks.huge_match_import_task")
-def huge_match_import_task(hours_thresh=24, exclude_hours=6, break_early=True):
+def huge_match_import_task(hours_thresh=72, exclude_hours=24, break_early=True):
     thresh = timezone.now() - timedelta(hours=hours_thresh)
     thresh_epoch_ms = thresh.timestamp() * 1000
     qs = Participant.objects.filter(
@@ -373,27 +373,28 @@ def huge_match_import_task(hours_thresh=24, exclude_hours=6, break_early=True):
         puuid__isnull=False,
         puuid__in=Summoner.objects.all().values('puuid')
     )
-    if exclude_hours:
-        qs = qs.exclude(
-            puuid__in=Summoner.objects.filter(
-                huge_match_import_at__gt=timezone.now() - timedelta(hours=exclude_hours),
-            ).values('puuid')
-        )
+    qs = qs.exclude(
+        puuid__in=Summoner.objects.filter(
+            huge_match_import_at__gt=timezone.now() - timedelta(hours=exclude_hours),
+        ).values('puuid')
+    )
     qs = qs.select_related("match").order_by('puuid').distinct('puuid')

     count = qs.count()
     remaining_count = count
     logger.info(f"Found {count} participants for huge_match_import_task.")
     batch = 10
-    i = -1
-    while participants_query_batch := qs.all()[:100]:
+    while qs.all().exists():
         remaining_count = qs.count()
-        logger.info(f"Found {remaining_count} more participants for import.")
-        for participants in batched(participants_query_batch, batch):
+        logger.info(f"Query loop. Found {remaining_count} new participants.")
+        i = -1
+        for participants in batched(qs.all().iterator(5000), batch):
             jobs = []
             summoners = []
             for participant in participants:
                 i += 1
+                if i % 100 == 0:
+                    logger.info(f"Finished importing {i} participants of about {remaining_count}.")
                 start_time = thresh
                 if summoner := Summoner.objects.filter(puuid=participant.puuid).first():
                     if break_early and summoner.huge_match_import_at and summoner.huge_match_import_at > thresh:
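The rewritten loop swaps a 100-row slice per pass for a streaming queryset: .iterator(5000) fetches rows in chunks without caching the whole result set, and batched() regroups that stream into units of `batch`. A minimal sketch of the shape, assuming `batched` behaves like itertools.batched (Python 3.12+) and `handle` is a hypothetical consumer:

from itertools import batched  # Python 3.12+; older code often ships its own helper

def process_in_batches(qs, handle, batch_size=10, chunk_size=5000):
    # .iterator(chunk_size) streams rows with a chunked cursor instead of
    # caching the entire result set; batched() regroups the stream into
    # fixed-size tuples for per-batch processing.
    for batch in batched(qs.iterator(chunk_size), batch_size):
        handle(batch)  # hypothetical per-batch worker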
@@ -407,13 +408,11 @@
                         participant.puuid,
                         participant.match.region,
                         startTime=start_time,
-                        use_celery=True,
+                        use_celery=False,
                     ))
                 if summoner:
                     summoner.huge_match_import_at = import_time
                     summoners.append(summoner)
-                if i % 100 == 0:
-                    logger.info(f"Finished importing {i} participants. About {remaining_count} remaining.")
             result = group(jobs).apply_async()
             while not result.ready():
                 time.sleep(1)
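Each batch is dispatched as a single Celery group and then polled until every subtask completes. A minimal sketch of that fan-out/wait pattern, with illustrative names:

import time
from celery import group

def run_batch(signatures, poll_seconds=1):
    # Fan out all task signatures at once; GroupResult.ready() becomes True
    # only after every subtask in the group has finished.
    result = group(signatures).apply_async()
    while not result.ready():
        time.sleep(poll_seconds)
    return result

GroupResult.join() is the more direct blocking call; an explicit sleep loop like the one above just makes the polling interval obvious.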