Preview: Indexer groupby #92

Open · wants to merge 4 commits into base: master

Changes from 1 commit
59 changes: 40 additions & 19 deletions bigquery/indexer.py
@@ -62,7 +62,6 @@ def existingSampleId = existingSample.get(params.sample_id_column);
}
"""


# Copied from https://stackoverflow.com/a/45392259
def _environ_or_required(key):
if os.environ.get(key):
@@ -228,15 +227,18 @@ def _sample_scripts_by_id(df, table_name, participant_id_column,
}
}


def _sample_scripts_by_participant_id(df, table_name, participant_id_column, sample_id_column, sample_file_columns):
participant_groups = df.groupby(participant_id_column)
participant_index = 0
# cycle through each group by name (participant_id_column)
for participant_id, group in participant_groups:
participant_index = participant_index + 1
# Remove the participant_id_column since it is stored as document id.
participant_group = participant_groups.get_group(participant_id).drop([participant_id_column], axis=1) #.to_dict('records')
participant_group = participant_groups.get_group(participant_id).drop([participant_id_column], axis=1)
participant_row_dicts = []
for _, row in participant_group.iterrows():
sample_index = 0
for sample_id, row in participant_group.iloc[0:5000].iterrows(): #
Contributor

This should not be hard-coded here.
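One way to avoid the literal would be to thread the cap through as a parameter. A minimal sketch, assuming a hypothetical max_samples_per_participant argument that is not part of this PR (its value could, for example, come from the dataset config):

import pandas as pd

# Hypothetical helper (not in this PR): cap the samples taken per participant with a
# value supplied by the caller instead of a literal 5000 buried in the loop.
def _capped_participant_groups(df, participant_id_column, max_samples_per_participant):
    for participant_id, group in df.groupby(participant_id_column):
        # Drop the participant id column, as the real loop does, then slice the group;
        # iloc never raises when the group has fewer rows than the cap.
        capped = group.drop([participant_id_column], axis=1)
        yield participant_id, capped.iloc[:max_samples_per_participant]

# Tiny made-up frame indexed by sample_id, mirroring the shape the indexer sees.
df = pd.DataFrame({
    'participant_id': ['p1', 'p1', 'p2'],
    'sample_id': ['s1', 's2', 's3'],
    'chr_18_vcf': ['gs://bucket/a.vcf', None, 'gs://bucket/b.vcf'],
}).set_index('sample_id')

for pid, capped in _capped_participant_groups(df, 'participant_id', max_samples_per_participant=2):
    print('%s: %d sample row(s)' % (pid, len(capped)))

Wherever the value ends up living, the point is only that the slice bound is something the caller owns rather than a constant inside the loop.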

sample_index = sample_index + 1
# Remove nan's as described in
# https://stackoverflow.com/questions/40363926/how-do-i-convert-my-dataframe-into-a-dictionary-while-ignoring-the-nan-values
# Elasticsearch crashes when indexing nan's.
@@ -259,16 +261,29 @@ def _sample_scripts_by_participant_id(df, table_name, participant_id_column, sam
row_dict[has_name] = False

participant_row_dicts.append(row_dict)

script = UPDATE_PARTICIPANT_SAMPLES_SCRIPT
yield participant_id, {
'source': script,
'lang': 'painless',
'params': {
'samples': participant_row_dicts,
'sample_id_column': sample_id_column
logger.info('participant_index: %d, sample count: %d, sample index: %d, sample id: %s' % (participant_index, len(participant_row_dicts), sample_index, sample_id))
if len(participant_row_dicts) >= 500:
script = UPDATE_PARTICIPANT_SAMPLES_SCRIPT
yield participant_id, {
'source': script,
'lang': 'painless',
'params': {
'samples': participant_row_dicts,
'sample_id_column': sample_id_column
}
}
participant_row_dicts = []

if len(participant_row_dicts) > 0:
script = UPDATE_PARTICIPANT_SAMPLES_SCRIPT
yield participant_id, {
'source': script,
'lang': 'painless',
'params': {
'samples': participant_row_dicts,
'sample_id_column': sample_id_column
}
}
}
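For readers skimming the diff, the new logic above is an accumulate-and-flush pattern: build up row dicts for a participant, emit an update script whenever the batch reaches 500 samples, then emit whatever is left over. A self-contained sketch of just that pattern, illustrative only, with the flush size as a parameter rather than a literal:

# Illustrative only: emit Painless update payloads in batches so a participant with
# many samples does not produce one huge bulk-update document.
def _batched_sample_scripts(participant_id, row_dicts, sample_id_column,
                            script_source, flush_size=500):
    batch = []
    for row_dict in row_dicts:
        batch.append(row_dict)
        if len(batch) >= flush_size:
            yield participant_id, {
                'source': script_source,
                'lang': 'painless',
                'params': {'samples': batch, 'sample_id_column': sample_id_column},
            }
            batch = []
    if batch:  # flush the remainder, like the final "if len(...) > 0" block above
        yield participant_id, {
            'source': script_source,
            'lang': 'painless',
            'params': {'samples': batch, 'sample_id_column': sample_id_column},
        }

# Example: 1203 rows for one participant -> three payloads of 500, 500 and 203 samples.
payloads = list(_batched_sample_scripts(
    'participant_1', [{'sample_id': i} for i in range(1203)],
    'sample_id', 'ctx._source.samples = params.samples'))
print([len(params['params']['samples']) for _, params in payloads])

Splitting a large participant across several scripts keeps each bulk request small; the trade-off is that the Painless update runs more than once against that participant's document.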


def index_table(es, index_name, client, table, participant_id_column,
@@ -312,6 +327,7 @@ def index_table(es, index_name, client, table, participant_id_column,
scripts_by_id = _sample_scripts_by_participant_id( #_sample_scripts_by_id(
df, table_name, participant_id_column, sample_id_column,
sample_file_columns)
#logger.info('%r' % scripts_by_id)
indexer_util.bulk_index_scripts(es, index_name, scripts_by_id)
else:
docs_by_id = _docs_by_id(df, table_name, participant_id_column)
@@ -347,7 +363,7 @@ def read_table(client, table_name):
client.dataset(dataset_id, project=project_id).table(table_name))


def create_samples_json_export_file(es, index_name, deploy_project_id):
def create_samples_json_export_file(es, index_name, deploy_project_id, sample_id_column):
"""
Writes the samples export JSON file to a GCS bucket. This significantly
speeds up exporting the samples table to Terra in the Data Explorer.
@@ -359,11 +375,13 @@ def create_samples_json_export_file(es, index_name, deploy_project_id):
"""
entities = []
search = Search(using=es, index=index_name)
for hit in search.scan():
sample_id_col = sample_id_column if sample_id_column is not None else 'sample_id'

for hit in search.params(size=1).scan():
participant_id = hit.meta['id']
doc = hit.to_dict()
for sample in doc.get('samples', []):
sample_id = sample['sample_id']
sample_id = sample[sample_id_col] #sample_id_col = sample_id_column if sample_id_column else 'sample_id'
export_sample = {'participant': participant_id}
for es_field_name, value in sample.iteritems():
# es_field_name looks like "_has_chr_18_vcf", "sample_id" or
@@ -406,7 +424,7 @@ def main():
deploy_config_path = os.path.join(args.dataset_config_dir, 'deploy.json')
es = indexer_util.maybe_create_elasticsearch_index(args.elasticsearch_url,
index_name)

es.indices.put_settings({'refresh_interval': -1}, index_name) # disables refresh to speed up indexing
participant_id_column = bigquery_config['participant_id_column']
sample_id_column = bigquery_config.get('sample_id_column', None)
sample_file_columns = bigquery_config.get('sample_file_columns', {})
@@ -420,11 +438,14 @@
index_fields(es, index_name + '_fields', table, sample_id_column)

# Ensure all of the newly indexed documents are loaded into ES.
time.sleep(5)
es.indices.put_settings({'refresh_interval': "1s"}, index_name) # re-enable refresh
es.indices.forcemerge(index_name)
time.sleep(10)
if os.path.exists(deploy_config_path):
deploy_config = indexer_util.parse_json_file(deploy_config_path)
create_samples_json_export_file(es, index_name,
deploy_config['project_id'])
deploy_config['project_id'],
sample_id_column)


if __name__ == '__main__':
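As context for the refresh_interval and forcemerge calls added to main(): disabling refresh while bulk indexing, then re-enabling it and force-merging afterwards, is the standard Elasticsearch bulk-load recipe. A standalone sketch with placeholder connection details; an explicit refresh() call is shown where the diff relies on time.sleep():

from elasticsearch import Elasticsearch

# Placeholder URL and index name; indexer.py gets these from the CLI args and config.
es = Elasticsearch(['http://localhost:9200'])
index_name = 'my_dataset'

# Turn off periodic refresh so bulk indexing is not slowed by near-real-time search.
es.indices.put_settings(index=index_name, body={'refresh_interval': '-1'})

# ... bulk index documents here ...

# Restore refresh, merge segments, and make the newly indexed docs searchable.
es.indices.put_settings(index=index_name, body={'refresh_interval': '1s'})
es.indices.forcemerge(index=index_name)
es.indices.refresh(index=index_name)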