Skip to content

Commit

Permalink
NO-REF: Fix and simplify classify record updates (#447)
Browse files Browse the repository at this point in the history
  • Loading branch information
kylevillegas93 authored Nov 14, 2024
1 parent 061d046 commit d7b19f1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 49 deletions.
35 changes: 13 additions & 22 deletions processes/frbr/classify.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@


class ClassifyProcess(CoreProcess):
WINDOW_SIZE = 100

def __init__(self, *args):
super(ClassifyProcess, self).__init__(*args[:4], batchSize=50)

self.ingestLimit = int(args[4]) if len(args) >= 5 and args[4] else None
self.ingest_limit = int(args[4]) if len(args) >= 5 and args[4] else None

self.generateEngine()
self.createSession()
Expand Down Expand Up @@ -49,11 +47,11 @@ def runProcess(self):
else:
logger.warning(f'Unknown classify process type: {self.process}')
return

self.saveRecords()
self.commitChanges()

logger.info(f'Classified {self.classified_count} records and saved {len(self.records)} classify records')
logger.info(f'Classified {self.classified_count} records')
except Exception as e:
logger.exception(f'Failed to run classify process')
raise e
Expand All @@ -71,30 +69,23 @@ def classify_records(self, full=False, start_date_time=None):

get_unfrbrized_records_query = get_unfrbrized_records_query.filter(Record.date_modified > start_date_time)

window_size = min(self.ingestLimit or self.WINDOW_SIZE, self.WINDOW_SIZE)
frbrized_records = []
while unfrbrized_record := get_unfrbrized_records_query.first():
self.frbrize_record(unfrbrized_record)

for record in self.windowedQuery(Record, get_unfrbrized_records_query, windowSize=window_size):
self.frbrize_record(record)
unfrbrized_record.cluster_status = False
unfrbrized_record.frbr_status = 'complete'

record.cluster_status = False
record.frbr_status = 'complete'
frbrized_records.append(record)
self.session.add(unfrbrized_record)
self.session.commit()
self.classified_count += 1

if self.ingest_limit and self.classified_count >= self.ingest_limit:
break

if self.redis_manager.checkIncrementerRedis('oclcCatalog', 'API'):
logger.warning('Exceeded max requests to OCLC catalog')
break

if len(frbrized_records) >= window_size:
self.classified_count += len(frbrized_records)
self.bulkSaveObjects(frbrized_records)

frbrized_records = []

if len(frbrized_records):
self.classified_count += len(frbrized_records)
self.bulkSaveObjects(frbrized_records)

def frbrize_record(self, record: Record):
queryable_ids = self._get_queryable_identifiers(record.identifiers)

Expand Down
39 changes: 12 additions & 27 deletions tests/unit/processes/frbr/test_classify_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def test_instance(self, mocker):
class TestClassifyProcess(ClassifyProcess):
def __init__(self, *args):
self.records = set()
self.ingestLimit = None
self.ingest_limit = None
self.records = []
self.catalog_queue = os.environ['OCLC_QUEUE']
self.catalog_route = os.environ['OCLC_ROUTING_KEY']
Expand Down Expand Up @@ -81,19 +81,15 @@ def test_classify_records_not_full(self, test_instance, mocker):

mock_session.query().filter.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_records = [mocker.MagicMock(name=i) for i in range(100)]
mock_window_query = mocker.patch.object(ClassifyProcess, 'windowedQuery')
mock_window_query.return_value = mock_records
mock_query.first.side_effect = mock_records + [None]

test_instance.redis_manager.checkIncrementerRedis.side_effect = [False] * 100

mock_bulk_save_objects = mocker.patch.object(ClassifyProcess, 'bulkSaveObjects')

test_instance.classify_records()

mock_window_query.assert_called_once()
mock_frbrize_record.assert_has_calls([mocker.call(record) for record in mock_records])
mock_bulk_save_objects.assert_called_once()

def test_classify_records_custom_range(self, test_instance, mocker):
mock_frbrize = mocker.patch.object(ClassifyProcess, 'frbrize_record')
Expand All @@ -104,45 +100,37 @@ def test_classify_records_custom_range(self, test_instance, mocker):

mock_session.query().filter.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_records = [mocker.MagicMock(name=i) for i in range(100)]
mock_window_query = mocker.patch.object(ClassifyProcess, 'windowedQuery')
mock_window_query.return_value = mock_records
mock_query.first.side_effect = mock_records + [None]

test_instance.redis_manager.checkIncrementerRedis.side_effect = [False] * 100

mock_bulk_save_objects = mocker.patch.object(ClassifyProcess, 'bulkSaveObjects')

test_instance.classify_records(start_date_time='testDate')

mock_start_time.now.assert_not_called()
mock_start_time.timedelta.assert_not_called()
mock_window_query.assert_called_once()
mock_frbrize.assert_has_calls([mocker.call(record) for record in mock_records])
mock_bulk_save_objects.assert_called_once()

def test_classify_records_full(self, test_instance, mocker):
mock_frbrize_record = mocker.patch.object(ClassifyProcess, 'frbrize_record')
mock_session = mocker.MagicMock()
mockQuery = mocker.MagicMock()
mock_query = mocker.MagicMock()
test_instance.session = mock_session
mock_start_time = mocker.spy(datetime, 'datetime')

mock_session.query().filter.return_value = mockQuery
mock_session.query().filter.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_records = [mocker.MagicMock(name=i) for i in range(100)]
mock_window_query = mocker.patch.object(ClassifyProcess, 'windowedQuery')
mock_window_query.return_value = mock_records
mock_query.first.side_effect = mock_records + [None]

test_instance.redis_manager.checkIncrementerRedis.side_effect = [False] * 50 + [True]

mock_bulk_save_objects = mocker.patch.object(ClassifyProcess, 'bulkSaveObjects')

test_instance.classify_records(full=True)

mock_start_time.now.assert_not_called()
mock_start_time.timedelta.assert_not_called()
mock_window_query.assert_called_once()
mock_frbrize_record.assert_has_calls([mocker.call(record) for record in mock_records[:50]])
mock_bulk_save_objects.assert_called_once()

def test_classify_records_full_batch(self, test_instance, mocker):
mock_frbrize_record = mocker.patch.object(ClassifyProcess, 'frbrize_record')
Expand All @@ -152,21 +140,18 @@ def test_classify_records_full_batch(self, test_instance, mocker):
mock_start_time = mocker.spy(datetime, 'datetime')

mock_session.query().filter.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_records = [mocker.MagicMock(name=i) for i in range(100)]
mock_window_query = mocker.patch.object(ClassifyProcess, 'windowedQuery')
mock_window_query.return_value = mock_records
mock_query.first.side_effect = mock_records + [None]

test_instance.redis_manager.checkIncrementerRedis.side_effect = [False] * 100

mock_bulk_save_objects = mocker.patch.object(ClassifyProcess, 'bulkSaveObjects')

test_instance.ingestLimit = 100
test_instance.ingest_limit = 100
test_instance.classify_records(full=True)

mock_start_time.now.assert_not_called()
mock_start_time.timedelta.assert_not_called()
mock_frbrize_record.assert_has_calls([mocker.call(record) for record in mock_records])
mock_bulk_save_objects.assert_called_once()

def test_frbrize_record_success_valid_author(self, test_instance, test_record, mocker):
mock_identifiers = mocker.patch.object(ClassifyProcess, '_get_queryable_identifiers')
Expand Down

0 comments on commit d7b19f1

Please sign in to comment.