Skip to content

Commit

Permalink
SFR-2364: Remove DBManager as an ancestor of CoreProcess
Browse files Browse the repository at this point in the history
  • Loading branch information
kylevillegas93 committed Nov 25, 2024
1 parent 76cced2 commit 42573e1
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 45 deletions.
19 changes: 11 additions & 8 deletions processes/ingest/chicago_isac.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@

from ..core import CoreProcess
from mappings.chicagoISAC import ChicagoISACMapping
from managers import S3Manager, WebpubManifest
from managers import DBManager, S3Manager, WebpubManifest
from logger import create_log
from ..record_buffer import RecordBuffer

logger = create_log(__name__)

class ChicagoISACProcess(CoreProcess):

def __init__(self, *args):
super(ChicagoISACProcess, self).__init__(*args[:4])

self.generateEngine()
self.createSession()
self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(db_manager=self.db_manager)

self.s3Bucket = os.environ['FILE_BUCKET']
self.s3_manager = S3Manager()
Expand All @@ -27,18 +31,17 @@ def runProcess(self):
for meta_dict in chicago_isac_data:
self.process_chicago_isac_record(meta_dict)

self.saveRecords()
self.commitChanges()
self.record_buffer.flush()

logger.info(f'Ingested {len(self.records)} ISAC records')
# number_of_ingested_records is an int counter kept by RecordBuffer, so it is
# interpolated directly; wrapping it in len() raises TypeError.
logger.info(f'Ingested {self.record_buffer.number_of_ingested_records} ISAC records')

def process_chicago_isac_record(self, record):
try:
chicago_isac_rec = ChicagoISACMapping(record)
chicago_isac_rec.applyMapping()
self.store_pdf_manifest(chicago_isac_rec.record)
self.addDCDWToUpdateList(chicago_isac_rec)

self.record_buffer.add(chicago_isac_rec)
except Exception:
logger.exception(ChicagoISACError('Unable to process ISAC record'))

Expand Down
18 changes: 11 additions & 7 deletions processes/ingest/doab.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from logger import create_log
from mappings.doab import DOABMapping
from mappings.base_mapping import MappingError
from managers import DOABLinkManager, RabbitMQManager, S3Manager
from managers import DBManager, DOABLinkManager, RabbitMQManager, S3Manager
from model import get_file_message
from ..record_buffer import RecordBuffer


logger = create_log(__name__)
Expand All @@ -32,8 +33,12 @@ def __init__(self, *args):
self.ingestOffset = int(args[5]) if args[5] else 0
self.ingestLimit = (int(args[4]) + self.ingestOffset) if args[4] else 10000

self.generateEngine()
self.createSession()
self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(self.db_manager)

self.s3_manager = S3Manager()
self.s3_manager.createS3Client()
Expand All @@ -58,10 +63,9 @@ def runProcess(self):
elif self.process == 'single':
self.importSingleOAIRecord(self.singleRecord)

self.saveRecords()
self.commitChanges()
self.record_buffer.flush()

logger.info(f'Ingested {len(self.records)} DOAB records')
# number_of_ingested_records is an int counter kept by RecordBuffer, so it is
# interpolated directly; wrapping it in len() raises TypeError.
logger.info(f'Ingested {self.record_buffer.number_of_ingested_records} DOAB records')

def parseDOABRecord(self, oaiRec):
try:
Expand All @@ -82,7 +86,7 @@ def parseDOABRecord(self, oaiRec):
ePubPath, ePubURI = epubLink
self.rabbitmq_manager.sendMessageToQueue(self.fileQueue, self.fileRoute, get_file_message(ePubURI, ePubPath))

self.addDCDWToUpdateList(doabRec)
self.record_buffer.add(doabRec)

def importSingleOAIRecord(self, recordID):
urlParams = 'verb=GetRecord&metadataPrefix=oai_dc&identifier=oai:directory.doabooks.org:{}'.format(recordID)
Expand Down
30 changes: 30 additions & 0 deletions processes/record_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from managers import DBManager
from mappings.base_mapping import BaseMapping
from model import Record

