Merge pull request #266 from NYPL/SFR-1718v2_AutoIngestionLOC
SFR-1718v2_AutoIngestionLOC
mitri-slory authored Aug 29, 2023
2 parents c21423f + cc8398b commit 8472433
Showing 5 changed files with 180 additions and 95 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@
- Unit tests for Chicago ISAC catalog mapping and process
- Added mapping for Library of Congress collections
- Added process for Library of Congress collections
- Automatic Ingestion for Library of Congress collections
### Fixed
- Modify agentParser method to reduce number of future duplicate agents
- Install `wheel` with pip to fix fasttext build
95 changes: 56 additions & 39 deletions mappings/loc.py
@@ -10,13 +10,13 @@ def __init__(self, source):
def createMapping(self):
return {
'title': ('title', '{0}'),
'alternative': [('other_title', '{0}')], #One other_title in items block and one outside of it
'medium': [('original_format', '{0}')],
'alternative': ('other_title', '{0}'),
'medium': ('original_format', '{0}'),
'authors': ('contributor', '{0}|||true'),
'dates': ('dates', '{0}|publication_date'),
'publisher': ('item', '{0}'),
'identifiers': [
('number_lccn', '{0}|loc'),
('number_lccn', '{0}|lccn'),
('item', '{0}'),
],
'contributors':
@@ -28,16 +28,24 @@ def createMapping(self):
,
}

def applyFormatting(self):
self.record.has_part = []
self.record.source = 'loc'
self.record.medium = self.record.medium[0]
if self.record.medium:
self.record.medium = self.record.medium[0]
if len(self.record.is_part_of) == 0:
self.record.is_part_of = None
if len(self.record.abstract) == 0:
self.record.abstract = None

#Convert string repr of list to actual list
itemList = ast.literal_eval(self.record.identifiers[1])

self.record.identifiers[0], self.record.identifiers[1], self.record.source_id = self.formatIdentifierSourceID(itemList)

if self.record.identifiers[1] == None:
del self.record.identifiers[1]

self.record.publisher, self.record.spatial = self.formatPubSpatial(itemList)

self.record.extent = self.formatExtent(itemList)
@@ -48,46 +48,54 @@ def applyFormatting(self):

self.record.languages = self.formatLanguages(itemList)

#Identifier/SourceID Formatting
#Identifier/SourceID Formatting to return (string, string, string)
def formatIdentifierSourceID(self, itemList):
newIdentifier = itemList
newIdentifier['call_number'][0] = f'{newIdentifier["call_number"][0]}|call_number'
lccnNumber = self.record.identifiers[0][0] #lccnNumber comes in as an array and we need the string inside the array
callNumber = newIdentifier['call_number'][0].strip(' ')
sourceID = lccnNumber
if 'call_number' in newIdentifier.keys():
newIdentifier['call_number'][0] = f'{newIdentifier["call_number"][0]}|call_number'
callNumber = newIdentifier['call_number'][0].strip(' ')
else:
callNumber = None
return (lccnNumber, callNumber, sourceID)

#Publisher/Spatial Formatting
#Publisher/Spatial Formatting to return (array, string)
def formatPubSpatial(self, itemList):
pubArray = []
spatialArray = []
for elem in itemList['created_published']:
if ':' not in elem:
createdPublishedList = elem.split(',', 1)
pubLocation = createdPublishedList[0].strip(' ')
if ',' in createdPublishedList[1]:
pubOnly = createdPublishedList[1].split(',')[0].strip(' ')
spatialString = None
if 'created_published' in itemList.keys():
for elem in itemList['created_published']:
if ':' not in elem:
createdPublishedList = elem.split(',', 1)
pubLocation = createdPublishedList[0].strip(' ')
if ',' in createdPublishedList[1]:
pubOnly = createdPublishedList[1].split(',')[0].strip(' ')
pubArray.append(pubOnly)
spatialString = pubLocation
else:
pubLocatAndPubInfo = elem.split(':', 1)
pubLocation = pubLocatAndPubInfo[0].strip()
pubInfo = pubLocatAndPubInfo[1]
pubOnly = pubInfo.split(',', 1)[0].strip()
pubArray.append(pubOnly)
spatialArray.append(pubLocation)
else:
pubLocatAndPubInfo = elem.split(':', 1)
pubLocation = pubLocatAndPubInfo[0].strip()
pubInfo = pubLocatAndPubInfo[1]
pubOnly = pubInfo.split(',', 1)[0].strip()
pubArray.append(pubOnly)
spatialArray.append(pubLocation)
return (pubArray, spatialArray)
spatialString = pubLocation
return (pubArray, spatialString)
else:
return ([], None)

#Extent Formatting
#Extent Formatting to return string
def formatExtent(self, itemList):
extentArray = []
extentString = ''

if 'medium' in itemList:
extentArray.extend(itemList['medium'])

return extentArray
if itemList['medium']:
extentString = itemList['medium'][0]
return extentString

return None

#Subjects Formatting
#Subjects Formatting to return array
def formatSubjects(self, itemList):
subjectArray = []

@@ -97,17 +113,18 @@ def formatSubjects(self, itemList):

return subjectArray

#Rights Formatting
#Rights Formatting to return string
def formatRights(self, itemList):
rightsArray = []
rightsString = ''

if 'rights_advisory' in itemList:
for elem in itemList['rights_advisory']:
rightsArray.append(f'loc|{elem}|||')

return rightsArray

#Languages Formatting
if itemList['rights_advisory']:
rightsString = f'loc|{itemList["rights_advisory"][0]}|||'
return rightsString

return None

#Languages Formatting to return array
def formatLanguages(self, itemList):
languageArray = []

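
The new formatPubSpatial above handles the two shapes a LOC created_published value can take: "Place : Publisher, Year" and "Place, Publisher, Year". A minimal standalone sketch of that parsing rule follows; the function name and sample strings are hypothetical, not taken from LOC data or the repository.

# Sketch of the created_published parsing rule from the diff above.
# Hypothetical helper; sample inputs are illustrative only.
def parse_created_published(entries):
    publishers, spatial = [], None
    for elem in entries:
        if ':' not in elem:
            # "Place, Publisher, Year" form: first comma splits off the place
            place, rest = elem.split(',', 1)
            if ',' in rest:
                publishers.append(rest.split(',')[0].strip())
                spatial = place.strip()
        else:
            # "Place : Publisher, Year" form: colon splits off the place
            place, pub_info = elem.split(':', 1)
            publishers.append(pub_info.split(',', 1)[0].strip())
            spatial = place.strip()
    return publishers, spatial

assert parse_created_published(['Washington, D.C. : U.S. G.P.O., 1998']) == (['U.S. G.P.O.'], 'Washington, D.C.')
assert parse_created_published(['New York, Scribner, 1921']) == (['Scribner'], 'New York')

Either branch keeps only the first comma-delimited token of the publisher segment, so trailing dates never reach the publisher field.
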
148 changes: 97 additions & 51 deletions processes/loc.py
@@ -1,4 +1,4 @@
import json
import time
import os, requests
from requests.exceptions import HTTPError, ConnectionError

@@ -7,11 +7,12 @@
from mappings.loc import LOCMapping
from managers import WebpubManifest
from logger import createLog
from datetime import datetime, timedelta

logger = createLog(__name__)

LOC_ROOT_OPEN_ACCESS = 'https://www.loc.gov/collections/open-access-books/?fo=json&fa=access-restricted%3Afalse&c=2&at=results'
LOC_ROOT_DIGIT = 'https://www.loc.gov/collections/selected-digitized-books/?fo=json&fa=access-restricted%3Afalse&c=2&at=results'
LOC_ROOT_OPEN_ACCESS = 'https://www.loc.gov/collections/open-access-books/?fo=json&fa=access-restricted%3Afalse&c=50&at=results&sb=timestamp_desc'
LOC_ROOT_DIGIT = 'https://www.loc.gov/collections/selected-digitized-books/?fo=json&fa=access-restricted%3Afalse&c=50&at=results&sb=timestamp_desc'

class LOCProcess(CoreProcess):

@@ -20,44 +21,65 @@ def __init__(self, *args):

self.ingestOffset = int(args[5] or 0)
self.ingestLimit = (int(args[4]) + self.ingestOffset) if args[4] else 5000
self.fullImport = self.process == 'complete'
self.process == 'complete'
self.startTimestamp = None

# Connect to database
self.generateEngine()
self.createSession()

# S3 Configuration
self.createS3Client()
self.s3Bucket = os.environ['FILE_BUCKET']

# Connect to epub processing queue
self.fileQueue = os.environ['FILE_QUEUE']
self.fileRoute = os.environ['FILE_ROUTING_KEY']
self.createRabbitConnection()
self.createOrConnectQueue(self.fileQueue, self.fileRoute)

# S3 Configuration
self.s3Bucket = os.environ['FILE_BUCKET']
self.createS3Client()

def runProcess(self):
if self.process == 'weekly':
startTimeStamp = datetime.utcnow() - timedelta(days=7)
self.importLOCRecords(startTimeStamp)
elif self.process == 'complete':
self.importLOCRecords()
elif self.process == 'custom':
timeStamp = self.ingestPeriod
startTimeStamp = datetime.strptime(timeStamp, '%Y-%m-%dT%H:%M:%S')
self.importLOCRecords(startTimeStamp)

self.saveRecords()
self.commitChanges()


def importLOCRecords(self, startTimeStamp=None):

openAccessRequestCount = 0
digitizedRequestCount = 0

try:
openAccessRequestCount = self.importOpenAccessRecords(openAccessRequestCount)
openAccessRequestCount = self.importOpenAccessRecords(openAccessRequestCount, startTimeStamp)
logger.debug('Open Access Collection Ingestion Complete')

except Exception or HTTPError as e:
logger.exception(e)

try:
digitizedRequestCount = self.importDigitizedRecords(digitizedRequestCount)
digitizedRequestCount = self.importDigitizedRecords(digitizedRequestCount, startTimeStamp)
logger.debug('Digitized Books Collection Ingestion Complete')

except Exception or HTTPError as e:
logger.exception(e)

self.saveRecords()
self.commitChanges()


def importOpenAccessRecords(self, count):
sp = 2
def importOpenAccessRecords(self, count, customTimeStamp):
sp = 1
try:

whileBreakFlag = False

# An HTTP error will occur when the sp parameter value
# passes the last page number of the collection search results
while sp < 100000:
@@ -66,56 +88,83 @@ def importOpenAccessRecords(self, count):
LOCData = jsonData.json()

for metaDict in LOCData['results']:
resources = metaDict['resources'][0]
if 'pdf' in resources.keys() or 'epub_file' in resources.keys():
logger.debug(f'OPEN ACCESS URL: {openAccessURL}')
logger.debug(f"TITLE: {metaDict['title']}")
#Weekly/Custom Ingestion Conditional
if customTimeStamp:
itemTimeStamp = datetime.strptime(metaDict['timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ')

if itemTimeStamp < customTimeStamp:
whileBreakFlag = True
break

self.processLOCRecord(metaDict)
count += 1
if 'resources' in metaDict.keys():
if metaDict['resources']:
resources = metaDict['resources'][0]
if 'pdf' in resources.keys() or 'epub_file' in resources.keys():
logger.debug(f'OPEN ACCESS URL: {openAccessURL}')
logger.debug(f"TITLE: {metaDict['title']}")

logger.debug(f'Count for OP Access: {count}')
self.processLOCRecord(metaDict)
count += 1

logger.debug(f'Count for OP Access: {count}')

if whileBreakFlag == True:
logger.debug('No new items added to collection')
break

sp += 1
time.sleep(5)

except Exception or HTTPError as e:
if e == Exception:
logger.exception(e)
else:
logger.debug('Open Access Collection Ingestion Complete')
except Exception or HTTPError or IndexError or KeyError as e:
logger.exception(e)

return count

def importDigitizedRecords(self, count):
sp = 2
def importDigitizedRecords(self, count, customTimeStamp):
sp = 1
try:

whileBreakFlag = False

# An HTTP error will occur when the sp parameter value
# passes the last page number of the collection search results
while sp > 100000:
while sp < 100000:
digitizedURL = '{}&sp={}'.format(LOC_ROOT_DIGIT, sp)
jsonData = self.fetchPageJSON(digitizedURL)
LOCData = jsonData.json()

for metaDict in LOCData['results']:
resources = metaDict['resources'][0]
if 'pdf' in resources.keys() or 'epub_file' in resources.keys():
logger.debug(f'DIGITIZED URL: {digitizedURL}')
logger.debug(f"TITLE: {metaDict['title']}")
#Weekly Ingestion conditional
if customTimeStamp:
itemTimeStamp = datetime.strptime(metaDict['timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ')

self.processLOCRecord(metaDict)
count += 1
if itemTimeStamp < customTimeStamp:
whileBreakFlag = True
break

logger.debug(f'Count for Digitized: {count}')
if 'resources' in metaDict.keys():
if metaDict['resources']:
resources = metaDict['resources'][0]
if 'pdf' in resources.keys() or 'epub_file' in resources.keys():
logger.debug(f'DIGITIZED URL: {digitizedURL}')
logger.debug(f"TITLE: {metaDict['title']}")

sp += 1
self.processLOCRecord(metaDict)
count += 1

logger.debug(f'Count for Digitized: {count}')

if whileBreakFlag == True:
logger.debug('No new items added to collection')
break

sp += 1
time.sleep(5)

return count

except Exception or HTTPError as e:
if e == Exception:
logger.exception(e)
else:
logger.debug('Digitized Books Collection Ingestion Complete')
except Exception or HTTPError or IndexError or KeyError as e:
logger.exception(e)

def processLOCRecord(self, record):
try:
@@ -186,16 +235,13 @@ def storeEpubsInS3(self, record):

recordID = record.identifiers[0].split('|')[0]

flags = json.loads(flagStr)

if flags['download'] is True:
bucketLocation = 'epubs/{}/{}.epub'.format(source, recordID)
self.addEPUBManifest(
record, itemNo, source, flagStr, mediaType, bucketLocation
)
bucketLocation = 'epubs/{}/{}.epub'.format(source, recordID)
self.addEPUBManifest(
record, itemNo, source, flagStr, mediaType, bucketLocation
)

self.sendFileToProcessingQueue(uri, bucketLocation)
break
self.sendFileToProcessingQueue(uri, bucketLocation)
break

def createManifestInS3(self, manifestPath, manifestJSON):
self.putObjectInBucket(
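
Both collection URLs now request results newest-first (sb=timestamp_desc), which is what lets the weekly and custom runs stop paging early: once one item's timestamp falls before the cutoff, every remaining item is older still. A minimal sketch of that early-exit check, using hypothetical timestamps in the format the process parses:

from datetime import datetime, timedelta

# Hypothetical LOC-style results, newest first (as sb=timestamp_desc returns them)
results = [
    {'timestamp': '2023-08-28T09:30:00.000Z'},
    {'timestamp': '2023-08-15T09:30:00.000Z'},  # older than a one-week window
]

cutoff = datetime(2023, 8, 29) - timedelta(days=7)  # 'weekly' uses utcnow() - 7 days

ingested = []
for item in results:
    ts = datetime.strptime(item['timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ')
    if ts < cutoff:
        break  # everything after this item is older still, so stop paging
    ingested.append(item)

assert len(ingested) == 1
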