Skip to content

Commit

Permalink
Merge pull request #410 from DalgoT4D/cleanup-002
Browse files Browse the repository at this point in the history
Cleanup 002
  • Loading branch information
Abhishek-N authored Jan 3, 2024
2 parents ecbdb77 + fb26102 commit 5443f9f
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 2,396 deletions.
762 changes: 60 additions & 702 deletions ddpui/api/airbyte_api.py

Large diffs are not rendered by default.

30 changes: 3 additions & 27 deletions ddpui/api/dbt_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from uuid import uuid4
from pathlib import Path
from redis import Redis
from uuid import uuid4

from ninja import NinjaAPI
from ninja.errors import HttpError
Expand All @@ -13,14 +13,13 @@
from ddpui import auth
from ddpui.ddpprefect.schema import OrgDbtSchema, OrgDbtGitHub, OrgDbtTarget
from ddpui.models.org_user import OrgUserResponse, OrgUser
from ddpui.models.org import OrgPrefectBlock, OrgPrefectBlockv1
from ddpui.ddpprefect import DBTCORE, DBTCLIPROFILE
from ddpui.models.org import OrgPrefectBlockv1
from ddpui.ddpprefect import DBTCLIPROFILE
from ddpui.utils.helpers import runcmd
from ddpui.utils.dbtdocs import create_single_html
from ddpui.celeryworkers.tasks import (
setup_dbtworkspace,
clone_github_repo,
update_dbt_core_block_schema_task,
)
from ddpui.ddpdbt import dbt_service
from ddpui.ddpprefect import prefect_service
Expand Down Expand Up @@ -97,29 +96,6 @@ def put_dbt_github(request, payload: OrgDbtGitHub):
return {"task_id": task.id}


@dbtapi.put("/schema/", auth=auth.CanManagePipelines())
def put_dbt_schema(request, payload: OrgDbtTarget):
"""Update the target_configs.schema for all dbt-core-op blocks"""
orguser: OrgUser = request.orguser
org = orguser.org
if org.dbt is None:
raise HttpError(400, "create a dbt workspace first")

org.dbt.default_schema = payload.target_configs_schema
org.dbt.save()
logger.info("updated orgdbt")

for dbtblock in OrgPrefectBlock.objects.filter(org=org, block_type=DBTCORE):
logger.info(
"updating schema of %s to %s",
dbtblock.block_name,
org.dbt.default_schema,
)
update_dbt_core_block_schema_task.delay(
dbtblock.block_name, org.dbt.default_schema
)


@dbtapi.delete("/workspace/", response=OrgUserResponse, auth=auth.CanManagePipelines())
def dbt_delete(request):
"""Delete the dbt workspace and project repo created"""
Expand Down
119 changes: 0 additions & 119 deletions ddpui/api/user_org_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
OrgSchema,
OrgWarehouse,
OrgWarehouseSchema,
OrgPrefectBlock,
OrgDataFlow,
)
from ddpui.models.org_user import (
AcceptInvitationSchema,
Expand All @@ -43,10 +41,7 @@
DeleteOrgUserPayload,
)
from ddpui.models.orgtnc import OrgTnC
from ddpui.ddpprefect import prefect_service
from ddpui.ddpairbyte import airbyte_service, airbytehelpers
from ddpui.ddpdbt import dbt_service
from ddpui.ddpprefect import AIRBYTECONNECTION
from ddpui.utils.custom_logger import CustomLogger
from ddpui.utils import secretsmanager
from ddpui.utils import sendgrid
Expand Down Expand Up @@ -330,46 +325,6 @@ def post_transfer_ownership(request, payload: OrgUserNewOwner):
return from_orguser(requestor_orguser)


@user_org_api.post("/organizations/", response=OrgSchema, auth=auth.AnyOrgUser())
def post_organization(request, payload: OrgSchema):
"""creates a new org & new orguser (if required) and attaches it to the requestor"""
userattributes = UserAttributes.objects.filter(user=request.orguser.user).first()
if userattributes is None or userattributes.can_create_orgs is False:
raise HttpError(403, "Insufficient permissions for this operation")

