refactoring all ingest processes
kylevillegas93 committed Nov 25, 2024
1 parent 42573e1 commit 2de1213
Showing 20 changed files with 183 additions and 218 deletions.
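This commit replaces the persistence plumbing each ingest process previously inherited from CoreProcess (addDCDWToUpdateList, saveRecords, commitChanges) with an explicitly owned DBManager plus a shared RecordBuffer. The buffer class itself is not part of this diff; the sketch below is a hypothetical reconstruction inferred only from its call sites (a db_manager= constructor argument, add, flush, and an integer ingest_count). The batch size and the SQLAlchemy session calls are assumptions. Because ingest_count is a plain integer, the logging lines in the diffs interpolate it directly rather than wrapping it in len().

    # record_buffer.py -- hypothetical sketch, inferred from call sites in this commit
    class RecordBuffer:
        def __init__(self, db_manager, batch_size=500):
            self.db_manager = db_manager
            self.batch_size = batch_size   # assumed flush threshold
            self.records = []              # records awaiting persistence
            self.ingest_count = 0          # running total of saved records (an int)

        def add(self, record):
            # Buffer one mapped record; write a batch once the buffer fills.
            self.records.append(record)
            if len(self.records) >= self.batch_size:
                self.flush()

        def flush(self):
            # Persist everything buffered and advance the running count.
            # add_all/commit on db_manager.session are assumed SQLAlchemy calls.
            self.db_manager.session.add_all(self.records)
            self.db_manager.session.commit()
            self.ingest_count += len(self.records)
            self.records = []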
2 changes: 1 addition & 1 deletion processes/ingest/chicago_isac.py
@@ -33,7 +33,7 @@ def runProcess(self):

self.record_buffer.flush()

logger.info(f'Ingested {len(self.record_buffer.number_of_ingested_records)} ISAC records')
logger.info(f'Ingested {self.record_buffer.ingest_count} ISAC records')

def process_chicago_isac_record(self, record):
try:
2 changes: 1 addition & 1 deletion processes/ingest/doab.py
@@ -65,7 +65,7 @@ def runProcess(self):

self.record_buffer.flush()

logger.info(f'Ingested {len(self.record_buffer.number_of_ingested_records)} DOAB records')
logger.info(f'Ingested {self.record_buffer.ingest_count} DOAB records')

def parseDOABRecord(self, oaiRec):
try:
18 changes: 11 additions & 7 deletions processes/ingest/gutenberg.py
@@ -6,10 +6,11 @@

from constants.get_constants import get_constants
from ..core import CoreProcess
from managers import GutenbergManager, RabbitMQManager
from managers import DBManager, GutenbergManager, RabbitMQManager
from mappings.gutenberg import GutenbergMapping
from model import get_file_message
from logger import create_log
from ..record_buffer import RecordBuffer

logger = create_log(__name__)

@@ -30,8 +31,12 @@ def __init__(self, *args):
self.ingestOffset = int(args[5] or 0)
self.ingestLimit = (int(args[4]) + self.ingestOffset) if args[4] else 5000

self.generateEngine()
self.createSession()
self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(db_manager=self.db_manager)

self.fileQueue = os.environ['FILE_QUEUE']
self.fileRoute = os.environ['FILE_ROUTING_KEY']
@@ -52,10 +57,9 @@ def runProcess(self):
elif self.process == 'custom':
self.importRDFRecords(startTimestamp=self.ingestPeriod)

self.saveRecords()
self.commitChanges()
self.record_buffer.flush()

logger.info(f'Ingested {len(self.records)} Gutenberg records')
logger.info(f'Ingested {self.record_buffer.ingest_count} Gutenberg records')

def importRDFRecords(self, fullImport=False, startTimestamp=None):
orderDirection = 'DESC'
@@ -104,7 +108,7 @@ def processGutenbergBatch(self, dataFiles):
except (KeyError, AttributeError):
logger.warning('Unable to store cover for {}'.format(gutenbergRec.record.source_id))

self.addDCDWToUpdateList(gutenbergRec)
self.record_buffer.add(gutenbergRec)

def storeEpubsInS3(self, gutenbergRec):
newParts = []
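Every file in this commit follows the same two-part shape as gutenberg.py above: build the DBManager and RecordBuffer in __init__, then add records and flush in runProcess. As a reference, here is a hypothetical composite of that pattern; the CoreProcess constructor pass-through and the fetch_records importer are stand-ins, not code from this repository.

    from ..core import CoreProcess
    from managers import DBManager
    from ..record_buffer import RecordBuffer
    from logger import create_log

    logger = create_log(__name__)


    class ExampleIngestProcess(CoreProcess):
        def __init__(self, *args):
            super().__init__(*args)  # assumed pass-through to CoreProcess

            # Each process now owns its database connection explicitly instead
            # of inheriting engine/session behavior from CoreProcess.
            self.db_manager = DBManager()
            self.db_manager.generateEngine()
            self.db_manager.createSession()

            self.record_buffer = RecordBuffer(db_manager=self.db_manager)

        def fetch_records(self):
            # Stand-in for each source's importer
            # (importRDFRecords, importDCRecords, importMARCRecords, ...).
            return []

        def runProcess(self):
            for record in self.fetch_records():
                self.record_buffer.add(record)

            # Replaces the old saveRecords() / commitChanges() pair.
            self.record_buffer.flush()

            logger.info(f'Ingested {self.record_buffer.ingest_count} example records')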
19 changes: 12 additions & 7 deletions processes/ingest/hathi_trust.py
@@ -8,8 +8,10 @@

from constants.get_constants import get_constants
from ..core import CoreProcess
from managers import DBManager
from mappings.hathitrust import HathiMapping
from logger import create_log
from ..record_buffer import RecordBuffer

logger = create_log(__name__)

@@ -22,8 +24,12 @@ def __init__(self, *args):

self.ingest_limit = int(args[4]) if args[4] else None

self.generateEngine()
self.createSession()
self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(db_manager=self.db_manager)

self.constants = get_constants()

@@ -36,10 +42,9 @@ def runProcess(self):
elif self.process == 'custom':
self.importFromSpecificFile(self.customFile)

self.saveRecords()
self.commitChanges()
self.record_buffer.flush()

logger.info(f'Ingested {len(self.records)} Hathi Trust records')
logger.info(f'Ingested {self.record_buffer.ingest_count} Hathi Trust records')

def importFromSpecificFile(self, file_path):
try:
@@ -53,8 +58,8 @@ def importFromSpecificFile(self, file_path):
def parseHathiDataRow(self, data_row):
hathiRec = HathiMapping(data_row, self.constants)
hathiRec.applyMapping()
self.addDCDWToUpdateList(hathiRec)


self.record_buffer.add(hathiRec)

def importFromHathiTrustDataFile(self, full_dump=False, start_date_time=None):
try:
25 changes: 12 additions & 13 deletions processes/ingest/loc.py
@@ -1,14 +1,14 @@
import time
import os, requests
from requests.exceptions import HTTPError, ConnectionError
from requests.exceptions import HTTPError

from ..core import CoreProcess
from mappings.base_mapping import MappingError
from mappings.loc import LOCMapping
from managers import RabbitMQManager, S3Manager, WebpubManifest
from managers import DBManager, RabbitMQManager, S3Manager, WebpubManifest
from model import get_file_message
from logger import create_log
from datetime import datetime, timedelta, timezone
from ..record_buffer import RecordBuffer

logger = create_log(__name__)

@@ -25,12 +25,15 @@ def __init__(self, *args):
self.fullImport = self.process == 'complete'
self.startTimestamp = None

self.db_manager = DBManager()

self.generateEngine()
self.createSession()

self.createS3Client()
self.record_buffer = RecordBuffer(db_manager=self.db_manager)

self.s3_manager = S3Manager()
self.createS3Client()
self.s3_manager.createS3Client()
self.s3Bucket = os.environ['FILE_BUCKET']

self.fileQueue = os.environ['FILE_QUEUE']
@@ -51,14 +54,11 @@ def runProcess(self):
startTimeStamp = datetime.strptime(timeStamp, '%Y-%m-%dT%H:%M:%S')
self.importLOCRecords(startTimeStamp)

self.saveRecords()
self.commitChanges()
self.record_buffer.flush()

logger.info(f'Ingested {len(self.records)} LOC records')
logger.info(f'Ingested {self.record_buffer.ingest_count} LOC records')


def importLOCRecords(self, startTimeStamp=None):

openAccessRequestCount = 0
digitizedRequestCount = 0

@@ -76,8 +76,6 @@ def importOpenAccessRecords(self, count, customTimeStamp):
except Exception as e:
logger.exception(e)



def importOpenAccessRecords(self, count, customTimeStamp):
sp = 1
try:
@@ -188,7 +186,8 @@ def processLOCRecord(self, record):
self.addHasPartMapping(record, LOCRec.record)
self.storePDFManifest(LOCRec.record)
self.storeEpubsInS3(LOCRec.record)
self.addDCDWToUpdateList(LOCRec)

self.record_buffer.add(LOCRec)
except Exception:
logger.exception('Unable to process LOC record')

18 changes: 11 additions & 7 deletions processes/ingest/met.py
@@ -7,9 +7,10 @@
from ..core import CoreProcess
from mappings.base_mapping import MappingError
from mappings.met import METMapping
from managers import RabbitMQManager, S3Manager, WebpubManifest
from managers import DBManager, RabbitMQManager, S3Manager, WebpubManifest
from model import get_file_message
from logger import create_log
from ..record_buffer import RecordBuffer

logger = create_log(__name__)

@@ -31,8 +32,12 @@ def __init__(self, *args):
self.fullImport = self.process == 'complete'
self.startTimestamp = None

self.generateEngine()
self.createSession()
self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(db_manager=self.db_manager)

self.fileQueue = os.environ['FILE_QUEUE']
self.fileRoute = os.environ['FILE_ROUTING_KEY']
@@ -49,10 +54,9 @@ def runProcess(self):
self.setStartTime()
self.importDCRecords()

self.saveRecords()
self.commitChanges()
self.record_buffer.flush()

logger.info(f'Ingested {len(self.records)} MET records')
logger.info(f'Ingested {self.record_buffer.ingest_count} MET records')

def setStartTime(self):
if not self.fullImport:
@@ -115,7 +119,7 @@ def processMetRecord(self, record):
except METError as e:
logger.warning('Unable to fetch cover ({})'.format(e))

self.addDCDWToUpdateList(metRec)
self.record_buffer.add(metRec)

def addCoverAndStoreInS3(self, record, filetype):
recordID = record.identifiers[0].split('|')[0]
18 changes: 11 additions & 7 deletions processes/ingest/muse.py
@@ -8,9 +8,10 @@

from ..core import CoreProcess
from mappings.muse import MUSEMapping
from managers import MUSEError, MUSEManager, RabbitMQManager, S3Manager
from managers import DBManager, MUSEError, MUSEManager, RabbitMQManager, S3Manager
from model import get_file_message
from logger import create_log
from ..record_buffer import RecordBuffer


logger = create_log(__name__)
@@ -24,8 +25,12 @@ def __init__(self, *args):

self.ingest_limit = int(args[4]) if args[4] else None

self.generateEngine()
self.createSession()
self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(db_manager=self.db_manager)

self.s3_manager = S3Manager()
self.s3_manager.createS3Client()
@@ -47,10 +52,9 @@ def runProcess(self):
elif self.process == 'single':
self.importMARCRecords(recID=self.singleRecord)

self.saveRecords()
self.commitChanges()
self.record_buffer.flush()

logger.info(f'Ingested {len(self.records)} MUSE records')
logger.info(f'Ingested {self.record_buffer.ingest_count} MUSE records')

def parseMuseRecord(self, marcRec):
museRec = MUSEMapping(marcRec)
@@ -80,7 +84,7 @@ def parseMuseRecord(self, marcRec):
if museManager.epubURL:
self.rabbitmq_manager.sendMessageToQueue(self.fileQueue, self.fileRoute, get_file_message(museManager.epubURL, museManager.s3EpubPath))

self.addDCDWToUpdateList(museRec)
self.record_buffer.add(museRec)

def importMARCRecords(self, full=False, startTimestamp=None, recID=None):
self.downloadRecordUpdates()
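One part of the pipeline this refactor leaves untouched, visible in the loc.py, met.py, and muse.py hunks: file downloads are not performed inline. Each process publishes a message to RabbitMQ and a separate worker fetches the file into S3. A condensed sketch of that hand-off, using only the calls visible in the diffs; the connection-setup method name is an assumption, and the URL and S3 path are placeholders.

    import os

    from managers import RabbitMQManager
    from model import get_file_message

    file_queue = os.environ['FILE_QUEUE']
    file_route = os.environ['FILE_ROUTING_KEY']

    rabbitmq_manager = RabbitMQManager()
    rabbitmq_manager.createRabbitConnection()  # assumed setup call, not shown in the diffs

    # Announce a file for a downstream worker to download and store in S3.
    rabbitmq_manager.sendMessageToQueue(
        file_queue,
        file_route,
        get_file_message('https://example.org/book.epub', 'epubs/example/book.epub'),
    )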
21 changes: 13 additions & 8 deletions processes/ingest/nypl.py
@@ -1,5 +1,7 @@
from ..core import CoreProcess
from logger import create_log
from managers import DBManager
from ..record_buffer import RecordBuffer
from services import NYPLBibService

logger = create_log(__name__)
@@ -14,11 +16,15 @@ def __init__(self, *args):

self.nypl_bib_service = NYPLBibService()

self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(db_manager=self.db_manager)

def runProcess(self):
try:
self.generateEngine()
self.createSession()

if self.process == 'daily':
records = self.nypl_bib_service.get_records(offset=self.offset, limit=self.limit)
elif self.process == 'complete':
@@ -30,14 +36,13 @@ def runProcess(self):
return

for record in records:
self.addDCDWToUpdateList(record)
self.record_buffer.add(record)

self.saveRecords()
self.commitChanges()
self.record_buffer.flush()

logger.info(f'Ingested {len(self.records)} NYPL records')
logger.info(f'Ingested {self.record_buffer.ingest_count} NYPL records')
except Exception as e:
logger.exception('Failed to ingest NYPL records')
raise e
finally:
self.close_connection()
self.db_manager.close_connection()
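The DBManager surface these processes depend on is deliberately small: generateEngine, createSession, and close_connection (the last now called in nypl.py's finally block, as above). A minimal stand-in is sketched below under the assumption that it wraps SQLAlchemy and reads its connection string from the environment; the real managers.DBManager is not part of this commit.

    import os

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    class DBManager:
        # Minimal stand-in for managers.DBManager, limited to the three methods
        # the refactored processes call; the real class likely does more.
        def generateEngine(self):
            # Assumed: connection string supplied via the environment.
            self.engine = create_engine(os.environ.get('DB_CONNECTION_STRING', 'sqlite://'))

        def createSession(self):
            self.session = sessionmaker(bind=self.engine)()

        def close_connection(self):
            self.session.close()
            self.engine.dispose()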
20 changes: 12 additions & 8 deletions processes/ingest/publisher_backlist.py
@@ -3,7 +3,8 @@

from ..core import CoreProcess
from logger import create_log
from managers import S3Manager
from managers import DBManager, S3Manager
from ..record_buffer import RecordBuffer

logger = create_log(__name__)

@@ -14,16 +15,20 @@ def __init__(self, *args):
self.limit = int(args[4]) if len(args) >= 5 and args[4] and int(args[4]) <= 100 else None
self.offset = (len(args) >= 6 and args[5]) or None

self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(db_manager=self.db_manager)

self.s3_bucket = os.environ['FILE_BUCKET']
self.s3_manager = S3Manager()

self.publisher_backlist_service = PublisherBacklistService()

def runProcess(self):
try:
self.generateEngine()
self.createSession()

if self.process == 'daily':
records = self.publisher_backlist_service.get_records(offset=self.offset, limit=self.limit)
elif self.process == 'complete':
@@ -35,12 +40,11 @@ def runProcess(self):
return

for record in records:
self.addDCDWToUpdateList(record)
self.record_buffer.add(record)

self.saveRecords()
self.commitChanges()
self.record_buffer.flush()

logger.info(f'Ingested {len(self.records)} Publisher Backlist records')
logger.info(f'Ingested {self.record_buffer.ingest_count} Publisher Backlist records')

except Exception as e:
logger.exception('Failed to run Publisher Backlist process')
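The limit parsing fixed above is easy to get wrong inline; as an illustration only (not part of the commit), a small helper makes the bounds check explicit:

    def parse_limit(args, index=4, max_limit=100):
        # Hypothetical helper: return a validated integer limit, or None when
        # the argument is absent, non-numeric, or out of range.
        try:
            value = int(args[index])
        except (IndexError, TypeError, ValueError):
            return None
        return value if 0 < value <= max_limit else None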