class RecordBuffer:
    """Collect records in memory and hand them to the database manager in batches.

    Records are held in a set until ``batch_size`` of them have accumulated, at
    which point they are passed to ``db_manager.bulkSaveObjects`` and the set is
    emptied. Callers should invoke ``flush()`` once more when they are done so
    that a final, partially filled batch is also saved.
    """

    def __init__(self, db_manager: DBManager, batch_size: int=500):
        """Create an empty buffer.

        Args:
            db_manager: manager whose ``session`` is used to look up existing
                records and whose ``bulkSaveObjects`` persists each batch.
            batch_size: number of buffered records that triggers an automatic
                flush from ``add()``.
        """
        self.db_manager = db_manager
        self.records = set()
        self.batch_size = batch_size
        # Running total of records handed to bulkSaveObjects across all flushes.
        self.number_of_ingested_records = 0

    def add(self, record: BaseMapping):
        """Buffer the record carried by a mapping, flushing when the batch is full.

        If a ``Record`` with the same ``source_id`` is already stored, the
        mapping updates that stored row via ``updateExisting`` and the stored
        row is buffered; otherwise the mapping's own new record is buffered.

        Args:
            record: mapping object exposing ``record`` and ``updateExisting``.
        """
        existing_record = self.db_manager.session.query(Record).filter(
            Record.source_id == record.record.source_id
        ).first()

        if existing_record:
            record.updateExisting(existing_record)
            # Drop any previously buffered entry for this row before re-adding
            # it, so the set holds the freshly updated object.
            self.records.discard(existing_record)
            self.records.add(existing_record)
        else:
            self.records.add(record.record)

        # Flush only once the buffer has reached the batch size. The comparison
        # was previously inverted, which flushed on every add() and defeated
        # batching entirely.
        if len(self.records) >= self.batch_size:
            self.flush()

    def flush(self):
        """Pass all buffered records to the database manager and reset the buffer."""
        self.db_manager.bulkSaveObjects(self.records)
        self.number_of_ingested_records += len(self.records)
        self.records.clear()
16 changes: 5 additions & 11 deletions tests/unit/processes/ingest/test_chicago_isac_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,23 @@ class TestISAC(ChicagoISACProcess):
def __init__(self):
self.s3Bucket = 'test_aws_bucket'
self.s3_manager = mocker.MagicMock(s3Client=mocker.MagicMock())
self.db_manager = mocker.MagicMock()
self.record_buffer = mocker.MagicMock(db_manager=self.db_manager)
self.session = mocker.MagicMock(session='test_session')
self.records = mocker.MagicMock(record='test_record')
self.batch_size = mocker.MagicMock(batch_size='test_batch_size')

return TestISAC()

def test_run_process(self, test_process, mocker):
run_mocks = mocker.patch.multiple(
ChicagoISACProcess,
saveRecords=mocker.DEFAULT,
commitChanges=mocker.DEFAULT
)

def test_run_process(self, test_process):
test_process.runProcess()

run_mocks['saveRecords'].assert_called_once()
run_mocks['commitChanges'].assert_called_once()
test_process.record_buffer.flush.assert_called_once()


def test_process_chicago_isac_record_success(self, test_process, mocker):
processMocks = mocker.patch.multiple(ChicagoISACProcess,
store_pdf_manifest=mocker.DEFAULT,
addDCDWToUpdateList=mocker.DEFAULT
)

mock_mapping = mocker.MagicMock(record='test_record')
Expand All @@ -53,7 +47,7 @@ def test_process_chicago_isac_record_success(self, test_process, mocker):
mock_mapping.applyMapping.assert_called_once()

processMocks['store_pdf_manifest'].assert_called_once_with('test_record')
processMocks['addDCDWToUpdateList'].assert_called_once_with(mock_mapping)
test_process.record_buffer.add.assert_called_once_with(mock_mapping)

def test_process_chicago_isac_record_error(self, mocker):

