Skip to content

Commit

Permalink
Merge pull request #8 from port-labs/dev
Browse files Browse the repository at this point in the history
Resolved issues with caching
  • Loading branch information
mk-armah authored Jul 29, 2024
2 parents dc58c69 + 0aa56da commit ed66153
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 232 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
__pycache__
venv
venv
**/__init__.py
1 change: 1 addition & 0 deletions integration/clients/cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from typing import Dict, Any


class InMemoryCache:
def __init__(self):
self.cache = {}
Expand Down
157 changes: 89 additions & 68 deletions integration/clients/humanitec_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
class CACHE_KEYS:
APPLICATION = "APPLICATION_CACHE_KEY"
ENVIRONMENT = "ENVIRONMENT_CACHE_KEY"
WORKLOAD = "WORKLOAD_CACHE_KEY"
RESOURCE = "RESOURCE_CACHE_KEY"


Expand All @@ -36,7 +35,7 @@ async def send_api_request(
method: str,
endpoint: str,
headers: Dict[str, str] | None = None,
json: Dict[str, Any] | None = None,
json: Dict[str, Any] | List[Dict[str, Any]] | None = None,
) -> Any:
url = self.base_url + endpoint
try:
Expand Down Expand Up @@ -72,91 +71,113 @@ async def get_all_applications(self) -> List[Dict[str, Any]]:
return applications

async def get_all_environments(self, app) -> List[Dict[str, Any]]:
if cached_environments := await self.cache.get(CACHE_KEYS.ENVIRONMENT):
cached_environments = cached_environments.get(app["id"], {})
logger.info(
f"Retrieved {len(cached_environments)} environment for {app['id']} from cache"
)
return list(cached_environments.values())

endpoint = f"apps/{app['id']}/envs"
humanitec_headers = self.get_humanitec_headers()
environments: List[Dict[str, Any]] = await self.send_api_request(
"GET", endpoint, headers=humanitec_headers
)
await self.cache.set(
CACHE_KEYS.ENVIRONMENT,
{
app["id"]: {
environment["id"]: environment for environment in environments
}
},
)
logger.info(f"Received {len(environments)} environments from Humanitec")
return environments
try:
if cached_environments := await self.cache.get(CACHE_KEYS.ENVIRONMENT):
if app_environments := cached_environments.get(app["id"]):
logger.info(
f"Retrieved {len(app_environments)} environment for {app['id']} from cache"
)
return list(app_environments.values())

logger.info("Fetching environments from Humanitec")

endpoint = f"apps/{app['id']}/envs"
humanitec_headers = self.get_humanitec_headers()
environments: List[Dict[str, Any]] = await self.send_api_request(
"GET", endpoint, headers=humanitec_headers
)
await self.cache.set(
CACHE_KEYS.ENVIRONMENT,
{
app["id"]: {
environment["id"]: environment for environment in environments
}
},
)
logger.info(f"Received {len(environments)} environments from Humanitec")
return environments
except Exception as e:
logger.error(f"Failed to fetch environments from {app['id']}: {str(e)}")
return []

async def get_all_resources(self, app, env) -> List[Dict[str, Any]]:
if cached_resources := await self.cache.get(CACHE_KEYS.RESOURCE):
cached_resources = cached_resources.get(app["id"], {}).get(env["id"], {})
try:
if cached_resources := await self.cache.get(CACHE_KEYS.RESOURCE):
if env_resources := cached_resources.get(app["id"], {}).get(
env["id"]
):
logger.info(
f"Retrieved {len(env_resources)} resources from cache for app {app['id']} and env {env['id']}"
)
return list(env_resources.values())

logger.info("Fetching resources from Humanitec")
endpoint = f"apps/{app['id']}/envs/{env['id']}/resources"
humanitec_headers = self.get_humanitec_headers()
resources: List[Dict[str, Any]] = await self.send_api_request(
"GET", endpoint, headers=humanitec_headers
)
await self.cache.set(
CACHE_KEYS.RESOURCE,
{
app["id"]: {
env["id"]: {
resource["gu_res_id"]: resource for resource in resources
}
}
},
)
logger.info(
f"Retrieved {len(cached_resources)} resources from cache for app {app['id']} and env {env['id']}"
f"Received {len(resources)} resources for {env['id']} environment in {app['id']}"
)
return list(cached_resources.values())
return resources
except Exception as e:
logger.error(
f"Failed to fetch resources for {env['id']} environment in {app[id]}: {str(e)}"
)
return []

