diff --git a/flyrs/clients/backfill.py b/flyrs/clients/backfill.py
deleted file mode 100644
index 98649410d6..0000000000
--- a/flyrs/clients/backfill.py
+++ /dev/null
@@ -1,107 +0,0 @@
-import logging
-import typing
-from datetime import datetime, timedelta
-
-from croniter import croniter
-
-from flytekit import LaunchPlan
-from flytekit.core.workflow import ImperativeWorkflow, WorkflowBase, WorkflowFailurePolicy
-from clients.entities import FlyteLaunchPlan
-
-
-def create_backfill_workflow(
-    start_date: datetime,
-    end_date: datetime,
-    for_lp: typing.Union[LaunchPlan, FlyteLaunchPlan],
-    parallel: bool = False,
-    per_node_timeout: typing.Optional[timedelta] = None,
-    per_node_retries: int = 0,
-    failure_policy: typing.Optional[WorkflowFailurePolicy] = None,
-) -> typing.Tuple[WorkflowBase, datetime, datetime]:
-    """
-    Generates a new imperative workflow that can be used to backfill the given launch plan. Backfill
-    workflows can only be generated for scheduled launch plans; the backfill plan covers the interval
-    (start_date exclusive, end_date inclusive).
-
-    .. code-block:: python
-       :caption: Correct usage for dates example
-
-       lp = LaunchPlan.get_or_create(...)
-       start_date = datetime.datetime(2023, 1, 1)
-       end_date = start_date + datetime.timedelta(days=10)
-       wf = create_backfill_workflow(start_date, end_date, for_lp=lp)
-
-
-    .. code-block:: python
-       :caption: Incorrect date example
-
-       wf = create_backfill_workflow(end_date, start_date, for_lp=lp)  # end_date is before start_date
-       # OR
-       wf = create_backfill_workflow(start_date, start_date, for_lp=lp)  # start and end date are the same
-
-
-    :param start_date: datetime generate a backfill starting at this datetime (exclusive)
-    :param end_date: datetime generate a backfill ending at this datetime (inclusive)
-    :param for_lp: typing.Union[LaunchPlan, FlyteLaunchPlan] the backfill is generated for this launch plan
-    :param parallel: if the backfill should be run in parallel. False (default) will run each backfill sequentially
-    :param per_node_timeout: timedelta Timeout to use per node
-    :param per_node_retries: int Retries to use per node
-    :param failure_policy: WorkflowFailurePolicy Failure policy to use for the backfill workflow
-    :return: WorkflowBase, datetime, datetime -> the newly generated workflow, the datetime of the first backfill
-        instance, and the datetime of the last backfill instance
-    """
-    if not for_lp:
-        raise ValueError("Launch plan is required!")
-
-    if start_date >= end_date:
-        raise ValueError(
-            f"For a backfill, the start date must be earlier than the end date. Received {start_date} -> {end_date}"
-        )
-
-    schedule = for_lp.entity_metadata.schedule if isinstance(for_lp, FlyteLaunchPlan) else for_lp.schedule
-
-    if schedule is None:
-        raise ValueError("Backfill can only be created for scheduled launch plans")
-
-    if schedule.cron_schedule is not None:
-        cron_schedule = schedule.cron_schedule
-    else:
-        raise NotImplementedError("Currently backfilling only supports cron schedules.")
-
-    logging.info(
-        f"Generating backfill from {start_date} -> {end_date}. "
-        f"Parallel?[{parallel}] FailurePolicy[{str(failure_policy)}]"
-    )
-    wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}", failure_policy=failure_policy)
-
-    input_name = schedule.kickoff_time_input_arg
-    date_iter = croniter(cron_schedule.schedule, start_time=start_date, ret_type=datetime)
-    prev_node = None
-    actual_start = None
-    actual_end = None
-    while True:
-        next_start_date = date_iter.get_next()
-        if not actual_start:
-            actual_start = next_start_date
-        if next_start_date >= end_date:
-            break
-        actual_end = next_start_date
-        inputs = {}
-        if input_name:
-            inputs[input_name] = next_start_date
-        next_node = wf.add_launch_plan(for_lp, **inputs)
-        next_node = next_node.with_overrides(
-            name=f"b-{next_start_date}", retries=per_node_retries, timeout=per_node_timeout
-        )
-        if not parallel:
-            if prev_node:
-                prev_node.runs_before(next_node)
-        prev_node = next_node
-
-    if actual_end is None:
-        raise StopIteration(
-            f"The time window is too small for any backfill instances, first instance after start"
-            f" date is {actual_start}"
-        )
-
-    return wf, actual_start, actual_end
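For orientation, here is a minimal sketch of how this helper is typically driven end to end. The endpoint, project, domain, and launch plan name below are illustrative assumptions, not values taken from this diff:

```python
from datetime import datetime, timedelta

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

# Assumed endpoint/project/domain, purely for illustration.
remote = FlyteRemote(Config.for_endpoint("flyte.example.com"), "flytesnacks", "development")
lp = remote.fetch_launch_plan(name="daily_report_lp")  # must be a *scheduled* launch plan

start_date = datetime(2023, 1, 1)
end_date = start_date + timedelta(days=10)

# Window is (start_date exclusive, end_date inclusive); nodes run sequentially unless parallel=True.
wf, first_instance, last_instance = create_backfill_workflow(start_date, end_date, for_lp=lp)
```

In upstream flytekit, `FlyteRemote.launch_backfill` wraps this same helper, registering and launching the generated workflow in one call.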
" - f"Parallel?[{parallel}] FailurePolicy[{str(failure_policy)}]" - ) - wf = ImperativeWorkflow(name=f"backfill-{for_lp.name}", failure_policy=failure_policy) - - input_name = schedule.kickoff_time_input_arg - date_iter = croniter(cron_schedule.schedule, start_time=start_date, ret_type=datetime) - prev_node = None - actual_start = None - actual_end = None - while True: - next_start_date = date_iter.get_next() - if not actual_start: - actual_start = next_start_date - if next_start_date >= end_date: - break - actual_end = next_start_date - inputs = {} - if input_name: - inputs[input_name] = next_start_date - next_node = wf.add_launch_plan(for_lp, **inputs) - next_node = next_node.with_overrides( - name=f"b-{next_start_date}", retries=per_node_retries, timeout=per_node_timeout - ) - if not parallel: - if prev_node: - prev_node.runs_before(next_node) - prev_node = next_node - - if actual_end is None: - raise StopIteration( - f"The time window is too small for any backfill instances, first instance after start" - f" date is {actual_start}" - ) - - return wf, actual_start, actual_end diff --git a/flyrs/clients/data.py b/flyrs/clients/data.py deleted file mode 100644 index 84fcff1420..0000000000 --- a/flyrs/clients/data.py +++ /dev/null @@ -1,55 +0,0 @@ -import os -import pathlib -import typing - -from google.protobuf.json_format import MessageToJson -from rich import print - -from flytekit import BlobType, Literal -from flytekit.core.data_persistence import FileAccessProvider -from flytekit.interaction.rich_utils import RichCallback -from flytekit.interaction.string_literals import literal_string_repr - - -def download_literal( - file_access: FileAccessProvider, var: str, data: Literal, download_to: typing.Optional[pathlib.Path] = None -): - """ - Download a single literal to a file, if it is a blob or structured dataset. - """ - if data is None: - print(f"Skipping {var} as it is None.") - return - if data.scalar: - if data.scalar and (data.scalar.blob or data.scalar.structured_dataset): - uri = data.scalar.blob.uri if data.scalar.blob else data.scalar.structured_dataset.uri - if uri is None: - print("No data to download.") - return - is_multipart = False - if data.scalar.blob: - is_multipart = data.scalar.blob.metadata.type.dimensionality == BlobType.BlobDimensionality.MULTIPART - elif data.scalar.structured_dataset: - is_multipart = True - file_access.get_data( - uri, str(download_to / var) + os.sep, is_multipart=is_multipart, callback=RichCallback() - ) - elif data.scalar.union is not None: - download_literal(file_access, var, data.scalar.union.value, download_to) - elif data.scalar.generic is not None: - with open(download_to / f"{var}.json", "w") as f: - f.write(MessageToJson(data.scalar.generic)) - else: - print( - f"[dim]Skipping {var} val {literal_string_repr(data)} as it is not a blob, structured dataset," - f" or generic type.[/dim]" - ) - return - elif data.collection: - for i, v in enumerate(data.collection.literals): - download_literal(file_access, f"{i}", v, download_to / var) - elif data.map: - download_to = pathlib.Path(download_to) - for k, v in data.map.literals.items(): - download_literal(file_access, f"{k}", v, download_to / var) - print(f"Downloaded f{var} to {download_to}") diff --git a/flyrs/clients/entities.py b/flyrs/clients/entities.py deleted file mode 100644 index 9adba429a6..0000000000 --- a/flyrs/clients/entities.py +++ /dev/null @@ -1,839 +0,0 @@ -""" -This module contains shadow entities for all Flyte entities as represented in Flyte Admin / Control Plane. 
diff --git a/flyrs/clients/entities.py b/flyrs/clients/entities.py
deleted file mode 100644
index 9adba429a6..0000000000
--- a/flyrs/clients/entities.py
+++ /dev/null
@@ -1,839 +0,0 @@
-"""
-This module contains shadow entities for all Flyte entities as represented in Flyte Admin / Control Plane.
-The goal is to enable easy access to, and manipulation of, these entities.
-"""
-from __future__ import annotations
-
-from typing import Dict, List, Optional, Tuple, Union
-
-from flytekit import FlyteContext
-from flytekit.core import constants as _constants
-from flytekit.core import hash as _hash_mixin
-from flytekit.core import hash as hash_mixin
-from flytekit.core.promise import create_and_link_node_from_remote
-from flytekit.exceptions import system as _system_exceptions
-from flytekit.exceptions import user as _user_exceptions
-from flytekit.loggers import logger
-from flytekit.models import interface as _interface_models
-from flytekit.models import launch_plan as _launch_plan_model
-from flytekit.models import launch_plan as _launch_plan_models
-from flytekit.models import launch_plan as launch_plan_models
-from flytekit.models import task as _task_model
-from flytekit.models import task as _task_models
-from flytekit.models.admin.workflow import WorkflowSpec
-from flytekit.models.core import compiler as compiler_models
-from flytekit.models.core import identifier as _identifier_model
-from flytekit.models.core import identifier as id_models
-from flytekit.models.core import workflow as _workflow_model
-from flytekit.models.core import workflow as _workflow_models
-from flytekit.models.core.identifier import Identifier
-from flytekit.models.core.workflow import Node, WorkflowMetadata, WorkflowMetadataDefaults
-from flytekit.models.interface import TypedInterface
-from flytekit.models.literals import Binding
-from flytekit.models.task import TaskSpec
-import clients.interface as _interfaces
-from clients.remote_callable import RemoteEntity
-
-
-class FlyteTask(hash_mixin.HashOnReferenceMixin, RemoteEntity, TaskSpec):
-    """A class encapsulating a remote Flyte task."""
-
-    def __init__(
-        self,
-        id,
-        type,
-        metadata,
-        interface,
-        custom,
-        container=None,
-        task_type_version: int = 0,
-        config=None,
-        should_register: bool = False,
-    ):
-        super(FlyteTask, self).__init__(
-            template=_task_model.TaskTemplate(
-                id,
-                type,
-                metadata,
-                interface,
-                custom,
-                container=container,
-                task_type_version=task_type_version,
-                config=config,
-            )
-        )
-        self._should_register = should_register
-
-    @property
-    def id(self):
-        """
-        This is generated by the system and uniquely identifies the task.
-
-        :rtype: flytekit.models.core.identifier.Identifier
-        """
-        return self.template.id
-
-    @property
-    def type(self):
-        """
-        This is used to identify additional extensions for use by Propeller or SDK.
-
-        :rtype: Text
-        """
-        return self.template.type
-
-    @property
-    def metadata(self):
-        """
-        This contains information needed at runtime to determine behavior such as whether or not outputs are
-        discoverable, timeouts, and retries.
-
-        :rtype: TaskMetadata
-        """
-        return self.template.metadata
-
-    @property
-    def interface(self):
-        """
-        The interface definition for this task.
-
-        :rtype: flytekit.models.interface.TypedInterface
-        """
-        return self.template.interface
-
-    @property
-    def custom(self):
-        """
-        Arbitrary dictionary containing metadata for custom plugins.
-
-        :rtype: dict[Text, T]
-        """
-        return self.template.custom
-
-    @property
-    def task_type_version(self):
-        return self.template.task_type_version
-
-    @property
-    def container(self):
-        """
-        If not None, the target of execution should be a container.
-
-        :rtype: Container
-        """
-        return self.template.container
-
-    @property
-    def config(self):
-        """
-        Arbitrary dictionary containing metadata for parsing and handling custom plugins.
-
-        :rtype: dict[Text, T]
-        """
-        return self.template.config
-
-    @property
-    def security_context(self):
-        return self.template.security_context
-
-    @property
-    def k8s_pod(self):
-        return self.template.k8s_pod
-
-    @property
-    def sql(self):
-        return self.template.sql
-
-    @property
-    def should_register(self) -> bool:
-        return self._should_register
-
-    @property
-    def name(self) -> str:
-        return self.template.id.name
-
-    @property
-    def resource_type(self) -> _identifier_model.ResourceType:
-        return _identifier_model.ResourceType.TASK
-
-    @property
-    def entity_type_text(self) -> str:
-        return "Task"
-
-    @classmethod
-    def promote_from_model(cls, base_model: _task_model.TaskTemplate) -> FlyteTask:
-        t = cls(
-            id=base_model.id,
-            type=base_model.type,
-            metadata=base_model.metadata,
-            interface=_interfaces.TypedInterface.promote_from_model(base_model.interface),
-            custom=base_model.custom,
-            container=base_model.container,
-            task_type_version=base_model.task_type_version,
-        )
-        # Override the newly generated name if one exists in the base model
-        if not base_model.id.is_empty:
-            t._id = base_model.id
-
-        return t
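As a usage sketch, promoting a task fetched straight from Admin into a `FlyteTask` might look like this; the client construction and identifier values are assumptions for illustration:

```python
from flytekit.clients.friendly import SynchronousFlyteClient
from flytekit.configuration import PlatformConfig
from flytekit.models.core.identifier import Identifier, ResourceType

# Assumed endpoint; any SynchronousFlyteClient pointing at Admin works the same way.
client = SynchronousFlyteClient(PlatformConfig(endpoint="flyte.example.com:81", insecure=True))
task_id = Identifier(ResourceType.TASK, "flytesnacks", "development", "my_task", "v1")  # hypothetical

admin_task = client.get_task(task_id)
flyte_task = FlyteTask.promote_from_model(admin_task.closure.compiled_task.template)
print(flyte_task.name, flyte_task.interface)
```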
- - :rtype: dict[Text, T] - """ - return self.template.config - - @property - def security_context(self): - return self.template.security_context - - @property - def k8s_pod(self): - return self.template.k8s_pod - - @property - def sql(self): - return self.template.sql - - @property - def should_register(self) -> bool: - return self._should_register - - @property - def name(self) -> str: - return self.template.id.name - - @property - def resource_type(self) -> _identifier_model.ResourceType: - return _identifier_model.ResourceType.TASK - - @property - def entity_type_text(self) -> str: - return "Task" - - @classmethod - def promote_from_model(cls, base_model: _task_model.TaskTemplate) -> FlyteTask: - t = cls( - id=base_model.id, - type=base_model.type, - metadata=base_model.metadata, - interface=_interfaces.TypedInterface.promote_from_model(base_model.interface), - custom=base_model.custom, - container=base_model.container, - task_type_version=base_model.task_type_version, - ) - # Override the newly generated name if one exists in the base model - if not base_model.id.is_empty: - t._id = base_model.id - - return t - - -class FlyteTaskNode(_workflow_model.TaskNode): - """A class encapsulating a task that a Flyte node needs to execute.""" - - def __init__(self, flyte_task: FlyteTask): - super(FlyteTaskNode, self).__init__(None) - self._flyte_task = flyte_task - - @property - def reference_id(self) -> id_models.Identifier: - """A globally unique identifier for the task.""" - return self._flyte_task.id - - @property - def flyte_task(self) -> FlyteTask: - return self._flyte_task - - @classmethod - def promote_from_model(cls, task: FlyteTask) -> FlyteTaskNode: - """ - Takes the idl wrapper for a TaskNode, - and returns the hydrated Flytekit object for it by fetching it with the FlyteTask control plane. - """ - return cls(flyte_task=task) - - -class FlyteWorkflowNode(_workflow_model.WorkflowNode): - """A class encapsulating a workflow that a Flyte node needs to execute.""" - - def __init__( - self, - flyte_workflow: FlyteWorkflow = None, - flyte_launch_plan: FlyteLaunchPlan = None, - ): - if flyte_workflow and flyte_launch_plan: - raise _system_exceptions.FlyteSystemException( - "FlyteWorkflowNode cannot be called with both a workflow and a launchplan specified, please pick " - f"one. 
workflow: {flyte_workflow} launchPlan: {flyte_launch_plan}", - ) - - self._flyte_workflow = flyte_workflow - self._flyte_launch_plan = flyte_launch_plan - super(FlyteWorkflowNode, self).__init__( - launchplan_ref=self._flyte_launch_plan.id if self._flyte_launch_plan else None, - sub_workflow_ref=self._flyte_workflow.id if self._flyte_workflow else None, - ) - - def __repr__(self) -> str: - if self.flyte_workflow is not None: - return f"FlyteWorkflowNode with workflow: {self.flyte_workflow}" - return f"FlyteWorkflowNode with launch plan: {self.flyte_launch_plan}" - - @property - def launchplan_ref(self) -> id_models.Identifier: - """A globally unique identifier for the launch plan, which should map to Admin.""" - return self._flyte_launch_plan.id if self._flyte_launch_plan else None - - @property - def sub_workflow_ref(self): - return self._flyte_workflow.id if self._flyte_workflow else None - - @property - def flyte_launch_plan(self) -> FlyteLaunchPlan: - return self._flyte_launch_plan - - @property - def flyte_workflow(self) -> FlyteWorkflow: - return self._flyte_workflow - - @classmethod - def _promote_workflow( - cls, - wf: _workflow_models.WorkflowTemplate, - sub_workflows: Optional[Dict[Identifier, _workflow_models.WorkflowTemplate]] = None, - tasks: Optional[Dict[Identifier, FlyteTask]] = None, - node_launch_plans: Optional[Dict[Identifier, launch_plan_models.LaunchPlanSpec]] = None, - ) -> FlyteWorkflow: - return FlyteWorkflow.promote_from_model( - wf, - sub_workflows=sub_workflows, - node_launch_plans=node_launch_plans, - tasks=tasks, - ) - - @classmethod - def promote_from_model( - cls, - base_model: _workflow_model.WorkflowNode, - sub_workflows: Dict[id_models.Identifier, _workflow_model.WorkflowTemplate], - node_launch_plans: Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec], - tasks: Dict[Identifier, FlyteTask], - converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow], - ) -> Tuple[FlyteWorkflowNode, Dict[id_models.Identifier, FlyteWorkflow]]: - if base_model.launchplan_ref is not None: - return ( - cls( - flyte_launch_plan=FlyteLaunchPlan.promote_from_model( - base_model.launchplan_ref, node_launch_plans[base_model.launchplan_ref] - ) - ), - converted_sub_workflows, - ) - elif base_model.sub_workflow_ref is not None: - # the workflow templates for sub-workflows should have been included in the original response - if base_model.reference in sub_workflows: - wf = None - if base_model.reference not in converted_sub_workflows: - wf = cls._promote_workflow( - sub_workflows[base_model.reference], - sub_workflows=sub_workflows, - node_launch_plans=node_launch_plans, - tasks=tasks, - ) - converted_sub_workflows[base_model.reference] = wf - else: - wf = converted_sub_workflows[base_model.reference] - return cls(flyte_workflow=wf), converted_sub_workflows - raise _system_exceptions.FlyteSystemException(f"Subworkflow {base_model.reference} not found.") - - raise _system_exceptions.FlyteSystemException( - "Bad workflow node model, neither subworkflow nor launchplan specified." 
- ) - - -class FlyteBranchNode(_workflow_model.BranchNode): - def __init__(self, if_else: _workflow_model.IfElseBlock): - super().__init__(if_else) - - @classmethod - def promote_from_model( - cls, - base_model: _workflow_model.BranchNode, - sub_workflows: Dict[id_models.Identifier, _workflow_model.WorkflowTemplate], - node_launch_plans: Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec], - tasks: Dict[id_models.Identifier, FlyteTask], - converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow], - ) -> Tuple[FlyteBranchNode, Dict[id_models.Identifier, FlyteWorkflow]]: - block = base_model.if_else - block.case._then_node, converted_sub_workflows = FlyteNode.promote_from_model( - block.case.then_node, - sub_workflows, - node_launch_plans, - tasks, - converted_sub_workflows, - ) - - for o in block.other: - o._then_node, converted_sub_workflows = FlyteNode.promote_from_model( - o.then_node, sub_workflows, node_launch_plans, tasks, converted_sub_workflows - ) - - else_node = None - if block.else_node: - else_node, converted_sub_workflows = FlyteNode.promote_from_model( - block.else_node, sub_workflows, node_launch_plans, tasks, converted_sub_workflows - ) - - new_if_else_block = _workflow_model.IfElseBlock(block.case, block.other, else_node, block.error) - - return cls(new_if_else_block), converted_sub_workflows - - -class FlyteGateNode(_workflow_model.GateNode): - @classmethod - def promote_from_model(cls, model: _workflow_model.GateNode): - return cls(model.signal, model.sleep, model.approve) - - -class FlyteArrayNode(_workflow_model.ArrayNode): - @classmethod - def promote_from_model(cls, model: _workflow_model.ArrayNode): - return cls(model._parallelism, model._node, model._min_success_ratio, model._min_successes) - - -class FlyteNode(_hash_mixin.HashOnReferenceMixin, _workflow_model.Node): - """A class encapsulating a remote Flyte node.""" - - def __init__( - self, - id, - upstream_nodes, - bindings, - metadata, - task_node: Optional[FlyteTaskNode] = None, - workflow_node: Optional[FlyteWorkflowNode] = None, - branch_node: Optional[FlyteBranchNode] = None, - gate_node: Optional[FlyteGateNode] = None, - array_node: Optional[FlyteArrayNode] = None, - ): - if not task_node and not workflow_node and not branch_node and not gate_node and not array_node: - raise _user_exceptions.FlyteAssertion( - "An Flyte node must have one of task|workflow|branch|gate|array entity specified at once" - ) - # TODO: Revisit flyte_branch_node and flyte_gate_node, should they be another type like Condition instead - # of a node? 
- self._flyte_task_node = task_node - if task_node: - self._flyte_entity = task_node.flyte_task - elif workflow_node: - self._flyte_entity = workflow_node.flyte_workflow or workflow_node.flyte_launch_plan - else: - self._flyte_entity = branch_node or gate_node or array_node - - super(FlyteNode, self).__init__( - id=id, - metadata=metadata, - inputs=bindings, - upstream_node_ids=[n.id for n in upstream_nodes], - output_aliases=[], - task_node=task_node, - workflow_node=workflow_node, - branch_node=branch_node, - gate_node=gate_node, - array_node=array_node, - ) - self._upstream = upstream_nodes - - @property - def task_node(self) -> Optional[FlyteTaskNode]: - return self._flyte_task_node - - @property - def flyte_entity(self) -> Union[FlyteTask, FlyteWorkflow, FlyteLaunchPlan, FlyteBranchNode]: - return self._flyte_entity - - @classmethod - def _promote_task_node(cls, t: FlyteTask) -> FlyteTaskNode: - return FlyteTaskNode.promote_from_model(t) - - @classmethod - def _promote_workflow_node( - cls, - wn: _workflow_model.WorkflowNode, - sub_workflows: Dict[id_models.Identifier, _workflow_model.WorkflowTemplate], - node_launch_plans: Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec], - tasks: Dict[Identifier, FlyteTask], - converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow], - ) -> Tuple[FlyteWorkflowNode, Dict[id_models.Identifier, FlyteWorkflow]]: - return FlyteWorkflowNode.promote_from_model( - wn, - sub_workflows, - node_launch_plans, - tasks, - converted_sub_workflows, - ) - - @classmethod - def promote_from_model( - cls, - model: _workflow_model.Node, - sub_workflows: Optional[Dict[id_models.Identifier, _workflow_model.WorkflowTemplate]], - node_launch_plans: Optional[Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec]], - tasks: Dict[id_models.Identifier, FlyteTask], - converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow], - ) -> Tuple[Optional[FlyteNode], Dict[id_models.Identifier, FlyteWorkflow]]: - node_model_id = model.id - # TODO: Consider removing - if id in {_constants.START_NODE_ID, _constants.END_NODE_ID}: - logger.warning(f"Should not call promote from model on a start node or end node {model}") - return None, converted_sub_workflows - - flyte_task_node, flyte_workflow_node, flyte_branch_node, flyte_gate_node, flyte_array_node = ( - None, - None, - None, - None, - None, - ) - if model.task_node is not None: - if model.task_node.reference_id not in tasks: - raise RuntimeError( - f"Remote Workflow closure does not have task with id {model.task_node.reference_id}." 
- ) - flyte_task_node = cls._promote_task_node(tasks[model.task_node.reference_id]) - elif model.workflow_node is not None: - flyte_workflow_node, converted_sub_workflows = cls._promote_workflow_node( - model.workflow_node, - sub_workflows, - node_launch_plans, - tasks, - converted_sub_workflows, - ) - elif model.branch_node is not None: - flyte_branch_node, converted_sub_workflows = FlyteBranchNode.promote_from_model( - model.branch_node, - sub_workflows, - node_launch_plans, - tasks, - converted_sub_workflows, - ) - elif model.gate_node is not None: - flyte_gate_node = FlyteGateNode.promote_from_model(model.gate_node) - elif model.array_node is not None: - flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node) - # TODO: validate task in tasks - else: - raise _system_exceptions.FlyteSystemException( - f"Bad Node model, neither task nor workflow detected, node: {model}" - ) - - # When WorkflowTemplate models (containing node models) are returned by Admin, they've been compiled with a - # start node. In order to make the promoted FlyteWorkflow look the same, we strip the start-node text back out. - # TODO: Consider removing - for model_input in model.inputs: - if ( - model_input.binding.promise is not None - and model_input.binding.promise.node_id == _constants.START_NODE_ID - ): - model_input.binding.promise._node_id = _constants.GLOBAL_INPUT_NODE_ID - - return ( - cls( - id=node_model_id, - upstream_nodes=[], # set downstream, model doesn't contain this information - bindings=model.inputs, - metadata=model.metadata, - task_node=flyte_task_node, - workflow_node=flyte_workflow_node, - branch_node=flyte_branch_node, - gate_node=flyte_gate_node, - array_node=flyte_array_node, - ), - converted_sub_workflows, - ) - - @property - def upstream_nodes(self) -> List[FlyteNode]: - return self._upstream - - @property - def upstream_node_ids(self) -> List[str]: - return list(sorted(n.id for n in self.upstream_nodes)) - - def __repr__(self) -> str: - return f"Node(ID: {self.id})" - - -class FlyteWorkflow(_hash_mixin.HashOnReferenceMixin, RemoteEntity, WorkflowSpec): - """A class encapsulating a remote Flyte workflow.""" - - def __init__( - self, - id: id_models.Identifier, - nodes: List[FlyteNode], - interface, - output_bindings, - metadata, - metadata_defaults, - subworkflows: Optional[List[FlyteWorkflow]] = None, - tasks: Optional[List[FlyteTask]] = None, - launch_plans: Optional[Dict[id_models.Identifier, launch_plan_models.LaunchPlanSpec]] = None, - compiled_closure: Optional[compiler_models.CompiledWorkflowClosure] = None, - should_register: bool = False, - ): - # TODO: Remove check - for node in nodes: - for upstream in node.upstream_nodes: - if upstream.id is None: - raise _user_exceptions.FlyteAssertion( - "Some nodes contained in the workflow were not found in the workflow description. Please " - "ensure all nodes are either assigned to attributes within the class or an element in a " - "list, dict, or tuple which is stored as an attribute in the class." 
- ) - - self._flyte_sub_workflows = subworkflows - template_subworkflows = [] - if subworkflows: - template_subworkflows = [swf.template for swf in subworkflows] - - super(FlyteWorkflow, self).__init__( - template=_workflow_models.WorkflowTemplate( - id=id, - metadata=metadata, - metadata_defaults=metadata_defaults, - interface=interface, - nodes=nodes, - outputs=output_bindings, - ), - sub_workflows=template_subworkflows, - ) - self._flyte_nodes = nodes - - # Optional things that we save for ease of access when promoting from a model or CompiledWorkflowClosure - self._tasks = tasks - self._launch_plans = launch_plans - self._compiled_closure = compiled_closure - self._node_map = None - self._name = id.name - self._should_register = should_register - - @property - def name(self) -> str: - return self._name - - @property - def flyte_tasks(self) -> Optional[List[FlyteTask]]: - return self._tasks - - @property - def should_register(self) -> bool: - return self._should_register - - @property - def flyte_sub_workflows(self) -> List[FlyteWorkflow]: - return self._flyte_sub_workflows - - @property - def entity_type_text(self) -> str: - return "Workflow" - - @property - def resource_type(self): - return id_models.ResourceType.WORKFLOW - - @property - def flyte_nodes(self) -> List[FlyteNode]: - return self._flyte_nodes - - @property - def id(self) -> Identifier: - """ - This is an autogenerated id by the system. The id is globally unique across Flyte. - """ - return self.template.id - - @property - def metadata(self) -> WorkflowMetadata: - """ - This contains information on how to run the workflow. - """ - return self.template.metadata - - @property - def metadata_defaults(self) -> WorkflowMetadataDefaults: - """ - This contains information on how to run the workflow. - :rtype: WorkflowMetadataDefaults - """ - return self.template.metadata_defaults - - @property - def interface(self) -> TypedInterface: - """ - Defines a strongly typed interface for the Workflow (inputs, outputs). This can include some optional - parameters. - """ - return self.template.interface - - @property - def nodes(self) -> List[Node]: - """ - A list of nodes. In addition, "globals" is a special reserved node id that can be used to consume - workflow inputs - """ - return self.template.nodes - - @property - def outputs(self) -> List[Binding]: - """ - A list of output bindings that specify how to construct workflow outputs. Bindings can - pull node outputs or specify literals. All workflow outputs specified in the interface field must be bound - in order for the workflow to be validated. A workflow has an implicit dependency on all of its nodes - to execute successfully in order to bind final outputs. - """ - return self.template.outputs - - @property - def failure_node(self) -> Node: - """ - Node failure_node: A catch-all node. This node is executed whenever the execution engine determines the - workflow has failed. The interface of this node must match the Workflow interface with an additional input - named "error" of type pb.lyft.flyte.core.Error. 
- """ - return self.template.failure_node - - @classmethod - def get_non_system_nodes(cls, nodes: List[_workflow_models.Node]) -> List[_workflow_models.Node]: - return [n for n in nodes if n.id not in {_constants.START_NODE_ID, _constants.END_NODE_ID}] - - @classmethod - def _promote_node( - cls, - model: _workflow_model.Node, - sub_workflows: Optional[Dict[id_models.Identifier, _workflow_model.WorkflowTemplate]], - node_launch_plans: Optional[Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec]], - tasks: Dict[id_models.Identifier, FlyteTask], - converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow], - ) -> Tuple[Optional[FlyteNode], Dict[id_models.Identifier, FlyteWorkflow]]: - return FlyteNode.promote_from_model(model, sub_workflows, node_launch_plans, tasks, converted_sub_workflows) - - @classmethod - def promote_from_model( - cls, - base_model: _workflow_models.WorkflowTemplate, - sub_workflows: Optional[Dict[Identifier, _workflow_models.WorkflowTemplate]] = None, - tasks: Optional[Dict[Identifier, FlyteTask]] = None, - node_launch_plans: Optional[Dict[Identifier, launch_plan_models.LaunchPlanSpec]] = None, - ) -> FlyteWorkflow: - base_model_non_system_nodes = cls.get_non_system_nodes(base_model.nodes) - - node_map = {} - converted_sub_workflows = {} - for node in base_model_non_system_nodes: - flyte_node, converted_sub_workflows = cls._promote_node( - node, sub_workflows, node_launch_plans, tasks, converted_sub_workflows - ) - node_map[node.id] = flyte_node - - # Set upstream nodes for each node - for n in base_model_non_system_nodes: - current = node_map[n.id] - for upstream_id in n.upstream_node_ids: - upstream_node = node_map[upstream_id] - current._upstream.append(upstream_node) - - subworkflow_list = [] - if converted_sub_workflows: - subworkflow_list = [v for _, v in converted_sub_workflows.items()] - - task_list = [] - if tasks: - task_list = [t for _, t in tasks.items()] - - # No inputs/outputs specified, see the constructor for more information on the overrides. - wf = cls( - id=base_model.id, - nodes=list(node_map.values()), - metadata=base_model.metadata, - metadata_defaults=base_model.metadata_defaults, - interface=_interfaces.TypedInterface.promote_from_model(base_model.interface), - output_bindings=base_model.outputs, - subworkflows=subworkflow_list, - tasks=task_list, - launch_plans=node_launch_plans, - ) - - wf._node_map = node_map - - return wf - - @classmethod - def _promote_task(cls, t: _task_models.TaskTemplate) -> FlyteTask: - return FlyteTask.promote_from_model(t) - - @classmethod - def promote_from_closure( - cls, - closure: compiler_models.CompiledWorkflowClosure, - node_launch_plans: Optional[Dict[id_models, launch_plan_models.LaunchPlanSpec]] = None, - ): - """ - Extracts out the relevant portions of a FlyteWorkflow from a closure from the control plane. - - :param closure: This is the closure returned by Admin - :param node_launch_plans: The reason this exists is because the compiled closure doesn't have launch plans. - It only has subworkflows and tasks. Why this is unclear. 
If supplied, this map of launch plans will be - """ - sub_workflows = {sw.template.id: sw.template for sw in closure.sub_workflows} - tasks = {} - if closure.tasks: - tasks = {t.template.id: cls._promote_task(t.template) for t in closure.tasks} - - flyte_wf = cls.promote_from_model( - base_model=closure.primary.template, - sub_workflows=sub_workflows, - node_launch_plans=node_launch_plans, - tasks=tasks, - ) - flyte_wf._compiled_closure = closure - return flyte_wf - - -class FlyteLaunchPlan(hash_mixin.HashOnReferenceMixin, RemoteEntity, _launch_plan_models.LaunchPlanSpec): - """A class encapsulating a remote Flyte launch plan.""" - - def __init__(self, id, *args, **kwargs): - super(FlyteLaunchPlan, self).__init__(*args, **kwargs) - # Set all the attributes we expect this class to have - self._id = id - self._name = id.name - - # The interface is not set explicitly unless fetched in an engine context - self._interface = None - # If fetched when creating this object, can store it here. - self._flyte_workflow = None - - @property - def name(self) -> str: - return self._name - - @property - def flyte_workflow(self) -> Optional[FlyteWorkflow]: - return self._flyte_workflow - - @classmethod - def promote_from_model(cls, id: id_models.Identifier, model: _launch_plan_models.LaunchPlanSpec) -> FlyteLaunchPlan: - lp = cls( - id=id, - workflow_id=model.workflow_id, - default_inputs=_interface_models.ParameterMap(model.default_inputs.parameters), - fixed_inputs=model.fixed_inputs, - entity_metadata=model.entity_metadata, - labels=model.labels, - annotations=model.annotations, - auth_role=model.auth_role, - raw_output_data_config=model.raw_output_data_config, - max_parallelism=model.max_parallelism, - security_context=model.security_context, - ) - return lp - - @property - def id(self) -> id_models.Identifier: - return self._id - - @property - def is_scheduled(self) -> bool: - if self.entity_metadata.schedule.cron_expression: - return True - elif self.entity_metadata.schedule.rate and self.entity_metadata.schedule.rate.value: - return True - elif self.entity_metadata.schedule.cron_schedule and self.entity_metadata.schedule.cron_schedule.schedule: - return True - else: - return False - - @property - def workflow_id(self) -> id_models.Identifier: - return self._workflow_id - - @property - def interface(self) -> Optional[_interface.TypedInterface]: - """ - The interface is not technically part of the admin.LaunchPlanSpec in the IDL, however the workflow ID is, and - from the workflow ID, fetch will fill in the interface. This is nice because then you can __call__ the= - object and get a node. 
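Putting the workflow pieces together, here is a sketch of promoting a compiled closure fetched from Admin, reusing the illustrative `client` from the task sketch above (the identifier is hypothetical). Note that launch-plan nodes require the `node_launch_plans` map to be supplied separately, since the closure does not carry launch plans:

```python
from flytekit.models.core.identifier import Identifier, ResourceType

wf_id = Identifier(ResourceType.WORKFLOW, "flytesnacks", "development", "my_wf", "v1")  # hypothetical
closure = client.get_workflow(wf_id).closure.compiled_workflow

flyte_wf = FlyteWorkflow.promote_from_closure(closure)
print([n.id for n in flyte_wf.flyte_nodes])
```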
- """ - return self._interface - - @property - def resource_type(self) -> id_models.ResourceType: - return id_models.ResourceType.LAUNCH_PLAN - - @property - def entity_type_text(self) -> str: - return "Launch Plan" - - def compile(self, ctx: FlyteContext, *args, **kwargs): - fixed_input_lits = self.fixed_inputs.literals or {} - default_input_params = self.default_inputs.parameters or {} - return create_and_link_node_from_remote( - ctx, - entity=self, - _inputs_not_allowed=set(fixed_input_lits.keys()), - _ignorable_inputs=set(default_input_params.keys()), - **kwargs, - ) # noqa - - def __repr__(self) -> str: - return f"FlyteLaunchPlan(ID: {self.id} Interface: {self.interface}) - Spec {super().__repr__()})" diff --git a/flyrs/clients/executions.py b/flyrs/clients/executions.py deleted file mode 100644 index 05724bb868..0000000000 --- a/flyrs/clients/executions.py +++ /dev/null @@ -1,212 +0,0 @@ -from __future__ import annotations - -from abc import abstractmethod -from typing import Dict, List, Optional, Union - -from flytekit.core.type_engine import LiteralsResolver -from flytekit.exceptions import user as user_exceptions -from flytekit.models import execution as execution_models -from flytekit.models import node_execution as node_execution_models -from flytekit.models.admin import task_execution as admin_task_execution_models -from flytekit.models.core import execution as core_execution_models -from clients.entities import FlyteTask, FlyteWorkflow - - -class RemoteExecutionBase(object): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._inputs: Optional[LiteralsResolver] = None - self._outputs: Optional[LiteralsResolver] = None - - @property - def inputs(self) -> Optional[LiteralsResolver]: - return self._inputs - - @property - @abstractmethod - def error(self) -> core_execution_models.ExecutionError: - ... - - @property - @abstractmethod - def is_done(self) -> bool: - ... - - @property - def outputs(self) -> Optional[LiteralsResolver]: - """ - :return: Returns the outputs LiteralsResolver to the execution - :raises: ``FlyteAssertion`` error if execution is in progress or execution ended in error. - """ - if not self.is_done: - raise user_exceptions.FlyteAssertion( - "Please wait until the execution has completed before requesting the outputs." - ) - if self.error: - raise user_exceptions.FlyteAssertion("Outputs could not be found because the execution ended in failure.") - - return self._outputs - - -class FlyteTaskExecution(RemoteExecutionBase, admin_task_execution_models.TaskExecution): - """A class encapsulating a task execution being run on a Flyte remote backend.""" - - def __init__(self, *args, **kwargs): - super(FlyteTaskExecution, self).__init__(*args, **kwargs) - self._flyte_task = None - - @property - def task(self) -> Optional[FlyteTask]: - return self._flyte_task - - @property - def is_done(self) -> bool: - """Whether or not the execution is complete.""" - return self.closure.phase in { - core_execution_models.TaskExecutionPhase.ABORTED, - core_execution_models.TaskExecutionPhase.FAILED, - core_execution_models.TaskExecutionPhase.SUCCEEDED, - } - - @property - def error(self) -> Optional[core_execution_models.ExecutionError]: - """ - If execution is in progress, raise an exception. Otherwise, return None if no error was present upon - reaching completion. - """ - if not self.is_done: - raise user_exceptions.FlyteAssertion( - "Please what until the task execution has completed before requesting error information." 
diff --git a/flyrs/clients/executions.py b/flyrs/clients/executions.py
deleted file mode 100644
index 05724bb868..0000000000
--- a/flyrs/clients/executions.py
+++ /dev/null
@@ -1,212 +0,0 @@
-from __future__ import annotations
-
-from abc import abstractmethod
-from typing import Dict, List, Optional, Union
-
-from flytekit.core.type_engine import LiteralsResolver
-from flytekit.exceptions import user as user_exceptions
-from flytekit.models import execution as execution_models
-from flytekit.models import node_execution as node_execution_models
-from flytekit.models.admin import task_execution as admin_task_execution_models
-from flytekit.models.core import execution as core_execution_models
-from clients.entities import FlyteTask, FlyteWorkflow
-
-
-class RemoteExecutionBase(object):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self._inputs: Optional[LiteralsResolver] = None
-        self._outputs: Optional[LiteralsResolver] = None
-
-    @property
-    def inputs(self) -> Optional[LiteralsResolver]:
-        return self._inputs
-
-    @property
-    @abstractmethod
-    def error(self) -> core_execution_models.ExecutionError:
-        ...
-
-    @property
-    @abstractmethod
-    def is_done(self) -> bool:
-        ...
-
-    @property
-    def outputs(self) -> Optional[LiteralsResolver]:
-        """
-        :return: Returns the outputs LiteralsResolver of the execution
-        :raises: ``FlyteAssertion`` error if execution is in progress or execution ended in error.
-        """
-        if not self.is_done:
-            raise user_exceptions.FlyteAssertion(
-                "Please wait until the execution has completed before requesting the outputs."
-            )
-        if self.error:
-            raise user_exceptions.FlyteAssertion("Outputs could not be found because the execution ended in failure.")
-
-        return self._outputs
-
-
-class FlyteTaskExecution(RemoteExecutionBase, admin_task_execution_models.TaskExecution):
-    """A class encapsulating a task execution being run on a Flyte remote backend."""
-
-    def __init__(self, *args, **kwargs):
-        super(FlyteTaskExecution, self).__init__(*args, **kwargs)
-        self._flyte_task = None
-
-    @property
-    def task(self) -> Optional[FlyteTask]:
-        return self._flyte_task
-
-    @property
-    def is_done(self) -> bool:
-        """Whether or not the execution is complete."""
-        return self.closure.phase in {
-            core_execution_models.TaskExecutionPhase.ABORTED,
-            core_execution_models.TaskExecutionPhase.FAILED,
-            core_execution_models.TaskExecutionPhase.SUCCEEDED,
-        }
-
-    @property
-    def error(self) -> Optional[core_execution_models.ExecutionError]:
-        """
-        If execution is in progress, raise an exception. Otherwise, return None if no error was present upon
-        reaching completion.
-        """
-        if not self.is_done:
-            raise user_exceptions.FlyteAssertion(
-                "Please wait until the task execution has completed before requesting error information."
-            )
-        return self.closure.error
-
-    @classmethod
-    def promote_from_model(cls, base_model: admin_task_execution_models.TaskExecution) -> "FlyteTaskExecution":
-        return cls(
-            closure=base_model.closure,
-            id=base_model.id,
-            input_uri=base_model.input_uri,
-            is_parent=base_model.is_parent,
-        )
-
-
-class FlyteWorkflowExecution(RemoteExecutionBase, execution_models.Execution):
-    """A class encapsulating a workflow execution being run on a Flyte remote backend."""
-
-    def __init__(self, *args, **kwargs):
-        super(FlyteWorkflowExecution, self).__init__(*args, **kwargs)
-        self._node_executions = None
-        self._flyte_workflow: Optional[FlyteWorkflow] = None
-
-    @property
-    def flyte_workflow(self) -> Optional[FlyteWorkflow]:
-        return self._flyte_workflow
-
-    @property
-    def node_executions(self) -> Dict[str, "FlyteNodeExecution"]:
-        """Get a dictionary of node executions that are a part of this workflow execution."""
-        return self._node_executions or {}
-
-    @property
-    def error(self) -> core_execution_models.ExecutionError:
-        """
-        If execution is in progress, raise an exception. Otherwise, return None if no error was present upon
-        reaching completion.
-        """
-        if not self.is_done:
-            raise user_exceptions.FlyteAssertion(
-                "Please wait until a workflow has completed before checking for an error."
-            )
-        return self.closure.error
-
-    @property
-    def is_done(self) -> bool:
-        """
-        Whether or not the execution is complete.
-        """
-        return self.closure.phase in {
-            core_execution_models.WorkflowExecutionPhase.ABORTED,
-            core_execution_models.WorkflowExecutionPhase.FAILED,
-            core_execution_models.WorkflowExecutionPhase.SUCCEEDED,
-            core_execution_models.WorkflowExecutionPhase.TIMED_OUT,
-        }
-
-    @classmethod
-    def promote_from_model(cls, base_model: execution_models.Execution) -> "FlyteWorkflowExecution":
-        return cls(
-            closure=base_model.closure,
-            id=base_model.id,
-            spec=base_model.spec,
-        )
-
-
-class FlyteNodeExecution(RemoteExecutionBase, node_execution_models.NodeExecution):
-    """A class encapsulating a node execution being run on a Flyte remote backend."""
-
-    def __init__(self, *args, **kwargs):
-        super(FlyteNodeExecution, self).__init__(*args, **kwargs)
-        self._task_executions = None
-        self._workflow_executions = []
-        self._underlying_node_executions = None
-        self._interface = None
-        self._flyte_node = None
-
-    @property
-    def task_executions(self) -> List[FlyteTaskExecution]:
-        return self._task_executions or []
-
-    @property
-    def workflow_executions(self) -> List[FlyteWorkflowExecution]:
-        return self._workflow_executions
-
-    @property
-    def subworkflow_node_executions(self) -> Dict[str, FlyteNodeExecution]:
-        """
-        This returns underlying node executions in instances where the current node execution is
-        a parent node. This happens when it's either a static or dynamic subworkflow.
-        """
-        return (
-            {}
-            if self._underlying_node_executions is None
-            else {n.id.node_id: n for n in self._underlying_node_executions}
-        )
-
-    @property
-    def executions(self) -> List[Union[FlyteTaskExecution, FlyteWorkflowExecution]]:
-        return self.task_executions or self._underlying_node_executions or []
-
-    @property
-    def error(self) -> core_execution_models.ExecutionError:
-        """
-        If execution is in progress, raise an exception. Otherwise, return None if no error was present upon
-        reaching completion.
-        """
-        if not self.is_done:
-            raise user_exceptions.FlyteAssertion(
-                "Please wait until the node execution has completed before requesting error information."
-            )
-        return self.closure.error
-
-    @property
-    def is_done(self) -> bool:
-        """Whether or not the execution is complete."""
-        return self.closure.phase in {
-            core_execution_models.NodeExecutionPhase.ABORTED,
-            core_execution_models.NodeExecutionPhase.FAILED,
-            core_execution_models.NodeExecutionPhase.SKIPPED,
-            core_execution_models.NodeExecutionPhase.SUCCEEDED,
-            core_execution_models.NodeExecutionPhase.TIMED_OUT,
-        }
-
-    @classmethod
-    def promote_from_model(cls, base_model: node_execution_models.NodeExecution) -> "FlyteNodeExecution":
-        return cls(
-            closure=base_model.closure, id=base_model.id, input_uri=base_model.input_uri, metadata=base_model.metadata
-        )
-
-    @property
-    def interface(self) -> "flytekit.remote.interface.TypedInterface":
-        """
-        Return the interface of the task or subworkflow associated with this node execution.
-        """
-        return self._interface
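A sketch of the consumption pattern these wrappers enable (execution name assumed, `remote` as in the earlier sketches):

```python
execution = remote.fetch_execution(name="f8a7b2c1d")  # hypothetical name
execution = remote.wait(execution)  # blocks until is_done flips to True

if execution.error is None:
    print(execution.outputs.literals)  # safe now: outputs raises while still running
else:
    print(f"Execution failed: {execution.error.message}")
```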
diff --git a/flyrs/clients/interface.py b/flyrs/clients/interface.py
deleted file mode 100644
index df61c8e336..0000000000
--- a/flyrs/clients/interface.py
+++ /dev/null
@@ -1,11 +0,0 @@
-from flytekit.models import interface as _interface_models
-
-
-class TypedInterface(_interface_models.TypedInterface):
-    @classmethod
-    def promote_from_model(cls, model):
-        """
-        :param flytekit.models.interface.TypedInterface model:
-        :rtype: TypedInterface
-        """
-        return cls(model.inputs, model.outputs)
diff --git a/flyrs/clients/lazy_entity.py b/flyrs/clients/lazy_entity.py
deleted file mode 100644
index c9cb803267..0000000000
--- a/flyrs/clients/lazy_entity.py
+++ /dev/null
@@ -1,67 +0,0 @@
-import typing
-from threading import Lock
-
-from flytekit import FlyteContext
-from clients.remote_callable import RemoteEntity
-
-T = typing.TypeVar("T", bound=RemoteEntity)
-
-
-class LazyEntity(RemoteEntity, typing.Generic[T]):
-    """
-    Fetches the entity when the entity is called or when the entity is retrieved.
-    The entity is derived from RemoteEntity so that it behaves exactly like the mimicked entity.
-    """
-
-    def __init__(self, name: str, getter: typing.Callable[[], T], *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self._entity = None
-        self._getter = getter
-        self._name = name
-        if not self._getter:
-            raise ValueError("getter method is required to create a Lazy loadable Remote Entity.")
-        self._mutex = Lock()
-
-    @property
-    def name(self) -> str:
-        return self._name
-
-    def entity_fetched(self) -> bool:
-        with self._mutex:
-            return self._entity is not None
-
-    @property
-    def entity(self) -> T:
-        """
-        If not already fetched / available, then the entity will be force fetched.
-        """
-        with self._mutex:
-            if self._entity is None:
-                try:
-                    self._entity = self._getter()
-                except AttributeError as e:
-                    raise RuntimeError(
-                        f"Error downloading the entity {self._name}, (check original exception...)"
-                    ) from e
-            return self._entity
-
-    def __getattr__(self, item: str) -> typing.Any:
-        """
-        Forwards all other attributes to entity, causing the entity to be fetched!
-        """
-        return getattr(self.entity, item)
-
-    def compile(self, ctx: FlyteContext, *args, **kwargs):
-        return self.entity.compile(ctx, *args, **kwargs)
-
-    def __call__(self, *args, **kwargs):
-        """
-        Forwards the call to the underlying entity. The entity will be fetched if not already present.
-        """
-        return self.entity(*args, **kwargs)
-
-    def __repr__(self) -> str:
-        return str(self)
-
-    def __str__(self) -> str:
-        return f"Promise for entity [{self._name}]"
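The laziness contract in a nutshell; the getter below is a stand-in for a real remote fetch such as `remote.fetch_task(...)`:

```python
def fetch_task() -> FlyteTask:
    # Stand-in for a remote call; flyte_task is the object from the promotion sketch above.
    return flyte_task

lazy = LazyEntity(name="my_task", getter=fetch_task)
assert not lazy.entity_fetched()   # nothing fetched yet
print(lazy.interface)              # any attribute access triggers the one-time fetch
assert lazy.entity_fetched()
```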
diff --git a/flyrs/clients/remote_callable.py b/flyrs/clients/remote_callable.py
deleted file mode 100644
index 5b177bf7c4..0000000000
--- a/flyrs/clients/remote_callable.py
+++ /dev/null
@@ -1,75 +0,0 @@
-from abc import ABC, abstractmethod
-from typing import Any, Dict, Optional, Tuple, Type, Union
-
-from flytekit.core.context_manager import BranchEvalMode, ExecutionState, FlyteContext
-from flytekit.core.promise import Promise, VoidPromise, create_and_link_node_from_remote, extract_obj_name
-from flytekit.exceptions import user as user_exceptions
-from flytekit.loggers import logger
-from flytekit.models.core.workflow import NodeMetadata
-
-
-class RemoteEntity(ABC):
-    def __init__(self, *args, **kwargs):
-        # In cases where we make a FlyteTask/Workflow/LaunchPlan from a locally created Python object (i.e. an @task
-        # or an @workflow decorated function), we actually have the Python interface, so we store it here.
-        self._python_interface: Optional[Dict[str, Type]] = None
-
-        super().__init__(*args, **kwargs)
-
-    @property
-    @abstractmethod
-    def name(self) -> str:
-        ...
-
-    def construct_node_metadata(self) -> NodeMetadata:
-        """
-        Used when constructing the node that encapsulates this task as part of a broader workflow definition.
-        """
-        return NodeMetadata(
-            name=extract_obj_name(self.name),
-        )
-
-    def compile(self, ctx: FlyteContext, *args, **kwargs):
-        return create_and_link_node_from_remote(ctx, entity=self, **kwargs)  # noqa
-
-    def __call__(self, *args, **kwargs):
-        # When a Task is () aka __called__, there are three things we may do:
-        # a. Plain execution Mode - just run the execute function. If not overridden, we should raise an exception
-        # b. Compilation Mode - this happens when the function is called as part of a workflow (potentially
-        #    dynamic task). Produce promise objects and create a node.
-        # c. Workflow Execution Mode - when a workflow is being run locally. Even though workflows are functions
-        #    and everything should be able to be passed through naturally, we'll want to wrap output values of the
-        #    function into objects, so that potential .with_cpu or other ancillary functions can be attached to do
-        #    nothing. Subsequent tasks will have to know how to unwrap these. If by chance a non-Flyte task uses a
-        #    task output as an input, things probably will fail pretty obviously.
-        # Since this is a reference entity, it still needs to be mocked otherwise an exception will be raised.
-        if len(args) > 0:
-            raise user_exceptions.FlyteAssertion(
-                f"Cannot call remotely fetched entity with args - detected {len(args)} positional args {args}"
-            )
-
-        ctx = FlyteContext.current_context()
-        if ctx.compilation_state is not None and ctx.compilation_state.mode == 1:
-            return self.compile(ctx, *args, **kwargs)
-        elif (
-            ctx.execution_state is not None and ctx.execution_state.mode == ExecutionState.Mode.LOCAL_WORKFLOW_EXECUTION
-        ):
-            if ctx.execution_state.branch_eval_mode == BranchEvalMode.BRANCH_SKIPPED:
-                return
-            return self.local_execute(ctx, **kwargs)
-        else:
-            logger.debug("Fetched entity, running raw execute.")
-            return self.execute(**kwargs)
-
-    def local_execute(self, ctx: FlyteContext, **kwargs) -> Optional[Union[Tuple[Promise], Promise, VoidPromise]]:
-        return self.execute(**kwargs)
-
-    def local_execution_mode(self) -> ExecutionState.Mode:
-        return ExecutionState.Mode.LOCAL_TASK_EXECUTION
-
-    def execute(self, **kwargs) -> Any:
-        raise AssertionError(f"Remotely fetched entities cannot be run locally. Please mock the {self.name}.execute.")
-
-    @property
-    def python_interface(self) -> Optional[Dict[str, Type]]:
-        return self._python_interface
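Since `execute` raises for remotely fetched entities, local tests generally mock it; a sketch (reusing the illustrative `flyte_task` from above):

```python
from unittest import mock

with mock.patch.object(FlyteTask, "execute", return_value={"o0": 42}):
    # Outside of compilation or local workflow execution, __call__ falls through to execute().
    print(flyte_task())  # -> {'o0': 42}
```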
- """ - if final is False: - return False - self.buffer.seek(0) - data = self.buffer.read() - - try: - res = self._remote.client.get_upload_signed_url( - self._remote.default_project, - self._remote.default_domain, - None, - None, - filename_root=self._filename, - ) - FlytePathResolver.add_mapping(self.path, res.native_url) - resp = requests.put(res.signed_url, data=data) - if not resp.ok: - raise AssertionError(f"Failed to upload file {self._filename} to {res.signed_url} reason {resp.reason}") - except Exception as e: - raise AssertionError(f"Failed to upload file {self._filename} reason {e}") - - -def get_flyte_fs(remote: FlyteRemote) -> typing.Type[FlyteFS]: - class _FlyteFS(FlyteFS): - def __init__(self, **storage_options): - super().__init__(remote=remote, **storage_options) - - return _FlyteFS - - -class FlyteFS(HTTPFileSystem): - """ - Want this to behave mostly just like the HTTP file system. - """ - - sep = "/" - protocol = "flyte" - - def __init__( - self, - remote: FlyteRemote, - asynchronous: bool = False, - **storage_options, - ): - super().__init__(asynchronous=asynchronous, **storage_options) - self._remote = remote - - @property - def fsid(self) -> str: - return "flyte" - - async def _get_file(self, rpath, lpath, **kwargs): - """ - Don't do anything special. If it's a flyte url, the create a download link and write to lpath, - otherwise default to parent. - """ - raise NotImplementedError("FlyteFS currently doesn't support downloading files.") - - async def _put_file( - self, - lpath, - rpath, - chunk_size=5 * 2**20, - callback=_DEFAULT_CALLBACK, - method="put", - **kwargs, - ): - """ - fsspec will call this method to upload a file. If recursive, rpath will already be individual files. - Make the request and upload, but then how do we get the s3 paths back to the user? - """ - prefix = kwargs.pop(_PREFIX_KEY) - _, native_url = self._remote.upload_file( - pathlib.Path(lpath), self._remote.default_project, self._remote.default_domain, prefix - ) - return native_url - - @staticmethod - def extract_common(native_urls: typing.List[str]) -> str: - """ - This function that will take a list of strings and return the longest prefix that they all have in common. - That is, if you have - ['s3://my-s3-bucket/flytesnacks/development/ABCYZWMPACZAJ2MABGMOZ6CCPY======/source/empty.md', - 's3://my-s3-bucket/flytesnacks/development/ABCXKL5ZZWXY3PDLM3OONUHHME======/source/nested/more.txt', - 's3://my-s3-bucket/flytesnacks/development/ABCXBAPBKONMADXVW5Q3J6YBWM======/source/original.txt'] - this will return back 's3://my-s3-bucket/flytesnacks/development/' - Note that trailing characters after a separator that just happen to be the same will also be stripped. - """ - if len(native_urls) == 0: - return "" - if len(native_urls) == 1: - return native_urls[0] - - common_prefix = "" - shortest = min([len(x) for x in native_urls]) - x = [[native_urls[j][i] for j in range(len(native_urls))] for i in range(shortest)] - for i in x: - if len(set(i)) == 1: - common_prefix += i[0] - else: - break - - fs = fsspec.filesystem(get_protocol(native_urls[0])) - sep = fs.sep - # split the common prefix on the last separator so we don't get any trailing characters. 
-
-    def get_hashes_and_lengths(self, p: pathlib.Path) -> HashStructure:
-        """
-        Returns a flat map of absolute file paths to their hashes and content lengths. This output is used both
-        for the file upload request, and to consistently create a filename root for uploaded folders. We'll also
-        use it for single files just for consistency.
-        If a directory, then all the files in the directory will be hashed.
-        If a single file, then just that file will be hashed.
-        Symlinks are skipped.
-        """
-        if p.is_symlink():
-            return {}
-        if p.is_dir():
-            hashes = {}
-            for f in p.iterdir():
-                hashes.update(self.get_hashes_and_lengths(f))
-            return hashes
-        else:
-            md5_bytes, _, content_length = hash_file(p.resolve())
-            return {str(p.absolute()): (md5_bytes, content_length)}
-
-    @staticmethod
-    def get_filename_root(file_info: HashStructure) -> str:
-        """
-        Given a dictionary of file paths to hashes and content lengths, return a consistent filename root.
-        This is done by hashing the individual file hashes in sorted-path order and then base32 encoding
-        the result. If the input is empty, then generate a random string.
-        """
-        if len(file_info) == 0:
-            return UUID(int=random.getrandbits(128)).hex
-        sorted_paths = sorted(file_info.keys())
-        h = hashlib.md5()
-        for p in sorted_paths:
-            h.update(file_info[p][0])
-        return base64.b32encode(h.digest()).decode("utf-8")
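To make the hashing/prefix flow concrete, a small sketch (directory path assumed; `remote` from the earlier sketches):

```python
import pathlib

fs = get_flyte_fs(remote)()  # bind the filesystem class to a FlyteRemote instance
info = fs.get_hashes_and_lengths(pathlib.Path("./my_source_dir"))  # hypothetical directory
prefix = FlyteFS.get_filename_root(info)  # stable base32 root derived from the file hashes
print(prefix)

# extract_common trims per-file suffixes back to the shared upload prefix, e.g.:
urls = [
    "/data/dev/ABC/source/a.txt",
    "/data/dev/ABC/source/b.txt",
]
print(FlyteFS.extract_common(urls))  # -> "/data/dev/ABC/source"
```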
-
-    async def _put(
-        self,
-        lpath,
-        rpath,
-        recursive=False,
-        callback=_DEFAULT_CALLBACK,
-        batch_size=None,
-        **kwargs,
-    ):
-        """
-        cp file.txt flyte://data/...
-        rpath gets ignored, so it doesn't matter what it is.
-        """
-        # Hash everything at the top level
-        file_info = self.get_hashes_and_lengths(pathlib.Path(lpath))
-        prefix = self.get_filename_root(file_info)
-
-        kwargs[_PREFIX_KEY] = prefix
-        kwargs[_HASHES_KEY] = file_info
-        res = await super()._put(lpath, REMOTE_PLACEHOLDER, recursive, callback, batch_size, **kwargs)
-        if isinstance(res, list):
-            res = self.extract_common(res)
-        FlytePathResolver.add_mapping(rpath.strip(os.path.sep), res)
-        return res
-
-    async def _isdir(self, path):
-        return True
-
-    def exists(self, path, **kwargs):
-        raise NotImplementedError("flyte file system currently can't check if a file exists.")
-
-    def _open(
-        self,
-        path,
-        mode="wb",
-        block_size=None,
-        autocommit=None,  # XXX: This differs from the base class.
-        cache_type=None,
-        cache_options=None,
-        size=None,
-        **kwargs,
-    ):
-        if mode != "wb":
-            raise ValueError("Only wb mode is supported")
-
-        # Dataframes are written as multiple files, the default being the first file with a 00000 suffix; we
-        # drop that suffix and use the parent directory as the remote path.
-
-        return HttpFileWriter(
-            self._remote, os.path.basename(path), fs=self, path=os.path.dirname(path), mode=mode, **kwargs
-        )
-
-    def __str__(self):
-        p = super().__str__()
-        return f"FlyteFS({self._remote}): {p}"
diff --git a/flyrs/remote/remote.py b/flyrs/remote/remote.py
index 2b7c390d53..285f201f0a 100644
--- a/flyrs/remote/remote.py
+++ b/flyrs/remote/remote.py
@@ -73,14 +73,14 @@
 )
 from flytekit.models.launch_plan import LaunchPlanState
 from flytekit.models.literals import Literal, LiteralMap
-from clients.backfill import create_backfill_workflow
-from clients.data import download_literal
-from clients.entities import FlyteLaunchPlan, FlyteNode, FlyteTask, FlyteTaskNode, FlyteWorkflow
-from clients.executions import FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflowExecution
-from clients.interface import TypedInterface
-from clients.lazy_entity import LazyEntity
-from clients.remote_callable import RemoteEntity
-from clients.remote_fs import get_flyte_fs
+from flytekit.remote.backfill import create_backfill_workflow
+from flytekit.remote.data import download_literal
+from flytekit.remote.entities import FlyteLaunchPlan, FlyteNode, FlyteTask, FlyteTaskNode, FlyteWorkflow
+from flytekit.remote.executions import FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflowExecution
+from flytekit.remote.interface import TypedInterface
+from flytekit.remote.lazy_entity import LazyEntity
+from flytekit.remote.remote_callable import RemoteEntity
+from flytekit.remote.remote_fs import get_flyte_fs
 from flytekit.tools.fast_registration import fast_package
 from flytekit.tools.interactive import ipython_check
 from flytekit.tools.script_mode import _find_project_root, compress_scripts, hash_file
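After this hunk, the local `clients` shadow package is gone and `remote.py` resolves these helpers from upstream flytekit, so downstream code imports them as, for example:

```python
from flytekit.remote.backfill import create_backfill_workflow
from flytekit.remote.entities import FlyteLaunchPlan, FlyteTask, FlyteWorkflow
from flytekit.remote.lazy_entity import LazyEntity
```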