Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow recovering a normalized version of workflow request state from API #18985

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/framework_workflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ on:
env:
GALAXY_TEST_DBURI: 'postgresql://postgres:postgres@localhost:5432/galaxy?client_encoding=utf8'
GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA: '1'
GALAXY_TEST_WORKFLOW_AFTER_RERUN: '1'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
Expand Down
173 changes: 155 additions & 18 deletions client/src/api/schema/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2645,6 +2645,23 @@ export interface paths {
patch?: never;
trace?: never;
};
"/api/invocations/{invocation_id}/request": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
/** Get a description modeling an API request to invoke this workflow - this is recreated and will be more specific in some ways than the initial creation request. */
get: operations["invocation_as_request_api_invocations__invocation_id__request_get"];
put?: never;
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/api/invocations/{invocation_id}/step_jobs_summary": {
parameters: {
query?: never;
Expand Down Expand Up @@ -12385,8 +12402,9 @@ export interface components {
*/
batch: boolean | null;
/**
* Dataset Map
* @description TODO
* Legacy Dataset Map
* @deprecated
* @description An older alternative to specifying inputs using database IDs, do not use this and use inputs instead
* @default {}
*/
ds_map: {
Expand All @@ -12409,12 +12427,12 @@ export interface components {
history_id?: string | null;
/**
* Inputs
* @description TODO
* @description Specify values for formal inputs to the workflow
*/
inputs?: Record<string, never> | null;
/**
* Inputs By
* @description How inputs maps to inputs (datasets/collections) to workflows steps.
* @description How the 'inputs' field maps its inputs (datasets/collections/step parameters) to workflows steps.
*/
inputs_by?: string | null;
/**
Expand All @@ -12441,35 +12459,35 @@ export interface components {
*/
no_add_to_history: boolean | null;
/**
* Parameters
* @description The raw parameters for the workflow invocation.
* Legacy Step Parameters
* @description Parameters specified per-step for the workflow invocation, this is legacy and you should generally use inputs and only specify the formal parameters of a workflow instead.
* @default {}
*/
parameters: Record<string, never> | null;
/**
* Parameters Normalized
* @description Indicates if parameters are already normalized for workflow invocation.
* Legacy Step Parameters Normalized
* @description Indicates if legacy parameters are already normalized to be indexed by the order_index and are specified as a dictionary per step. Legacy-style parameters could previously be specified as one parameter per step or by tool ID.
* @default false
*/
parameters_normalized: boolean | null;
/**
* Preferred Intermediate Object Store ID
* @description The ID of the ? object store that should be used to store ? datasets in this history.
* @description The ID of the object store that should be used to store the intermediate datasets of this workflow - - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences
*/
preferred_intermediate_object_store_id?: string | null;
/**
* Preferred Object Store ID
* @description The ID of the object store that should be used to store new datasets in this history.
* @description The ID of the object store that should be used to store all datasets (can instead specify object store IDs for intermediate and outputs datasts separately) - - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences
*/
preferred_object_store_id?: string | null;
/**
* Preferred Outputs Object Store ID
* @description The ID of the object store that should be used to store ? datasets in this history.
* @description The ID of the object store that should be used to store the marked output datasets of this workflow - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences.
*/
preferred_outputs_object_store_id?: string | null;
/**
* Replacement Parameters
* @description TODO
* @description Class of parameters mostly used for string replacement in PJAs. In best practice workflows, these should be replaced with input parameters
* @default {}
*/
replacement_params: Record<string, never> | null;
Expand All @@ -12481,7 +12499,7 @@ export interface components {
require_exact_tool_versions: boolean | null;
/**
* Resource Parameters
* @description TODO
* @description If a workflow_resource_params_file file is defined and the target workflow is configured to consumer resource parameters, they can be specified with this parameter. See https://github.com/galaxyproject/galaxy/pull/4830 for more information.
* @default {}
*/
resource_params: Record<string, never> | null;
Expand All @@ -12490,11 +12508,6 @@ export interface components {
* @description Scheduler to use for workflow invocation.
*/
scheduler?: string | null;
/**
* Step Parameters
* @description TODO
*/
step_parameters?: Record<string, never> | null;
/**
* Use cached job
* @description Indicated whether to use a cached job for workflow invocation.
Expand Down Expand Up @@ -18188,6 +18201,86 @@ export interface components {
*/
workflow_id: string;
};
/**
* WorkflowInvocationRequestModel
* @description Model a workflow invocation request (InvokeWorkflowPayload) for an existing invocation.
*/
WorkflowInvocationRequestModel: {
/**
* History ID
* @description The encoded history id the workflow was run in.
*/
history_id: string;
/**
* Inputs
* @description Values for inputs
*/
inputs: Record<string, never>;
/**
* Inputs by
* @description How the 'inputs' field maps its inputs (datasets/collections/step parameters) to workflows steps.
*/
inputs_by: string;
/**
* Is instance
* @description This API yields a particular workflow instance, newer workflows belonging to the same storedworkflow may have different state.
* @default true
* @constant
* @enum {boolean}
*/
instance: true;
/**
* Legacy Step Parameters
* @description Parameters specified per-step for the workflow invocation, this is legacy and you should generally use inputs and only specify the formal parameters of a workflow instead. If these are set, the workflow was not executed in a best-practice fashion and we the resulting invocation request may not fully reflect the executed workflow state.
*/
parameters?: Record<string, never> | null;
/**
* Legacy Step Parameters Normalized
* @description Indicates if legacy parameters are already normalized to be indexed by the order_index and are specified as a dictionary per step. Legacy-style parameters could previously be specified as one parameter per step or by tool ID.
* @default true
* @constant
* @enum {boolean}
*/
parameters_normalized: true;
/**
* Preferred Intermediate Object Store ID
* @description The ID of the object store that should be used to store the intermediate datasets of this workflow - - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences
*/
preferred_intermediate_object_store_id?: string | null;
/**
* Preferred Object Store ID
* @description The ID of the object store that should be used to store all datasets (can instead specify object store IDs for intermediate and outputs datasts separately) - - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences
*/
preferred_object_store_id?: string | null;
/**
* Preferred Outputs Object Store ID
* @description The ID of the object store that should be used to store the marked output datasets of this workflow - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences.
*/
preferred_outputs_object_store_id?: string | null;
/**
* Replacement Parameters
* @description Class of parameters mostly used for string replacement in PJAs. In best practice workflows, these should be replaced with input parameters
* @default {}
*/
replacement_params: Record<string, never> | null;
/**
* Resource Parameters
* @description If a workflow_resource_params_file file is defined and the target workflow is configured to consumer resource parameters, they can be specified with this parameter. See https://github.com/galaxyproject/galaxy/pull/4830 for more information.
* @default {}
*/
resource_params: Record<string, never> | null;
/**
* Use cached job
* @description Indicated whether to use a cached job for workflow invocation.
* @default false
*/
use_cached_job: boolean;
/**
* Workflow ID
* @description The encoded Workflow ID associated with the invocation.
*/
workflow_id: string;
};
/** WorkflowInvocationResponse */
WorkflowInvocationResponse:
| components["schemas"]["WorkflowInvocationElementView"]
Expand Down Expand Up @@ -27056,6 +27149,50 @@ export interface operations {
};
};
};
invocation_as_request_api_invocations__invocation_id__request_get: {
parameters: {
query?: never;
header?: {
/** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */
"run-as"?: string | null;
};
path: {
/** @description The encoded database identifier of the Invocation. */
invocation_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["WorkflowInvocationRequestModel"];
};
};
/** @description Request Error */
"4XX": {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["MessageExceptionModel"];
};
};
/** @description Server Error */
"5XX": {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["MessageExceptionModel"];
};
};
};
};
invocation_step_jobs_summary_api_invocations__invocation_id__step_jobs_summary_get: {
parameters: {
query?: never;
Expand Down
75 changes: 71 additions & 4 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7749,6 +7749,16 @@ def get_internal_version(self, version):
raise galaxy.exceptions.RequestParameterInvalidException("Version does not exist")
return list(reversed(self.workflows))[version]

def get_internal_version_by_id(self, workflow_instance_id: int):
sa_session = object_session(self)
assert sa_session
workflow = sa_session.get(Workflow, workflow_instance_id)
if not workflow:
raise galaxy.exceptions.ObjectNotFound()
elif workflow.stored_workflow != self:
raise galaxy.exceptions.RequestParameterInvalidException()
return workflow

def version_of(self, workflow):
for version, workflow_instance in enumerate(reversed(self.workflows)):
if workflow_instance.id == workflow.id:
Expand Down Expand Up @@ -8015,7 +8025,7 @@ class WorkflowStep(Base, RepresentById, UsesCreateAndUpdateTime):
type: Mapped[Optional[str]] = mapped_column(String(64))
tool_id: Mapped[Optional[str]] = mapped_column(TEXT)
tool_version: Mapped[Optional[str]] = mapped_column(TEXT)
tool_inputs: Mapped[Optional[bytes]] = mapped_column(JSONType)
tool_inputs: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSONType)
tool_errors: Mapped[Optional[bytes]] = mapped_column(JSONType)
position: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
config: Mapped[Optional[bytes]] = mapped_column(JSONType)
Expand Down Expand Up @@ -8085,11 +8095,13 @@ def init_on_load(self):
def tool_uuid(self):
return self.dynamic_tool and self.dynamic_tool.uuid

@property
def is_input_type(self) -> bool:
return bool(self.type and self.type in self.STEP_TYPE_TO_INPUT_TYPE)

@property
def input_type(self):
assert (
self.type and self.type in self.STEP_TYPE_TO_INPUT_TYPE
), "step.input_type can only be called on input step types"
assert self.is_input_type, "step.input_type can only be called on input step types"
return self.STEP_TYPE_TO_INPUT_TYPE[self.type]

@property
Expand Down Expand Up @@ -8310,6 +8322,17 @@ def log_str(self):
f"WorkflowStep[index={self.order_index},type={self.type},label={self.label},uuid={self.uuid},id={self.id}]"
)

