Task lock logic #394

Merged · 14 commits · Dec 20, 2023
6 changes: 6 additions & 0 deletions Plan.md
@@ -121,6 +121,12 @@ Plan to go away from the prefect dbt core blocks & connection blocks
| Delete a pipeline `DELETE /api/prefect/flows/{deployment_id}` | Delete a pipeline `DELETE /api/prefect/v1/flows/{deployment_id}` |
| Proxy: update a deployment `PUT /api/proxy/deployments/{deployment_id}` | Proxy: update a deployment `PUT /api/proxy/v1/deployments/{deployment_id}` |

#### <u>Task lock logic</u>

- New function `lock_tasks_for_deployment` in the prefect service to handle the locking logic (a sketch of the assumed `TaskLock` model follows this list)

- New API for the Prefect custom webhook, `POST /webhooks/v1/notification/`. This needs to be set up in the Prefect UI.
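
For reference, the diffs below read and write a `TaskLock` model whose definition is not part of this change. A minimal sketch of its assumed shape, inferred from how the lock is used across this PR (the real model lives in `ddpui/models/tasks.py` and may differ):

```python
# Sketch of the assumed TaskLock model, inferred from its usage in this PR;
# the real definition in ddpui/models/tasks.py may differ.
from django.db import models

from ddpui.models.org_user import OrgUser


class TaskLock(models.Model):
    """one lock row per OrgTask with a run in flight; deleted when it ends"""

    # string reference, since OrgTask lives in the same models module
    orgtask = models.ForeignKey("OrgTask", on_delete=models.CASCADE)
    locked_by = models.ForeignKey(OrgUser, on_delete=models.CASCADE)
    locked_at = models.DateTimeField(auto_now_add=True)
    # set once the deployment flow-run starts, so the webhook can release
    # all locks for that flow-run when it reaches a terminal state
    flow_run_id = models.CharField(max_length=36, blank=True, default="")
```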

## Data Migration

#### <u>Phase 1 - Airbyte server & connections</u>
53 changes: 43 additions & 10 deletions ddpui/api/client/airbyte_api.py
@@ -48,7 +48,7 @@
OrgDataFlowv1,
)
from ddpui.models.orgjobs import DataflowBlock, BlockLock
from ddpui.models.tasks import Task, DataflowOrgTask, OrgTask
from ddpui.models.tasks import Task, DataflowOrgTask, OrgTask, TaskLock
from ddpui.models.org_user import OrgUser
from ddpui.ddpairbyte import airbytehelpers
from ddpui.utils.custom_logger import CustomLogger
@@ -1171,8 +1171,6 @@ def post_airbyte_connection_v1(request, payload: AirbyteConnectionCreate):
"status": airbyte_conn["status"],
"deploymentId": dataflow["deployment"]["id"],
"normalize": payload.normalize,
"lastRun": None, # TODO
"lock": None, # TODO
}
logger.debug(res)
return res
@@ -1199,7 +1197,30 @@ def get_airbyte_connections_v1(request):
orguser.org.airbyte_workspace_id, org_task.connection_id
)

sync_dataflow = DataflowOrgTask.objects.filter(orgtask=org_task).first()
# a single connection will have a manual deployment and (usually) a pipeline
# we want to show the last sync, from whichever
last_runs = []
for df_orgtask in DataflowOrgTask.objects.filter(
orgtask=org_task,
):
run = prefect_service.get_last_flow_run_by_deployment_id(
df_orgtask.dataflow.deployment_id
)
if run:
last_runs.append(run)

last_runs.sort(
key=lambda run: run["startTime"]
if run["startTime"]
else run["expectedStartTime"]
)

sync_dataflow = DataflowOrgTask.objects.filter(
orgtask=org_task, dataflow__dataflow_type="manual"
).first()

# is the task currently locked?
lock = TaskLock.objects.filter(orgtask=org_task).first()

res.append(
{
@@ -1213,12 +1234,16 @@
"deploymentId": sync_dataflow.dataflow.deployment_id
if sync_dataflow
else None,
"lastRun": None, # TODO
"lock": None, # TODO
"lastRun": last_runs[-1] if len(last_runs) > 0 else None,
"lock": {
"lockedBy": lock.locked_by.user.email,
"lockedAt": lock.locked_at,
}
if lock
else None,
}
)

# TODO: lock task logic
logger.info(res)

# by default normalization is going as False here because we don't do anything with it
@@ -1249,12 +1274,15 @@ def get_airbyte_connection_v1(request, connection_id):
orguser.org.airbyte_workspace_id, org_task.connection_id
)

dataflow_orgtask = DataflowOrgTask.objects.filter(orgtask=org_task).first()
dataflow_orgtask = DataflowOrgTask.objects.filter(
orgtask=org_task, dataflow__dataflow_type="manual"
).first()

if dataflow_orgtask is None:
raise HttpError(422, "deployment not found")

# TODO: task lock logic
# check if the task is locked or not
lock = TaskLock.objects.filter(orgtask=org_task).first()

# fetch the source and destination names
# the web_backend/connections/get fetches the source & destination objects also so we don't need to query again
@@ -1281,7 +1309,12 @@
)
if "operationIds" in airbyte_conn and len(airbyte_conn["operationIds"]) == 1
else False,
"lock": None, # TODO
"lock": {
"lockedBy": lock.locked_by.user.email,
"lockedAt": lock.locked_at,
}
if lock
else None,
}

logger.debug(res)
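Both connection endpoints above now serialize `lastRun` and `lock` the same way. A representative fragment of a connection response, with every value invented for illustration (only the keys the new code actually reads or builds are shown):

```python
# Illustration only: the shape of the new "lastRun" and "lock" fields in a
# connection response. All values below are made up.
example_connection_fragment = {
    "deploymentId": "8a9b0c1d-2e3f-4a5b-8c7d-9e0f1a2b3c4d",  # invented id
    "lastRun": {
        # the keys used by the sort in get_airbyte_connections_v1
        "startTime": "2023-12-20T10:15:00+00:00",
        "expectedStartTime": "2023-12-20T10:14:55+00:00",
    },
    "lock": {
        "lockedBy": "analyst@example.org",  # TaskLock.locked_by.user.email
        "lockedAt": "2023-12-20T10:15:01+00:00",  # TaskLock.locked_at
    },
}
```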
48 changes: 41 additions & 7 deletions ddpui/api/client/prefect_api.py
@@ -32,7 +32,7 @@
)
from ddpui.models.orgjobs import BlockLock, DataflowBlock
from ddpui.models.org_user import OrgUser
from ddpui.models.tasks import Task, DataflowOrgTask, OrgTask
from ddpui.models.tasks import Task, DataflowOrgTask, OrgTask, TaskLock
from ddpui.ddpprefect.schema import (
PrefectAirbyteSync,
PrefectDbtCore,
@@ -845,7 +845,15 @@ def post_run_prefect_org_task(request, orgtask_id): # pylint: disable=unused-argument
dbtrepodir = Path(os.getenv("CLIENTDBT_ROOT")) / orguser.org.slug / "dbtrepo"
project_dir = str(dbtrepodir)

# TODO: add task lock logic
# check if the task is locked
task_lock = TaskLock.objects.filter(orgtask=org_task).first()
if task_lock:
raise HttpError(
400, f"{task_lock.locked_by.user.email} is running this operation"
)

# lock the task
task_lock = TaskLock.objects.create(orgtask=org_task, locked_by=orguser)

if org_task.task.slug == TASK_GITPULL:
shell_env = {"secret-git-pull-url-block": ""}
@@ -874,6 +882,7 @@
try:
result = prefect_service.run_shell_task_sync(payload)
except Exception as error:
task_lock.delete()
logger.exception(error)
raise HttpError(
400, f"failed to run the shell task {org_task.task.slug}"
@@ -915,9 +924,14 @@ def post_run_prefect_org_task(request, orgtask_id): # pylint: disable=unused-argument
try:
result = prefect_service.run_dbt_task_sync(payload)
except Exception as error:
task_lock.delete()
logger.exception(error)
raise HttpError(400, "failed to run dbt") from error

# release the lock
task_lock.delete()
logger.info("released lock on task %s", org_task.task.slug)

return result


@@ -941,13 +955,21 @@ def post_run_prefect_org_deployment_task(request, deployment_id):
if dataflow_orgtask is None:
raise HttpError(400, "no org task mapped to the deployment")

# TODO: add task lock logic
locks = prefect_service.lock_tasks_for_deployment(deployment_id, orguser)

try:
res = prefect_service.create_deployment_flow_run(deployment_id)
except Exception as error:
for task_lock in locks:
logger.info("deleting TaskLock %s", task_lock.orgtask.task.slug)
task_lock.delete()
logger.exception(error)
raise HttpError(400, "failed to start a run") from error

for tasklock in locks:
tasklock.flow_run_id = res["flow_run_id"]
tasklock.save()

return res


@@ -962,6 +984,7 @@ def post_prefect_transformation_tasks(request):
- dbt clean
- dbt run
- dbt test
- dbt docs generate
"""
orguser: OrgUser = request.orguser
if orguser.org.dbt is None:
@@ -1037,7 +1060,7 @@ def post_prefect_transformation_tasks(request):

# create a dbt cli profile block
try:
cli_block_name = f"{orguser.org.slug}_{profile_name}"
cli_block_name = f"{orguser.org.slug}-{profile_name}"
cli_block_response = prefect_service.create_dbt_cli_profile_block(
cli_block_name,
profile_name,
@@ -1135,13 +1158,21 @@ def get_prefect_transformation_tasks(request):
.order_by("task__id")
.all()
):
# TODO: add task locking logic here later
# check if task is locked
lock = TaskLock.objects.filter(orgtask=org_task).first()

org_tasks.append(
{
"label": org_task.task.label,
"slug": org_task.task.slug,
"id": org_task.id,
"deploymentId": None,
"lock": {
"lockedBy": lock.locked_by.user.email,
"lockedAt": lock.locked_at,
}
if lock
else None,
}
)

@@ -1399,9 +1430,12 @@ def get_prefect_dataflows_v1(request):
# lock = BlockLock.objects.filter(
# opb__block_id__in=[x["opb__block_id"] for x in block_ids]
# ).first()
org_task_ids = DataflowOrgTask.objects.filter(dataflow=flow).values_list(
"id", flat=True
)

lock = TaskLock.objects.filter(orgtask_id__in=org_task_ids).first()

# TODO: task lock logic
lock = None
res.append(
{
"name": flow.name,
61 changes: 61 additions & 0 deletions ddpui/api/client/webhook_api.py
@@ -6,6 +6,7 @@
from ddpui.utils.custom_logger import CustomLogger
from ddpui.api.client.prefect_api import prefect_service
from ddpui.models.orgjobs import BlockLock
from ddpui.models.tasks import TaskLock
from ddpui.models.org_user import OrgUser
from ddpui.utils.webhook_helpers import (
get_message_type,
@@ -81,6 +82,66 @@ def post_notification(request): # pylint: disable=unused-argument
return {"status": "ok"}


@webhookapi.post("/v1/notification/")
def post_notification_v1(request): # pylint: disable=unused-argument
"""webhook endpoint for notifications"""
if request.headers.get("X-Notification-Key") != os.getenv(
"PREFECT_NOTIFICATIONS_WEBHOOK_KEY"
):
raise HttpError(400, "unauthorized")
notification = json.loads(request.body)
logger.info(notification)
message = notification["body"]
logger.info(message)

message_object = None
try:
message_object = json.loads(message)
except ValueError:
# not json, oh well
pass
if message_object is None and isinstance(message, dict):
message_object = message

    flow_run_id, state = None, None
if message_object:
message_type = get_message_type(message_object)
if message_type == FLOW_RUN:
flow_run_id = message_object["id"]

else:
# 'Flow run {flow_run_name} with id {flow_run_id} entered state {flow_run_state_name}'
flow_run_id, state = get_flowrun_id_and_state(message)

if flow_run_id:
logger.info("found flow-run id %s, state %s", flow_run_id, state)

if state in ["Cancelled", "Completed", "Failed", "Crashed"]:
logger.info("deleting the task locks")
TaskLock.objects.filter(flow_run_id=flow_run_id).delete()

elif state in ["Pending"]:
flow_run = prefect_service.get_flow_run(flow_run_id)
deployment_id = flow_run["deployment_id"]
system_user = OrgUser.objects.filter(user__email="System User").first()
try:
prefect_service.lock_tasks_for_deployment(deployment_id, system_user)
except HttpError:
# silently ignore
logger.info(
"failed to lock blocks for deployment %s, ignoring", deployment_id
)

# logger.info(flow_run)
if state in ["Failed", "Crashed"]:
flow_run = prefect_service.get_flow_run(flow_run_id)
org = get_org_from_flow_run(flow_run)
if org:
email_flowrun_logs_to_orgusers(org, flow_run_id)

return {"status": "ok"}


# setting up the notification and customizing the message format
#
# 1. create the custom-webhook notification. not the notification block! just the notification,
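`post_notification_v1` leans on `get_flowrun_id_and_state` from `ddpui/utils/webhook_helpers.py`, which is not part of this diff. A minimal sketch of what it presumably does, assuming the default Prefect message template quoted in the handler above:

```python
# Sketch only: a regex reading of get_flowrun_id_and_state, assuming the
# default Prefect template quoted in post_notification_v1. The real helper
# in ddpui/utils/webhook_helpers.py may differ.
import re

FLOW_RUN_MESSAGE = re.compile(
    r"Flow run (?P<name>\S+) with id (?P<id>\S+) entered state (?P<state>\S+)"
)


def get_flowrun_id_and_state(message: str):
    """extract (flow_run_id, state) from the notification text, else (None, None)"""
    match = FLOW_RUN_MESSAGE.search(message)
    if match is None:
        return None, None
    return match.group("id"), match.group("state")
```

Under this reading, a notification whose body is `Flow run my-run with id 1234 entered state Completed` deletes every `TaskLock` whose `flow_run_id` is `1234`.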
30 changes: 30 additions & 0 deletions ddpui/ddpprefect/prefect_service.py
@@ -19,6 +19,7 @@
PrefectDataFlowUpdateSchema3,
)
from ddpui.utils.custom_logger import CustomLogger
from ddpui.models.tasks import DataflowOrgTask, TaskLock
from ddpui.models.orgjobs import BlockLock, DataflowBlock
from ddpui.models.org_user import OrgUser
from ddpui.models.flow_runs import PrefectFlowRun
@@ -606,3 +607,32 @@ def lock_blocks_for_deployment(deployment_id: str, orguser: OrgUser):
400, "Someone else is trying to run this pipeline... try again"
) from error
return locks


def lock_tasks_for_deployment(deployment_id: str, orguser: OrgUser):
"""locks all orgtasks for a deployment"""
dataflow_orgtasks = DataflowOrgTask.objects.filter(
dataflow__deployment_id=deployment_id
).all()

orgtask_ids = [df_orgtask.orgtask.id for df_orgtask in dataflow_orgtasks]
lock = TaskLock.objects.filter(orgtask_id__in=orgtask_ids).first()
if lock:
logger.info(f"{lock.locked_by.user.email} is running this pipeline right now")
raise HttpError(
400, f"{lock.locked_by.user.email} is running this pipeline right now"
)

locks = []
try:
with transaction.atomic():
for df_orgtask in dataflow_orgtasks:
task_lock = TaskLock.objects.create(
orgtask=df_orgtask.orgtask, locked_by=orguser
)
locks.append(task_lock)
except Exception as error:
raise HttpError(
400, "Someone else is trying to run this pipeline... try again"
) from error
return locks
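
Putting the pieces together, the lock lifecycle spans `prefect_api.py`, this service, and the webhook. A condensed sketch of calls that already appear in the diff, with error handling omitted:

```python
# Condensed sketch of the lock lifecycle wired up in this PR; these calls
# all appear in the diff, error handling omitted.
locks = prefect_service.lock_tasks_for_deployment(deployment_id, orguser)

res = prefect_service.create_deployment_flow_run(deployment_id)
for task_lock in locks:
    task_lock.flow_run_id = res["flow_run_id"]  # lets the webhook find the locks
    task_lock.save()

# ...later, when Prefect posts a terminal state to /webhooks/v1/notification/:
TaskLock.objects.filter(flow_run_id=flow_run_id).delete()  # locks released
```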