diff --git a/backend_py/primary/primary/services/flow_network_assembler/_utils.py b/backend_py/primary/primary/services/flow_network_assembler/_utils.py
index 627f67ed0..fe960e1b8 100644
--- a/backend_py/primary/primary/services/flow_network_assembler/_utils.py
+++ b/backend_py/primary/primary/services/flow_network_assembler/_utils.py
@@ -1,8 +1,10 @@
 import logging
+import pandas as pd
 
 from primary.services.sumo_access.group_tree_types import DataType, EdgeOrNode, TreeType
 
 from ._group_tree_dataframe_model import GroupTreeDataframeModel
+from .flow_network_types import NodeClassification, SummaryVectorInfo, TreeClassification, NodeSummaryVectorsInfo
 
 LOGGER = logging.getLogger(__name__)
@@ -189,3 +191,64 @@ def get_data_label(datatype: DataType) -> str:
     if label is None:
         raise ValueError(f"Label for datatype {datatype.value} not implemented.")
     return label
+
+
+def get_sumvecs_for_node_row(
+    node_row: pd.Series,
+    node_classifications: dict[str, NodeClassification],
+    tree_classification: TreeClassification,
+) -> tuple[NodeSummaryVectorsInfo, set[str], set[str]]:
+    nodename = node_row["CHILD"]
+    keyword = node_row["KEYWORD"]
+
+    if not isinstance(nodename, str) or not isinstance(keyword, str):
+        raise ValueError("Nodename and keyword must be strings")
+
+    node_classification = node_classifications[nodename]
+
+    datatypes = _compute_datatypes_for_row(nodename, keyword, node_classification, tree_classification)
+
+    node_vectors_info = NodeSummaryVectorsInfo(SMRY_INFO={})
+    all_sumvecs = set()
+    edge_sumvecs = set()
+
+    for datatype in datatypes:
+        sumvec_name = create_sumvec_from_datatype_nodename_and_keyword(datatype, nodename, keyword)
+        edge_or_node = get_tree_element_for_data_type(datatype)
+
+        all_sumvecs.add(sumvec_name)
+
+        if edge_or_node == EdgeOrNode.EDGE:
+            edge_sumvecs.add(sumvec_name)
+
+        node_vectors_info.SMRY_INFO[sumvec_name] = SummaryVectorInfo(DATATYPE=datatype, EDGE_NODE=edge_or_node)
+
+    return (node_vectors_info, all_sumvecs, edge_sumvecs)
+
+
+def _compute_datatypes_for_row(
+    node_name: str,
+    node_keyword: str,
+    node_classification: NodeClassification,
+    tree_classification: TreeClassification,
+) -> list[DataType]:
+
+    is_prod = node_classification.IS_PROD
+    is_inj = node_classification.IS_INJ
+    is_terminal = node_name == tree_classification.TERMINAL_NODE
+    is_wellspec = node_keyword == "WELSPECS"
+
+    datatypes = [DataType.PRESSURE]
+
+    if is_wellspec:
+        datatypes += [DataType.BHP, DataType.WMCTL]
+
+    if not is_terminal:
+        if is_prod:
+            datatypes += [DataType.OILRATE, DataType.GASRATE, DataType.WATERRATE]
+        if is_inj and tree_classification.HAS_WATER_INJ:
+            datatypes.append(DataType.WATERINJRATE)
+        if is_inj and tree_classification.HAS_GAS_INJ:
+            datatypes.append(DataType.GASINJRATE)
+
+    return datatypes
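Reviewer note: a minimal sketch of how the two new `_utils.py` helpers are expected to compose, assuming the package import paths resolve as in the diff above. The well name, classification flags and terminal node are hypothetical values, not taken from the codebase:

```python
import pandas as pd

from primary.services.flow_network_assembler._utils import get_sumvecs_for_node_row
from primary.services.flow_network_assembler.flow_network_types import NodeClassification, TreeClassification

# Hypothetical row for a producing well "A-1" declared via WELSPECS
node_row = pd.Series({"CHILD": "A-1", "KEYWORD": "WELSPECS"})
node_classifications = {"A-1": NodeClassification(IS_PROD=True, IS_INJ=False, IS_OTHER=False)}
tree_classification = TreeClassification(TERMINAL_NODE="FIELD", HAS_GAS_INJ=False, HAS_WATER_INJ=False)

node_vectors_info, all_sumvecs, edge_sumvecs = get_sumvecs_for_node_row(
    node_row, node_classifications, tree_classification
)

# By construction, every vector collected for the node is registered in SMRY_INFO,
# and the edge vectors are a subset of the full set
assert set(node_vectors_info.SMRY_INFO) == all_sumvecs
assert edge_sumvecs <= all_sumvecs
```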
diff --git a/backend_py/primary/primary/services/flow_network_assembler/flow_network_assembler.py b/backend_py/primary/primary/services/flow_network_assembler/flow_network_assembler.py
index 14a5993a1..5cdbdf1d0 100644
--- a/backend_py/primary/primary/services/flow_network_assembler/flow_network_assembler.py
+++ b/backend_py/primary/primary/services/flow_network_assembler/flow_network_assembler.py
@@ -1,5 +1,6 @@
 from enum import StrEnum
 import logging
+import asyncio
 from typing import Dict, List, Optional, Tuple, Literal
 from dataclasses import dataclass
 
@@ -11,6 +12,7 @@
 from fastapi import HTTPException
 
 from webviz_pkg.core_utils.perf_timer import PerfTimer
+
 from primary.services.sumo_access.summary_access import Frequency, SummaryAccess
 from primary.services.sumo_access.group_tree_access import GroupTreeAccess
 from primary.services.sumo_access.group_tree_types import (
@@ -29,80 +31,63 @@
     DatedFlowNetwork,
     FlowNetworkMetadata,
     NetworkNode,
+    FlowNetworkSummaryVectorsInfo,
+    NodeSummaryVectorsInfo,
+    StaticNodeWorkingData,
+    NodeClassification,
+    TreeClassification,
 )
 
 LOGGER = logging.getLogger(__name__)
 
 
-@dataclass
-class NodeClassification:
-    """
-    Classification of a node in the flow network.
-    Can be producer, injector and other over the time period of network.
-    """
-
-    # pylint: disable=invalid-name
-    IS_PROD: bool
-    IS_INJ: bool
-    IS_OTHER: bool
-
-
-@dataclass
-class SummaryVectorInfo:
-    """
-    Info/metadata for a summary vector for a node in the network
-    """
-
-    # pylint: disable=invalid-name
-    DATATYPE: DataType
-    EDGE_NODE: EdgeOrNode
-
-
-# For each node, the summary vectors needed to create the flow network dataset
-@dataclass
-class NodeSummaryVectorsInfo:
-    # Dict with summary vector name as key, and its metadata as values
-    # E.g.: {sumvec_1: SummaryVectorInfo, sumvec_2: SummaryVectorInfo, ...}
-    # pylint: disable=invalid-name
-    SMRY_INFO: Dict[str, SummaryVectorInfo]
-
-
-@dataclass
-class FlowNetworkSummaryVectorsInfo:
-    """
-    Dataclass to hold summary vectors info for the flow network.
-
-    - node_summary_vectors_info_dict - Dict with node name and all its summary vectors info as value
-    - all_summary_vectors - List of all summary vectors present in the flow network
-    - edge_summary_vectors - List of summary vectors used for edges in the flow network
-    """
-
-    # Dict with node as key, and all the summary vectors w/ metadata for the node as value
-    node_summary_vectors_info_dict: Dict[str, NodeSummaryVectorsInfo]
-    all_summary_vectors: set[str]  # All summary vectors present in the group tree
-    edge_summary_vectors: set[str]  # All summary vectors used for edges in the group tree
-
-
-@dataclass
-class StaticNodeWorkingData:
-    """
-    Static working data for a node in the network.
-
-    Data independent of dates, used for building the flow network.
-    """
-
-    node_name: str  # Redundant, but kept for debugging purposes
-    node_classification: NodeClassification
-    node_summary_vectors_info: Dict[str, SummaryVectorInfo]
-
-
 class NodeType(StrEnum):
     PROD = "prod"
     INJ = "inj"
     OTHER = "other"
 
 
+@dataclass
+# Dataclass needs to save a bunch of timestamps. Many attributes are okay here, as splitting it would be more cumbersome
+# pylint: disable-next=too-many-instance-attributes
+class _PerformanceTimes:
+    """Simple utility class to store performance timer results for different internal method calls"""
+
+    init_sumo_data: int = 0
+    init_summary_vector_list: int = 0
+    fetch_grouptree_df: int = 0
+    init_grouptree_df_model: int = 0
+    create_filtered_dataframe: int = 0
+    init_summary_vector_data_table: int = 0
+    create_node_classifications: int = 0
+    create_network_summary_vectors_info: int = 0
+
+    # Unused for logging for now, but available if needed
+    build_and_verify_vectors_of_interest: int = 0
+    create_well_node_classifications: int = 0
+
+    def log_sumo_download_times(self) -> None:
+        # Log download from Sumo times
+        LOGGER.info(
+            f"Total time to fetch data from Sumo: {self.init_sumo_data + self.init_summary_vector_data_table}ms, "
+            f"Get summary vector list in: {self.init_summary_vector_list}ms, "
+            f"Get group tree table in: {self.fetch_grouptree_df}ms, "
+            f"Get summary vectors in: {self.init_summary_vector_data_table}ms"
+        )
+
+    def log_structure_init_times(self) -> None:
+        # Log initialization of data structures times
+        LOGGER.info(
+            f"Initialize GroupTreeModel in: {self.init_grouptree_df_model}ms, "
+            f"Create filtered dataframe in: {self.create_filtered_dataframe}ms, "
+            f"Create node classifications in: {self.create_node_classifications}ms, "
+            f"Create group tree summary vectors info in: {self.create_network_summary_vectors_info}ms"
+        )
+
+
+# Should probably aim to reduce this, but that's out of scope for my current task, so leaving it as is. It was like this when I got here >:I
+# pylint: disable-next=too-many-instance-attributes
 class FlowNetworkAssembler:
     """
     Class to manage the fetching of data from sumo trees (GRUPTREE or BRANPROP) and vector summary
@@ -111,6 +96,8 @@ class FlowNetworkAssembler:
     **Note: Currently, only the single realization (SINGLE_REAL) mode is supported**
     """
 
+    # As before, fixing the arguments would be a breaking change, and out of scope for now. Leaving it as I found it
+    # pylint: disable-next=too-many-arguments
     def __init__(
         self,
         group_tree_access: GroupTreeAccess,
@@ -132,207 +119,154 @@ def __init__(
         self._realization = realization
         self._group_tree_access = group_tree_access
         self._summary_access = summary_access
-        self._terminal_node = terminal_node
         self._tree_type = tree_type
         self._excl_well_startswith = excl_well_startswith
         self._excl_well_endswith = excl_well_endswith
         self._summary_resampling_frequency = summary_frequency
         self._node_types = node_types
 
-        self._has_waterinj = False
-        self._has_gasinj = False
-
-        self._group_tree_df: pd.DataFrame | None = None
-        self._all_vectors: set[str] | None = None
+        self._group_tree_df_model: Optional[GroupTreeDataframeModel] = None
+        self._filtered_group_tree_df: Optional[pd.DataFrame] = None
+        self._all_vectors: Optional[set[str]] = None
         self._smry_table_sorted_by_date: pa.Table | None = None
-        self._node_static_working_data_dict: Dict[str, StaticNodeWorkingData] | None = None
+        self._node_static_working_data: Dict[str, StaticNodeWorkingData] | None = None
+
+        self._performance_times = _PerformanceTimes()
+
+        # Store tree details in data class to make it easier to feed it to the various helpers
+        self._tree_classification = TreeClassification(
+            HAS_GAS_INJ=False, HAS_WATER_INJ=False, TERMINAL_NODE=terminal_node
+        )
 
     def _verify_that_sumvecs_exists(self, check_sumvecs: set[str]) -> None:
         """
         Takes in a set of summary vectors and checks if they are present among the assembler's available vectors.
         If any are missing, a ValueError is raised with the list of all missing summary vectors.
         """
-        if self._all_vectors is None:
-            raise ValueError("List of summary vectors has not been initialized")
-
-        # Find vectors that are missing in the valid sumvecs
-        missing_sumvecs = check_sumvecs - self._all_vectors
+        missing_sumvecs = check_sumvecs - self._all_vectors_safe
         if len(missing_sumvecs) > 0:
             str_missing_sumvecs = ", ".join(missing_sumvecs)
             raise ValueError("Missing summary vectors for the GroupTree plugin: ", f"{str_missing_sumvecs}.")
 
+    async def _initialize_group_tree_df_async(self) -> None:
+        """Get group tree data from Sumo, and store it in a helper model class"""
+        timer = PerfTimer()
+
+        group_tree_table_df = await self._group_tree_access.get_group_tree_table(realization=self._realization)
+
+        # Store performance time for later logging
+        self._performance_times.fetch_grouptree_df = timer.lap_ms()
+
+        if group_tree_table_df is None:
+            raise HTTPException(status_code=404, detail="Group tree data not found")
+
+        self._group_tree_df_model = GroupTreeDataframeModel(group_tree_table_df, self._tree_type)
+
+        # Store performance time for later logging
+        self._performance_times.init_grouptree_df_model = timer.lap_ms()
+
+        # Create filtered group tree df from model
+        self._filtered_group_tree_df = self._group_tree_df_model.create_filtered_dataframe(
+            terminal_node=self._tree_classification.TERMINAL_NODE,
+            excl_well_startswith=self._excl_well_startswith,
+            excl_well_endswith=self._excl_well_endswith,
+        )
+
+        # Store performance time for later logging
+        self._performance_times.create_filtered_dataframe = timer.lap_ms()
+
     async def _initialize_all_vectors_list_async(self) -> None:
+        timer = PerfTimer()
+
         vector_info_arr = await self._summary_access.get_available_vectors_async()
         self._all_vectors = {vec.name for vec in vector_info_arr}
 
-    async def fetch_and_initialize_async(self) -> None:
-        """
-        Fetches the group tree and summary data from Sumo, and initializes the data structures needed to build a
-        single realization flow network.
-        """
+        # Store performance time for later logging
+        self._performance_times.init_summary_vector_list = timer.elapsed_ms()
 
+    def _validate_assembler_config(self) -> None:
         if self._network_mode != NetworkModeOptions.SINGLE_REAL:
             raise ValueError("Network mode must be SINGLE_REAL to initialize single realization data")
         if self._realization is None:
             raise ValueError("FlowNetworkAssembler missing realization")
 
-        timer = PerfTimer()
+    @property
+    def _group_tree_df_model_safe(self) -> GroupTreeDataframeModel:
+        if self._group_tree_df_model is None:
+            raise ValueError("Grouptree dataframe model has not been initialized")
+        return self._group_tree_df_model
 
-        await self._initialize_all_vectors_list_async()
-        get_summary_vector_list_time_ms = timer.lap_ms()
+    @property
+    def _all_vectors_safe(self) -> set[str]:
         if self._all_vectors is None:
             raise ValueError("List of summary vectors has not been initialized")
+        return self._all_vectors
 
-        # Get group tree data from Sumo
-        group_tree_table_df = await self._group_tree_access.get_group_tree_table(realization=self._realization)
-        get_group_tree_table_time_ms = timer.lap_ms()
-        if group_tree_table_df is None:
-            raise HTTPException(status_code=404, detail="Group tree data not found")
+    @property
+    def _filtered_group_tree_df_safe(self) -> pd.DataFrame:
+        if self._filtered_group_tree_df is None:
+            raise ValueError("Filtered group-tree dataframe has not been initialized")
+        return self._filtered_group_tree_df
 
-        timer.lap_ms()
+    async def fetch_and_initialize_async(self) -> None:
+        """
+        Fetches the group tree and summary data from Sumo, and initializes the data structures needed to build a
+        single realization flow network.
+        """
+        timer = PerfTimer()
+        self._performance_times = _PerformanceTimes()
+        self._validate_assembler_config()
 
-        # Initialize dataframe model
-        group_tree_df_model = GroupTreeDataframeModel(group_tree_table_df, self._tree_type)
-        initialize_grouptree_model_time_ms = timer.lap_ms()
+        # Run data fetch + init concurrently
+        await asyncio.gather(self._initialize_all_vectors_list_async(), self._initialize_group_tree_df_async())
+        self._performance_times.init_sumo_data = timer.lap_ms()
 
-        # Get status vectors
+        available_vectors = self._all_vectors_safe
+        group_tree_df_model = self._group_tree_df_model_safe
+
+        # Get well status vectors
         tree_wstat_vectors = _utils.compute_tree_well_vectors(group_tree_df_model, DataType.WELL_STATUS)
 
         # Ensure "WSTAT" vectors expected for group tree exist among summary vectors
         self._verify_that_sumvecs_exists(tree_wstat_vectors)
 
-        # Get all vectors of interest existing in the summary data
         vectors_of_interest = _utils.get_all_vectors_of_interest_for_tree(group_tree_df_model)
-        vectors_of_interest = vectors_of_interest & self._all_vectors
+        vectors_of_interest = vectors_of_interest & available_vectors
 
-        # Has any water injection or gas injection vectors among vectors of interest
-        has_wi_vectors = False
-        has_gi_vectors = False
-        for vec in vectors_of_interest:
-            if has_wi_vectors and has_gi_vectors:
-                break
-            if vec.startswith("WWIR") or vec.startswith("GWIR"):
-                has_wi_vectors = True
-            if vec.startswith("WGIR") or vec.startswith("GGIR"):
-                has_gi_vectors = True
-
-        # If any water or gas injection vectors exist, require field injection vectors exist
-        if has_wi_vectors and "FWIR" not in vectors_of_interest:
-            raise ValueError("Water injection vectors (WWIR/GWIR) found, but missing expected: FWIR")
-        if has_gi_vectors and "FGIR" not in vectors_of_interest:
-            raise ValueError("Gas injection vectors (WGIR/GGIR) found, but missing expected: FGIR")
+        self._verify_necessary_injection_vectors(vectors_of_interest)
+        self._performance_times.build_and_verify_vectors_of_interest = timer.lap_ms()
 
         # Get summary vectors for all data simultaneously to obtain one request from Sumo
         # Many summary vectors might not be needed, but will be filtered out later on. This is the most efficient way to get the data
         # NOTE: "WSTAT" vectors are enumerated well state indicator, thus interpolated values might create issues (should be resolved by resampling-code)
-        timer.lap_ms()
         single_realization_vectors_table, _ = await self._summary_access.get_single_real_vectors_table_async(
             vector_names=list(vectors_of_interest),
             resampling_frequency=self._summary_resampling_frequency,
             realization=self._realization,
         )
-        get_summary_vectors_time_ms = timer.lap_ms()
+        self._performance_times.init_summary_vector_data_table = timer.lap_ms()
 
         # Create list of column names in the table once (for performance)
         vectors_table_column_names = single_realization_vectors_table.column_names
-
-        # Create well node classifications based on "WSTAT" vectors
-        well_node_classifications: Dict[str, NodeClassification] = {}
-        for wstat_vector in tree_wstat_vectors:
-            well = wstat_vector.split(":")[1]
-            well_states = set(single_realization_vectors_table[wstat_vector].to_pylist())
-            well_node_classifications[well] = NodeClassification(
-                IS_PROD=1.0 in well_states,
-                IS_INJ=2.0 in well_states,
-                IS_OTHER=(1.0 not in well_states) and (2.0 not in well_states),
-            )
-
-        # Create filtered group tree df from model
-        timer.lap_ms()
-        group_tree_df = group_tree_df_model.create_filtered_dataframe(
-            terminal_node=self._terminal_node,
-            excl_well_startswith=self._excl_well_startswith,
-            excl_well_endswith=self._excl_well_endswith,
-        )
-        create_filtered_dataframe_time_ms = timer.lap_ms()
-
-        # Create node classifications based on leaf node classifications
-        node_classification_dict = _create_node_classification_dict(
-            group_tree_df, well_node_classifications, single_realization_vectors_table
-        )
-        create_node_classifications_time_ms = timer.lap_ms()
+        node_classifications = self._create_node_classifications(tree_wstat_vectors, single_realization_vectors_table)
 
         # Initialize injection states based on group tree data
-        if self._terminal_node in node_classification_dict:
-            is_inj_in_grouptree = node_classification_dict[self._terminal_node].IS_INJ
-            if is_inj_in_grouptree and "FWIR" in vectors_table_column_names:
-                self._has_waterinj = pc.sum(single_realization_vectors_table["FWIR"]).as_py() > 0
-            if is_inj_in_grouptree and "FGIR" in vectors_table_column_names:
-                self._has_gasinj = pc.sum(single_realization_vectors_table["FGIR"]).as_py() > 0
+        self._init_injection_states(node_classifications, single_realization_vectors_table, vectors_table_column_names)
 
         # Get nodes with summary vectors and their metadata, and all summary vectors, and edge summary vectors
-        # _node_sumvec_info_dict, all_sumvecs, edge_sumvecs =
-        _group_tree_summary_vectors_info = _create_flow_network_summary_vectors_info(
-            group_tree_df, node_classification_dict, self._terminal_node, self._has_waterinj, self._has_gasinj
+        network_summary_vectors_info = self._init_and_verify_network_summary_info(
+            node_classifications, vectors_table_column_names
         )
-        create_group_tree_summary_vectors_info_tims_ms = timer.lap_ms()
-
-        # Check if all edges is subset of initialized single realization vectors column names
-        if not _group_tree_summary_vectors_info.edge_summary_vectors.issubset(vectors_table_column_names):
-            missing_sumvecs = _group_tree_summary_vectors_info.edge_summary_vectors - set(vectors_table_column_names)
-            raise ValueError(f"Missing summary vectors for edges in the GroupTree: {', '.join(missing_sumvecs)}.")
-
-        # Expect all dictionaries to have the same keys
-        if set(_group_tree_summary_vectors_info.node_summary_vectors_info_dict.keys()) != set(
-            node_classification_dict.keys()
-        ):
-            raise ValueError("Node classifications and summary vector info must have the same keys.")
-
-        # Create static working data for each node
-        node_static_working_data_dict: Dict[str, StaticNodeWorkingData] = {}
-        for node_name, node_classification in node_classification_dict.items():
-            node_summary_vectors_info = _group_tree_summary_vectors_info.node_summary_vectors_info_dict[
-                node_name
-            ].SMRY_INFO
-            node_static_working_data_dict[node_name] = StaticNodeWorkingData(
-                node_name=node_name,
-                node_classification=node_classification,
-                node_summary_vectors_info=node_summary_vectors_info,
-            )
-        self._node_static_working_data_dict = node_static_working_data_dict
-
-        # Expect each node to have working data
-        node_names_set = set(group_tree_df["CHILD"].unique().tolist())
-        if set(self._node_static_working_data_dict.keys()) != node_names_set:
-            missing_node_working_data = node_names_set - set(self._node_static_working_data_dict.keys())
-            raise ValueError(f"Missing static working data for nodes: {missing_node_working_data}")
-
-        # Find group tree vectors existing in summary data
-        valid_summary_vectors = [
-            vec for vec in _group_tree_summary_vectors_info.all_summary_vectors if vec in vectors_table_column_names
-        ]
-        columns_of_interest = list(valid_summary_vectors) + ["DATE"]
-        self._smry_table_sorted_by_date = single_realization_vectors_table.select(columns_of_interest).sort_by("DATE")
-
-        # Assign group tree dataframe
-        self._group_tree_df = group_tree_df
-
-        # Log download from Sumo times
-        LOGGER.info(
-            f"Total time to fetch data from Sumo: {get_summary_vector_list_time_ms+get_summary_vectors_time_ms+get_group_tree_table_time_ms}ms, "
-            f"Get summary vector list in: {get_summary_vector_list_time_ms}ms, "
-            f"Get group tree table in: {get_group_tree_table_time_ms}ms, "
-            f"Get summary vectors in: {get_summary_vectors_time_ms}ms"
+        self._init_node_static_working_data(node_classifications, network_summary_vectors_info)
+        self._init_sorted_summary_table(
+            single_realization_vectors_table, vectors_table_column_names, network_summary_vectors_info
         )
 
-        # Log initialization of data structures times
-        LOGGER.info(
-            f"Initialize GroupTreeModel in: {initialize_grouptree_model_time_ms}ms, "
-            f"Create filtered dataframe in: {create_filtered_dataframe_time_ms}ms, "
-            f"Create node classifications in: {create_node_classifications_time_ms}ms, "
-            f"Create group tree summary vectors info in: {create_group_tree_summary_vectors_info_tims_ms}ms"
-        )
+        self._performance_times.log_sumo_download_times()
+        self._performance_times.log_structure_init_times()
 
     async def create_dated_trees_and_metadata_lists(
         self,
@@ -349,15 +283,15 @@ async def create_dated_trees_and_metadata_lists(
         if self._smry_table_sorted_by_date is None:
             raise ValueError("Summary dataframe sorted by date has not been initialized")
 
-        if self._node_static_working_data_dict is None:
+        if self._node_static_working_data is None:
             raise ValueError("Static working data for nodes has not been initialized")
 
         dated_tree_list = _create_dated_networks(
             self._smry_table_sorted_by_date,
-            self._group_tree_df,
-            self._node_static_working_data_dict,
+            self._filtered_group_tree_df_safe,
+            self._node_static_working_data,
             self._node_types,
-            self._terminal_node,
+            self._tree_classification.TERMINAL_NODE,
         )
 
         return (
@@ -383,11 +317,11 @@ def _get_edge_options(self, node_types: set[NodeType]) -> List[FlowNetworkMetadata]:
         if NodeType.PROD in node_types:
             for rate in [DataType.OILRATE, DataType.GASRATE, DataType.WATERRATE]:
                 options.append(FlowNetworkMetadata(key=rate.value, label=_utils.get_data_label(rate)))
-        if NodeType.INJ in node_types and self._has_waterinj:
+        if NodeType.INJ in node_types and self._tree_classification.HAS_WATER_INJ:
             options.append(
                 FlowNetworkMetadata(key=DataType.WATERINJRATE.value, label=_utils.get_data_label(DataType.WATERINJRATE))
             )
-        if NodeType.INJ in node_types and self._has_gasinj:
+        if NodeType.INJ in node_types and self._tree_classification.HAS_GAS_INJ:
            options.append(
                 FlowNetworkMetadata(key=DataType.GASINJRATE.value, label=_utils.get_data_label(DataType.GASINJRATE))
             )
@@ -395,92 +329,170 @@ def _get_edge_options(self, node_types: set[NodeType]) -> List[FlowNetworkMetadata]:
             return options
 
         return [FlowNetworkMetadata(key=DataType.OILRATE.value, label=_utils.get_data_label(DataType.OILRATE))]
 
+    def _verify_necessary_injection_vectors(self, vectors_of_interest: set[str]) -> None:
+        # Check for any water injection or gas injection vectors among the vectors of interest
+        has_wi_vectors = False
+        has_gi_vectors = False
+        for vec in vectors_of_interest:
+            if has_wi_vectors and has_gi_vectors:
+                break
+            if vec.startswith("WWIR") or vec.startswith("GWIR"):
+                has_wi_vectors = True
+            if vec.startswith("WGIR") or vec.startswith("GGIR"):
+                has_gi_vectors = True
 
-def _create_flow_network_summary_vectors_info(
-    group_tree_df: pd.DataFrame,
-    node_classification_dict: Dict[str, NodeClassification],
-    terminal_node: str,
-    has_waterinj: bool,
-    has_gasinj: bool,
-) -> FlowNetworkSummaryVectorsInfo:
-    """
-    Extract summary vector info from the provided group tree dataframe and node classifications.
+        # If any water or gas injection vectors exist, require field injection vectors exist
 
-    The group tree dataframe must have columns ["CHILD", "KEYWORD"]
+        if has_wi_vectors and "FWIR" not in vectors_of_interest:
+            raise ValueError("Water injection vectors (WWIR/GWIR) found, but missing expected: FWIR")
+        if has_gi_vectors and "FGIR" not in vectors_of_interest:
+            raise ValueError("Gas injection vectors (WGIR/GGIR) found, but missing expected: FGIR")
 
-    Returns a dataclass which holds summary vectors info for the flow network. A dictionary with node name as key,
-    and all its summary vectors info as value. Also returns a set with all summary vectors present in the network,
-    and a set with summary vectors used for edges in the network.
+    def _create_node_classifications(
+        self, wstat_vectors: set[str], vector_data_table: pa.Table
+    ) -> Dict[str, NodeClassification]:
+        timer = PerfTimer()
 
-    Rates are not required for the terminal node since they will not be used.
+        # Create well node classifications based on "WSTAT" vectors
+        well_node_classifications: Dict[str, NodeClassification] = {}
+        for wstat_vector in wstat_vectors:
+            well = wstat_vector.split(":")[1]
+            well_states = set(vector_data_table[wstat_vector].to_pylist())
+            well_node_classifications[well] = NodeClassification(
+                IS_PROD=1.0 in well_states,
+                IS_INJ=2.0 in well_states,
+                IS_OTHER=(1.0 not in well_states) and (2.0 not in well_states),
+            )
+
+        self._performance_times.create_well_node_classifications = timer.elapsed_ms()
 
-    `Arguments`:
-    group_tree_df: pd.DataFrame - Group tree dataframe. Expected columns are: ["CHILD", "KEYWORD"]
-    node_classification_dict: Dict[str, NodeClassification] - Dictionary with node name as key, and classification as value
-    terminal_node: str - Name of the terminal node in the group tree
-    has_waterinj: bool - True if water injection is present in the group tree
-    has_gasinj: bool - True if gas injection is present in the group tree
+        # Create node classifications based on leaf node classifications
+        node_classifications = _create_node_classification_dict(
+            self._filtered_group_tree_df_safe, well_node_classifications, vector_data_table
+        )
+        self._performance_times.create_node_classifications = timer.lap_ms()
 
-    `Returns`:
-    FlowNetworkSummaryVectorsInfo
-    """
-    node_sumvecs_info_dict: Dict[str, NodeSummaryVectorsInfo] = {}
-    all_sumvecs: set[str] = set()
-    edge_sumvecs: set[str] = set()
+        return node_classifications
 
-    unique_nodes = group_tree_df.drop_duplicates(subset=["CHILD", "KEYWORD"])
+    def _init_injection_states(
+        self,
+        node_classifications: Dict[str, NodeClassification],
+        vector_table: pa.Table,
+        vector_column_names: list[str],
+    ) -> None:
+        terminal_node_name = self._tree_classification.TERMINAL_NODE
 
-    node_names = unique_nodes["CHILD"].to_numpy()
-    node_keyword = unique_nodes["KEYWORD"].to_numpy()
+        if terminal_node_name in node_classifications:
+            is_inj_in_grouptree = node_classifications[terminal_node_name].IS_INJ
+            if is_inj_in_grouptree and "FWIR" in vector_column_names:
+                self._tree_classification.HAS_WATER_INJ = pc.sum(vector_table["FWIR"]).as_py() > 0
 
-    if len(node_names) != len(node_keyword):
-        raise ValueError("Length of node names and keywords must be equal.")
+            if is_inj_in_grouptree and "FGIR" in vector_column_names:
+                self._tree_classification.HAS_GAS_INJ = pc.sum(vector_table["FGIR"]).as_py() > 0
+
+    def _init_and_verify_network_summary_info(
+        self,
+        node_classifications: Dict[str, NodeClassification],
+        vector_column_names: list[str],
+    ) -> FlowNetworkSummaryVectorsInfo:
+        timer = PerfTimer()
+        # Get each node's summary vectors with metadata, plus the aggregated sets of
+        # all summary vectors and edge summary vectors in the network
+        network_summary_vectors_info = self._create_flow_network_summary_vectors_info(node_classifications)
+
+        # Check that all edge vectors are a subset of the initialized single-realization vector column names
+        if not network_summary_vectors_info.edge_summary_vectors.issubset(vector_column_names):
+            missing_sumvecs = network_summary_vectors_info.edge_summary_vectors - set(vector_column_names)
+            raise ValueError(f"Missing summary vectors for edges in the GroupTree: {', '.join(missing_sumvecs)}.")
+
+        # Expect all dictionaries to have the same keys
+        if set(network_summary_vectors_info.node_summary_vectors_info_dict.keys()) != set(node_classifications.keys()):
+            raise ValueError("Node classifications and summary vector info must have the same keys.")
+
+        self._performance_times.create_network_summary_vectors_info = timer.elapsed_ms()
+
+        return network_summary_vectors_info
 
-    if set(node_names) != set(node_classification_dict.keys()):
-        missing_node_names = set(node_names) - set(node_classification_dict.keys())
-        raise ValueError(f"Node names missing in node classification dict: {missing_node_names}")
-
-    num_nodes = len(node_names)
-    for i in range(num_nodes):
-        nodename = node_names[i]
-        keyword = node_keyword[i]
-        node_classification = node_classification_dict[nodename]
-        is_prod = node_classification.IS_PROD
-        is_inj = node_classification.IS_INJ
-
-        if not isinstance(nodename, str) or not isinstance(keyword, str):
-            raise ValueError("Nodename and keyword must be strings")
-
-        datatypes = [DataType.PRESSURE]
-        if is_prod and nodename != terminal_node:
-            datatypes += [DataType.OILRATE, DataType.GASRATE, DataType.WATERRATE]
-        if is_inj and has_waterinj and nodename != terminal_node:
-            datatypes.append(DataType.WATERINJRATE)
-        if is_inj and has_gasinj and nodename != terminal_node:
-            datatypes.append(DataType.GASINJRATE)
-        if keyword == "WELSPECS":
-            datatypes += [DataType.BHP, DataType.WMCTL]
-
-        if len(datatypes) > 0:
-            node_sumvecs_info_dict[nodename] = NodeSummaryVectorsInfo(SMRY_INFO={})
-
-        for datatype in datatypes:
-            sumvec_name = _utils.create_sumvec_from_datatype_nodename_and_keyword(datatype, nodename, keyword)
-
-            edge_or_node = _utils.get_tree_element_for_data_type(datatype)
-
-            all_sumvecs.add(sumvec_name)
-            if edge_or_node == EdgeOrNode.EDGE:
-                edge_sumvecs.add(sumvec_name)
-            node_sumvecs_info_dict[nodename].SMRY_INFO[sumvec_name] = SummaryVectorInfo(
-                DATATYPE=datatype, EDGE_NODE=edge_or_node
+    def _init_node_static_working_data(
+        self, node_classifications: Dict[str, NodeClassification], network_summary_info: FlowNetworkSummaryVectorsInfo
+    ) -> None:
+        # Create static working data for each node
+        filtered_group_tree_df = self._filtered_group_tree_df_safe
+        node_static_working_data: Dict[str, StaticNodeWorkingData] = {}
+
+        for node_name, node_classification in node_classifications.items():
+            node_summary_vectors_info = network_summary_info.node_summary_vectors_info_dict[node_name].SMRY_INFO
+            node_static_working_data[node_name] = StaticNodeWorkingData(
+                node_name=node_name,
+                node_classification=node_classification,
+                node_summary_vectors_info=node_summary_vectors_info,
             )
 
+        # Expect each node to have working data
+        node_names_set = set(filtered_group_tree_df["CHILD"].unique().tolist())
+        if set(node_static_working_data.keys()) != node_names_set:
+            missing_node_working_data = node_names_set - set(node_static_working_data.keys())
+            raise ValueError(f"Missing static working data for nodes: {missing_node_working_data}")
+
+        self._node_static_working_data = node_static_working_data
+
+    def _init_sorted_summary_table(
+        self,
+        vector_data_table: pa.Table,
+        vector_column_names: list[str],
+        network_summary_info: FlowNetworkSummaryVectorsInfo,
+    ) -> None:
+        # Find group tree vectors existing in summary data
+        valid_summary_vectors = [vec for vec in network_summary_info.all_summary_vectors if vec in vector_column_names]
+
+        columns_of_interest = list(valid_summary_vectors) + ["DATE"]
+        self._smry_table_sorted_by_date = vector_data_table.select(columns_of_interest).sort_by("DATE")
+
+    def _create_flow_network_summary_vectors_info(
+        self, node_classification_dict: Dict[str, NodeClassification]
+    ) -> FlowNetworkSummaryVectorsInfo:
+        """
+        Extract summary vector info from the assembler's filtered group tree dataframe and the provided node classifications.
+
+        The group tree dataframe must have columns ["CHILD", "KEYWORD"]
+
+        Returns a dataclass which holds summary vectors info for the flow network. A dictionary with node name as key,
+        and all its summary vectors info as value. Also returns a set with all summary vectors present in the network,
+        and a set with summary vectors used for edges in the network.
+
+        Rates are not required for the terminal node since they will not be used.
+
+        `Arguments`:
+        node_classification_dict: Dict[str, NodeClassification] - Dictionary with node name as key, and classification as value
+
+        The group tree dataframe, terminal node and water/gas injection flags that were previously passed as
+        arguments are now read from the assembler's own state (self._filtered_group_tree_df_safe and
+        self._tree_classification).
+
+        `Returns`:
+        FlowNetworkSummaryVectorsInfo
+        """
+        node_sumvecs_info_dict: Dict[str, NodeSummaryVectorsInfo] = {}
+        all_sumvecs: set[str] = set()
+        edge_sumvecs: set[str] = set()
+
+        group_tree_df = self._filtered_group_tree_df_safe
+
+        unique_nodes = group_tree_df.drop_duplicates(subset=["CHILD", "KEYWORD"])[["CHILD", "KEYWORD"]]
+
+        for _, node_row in unique_nodes.iterrows():
+            (node_vectors_info, all_node_sumvecs, node_edge_sumvecs) = _utils.get_sumvecs_for_node_row(
+                node_row, node_classification_dict, self._tree_classification
+            )
+
+            node_sumvecs_info_dict[node_row["CHILD"]] = node_vectors_info
+            all_sumvecs |= all_node_sumvecs
+            edge_sumvecs |= node_edge_sumvecs
+
-    return FlowNetworkSummaryVectorsInfo(
-        node_summary_vectors_info_dict=node_sumvecs_info_dict,
-        all_summary_vectors=all_sumvecs,
-        edge_summary_vectors=edge_sumvecs,
-    )
+        return FlowNetworkSummaryVectorsInfo(
+            node_summary_vectors_info_dict=node_sumvecs_info_dict,
+            all_summary_vectors=all_sumvecs,
+            edge_summary_vectors=edge_sumvecs,
+        )
 
 
 def _create_node_classification_dict(
@@ -518,14 +530,12 @@ def _create_node_classification_dict(
     if len(node_parent_ndarray) != len(node_name_ndarray) or len(node_name_ndarray) != len(node_keyword_ndarray):
         raise ValueError("Length of node names, parent names and keywords must be equal.")
 
-    num_nodes = len(node_name_ndarray)
-
     # Build lists of leaf node, their keyword and parent node.
     leaf_node_list: List[str] = []
     leaf_node_keyword_list: List[str] = []
     leaf_node_parent_list: List[str] = []
 
-    for i in range(num_nodes):
-        node_name = node_name_ndarray[i]
+    for i, node_name in enumerate(node_name_ndarray):
+
         is_leaf_node = np.count_nonzero(node_parent_ndarray == node_name) == 0
         if is_leaf_node:
             leaf_node_list.append(node_name)
@@ -544,6 +554,28 @@ def _create_node_classification_dict(
     classifying_leafnodes_time_ms = timer.lap_ms()
 
+    node_classifications = _build_node_classifications_upwards(
+        leaf_node_classification_map, leaf_node_parent_list, node_parent_ndarray, node_name_ndarray
+    )
+
+    classify_remaining_nodes_time_ms = timer.lap_ms()
+
+    LOGGER.info(
+        f"Leaf node classification took: {is_leafnode_time_ms}ms, "
+        f"Classifying leaf nodes took: {classifying_leafnodes_time_ms}ms, "
+        f"Classify remaining nodes took: {classify_remaining_nodes_time_ms}ms "
+        f"Total time add node type columns: {timer.elapsed_ms()}ms"
+    )
+
+    return node_classifications
+
+
+def _build_node_classifications_upwards(
+    leaf_node_classification_map: Dict[str, NodeClassification],
+    leaf_node_parent_list: list[str],
+    node_parent_ndarray: np.ndarray,
+    node_name_ndarray: np.ndarray,
+) -> Dict[str, NodeClassification]:
     # Initial node classifications are leaf nodes
     node_classifications: Dict[str, NodeClassification] = leaf_node_classification_map
 
@@ -596,15 +628,6 @@ def _build_node_classifications_upwards(
         missing_node_classifications = set(node_name_list) - set(node_classifications.keys())
         raise ValueError(f"Node classifications missing for nodes: {missing_node_classifications}")
 
-    classify_remaining_nodes_time_ms = timer.lap_ms()
-
-    LOGGER.info(
-        f"Leaf node classification took: {is_leafnode_time_ms}ms, "
-        f"Classifying leaf nodes took: {classifying_leafnodes_time_ms}ms, "
-        f"Classify remaining nodes took: {classify_remaining_nodes_time_ms}ms "
-        f"Total time add node type columns: {timer.elapsed_ms()}ms"
-    )
-
     return node_classifications
 
 
@@ -671,6 +694,8 @@ def _create_leaf_node_classification_map(
     return leaf_node_classifications
 
 
+# Many of the locals are just performance-timer laps taken as we compute, so the count is hard to reduce
+# pylint: disable-next=too-many-locals
 def _create_dated_networks(
     smry_sorted_by_date: pa.Table,
     group_tree_df: pd.DataFrame,
@@ -814,54 +839,87 @@ def _create_dated_network(
     # Extract names once
     smry_columns_set = set(smry_for_grouptree_sorted_by_date.column_names)
-    num_rows = len(node_names)
 
     # Iterate over every row in the grouptree dataframe to create the tree nodes
-    for i in range(num_rows):
-        node_name = node_names[i]
+    for i, node_name in enumerate(node_names):
+        if node_name in nodes_dict:
+            continue
+
+        node_static_working_data = node_static_working_data_dict.get(node_name)
+        if node_static_working_data is None:
+            raise ValueError(f"No summary vector info found for node {node_name}")
+
+        if not _is_valid_node_type(node_static_working_data.node_classification, valid_node_types):
+            continue
+
+        network_node = _create_network_node(
+            node_name,
+            keywords[i],
+            edge_labels[i],
+            node_static_working_data,
+            smry_columns_set,
+            smry_for_grouptree_sorted_by_date,
+            number_of_dates_in_smry,
+        )
+
+        # parent list has matching indices
         parent_name = parent_names[i]
-        if node_name not in nodes_dict:
-            # Find working data for the node
-            node_static_working_data = node_static_working_data_dict.get(node_name)
-            if node_static_working_data is None:
-                raise ValueError(f"No summary vector info found for node {node_name}")
+        nodes_dict[node_name] = (parent_name, network_node)
 
-            if not _is_valid_node_type(node_static_working_data.node_classification, valid_node_types):
-                continue
+    # Add children to the nodes, start with terminal node
+    # ! Mutates nodes_dict
+    _append_children(nodes_dict, terminal_node, grouptree_date)
 
-            node_type: Literal["Well", "Group"] = "Well" if keywords[i] == "WELSPECS" else "Group"
-            edge_label = edge_labels[i]
-
-            edge_data: Dict[str, List[float]] = {}
-            node_data: Dict[str, List[float]] = {}
-
-            # Each row in summary data is a unique date
-            summary_vector_info = node_static_working_data.node_summary_vectors_info
-            for sumvec, info in summary_vector_info.items():
-                datatype = info.DATATYPE
-
-                data = _get_data_for_summary_vector_info(
-                    sumvec, smry_columns_set, smry_for_grouptree_sorted_by_date, number_of_dates_in_smry
-                )
-
-                if info.EDGE_NODE == EdgeOrNode.EDGE:
-                    edge_data[datatype] = data
-                else:
-                    node_data[datatype] = data
-
-            # children = [], and are added below after each node is created, to prevent recursive search
-            nodes_dict[node_name] = (
-                parent_name,
-                NetworkNode(
-                    node_label=node_name,
-                    node_type=node_type,
-                    edge_label=edge_label,
-                    edge_data=edge_data,
-                    node_data=node_data,
-                    children=[],
-                ),
-            )
+    # Terminal node is the dated tree
+    result = nodes_dict[terminal_node][1]
 
-    # Add children to the nodes, start with terminal node
+    return result
+
+
+def _create_network_node(
+    node_name: str,
+    keyword: str,
+    edge_label: str,
+    working_data: StaticNodeWorkingData,
+    smry_columns_set: set,
+    smry_for_grouptree_sorted_by_date: pa.Table,
+    number_of_dates_in_smry: int,
+) -> NetworkNode:
+    # Working data for the node is resolved by the caller and passed in
+
+    node_type: Literal["Well", "Group"] = "Well" if keyword == "WELSPECS" else "Group"
+    edge_data: Dict[str, List[float]] = {}
+    node_data: Dict[str, List[float]] = {}
+
+    # Each row in summary data is a unique date
+    summary_vector_info = working_data.node_summary_vectors_info
+    for sumvec, info in summary_vector_info.items():
+        datatype = info.DATATYPE
+
+        data = _get_data_for_summary_vector_info(
+            sumvec, smry_columns_set, smry_for_grouptree_sorted_by_date, number_of_dates_in_smry
+        )
+
+        if info.EDGE_NODE == EdgeOrNode.EDGE:
+            edge_data[datatype] = data
+        else:
+            node_data[datatype] = data
+
+    # children = [], and are added later after each node is created, to prevent recursive search
+    return NetworkNode(
+        node_label=node_name,
+        node_type=node_type,
+        edge_label=edge_label,
+        edge_data=edge_data,
+        node_data=node_data,
+        children=[],
+    )
+
+
+def _append_children(
+    nodes_dict: dict[str, Tuple[str, NetworkNode]],
+    terminal_node: str,
+    grouptree_date: pd.Timestamp,
+) -> None:
     terminal_node_elm = nodes_dict.get(terminal_node)
     if terminal_node_elm is None:
         date_str = grouptree_date.strftime("%Y-%m-%d")
@@ -869,15 +927,10 @@ def _append_children(
     # Iterate over the nodes dict and add children to the nodes by looking at the parent name
     # Operates by reference, so each node is updated in the dict
-    for node_name, (parent_name, node) in nodes_dict.items():
+    for _node_name, (parent_name, node) in nodes_dict.items():
         if parent_name in nodes_dict:
             nodes_dict[parent_name][1].children.append(node)
 
-    # Terminal node is the dated tree
-    result = nodes_dict[terminal_node][1]
-
-    return result
-
 
 def _get_data_for_summary_vector_info(
     sumvec_name: str, summary_col_set: set, smry_for_grouptree_sorted_by_date: pa.Table, number_of_dates: int
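Reviewer note: the key structural change in this file is that the two independent Sumo round-trips (summary vector list and group-tree table) now run concurrently via `asyncio.gather`, with lazily-validated `_safe` properties replacing the scattered `None`-checks. A self-contained sketch of that pattern, using hypothetical toy stand-ins rather than the real Sumo accessors:

```python
import asyncio


class ToyAssembler:
    """Toy stand-in for FlowNetworkAssembler's concurrent initialization (illustration only)."""

    def __init__(self) -> None:
        self._vectors: set[str] | None = None
        self._tree_rows: list[str] | None = None

    async def _init_vectors_async(self) -> None:
        await asyncio.sleep(0.1)  # stands in for one Sumo round-trip
        self._vectors = {"FOPR", "FWIR"}

    async def _init_tree_async(self) -> None:
        await asyncio.sleep(0.1)  # stands in for the other, independent round-trip
        self._tree_rows = ["FIELD", "A-1"]

    @property
    def _vectors_safe(self) -> set[str]:
        # Same guard-property idea as _all_vectors_safe above: fail loudly if read before init
        if self._vectors is None:
            raise ValueError("Vectors have not been initialized")
        return self._vectors

    async def fetch_and_initialize_async(self) -> None:
        # The two awaits overlap, so wall time is ~0.1 s rather than ~0.2 s
        await asyncio.gather(self._init_vectors_async(), self._init_tree_async())
        print(f"{len(self._vectors_safe)} vectors ready")


asyncio.run(ToyAssembler().fetch_and_initialize_async())
```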
diff --git a/backend_py/primary/primary/services/flow_network_assembler/flow_network_types.py b/backend_py/primary/primary/services/flow_network_assembler/flow_network_types.py
index 22c2d6d16..30174317c 100644
--- a/backend_py/primary/primary/services/flow_network_assembler/flow_network_types.py
+++ b/backend_py/primary/primary/services/flow_network_assembler/flow_network_types.py
@@ -1,6 +1,9 @@
+from dataclasses import dataclass
 from typing import Dict, List, Literal
 
 from pydantic import BaseModel
 
+from primary.services.sumo_access.group_tree_types import DataType, EdgeOrNode
+
 
 class NetworkNode(BaseModel):
     # NOTE: Not to be confused with the NodeType enum below. We should probably change to some more distinct names at some later time
@@ -27,3 +30,78 @@ class FlowNetworkData(BaseModel):
     edge_metadata_list: List[FlowNetworkMetadata]
     node_metadata_list: List[FlowNetworkMetadata]
     dated_trees: List[DatedFlowNetwork]
+
+
+@dataclass
+class NodeClassification:
+    """
+    Classification of a node in the flow network.
+    Can be producer, injector and other over the time period of network.
+    """
+
+    # pylint: disable=invalid-name
+    IS_PROD: bool
+    IS_INJ: bool
+    IS_OTHER: bool
+
+
+@dataclass
+class TreeClassification:
+    """
+    Classification of the flow network tree as a whole.
+    Holds the terminal node name and whether water/gas injection occurs over the time period of the network.
+    """
+
+    # pylint: disable=invalid-name
+    TERMINAL_NODE: str
+    HAS_GAS_INJ: bool
+    HAS_WATER_INJ: bool
+
+
+@dataclass
+class SummaryVectorInfo:
+    """
+    Info/metadata for a summary vector for a node in the network
+    """
+
+    # pylint: disable=invalid-name
+    DATATYPE: DataType
+    EDGE_NODE: EdgeOrNode
+
+
+# For each node, the summary vectors needed to create the flow network dataset
+@dataclass
+class NodeSummaryVectorsInfo:
+    # Dict with summary vector name as key, and its metadata as value
+    # E.g.: {sumvec_1: SummaryVectorInfo, sumvec_2: SummaryVectorInfo, ...}
+    # pylint: disable=invalid-name
+    SMRY_INFO: Dict[str, SummaryVectorInfo]
+
+
+@dataclass
+class FlowNetworkSummaryVectorsInfo:
+    """
+    Dataclass to hold summary vectors info for the flow network.
+
+    - node_summary_vectors_info_dict - Dict with node name and all its summary vectors info as value
+    - all_summary_vectors - Set of all summary vectors present in the flow network
+    - edge_summary_vectors - Set of summary vectors used for edges in the flow network
+    """
+
+    # Dict with node as key, and all the summary vectors w/ metadata for the node as value
+    node_summary_vectors_info_dict: Dict[str, NodeSummaryVectorsInfo]
+    all_summary_vectors: set[str]  # All summary vectors present in the group tree
+    edge_summary_vectors: set[str]  # All summary vectors used for edges in the group tree
+
+
+@dataclass
+class StaticNodeWorkingData:
+    """
+    Static working data for a node in the network.
+
+    Data independent of dates, used for building the flow network.
+    """
+
+    node_name: str  # Redundant, but kept for debugging purposes
+    node_classification: NodeClassification
+    node_summary_vectors_info: Dict[str, SummaryVectorInfo]
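Reviewer note: a small usage sketch of the relocated dataclasses, assuming `EdgeOrNode` exposes `EDGE` and `NODE` members as used by the assembler; the vector names and the node name "MANIFOLD" are hypothetical:

```python
from primary.services.sumo_access.group_tree_types import DataType, EdgeOrNode
from primary.services.flow_network_assembler.flow_network_types import (
    FlowNetworkSummaryVectorsInfo,
    NodeSummaryVectorsInfo,
    SummaryVectorInfo,
)

# Hypothetical vectors for one group node "MANIFOLD": pressure on the node, oil rate on the edge
node_info = NodeSummaryVectorsInfo(
    SMRY_INFO={
        "GPR:MANIFOLD": SummaryVectorInfo(DATATYPE=DataType.PRESSURE, EDGE_NODE=EdgeOrNode.NODE),
        "GOPR:MANIFOLD": SummaryVectorInfo(DATATYPE=DataType.OILRATE, EDGE_NODE=EdgeOrNode.EDGE),
    }
)

network_info = FlowNetworkSummaryVectorsInfo(
    node_summary_vectors_info_dict={"MANIFOLD": node_info},
    all_summary_vectors={"GPR:MANIFOLD", "GOPR:MANIFOLD"},
    edge_summary_vectors={"GOPR:MANIFOLD"},
)

# Edge vectors must always be a subset of all vectors; the assembler verifies this during init
assert network_info.edge_summary_vectors <= network_info.all_summary_vectors
```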