Skip to content

Commit

Permalink
minor fixes in the put dataflow api
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishankoradia committed Dec 22, 2023
1 parent 203df30 commit c1aab58
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Plan to go away from the prefect dbt core blocks & connection blocks
| Before | After |
| ------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| Django: apis to run dbt core operations & git pull operations on transform can be deprecated `/api/prefect/flows/dbt_run/` & `/api/dbt/git_pull/` | Django : api to run tasks (dbt + shell operation(git pull)) `/api/prefect/tasks/{orgtask_id}/run/` <br /> Proxy : new api to run tasks (dbt tasks) via updated flows `/proxy/v1/flows/dbtcore/run/` <br /> Proxy : new api to run shell operations (a general shell op; will be used for git pull for now) via updated flows `/proxy/flows/shell/run/` |
| Django: api to run long tasks via deployment `/api/prefect/flows/{deployment_id}/flow_run` | Django: new api to run long running tasks via deployment `/api/prefect/v1/flows/{deployment_id}/flow_run` |
| Django: api to run long tasks via deployment `/api/prefect/flows/{deployment_id}/flow_run` | Django: new api to run long running tasks via deployment `/api/prefect/v1/flows/{deployment_id}/flow_run/` |

#### <u>Pipeline/orchestrate page</u>

Expand Down
20 changes: 13 additions & 7 deletions ddpui/api/client/prefect_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,7 @@ def put_prefect_dataflow_v1(
seq = 0 # global sequence for all tasks
tasks = []
map_org_tasks = [] # map org tasks to dataflow
delete_dataflow_orgtask_type = []

Check warning on line 1567 in ddpui/api/client/prefect_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/client/prefect_api.py#L1567

Added line #L1567 was not covered by tests

# check if pipeline has airbyte syncs
if len(payload.connections) > 0:
Expand All @@ -1574,9 +1575,7 @@ def put_prefect_dataflow_v1(
raise HttpError(400, "airbyte server block not found")

# delete all airbyte sync DataflowOrgTask
DataflowOrgTask.objects.filter(
dataflow=org_data_flow, orgtask__task__type="airbyte"
).delete()
delete_dataflow_orgtask_type.append("airbyte")

Check warning on line 1578 in ddpui/api/client/prefect_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/client/prefect_api.py#L1578

Added line #L1578 was not covered by tests

# push sync tasks to pipeline
payload.connections.sort(key=lambda conn: conn.seq)
Expand Down Expand Up @@ -1631,12 +1630,13 @@ def put_prefect_dataflow_v1(
raise HttpError(400, "dbt cli profile not found")

# delete all transform related DataflowOrgTask
DataflowOrgTask.objects.filter(
dataflow=org_data_flow, orgtask__task__type__in=["dbt", "git"]
).delete()
delete_dataflow_orgtask_type.append("dbt")
delete_dataflow_orgtask_type.append("git")

Check warning on line 1634 in ddpui/api/client/prefect_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/client/prefect_api.py#L1633-L1634

Added lines #L1633 - L1634 were not covered by tests

# push dbt pipeline tasks
for org_task in OrgTask.objects.filter(task__type__in=["dbt", "git"]).all():
for org_task in OrgTask.objects.filter(

Check warning on line 1637 in ddpui/api/client/prefect_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/client/prefect_api.py#L1637

Added line #L1637 was not covered by tests
org=orguser.org, task__type__in=["dbt", "git"]
).all():
logger.info(
f"found transform task {org_task.task.slug}; pushing to pipeline"
)
Expand Down Expand Up @@ -1693,6 +1693,12 @@ def put_prefect_dataflow_v1(
logger.exception(error)
raise HttpError(400, "failed to update a pipeline") from error

# Delete mapping
DataflowOrgTask.objects.filter(

Check warning on line 1697 in ddpui/api/client/prefect_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/client/prefect_api.py#L1697

Added line #L1697 was not covered by tests
dataflow=org_data_flow, orgtask__task__type__in=delete_dataflow_orgtask_type
).delete()

# create mapping
for org_task in map_org_tasks:
DataflowOrgTask.objects.create(dataflow=org_data_flow, orgtask=org_task)

Expand Down
2 changes: 1 addition & 1 deletion ddpui/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
TASK_DBTCLEAN = "dbt-clean"
TASK_DBTDEPS = "dbt-deps"
TASK_GITPULL = "git-pull"
TASK_DOCSGENERATE = "docs-generate"
TASK_DOCSGENERATE = "dbt-docs-generate"
TASK_AIRBYTESYNC = "airbyte-sync"


Expand Down

0 comments on commit c1aab58

Please sign in to comment.