orguser: OrgUser = request.orguser
org = Org.objects.filter(name__iexact=payload.name).first()
if org:
raise HttpError(400, "client org with this name already exists")

org = Org(name=payload.name)
org.slug = slugify(org.name)[:20]
org.save()
logger.info(f"{orguser.user.email} created new org {org.name}")
try:
new_workspace = airbytehelpers.setup_airbyte_workspace(org.slug, org)
except Exception as error:
# delete the org or we won't be able to create it once airbyte comes back up
org.delete()
raise HttpError(400, "could not create airbyte workspace") from error

# create a new orguser if the org is already there
if orguser.org is None:
orguser.org = org
orguser.save()
else:
orguser = OrgUser.objects.create(
user=orguser.user,
role=OrgUserRole.ACCOUNT_MANAGER,
email_verified=True,
org=org,
)

return OrgSchema(
name=org.name, airbyte_workspace_id=new_workspace.workspaceId, slug=org.slug
)


@user_org_api.post("/organizations/warehouse/", auth=auth.CanManagePipelines())
def post_organization_warehouse(request, payload: OrgWarehouseSchema):
"""registers a data warehouse for the org"""
Expand Down Expand Up @@ -424,80 +379,6 @@ def post_organization_warehouse(request, payload: OrgWarehouseSchema):
return {"success": 1}


@user_org_api.delete("/organizations/warehouses/", auth=auth.CanManagePipelines())
def delete_organization_warehouses(request):
"""deletes all (references to) data warehouses for the org"""
orguser: OrgUser = request.orguser
if orguser.org is None:
raise HttpError(400, "create an organization first")

warehouse = OrgWarehouse.objects.filter(org=orguser.org).first()
if warehouse is None:
raise HttpError(400, "warehouse not created")

# delete prefect connection blocks
logger.info("Deleting prefect connection blocks")
for block in OrgPrefectBlock.objects.filter(
org=orguser.org, block_type=AIRBYTECONNECTION
):
try:
prefect_service.delete_airbyte_connection_block(block.block_id)
logger.info(f"delete connecion block id - {block.block_id}")
except Exception: # skipcq PYL-W0703
logger.error(
"failed to delete %s airbyte-connection-block %s in prefect, deleting from OrgPrefectBlock",
orguser.org.slug,
block.block_id,
)
block.delete()

logger.info("FINISHED Deleting prefect connection blocks")

# delete airbyte connections
logger.info("Deleting airbyte connections")
for connection in airbyte_service.get_connections(orguser.org.airbyte_workspace_id)[
"connections"
]:
connection_id = connection["connectionId"]
airbyte_service.delete_connection(
orguser.org.airbyte_workspace_id, connection_id
)
logger.info(f"deleted connection in Airbyte - {connection_id}")

logger.info("FINISHED Deleting airbyte connections")

# delete airbyte destinations
logger.info("Deleting airbyte destinations")
for destination in airbyte_service.get_destinations(
orguser.org.airbyte_workspace_id
)["destinations"]:
destination_id = destination["destinationId"]
airbyte_service.delete_destination(
orguser.org.airbyte_workspace_id, destination_id
)
logger.info(f"deleted destination in Airbyte - {destination_id}")

logger.info("FINISHED Deleting airbyte destinations")

# delete django warehouse row
logger.info("Deleting django warehouse and the credentials in secrets manager")
secretsmanager.delete_warehouse_credentials(warehouse)
warehouse.delete()

# delete dbt workspace and blocks
dbt_service.delete_dbt_workspace(orguser.org)

# delete dataflows
logger.info("Deleting data flows")
for data_flow in OrgDataFlow.objects.filter(org=orguser.org):
prefect_service.delete_deployment_by_id(data_flow.deployment_id)
data_flow.delete()
logger.info(f"Deleted deployment - {data_flow.deployment_id}")
logger.info("FINISHED Deleting data flows")

