[Integration][PagerDuty] Fix Rate limit from querying Pagerduty service analytics (#1085)

# Description

What - Implemented a rate limiter in the PagerDuty integration.

Why - Many requests were failing with 429 (Too Many Requests) errors.

How - Gated all outgoing requests behind a single semaphore, since highly concurrent requests against these endpoints are not advised (see the sketch below).
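
A minimal sketch of the pattern, outside the integration's actual code (`fake_request` and the timings are illustrative): a single `asyncio.Semaphore` caps how many coroutines can hold a request slot at once, so bursts queue up instead of hitting the API together.

```python
import asyncio

MAX_CONCURRENT_REQUESTS = 10
semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

in_flight = 0
peak = 0


async def fake_request(i: int) -> None:
    """Stand-in for one HTTP call, gated by the shared semaphore."""
    global in_flight, peak
    async with semaphore:
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # simulate the network round trip
        in_flight -= 1


async def main() -> None:
    # 100 tasks are scheduled at once, but at most 10 run concurrently.
    await asyncio.gather(*(fake_request(i) for i in range(100)))
    print(f"peak concurrency: {peak}")  # prints 10


asyncio.run(main())
```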

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)

#### All tests should be run against the Port production environment (using a testing org).

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector


### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

---------

Co-authored-by: PagesCoffy <[email protected]>
Co-authored-by: shariff-6 <[email protected]>
Co-authored-by: Tom Tankilevitch <[email protected]>
Co-authored-by: Matan <[email protected]>
5 people authored Nov 21, 2024
1 parent 7d6cbb4 commit 6e438e6
Showing 6 changed files with 378 additions and 104 deletions.
8 changes: 8 additions & 0 deletions integrations/pagerduty/CHANGELOG.md
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.118 (2024-11-21)


### Improvements

- Added a mechanism to control concurrency and rate limit using semaphores (0.1.118)


## 0.1.117 (2024-11-21)


157 changes: 82 additions & 75 deletions integrations/pagerduty/clients/pagerduty.py
@@ -1,14 +1,17 @@
from typing import Any, AsyncGenerator
import asyncio
from typing import Any, AsyncGenerator, Dict, Optional

import httpx
from loguru import logger

from port_ocean.utils import http_async_client
from port_ocean.context.event import event
from port_ocean.utils import http_async_client

from .utils import get_date_range_for_last_n_months

USER_KEY = "users"

MAX_CONCURRENT_REQUESTS = 10


class PagerDutyClient:
def __init__(self, token: str, api_url: str, app_host: str | None):
@@ -17,6 +20,7 @@ def __init__(self, token: str, api_url: str, app_host: str | None):
self.app_host = app_host
self.http_client = http_async_client
self.http_client.headers.update(self.api_auth_param["headers"])
self._semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

@property
def incident_upsert_events(self) -> list[str]:
@@ -67,51 +71,40 @@ def api_auth_param(self) -> dict[str, Any]:
}

async def paginate_request_to_pager_duty(
self, data_key: str, params: dict[str, Any] | None = None
self, resource: str, params: dict[str, Any] | None = None
) -> AsyncGenerator[list[dict[str, Any]], None]:
url = f"{self.api_url}/{data_key}"
offset = 0
has_more_data = True

while has_more_data:
try:
response = await self.http_client.get(
url, params={"offset": offset, **(params or {})}
data = await self.send_api_request(
endpoint=resource, query_params={"offset": offset, **(params or {})}
)
response.raise_for_status()
data = response.json()
yield data[data_key]
yield data[resource]

has_more_data = data["more"]
if has_more_data:
offset += data["limit"]
except httpx.HTTPStatusError as e:
logger.error(
f"HTTP error with status code: {e.response.status_code} and response text: {e.response.text}"
f"Got {e.response.status_code} status code while fetching paginated data: {str(e)}"
)
raise
except httpx.HTTPError as e:
logger.error(f"HTTP occurred while fetching paginated data: {e}")
raise

async def get_singular_from_pager_duty(
self, object_type: str, identifier: str
) -> dict[str, Any]:
url = f"{self.api_url}/{object_type}/{identifier}"

try:
response = await self.http_client.get(url)
response.raise_for_status()
data = response.json()
data = await self.send_api_request(
endpoint=f"{object_type}/{identifier}", method="GET"
)
return data
except httpx.HTTPStatusError as e:
except (httpx.HTTPStatusError, httpx.HTTPError) as e:
logger.error(
f"HTTP error with status code: {e.response.status_code} and response text: {e.response.text}"
f"Error fetching data for {object_type} with identifier {identifier}: {e}"
)
raise
except httpx.HTTPError as e:
logger.error(f"HTTP occurred while fetching data: {e}")
raise

async def create_webhooks_if_not_exists(self) -> None:
if not self.app_host:
@@ -123,7 +116,7 @@ async def create_webhooks_if_not_exists(self) -> None:

invoke_url = f"{self.app_host}/integration/webhook"
async for subscriptions in self.paginate_request_to_pager_duty(
data_key="webhook_subscriptions"
resource="webhook_subscriptions"
):
for webhook in subscriptions:
if webhook["delivery_method"]["url"] == invoke_url:
@@ -143,16 +136,11 @@ async def create_webhooks_if_not_exists(self) -> None:
}

try:
await self.http_client.post(
f"{self.api_url}/webhook_subscriptions", json=body
await self.send_api_request(
endpoint="webhook_subscriptions", method="POST", json_data=body
)
except httpx.HTTPStatusError as e:
logger.error(
f"HTTP error with status code: {e.response.status_code} and response text: {e.response.text}"
)
except httpx.HTTPError as e:
logger.error(f"HTTP occurred while creating webhook subscription {e}")
raise
except (httpx.HTTPStatusError, httpx.HTTPError) as e:
logger.error(f"Error creating webhook subscription: {e}")

async def get_oncall_user(
self, *escalation_policy_ids: str
@@ -167,7 +155,7 @@ async def get_oncall_user(
oncalls = []

async for oncall_batch in self.paginate_request_to_pager_duty(
data_key="oncalls", params=params
resource="oncalls", params=params
):
logger.info(f"Received oncalls with batch size {len(oncall_batch)}")
oncalls.extend(oncall_batch)
@@ -197,67 +185,86 @@ async def update_oncall_users(

async def get_incident_analytics(self, incident_id: str) -> dict[str, Any]:
logger.info(f"Fetching analytics for incident: {incident_id}")
url = f"{self.api_url}/analytics/raw/incidents/{incident_id}"

try:
response = await self.http_client.get(url)
response.raise_for_status()
data = response.json()
data = await self.send_api_request(
endpoint=f"analytics/raw/incidents/{incident_id}", method="GET"
)
return data
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
except (httpx.HTTPStatusError, httpx.HTTPError) as e:
if isinstance(e, httpx.HTTPStatusError) and e.response.status_code == 404:
logger.debug(
f"Incident {incident_id} analytics data was not found, skipping..."
)
return {}

logger.error(
f"HTTP error with status code: {e.response.status_code} and response text: {e.response.text}"
)
return {}
except httpx.HTTPError as e:
logger.error(f"HTTP occurred while fetching incident analytics data: {e}")
return {}
else:
logger.error(f"Error fetching incident analytics data: {e}")
return {}

async def get_service_analytics(
self, service_id: str, months_period: int = 3
) -> dict[str, Any]:
) -> Dict[str, Any]:
logger.info(f"Fetching analytics for service: {service_id}")
url = f"{self.api_url}/analytics/metrics/incidents/services"
date_ranges = get_date_range_for_last_n_months(months_period)

try:
body = {
"filters": {
"service_ids": [service_id],
"created_at_start": date_ranges[0],
"created_at_end": date_ranges[1],
}
body = {
"filters": {
"service_ids": [service_id],
"created_at_start": date_ranges[0],
"created_at_end": date_ranges[1],
}
response = await self.http_client.post(url, json=body)
response.raise_for_status()
}

try:
response = await self.send_api_request(
"analytics/metrics/incidents/services",
method="POST",
json_data=body,
extensions={"retryable": True},
)
logger.info(f"Successfully fetched analytics for service: {service_id}")
return response.get("data", [])[0] if response.get("data") else {}

return response.json()["data"][0] if response.json()["data"] else {}
except (httpx.HTTPStatusError, httpx.HTTPError) as e:
logger.error(f"Error fetching analytics for service {service_id}: {e}")
raise

except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(
f"Service {service_id} analytics data was not found, skipping..."
)
return {}
async def send_api_request(
self,
endpoint: str,
method: str = "GET",
query_params: Optional[dict[str, Any]] = None,
json_data: Optional[dict[str, Any]] = None,
extensions: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
logger.debug(
f"Sending API request to {method} {endpoint} with query params: {query_params}"
)

logger.error(
f"HTTP error with status code: {e.response.status_code} and response text: {e.response.text}"
)
return {}
except httpx.HTTPError as e:
logger.error(f"HTTP occurred while fetching service analytics data: {e}")
return {}
async with self._semaphore:
try:
response = await self.http_client.request(
method=method,
url=f"{self.api_url}/{endpoint}",
params=query_params,
json=json_data,
extensions=extensions,
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(
f"Resource not found at endpoint '{endpoint}' with params: {query_params}, method: {method}"
)
return {}
logger.error(
f"HTTP error for endpoint '{endpoint}': Status code {e.response.status_code}, Method: {method}, Query params: {query_params}, Response text: {e.response.text}"
)
raise

async def fetch_and_cache_users(self) -> None:
async for users in self.paginate_request_to_pager_duty(data_key=USER_KEY):
async for users in self.paginate_request_to_pager_duty(resource=USER_KEY):
for user in users:
event.attributes[user["id"]] = user["email"]

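For orientation after the diff above, a hedged usage sketch: every call site now funnels through `send_api_request`, so one semaphore gates all outbound traffic for the client. The token and IDs below are placeholders.

```python
import asyncio

from clients.pagerduty import PagerDutyClient


async def main() -> None:
    client = PagerDutyClient(
        token="<api-token>", api_url="https://api.pagerduty.com", app_host=None
    )
    # Both calls acquire the same semaphore inside send_api_request, so no
    # matter how many coroutines are scheduled, at most
    # MAX_CONCURRENT_REQUESTS are in flight at once.
    service = await client.get_singular_from_pager_duty("services", "<service-id>")
    analytics = await client.get_service_analytics("<service-id>", months_period=3)


asyncio.run(main())
```

Note also the `extensions={"retryable": True}` passed on the analytics POST: in Ocean's shared `http_async_client` this appears to opt the (non-idempotent) request into the retry transport, which would re-attempt throttled responses after a backoff.
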
7 changes: 3 additions & 4 deletions integrations/pagerduty/integration.py
@@ -1,7 +1,4 @@
from typing import Literal, Any

from pydantic.fields import Field
from pydantic.main import BaseModel
from typing import Any, Literal

from port_ocean.core.handlers.port_app_config.api import APIPortAppConfig
from port_ocean.core.handlers.port_app_config.models import (
@@ -10,6 +7,8 @@
Selector,
)
from port_ocean.core.integrations.base import BaseIntegration
from pydantic.fields import Field
from pydantic.main import BaseModel

from clients.utils import (
get_date_range_for_last_n_months,
47 changes: 23 additions & 24 deletions integrations/pagerduty/main.py
@@ -1,20 +1,21 @@
import asyncio
import typing
from typing import Any
import asyncio

from loguru import logger
from port_ocean.context.event import event
from port_ocean.context.ocean import ocean
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE

from clients.pagerduty import PagerDutyClient
from integration import ObjectKind, PagerdutyServiceResourceConfig
from integration import (
ObjectKind,
PagerdutyEscalationPolicyResourceConfig,
PagerdutyIncidentResourceConfig,
PagerdutyScheduleResourceConfig,
PagerdutyOncallResourceConfig,
PagerdutyEscalationPolicyResourceConfig,
PagerdutyScheduleResourceConfig,
PagerdutyServiceResourceConfig,
)
from port_ocean.context.event import event
from port_ocean.context.ocean import ocean
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE


def initialize_client() -> PagerDutyClient:
@@ -28,20 +29,18 @@ def initialize_client() -> PagerDutyClient:
async def enrich_service_with_analytics_data(
client: PagerDutyClient, services: list[dict[str, Any]], months_period: int
) -> list[dict[str, Any]]:
analytics_data = await asyncio.gather(
*[
client.get_service_analytics(service["id"], months_period)
for service in services
]
async def fetch_service_analytics(service: dict[str, Any]) -> dict[str, Any]:
try:
analytics = await client.get_service_analytics(service["id"], months_period)
return {**service, "__analytics": analytics}
except Exception as e:
logger.error(f"Failed to fetch analytics for service {service['id']}: {e}")
return {**service, "__analytics": None}

return await asyncio.gather(
*[fetch_service_analytics(service) for service in services]
)

enriched_services = [
{**service, "__analytics": analytics}
for service, analytics in zip(services, analytics_data)
]

return enriched_services


async def enrich_incidents_with_analytics_data(
client: PagerDutyClient,
@@ -71,7 +70,7 @@ async def on_incidents_resync(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
query_params = selector.api_query_params

async for incidents in pager_duty_client.paginate_request_to_pager_duty(
data_key=ObjectKind.INCIDENTS,
resource=ObjectKind.INCIDENTS,
params=query_params.generate_request_params() if query_params else None,
):
logger.info(f"Received batch with {len(incidents)} incidents")
@@ -95,7 +94,7 @@ async def on_services_resync(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
).selector

async for services in pager_duty_client.paginate_request_to_pager_duty(
data_key=ObjectKind.SERVICES,
resource=ObjectKind.SERVICES,
params=(
selector.api_query_params.generate_request_params()
if selector.api_query_params
@@ -121,7 +120,7 @@ async def on_schedules_resync(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
).selector.api_query_params

async for schedules in pager_duty_client.paginate_request_to_pager_duty(
data_key=ObjectKind.SCHEDULES,
resource=ObjectKind.SCHEDULES,
params=query_params.generate_request_params() if query_params else None,
):
yield await pager_duty_client.transform_user_ids_to_emails(schedules)
@@ -137,7 +136,7 @@ async def on_oncalls_resync(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
).selector.api_query_params

async for oncalls in pager_duty_client.paginate_request_to_pager_duty(
data_key=ObjectKind.ONCALLS,
resource=ObjectKind.ONCALLS,
params=query_params.generate_request_params() if query_params else None,
):
yield oncalls
@@ -152,7 +151,7 @@ async def on_escalation_policies_resync(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
).selector

async for escalation_policies in pager_duty_client.paginate_request_to_pager_duty(
data_key=ObjectKind.ESCALATION_POLICIES,
resource=ObjectKind.ESCALATION_POLICIES,
params=(
selector.api_query_params.generate_request_params()
if selector.api_query_params
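A design note on the `main.py` change: catching inside a per-service wrapper keeps one failed analytics call from failing the whole `asyncio.gather` and losing the batch. A roughly equivalent alternative (a sketch, not the PR's code) is `return_exceptions=True`, at the cost of losing the per-service log line at the failure site:

```python
# Sketch: let gather collect exceptions as results instead of raising.
results = await asyncio.gather(
    *(client.get_service_analytics(s["id"], months_period) for s in services),
    return_exceptions=True,
)
enriched = [
    {**service, "__analytics": None if isinstance(result, BaseException) else result}
    for service, result in zip(services, results)
]
```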