Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added lambda layer for workflows API endpoint + RNASum Lambda #692

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions lib/workload/components/python-workflow-tools-layer/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env python3

import { Construct } from 'constructs';
import { PythonLayerVersion } from '@aws-cdk/aws-lambda-python-alpha';
import path from 'path';
import { PythonLambdaLayerConstruct } from '../python-lambda-layer';

export interface PythonWorkflowLambdaLayerConstructProps {
layerPrefix: string;
}

export class WorkflowToolsPythonLambdaLayer extends Construct {
public readonly lambdaLayerVersionObj: PythonLayerVersion;

constructor(scope: Construct, id: string, props: PythonWorkflowLambdaLayerConstructProps) {
super(scope, id);

// Generate lambda workflow python layer
// Get lambda layer object
this.lambdaLayerVersionObj = new PythonLambdaLayerConstruct(this, 'lambda_layer', {
layerName: `${props.layerPrefix}-workflow-py-layer`,
layerDescription: 'Lambda Layer for handling the workflow api via Python',
layerDirectory: path.join(__dirname, 'workflow_tools_layer'),
}).lambdaLayerVersionObj;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "workflow_tools"
version = "0.0.1"
description = "Workflow Manager Lambda Layers"
license = "GPL-3.0-or-later"
authors = [
"Alexis Lucattini"
]
homepage = "https://github.com/umccr/orcabus"
repository = "https://github.com/umccr/orcabus"

[tool.poetry.dependencies]
python = "^3.12, <3.13"
requests = "^2.32.3"

[tool.poetry.group.dev]
optional = true

[tool.poetry.group.dev.dependencies]
pytest = "^7.0.0" # For testing only
# For typehinting only, not required at runtime
mypy-boto3-ssm = "^1.34"
mypy-boto3-secretsmanager = "^1.34"
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/usr/bin/env python3

import re


def strip_context_from_orcabus_id(orcabus_id: str) -> str:
"""
Strip the context from the orcabus_id
:param orcabus_id:
:return:
"""
return re.sub(r"^(\w+).", "", orcabus_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python3

# Standard imports
import typing
import boto3
import json
from os import environ

# Type hinting
if typing.TYPE_CHECKING:
from mypy_boto3_secretsmanager import SecretsManagerClient
from mypy_boto3_ssm import SSMClient


def get_secretsmanager_client() -> 'SecretsManagerClient':
return boto3.client('secretsmanager')


def get_ssm_client() -> 'SSMClient':
return boto3.client('ssm')


def get_secret_value(secret_id) -> str:
"""
Collect the secret value
:param secret_id:
:return:
"""
# Get the boto3 response
get_secret_value_response = get_secretsmanager_client().get_secret_value(SecretId=secret_id)

return get_secret_value_response['SecretString']


def get_ssm_value(parameter_name) -> str:
# Get the boto3 response
get_ssm_parameter_response = get_ssm_client().get_parameter(Name=parameter_name)

return get_ssm_parameter_response['Parameter']['Value']


def get_orcabus_token() -> str:
"""
From the AWS Secrets Manager, retrieve the OrcaBus token.
:return:
"""
return json.loads(get_secret_value(environ.get("ORCABUS_TOKEN_SECRET_ID")))['id_token']


def get_hostname() -> str:
return get_ssm_value(environ.get("HOSTNAME_SSM_PARAMETER"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env python3

from enum import Enum

# AWS PARAMETERS
WORKFLOW_SUBDOMAIN_NAME = "workflow"

# API ENDPOINTS
WORKFLOW_RUN_ENDPOINT = "api/v1/workflowrun"
PAYLOAD_ENDPOINT = "api/v1/payload"
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env python3

"""
Getting the payload helpers
"""
from typing import Dict

from workflow_tools.utils.globals import PAYLOAD_ENDPOINT
from workflow_tools.utils.requests_helpers import get_request_results


def get_payload(payload_id: str) -> Dict:
"""
Get subject from the subject id
:param contact_id:
:return:
"""
# Get subject
return get_request_results(PAYLOAD_ENDPOINT, payload_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/usr/bin/env python3
from typing import Dict, Optional, List, Union
from urllib.parse import urlunparse, urlparse

# Standard imports
import requests
import logging
from copy import deepcopy

from . import strip_context_from_orcabus_id
# Locals
from .globals import (
WORKFLOW_SUBDOMAIN_NAME,
)

from .aws_helpers import (
get_orcabus_token, get_hostname
)

# Globals
DEFAULT_REQUEST_PARAMS = {
"rowsPerPage": 1000
}

# Set logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def url_path_ext(url: str, path: str) -> str:
"""
Append a path to a URL
:param url:
:param path:
:return:
"""
return str(urlunparse(
urlparse(url)._replace(path=f"{urlparse(url).path}/{path}")
))


def get_url(endpoint: str) -> str:
"""
Get the URL for the Metadata endpoint
:param endpoint:
:return:
"""
# Get the hostname
hostname = get_hostname()

return urlunparse(
[
"https",
".".join([WORKFLOW_SUBDOMAIN_NAME, hostname]),
endpoint,
None, None, None
]
)


def get_request_results(endpoint: str, orcabus_id: str) -> Union[List, Dict]:
"""
Run get response against the Metadata endpoint
:param endpoint:
:param params:
:return:
"""
# Get authorization header
headers = {
"Authorization": f"Bearer {get_orcabus_token()}"
}

# Make the request
response = requests.get(
url_path_ext(
get_url(endpoint) if not urlparse(endpoint).scheme else endpoint,
strip_context_from_orcabus_id(orcabus_id)
),
headers=headers,
)

response.raise_for_status()

return response.json()


def get_request_results_ext(endpoint: str, orcabus_id: str, url_extension: str) -> Union[List, Dict]:
"""
Run get response against the Metadata endpoint
:param endpoint:
:param params:
:return:
"""
# Get authorization header
headers = {
"Authorization": f"Bearer {get_orcabus_token()}"
}

req_params = deepcopy(DEFAULT_REQUEST_PARAMS)

# Make the request
response = requests.get(
url_path_ext(
get_url(endpoint) if not urlparse(endpoint).scheme else endpoint,
strip_context_from_orcabus_id(orcabus_id) + "/" + url_extension
),
headers=headers,
params=req_params
)

response.raise_for_status()

return response.json()


def get_request_response_results(endpoint: str, params: Optional[Dict] = None) -> List[Dict]:
"""
Run get response against the Metadata endpoint
:param endpoint:
:param params:
:return:
"""
# Get authorization header
headers = {
"Authorization": f"Bearer {get_orcabus_token()}"
}

req_params = deepcopy(DEFAULT_REQUEST_PARAMS)

req_params.update(
params if params is not None else {}
)


# Make the request
response = requests.get(
get_url(endpoint) if not urlparse(endpoint).scheme else endpoint,
headers=headers,
params=req_params
)

response.raise_for_status()

response_json = response.json()

if 'links' not in response_json.keys():
return [response_json]

if 'next' in response_json['links'].keys() and response_json['links']['next'] is not None:
return response_json['results'] + get_request_response_results(response_json['links']['next'])
return response_json['results']
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python3

"""
Helpers for using the contact API endpoint
"""

# Standard imports
from typing import List, Dict
from .globals import WORKFLOW_RUN_ENDPOINT

# Local imports
from .requests_helpers import get_request_response_results, get_request_results_ext, get_request_results


def get_workflow_run(workflow_run_orcabus_id: str) -> Dict:
"""
Get contact from the contact id
:param contact_orcabus_id:
:return:
"""
# Get contact
return get_request_results(WORKFLOW_RUN_ENDPOINT, workflow_run_orcabus_id)


def get_workflow_run_from_portal_run_id(portal_run_id: str) -> Dict:
"""
Get subject from the subject id
:param contact_id:
:return:
"""
# We have an internal id, convert to int
params = {
"portalRunId": portal_run_id
}

# Get workflow run id
return get_request_results(
WORKFLOW_RUN_ENDPOINT,
get_request_response_results(WORKFLOW_RUN_ENDPOINT, params)[0].get("orcabusId")
)



def get_workflow_run_state(workflow_run_orcabus_id: str, status: str) -> Dict:
"""
Get contact from the contact id
:param contact_orcabus_id:
:return:
"""
# Get contact
return next(
filter(
lambda workflow_state_iter_: workflow_state_iter_["status"] == status,
get_request_results_ext(WORKFLOW_RUN_ENDPOINT, workflow_run_orcabus_id, "state")
)
)


Loading