Skip to content

Commit

Permalink
Last cleared and count bugfixes (#134)
Browse files Browse the repository at this point in the history
* fix count bug

* fix last record cleared check

* pass tests
  • Loading branch information
jbukhari authored Jul 10, 2024
1 parent 9103eb2 commit 4d76e71
Showing 1 changed file with 41 additions and 33 deletions.
74 changes: 41 additions & 33 deletions dlx_dl/scripts/sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,14 @@ def run(**kwargs):
args.blacklisted = [x['symbol'] for x in blacklist.find({})]

HEADERS = {'Authorization': 'Token ' + args.api_key}
records = get_records(args) # returns an interator (dlx.Marc.BibSet/AuthSet)
marcset, deleted = get_records(args) # returns an interator (dlx.Marc.BibSet/AuthSet)
TOTAL = marcset.count + len(deleted)
#deleted = get_deleted_records(args)
BATCH, BATCH_SIZE, SEEN, TOTAL, INDEX = [], 100, 0, records.total_count, {}
updated_count = 0
print(f'checking {records.count} updated records')
BATCH = []
BATCH_SIZE = 100
SEEN = 0
UPDATED_COUNT = 0
print(f'checking {marcset.count} updated records')

# check if last update cleared in DL yet
if args.force:
Expand Down Expand Up @@ -157,7 +160,7 @@ def run(**kwargs):
last_dl_record = Bib.from_xml_raw(record_xml)

# the record hasn't been purged from DL yet
if last_dl_record.get_value('980', 'a') != 'DELETED':
if 'DELETED' not in (last_dl_record.get_value('980', 'a'), last_dl_record.get_value('980', 'c')):
flag = 'DELETE'
except AssertionError:
# the record doesn't exist, presumably already purged
Expand All @@ -181,9 +184,17 @@ def run(**kwargs):
raise Exception(f'Last updated record not found by DL search API: {last_dl_record["record_type"]} {last_exported["record_id"]}')

if flag:
# the last export has not cleared in DL yet
# check callback log to see if the last export had an import error in DL
if callback_data := DB.handle[export.CALLBACK_COLLECTION].find_one({'record_type': last_exported['record_type'], 'record_id': last_exported['record_id']}, sort=[('time', -1)]):
callback_data = DB.handle[export.CALLBACK_COLLECTION].find_one(
{
'record_type': last_exported['record_type'],
'record_id': last_exported['record_id'],
'time': {'$gt': last_exported['time']}
},
sort=[('time', -1)]
)

if callback_data:
if callback_data['results'][0]['success'] == False:
# the last export was exported successfully, but failed on import to DL. proceed with export
pass
Expand All @@ -203,7 +214,7 @@ def run(**kwargs):
print('building auth cache...')
Auth.build_cache()

for i, record in enumerate(records.records):
for i, record in enumerate(chain(marcset.records, (d for d in deleted))):
if record.user is None:
record.user = 'system'

Expand Down Expand Up @@ -261,14 +272,14 @@ def run(**kwargs):
if dl_record.get_value('980', 'a') != 'DELETED':
print(f'{dlx_record.id}: RECORD DELETED')
export_whole_record(args, dlx_record, export_type='DELETE')
updated_count += 1
UPDATED_COUNT += 1

# remove record from list of DL records to compare
DL_BATCH.remove(dl_record)
elif dlx_record.id not in [x.id for x in DL_BATCH]:
print(f'{dlx_record.id}: NOT FOUND IN DL')
export_whole_record(args, dlx_record, export_type='NEW')
updated_count += 1
UPDATED_COUNT += 1

# remove from queue
to_remove.append(dlx_record.id)
Expand All @@ -286,7 +297,7 @@ def run(**kwargs):
to_remove.append(dlx_record.id)

if result:
updated_count += 1
UPDATED_COUNT += 1

# clear batch
BATCH = []
Expand All @@ -299,7 +310,7 @@ def run(**kwargs):
print('\b' * (len(str(SEEN)) + 4 + len(str(TOTAL))) + f'{SEEN} / {TOTAL} ', end='', flush=True)

# limits
if args.limit != 0 and updated_count >= args.limit:
if args.limit != 0 and UPDATED_COUNT >= args.limit:
print('\nReached max exports')
enqueue = True if args.queue else False
break
Expand All @@ -316,7 +327,7 @@ def run(**kwargs):
print('Submitting remaining records to the queue... ', end='', flush=True)
updates = []

for i, record in enumerate(records):
for i, record in enumerate(marcset):
# records is a map object so the unprocessed records will be left over from the loop break
data = {'time': datetime.now(timezone.utc), 'source': args.source, 'type': args.type, 'record_id': record.id}
updates.append(UpdateOne({'source': args.source, 'type': args.type, 'record_id': record.id}, {'$setOnInsert': data}, upsert=True))
Expand All @@ -325,7 +336,7 @@ def run(**kwargs):
result = DB.handle[export.QUEUE_COLLECTION].bulk_write(updates)
print(f'{result.upserted_count} added. {i + 1 - result.upserted_count} were already in the queue')

print(f'Updated {updated_count} records')
print(f'Updated {UPDATED_COUNT} records')

def get_records_by_date(cls, date_from, date_to=None, delete_only=False):
"""
Expand Down Expand Up @@ -385,71 +396,68 @@ def get_records_by_date(cls, date_from, date_to=None, delete_only=False):
print(f'Checking {len(to_delete)} deleted records')

# todo: enable MarcSet.count to handle hybrid cursor/list record sets
rset.total_count = rset.count + len(to_delete)

return rset

return [rset, to_delete]

def get_records(args, log=None, queue=None):
    """Select the records to sync based on the criteria in ``args``.

    Exactly one criteria argument is required (modified_within / modified_from /
    modified_since_log / id / ids / list / query / querystring).

    Params:
        args:  parsed CLI namespace; ``args.type`` selects bib vs auth.
        log:   Mongo collection of past export runs (used by --modified_since_log).
        queue: unused here; rebound locally from DB when --queue is set.

    Returns:
        [marcset, deleted] where ``marcset`` is a dlx BibSet/AuthSet of
        updated records and ``deleted`` is a list of deleted records (empty
        for the id/ids/list/query/querystring criteria, which cannot detect
        deletions). Returns None when --modified_since_log finds no prior
        log entry (the run initializes the log and quits).

    Raises:
        Exception: on invalid criteria combinations or a missing criteria arg.
    """
    cls = BibSet if args.type == 'bib' else AuthSet
    since, to = None, None
    deleted = []

    if args.modified_within and args.modified_until:
        since = datetime.utcnow() - timedelta(seconds=int(args.modified_within))
        to = datetime.utcnow() - timedelta(seconds=int(args.modified_until))
        marcset, deleted = get_records_by_date(cls, since, to, delete_only=args.delete_only)
    elif args.modified_within:
        since = datetime.utcnow() - timedelta(seconds=int(args.modified_within))
        marcset, deleted = get_records_by_date(cls, since, None, delete_only=args.delete_only)
    elif args.modified_until:
        raise Exception('--modified_until not valid without --modified_within')
    elif args.modified_from and args.modified_to:
        since = datetime.fromisoformat(args.modified_from)
        to = datetime.fromisoformat(args.modified_to)
        marcset, deleted = get_records_by_date(cls, since, to, delete_only=args.delete_only)
    elif args.modified_from:
        since = datetime.fromisoformat(args.modified_from)
        # `to` is still None here, i.e. an open-ended date range
        marcset, deleted = get_records_by_date(cls, since, to, delete_only=args.delete_only)
    elif args.modified_to:
        raise Exception('--modified_to not valid without --modified_from')
    elif args.modified_since_log:
        # resume from the start time of the last completed export run
        c = log.find({'source': args.source, 'record_type': args.type, 'export_end': {'$exists': 1}}, sort=[('export_start', -1)], limit=1)
        last = next(c, None)
        if last:
            last_export = last['export_start']
            marcset, deleted = get_records_by_date(cls, last_export, None, delete_only=args.delete_only)
        else:
            warn('Initializing the source log entry and quitting.')
            log.insert_one({'source': args.source, 'record_type': args.type, 'export_start': datetime.now(timezone.utc), 'export_end': datetime.now(timezone.utc)})
            # NOTE(review): returns None rather than [marcset, deleted] — callers
            # that unpack the result must not reach this path; confirm upstream.
            return
    elif args.id:
        marcset = cls.from_query({'_id': int(args.id)})
    elif args.ids:
        marcset = cls.from_query({'_id': {'$in': [int(x) for x in args.ids]}})
    elif args.list:
        with open(args.list, 'r') as f:
            ids = [int(row[0]) for row in [line.split("\t") for line in f.readlines()]]
            if len(ids) > 5000: raise Exception(f'Max 5000 IDs from list')
            marcset = cls.from_query({'_id': {'$in': ids}})
    elif args.query:
        query = args.query.replace('\'', '"')
        marcset = cls.from_query(json.loads(query), collation=Config.marc_index_default_collation)
    elif args.querystring:
        query = Query.from_string(args.querystring, record_type=args.type)
        marcset = cls.from_query(query, collation=Config.marc_index_default_collation)
    else:
        raise Exception('One of the criteria arguments is required')

    if args.queue:
        # widen the selection to include everything already waiting in the queue
        queue = DB.handle[export.QUEUE_COLLECTION]
        qids = [x['record_id'] for x in queue.find({'type': args.type})]
        print(f'Taking {len(qids)} from queue')
        q_args, q_kwargs = marcset.query_params
        marcset = cls.from_query({'$or': [{'_id': {'$in': list(qids)}}, q_args[0]]}, sort=[('updated', 1)])

    return [marcset, deleted]

def normalize(string):
return unicodedata.normalize('NFD', string)
Expand Down

0 comments on commit 4d76e71

Please sign in to comment.