From 7800a7a5e195c25862e235963a71dde35c7057d6 Mon Sep 17 00:00:00 2001 From: mk-armah Date: Fri, 12 Jul 2024 17:25:31 +0000 Subject: [PATCH 01/12] resolved issues with caching --- .gitignore | 3 +- integration/clients/cache.py | 1 + integration/clients/humanitec_client.py | 158 ++++++++++++++---------- integration/clients/port_client.py | 6 +- integration/main.py | 55 +++++++-- 5 files changed, 141 insertions(+), 82 deletions(-) diff --git a/.gitignore b/.gitignore index 01d7f95..106ee6a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ __pycache__ -venv \ No newline at end of file +venv +**/__init__.py \ No newline at end of file diff --git a/integration/clients/cache.py b/integration/clients/cache.py index a8f9d82..5e67937 100644 --- a/integration/clients/cache.py +++ b/integration/clients/cache.py @@ -1,6 +1,7 @@ import asyncio from typing import Dict, Any + class InMemoryCache: def __init__(self): self.cache = {} diff --git a/integration/clients/humanitec_client.py b/integration/clients/humanitec_client.py index fbeaa6d..4186df2 100644 --- a/integration/clients/humanitec_client.py +++ b/integration/clients/humanitec_client.py @@ -10,7 +10,6 @@ class CACHE_KEYS: APPLICATION = "APPLICATION_CACHE_KEY" ENVIRONMENT = "ENVIRONMENT_CACHE_KEY" - WORKLOAD = "WORKLOAD_CACHE_KEY" RESOURCE = "RESOURCE_CACHE_KEY" @@ -36,7 +35,7 @@ async def send_api_request( method: str, endpoint: str, headers: Dict[str, str] | None = None, - json: Dict[str, Any] | None = None, + json: Dict[str, Any] | List[Dict[str, Any]] | None = None, ) -> Any: url = self.base_url + endpoint try: @@ -72,57 +71,73 @@ async def get_all_applications(self) -> List[Dict[str, Any]]: return applications async def get_all_environments(self, app) -> List[Dict[str, Any]]: - if cached_environments := await self.cache.get(CACHE_KEYS.ENVIRONMENT): - cached_environments = cached_environments.get(app["id"], {}) - logger.info( - f"Retrieved {len(cached_environments)} environment for {app['id']} from cache" - ) - return list(cached_environments.values()) - endpoint = f"apps/{app['id']}/envs" - humanitec_headers = self.get_humanitec_headers() - environments: List[Dict[str, Any]] = await self.send_api_request( - "GET", endpoint, headers=humanitec_headers - ) - await self.cache.set( - CACHE_KEYS.ENVIRONMENT, - { - app["id"]: { - environment["id"]: environment for environment in environments - } - }, - ) - logger.info(f"Received {len(environments)} environments from Humanitec") - return environments + try: + if cached_environments := await self.cache.get(CACHE_KEYS.ENVIRONMENT): + if cached_environments := cached_environments.get(app["id"], {}): + logger.info( + f"Retrieved {len(cached_environments)} environment for {app['id']} from cache" + ) + return list(cached_environments.values()) + + logger.info("Fetching environments from Humanitec") + + endpoint = f"apps/{app['id']}/envs" + humanitec_headers = self.get_humanitec_headers() + environments: List[Dict[str, Any]] = await self.send_api_request( + "GET", endpoint, headers=humanitec_headers + ) + await self.cache.set( + CACHE_KEYS.ENVIRONMENT, + { + app["id"]: { + environment["id"]: environment for environment in environments + } + }, + ) + logger.info(f"Received {len(environments)} environments from Humanitec") + return environments + except Exception as e: + logger.error(f"Failed to fetch environments from {app['id']}: {str(e)}") + return [] async def get_all_resources(self, app, env) -> List[Dict[str, Any]]: - if cached_resources := await self.cache.get(CACHE_KEYS.RESOURCE): - cached_resources = cached_resources.get(app["id"], {}).get(env["id"], {}) - logger.info( - f"Retrieved {len(cached_resources)} resources from cache for app {app['id']} and env {env['id']}" + try: + if cached_resources := await self.cache.get(CACHE_KEYS.RESOURCE): + if cached_resources := cached_resources.get(app["id"], {}).get( + env["id"], {} + ): + logger.info( + f"Retrieved {len(cached_resources)} resources from cache for app {app['id']} and env {env['id']}" + ) + return list(cached_resources.values()) + + logger.info("Fetching resources from Humanitec") + endpoint = f"apps/{app['id']}/envs/{env['id']}/resources" + humanitec_headers = self.get_humanitec_headers() + resources: List[Dict[str, Any]] = await self.send_api_request( + "GET", endpoint, headers=humanitec_headers ) - return list(cached_resources.values()) - - endpoint = f"apps/{app['id']}/envs/{env['id']}/resources" - humanitec_headers = self.get_humanitec_headers() - resources: List[Dict[str, Any]] = await self.send_api_request( - "GET", endpoint, headers=humanitec_headers - ) - await self.cache.set( - CACHE_KEYS.RESOURCE, - { - app["id"]: { - env["id"]: { - resource["gu_res_id"]: resource for resource in resources + await self.cache.set( + CACHE_KEYS.RESOURCE, + { + app["id"]: { + env["id"]: { + resource["gu_res_id"]: resource for resource in resources + } } - } - }, - ) - logger.info(f"Received {len(resources)} resources from Humanitec") - return resources + }, + ) + logger.info(f"Received {len(resources)} resources from Humanitec") + return resources + except Exception as e: + logger.error( + f"Failed to fetch resources from {env['id']} environment in {app[id]}: {str(e)}" + ) + return [] async def get_resource_graph( - self, app: str, env: str, data: List[Dict[str, Any]] + self, app: Dict[str, Any], env: Dict[str, Any], data: List[Dict[str, Any]] ) -> Any: endpoint = f"apps/{app['id']}/envs/{env['id']}/resources/graph" humanitec_headers = self.get_humanitec_headers() @@ -133,30 +148,41 @@ async def get_resource_graph( return graph async def get_all_resource_graphs( - self, modules: List[Dict[str, Any]], app: str, env: str - ) -> Any: - - def get_resource_graph_request_body(modules): - return [ - { - "id": module["gu_res_id"], - "type": module["type"], - "resource": module["resource"], - } - for module in modules - ] - data = get_resource_graph_request_body(modules) - - graph_entities = await self.get_resource_graph(app, env, data) - logger.info( - f"Received {len(graph_entities)} resource graph entities from app: {app['id']} and env: {env['id']} using data: {data}" - ) - return graph_entities + self, modules: List[Dict[str, Any]], app: Dict[str, Any], env: Dict[str, Any] + ) -> List[Dict[str, Any]]: + + try: + + def get_resource_graph_request_body(modules): + return [ + { + "id": module["gu_res_id"], + "type": module["type"], + "resource": module["resource"], + } + for module in modules + ] + + data = get_resource_graph_request_body(modules) + + graph_entities: List[Dict[str, Any]] = await self.get_resource_graph( + app, env, data + ) + logger.info( + f"Received {len(graph_entities)} resource graph entities from app: {app['id']} and env: {env['id']} using data: {data}" + ) + print("Graph Entities", graph_entities) + return graph_entities + except Exception as e: + logger.error( + f"Failed to fetch resource graphs from {env['id']} environment in {app['id']}: {str(e)}" + ) + return [] def group_resources_by_type( self, data: List[Dict[str, Any]] ) -> Dict[str, List[Dict[str, Any]]]: - grouped_resources = {} + grouped_resources: dict[str, Any] = {} for resource in data: workload_id = resource["res_id"].split(".")[0] if workload_id not in grouped_resources: diff --git a/integration/clients/port_client.py b/integration/clients/port_client.py index 59056fc..e906d7b 100644 --- a/integration/clients/port_client.py +++ b/integration/clients/port_client.py @@ -1,15 +1,13 @@ import httpx from typing import Any, Dict from loguru import logger -from typing import List, Dict, Optional, Union -from .cache import InMemoryCache +from typing import Dict class PortClient: def __init__(self, client_id, client_secret, **kwargs) -> None: self.httpx_async_client = kwargs.get("httpx_async_client", httpx.AsyncClient()) self.client_id = client_id - self.cache = InMemoryCache() self.client_secret = client_secret self.base_url = kwargs.get("base_url", "https://api.getport.io/v1") self.port_headers = None @@ -49,7 +47,7 @@ async def send_api_request( async def upsert_entity( self, blueprint_id: str, entity_object: Dict[str, Any] - ) -> None: + ) -> Dict[str, Any]: endpoint = f"/blueprints/{blueprint_id}/entities?upsert=true&merge=true" port_headers = ( self.port_headers if self.port_headers else await self.get_port_headers() diff --git a/integration/main.py b/integration/main.py index b3f8591..f07ab88 100644 --- a/integration/main.py +++ b/integration/main.py @@ -2,7 +2,7 @@ import argparse import time import datetime -from decouple import config +from decouple import config # type: ignore import re import asyncio from loguru import logger @@ -100,6 +100,7 @@ def create_entity(application, environment): async def sync_workloads(self): logger.info(f"Syncing entities for blueprint {BLUEPRINT.WORKLOAD}") + def create_workload_entity(resource): return { "identifier": resource["res_id"].replace("modules.", ""), @@ -172,8 +173,8 @@ def create_resource_graph_entity(graph_data, include_relations): if not modules: continue - resource_graph = await humanitec_client.get_all_resource_graphs(modules, - application, environment + resource_graph = await humanitec_client.get_all_resource_graphs( + modules, application, environment ) # First pass: Create entities without relations @@ -199,7 +200,9 @@ def create_resource_graph_entity(graph_data, include_relations): for graph_data in resource_graph ] await asyncio.gather(*tasks) - logger.info(f"Finished syncing entities for blueprint {BLUEPRINT.RESOURCE_GRAPH}") + logger.info( + f"Finished syncing entities for blueprint {BLUEPRINT.RESOURCE_GRAPH}" + ) async def enrich_resource_with_graph(self, resource, application, environment): data = { @@ -218,6 +221,7 @@ async def enrich_resource_with_graph(self, resource, application, environment): async def sync_resources(self) -> None: logger.info(f"Syncing entities for blueprint {BLUEPRINT.RESOURCE}") + def create_resource_entity(resource): workload_id = ( resource["res_id"].split(".")[1] @@ -297,7 +301,7 @@ async def __call__(self, args) -> None: def validate_args(args): required_keys = ["org_id", "api_key", "port_client_id", "port_client_secret"] missing_keys = [key for key in required_keys if not getattr(args, key)] - + if missing_keys: logger.error(f"The following keys are required: {', '.join(missing_keys)}") return False @@ -305,24 +309,43 @@ def validate_args(args): parser = argparse.ArgumentParser() parser.add_argument( - "--org-id", required=False,default=config("ORG_ID",""), type=str, help="Humanitec organization ID" + "--org-id", + required=False, + default=config("ORG_ID", ""), + type=str, + help="Humanitec organization ID", + ) + parser.add_argument( + "--api-key", + required=False, + default=config("API_KEY", ""), + type=str, + help="Humanitec API key", ) - parser.add_argument("--api-key", required=False,default=config("API_KEY",""), type=str, help="Humanitec API key") parser.add_argument( "--api-url", type=str, - default=config("API_URL","https://api.humanitec.com"), + default=config("API_URL", "https://api.humanitec.com"), help="Humanitec API URL", ) parser.add_argument( - "--port-client-id", type=str, required=False,default=config("PORT_CLIENT_ID",""), help="Port client ID" + "--port-client-id", + type=str, + required=False, + default=config("PORT_CLIENT_ID", ""), + help="Port client ID", ) parser.add_argument( - "--port-client-secret", type=str, required=False,default = config("PORT_CLIENT_SECRET",""), help="Port client secret" + "--port-client-secret", + type=str, + required=False, + default=config("PORT_CLIENT_SECRET", ""), + help="Port client secret", ) args = parser.parse_args() - if not(validate_args(args)): + if not (validate_args(args)): import sys + sys.exit() httpx_async_client = httpx.AsyncClient() @@ -339,3 +362,13 @@ def validate_args(args): ) exporter = HumanitecExporter(port_client, humanitec_client) asyncio.run(exporter(args)) + +# PORT_CLIENT_ID = "Ex3GeM9hXjiYowHNkoWUxsMnP0ZXsMNm" # config("PORT_CLIENT_ID") +# PORT_CLIENT_SECRET = "ZFZudx0u4rlLGrLme944a9gBOh2j3eirqd9SwJ3XqEU7oCRthv088uBCj1CyZCy0" # config("PORT_CLIENT_SECRET") +# PORT_API_URL = "https://api.getport.io/v1" + +# HUMANITEC_API_TOKEN = "WLnM3EJm1bGMScqvuu2HH4YBB5FaNRlYUqkuc2C-RrDM" +# BASE_URL = "https://api.humanitec.io" +# HUMANITEC_ORG_ID = "port-testing" + +# python integration/main.py --org-id=port-testing --api-key=WLnM3EJm1bGMScqvuu2HH4YBB5FaNRlYUqkuc2C-RrDM --port-client-id=Ex3GeM9hXjiYowHNkoWUxsMnP0ZXsMNm --port-client-secret=ZFZudx0u4rlLGrLme944a9gBOh2j3eirqd9SwJ3XqEU7oCRthv088uBCj1CyZCy0 From ab5eab9229b708c4d4df124765d9595e90c7b10e Mon Sep 17 00:00:00 2001 From: mk-armah Date: Fri, 12 Jul 2024 17:44:09 +0000 Subject: [PATCH 02/12] removed creds --- integration/main.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/integration/main.py b/integration/main.py index f07ab88..3fe6f08 100644 --- a/integration/main.py +++ b/integration/main.py @@ -362,13 +362,3 @@ def validate_args(args): ) exporter = HumanitecExporter(port_client, humanitec_client) asyncio.run(exporter(args)) - -# PORT_CLIENT_ID = "Ex3GeM9hXjiYowHNkoWUxsMnP0ZXsMNm" # config("PORT_CLIENT_ID") -# PORT_CLIENT_SECRET = "ZFZudx0u4rlLGrLme944a9gBOh2j3eirqd9SwJ3XqEU7oCRthv088uBCj1CyZCy0" # config("PORT_CLIENT_SECRET") -# PORT_API_URL = "https://api.getport.io/v1" - -# HUMANITEC_API_TOKEN = "WLnM3EJm1bGMScqvuu2HH4YBB5FaNRlYUqkuc2C-RrDM" -# BASE_URL = "https://api.humanitec.io" -# HUMANITEC_ORG_ID = "port-testing" - -# python integration/main.py --org-id=port-testing --api-key=WLnM3EJm1bGMScqvuu2HH4YBB5FaNRlYUqkuc2C-RrDM --port-client-id=Ex3GeM9hXjiYowHNkoWUxsMnP0ZXsMNm --port-client-secret=ZFZudx0u4rlLGrLme944a9gBOh2j3eirqd9SwJ3XqEU7oCRthv088uBCj1CyZCy0 From e1b72bb8125b95d55ad1eee9f4d0e93460cd7ad3 Mon Sep 17 00:00:00 2001 From: mk-armah Date: Fri, 12 Jul 2024 20:53:35 +0000 Subject: [PATCH 03/12] extended timeouts --- integration/main.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/integration/main.py b/integration/main.py index 3fe6f08..c355637 100644 --- a/integration/main.py +++ b/integration/main.py @@ -66,7 +66,7 @@ async def sync_environments(self) -> None: def create_entity(application, environment): return { - "identifier": environment["id"], + "identifier": f"{application['id']}/{environment['id']}", "title": environment["name"], "properties": { "type": environment["type"], @@ -101,7 +101,7 @@ def create_entity(application, environment): async def sync_workloads(self): logger.info(f"Syncing entities for blueprint {BLUEPRINT.WORKLOAD}") - def create_workload_entity(resource): + def create_workload_entity(resource, application): return { "identifier": resource["res_id"].replace("modules.", ""), "title": self.remove_symbols_and_title_case( @@ -117,7 +117,7 @@ def create_workload_entity(resource): "graphResourceID": resource["gu_res_id"], }, "relations": { - BLUEPRINT.ENVIRONMENT: resource["env_id"], + BLUEPRINT.ENVIRONMENT: f"{application['id']}/{environment['id']}", }, } @@ -132,7 +132,7 @@ def create_workload_entity(resource): tasks = [ self.port_client.upsert_entity( blueprint_id=BLUEPRINT.WORKLOAD, - entity_object=create_workload_entity(resource), + entity_object=create_workload_entity(resource, application), ) for resource in resource_group.get("modules", []) if resource and resource["type"] == "workload" @@ -348,7 +348,8 @@ def validate_args(args): sys.exit() - httpx_async_client = httpx.AsyncClient() + timeout = httpx.Timeout(10.0, connect=10.0, read=10.0, write=10.0) + httpx_async_client = httpx.AsyncClient(timeout=timeout) port_client = PortClient( args.port_client_id, args.port_client_secret, From 3b9db0dfe4f76b9dd2271292b6aaa6f38f1c09cd Mon Sep 17 00:00:00 2001 From: mk-armah Date: Fri, 12 Jul 2024 21:00:01 +0000 Subject: [PATCH 04/12] remove print logs --- integration/clients/humanitec_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integration/clients/humanitec_client.py b/integration/clients/humanitec_client.py index 4186df2..db1580a 100644 --- a/integration/clients/humanitec_client.py +++ b/integration/clients/humanitec_client.py @@ -171,7 +171,6 @@ def get_resource_graph_request_body(modules): logger.info( f"Received {len(graph_entities)} resource graph entities from app: {app['id']} and env: {env['id']} using data: {data}" ) - print("Graph Entities", graph_entities) return graph_entities except Exception as e: logger.error( From b43205583cb155cc5802192df055b214f111617e Mon Sep 17 00:00:00 2001 From: mk-armah Date: Mon, 15 Jul 2024 13:21:40 +0000 Subject: [PATCH 05/12] Fixed issue with resource graph --- integration/clients/humanitec_client.py | 4 ++-- integration/main.py | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/integration/clients/humanitec_client.py b/integration/clients/humanitec_client.py index db1580a..927abcd 100644 --- a/integration/clients/humanitec_client.py +++ b/integration/clients/humanitec_client.py @@ -74,7 +74,7 @@ async def get_all_environments(self, app) -> List[Dict[str, Any]]: try: if cached_environments := await self.cache.get(CACHE_KEYS.ENVIRONMENT): - if cached_environments := cached_environments.get(app["id"], {}): + if cached_environments := cached_environments.get(app["id"]): logger.info( f"Retrieved {len(cached_environments)} environment for {app['id']} from cache" ) @@ -156,7 +156,7 @@ async def get_all_resource_graphs( def get_resource_graph_request_body(modules): return [ { - "id": module["gu_res_id"], + "id": module["res_id"], "type": module["type"], "resource": module["resource"], } diff --git a/integration/main.py b/integration/main.py index c355637..8171dec 100644 --- a/integration/main.py +++ b/integration/main.py @@ -102,8 +102,9 @@ async def sync_workloads(self): logger.info(f"Syncing entities for blueprint {BLUEPRINT.WORKLOAD}") def create_workload_entity(resource, application): + identifier = f"{application['id']}/{environment['id']}/{resource['res_id'].replace('modules.', '')}" return { - "identifier": resource["res_id"].replace("modules.", ""), + "identifier": identifier, "title": self.remove_symbols_and_title_case( resource["res_id"].replace("modules.", "") ), @@ -151,7 +152,7 @@ def create_resource_graph_entity(graph_data, include_relations): "type": graph_data["type"], "class": graph_data["class"], "resourceSchema": graph_data["resource_schema"], - "resource": graph_data["resource"], + "resource": graph_data["resource"] }, "relations": {}, } @@ -206,9 +207,9 @@ def create_resource_graph_entity(graph_data, include_relations): async def enrich_resource_with_graph(self, resource, application, environment): data = { - "id": resource["gu_res_id"], + "id": resource["res_id"], "type": resource["type"], - "resource": resource["resource"], + "resource": resource["resource"] } response = await humanitec_client.get_resource_graph( application, environment, [data] @@ -228,6 +229,7 @@ def create_resource_entity(resource): if resource["res_id"].split(".")[0].startswith("modules") else "" ) + workload_id = f"{resource['app_id']}/{resource['env_id']}/{workload_id}" return { "identifier": resource["__resourceGraph"]["guresid"], "title": self.remove_symbols_and_title_case(resource["def_id"]), From 23f7a4407175edee46e72a28dcb542a38acea70a Mon Sep 17 00:00:00 2001 From: mk-armah Date: Mon, 15 Jul 2024 14:19:27 +0000 Subject: [PATCH 06/12] improved error handling on resource syncing --- integration/main.py | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/integration/main.py b/integration/main.py index 8171dec..175f386 100644 --- a/integration/main.py +++ b/integration/main.py @@ -152,7 +152,7 @@ def create_resource_graph_entity(graph_data, include_relations): "type": graph_data["type"], "class": graph_data["class"], "resourceSchema": graph_data["resource_schema"], - "resource": graph_data["resource"] + "resource": graph_data["resource"], }, "relations": {}, } @@ -206,19 +206,26 @@ def create_resource_graph_entity(graph_data, include_relations): ) async def enrich_resource_with_graph(self, resource, application, environment): - data = { - "id": resource["res_id"], - "type": resource["type"], - "resource": resource["resource"] - } - response = await humanitec_client.get_resource_graph( - application, environment, [data] - ) + try: + logger.info("Enriching resource %s with graph", resource["res_id"]) + data = { + "id": resource["res_id"], + "type": resource["type"], + "resource": resource["resource"], + } + response = await humanitec_client.get_resource_graph( + application, environment, [data] + ) - resource.update( - {"__resourceGraph": i for i in response if i["type"] == data["type"]} - ) - return resource + resource.update( + {"__resourceGraph": i for i in response if i["type"] == data["type"]} + ) + return resource + except Exception as e: + logger.info( + f"Failed to enrich resource {resource['res_id']} with graph: %s", str(e) + ) + return resource async def sync_resources(self) -> None: logger.info(f"Syncing entities for blueprint {BLUEPRINT.RESOURCE}") @@ -242,7 +249,9 @@ def create_resource_entity(resource): "driverType": resource["driver_type"], }, "relations": { - BLUEPRINT.RESOURCE_GRAPH: resource["__resourceGraph"]["depends_on"], + BLUEPRINT.RESOURCE_GRAPH: resource.get("__resourceGraph", {}).get( + "depends_on", [] + ), BLUEPRINT.WORKLOAD: workload_id, }, } From 4199c71f1440be0ac7c9a28cdb7fbd7a25d57ae3 Mon Sep 17 00:00:00 2001 From: mk-armah Date: Mon, 15 Jul 2024 17:25:35 +0000 Subject: [PATCH 07/12] updated script --- integration/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/main.py b/integration/main.py index 175f386..cd6e2c3 100644 --- a/integration/main.py +++ b/integration/main.py @@ -222,7 +222,7 @@ async def enrich_resource_with_graph(self, resource, application, environment): ) return resource except Exception as e: - logger.info( + logger.error( f"Failed to enrich resource {resource['res_id']} with graph: %s", str(e) ) return resource From 149dec885c8c28ddfa65580da3da6ae14dc3e034 Mon Sep 17 00:00:00 2001 From: mk-armah Date: Thu, 18 Jul 2024 06:14:31 +0000 Subject: [PATCH 08/12] remodeled integration --- integration/clients/humanitec_client.py | 28 ++++- integration/main.py | 138 +++++++++++------------- 2 files changed, 86 insertions(+), 80 deletions(-) diff --git a/integration/clients/humanitec_client.py b/integration/clients/humanitec_client.py index 927abcd..e4f20fa 100644 --- a/integration/clients/humanitec_client.py +++ b/integration/clients/humanitec_client.py @@ -128,14 +128,37 @@ async def get_all_resources(self, app, env) -> List[Dict[str, Any]]: } }, ) - logger.info(f"Received {len(resources)} resources from Humanitec") + logger.info( + f"Received {len(resources)} resources for {env['id']} environment in {app['id']}" + ) + print("RESOURCES ", resources) return resources except Exception as e: logger.error( - f"Failed to fetch resources from {env['id']} environment in {app[id]}: {str(e)}" + f"Failed to fetch resources for {env['id']} environment in {app[id]}: {str(e)}" ) return [] + async def get_dependency_graph( + self, app: Dict[str, Any], env: Dict[str, Any] + ) -> List[Dict[str, Any]]: + if dependency_graph_id := env.get("last_deploy", {}).get("dependency_graph_id"): + endpoint = f"apps/{app['id']}/envs/{env['id']}/resources/graphs/{dependency_graph_id}" + humanitec_headers = self.get_humanitec_headers() + graph = await self.send_api_request( + "GET", endpoint, headers=humanitec_headers + ) + nodes = graph["nodes"] + logger.info( + f"Received {len(nodes)} graph nodes for {env['id']} environment in {app['id']}" + ) + return nodes + + logger.info( + f"No dependency graph found for {env['id']} environment in {app['id']}" + ) + return [] + async def get_resource_graph( self, app: Dict[str, Any], env: Dict[str, Any], data: List[Dict[str, Any]] ) -> Any: @@ -144,7 +167,6 @@ async def get_resource_graph( graph = await self.send_api_request( "POST", endpoint, headers=humanitec_headers, json=data ) - return graph async def get_all_resource_graphs( diff --git a/integration/main.py b/integration/main.py index cd6e2c3..4905a63 100644 --- a/integration/main.py +++ b/integration/main.py @@ -20,9 +20,21 @@ class BLUEPRINT: class HumanitecExporter: - def __init__(self, port_client, humanitec_client) -> None: - self.port_client = port_client - self.humanitec_client = humanitec_client + def __init__(self, args) -> None: + + timeout = httpx.Timeout(10.0, connect=10.0, read=20.0, write=10.0) + httpx_async_client = httpx.AsyncClient(timeout=timeout) + self.port_client = PortClient( + args.port_client_id, + args.port_client_secret, + httpx_async_client=httpx_async_client, + ) + self.humanitec_client = HumanitecClient( + args.org_id, + args.api_key, + api_url=args.api_url, + httpx_async_client=httpx_async_client, + ) @staticmethod def convert_to_datetime(timestamp: int) -> str: @@ -91,7 +103,7 @@ def create_entity(application, environment): ) for application in applications for environments in [ - await humanitec_client.get_all_environments(application) + await self.humanitec_client.get_all_environments(application) ] for environment in environments ] @@ -122,14 +134,16 @@ def create_workload_entity(resource, application): }, } - applications = await humanitec_client.get_all_applications() + applications = await self.humanitec_client.get_all_applications() for application in applications: environments = await self.humanitec_client.get_all_environments(application) for environment in environments: resources = await self.humanitec_client.get_all_resources( application, environment ) - resource_group = humanitec_client.group_resources_by_type(resources) + resource_group = self.humanitec_client.group_resources_by_type( + resources + ) tasks = [ self.port_client.upsert_entity( blueprint_id=BLUEPRINT.WORKLOAD, @@ -144,7 +158,10 @@ def create_workload_entity(resource, application): async def sync_resource_graphs(self) -> None: logger.info(f"Syncing entities for blueprint {BLUEPRINT.RESOURCE_GRAPH}") - def create_resource_graph_entity(graph_data, include_relations): + def create_resource_graph_entity( + graph_data, include_relations, application, environment + ): + print("GRAPH DATA", graph_data) entity = { "identifier": graph_data["guresid"], "title": self.remove_symbols_and_title_case(graph_data["def_id"]), @@ -157,8 +174,10 @@ def create_resource_graph_entity(graph_data, include_relations): "relations": {}, } if include_relations: + entity["relations"] = { - BLUEPRINT.RESOURCE_GRAPH: graph_data["depends_on"] + BLUEPRINT.RESOURCE_GRAPH: graph_data["depends_on"], + BLUEPRINT.ENVIRONMENT: f"{application['id']}/{environment['id']}", } return entity @@ -166,27 +185,19 @@ def create_resource_graph_entity(graph_data, include_relations): for application in applications: environments = await self.humanitec_client.get_all_environments(application) for environment in environments: - resources = await self.humanitec_client.get_all_resources( + graph_nodes = await self.humanitec_client.get_dependency_graph( application, environment ) - resources = humanitec_client.group_resources_by_type(resources) - modules = resources.get("modules", []) - if not modules: - continue - - resource_graph = await humanitec_client.get_all_resource_graphs( - modules, application, environment - ) # First pass: Create entities without relations tasks = [ self.port_client.upsert_entity( blueprint_id=BLUEPRINT.RESOURCE_GRAPH, entity_object=create_resource_graph_entity( - graph_data, include_relations=False + node, False, application, environment ), ) - for graph_data in resource_graph + for node in graph_nodes ] await asyncio.gather(*tasks) @@ -195,10 +206,10 @@ def create_resource_graph_entity(graph_data, include_relations): self.port_client.upsert_entity( blueprint_id=BLUEPRINT.RESOURCE_GRAPH, entity_object=create_resource_graph_entity( - graph_data, include_relations=True + node, True, application, environment ), ) - for graph_data in resource_graph + for node in graph_nodes ] await asyncio.gather(*tasks) logger.info( @@ -213,7 +224,7 @@ async def enrich_resource_with_graph(self, resource, application, environment): "type": resource["type"], "resource": resource["resource"], } - response = await humanitec_client.get_resource_graph( + response = await self.humanitec_client.get_resource_graph( application, environment, [data] ) @@ -231,14 +242,17 @@ async def sync_resources(self) -> None: logger.info(f"Syncing entities for blueprint {BLUEPRINT.RESOURCE}") def create_resource_entity(resource): + print("RESOURCE", resource) workload_id = ( resource["res_id"].split(".")[1] if resource["res_id"].split(".")[0].startswith("modules") else "" ) - workload_id = f"{resource['app_id']}/{resource['env_id']}/{workload_id}" - return { - "identifier": resource["__resourceGraph"]["guresid"], + resource_id = ( + f"{resource['app_id']}/{resource['env_id']}/{resource['res_id']}" + ) + entity = { + "identifier": resource_id, "title": self.remove_symbols_and_title_case(resource["def_id"]), "properties": { "type": resource["type"], @@ -248,51 +262,34 @@ def create_resource_entity(resource): "updateAt": resource["updated_at"], "driverType": resource["driver_type"], }, - "relations": { - BLUEPRINT.RESOURCE_GRAPH: resource.get("__resourceGraph", {}).get( - "depends_on", [] - ), - BLUEPRINT.WORKLOAD: workload_id, - }, + "relations": {}, } - - async def fetch_resources(application, environment): - resources = await self.humanitec_client.get_all_resources( - application, environment - ) - resources = humanitec_client.group_resources_by_type(resources) - modules = resources.get("modules", []) - if not modules: - return [] - - tasks = [ - self.enrich_resource_with_graph(resource, application, environment) - for resource in modules - ] - enriched_resources = await asyncio.gather(*tasks) - return enriched_resources + if workload_id: + workload_id = f"{resource['app_id']}/{resource['env_id']}/{workload_id}" + entity["relations"][BLUEPRINT.WORKLOAD] = workload_id + entity["relations"][BLUEPRINT.RESOURCE_GRAPH] = resource["gu_res_id"] + return entity applications = await self.humanitec_client.get_all_applications() for application in applications: environments = await self.humanitec_client.get_all_environments(application) + for environment in environments: + resources = await self.humanitec_client.get_all_resources( + application, environment + ) - resource_tasks = [ - fetch_resources(application, environment) - for environment in environments - ] - all_resources = await asyncio.gather(*resource_tasks) - all_resources = [ - resource for sublist in all_resources for resource in sublist - ] # Flatten the list - - entity_tasks = [ - self.port_client.upsert_entity( - blueprint_id=BLUEPRINT.RESOURCE, - entity_object=create_resource_entity(resource), + entity_tasks = [ + self.port_client.upsert_entity( + blueprint_id=BLUEPRINT.RESOURCE, + entity_object=create_resource_entity(resource), + ) + for resource in resources + ] + await asyncio.gather(*entity_tasks) + logger.info( + "Upserted resource entities for %s environment", environment["id"] ) - for resource in all_resources - ] - await asyncio.gather(*entity_tasks) + logger.info(f"Finished syncing entities for blueprint {BLUEPRINT.RESOURCE}") async def sync_all(self) -> None: @@ -359,18 +356,5 @@ def validate_args(args): sys.exit() - timeout = httpx.Timeout(10.0, connect=10.0, read=10.0, write=10.0) - httpx_async_client = httpx.AsyncClient(timeout=timeout) - port_client = PortClient( - args.port_client_id, - args.port_client_secret, - httpx_async_client=httpx_async_client, - ) - humanitec_client = HumanitecClient( - args.org_id, - args.api_key, - api_url=args.api_url, - httpx_async_client=httpx_async_client, - ) - exporter = HumanitecExporter(port_client, humanitec_client) + exporter = HumanitecExporter(args) asyncio.run(exporter(args)) From 5f048bc03783198ab434b4845423c2740a5388c3 Mon Sep 17 00:00:00 2001 From: mk-armah Date: Thu, 18 Jul 2024 06:32:01 +0000 Subject: [PATCH 09/12] updated blueprints --- resources/blueprints.json | 113 +++++++++++++++++++------------------- 1 file changed, 56 insertions(+), 57 deletions(-) diff --git a/resources/blueprints.json b/resources/blueprints.json index 7e2226c..62c7b6c 100644 --- a/resources/blueprints.json +++ b/resources/blueprints.json @@ -2,7 +2,7 @@ { "identifier": "humanitecApplication", "description": "Humanitec Application", - "title": "humanitecApplication", + "title": "Application", "icon": "Apps", "schema": { "properties": { @@ -19,54 +19,54 @@ "aggregationProperties": {}, "relations": {} }, - { - "identifier": "humanitecEnvironment", - "title": "Humanitec Environment", - "icon": "Environment", - "schema": { - "properties": { - "type": { - "title": "Type", - "icon": "DefaultProperty", - "type": "string" - }, - "createdAt": { - "type": "string", - "format": "date-time", - "title": "Creation Date", - "description": "The date and time when the environment was created." - }, - "lastDeploymentStatus": { - "type": "string", - "title": "Last Deployment Status", - "description": "The status of the last deployment." - }, - "lastDeploymentDate": { - "type": "string", - "format": "date-time", - "title": "Last Deployment Date", - "description": "The date and time of the last time the environment was deployed." - }, - "lastDeploymentComment": { - "type": "string", - "title": "Last Deployment Comment", - "description": "comment on the last deployment" - } + { + "identifier": "humanitecEnvironment", + "title": "Environment", + "icon": "Environment", + "schema": { + "properties": { + "type": { + "title": "Type", + "icon": "DefaultProperty", + "type": "string" }, - "required": [] - }, - "mirrorProperties": {}, - "calculationProperties": {}, - "aggregationProperties": {}, - "relations": { - "humanitecApplication": { - "title": "Application", - "target": "humanitecApplication", - "required": false, - "many": false + "createdAt": { + "type": "string", + "format": "date-time", + "title": "Creation Date", + "description": "The date and time when the environment was created." + }, + "lastDeploymentStatus": { + "type": "string", + "title": "Last Deployment Status", + "description": "The status of the last deployment." + }, + "lastDeploymentDate": { + "type": "string", + "format": "date-time", + "title": "Last Deployment Date", + "description": "The date and time of the last time the environment was deployed." + }, + "lastDeploymentComment": { + "type": "string", + "title": "Last Deployment Comment", + "description": "comment on the last deployment" } - } + }, + "required": [] }, + "mirrorProperties": {}, + "calculationProperties": {}, + "aggregationProperties": {}, + "relations": { + "humanitecApplication": { + "title": "Application", + "target": "humanitecApplication", + "required": false, + "many": false + } + } + }, { "identifier": "humanitecWorkload", "title": "Workload", @@ -127,7 +127,7 @@ }, { "identifier": "humanitecResource", - "title": "Humanitec Resource", + "title": "Active Resource", "icon": "Microservice", "schema": { "properties": { @@ -154,12 +154,6 @@ "description": "The schema of the resource", "type": "object", "icon": "DefaultProperty" - }, - "guresid": { - "title": "GU Resource ID", - "description": "The GU resource ID", - "type": "string", - "icon": "DefaultProperty" } }, "required": [] @@ -169,11 +163,10 @@ "aggregationProperties": {}, "relations": { "humanitecResourceGraph": { - "title": "Depends On", - "description": "Resource Graph", + "title": "Graph", "target": "humanitecResourceGraph", "required": false, - "many": true + "many": false }, "humanitecWorkload": { "title": "Humanitec Workload", @@ -196,8 +189,14 @@ "calculationProperties": {}, "aggregationProperties": {}, "relations": { + "humanitecEnvironment": { + "title": "Environment", + "target": "humanitecEnvironment", + "required": false, + "many": false + }, "humanitecResourceGraph": { - "title": "Resource Graph", + "title": "Depends On", "target": "humanitecResourceGraph", "required": false, "many": true From 6fe5822d9aac507af4cc6fa5419215c38d50b300 Mon Sep 17 00:00:00 2001 From: mk-armah Date: Thu, 18 Jul 2024 06:34:13 +0000 Subject: [PATCH 10/12] removed prints --- integration/clients/humanitec_client.py | 1 - integration/main.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/integration/clients/humanitec_client.py b/integration/clients/humanitec_client.py index e4f20fa..d8e2110 100644 --- a/integration/clients/humanitec_client.py +++ b/integration/clients/humanitec_client.py @@ -131,7 +131,6 @@ async def get_all_resources(self, app, env) -> List[Dict[str, Any]]: logger.info( f"Received {len(resources)} resources for {env['id']} environment in {app['id']}" ) - print("RESOURCES ", resources) return resources except Exception as e: logger.error( diff --git a/integration/main.py b/integration/main.py index 4905a63..2fec152 100644 --- a/integration/main.py +++ b/integration/main.py @@ -161,7 +161,6 @@ async def sync_resource_graphs(self) -> None: def create_resource_graph_entity( graph_data, include_relations, application, environment ): - print("GRAPH DATA", graph_data) entity = { "identifier": graph_data["guresid"], "title": self.remove_symbols_and_title_case(graph_data["def_id"]), @@ -242,7 +241,6 @@ async def sync_resources(self) -> None: logger.info(f"Syncing entities for blueprint {BLUEPRINT.RESOURCE}") def create_resource_entity(resource): - print("RESOURCE", resource) workload_id = ( resource["res_id"].split(".")[1] if resource["res_id"].split(".")[0].startswith("modules") From 7d3e515e256e86a76bff80e8860221c206791470 Mon Sep 17 00:00:00 2001 From: mk-armah Date: Thu, 18 Jul 2024 06:47:15 +0000 Subject: [PATCH 11/12] removed unused functions --- integration/clients/humanitec_client.py | 79 +++++++++---------------- 1 file changed, 27 insertions(+), 52 deletions(-) diff --git a/integration/clients/humanitec_client.py b/integration/clients/humanitec_client.py index d8e2110..b4d856c 100644 --- a/integration/clients/humanitec_client.py +++ b/integration/clients/humanitec_client.py @@ -74,11 +74,11 @@ async def get_all_environments(self, app) -> List[Dict[str, Any]]: try: if cached_environments := await self.cache.get(CACHE_KEYS.ENVIRONMENT): - if cached_environments := cached_environments.get(app["id"]): + if app_environments := cached_environments.get(app["id"]): logger.info( - f"Retrieved {len(cached_environments)} environment for {app['id']} from cache" + f"Retrieved {len(app_environments)} environment for {app['id']} from cache" ) - return list(cached_environments.values()) + return list(app_environments.values()) logger.info("Fetching environments from Humanitec") @@ -104,13 +104,13 @@ async def get_all_environments(self, app) -> List[Dict[str, Any]]: async def get_all_resources(self, app, env) -> List[Dict[str, Any]]: try: if cached_resources := await self.cache.get(CACHE_KEYS.RESOURCE): - if cached_resources := cached_resources.get(app["id"], {}).get( - env["id"], {} + if env_resources := cached_resources.get(app["id"], {}).get( + env["id"] ): logger.info( - f"Retrieved {len(cached_resources)} resources from cache for app {app['id']} and env {env['id']}" + f"Retrieved {len(env_resources)} resources from cache for app {app['id']} and env {env['id']}" ) - return list(cached_resources.values()) + return list(env_resources.values()) logger.info("Fetching resources from Humanitec") endpoint = f"apps/{app['id']}/envs/{env['id']}/resources" @@ -141,22 +141,28 @@ async def get_all_resources(self, app, env) -> List[Dict[str, Any]]: async def get_dependency_graph( self, app: Dict[str, Any], env: Dict[str, Any] ) -> List[Dict[str, Any]]: - if dependency_graph_id := env.get("last_deploy", {}).get("dependency_graph_id"): - endpoint = f"apps/{app['id']}/envs/{env['id']}/resources/graphs/{dependency_graph_id}" - humanitec_headers = self.get_humanitec_headers() - graph = await self.send_api_request( - "GET", endpoint, headers=humanitec_headers - ) - nodes = graph["nodes"] + try: + if dependency_graph_id := env.get("last_deploy", {}).get("dependency_graph_id"): + endpoint = f"apps/{app['id']}/envs/{env['id']}/resources/graphs/{dependency_graph_id}" + humanitec_headers = self.get_humanitec_headers() + graph = await self.send_api_request( + "GET", endpoint, headers=humanitec_headers + ) + nodes = graph["nodes"] + logger.info( + f"Received {len(nodes)} graph nodes for {env['id']} environment in {app['id']}" + ) + return nodes + logger.info( - f"Received {len(nodes)} graph nodes for {env['id']} environment in {app['id']}" + f"No dependency graph found for {env['id']} environment in {app['id']}" ) - return nodes - - logger.info( - f"No dependency graph found for {env['id']} environment in {app['id']}" - ) - return [] + return [] + except Exception as e: + logger.error( + f"Failed to fetch dependency graphs for {env['id']} environment in {app['id']}: {str(e)}" + ) + return [] async def get_resource_graph( self, app: Dict[str, Any], env: Dict[str, Any], data: List[Dict[str, Any]] @@ -168,37 +174,6 @@ async def get_resource_graph( ) return graph - async def get_all_resource_graphs( - self, modules: List[Dict[str, Any]], app: Dict[str, Any], env: Dict[str, Any] - ) -> List[Dict[str, Any]]: - - try: - - def get_resource_graph_request_body(modules): - return [ - { - "id": module["res_id"], - "type": module["type"], - "resource": module["resource"], - } - for module in modules - ] - - data = get_resource_graph_request_body(modules) - - graph_entities: List[Dict[str, Any]] = await self.get_resource_graph( - app, env, data - ) - logger.info( - f"Received {len(graph_entities)} resource graph entities from app: {app['id']} and env: {env['id']} using data: {data}" - ) - return graph_entities - except Exception as e: - logger.error( - f"Failed to fetch resource graphs from {env['id']} environment in {app['id']}: {str(e)}" - ) - return [] - def group_resources_by_type( self, data: List[Dict[str, Any]] ) -> Dict[str, List[Dict[str, Any]]]: From 0aa56da2cef50c9f3f54d4ac57d33278be9d210c Mon Sep 17 00:00:00 2001 From: mk-armah Date: Sun, 21 Jul 2024 20:55:16 +0000 Subject: [PATCH 12/12] disconnected relationship between resources and graphs --- integration/main.py | 1 - resources/blueprints.json | 6 ------ 2 files changed, 7 deletions(-) diff --git a/integration/main.py b/integration/main.py index 2fec152..b68a11d 100644 --- a/integration/main.py +++ b/integration/main.py @@ -265,7 +265,6 @@ def create_resource_entity(resource): if workload_id: workload_id = f"{resource['app_id']}/{resource['env_id']}/{workload_id}" entity["relations"][BLUEPRINT.WORKLOAD] = workload_id - entity["relations"][BLUEPRINT.RESOURCE_GRAPH] = resource["gu_res_id"] return entity applications = await self.humanitec_client.get_all_applications() diff --git a/resources/blueprints.json b/resources/blueprints.json index 62c7b6c..4291f9d 100644 --- a/resources/blueprints.json +++ b/resources/blueprints.json @@ -162,12 +162,6 @@ "calculationProperties": {}, "aggregationProperties": {}, "relations": { - "humanitecResourceGraph": { - "title": "Graph", - "target": "humanitecResourceGraph", - "required": false, - "many": false - }, "humanitecWorkload": { "title": "Humanitec Workload", "target": "humanitecWorkload",