From 4d76e71daf70b9542f92f1a5034353f34027be5d Mon Sep 17 00:00:00 2001
From: jbukhari
Date: Wed, 10 Jul 2024 15:00:39 -0400
Subject: [PATCH] Last cleared and count bugfixes (#134)

* fix count bug

* fix last record cleared check

* pass tests
---
 dlx_dl/scripts/sync/__init__.py | 74 ++++++++++++++++++---------------
 1 file changed, 41 insertions(+), 33 deletions(-)

diff --git a/dlx_dl/scripts/sync/__init__.py b/dlx_dl/scripts/sync/__init__.py
index 3c63d79..0dec0f1 100644
--- a/dlx_dl/scripts/sync/__init__.py
+++ b/dlx_dl/scripts/sync/__init__.py
@@ -107,11 +107,14 @@ def run(**kwargs):
     args.blacklisted = [x['symbol'] for x in blacklist.find({})]
     HEADERS = {'Authorization': 'Token ' + args.api_key}

-    records = get_records(args) # returns an interator (dlx.Marc.BibSet/AuthSet)
+    marcset, deleted = get_records(args) # returns an interator (dlx.Marc.BibSet/AuthSet)
+    TOTAL = marcset.count + len(deleted)
     #deleted = get_deleted_records(args)
-    BATCH, BATCH_SIZE, SEEN, TOTAL, INDEX = [], 100, 0, records.total_count, {}
-    updated_count = 0
-    print(f'checking {records.count} updated records')
+    BATCH = []
+    BATCH_SIZE = 100
+    SEEN = 0
+    UPDATED_COUNT = 0
+    print(f'checking {marcset.count} updated records')

     # check if last update cleared in DL yet
     if args.force:
@@ -157,7 +160,7 @@ def run(**kwargs):
                 last_dl_record = Bib.from_xml_raw(record_xml)

                 # the record is hasn't been purged from DL yet
-                if last_dl_record.get_value('980', 'a') != 'DELETED':
+                if 'DELETED' not in (last_dl_record.get_value('980', 'a'), last_dl_record.get_value('980', 'c')):
                     flag = 'DELETE'
             except AssertionError:
                 # the record doesnt exist, presumably already purged
@@ -181,9 +184,17 @@ def run(**kwargs):
                 raise Exception(f'Last updated record not found by DL search API: {last_dl_record["record_type"]} {last_exported["record_id"]}')

         if flag:
-            # the last export has not cleared in DL yet
             # check callback log to see if the last export had an import error in DL
-            if callback_data := DB.handle[export.CALLBACK_COLLECTION].find_one({'record_type': last_exported['record_type'], 'record_id': last_exported['record_id']}, sort=[('time', -1)]):
+            callback_data = DB.handle[export.CALLBACK_COLLECTION].find_one(
+                {
+                    'record_type': last_exported['record_type'],
+                    'record_id': last_exported['record_id'],
+                    'time': {'$gt': last_exported['time']}
+                },
+                sort=[('time', -1)]
+            )
+
+            if callback_data:
                 if callback_data['results'][0]['success'] == False:
                     # the last export was exported succesfully, but failed on import to DL. proceed with export
                     pass
@@ -203,7 +214,7 @@ def run(**kwargs):
     print('building auth cache...')
     Auth.build_cache()

-    for i, record in enumerate(records.records):
+    for i, record in enumerate(chain(marcset.records, (d for d in deleted))):
         if record.user is None:
             record.user = 'system'

@@ -261,14 +272,14 @@ def run(**kwargs):
                 if dl_record.get_value('980', 'a') != 'DELETED':
                     print(f'{dlx_record.id}: RECORD DELETED')
                     export_whole_record(args, dlx_record, export_type='DELETE')
-                    updated_count += 1
+                    UPDATED_COUNT += 1

                 # remove record from list of DL records to compare
                 DL_BATCH.remove(dl_record)
             elif dlx_record.id not in [x.id for x in DL_BATCH]:
                 print(f'{dlx_record.id}: NOT FOUND IN DL')
                 export_whole_record(args, dlx_record, export_type='NEW')
-                updated_count += 1
+                UPDATED_COUNT += 1

                 # remove from queue
                 to_remove.append(dlx_record.id)
@@ -286,7 +297,7 @@ def run(**kwargs):
                 to_remove.append(dlx_record.id)

                 if result:
-                    updated_count += 1
+                    UPDATED_COUNT += 1

             # clear batch
             BATCH = []
@@ -299,7 +310,7 @@ def run(**kwargs):
         print('\b' * (len(str(SEEN)) + 4 + len(str(TOTAL))) + f'{SEEN} / {TOTAL} ', end='', flush=True)

         # limits
-        if args.limit != 0 and updated_count >= args.limit:
+        if args.limit != 0 and UPDATED_COUNT >= args.limit:
             print('\nReached max exports')
             enqueue = True if args.queue else False
             break
@@ -316,7 +327,7 @@ def run(**kwargs):
         print('Submitting remaining records to the queue... ', end='', flush=True)
         updates = []

-        for i, record in enumerate(records):
+        for i, record in enumerate(marcset): # records is a map object so the unprocessed records will be left over from the loop break
             data = {'time': datetime.now(timezone.utc), 'source': args.source, 'type': args.type, 'record_id': record.id}
             updates.append(UpdateOne({'source': args.source, 'type': args.type, 'record_id': record.id}, {'$setOnInsert': data}, upsert=True))

@@ -325,7 +336,7 @@ def run(**kwargs):
         result = DB.handle[export.QUEUE_COLLECTION].bulk_write(updates)
         print(f'{result.upserted_count} added. {i + 1 - result.upserted_count} were already in the queue')

-    print(f'Updated {updated_count} records')
+    print(f'Updated {UPDATED_COUNT} records')

 def get_records_by_date(cls, date_from, date_to=None, delete_only=False):
     """
@@ -385,30 +396,30 @@ def get_records_by_date(cls, date_from, date_to=None, delete_only=False):
     print(f'Checking {len(to_delete)} deleted records')

     # todo: enalbe MarcSet.count to handle hybrid cursor/list record sets
-    rset.total_count = rset.count + len(to_delete)
-
-    return rset
+
+    return [rset, to_delete]

 def get_records(args, log=None, queue=None):
     cls = BibSet if args.type == 'bib' else AuthSet
     since, to = None, None
+    deleted = []

     if args.modified_within and args.modified_until:
         since = datetime.utcnow() - timedelta(seconds=int(args.modified_within))
         to = datetime.utcnow() - timedelta(seconds=int(args.modified_until))
-        records = get_records_by_date(cls, since, to, delete_only=args.delete_only)
+        marcset, deleted = get_records_by_date(cls, since, to, delete_only=args.delete_only)
     elif args.modified_within:
         since = datetime.utcnow() - timedelta(seconds=int(args.modified_within))
-        records = get_records_by_date(cls, since, None, delete_only=args.delete_only)
+        marcset, deleted = get_records_by_date(cls, since, None, delete_only=args.delete_only)
     elif args.modified_until:
         raise Exception('--modified_until not valid without --modified_within')
     elif args.modified_from and args.modified_to:
         since = datetime.fromisoformat(args.modified_from)
         to = datetime.fromisoformat(args.modified_to)
-        records = get_records_by_date(cls, since, to, delete_only=args.delete_only)
+        marcset, deleted = get_records_by_date(cls, since, to, delete_only=args.delete_only)
     elif args.modified_from:
         since = datetime.fromisoformat(args.modified_from)
-        records = get_records_by_date(cls, since, to, delete_only=args.delete_only)
+        marcset, deleted = get_records_by_date(cls, since, to, delete_only=args.delete_only)
     elif args.modified_to:
         raise Exception('--modified_to not valid without --modified_from')
     elif args.modified_since_log:
@@ -416,26 +427,26 @@ def get_records(args, log=None, queue=None):
         last = next(c, None)
         if last:
             last_export = last['export_start']
-            records = get_records_by_date(cls, last_export, None, delete_only=args.delete_only)
+            marcset, deleted = get_records_by_date(cls, last_export, None, delete_only=args.delete_only)
         else:
             warn('Initializing the source log entry and quitting.')
             log.insert_one({'source': args.source, 'record_type': args.type, 'export_start': datetime.now(timezone.utc), 'export_end': datetime.now(timezone.utc)})
             return
     elif args.id:
-        records = cls.from_query({'_id': int(args.id)})
+        marcset = cls.from_query({'_id': int(args.id)})
     elif args.ids:
-        records = cls.from_query({'_id': {'$in': [int(x) for x in args.ids]}})
+        marcset = cls.from_query({'_id': {'$in': [int(x) for x in args.ids]}})
     elif args.list:
         with open(args.list, 'r') as f:
             ids = [int(row[0]) for row in [line.split("\t") for line in f.readlines()]]
         if len(ids) > 5000:
             raise Exception(f'Max 5000 IDs from list')
-        records = cls.from_query({'_id': {'$in': ids}})
+        marcset = cls.from_query({'_id': {'$in': ids}})
     elif args.query:
         query = args.query.replace('\'', '"')
-        records = cls.from_query(json.loads(query), collation=Config.marc_index_default_collation)
+        marcset = cls.from_query(json.loads(query), collation=Config.marc_index_default_collation)
     elif args.querystring:
         query = Query.from_string(args.querystring, record_type=args.type)
-        records = cls.from_query(query, collation=Config.marc_index_default_collation)
+        marcset = cls.from_query(query, collation=Config.marc_index_default_collation)
     else:
         raise Exception('One of the criteria arguments is required')
@@ -443,13 +454,10 @@ def get_records(args, log=None, queue=None):
         queue = DB.handle[export.QUEUE_COLLECTION]
         qids = [x['record_id'] for x in queue.find({'type': args.type})]
         print(f'Taking {len(qids)} from queue')
-        q_args, q_kwargs = records.query_params
-        records = cls.from_query({'$or': [{'_id': {'$in': list(qids)}}, q_args[0]]}, sort=[('updated', 1)])
-
-        # this value is expected to be set later
-        records.total_count = records.count
+        q_args, q_kwargs = marcset.query_params
+        marcset = cls.from_query({'$or': [{'_id': {'$in': list(qids)}}, q_args[0]]}, sort=[('updated', 1)])

-    return records
+    return [marcset, deleted]

 def normalize(string):
     return unicodedata.normalize('NFD', string)
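
Note on the new get_records() contract: it now returns a [marcset, deleted] pair rather than a
single record set, run() sizes the job as marcset.count + len(deleted), and it walks both
collections with itertools.chain (chain is presumably already imported elsewhere in the module,
since the patch does not touch the imports). The sketch below is a stand-alone illustration of
that pattern, not code from the patch; FakeMarcSet and sync() are hypothetical stand-ins for the
dlx BibSet/AuthSet and the real run() loop.

    from itertools import chain

    class FakeMarcSet:
        """Hypothetical stand-in for a dlx BibSet/AuthSet."""

        def __init__(self, records):
            self.records = iter(records)  # stand-in for the .records iterator read by run()
            self.count = len(records)     # stand-in for the .count used to size the job

    def sync(marcset, deleted):
        # mirrors TOTAL and the chained loop in the patched run()
        total = marcset.count + len(deleted)
        print(f'checking {marcset.count} updated records, {total} total')

        for i, record in enumerate(chain(marcset.records, (d for d in deleted))):
            print(f'{i + 1}/{total}: {record}')  # export/compare logic would go here

    marcset, deleted = FakeMarcSet(['bib 1', 'bib 2']), ['deleted bib 3']
    sync(marcset, deleted)

Because the deleted records arrive as a plain list while the live records stay in a cursor-backed
set, the caller has to assemble the total itself, which is what replaces the old
records.total_count bookkeeping.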