endpoint = f"apps/{app['id']}/envs/{env['id']}/resources"
humanitec_headers = self.get_humanitec_headers()
resources: List[Dict[str, Any]] = await self.send_api_request(
"GET", endpoint, headers=humanitec_headers
)
await self.cache.set(
CACHE_KEYS.RESOURCE,
{
app["id"]: {
env["id"]: {
resource["gu_res_id"]: resource for resource in resources
}
}
},
)
logger.info(f"Received {len(resources)} resources from Humanitec")
return resources
async def get_dependency_graph(
self, app: Dict[str, Any], env: Dict[str, Any]
) -> List[Dict[str, Any]]:
try:
if dependency_graph_id := env.get("last_deploy", {}).get("dependency_graph_id"):
endpoint = f"apps/{app['id']}/envs/{env['id']}/resources/graphs/{dependency_graph_id}"
humanitec_headers = self.get_humanitec_headers()
graph = await self.send_api_request(
"GET", endpoint, headers=humanitec_headers
)
nodes = graph["nodes"]
logger.info(
f"Received {len(nodes)} graph nodes for {env['id']} environment in {app['id']}"
)
return nodes

logger.info(
f"No dependency graph found for {env['id']} environment in {app['id']}"
)
return []
except Exception as e:
logger.error(
f"Failed to fetch dependency graphs for {env['id']} environment in {app['id']}: {str(e)}"
)
return []

async def get_resource_graph(
self, app: str, env: str, data: List[Dict[str, Any]]
self, app: Dict[str, Any], env: Dict[str, Any], data: List[Dict[str, Any]]
) -> Any:
endpoint = f"apps/{app['id']}/envs/{env['id']}/resources/graph"
humanitec_headers = self.get_humanitec_headers()
graph = await self.send_api_request(
"POST", endpoint, headers=humanitec_headers, json=data
)

return graph

async def get_all_resource_graphs(
self, modules: List[Dict[str, Any]], app: str, env: str
) -> Any:

def get_resource_graph_request_body(modules):
return [
{
"id": module["gu_res_id"],
"type": module["type"],
"resource": module["resource"],
}
for module in modules
]
data = get_resource_graph_request_body(modules)

graph_entities = await self.get_resource_graph(app, env, data)
logger.info(
f"Received {len(graph_entities)} resource graph entities from app: {app['id']} and env: {env['id']} using data: {data}"
)
return graph_entities

def group_resources_by_type(
self, data: List[Dict[str, Any]]
) -> Dict[str, List[Dict[str, Any]]]:
grouped_resources = {}
grouped_resources: dict[str, Any] = {}
for resource in data:
workload_id = resource["res_id"].split(".")[0]
if workload_id not in grouped_resources:
Expand Down
6 changes: 2 additions & 4 deletions integration/clients/port_client.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import httpx
from typing import Any, Dict
from loguru import logger
from typing import List, Dict, Optional, Union
from .cache import InMemoryCache
from typing import Dict


class PortClient:
def __init__(self, client_id, client_secret, **kwargs) -> None:
self.httpx_async_client = kwargs.get("httpx_async_client", httpx.AsyncClient())
self.client_id = client_id
self.cache = InMemoryCache()
self.client_secret = client_secret
self.base_url = kwargs.get("base_url", "https://api.getport.io/v1")
self.port_headers = None
Expand Down Expand Up @@ -49,7 +47,7 @@ async def send_api_request(

async def upsert_entity(
self, blueprint_id: str, entity_object: Dict[str, Any]
) -> None:
) -> Dict[str, Any]:
endpoint = f"/blueprints/{blueprint_id}/entities?upsert=true&merge=true"
port_headers = (
self.port_headers if self.port_headers else await self.get_port_headers()
Expand Down
Loading

0 comments on commit ed66153

Please sign in to comment.