SFR-2245: Refactor dev setup process and fix infinite cluster loop
kylevillegas93 committed Oct 10, 2024
1 parent 2beac30 commit 533a81e
Showing 3 changed files with 131 additions and 113 deletions.
15 changes: 0 additions & 15 deletions docker-compose.yml
@@ -75,21 +75,6 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
- ./localstack/init-localstack-resources.sh:/etc/localstack/init/ready.d/init-localstack-resources.sh

devsetup:
container_name: drb_local_devSetUp
depends_on:
- database
- elasticsearch
- s3
build:
context: .
command: -e local-compose -p DevelopmentSetupProcess
volumes:
- type: bind
source: .
target: /src
read_only: true

volumes:
drb_local_dbdata:
drb_local_esdata:
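
Note: with the devsetup service removed, the setup process is presumably invoked directly against the running containers, e.g. python main.py -e local-compose -p DevelopmentSetupProcess (the main.py entry point is an assumption; the -e/-p arguments come from the removed service's command).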
225 changes: 129 additions & 96 deletions processes/developmentSetup.py
@@ -11,136 +11,169 @@

from managers.db import DBManager
from .core import CoreProcess
from logger import createLog
from mappings.hathitrust import HathiMapping
from .oclcClassify import ClassifyProcess
from .oclcCatalog import CatalogProcess
from .sfrCluster import ClusterProcess

logger = createLog(__name__)


class DevelopmentSetupProcess(CoreProcess):
def __init__(self, *args):
self.adminDBConnection = DBManager(
self.admin_db_manager = DBManager(
user=os.environ['ADMIN_USER'],
pswd=os.environ['ADMIN_PSWD'],
host=os.environ['POSTGRES_HOST'],
port=os.environ['POSTGRES_PORT'],
db='postgres'
)
self.initializeDB()

self.initialize_db()

super(DevelopmentSetupProcess, self).__init__(*args[:4])

def runProcess(self):
self.generateEngine()
self.createSession()

self.initializeDatabase()

self.createElasticConnection()
self.waitForElasticSearch()
self.createElasticSearchIndex()

self.createRabbitConnection()
self.createOrConnectQueue(os.environ['OCLC_QUEUE'], os.environ['OCLC_ROUTING_KEY'])
self.createOrConnectQueue(os.environ['FILE_QUEUE'], os.environ['FILE_ROUTING_KEY'])

self.fetchHathiSampleData()

procArgs = ['complete'] + ([None] * 4)

self.createRedisClient()
self.clear_cache()

classifyProc = ClassifyProcess(*procArgs)
classifyProc.runProcess()

catalogProc = CatalogProcess(*procArgs)
catalogProc.runProcess(max_attempts=1)
try:
self.generateEngine()
self.createSession()

self.initializeDatabase()

self.createElasticConnection()
self.wait_for_elastic_search()
self.createElasticSearchIndex()

self.createRabbitConnection()
self.createOrConnectQueue(os.environ['OCLC_QUEUE'], os.environ['OCLC_ROUTING_KEY'])
self.createOrConnectQueue(os.environ['FILE_QUEUE'], os.environ['FILE_ROUTING_KEY'])

self.fetch_hathi_sample_data()

process_args = ['complete'] + ([None] * 4)

self.createRedisClient()
self.clear_cache()

classify_process = ClassifyProcess(*process_args)
classify_process.runProcess()

catalog_process = CatalogProcess(*process_args)
catalog_process.runProcess(max_attempts=1)

cluster_process = ClusterProcess(*process_args)
cluster_process.runProcess()
except Exception:
            logger.exception('Failed to run development setup process')

def initialize_db(self):
self.admin_db_manager.generateEngine()

with self.admin_db_manager.engine.connect() as admin_db_connection:
admin_db_connection.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

self.create_database(admin_db_connection)
self.create_database_user(admin_db_connection)

self.admin_db_manager.engine.dispose()

def create_database(self, db_connection):
try:
db_connection.execute(
sa.text(f"CREATE DATABASE {os.environ['POSTGRES_NAME']}"),
)
except ProgrammingError:
pass
except Exception as e:
logger.exception('Failed to create database')
raise e

def create_database_user(self, db_connection):
try:
db_connection.execute(
sa.text(
f"CREATE USER {os.environ['POSTGRES_USER']} "
f"WITH PASSWORD '{os.environ['POSTGRES_PSWD']}'",
),
)
db_connection.execute(
sa.text(
f"GRANT ALL PRIVILEGES ON DATABASE {os.environ['POSTGRES_NAME']} "
f"TO {os.environ['POSTGRES_USER']}",
),
)
except ProgrammingError:
pass
except Exception as e:
logger.exception('Failed to create database user')
raise e
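
Note: CREATE DATABASE cannot run inside a transaction block in Postgres, which is why initialize_db switches the admin connection to ISOLATION_LEVEL_AUTOCOMMIT before calling these helpers; the ProgrammingError handlers then make reruns idempotent when the database or user already exists.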

clusterProc = ClusterProcess(*procArgs)
clusterProc.runProcess()

def initializeDB(self):
self.adminDBConnection.generateEngine()
with self.adminDBConnection.engine.connect() as conn:
conn.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
try:
conn.execute(
sa.text(f"CREATE DATABASE {os.environ['POSTGRES_NAME']}"),
)
except ProgrammingError:
pass

try:
conn.execute(
sa.text(
f"CREATE USER {os.environ['POSTGRES_USER']} "
f"WITH PASSWORD '{os.environ['POSTGRES_PSWD']}'",
),
)
conn.execute(
sa.text(
f"GRANT ALL PRIVILEGES ON DATABASE {os.environ['POSTGRES_NAME']} "
f"TO {os.environ['POSTGRES_USER']}",
),
)
except ProgrammingError:
pass

self.adminDBConnection.engine.dispose()

def fetchHathiSampleData(self):
self.importFromHathiTrustDataFile()
self.saveRecords()
self.commitChanges()

def waitForElasticSearch(self):
def wait_for_elastic_search(self):
increment = 5
totalTime = 0
while True and totalTime < 60:
max_time = 60

for _ in range(0, max_time, increment):
try:
self.es.info()
break
except ConnectionError:
pass
except Exception as e:
logger.exception('Failed to wait for elastic search')
raise e

totalTime += increment
sleep(increment)
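
Note: the old guard, while True and totalTime < 60, is equivalent to while totalTime < 60 (the True and is redundant); the rewrite makes the bound explicit as max_time // increment polls, i.e. twelve 5-second attempts.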

@staticmethod
def returnHathiDateFormat(strDate):
if 'T' in strDate and '-' in strDate:
return '%Y-%m-%dT%H:%M:%S%z'
elif 'T' in strDate:
return '%Y-%m-%dT%H:%M:%S'
else:
return '%Y-%m-%d %H:%M:%S %z'
def fetch_hathi_sample_data(self):
self.import_from_hathi_trust_data_file()

self.saveRecords()
self.commitChanges()

def importFromHathiTrustDataFile(self):
fileList = requests.get(os.environ['HATHI_DATAFILES'])
if fileList.status_code != 200:
raise IOError('Unable to load data files')
def import_from_hathi_trust_data_file(self):
hathi_files_response = requests.get(os.environ['HATHI_DATAFILES'])

fileJSON = fileList.json()
if hathi_files_response.status_code != 200:
raise Exception('Unable to load Hathi Trust data files')

fileJSON.sort(
key=lambda x: datetime.strptime(
x['created'],
self.returnHathiDateFormat(x['created'])
hathi_files_json = hathi_files_response.json()

hathi_files_json.sort(
key=lambda file: datetime.strptime(
file['created'],
self.map_to_hathi_date_format(file['created'])
).timestamp(),
reverse=True
)

with open('/tmp/tmp_hathi.txt.gz', 'wb') as hathiTSV:
hathiReq = requests.get(fileJSON[0]['url'])
hathiTSV.write(hathiReq.content)
temp_hathi_file = '/tmp/tmp_hathi.txt.gz'
        in_copyright_statuses = {'ic', 'icus', 'ic-world', 'und'}

with open(temp_hathi_file, 'wb') as hathi_tsv_file:
hathi_data_response = requests.get(hathi_files_json[0]['url'])

hathi_tsv_file.write(hathi_data_response.content)

with gzip.open('/tmp/tmp_hathi.txt.gz', 'rt') as unzipTSV:
hathiTSV = csv.reader(unzipTSV, delimiter='\t')
for i, row in enumerate(hathiTSV):
if row[2] not in ['ic', 'icus', 'ic-world', 'und']:
hathiRec = HathiMapping(row, self.statics)
hathiRec.applyMapping()
self.addDCDWToUpdateList(hathiRec)
with gzip.open(temp_hathi_file, 'rt') as unzipped_tsv_file:
hathi_tsv_file = csv.reader(unzipped_tsv_file, delimiter='\t')

if i >= 500:
for number_of_books_ingested, book in enumerate(hathi_tsv_file):
if number_of_books_ingested > 500:
break

book_right = book[2]

if book_right not in in_copyright_statuses:
hathi_record = HathiMapping(book, self.statics)
hathi_record.applyMapping()

self.addDCDWToUpdateList(hathi_record)

def map_to_hathi_date_format(self, date: str):
if 'T' in date and '-' in date:
return '%Y-%m-%dT%H:%M:%S%z'
elif 'T' in date:
return '%Y-%m-%dT%H:%M:%S'
else:
return '%Y-%m-%d %H:%M:%S %z'
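
Note: in_copyright_statuses in the ingestion loop above names the rights codes to skip — ic, icus, ic-world, and und appear to be HathiTrust's in-copyright/undetermined statuses, per the variable name — so only the first ~500 presumed-open rows are ingested into the dev database.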
4 changes: 2 additions & 2 deletions processes/sfrCluster.py
@@ -67,9 +67,9 @@ def clusterRecords(self, full=False, startDateTime=None):
logger.warning('Skipping record {}'.format(rec))
self.updateMatchedRecordsStatus([rec.id])
self.session.commit()
except Exception:
except Exception as e:
logger.exception(f'Failed to cluster record {rec}')
continue
raise e

if len(indexingWorks) >= 50:
self.updateElasticSearch(indexingWorks, deletingWorks)
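
Note: this three-line change is the infinite-loop fix from the commit title. With continue, a record that always failed to cluster was never marked as processed (updateMatchedRecordsStatus is only reached on success), so the next fetch returned the same record and the loop never drained. A minimal sketch of that failure mode, with illustrative names rather than the repo's actual API:

def cluster_all(fetch_unprocessed_batch, cluster_record, mark_processed):
    # Keep clustering until no unprocessed records remain.
    while True:
        batch = fetch_unprocessed_batch()
        if not batch:
            break
        for record in batch:
            try:
                cluster_record(record)
                mark_processed(record)  # only reached on success
            except Exception:
                # Swallowing the error leaves the record unprocessed, so
                # the next fetch returns it again and the loop never ends.
                continue

Re-raising instead (the new behavior) trades liveness for visibility: the process fails fast on a bad record instead of spinning forever.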
