diff --git a/dlx_dl/scripts/sync/__init__.py b/dlx_dl/scripts/sync/__init__.py
index 89ca591..57bc0da 100644
--- a/dlx_dl/scripts/sync/__init__.py
+++ b/dlx_dl/scripts/sync/__init__.py
@@ -1,6 +1,5 @@
 """Sync DL from DLX"""
 
-from operator import index
 import sys, os, re, json, time, argparse, unicodedata, requests, pytz
 from copy import deepcopy
 from warnings import warn
@@ -12,7 +11,7 @@
 from io import StringIO
 from xml.etree import ElementTree
 from mongomock import MongoClient as MockClient
-from pymongo import ASCENDING as ASC, DESCENDING as DESC
+from pymongo import ASCENDING as ASC, DESCENDING as DESC, UpdateOne, DeleteOne
 from bson import SON
 from dlx import DB, Config
 from dlx.marc import Query, Bib, BibSet, Auth, AuthSet, Datafield
@@ -30,11 +29,13 @@ def get_args(**kwargs):
 
     parser = argparse.ArgumentParser(prog='dlx-dl-sync')
 
-    parser.add_argument('--limit', help='limit the number of exports', type=int, default=0)
     parser.add_argument('--email', help='receive batch results by email instead of callback')
-    parser.add_argument('--delete_only', action='store_true')
+    #parser.add_argument('--delete_only', action='store_true')
     parser.add_argument('--force', action='store_true')
     parser.add_argument('--modified_since_log', action='store_true')
+    parser.add_argument('--limit', help='limit the number of exports', type=int, default=1000)
+    parser.add_argument('--time_limit', help='runtime limit in seconds', type=int, default=600)
+    parser.add_argument('--queue', action='store_true', help='try to export records in the queue and add to the queue if the export exceeds limits')
 
     r = parser.add_argument_group('required')
     r.add_argument('--source', required=True, help='an identity to use in the log')
@@ -102,16 +103,15 @@ def run(**kwargs):
     blacklist = DB.handle[export.BLACKLIST_COLLECTION]
     args.blacklisted = [x['symbol'] for x in blacklist.find({})]
     args.log = DB.handle[LOG_COLLECTION]
-    HEADERS = {'Authorization': 'Token ' + args.api_key}
-
-    records = get_records(args)
+    records = get_records(args) # returns an iterator (dlx.marc.BibSet/AuthSet)
 
     BATCH, BATCH_SIZE, SEEN, TOTAL, INDEX = [], 100, 0, records.count, {}
     updated_count = 0
     print(f'checking {TOTAL} records')
 
     # check if last update indexed in DL yet
-    last = args.log.find_one({'source': args.source, 'record_type': args.type}, sort=[('time', DESC)]) or {}
+    last_10 = args.log.find({'source': args.source, 'record_type': args.type}, sort=[('time', DESC)], limit=10) or []
+    last = last_10[0] if last_10 else {}
 
     if last_new := args.log.find_one({'export_start': last.get('export_start') or 'X', 'export_type': 'NEW'}, sort=[('time', DESC)]):
         last = last_new
@@ -120,8 +120,8 @@ def run(**kwargs):
         pass
     elif last is None:
         raise Exception('No log data found for this source')
-    #elif (datetime.now() - (last.get('time') or datetime.min)) > timedelta(hours=2): # skip check if more than 2 hours
-    #    print("wait time limit exceeded for last import confirmation. proceeding")
+    elif (datetime.now() - (last.get('time') or datetime.min)) > timedelta(hours=3): # skip check if more than 3 hours
+        print("wait time limit exceeded for last import confirmation. proceeding")
     elif last:
         pre = '035__a:(DHL)' if args.type == 'bib' else '035__a:(DHLAUTH)'
         url = f'{API_SEARCH_URL}?search_id=&p={pre}{last["record_id"]}&format=xml'
@@ -159,14 +159,16 @@ def run(**kwargs):
             # 005 is in local time
             dl_last += timedelta(hours=4 if pytz.timezone('US/Eastern').localize(dl_last).dst() else 5)
 
-            if not dl_last > last['time']:
+            if last['time'] > dl_last:
                 print(f'last update not cleared in DL yet (UPDATE) ({args.type}# {last["record_id"]} @ {last["time"]})')
                 exit()
 
-    # cycle through records in batches
-    for record in records:
+    # cycle through records in batches
+    enqueue, to_remove = False, []
+
+    for i, record in enumerate(records):
         BATCH.append(record)
-        SEEN += 1
+        SEEN = i + 1
 
         # process DL batch
         if len(BATCH) in (BATCH_SIZE, TOTAL) or SEEN == TOTAL:
@@ -241,26 +243,100 @@ def run(**kwargs):
 
                 if result:
                     updated_count += 1
 
-
-            ###
+            # clear batch
             BATCH = []
+
+            # remove from queue
+            to_remove.append(record)
 
         # status
         print('\b' * (len(str(SEEN)) + 4 + len(str(TOTAL))) + f'{SEEN} / {TOTAL} ', end='', flush=True)
-
+        #print(f'{SEEN} / {TOTAL} ', end='', flush=True)
+
+        # limits
+        if args.limit != 0 and updated_count == args.limit:
+            print('\nReached max exports')
+            enqueue = True if args.queue else False
+            break
+        if args.time_limit and datetime.now(timezone.utc) > args.START + timedelta(seconds=args.time_limit):
+            print('\nTime limit exceeded')
+            enqueue = True if args.queue else False
+            break
+
         # end
         if SEEN == TOTAL:
-            print(f'updated {updated_count} records')
-
-            return
-
-        if updated_count > (args.limit if args.limit > 0 else 1000):
-            print('Reached max exports')
             break
 
-    print(f'updated {updated_count} records')
+    queue = DB.handle[export.QUEUE_COLLECTION]
+    updates = [DeleteOne({'type': args.type, 'record_id': x.id}) for x in to_remove]
+
+    if updates:
+        queue.bulk_write(updates)
+
+    if enqueue:
+        print('Submitting remaining records to the queue... ', end='', flush=True)
+        updates = []
+
+        for i, record in enumerate(records):
+            # records is a map object, so the unprocessed records are left over after the loop break
+            data = {'time': datetime.now(timezone.utc), 'source': args.source, 'type': args.type, 'record_id': record.id}
+            updates.append(UpdateOne({'source': args.source, 'type': args.type, 'record_id': record.id}, {'$setOnInsert': data}, upsert=True))
+
+        if updates:
+            result = queue.bulk_write(updates)
+            print(f'{result.upserted_count} added. {i + 1 - result.upserted_count} were already in the queue')
+
+    print(f'Updated {updated_count} records')
+
+def get_records_by_date(cls, date_from, date_to=None, delete_only=False):
+    """
+    Returns
+    -------
+    BibSet / AuthSet
+    """
+    if cls == BibSet:
+        fft_symbols = export._new_file_symbols(date_from, date_to)
+
+        if len(fft_symbols) > 10000:
+            raise Exception('that\'s too many file symbols to look up, sorry :(')
+
+        print(f'found files for {len(fft_symbols)} symbols')
+
+    criteria = SON({'$gte': date_from})
+
+    if date_to:
+        criteria['$lte'] = date_to
+
+    query = {'$or': [{'updated': criteria}, {'191.subfields.value': {'$in': fft_symbols}}]} if cls == BibSet \
+        else {'updated': criteria}
+
+    # sort to ensure latest updates are checked first
+    rset = cls.from_query(query, sort=[('updated', DESC)])
+
+    return rset
+
+    hist = DB.handle['bib_history'] if cls == BibSet else DB.handle['auth_history']
+    deleted = list(hist.find({'deleted.time': {'$gte': date_from}}))
+
+    if deleted:
+        if delete_only:
+            rset.records = []
+
+        rcls = Bib if cls == BibSet else Auth
+        records = list(rset.records)
+        to_delete = []
+        for d in deleted:
+            r = rcls({'_id': d['_id']})
+            r.set('980', 'a', 'DELETED')
+            r.updated = d['deleted']['time']
+            to_delete.append(r)
+
+        rset.records = (r for r in records + to_delete) # program is expecting an iterable
+
+    return rset
+
 def get_records(args, log=None, queue=None):
     cls = BibSet if args.type == 'bib' else AuthSet
     since, to = None, None
@@ -282,7 +358,7 @@ def get_records(args, log=None, queue=None):
         since = datetime.fromisoformat(args.modified_from)
         records = get_records_by_date(cls, since, to, delete_only=args.delete_only)
     elif args.modified_to:
-        raise Exception('--modified_to not valid without --modified_within')
+        raise Exception('--modified_to not valid without --modified_from')
     elif args.modified_since_log:
         c = log.find({'source': args.source, 'record_type': args.type, 'export_end': {'$exists': 1}}, sort=[('export_start', DESCENDING)], limit=1)
         last = next(c, None)
@@ -311,57 +387,15 @@ def get_records(args, log=None, queue=None):
     else:
         raise Exception('One of the criteria arguments is required')
 
-    hist = DB.handle['bib_history'] if cls == BibSet else DB.handle['auth_history']
-
-    return records
-
-def get_records_by_date(cls, date_from, date_to=None, delete_only=False):
-    """
-    Returns
-    -------
-    BibSet / AuthSet
-    """
-    if cls == BibSet:
-        fft_symbols = export._new_file_symbols(date_from, date_to)
-
-        if len(fft_symbols) > 10000:
-            raise Exception('that\'s too many file symbols to look up, sorry :(')
-
-        print(f'found files for {len(fft_symbols)} symbols')
-
-    criteria = SON({'$gte': date_from})
-
-    if date_to:
-        criteria['$lte'] = date_to
-
-    query = {'$or': [{'updated': criteria}, {'191.subfields.value': {'$in': fft_symbols}}]} if cls == BibSet \
-        else {'updated': criteria}
-
-    rset = cls.from_query(query)
-
-    return rset
-
-    hist = DB.handle['bib_history'] if cls == BibSet else DB.handle['auth_history']
-    deleted = list(hist.find({'deleted.time': {'$gte': date_from}}))
-
-    if deleted:
-        if delete_only:
-            rset.records = []
-
-        rcls = Bib if cls == BibSet else Auth
-        records = list(rset.records)
-        to_delete = []
-
-        for d in deleted:
-            r = rcls({'_id': d['_id']})
-            r.set('980', 'a', 'DELETED')
-            r.updated = d['deleted']['time']
-            to_delete.append(r)
+    if args.queue:
+        queue = DB.handle[export.QUEUE_COLLECTION]
+        qids = [x['record_id'] for x in queue.find({'type': args.type})]
+        print(f'Taking {len(qids)} from queue')
+        q_args, q_kwargs = records.query_params
+        records = cls.from_query({'$or': [{'_id': {'$in': list(qids)}}, q_args[0]]}, sort=[('updated', ASC)])
 
-        rset.records = (r for r in records + to_delete) # program is expecting an iterable
+    return records
 
-    return rset
-
 def clean_values(record):
     for field in record.datafields:
         for sub in filter(lambda x: not hasattr(x, 'xref'), field.subfields):
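
The queue bookkeeping added above comes down to two bulk operations on the queue collection: UpdateOne with $setOnInsert and upsert=True to enqueue leftovers without duplicating entries, and DeleteOne to drop whatever was exported. The snippet below is a minimal, hypothetical sketch of that pattern against an in-memory mongomock database (the module already imports mongomock); the names 'testdb', 'sync_queue', 'demo', and 'bib' are placeholders, not the script's export.QUEUE_COLLECTION or args values.

# Hypothetical sketch of the queue pattern used in run() above; not the script itself.
from datetime import datetime, timezone

from mongomock import MongoClient
from pymongo import DeleteOne, UpdateOne

queue = MongoClient()['testdb']['sync_queue']  # placeholder names; the script uses export.QUEUE_COLLECTION

# enqueue: $setOnInsert + upsert=True inserts only records that are not already queued,
# so re-submitting the same leftovers never creates duplicate entries
pending = [101, 102, 103]
enqueue_ops = [
    UpdateOne(
        {'source': 'demo', 'type': 'bib', 'record_id': rid},
        {'$setOnInsert': {'time': datetime.now(timezone.utc), 'source': 'demo', 'type': 'bib', 'record_id': rid}},
        upsert=True
    )
    for rid in pending
]
result = queue.bulk_write(enqueue_ops)
print(f'{result.upserted_count} added. {len(pending) - result.upserted_count} were already in the queue')

# dequeue: records exported during this run are removed from the queue
exported = [101, 102]
queue.bulk_write([DeleteOne({'type': 'bib', 'record_id': rid}) for rid in exported])

print(queue.count_documents({}))  # 1 entry left for the next run to pick up

With --queue, a run that stops early on --limit or --time_limit leaves the remaining records in this collection, and the next run folds the queued ids back into its query in get_records.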