Expand Down
26 changes: 7 additions & 19 deletions tests/unit/processes/ingest/test_doab_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class TestDOAB(DOABProcess):
def __init__(self):
self.s3Bucket = 'test_aws_bucket'
self.s3_manager = mocker.MagicMock(s3Client=mocker.MagicMock())
self.db_manager = mocker.MagicMock()
self.record_buffer = mocker.MagicMock(db_manager=self.db_manager)
self.fileQueue = 'test_file_queue'
self.fileRoute = 'test_file_key'
self.constants = {}
Expand Down Expand Up @@ -61,53 +63,41 @@ def mockOAIQuery(self, mocker):

def test_runProcess_daily(self, testProcess, mocker):
mockImport = mocker.patch.object(DOABProcess, 'importOAIRecords')
mockSave = mocker.patch.object(DOABProcess, 'saveRecords')
mockCommit = mocker.patch.object(DOABProcess, 'commitChanges')

testProcess.process = 'daily'
testProcess.runProcess()

mockImport.assert_called_once
mockSave.assert_called_once
mockCommit.assert_called_once
# assert_called_once must be called; a bare attribute access asserts nothing.
testProcess.record_buffer.flush.assert_called_once()

def test_runProcess_complete(self, testProcess, mocker):
mockImport = mocker.patch.object(DOABProcess, 'importOAIRecords')
mockSave = mocker.patch.object(DOABProcess, 'saveRecords')
mockCommit = mocker.patch.object(DOABProcess, 'commitChanges')

testProcess.process = 'complete'
testProcess.runProcess()

mockImport.assert_called_once_with(fullOrPartial=True)
mockSave.assert_called_once
mockCommit.assert_called_once
# assert_called_once must be called; a bare attribute access asserts nothing.
testProcess.record_buffer.flush.assert_called_once()

def test_runProcess_custom(self, testProcess, mocker):
mockImport = mocker.patch.object(DOABProcess, 'importOAIRecords')
mockSave = mocker.patch.object(DOABProcess, 'saveRecords')
mockCommit = mocker.patch.object(DOABProcess, 'commitChanges')

testProcess.process = 'custom'
testProcess.ingestPeriod = 'customTimestamp'
testProcess.runProcess()

mockImport.assert_called_once_with(startTimestamp='customTimestamp')
mockSave.assert_called_once
mockCommit.assert_called_once
# assert_called_once must be called; a bare attribute access asserts nothing.
testProcess.record_buffer.flush.assert_called_once()

def test_runProcess_single(self, testProcess, mocker):
mockImport = mocker.patch.object(DOABProcess, 'importSingleOAIRecord')
mockSave = mocker.patch.object(DOABProcess, 'saveRecords')
mockCommit = mocker.patch.object(DOABProcess, 'commitChanges')

testProcess.process = 'single'
testProcess.singleRecord = 1
testProcess.runProcess()

mockImport.assert_called_once_with(1)
mockSave.assert_called_once
mockCommit.assert_called_once
# assert_called_once must be called; a bare attribute access asserts nothing.
testProcess.record_buffer.flush.assert_called_once()

def test_importSingleOAIRecord_success(self, testProcess, mocker):
mockResponse = mocker.MagicMock()
Expand Down Expand Up @@ -282,8 +272,6 @@ def test_parseDOABRecord_success(self, testProcess, mocker):
mockLinkManager = mocker.patch('processes.ingest.doab.DOABLinkManager')
mockLinkManager.return_value = mockManager

processMocks = mocker.patch.multiple(DOABProcess, addDCDWToUpdateList=mocker.DEFAULT)

testProcess.parseDOABRecord('testMARC')

mockMapper.assert_called_once_with('testMARC', testProcess.OAI_NAMESPACES, {})
Expand All @@ -295,7 +283,7 @@ def test_parseDOABRecord_success(self, testProcess, mocker):
testProcess.fileRoute,
get_file_message('epubURI', 'epubPath')
)
processMocks['addDCDWToUpdateList'].assert_called_once_with(mockMapping)
testProcess.record_buffer.add.assert_called_once_with(mockMapping)

def test_parseDOABRecord_error(self, testProcess, mocker):
mockMapping = mocker.MagicMock()
Expand Down

0 comments on commit 42573e1

Please sign in to comment.