Skip to content

Commit

Permalink
optimize import memory consumption
Browse files (browse the repository at this point in the history)
  • Loading branch information
beda42 committed May 10, 2021
1 parent 58f8ce7 commit ce78db6
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 200 deletions.
2 changes: 1 addition & 1 deletion apps/logs/logic/attempt_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def import_new_sushi_attempts():
count = queryset.count()
logger.info('Found %d unprocessed successful download attempts matching criteria', count)
for i, attempt in enumerate(queryset):
logger.info('----- Importing attempt #%d -----', i)
logger.info('----- Importing attempt #%d (pk: %d) -----', i, attempt.pk)
try:
import_one_sushi_attempt(attempt)
except Exception as e:
Expand Down
28 changes: 17 additions & 11 deletions apps/logs/logic/data_import.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from collections import Counter, namedtuple
from datetime import date
from typing import Optional
from typing import Optional, Tuple, Set

from core.logic.debug import log_memory
from logs.logic.validation import clean_and_validate_issn, ValidationError, normalize_isbn
Expand Down Expand Up @@ -244,7 +244,9 @@ def import_counter_records(
to_compare[key] = (pk, value)
# make the comparison
log_memory('XX2')
dicts_to_insert = []
als_to_insert = []
target_date_tuples = set()
max_batch_size = 100_000
for key, value in to_insert.items():
db_pk, db_value = to_compare.get(key, (None, None))
if db_pk:
Expand All @@ -257,21 +259,26 @@ def import_counter_records(
else:
rec = dict(key)
rec['value'] = value
dicts_to_insert.append(rec)
als_to_insert.append(AccessLog(import_batch=import_batch, **rec))
if rec['target_id'] is not None:
target_date_tuples.add((rec['target_id'], rec['date']))
if len(als_to_insert) >= max_batch_size:
log_memory('Batch create')
AccessLog.objects.bulk_create(als_to_insert)
stats['new logs'] += len(als_to_insert)
als_to_insert = []
# now insert the records that are clean to be inserted
log_memory('XX3')
AccessLog.objects.bulk_create(
[AccessLog(import_batch=import_batch, **rec) for rec in dicts_to_insert]
)
stats['new logs'] += len(dicts_to_insert)
AccessLog.objects.bulk_create(als_to_insert)
stats['new logs'] += len(als_to_insert)
log_memory('XX4')
# and insert the PlatformTitle links
stats.update(create_platformtitle_links(organization, platform, dicts_to_insert))
stats.update(create_platformtitle_links(organization, platform, target_date_tuples))
log_memory('XX5')
return stats


def create_platformtitle_links(organization, platform, records: [dict]):
def create_platformtitle_links(organization, platform, target_date_tuples: Set[Tuple]):
"""
Takes a set of `(target_id, date)` tuples collected while creating AccessLogs in `import_counter_records`
and creates the explicit PlatformTitle objects from the data
Expand All @@ -280,10 +287,9 @@ def create_platformtitle_links(organization, platform, records: [dict]):
(pt.title_id, pt.date.isoformat())
for pt in PlatformTitle.objects.filter(organization=organization, platform=platform)
}
tuples = {(rec['target_id'], rec['date']) for rec in records if rec['target_id'] is not None}
pts = []
before_count = PlatformTitle.objects.count()
for title_id, rec_date in tuples - existing:
for title_id, rec_date in target_date_tuples - existing:
pts.append(
PlatformTitle(
organization=organization, platform=platform, title_id=title_id, date=rec_date
Expand Down
Loading

0 comments on commit ce78db6

Please sign in to comment.