Skip to content

Commit

Permalink
Merge pull request #112 from marblestation/parallel_delta_send
Browse files Browse the repository at this point in the history
Send nonbib data in parallel
  • Loading branch information
marblestation authored Apr 20, 2020
2 parents a727678 + 1095d2e commit 53aecf2
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 290 deletions.
112 changes: 112 additions & 0 deletions adsdata/datalinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from adsdata import reader

# SQL templates for the {db}.datalinks table; {db} (schema) and {bibcode} are
# substituted via str.format() before execution.
# NOTE(review): values are interpolated directly into the SQL text rather than
# bound as parameters -- safe only while bibcodes come from trusted pipeline
# data; confirm callers never pass user-supplied bibcodes.
PROPERTY_QUERY = "select string_agg(distinct link_type, ',') as property from {db}.datalinks where bibcode = '{bibcode}'"
ESOURCE_QUERY = "select string_agg(link_sub_type, ',') as eSource from {db}.datalinks where link_type = 'ESOURCE' and bibcode = '{bibcode}'"
DATA_QUERY = "select sum(item_count), string_agg(link_sub_type || ':' || item_count::text, ',') as data from {db}.datalinks where link_type = 'DATA' and bibcode = '{bibcode}'"
DATALINKS_QUERY = "select link_type, link_sub_type, url, title, item_count from {db}.datalinks where bibcode = '{bibcode}'"

def load_column_files_datalinks_table(config, table_name, file_type, raw_conn, cur):
    """Bulk-load every configured datalinks column file into *table_name*.

    config['DATALINKS'] is a list of comma-separated entries in one of two forms:
        path,link_type               (e.g. config/links/video/all.links,PRESENTATION)
        path,link_type,link_sub_type (e.g. config/links/eprint_html/all.links,ARTICLE,EPRINT_HTML)
    Each file is streamed into the table with copy_from and committed per file.
    """
    for entry in config['DATALINKS']:
        parts = entry.split(',')
        if len(parts) == 2:
            filename, linktype = parts
            linksubtype = 'NA'
        elif len(parts) == 3:
            filename, linktype, linksubtype = parts
        else:
            # malformed entry: abort loading entirely (matches historical behavior)
            return

        path = config['DATA_PATH'] + filename
        # ASSOCIATED and DATA files carry extra columns and need dedicated readers
        if linktype == 'ASSOCIATED':
            link_reader = reader.DataLinksWithTitleFileReader(file_type, path, linktype)
        elif linktype == 'DATA':
            link_reader = reader.DataLinksWithTargetFileReader(file_type, path, linktype)
        else:
            link_reader = reader.DataLinksFileReader(file_type, path, linktype, linksubtype)

        if link_reader:
            cur.copy_from(link_reader, table_name)
            raw_conn.commit()

def add_data_links(session, data):
    """populate property, esource, data, total_link_counts, and data_links_rows fields"""
    bibcode = data['bibcode']

    # distinct link types -> property list
    row = session.execute(PROPERTY_QUERY.format(db='nonbib', bibcode=bibcode)).fetchone()
    data['property'] = _fetch_data_link_elements(row)

    # ESOURCE sub types -> esource list
    row = session.execute(ESOURCE_QUERY.format(db='nonbib', bibcode=bibcode)).fetchone()
    data['esource'] = _fetch_data_link_elements(row)

    # derive ARTICLE/REFEREED/openaccess flags from the fields gathered so far
    data = _add_data_link_extra_properties(data)

    # DATA sub types with per-type counts, plus the overall count
    row = session.execute(DATA_QUERY.format(db='nonbib', bibcode=bibcode)).fetchone()
    data['data'], data['total_link_counts'] = _fetch_data_link_elements_counts(row)

    # full datalinks rows as dicts
    rows = session.execute(DATALINKS_QUERY.format(db='nonbib', bibcode=bibcode)).fetchall()
    data['data_links_rows'] = _fetch_data_link_record(rows)

