Merge pull request #2447 from ASFHyP3/srg
Add `SRG_TIME_SERIES` job type
jtherrmann authored Oct 18, 2024
2 parents 4921de2 + 24856d5 commit b932fdc
Showing 40 changed files with 786 additions and 268 deletions.
2 changes: 1 addition & 1 deletion .github/actions/deploy-hyp3/action.yml
@@ -30,7 +30,7 @@ inputs:
     description: "Comma separated list of Subnet IDs"
     required: true
   SECRET_ARN:
-    description: "ARN of Secret in AWS Secrets Manager containing all the credentials needed for job tasks"
+    description: "ARN of Secret in AWS Secrets Manager containing all the credentials needed for job steps"
     required: true
   CLOUDFORMATION_ROLE_ARN:
     description: "The CloudFormation role to use for this deployment"
1 change: 1 addition & 0 deletions .github/workflows/deploy-enterprise-test.yml
@@ -88,6 +88,7 @@ jobs:
       job_files: >-
         job_spec/INSAR_ISCE_BURST.yml
         job_spec/SRG_GSLC.yml
+        job_spec/SRG_TIME_SERIES.yml
       instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge,r6idn.xlarge,r6idn.2xlarge,r6idn.4xlarge,r6idn.8xlarge
       default_max_vcpus: 640
       expanded_max_vcpus: 640
1 change: 1 addition & 0 deletions .github/workflows/deploy-enterprise.yml
@@ -245,6 +245,7 @@ jobs:
       job_files: >-
         job_spec/INSAR_ISCE_BURST.yml
         job_spec/SRG_GSLC.yml
+        job_spec/SRG_TIME_SERIES.yml
       instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge,r6idn.xlarge,r6idn.2xlarge,r6idn.4xlarge,r6idn.8xlarge
       default_max_vcpus: 640
       expanded_max_vcpus: 640
9 changes: 3 additions & 6 deletions .github/workflows/deploy-multi-burst-sandbox.yml
@@ -3,7 +3,7 @@ name: Deploy Multi-Burst Sandbox Stack to AWS
 on:
   push:
     branches:
-      - multi-burst-sandbox
+      - srg

 concurrency: ${{ github.workflow }}-${{ github.ref }}

@@ -25,11 +25,8 @@ jobs:
       deploy_ref: refs/heads/multi-burst-sandbox
       job_files: >-
         job_spec/INSAR_ISCE_BURST.yml
-        job_spec/INSAR_ISCE_MULTI_BURST.yml
-        job_spec/AUTORIFT.yml
-        job_spec/RTC_GAMMA.yml
-        job_spec/WATER_MAP.yml
-        job_spec/WATER_MAP_EQ.yml
+        job_spec/SRG_GSLC.yml
+        job_spec/SRG_TIME_SERIES.yml
       instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge,r6idn.xlarge,r6idn.2xlarge,r6idn.4xlarge,r6idn.8xlarge
       default_max_vcpus: 640
       expanded_max_vcpus: 640
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@ apps/api/src/hyp3_api/api-spec/job_parameters.yml
 apps/api/src/hyp3_api/job_validation_map.yml
 apps/step-function.json
 apps/**/*-cf.yml
+apps/start-execution-worker/src/batch_params_by_job_type.json
 lib/dynamo/dynamo/*.json
 lib/dynamo/dynamo/*.yml

24 changes: 24 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,30 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

+## [8.0.0]
+
+### Added
+- A job step can now be applied to every item in a list using a new `map: for <item> in <items>` syntax. For example, given a job spec with a `granules` parameter, a step that includes a `map: for granule in granules` field is applied to each item in the `granules` list and can refer to `Ref::granule` within its `command` field.
+- If a job contains a `map` step, the processing time value for that step (in the `processing_times` list in the job's API response) is a sub-list of processing times for the step's iterations, in the same order as the items in the input list.
+- A new `SRG_TIME_SERIES` job type has been added to the `hyp3-lavas` and `hyp3-lavas-test` deployments. This workflow uses the new `map` syntax described above to produce a GSLC for each level-0 Sentinel-1 granule passed via the `granules` parameter and then produces a time series product from the GSLCs. See the [HyP3 SRG](https://github.com/ASFHyP3/hyp3-srg) plugin.
+- The `SRG_GSLC` job type now includes parameter validation.
+
+### Changed
+- Changes to custom compute environments:
+  - Custom compute environments are now applied to individual job steps rather than to entire jobs. The `compute_environment` field is now provided at the step level rather than at the top level of the job spec.
+  - Custom compute environments can optionally be defined within `job_spec/config/compute_environments.yml`. Job steps can import these environments using the following syntax:
+    ```yaml
+    compute_environment:
+      import: MyComputeEnvironment
+    ```
+    If the `import` value is `Default`, then the job step uses the deployment's default compute environment.
+
+    The `compute_environment` field can still be used to define a custom compute environment directly within the job spec, as before.
+- Other changes to the job spec syntax:
+  - The `tasks` field has been renamed to `steps`.
+  - Job parameters no longer contain a top-level `default` field. The `default` field within each parameter's `api_schema` mapping is still supported.
+  - Job specs no longer explicitly define a `bucket_prefix` parameter. Instead, `bucket_prefix` is automatically defined and can still be referenced as `Ref::bucket_prefix` within each step's `command` field.
+
 ## [7.12.0]

 ### Changed
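As a minimal sketch of the new `map` syntax described in this changelog entry: the job type, container image, and command arguments below are hypothetical, and only `map`, `command`, the `Ref::` substitutions, and the step-level `compute_environment` are taken from the behavior documented above.

```yaml
# Hypothetical job spec illustrating the new `map` syntax.
EXAMPLE_JOB:
  required_parameters:
    - granules
  parameters:
    granules:
      api_schema:
        type: array
        items:
          type: string
  steps:
    - name: PROCESS                      # runs once per item in `granules`
      map: for granule in granules
      image: ghcr.io/example/processor   # hypothetical image
      command:
        - ++process
        - Ref::granule                   # the current list item
        - --bucket-prefix
        - Ref::bucket_prefix             # automatically defined; no longer declared as a parameter
      compute_environment:
        import: Default                  # use the deployment's default compute environment
    - name: COMBINE                      # runs once, after all map iterations
      image: ghcr.io/example/processor
      command:
        - ++combine
        - --bucket-prefix
        - Ref::bucket_prefix
```

A job run from a spec like this would report `processing_times` as a two-element list such as `[[t_1, ..., t_n], t_combine]`, with one sub-entry per `granules` item for the `map` step.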
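The contents of `job_spec/config/compute_environments.yml` are not shown in this excerpt. Judging only from the fields the rendering templates later in this diff read (`name`, plus optional `instance_types` and `ami_id`), a compatible sketch might look like the following; the overall shape of the file is an assumption.

```yaml
# Hypothetical compute environment definitions; only the field names `name`,
# `instance_types`, and `ami_id` come from the templates in this diff.
compute_environments:
  - name: MyComputeEnvironment
    instance_types: r6id.xlarge,r6id.2xlarge   # optional; comma-separated, split by the template
    ami_id: ami-0123456789abcdef0              # optional; hypothetical AMI ID
  - name: AnotherEnvironment                   # omits optional fields, so deployment defaults apply
```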
3 changes: 2 additions & 1 deletion Makefile
@@ -36,11 +36,12 @@ install:
 	python -m pip install -r requirements-all.txt

 files ?= job_spec/*.yml
+compute_env_file ?= job_spec/config/compute_environments.yml
 security_environment ?= ASF
 api_name ?= local
 cost_profile ?= DEFAULT
 render:
-	@echo rendering $(files) for API $(api_name) and security environment $(security_environment); python apps/render_cf.py -j $(files) -s $(security_environment) -n $(api_name) -c $(cost_profile)
+	@echo rendering $(files) for API $(api_name) and security environment $(security_environment); python apps/render_cf.py -j $(files) -e $(compute_env_file) -s $(security_environment) -n $(api_name) -c $(cost_profile)

 static: flake8 openapi-validate cfn-lint

4 changes: 2 additions & 2 deletions apps/api/src/hyp3_api/api-spec/job_parameters.yml.j2
@@ -26,13 +26,13 @@ components:
       description: Parameters for running {{ job_type }} jobs
       type: object
       additionalProperties: false
-{% for parameter, parameter_spec in job_spec['parameters'].items() if 'api_schema' in parameter_spec and parameter in job_spec.get('required_parameters', []) %}
+{% for parameter, parameter_spec in job_spec['parameters'].items() if parameter in job_spec.get('required_parameters', []) %}
 {% if loop.first %}
       required:
 {% endif %}
         - {{ parameter }}
 {% endfor %}
-{% for parameter, parameter_spec in job_spec['parameters'].items() if 'api_schema' in parameter_spec %}
+{% for parameter, parameter_spec in job_spec['parameters'].items() %}
 {% if loop.first %}
       properties:
 {% endif %}
17 changes: 11 additions & 6 deletions apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2
@@ -408,20 +408,25 @@ components:

     processing_times:
       description: >
-        A list of run times for the job's processing steps in the order that they were executed. For example,
-        a job comprised of a single processing step would yield a list containing one processing time, while a job
-        comprised of three processing steps would yield a list containing three processing times. An empty list
-        represents a failure to calculate processing times.
+        List of run times for the job's processing steps in the order that they were executed.
+        An empty list represents a failure to calculate processing times.
       type: array
       items:
-        $ref: '#/components/schemas/processing_time_in_seconds'
+        oneOf:
+          - type: array
+            items:
+              $ref: '#/components/schemas/processing_time_in_seconds'
+            example: [200, 100, 150]
+          - $ref: '#/components/schemas/processing_time_in_seconds'


     processing_time_in_seconds:
       description: >
-        Run time in seconds for a particular processing step's final attempt (regardless of whether it succeeded).
+        Run time in seconds for a processing step's final attempt (regardless of whether it succeeded).
+        A value of zero indicates that there were no attempts.
       type: number
       minimum: 0
+      example: 50

   securitySchemes:
     EarthDataLogin:
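Under the revised schema, a hypothetical job whose second step is a `map` step could report a `processing_times` value like the following (the numbers are illustrative seconds):

```yaml
processing_times:
  - 50                 # ordinary step: a single number
  - [200, 100, 150]    # map step: one entry per iteration, in input order
  - 75
```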
60 changes: 60 additions & 0 deletions apps/api/src/hyp3_api/validation.py
@@ -18,6 +18,10 @@ class GranuleValidationError(Exception):
     pass


+class BoundsValidationError(Exception):
+    pass
+
+
 with open(Path(__file__).parent / 'job_validation_map.yml') as f:
     JOB_VALIDATION_MAP = yaml.safe_load(f.read())

@@ -136,6 +140,62 @@ def get_multipolygon_from_geojson(input_file):
     return MultiPolygon(polygons)


+def check_bounds_formatting(job, _):
+    bounds = job['job_parameters']['bounds']
+    if bounds == [0.0, 0.0, 0.0, 0.0]:
+        return
+
+    if bounds[0] >= bounds[2] or bounds[1] >= bounds[3]:
+        raise BoundsValidationError(
+            'Invalid order for bounds. Bounds should be ordered [min lon, min lat, max lon, max lat].'
+        )
+
+    def bad_lat(lat):
+        return lat > 90 or lat < -90
+
+    def bad_lon(lon):
+        return lon > 180 or lon < -180
+
+    if any([bad_lon(bounds[0]), bad_lon(bounds[2]), bad_lat(bounds[1]), bad_lat(bounds[3])]):
+        raise BoundsValidationError(
+            'Invalid lon/lat value(s) in bounds. Bounds should be ordered [min lon, min lat, max lon, max lat].'
+        )
+
+
+def check_granules_intersecting_bounds(job, granule_metadata):
+    bounds = job['job_parameters']['bounds']
+    if bounds == [0.0, 0.0, 0.0, 0.0]:
+        bounds = granule_metadata[0]['polygon']
+    else:
+        bounds = Polygon.from_bounds(*bounds)
+    bad_granules = []
+    for granule in granule_metadata:
+        bbox = granule['polygon']
+        if not bbox.intersection(bounds):
+            bad_granules.append(granule['name'])
+    if bad_granules:
+        raise GranuleValidationError(
+            f'The following granules do not intersect the provided bounds: {bad_granules}.'
+        )
+
+
+def check_same_relative_orbits(job, granule_metadata):
+    previous_relative_orbit = None
+    for granule in granule_metadata:
+        name_split = granule['name'].split('_')
+        absolute_orbit = name_split[7]
+        # "Relationship between relative and absolute orbit numbers": https://sentiwiki.copernicus.eu/web/s1-products
+        offset = 73 if name_split[0] == 'S1A' else 27
+        relative_orbit = ((int(absolute_orbit) - offset) % 175) + 1
+        if not previous_relative_orbit:
+            previous_relative_orbit = relative_orbit
+        if relative_orbit != previous_relative_orbit:
+            raise GranuleValidationError(
+                f'Relative orbit number for {granule["name"]} does not match that of the previous granules: '
+                f'{relative_orbit} is not {previous_relative_orbit}.'
+            )
+
+
 def convert_single_burst_jobs(jobs: list[dict]) -> list[dict]:
     jobs = deepcopy(jobs)
     for job in jobs:
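A usage sketch for the new bounds validator, assuming the `hyp3_api` package is importable; the job dict is a minimal stand-in containing only the field the validator reads:

```python
from hyp3_api.validation import BoundsValidationError, check_bounds_formatting

# Minimal stand-in for a job payload: max lat (33.0) is less than min lat (34.0),
# so the ordering check should fail.
job = {'job_parameters': {'bounds': [-118.0, 34.0, -117.0, 33.0]}}

try:
    check_bounds_formatting(job, None)
except BoundsValidationError as error:
    print(error)  # Invalid order for bounds. Bounds should be ordered [min lon, min lat, max lon, max lat].
```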
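And a worked example of the relative-orbit arithmetic in `check_same_relative_orbits`; the granule name is hypothetical but follows the level-0 naming convention the function assumes, with the absolute orbit in the eighth underscore-separated field:

```python
# Standalone sketch of the relative-orbit calculation above.
# The granule name is hypothetical; field 7 (zero-indexed) holds the absolute orbit.
name = 'S1A_IW_RAW__0SDV_20231229T134404_20231229T134436_051870_064437_5F38'

name_split = name.split('_')                   # 'RAW__0SDV' splits into 'RAW', '', '0SDV'
absolute_orbit = int(name_split[7])            # 51870
offset = 73 if name_split[0] == 'S1A' else 27  # S1B uses 27
relative_orbit = ((absolute_orbit - offset) % 175) + 1

print(relative_orbit)  # ((51870 - 73) % 175) + 1 == 173
```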
24 changes: 14 additions & 10 deletions apps/check-processing-time/src/check_processing_time.py
@@ -1,4 +1,5 @@
 import json
+from typing import Union


 def get_time_from_attempts(attempts: list[dict]) -> float:
@@ -9,15 +10,18 @@ def get_time_from_attempts(attempts: list[dict]) -> float:
     return (final_attempt['StoppedAt'] - final_attempt['StartedAt']) / 1000


-def get_time_from_result(result: dict) -> float:
-    if 'Attempts' in result:
-        attempts = result['Attempts']
-    else:
-        attempts = json.loads(result['Cause'])['Attempts']
-    return get_time_from_attempts(attempts)
+def get_time_from_result(result: Union[list, dict]) -> Union[list, float]:
+    if isinstance(result, list):
+        return [get_time_from_result(item) for item in result]
+
+    if 'start' in result:
+        attempts = [{'StartedAt': start, 'StoppedAt': stop} for start, stop in zip(result['start'], result['stop'])]
+        return get_time_from_attempts(attempts)
+
+    return get_time_from_attempts(json.loads(result['Cause'])['Attempts'])


-def lambda_handler(event, context) -> list[float]:
-    results_dict = event['processing_results']
-    results = [results_dict[key] for key in sorted(results_dict.keys())]
-    return list(map(get_time_from_result, results))
+def lambda_handler(event, _) -> list[Union[list, float]]:
+    processing_results = event['processing_results']
+    result_list = [processing_results[key] for key in sorted(processing_results.keys())]
+    return get_time_from_result(result_list)
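A sketch of how the revised handler nests processing times for a `map` step, assuming the module above is importable and that timestamps are epoch milliseconds (as implied by the division by 1000 in `get_time_from_attempts`); key names sort in step order, which `lambda_handler` relies on:

```python
from check_processing_time import lambda_handler

# Two-step job: a regular step, then a `map` step with two iterations.
# Timestamps are hypothetical epoch milliseconds.
event = {
    'processing_results': {
        'step_0': {'start': [1000], 'stop': [61000]},  # single attempt: 60 s
        'step_1': [                                    # map step: one result per iteration
            {'start': [0], 'stop': [30000]},           # 30 s
            {'start': [5000], 'stop': [20000]},        # 15 s
        ],
    }
}

print(lambda_handler(event, None))  # [60.0, [30.0, 15.0]]
```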
6 changes: 2 additions & 4 deletions apps/compute-cf.yml.j2
@@ -29,8 +29,7 @@ Outputs:
   JobQueueArn:
     Value: !Ref BatchJobQueue

-{% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %}
-{% set name = job_spec['compute_environment']['name'] %}
+{% for name in compute_env_names %}
   {{ name }}ComputeEnvironmentArn:
     Value: !Ref {{ name }}ComputeEnvironment

@@ -106,8 +105,7 @@
   SchedulingPolicy:
     Type: AWS::Batch::SchedulingPolicy

-{% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %}
-{% set env = job_spec['compute_environment'] %}
+{% for env in compute_envs %}
 {% set name = env['name'] %}
 {% set instance_types = env['instance_types'].split(',') if 'instance_types' in env else '!Ref InstanceTypes' %}
 {% set ami_id = env['ami_id'] if 'ami_id' in env else '!Ref AmiId' %}
8 changes: 4 additions & 4 deletions apps/handle-batch-event/handle-batch-event-cf.yml.j2
@@ -5,8 +5,8 @@ Parameters:
   JobQueueArn:
     Type: String

-{% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %}
-  {{ job_spec['compute_environment']['name'] }}JobQueueArn:
+{% for name in compute_env_names %}
+  {{ name }}JobQueueArn:
     Type: String
 {% endfor %}

@@ -100,8 +100,8 @@ Resources:
         detail:
           jobQueue:
             - !Ref JobQueueArn
-{% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %}
-            - !Ref {{ job_spec['compute_environment']['name'] }}JobQueueArn
+{% for name in compute_env_names %}
+            - !Ref {{ name }}JobQueueArn
 {% endfor %}
           status:
             - RUNNING
11 changes: 4 additions & 7 deletions apps/main-cf.yml.j2
@@ -9,7 +9,7 @@ Parameters:
     Type: List<AWS::EC2::Subnet::Id>

   SecretArn:
-    Description: ARN of Secret in AWS Secrets Manager containing all the credentials needed for job tasks.
+    Description: ARN of Secret in AWS Secrets Manager containing all the credentials needed for job steps.
     Type: String

   ImageTag:
@@ -154,8 +154,7 @@ Resources:
     Properties:
       Parameters:
         ComputeEnvironmentArn: !GetAtt Cluster.Outputs.ComputeEnvironmentArn
-{% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %}
-{% set name = job_spec['compute_environment']['name'] %}
+{% for name in compute_env_names %}
         {{ name }}ComputeEnvironmentArn: !GetAtt Cluster.Outputs.{{ name }}ComputeEnvironmentArn
 {% endfor %}
         DefaultMaxvCpus: !Ref DefaultMaxvCpus
@@ -173,8 +172,7 @@
     Properties:
       Parameters:
         JobQueueArn: !GetAtt Cluster.Outputs.JobQueueArn
-{% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %}
-{% set name = job_spec['compute_environment']['name'] %}
+{% for name in compute_env_names %}
         {{ name }}JobQueueArn: !GetAtt Cluster.Outputs.{{ name }}JobQueueArn
 {% endfor %}
         JobsTable: !Ref JobsTable
@@ -189,8 +187,7 @@
     Properties:
      Parameters:
         JobQueueArn: !GetAtt Cluster.Outputs.JobQueueArn
-{% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %}
-{% set name = job_spec['compute_environment']['name'] %}
+{% for name in compute_env_names %}
         {{ name }}JobQueueArn: !GetAtt Cluster.Outputs.{{ name }}JobQueueArn
 {% endfor %}
         TaskRoleArn: !GetAtt Cluster.Outputs.TaskRoleArn
(Diffs for the remaining 26 of the 40 changed files are not shown.)