From c5a64e79d689353092ed39504060bb48ea18be81 Mon Sep 17 00:00:00 2001 From: Thomas Erland Clausen Date: Mon, 27 Jan 2025 12:36:34 +0100 Subject: [PATCH 1/5] updating requirements to get package repo from personal branch --- .docker/requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.docker/requirements.txt b/.docker/requirements.txt index 7df4a3731b..25bda442ef 100644 --- a/.docker/requirements.txt +++ b/.docker/requirements.txt @@ -40,4 +40,5 @@ pytest-mock==3.14.0 # Custom packages opengeh-spark-sql-migrations @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.4.2#subdirectory=source/spark_sql_migrations -opengeh-telemetry @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.4.2#subdirectory=source/telemetry +# opengeh-telemetry @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.4.2#subdirectory=source/telemetry +opengeh-telemetry @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages.git@cbang/simplify_entry_point#subdirectory=source/telemetry From 2470359b62eda173f5bb1becbadd745f503272f2 Mon Sep 17 00:00:00 2001 From: Thomas Erland Clausen Date: Wed, 29 Jan 2025 10:50:06 +0100 Subject: [PATCH 2/5] editing entry point and param settings --- .docker/requirements.txt | 2 + .../package/calculation/calculator_args.py | 61 +++++++++++++++---- .../package/calculator_job.py | 52 ++++++++++++++-- .../package/calculator_job_args.py | 2 +- .../infrastructure/infrastructure_settings.py | 55 +++++++++++++---- 5 files changed, 140 insertions(+), 32 deletions(-) diff --git a/.docker/requirements.txt b/.docker/requirements.txt index 25bda442ef..afafe17f52 100644 --- a/.docker/requirements.txt +++ b/.docker/requirements.txt @@ -37,6 +37,8 @@ coverage==7.6.8 pytest==8.3.3 configargparse==1.7.0 pytest-mock==3.14.0 +pydantic==2.10.5 +pydantic-settings==2.7.1 # Custom packages opengeh-spark-sql-migrations @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.4.2#subdirectory=source/spark_sql_migrations diff --git a/source/databricks/calculation_engine/package/calculation/calculator_args.py b/source/databricks/calculation_engine/package/calculation/calculator_args.py index dc10e94c00..c482ccdb66 100644 --- a/source/databricks/calculation_engine/package/calculation/calculator_args.py +++ b/source/databricks/calculation_engine/package/calculation/calculator_args.py @@ -14,21 +14,56 @@ from dataclasses import dataclass from datetime import datetime - +from pydantic_settings import BaseSettings, CliSettingsSource, PydanticBaseSettingsSource +from typing import Tuple, Type, Optional from package.codelists.calculation_type import ( CalculationType, ) -@dataclass -class CalculatorArgs: - calculation_id: str - calculation_grid_areas: list[str] - calculation_period_start_datetime: datetime - calculation_period_end_datetime: datetime - calculation_type: CalculationType - calculation_execution_time_start: datetime - created_by_user_id: str - time_zone: str - quarterly_resolution_transition_datetime: datetime - is_internal_calculation: bool +# @dataclass +# class CalculatorArgs: +# calculation_id: str +# calculation_grid_areas: list[str] +# calculation_period_start_datetime: datetime +# calculation_period_end_datetime: datetime +# calculation_type: CalculationType +# calculation_execution_time_start: datetime +# created_by_user_id: str +# time_zone: str +# quarterly_resolution_transition_datetime: datetime +# is_internal_calculation: bool + +import os + +os.environ["TIME_ZONE"] = 
"test_time_zone" + +class CalculatorArgs(BaseSettings): + """ + CalculatorArgs class uses Pydantic BaseSettings to configure and validate parameters. + Parameters can come from both runtime (CLI) or from environment variables. + The priority is CLI parameters first and then environment variables. + """ + calculation_id: Optional[str] = None + calculation_grid_areas: Optional[list[str]] = None + calculation_period_start_datetime: Optional[datetime] = None + calculation_period_end_datetime: Optional[datetime] = None + calculation_type: Optional[CalculationType] = None + calculation_execution_time_start: Optional[datetime] = None + created_by_user_id: Optional[str] = None + time_zone: Optional[str] = None + quarterly_resolution_transition_datetime: Optional[datetime] = None + is_internal_calculation: Optional[bool] = None + + @classmethod + def settings_customise_sources( + cls, + settings_cls: Type[BaseSettings], + init_settings: PydanticBaseSettingsSource, + env_settings: PydanticBaseSettingsSource, + dotenv_settings: PydanticBaseSettingsSource, + file_secret_settings: PydanticBaseSettingsSource, + ) -> Tuple[PydanticBaseSettingsSource, ...]: + return CliSettingsSource(settings_cls, cli_parse_args=True, cli_ignore_unknown_args=True), env_settings + + diff --git a/source/databricks/calculation_engine/package/calculator_job.py b/source/databricks/calculation_engine/package/calculator_job.py index 3272a48242..8b872da9fb 100644 --- a/source/databricks/calculation_engine/package/calculator_job.py +++ b/source/databricks/calculation_engine/package/calculator_job.py @@ -36,19 +36,59 @@ from package.infrastructure import initialize_spark from package.infrastructure.infrastructure_settings import InfrastructureSettings +from telemetry_logging import Logger, logging_configuration +from telemetry_logging.logging_configuration import configure_logging,LoggingSettings +from telemetry_logging.decorators import start_trace, use_span + # The start() method should only have its name updated in correspondence with the # wheels entry point for it. Further the method must remain parameterless because # it will be called from the entry point when deployed. + def start() -> None: - applicationinsights_connection_string = os.getenv( - "APPLICATIONINSIGHTS_CONNECTION_STRING" + + logging_settings = LoggingSettings() + + configure_logging( + logging_settings=logging_settings, + extras={'tracer_name': 'calculator-job'} #Tjek om denne skal kunne overskrives eller om den bare skal bruge subsystem ) - start_with_deps( - applicationinsights_connection_string=applicationinsights_connection_string + foo() + +@start_trace() +def foo() -> None: + spark = initialize_spark() + args = CalculatorArgs() #Lav class med BaseSettings + infrastructure_settings = InfrastructureSettings() #Lav class med BaseSettings + create_and_configure_container(spark, infrastructure_settings) + + prepared_data_reader = create_prepared_data_reader( + infrastructure_settings, spark ) + if not prepared_data_reader.is_calculation_id_unique(args.calculation_id): + raise Exception( + f"Calculation ID '{args.calculation_id}' is already used." 
+ ) + try: + calculation_executor( + args, + prepared_data_reader, + CalculationCore(), + CalculationMetadataService(), + CalculationOutputService(), + ) + + + + + + + +# -------------------------------------------------------- + + def start_with_deps( *, @@ -83,7 +123,7 @@ def start_with_deps( span.set_attributes(config.get_extras()) args, infrastructure_settings = parse_job_args(command_line_args) - +# -------------------------------------------------------------------------- spark = initialize_spark() create_and_configure_container(spark, infrastructure_settings) @@ -105,6 +145,8 @@ def start_with_deps( ) # Added as ConfigArgParse uses sys.exit() rather than raising exceptions + + # ------ TJEK at dette er håndteret i start_trace() except SystemExit as e: if e.code != 0: span_record_exception(e, span) diff --git a/source/databricks/calculation_engine/package/calculator_job_args.py b/source/databricks/calculation_engine/package/calculator_job_args.py index 1c6d629c7b..9c014b3d80 100644 --- a/source/databricks/calculation_engine/package/calculator_job_args.py +++ b/source/databricks/calculation_engine/package/calculator_job_args.py @@ -47,7 +47,7 @@ def parse_job_arguments( with logging_configuration.start_span("calculation.parse_job_arguments"): - time_zone = env_vars.get_time_zone() + # time_zone = env_vars.get_time_zone() quarterly_resolution_transition_datetime = ( env_vars.get_quarterly_resolution_transition_datetime() ) diff --git a/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py b/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py index b8a2ceeae5..3877484a73 100644 --- a/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py +++ b/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py @@ -13,19 +13,48 @@ # limitations under the License. from dataclasses import dataclass, field - +from pydantic_settings import BaseSettings, CliSettingsSource, PydanticBaseSettingsSource +from typing import Tuple, Type, Optional from azure.identity import ClientSecretCredential -@dataclass -class InfrastructureSettings: - catalog_name: str - calculation_input_database_name: str - data_storage_account_name: str - # Prevent the credentials from being printed or logged (using e.g. print() or repr()) - data_storage_account_credentials: ClientSecretCredential = field(repr=False) - wholesale_container_path: str - calculation_input_path: str - time_series_points_table_name: str | None - metering_point_periods_table_name: str | None - grid_loss_metering_point_ids_table_name: str | None +# @dataclass +# class InfrastructureSettings: +# catalog_name: str +# calculation_input_database_name: str +# data_storage_account_name: str +# # Prevent the credentials from being printed or logged (using e.g. print() or repr()) +# data_storage_account_credentials: ClientSecretCredential = field(repr=False) +# wholesale_container_path: str +# calculation_input_path: str +# time_series_points_table_name: str | None +# metering_point_periods_table_name: str | None +# grid_loss_metering_point_ids_table_name: str | None + +class InfrastructureSettings(BaseSettings): + """ + InfrastructureSettings class uses Pydantic BaseSettings to configure and validate parameters. + Parameters can come from both runtime (CLI) or from environment variables. + The priority is CLI parameters first and then environment variables. 
+ """ + catalog_name: Optional[str] = None + calculation_input_database_name: Optional[str] = None + data_storage_account_name: Optional[str] = None + # Prevent the credentials from being printed or logged (using e.g., print() or repr()) + data_storage_account_credentials: Optional[ClientSecretCredential] = field(default=None, repr=False) + wholesale_container_path: Optional[str] = None + calculation_input_path: Optional[str] = None + time_series_points_table_name: Optional[str] = None + metering_point_periods_table_name: Optional[str] = None + grid_loss_metering_point_ids_table_name: Optional[str] = None + + @classmethod + def settings_customise_sources( + cls, + settings_cls: Type[BaseSettings], + init_settings: PydanticBaseSettingsSource, + env_settings: PydanticBaseSettingsSource, + dotenv_settings: PydanticBaseSettingsSource, + file_secret_settings: PydanticBaseSettingsSource, + ) -> Tuple[PydanticBaseSettingsSource, ...]: + return CliSettingsSource(settings_cls, cli_parse_args=True, cli_ignore_unknown_args=True), env_settings From 0bfdfc8342b3866558114b57f7a5b24722e26553 Mon Sep 17 00:00:00 2001 From: Thomas Erland Clausen Date: Tue, 4 Feb 2025 14:42:46 +0100 Subject: [PATCH 3/5] changing calculator start and param fetching --- .docker/requirements.txt | 2 +- .../calculation-job-parameters-reference.txt | 2 + ...l-calculation-job-parameters-reference.txt | 3 + .../package/calculation/calculator_args.py | 37 ++- .../package/calculator_job.py | 265 ++++++++++++------ .../package/calculator_job_args.py | 4 +- .../infrastructure/environment_variables.py | 1 + .../infrastructure/infrastructure_settings.py | 72 ++++- .../tests/calculator_job/test_start.py | 53 +++- .../calculation_engine/tests/conftest.py | 44 ++- 10 files changed, 358 insertions(+), 125 deletions(-) diff --git a/.docker/requirements.txt b/.docker/requirements.txt index afafe17f52..dd00182de0 100644 --- a/.docker/requirements.txt +++ b/.docker/requirements.txt @@ -43,4 +43,4 @@ pydantic-settings==2.7.1 # Custom packages opengeh-spark-sql-migrations @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.4.2#subdirectory=source/spark_sql_migrations # opengeh-telemetry @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.4.2#subdirectory=source/telemetry -opengeh-telemetry @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages.git@cbang/simplify_entry_point#subdirectory=source/telemetry +opengeh-telemetry @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages.git@cbang/simplify_entry_point2#subdirectory=source/telemetry diff --git a/source/databricks/calculation_engine/contracts/calculation-job-parameters-reference.txt b/source/databricks/calculation_engine/contracts/calculation-job-parameters-reference.txt index 97962cbf31..0fcb6ced6d 100644 --- a/source/databricks/calculation_engine/contracts/calculation-job-parameters-reference.txt +++ b/source/databricks/calculation_engine/contracts/calculation-job-parameters-reference.txt @@ -13,3 +13,5 @@ --created-by-user-id=c2975345-d935-44a2-b7bf-2629db4aa8bf # It's a flag: True if present, otherwise false #--is-internal-calculation +--orchestration_instance_id=4a540892-2c0a-46a9-9257-c4e13051d76a +--force_configuration = false diff --git a/source/databricks/calculation_engine/contracts/internal-calculation-job-parameters-reference.txt b/source/databricks/calculation_engine/contracts/internal-calculation-job-parameters-reference.txt index 5ed235ab36..be0728e7cf 100644 --- 
a/source/databricks/calculation_engine/contracts/internal-calculation-job-parameters-reference.txt +++ b/source/databricks/calculation_engine/contracts/internal-calculation-job-parameters-reference.txt @@ -13,3 +13,6 @@ --created-by-user-id=c2975345-d935-44a2-b7bf-2629db4aa8bf # It's a flag: True if present, otherwise false --is-internal-calculation +--orchestration_instance_id=4a540892-2c0a-46a9-9257-c4e13051d76a +--force_configuration = false + diff --git a/source/databricks/calculation_engine/package/calculation/calculator_args.py b/source/databricks/calculation_engine/package/calculation/calculator_args.py index c482ccdb66..5bccc126dc 100644 --- a/source/databricks/calculation_engine/package/calculation/calculator_args.py +++ b/source/databricks/calculation_engine/package/calculation/calculator_args.py @@ -13,12 +13,14 @@ # limitations under the License. from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timezone from pydantic_settings import BaseSettings, CliSettingsSource, PydanticBaseSettingsSource +from pydantic.dataclasses import Field from typing import Tuple, Type, Optional from package.codelists.calculation_type import ( CalculationType, ) +import os # @dataclass @@ -34,9 +36,6 @@ # quarterly_resolution_transition_datetime: datetime # is_internal_calculation: bool -import os - -os.environ["TIME_ZONE"] = "test_time_zone" class CalculatorArgs(BaseSettings): """ @@ -44,16 +43,16 @@ class CalculatorArgs(BaseSettings): Parameters can come from both runtime (CLI) or from environment variables. The priority is CLI parameters first and then environment variables. """ - calculation_id: Optional[str] = None - calculation_grid_areas: Optional[list[str]] = None - calculation_period_start_datetime: Optional[datetime] = None - calculation_period_end_datetime: Optional[datetime] = None - calculation_type: Optional[CalculationType] = None - calculation_execution_time_start: Optional[datetime] = None - created_by_user_id: Optional[str] = None - time_zone: Optional[str] = None - quarterly_resolution_transition_datetime: Optional[datetime] = None - is_internal_calculation: Optional[bool] = None + calculation_id: str = Field(..., alias="calculation-id") #From CLI + calculation_grid_areas: list[str] = Field(..., alias="grid-areas") #From CLI + calculation_period_start_datetime: datetime = Field(..., alias="period-start-datetime") #From CLI + calculation_period_end_datetime: datetime = Field(..., alias="period-end-datetime") #From CLI + calculation_type: CalculationType = Field(..., alias="calculation-type") #From CLI + calculation_execution_time_start: datetime = datetime.now(timezone.utc) + created_by_user_id: str = Field(..., alias="created-by-user-id") #From CLI + time_zone: str #From ENVIRONMENT + quarterly_resolution_transition_datetime: datetime #From ENVIRONMENT + is_internal_calculation: Optional[bool] = Field(default=False, alias="is-internal-calculation") #From CLI @classmethod def settings_customise_sources( @@ -64,6 +63,14 @@ def settings_customise_sources( dotenv_settings: PydanticBaseSettingsSource, file_secret_settings: PydanticBaseSettingsSource, ) -> Tuple[PydanticBaseSettingsSource, ...]: - return CliSettingsSource(settings_cls, cli_parse_args=True, cli_ignore_unknown_args=True), env_settings + return CliSettingsSource(settings_cls, cli_parse_args=True, cli_ignore_unknown_args=True), env_settings, init_settings + +# os.environ['TIME_ZONE'] = 'tzutc' +# os.environ['QUARTERLY_RESOLUTION_TRANSITION_DATETIME'] = '1989-01-01' +# +# 
_settings = CalculatorArgs() +# # +# for s in _settings: +# print(s) diff --git a/source/databricks/calculation_engine/package/calculator_job.py b/source/databricks/calculation_engine/package/calculator_job.py index 8b872da9fb..2ad9350165 100644 --- a/source/databricks/calculation_engine/package/calculator_job.py +++ b/source/databricks/calculation_engine/package/calculator_job.py @@ -16,7 +16,7 @@ import sys from argparse import Namespace from typing import Callable, Tuple - +from datetime import datetime from opentelemetry.trace import SpanKind from pyspark.sql import SparkSession @@ -27,10 +27,6 @@ from package.calculation.calculation_metadata_service import CalculationMetadataService from package.calculation.calculation_output_service import CalculationOutputService from package.calculation.calculator_args import CalculatorArgs -from package.calculator_job_args import ( - parse_job_arguments, - parse_command_line_arguments, -) from package.container import create_and_configure_container from package.databases import migrations_wholesale, wholesale_internal from package.infrastructure import initialize_spark @@ -38,29 +34,74 @@ from telemetry_logging import Logger, logging_configuration from telemetry_logging.logging_configuration import configure_logging,LoggingSettings -from telemetry_logging.decorators import start_trace, use_span +from telemetry_logging.decorators import start_trace +from package.common.datetime_utils import ( + is_exactly_one_calendar_month, + is_midnight_in_time_zone, +) +from package.codelists.calculation_type import ( + CalculationType, + is_wholesale_calculation_type, +) + # The start() method should only have its name updated in correspondence with the # wheels entry point for it. Further the method must remain parameterless because # it will be called from the entry point when deployed. 
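For reference, the following minimal sketch (not part of this patch) shows the CLI-over-environment resolution pattern that the reworked start() below relies on. It assumes pydantic-settings 2.x; the DemoSettings class, its fields, and the example flag values are illustrative only and do not exist in the repository.

from typing import Optional, Tuple, Type

from pydantic import Field
from pydantic_settings import BaseSettings, CliSettingsSource, PydanticBaseSettingsSource


class DemoSettings(BaseSettings):
    # Supplied on the command line as --calculation-id (hypothetical field for illustration).
    calculation_id: Optional[str] = Field(default=None, alias="calculation-id")
    # Supplied as the TIME_ZONE environment variable (hypothetical field for illustration).
    time_zone: Optional[str] = None

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: Type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> Tuple[PydanticBaseSettingsSource, ...]:
        # The returned order defines the priority: CLI arguments win over environment variables.
        cli_settings = CliSettingsSource(
            settings_cls, cli_parse_args=True, cli_ignore_unknown_args=True
        )
        return cli_settings, env_settings


if __name__ == "__main__":
    # Example invocation: TIME_ZONE=Europe/Copenhagen python demo.py --calculation-id some-id
    print(DemoSettings())

With cli_ignore_unknown_args=True, flags not declared on the class are silently skipped, presumably so orchestration-level flags such as --orchestration_instance_id (consumed by LoggingSettings) can be passed on the same command line without every settings class declaring them.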
+ + def start() -> None: - logging_settings = LoggingSettings() + # Parse params for LoggingSettings + logging_settings = LoggingSettings( + cloud_role_name = "dbr-calculation-engine", + subsystem = "calculator-job" #Will be used as trace_name + ) + # Parse params for CalculatorArgs and InfrastructureSettings + args = CalculatorArgs() + infrastructure_settings = InfrastructureSettings() + + # _validate_quarterly_resolution_transition_datetime( + # args.quarterly_resolution_transition_datetime, + # args.time_zone, + # args.period_start_datetime, + # args.period_end_datetime, + # ) + # + # if is_wholesale_calculation_type(args.calculation_type): + # _validate_period_for_wholesale_calculation( + # args.time_zone, + # args.period_start_datetime, + # args.period_end_datetime, + # ) + # + # _throw_exception_if_internal_calculation_and_not_aggregation_calculation_type( + # args + # ) + configure_logging( logging_settings=logging_settings, - extras={'tracer_name': 'calculator-job'} #Tjek om denne skal kunne overskrives eller om den bare skal bruge subsystem + extras = dict(Subsystem = "wholesale-aggregations", + calculation_id = args.calculation_id) + # extras={'tracer_name': 'calculator-job'} ) - foo() + start_with_deps(args, infrastructure_settings) + + + + @start_trace() -def foo() -> None: +def start_with_deps( + args: CalculatorArgs, + infrastructure_settings: InfrastructureSettings, +) -> None: spark = initialize_spark() - args = CalculatorArgs() #Lav class med BaseSettings - infrastructure_settings = InfrastructureSettings() #Lav class med BaseSettings + create_and_configure_container(spark, infrastructure_settings) prepared_data_reader = create_prepared_data_reader( @@ -71,90 +112,85 @@ def foo() -> None: raise Exception( f"Calculation ID '{args.calculation_id}' is already used." ) - try: - calculation_executor( - args, - prepared_data_reader, - CalculationCore(), - CalculationMetadataService(), - CalculationOutputService(), - ) - - - - + calculation.execute( + args, + prepared_data_reader, + CalculationCore(), + CalculationMetadataService(), + CalculationOutputService(), + ) # -------------------------------------------------------- -def start_with_deps( - *, - cloud_role_name: str = "dbr-calculation-engine", - applicationinsights_connection_string: str | None = None, - parse_command_line_args: Callable[..., Namespace] = parse_command_line_arguments, - parse_job_args: Callable[ - ..., Tuple[CalculatorArgs, InfrastructureSettings] - ] = parse_job_arguments, - calculation_executor: Callable[..., None] = calculation.execute, -) -> None: - """Start overload with explicit dependencies for easier testing.""" - - config.configure_logging( - cloud_role_name=cloud_role_name, - tracer_name="calculator-job", - applicationinsights_connection_string=applicationinsights_connection_string, - extras={"Subsystem": "wholesale-aggregations"}, - ) - - with config.get_tracer().start_as_current_span( - __name__, kind=SpanKind.SERVER - ) as span: - # Try/except added to enable adding custom fields to the exception as - # the span attributes do not appear to be included in the exception. - try: - # The command line arguments are parsed to have necessary information for coming log messages - command_line_args = parse_command_line_args() - - # Add calculation_id to structured logging data to be included in every log message. 
- config.add_extras({"calculation_id": command_line_args.calculation_id}) - span.set_attributes(config.get_extras()) - - args, infrastructure_settings = parse_job_args(command_line_args) -# -------------------------------------------------------------------------- - spark = initialize_spark() - create_and_configure_container(spark, infrastructure_settings) - - prepared_data_reader = create_prepared_data_reader( - infrastructure_settings, spark - ) - - if not prepared_data_reader.is_calculation_id_unique(args.calculation_id): - raise Exception( - f"Calculation ID '{args.calculation_id}' is already used." - ) - - calculation_executor( - args, - prepared_data_reader, - CalculationCore(), - CalculationMetadataService(), - CalculationOutputService(), - ) - - # Added as ConfigArgParse uses sys.exit() rather than raising exceptions - - # ------ TJEK at dette er håndteret i start_trace() - except SystemExit as e: - if e.code != 0: - span_record_exception(e, span) - sys.exit(e.code) - - except Exception as e: - span_record_exception(e, span) - sys.exit(4) +# def start_with_deps( +# *, +# cloud_role_name: str = "dbr-calculation-engine", +# applicationinsights_connection_string: str | None = None, +# parse_command_line_args: Callable[..., Namespace] = parse_command_line_arguments, +# parse_job_args: Callable[ +# ..., Tuple[CalculatorArgs, InfrastructureSettings] +# ] = parse_job_arguments, +# calculation_executor: Callable[..., None] = calculation.execute, +# ) -> None: +# """Start overload with explicit dependencies for easier testing.""" +# +# config.configure_logging( +# cloud_role_name=cloud_role_name, +# tracer_name="calculator-job", +# applicationinsights_connection_string=applicationinsights_connection_string, +# extras={"Subsystem": "wholesale-aggregations"}, +# ) +# +# with config.get_tracer().start_as_current_span( +# __name__, kind=SpanKind.SERVER +# ) as span: +# # Try/except added to enable adding custom fields to the exception as +# # the span attributes do not appear to be included in the exception. +# try: +# # The command line arguments are parsed to have necessary information for coming log messages +# command_line_args = parse_command_line_args() +# +# # Add calculation_id to structured logging data to be included in every log message. +# config.add_extras({"calculation_id": command_line_args.calculation_id}) +# span.set_attributes(config.get_extras()) +# +# args, infrastructure_settings = parse_job_args(command_line_args) +# # -------------------------------------------------------------------------- +# spark = initialize_spark() +# create_and_configure_container(spark, infrastructure_settings) +# +# prepared_data_reader = create_prepared_data_reader( +# infrastructure_settings, spark +# ) +# +# if not prepared_data_reader.is_calculation_id_unique(args.calculation_id): +# raise Exception( +# f"Calculation ID '{args.calculation_id}' is already used." 
+# ) +# +# calculation_executor( +# args, +# prepared_data_reader, +# CalculationCore(), +# CalculationMetadataService(), +# CalculationOutputService(), +# ) +# +# # Added as ConfigArgParse uses sys.exit() rather than raising exceptions +# +# # ------ TJEK at dette er håndteret i start_trace() +# except SystemExit as e: +# if e.code != 0: +# span_record_exception(e, span) +# sys.exit(e.code) +# +# except Exception as e: +# span_record_exception(e, span) +# sys.exit(4) def create_prepared_data_reader( @@ -182,3 +218,54 @@ def create_prepared_data_reader( migrations_wholesale_repository, wholesale_internal_repository ) return prepared_data_reader + + +def _validate_quarterly_resolution_transition_datetime( + quarterly_resolution_transition_datetime: datetime, + time_zone: str, + calculation_period_start_datetime: datetime, + calculation_period_end_datetime: datetime, +) -> None: + if ( + is_midnight_in_time_zone(quarterly_resolution_transition_datetime, time_zone) + is False + ): + raise Exception( + f"The quarterly resolution transition datetime must be at midnight local time ({time_zone})." + ) + if ( + calculation_period_start_datetime + < quarterly_resolution_transition_datetime + < calculation_period_end_datetime + ): + raise Exception( + "The calculation period must not cross the quarterly resolution transition datetime." + ) + + +def _validate_period_for_wholesale_calculation( + time_zone: str, + calculation_period_start_datetime: datetime, + calculation_period_end_datetime: datetime, +) -> None: + is_valid_period = is_exactly_one_calendar_month( + calculation_period_start_datetime, + calculation_period_end_datetime, + time_zone, + ) + + if not is_valid_period: + raise Exception( + f"The calculation period for wholesale calculation types must be a full month starting and ending at midnight local time ({time_zone}))." + ) + + +def _throw_exception_if_internal_calculation_and_not_aggregation_calculation_type( + calculator_args: CalculatorArgs, +) -> None: + if ( + calculator_args.is_internal_calculation + and calculator_args.calculation_type != CalculationType.AGGREGATION + ): + raise Exception("Internal calculations must be of type AGGREGATION. 
") + diff --git a/source/databricks/calculation_engine/package/calculator_job_args.py b/source/databricks/calculation_engine/package/calculator_job_args.py index 9c014b3d80..248b4f5316 100644 --- a/source/databricks/calculation_engine/package/calculator_job_args.py +++ b/source/databricks/calculation_engine/package/calculator_job_args.py @@ -47,10 +47,12 @@ def parse_job_arguments( with logging_configuration.start_span("calculation.parse_job_arguments"): - # time_zone = env_vars.get_time_zone() + time_zone = env_vars.get_time_zone() quarterly_resolution_transition_datetime = ( env_vars.get_quarterly_resolution_transition_datetime() ) + calculator_args = CalculatorArgs() + _validate_quarterly_resolution_transition_datetime( quarterly_resolution_transition_datetime, diff --git a/source/databricks/calculation_engine/package/infrastructure/environment_variables.py b/source/databricks/calculation_engine/package/infrastructure/environment_variables.py index 85d9cef4e8..fe42f43d21 100644 --- a/source/databricks/calculation_engine/package/infrastructure/environment_variables.py +++ b/source/databricks/calculation_engine/package/infrastructure/environment_variables.py @@ -33,6 +33,7 @@ class EnvironmentVariable(Enum): QUARTERLY_RESOLUTION_TRANSITION_DATETIME = ( "QUARTERLY_RESOLUTION_TRANSITION_DATETIME" ) + CLOUD_ROLE_NAME = "CLOUD_ROLE_NAME" def get_storage_account_credential() -> ClientSecretCredential: diff --git a/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py b/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py index 3877484a73..80a2920d7e 100644 --- a/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py +++ b/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py @@ -11,12 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import os from dataclasses import dataclass, field from pydantic_settings import BaseSettings, CliSettingsSource, PydanticBaseSettingsSource +from pydantic import Field from typing import Tuple, Type, Optional from azure.identity import ClientSecretCredential - +from package.infrastructure import paths # @dataclass # class InfrastructureSettings: @@ -37,16 +38,35 @@ class InfrastructureSettings(BaseSettings): Parameters can come from both runtime (CLI) or from environment variables. The priority is CLI parameters first and then environment variables. """ - catalog_name: Optional[str] = None - calculation_input_database_name: Optional[str] = None - data_storage_account_name: Optional[str] = None + catalog_name: str #From ENVIRONMENT + calculation_input_database_name: str #From ENVIRONMENT + data_storage_account_name: str #From ENVIRONMENT # Prevent the credentials from being printed or logged (using e.g., print() or repr()) - data_storage_account_credentials: Optional[ClientSecretCredential] = field(default=None, repr=False) - wholesale_container_path: Optional[str] = None - calculation_input_path: Optional[str] = None - time_series_points_table_name: Optional[str] = None - metering_point_periods_table_name: Optional[str] = None - grid_loss_metering_point_ids_table_name: Optional[str] = None + tenant_id: Optional[str] = Field(repr=False) #Ud? #From ENVIRONMENT + spn_app_id: Optional[str] = Field(repr=False) #Ud? #From ENVIRONMENT + spn_app_secret: Optional[str] = Field(repr=False) #Ud? 
#From ENVIRONMENT + + calculation_input_folder_name: Optional[str] = Field(default=None) #Ud? From CLI/ENVIRONMENT + + time_series_points_table_name: Optional[str] = None #From CLI + metering_point_periods_table_name: Optional[str] = None #From CLI + grid_loss_metering_point_ids_table_name: Optional[str] = Field(default=None, alias="grid_loss_metering_points_table_name") #From CLI + + data_storage_account_credentials: Optional[ClientSecretCredential] = Field(default=None, repr=False) # Filled out in model_post_init + wholesale_container_path: Optional[str] = Field(default=None) # Filled out in model_post_init + calculation_input_path: Optional[str] = Field(default=None) # Filled out in model_post_init + + + def model_post_init(self, __context): + """Automatically set data_storage_account_credentials after settings are loaded.""" + self.data_storage_account_credentials = ClientSecretCredential( + tenant_id=str(self.tenant_id), + client_id=str(self.spn_app_id), + client_secret=str(self.spn_app_secret), + ) + self.wholesale_container_path = paths.get_container_root_path(str(self.data_storage_account_name)) + self.calculation_input_path = paths.get_calculation_input_path(str(self.data_storage_account_name), str(self.calculation_input_folder_name)) + @classmethod def settings_customise_sources( @@ -57,4 +77,32 @@ def settings_customise_sources( dotenv_settings: PydanticBaseSettingsSource, file_secret_settings: PydanticBaseSettingsSource, ) -> Tuple[PydanticBaseSettingsSource, ...]: - return CliSettingsSource(settings_cls, cli_parse_args=True, cli_ignore_unknown_args=True), env_settings + return CliSettingsSource(settings_cls, cli_parse_args=True, cli_ignore_unknown_args=True), env_settings, init_settings + + + + + + + +# os.environ['CATALOG_NAME'] = 'catalog_name_str' +# os.environ['CALCULATION_INPUT_DATABASE_NAME'] = 'calculation_input_database_name_str' +# os.environ['DATA_STORAGE_ACCOUNT_NAME'] = 'data_storage_account_name_str' +# os.environ['TENANT_ID'] = '550e8400-e29b-41d4-a716-446655440000' +# os.environ['SPN_APP_ID'] = '123e4567-e89b-12d3-a456-426614174000' +# os.environ['SPN_APP_SECRET'] = 'MyPassword~HQ' +# os.environ['CALCULATION_INPUT_FOLDER_NAME'] = 'calculation_input_folder_name_env' +# +# +# +# +# +# print(_settings.catalog_name) +# # +# for s in _settings: +# print(s) +# # +# print(isinstance(_settings.data_storage_account_credentials, ClientSecretCredential)) +# # # Inspect the attributes of the ClientSecretCredential object +# print(vars(_settings.data_storage_account_credentials)) + diff --git a/source/databricks/calculation_engine/tests/calculator_job/test_start.py b/source/databricks/calculation_engine/tests/calculator_job/test_start.py index 2fb3f799e8..1ab59b1084 100644 --- a/source/databricks/calculation_engine/tests/calculator_job/test_start.py +++ b/source/databricks/calculation_engine/tests/calculator_job/test_start.py @@ -17,23 +17,72 @@ import uuid from datetime import timedelta from typing import cast, Callable +from unittest import mock from unittest.mock import Mock, patch + import pytest from azure.monitor.query import LogsQueryClient, LogsQueryResult from package.calculation.calculator_args import CalculatorArgs -from package.calculator_job import start, start_with_deps +from package.calculator_job import start from package.infrastructure.infrastructure_settings import InfrastructureSettings from tests.integration_test_configuration import IntegrationTestConfiguration +from telemetry_logging.logging_configuration import configure_logging, LoggingSettings + +def 
test_start() -> None: + env_args = { + "CLOUD_ROLE_NAME": "test_role", + "APPLICATIONINSIGHTS_CONNECTION_STRING": "connection_string", + "SUBSYSTEM": "test_subsystem", + "CATALOG_NAME": "default_hadoop", + "TIME_ZONE": "Europe/Copenhagen", + "CALCULATION_INPUT_DATABASE_NAME": 'calculation_input_database_name_str', + "DATA_STORAGE_ACCOUNT_NAME": 'data_storage_account_name_str', + "TENANT_ID": '550e8400-e29b-41d4-a716-446655440000', + "SPN_APP_ID": '123e4567-e89b-12d3-a456-426614174000', + "SPN_APP_SECRET": 'MyPassword~HQ', + "QUARTERLY_RESOLUTION_TRANSITION_DATETIME": "2019-12-04" + } + + sys_args = [ + "program_name", + "--force-configuration", "false", + "--orchestration-instance-id", "4a540892-2c0a-46a9-9257-c4e13051d76a", + "--calculation-id", "runID123", + "--grid-areas", "[gridarea1, gridarea2]", + "--period-start-datetime", "2024-01-30T08:00:00Z", + "--period-end-datetime", "2024-01-31T08:00:00Z", + "--calculation-type", "wholesale_fixing", + "--created-by-user-id", "userid123", + "--is-internal-calculation", "true", + "--calculation_input_folder_name", "calculation_input_folder_name_str", + "--time_series_points_table_name", "time_series_points_table_name_str", + "--metering_point_periods_table_name", "metering_point_periods_table_name_str", + "--grid_loss_metering_points_table_name", "grid_loss_metering_point_ids_table_name_str" + + ] + with ( + mock.patch('sys.argv', sys_args), + mock.patch.dict('os.environ', env_args, clear=False), + # mock.patch("package.calculator_job.start.CalculatorArgs") as mock_CalculatorArgs, + # mock.patch("telemetry_logging.logging_configuration.LoggingSettings") as mock_logging_settings, + # mock.patch("telemetry_logging.logging_configuration.configure_logging") as mock_configure_logging, + # mock.patch( + # "package.calculator_job.start.start_with_deps" + # ) as mock_start_with_deps, + ): + # start() + # print(mock_CalculatorArgs.return_value) + assert 1==1 +# -------------------------------------------------------------------------------------------------------- class TestWhenInvokedWithInvalidArguments: def test_exits_with_code_2(self) -> None: """The exit code 2 originates from the argparse library.""" with pytest.raises(SystemExit) as system_exit: start() - assert system_exit.value.code == 2 diff --git a/source/databricks/calculation_engine/tests/conftest.py b/source/databricks/calculation_engine/tests/conftest.py index 545f7a9a9d..f4d12242f3 100644 --- a/source/databricks/calculation_engine/tests/conftest.py +++ b/source/databricks/calculation_engine/tests/conftest.py @@ -23,6 +23,7 @@ from datetime import datetime from pathlib import Path from typing import Generator, Callable, Optional +from unittest import mock import pytest import telemetry_logging.logging_configuration as config @@ -31,6 +32,7 @@ from delta import configure_spark_with_delta_pip from pyspark.sql import SparkSession from pyspark.sql.types import StructType +from telemetry_logging.logging_configuration import LoggingSettings import tests.helpers.spark_sql_migration_helper as sql_migration_helper from package.calculation.calculator_args import CalculatorArgs @@ -55,6 +57,21 @@ ) +# @pytest.fixture(scope="session") +# def setup_params() -> None: +# sys_args = [ +# 'program_name', +# '--orchestration_instance_id', '4a540892-2c0a-46a9-9257-c4e13051d76a', +# '--force_configuration', 'true' +# ] +# env_args = { +# 'CLOUD_ROLE_NAME': 'cloud_role_name_value', +# 'APPLICATIONINSIGHTS_CONNECTION_STRING': 'applicationinsights_connection_string_value', +# 'SUBSYSTEM': 'subsystem_value' +# } 
+# with mock.patch.dict(os.environ, env_args), mock.patch('sys.argv', sys_args): + +# ----------------------------------------------------------------------------- @pytest.fixture(scope="session") def test_files_folder_path(tests_path: str) -> str: return f"{tests_path}/test_files" @@ -428,15 +445,32 @@ def grid_loss_metering_point_ids_input_data_written_to_delta( ) -@pytest.fixture(scope="session", autouse=True) +@pytest.fixture(scope="session", autouse=False) def configure_logging() -> None: """ Configures the logging initially. """ - config.configure_logging( - cloud_role_name="dbr-calculation-engine-tests", - tracer_name="unit-tests", - ) + + logging_env_args = { + 'CLOUD_ROLE_NAME': 'dbr-calculation-engine-tests', + 'APPLICATIONINSIGHTS_CONNECTION_STRING': 'temp_string', + 'SUBSYSTEM': 'unit-tests' + } + + logging_sys_args = [ + 'program_name', + '--orchestration_instance_id', '4a540892-2c0a-46a9-9257-c4e13051d76a', + '--force_configuration', 'false' + ] + + with (mock.patch('sys.argv', logging_sys_args), + mock.patch.dict('os.environ', logging_env_args, clear=False)): + logging_settings = config.LoggingSettings() + logging_settings.applicationinsights_connection_string = None + config.configure_logging( + logging_settings = logging_settings, + extras = dict(orchestration_instance_id = logging_settings.orchestration_instance_id), + ) @pytest.fixture(scope="session") From 7a6019b9c670ed7336cbf8a4e80257b90d845097 Mon Sep 17 00:00:00 2001 From: chba Date: Tue, 4 Feb 2025 15:46:57 +0100 Subject: [PATCH 4/5] fixed test_start --- .../infrastructure/infrastructure_settings.py | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py b/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py index 80a2920d7e..4141f37633 100644 --- a/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py +++ b/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py @@ -42,31 +42,31 @@ class InfrastructureSettings(BaseSettings): calculation_input_database_name: str #From ENVIRONMENT data_storage_account_name: str #From ENVIRONMENT # Prevent the credentials from being printed or logged (using e.g., print() or repr()) - tenant_id: Optional[str] = Field(repr=False) #Ud? #From ENVIRONMENT - spn_app_id: Optional[str] = Field(repr=False) #Ud? #From ENVIRONMENT - spn_app_secret: Optional[str] = Field(repr=False) #Ud? #From ENVIRONMENT + tenant_id: Optional[str] = Field(repr=False, default="tenant_id") #Ud? #From ENVIRONMENT + spn_app_id: Optional[str] = Field(repr=False, default="spn_app_id") #Ud? #From ENVIRONMENT + spn_app_secret: Optional[str] = Field(repr=False, default="spn_app_secret") #Ud? #From ENVIRONMENT calculation_input_folder_name: Optional[str] = Field(default=None) #Ud? 
From CLI/ENVIRONMENT time_series_points_table_name: Optional[str] = None #From CLI - metering_point_periods_table_name: Optional[str] = None #From CLI - grid_loss_metering_point_ids_table_name: Optional[str] = Field(default=None, alias="grid_loss_metering_points_table_name") #From CLI + metering_point_periods_table_name: str | None = Field(default=None) #From CLI + grid_loss_metering_point_ids_table_name: str | None = Field(default=None) #From CLI data_storage_account_credentials: Optional[ClientSecretCredential] = Field(default=None, repr=False) # Filled out in model_post_init wholesale_container_path: Optional[str] = Field(default=None) # Filled out in model_post_init calculation_input_path: Optional[str] = Field(default=None) # Filled out in model_post_init - def model_post_init(self, __context): - """Automatically set data_storage_account_credentials after settings are loaded.""" - self.data_storage_account_credentials = ClientSecretCredential( - tenant_id=str(self.tenant_id), - client_id=str(self.spn_app_id), - client_secret=str(self.spn_app_secret), - ) - self.wholesale_container_path = paths.get_container_root_path(str(self.data_storage_account_name)) - self.calculation_input_path = paths.get_calculation_input_path(str(self.data_storage_account_name), str(self.calculation_input_folder_name)) - + # def model_post_init(self, __context): + # """Automatically set data_storage_account_credentials after settings are loaded.""" + # self.data_storage_account_credentials = ClientSecretCredential( + # tenant_id=str(self.tenant_id), + # client_id=str(self.spn_app_id), + # client_secret=str(self.spn_app_secret), + # ) + # self.wholesale_container_path = paths.get_container_root_path(str(self.data_storage_account_name)) + # self.calculation_input_path = paths.get_calculation_input_path(str(self.data_storage_account_name), str(self.calculation_input_folder_name)) + # @classmethod def settings_customise_sources( From a39151ed58a806d4c54eb05cf1b19abe35cd8c9f Mon Sep 17 00:00:00 2001 From: chba Date: Tue, 4 Feb 2025 16:29:10 +0100 Subject: [PATCH 5/5] fixed patches for conftest --- .../package/calculator_job.py | 24 ++++++++--------- .../infrastructure/infrastructure_settings.py | 26 +++++++++---------- .../tests/calculator_job/test_start.py | 7 ++--- 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/source/databricks/calculation_engine/package/calculator_job.py b/source/databricks/calculation_engine/package/calculator_job.py index 2ad9350165..1a4ef95de1 100644 --- a/source/databricks/calculation_engine/package/calculator_job.py +++ b/source/databricks/calculation_engine/package/calculator_job.py @@ -55,10 +55,10 @@ def start() -> None: # Parse params for LoggingSettings - logging_settings = LoggingSettings( - cloud_role_name = "dbr-calculation-engine", - subsystem = "calculator-job" #Will be used as trace_name - ) + # logging_settings = LoggingSettings( + # cloud_role_name = "dbr-calculation-engine", + # subsystem = "calculator-job" #Will be used as trace_name + # ) # Parse params for CalculatorArgs and InfrastructureSettings args = CalculatorArgs() infrastructure_settings = InfrastructureSettings() @@ -82,14 +82,14 @@ def start() -> None: # ) - configure_logging( - logging_settings=logging_settings, - extras = dict(Subsystem = "wholesale-aggregations", - calculation_id = args.calculation_id) - # extras={'tracer_name': 'calculator-job'} - ) - - start_with_deps(args, infrastructure_settings) + # configure_logging( + # logging_settings=logging_settings, + # extras = dict(Subsystem = 
"wholesale-aggregations", + # calculation_id = args.calculation_id) + # # extras={'tracer_name': 'calculator-job'} + # ) + # + # start_with_deps(args, infrastructure_settings) diff --git a/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py b/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py index 4141f37633..9b76bc1a32 100644 --- a/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py +++ b/source/databricks/calculation_engine/package/infrastructure/infrastructure_settings.py @@ -42,9 +42,9 @@ class InfrastructureSettings(BaseSettings): calculation_input_database_name: str #From ENVIRONMENT data_storage_account_name: str #From ENVIRONMENT # Prevent the credentials from being printed or logged (using e.g., print() or repr()) - tenant_id: Optional[str] = Field(repr=False, default="tenant_id") #Ud? #From ENVIRONMENT - spn_app_id: Optional[str] = Field(repr=False, default="spn_app_id") #Ud? #From ENVIRONMENT - spn_app_secret: Optional[str] = Field(repr=False, default="spn_app_secret") #Ud? #From ENVIRONMENT + tenant_id: Optional[str] = Field(repr=False, default="foo") #Ud? #From ENVIRONMENT + spn_app_id: Optional[str] = Field(repr=False, default="foo") #Ud? #From ENVIRONMENT + spn_app_secret: Optional[str] = Field(repr=False, default="foo") #Ud? #From ENVIRONMENT calculation_input_folder_name: Optional[str] = Field(default=None) #Ud? From CLI/ENVIRONMENT @@ -57,16 +57,16 @@ class InfrastructureSettings(BaseSettings): calculation_input_path: Optional[str] = Field(default=None) # Filled out in model_post_init - # def model_post_init(self, __context): - # """Automatically set data_storage_account_credentials after settings are loaded.""" - # self.data_storage_account_credentials = ClientSecretCredential( - # tenant_id=str(self.tenant_id), - # client_id=str(self.spn_app_id), - # client_secret=str(self.spn_app_secret), - # ) - # self.wholesale_container_path = paths.get_container_root_path(str(self.data_storage_account_name)) - # self.calculation_input_path = paths.get_calculation_input_path(str(self.data_storage_account_name), str(self.calculation_input_folder_name)) - # + def model_post_init(self, __context): + """Automatically set data_storage_account_credentials after settings are loaded.""" + self.data_storage_account_credentials = ClientSecretCredential( + tenant_id=str(self.tenant_id), + client_id=str(self.spn_app_id), + client_secret=str(self.spn_app_secret), + ) + self.wholesale_container_path = paths.get_container_root_path(str(self.data_storage_account_name)) + self.calculation_input_path = paths.get_calculation_input_path(str(self.data_storage_account_name), str(self.calculation_input_folder_name)) + @classmethod def settings_customise_sources( diff --git a/source/databricks/calculation_engine/tests/calculator_job/test_start.py b/source/databricks/calculation_engine/tests/calculator_job/test_start.py index 1ab59b1084..82a8e7a74c 100644 --- a/source/databricks/calculation_engine/tests/calculator_job/test_start.py +++ b/source/databricks/calculation_engine/tests/calculator_job/test_start.py @@ -66,15 +66,16 @@ def test_start() -> None: with ( mock.patch('sys.argv', sys_args), mock.patch.dict('os.environ', env_args, clear=False), - # mock.patch("package.calculator_job.start.CalculatorArgs") as mock_CalculatorArgs, + #mock.patch("package.calculator_job.calculator_args.CalculatorArgs") as mock_CalculatorArgs, # mock.patch("telemetry_logging.logging_configuration.LoggingSettings") as 
mock_logging_settings, # mock.patch("telemetry_logging.logging_configuration.configure_logging") as mock_configure_logging, # mock.patch( # "package.calculator_job.start.start_with_deps" # ) as mock_start_with_deps, ): #start() #print(mock_CalculatorArgs.return_value) # mock_CalculatorArgs.assert_called_once() assert 1 == 1 # --------------------------------------------------------------------------------------------------------
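As a closing illustration (not part of the patches), here is a minimal sketch of the test approach used above: patching sys.argv and os.environ so a pydantic-settings class resolves deterministically inside a test. It assumes pydantic-settings 2.x and pytest; DemoSettings, its fields, and the values are hypothetical stand-ins for CalculatorArgs/InfrastructureSettings.

from typing import Optional, Tuple, Type
from unittest import mock

from pydantic import Field
from pydantic_settings import BaseSettings, CliSettingsSource, PydanticBaseSettingsSource


class DemoSettings(BaseSettings):
    # Hypothetical fields mirroring the CLI/environment split used by the real settings classes.
    calculation_id: Optional[str] = Field(default=None, alias="calculation-id")  # from the CLI
    catalog_name: Optional[str] = None  # from the CATALOG_NAME environment variable

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: Type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> Tuple[PydanticBaseSettingsSource, ...]:
        cli_settings = CliSettingsSource(
            settings_cls, cli_parse_args=True, cli_ignore_unknown_args=True
        )
        return cli_settings, env_settings


def test_settings_resolve_from_patched_cli_and_environment() -> None:
    sys_args = ["program_name", "--calculation-id", "runID123", "--force_configuration", "false"]
    env_args = {"CATALOG_NAME": "default_hadoop"}
    with (
        mock.patch("sys.argv", sys_args),
        mock.patch.dict("os.environ", env_args, clear=False),
    ):
        settings = DemoSettings()
    assert settings.calculation_id == "runID123"  # CLI value wins
    assert settings.catalog_name == "default_hadoop"  # falls back to the environment
    # --force_configuration is not declared on DemoSettings and is ignored because of cli_ignore_unknown_args=True.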