def _fetch_data_link_elements(query_result):
elements = []
if (query_result[0] != None):
for e in query_result[0].split(','):
if (e != None):
elements.append(e)
return elements

def _fetch_data_link_elements_counts(query_result):
elements = []
cumulative_count = 0
if (query_result[1] != None):
for e in query_result[1].split(','):
if (e != None):
elements.append(e)
cumulative_count = query_result[0]
return [elements, cumulative_count]


def _fetch_data_link_record(query_result):
# since I want to use this function from the test side,
# I was not able to use the elegant function row2dict function
# convert query results to a list of dicts
columns = ['link_type', 'link_sub_type', 'url', 'title', 'item_count']
results = []
for row in query_result:
d = {}
for i, field in enumerate(row):
d[columns[i]] = field
results.append(d)
return results

def _add_data_link_extra_properties(data):
# first, augment property field with article/nonartile, refereed/not refereed
if data['nonarticle']:
data['property'].append(u'NONARTICLE')
else:
data['property'].append(u'ARTICLE')
if data['refereed']:
data['property'].append(u'REFEREED')
else:
data['property'].append(u'NOT REFEREED')
# now augment the property field with many other boolean fields
extra_properties = ('pub_openaccess', 'private', 'ocrabstract')
for p in extra_properties:
if data[p]:
data['property'].append(p.upper())
# these property fields are set from availability of url
extra_properties_link_type = {'ADS_PDF':'ADS_OPENACCESS', 'ADS_SCAN':'ADS_OPENACCESS',
'AUTHOR_PDF':'AUTHOR_OPENACCESS', 'AUTHOR_HTML':'AUTHOR_OPENACCESS',
'EPRINT_PDF':'EPRINT_OPENACCESS', 'EPRINT_HTML':'EPRINT_OPENACCESS'}
for key,value in extra_properties_link_type.iteritems():
if key in data['esource'] and value not in data['property']:
data['property'].append(value)
# see if there is any of *_openaccess flags set, if so set the generic openaccess flag
if ('ADS_OPENACCESS' in data['property']) or ('AUTHOR_OPENACCESS' in data['property']) or \
('EPRINT_OPENACCESS' in data['property']) or ('PUB_OPENACCESS' in data['property']):
data['property'].append('OPENACCESS')
return data

70 changes: 69 additions & 1 deletion adsdata/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import absolute_import, unicode_literals
from sqlalchemy.orm import load_only
import adsdata.app as app_module
from adsputils import get_date, exceptions
from kombu import Queue
import os
from adsmsg import NonBibRecord
import adsmsg
from adsdata import models
from adsdata import datalinks

# ============================= INITIALIZATION ==================================== #

Expand All @@ -13,6 +16,7 @@


# Celery queues this worker consumes; routing keys mirror the queue names.
# 'transform-results' is the new queue for parallel nonbib-to-protobuf work.
app.conf.CELERY_QUEUES = (
    Queue('transform-results', app.exchange, routing_key='transform-results'),
    Queue('output-results', app.exchange, routing_key='output-results'),
    Queue('output-metrics', app.exchange, routing_key='output-metrics'),
)
Expand All @@ -21,6 +25,70 @@
# ============================= TASKS ============================================= #


# fields loaded from the nonbib row to compute the master record
nonbib_to_master_select_fields = ('bibcode', 'boost', 'citation_count',
    'grants', 'ned_objects', 'nonarticle', 'norm_cites', 'ocrabstract',
    'private', 'pub_openaccess', 'read_count', 'readers', 'reference',
    'refereed', 'simbad_objects')

# fields not sent to master: they are only used to compute the solr property
# field and are removed before building the protobuf
# (_sa_instance_state is SQLAlchemy bookkeeping that leaks in via __dict__)
nonbib_to_master_property_fields = ('nonarticle', 'ocrabstract', 'private', 'pub_openaccess',
    'refereed', '_sa_instance_state')

