Skip to content

Commit

Permalink
Merge pull request #404 from DalgoT4D/updating-warehouse-in-cli-profile
Browse files Browse the repository at this point in the history
Updating warehouse in cli profile
  • Loading branch information
fatchat authored Dec 31, 2023
2 parents 03440a1 + a801840 commit e329dc7
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 7 deletions.
7 changes: 7 additions & 0 deletions Plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ Plan to go away from the prefect dbt core blocks & connection blocks
| Django: create airbyte workspace `POST /api/airbyte/workspace/` | Django : new api to create airbyte workspace `POST /api/airbyte/v1/workspace/` |
| Django: create organization `POST /api/organizations/` | Django : new api to create org `POST /api/v1/organizations/` |

#### <u>Warehouse and cli profile</u>

| Before | After |
| --------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------- |
| Django: put airbyte destination `PUT /api/airbyte/destinations/{destination_id}/` | Django : new api to create airbyte destination `POST /api/airbyte/v1/destinations/{destination_id}/` |
| | Proxy: update cli profile block `PUT /proxy/blocks/dbtcli/profile/` |

#### <u>Airbyte connections</u>

| Before | After |
Expand Down
82 changes: 77 additions & 5 deletions ddpui/api/client/airbyte_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@
PrefectDataFlowCreateSchema3,
)

from ddpui.ddpprefect import (
AIRBYTESERVER,
AIRBYTECONNECTION,
DBTCORE,
)
from ddpui.ddpprefect import AIRBYTESERVER, AIRBYTECONNECTION, DBTCORE, DBTCLIPROFILE
from ddpui.ddpprefect import prefect_service
from ddpui.models.org import (
OrgPrefectBlock,
Expand Down Expand Up @@ -1472,3 +1468,79 @@ def get_latest_job_for_connection(request, connection_id):
logs = airbyte_service.get_logs_for_job(job_info["job_id"])
job_info["logs"] = logs["logs"]["logLines"]
return job_info


@airbyteapi.put("/v1/destinations/{destination_id}/", auth=auth.CanManagePipelines())
def put_airbyte_destination_v1(
request, destination_id: str, payload: AirbyteDestinationUpdate
):
"""Update an airbyte destination in the user organization workspace"""
orguser: OrgUser = request.orguser
if orguser.org is None:
raise HttpError(400, "create an organization first")
if orguser.org.airbyte_workspace_id is None:
raise HttpError(400, "create an airbyte workspace first")

destination = airbyte_service.update_destination(
destination_id, payload.name, payload.config, payload.destinationDefId
)
logger.info("updated destination having id " + destination["destinationId"])
warehouse = OrgWarehouse.objects.filter(org=orguser.org).first()

if warehouse.name != payload.name:
warehouse.name = payload.name
warehouse.save()

dbt_credentials = secretsmanager.retrieve_warehouse_credentials(warehouse)

if warehouse.wtype == "postgres":
aliases = {
"dbname": "database",
}
for config_key in ["host", "port", "username", "password", "database"]:
if (
config_key in payload.config
and isinstance(payload.config[config_key], str)
and len(payload.config[config_key]) > 0
and list(set(payload.config[config_key]))[0] != "*"
):
dbt_credentials[aliases.get(config_key, config_key)] = payload.config[
config_key
]

elif warehouse.wtype == "bigquery":
dbt_credentials = json.loads(payload.config["credentials_json"])
elif warehouse.wtype == "snowflake":
if (
"credentials" in payload.config
and "password" in payload.config["credentials"]
and isinstance(payload.config["credentials"]["password"], str)
and len(payload.config["credentials"]["password"]) > 0
and list(set(payload.config["credentials"]["password"])) != "*"
):
dbt_credentials["credentials"]["password"] = payload.config["credentials"][
"password"
]

else:
raise HttpError(400, "unknown warehouse type " + warehouse.wtype)

secretsmanager.update_warehouse_credentials(warehouse, dbt_credentials)

cli_profile_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLIPROFILE
).first()

if cli_profile_block:
logger.info(f"Updating the cli profile block : {cli_profile_block.block_name}")
prefect_service.update_dbt_cli_profile_block(
block_name=cli_profile_block.block_name,
wtype=warehouse.wtype,
credentials=dbt_credentials,
bqlocation=payload.config["dataset_location"]
if "dataset_location" in payload.config
else None,
)
logger.info(f"Successfully updated the cli profile block : {cli_profile_block.block_name}")

return {"destinationId": destination["destinationId"]}
36 changes: 34 additions & 2 deletions ddpui/api/client/dbt_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from ddpui import auth
from ddpui.ddpprefect.schema import OrgDbtSchema, OrgDbtGitHub, OrgDbtTarget
from ddpui.models.org_user import OrgUserResponse, OrgUser
from ddpui.models.org import OrgPrefectBlock
from ddpui.ddpprefect import DBTCORE
from ddpui.models.org import OrgPrefectBlock, OrgPrefectBlockv1
from ddpui.ddpprefect import DBTCORE, DBTCLIPROFILE
from ddpui.utils.helpers import runcmd
from ddpui.utils.dbtdocs import create_single_html
from ddpui.celeryworkers.tasks import (
Expand All @@ -23,6 +23,7 @@
update_dbt_core_block_schema_task,
)
from ddpui.ddpdbt import dbt_service
from ddpui.ddpprefect import prefect_service
from ddpui.utils.custom_logger import CustomLogger
from ddpui.utils.orguserhelpers import from_orguser

Expand Down Expand Up @@ -196,3 +197,34 @@ def post_dbt_makedocs(request):
redis.expire(redis_key, 3600 * 24)

return {"token": token.hex}


@dbtapi.put("/v1/schema/", auth=auth.CanManagePipelines())
def put_dbt_schema_v1(request, payload: OrgDbtTarget):
"""Update the target_configs.schema for the dbt cli profile block"""
orguser: OrgUser = request.orguser
org = orguser.org
if org.dbt is None:
raise HttpError(400, "create a dbt workspace first")

org.dbt.default_schema = payload.target_configs_schema
org.dbt.save()
logger.info("updated orgdbt")

cli_profile_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLIPROFILE
).first()

if cli_profile_block:
logger.info(
f"Updating the cli profile block's schema : {cli_profile_block.block_name}"
)
prefect_service.update_dbt_cli_profile_block(
block_name=cli_profile_block.block_name,
target=payload.target_configs_schema,
)
logger.info(
f"Successfully updated the cli profile block's schema : {cli_profile_block.block_name}"
)

return {"success": 1}
28 changes: 28 additions & 0 deletions ddpui/ddpprefect/prefect_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,34 @@ def create_dbt_cli_profile_block(
return response


def update_dbt_cli_profile_block(
block_name: str,
wtype: str,
profilename: str = None,
target: str = None,
credentials: dict = None,
bqlocation: str = None,
new_block_name: str = None,
):
"""Update the dbt cli profile for an org"""
response = prefect_put(
"blocks/dbtcli/profile/",
{
"cli_profile_block_name": block_name,
"wtype": wtype,
"profile": {
"name": profilename,
"target": target,
"target_configs_schema": target,
},
"credentials": credentials,
"bqlocation": bqlocation,
"new_block_name": new_block_name,
},
)
return response


def delete_dbt_cli_profile_block(block_id) -> None:
"""Delete dbt cli profile block in prefect"""
prefect_delete_a_block(block_id)
Expand Down

0 comments on commit e329dc7

Please sign in to comment.