
Issue #12189 Added mechanism to propagate tripped QC test flags to al… #60

Open · wants to merge 1 commit into base: master
9 changes: 8 additions & 1 deletion RELEASE_NOTES.md
@@ -1,6 +1,13 @@
# Stream Engine

-# Development Release 1.6.0 2018-06-12
+# Development Release 1.6.0 2018-06-18

Issue #12189 - Add mechanism to propagate tripped QC test flags to all affected parameters
- Modified StreamRequest._run_qc to recognize failed QC tests for a parameter in a dataset,
determine related and nearby sensors and affected streams, and propagate those failures to
the corresponding particles in the affected parameters in the appropriate datasets
- Modified stream metadata service to provide calls to obtain streams for a reference
designator and to obtain reference designators and streams for a subsite and node

Issue #13299 - Asynchronous download of specific parameters to CSV and NetCDF is inconsistent
- Implemented parameter filtering as part of CSV packaging of data requests.
24 changes: 22 additions & 2 deletions util/metadata_service/stream.py
@@ -10,10 +10,30 @@

@log_timing(_log)
def _get_stream_metadata_list():
    return metadata_service_api.get_stream_metadata_records()


def _get_stream_metadata():
-   stream_metadata_record_list = metadata_service_api.get_stream_metadata_records()
    return [_RecordInfo(method=rec['method'], stream=rec['stream'], **rec['referenceDesignator'])
-           for rec in stream_metadata_record_list]
+           for rec in _get_stream_metadata_list()]

Contributor (on @log_timing(_log)): Looks like this annotation should still be applied to _get_stream_metadata_list() since this function is called multiple times in _get_stream_metadata_list().

Contributor (Author): Please restate this. You used the same function name for both the calling and the called function. I refactored the first line of _get_stream_metadata to create _get_stream_metadata_list, and I don't see what you mean when you say the function is called multiple times by the other when I look at these two functions.

Contributor (on def _get_stream_metadata_list()): The _get_stream_metadata_list() function is getting called in 3 functions: _get_stream_metadata(), get_refdes_streams_for_subsite_node(), and get_streams_for_subsite_node_sensor(). It looks like stream_request.py calls at least two of these functions in _run_qc(). I'm not sure how expensive the metadata_service_api.get_stream_metadata_records() call and underlying query are, but it may be a good idea to try to limit how often the metadata lookup occurs.
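One way to address the caching concern in the thread above would be a time-bounded cache like the timed_cache decorator that already appears later in this file. The sketch below is a simplified, hypothetical reimplementation of such a decorator for a zero-argument function; the real one lives in the Stream Engine codebase and may differ:

```python
import functools
import time

def timed_cache(seconds):
    # Simplified stand-in: cache a no-argument function's result for `seconds`.
    def decorator(func):
        state = {'expires': 0.0, 'value': None}

        @functools.wraps(func)
        def wrapper():
            now = time.time()
            if now >= state['expires']:
                state['value'] = func()
                state['expires'] = now + seconds
            return state['value']
        return wrapper
    return decorator

calls = []

@timed_cache(60)
def _get_stream_metadata_list():
    calls.append(1)  # stand-in for metadata_service_api.get_stream_metadata_records()
    return [{'stream': 'example_stream'}]

first = _get_stream_metadata_list()
second = _get_stream_metadata_list()  # served from cache; the backing query runs once
```

Whether a 60-second window is acceptable depends on how stale the stream metadata may be during a request, which the thread leaves open.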


def get_refdes_streams_for_subsite_node(subsite, node):
    return [(Dict['referenceDesignator']['subsite']+'-'+Dict['referenceDesignator']['node'] +
             '-'+Dict['referenceDesignator']['sensor'], Dict['stream']) for Dict in _get_stream_metadata_list()
            if Dict['referenceDesignator']['subsite'] == subsite and Dict['referenceDesignator']['node'] == node]

Contributor: Seems to me a bit odd that you have to reconstruct the reference designator when it's in the dictionary as a key.

Contributor (Author): From what I've seen, the data returned from the call to _get_stream_metadata_list has a referenceDesignator map that contains only the subsite, node and sensor elements.
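To make the reconstruction discussed above concrete, here is a self-contained rerun of the same filtering and join logic against invented metadata records (the record shape mirrors the referenceDesignator/stream keys in the diff; the designator values are hypothetical):

```python
# hypothetical records shaped like metadata_service_api.get_stream_metadata_records() output
records = [
    {'referenceDesignator': {'subsite': 'CE01ISSM', 'node': 'MFD35', 'sensor': '02-PRESFA000'},
     'stream': 'presf_tide_measurement', 'method': 'telemetered'},
    {'referenceDesignator': {'subsite': 'CE01ISSM', 'node': 'SBD17', 'sensor': '04-VELPTA000'},
     'stream': 'velpt_instrument', 'method': 'telemetered'},
]

def refdes_streams_for_subsite_node(subsite, node):
    # same subsite-node-sensor reconstruction as the PR's comprehension
    return [(rec['referenceDesignator']['subsite'] + '-' + rec['referenceDesignator']['node'] +
             '-' + rec['referenceDesignator']['sensor'], rec['stream'])
            for rec in records
            if rec['referenceDesignator']['subsite'] == subsite
            and rec['referenceDesignator']['node'] == node]

result = refdes_streams_for_subsite_node('CE01ISSM', 'MFD35')
```

Only the MFD35 record matches, so result pairs the rebuilt reference designator with its stream name.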


def get_streams_for_subsite_node_sensor(subsite, node, sensor):
return [Dict['stream'] for Dict in _get_stream_metadata_list()
if Dict['referenceDesignator']['subsite'] == subsite and Dict['referenceDesignator']['node'] == node
and Dict['referenceDesignator']['sensor'] == sensor]


def get_streams_for_refdes(refdes):
subsite, node, sensor = refdes.split('-', 2)
return get_streams_for_subsite_node_sensor(subsite, node, sensor)
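Note that get_streams_for_refdes relies on split with maxsplit=2, so a sensor element that itself contains hyphens stays intact. A quick check with an invented reference designator:

```python
refdes = 'CE01ISSM-MFD35-02-PRESFA000'
# maxsplit=2 stops after the node, keeping the hyphenated sensor whole
subsite, node, sensor = refdes.split('-', 2)
```

A plain split('-') would instead shatter the sensor into '02' and 'PRESFA000'.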


@timed_cache(engine.app.config['METADATA_CACHE_SECONDS'])
212 changes: 209 additions & 3 deletions util/stream_request.py
@@ -6,16 +6,29 @@
import util.provenance_metadata_store
from util.annotation import AnnotationStore
from engine import app
-from ooi_data.postgres.model import Parameter, Stream, NominalDepth
+from ooi_data.postgres.model import NominalDepth, Parameter, ParameterFunction, Stream
from util.asset_management import AssetManagement
from util.cass import fetch_l0_provenance
from util.common import log_timing, StreamEngineException, StreamKey, MissingDataException, read_size_config
-from util.metadata_service import build_stream_dictionary, get_available_time_range
+from util.metadata_service import build_stream_dictionary, get_available_time_range, get_streams_for_refdes, \
+    get_refdes_streams_for_subsite_node
from util.qc_executor import QcExecutor
from util.stream_dataset import StreamDataset
from numbers import Number

log = logging.getLogger()

CC = 'CC'
DATA_QC_PROPAGATE_FLAGS = 'dataqc_propagateflags'
DPI = 'dpi_'
NEARBY = 'N'
PD = 'PD'
QC_EXECUTED = '_qc_executed'
QC_RESULTS = '_qc_results'
RELATED = 'R'
# these tests were deemed not ready for propagation by the Data Team
QC_TEST_FAIL_SKIP_PROPAGATE = ('dataqc_gradienttest', 'dataqc_localrangetest', 'dataqc_polytrendtest')
Contributor: Would this constant be better off defined in the config file, config/default.py? Then code changes would not be necessary if the data team saw reason to add or remove entries from this group.
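The config-driven approach suggested above could look roughly like this; the config key name and the plain-dict stand-in for the Flask-style engine.app.config mapping are assumptions, not code from the PR:

```python
# config/default.py (hypothetical new entry)
QC_TEST_FAIL_SKIP_PROPAGATE = ['dataqc_gradienttest', 'dataqc_localrangetest', 'dataqc_polytrendtest']

# util/stream_request.py would then read the list from config instead of hard-coding it.
# Simulated here with a plain dict standing in for engine.app.config:
config = {'QC_TEST_FAIL_SKIP_PROPAGATE': QC_TEST_FAIL_SKIP_PROPAGATE}
skip_propagate = tuple(config.get('QC_TEST_FAIL_SKIP_PROPAGATE', ()))
```

The Data Team could then adjust the skip list by editing configuration alone.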


PRESSURE_DPI = app.config.get('PRESSURE_DPI')
GPS_STREAM_ID = app.config.get('GPS_STREAM_ID')
LATITUDE_PARAM_ID = app.config.get('LATITUDE_PARAM_ID')
@@ -64,6 +77,9 @@ def __init__(self, stream_key, parameters, time_range, uflags, qc_parameters=Non
self.datasets = {}
self.external_includes = {}
self.annotation_store = AnnotationStore()
self.qc_affects = {}
self.qc_propagate_flag = None
self.qc_test_flag_map = None

self._initialize()

@@ -193,11 +209,129 @@ def insert_provenance(self):

@log_timing(log)
def _run_qc(self):
propagate_failures = {}
# execute any QC
for sk, stream_dataset in self.datasets.iteritems():
sk_related_refdes_streams = {}
sk_related_stream_refdes = {}
sk_nearby_refdes_streams = {}
for param in sk.stream.parameters:
param_qc_executed = param.name + QC_EXECUTED
param_qc_results = param.name + QC_RESULTS
for dataset in stream_dataset.datasets.itervalues():
self.qc_executor.qc_check(param, dataset)
# check for QC test failure on 1+ data particles for the parameter
if (param_qc_executed in dataset.keys()) and (param_qc_results in dataset.keys()) and \
(dataset[param_qc_executed].values.min() > dataset[param_qc_results].values.min()):
qc_executed_values = dataset[param_qc_executed].values
qc_results_values = dataset[param_qc_results].values
# list of tuples with the position where a QC variance exists and the variance
qc_variances = [(position, value[0]-value[1])
for position, value in enumerate(zip(qc_executed_values, qc_results_values))
if value[0] != value[1]]

Contributor (on the min() comparison): I don't understand how the comparison of min values is supposed to indicate whether or not there were any QC failures. I'm thinking dataset[param_qc_executed].values looks something like [21, 21, 21, 21, ..., 21] and dataset[param_qc_results].values is the same structure, but with potentially different numbers. Not sure if these values are possible, but for example, if the qc_executed values were [21, 21, 16, 4] and the qc_results values were [21, 21, 8, 4], then the minimum values would be equal and I think the check would fail to see there was a QC failure (executed value of 16 vs results value of 8).
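The reviewer's counterexample can be checked directly. The element-wise comparison (which is what the qc_variances comprehension performs) catches the per-particle failure that the min()-based gate misses:

```python
qc_executed = [21, 21, 16, 4]  # bitmask of QC tests executed per particle
qc_results = [21, 21, 8, 4]    # bitmask of QC tests passed per particle

# min-based gate from the diff: min(executed)=4, min(results)=4, so 4 > 4 is False
min_gate = min(qc_executed) > min(qc_results)

# element-wise comparison, as in the qc_variances comprehension:
# particle 2 executed bit 8 (16 vs 8) without a matching pass
variances = [(pos, e - r) for pos, (e, r) in enumerate(zip(qc_executed, qc_results)) if e != r]
```

Here min_gate is False even though variances records the failed particle, which is exactly the gap the reviewer describes.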
# populate these collections as needed, once per sk
if not sk_related_refdes_streams:
# capture related (by subsite, node) reference-designators and streams in separate
# maps, one where reference-designator is key and one where stream is key
for refdes, stream in get_refdes_streams_for_subsite_node(sk.subsite, sk.node):
sk_related_refdes_streams.setdefault(refdes, set()).add(stream)
sk_related_stream_refdes.setdefault(stream, set()).add(refdes)
if not sk.is_mobile:
# capture nearby (by subsite, depth nearness) reference-designators
# and streams if they are not already considered "related"
max_depth_var = self.derive_max_depth(sk.sensor)
nd = NominalDepth.get_nominal_depth(sk.subsite, sk.node, sk.sensor)
for nearby_nd in nd.get_depth_within(max_depth_var):
if not sk_related_refdes_streams.get(nearby_nd.reference_designator):
nearby_streams = set(get_streams_for_refdes(nearby_nd.reference_designator))
if nearby_streams:
sk_nearby_refdes_streams[nearby_nd.reference_designator] = nearby_streams

# affected_streams: map of streams (and their parameters) affected by param
affected_streams = self._get_affected_streams(param)

# put all the related streams (from the keys of sk_related_stream_refdes) into a set
# and match those streams to affected_streams and prepare to propagate QC test failure
sk_related_streams = set(sk_related_stream_refdes.keys())
for matched_stream in sk_related_streams.intersection(affected_streams):
# get the parameters from the matching affected_streams without param
new_matched_parameters = affected_streams.get(matched_stream, set()).difference({param})
if not new_matched_parameters:
continue
# get the corresponding refdes for use in failure propagation
for matched_refdes in sk_related_stream_refdes.get(matched_stream):
matching_propagation = propagate_failures.get(
(RELATED, matched_refdes, matched_stream, sk.method), {}).get(param)
current_matched_parameters = matching_propagation[1] if matching_propagation else set()
matched_parameters = current_matched_parameters.union(new_matched_parameters)
if matched_parameters > current_matched_parameters:
propagate_failures.setdefault(
(RELATED, matched_refdes, matched_stream, sk.method), {})[param] = \
(qc_variances, matched_parameters)

# match nearby streams to affected_streams and prepare to propagate QC test failure
for review_refdes, review_streams in sk_nearby_refdes_streams.items():
for matched_stream in review_streams.intersection(affected_streams):
new_matched_parameters = affected_streams.get(matched_stream, set()).\
difference({param})
if not new_matched_parameters:
continue
matching_propagation = propagate_failures.get(
(NEARBY, review_refdes, matched_stream, sk.method), {}).get(param)
current_matched_parameters = matching_propagation[1] if matching_propagation else set()
matched_parameters = current_matched_parameters.union(new_matched_parameters)
if matched_parameters > current_matched_parameters:
propagate_failures.setdefault(
(NEARBY, review_refdes, matched_stream, sk.method), {})[param] = \
(qc_variances, matched_parameters)

if not propagate_failures:
return

# process all propagation failures on the tested streams and data sets
for sk, stream_dataset in self.datasets.iteritems():
stream_parameters = set(sk.stream.parameters)
failed_nearby_params_map = propagate_failures.get((NEARBY, sk.as_three_part_refdes(), sk.stream_name,
sk.method), {})
# make a set of the failed input parameters for the sk for matching stream_parameters
failed_nearby_parameters = set(failed_nearby_params_map.keys())
# only propagate failures on affected parameters whose upstream parameters are not inputs to this stream
for matched_parameter in failed_nearby_parameters.difference(stream_parameters):
propagate_input = failed_nearby_params_map[matched_parameter]
qc_variances = propagate_input[0]
affected_parameters = propagate_input[1]
for propagate_affected in affected_parameters.intersection(stream_parameters):
param_qc_executed = propagate_affected.name + QC_EXECUTED
for dataset in stream_dataset.datasets.itervalues():
if param_qc_executed in dataset.keys():
qc_executed_values = dataset[param_qc_executed].values
for position, variance in qc_variances:
# propagate the failure on this particle if propagating tests failed
failed_tests_to_propagate = [(i, self.qc_test_flag_map[i][0])
for i in self.qc_test_flag_map.keys()
if i & variance and self.qc_test_flag_map[i][1]]
if failed_tests_to_propagate:
qc_executed_values[position] = \
qc_executed_values[position] | self.qc_propagate_flag

failed_related_parameters = propagate_failures.get((RELATED, sk.as_three_part_refdes(), sk.stream_name,
sk.method), {})
for propagate_input in failed_related_parameters.values():
qc_variances = propagate_input[0]
affected_parameters = propagate_input[1]
for propagate_affected in affected_parameters.intersection(stream_parameters):
param_qc_executed = propagate_affected.name + QC_EXECUTED
for dataset in stream_dataset.datasets.itervalues():
if param_qc_executed in dataset.keys():
qc_executed_values = dataset[param_qc_executed].values
for position, variance in qc_variances:
# propagate the failure on this particle if propagating tests failed
failed_tests_to_propagate = [(i, self.qc_test_flag_map[i][0])
for i in self.qc_test_flag_map.keys()
if i & variance and self.qc_test_flag_map[i][1]]
if failed_tests_to_propagate:
qc_executed_values[position] = \
qc_executed_values[position] | self.qc_propagate_flag
Contributor: I see that comments 12 and 13 of the 12189 ticket mention this, but I don't understand why QC is propagated by changing bits in the qc_executed parameter instead of the qc_results parameter. My understanding was that _qc_executed indicated what QC tests were run for param and _qc_results indicated which QC tests passed for param. It seems strange to me, therefore, to indicate a propagated failure in qc_executed.

Contributor: Never mind, this should be OK. I wasn't realizing there is a special qc propagation flag and thought other test flags were getting overwritten.
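A worked example of the inner propagation step above, with invented flag values (the real bitmasks come from the parameter_function table): a failure only propagates when at least one failed bit maps to a test outside the skip list, and propagation sets the dedicated propagate bit in qc_executed rather than rewriting other test flags.

```python
# bit -> (test name, propagate-on-failure?); the masks and names here are hypothetical
qc_test_flag_map = {1: ('dataqc_globalrangetest', True),
                    2: ('dataqc_gradienttest', False),  # in the skip list
                    4: ('dataqc_spiketest', True)}
qc_propagate_flag = 16

variance = 2 | 4   # gradient and spike tests failed on this particle
qc_executed = 7    # all three tests were executed

# same filter as the diff: failed bit present in the variance AND test propagates
failed_tests_to_propagate = [(bit, qc_test_flag_map[bit][0])
                             for bit in qc_test_flag_map
                             if bit & variance and qc_test_flag_map[bit][1]]
if failed_tests_to_propagate:
    qc_executed |= qc_propagate_flag  # OR in the propagate bit, leaving test bits intact
```

The gradient-test failure is ignored (skip list), the spike-test failure propagates, and qc_executed becomes 7 | 16 = 23.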


# noinspection PyTypeChecker
def _insert_provenance(self):
@@ -225,6 +359,30 @@ def insert_annotations(self):
for stream_key in self.stream_parameters:
self.annotation_store.add_query_annotations(stream_key, self.time_range)

# builds a map of all streams that are mapped to any parameters that can have affects
def _get_affected_streams(self, param):
affected_params = {param}
# visit parameters that can be affected by this one
params_to_visit = set(self.qc_affects.get(param, ()))  # copy: pop() below must not mutate the stored qc_affects sets

while params_to_visit:
visit_param = params_to_visit.pop()
affected_params.add(visit_param)
# capture and prepare to visit any parameters this one can affect
for review_param in self.qc_affects.get(visit_param, []):
if review_param in affected_params:
continue
affected_params.add(review_param)
params_to_visit.add(review_param)

# build streams map keyed by stream tied to affected parameters
streams = {}
for affected_param in affected_params:
for stream in affected_param.streams:
streams.setdefault(stream.name, set()).add(affected_param)

return streams
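The walk in _get_affected_streams is a transitive closure over the qc_affects graph. A minimal sketch with strings standing in for Parameter objects (the dependency chain is invented):

```python
# hypothetical dependency graph: a failure on the key can affect the values
qc_affects = {
    'pressure': {'density'},
    'density': {'sound_speed'},
}

def affected_params(param):
    affected = {param}
    to_visit = set(qc_affects.get(param, ()))  # copy so the stored sets are not mutated
    while to_visit:
        p = to_visit.pop()
        affected.add(p)
        for q in qc_affects.get(p, ()):
            if q not in affected:
                to_visit.add(q)
    return affected
```

Starting from 'pressure', the walk reaches 'density' directly and 'sound_speed' transitively, which is how a tripped pressure QC flag can reach downstream derived products.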

def _exclude_flagged_data(self):
"""
Exclude data from datasets based on annotations
@@ -363,6 +521,50 @@ def _initialize(self):
log.debug('<%s> primary stream internal needs: %r', self.request_id, primary_internals)
self.stream_parameters[self.stream_key] = primary_internals

# The decimal value for the dataqc_propagateflags entry in the parameter_function table
self.qc_propagate_flag = [int(param_func.qc_flag, 2)
for param_func in ParameterFunction.query.filter_by(function_type_id=3).all()
if DATA_QC_PROPAGATE_FLAGS == param_func.name][0]

# Map of decimal value for the 6 QC tests and a tuple of the name and a boolean
# indicating the test is considered one on which test failures are propagated
self.qc_test_flag_map = dict([(int(param_func.qc_flag, 2),
(param_func.name, param_func.name not in QC_TEST_FAIL_SKIP_PROPAGATE))
for param_func in ParameterFunction.query.filter_by(function_type_id=3).all()
if DATA_QC_PROPAGATE_FLAGS != param_func.name])
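The qc_flag column stores each mask as a binary string, so int(flag, 2) yields the integer bit value. A sketch of the two initializations above with invented rows standing in for the ParameterFunction query results:

```python
# (name, qc_flag) rows; the flag strings here are hypothetical
rows = [('dataqc_globalrangetest', '0000000001'),
        ('dataqc_spiketest', '0000000100'),
        ('dataqc_propagateflags', '0000010000')]
skip = ('dataqc_gradienttest', 'dataqc_localrangetest', 'dataqc_polytrendtest')

# the dedicated propagate bit: int('0000010000', 2) == 16
qc_propagate_flag = [int(flag, 2) for name, flag in rows if name == 'dataqc_propagateflags'][0]

# bit -> (test name, propagate-on-failure?) for every other test
qc_test_flag_map = {int(flag, 2): (name, name not in skip)
                    for name, flag in rows if name != 'dataqc_propagateflags'}
```

As written in the diff, the ParameterFunction query runs twice; caching its result once would halve the database traffic during initialization.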

# map key=Parameter.data_product_identifier,value=set of corresponding Parameters
# build the map from each Parameter with a populated data_product_identifier
dpi = {}
for p in Parameter.query:
if p.data_product_identifier:
dpi.setdefault(p.data_product_identifier, set()).add(p)

# review each Parameter having a populated parameter_function_map
# qc_affects key=the referenced Parameter (via the "dpi_" or "PD" value)
# value=the Parameters being reviewed
for p in Parameter.query:
if p.is_function:
for values in p.parameter_function_map.values():
# force "values" of a single value to be a list
if not isinstance(values, list):
values = [values]
for value in values:
# values that are numbers or calibration coefficients affect no other parameters
if isinstance(value, Number) or value.startswith(CC):
continue
# DPI values reference parameters that are affected
# the key is each Parameter found in the dpi map via the "dpi_" value
if value.startswith(DPI):
dpi_value = value.split(DPI)[-1]
for param in dpi.get(dpi_value, []):
self.qc_affects.setdefault(param, set()).add(p)
# PD numbers reference parameters that are affected
# the key is the Parameter found after the "PD" literal
elif PD in value:
param = Parameter.query.get(value.split(PD)[-1])
self.qc_affects.setdefault(param, set()).add(p)
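The classification of parameter_function_map values can be exercised in isolation. The map below is invented; real maps come from Parameter.parameter_function_map, and the loop distinguishes three argument kinds: plain numbers and calibration coefficients (ignored), dpi_ references, and PD numbers.

```python
from numbers import Number

CC, DPI, PD = 'CC', 'dpi_', 'PD'
# hypothetical parameter_function_map mixing all three argument kinds
function_map = {'p': 'dpi_PRESWAT', 't': 'PD908', 'coeff': 'CC_a0', 'n': 2}

refs = {'dpi': [], 'pd': []}
for value in function_map.values():
    if isinstance(value, Number) or value.startswith(CC):
        continue  # numbers and calibration coefficients affect no other parameters
    if value.startswith(DPI):
        refs['dpi'].append(value.split(DPI)[-1])   # 'dpi_PRESWAT' -> 'PRESWAT'
    elif PD in value:
        refs['pd'].append(value.split(PD)[-1])     # 'PD908' -> '908'
```

The dpi_ branch then looks parameters up by data_product_identifier, while the PD branch resolves the numeric id directly via Parameter.query.get.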

if self.execute_dpa:
# Identify external parameters needed to support this query
external_to_process = self.stream_key.stream.needs_external(internal_requested)
@@ -457,7 +659,7 @@ def find_stream(self, stream_key, poss_params, stream=None):
if not stream_key.is_mobile:
nominal_depth = NominalDepth.get_nominal_depth(subsite, node, sensor)
if nominal_depth is not None:
-            max_depth_var = MAX_DEPTH_VARIANCE_METBK if 'METBK' in sensor else MAX_DEPTH_VARIANCE
+            max_depth_var = self.derive_max_depth(sensor)
nearby = nominal_depth.get_depth_within(max_depth_var)
for param, search_streams in param_streams:
sk = self._find_stream_from_list(stream_key, search_streams, nearby, stream_dictionary)
@@ -562,3 +764,7 @@ def compute_request_size(self, size_estimates=SIZE_ESTIMATES):
@staticmethod
def compute_request_time(file_size):
return max(MINIMUM_REPORTED_TIME, file_size * SECONDS_PER_BYTE)

@staticmethod
def derive_max_depth(sensor):
return MAX_DEPTH_VARIANCE_METBK if 'METBK' in sensor else MAX_DEPTH_VARIANCE