return {"success": 1}


@user_org_api.get("/organizations/warehouses", auth=auth.CanManagePipelines())
def get_organizations_warehouses(request):
"""returns all warehouses associated with this org"""
Expand Down
5 changes: 5 additions & 0 deletions ddpui/ddpairbyte/airbyte_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
"""
Airbyte service module
Functions which communicate with Airbyte
These functions do not access the Dalgo database
"""
import os
from typing import Dict, List
import requests
Expand Down
68 changes: 1 addition & 67 deletions ddpui/ddpairbyte/airbytehelpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from ddpui.ddpairbyte.schema import AirbyteWorkspace
from ddpui.ddpprefect import prefect_service
from ddpui.ddpprefect import AIRBYTESERVER
from ddpui.models.org import OrgPrefectBlock, OrgPrefectBlockv1
from ddpui.models.org import OrgPrefectBlockv1
from ddpui.utils.custom_logger import CustomLogger

logger = CustomLogger("airbyte")
Expand Down Expand Up @@ -82,72 +82,6 @@ def upgrade_custom_sources(workspace_id: str) -> None:
)


def setup_airbyte_workspace(wsname, org) -> AirbyteWorkspace:
"""creates an airbyte workspace and attaches it to the org
also creates an airbyte server block in prefect if there isn't one already
we don't need to update any existing server block because it does not hold
the workspace id... only the connection details of the airbyte server
"""
workspace = airbyte_service.create_workspace(wsname)

org.airbyte_workspace_id = workspace["workspaceId"]
org.save()

try:
for custom_source_info in settings.AIRBYTE_CUSTOM_SOURCES.values():
add_custom_airbyte_connector(
workspace["workspaceId"],
custom_source_info["name"],
custom_source_info["docker_repository"],
custom_source_info["docker_image_tag"],
custom_source_info["documentation_url"],
)
except Exception as error:
logger.error("Error creating custom source definitions: %s", str(error))
raise error

# Airbyte server block details. prefect doesn't know the workspace id
block_name = f"{org.slug}-{slugify(AIRBYTESERVER)}"
display_name = wsname

airbyte_server_block_cleaned_name = block_name
try:
airbyte_server_block_id = prefect_service.get_airbyte_server_block_id(
block_name
)
except Exception as exc:
raise Exception("could not connect to prefect-proxy") from exc

if airbyte_server_block_id is None:
(
airbyte_server_block_id,
airbyte_server_block_cleaned_name,
) = prefect_service.create_airbyte_server_block(block_name)
logger.info(f"Created Airbyte server block with ID {airbyte_server_block_id}")

if not OrgPrefectBlock.objects.filter(org=org, block_type=AIRBYTESERVER).exists():
org_airbyte_server_block = OrgPrefectBlock(
org=org,
block_type=AIRBYTESERVER,
block_id=airbyte_server_block_id,
block_name=airbyte_server_block_cleaned_name,
display_name=display_name,
)
try:
org_airbyte_server_block.save()
except Exception as error:
prefect_service.delete_airbyte_server_block(airbyte_server_block_id)
raise Exception(
"could not create orgprefectblock for airbyte-server"
) from error

return AirbyteWorkspace(
name=workspace["name"],
workspaceId=workspace["workspaceId"],
initialSetupComplete=workspace["initialSetupComplete"],
)


def setup_airbyte_workspace_v1(wsname, org) -> AirbyteWorkspace:
"""creates an airbyte workspace and attaches it to the org
also creates an airbyte server block in prefect if there isn't one already
Expand Down
2 changes: 1 addition & 1 deletion ddpui/ddpprefect/prefect_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def create_dbt_cli_profile_block(

def update_dbt_cli_profile_block(
block_name: str,
wtype: str,
wtype: str = None,
profilename: str = None,
target: str = None,
credentials: dict = None,
Expand Down
Loading

0 comments on commit 5443f9f

Please sign in to comment.