From b240aa2a3b6b22a37797f53f77b457bac0e98baf Mon Sep 17 00:00:00 2001 From: luiz Date: Sun, 16 Jul 2023 12:30:38 +0200 Subject: [PATCH 01/21] new column for PiecesRepository table --- rest/database/models/enums.py | 8 ++++++++ rest/database/models/piece_repository.py | 3 ++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/rest/database/models/enums.py b/rest/database/models/enums.py index a985afad..2e4ec847 100644 --- a/rest/database/models/enums.py +++ b/rest/database/models/enums.py @@ -9,6 +9,14 @@ class Config: use_enum_values = True +class PieceExecutionMode(str, enum.Enum): + docker = 'docker' + worker = 'worker' + + class Config: + use_enum_values = True + + class Permission(str, enum.Enum): owner = 'owner' read = 'read' diff --git a/rest/database/models/piece_repository.py b/rest/database/models/piece_repository.py index cd13db58..9261f841 100644 --- a/rest/database/models/piece_repository.py +++ b/rest/database/models/piece_repository.py @@ -1,6 +1,6 @@ from database.models.base import Base, BaseDatabaseModel from sqlalchemy import Column, String, Integer, DateTime, Enum, JSON, ForeignKey -from database.models.enums import RepositorySource +from database.models.enums import RepositorySource, PieceExecutionMode from sqlalchemy.orm import relationship from datetime import datetime @@ -15,6 +15,7 @@ class PieceRepository(Base, BaseDatabaseModel): source = Column(Enum(RepositorySource), nullable=True, default=RepositorySource.github.value) path = Column(String(250), nullable=True) version = Column(String(10), nullable=True) + piece_execution_mode = Column(Enum(PieceExecutionMode), nullable=False, default=PieceExecutionMode.docker.value) dependencies_map = Column(JSON, nullable=True) compiled_metadata = Column(JSON, nullable=True) workspace_id = Column(Integer, ForeignKey("workspace.id", ondelete='cascade'), nullable=False) From a4bc624e48d565281bf9738567e6213867addfc4 Mon Sep 17 00:00:00 2001 From: luiz Date: Sun, 16 Jul 2023 12:39:11 +0200 Subject: [PATCH 02/21] remove airflow client from domino-py --- domino/client/airflow_client.py | 94 --------------------------------- 1 file changed, 94 deletions(-) delete mode 100644 domino/client/airflow_client.py diff --git a/domino/client/airflow_client.py b/domino/client/airflow_client.py deleted file mode 100644 index 4335754b..00000000 --- a/domino/client/airflow_client.py +++ /dev/null @@ -1,94 +0,0 @@ -import uuid -from datetime import datetime -import requests -from urllib.parse import urljoin -import os - - -class AirflowRestClient(requests.Session): - def __init__(self, *args, **kwargs): - """ - _summary_ - """ - super(AirflowRestClient, self).__init__(*args, **kwargs) - - self.base_url = os.getenv("AIRFLOW_WEBSERVER_HOST") - - # TODO using airflow admin only for dev - self.auth = ("airflow", "airflow") - - def request(self, method, resource, **kwargs): - """ - _summary_ - - Args: - method (_type_): _description_ - resource (_type_): _description_ - - Raises: - e: _description_ - - Returns: - _type_: _description_ - """ - try: - url = urljoin(self.base_url, resource) - return super(AirflowRestClient, self).request(method, url, **kwargs) - except Exception as e: - # self.logger.exception(e) - raise e - - def run_dag(self, dag_id): - resource = f"api/v1/dags/{dag_id}/dagRuns" - dag_run_uuid = str(uuid.uuid4()) - payload = { - "dag_run_id": f"rest-client-{dag_run_uuid}", - "logical_date": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') - } - response = self.request( - method="post", - resource=resource, - json=payload - ) 
- return response - - def delete_dag(self, dag_id): - resource = f"api/v1/dags/{dag_id}" - response = self.request( - method="delete", - resource=resource, - ) - return response - - def update_dag(self, dag_id, payload): - resource = f"api/v1/dags/{dag_id}" - response = self.request( - method='patch', - resource=resource, - json=payload - ) - return response - - def get_all_dag_tasks(self, dag_id): - resource = f"api/v1/dags/{dag_id}/tasks" - response = self.request( - method='get', - resource=resource, - ) - return response - - def get_all_dag_runs(self, dag_id): - resource = f"api/v1/dags/{dag_id}/dagRuns" - response = self.request( - method='get', - resource=resource, - ) - return response - - def get_xcom_by_task_id(self, dag_id, dag_run_id, task_id): - resource = f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries" - response = self.request( - method='get', - resource=resource, - ) - return response \ No newline at end of file From 8fadecb1311dafb7ecbfbb7a61e8750bf9b6aac4 Mon Sep 17 00:00:00 2001 From: luiz Date: Sun, 16 Jul 2023 13:10:13 +0200 Subject: [PATCH 03/21] create DominoWorkerOperator --- domino/custom_operators/worker_operator.py | 73 ++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 domino/custom_operators/worker_operator.py diff --git a/domino/custom_operators/worker_operator.py b/domino/custom_operators/worker_operator.py new file mode 100644 index 00000000..da63e2d3 --- /dev/null +++ b/domino/custom_operators/worker_operator.py @@ -0,0 +1,73 @@ +from typing import Dict, Optional +from airflow.models import BaseOperator + +from domino.client.domino_backend_client import DominoBackendRestClient + + +# This is WIP, not working yet. + +class DominoWorkerOperator(BaseOperator): + """ + This Operator runs Pieces directly in a Worker. + """ + + def __init__( + self, + dag_id: str, + task_id: str, + piece_name: str, + repository_name: str, + workflow_id: int, + piece_input_kwargs: Optional[Dict] = None, + ): + self.dag_id = dag_id + self.task_id = task_id + self.piece_name = piece_name + self.repository_name = repository_name + self.workflow_id = workflow_id + self.piece_input_kwargs = piece_input_kwargs + self.backend_client = DominoBackendRestClient(base_url="http://domino-rest:8000/") + + self._get_piece_class() + self._get_piece_secrets() + self._get_airflow_conn_id() + + def _get_piece_class(self): + # TODO + pass + + def _get_airflow_conn_id(self): + """ + Form correct conn_id string with conn_type + repository_id. + Check if conn_id already exists in Airflow and, if not, create it. 
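+        Illustrative example (hypothetical values, following the code below):
+            conn_type = "aws"        # e.g. read from the Piece's metadata
+            self.repository_id = 7
+            self.conn_id = "aws_7"   # i.e. f"{conn_type}_{self.repository_id}"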
+ """ + # TODO + conn_type = "aws" # TODO: get from piece + self.conn_id = f"{conn_type}_{self.repository_id}" + try: + response = self.backend_client.check_create_airflow_connection( + conn_id=self.conn_id, + conn_type=conn_type, + ) + except Exception as e: + raise e + + def _get_piece_secrets(self, piece_repository_id: int, piece_name: str): + # Get piece secrets values from api and append to env vars + # TODO + secrets_response = self.backend_client.get_piece_secrets( + piece_repository_id=piece_repository_id, + piece_name=piece_name + ) + if secrets_response.status_code != 200: + raise Exception(f"Error getting piece secrets: {secrets_response.json()}") + piece_secrets = { + e.get('name'): e.get('value') + for e in secrets_response.json() + } + return piece_secrets + + def execute(self, context): + """Execute the Piece.""" + # TODO + pass \ No newline at end of file From ae5b48267acfef8d24b1c9183f569146dd10b29f Mon Sep 17 00:00:00 2001 From: luiz Date: Sun, 16 Jul 2023 13:10:28 +0200 Subject: [PATCH 04/21] adapt Task --- domino/base_piece.py | 5 ++++- domino/task.py | 27 +++++++++++++++++++-------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/domino/base_piece.py b/domino/base_piece.py index ae763631..9a0e8429 100644 --- a/domino/base_piece.py +++ b/domino/base_piece.py @@ -288,7 +288,10 @@ def run_piece_function( self.dag_run_id = airflow_context.get("dag_run_id") # Check if Piece's necessary secrets are present in ENV - self.validate_and_get_env_secrets(piece_secrets_model=piece_secrets_model, secrets_values=secrets_values) + self.validate_and_get_env_secrets( + piece_secrets_model=piece_secrets_model, + secrets_values=secrets_values + ) # Generate paths workflow_run_subpath = os.environ.get('DOMINO_WORKFLOW_RUN_SUBPATH', '') diff --git a/domino/task.py b/domino/task.py index d1a04d2c..f358b55d 100644 --- a/domino/task.py +++ b/domino/task.py @@ -1,4 +1,5 @@ from airflow import DAG +from airflow.models import BaseOperator from kubernetes.client import models as k8s from datetime import datetime from typing import Callable @@ -6,6 +7,7 @@ from domino.custom_operators.docker_operator import DominoDockerOperator from domino.custom_operators.python_operator import PythonOperator from domino.custom_operators.k8s_operator import DominoKubernetesPodOperator +from domino.custom_operators.worker_operator import DominoWorkerOperator from domino.schemas.shared_storage import shared_storage_map from domino.utils import dict_deep_update from domino.logger import get_configured_logger @@ -39,6 +41,7 @@ def __init__( self.dag_id = self.dag.dag_id self.repository_id = piece["repository_id"] self.piece = piece + self.piece_input_kwargs = piece_input_kwargs if not workflow_shared_storage: workflow_shared_storage = {} # Shared storage @@ -51,18 +54,26 @@ def __init__( self.deploy_mode = os.environ.get('DOMINO_DEPLOY_MODE') - if self.deploy_mode in ['local-k8s', 'local-k8s-dev', 'prod']: config.load_incluster_config() self.k8s_client = client.CoreV1Api() - # Set up piece logic - self._task_piece = self._set_piece(piece_input_kwargs) + # Set up task operator + self._task_operator = self._set_operator() - def _set_piece(self, piece_input_kwargs) -> None: + def _set_operator(self) -> BaseOperator: """ - Set airflow piece based on task configuration + Set Airflow Operator according to deploy mode and Piece execution mode. 
""" + if self.piece["execution_mode"] == "worker": + return DominoWorkerOperator( + dag_id=self.dag_id, + task_id=self.task_id, + piece_name=self.piece.get('name'), + repository_name=self.piece.get('repository_name'), + workflow_id=self.piece.get('workflow_id'), + piece_input_kwargs=self.piece_input_kwargs, + ) if self.deploy_mode == "local-python": return PythonOperator( @@ -70,7 +81,7 @@ def _set_piece(self, piece_input_kwargs) -> None: task_id=self.task_id, start_date=datetime(2021, 1, 1), # TODO - get correct start_date provide_context=True, - op_kwargs=piece_input_kwargs, + op_kwargs=self.piece_input_kwargs, # queue=dependencies_group, make_python_callable_kwargs=dict( piece_name=self.piece_name, @@ -94,7 +105,7 @@ def _set_piece(self, piece_input_kwargs) -> None: "task_id": self.task_id, "dag_id": self.dag_id, }), - "DOMINO_K8S_RUN_PIECE_KWARGS": str(piece_input_kwargs), + "DOMINO_K8S_RUN_PIECE_KWARGS": str(self.piece_input_kwargs), "DOMINO_WORKFLOW_SHARED_STORAGE": self.workflow_shared_storage.json() if self.workflow_shared_storage else "", "AIRFLOW_CONTEXT_EXECUTION_DATETIME": "{{ dag_run.logical_date | ts_nodash }}", "AIRFLOW_CONTEXT_DAG_RUN_ID": "{{ run_id }}", @@ -216,7 +227,7 @@ def _set_piece(self, piece_input_kwargs) -> None: task_id=self.task_id, piece_name=self.piece.get('name'), repository_id=self.repository_id, - piece_kwargs=piece_input_kwargs, + piece_kwargs=self.piece_input_kwargs, dag_id=self.dag_id, deploy_mode=self.deploy_mode, image=self.piece["source_image"], From fcadf36704fcc9e4159fc6861c905688b1ec9869 Mon Sep 17 00:00:00 2001 From: luiz Date: Sun, 16 Jul 2023 13:11:09 +0200 Subject: [PATCH 05/21] check_create_airflow_connection on DominoClient --- domino/client/domino_backend_client.py | 28 ++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/domino/client/domino_backend_client.py b/domino/client/domino_backend_client.py index 7a7ff80f..24fb171e 100644 --- a/domino/client/domino_backend_client.py +++ b/domino/client/domino_backend_client.py @@ -54,3 +54,31 @@ def get_piece_repositories(self, workspace_id: int, filters: dict): params=filters ) return response + + def check_create_airflow_connection(self, conn_id: str, conn_type: str): + """ + This should check if a specific Airflow connection exists and create it if it doesn't. + ref: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/post_connection + """ + # TODO - this is WIP + resource = f"/airflow/connections/{conn_id}" + response = self.request( + method='get', + resource=resource, + ) + if response.status_code == 404: + response = self.request( + method='post', + resource=resource, + json={ + "conn_id": conn_id, + "conn_type": conn_type, + "login": "", # TODO: add login and password + "password": "", + "extra": { + # Specify extra parameters here, e.g. "region_name": "eu-central-1" + }, + } + ) + if response.status_code != 200: + raise Exception(f"Failed to create Airflow connection {conn_id}. 
\n {response.json()}") From ef8823c772d8710d30e9507a13ef0f4df1d9d6b6 Mon Sep 17 00:00:00 2001 From: luiz Date: Sun, 16 Jul 2023 15:39:42 +0200 Subject: [PATCH 06/21] start DRY changes - incomplete --- domino/custom_operators/base_operator.py | 48 ++++++++++++++++++++++ domino/custom_operators/docker_operator.py | 31 ++------------ domino/custom_operators/k8s_operator.py | 29 ++----------- domino/custom_operators/worker_operator.py | 18 +------- 4 files changed, 58 insertions(+), 68 deletions(-) create mode 100644 domino/custom_operators/base_operator.py diff --git a/domino/custom_operators/base_operator.py b/domino/custom_operators/base_operator.py new file mode 100644 index 00000000..c624575b --- /dev/null +++ b/domino/custom_operators/base_operator.py @@ -0,0 +1,48 @@ +from typing import Dict, Optional +from airflow.utils.context import Context + +from domino.client.domino_backend_client import DominoBackendRestClient + + +class BaseDominoOperator: + """ + This Operator runs Pieces directly in a Worker. + """ + + def __init__( + self, + dag_id: str, + task_id: str, + piece_name: str, + repository_id: int, + workflow_id: int, + piece_input_kwargs: Optional[Dict] = None, + ): + self.dag_id = dag_id + self.task_id = task_id + self.piece_name = piece_name + self.repository_id = repository_id + self.workflow_id = workflow_id + self.piece_input_kwargs = piece_input_kwargs + self.backend_client = DominoBackendRestClient(base_url="http://domino-rest:8000/") + + def _get_piece_secrets(self, piece_repository_id: int, piece_name: str): + """Get piece secrets values from Domino API""" + secrets_response = self.backend_client.get_piece_secrets( + piece_repository_id=piece_repository_id, + piece_name=piece_name + ) + if secrets_response.status_code != 200: + raise Exception(f"Error getting piece secrets: {secrets_response.json()}") + piece_secrets = { + e.get('name'): e.get('value') + for e in secrets_response.json() + } + return piece_secrets + + @staticmethod + def _get_upstream_xcom_data_from_task_ids(task_ids: list, context: Context): + upstream_xcoms_data = dict() + for tid in task_ids: + upstream_xcoms_data[tid] = context['ti'].xcom_pull(task_ids=tid) + return upstream_xcoms_data \ No newline at end of file diff --git a/domino/custom_operators/docker_operator.py b/domino/custom_operators/docker_operator.py index 0bd0b4a3..55aa685f 100644 --- a/domino/custom_operators/docker_operator.py +++ b/domino/custom_operators/docker_operator.py @@ -2,11 +2,13 @@ from airflow.utils.context import Context from typing import Dict, Optional import os + +from domino.custom_operators.base_operator import BaseDominoOperator from domino.client.domino_backend_client import DominoBackendRestClient from domino.schemas.shared_storage import WorkflowSharedStorage, StorageSource -class DominoDockerOperator(DockerOperator): +class DominoDockerOperator(BaseDominoOperator, DockerOperator): def __init__( self, @@ -48,7 +50,7 @@ def __init__( Mount(source=shared_storage_host_path, target=shared_storage_container_path, type='bind', read_only=False), ) - super().__init__( + super(DockerOperator).__init__( **kwargs, task_id=self.task_id, docker_url=docker_url, @@ -72,13 +74,6 @@ def _set_base_env_vars(self): "AIRFLOW_CONTEXT_DAG_RUN_ID": "{{ run_id }}", } - @staticmethod - def _get_upstream_xcom_data_from_task_ids(task_ids: list, context: 'Context'): - upstream_xcoms_data = dict() - for tid in task_ids: - upstream_xcoms_data[tid] = context['ti'].xcom_pull(task_ids=tid) - return upstream_xcoms_data - def 
_update_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): #domino_docker_run_piece_kwargs = self.environment.get('DOMINO_DOCKER_RUN_PIECE_KWARGS') if not self.piece_input_kwargs: @@ -99,7 +94,6 @@ def _update_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): self.environment['AIRFLOW_UPSTREAM_TASKS_IDS_SHARED_STORAGE'] = str(self.shared_storage_upstream_ids_list) self.environment['DOMINO_DOCKER_RUN_PIECE_KWARGS'] = str(self.piece_input_kwargs) - def _prepare_execute_environment(self, context: Context): """ Prepare execution with the following configurations: @@ -123,24 +117,7 @@ def _prepare_execute_environment(self, context: Context): self.workflow_run_subpath = f"{dag_id}/{dag_run_id_path}" self.environment['DOMINO_WORKFLOW_RUN_SUBPATH'] = self.workflow_run_subpath - def execute(self, context: Context) -> Optional[str]: # env var format = {"name": "value"} self._prepare_execute_environment(context=context) return super().execute(context=context) - - - def _get_piece_secrets(self, piece_repository_id: int, piece_name: str): - # Get piece secrets values from api and append to env vars - secrets_response = self.backend_client.get_piece_secrets( - piece_repository_id=piece_repository_id, - piece_name=piece_name - ) - if secrets_response.status_code != 200: - raise Exception(f"Error getting piece secrets: {secrets_response.json()}") - piece_secrets = { - e.get('name'): e.get('value') - for e in secrets_response.json() - } - return piece_secrets - \ No newline at end of file diff --git a/domino/custom_operators/k8s_operator.py b/domino/custom_operators/k8s_operator.py index 0f80eb6d..ee5677b3 100644 --- a/domino/custom_operators/k8s_operator.py +++ b/domino/custom_operators/k8s_operator.py @@ -6,12 +6,14 @@ import copy from contextlib import closing from kubernetes.stream import stream as kubernetes_stream + +from domino.custom_operators.base_operator import BaseDominoOperator from domino.client.domino_backend_client import DominoBackendRestClient from domino.schemas.shared_storage import WorkflowSharedStorage # Ref: https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py -class DominoKubernetesPodOperator(KubernetesPodOperator): +class DominoKubernetesPodOperator(BaseDominoOperator, KubernetesPodOperator): def __init__( self, piece_name: str, @@ -20,7 +22,7 @@ def __init__( *args, **kwargs ): - super().__init__(*args, **kwargs) + super(KubernetesPodOperator).__init__(*args, **kwargs) # This is saved in the self.piece_name airflow @property self.running_piece_name = piece_name self.repository_id = repository_id @@ -33,7 +35,6 @@ def __init__( # TODO change url based on DOMINO_DEPLOY_MODE self.backend_client = DominoBackendRestClient(base_url="http://domino-rest-service:8000/") - def build_pod_request_obj(self, context: Optional['Context'] = None) -> k8s.V1Pod: """ We override this method to add the shared storage to the pod. @@ -86,7 +87,6 @@ def add_local_shared_storage_volumes(self, pod: k8s.V1Pod) -> k8s.V1Pod: ) return pod_cp - def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: """ Adds FUSE mounts shared storage sidecar container. 
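         The sidecar shares the pod's volumes with the main Piece container, so
         remote storage FUSE-mounted by the sidecar (e.g. the "aws_s3" or "gcs"
         sources) is also visible to the Piece.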
@@ -189,20 +189,6 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: return pod_cp - def _get_piece_secrets(self, piece_repository_id: int, piece_name: str): - # Get piece secrets values from api and append to env vars - secrets_response = self.backend_client.get_piece_secrets( - piece_repository_id=piece_repository_id, - piece_name=piece_name - ) - if secrets_response.status_code != 200: - raise Exception(f"Error getting piece secrets: {secrets_response.json()}") - piece_secrets = { - e.get('name'): e.get('value') - for e in secrets_response.json() - } - return piece_secrets - def _get_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): domino_k8s_run_op_kwargs = [var for var in self.env_vars if getattr(var, 'name', None) == 'DOMINO_K8S_RUN_PIECE_KWARGS'] if not domino_k8s_run_op_kwargs: @@ -243,13 +229,6 @@ def _get_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): return updated_op_kwargs - @staticmethod - def _get_upstream_xcom_data_from_task_ids(task_ids: list, context: 'Context'): - upstream_xcoms_data = dict() - for tid in task_ids: - upstream_xcoms_data[tid] = context['ti'].xcom_pull(task_ids=tid) - return upstream_xcoms_data - def _update_env_var_value_from_name(self, name: str, value: str): for env_var in self.env_vars: if env_var.name == name: diff --git a/domino/custom_operators/worker_operator.py b/domino/custom_operators/worker_operator.py index da63e2d3..206f53f3 100644 --- a/domino/custom_operators/worker_operator.py +++ b/domino/custom_operators/worker_operator.py @@ -1,12 +1,13 @@ from typing import Dict, Optional from airflow.models import BaseOperator +from domino.custom_operators.base_operator import BaseDominoOperator from domino.client.domino_backend_client import DominoBackendRestClient # This is WIP, not working yet. -class DominoWorkerOperator(BaseOperator): +class DominoWorkerOperator(BaseDominoOperator, BaseOperator): """ This Operator runs Pieces directly in a Worker. 
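     Still WIP: the Piece is meant to run inside the Airflow worker process
     itself, without spawning a Docker container or Kubernetes pod.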
""" @@ -51,21 +52,6 @@ def _get_airflow_conn_id(self): ) except Exception as e: raise e - - def _get_piece_secrets(self, piece_repository_id: int, piece_name: str): - # Get piece secrets values from api and append to env vars - # TODO - secrets_response = self.backend_client.get_piece_secrets( - piece_repository_id=piece_repository_id, - piece_name=piece_name - ) - if secrets_response.status_code != 200: - raise Exception(f"Error getting piece secrets: {secrets_response.json()}") - piece_secrets = { - e.get('name'): e.get('value') - for e in secrets_response.json() - } - return piece_secrets def execute(self, context): """Execute the Piece.""" From b2c41e1dcb3fb40e298de3c75743bc00d48bdc17 Mon Sep 17 00:00:00 2001 From: luiz Date: Sun, 16 Jul 2023 15:40:22 +0200 Subject: [PATCH 07/21] piece name --- domino/custom_operators/k8s_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/domino/custom_operators/k8s_operator.py b/domino/custom_operators/k8s_operator.py index ee5677b3..6a91fe5e 100644 --- a/domino/custom_operators/k8s_operator.py +++ b/domino/custom_operators/k8s_operator.py @@ -304,7 +304,7 @@ def _prepare_execute_environment(self, context: Context): self._update_env_var_value_from_name(name='DOMINO_K8S_RUN_PIECE_KWARGS', value=str(domino_k8s_run_op_kwargs)) # Add pieces secrets to environment variables - piece_secrets = self._get_piece_secrets(piece_repository_id=self.repository_id, piece_name=self.running_piece_name) + piece_secrets = self._get_piece_secrets(piece_repository_id=self.repository_id, piece_name=self.piece_name) self.env_vars.append({ "name": "DOMINO_K8S_PIECE_SECRETS", "value": str(piece_secrets), From b2da45c50ff78c7d410f8b94636c90664747ef6f Mon Sep 17 00:00:00 2001 From: luiz Date: Sun, 16 Jul 2023 18:14:51 +0200 Subject: [PATCH 08/21] docker operator --- domino/custom_operators/docker_operator.py | 49 +++++++++++----------- domino/custom_operators/k8s_operator.py | 11 ++--- domino/task.py | 6 +-- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/domino/custom_operators/docker_operator.py b/domino/custom_operators/docker_operator.py index 55aa685f..db4c66b4 100644 --- a/domino/custom_operators/docker_operator.py +++ b/domino/custom_operators/docker_operator.py @@ -12,34 +12,35 @@ class DominoDockerOperator(BaseDominoOperator, DockerOperator): def __init__( self, + dag_id: str, + task_id: str, piece_name: str, deploy_mode: str, # TODO enum - task_id: str, - dag_id: str, repository_id: int, - workflow_shared_storage: WorkflowSharedStorage = None, piece_kwargs: Optional[Dict] = None, - **kwargs + workflow_shared_storage: WorkflowSharedStorage = None, + **docker_operator_kwargs ) -> None: - self.running_piece_name = piece_name - self.repository_id = repository_id - self.workflow_shared_storage = workflow_shared_storage - self.deploy_mode = deploy_mode - self.running_dag_id = dag_id - self.task_id = task_id - self.task_id_replaced = self.task_id.replace("_", "-").lower() # doing this because airflow doesn't allow underscores and upper case in mount names - self.piece_input_kwargs = piece_kwargs + super(BaseDominoOperator).__init__( + dag_id=dag_id, + task_id=task_id, + piece_name=piece_name, + deploy_mode=deploy_mode, + repository_id=repository_id, + piece_input_kwargs=piece_kwargs, + domino_client_url="http://domino-rest:8000/", + ) + # Shared Storage variables + self.workflow_shared_storage = workflow_shared_storage self.shared_storage_base_mount_path = '/home/shared_storage' self.shared_storage_upstream_ids_list = list() - 
self.backend_client = DominoBackendRestClient(base_url="http://domino-rest:8000/") self._set_base_env_vars() - docker_url = 'tcp://docker-proxy:2375' - shared_storage_host_path = os.environ.get('LOCAL_DOMINO_SHARED_DATA_PATH', '') shared_storage_container_path = '/home/shared_storage' mounts = [] - # # TODO remove + + # TODO remove mounts=[ # TODO remove # Mount(source='/media/luiz/storage2/Github/domino/domino', target='/home/domino/domino_py/domino', type='bind', read_only=True), @@ -51,22 +52,20 @@ def __init__( ) super(DockerOperator).__init__( - **kwargs, - task_id=self.task_id, - docker_url=docker_url, - mounts=[ - *mounts, - ], + **docker_operator_kwargs, + task_id=task_id, + docker_url='tcp://docker-proxy:2375', + mounts=mounts, environment=self.environment, ) def _set_base_env_vars(self): self.environment = { - "DOMINO_DOCKER_PIECE": self.running_piece_name, + "DOMINO_DOCKER_PIECE": self.piece_name, "DOMINO_DOCKER_INSTANTIATE_PIECE_KWARGS": str({ "deploy_mode": self.deploy_mode, "task_id": self.task_id, - "dag_id": self.running_dag_id, + "dag_id": self.dag_id, }), "DOMINO_DOCKER_RUN_PIECE_KWARGS": str(self.piece_input_kwargs), "DOMINO_WORKFLOW_SHARED_STORAGE": self.workflow_shared_storage.json() if self.workflow_shared_storage else "", @@ -109,7 +108,7 @@ def _prepare_execute_environment(self, context: Context): # Save updated piece input kwargs with upstream data to environment variable upstream_xcoms_data = self._get_upstream_xcom_data_from_task_ids(task_ids=upstream_task_ids, context=context) self._update_piece_kwargs_with_upstream_xcom(upstream_xcoms_data=upstream_xcoms_data) - piece_secrets = self._get_piece_secrets(piece_repository_id=self.repository_id, piece_name=self.running_piece_name) + piece_secrets = self._get_piece_secrets(piece_repository_id=self.repository_id, piece_name=self.piece_name) self.environment['DOMINO_DOCKER_PIECE_SECRETS'] = str(piece_secrets) dag_id = context["dag_run"].dag_id dag_run_id = context['run_id'] diff --git a/domino/custom_operators/k8s_operator.py b/domino/custom_operators/k8s_operator.py index 6a91fe5e..5dde96a3 100644 --- a/domino/custom_operators/k8s_operator.py +++ b/domino/custom_operators/k8s_operator.py @@ -19,17 +19,18 @@ def __init__( piece_name: str, repository_id: int, workflow_shared_storage: WorkflowSharedStorage, - *args, - **kwargs + **k8s_operator_kwargs ): - super(KubernetesPodOperator).__init__(*args, **kwargs) + super(KubernetesPodOperator).__init__(**k8s_operator_kwargs) + # This is saved in the self.piece_name airflow @property self.running_piece_name = piece_name self.repository_id = repository_id - self.workflow_shared_storage = workflow_shared_storage + self.task_id_replaced = self.task_id.replace("_", "-").lower() # doing this because airflow doesn't allow underscores and upper case in mount names - self.task_env_vars = kwargs.get('env_vars', []) + self.task_env_vars = k8s_operator_kwargs.get('env_vars', []) # Shared Storage variables + self.workflow_shared_storage = workflow_shared_storage self.shared_storage_base_mount_path = '/home/shared_storage' self.shared_storage_upstream_ids_list = list() # TODO change url based on DOMINO_DEPLOY_MODE diff --git a/domino/task.py b/domino/task.py index f358b55d..339d67df 100644 --- a/domino/task.py +++ b/domino/task.py @@ -224,14 +224,14 @@ def _set_operator(self) -> BaseOperator: elif self.deploy_mode == 'local-compose': return DominoDockerOperator( + dag_id=self.dag_id, task_id=self.task_id, piece_name=self.piece.get('name'), + deploy_mode=self.deploy_mode, 
repository_id=self.repository_id, piece_kwargs=self.piece_input_kwargs, - dag_id=self.dag_id, - deploy_mode=self.deploy_mode, - image=self.piece["source_image"], workflow_shared_storage=self.workflow_shared_storage, + image=self.piece["source_image"], do_xcom_push=True, mount_tmp_dir=False, tty=True, From e50b75b660de471cf49b05b399fa68a1c62c43dc Mon Sep 17 00:00:00 2001 From: luiz Date: Sun, 16 Jul 2023 18:40:40 +0200 Subject: [PATCH 09/21] m --- domino/custom_operators/base_operator.py | 17 +++++++------ domino/custom_operators/docker_operator.py | 4 +-- domino/custom_operators/k8s_operator.py | 29 +++++++++++++++------- domino/task.py | 2 +- 4 files changed, 33 insertions(+), 19 deletions(-) diff --git a/domino/custom_operators/base_operator.py b/domino/custom_operators/base_operator.py index c624575b..557d6216 100644 --- a/domino/custom_operators/base_operator.py +++ b/domino/custom_operators/base_operator.py @@ -6,29 +6,32 @@ class BaseDominoOperator: """ - This Operator runs Pieces directly in a Worker. + This class implements common operations for all Domino Operators running under a Task. """ def __init__( self, dag_id: str, task_id: str, - piece_name: str, - repository_id: int, - workflow_id: int, + piece_name: str, + deploy_mode: str, + repository_id: int, piece_input_kwargs: Optional[Dict] = None, + domino_client_url: Optional[str] = None, ): self.dag_id = dag_id self.task_id = task_id self.piece_name = piece_name + self.deploy_mode = deploy_mode self.repository_id = repository_id - self.workflow_id = workflow_id self.piece_input_kwargs = piece_input_kwargs - self.backend_client = DominoBackendRestClient(base_url="http://domino-rest:8000/") + if domino_client_url is None: + domino_client_url = "http://domino-rest:8000/" + self.domino_client = DominoBackendRestClient(base_url=domino_client_url) def _get_piece_secrets(self, piece_repository_id: int, piece_name: str): """Get piece secrets values from Domino API""" - secrets_response = self.backend_client.get_piece_secrets( + secrets_response = self.domino_client.get_piece_secrets( piece_repository_id=piece_repository_id, piece_name=piece_name ) diff --git a/domino/custom_operators/docker_operator.py b/domino/custom_operators/docker_operator.py index db4c66b4..8904cdd1 100644 --- a/domino/custom_operators/docker_operator.py +++ b/domino/custom_operators/docker_operator.py @@ -17,7 +17,7 @@ def __init__( piece_name: str, deploy_mode: str, # TODO enum repository_id: int, - piece_kwargs: Optional[Dict] = None, + piece_input_kwargs: Optional[Dict] = None, workflow_shared_storage: WorkflowSharedStorage = None, **docker_operator_kwargs ) -> None: @@ -27,7 +27,7 @@ def __init__( piece_name=piece_name, deploy_mode=deploy_mode, repository_id=repository_id, - piece_input_kwargs=piece_kwargs, + piece_input_kwargs=piece_input_kwargs, domino_client_url="http://domino-rest:8000/", ) diff --git a/domino/custom_operators/k8s_operator.py b/domino/custom_operators/k8s_operator.py index 5dde96a3..796ec208 100644 --- a/domino/custom_operators/k8s_operator.py +++ b/domino/custom_operators/k8s_operator.py @@ -2,7 +2,7 @@ from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from airflow.utils.context import Context from kubernetes.client import models as k8s -from typing import Optional +from typing import Dict, Optional import copy from contextlib import closing from kubernetes.stream import stream as kubernetes_stream @@ -16,25 +16,36 @@ class DominoKubernetesPodOperator(BaseDominoOperator, KubernetesPodOperator): 
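+    """
+    Runs a Piece in a dedicated Kubernetes pod, extending Airflow's
+    KubernetesPodOperator with Domino env vars, Piece secrets and shared storage.
+    """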
def __init__( self, + dag_id: str, + task_id: str, piece_name: str, + deploy_mode: str, # TODO enum repository_id: int, - workflow_shared_storage: WorkflowSharedStorage, + piece_kwargs: Optional[Dict] = None, + workflow_shared_storage: WorkflowSharedStorage = None, **k8s_operator_kwargs ): - super(KubernetesPodOperator).__init__(**k8s_operator_kwargs) - - # This is saved in the self.piece_name airflow @property - self.running_piece_name = piece_name - self.repository_id = repository_id + super(BaseDominoOperator).__init__( + dag_id=dag_id, + task_id=task_id, + piece_name=piece_name, + deploy_mode=deploy_mode, + repository_id=repository_id, + piece_input_kwargs=piece_kwargs, + domino_client_url="http://domino-rest-service:8000/", # TODO change url based on platform configuration + ) + super(KubernetesPodOperator).__init__( + task_id=task_id, + **k8s_operator_kwargs + ) self.task_id_replaced = self.task_id.replace("_", "-").lower() # doing this because airflow doesn't allow underscores and upper case in mount names self.task_env_vars = k8s_operator_kwargs.get('env_vars', []) + # Shared Storage variables self.workflow_shared_storage = workflow_shared_storage self.shared_storage_base_mount_path = '/home/shared_storage' self.shared_storage_upstream_ids_list = list() - # TODO change url based on DOMINO_DEPLOY_MODE - self.backend_client = DominoBackendRestClient(base_url="http://domino-rest-service:8000/") def build_pod_request_obj(self, context: Optional['Context'] = None) -> k8s.V1Pod: """ diff --git a/domino/task.py b/domino/task.py index 339d67df..e0015cba 100644 --- a/domino/task.py +++ b/domino/task.py @@ -201,13 +201,13 @@ def _set_operator(self) -> BaseOperator: pod_startup_timeout_in_seconds = 600 return DominoKubernetesPodOperator( + task_id=self.task_id, piece_name=self.piece.get('name'), repository_id=self.repository_id, workflow_shared_storage=self.workflow_shared_storage, namespace='default', # TODO - separate namespace by User or Workspace? 
image=self.piece["source_image"], image_pull_policy='IfNotPresent', - task_id=self.task_id, name=f"airflow-worker-pod-{self.task_id}", startup_timeout_seconds=pod_startup_timeout_in_seconds, #cmds=["/bin/bash"], From 3b568f3e597057e969b5a1e5a84cf52f504cd2a2 Mon Sep 17 00:00:00 2001 From: luiz Date: Sun, 16 Jul 2023 18:44:47 +0200 Subject: [PATCH 10/21] adapt worker operator --- domino/custom_operators/worker_operator.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/domino/custom_operators/worker_operator.py b/domino/custom_operators/worker_operator.py index 206f53f3..8a254e8f 100644 --- a/domino/custom_operators/worker_operator.py +++ b/domino/custom_operators/worker_operator.py @@ -17,18 +17,19 @@ def __init__( dag_id: str, task_id: str, piece_name: str, - repository_name: str, - workflow_id: int, + deploy_mode: str, # TODO enum + repository_id: int, piece_input_kwargs: Optional[Dict] = None, ): - self.dag_id = dag_id - self.task_id = task_id - self.piece_name = piece_name - self.repository_name = repository_name - self.workflow_id = workflow_id - self.piece_input_kwargs = piece_input_kwargs - self.backend_client = DominoBackendRestClient(base_url="http://domino-rest:8000/") - + super(BaseDominoOperator).__init__( + dag_id=dag_id, + task_id=task_id, + piece_name=piece_name, + deploy_mode=deploy_mode, + repository_id=repository_id, + piece_input_kwargs=piece_input_kwargs, + domino_client_url="http://domino-rest:8000/", + ) self._get_piece_class() self._get_piece_secrets() self._get_airflow_conn_id() From 59e32ce953d5894ef0d7c93164ddd727b673371a Mon Sep 17 00:00:00 2001 From: luiz Date: Tue, 18 Jul 2023 19:37:42 +0200 Subject: [PATCH 11/21] some more organizing --- domino/custom_operators/base_operator.py | 3 + domino/custom_operators/docker_operator.py | 3 +- domino/custom_operators/k8s_operator.py | 117 ++++++++++++--------- domino/task.py | 34 +++--- 4 files changed, 89 insertions(+), 68 deletions(-) diff --git a/domino/custom_operators/base_operator.py b/domino/custom_operators/base_operator.py index 557d6216..9d437021 100644 --- a/domino/custom_operators/base_operator.py +++ b/domino/custom_operators/base_operator.py @@ -2,6 +2,7 @@ from airflow.utils.context import Context from domino.client.domino_backend_client import DominoBackendRestClient +from domino.schemas.shared_storage import WorkflowSharedStorage class BaseDominoOperator: @@ -17,6 +18,7 @@ def __init__( deploy_mode: str, repository_id: int, piece_input_kwargs: Optional[Dict] = None, + workflow_shared_storage: WorkflowSharedStorage = None, domino_client_url: Optional[str] = None, ): self.dag_id = dag_id @@ -25,6 +27,7 @@ def __init__( self.deploy_mode = deploy_mode self.repository_id = repository_id self.piece_input_kwargs = piece_input_kwargs + self.workflow_shared_storage = workflow_shared_storage if domino_client_url is None: domino_client_url = "http://domino-rest:8000/" self.domino_client = DominoBackendRestClient(base_url=domino_client_url) diff --git a/domino/custom_operators/docker_operator.py b/domino/custom_operators/docker_operator.py index 269bba2d..e796cbda 100644 --- a/domino/custom_operators/docker_operator.py +++ b/domino/custom_operators/docker_operator.py @@ -4,7 +4,6 @@ import os from domino.custom_operators.base_operator import BaseDominoOperator -from domino.client.domino_backend_client import DominoBackendRestClient from domino.schemas.shared_storage import WorkflowSharedStorage, StorageSource @@ -28,11 +27,11 @@ def __init__( deploy_mode=deploy_mode, 
repository_id=repository_id, piece_input_kwargs=piece_input_kwargs, + workflow_shared_storage=workflow_shared_storage, domino_client_url="http://domino-rest:8000/", ) # Shared Storage variables - self.workflow_shared_storage = workflow_shared_storage self.shared_storage_base_mount_path = '/home/shared_storage' self.shared_storage_upstream_ids_list = list() self._set_base_env_vars() diff --git a/domino/custom_operators/k8s_operator.py b/domino/custom_operators/k8s_operator.py index e0f77024..1285c43d 100644 --- a/domino/custom_operators/k8s_operator.py +++ b/domino/custom_operators/k8s_operator.py @@ -8,7 +8,6 @@ from kubernetes.stream import stream as kubernetes_stream from domino.custom_operators.base_operator import BaseDominoOperator -from domino.client.domino_backend_client import DominoBackendRestClient from domino.schemas.shared_storage import WorkflowSharedStorage @@ -32,28 +31,44 @@ def __init__( deploy_mode=deploy_mode, repository_id=repository_id, piece_input_kwargs=piece_kwargs, + workflow_shared_storage=workflow_shared_storage, domino_client_url="http://domino-rest-service:8000/", # TODO change url based on platform configuration ) + + self.pod_env_vars = { + "DOMINO_PIECE": self.piece_name, + "DOMINO_INSTANTIATE_PIECE_KWARGS": str({ + "deploy_mode": self.deploy_mode, + "task_id": self.task_id, + "dag_id": self.dag_id, + }), + "DOMINO_RUN_PIECE_KWARGS": str(self.piece_input_kwargs), + "DOMINO_WORKFLOW_SHARED_STORAGE": self.workflow_shared_storage.json() if self.workflow_shared_storage else "", + "AIRFLOW_CONTEXT_EXECUTION_DATETIME": "{{ dag_run.logical_date | ts_nodash }}", + "AIRFLOW_CONTEXT_DAG_RUN_ID": "{{ run_id }}", + } + super(KubernetesPodOperator).__init__( task_id=task_id, + env_vars=self.pod_env_vars, **k8s_operator_kwargs ) - self.task_id_replaced = self.task_id.replace("_", "-").lower() # doing this because airflow doesn't allow underscores and upper case in mount names - self.task_env_vars = k8s_operator_kwargs.get('env_vars', []) - # Shared Storage variables - self.workflow_shared_storage = workflow_shared_storage self.shared_storage_base_mount_path = '/home/shared_storage' self.shared_storage_upstream_ids_list = list() - + + def build_pod_request_obj(self, context: Optional['Context'] = None) -> k8s.V1Pod: """ + Runs at the begining of the execute method. We override this method to add the shared storage to the pod. This function runs after our own self.execute, by super().execute() """ pod = super().build_pod_request_obj(context) + self.task_id_replaced = self.task_id.replace("_", "-").lower() # doing this because airflow doesn't allow underscores and upper case in mount names + if not self.workflow_shared_storage or self.workflow_shared_storage.mode.name == 'none': return pod if self.workflow_shared_storage.source.name in ["aws_s3", "gcs"]: @@ -63,6 +78,7 @@ def build_pod_request_obj(self, context: Optional['Context'] = None) -> k8s.V1Po return pod + def add_local_shared_storage_volumes(self, pod: k8s.V1Pod) -> k8s.V1Pod: """ Adds local shared storage volumes to the pod. @@ -99,6 +115,7 @@ def add_local_shared_storage_volumes(self, pod: k8s.V1Pod) -> k8s.V1Pod: ) return pod_cp + def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: """ Adds FUSE mounts shared storage sidecar container. 
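         The sidecar is added as an extra, privileged container (needed for its
         FUSE mounts), named "domino-shared-storage-sidecar-<task_id>", and it
         shares emptyDir volumes with the main Piece container.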
@@ -173,7 +190,7 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: env_vars = { 'DOMINO_WORKFLOW_SHARED_STORAGE': self.workflow_shared_storage.json() if self.workflow_shared_storage else "", 'DOMINO_WORKFLOW_SHARED_STORAGE_SECRETS': str(storage_piece_secrets), - 'DOMINO_INSTANTIATE_PIECE_KWARGS': str(self.task_env_vars.get('DOMINO_INSTANTIATE_PIECE_KWARGS')), + 'DOMINO_INSTANTIATE_PIECE_KWARGS': str(self.pod_env_vars.get('DOMINO_INSTANTIATE_PIECE_KWARGS')), 'DOMINO_WORKFLOW_RUN_SUBPATH': self.workflow_run_subpath, 'AIRFLOW_UPSTREAM_TASKS_IDS_SHARED_STORAGE': str(self.shared_storage_upstream_ids_list), } @@ -201,6 +218,7 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: return pod_cp + def _get_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): domino_k8s_run_op_kwargs = [var for var in self.env_vars if getattr(var, 'name', None) == 'DOMINO_RUN_PIECE_KWARGS'] if not domino_k8s_run_op_kwargs: @@ -241,55 +259,17 @@ def _get_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): return updated_op_kwargs + def _update_env_var_value_from_name(self, name: str, value: str): for env_var in self.env_vars: if env_var.name == name: env_var.value = value break - def execute(self, context: Context): - self._prepare_execute_environment(context=context) - remote_pod = None - try: - self.pod_request_obj = self.build_pod_request_obj(context) - self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill` - pod_request_obj=self.pod_request_obj, - context=context, - ) - # get remote pod for use in cleanup methods - remote_pod = self.find_pod(self.pod.metadata.namespace, context=context) - self.await_pod_start(pod=self.pod) - - if self.get_logs: - self.pod_manager.fetch_container_logs( - pod=self.pod, - container_name=self.BASE_CONTAINER_NAME, - follow=True, - ) - else: - self.pod_manager.await_container_completion( - pod=self.pod, container_name=self.BASE_CONTAINER_NAME - ) - - if self.do_xcom_push: - result = self.extract_xcom(pod=self.pod) - - if self.workflow_shared_storage and self.workflow_shared_storage.mode.name != 'none': - self._kill_shared_storage_sidecar(pod=self.pod) - remote_pod = self.pod_manager.await_pod_completion(self.pod) - finally: - self.cleanup( - pod=self.pod or self.pod_request_obj, - remote_pod=remote_pod, - ) - ti = context['ti'] - ti.xcom_push(key='pod_name', value=self.pod.metadata.name) - ti.xcom_push(key='pod_namespace', value=self.pod.metadata.namespace) - if self.do_xcom_push: - return result def _prepare_execute_environment(self, context: Context): """ + Runs at the begining of the execute method. 
Prepare execution with the following configurations: - pass extra arguments and configuration as environment variables to the pod - add shared storage sidecar container to the pod - if shared storage is FUSE based @@ -334,6 +314,49 @@ def _prepare_execute_environment(self, context: Context): 'value_from': None }) + + def execute(self, context: Context): + self._prepare_execute_environment(context=context) + remote_pod = None + try: + self.pod_request_obj = self.build_pod_request_obj(context) + self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill` + pod_request_obj=self.pod_request_obj, + context=context, + ) + # get remote pod for use in cleanup methods + remote_pod = self.find_pod(self.pod.metadata.namespace, context=context) + self.await_pod_start(pod=self.pod) + + if self.get_logs: + self.pod_manager.fetch_container_logs( + pod=self.pod, + container_name=self.BASE_CONTAINER_NAME, + follow=True, + ) + else: + self.pod_manager.await_container_completion( + pod=self.pod, container_name=self.BASE_CONTAINER_NAME + ) + + if self.do_xcom_push: + result = self.extract_xcom(pod=self.pod) + + if self.workflow_shared_storage and self.workflow_shared_storage.mode.name != 'none': + self._kill_shared_storage_sidecar(pod=self.pod) + remote_pod = self.pod_manager.await_pod_completion(self.pod) + finally: + self.cleanup( + pod=self.pod or self.pod_request_obj, + remote_pod=remote_pod, + ) + ti = context['ti'] + ti.xcom_push(key='pod_name', value=self.pod.metadata.name) + ti.xcom_push(key='pod_namespace', value=self.pod.metadata.namespace) + if self.do_xcom_push: + return result + + def _kill_shared_storage_sidecar(self, pod: k8s.V1Pod): """ This method is used to send a signal to stop and delete the sidecar container with the shared storage mounts. diff --git a/domino/task.py b/domino/task.py index 301d6c22..4cddc44d 100644 --- a/domino/task.py +++ b/domino/task.py @@ -42,25 +42,25 @@ def __init__( self.repository_id = piece["repository_id"] self.piece = piece self.piece_input_kwargs = piece_input_kwargs + + # Shared storage if not workflow_shared_storage: workflow_shared_storage = {} - # Shared storage workflow_shared_storage_source = StorageSource(workflow_shared_storage.pop("source", "None")).name provider_options = workflow_shared_storage.pop("provider_options", {}) self.workflow_shared_storage = shared_storage_map[workflow_shared_storage_source](**workflow_shared_storage, **provider_options) if shared_storage_map[workflow_shared_storage_source] else shared_storage_map[workflow_shared_storage_source] + # Container resources self.container_resources = container_resources if container_resources else {} self.provide_gpu = self.container_resources.pop("use_gpu", False) - self.deploy_mode = os.environ.get('DOMINO_DEPLOY_MODE') - - if self.deploy_mode in ['local-k8s', 'local-k8s-dev', 'prod']: - config.load_incluster_config() - self.k8s_client = client.CoreV1Api() + # Get deploy mode + self.deploy_mode = os.environ.get('DOMINO_DEPLOY_MODE') # Set up task operator self._task_operator = self._set_operator() + def _set_operator(self) -> BaseOperator: """ Set Airflow Operator according to deploy mode and Piece execution mode. 
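         For the Kubernetes deploy modes, the in-cluster client configuration is
         loaded lazily inside this method (config.load_incluster_config()).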
@@ -98,18 +98,9 @@ def _set_operator(self) -> BaseOperator: # - good example: https://github.com/apache/airflow/blob/main/tests/system/providers/cncf/kubernetes/example_kubernetes.py # - commands HAVE to go in a list object: https://stackoverflow.com/a/55149915/11483674 elif self.deploy_mode in ["local-k8s", "local-k8s-dev", "prod"]: - container_env_vars = { - "DOMINO_PIECE": self.piece["name"], - "DOMINO_INSTANTIATE_PIECE_KWARGS": str({ - "deploy_mode": self.deploy_mode, - "task_id": self.task_id, - "dag_id": self.dag_id, - }), - "DOMINO_RUN_PIECE_KWARGS": str(self.piece_input_kwargs), - "DOMINO_WORKFLOW_SHARED_STORAGE": self.workflow_shared_storage.json() if self.workflow_shared_storage else "", - "AIRFLOW_CONTEXT_EXECUTION_DATETIME": "{{ dag_run.logical_date | ts_nodash }}", - "AIRFLOW_CONTEXT_DAG_RUN_ID": "{{ run_id }}", - } + config.load_incluster_config() + self.k8s_client = client.CoreV1Api() + base_container_resources_model = ContainerResourcesModel( requests={ "cpu": "100m", @@ -201,10 +192,14 @@ def _set_operator(self) -> BaseOperator: pod_startup_timeout_in_seconds = 600 return DominoKubernetesPodOperator( + dag_id=self.dag_id, task_id=self.task_id, piece_name=self.piece.get('name'), + deploy_mode=self.deploy_mode, repository_id=self.repository_id, + piece_kwargs=self.piece_input_kwargs, workflow_shared_storage=self.workflow_shared_storage, + # ----------------- Kubernetes ----------------- namespace='default', # TODO - separate namespace by User or Workspace? image=self.piece["source_image"], image_pull_policy='IfNotPresent', @@ -214,7 +209,7 @@ def _set_operator(self) -> BaseOperator: #arguments=["-c", "sleep 120;"], cmds=["domino"], arguments=["run-piece-k8s"], - env_vars=container_env_vars, + # env_vars=container_env_vars, do_xcom_push=True, in_cluster=True, volumes=all_volumes, @@ -231,6 +226,7 @@ def _set_operator(self) -> BaseOperator: repository_id=self.repository_id, piece_kwargs=self.piece_input_kwargs, workflow_shared_storage=self.workflow_shared_storage, + # ----------------- Docker ----------------- image=self.piece["source_image"], do_xcom_push=True, mount_tmp_dir=False, From a60b4e59110708790a2998f6821ce54b6c7cf65d Mon Sep 17 00:00:00 2001 From: luiz Date: Tue, 18 Jul 2023 20:41:09 +0200 Subject: [PATCH 12/21] more re-writing --- domino/base_piece.py | 4 +-- domino/custom_operators/docker_operator.py | 2 +- domino/custom_operators/k8s_operator.py | 36 ++++++---------------- domino/task.py | 26 ++++++++-------- 4 files changed, 25 insertions(+), 43 deletions(-) diff --git a/domino/base_piece.py b/domino/base_piece.py index c01c1ef1..5b514f77 100644 --- a/domino/base_piece.py +++ b/domino/base_piece.py @@ -287,8 +287,8 @@ def run_piece_function( self.results_path = f"{self.workflow_shared_storage}/{self.task_id}/results" self.xcom_path = f"{self.workflow_shared_storage}/{self.task_id}/xcom" self.report_path = f"{self.workflow_shared_storage}/{self.task_id}/report" - shared_storage_source = os.environ.get('DOMINO_WORKFLOW_SHARED_STORAGE', None) - if not shared_storage_source or shared_storage_source == "none" or self.deploy_mode == "local-compose": + shared_storage_source_name = os.environ.get('DOMINO_WORKFLOW_SHARED_STORAGE_SOURCE_NAME', None) + if not shared_storage_source_name or shared_storage_source_name == "none" or self.deploy_mode == "local-compose": self.generate_paths() else: self._wait_for_sidecar_paths() diff --git a/domino/custom_operators/docker_operator.py b/domino/custom_operators/docker_operator.py index e796cbda..4a7c3e21 100644 --- 
a/domino/custom_operators/docker_operator.py +++ b/domino/custom_operators/docker_operator.py @@ -102,7 +102,7 @@ def _prepare_execute_environment(self, context: Context): # Fetch upstream tasks ids and save them in an ENV var upstream_task_ids = [t.task_id for t in self.get_direct_relatives(upstream=True)] self.environment['AIRFLOW_UPSTREAM_TASKS_IDS'] = str(upstream_task_ids) - self.environment['DOMINO_WORKFLOW_SHARED_STORAGE'] = str(self.workflow_shared_storage.source.name) if self.workflow_shared_storage else None + self.environment['DOMINO_WORKFLOW_SHARED_STORAGE_SOURCE_NAME'] = str(self.workflow_shared_storage.source.name) if self.workflow_shared_storage else None # Save updated piece input kwargs with upstream data to environment variable upstream_xcoms_data = self._get_upstream_xcom_data_from_task_ids(task_ids=upstream_task_ids, context=context) diff --git a/domino/custom_operators/k8s_operator.py b/domino/custom_operators/k8s_operator.py index 1285c43d..f5de63c8 100644 --- a/domino/custom_operators/k8s_operator.py +++ b/domino/custom_operators/k8s_operator.py @@ -61,28 +61,23 @@ def __init__( def build_pod_request_obj(self, context: Optional['Context'] = None) -> k8s.V1Pod: """ - Runs at the begining of the execute method. We override this method to add the shared storage to the pod. This function runs after our own self.execute, by super().execute() """ pod = super().build_pod_request_obj(context) - + # Add shared storage to pod self.task_id_replaced = self.task_id.replace("_", "-").lower() # doing this because airflow doesn't allow underscores and upper case in mount names - if not self.workflow_shared_storage or self.workflow_shared_storage.mode.name == 'none': return pod if self.workflow_shared_storage.source.name in ["aws_s3", "gcs"]: pod = self.add_shared_storage_sidecar(pod) elif self.workflow_shared_storage.source.name == "local": pod = self.add_local_shared_storage_volumes(pod) - return pod def add_local_shared_storage_volumes(self, pod: k8s.V1Pod) -> k8s.V1Pod: - """ - Adds local shared storage volumes to the pod. - """ + """Adds local shared storage volumes to the pod.""" pod_cp = copy.deepcopy(pod) pod_cp.spec.volumes = pod.spec.volumes or [] pod_cp.spec.volumes.append( @@ -91,7 +86,6 @@ def add_local_shared_storage_volumes(self, pod: k8s.V1Pod) -> k8s.V1Pod: persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="domino-workflow-shared-storage-volume-claim") ) ) - # Add volume mounts for upstream tasks, using subpaths pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or [] for tid in self.shared_storage_upstream_ids_list: @@ -103,7 +97,6 @@ def add_local_shared_storage_volumes(self, pod: k8s.V1Pod) -> k8s.V1Pod: read_only=True, ) ) - # Add volume mount for this task pod_cp.spec.containers[0].volume_mounts.append( k8s.V1VolumeMount( @@ -118,7 +111,9 @@ def add_local_shared_storage_volumes(self, pod: k8s.V1Pod) -> k8s.V1Pod: def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: """ - Adds FUSE mounts shared storage sidecar container. 
+ - add shared storage sidecar container to the pod + - add shared storage volume mounts to the sidecar container + - add FUSE mounts configuration as env vars to the sidecar container """ # Set up pod Volumes and containers VolumemMounts from Upstream tasks volume_mounts_main_container = list() @@ -147,7 +142,6 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: empty_dir=k8s.V1EmptyDirVolumeSource() ) ) - # Set up pod Volumes and containers VolumemMounts for this Operator results volume_mounts_main_container.append( k8s.V1VolumeMount( @@ -170,14 +164,12 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: empty_dir=k8s.V1EmptyDirVolumeSource() ) ) - pod_cp = copy.deepcopy(pod) pod_cp.spec.volumes = pod.spec.volumes or [] pod_cp.spec.volumes.extend(pod_volumes_list) pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or [] pod_cp.spec.containers[0].volume_mounts.extend(volume_mounts_main_container) - # Create and add sidecar container to pod storage_piece_secrets = {} if self.workflow_shared_storage.source != "local": @@ -185,16 +177,14 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: piece_repository_id=self.workflow_shared_storage.storage_repository_id, piece_name=self.workflow_shared_storage.default_piece_name, ) - self.workflow_shared_storage.source = self.workflow_shared_storage.source.name - env_vars = { + sidecar_env_vars = { 'DOMINO_WORKFLOW_SHARED_STORAGE': self.workflow_shared_storage.json() if self.workflow_shared_storage else "", 'DOMINO_WORKFLOW_SHARED_STORAGE_SECRETS': str(storage_piece_secrets), 'DOMINO_INSTANTIATE_PIECE_KWARGS': str(self.pod_env_vars.get('DOMINO_INSTANTIATE_PIECE_KWARGS')), 'DOMINO_WORKFLOW_RUN_SUBPATH': self.workflow_run_subpath, 'AIRFLOW_UPSTREAM_TASKS_IDS_SHARED_STORAGE': str(self.shared_storage_upstream_ids_list), } - self.shared_storage_sidecar_container_name = f"domino-shared-storage-sidecar-{self.task_id_replaced}" sidecar_container = k8s.V1Container( name=self.shared_storage_sidecar_container_name, @@ -202,7 +192,7 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: image='ghcr.io/tauffer-consulting/domino-shared-storage-sidecar:latest', volume_mounts=volume_mounts_sidecar_container, security_context=k8s.V1SecurityContext(privileged=True), - env=[k8s.V1EnvVar(name=k, value=v) for k, v in env_vars.items()], + env=[k8s.V1EnvVar(name=k, value=v) for k, v in sidecar_env_vars.items()], resources=k8s.V1ResourceRequirements( requests={ "cpu": "1m", @@ -215,7 +205,6 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: ), ) pod_cp.spec.containers.append(sidecar_container) - return pod_cp @@ -270,12 +259,8 @@ def _update_env_var_value_from_name(self, name: str, value: str): def _prepare_execute_environment(self, context: Context): """ Runs at the begining of the execute method. 
- Prepare execution with the following configurations: - - pass extra arguments and configuration as environment variables to the pod - - add shared storage sidecar container to the pod - if shared storage is FUSE based - - add shared storage volume mounts to the pod - if shared storage is NFS based or local + Pass extra arguments and configuration as environment variables to the pod """ - # Fetch upstream tasks ids and save them in an ENV var upstream_task_ids = [t.task_id for t in self.get_direct_relatives(upstream=True)] self.env_vars.append({ @@ -283,13 +268,12 @@ def _prepare_execute_environment(self, context: Context): 'value': str(upstream_task_ids), 'value_from': None }) - + # Pass forward the workflow shared storage source name self.env_vars.append({ - 'name': 'DOMINO_WORKFLOW_SHARED_STORAGE', + 'name': 'DOMINO_WORKFLOW_SHARED_STORAGE_SOURCE_NAME', 'value': str(self.workflow_shared_storage.source.name) if self.workflow_shared_storage else None, 'value_from': None }) - # Save updated piece input kwargs with upstream data to environment variable upstream_xcoms_data = self._get_upstream_xcom_data_from_task_ids(task_ids=upstream_task_ids, context=context) domino_k8s_run_op_kwargs = self._get_piece_kwargs_with_upstream_xcom(upstream_xcoms_data=upstream_xcoms_data) diff --git a/domino/task.py b/domino/task.py index 4cddc44d..3699347e 100644 --- a/domino/task.py +++ b/domino/task.py @@ -46,9 +46,15 @@ def __init__( # Shared storage if not workflow_shared_storage: workflow_shared_storage = {} - workflow_shared_storage_source = StorageSource(workflow_shared_storage.pop("source", "None")).name + shared_storage_source_name = StorageSource(workflow_shared_storage.pop("source", "None")).name provider_options = workflow_shared_storage.pop("provider_options", {}) - self.workflow_shared_storage = shared_storage_map[workflow_shared_storage_source](**workflow_shared_storage, **provider_options) if shared_storage_map[workflow_shared_storage_source] else shared_storage_map[workflow_shared_storage_source] + if shared_storage_map[shared_storage_source_name]: + self.workflow_shared_storage = shared_storage_map[shared_storage_source_name]( + **workflow_shared_storage, + **provider_options + ) + else: + self.workflow_shared_storage = shared_storage_map[shared_storage_source_name] # Container resources self.container_resources = container_resources if container_resources else {} @@ -100,24 +106,16 @@ def _set_operator(self) -> BaseOperator: elif self.deploy_mode in ["local-k8s", "local-k8s-dev", "prod"]: config.load_incluster_config() self.k8s_client = client.CoreV1Api() - - base_container_resources_model = ContainerResourcesModel( - requests={ - "cpu": "100m", - "memory": "128Mi", - }, - limits={ - "cpu": "100m", - "memory": "128Mi", - } - ) # Container resources + base_container_resources_model = ContainerResourcesModel( + requests={"cpu": "100m", "memory": "128Mi",}, + limits={"cpu": "100m", "memory": "128Mi"} + ) basic_container_resources = base_container_resources_model.dict() basic_container_resources = dict_deep_update(basic_container_resources, self.container_resources) if self.provide_gpu: basic_container_resources["limits"]["nvidia.com/gpu"] = "1" - #self.container_resources = ContainerResourcesModel(**self.container_resources) container_resources_obj = k8s.V1ResourceRequirements(**basic_container_resources) # Volumes From a7459bb53642f43cb5471c7e27de7a06f46dc79e Mon Sep 17 00:00:00 2001 From: luiz Date: Tue, 18 Jul 2023 21:01:06 +0200 Subject: [PATCH 13/21] more --- 
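Note: the "fromUpstream" resolution performed by _get_piece_kwargs_with_upstream_xcom
can be sketched roughly as below. Key names other than "type" ("upstream_task_id",
"output_arg") are assumed for illustration and may differ in the real schema:

    def resolve_from_upstream(piece_kwargs: dict, upstream_xcoms: dict) -> dict:
        """Replace 'fromUpstream' placeholders with upstream tasks' XCom outputs."""
        resolved = {}
        for key, value in piece_kwargs.items():
            if isinstance(value, dict) and value.get("type") == "fromUpstream":
                task_id = value["upstream_task_id"]                           # assumed key name
                resolved[key] = upstream_xcoms[task_id][value["output_arg"]]  # assumed key name
            else:
                resolved[key] = value
        return resolved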
domino/custom_operators/k8s_operator.py | 19 +++++++++++-------- domino/task.py | 1 - 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/domino/custom_operators/k8s_operator.py b/domino/custom_operators/k8s_operator.py index f5de63c8..f6d8875f 100644 --- a/domino/custom_operators/k8s_operator.py +++ b/domino/custom_operators/k8s_operator.py @@ -35,7 +35,7 @@ def __init__( domino_client_url="http://domino-rest-service:8000/", # TODO change url based on platform configuration ) - self.pod_env_vars = { + pod_env_vars = { "DOMINO_PIECE": self.piece_name, "DOMINO_INSTANTIATE_PIECE_KWARGS": str({ "deploy_mode": self.deploy_mode, @@ -50,7 +50,7 @@ def __init__( super(KubernetesPodOperator).__init__( task_id=task_id, - env_vars=self.pod_env_vars, + env_vars=pod_env_vars, **k8s_operator_kwargs ) @@ -181,7 +181,11 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: sidecar_env_vars = { 'DOMINO_WORKFLOW_SHARED_STORAGE': self.workflow_shared_storage.json() if self.workflow_shared_storage else "", 'DOMINO_WORKFLOW_SHARED_STORAGE_SECRETS': str(storage_piece_secrets), - 'DOMINO_INSTANTIATE_PIECE_KWARGS': str(self.pod_env_vars.get('DOMINO_INSTANTIATE_PIECE_KWARGS')), + 'DOMINO_INSTANTIATE_PIECE_KWARGS': str({ + "deploy_mode": self.deploy_mode, + "task_id": self.task_id, + "dag_id": self.dag_id, + }), 'DOMINO_WORKFLOW_RUN_SUBPATH': self.workflow_run_subpath, 'AIRFLOW_UPSTREAM_TASKS_IDS_SHARED_STORAGE': str(self.shared_storage_upstream_ids_list), } @@ -209,6 +213,10 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: def _get_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): + """ + Update Operator kwargs with upstream tasks XCOM data + Also updates the list of upstream tasks for which we need to mount the results path + """ domino_k8s_run_op_kwargs = [var for var in self.env_vars if getattr(var, 'name', None) == 'DOMINO_RUN_PIECE_KWARGS'] if not domino_k8s_run_op_kwargs: domino_k8s_run_op_kwargs = { @@ -221,9 +229,6 @@ def _get_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): "name": "DOMINO_RUN_PIECE_KWARGS", "value": ast.literal_eval(domino_k8s_run_op_kwargs.value) } - - # Update Operator kwargs with upstream tasks XCOM data - # Also updates the list of upstream tasks for which we need to mount the results path updated_op_kwargs = dict() for k, v in domino_k8s_run_op_kwargs.get('value').items(): if isinstance(v, dict) and v.get("type", None) == "fromUpstream": @@ -239,13 +244,11 @@ def _get_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): self.shared_storage_upstream_ids_list.append(upstream_task_id) else: updated_op_kwargs[k] = v - self.env_vars.append({ 'name': 'AIRFLOW_UPSTREAM_TASKS_IDS_SHARED_STORAGE', 'value': str(self.shared_storage_upstream_ids_list), 'value_from': None }) - return updated_op_kwargs diff --git a/domino/task.py b/domino/task.py index 3699347e..f4b567b9 100644 --- a/domino/task.py +++ b/domino/task.py @@ -207,7 +207,6 @@ def _set_operator(self) -> BaseOperator: #arguments=["-c", "sleep 120;"], cmds=["domino"], arguments=["run-piece-k8s"], - # env_vars=container_env_vars, do_xcom_push=True, in_cluster=True, volumes=all_volumes, From 234faed38bdbaa8394733ea12f6d5cd2e1497420 Mon Sep 17 00:00:00 2001 From: luiz Date: Tue, 18 Jul 2023 21:33:53 +0200 Subject: [PATCH 14/21] change dev volumes code --- domino/custom_operators/k8s_operator.py | 91 +++++++++++++++++++++++-- domino/task.py | 91 +++---------------------- 2 files changed, 95 insertions(+), 87 deletions(-) diff --git 
a/domino/custom_operators/k8s_operator.py b/domino/custom_operators/k8s_operator.py
index f6d8875f..069be723 100644
--- a/domino/custom_operators/k8s_operator.py
+++ b/domino/custom_operators/k8s_operator.py
@@ -1,17 +1,17 @@
-import ast
 from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
 from airflow.utils.context import Context
 from kubernetes.client import models as k8s
+from kubernetes import client, config
+from kubernetes.stream import stream as kubernetes_stream
 from typing import Dict, Optional
-import copy
 from contextlib import closing
-from kubernetes.stream import stream as kubernetes_stream
+import ast
+import copy
 
 from domino.custom_operators.base_operator import BaseDominoOperator
 from domino.schemas.shared_storage import WorkflowSharedStorage
 
 
-# Ref: https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
 class DominoKubernetesPodOperator(BaseDominoOperator, KubernetesPodOperator):
     def __init__(
         self,
@@ -48,9 +48,16 @@ def __init__(
             "AIRFLOW_CONTEXT_DAG_RUN_ID": "{{ run_id }}",
         }
 
+        # For DEV mode only
+        volumes_dev, volume_mounts_dev = None, None
+        if self.deploy_mode == 'local-k8s-dev':
+            volumes_dev, volume_mounts_dev = self._make_volumes_and_volume_mounts_dev()
+
         super(KubernetesPodOperator).__init__(
             task_id=task_id,
             env_vars=pod_env_vars,
+            volumes=volumes_dev,
+            volume_mounts=volume_mounts_dev,
             **k8s_operator_kwargs
         )
 
@@ -59,6 +66,81 @@ def __init__(
         self.shared_storage_upstream_ids_list = list()
 
+    def _make_volumes_and_volume_mounts_dev(self):
+        """
+        Make volumes and volume mounts for the pod when in DEVELOPMENT mode.
+        """
+        config.load_incluster_config()
+        k8s_client = client.CoreV1Api()
+
+        all_volumes = []
+        all_volume_mounts = []
+
+        source_image = self.piece.get('source_image')
+        repository_raw_project_name = str(source_image).split('/')[2].split(':')[0]
+
+        persistent_volume_name = 'pv-{}'.format(str(repository_raw_project_name.lower().replace('_', '-')))
+        persistent_volume_claim_name = 'pvc-{}'.format(str(repository_raw_project_name.lower().replace('_', '-')))
+
+        pvc_exists = False
+        try:
+            k8s_client.read_namespaced_persistent_volume_claim(name=persistent_volume_claim_name, namespace='default')
+            pvc_exists = True
+        except client.rest.ApiException as e:
+            if e.status != 404:
+                raise e
+
+        pv_exists = False
+        try:
+            k8s_client.read_persistent_volume(name=persistent_volume_name)
+            pv_exists = True
+        except client.rest.ApiException as e:
+            if e.status != 404:
+                raise e
+
+        if pv_exists and pvc_exists:
+            volume_dev_pieces = k8s.V1Volume(
+                name='dev-op-{path_name}'.format(path_name=str(repository_raw_project_name.lower().replace('_', '-'))),
+                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
+                    claim_name=persistent_volume_claim_name
+                ),
+            )
+            volume_mount_dev_pieces = k8s.V1VolumeMount(
+                name='dev-op-{path_name}'.format(path_name=str(repository_raw_project_name.lower().replace('_', '-'))),
+                mount_path='/home/domino/pieces_repository',
+                sub_path=None,
+                read_only=True
+            )
+            all_volumes.append(volume_dev_pieces)
+            all_volume_mounts.append(volume_mount_dev_pieces)
+
+        ######################## For local domino-py dev ###############################################
+        domino_package_local_claim_name = 'domino-dev-volume-claim'
+        pvc_exists = False
+        try:
+            k8s_client.read_namespaced_persistent_volume_claim(name=domino_package_local_claim_name,
namespace='default') + pvc_exists = True + except client.rest.ApiException as e: + if e.status != 404: + raise e + + if pvc_exists: + volume_dev = k8s.V1Volume( + name='jobs-persistent-storage-dev', + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=domino_package_local_claim_name), + ) + volume_mount_dev = k8s.V1VolumeMount( + name='jobs-persistent-storage-dev', + mount_path='/home/domino/domino_py', + sub_path=None, + read_only=True + ) + all_volumes.append(volume_dev) + all_volume_mounts.append(volume_mount_dev) + + return all_volumes, all_volume_mounts + + def build_pod_request_obj(self, context: Optional['Context'] = None) -> k8s.V1Pod: """ We override this method to add the shared storage to the pod. diff --git a/domino/task.py b/domino/task.py index f4b567b9..bf23a674 100644 --- a/domino/task.py +++ b/domino/task.py @@ -13,7 +13,6 @@ from domino.logger import get_configured_logger from domino.schemas.shared_storage import StorageSource from domino.schemas.container_resources import ContainerResourcesModel -from kubernetes import client, config import os @@ -97,15 +96,13 @@ def _set_operator(self) -> BaseOperator: ) ) - # References: - # - https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/kubernetes_pod_operator/index.html - # - https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html - # - https://www.astronomer.io/guides/templating/ - # - good example: https://github.com/apache/airflow/blob/main/tests/system/providers/cncf/kubernetes/example_kubernetes.py - # - commands HAVE to go in a list object: https://stackoverflow.com/a/55149915/11483674 - elif self.deploy_mode in ["local-k8s", "local-k8s-dev", "prod"]: - config.load_incluster_config() - self.k8s_client = client.CoreV1Api() + elif self.deploy_mode in ["local-k8s", "local-k8s-dev", "prod"]: + # References: + # - https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/kubernetes_pod_operator/index.html + # - https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html + # - https://www.astronomer.io/guides/templating/ + # - good example: https://github.com/apache/airflow/blob/main/tests/system/providers/cncf/kubernetes/example_kubernetes.py + # - commands HAVE to go in a list object: https://stackoverflow.com/a/55149915/11483674 # Container resources base_container_resources_model = ContainerResourcesModel( @@ -118,76 +115,6 @@ def _set_operator(self) -> BaseOperator: basic_container_resources["limits"]["nvidia.com/gpu"] = "1" container_resources_obj = k8s.V1ResourceRequirements(**basic_container_resources) - # Volumes - all_volumes = [] - all_volume_mounts = [] - - ######################## For local DOMINO_DEPLOY_MODE Operators dev ########################################### - if self.deploy_mode == 'local-k8s-dev': - source_image = self.piece.get('source_image') - repository_raw_project_name = str(source_image).split('/')[2].split(':')[0] - persistent_volume_claim_name = 'pvc-{}'.format(str(repository_raw_project_name.lower().replace('_', '-'))) - - persistent_volume_name = 'pv-{}'.format(str(repository_raw_project_name.lower().replace('_', '-'))) - persistent_volume_claim_name = 'pvc-{}'.format(str(repository_raw_project_name.lower().replace('_', '-'))) - - pvc_exists = False - try: - self.k8s_client.read_namespaced_persistent_volume_claim(name=persistent_volume_claim_name, namespace='default') - pvc_exists = True - except client.rest.ApiException as e: - if e.status != 404: - raise e - - pv_exists = False 
- try: - self.k8s_client.read_persistent_volume(name=persistent_volume_name) - pv_exists = True - except client.rest.ApiException as e: - if e.status != 404: - raise e - - if pv_exists and pvc_exists: - volume_dev_pieces = k8s.V1Volume( - name='dev-op-{path_name}'.format(path_name=str(repository_raw_project_name.lower().replace('_', '-'))), - persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource( - claim_name=persistent_volume_claim_name - ), - ) - volume_mount_dev_pieces = k8s.V1VolumeMount( - name='dev-op-{path_name}'.format(path_name=str(repository_raw_project_name.lower().replace('_', '-'))), - mount_path=f'/home/domino/pieces_repository', - sub_path=None, - read_only=True - ) - all_volumes.append(volume_dev_pieces) - all_volume_mounts.append(volume_mount_dev_pieces) - - ######################## For local Domino dev ############################################### - domino_package_local_claim_name = 'domino-dev-volume-claim' - pvc_exists = False - try: - self.k8s_client.read_namespaced_persistent_volume_claim(name=domino_package_local_claim_name, namespace='default') - pvc_exists = True - except client.rest.ApiException as e: - if e.status != 404: - raise e - - if pvc_exists: - volume_dev = k8s.V1Volume( - name='jobs-persistent-storage-dev', - persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=domino_package_local_claim_name), - ) - volume_mount_dev = k8s.V1VolumeMount( - name='jobs-persistent-storage-dev', - mount_path='/home/domino/domino_py', - sub_path=None, - read_only=True - ) - all_volumes.append(volume_dev) - all_volume_mounts.append(volume_mount_dev) - ############################################################################################ - pod_startup_timeout_in_seconds = 600 return DominoKubernetesPodOperator( dag_id=self.dag_id, @@ -199,7 +126,7 @@ def _set_operator(self) -> BaseOperator: workflow_shared_storage=self.workflow_shared_storage, # ----------------- Kubernetes ----------------- namespace='default', # TODO - separate namespace by User or Workspace? 
- image=self.piece["source_image"], + image=self.piece.get("source_image"), image_pull_policy='IfNotPresent', name=f"airflow-worker-pod-{self.task_id}", startup_timeout_seconds=pod_startup_timeout_in_seconds, @@ -209,8 +136,6 @@ def _set_operator(self) -> BaseOperator: arguments=["run-piece-k8s"], do_xcom_push=True, in_cluster=True, - volumes=all_volumes, - volume_mounts=all_volume_mounts, container_resources=container_resources_obj, ) From c7512ea1565f540517503cbe4d7f32f836d4c3cf Mon Sep 17 00:00:00 2001 From: luiz Date: Tue, 18 Jul 2023 21:44:27 +0200 Subject: [PATCH 15/21] remove new column --- rest/database/models/enums.py | 9 +-------- rest/database/models/piece_repository.py | 3 +-- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/rest/database/models/enums.py b/rest/database/models/enums.py index 2e4ec847..ed4ad701 100644 --- a/rest/database/models/enums.py +++ b/rest/database/models/enums.py @@ -9,14 +9,6 @@ class Config: use_enum_values = True -class PieceExecutionMode(str, enum.Enum): - docker = 'docker' - worker = 'worker' - - class Config: - use_enum_values = True - - class Permission(str, enum.Enum): owner = 'owner' read = 'read' @@ -24,6 +16,7 @@ class Permission(str, enum.Enum): class Config: use_enum_values = True + class UserWorkspaceStatus(str, enum.Enum): pending = 'pending' accepted = 'accepted' diff --git a/rest/database/models/piece_repository.py b/rest/database/models/piece_repository.py index 9261f841..cd13db58 100644 --- a/rest/database/models/piece_repository.py +++ b/rest/database/models/piece_repository.py @@ -1,6 +1,6 @@ from database.models.base import Base, BaseDatabaseModel from sqlalchemy import Column, String, Integer, DateTime, Enum, JSON, ForeignKey -from database.models.enums import RepositorySource, PieceExecutionMode +from database.models.enums import RepositorySource from sqlalchemy.orm import relationship from datetime import datetime @@ -15,7 +15,6 @@ class PieceRepository(Base, BaseDatabaseModel): source = Column(Enum(RepositorySource), nullable=True, default=RepositorySource.github.value) path = Column(String(250), nullable=True) version = Column(String(10), nullable=True) - piece_execution_mode = Column(Enum(PieceExecutionMode), nullable=False, default=PieceExecutionMode.docker.value) dependencies_map = Column(JSON, nullable=True) compiled_metadata = Column(JSON, nullable=True) workspace_id = Column(Integer, ForeignKey("workspace.id", ondelete='cascade'), nullable=False) From 9de5cabbf2e385d76be0e689b28ba561e57e4de9 Mon Sep 17 00:00:00 2001 From: luiz Date: Wed, 19 Jul 2023 11:09:40 +0200 Subject: [PATCH 16/21] fixes --- domino/custom_operators/docker_operator.py | 8 +++++--- domino/custom_operators/k8s_operator.py | 6 ++++-- domino/task.py | 6 +++++- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/domino/custom_operators/docker_operator.py b/domino/custom_operators/docker_operator.py index 4a7c3e21..2a172a89 100644 --- a/domino/custom_operators/docker_operator.py +++ b/domino/custom_operators/docker_operator.py @@ -20,7 +20,8 @@ def __init__( workflow_shared_storage: WorkflowSharedStorage = None, **docker_operator_kwargs ) -> None: - super(BaseDominoOperator).__init__( + BaseDominoOperator.__init__( + self, dag_id=dag_id, task_id=task_id, piece_name=piece_name, @@ -50,12 +51,13 @@ def __init__( Mount(source=shared_storage_host_path, target=shared_storage_container_path, type='bind', read_only=False), ) - super(DockerOperator).__init__( - **docker_operator_kwargs, + DockerOperator.__init__( + self, task_id=task_id, 
docker_url='tcp://docker-proxy:2375', mounts=mounts, environment=self.environment, + **docker_operator_kwargs, ) def _set_base_env_vars(self): diff --git a/domino/custom_operators/k8s_operator.py b/domino/custom_operators/k8s_operator.py index 069be723..50af30a3 100644 --- a/domino/custom_operators/k8s_operator.py +++ b/domino/custom_operators/k8s_operator.py @@ -24,7 +24,8 @@ def __init__( workflow_shared_storage: WorkflowSharedStorage = None, **k8s_operator_kwargs ): - super(BaseDominoOperator).__init__( + BaseDominoOperator.__init__( + self, dag_id=dag_id, task_id=task_id, piece_name=piece_name, @@ -53,7 +54,8 @@ def __init__( if self.deploy_mode == 'local-k8s-dev': volumes_dev, volume_mounts_dev = self._make_volumes_and_volume_mounts_dev() - super(KubernetesPodOperator).__init__( + KubernetesPodOperator.__init__( + self, task_id=task_id, env_vars=pod_env_vars, volumes=volumes_dev, diff --git a/domino/task.py b/domino/task.py index bf23a674..47a4f5c0 100644 --- a/domino/task.py +++ b/domino/task.py @@ -41,6 +41,10 @@ def __init__( self.repository_id = piece["repository_id"] self.piece = piece self.piece_input_kwargs = piece_input_kwargs + if "execution_mode" not in self.piece: + self.execution_mode = "docker" + else: + self.execution_mode = self.piece["execution_mode"] # Shared storage if not workflow_shared_storage: @@ -70,7 +74,7 @@ def _set_operator(self) -> BaseOperator: """ Set Airflow Operator according to deploy mode and Piece execution mode. """ - if self.piece["execution_mode"] == "worker": + if self.execution_mode == "worker": return DominoWorkerOperator( dag_id=self.dag_id, task_id=self.task_id, From d2f79f7185dbfd4299db5e6a0459258eb23368ff Mon Sep 17 00:00:00 2001 From: luiz Date: Wed, 19 Jul 2023 13:22:05 +0200 Subject: [PATCH 17/21] update dependencies --- Dockerfile-airflow-domino-base-dev | 2 +- Dockerfile-airflow-domino-base-prod | 2 +- Dockerfile-airflow-domino-compose-worker | 2 +- Dockerfile-airflow-domino-pod-dev | 2 +- docker-compose-dev.yaml | 17 ++--- domino/cli/utils/docker-compose.yaml | 2 +- domino/custom_operators/base_operator.py | 2 - domino/custom_operators/docker_operator.py | 77 ++++++++++++++-------- requirements-airflow.txt | 4 +- requirements.txt | 2 +- rest/requirements.txt | 2 +- 11 files changed, 67 insertions(+), 47 deletions(-) diff --git a/Dockerfile-airflow-domino-base-dev b/Dockerfile-airflow-domino-base-dev index 26d44ca5..7e791cad 100644 --- a/Dockerfile-airflow-domino-base-dev +++ b/Dockerfile-airflow-domino-base-dev @@ -1,4 +1,4 @@ -FROM apache/airflow:2.5.3-python3.9 +FROM apache/airflow:2.6.3-python3.9 ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 diff --git a/Dockerfile-airflow-domino-base-prod b/Dockerfile-airflow-domino-base-prod index 9532e384..0c6543ef 100644 --- a/Dockerfile-airflow-domino-base-prod +++ b/Dockerfile-airflow-domino-base-prod @@ -1,4 +1,4 @@ -FROM apache/airflow:2.5.3-python3.9 +FROM apache/airflow:2.6.3-python3.9 ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 diff --git a/Dockerfile-airflow-domino-compose-worker b/Dockerfile-airflow-domino-compose-worker index 5945a598..15bb5210 100644 --- a/Dockerfile-airflow-domino-compose-worker +++ b/Dockerfile-airflow-domino-compose-worker @@ -29,7 +29,7 @@ # create compiled_metadata for all pieces # provisory dockerfile -FROM apache/airflow:2.5.3-python3.9 +FROM apache/airflow:2.6.3-python3.9 ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 diff --git a/Dockerfile-airflow-domino-pod-dev b/Dockerfile-airflow-domino-pod-dev index 6ab9a688..e4c208bc 100644 --- 
a/Dockerfile-airflow-domino-pod-dev +++ b/Dockerfile-airflow-domino-pod-dev @@ -1,4 +1,4 @@ -FROM python:3.8-slim +FROM python:3.9-slim ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 diff --git a/docker-compose-dev.yaml b/docker-compose-dev.yaml index 8864b2f1..82ef87d0 100644 --- a/docker-compose-dev.yaml +++ b/docker-compose-dev.yaml @@ -4,7 +4,7 @@ version: '3.8' x-airflow-common: &airflow-common - image: apache/airflow:2.5.3-python3.9 + image: apache/airflow:2.6.3-python3.9 # build: . environment: &airflow-common-env @@ -295,6 +295,7 @@ services: # Domino REST Api domino_rest: + # image: ghcr.io/tauffer-consulting/domino-rest:latest build: context: ./rest dockerfile: Dockerfile @@ -352,18 +353,18 @@ services: # Domino Frontend domino_frontend: - # image: taufferconsulting/domino-frontend:latest - build: - context: ./frontend - dockerfile: Dockerfile + image: ghcr.io/tauffer-consulting/domino-frontend:compose + # build: + # context: ./frontend + # dockerfile: Dockerfile container_name: domino-frontend - command: yarn start:local + # command: yarn start:local environment: - REACT_APP_DOMINO_DEPLOY_MODE=local-compose ports: - "3000:3000" - volumes: - - ./frontend:/usr/src/app # Enable hot reload for frontend + # volumes: + # - ./frontend:/usr/src/app # Enable hot reload for frontend depends_on: domino_rest: condition: service_started diff --git a/domino/cli/utils/docker-compose.yaml b/domino/cli/utils/docker-compose.yaml index 35efc37a..51a1e3ad 100644 --- a/domino/cli/utils/docker-compose.yaml +++ b/domino/cli/utils/docker-compose.yaml @@ -4,7 +4,7 @@ version: '3.8' x-airflow-common: &airflow-common - image: apache/airflow:2.5.3-python3.9 + image: apache/airflow:2.6.3-python3.9 environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor diff --git a/domino/custom_operators/base_operator.py b/domino/custom_operators/base_operator.py index 9d437021..46777045 100644 --- a/domino/custom_operators/base_operator.py +++ b/domino/custom_operators/base_operator.py @@ -12,7 +12,6 @@ class BaseDominoOperator: def __init__( self, - dag_id: str, task_id: str, piece_name: str, deploy_mode: str, @@ -21,7 +20,6 @@ def __init__( workflow_shared_storage: WorkflowSharedStorage = None, domino_client_url: Optional[str] = None, ): - self.dag_id = dag_id self.task_id = task_id self.piece_name = piece_name self.deploy_mode = deploy_mode diff --git a/domino/custom_operators/docker_operator.py b/domino/custom_operators/docker_operator.py index 2a172a89..764e5268 100644 --- a/domino/custom_operators/docker_operator.py +++ b/domino/custom_operators/docker_operator.py @@ -3,11 +3,11 @@ from typing import Dict, Optional import os -from domino.custom_operators.base_operator import BaseDominoOperator +from domino.client.domino_backend_client import DominoBackendRestClient from domino.schemas.shared_storage import WorkflowSharedStorage, StorageSource -class DominoDockerOperator(BaseDominoOperator, DockerOperator): +class DominoDockerOperator(DockerOperator): def __init__( self, @@ -20,22 +20,31 @@ def __init__( workflow_shared_storage: WorkflowSharedStorage = None, **docker_operator_kwargs ) -> None: - BaseDominoOperator.__init__( - self, - dag_id=dag_id, - task_id=task_id, - piece_name=piece_name, - deploy_mode=deploy_mode, - repository_id=repository_id, - piece_input_kwargs=piece_input_kwargs, - workflow_shared_storage=workflow_shared_storage, - domino_client_url="http://domino-rest:8000/", - ) + self.task_id = task_id + self.piece_name = piece_name + self.deploy_mode = deploy_mode + 
self.repository_id = repository_id + self.piece_input_kwargs = piece_input_kwargs + self.workflow_shared_storage = workflow_shared_storage + self.domino_client = DominoBackendRestClient(base_url="http://domino-rest:8000/") + + # Environment variables + self.environment = { + "DOMINO_PIECE": piece_name, + "DOMINO_INSTANTIATE_PIECE_KWARGS": str({ + "deploy_mode": deploy_mode, + "task_id": task_id, + "dag_id": dag_id, + }), + "DOMINO_RUN_PIECE_KWARGS": str(piece_input_kwargs), + "DOMINO_WORKFLOW_SHARED_STORAGE": self.workflow_shared_storage.json() if self.workflow_shared_storage else "", + "AIRFLOW_CONTEXT_EXECUTION_DATETIME": "{{ dag_run.logical_date | ts_nodash }}", + "AIRFLOW_CONTEXT_DAG_RUN_ID": "{{ run_id }}", + } # Shared Storage variables self.shared_storage_base_mount_path = '/home/shared_storage' self.shared_storage_upstream_ids_list = list() - self._set_base_env_vars() shared_storage_host_path = os.environ.get('LOCAL_DOMINO_SHARED_DATA_PATH', '') shared_storage_container_path = '/home/shared_storage' mounts = [] @@ -43,7 +52,7 @@ def __init__( # TODO remove mounts=[ # TODO remove - # Mount(source='/media/luiz/storage2/Github/domino/domino', target='/home/domino/domino_py/domino', type='bind', read_only=True), + Mount(source='/media/luiz/storage2/Github/domino/domino', target='/home/domino/domino_py/domino', type='bind', read_only=True), # Mount(source='/media/luiz/storage2/Github/default_domino_pieces', target='/home/domino/pieces_repository/', type='bind', read_only=True), ] if self.workflow_shared_storage and str(self.workflow_shared_storage.source.value).lower() == str(getattr(StorageSource, 'local').value).lower(): @@ -60,19 +69,29 @@ def __init__( **docker_operator_kwargs, ) - def _set_base_env_vars(self): - self.environment = { - "DOMINO_PIECE": self.piece_name, - "DOMINO_INSTANTIATE_PIECE_KWARGS": str({ - "deploy_mode": self.deploy_mode, - "task_id": self.task_id, - "dag_id": self.dag_id, - }), - "DOMINO_RUN_PIECE_KWARGS": str(self.piece_input_kwargs), - "DOMINO_WORKFLOW_SHARED_STORAGE": self.workflow_shared_storage.json() if self.workflow_shared_storage else "", - "AIRFLOW_CONTEXT_EXECUTION_DATETIME": "{{ dag_run.logical_date | ts_nodash }}", - "AIRFLOW_CONTEXT_DAG_RUN_ID": "{{ run_id }}", + + def _get_piece_secrets(self, piece_repository_id: int, piece_name: str): + """Get piece secrets values from Domino API""" + secrets_response = self.domino_client.get_piece_secrets( + piece_repository_id=piece_repository_id, + piece_name=piece_name + ) + if secrets_response.status_code != 200: + raise Exception(f"Error getting piece secrets: {secrets_response.json()}") + piece_secrets = { + e.get('name'): e.get('value') + for e in secrets_response.json() } + return piece_secrets + + + @staticmethod + def _get_upstream_xcom_data_from_task_ids(task_ids: list, context: Context): + upstream_xcoms_data = dict() + for tid in task_ids: + upstream_xcoms_data[tid] = context['ti'].xcom_pull(task_ids=tid) + return upstream_xcoms_data + def _update_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): #domino_run_piece_kwargs = self.environment.get('DOMINO_RUN_PIECE_KWARGS') @@ -93,7 +112,8 @@ def _update_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): self.piece_input_kwargs = updated_op_kwargs self.environment['AIRFLOW_UPSTREAM_TASKS_IDS_SHARED_STORAGE'] = str(self.shared_storage_upstream_ids_list) self.environment['DOMINO_RUN_PIECE_KWARGS'] = str(self.piece_input_kwargs) - + + def _prepare_execute_environment(self, context: Context): """ Prepare execution with the 
following configurations: @@ -119,6 +139,7 @@ def _prepare_execute_environment(self, context: Context): self.workflow_run_subpath = f"{dag_id}/{dag_run_id_path}" self.environment['DOMINO_WORKFLOW_RUN_SUBPATH'] = self.workflow_run_subpath + def execute(self, context: Context) -> Optional[str]: # env var format = {"name": "value"} self._prepare_execute_environment(context=context) diff --git a/requirements-airflow.txt b/requirements-airflow.txt index 38395786..da481937 100644 --- a/requirements-airflow.txt +++ b/requirements-airflow.txt @@ -1,7 +1,7 @@ -apache-airflow==2.5.3 +apache-airflow==2.6.3 apache-airflow-providers-cncf-kubernetes==5.0.0 apache-airflow-providers-docker==3.6.0 -pydantic==1.9.1 +pydantic==1.10.11 docker>=6.0.1 tomli==2.0.1 tomli-w==1.0.0 diff --git a/requirements.txt b/requirements.txt index dcdc71a8..511f85a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -pydantic==1.9.1 +pydantic==1.10.11 docker>=6.0.1 urllib3== 1.26.15 tomli==2.0.1 diff --git a/rest/requirements.txt b/rest/requirements.txt index b113d74a..1db46f06 100644 --- a/rest/requirements.txt +++ b/rest/requirements.txt @@ -3,7 +3,7 @@ bcrypt==3.2.2 fastapi-utils==0.2.1 Jinja2==2.11.3 psycopg2-binary==2.9.3 -pydantic[email] +pydantic[email]==1.10.11 MarkupSafe==2.0.1 pydantic==1.9.1 PyGithub==1.55 From 4fe99122bcb852803bbcc242894908857e3a4484 Mon Sep 17 00:00:00 2001 From: luiz Date: Wed, 19 Jul 2023 13:34:26 +0200 Subject: [PATCH 18/21] update dependency and init import --- domino/custom_operators/docker_operator.py | 3 +-- rest/requirements.txt | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/domino/custom_operators/docker_operator.py b/domino/custom_operators/docker_operator.py index 764e5268..5e235580 100644 --- a/domino/custom_operators/docker_operator.py +++ b/domino/custom_operators/docker_operator.py @@ -60,8 +60,7 @@ def __init__( Mount(source=shared_storage_host_path, target=shared_storage_container_path, type='bind', read_only=False), ) - DockerOperator.__init__( - self, + super().__init__( task_id=task_id, docker_url='tcp://docker-proxy:2375', mounts=mounts, diff --git a/rest/requirements.txt b/rest/requirements.txt index 1db46f06..da5c935a 100644 --- a/rest/requirements.txt +++ b/rest/requirements.txt @@ -5,7 +5,6 @@ Jinja2==2.11.3 psycopg2-binary==2.9.3 pydantic[email]==1.10.11 MarkupSafe==2.0.1 -pydantic==1.9.1 PyGithub==1.55 PyJWT==2.4.0 pytzdata==2020.1 From b78b652786dbe9ff8a203eab6ea103eefc03be1f Mon Sep 17 00:00:00 2001 From: luiz Date: Wed, 19 Jul 2023 14:50:01 +0200 Subject: [PATCH 19/21] fixes --- domino/custom_operators/docker_operator.py | 2 +- domino/task.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/domino/custom_operators/docker_operator.py b/domino/custom_operators/docker_operator.py index 5e235580..eb938c64 100644 --- a/domino/custom_operators/docker_operator.py +++ b/domino/custom_operators/docker_operator.py @@ -61,11 +61,11 @@ def __init__( ) super().__init__( + **docker_operator_kwargs, task_id=task_id, docker_url='tcp://docker-proxy:2375', mounts=mounts, environment=self.environment, - **docker_operator_kwargs, ) diff --git a/domino/task.py b/domino/task.py index 47a4f5c0..f1046216 100644 --- a/domino/task.py +++ b/domino/task.py @@ -150,7 +150,7 @@ def _set_operator(self) -> BaseOperator: piece_name=self.piece.get('name'), deploy_mode=self.deploy_mode, repository_id=self.repository_id, - piece_kwargs=self.piece_input_kwargs, + piece_input_kwargs=self.piece_input_kwargs, 
workflow_shared_storage=self.workflow_shared_storage, # ----------------- Docker ----------------- image=self.piece["source_image"], @@ -165,4 +165,4 @@ def _set_operator(self) -> BaseOperator: def __call__(self) -> Callable: - return self._task_piece \ No newline at end of file + return self._task_operator \ No newline at end of file From b92ed11587c036dfb0f3816acc604b510ef7f3cb Mon Sep 17 00:00:00 2001 From: luiz Date: Wed, 19 Jul 2023 19:16:59 +0200 Subject: [PATCH 20/21] updates on k8soperator --- domino/custom_operators/base_operator.py | 1 + domino/custom_operators/k8s_operator.py | 93 ++++++++++++++++-------- domino/schemas/container_resources.py | 3 +- domino/task.py | 26 ++----- 4 files changed, 72 insertions(+), 51 deletions(-) diff --git a/domino/custom_operators/base_operator.py b/domino/custom_operators/base_operator.py index 46777045..2552aecb 100644 --- a/domino/custom_operators/base_operator.py +++ b/domino/custom_operators/base_operator.py @@ -8,6 +8,7 @@ class BaseDominoOperator: """ This class implements common operations for all Domino Operators running under a Task. + DEPRECATED - delete this later """ def __init__( diff --git a/domino/custom_operators/k8s_operator.py b/domino/custom_operators/k8s_operator.py index 50af30a3..8b737bf0 100644 --- a/domino/custom_operators/k8s_operator.py +++ b/domino/custom_operators/k8s_operator.py @@ -8,11 +8,13 @@ import ast import copy -from domino.custom_operators.base_operator import BaseDominoOperator +from domino.utils import dict_deep_update +from domino.client.domino_backend_client import DominoBackendRestClient from domino.schemas.shared_storage import WorkflowSharedStorage +from domino.schemas.container_resources import ContainerResourcesModel -class DominoKubernetesPodOperator(BaseDominoOperator, KubernetesPodOperator): +class DominoKubernetesPodOperator(KubernetesPodOperator): def __init__( self, dag_id: str, @@ -20,52 +22,61 @@ def __init__( piece_name: str, deploy_mode: str, # TODO enum repository_id: int, - piece_kwargs: Optional[Dict] = None, + piece_input_kwargs: Optional[Dict] = None, workflow_shared_storage: WorkflowSharedStorage = None, + container_resources: Optional[Dict] = None, **k8s_operator_kwargs ): - BaseDominoOperator.__init__( - self, - dag_id=dag_id, - task_id=task_id, - piece_name=piece_name, - deploy_mode=deploy_mode, - repository_id=repository_id, - piece_input_kwargs=piece_kwargs, - workflow_shared_storage=workflow_shared_storage, - domino_client_url="http://domino-rest-service:8000/", # TODO change url based on platform configuration - ) - + self.task_id = task_id + self.piece_name = piece_name + self.deploy_mode = deploy_mode + self.repository_id = repository_id + self.piece_input_kwargs = piece_input_kwargs + self.workflow_shared_storage = workflow_shared_storage + self.domino_client = DominoBackendRestClient(base_url="http://domino-rest-service:8000/") # TODO change url based on platform configuration + + # Environment variables pod_env_vars = { - "DOMINO_PIECE": self.piece_name, + "DOMINO_PIECE": piece_name, "DOMINO_INSTANTIATE_PIECE_KWARGS": str({ - "deploy_mode": self.deploy_mode, - "task_id": self.task_id, - "dag_id": self.dag_id, + "deploy_mode": deploy_mode, + "task_id": task_id, + "dag_id": dag_id, }), - "DOMINO_RUN_PIECE_KWARGS": str(self.piece_input_kwargs), - "DOMINO_WORKFLOW_SHARED_STORAGE": self.workflow_shared_storage.json() if self.workflow_shared_storage else "", + "DOMINO_RUN_PIECE_KWARGS": str(piece_input_kwargs), + "DOMINO_WORKFLOW_SHARED_STORAGE": workflow_shared_storage.json() 
if workflow_shared_storage else "", "AIRFLOW_CONTEXT_EXECUTION_DATETIME": "{{ dag_run.logical_date | ts_nodash }}", "AIRFLOW_CONTEXT_DAG_RUN_ID": "{{ run_id }}", } - # For DEV mode only + # Container resources + if container_resources is None: + container_resources = {} + base_container_resources_model = ContainerResourcesModel( + requests={"cpu": "100m", "memory": "128Mi",}, + limits={"cpu": "100m", "memory": "128Mi"}, + use_gpu=False, + ) + basic_container_resources = base_container_resources_model.dict() + updated_container_resources = dict_deep_update(basic_container_resources, container_resources) + use_gpu = updated_container_resources.pop("use_gpu", False) + if use_gpu: + updated_container_resources["limits"]["nvidia.com/gpu"] = "1" + container_resources_obj = k8s.V1ResourceRequirements(**updated_container_resources) + + # Extra volume and volume mounts - for DEV mode only volumes_dev, volume_mounts_dev = None, None if self.deploy_mode == 'local-k8s-dev': volumes_dev, volume_mounts_dev = self._make_volumes_and_volume_mounts_dev() - KubernetesPodOperator.__init__( - self, + super().__init__( task_id=task_id, env_vars=pod_env_vars, + container_resources=container_resources_obj, volumes=volumes_dev, volume_mounts=volume_mounts_dev, **k8s_operator_kwargs ) - - # Shared Storage variables - self.shared_storage_base_mount_path = '/home/shared_storage' - self.shared_storage_upstream_ids_list = list() def _make_volumes_and_volume_mounts_dev(self): @@ -150,8 +161,9 @@ def build_pod_request_obj(self, context: Optional['Context'] = None) -> k8s.V1Po This function runs after our own self.execute, by super().execute() """ pod = super().build_pod_request_obj(context) - # Add shared storage to pod self.task_id_replaced = self.task_id.replace("_", "-").lower() # doing this because airflow doesn't allow underscores and upper case in mount names + self.shared_storage_base_mount_path = '/home/shared_storage' + self.shared_storage_upstream_ids_list = list() if not self.workflow_shared_storage or self.workflow_shared_storage.mode.name == 'none': return pod if self.workflow_shared_storage.source.name in ["aws_s3", "gcs"]: @@ -297,6 +309,29 @@ def add_shared_storage_sidecar(self, pod: k8s.V1Pod) -> k8s.V1Pod: return pod_cp + def _get_piece_secrets(self, piece_repository_id: int, piece_name: str): + """Get piece secrets values from Domino API""" + secrets_response = self.domino_client.get_piece_secrets( + piece_repository_id=piece_repository_id, + piece_name=piece_name + ) + if secrets_response.status_code != 200: + raise Exception(f"Error getting piece secrets: {secrets_response.json()}") + piece_secrets = { + e.get('name'): e.get('value') + for e in secrets_response.json() + } + return piece_secrets + + + @staticmethod + def _get_upstream_xcom_data_from_task_ids(task_ids: list, context: Context): + upstream_xcoms_data = dict() + for tid in task_ids: + upstream_xcoms_data[tid] = context['ti'].xcom_pull(task_ids=tid) + return upstream_xcoms_data + + def _get_piece_kwargs_with_upstream_xcom(self, upstream_xcoms_data: dict): """ Update Operator kwargs with upstream tasks XCOM data diff --git a/domino/schemas/container_resources.py b/domino/schemas/container_resources.py index f609ee03..988728d4 100644 --- a/domino/schemas/container_resources.py +++ b/domino/schemas/container_resources.py @@ -6,4 +6,5 @@ class SystemRequirementsModel(BaseModel): class ContainerResourcesModel(BaseModel): requests: SystemRequirementsModel = Field(default=SystemRequirementsModel(cpu="100m", memory="128Mi")) - limits: 
SystemRequirementsModel = Field(default=SystemRequirementsModel(cpu="100m", memory="128Mi")) \ No newline at end of file + limits: SystemRequirementsModel = Field(default=SystemRequirementsModel(cpu="100m", memory="128Mi")) + use_gpu: bool = False \ No newline at end of file diff --git a/domino/task.py b/domino/task.py index f1046216..70bacf8f 100644 --- a/domino/task.py +++ b/domino/task.py @@ -3,17 +3,15 @@ from kubernetes.client import models as k8s from datetime import datetime from typing import Callable +import os from domino.custom_operators.docker_operator import DominoDockerOperator from domino.custom_operators.python_operator import PythonOperator from domino.custom_operators.k8s_operator import DominoKubernetesPodOperator from domino.custom_operators.worker_operator import DominoWorkerOperator from domino.schemas.shared_storage import shared_storage_map -from domino.utils import dict_deep_update from domino.logger import get_configured_logger from domino.schemas.shared_storage import StorageSource -from domino.schemas.container_resources import ContainerResourcesModel -import os class Task(object): @@ -60,8 +58,7 @@ def __init__( self.workflow_shared_storage = shared_storage_map[shared_storage_source_name] # Container resources - self.container_resources = container_resources if container_resources else {} - self.provide_gpu = self.container_resources.pop("use_gpu", False) + self.container_resources = container_resources # Get deploy mode self.deploy_mode = os.environ.get('DOMINO_DEPLOY_MODE') @@ -107,40 +104,27 @@ def _set_operator(self) -> BaseOperator: # - https://www.astronomer.io/guides/templating/ # - good example: https://github.com/apache/airflow/blob/main/tests/system/providers/cncf/kubernetes/example_kubernetes.py # - commands HAVE to go in a list object: https://stackoverflow.com/a/55149915/11483674 - - # Container resources - base_container_resources_model = ContainerResourcesModel( - requests={"cpu": "100m", "memory": "128Mi",}, - limits={"cpu": "100m", "memory": "128Mi"} - ) - basic_container_resources = base_container_resources_model.dict() - basic_container_resources = dict_deep_update(basic_container_resources, self.container_resources) - if self.provide_gpu: - basic_container_resources["limits"]["nvidia.com/gpu"] = "1" - container_resources_obj = k8s.V1ResourceRequirements(**basic_container_resources) - - pod_startup_timeout_in_seconds = 600 return DominoKubernetesPodOperator( dag_id=self.dag_id, task_id=self.task_id, piece_name=self.piece.get('name'), deploy_mode=self.deploy_mode, repository_id=self.repository_id, - piece_kwargs=self.piece_input_kwargs, + piece_input_kwargs=self.piece_input_kwargs, workflow_shared_storage=self.workflow_shared_storage, + container_resources=self.container_resources, # ----------------- Kubernetes ----------------- namespace='default', # TODO - separate namespace by User or Workspace? 
image=self.piece.get("source_image"), image_pull_policy='IfNotPresent', name=f"airflow-worker-pod-{self.task_id}", - startup_timeout_seconds=pod_startup_timeout_in_seconds, + startup_timeout_seconds=600, #cmds=["/bin/bash"], #arguments=["-c", "sleep 120;"], cmds=["domino"], arguments=["run-piece-k8s"], do_xcom_push=True, in_cluster=True, - container_resources=container_resources_obj, ) elif self.deploy_mode == 'local-compose': From 023d1ba4f92817a7457735ad836287b7e89a95f8 Mon Sep 17 00:00:00 2001 From: luiz Date: Wed, 19 Jul 2023 19:40:03 +0200 Subject: [PATCH 21/21] same postgres image --- docker-compose-dev.yaml | 4 ++-- domino/cli/utils/docker-compose.yaml | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docker-compose-dev.yaml b/docker-compose-dev.yaml index 82ef87d0..cd9a5f0c 100644 --- a/docker-compose-dev.yaml +++ b/docker-compose-dev.yaml @@ -327,7 +327,7 @@ services: # Domino Postgres domino_postgres: - image: postgres + image: postgres:13 container_name: domino-postgres environment: - POSTGRES_DB=postgres @@ -362,7 +362,7 @@ services: environment: - REACT_APP_DOMINO_DEPLOY_MODE=local-compose ports: - - "3000:3000" + - "3000:80" # volumes: # - ./frontend:/usr/src/app # Enable hot reload for frontend depends_on: diff --git a/domino/cli/utils/docker-compose.yaml b/domino/cli/utils/docker-compose.yaml index 51a1e3ad..fbf6bb1e 100644 --- a/domino/cli/utils/docker-compose.yaml +++ b/domino/cli/utils/docker-compose.yaml @@ -39,6 +39,7 @@ services: - "2376:2375" volumes: - /var/run/docker.sock:/var/run/docker.sock + postgres: image: postgres:13 environment: @@ -313,7 +314,7 @@ services: # Domino Postgres domino_postgres: - image: postgres + image: postgres:13 container_name: domino-postgres environment: - POSTGRES_DB=postgres
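A closing note on the container-resources flow that patch 20 moves from Task into DominoKubernetesPodOperator: defaults come from ContainerResourcesModel, the caller's dict is deep-merged on top, and the use_gpu flag is popped and expressed as an nvidia.com/gpu limit before k8s.V1ResourceRequirements is built. A runnable sketch of that merge under pydantic 1.x, with a minimal stand-in for dict_deep_update (the real helper lives in domino.utils):

    from typing import Optional

    from pydantic import BaseModel, Field  # pydantic 1.x, as pinned above


    class SystemRequirementsModel(BaseModel):
        cpu: str
        memory: str


    class ContainerResourcesModel(BaseModel):
        requests: SystemRequirementsModel = Field(default=SystemRequirementsModel(cpu="100m", memory="128Mi"))
        limits: SystemRequirementsModel = Field(default=SystemRequirementsModel(cpu="100m", memory="128Mi"))
        use_gpu: bool = False


    def dict_deep_update(base: dict, override: dict) -> dict:
        """Recursive dict merge; stand-in for domino.utils.dict_deep_update."""
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(base.get(key), dict):
                base[key] = dict_deep_update(base[key], value)
            else:
                base[key] = value
        return base


    def build_container_resources(user_resources: Optional[dict]) -> dict:
        defaults = ContainerResourcesModel().dict()
        merged = dict_deep_update(defaults, user_resources or {})
        # use_gpu is a Domino-level flag, not a Kubernetes field: pop it
        # and express it as an extended-resource limit instead.
        if merged.pop("use_gpu", False):
            merged["limits"]["nvidia.com/gpu"] = "1"
        return merged  # ready for k8s.V1ResourceRequirements(**merged)


    print(build_container_resources({"limits": {"memory": "1Gi"}, "use_gpu": True}))
    # {'requests': {'cpu': '100m', 'memory': '128Mi'},
    #  'limits': {'cpu': '100m', 'memory': '1Gi', 'nvidia.com/gpu': '1'}}

Popping use_gpu before constructing V1ResourceRequirements matters: the kubernetes client model does not accept arbitrary keys, so leaving an unknown keyword in the dict would raise a TypeError.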