diff --git a/.gitignore b/.gitignore index 01d7f95..106ee6a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ __pycache__ -venv \ No newline at end of file +venv +**/__init__.py \ No newline at end of file diff --git a/integration/clients/cache.py b/integration/clients/cache.py index a8f9d82..5e67937 100644 --- a/integration/clients/cache.py +++ b/integration/clients/cache.py @@ -1,6 +1,7 @@ import asyncio from typing import Dict, Any + class InMemoryCache: def __init__(self): self.cache = {} diff --git a/integration/clients/humanitec_client.py b/integration/clients/humanitec_client.py index fbeaa6d..b4d856c 100644 --- a/integration/clients/humanitec_client.py +++ b/integration/clients/humanitec_client.py @@ -10,7 +10,6 @@ class CACHE_KEYS: APPLICATION = "APPLICATION_CACHE_KEY" ENVIRONMENT = "ENVIRONMENT_CACHE_KEY" - WORKLOAD = "WORKLOAD_CACHE_KEY" RESOURCE = "RESOURCE_CACHE_KEY" @@ -36,7 +35,7 @@ async def send_api_request( method: str, endpoint: str, headers: Dict[str, str] | None = None, - json: Dict[str, Any] | None = None, + json: Dict[str, Any] | List[Dict[str, Any]] | None = None, ) -> Any: url = self.base_url + endpoint try: @@ -72,91 +71,113 @@ async def get_all_applications(self) -> List[Dict[str, Any]]: return applications async def get_all_environments(self, app) -> List[Dict[str, Any]]: - if cached_environments := await self.cache.get(CACHE_KEYS.ENVIRONMENT): - cached_environments = cached_environments.get(app["id"], {}) - logger.info( - f"Retrieved {len(cached_environments)} environment for {app['id']} from cache" - ) - return list(cached_environments.values()) - endpoint = f"apps/{app['id']}/envs" - humanitec_headers = self.get_humanitec_headers() - environments: List[Dict[str, Any]] = await self.send_api_request( - "GET", endpoint, headers=humanitec_headers - ) - await self.cache.set( - CACHE_KEYS.ENVIRONMENT, - { - app["id"]: { - environment["id"]: environment for environment in environments - } - }, - ) - logger.info(f"Received 
{len(environments)} environments from Humanitec") - return environments + try: + if cached_environments := await self.cache.get(CACHE_KEYS.ENVIRONMENT): + if app_environments := cached_environments.get(app["id"]): + logger.info( + f"Retrieved {len(app_environments)} environment for {app['id']} from cache" + ) + return list(app_environments.values()) + + logger.info("Fetching environments from Humanitec") + + endpoint = f"apps/{app['id']}/envs" + humanitec_headers = self.get_humanitec_headers() + environments: List[Dict[str, Any]] = await self.send_api_request( + "GET", endpoint, headers=humanitec_headers + ) + await self.cache.set( + CACHE_KEYS.ENVIRONMENT, + { + app["id"]: { + environment["id"]: environment for environment in environments + } + }, + ) + logger.info(f"Received {len(environments)} environments from Humanitec") + return environments + except Exception as e: + logger.error(f"Failed to fetch environments from {app['id']}: {str(e)}") + return [] async def get_all_resources(self, app, env) -> List[Dict[str, Any]]: - if cached_resources := await self.cache.get(CACHE_KEYS.RESOURCE): - cached_resources = cached_resources.get(app["id"], {}).get(env["id"], {}) + try: + if cached_resources := await self.cache.get(CACHE_KEYS.RESOURCE): + if env_resources := cached_resources.get(app["id"], {}).get( + env["id"] + ): + logger.info( + f"Retrieved {len(env_resources)} resources from cache for app {app['id']} and env {env['id']}" + ) + return list(env_resources.values()) + + logger.info("Fetching resources from Humanitec") + endpoint = f"apps/{app['id']}/envs/{env['id']}/resources" + humanitec_headers = self.get_humanitec_headers() + resources: List[Dict[str, Any]] = await self.send_api_request( + "GET", endpoint, headers=humanitec_headers + ) + await self.cache.set( + CACHE_KEYS.RESOURCE, + { + app["id"]: { + env["id"]: { + resource["gu_res_id"]: resource for resource in resources + } + } + }, + ) logger.info( - f"Retrieved {len(cached_resources)} resources from 
cache for app {app['id']} and env {env['id']}" + f"Received {len(resources)} resources for {env['id']} environment in {app['id']}" ) - return list(cached_resources.values()) + return resources + except Exception as e: + logger.error( + f"Failed to fetch resources for {env['id']} environment in {app['id']}: {str(e)}" + ) + return [] - endpoint = f"apps/{app['id']}/envs/{env['id']}/resources" - humanitec_headers = self.get_humanitec_headers() - resources: List[Dict[str, Any]] = await self.send_api_request( - "GET", endpoint, headers=humanitec_headers - ) - await self.cache.set( - CACHE_KEYS.RESOURCE, - { - app["id"]: { - env["id"]: { - resource["gu_res_id"]: resource for resource in resources - } - } - }, - ) - logger.info(f"Received {len(resources)} resources from Humanitec") - return resources + async def get_dependency_graph( + self, app: Dict[str, Any], env: Dict[str, Any] + ) -> List[Dict[str, Any]]: + try: + if dependency_graph_id := env.get("last_deploy", {}).get("dependency_graph_id"): + endpoint = f"apps/{app['id']}/envs/{env['id']}/resources/graphs/{dependency_graph_id}" + humanitec_headers = self.get_humanitec_headers() + graph = await self.send_api_request( + "GET", endpoint, headers=humanitec_headers + ) + nodes = graph["nodes"] + logger.info( + f"Received {len(nodes)} graph nodes for {env['id']} environment in {app['id']}" + ) + return nodes + + logger.info( + f"No dependency graph found for {env['id']} environment in {app['id']}" + ) + return [] + except Exception as e: + logger.error( + f"Failed to fetch dependency graphs for {env['id']} environment in {app['id']}: {str(e)}" + ) + return [] async def get_resource_graph( - self, app: str, env: str, data: List[Dict[str, Any]] + self, app: Dict[str, Any], env: Dict[str, Any], data: List[Dict[str, Any]] ) -> Any: endpoint = f"apps/{app['id']}/envs/{env['id']}/resources/graph" humanitec_headers = self.get_humanitec_headers() graph = await self.send_api_request( "POST", endpoint, headers=humanitec_headers, 
json=data ) - return graph - async def get_all_resource_graphs( - self, modules: List[Dict[str, Any]], app: str, env: str - ) -> Any: - - def get_resource_graph_request_body(modules): - return [ - { - "id": module["gu_res_id"], - "type": module["type"], - "resource": module["resource"], - } - for module in modules - ] - data = get_resource_graph_request_body(modules) - - graph_entities = await self.get_resource_graph(app, env, data) - logger.info( - f"Received {len(graph_entities)} resource graph entities from app: {app['id']} and env: {env['id']} using data: {data}" - ) - return graph_entities - def group_resources_by_type( self, data: List[Dict[str, Any]] ) -> Dict[str, List[Dict[str, Any]]]: - grouped_resources = {} + grouped_resources: dict[str, Any] = {} for resource in data: workload_id = resource["res_id"].split(".")[0] if workload_id not in grouped_resources: diff --git a/integration/clients/port_client.py b/integration/clients/port_client.py index 59056fc..e906d7b 100644 --- a/integration/clients/port_client.py +++ b/integration/clients/port_client.py @@ -1,15 +1,13 @@ import httpx from typing import Any, Dict from loguru import logger -from typing import List, Dict, Optional, Union -from .cache import InMemoryCache +from typing import Dict class PortClient: def __init__(self, client_id, client_secret, **kwargs) -> None: self.httpx_async_client = kwargs.get("httpx_async_client", httpx.AsyncClient()) self.client_id = client_id - self.cache = InMemoryCache() self.client_secret = client_secret self.base_url = kwargs.get("base_url", "https://api.getport.io/v1") self.port_headers = None @@ -49,7 +47,7 @@ async def send_api_request( async def upsert_entity( self, blueprint_id: str, entity_object: Dict[str, Any] - ) -> None: + ) -> Dict[str, Any]: endpoint = f"/blueprints/{blueprint_id}/entities?upsert=true&merge=true" port_headers = ( self.port_headers if self.port_headers else await self.get_port_headers() diff --git a/integration/main.py b/integration/main.py 
index b3f8591..b68a11d 100644 --- a/integration/main.py +++ b/integration/main.py @@ -2,7 +2,7 @@ import argparse import time import datetime -from decouple import config +from decouple import config # type: ignore import re import asyncio from loguru import logger @@ -20,9 +20,21 @@ class BLUEPRINT: class HumanitecExporter: - def __init__(self, port_client, humanitec_client) -> None: - self.port_client = port_client - self.humanitec_client = humanitec_client + def __init__(self, args) -> None: + + timeout = httpx.Timeout(10.0, connect=10.0, read=20.0, write=10.0) + httpx_async_client = httpx.AsyncClient(timeout=timeout) + self.port_client = PortClient( + args.port_client_id, + args.port_client_secret, + httpx_async_client=httpx_async_client, + ) + self.humanitec_client = HumanitecClient( + args.org_id, + args.api_key, + api_url=args.api_url, + httpx_async_client=httpx_async_client, + ) @staticmethod def convert_to_datetime(timestamp: int) -> str: @@ -66,7 +78,7 @@ async def sync_environments(self) -> None: def create_entity(application, environment): return { - "identifier": environment["id"], + "identifier": f"{application['id']}/{environment['id']}", "title": environment["name"], "properties": { "type": environment["type"], @@ -91,7 +103,7 @@ def create_entity(application, environment): ) for application in applications for environments in [ - await humanitec_client.get_all_environments(application) + await self.humanitec_client.get_all_environments(application) ] for environment in environments ] @@ -100,9 +112,11 @@ def create_entity(application, environment): async def sync_workloads(self): logger.info(f"Syncing entities for blueprint {BLUEPRINT.WORKLOAD}") - def create_workload_entity(resource): + + def create_workload_entity(resource, application): + identifier = f"{application['id']}/{environment['id']}/{resource['res_id'].replace('modules.', '')}" return { - "identifier": resource["res_id"].replace("modules.", ""), + "identifier": identifier, "title": 
self.remove_symbols_and_title_case( resource["res_id"].replace("modules.", "") ), @@ -116,22 +130,24 @@ def create_workload_entity(resource): "graphResourceID": resource["gu_res_id"], }, "relations": { - BLUEPRINT.ENVIRONMENT: resource["env_id"], + BLUEPRINT.ENVIRONMENT: f"{application['id']}/{environment['id']}", }, } - applications = await humanitec_client.get_all_applications() + applications = await self.humanitec_client.get_all_applications() for application in applications: environments = await self.humanitec_client.get_all_environments(application) for environment in environments: resources = await self.humanitec_client.get_all_resources( application, environment ) - resource_group = humanitec_client.group_resources_by_type(resources) + resource_group = self.humanitec_client.group_resources_by_type( + resources + ) tasks = [ self.port_client.upsert_entity( blueprint_id=BLUEPRINT.WORKLOAD, - entity_object=create_workload_entity(resource), + entity_object=create_workload_entity(resource, application), ) for resource in resource_group.get("modules", []) if resource and resource["type"] == "workload" @@ -142,7 +158,9 @@ def create_workload_entity(resource): async def sync_resource_graphs(self) -> None: logger.info(f"Syncing entities for blueprint {BLUEPRINT.RESOURCE_GRAPH}") - def create_resource_graph_entity(graph_data, include_relations): + def create_resource_graph_entity( + graph_data, include_relations, application, environment + ): entity = { "identifier": graph_data["guresid"], "title": self.remove_symbols_and_title_case(graph_data["def_id"]), @@ -155,8 +173,10 @@ def create_resource_graph_entity(graph_data, include_relations): "relations": {}, } if include_relations: + entity["relations"] = { - BLUEPRINT.RESOURCE_GRAPH: graph_data["depends_on"] + BLUEPRINT.RESOURCE_GRAPH: graph_data["depends_on"], + BLUEPRINT.ENVIRONMENT: f"{application['id']}/{environment['id']}", } return entity @@ -164,15 +184,7 @@ def create_resource_graph_entity(graph_data, 
include_relations): for application in applications: environments = await self.humanitec_client.get_all_environments(application) for environment in environments: - resources = await self.humanitec_client.get_all_resources( - application, environment - ) - resources = humanitec_client.group_resources_by_type(resources) - modules = resources.get("modules", []) - if not modules: - continue - - resource_graph = await humanitec_client.get_all_resource_graphs(modules, + graph_nodes = await self.humanitec_client.get_dependency_graph( application, environment ) @@ -181,10 +193,10 @@ def create_resource_graph_entity(graph_data, include_relations): self.port_client.upsert_entity( blueprint_id=BLUEPRINT.RESOURCE_GRAPH, entity_object=create_resource_graph_entity( - graph_data, include_relations=False + node, False, application, environment ), ) - for graph_data in resource_graph + for node in graph_nodes ] await asyncio.gather(*tasks) @@ -193,39 +205,52 @@ def create_resource_graph_entity(graph_data, include_relations): self.port_client.upsert_entity( blueprint_id=BLUEPRINT.RESOURCE_GRAPH, entity_object=create_resource_graph_entity( - graph_data, include_relations=True + node, True, application, environment ), ) - for graph_data in resource_graph + for node in graph_nodes ] await asyncio.gather(*tasks) - logger.info(f"Finished syncing entities for blueprint {BLUEPRINT.RESOURCE_GRAPH}") + logger.info( + f"Finished syncing entities for blueprint {BLUEPRINT.RESOURCE_GRAPH}" + ) async def enrich_resource_with_graph(self, resource, application, environment): - data = { - "id": resource["gu_res_id"], - "type": resource["type"], - "resource": resource["resource"], - } - response = await humanitec_client.get_resource_graph( - application, environment, [data] - ) + try: + logger.info("Enriching resource {} with graph", resource["res_id"]) + data = { + "id": resource["res_id"], + "type": resource["type"], + "resource": resource["resource"], + } + response = await 
self.humanitec_client.get_resource_graph( + application, environment, [data] + ) - resource.update( - {"__resourceGraph": i for i in response if i["type"] == data["type"]} - ) - return resource + resource.update( + {"__resourceGraph": i for i in response if i["type"] == data["type"]} + ) + return resource + except Exception as e: + logger.error( + f"Failed to enrich resource {resource['res_id']} with graph: {str(e)}" + ) + return resource async def sync_resources(self) -> None: logger.info(f"Syncing entities for blueprint {BLUEPRINT.RESOURCE}") + def create_resource_entity(resource): workload_id = ( resource["res_id"].split(".")[1] if resource["res_id"].split(".")[0].startswith("modules") else "" ) - return { - "identifier": resource["__resourceGraph"]["guresid"], + resource_id = ( + f"{resource['app_id']}/{resource['env_id']}/{resource['res_id']}" + ) + entity = { + "identifier": resource_id, "title": self.remove_symbols_and_title_case(resource["def_id"]), "properties": { "type": resource["type"], @@ -235,49 +260,33 @@ def create_resource_entity(resource): "updateAt": resource["updated_at"], "driverType": resource["driver_type"], }, - "relations": { - BLUEPRINT.RESOURCE_GRAPH: resource["__resourceGraph"]["depends_on"], - BLUEPRINT.WORKLOAD: workload_id, - }, + "relations": {}, } - - async def fetch_resources(application, environment): - resources = await self.humanitec_client.get_all_resources( - application, environment - ) - resources = humanitec_client.group_resources_by_type(resources) - modules = resources.get("modules", []) - if not modules: - return [] - - tasks = [ - self.enrich_resource_with_graph(resource, application, environment) - for resource in modules - ] - enriched_resources = await asyncio.gather(*tasks) - return enriched_resources + if workload_id: + workload_id = f"{resource['app_id']}/{resource['env_id']}/{workload_id}" + entity["relations"][BLUEPRINT.WORKLOAD] = workload_id + return entity applications = await 
self.humanitec_client.get_all_applications() for application in applications: environments = await self.humanitec_client.get_all_environments(application) + for environment in environments: + resources = await self.humanitec_client.get_all_resources( + application, environment + ) - resource_tasks = [ - fetch_resources(application, environment) - for environment in environments - ] - all_resources = await asyncio.gather(*resource_tasks) - all_resources = [ - resource for sublist in all_resources for resource in sublist - ] # Flatten the list - - entity_tasks = [ - self.port_client.upsert_entity( - blueprint_id=BLUEPRINT.RESOURCE, - entity_object=create_resource_entity(resource), + entity_tasks = [ + self.port_client.upsert_entity( + blueprint_id=BLUEPRINT.RESOURCE, + entity_object=create_resource_entity(resource), + ) + for resource in resources + ] + await asyncio.gather(*entity_tasks) + logger.info( + "Upserted resource entities for {} environment", environment["id"] ) - for resource in all_resources - ] - await asyncio.gather(*entity_tasks) + logger.info(f"Finished syncing entities for blueprint {BLUEPRINT.RESOURCE}") async def sync_all(self) -> None: @@ -297,7 +306,7 @@ async def __call__(self, args) -> None: def validate_args(args): required_keys = ["org_id", "api_key", "port_client_id", "port_client_secret"] missing_keys = [key for key in required_keys if not getattr(args, key)] - + if missing_keys: logger.error(f"The following keys are required: {', '.join(missing_keys)}") return False @@ -305,37 +314,44 @@ def validate_args(args): parser = argparse.ArgumentParser() parser.add_argument( - "--org-id", required=False,default=config("ORG_ID",""), type=str, help="Humanitec organization ID" + "--org-id", + required=False, + default=config("ORG_ID", ""), + type=str, + help="Humanitec organization ID", + ) + parser.add_argument( + "--api-key", + required=False, + default=config("API_KEY", ""), + type=str, + help="Humanitec API key", ) - 
parser.add_argument("--api-key", required=False,default=config("API_KEY",""), type=str, help="Humanitec API key") parser.add_argument( "--api-url", type=str, - default=config("API_URL","https://api.humanitec.com"), + default=config("API_URL", "https://api.humanitec.com"), help="Humanitec API URL", ) parser.add_argument( - "--port-client-id", type=str, required=False,default=config("PORT_CLIENT_ID",""), help="Port client ID" + "--port-client-id", + type=str, + required=False, + default=config("PORT_CLIENT_ID", ""), + help="Port client ID", ) parser.add_argument( - "--port-client-secret", type=str, required=False,default = config("PORT_CLIENT_SECRET",""), help="Port client secret" + "--port-client-secret", + type=str, + required=False, + default=config("PORT_CLIENT_SECRET", ""), + help="Port client secret", ) args = parser.parse_args() - if not(validate_args(args)): + if not (validate_args(args)): import sys + sys.exit() - httpx_async_client = httpx.AsyncClient() - port_client = PortClient( - args.port_client_id, - args.port_client_secret, - httpx_async_client=httpx_async_client, - ) - humanitec_client = HumanitecClient( - args.org_id, - args.api_key, - api_url=args.api_url, - httpx_async_client=httpx_async_client, - ) - exporter = HumanitecExporter(port_client, humanitec_client) + exporter = HumanitecExporter(args) asyncio.run(exporter(args)) diff --git a/resources/blueprints.json b/resources/blueprints.json index 7e2226c..4291f9d 100644 --- a/resources/blueprints.json +++ b/resources/blueprints.json @@ -2,7 +2,7 @@ { "identifier": "humanitecApplication", "description": "Humanitec Application", - "title": "humanitecApplication", + "title": "Application", "icon": "Apps", "schema": { "properties": { @@ -19,54 +19,54 @@ "aggregationProperties": {}, "relations": {} }, - { - "identifier": "humanitecEnvironment", - "title": "Humanitec Environment", - "icon": "Environment", - "schema": { - "properties": { - "type": { - "title": "Type", - "icon": "DefaultProperty", - 
"type": "string" - }, - "createdAt": { - "type": "string", - "format": "date-time", - "title": "Creation Date", - "description": "The date and time when the environment was created." - }, - "lastDeploymentStatus": { - "type": "string", - "title": "Last Deployment Status", - "description": "The status of the last deployment." - }, - "lastDeploymentDate": { - "type": "string", - "format": "date-time", - "title": "Last Deployment Date", - "description": "The date and time of the last time the environment was deployed." - }, - "lastDeploymentComment": { - "type": "string", - "title": "Last Deployment Comment", - "description": "comment on the last deployment" - } + { + "identifier": "humanitecEnvironment", + "title": "Environment", + "icon": "Environment", + "schema": { + "properties": { + "type": { + "title": "Type", + "icon": "DefaultProperty", + "type": "string" }, - "required": [] - }, - "mirrorProperties": {}, - "calculationProperties": {}, - "aggregationProperties": {}, - "relations": { - "humanitecApplication": { - "title": "Application", - "target": "humanitecApplication", - "required": false, - "many": false + "createdAt": { + "type": "string", + "format": "date-time", + "title": "Creation Date", + "description": "The date and time when the environment was created." + }, + "lastDeploymentStatus": { + "type": "string", + "title": "Last Deployment Status", + "description": "The status of the last deployment." + }, + "lastDeploymentDate": { + "type": "string", + "format": "date-time", + "title": "Last Deployment Date", + "description": "The date and time of the last time the environment was deployed." 
+ }, + "lastDeploymentComment": { + "type": "string", + "title": "Last Deployment Comment", + "description": "comment on the last deployment" } - } + }, + "required": [] }, + "mirrorProperties": {}, + "calculationProperties": {}, + "aggregationProperties": {}, + "relations": { + "humanitecApplication": { + "title": "Application", + "target": "humanitecApplication", + "required": false, + "many": false + } + } + }, { "identifier": "humanitecWorkload", "title": "Workload", @@ -127,7 +127,7 @@ }, { "identifier": "humanitecResource", - "title": "Humanitec Resource", + "title": "Active Resource", "icon": "Microservice", "schema": { "properties": { @@ -154,12 +154,6 @@ "description": "The schema of the resource", "type": "object", "icon": "DefaultProperty" - }, - "guresid": { - "title": "GU Resource ID", - "description": "The GU resource ID", - "type": "string", - "icon": "DefaultProperty" } }, "required": [] @@ -168,13 +162,6 @@ "calculationProperties": {}, "aggregationProperties": {}, "relations": { - "humanitecResourceGraph": { - "title": "Depends On", - "description": "Resource Graph", - "target": "humanitecResourceGraph", - "required": false, - "many": true - }, "humanitecWorkload": { "title": "Humanitec Workload", "target": "humanitecWorkload", @@ -196,8 +183,14 @@ "calculationProperties": {}, "aggregationProperties": {}, "relations": { + "humanitecEnvironment": { + "title": "Environment", + "target": "humanitecEnvironment", + "required": false, + "many": false + }, "humanitecResourceGraph": { - "title": "Resource Graph", + "title": "Depends On", "target": "humanitecResourceGraph", "required": false, "many": true