@property
def effective_label(self) -> Optional[str]:
label = self.label
if label is not None:
return label
elif self.is_input_type:
tool_inputs = self.tool_inputs
if tool_inputs is not None:
return cast(Optional[str], tool_inputs.get("name"))
return None

def clear_module_extras(self):
# the module code adds random dynamic state to the step, this
# attempts to clear that.
Expand Down Expand Up @@ -9112,6 +9135,50 @@ def attach_step(request_to_content):
attach_step(request_to_content)
self.input_step_parameters.append(request_to_content)

def recover_inputs(self) -> Tuple[Dict[str, Any], str]:
inputs = {}
inputs_by = "name"

have_referenced_steps_by_order_index = False

def best_step_reference(workflow_step: "WorkflowStep") -> str:
label = workflow_step.effective_label
if label is not None:
return label
nonlocal have_referenced_steps_by_order_index
have_referenced_steps_by_order_index = True
return str(workflow_step.order_index)

def ensure_step(step: Optional["WorkflowStep"]) -> "WorkflowStep":
if step is None:
raise galaxy.exceptions.InconsistentDatabase(
"workflow input found without step definition, this should not happen"
)
assert step
return step

for input_dataset_assoc in self.input_datasets:
workflow_step = ensure_step(input_dataset_assoc.workflow_step)
input_dataset = input_dataset_assoc.dataset
input_index = best_step_reference(workflow_step)
inputs[input_index] = input_dataset

for input_dataset_collection_assoc in self.input_dataset_collections:
workflow_step = ensure_step(input_dataset_collection_assoc.workflow_step)
input_dataset_collection = input_dataset_collection_assoc.dataset_collection
input_index = best_step_reference(workflow_step)
inputs[input_index] = input_dataset_collection

for input_step_parameter_assoc in self.input_step_parameters:
workflow_step = ensure_step(input_step_parameter_assoc.workflow_step)
value = input_step_parameter_assoc.parameter_value
input_index = best_step_reference(workflow_step)
inputs[input_index] = value

if have_referenced_steps_by_order_index:
inputs_by = "name|step_index"
return inputs, inputs_by

def add_message(self, message: "InvocationMessageUnion"):
self.messages.append(
WorkflowInvocationMessage( # type:ignore[abstract]
Expand Down
Loading
Loading