@app.task(queue='transform-results')
def task_transform_results(schema, source=models.NonBibDeltaTable, offset=0, limit=100):
    """
    Transform records from the database to protobuf to be sent to master pipeline.
    Source can be:
    - models.NonBibDeltaTable (delta updates) or models.NonBibTable (all) but
      the task will only transform and send the limited number of records between
      offset and offset+limit.
    - a list of bibcodes (offset and limit parameters are ignored)
    """

    if source is not models.NonBibDeltaTable and source is not models.NonBibTable and not isinstance(source, (list, tuple)):
        raise Exception("Invalid source, it should be models.NonBibTable, models.NonBibDeltaTable, or a list of bibcodes")
    elif isinstance(source, (list, tuple)):
        bibcodes = source
        model = None
    else:
        bibcodes = None
        model = source

    with app.session_scope() as session:
        session.execute('set search_path to {}'.format(schema))
        records = []
        if bibcodes:
            q = session.query(models.NonBibTable).options(load_only('bibcode')).filter(models.NonBibTable.bibcode.in_(bibcodes))
        else:
            q = session.query(model).options(load_only('bibcode')).offset(offset).limit(limit)
        for row in q.yield_per(100):
            # fetch the full nonbib row for this bibcode; use a fresh name
            # instead of rebinding `q`/the loop variable, since the outer
            # query iterator is still being consumed
            detail_q = session.query(models.NonBibTable).filter(models.NonBibTable.bibcode == row.bibcode).options(load_only(*nonbib_to_master_select_fields))
            current_row = detail_q.first()
            if current_row:
                current_data = current_row.__dict__
                # NOTE(review): 'authors' is not in nonbib_to_master_select_fields,
                # so author_count is typically 0 and the divisor is 1 -- confirm intended
                author_count = len(current_data.get('authors', ()))
                current_data['citation_count_norm'] = current_data.get('citation_count', 0) / float(max(author_count, 1))
                # enrich with datalinks-derived fields, then strip helper-only keys
                datalinks.add_data_links(session, current_data)
                _cleanup_for_master(current_data)
                protobuf_record = adsmsg.NonBibRecord(**current_data)
                records.append(protobuf_record._data)

    if len(records) > 0:
        records_list = adsmsg.NonBibRecordList()
        records_list.nonbib_records.extend(records)
        logger.debug("Calling 'task_output_results' for offset '%i' and limit '%i'", offset, limit)
        task_output_results.delay(records_list)


def _cleanup_for_master(r):
    """Strip keys the master-pipeline protobuf does not accept (in place)."""
    for field in nonbib_to_master_property_fields:
        if field in r:
            del r[field]


@app.task(queue='output-results')
def task_output_results(msg):
"""
Expand Down
7 changes: 0 additions & 7 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,3 @@
# AMQP broker used to hand results off to the master pipeline
OUTPUT_CELERY_BROKER = 'pyamqp://guest:guest@localhost:5682/master_pipeline'
# task name invoked on the master pipeline side
OUTPUT_TASKNAME = 'adsmp.tasks.task_update_record'

PROPERTY_QUERY = "select string_agg(distinct link_type, ',') as property from {db}.datalinks where bibcode = '{bibcode}'"

ESOURCE_QUERY = "select string_agg(link_sub_type, ',') as eSource from {db}.datalinks where link_type = 'ESOURCE' and bibcode = '{bibcode}'"

DATA_QUERY = "select sum(item_count), string_agg(link_sub_type || ':' || item_count::text, ',') as data from {db}.datalinks where link_type = 'DATA' and bibcode = '{bibcode}'"

DATALINKS_QUERY = "select link_type, link_sub_type, url, title, item_count from {db}.datalinks where bibcode = '{bibcode}'"
Loading

0 comments on commit 53aecf2

Please sign in to comment.