diff --git a/.github/actions/deploy-hyp3/action.yml b/.github/actions/deploy-hyp3/action.yml index 52986fd3e..d096c7e8c 100644 --- a/.github/actions/deploy-hyp3/action.yml +++ b/.github/actions/deploy-hyp3/action.yml @@ -30,7 +30,7 @@ inputs: description: "Comma separated list of Subnet IDs" required: true SECRET_ARN: - description: "ARN of Secret in AWS Secrets Manager containing all the credentials needed for job tasks" + description: "ARN of Secret in AWS Secrets Manager containing all the credentials needed for job steps" required: true CLOUDFORMATION_ROLE_ARN: description: "The CloudFormation role to use for this deployment" diff --git a/.github/workflows/changelog.yml b/.github/workflows/changelog.yml index 7740de9ed..3b1e74062 100644 --- a/.github/workflows/changelog.yml +++ b/.github/workflows/changelog.yml @@ -13,4 +13,4 @@ on: jobs: call-changelog-check-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-changelog-check.yml@v0.11.2 + uses: ASFHyP3/actions/.github/workflows/reusable-changelog-check.yml@v0.12.0 diff --git a/.github/workflows/create-jira-issue.yml b/.github/workflows/create-jira-issue.yml index 99489d507..d95ef849e 100644 --- a/.github/workflows/create-jira-issue.yml +++ b/.github/workflows/create-jira-issue.yml @@ -6,7 +6,7 @@ on: jobs: call-create-jira-issue-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-create-jira-issue.yml@v0.11.2 + uses: ASFHyP3/actions/.github/workflows/reusable-create-jira-issue.yml@v0.12.0 secrets: JIRA_BASE_URL: ${{ secrets.JIRA_BASE_URL }} JIRA_USER_EMAIL: ${{ secrets.JIRA_USER_EMAIL }} diff --git a/.github/workflows/deploy-daac.yml b/.github/workflows/deploy-daac.yml index 9abfb18b2..d1bfc1ba7 100644 --- a/.github/workflows/deploy-daac.yml +++ b/.github/workflows/deploy-daac.yml @@ -111,6 +111,6 @@ jobs: call-bump-version-workflow: if: github.ref == 'refs/heads/main' needs: deploy - uses: ASFHyP3/actions/.github/workflows/reusable-bump-version.yml@v0.11.2 + uses: ASFHyP3/actions/.github/workflows/reusable-bump-version.yml@v0.12.0 secrets: USER_TOKEN: ${{ secrets.TOOLS_BOT_PAK }} diff --git a/.github/workflows/deploy-enterprise-test.yml b/.github/workflows/deploy-enterprise-test.yml index ab7b964d6..d4479499a 100644 --- a/.github/workflows/deploy-enterprise-test.yml +++ b/.github/workflows/deploy-enterprise-test.yml @@ -88,6 +88,7 @@ jobs: job_files: >- job_spec/INSAR_ISCE_BURST.yml job_spec/SRG_GSLC.yml + job_spec/SRG_TIME_SERIES.yml instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge,r6idn.xlarge,r6idn.2xlarge,r6idn.4xlarge,r6idn.8xlarge default_max_vcpus: 640 expanded_max_vcpus: 640 diff --git a/.github/workflows/deploy-enterprise.yml b/.github/workflows/deploy-enterprise.yml index 98bb769a3..024c27fdd 100644 --- a/.github/workflows/deploy-enterprise.yml +++ b/.github/workflows/deploy-enterprise.yml @@ -245,6 +245,7 @@ jobs: job_files: >- job_spec/INSAR_ISCE_BURST.yml job_spec/SRG_GSLC.yml + job_spec/SRG_TIME_SERIES.yml instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge,r6idn.xlarge,r6idn.2xlarge,r6idn.4xlarge,r6idn.8xlarge default_max_vcpus: 640 expanded_max_vcpus: 640 diff --git a/.github/workflows/labeled-pr.yml b/.github/workflows/labeled-pr.yml index f89f3e3bd..f408f3b37 100644 --- a/.github/workflows/labeled-pr.yml +++ b/.github/workflows/labeled-pr.yml @@ -12,4 +12,4 @@ on: jobs: call-labeled-pr-check-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-labeled-pr-check.yml@v0.11.2 + uses: ASFHyP3/actions/.github/workflows/reusable-labeled-pr-check.yml@v0.12.0 diff 
--git a/.github/workflows/release-template-comment.yml b/.github/workflows/release-template-comment.yml index 8311cb7b1..1c0dcccea 100644 --- a/.github/workflows/release-template-comment.yml +++ b/.github/workflows/release-template-comment.yml @@ -7,7 +7,7 @@ on: jobs: call-release-checklist-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-release-checklist-comment.yml@v0.11.2 + uses: ASFHyP3/actions/.github/workflows/reusable-release-checklist-comment.yml@v0.12.0 permissions: pull-requests: write with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 06194fe08..1bd429336 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -7,7 +7,7 @@ on: jobs: call-release-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-release.yml@v0.11.2 + uses: ASFHyP3/actions/.github/workflows/reusable-release.yml@v0.12.0 with: release_prefix: HyP3 secrets: diff --git a/.github/workflows/static-analysis.yml b/.github/workflows/static-analysis.yml index d94186bc6..0fa2dc434 100644 --- a/.github/workflows/static-analysis.yml +++ b/.github/workflows/static-analysis.yml @@ -87,4 +87,4 @@ jobs: snyk iac test --severity-threshold=high call-secrets-analysis-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-secrets-analysis.yml@v0.11.2 + uses: ASFHyP3/actions/.github/workflows/reusable-secrets-analysis.yml@v0.12.0 diff --git a/.gitignore b/.gitignore index f1e057d6a..d37283ec2 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ apps/api/src/hyp3_api/api-spec/job_parameters.yml apps/api/src/hyp3_api/job_validation_map.yml apps/step-function.json apps/**/*-cf.yml +apps/start-execution-worker/src/batch_params_by_job_type.json lib/dynamo/dynamo/*.json lib/dynamo/dynamo/*.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index c546afe62..18de461ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,23 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [8.0.0] + +### Added +- A job step can now be applied to every item in a list using a new `map: for <item> in <items>` syntax. For example, given a job spec with a `granules` parameter, a step that includes a `map: for granule in granules` field is applied to each item in the `granules` list and can refer to `Ref::granule` within its `command` field. +- If a job contains a `map` step, the processing time value for that step (in the `processing_times` list in the job's API response) is a sub-list of processing times for the step's iterations, in the same order as the items in the input list. +- A new `SRG_TIME_SERIES` job type has been added to the `hyp3-lavas` and `hyp3-lavas-test` deployments. This workflow uses the new `map` syntax described above to produce a GSLC for each level-0 Sentinel-1 granule passed via the `granules` parameter and then produces a time series product from the GSLCs. See the [HyP3 SRG](https://github.com/ASFHyP3/hyp3-srg) plugin. +- The `SRG_GSLC` job type now includes parameter validation. + +### Changed +- Changes to custom compute environments: + - Custom compute environments are now applied to individual job steps rather than to entire jobs. The `compute_environment` field is now provided at the step level rather than at the top level of the job spec.
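For illustration only, a minimal, hypothetical job spec sketching the new step-level `compute_environment` field and the `map` syntax described above; the `EXAMPLE_JOB` job type, image, parameter schema, and resource values are all made up and are not part of this changeset:

```yaml
EXAMPLE_JOB:
  required_parameters:
    - granules
  parameters:
    granules:
      api_schema:
        type: array
        items:
          type: string
  cost_profiles:
    DEFAULT:
      cost: 1.0
  validators: []
  steps:
    - name: PROCESS
      map: for granule in granules  # this step runs once per item in the granules list
      image: ghcr.io/example/example-plugin  # hypothetical image
      command:
        - ++process
        - example_step
        - --bucket-prefix
        - Ref::bucket_prefix  # defined automatically; no longer declared under parameters
        - Ref::granule  # the current map item
      timeout: 3600
      compute_environment: SomeCustomEnvironment  # 'Default' or a name defined in job_spec/config/compute_environments.yml
      vcpu: 1
      memory: 7500
    - name: MERGE
      image: ghcr.io/example/example-plugin  # hypothetical image
      command:
        - ++process
        - merge_step
        - --bucket-prefix
        - Ref::bucket_prefix
      timeout: 3600
      compute_environment: Default
      vcpu: 1
      memory: 7500
```

For a two-granule job, a spec shaped like this would yield a `processing_times` entry such as `[[1200.5, 1180.3], 340.2]` (illustrative numbers): a sub-list for the mapped step followed by a single value for the final step.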
+ - If the value of the `compute_environment` field is `Default`, then the step uses the deployment's default compute environment. Otherwise, the value must be the name of a custom compute environment defined in `job_spec/config/compute_environments.yml`. +- Other changes to the job spec syntax: + - The `tasks` field has been renamed to `steps`. + - Job parameters no longer contain a top-level `default` field. The `default` field within each parameter's `api_schema` mapping is still supported. + - Job specs no longer explicitly define a `bucket_prefix` parameter. Instead, `bucket_prefix` is automatically defined and can still be referenced as `Ref::bucket_prefix` within each step's `command` field. + ## [7.12.0] ### Changed diff --git a/Makefile b/Makefile index c76c9a847..81d462c22 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ API = ${PWD}/apps/api/src +APPS = ${PWD}/apps CHECK_PROCESSING_TIME = ${PWD}/apps/check-processing-time/src GET_FILES = ${PWD}/apps/get-files/src HANDLE_BATCH_EVENT = ${PWD}/apps/handle-batch-event/src @@ -10,7 +11,7 @@ DISABLE_PRIVATE_DNS = ${PWD}/apps/disable-private-dns/src UPDATE_DB = ${PWD}/apps/update-db/src UPLOAD_LOG = ${PWD}/apps/upload-log/src DYNAMO = ${PWD}/lib/dynamo -export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO} +export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}:${APPS} build: render @@ -36,11 +37,12 @@ install: python -m pip install -r requirements-all.txt files ?= job_spec/*.yml +compute_env_file ?= job_spec/config/compute_environments.yml security_environment ?= ASF api_name ?= local cost_profile ?= DEFAULT render: - @echo rendering $(files) for API $(api_name) and security environment $(security_environment); python apps/render_cf.py -j $(files) -s $(security_environment) -n $(api_name) -c $(cost_profile) + @echo rendering $(files) for API $(api_name) and security environment $(security_environment); python apps/render_cf.py -j $(files) -e $(compute_env_file) -s $(security_environment) -n $(api_name) -c $(cost_profile) static: flake8 openapi-validate cfn-lint diff --git a/apps/api/src/hyp3_api/api-spec/job_parameters.yml.j2 b/apps/api/src/hyp3_api/api-spec/job_parameters.yml.j2 index 0351fbf7a..83ea5c963 100644 --- a/apps/api/src/hyp3_api/api-spec/job_parameters.yml.j2 +++ b/apps/api/src/hyp3_api/api-spec/job_parameters.yml.j2 @@ -26,13 +26,13 @@ components: description: Parameters for running {{ job_type }} jobs type: object additionalProperties: false - {% for parameter, parameter_spec in job_spec['parameters'].items() if 'api_schema' in parameter_spec and parameter in job_spec.get('required_parameters', []) %} + {% for parameter, parameter_spec in job_spec['parameters'].items() if parameter in job_spec.get('required_parameters', []) %} {% if loop.first %} required: {% endif %} - {{ parameter }} {% endfor %} - {% for parameter, parameter_spec in job_spec['parameters'].items() if 'api_schema' in parameter_spec %} + {% for parameter, parameter_spec in job_spec['parameters'].items() %} {% if loop.first %} properties: {% endif %} diff --git a/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 b/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 index 
16c014488..ebee991a0 100644 --- a/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 +++ b/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 @@ -408,20 +408,25 @@ components: processing_times: description: > - A list of run times for the job's processing steps in the order that they were executed. For example, - a job comprised of a single processing step would yield a list containing one processing time, while a job - comprised of three processing steps would yield a list containing three processing times. An empty list - represents a failure to calculate processing times. + List of run times for the job's processing steps in the order that they were executed. + An empty list represents a failure to calculate processing times. type: array items: - $ref: '#/components/schemas/processing_time_in_seconds' + oneOf: + - type: array + items: + $ref: '#/components/schemas/processing_time_in_seconds' + example: [200, 100, 150] + - $ref: '#/components/schemas/processing_time_in_seconds' + processing_time_in_seconds: description: > - Run time in seconds for a particular processing step's final attempt (regardless of whether it succeeded). + Run time in seconds for a processing step's final attempt (regardless of whether it succeeded). A value of zero indicates that there were no attempts. type: number minimum: 0 + example: 50 securitySchemes: EarthDataLogin: diff --git a/apps/api/src/hyp3_api/handlers.py b/apps/api/src/hyp3_api/handlers.py index 393fc4ac6..fbe92de96 100644 --- a/apps/api/src/hyp3_api/handlers.py +++ b/apps/api/src/hyp3_api/handlers.py @@ -6,7 +6,7 @@ import dynamo from dynamo.exceptions import AccessCodeError, InsufficientCreditsError, UnexpectedApplicationStatusError from hyp3_api import util -from hyp3_api.validation import GranuleValidationError, validate_jobs +from hyp3_api.validation import BoundsValidationError, GranuleValidationError, validate_jobs def problem_format(status, message): @@ -28,7 +28,7 @@ def post_jobs(body, user): validate_jobs(body['jobs']) except requests.HTTPError as e: print(f'WARN: CMR search failed: {e}') - except GranuleValidationError as e: + except (BoundsValidationError, GranuleValidationError) as e: abort(problem_format(400, str(e))) try: diff --git a/apps/api/src/hyp3_api/validation.py b/apps/api/src/hyp3_api/validation.py index 0f5acbe5e..4355f71f6 100644 --- a/apps/api/src/hyp3_api/validation.py +++ b/apps/api/src/hyp3_api/validation.py @@ -18,6 +18,10 @@ class GranuleValidationError(Exception): pass +class BoundsValidationError(Exception): + pass + + with open(Path(__file__).parent / 'job_validation_map.yml') as f: JOB_VALIDATION_MAP = yaml.safe_load(f.read()) @@ -136,6 +140,62 @@ def get_multipolygon_from_geojson(input_file): return MultiPolygon(polygons) +def check_bounds_formatting(job, _): + bounds = job['job_parameters']['bounds'] + if bounds == [0.0, 0.0, 0.0, 0.0]: + return + + if bounds[0] >= bounds[2] or bounds[1] >= bounds[3]: + raise BoundsValidationError( + 'Invalid order for bounds. Bounds should be ordered [min lon, min lat, max lon, max lat].' + ) + + def bad_lat(lat): + return lat > 90 or lat < -90 + + def bad_lon(lon): + return lon > 180 or lon < -180 + + if any([bad_lon(bounds[0]), bad_lon(bounds[2]), bad_lat(bounds[1]), bad_lat(bounds[3])]): + raise BoundsValidationError( + 'Invalid lon/lat value(s) in bounds. Bounds should be ordered [min lon, min lat, max lon, max lat].' 
+ ) + + +def check_granules_intersecting_bounds(job, granule_metadata): + bounds = job['job_parameters']['bounds'] + if bounds == [0.0, 0.0, 0.0, 0.0]: + bounds = granule_metadata[0]['polygon'] + else: + bounds = Polygon.from_bounds(*bounds) + bad_granules = [] + for granule in granule_metadata: + bbox = granule['polygon'] + if not bbox.intersection(bounds): + bad_granules.append(granule['name']) + if bad_granules: + raise GranuleValidationError( + f'The following granules do not intersect the provided bounds: {bad_granules}.' + ) + + +def check_same_relative_orbits(job, granule_metadata): + previous_relative_orbit = None + for granule in granule_metadata: + name_split = granule['name'].split('_') + absolute_orbit = name_split[7] + # "Relationship between relative and absolute orbit numbers": https://sentiwiki.copernicus.eu/web/s1-products + offset = 73 if name_split[0] == 'S1A' else 27 + relative_orbit = ((int(absolute_orbit) - offset) % 175) + 1 + if not previous_relative_orbit: + previous_relative_orbit = relative_orbit + if relative_orbit != previous_relative_orbit: + raise GranuleValidationError( + f'Relative orbit number for {granule["name"]} does not match that of the previous granules: ' + f'{relative_orbit} is not {previous_relative_orbit}.' + ) + + def convert_single_burst_jobs(jobs: list[dict]) -> list[dict]: jobs = deepcopy(jobs) for job in jobs: diff --git a/apps/check-processing-time/src/check_processing_time.py b/apps/check-processing-time/src/check_processing_time.py index 7cc979b7a..bb5456e13 100644 --- a/apps/check-processing-time/src/check_processing_time.py +++ b/apps/check-processing-time/src/check_processing_time.py @@ -1,4 +1,5 @@ import json +from typing import Union def get_time_from_attempts(attempts: list[dict]) -> float: @@ -9,15 +10,18 @@ def get_time_from_attempts(attempts: list[dict]) -> float: return (final_attempt['StoppedAt'] - final_attempt['StartedAt']) / 1000 -def get_time_from_result(result: dict) -> float: - if 'Attempts' in result: - attempts = result['Attempts'] - else: - attempts = json.loads(result['Cause'])['Attempts'] - return get_time_from_attempts(attempts) +def get_time_from_result(result: Union[list, dict]) -> Union[list, float]: + if isinstance(result, list): + return [get_time_from_result(item) for item in result] + if 'start' in result: + attempts = [{'StartedAt': start, 'StoppedAt': stop} for start, stop in zip(result['start'], result['stop'])] + return get_time_from_attempts(attempts) -def lambda_handler(event, context) -> list[float]: - results_dict = event['processing_results'] - results = [results_dict[key] for key in sorted(results_dict.keys())] - return list(map(get_time_from_result, results)) + return get_time_from_attempts(json.loads(result['Cause'])['Attempts']) + + +def lambda_handler(event, _) -> list[Union[list, float]]: + processing_results = event['processing_results'] + result_list = [processing_results[key] for key in sorted(processing_results.keys())] + return get_time_from_result(result_list) diff --git a/apps/compute-cf.yml.j2 b/apps/compute-cf.yml.j2 index 80e52b489..fab82396b 100644 --- a/apps/compute-cf.yml.j2 +++ b/apps/compute-cf.yml.j2 @@ -29,15 +29,14 @@ Outputs: JobQueueArn: Value: !Ref BatchJobQueue - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {% set name = job_spec['compute_environment']['name'] %} + {% for name in compute_envs %} {{ name }}ComputeEnvironmentArn: Value: !Ref {{ name }}ComputeEnvironment {{ name }}JobQueueArn: Value: !Ref {{ name 
}}JobQueue - {% endfor %} + {% endfor %} TaskRoleArn: Value: !GetAtt TaskRole.Arn @@ -106,9 +105,7 @@ Resources: SchedulingPolicy: Type: AWS::Batch::SchedulingPolicy - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {% set env = job_spec['compute_environment'] %} - {% set name = env['name'] %} + {% for name, env in compute_envs.items() %} {% set instance_types = env['instance_types'].split(',') if 'instance_types' in env else '!Ref InstanceTypes' %} {% set ami_id = env['ami_id'] if 'ami_id' in env else '!Ref AmiId' %} {% set type = env['allocation_type'] if 'allocation_type' in env else 'SPOT' %} diff --git a/apps/handle-batch-event/handle-batch-event-cf.yml.j2 b/apps/handle-batch-event/handle-batch-event-cf.yml.j2 index 386ae87c6..cf39fd7a2 100644 --- a/apps/handle-batch-event/handle-batch-event-cf.yml.j2 +++ b/apps/handle-batch-event/handle-batch-event-cf.yml.j2 @@ -5,8 +5,8 @@ Parameters: JobQueueArn: Type: String - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {{ job_spec['compute_environment']['name'] }}JobQueueArn: + {% for name in compute_envs %} + {{ name }}JobQueueArn: Type: String {% endfor %} @@ -100,8 +100,8 @@ Resources: detail: jobQueue: - !Ref JobQueueArn - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - - !Ref {{ job_spec['compute_environment']['name'] }}JobQueueArn + {% for name in compute_envs %} + - !Ref {{ name }}JobQueueArn {% endfor %} status: - RUNNING diff --git a/apps/main-cf.yml.j2 b/apps/main-cf.yml.j2 index 13b7405d5..5f33c52f4 100644 --- a/apps/main-cf.yml.j2 +++ b/apps/main-cf.yml.j2 @@ -9,7 +9,7 @@ Parameters: Type: List SecretArn: - Description: ARN of Secret in AWS Secrets Manager containing all the credentials needed for job tasks. + Description: ARN of Secret in AWS Secrets Manager containing all the credentials needed for job steps. 
Type: String ImageTag: @@ -154,8 +154,7 @@ Resources: Properties: Parameters: ComputeEnvironmentArn: !GetAtt Cluster.Outputs.ComputeEnvironmentArn - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {% set name = job_spec['compute_environment']['name'] %} + {% for name in compute_envs %} {{ name }}ComputeEnvironmentArn: !GetAtt Cluster.Outputs.{{ name }}ComputeEnvironmentArn {% endfor %} DefaultMaxvCpus: !Ref DefaultMaxvCpus @@ -173,8 +172,7 @@ Resources: Properties: Parameters: JobQueueArn: !GetAtt Cluster.Outputs.JobQueueArn - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {% set name = job_spec['compute_environment']['name'] %} + {% for name in compute_envs %} {{ name }}JobQueueArn: !GetAtt Cluster.Outputs.{{ name }}JobQueueArn {% endfor %} JobsTable: !Ref JobsTable @@ -189,8 +187,7 @@ Resources: Properties: Parameters: JobQueueArn: !GetAtt Cluster.Outputs.JobQueueArn - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {% set name = job_spec['compute_environment']['name'] %} + {% for name in compute_envs %} {{ name }}JobQueueArn: !GetAtt Cluster.Outputs.{{ name }}JobQueueArn {% endfor %} TaskRoleArn: !GetAtt Cluster.Outputs.TaskRoleArn diff --git a/apps/render_cf.py b/apps/render_cf.py index 1907d065a..37f248602 100644 --- a/apps/render_cf.py +++ b/apps/render_cf.py @@ -11,7 +11,156 @@ def snake_to_pascal_case(input_string: str): return ''.join([i.title() for i in split_string]) -def render_templates(job_types, security_environment, api_name): +def get_states_for_jobs(job_types: dict) -> dict: + states = {} + for job_spec in job_types.values(): + states.update(get_states_for_job(job_spec)) + return states + + +def get_states_for_job(job_spec: dict) -> dict: + states = {} + steps = job_spec['steps'] + for i in range(len(steps)): + step = steps[i] + next_state_name = steps[i + 1]['name'] if i < len(steps) - 1 else 'GET_FILES' + states[step['name']] = get_state_for_job_step(step, i, next_state_name, job_spec) + return states + + +def get_state_for_job_step(step: dict, index: int, next_state_name: str, job_spec: dict) -> dict: + if 'map' in step: + state = get_map_state(job_spec, step) + else: + state = get_batch_submit_job_state(job_spec, step, filter_batch_params=True) + state.update( + { + 'Catch': [ + { + 'ErrorEquals': [ + 'States.ALL' + ], + 'ResultPath': f'$.results.processing_results.step_{index}', + 'Next': 'PROCESSING_FAILED', + }, + ], + 'ResultPath': f'$.results.processing_results.step_{index}', + 'Next': next_state_name, + } + ) + return state + + +def get_map_state(job_spec: dict, step: dict) -> dict: + item, items = parse_map_statement(step['map']) + + batch_job_parameters = get_batch_job_parameters(job_spec, step, map_item=item) + + submit_job_state = get_batch_submit_job_state(job_spec, step) + submit_job_state['End'] = True + submit_job_state_name = step['name'] + '_SUBMIT_JOB' + return { + 'Type': 'Map', + 'ItemsPath': f'$.job_parameters.{items}', + 'ItemSelector': { + 'job_id.$': '$.job_id', + 'priority.$': '$.priority', + 'container_overrides.$': '$.container_overrides', + 'batch_job_parameters': batch_job_parameters, + }, + 'ItemProcessor': { + 'StartAt': submit_job_state_name, + 'States': { + submit_job_state_name: submit_job_state, + } + } + } + + +def get_batch_submit_job_state(job_spec: dict, step: dict, filter_batch_params=False) -> dict: + if filter_batch_params: + batch_job_parameters = 
get_batch_job_parameters(job_spec, step) + parameters_key = 'Parameters' + else: + batch_job_parameters = '$.batch_job_parameters' + parameters_key = 'Parameters.$' + + compute_environment = step['compute_environment'] + job_queue = 'JobQueueArn' if compute_environment == 'Default' else compute_environment + 'JobQueueArn' + return { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::batch:submitJob.sync', + 'Parameters': { + 'JobDefinition': '${' + snake_to_pascal_case(step['name']) + '}', + 'JobName.$': '$.job_id', + 'JobQueue': '${' + job_queue + '}', + 'ShareIdentifier': 'default', + 'SchedulingPriorityOverride.$': '$.priority', + parameters_key: batch_job_parameters, + 'ContainerOverrides.$': '$.container_overrides', + 'RetryStrategy': { + 'Attempts': 3 + }, + }, + 'ResultSelector': { + 'start.$': '$.Attempts[*].StartedAt', + 'stop.$': '$.Attempts[*].StoppedAt', + }, + 'Retry': [ + { + 'ErrorEquals': [ + 'Batch.ServerException', + 'Batch.AWSBatchException' + ], + 'MaxAttempts': 2 + }, + { + 'ErrorEquals': [ + 'States.ALL' + ], + 'MaxAttempts': 0 + } + ] + } + + +def parse_map_statement(map_statement: str) -> tuple[str, str]: + tokens = map_statement.split(' ') + if len(tokens) != 4: + raise ValueError(f'expected 4 tokens in map statement but got {len(tokens)}: {map_statement}') + if tokens[0] != 'for': + raise ValueError(f"expected 'for', got '{tokens[0]}': {map_statement}") + if tokens[2] != 'in': + raise ValueError(f"expected 'in', got '{tokens[2]}': {map_statement}") + return tokens[1], tokens[3] + + +def get_batch_job_parameters(job_spec: dict, step: dict, map_item: str = None) -> dict: + job_params = {'bucket_prefix', *job_spec['parameters'].keys()} + step_params = get_batch_param_names_for_job_step(step) + batch_params = {} + for param in step_params: + if param == map_item: + batch_params[f'{map_item}.$'] = '$$.Map.Item.Value' + else: + if param not in job_params: + raise ValueError(f"job parameter '{param}' has not been defined") + batch_params[f'{param}.$'] = f'$.batch_job_parameters.{param}' + return batch_params + + +def get_batch_param_names_for_job_step(step: dict) -> set[str]: + ref_prefix = 'Ref::' + return { + arg.removeprefix(ref_prefix) + for arg in step['command'] + if arg.startswith(ref_prefix) + } + + +def render_templates(job_types: dict, compute_envs: dict, security_environment: str, api_name: str): + job_states = get_states_for_jobs(job_types) + env = jinja2.Environment( loader=jinja2.FileSystemLoader('./'), autoescape=jinja2.select_autoescape(default=True, disabled_extensions=('j2',)), @@ -26,20 +175,50 @@ def render_templates(job_types, security_environment, api_name): output = template.render( job_types=job_types, + compute_envs=compute_envs, security_environment=security_environment, api_name=api_name, json=json, snake_to_pascal_case=snake_to_pascal_case, + job_states=job_states, ) + if str(template_file).endswith('.json.j2'): + output = json.dumps(json.loads(output), indent=2) + template_file.with_suffix('').write_text(output) +def get_compute_environments_for_deployment(job_types: dict, compute_env_file: Path) -> dict: + compute_envs = yaml.safe_load(compute_env_file.read_text())['compute_environments'] + + if 'Default' in compute_envs: + raise ValueError("'Default' is a reserved compute environment name") + + return { + step['compute_environment']: compute_envs[step['compute_environment']] + for job_spec in job_types.values() + for step in job_spec['steps'] + if step['compute_environment'] != 'Default' + } + + +def 
render_batch_params_by_job_type(job_types: dict) -> None: + batch_params_by_job_type = {} + for job_type, job_spec in job_types.items(): + params = set() + for step in job_spec['steps']: + params.update(get_batch_param_names_for_job_step(step)) + batch_params_by_job_type[job_type] = list(params) + with (Path('apps') / 'start-execution-worker' / 'src' / 'batch_params_by_job_type.json').open('w') as f: + json.dump(batch_params_by_job_type, f, indent=2) + + def render_default_params_by_job_type(job_types: dict) -> None: default_params_by_job_type = { job_type: { key: value['api_schema']['default'] for key, value in job_spec['parameters'].items() - if key not in job_spec['required_parameters'] and 'api_schema' in value + if key not in job_spec['required_parameters'] } for job_type, job_spec in job_types.items() } @@ -59,9 +238,31 @@ def render_costs(job_types: dict, cost_profile: str) -> None: json.dump(costs, f, indent=2) +def validate_job_spec(job_type: str, job_spec: dict) -> None: + expected_fields = sorted(['required_parameters', 'parameters', 'cost_profiles', 'validators', 'steps']) + actual_fields = sorted(job_spec.keys()) + if actual_fields != expected_fields: + raise ValueError(f'{job_type} has fields {actual_fields} but should have {expected_fields}') + + reserved_params = {'bucket_prefix'} + reserved_params_in_spec = reserved_params.intersection(set(job_spec['parameters'].keys())) + if reserved_params_in_spec: + raise ValueError(f'{job_type} contains reserved parameter names: {sorted(reserved_params_in_spec)}') + + expected_param_fields = ['api_schema'] + for param_name, param_dict in job_spec['parameters'].items(): + actual_param_fields = sorted(param_dict.keys()) + if actual_param_fields != expected_param_fields: + raise ValueError( + f"parameter '{param_name}' for {job_type} has fields {actual_param_fields} " + f"but should have {expected_param_fields}" + ) + + def main(): parser = argparse.ArgumentParser() parser.add_argument('-j', '--job-spec-files', required=True, nargs='+', type=Path) + parser.add_argument('-e', '--compute-environment-file', required=True, type=Path) parser.add_argument('-s', '--security-environment', default='ASF', choices=['ASF', 'EDC', 'JPL', 'JPL-public']) parser.add_argument('-n', '--api-name', required=True) parser.add_argument('-c', '--cost-profile', default='DEFAULT', choices=['DEFAULT', 'EDC']) @@ -72,12 +273,18 @@ def main(): job_types.update(yaml.safe_load(file.read_text())) for job_type, job_spec in job_types.items(): - for task in job_spec['tasks']: - task['name'] = job_type + '_' + task['name'] if task['name'] else job_type + validate_job_spec(job_type, job_spec) + + for job_type, job_spec in job_types.items(): + for step in job_spec['steps']: + step['name'] = job_type + '_' + step['name'] if step['name'] else job_type + + compute_envs = get_compute_environments_for_deployment(job_types, args.compute_environment_file) + render_batch_params_by_job_type(job_types) render_default_params_by_job_type(job_types) render_costs(job_types, args.cost_profile) - render_templates(job_types, args.security_environment, args.api_name) + render_templates(job_types, compute_envs, args.security_environment, args.api_name) if __name__ == '__main__': diff --git a/apps/scale-cluster/scale-cluster-cf.yml.j2 b/apps/scale-cluster/scale-cluster-cf.yml.j2 index 358f095b3..0fa44cd90 100644 --- a/apps/scale-cluster/scale-cluster-cf.yml.j2 +++ b/apps/scale-cluster/scale-cluster-cf.yml.j2 @@ -5,8 +5,7 @@ Parameters: ComputeEnvironmentArn: Type: String - {% for job_type, 
job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {% set name = job_spec['compute_environment']['name'] %} + {% for name in compute_envs %} {{ name }}ComputeEnvironmentArn: Type: String {% endfor %} @@ -87,8 +86,8 @@ Resources: Action: batch:UpdateComputeEnvironment Resource: - !Ref ComputeEnvironmentArn - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - - !Ref {{ job_spec['compute_environment']['name'] }}ComputeEnvironmentArn + {% for name in compute_envs %} + - !Ref {{ name }}ComputeEnvironmentArn {% endfor %} Lambda: @@ -128,8 +127,7 @@ Resources: Targets: - Arn: !GetAtt Lambda.Arn Id: lambda - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {% set name = job_spec['compute_environment']['name'] %} + {% for name in compute_envs %} - Arn: !GetAtt {{ name }}Lambda.Arn Id: {{ name }}lambda {% endfor %} @@ -142,8 +140,7 @@ Resources: Principal: events.amazonaws.com SourceArn: !GetAtt Schedule.Arn - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {% set name = job_spec['compute_environment']['name'] %} + {% for name in compute_envs %} {{ name }}LogGroup: Type: AWS::Logs::LogGroup Properties: diff --git a/apps/set-batch-overrides/src/set_batch_overrides.py b/apps/set-batch-overrides/src/set_batch_overrides.py index 0a9dc6627..9d419ec31 100644 --- a/apps/set-batch-overrides/src/set_batch_overrides.py +++ b/apps/set-batch-overrides/src/set_batch_overrides.py @@ -35,7 +35,7 @@ def get_container_overrides(memory: str, omp_num_threads: str = None) -> dict: def get_insar_isce_burst_memory(job_parameters: dict) -> str: looks = job_parameters['looks'] - bursts = len(job_parameters['reference'].split(' ')) + bursts = len(job_parameters['reference']) if looks == '5x1': if bursts < 2: return INSAR_ISCE_BURST_MEMORY_8G @@ -70,16 +70,16 @@ def lambda_handler(event: dict, _) -> dict: omp_num_threads = INSAR_ISCE_BURST_OMP_NUM_THREADS[memory] return get_container_overrides(memory, omp_num_threads) - if job_type == 'AUTORIFT' and job_parameters['granules'].startswith('S2'): + if job_type == 'AUTORIFT' and job_parameters['granules'][0].startswith('S2'): return get_container_overrides(AUTORIFT_S2_MEMORY) - if job_type == 'AUTORIFT' and job_parameters['granules'].startswith('L'): + if job_type == 'AUTORIFT' and job_parameters['granules'][0].startswith('L'): return get_container_overrides(AUTORIFT_LANDSAT_MEMORY) - if job_type == 'RTC_GAMMA' and job_parameters['resolution'] in ['10', '20']: + if job_type == 'RTC_GAMMA' and job_parameters['resolution'] in [10, 20]: return get_container_overrides(RTC_GAMMA_10M_MEMORY) - if job_type in ['WATER_MAP', 'WATER_MAP_EQ'] and job_parameters['resolution'] in ['10', '20']: + if job_type in ['WATER_MAP', 'WATER_MAP_EQ'] and job_parameters['resolution'] in [10, 20]: return get_container_overrides(WATER_MAP_10M_MEMORY) return {} diff --git a/apps/start-execution-worker/src/start_execution_worker.py b/apps/start-execution-worker/src/start_execution_worker.py index 7becf0093..63b3b2a72 100644 --- a/apps/start-execution-worker/src/start_execution_worker.py +++ b/apps/start-execution-worker/src/start_execution_worker.py @@ -1,5 +1,6 @@ import json import os +from pathlib import Path from typing import Any import boto3 @@ -8,6 +9,13 @@ STEP_FUNCTION = boto3.client('stepfunctions') +batch_params_file = Path(__file__).parent / 'batch_params_by_job_type.json' +if 
batch_params_file.exists(): + BATCH_PARAMS_BY_JOB_TYPE = json.loads(batch_params_file.read_text()) +else: + # Allows mocking with unittest.mock.patch + BATCH_PARAMS_BY_JOB_TYPE = {} + def convert_to_string(obj: Any) -> str: if isinstance(obj, list): @@ -15,17 +23,21 @@ def convert_to_string(obj: Any) -> str: return str(obj) -def convert_parameters_to_strings(parameters: dict[str, Any]) -> dict[str, str]: - return {key: convert_to_string(value) for key, value in parameters.items()} +def get_batch_job_parameters(job: dict) -> dict[str, str]: + # Convert parameters to strings so they can be passed to Batch; see: + # https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html#Batch-SubmitJob-request-parameters + return { + key: convert_to_string(value) + for key, value in job['job_parameters'].items() + if key in BATCH_PARAMS_BY_JOB_TYPE[job['job_type']] + } def submit_jobs(jobs: list[dict]) -> None: step_function_arn = os.environ['STEP_FUNCTION_ARN'] logger.info(f'Step function ARN: {step_function_arn}') for job in jobs: - # Convert parameters to strings so they can be passed to Batch; see: - # https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html#Batch-SubmitJob-request-parameters - job['job_parameters'] = convert_parameters_to_strings(job['job_parameters']) + job['batch_job_parameters'] = get_batch_job_parameters(job) STEP_FUNCTION.start_execution( stateMachineArn=step_function_arn, input=json.dumps(job, sort_keys=True), diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index bc168208d..567f1cef4 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -35,7 +35,7 @@ "ADD_PREFIX_TO_JOB_PARAMETERS": { "Type": "Pass", "InputPath": "$.job_id", - "ResultPath": "$.job_parameters.bucket_prefix", + "ResultPath": "$.batch_job_parameters.bucket_prefix", "Next": "SET_BATCH_OVERRIDES" }, "SET_BATCH_OVERRIDES": { @@ -72,59 +72,14 @@ { "Variable": "$.job_type", "StringEquals": "{{ job_type }}", - "Next": "{{ job_spec['tasks'][0]['name'] }}" + "Next": "{{ job_spec['steps'][0]['name'] }}" }{% if not loop.last %},{% endif %} {% endfor %} ], "Default": "JOB_FAILED" }, - {% for job_type, job_spec in job_types.items() %} - {% for task in job_spec['tasks'] %} - "{{ task['name'] }}": { - "Type": "Task", - "Resource": "arn:aws:states:::batch:submitJob.sync", - "Parameters": { - "JobDefinition": "{{ '${'+ snake_to_pascal_case(task['name']) + '}' }}", - "JobName.$": "$.job_id", - {% set name = job_spec['compute_environment']['name'] %} - {% set job_queue = name + 'JobQueueArn' if 'Default' != name else 'JobQueueArn' %} - "JobQueue": "{{ '${' + job_queue + '}' }}", - "ShareIdentifier": "default", - "SchedulingPriorityOverride.$": "$.priority", - "Parameters.$": "$.job_parameters", - "ContainerOverrides.$": "$.container_overrides", - "RetryStrategy": { - "Attempts": 3 - } - }, - "ResultPath": "$.results.processing_results.step_{{ loop.index0 }}", - "Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}", - "Retry": [ - { - "ErrorEquals": [ - "Batch.ServerException", - "Batch.AWSBatchException" - ], - "MaxAttempts": 2 - }, - { - "ErrorEquals": [ - "States.ALL" - ], - "MaxAttempts": 0 - } - ], - "Catch": [ - { - "ErrorEquals": [ - "States.ALL" - ], - "Next": "PROCESSING_FAILED", - "ResultPath": "$.results.processing_results.step_{{ loop.index0 }}" - } - ] - }, - {% endfor %} + {% for name, body in job_states.items() %} + "{{ name }}": {{ json.dumps(body) }}, {% endfor %} "PROCESSING_FAILED": { "Type": "Pass", diff 
--git a/apps/upload-log/src/upload_log.py b/apps/upload-log/src/upload_log.py index 284641093..aecab3043 100644 --- a/apps/upload-log/src/upload_log.py +++ b/apps/upload-log/src/upload_log.py @@ -58,7 +58,6 @@ def write_log_to_s3(bucket, prefix, content): def lambda_handler(event, context): - # TODO handle all results, not just the last one results_dict = event['processing_results'] result = results_dict[max(results_dict.keys())] diff --git a/apps/workflow-cf.yml.j2 b/apps/workflow-cf.yml.j2 index 0859387c6..011bbf813 100644 --- a/apps/workflow-cf.yml.j2 +++ b/apps/workflow-cf.yml.j2 @@ -5,8 +5,8 @@ Parameters: JobQueueArn: Type: String - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {{ job_spec['compute_environment']['name'] }}JobQueueArn: + {% for name in compute_envs %} + {{ name }}JobQueueArn: Type: String {% endfor %} @@ -44,45 +44,41 @@ Outputs: Resources: {% for job_type, job_spec in job_types.items() %} - {% for task in job_spec['tasks'] %} - {{ snake_to_pascal_case(task['name']) }}: + {% for step in job_spec['steps'] %} + {{ snake_to_pascal_case(step['name']) }}: Type: AWS::Batch::JobDefinition Properties: Type: container - Parameters: - {% for k, v in job_spec['parameters'].items() %} - {{ k }}: {{ v.get('default') or v['api_schema'].get('default') }} - {% endfor %} ContainerProperties: - Image: {% if 'image_tag' in task -%} - "{{ task['image'] }}:{{ task['image_tag'] }}" + Image: {% if 'image_tag' in step -%} + "{{ step['image'] }}:{{ step['image_tag'] }}" {% else -%} - !Sub "{{ task['image'] }}:${ImageTag}" + !Sub "{{ step['image'] }}:${ImageTag}" {% endif %} JobRoleArn: !Ref TaskRoleArn ExecutionRoleArn: !GetAtt ExecutionRole.Arn ResourceRequirements: - Type: VCPU - Value: "{{ task['vcpu'] }}" + Value: "{{ step['vcpu'] }}" - Type: MEMORY - Value: "{{ task['memory'] }}" - {% if 'gpu' in task %} + Value: "{{ step['memory'] }}" + {% if 'gpu' in step %} - Type: GPU - Value: "{{ task['gpu'] }}" + Value: "{{ step['gpu'] }}" {% endif %} Command: - {% for command in task['command'] %} + {% for command in step['command'] %} - {{ command }} {% endfor %} - {% if task.get('secrets') %} + {% if step.get('secrets') %} Secrets: - {% for secret in task['secrets'] %} + {% for secret in step['secrets'] %} - Name: {{ secret }} ValueFrom: !Sub "${SecretArn}:{{ secret }}::" {% endfor %} {% endif %} Timeout: - AttemptDurationSeconds: {{ task['timeout'] }} + AttemptDurationSeconds: {{ step['timeout'] }} {% endfor %} {% endfor %} @@ -93,13 +89,12 @@ Resources: DefinitionS3Location: step-function.json DefinitionSubstitutions: JobQueueArn: !Ref JobQueueArn - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - {% set name = job_spec['compute_environment']['name'] %} + {% for name in compute_envs %} {{ name }}JobQueueArn: !Ref {{ name }}JobQueueArn {% endfor %} {% for job_type, job_spec in job_types.items() %} - {% for task in job_spec['tasks'] %} - {{ snake_to_pascal_case(task['name']) }}: !Ref {{ snake_to_pascal_case(task['name']) }} + {% for step in job_spec['steps'] %} + {{ snake_to_pascal_case(step['name']) }}: !Ref {{ snake_to_pascal_case(step['name']) }} {% endfor %} {% endfor %} UpdateDBLambdaArn: !GetAtt UpdateDB.Outputs.LambdaArn @@ -139,12 +134,12 @@ Resources: Action: batch:SubmitJob Resource: - !Ref JobQueueArn - {% for job_type, job_spec in job_types.items() if 'Default' != job_spec['compute_environment']['name'] %} - - !Ref {{ job_spec['compute_environment']['name'] 
}}JobQueueArn + {% for name in compute_envs %} + - !Ref {{ name }}JobQueueArn {% endfor %} {% for job_type, job_spec in job_types.items() %} - {% for task in job_spec['tasks'] %} - - !Ref {{ snake_to_pascal_case(task['name']) }} + {% for step in job_spec['steps'] %} + - !Ref {{ snake_to_pascal_case(step['name']) }} {% endfor %} {% endfor %} - Effect: Allow diff --git a/job_spec/ARIA_AUTORIFT.yml b/job_spec/ARIA_AUTORIFT.yml index 5d92c9cce..39adeb7a9 100644 --- a/job_spec/ARIA_AUTORIFT.yml +++ b/job_spec/ARIA_AUTORIFT.yml @@ -3,7 +3,6 @@ AUTORIFT: - granules parameters: granules: - default: '""' api_schema: type: array minItems: 2 @@ -31,8 +30,6 @@ AUTORIFT: minLength: 40 maxLength: 40 example: LC08_L1GT_118112_20210107_20210107_02_T2 - bucket_prefix: - default: '""' parameter_file: api_schema: description: Shapefile for determining the correct search parameters by geographic location. Path to shapefile must be understood by GDAL. @@ -42,10 +39,7 @@ AUTORIFT: DEFAULT: cost: 1.0 validators: [] - compute_environment: - name: 'AriaAutorift' - instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge,r6idn.xlarge,r6idn.2xlarge,r6idn.4xlarge,r6idn.8xlarge - tasks: + steps: - name: '' image: ghcr.io/asfhyp3/hyp3-autorift command: @@ -61,6 +55,7 @@ AUTORIFT: - ITS_LIVE_OD - Ref::granules timeout: 10800 + compute_environment: AriaAutorift vcpu: 1 memory: 31500 secrets: diff --git a/job_spec/ARIA_RAIDER.yml b/job_spec/ARIA_RAIDER.yml index 840a67154..fc41b41ea 100644 --- a/job_spec/ARIA_RAIDER.yml +++ b/job_spec/ARIA_RAIDER.yml @@ -19,15 +19,11 @@ ARIA_RAIDER: - GMAO - HRES - HRRR - bucket_prefix: - default: '""' cost_profiles: DEFAULT: cost: 1.0 validators: [] - compute_environment: - name: 'Default' - tasks: + steps: - name: '' image: ghcr.io/dbekaert/raider command: @@ -42,6 +38,7 @@ ARIA_RAIDER: - --input-bucket-prefix - Ref::job_id timeout: 10800 + compute_environment: Default vcpu: 1 memory: 7500 secrets: diff --git a/job_spec/AUTORIFT.yml b/job_spec/AUTORIFT.yml index e4bc9ef39..0943fd578 100644 --- a/job_spec/AUTORIFT.yml +++ b/job_spec/AUTORIFT.yml @@ -3,7 +3,6 @@ AUTORIFT: - granules parameters: granules: - default: '""' api_schema: type: array minItems: 2 @@ -31,17 +30,13 @@ AUTORIFT: minLength: 40 maxLength: 40 example: LC08_L1GT_118112_20210107_20210107_02_T2 - bucket_prefix: - default: '""' cost_profiles: EDC: cost: 25.0 DEFAULT: cost: 1.0 validators: [] - compute_environment: - name: 'Default' - tasks: + steps: - name: '' image: ghcr.io/asfhyp3/hyp3-autorift command: @@ -57,6 +52,7 @@ AUTORIFT: - ITS_LIVE_OD - Ref::granules timeout: 10800 + compute_environment: Default vcpu: 1 memory: 31500 secrets: diff --git a/job_spec/AUTORIFT_ITS_LIVE.yml b/job_spec/AUTORIFT_ITS_LIVE.yml index 2a9840d28..698d53ee3 100644 --- a/job_spec/AUTORIFT_ITS_LIVE.yml +++ b/job_spec/AUTORIFT_ITS_LIVE.yml @@ -3,7 +3,6 @@ AUTORIFT: - granules parameters: granules: - default: '""' api_schema: type: array minItems: 2 @@ -31,8 +30,6 @@ AUTORIFT: minLength: 40 maxLength: 40 example: LC08_L1GT_118112_20210107_20210107_02_T2 - bucket_prefix: - default: '""' parameter_file: api_schema: description: Shapefile for determining the correct search parameters by geographic location. Path to shapefile must be understood by GDAL. 
@@ -51,9 +48,7 @@ AUTORIFT: DEFAULT: cost: 1.0 validators: [] - compute_environment: - name: 'Default' - tasks: + steps: - name: '' image: ghcr.io/asfhyp3/hyp3-autorift command: @@ -71,6 +66,7 @@ AUTORIFT: - ITS_LIVE_PROD - Ref::granules timeout: 10800 + compute_environment: Default vcpu: 1 memory: 31500 secrets: diff --git a/job_spec/INSAR_GAMMA.yml b/job_spec/INSAR_GAMMA.yml index 2d376bf8d..7a6473bac 100644 --- a/job_spec/INSAR_GAMMA.yml +++ b/job_spec/INSAR_GAMMA.yml @@ -3,7 +3,6 @@ INSAR_GAMMA: - granules parameters: granules: - default: '""' api_schema: type: array minItems: 2 @@ -18,8 +17,6 @@ INSAR_GAMMA: minLength: 67 maxLength: 67 example: S1A_IW_SLC__1SSV_20150621T120220_20150621T120232_006471_008934_72D8 - bucket_prefix: - default: '""' include_look_vectors: api_schema: description: Include the look vector theta and phi files in the product package @@ -82,9 +79,7 @@ INSAR_GAMMA: cost: 1.0 validators: - check_dem_coverage - compute_environment: - name: 'Default' - tasks: + steps: - name: '' image: 845172464411.dkr.ecr.us-west-2.amazonaws.com/hyp3-gamma command: @@ -116,6 +111,7 @@ INSAR_GAMMA: - Ref::phase_filter_parameter - Ref::granules timeout: 10800 + compute_environment: Default vcpu: 1 memory: 31500 secrets: diff --git a/job_spec/INSAR_ISCE.yml b/job_spec/INSAR_ISCE.yml index 75b62104c..7c9ee8a5c 100644 --- a/job_spec/INSAR_ISCE.yml +++ b/job_spec/INSAR_ISCE.yml @@ -5,7 +5,6 @@ INSAR_ISCE: - frame_id parameters: granules: - default: '""' api_schema: type: array minItems: 1 @@ -17,7 +16,6 @@ INSAR_ISCE: maxLength: 67 example: S1B_IW_SLC__1SDV_20210723T014947_20210723T015014_027915_0354B4_B3A9 secondary_granules: - default: '""' api_schema: type: array minItems: 1 @@ -86,17 +84,11 @@ INSAR_ISCE: - GMAO - HRES - HRRR - bucket_prefix: - default: '""' cost_profiles: DEFAULT: cost: 1.0 validators: [] - compute_environment: - name: 'InsarIsceAria' - allocation_type: EC2 - allocation_strategy: BEST_FIT_PROGRESSIVE - tasks: + steps: - name: '' image: ghcr.io/access-cloud-based-insar/dockerizedtopsapp command: @@ -127,6 +119,7 @@ INSAR_ISCE: - --unfiltered-coherence - Ref::unfiltered_coherence timeout: 21600 + compute_environment: InsarIsceAria vcpu: 1 memory: 15500 secrets: @@ -146,6 +139,7 @@ INSAR_ISCE: - --weather-model - Ref::weather_model timeout: 10800 + compute_environment: InsarIsceAria vcpu: 1 memory: 7500 secrets: diff --git a/job_spec/INSAR_ISCE_BURST.yml b/job_spec/INSAR_ISCE_BURST.yml index d741b5aa4..77c8b254d 100644 --- a/job_spec/INSAR_ISCE_BURST.yml +++ b/job_spec/INSAR_ISCE_BURST.yml @@ -3,7 +3,6 @@ INSAR_ISCE_BURST: - granules parameters: granules: - default: '""' api_schema: type: array minItems: 2 @@ -18,8 +17,6 @@ INSAR_ISCE_BURST: minLength: 43 maxLength: 43 example: S1_136231_IW2_20200604T022312_VV_7C85-BURST - bucket_prefix: - default: '""' apply_water_mask: api_schema: description: Sets pixels over coastal and large inland waterbodies as invalid for phase unwrapping. 
@@ -44,9 +41,7 @@ INSAR_ISCE_BURST: - check_valid_polarizations - check_same_burst_ids - check_not_antimeridian - compute_environment: - name: 'Default' - tasks: + steps: - name: '' image: ghcr.io/asfhyp3/hyp3-isce2 command: @@ -64,6 +59,7 @@ INSAR_ISCE_BURST: - Ref::looks - Ref::granules timeout: 5400 + compute_environment: Default vcpu: 1 memory: 7600 secrets: diff --git a/job_spec/INSAR_ISCE_MULTI_BURST.yml b/job_spec/INSAR_ISCE_MULTI_BURST.yml index e6b3a89e5..0ce01289c 100644 --- a/job_spec/INSAR_ISCE_MULTI_BURST.yml +++ b/job_spec/INSAR_ISCE_MULTI_BURST.yml @@ -4,7 +4,6 @@ INSAR_ISCE_MULTI_BURST: - secondary parameters: reference: - default: '""' api_schema: type: array minItems: 1 @@ -20,7 +19,6 @@ INSAR_ISCE_MULTI_BURST: maxLength: 43 example: S1_136231_IW2_20200604T022312_VV_7C85-BURST secondary: - default: '""' api_schema: type: array minItems: 1 @@ -34,8 +32,6 @@ INSAR_ISCE_MULTI_BURST: minLength: 43 maxLength: 43 example: S1_136231_IW2_20200616T022313_VV_5D11-BURST - bucket_prefix: - default: '""' apply_water_mask: api_schema: description: Sets pixels over coastal and large inland waterbodies as invalid for phase unwrapping. @@ -60,9 +56,7 @@ INSAR_ISCE_MULTI_BURST: - check_valid_polarizations - check_same_burst_ids - check_not_antimeridian - compute_environment: - name: 'Default' - tasks: + steps: - name: '' image: ghcr.io/asfhyp3/hyp3-isce2 command: @@ -81,6 +75,7 @@ INSAR_ISCE_MULTI_BURST: - --secondary - Ref::secondary timeout: 126000 # 35 hours + compute_environment: Default vcpu: 1 memory: 4 # Memory is always overridden by the step function secrets: diff --git a/job_spec/RTC_GAMMA.yml b/job_spec/RTC_GAMMA.yml index 393814074..36277f549 100644 --- a/job_spec/RTC_GAMMA.yml +++ b/job_spec/RTC_GAMMA.yml @@ -3,7 +3,6 @@ RTC_GAMMA: - granules parameters: granules: - default: '""' api_schema: type: array minItems: 1 @@ -24,8 +23,6 @@ RTC_GAMMA: minLength: 67 maxLength: 67 example: S1A_IW_SLC__1SSV_20150621T120220_20150621T120232_006471_008934_72D8 - bucket_prefix: - default: '""' resolution: api_schema: default: 30.0 @@ -103,9 +100,7 @@ RTC_GAMMA: cost: 1.0 validators: - check_dem_coverage - compute_environment: - name: 'Default' - tasks: + steps: - name: '' image: 845172464411.dkr.ecr.us-west-2.amazonaws.com/hyp3-gamma command: @@ -139,6 +134,7 @@ RTC_GAMMA: - Ref::include_rgb - Ref::granules timeout: 36000 + compute_environment: Default vcpu: 1 memory: 31500 secrets: diff --git a/job_spec/S1_CORRECTION_ITS_LIVE.yml b/job_spec/S1_CORRECTION_ITS_LIVE.yml index 8de729ce6..ccddb38d9 100644 --- a/job_spec/S1_CORRECTION_ITS_LIVE.yml +++ b/job_spec/S1_CORRECTION_ITS_LIVE.yml @@ -3,7 +3,6 @@ S1_CORRECTION_TEST: - granules parameters: granules: - default: '""' api_schema: type: array minItems: 1 @@ -23,15 +22,11 @@ S1_CORRECTION_TEST: default: 0 type: integer minimum: 0 - bucket_prefix: - default: '""' cost_profiles: DEFAULT: cost: 1.0 validators: [] - compute_environment: - name: 'Default' - tasks: + steps: - name: '' image: ghcr.io/asfhyp3/hyp3-autorift command: @@ -49,6 +44,7 @@ S1_CORRECTION_TEST: - '/vsicurl/http://its-live-data.s3.amazonaws.com/autorift_parameters/v001/autorift_landice_0120m.shp' - Ref::granules timeout: 10800 + compute_environment: Default vcpu: 1 memory: 15750 secrets: diff --git a/job_spec/SRG_GSLC.yml b/job_spec/SRG_GSLC.yml index 21154e0e8..92421982e 100644 --- a/job_spec/SRG_GSLC.yml +++ b/job_spec/SRG_GSLC.yml @@ -3,7 +3,6 @@ SRG_GSLC: - granules parameters: granules: - default: '""' api_schema: type: array minItems: 1 @@ -18,12 +17,10 @@ SRG_GSLC: 
minLength: 67 maxLength: 67 example: S1A_IW_RAW__0SDV_20231229T134404_20231229T134436_051870_064437_5F38 - bucket_prefix: - default: '""' bounds: - default: '""' api_schema: type: array + description: Bounds for extent of processing, formatted like [min lon, min lat, max lon, max lat] in EPSG:4326. Setting to [0, 0, 0, 0] will use the extent of the first granule. default: [0.0, 0.0, 0.0, 0.0] minItems: 4 maxItems: 4 @@ -33,22 +30,19 @@ SRG_GSLC: - -113.209 - 38.138 items: - description: min lon, min lat, max lon, max lat in EPSG:4326 type: number example: -116.583 - validators: [] + validators: [ + check_bounds_formatting, + check_granules_intersecting_bounds, + check_same_relative_orbits + ] cost_profiles: DEFAULT: cost: 1.0 - compute_environment: - name: SrgGslc - instance_types: g6.2xlarge - # Image ID for: /aws/service/ecs/optimized-ami/amazon-linux-2/gpu/recommended/image_id - ami_id: ami-0729c079aae647cb3 - tasks: + steps: - name: '' image: ghcr.io/asfhyp3/hyp3-srg - image_tag: latest.gpu command: - ++process - back_projection @@ -61,6 +55,7 @@ SRG_GSLC: - Ref::bucket_prefix - Ref::granules timeout: 10800 + compute_environment: SrgGslc vcpu: 1 gpu: 1 memory: 30500 diff --git a/job_spec/SRG_TIME_SERIES.yml b/job_spec/SRG_TIME_SERIES.yml new file mode 100644 index 000000000..f073a338c --- /dev/null +++ b/job_spec/SRG_TIME_SERIES.yml @@ -0,0 +1,83 @@ +SRG_TIME_SERIES: + required_parameters: + - granules + parameters: + granules: + api_schema: + type: array + minItems: 1 + maxItems: 600 + example: + - S1A_IW_RAW__0SDV_20240629T020812_20240629T020845_054532_06A2F8_8276 + - S1A_IW_RAW__0SDV_20240723T020812_20240723T020844_054882_06AF26_2CE5 + - S1A_IW_RAW__0SDV_20240804T020812_20240804T020844_055057_06B527_1346 + items: + description: Name of the Level-0 Sentinel-1 scenes to process + type: string + pattern: "^S1[AB]_IW_RAW" + minLength: 67 + maxLength: 67 + example: S1A_IW_RAW__0SDV_20240629T020812_20240629T020845_054532_06A2F8_8276 + bounds: + api_schema: + type: array + description: Bounds for extent of processing, formatted like [min lon, min lat, max lon, max lat] in EPSG:4326. Setting to [0, 0, 0, 0] will use the extent of the first granule. 
+ default: [0.0, 0.0, 0.0, 0.0] + minItems: 4 + maxItems: 4 + example: + - -124.41473278572731 + - 37.098700238673814 + - -120.9825007499895 + - 39.52359974376425 + items: + type: number + example: -124.41473278572731 + validators: [ + check_bounds_formatting, + check_granules_intersecting_bounds, + check_same_relative_orbits + ] + cost_profiles: + DEFAULT: + cost: 1.0 + steps: + - name: BACK_PROJECTION + map: for granule in granules + image: ghcr.io/asfhyp3/hyp3-srg + command: + - ++process + - back_projection + - --bounds + - Ref::bounds + - --gpu + - --bucket + - '!Ref Bucket' + - --bucket-prefix + - Ref::bucket_prefix + - --use-gslc-prefix + - Ref::granule + timeout: 10800 + compute_environment: SrgGslc + vcpu: 1 + gpu: 1 + memory: 30500 + secrets: + - EARTHDATA_USERNAME + - EARTHDATA_PASSWORD + - name: '' + image: ghcr.io/asfhyp3/hyp3-srg + command: + - ++process + - time_series + - --bounds + - Ref::bounds + - --bucket + - '!Ref Bucket' + - --bucket-prefix + - Ref::bucket_prefix + - --use-gslc-prefix + timeout: 21600 # 6 hr + compute_environment: Default + vcpu: 1 + memory: 30500 diff --git a/job_spec/WATER_MAP.yml b/job_spec/WATER_MAP.yml index 89105c97a..70ac8c352 100644 --- a/job_spec/WATER_MAP.yml +++ b/job_spec/WATER_MAP.yml @@ -3,7 +3,6 @@ WATER_MAP: - granules parameters: granules: - default: '""' api_schema: type: array minItems: 1 @@ -22,8 +21,6 @@ WATER_MAP: minLength: 67 maxLength: 67 example: S1A_IW_SLC__1SDV_20211110T234815_20211110T234842_040516_04CE0A_E717 - bucket_prefix: - default: '""' resolution: api_schema: default: 30.0 @@ -116,9 +113,7 @@ WATER_MAP: cost: 1.0 validators: - check_dem_coverage - compute_environment: - name: 'Default' - tasks: + steps: - name: RTC image: 845172464411.dkr.ecr.us-west-2.amazonaws.com/hyp3-gamma command: @@ -152,6 +147,7 @@ WATER_MAP: - 'copernicus' - Ref::granules timeout: 36000 + compute_environment: Default vcpu: 1 memory: 31500 secrets: @@ -177,6 +173,7 @@ WATER_MAP: - --membership-threshold - Ref::membership_threshold timeout: 36000 + compute_environment: Default vcpu: 1 memory: 126000 - name: FLOOD_MAP @@ -203,5 +200,6 @@ WATER_MAP: - --minimization-metric - Ref::minimization_metric timeout: 86400 + compute_environment: Default vcpu: 1 memory: 126000 diff --git a/job_spec/WATER_MAP_EQ.yml b/job_spec/WATER_MAP_EQ.yml index d4a16d23f..deeb575ee 100644 --- a/job_spec/WATER_MAP_EQ.yml +++ b/job_spec/WATER_MAP_EQ.yml @@ -3,7 +3,6 @@ WATER_MAP_EQ: - granules parameters: granules: - default: '""' api_schema: type: array minItems: 1 @@ -22,8 +21,6 @@ WATER_MAP_EQ: minLength: 67 maxLength: 67 example: S1A_IW_SLC__1SDV_20211110T234815_20211110T234842_040516_04CE0A_E717 - bucket_prefix: - default: '""' resolution: api_schema: default: 30.0 @@ -58,9 +55,7 @@ WATER_MAP_EQ: cost: 1.0 validators: - check_dem_coverage - compute_environment: - name: 'Default' - tasks: + steps: - name: RTC image: 845172464411.dkr.ecr.us-west-2.amazonaws.com/hyp3-gamma command: @@ -94,6 +89,7 @@ WATER_MAP_EQ: - 'copernicus' - Ref::granules timeout: 36000 + compute_environment: Default vcpu: 1 memory: 31500 secrets: @@ -113,5 +109,6 @@ WATER_MAP_EQ: - --hand-fraction - Ref::hand_fraction timeout: 36000 + compute_environment: Default vcpu: 1 memory: 126000 diff --git a/job_spec/config/compute_environments.yml b/job_spec/config/compute_environments.yml new file mode 100644 index 000000000..01de096b7 --- /dev/null +++ b/job_spec/config/compute_environments.yml @@ -0,0 +1,14 @@ +compute_environments: + # Supported fields: + # instance_types + # ami_id + # 
allocation_type + # allocation_strategy + SrgGslc: + instance_types: g6.2xlarge + ami_id: ami-0729c079aae647cb3 # /aws/service/ecs/optimized-ami/amazon-linux-2/gpu/recommended/image_id + AriaAutorift: + instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge,r6idn.xlarge,r6idn.2xlarge,r6idn.4xlarge,r6idn.8xlarge + InsarIsceAria: + allocation_type: EC2 + allocation_strategy: BEST_FIT_PROGRESSIVE diff --git a/requirements-all.txt b/requirements-all.txt index c34af36e6..2935a6425 100644 --- a/requirements-all.txt +++ b/requirements-all.txt @@ -5,9 +5,9 @@ -r requirements-apps-start-execution-worker.txt -r requirements-apps-disable-private-dns.txt -r requirements-apps-update-db.txt -boto3==1.35.34 +boto3==1.35.44 jinja2==3.1.4 -moto[dynamodb]==5.0.16 +moto[dynamodb]==5.0.18 pytest==8.3.3 PyYAML==6.0.2 responses==0.25.3 @@ -15,6 +15,6 @@ flake8==7.1.1 flake8-import-order==0.18.2 flake8-blind-except==0.2.1 flake8-builtins==2.5.0 -setuptools==75.1.0 +setuptools==75.2.0 openapi-spec-validator==0.7.1 -cfn-lint==1.16.0 +cfn-lint==1.18.1 diff --git a/requirements-apps-api-binary.txt b/requirements-apps-api-binary.txt index ccb822680..75287dd9f 100644 --- a/requirements-apps-api-binary.txt +++ b/requirements-apps-api-binary.txt @@ -1 +1 @@ -cryptography==43.0.1 +cryptography==43.0.3 diff --git a/requirements-apps-disable-private-dns.txt b/requirements-apps-disable-private-dns.txt index 591888472..a54ad61db 100644 --- a/requirements-apps-disable-private-dns.txt +++ b/requirements-apps-disable-private-dns.txt @@ -1 +1 @@ -boto3==1.35.34 +boto3==1.35.44 diff --git a/requirements-apps-start-execution-manager.txt b/requirements-apps-start-execution-manager.txt index 3a73b98e4..d5e084a91 100644 --- a/requirements-apps-start-execution-manager.txt +++ b/requirements-apps-start-execution-manager.txt @@ -1,3 +1,3 @@ -boto3==1.35.34 +boto3==1.35.44 ./lib/dynamo/ ./lib/lambda_logging/ diff --git a/requirements-apps-start-execution-worker.txt b/requirements-apps-start-execution-worker.txt index ad5be7290..0b09d31f4 100644 --- a/requirements-apps-start-execution-worker.txt +++ b/requirements-apps-start-execution-worker.txt @@ -1,2 +1,2 @@ -boto3==1.35.34 +boto3==1.35.44 ./lib/lambda_logging/ diff --git a/tests/test_api/test_validation.py b/tests/test_api/test_validation.py index 67ccee8d2..01912dda3 100644 --- a/tests/test_api/test_validation.py +++ b/tests/test_api/test_validation.py @@ -483,3 +483,73 @@ def test_validate_jobs(): ] with raises(validation.GranuleValidationError): validation.validate_jobs(jobs) + + +def test_check_bounds_formatting(): + valid_jobs = [ + {'job_parameters': {'bounds': [-10, 0, 10, 10]}}, + {'job_parameters': {'bounds': [-180, -90, -170, -80]}}, + {'job_parameters': {'bounds': [170, 75, 180, 90]}}, + {'job_parameters': {'bounds': [0, 0, 0, 0]}} + ] + invalid_jobs_bad_order = [ + {'job_parameters': {'bounds': [10, 0, -10, 10]}}, + {'job_parameters': {'bounds': [-10, 10, 10, 0]}}, + {'job_parameters': {'bounds': [10, 0, 10, 10]}}, + {'job_parameters': {'bounds': [-10, 0, 10, 0]}}, + ] + invalid_jobs_bad_values = [ + {'job_parameters': {'bounds': [-10, 0, 10, 100]}}, + {'job_parameters': {'bounds': [-200, 0, 10, 10]}}, + {'job_parameters': {'bounds': [-10, -100, 10, 80]}}, + {'job_parameters': {'bounds': [-100, 0, 200, 10]}}, + ] + for valid_job in valid_jobs: + validation.check_bounds_formatting(valid_job, {}) + for invalid_job in invalid_jobs_bad_order: + with raises(validation.BoundsValidationError, match=r'.*Invalid order for bounds.*'): + 
validation.check_bounds_formatting(invalid_job, {}) + for invalid_job in invalid_jobs_bad_values: + with raises(validation.BoundsValidationError, match=r'.*Invalid lon/lat value(s)*'): + validation.check_bounds_formatting(invalid_job, {}) + + +def test_check_granules_intersecting_bounds(): + job_with_specified_bounds = {'job_parameters': {'bounds': [-10, 0, 10, 10]}} + job_with_default_bounds = {'job_parameters': {'bounds': [0, 0, 0, 0]}} + valid_granule_metadata = [ + {'name': 'intersects1', 'polygon': Polygon.from_bounds(-10.0, 0.0, 10.0, 10.0)}, + {'name': 'intersects2', 'polygon': Polygon.from_bounds(-9.0, -1.0, 20.0, 11.0)}, + {'name': 'intersects3', 'polygon': Polygon.from_bounds(0.0, 5.0, 15.0, 15.0)} + ] + invalid_granule_metadata = [ + {'name': 'intersects1', 'polygon': Polygon.from_bounds(-10.0, 0.0, 10.0, 10.0)}, + {'name': 'does_not_intersect1', 'polygon': Polygon.from_bounds(10.1, -10, 20.0, -0.1)}, + {'name': 'intersects2', 'polygon': Polygon.from_bounds(-9.0, -1.0, 20.0, 11.0)}, + {'name': 'does_not_intersect2', 'polygon': Polygon.from_bounds(-80.0, 20.0, -60.0, 90.0)}, + {'name': 'does_not_intersect3', 'polygon': Polygon.from_bounds(100.0, -50.0, 120.0, -0.1)}, + ] + validation.check_granules_intersecting_bounds(job_with_specified_bounds, valid_granule_metadata) + validation.check_granules_intersecting_bounds(job_with_default_bounds, valid_granule_metadata) + error_pattern = r".*bounds: \['does_not_intersect1', 'does_not_intersect2', 'does_not_intersect3'\]*" + with raises(validation.GranuleValidationError, match=error_pattern): + validation.check_granules_intersecting_bounds(job_with_specified_bounds, invalid_granule_metadata) + with raises(validation.GranuleValidationError, match=error_pattern): + validation.check_granules_intersecting_bounds(job_with_default_bounds, invalid_granule_metadata) + + +def test_check_same_relative_orbits(): + valid_granule_metadata = [ + {'name': 'S1A_IW_RAW__0SDV_20201015T161622_20201015T161654_034809_040E95_AF3C'}, + {'name': 'S1A_IW_RAW__0SDV_20200816T161620_20200816T161652_033934_03EFCE_5730'}, + {'name': 'S1B_IW_RAW__0SDV_20200810T161537_20200810T161610_022863_02B66A_F7D7'}, + {'name': 'S1B_IW_RAW__0SDV_20200623T161535_20200623T161607_022163_02A10F_7FD6'} + ] + invalid_granule_metadata = valid_granule_metadata.copy() + invalid_granule_metadata.append( + {'name': 'S1B_IW_RAW__0SDV_20200623T161535_20200623T161607_012345_02A10F_7FD6'} + ) + validation.check_same_relative_orbits({}, valid_granule_metadata) + error_pattern = r'.*69 is not 87.*' + with raises(validation.GranuleValidationError, match=error_pattern): + validation.check_same_relative_orbits({}, invalid_granule_metadata) diff --git a/tests/test_check_processing_time.py b/tests/test_check_processing_time.py index 3c2f7b662..9ad6d1e90 100644 --- a/tests/test_check_processing_time.py +++ b/tests/test_check_processing_time.py @@ -41,14 +41,26 @@ def test_no_attempts(): def test_get_time_from_result(): result = { - 'Attempts': [ - {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}, - {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700} - ] + 'start': [500, 3000], + 'stop': [1000, 8700], } assert check_processing_time.get_time_from_result(result) == 5.7 +def test_get_time_from_result_list(): + result = [ + { + 'start': [500, 3000], + 'stop': [1000, 8900], + }, + { + 'start': [500, 4000], + 'stop': [3000, 4200], + }, + ] + assert check_processing_time.get_time_from_result(result) == [5.9, 0.2] + + def test_get_time_from_result_failed(): 
result = { 'Error': 'States.TaskFailed', @@ -64,10 +76,8 @@ def test_lambda_handler(): event = { 'processing_results': { 'step_0': { - 'Attempts': [ - {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}, - {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700} - ] + 'start': [500, 3000], + 'stop': [1000, 8700], }, 'step_1': { 'Error': 'States.TaskFailed', @@ -76,6 +86,16 @@ def test_lambda_handler(): '{"Container": {}, "StartedAt": 1500, "StatusReason": "", "StoppedAt": 2000}, ' '{"Container": {}, "StartedAt": 3000, "StatusReason": "", "StoppedAt": 9400}]}' }, + 'step_2': [ + { + 'start': [500, 3000], + 'stop': [1000, 8900], + }, + { + 'start': [500, 4000], + 'stop': [3000, 4200], + }, + ] } } - assert check_processing_time.lambda_handler(event, None) == [5.7, 6.4] + assert check_processing_time.lambda_handler(event, None) == [5.7, 6.4, [5.9, 0.2]] diff --git a/tests/test_render_cf.py b/tests/test_render_cf.py new file mode 100644 index 000000000..ecb27bacd --- /dev/null +++ b/tests/test_render_cf.py @@ -0,0 +1,104 @@ +import pytest +import render_cf +import yaml + + +def test_parse_map_statement(): + assert render_cf.parse_map_statement('for item in items') == ('item', 'items') + assert render_cf.parse_map_statement('for foo in bar') == ('foo', 'bar') + + with pytest.raises(ValueError, match='expected 4 tokens in map statement but got 3: item in items'): + render_cf.parse_map_statement('item in items') + + with pytest.raises(ValueError, match='expected 4 tokens in map statement but got 5: for for item in items'): + render_cf.parse_map_statement('for for item in items') + + with pytest.raises(ValueError, match="expected 'for', got 'fr': fr item in items"): + render_cf.parse_map_statement('fr item in items') + + with pytest.raises(ValueError, match="expected 'in', got 'ib': for item ib items"): + render_cf.parse_map_statement('for item ib items') + + +def test_get_batch_job_parameters(): + job_spec = {'parameters': {'param1': {}, 'param2': {}, 'param3': {}, 'param4': {}}} + + step = {'command': ['foo', 'Ref::param2', 'Ref::param3', 'bar', 'Ref::bucket_prefix']} + assert render_cf.get_batch_job_parameters(job_spec, step) == { + 'param2.$': '$.batch_job_parameters.param2', + 'param3.$': '$.batch_job_parameters.param3', + 'bucket_prefix.$': '$.batch_job_parameters.bucket_prefix', + } + + step = {'command': ['foo', 'Ref::param2', 'Ref::param3', 'bar', 'Ref::param5']} + assert render_cf.get_batch_job_parameters(job_spec, step, map_item='param5') == { + 'param2.$': '$.batch_job_parameters.param2', + 'param3.$': '$.batch_job_parameters.param3', + 'param5.$': '$$.Map.Item.Value', + } + + step = {'command': ['foo', 'Ref::param2', 'Ref::param3', 'bar', 'Ref::param5']} + with pytest.raises(ValueError, match="job parameter 'param5' has not been defined"): + render_cf.get_batch_job_parameters(job_spec, step) + + +def test_get_batch_param_names_for_job_step(): + step = {'command': ['param1', 'Ref::param2', 'Ref::param3', 'Ref::param2', 'param4', 'Ref::param5']} + assert render_cf.get_batch_param_names_for_job_step(step) == {'param2', 'param3', 'param5'} + + +def test_get_compute_environments(tmp_path): + job_types = { + 'FOO': { + 'steps': [ + {'compute_environment': 'ComputeEnvironment1'}, + {'compute_environment': 'Default'}, + ], + }, + 'BAR': { + 'steps': [ + {'compute_environment': 'ComputeEnvironment2'}, + ], + }, + 'BAZ': { + 'steps': [ + {'compute_environment': 'ComputeEnvironment1'}, + {'compute_environment': 'ComputeEnvironment2'}, + ], + }, + } + 
compute_env_file_contents = { + 'compute_environments': { + 'ComputeEnvironment1': {'key1': 'value1'}, + 'ComputeEnvironment2': {'key2': 'value2'}, + 'ComputeEnvironment3': {'key3': 'value3'}, + } + } + expected_compute_envs = { + 'ComputeEnvironment1': {'key1': 'value1'}, + 'ComputeEnvironment2': {'key2': 'value2'}, + } + compute_env_file = tmp_path / 'compute_environments.yml' + yaml.dump(compute_env_file_contents, open(compute_env_file, 'w')) + assert render_cf.get_compute_environments_for_deployment(job_types, compute_env_file) == expected_compute_envs + + compute_env_file_contents = { + 'compute_environments': { + 'ComputeEnvironment1': {'key1': 'value1'}, + 'ComputeEnvironment2': {'key2': 'value2'}, + 'ComputeEnvironment3': {'key3': 'value3'}, + 'Default': {'key', 'value'}, + } + } + yaml.dump(compute_env_file_contents, open(compute_env_file, 'w')) + with pytest.raises(ValueError, match="'Default' is a reserved compute environment name"): + render_cf.get_compute_environments_for_deployment(job_types, compute_env_file) + + compute_env_file_contents = { + 'compute_environments': { + 'ComputeEnvironment1': {'key1': 'value1'}, + } + } + yaml.dump(compute_env_file_contents, open(compute_env_file, 'w')) + with pytest.raises(KeyError, match='ComputeEnvironment2'): + render_cf.get_compute_environments_for_deployment(job_types, compute_env_file) diff --git a/tests/test_set_batch_overrides.py b/tests/test_set_batch_overrides.py index 375413556..3b9bafefe 100644 --- a/tests/test_set_batch_overrides.py +++ b/tests/test_set_batch_overrides.py @@ -19,7 +19,7 @@ def mock_insar_isce_burst_job(looks: str, bursts: int) -> dict: 'job_type': 'INSAR_ISCE_MULTI_BURST', 'job_parameters': { 'looks': looks, - 'reference': ' '.join('foo' for _ in range(bursts)), + 'reference': ['foo' for _ in range(bursts)], } } @@ -154,7 +154,7 @@ def test_set_batch_overrides_autorift_s2(): assert lambda_handler( { 'job_type': 'AUTORIFT', - 'job_parameters': {'granules': 'S2B_'}, + 'job_parameters': {'granules': ['S2B_']}, }, None, ) == { @@ -171,7 +171,7 @@ def test_set_batch_overrides_autorift_landsat(): assert lambda_handler( { 'job_type': 'AUTORIFT', - 'job_parameters': {'granules': 'LC08_'}, + 'job_parameters': {'granules': ['LC08_']}, }, None, ) == { @@ -188,7 +188,7 @@ def test_set_batch_overrides_rtc_gamma_10m(): assert lambda_handler( { 'job_type': 'RTC_GAMMA', - 'job_parameters': {'resolution': '10'}, + 'job_parameters': {'resolution': 10}, }, None, ) == { @@ -202,7 +202,7 @@ def test_set_batch_overrides_rtc_gamma_10m(): assert lambda_handler( { 'job_type': 'RTC_GAMMA', - 'job_parameters': {'resolution': '20'}, + 'job_parameters': {'resolution': 20}, }, None, ) == { @@ -219,7 +219,7 @@ def test_set_batch_overrides_water_map_10m(): assert lambda_handler( { 'job_type': 'WATER_MAP', - 'job_parameters': {'resolution': '10'}, + 'job_parameters': {'resolution': 10}, }, None, ) == { @@ -233,7 +233,7 @@ def test_set_batch_overrides_water_map_10m(): assert lambda_handler( { 'job_type': 'WATER_MAP', - 'job_parameters': {'resolution': '20'}, + 'job_parameters': {'resolution': 20}, }, None, ) == { @@ -247,7 +247,7 @@ def test_set_batch_overrides_water_map_10m(): assert lambda_handler( { 'job_type': 'WATER_MAP_EQ', - 'job_parameters': {'resolution': '10'}, + 'job_parameters': {'resolution': 10}, }, None, ) == { @@ -261,7 +261,7 @@ def test_set_batch_overrides_water_map_10m(): assert lambda_handler( { 'job_type': 'WATER_MAP_EQ', - 'job_parameters': {'resolution': '20'}, + 'job_parameters': {'resolution': 20}, }, None, ) == { 
diff --git a/tests/test_start_execution_worker.py b/tests/test_start_execution_worker.py index f7ab6a8a4..1a26d469b 100644 --- a/tests/test_start_execution_worker.py +++ b/tests/test_start_execution_worker.py @@ -13,27 +13,17 @@ def test_convert_to_string(): assert start_execution_worker.convert_to_string('abc') == 'abc' -def test_convert_parameters_to_string(): - parameters = { - 'param1': 1, - 'param2': True, - 'param3': [1, 2], - 'param4': ['abc', 'bcd'], - 'param5': 'abc', - } - assert start_execution_worker.convert_parameters_to_strings(parameters) == { - 'param1': '1', - 'param2': 'True', - 'param3': '1 2', - 'param4': 'abc bcd', - 'param5': 'abc', +def test_submit_jobs(): + batch_params_by_job_type = { + 'JOB_0': ['granules', 'string_field', 'boolean_field', 'float_field', 'integer_field'], + 'JOB_1': ['string_field', 'boolean_field'], + 'JOB_2': [], } - -def test_submit_jobs(): jobs = [ { 'job_id': 'job0', + 'job_type': 'JOB_0', 'string_field': 'value1', 'boolean_field': True, 'float_field': 10.1, @@ -51,18 +41,61 @@ def test_submit_jobs(): }, { 'job_id': 'job1', - 'job_parameters': {'granules': ['granule1']}, - } + 'job_type': 'JOB_1', + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + 'job_parameters': { + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + }, + }, + { + 'job_id': 'job2', + 'job_type': 'JOB_2', + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + 'job_parameters': { + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + }, + }, ] expected_input_job0 = json.dumps( { 'job_id': 'job0', + 'job_type': 'JOB_0', 'string_field': 'value1', 'boolean_field': True, 'float_field': 10.1, 'integer_field': 10, 'job_parameters': { + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + }, + 'batch_job_parameters': { 'granules': 'granule1 granule2', 'string_field': 'value1', 'boolean_field': 'True', @@ -76,13 +109,55 @@ def test_submit_jobs(): expected_input_job1 = json.dumps( { 'job_id': 'job1', - 'job_parameters': {'granules': 'granule1'}, + 'job_type': 'JOB_1', + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + 'job_parameters': { + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + }, + 'batch_job_parameters': { + 'string_field': 'value1', + 'boolean_field': 'True', + }, + }, + sort_keys=True, + ) + + expected_input_job2 = json.dumps( + { + 'job_id': 'job2', + 'job_type': 'JOB_2', + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + 'job_parameters': { + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + }, + 'batch_job_parameters': {}, }, sort_keys=True, ) with patch('start_execution_worker.STEP_FUNCTION.start_execution') as mock_start_execution, \ - patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-state-machine-arn'}, clear=True): + patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-state-machine-arn'}, clear=True), \ + patch('start_execution_worker.BATCH_PARAMS_BY_JOB_TYPE', batch_params_by_job_type): 
start_execution_worker.submit_jobs(jobs) assert mock_start_execution.mock_calls == [ @@ -95,7 +170,12 @@ def test_submit_jobs(): stateMachineArn='test-state-machine-arn', input=expected_input_job1, name='job1', - ) + ), + call( + stateMachineArn='test-state-machine-arn', + input=expected_input_job2, + name='job2', + ), ]
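
Note: the updated test_check_processing_time.py cases above pin down the new result shape: each Batch step result carries parallel 'start'/'stop' attempt-timestamp lists in milliseconds, a step's processing time is the duration of its final attempt in seconds, and a 'map' step contributes a sub-list of per-iteration times in input order. A minimal sketch consistent with those assertions (illustrative only; the shipped check_processing_time module also handles failed attempts via the JSON 'Cause' payload) is:

    def get_time_from_result(result: dict | list) -> float | list:
        # Illustrative sketch matching the test expectations above, not the shipped code.
        if isinstance(result, list):
            # A 'map' step yields one result per iteration; return the per-iteration
            # processing times in the same order as the items in the input list.
            return [get_time_from_result(item) for item in result]
        # 'start'/'stop' are parallel lists of attempt timestamps in milliseconds;
        # the step's processing time is the duration of the last attempt, in seconds.
        return (result['stop'][-1] - result['start'][-1]) / 1000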
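
Similarly, test_parse_map_statement fixes the expected parsing of the new 'map: for <item> in <items>' step syntax. A sketch that satisfies those assertions (an assumed shape, not necessarily how render_cf implements it) is:

    def parse_map_statement(map_statement: str) -> tuple[str, str]:
        # Split 'for <item> in <items>' into exactly four tokens and validate the keywords.
        tokens = map_statement.split()
        if len(tokens) != 4:
            raise ValueError(f'expected 4 tokens in map statement but got {len(tokens)}: {map_statement}')
        if tokens[0] != 'for':
            raise ValueError(f"expected 'for', got '{tokens[0]}': {map_statement}")
        if tokens[2] != 'in':
            raise ValueError(f"expected 'in', got '{tokens[2]}': {map_statement}")
        return tokens[1], tokens[3]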
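
Finally, test_get_compute_environments illustrates how the new job_spec/config/compute_environments.yml file is consumed: only the compute environments referenced by a deployment's job steps are rendered, 'Default' is a reserved name, and referencing an undefined environment is an error. A sketch consistent with those tests (hypothetical; the actual render_cf.get_compute_environments_for_deployment may differ) is:

    from pathlib import Path

    import yaml


    def get_compute_environments_for_deployment(job_types: dict, compute_env_file: Path) -> dict:
        # Illustrative sketch matching the test expectations above, not the shipped code.
        environments = yaml.safe_load(compute_env_file.read_text())['compute_environments']
        if 'Default' in environments:
            raise ValueError("'Default' is a reserved compute environment name")
        names = {
            step['compute_environment']
            for job_spec in job_types.values()
            for step in job_spec['steps']
            if step['compute_environment'] != 'Default'
        }
        # Raises KeyError for any referenced environment missing from the config file.
        return {name: environments[name] for name in names}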