implement queue
J. Bukhari authored and committed Feb 27, 2024
1 parent b30049a commit 834d816
Showing 1 changed file with 110 additions and 75 deletions.
185 changes: 110 additions & 75 deletions dlx_dl/scripts/sync/__init__.py
@@ -1,6 +1,5 @@
"""Sync DL from DLX"""

from operator import index
import sys, os, re, json, time, argparse, unicodedata, requests, pytz
from copy import deepcopy
from warnings import warn
@@ -12,7 +11,7 @@
from io import StringIO
from xml.etree import ElementTree
from mongomock import MongoClient as MockClient
from pymongo import ASCENDING as ASC, DESCENDING as DESC
from pymongo import ASCENDING as ASC, DESCENDING as DESC, UpdateOne, DeleteOne
from bson import SON
from dlx import DB, Config
from dlx.marc import Query, Bib, BibSet, Auth, AuthSet, Datafield
@@ -30,12 +29,15 @@
def get_args(**kwargs):
parser = argparse.ArgumentParser(prog='dlx-dl-sync')

parser.add_argument('--limit', help='limit the number of exports', type=int, default=0)
parser.add_argument('--email', help='receive batch results by email instead of callback')
parser.add_argument('--delete_only', action='store_true')
#parser.add_argument('--delete_only', action='store_true')
parser.add_argument('--force', action='store_true')
parser.add_argument('--modified_since_log', action='store_true')

parser.add_argument('--limit', help='limit the number of exports', type=int, default=1000)
parser.add_argument('--time_limit', help='runtime limit in seconds', type=int, default=600)
parser.add_argument('--queue', action='store_true', help='try to export records in the queue, and add records to the queue if the export exceeds limits')
parser.add_argument('--delete_only', action='store_true')

r = parser.add_argument_group('required')
r.add_argument('--source', required=True, help='an identity to use in the log')
r.add_argument('--type', required=True, choices=['bib', 'auth'])
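# Example (hypothetical) invocation exercising the new queue flags; the
# deployment-specific connection/credential arguments are omitted here:
#
#   dlx-dl-sync --source dlx-dl-sync --type bib --modified_since_log \
#       --queue --limit 1000 --time_limit 600
#
# With --queue, previously deferred records are exported first, and any
# records left unprocessed when --limit or --time_limit is hit are
# written back to the queue for the next run.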
@@ -102,16 +104,15 @@ def run(**kwargs):
blacklist = DB.handle[export.BLACKLIST_COLLECTION]
args.blacklisted = [x['symbol'] for x in blacklist.find({})]
args.log = DB.handle[LOG_COLLECTION]

HEADERS = {'Authorization': 'Token ' + args.api_key}

records = get_records(args)
records = get_records(args) # returns an iterator (dlx.marc.BibSet/AuthSet)
BATCH, BATCH_SIZE, SEEN, TOTAL, INDEX = [], 100, 0, records.count, {}
updated_count = 0
print(f'checking {TOTAL} records')

# check if last update indexed in DL yet
last = args.log.find_one({'source': args.source, 'record_type': args.type}, sort=[('time', DESC)]) or {}
last_10 = list(args.log.find({'source': args.source, 'record_type': args.type}, sort=[('time', DESC)], limit=10)) or []
last = last_10[0] if len(last_10) > 0 else {}

if last_new := args.log.find_one({'export_start': last.get('export_start') or 'X', 'export_type': 'NEW'}, sort=[('time', DESC)]):
last = last_new
@@ -120,8 +121,8 @@
pass
elif last is None:
raise Exception('No log data found for this source')
#elif (datetime.now() - (last.get('time') or datetime.min)) > timedelta(hours=2): # skip check if more than 2 hours
# print("wait time limit exceeded for last import confirmation. proceeding")
elif (datetime.now() - (last.get('time') or datetime.min)) > timedelta(hours=3): # skip check if more than 3 hours
print("wait time limit exceeded for last import confirmation. proceeding")
elif last:
pre = '035__a:(DHL)' if args.type == 'bib' else '035__a:(DHLAUTH)'
url = f'{API_SEARCH_URL}?search_id=&p={pre}{last["record_id"]}&format=xml'
@@ -159,14 +160,16 @@
# 005 is in local time
dl_last += timedelta(hours=4 if pytz.timezone('US/Eastern').localize(dl_last).dst() else 5)
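# The fixed 4/5-hour shift above approximates US/Eastern local time ->
# UTC with DST awareness. An equivalent, more general sketch with pytz
# (assuming dl_last is a naive local timestamp):
#
#   eastern = pytz.timezone('US/Eastern')
#   dl_last = eastern.localize(dl_last).astimezone(pytz.utc).replace(tzinfo=None)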

if not dl_last > last['time']:
if last['time'] > dl_last:
print(f'last update not cleared in DL yet (UPDATE) ({args.type}# {last["record_id"]} @ {last["time"]})')
exit()

# cycle through records in batches
for record in records:
# cycle through records in batches
enqueue, to_remove = False, []

for i, record in enumerate(records):
BATCH.append(record)
SEEN += 1
SEEN = i + 1

# process DL batch
if len(BATCH) in (BATCH_SIZE, TOTAL) or SEEN == TOTAL:
@@ -241,26 +244,100 @@ def run(**kwargs):

if result:
updated_count += 1

###

# clear batch
BATCH = []

# remove from queue
to_remove.append(record)

# status
print('\b' * (len(str(SEEN)) + 4 + len(str(TOTAL))) + f'{SEEN} / {TOTAL} ', end='', flush=True)

#print(f'{SEEN} / {TOTAL} ', end='', flush=True)

# limits
if args.limit != 0 and updated_count == args.limit:
print('\nReached max exports')
enqueue = True if args.queue else False
break
if args.time_limit and datetime.now(timezone.utc) > args.START + timedelta(seconds=args.time_limit):
print('\nTime limit exceeded')
enqueue = True if args.queue else False
break

# end
if SEEN == TOTAL:
print(f'updated {updated_count} records')

return

if updated_count > (args.limit if args.limit > 0 else 1000):
print('Reached max exports')
break

print(f'updated {updated_count} records')
queue = DB.handle[export.QUEUE_COLLECTION]
updates = [DeleteOne({'type': args.type, 'record_id': x.id}) for x in to_remove]

if updates:
queue.bulk_write(updates)
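# Each DeleteOne above removes one queue document in a single bulk round
# trip. The assumed queue document shape (mirroring the upsert below) is:
#
#   {'time': <datetime>, 'source': <str>, 'type': 'bib' | 'auth', 'record_id': <int>}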

if enqueue:
print('Submitting remaining records to the queue... ', end='', flush=True)
updates = []

for i, record in enumerate(records):
# records is a map object, so the records left unprocessed when the export loop broke are still pending here
data = {'time': datetime.now(timezone.utc), 'source': args.source, 'type': args.type, 'record_id': record.id}
updates.append(UpdateOne({'source': args.source, 'type': args.type, 'record_id': record.id}, {'$setOnInsert': data}, upsert=True))
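# Sketch of that leftover behavior: a partially consumed iterator yields
# only what the export loop above never reached.
#
#   it = iter([1, 2, 3])
#   next(it)    # 1  (taken by the "export loop")
#   list(it)    # [2, 3]  (the remainder that gets queued)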

if updates:
result = queue.bulk_write(updates)
print(f'{result.upserted_count} added. {i + 1 - result.upserted_count} were already in the queue')
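# $setOnInsert only takes effect when the upsert inserts a new document,
# so a record that is already queued keeps its original enqueue time and
# the repeat write is a no-op; upserted_count therefore counts just the
# genuinely new queue entries.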

print(f'Updated {updated_count} records')

def get_records_by_date(cls, date_from, date_to=None, delete_only=False):
"""
Returns
-------
BibSet / AuthSet
"""
if cls == BibSet:
fft_symbols = export._new_file_symbols(date_from, date_to)

if len(fft_symbols) > 10000:
raise Exception('that\'s too many file symbols to look up, sorry :(')

print(f'found files for {len(fft_symbols)} symbols')

criteria = SON({'$gte': date_from})

if date_to:
criteria['$lte'] = date_to

query = {'$or': [{'updated': criteria}, {'191.subfields.value': {'$in': fft_symbols}}]} if cls == BibSet \
else {'updated': criteria}
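# For bibs the rendered query unions the date window with any symbols
# that have new files attached, e.g. (illustrative values):
#
#   {'$or': [
#       {'updated': {'$gte': date_from, '$lte': date_to}},
#       {'191.subfields.value': {'$in': ['A/RES/76/1', ...]}}
#   ]}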

# sort to ensure latest updates are checked first
rset = cls.from_query(query, sort=[('updated', DESC)])

return rset

hist = DB.handle['bib_history'] if cls == BibSet else DB.handle['auth_history']
deleted = list(hist.find({'deleted.time': {'$gte': date_from}}))

if deleted:
if delete_only:
rset.records = []

rcls = Bib if cls == BibSet else Auth
records = list(rset.records)
to_delete = []

for d in deleted:
r = rcls({'_id': d['_id']})
r.set('980', 'a', 'DELETED')
r.updated = d['deleted']['time']
to_delete.append(r)

rset.records = (r for r in records + to_delete) # program is expecting an iterable

return rset

def get_records(args, log=None, queue=None):
cls = BibSet if args.type == 'bib' else AuthSet
since, to = None, None
@@ -282,7 +359,7 @@ def get_records(args, log=None, queue=None):
since = datetime.fromisoformat(args.modified_from)
records = get_records_by_date(cls, since, to, delete_only=args.delete_only)
elif args.modified_to:
raise Exception('--modified_to not valid without --modified_within')
raise Exception('--modified_to not valid without --modified_from')
elif args.modified_since_log:
c = log.find({'source': args.source, 'record_type': args.type, 'export_end': {'$exists': 1}}, sort=[('export_start', DESCENDING)], limit=1)
last = next(c, None)
@@ -311,57 +388,15 @@ def get_records(args, log=None, queue=None):
else:
raise Exception('One of the criteria arguments is required')

hist = DB.handle['bib_history'] if cls == BibSet else DB.handle['auth_history']

return records

def get_records_by_date(cls, date_from, date_to=None, delete_only=False):
"""
Returns
-------
BibSet / AuthSet
"""
if cls == BibSet:
fft_symbols = export._new_file_symbols(date_from, date_to)

if len(fft_symbols) > 10000:
raise Exception('that\'s too many file symbols to look up, sorry :(')

print(f'found files for {len(fft_symbols)} symbols')

criteria = SON({'$gte': date_from})

if date_to:
criteria['$lte'] = date_to

query = {'$or': [{'updated': criteria}, {'191.subfields.value': {'$in': fft_symbols}}]} if cls == BibSet \
else {'updated': criteria}

rset = cls.from_query(query)

return rset

hist = DB.handle['bib_history'] if cls == BibSet else DB.handle['auth_history']
deleted = list(hist.find({'deleted.time': {'$gte': date_from}}))

if deleted:
if delete_only:
rset.records = []
if args.queue:
queue = DB.handle[export.QUEUE_COLLECTION]
qids = [x['record_id'] for x in queue.find({'type': args.type})]
print(f'Taking {len(qids)} from queue')
q_args, q_kwargs = records.query_params
records = cls.from_query({'$or': [{'_id': {'$in': list(qids)}}, q_args[0]]}, sort=[('updated', ASC)])
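# The merged query (sketch) unions the queued ids with the original
# criteria so queued records are retried alongside the current batch:
#
#   {'$or': [{'_id': {'$in': [101, 102, ...]}}, <original criteria query>]}
#
# query_params is assumed to expose the (args, kwargs) that records was
# originally built from.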

rcls = Bib if cls == BibSet else Auth
records = list(rset.records)
to_delete = []

for d in deleted:
r = rcls({'_id': d['_id']})
r.set('980', 'a', 'DELETED')
r.updated = d['deleted']['time']
to_delete.append(r)

rset.records = (r for r in records + to_delete) # program is expecting an iterable
return records

return rset

def clean_values(record):
for field in record.datafields:
for sub in filter(lambda x: not hasattr(x, 'xref'), field.subfields):
