Skip to content

Commit

Permalink
Merge pull request #54 from Tauffer-Consulting/airflow-operators-wrapper
Browse files Browse the repository at this point in the history
Re-organize Task and Operators
  • Loading branch information
luiztauffer authored Jul 19, 2023
2 parents a405192 + 023d1ba commit f36f5cb
Show file tree
Hide file tree
Showing 19 changed files with 493 additions and 406 deletions.
2 changes: 1 addition & 1 deletion Dockerfile-airflow-domino-base-dev
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM apache/airflow:2.5.3-python3.9
FROM apache/airflow:2.6.3-python3.9

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile-airflow-domino-base-prod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM apache/airflow:2.5.3-python3.9
FROM apache/airflow:2.6.3-python3.9

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile-airflow-domino-compose-worker
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
# create compiled_metadata for all pieces

# provisory dockerfile
FROM apache/airflow:2.5.3-python3.9
FROM apache/airflow:2.6.3-python3.9

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile-airflow-domino-pod-dev
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.8-slim
FROM python:3.9-slim

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
Expand Down
21 changes: 11 additions & 10 deletions docker-compose-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
version: '3.8'
x-airflow-common:
&airflow-common
image: apache/airflow:2.5.3-python3.9
image: apache/airflow:2.6.3-python3.9
# build: .
environment:
&airflow-common-env
Expand Down Expand Up @@ -295,6 +295,7 @@ services:

# Domino REST Api
domino_rest:
# image: ghcr.io/tauffer-consulting/domino-rest:latest
build:
context: ./rest
dockerfile: Dockerfile
Expand Down Expand Up @@ -326,7 +327,7 @@ services:

# Domino Postgres
domino_postgres:
image: postgres
image: postgres:13
container_name: domino-postgres
environment:
- POSTGRES_DB=postgres
Expand All @@ -352,18 +353,18 @@ services:

# Domino Frontend
domino_frontend:
# image: taufferconsulting/domino-frontend:latest
build:
context: ./frontend
dockerfile: Dockerfile
image: ghcr.io/tauffer-consulting/domino-frontend:compose
# build:
# context: ./frontend
# dockerfile: Dockerfile
container_name: domino-frontend
command: yarn start:local
# command: yarn start:local
environment:
- REACT_APP_DOMINO_DEPLOY_MODE=local-compose
ports:
- "3000:3000"
volumes:
- ./frontend:/usr/src/app # Enable hot reload for frontend
- "3000:80"
# volumes:
# - ./frontend:/usr/src/app # Enable hot reload for frontend
depends_on:
domino_rest:
condition: service_started
Expand Down
4 changes: 2 additions & 2 deletions domino/base_piece.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ def run_piece_function(
self.results_path = f"{self.workflow_shared_storage}/{self.task_id}/results"
self.xcom_path = f"{self.workflow_shared_storage}/{self.task_id}/xcom"
self.report_path = f"{self.workflow_shared_storage}/{self.task_id}/report"
shared_storage_source = os.environ.get('DOMINO_WORKFLOW_SHARED_STORAGE', None)
if not shared_storage_source or shared_storage_source == "none" or self.deploy_mode == "local-compose":
shared_storage_source_name = os.environ.get('DOMINO_WORKFLOW_SHARED_STORAGE_SOURCE_NAME', None)
if not shared_storage_source_name or shared_storage_source_name == "none" or self.deploy_mode == "local-compose":
self.generate_paths()
else:
self._wait_for_sidecar_paths()
Expand Down
5 changes: 3 additions & 2 deletions domino/cli/utils/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
version: '3.8'
x-airflow-common:
&airflow-common
image: apache/airflow:2.5.3-python3.9
image: apache/airflow:2.6.3-python3.9
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
Expand Down Expand Up @@ -39,6 +39,7 @@ services:
- "2376:2375"
volumes:
- /var/run/docker.sock:/var/run/docker.sock

postgres:
image: postgres:13
environment:
Expand Down Expand Up @@ -313,7 +314,7 @@ services:

# Domino Postgres
domino_postgres:
image: postgres
image: postgres:13
container_name: domino-postgres
environment:
- POSTGRES_DB=postgres
Expand Down
94 changes: 0 additions & 94 deletions domino/client/airflow_client.py

This file was deleted.

28 changes: 28 additions & 0 deletions domino/client/domino_backend_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,31 @@ def get_piece_repositories(self, workspace_id: int, filters: dict):
params=filters
)
return response

