Load handling/queue #68

Merged · 3 commits · Feb 28, 2024
dlx_dl/scripts/sync/__init__.py (185 changes: 110 additions & 75 deletions)

@@ -1,6 +1,5 @@
 """Sync DL from DLX"""
 
-from operator import index
 import sys, os, re, json, time, argparse, unicodedata, requests, pytz
 from copy import deepcopy
 from warnings import warn
@@ -12,7 +11,7 @@
 from io import StringIO
 from xml.etree import ElementTree
 from mongomock import MongoClient as MockClient
-from pymongo import ASCENDING as ASC, DESCENDING as DESC
+from pymongo import ASCENDING as ASC, DESCENDING as DESC, UpdateOne, DeleteOne
 from bson import SON
 from dlx import DB, Config
 from dlx.marc import Query, Bib, BibSet, Auth, AuthSet, Datafield
@@ -30,12 +29,15 @@
 def get_args(**kwargs):
     parser = argparse.ArgumentParser(prog='dlx-dl-sync')
 
-    parser.add_argument('--limit', help='limit the number of exports', type=int, default=0)
     parser.add_argument('--email', help='receive batch results by email instead of callback')
-    parser.add_argument('--delete_only', action='store_true')
+    #parser.add_argument('--delete_only', action='store_true')
     parser.add_argument('--force', action='store_true')
     parser.add_argument('--modified_since_log', action='store_true')
 
+    parser.add_argument('--limit', help='limit the number of exports', type=int, default=1000)
+    parser.add_argument('--time_limit', help='runtime limit in seconds', type=int, default=600)
+    parser.add_argument('--queue', action='store_true', help='try to export records in queue and add to queue if export exceeds limits')
+    parser.add_argument('--delete_only', action='store_true')
 
     r = parser.add_argument_group('required')
     r.add_argument('--source', required=True, help='an identity to use in the log')
     r.add_argument('--type', required=True, choices=['bib', 'auth'])
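
Reviewer note: a minimal standalone sketch of how the reworked flags are meant to combine (`--limit` and `--time_limit` cap a run; `--queue` defers the overflow). The parser below only mirrors the PR's new flags for illustration and is not the module's full argument set:

```python
import argparse

# Illustrative parser mirroring only the new/changed flags (assumed semantics).
parser = argparse.ArgumentParser(prog='dlx-dl-sync')
parser.add_argument('--limit', type=int, default=1000)      # max exports per run (was 0 = unlimited)
parser.add_argument('--time_limit', type=int, default=600)  # max runtime in seconds
parser.add_argument('--queue', action='store_true')         # defer overflow records to the queue

args = parser.parse_args(['--queue', '--limit', '500'])
assert args.queue and args.limit == 500 and args.time_limit == 600
```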
@@ -102,16 +104,15 @@ def run(**kwargs):
     blacklist = DB.handle[export.BLACKLIST_COLLECTION]
     args.blacklisted = [x['symbol'] for x in blacklist.find({})]
     args.log = DB.handle[LOG_COLLECTION]
-
-    HEADERS = {'Authorization': 'Token ' + args.api_key}
 
-    records = get_records(args)
+    records = get_records(args) # returns an iterator (dlx.Marc.BibSet/AuthSet)
     BATCH, BATCH_SIZE, SEEN, TOTAL, INDEX = [], 100, 0, records.count, {}
     updated_count = 0
     print(f'checking {TOTAL} records')
 
     # check if last update indexed in DL yet
-    last = args.log.find_one({'source': args.source, 'record_type': args.type}, sort=[('time', DESC)]) or {}
+    last_10 = list(args.log.find({'source': args.source, 'record_type': args.type}, sort=[('time', DESC)], limit=10)) or []
+    last = last_10[0] if len(last_10) > 0 else {}
 
     if last_new := args.log.find_one({'export_start': last.get('export_start') or 'X', 'export_type': 'NEW'}, sort=[('time', DESC)]):
         last = last_new
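
For context, the confirmation check now pulls the ten most recent log entries rather than a single one. A sketch of the lookup against mongomock (already a project dependency); field names follow the diff, the 'log' collection name is a stand-in:

```python
from datetime import datetime
from mongomock import MongoClient
from pymongo import DESCENDING as DESC

log = MongoClient().db['log']  # stand-in for DB.handle[LOG_COLLECTION]
log.insert_many([{'source': 's', 'record_type': 'bib', 'time': datetime(2024, 2, i)} for i in range(1, 13)])

last_10 = list(log.find({'source': 's', 'record_type': 'bib'}, sort=[('time', DESC)], limit=10))
last = last_10[0] if len(last_10) > 0 else {}
assert len(last_10) == 10 and last['time'] == datetime(2024, 2, 12)
```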
@@ -120,7 +121,7 @@
         pass
     elif last is None:
         raise Exception('No log data found for this source')
-    elif (datetime.now() - (last.get('time') or datetime.min)) > timedelta(hours=2): # skip check if more than 2 hours
+    elif (datetime.now() - (last.get('time') or datetime.min)) > timedelta(hours=3): # skip check if more than 3 hours
         print("wait time limit exceeded for last import confirmation. proceeding")
     elif last:
         pre = '035__a:(DHL)' if args.type == 'bib' else '035__a:(DHLAUTH)'
@@ -159,14 +160,16 @@ def run(**kwargs):
             # 005 is in local time
             dl_last += timedelta(hours=4 if pytz.timezone('US/Eastern').localize(dl_last).dst() else 5)
 
-            if not dl_last > last['time']:
-                print(f'last update not cleared in DL yet ({args.type}# {last["record_id"]} @ {last["time"]})')
+            if last['time'] > dl_last:
+                print(f'last update not cleared in DL yet (UPDATE) ({args.type}# {last["record_id"]} @ {last["time"]})')
                 exit()
 
     # cycle through records in batches
-    for record in records:
+    enqueue, to_remove = False, []
+
+    for i, record in enumerate(records):
         BATCH.append(record)
-        SEEN += 1
+        SEEN = i + 1
 
         # process DL batch
         if len(BATCH) in (BATCH_SIZE, TOTAL) or SEEN == TOTAL:
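
A self-contained sketch of the flush-every-N batching pattern used in the loop above; `flush()` here is a hypothetical stand-in for the DL export call:

```python
# Stand-alone illustration of the batching in run(); flush() is hypothetical.
records = list(range(250))
BATCH, BATCH_SIZE, TOTAL = [], 100, len(records)

def flush(batch):
    print(f'exporting batch of {len(batch)}')

for i, record in enumerate(records):
    BATCH.append(record)
    SEEN = i + 1

    if len(BATCH) == BATCH_SIZE or SEEN == TOTAL:
        flush(BATCH)  # flushes 100, 100, then the final 50
        BATCH = []    # clear batch, as in the diff
```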
@@ -241,26 +244,100 @@ def run(**kwargs):
 
                     if result:
                         updated_count += 1
 
+            ###
 
             # clear batch
             BATCH = []
 
+        # remove from queue
+        to_remove.append(record)
 
         # status
+        print('\b' * (len(str(SEEN)) + 4 + len(str(TOTAL))) + f'{SEEN} / {TOTAL} ', end='', flush=True)
 
+        #print(f'{SEEN} / {TOTAL} ', end='', flush=True)
 
+        # limits
+        if args.limit != 0 and updated_count == args.limit:
+            print('\nReached max exports')
+            enqueue = True if args.queue else False
+            break
+
+        if args.time_limit and datetime.now(timezone.utc) > args.START + timedelta(seconds=args.time_limit):
+            print('\nTime limit exceeded')
+            enqueue = True if args.queue else False
+            break
 
-        # end
-        if SEEN == TOTAL:
-            print(f'updated {updated_count} records')
-
-            return
-
-        if updated_count > (args.limit if args.limit > 0 else 1000):
-            print('Reached max exports')
-            break
-
-    print(f'updated {updated_count} records')
+    queue = DB.handle[export.QUEUE_COLLECTION]
+    updates = [DeleteOne({'type': args.type, 'record_id': x.id}) for x in to_remove]
+
+    if updates:
+        queue.bulk_write(updates)
+
+    if enqueue:
+        print('Submitting remaining records to the queue... ', end='', flush=True)
+        updates = []
+
+        for i, record in enumerate(records):
+            # records is a map object so the unprocessed records will be left over from the loop break
+            data = {'time': datetime.now(timezone.utc), 'source': args.source, 'type': args.type, 'record_id': record.id}
+            updates.append(UpdateOne({'source': args.source, 'type': args.type, 'record_id': record.id}, {'$setOnInsert': data}, upsert=True))
+
+        if updates:
+            result = queue.bulk_write(updates)
+            print(f'{result.upserted_count} added. {i + 1 - result.upserted_count} were already in the queue')
+
+    print(f'Updated {updated_count} records')

+def get_records_by_date(cls, date_from, date_to=None, delete_only=False):
+    """
+    Returns
+    -------
+    BibSet / AuthSet
+    """
+    if cls == BibSet:
+        fft_symbols = export._new_file_symbols(date_from, date_to)
+
+        if len(fft_symbols) > 10000:
+            raise Exception('that\'s too many file symbols to look up, sorry :(')
+
+        print(f'found files for {len(fft_symbols)} symbols')
+
+    criteria = SON({'$gte': date_from})
+
+    if date_to:
+        criteria['$lte'] = date_to
+
+    query = {'$or': [{'updated': criteria}, {'191.subfields.value': {'$in': fft_symbols}}]} if cls == BibSet \
+        else {'updated': criteria}
+
+    # sort to ensure latest updates are checked first
+    rset = cls.from_query(query, sort=[('updated', DESC)])
+
+    return rset
+
+    hist = DB.handle['bib_history'] if cls == BibSet else DB.handle['auth_history']
+    deleted = list(hist.find({'deleted.time': {'$gte': date_from}}))
+
+    if deleted:
+        if delete_only:
+            rset.records = []
+
+        rcls = Bib if cls == BibSet else Auth
+        records = list(rset.records)
+        to_delete = []
+
+        for d in deleted:
+            r = rcls({'_id': d['_id']})
+            r.set('980', 'a', 'DELETED')
+            r.updated = d['deleted']['time']
+            to_delete.append(r)
+
+        rset.records = (r for r in records + to_delete) # program is expecting an iterable
+
+    return rset
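
The queue collection holds one document per deferred record. A round-trip sketch of the enqueue/dequeue writes using mongomock (the 'queue' collection name is a stand-in for `export.QUEUE_COLLECTION`); the `$setOnInsert` upsert keeps an existing entry's original timestamp, and `DeleteOne` drains entries once exported:

```python
from datetime import datetime, timezone
from mongomock import MongoClient
from pymongo import UpdateOne, DeleteOne

queue = MongoClient().db['queue']  # stand-in for DB.handle[export.QUEUE_COLLECTION]

# Enqueue twice: the second upsert matches and $setOnInsert leaves the doc untouched.
doc = {'time': datetime.now(timezone.utc), 'source': 'test', 'type': 'bib', 'record_id': 1}
for _ in range(2):
    queue.bulk_write([UpdateOne({'source': 'test', 'type': 'bib', 'record_id': 1},
                                {'$setOnInsert': doc}, upsert=True)])

assert queue.count_documents({}) == 1

# Dequeue after a successful export (same filter shape as the diff's DeleteOne).
queue.bulk_write([DeleteOne({'type': 'bib', 'record_id': 1})])
assert queue.count_documents({}) == 0
```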

 def get_records(args, log=None, queue=None):
     cls = BibSet if args.type == 'bib' else AuthSet
     since, to = None, None
@@ -282,7 +359,7 @@ def get_records(args, log=None, queue=None):
         since = datetime.fromisoformat(args.modified_from)
         records = get_records_by_date(cls, since, to, delete_only=args.delete_only)
     elif args.modified_to:
-        raise Exception('--modified_to not valid without --modified_within')
+        raise Exception('--modified_to not valid without --modified_from')
     elif args.modified_since_log:
         c = log.find({'source': args.source, 'record_type': args.type, 'export_end': {'$exists': 1}}, sort=[('export_start', DESCENDING)], limit=1)
         last = next(c, None)
@@ -311,57 +388,15 @@ def get_records(args, log=None, queue=None):
     else:
         raise Exception('One of the criteria arguments is required')
 
-    hist = DB.handle['bib_history'] if cls == BibSet else DB.handle['auth_history']
-
-    return records
-
-def get_records_by_date(cls, date_from, date_to=None, delete_only=False):
-    """
-    Returns
-    -------
-    BibSet / AuthSet
-    """
-    if cls == BibSet:
-        fft_symbols = export._new_file_symbols(date_from, date_to)
-
-        if len(fft_symbols) > 10000:
-            raise Exception('that\'s too many file symbols to look up, sorry :(')
-
-        print(f'found files for {len(fft_symbols)} symbols')
-
-    criteria = SON({'$gte': date_from})
-
-    if date_to:
-        criteria['$lte'] = date_to
-
-    query = {'$or': [{'updated': criteria}, {'191.subfields.value': {'$in': fft_symbols}}]} if cls == BibSet \
-        else {'updated': criteria}
-
-    rset = cls.from_query(query)
-
-    return rset
-
-    hist = DB.handle['bib_history'] if cls == BibSet else DB.handle['auth_history']
-    deleted = list(hist.find({'deleted.time': {'$gte': date_from}}))
-
-    if deleted:
-        if delete_only:
-            rset.records = []
-
-        rcls = Bib if cls == BibSet else Auth
-        records = list(rset.records)
-        to_delete = []
-
-        for d in deleted:
-            r = rcls({'_id': d['_id']})
-            r.set('980', 'a', 'DELETED')
-            r.updated = d['deleted']['time']
-            to_delete.append(r)
-
-        rset.records = (r for r in records + to_delete) # program is expecting an iterable
-
-    return rset
+    if args.queue:
+        queue = DB.handle[export.QUEUE_COLLECTION]
+        qids = [x['record_id'] for x in queue.find({'type': args.type})]
+        print(f'Taking {len(qids)} from queue')
+        q_args, q_kwargs = records.query_params
+        records = cls.from_query({'$or': [{'_id': {'$in': list(qids)}}, q_args[0]]}, sort=(['updated', ASC]))
 
+    return records
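
With `--queue` set, get_records widens the date-based query by OR-ing in the queued ids. A sketch of the combined filter's shape (values illustrative; `date_criteria` stands in for the `q_args[0]` recovered from `records.query_params`):

```python
from datetime import datetime

# Shape of the merged query built in the --queue branch (illustrative values).
qids = [101, 102]                                            # record ids already in the queue
date_criteria = {'updated': {'$gte': datetime(2024, 2, 1)}}  # stands in for q_args[0]

query = {'$or': [{'_id': {'$in': qids}}, date_criteria]}
# records = cls.from_query(query, sort=[('updated', 1)])     # ascending, as in the diff
```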

 def clean_values(record):
     for field in record.datafields:
         for sub in filter(lambda x: not hasattr(x, 'xref'), field.subfields):
requirements.txt (2 changes: 1 addition & 1 deletion)

@@ -6,7 +6,7 @@ cffi==1.15.1
 charset-normalizer==2.1.1
 click==8.1.3
 cryptography==41.0.2
-dlx @ git+https://github.com/dag-hammarskjold-library/dlx@95972bbbd3505fc7be2c44cca85b71ca87f56464
+dlx @ git+https://github.com/dag-hammarskjold-library/dlx@9f8100087561dbade56a47043371d56b9124b1d6
 idna==3.4
 Jinja2==3.1.2
 jmespath==1.0.1