diff --git a/.docker/Dockerfile b/.docker/Dockerfile index 70a7796b12..ac0bcfc8bc 100644 --- a/.docker/Dockerfile +++ b/.docker/Dockerfile @@ -14,7 +14,8 @@ # The spark version should follow the spark version in databricks. # The databricks version of spark is controlled from dh3-infrastructure and uses latest LTS (ATTOW - Spark v3.5.0) -FROM ghcr.io/energinet-datahub/pyspark-slim:3.5.1-3 +# pyspark-slim version should match pyspark version in requirements.txt +FROM ghcr.io/energinet-datahub/pyspark-slim:3.5.1-5 SHELL ["/bin/bash", "-o", "pipefail", "-c"] diff --git a/.docker/requirements.txt b/.docker/requirements.txt index b26300002e..e09f1755a8 100644 --- a/.docker/requirements.txt +++ b/.docker/requirements.txt @@ -18,21 +18,21 @@ pytest-xdist # databricks-cli==0.18 dataclasses-json==0.6.7 -delta-spark==3.1.0 -dependency_injector==4.41.0 -azure-identity==1.19.0 -azure-keyvault-secrets==4.9.0 +delta-spark==3.2.0 +pyspark==3.5.1 +dependency_injector==4.43.0 +azure-identity==1.17.1 +azure-keyvault-secrets==4.8.0 azure-monitor-opentelemetry==1.6.4 azure-core==1.32.0 azure-monitor-query==1.4.0 -azure-storage-file-datalake==12.11.0 -opengeh-spark-sql-migrations @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@1.9.0#subdirectory=source/spark_sql_migrations -python-dateutil==2.9.0.post0 +opengeh-spark-sql-migrations @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.4.1#subdirectory=source/spark_sql_migrations +python-dateutil==2.8.2 types-python-dateutil==2.9.0.20241003 -opengeh-telemetry @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.3.1#subdirectory=source/telemetry +opengeh-telemetry @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.4.1#subdirectory=source/telemetry coverage==7.6.8 pytest==8.3.3 configargparse==1.7.0 pytest-mock==3.14.0 -virtualenv==20.28.0 \ No newline at end of file +virtualenv==20.24.2 \ No newline at end of file diff --git 
a/source/databricks/calculation_engine/contracts/__init__.py b/source/databricks/calculation_engine/contracts/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/source/databricks/calculation_engine/contracts/data_products/__init__.py b/source/databricks/calculation_engine/contracts/data_products/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/source/databricks/calculation_engine/contracts/data_products/wholesale_basis_data/__init__.py b/source/databricks/calculation_engine/contracts/data_products/wholesale_basis_data/__init__.py new file mode 100644 index 0000000000..9bd824ee44 --- /dev/null +++ b/source/databricks/calculation_engine/contracts/data_products/wholesale_basis_data/__init__.py @@ -0,0 +1,6 @@ +from .charge_link_periods_v1 import charge_link_periods_v1 +from .charge_price_information_periods_v1 import charge_price_information_periods_v1 +from .charge_price_points_v1 import charge_price_points_v1 +from .grid_loss_metering_points_v1 import grid_loss_metering_points_v1 +from .metering_point_periods_v1 import metering_point_periods_v1 +from .time_series_points_v1 import time_series_points_v1 diff --git a/source/databricks/calculation_engine/contracts/data_products/wholesale_results/__init__.py b/source/databricks/calculation_engine/contracts/data_products/wholesale_results/__init__.py new file mode 100644 index 0000000000..d5c380ded2 --- /dev/null +++ b/source/databricks/calculation_engine/contracts/data_products/wholesale_results/__init__.py @@ -0,0 +1,11 @@ +from .amounts_per_charge_v1 import amounts_per_charge_v1 +from .energy_per_brp_v1 import energy_per_brp_v1 +from .energy_per_es_v1 import energy_per_es_v1 +from .energy_v1 import energy_v1 +from .exchange_per_neighbor_v1 import exchange_per_neighbor_v1 +from .grid_loss_metering_point_time_series_v1 import ( + grid_loss_metering_point_time_series_v1, +) +from .latest_calculations_by_day_v1 import latest_calculations_by_day_v1 +from 
.monthly_amounts_per_charge_v1 import monthly_amounts_per_charge_v1 +from .total_monthly_amounts_v1 import total_monthly_amounts_v1 diff --git a/source/databricks/calculation_engine/contracts/data_products/wholesale_sap/__init__.py b/source/databricks/calculation_engine/contracts/data_products/wholesale_sap/__init__.py new file mode 100644 index 0000000000..65a4da68cd --- /dev/null +++ b/source/databricks/calculation_engine/contracts/data_products/wholesale_sap/__init__.py @@ -0,0 +1,3 @@ +from .energy_v1 import energy_v1 +from .latest_calculations_history_v1 import latest_calculations_history_v1 +from .amounts_per_charge_v1 import amounts_per_charge_v1 diff --git a/source/databricks/calculation_engine/contracts/data_products/wholesale_settlement_reports/__init__.py b/source/databricks/calculation_engine/contracts/data_products/wholesale_settlement_reports/__init__.py new file mode 100644 index 0000000000..1faa2cf6bb --- /dev/null +++ b/source/databricks/calculation_engine/contracts/data_products/wholesale_settlement_reports/__init__.py @@ -0,0 +1,13 @@ +from .amounts_per_charge_v1 import amounts_per_charge_v1 +from .charge_link_periods_v1 import charge_link_periods_v1 +from .charge_prices_v1 import charge_prices_v1 +from .current_balance_fixing_calculation_version_v1 import ( + current_balance_fixing_calculation_version_v1, +) +from .energy_per_es_v1 import energy_per_es_v1 +from .energy_v1 import energy_v1 +from .metering_point_periods_v1 import metering_point_periods_v1 +from .metering_point_time_series_v1 import metering_point_time_series_v1 +from .monthly_amounts_per_charge_v1 import monthly_amounts_per_charge_v1 +from .monthly_amounts_v1 import monthly_amounts_v1 +from .total_monthly_amounts_v1 import total_monthly_amounts_v1 diff --git a/source/databricks/calculation_engine/contracts/wholesale_internal/__init__.py b/source/databricks/calculation_engine/contracts/wholesale_internal/__init__.py new file mode 100644 index 0000000000..88581f0b3d --- /dev/null 
+++ b/source/databricks/calculation_engine/contracts/wholesale_internal/__init__.py @@ -0,0 +1 @@ +from .succeeded_external_calculations_v1 import succeeded_external_calculations_v1 diff --git a/source/databricks/calculation_engine/package/datamigration/migration_scripts/202409110811_create_view__wholesale_sap__latest_calculations_history_v1.sql b/source/databricks/calculation_engine/package/datamigration/migration_scripts/202409110811_create_view__wholesale_sap__latest_calculations_history_v1.sql index ed2ba643ce..61a783debb 100644 --- a/source/databricks/calculation_engine/package/datamigration/migration_scripts/202409110811_create_view__wholesale_sap__latest_calculations_history_v1.sql +++ b/source/databricks/calculation_engine/package/datamigration/migration_scripts/202409110811_create_view__wholesale_sap__latest_calculations_history_v1.sql @@ -22,7 +22,6 @@ WITH calculations_by_day AS ( interval 1 day )) AS from_date_local, -- All rows should represent a full day, so the to_date is the day after the from_date - DATE_ADD(from_date_local, 1) AS to_date_local, calculation_succeeded_time as latest_from_time FROM {CATALOG_NAME}.{WHOLESALE_INTERNAL_DATABASE_NAME}.calculations c INNER JOIN {CATALOG_NAME}.{WHOLESALE_INTERNAL_DATABASE_NAME}.calculation_grid_areas cga ON c.calculation_id = cga.calculation_id @@ -34,7 +33,7 @@ SELECT calculation_version, grid_area_code, TO_UTC_TIMESTAMP(from_date_local, 'Europe/Copenhagen') AS from_date, - TO_UTC_TIMESTAMP(to_date_local, 'Europe/Copenhagen') AS to_date, + TO_UTC_TIMESTAMP(DATE_ADD(from_date_local, 1), 'Europe/Copenhagen') AS to_date, latest_from_time, -- The latest_to_time is the latest_from_time of the next calculation that has been calculated (with the same, calc. type, grid are and from date).
LEAD(latest_from_time) OVER (PARTITION BY calculation_type, grid_area_code, from_date_local ORDER BY calculation_version ASC) AS latest_to_time diff --git a/source/databricks/calculation_engine/package/datamigration/schema_config.py b/source/databricks/calculation_engine/package/datamigration/schema_config.py index fac30fbc70..b54c6b4242 100644 --- a/source/databricks/calculation_engine/package/datamigration/schema_config.py +++ b/source/databricks/calculation_engine/package/datamigration/schema_config.py @@ -3,6 +3,17 @@ import package.databases.wholesale_basis_data_internal.schemas as basis_data_schemas import package.databases.wholesale_internal.schemas as internal_schemas import package.infrastructure.paths as paths + +from contracts.data_products import ( + wholesale_sap, + wholesale_results, + wholesale_settlement_reports, +) + +from contracts.wholesale_internal import ( + succeeded_external_calculations_v1, +) + from package.databases.wholesale_results_internal.schemas import ( energy_schema, energy_per_brp_schema, @@ -35,6 +46,7 @@ views=[ View( name=paths.WholesaleInternalDatabase.SUCCEEDED_EXTERNAL_CALCULATIONS_V1_VIEW_NAME, + schema=succeeded_external_calculations_v1, ), ], ), @@ -108,27 +120,35 @@ views=[ View( name=paths.WholesaleResultsDatabase.ENERGY_V1_VIEW_NAME, + schema=wholesale_results.energy_v1, ), View( name=paths.WholesaleResultsDatabase.ENERGY_PER_BRP_V1_VIEW_NAME, + schema=wholesale_results.energy_per_brp_v1, ), View( name=paths.WholesaleResultsDatabase.ENERGY_PER_ES_V1_VIEW_NAME, + schema=wholesale_results.energy_per_es_v1, ), View( name=paths.WholesaleResultsDatabase.EXCHANGE_PER_NEIGHBOR_V1_VIEW_NAME, + schema=wholesale_results.exchange_per_neighbor_v1, ), View( name=paths.WholesaleResultsDatabase.GRID_LOSS_METERING_POINT_TIME_SERIES_VIEW_NAME, + schema=wholesale_results.grid_loss_metering_point_time_series_v1, ), View( name=paths.WholesaleResultsDatabase.AMOUNTS_PER_CHARGE_V1_VIEW_NAME, + schema=wholesale_results.amounts_per_charge_v1, 
), View( name=paths.WholesaleResultsDatabase.MONTHLY_AMOUNTS_PER_CHARGE_V1_VIEW_NAME, + schema=wholesale_results.monthly_amounts_per_charge_v1, ), View( name=paths.WholesaleResultsDatabase.TOTAL_MONTHLY_AMOUNTS_V1_VIEW_NAME, + schema=wholesale_results.total_monthly_amounts_v1, ), ], ), @@ -137,29 +157,37 @@ tables=[], views=[ View( - name=paths.WholesaleSettlementReportsDatabase.METERING_POINT_PERIODS_VIEW_NAME_V1 + name=paths.WholesaleSettlementReportsDatabase.METERING_POINT_PERIODS_VIEW_NAME_V1, + schema=wholesale_settlement_reports.metering_point_periods_v1, ), View( - name=paths.WholesaleSettlementReportsDatabase.METERING_POINT_TIME_SERIES_VIEW_NAME_V1 + name=paths.WholesaleSettlementReportsDatabase.METERING_POINT_TIME_SERIES_VIEW_NAME_V1, + schema=wholesale_settlement_reports.metering_point_time_series_v1, ), View( - name=paths.WholesaleSettlementReportsDatabase.CHARGE_PRICES_VIEW_NAME_V1 + name=paths.WholesaleSettlementReportsDatabase.CHARGE_PRICES_VIEW_NAME_V1, + schema=wholesale_settlement_reports.charge_prices_v1, ), View( - name=paths.WholesaleSettlementReportsDatabase.CHARGE_LINK_PERIODS_VIEW_NAME_V1 + name=paths.WholesaleSettlementReportsDatabase.CHARGE_LINK_PERIODS_VIEW_NAME_V1, + schema=wholesale_settlement_reports.charge_link_periods_v1, ), View( name=paths.WholesaleSettlementReportsDatabase.AMOUNTS_PER_CHARGE_V1_VIEW_NAME, + schema=wholesale_settlement_reports.amounts_per_charge_v1, ), View( name=paths.WholesaleSettlementReportsDatabase.MONTHLY_AMOUNTS_PER_CHARGE_V1_VIEW_NAME, + schema=wholesale_settlement_reports.monthly_amounts_per_charge_v1, ), View( name=paths.WholesaleSettlementReportsDatabase.TOTAL_MONTHLY_AMOUNTS_V1_VIEW_NAME, + schema=wholesale_settlement_reports.total_monthly_amounts_v1, ), View( # ToDo JMG: Remove when settlement report subsystem uses monthly_amounts_per_charge_v1/total_monthly_amounts_v1 name=paths.WholesaleSettlementReportsDatabase.MONTHLY_AMOUNTS_V1_VIEW_NAME, + schema=wholesale_settlement_reports.monthly_amounts_v1, ), 
], ), @@ -169,12 +197,15 @@ views=[ View( name=paths.WholesaleSapDatabase.LATEST_CALCULATIONS_HISTORY_V1_VIEW_NAME, + schema=wholesale_sap.latest_calculations_history_v1, ), View( name=paths.WholesaleSapDatabase.ENERGY_V1_VIEW_NAME, + schema=wholesale_sap.energy_v1, ), View( name=paths.WholesaleSapDatabase.AMOUNTS_PER_CHARGE_V1_VIEW_NAME, + schema=wholesale_sap.amounts_per_charge_v1, ), ], ), diff --git a/source/databricks/calculation_engine/setup.py b/source/databricks/calculation_engine/setup.py index 6738a1b8c1..adda09bf29 100644 --- a/source/databricks/calculation_engine/setup.py +++ b/source/databricks/calculation_engine/setup.py @@ -29,16 +29,16 @@ # Make sure these packages are added to the docker container and pinned to the same versions install_requires=[ "ConfigArgParse==1.7.0", - "pyspark==3.5.1", - "azure-identity==1.19.0", - "dependency_injector==4.41.0", + "pyspark==3.5.1", # This needs to be the same version as pyspark-slim in the Dockerfile + "azure-identity==1.17.1", + "dependency_injector==4.43.0", "urllib3==2.2.*", - "delta-spark==3.1.0", - "python-dateutil==2.9.0.post0", + "delta-spark==3.2.0", + "python-dateutil==2.8.2", "azure-monitor-opentelemetry==1.6.4", "azure-core==1.32.0", - "opengeh-spark-sql-migrations @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@1.9.0#subdirectory=source/spark_sql_migrations", - "opengeh-telemetry @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.3.1#subdirectory=source/telemetry", + "opengeh-spark-sql-migrations @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.4.1#subdirectory=source/spark_sql_migrations", + "opengeh-telemetry @ git+https://git@github.com/Energinet-DataHub/opengeh-python-packages@2.4.1#subdirectory=source/telemetry", ], entry_points={ "console_scripts": [ diff --git a/source/databricks/calculation_engine/tests/contracts/databases_and_schemas.py b/source/databricks/calculation_engine/tests/contracts/databases_and_schemas.py index
53badcaf6e..1ad5b18363 100644 --- a/source/databricks/calculation_engine/tests/contracts/databases_and_schemas.py +++ b/source/databricks/calculation_engine/tests/contracts/databases_and_schemas.py @@ -34,7 +34,7 @@ def get_expected_schemas(folder: str) -> dict: for root, _, files in os.walk(schemas_folder): database_name = Path(root).name for file_name in files: - if file_name.endswith(".py"): + if file_name.endswith(".py") and not file_name.startswith("__init__"): # Remove the file extension schema_name = file_name[:-3]