def check_create_airflow_connection(self, conn_id: str, conn_type: str):
    """
    Ensure an Airflow connection with the given id exists, creating it when missing.

    A GET probe is issued first; a 404 response means the connection does not
    exist yet, in which case it is created via POST.
    ref: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/post_connection
    """
    # TODO - this is WIP
    resource = f"/airflow/connections/{conn_id}"
    # Probe for an existing connection; only a 404 triggers creation.
    response = self.request(method='get', resource=resource)
    if response.status_code == 404:
        payload = {
            "conn_id": conn_id,
            "conn_type": conn_type,
            "login": "",  # TODO: add login and password
            "password": "",
            "extra": {
                # Specify extra parameters here, e.g. "region_name": "eu-central-1"
            },
        }
        response = self.request(method='post', resource=resource, json=payload)
    # At this point `response` is either the successful GET or the POST result.
    if response.status_code != 200:
        raise Exception(f"Failed to create Airflow connection {conn_id}. \n {response.json()}")
53 changes: 53 additions & 0 deletions domino/custom_operators/base_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import Dict, Optional
from airflow.utils.context import Context

from domino.client.domino_backend_client import DominoBackendRestClient
from domino.schemas.shared_storage import WorkflowSharedStorage


class BaseDominoOperator:
    """
    This class implements common operations for all Domino Operators running under a Task.
    DEPRECATED - delete this later
    """

    def __init__(
        self,
        task_id: str,
        piece_name: str,
        deploy_mode: str,
        repository_id: int,
        piece_input_kwargs: Optional[Dict] = None,
        workflow_shared_storage: Optional[WorkflowSharedStorage] = None,
        domino_client_url: Optional[str] = None,
    ):
        """
        Args:
            task_id: Identifier of the task this operator runs under.
            piece_name: Name of the Piece to execute.
            deploy_mode: Deployment mode string (e.g. "local-compose").
            repository_id: Id of the Piece repository in the Domino backend.
            piece_input_kwargs: Input arguments forwarded to the Piece, if any.
            workflow_shared_storage: Shared-storage configuration for the
                workflow, or None when no shared storage is used.
            domino_client_url: Base URL of the Domino backend REST API;
                defaults to the in-cluster service address.
        """
        self.task_id = task_id
        self.piece_name = piece_name
        self.deploy_mode = deploy_mode
        self.repository_id = repository_id
        self.piece_input_kwargs = piece_input_kwargs
        self.workflow_shared_storage = workflow_shared_storage
        if domino_client_url is None:
            domino_client_url = "http://domino-rest:8000/"
        self.domino_client = DominoBackendRestClient(base_url=domino_client_url)

    def _get_piece_secrets(self, piece_repository_id: int, piece_name: str) -> Dict:
        """Get piece secrets values from Domino API.

        Returns:
            Mapping of secret name to secret value.

        Raises:
            Exception: if the backend responds with a non-200 status.
        """
        secrets_response = self.domino_client.get_piece_secrets(
            piece_repository_id=piece_repository_id,
            piece_name=piece_name
        )
        if secrets_response.status_code != 200:
            raise Exception(f"Error getting piece secrets: {secrets_response.json()}")
        return {
            e.get('name'): e.get('value')
            for e in secrets_response.json()
        }

    @staticmethod
    def _get_upstream_xcom_data_from_task_ids(task_ids: list, context: Context) -> Dict:
        """Pull XCom data for each upstream task id, keyed by task id."""
        return {tid: context['ti'].xcom_pull(task_ids=tid) for tid in task_ids}
Loading

0 comments on commit f36f5cb

Please sign in to comment.