Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rate-limits

phalbert authored Nov 7, 2024
2 parents 7ed7220 + 25140ed commit d8e1103
Showing 11 changed files with 134 additions and 18 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,16 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.12.9 (2024-11-07)


### Bug Fixes

- Await logger writing exception on exit (Integration logs not being ingested)
- Await logger thread on exit (Integration logs not being ingested)
- Serialize exception (Integration logs not being ingested)


## 0.12.8 (2024-11-04)


8 changes: 8 additions & 0 deletions integrations/azure-devops/CHANGELOG.md
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.79 (2024-11-07)


### Bug Fixes

- Fixed the API endpoint used in the boards kind to iterate through all project teams, ensuring non-default team boards and columns are ingested


## 0.1.78 (2024-11-06)


@@ -286,24 +286,27 @@ async def get_columns(self) -> AsyncGenerator[list[dict[str, Any]], None]:
         ]

     async def _enrich_boards(
-        self, boards: list[dict[str, Any]], project_id: str
+        self, boards: list[dict[str, Any]], project_id: str, team_id: str
     ) -> list[dict[str, Any]]:
         for board in boards:
             response = await self.send_request(
                 "GET",
-                f"{self._organization_base_url}/{project_id}/{API_URL_PREFIX}/work/boards/{board['id']}",
+                f"{self._organization_base_url}/{project_id}/{team_id}/{API_URL_PREFIX}/work/boards/{board['id']}",
             )
             board.update(response.json())
         return boards

-    async def _get_boards(self, project_id: str) -> list[dict[str, Any]]:
-        get_boards_url = (
-            f"{self._organization_base_url}/{project_id}/{API_URL_PREFIX}/work/boards"
-        )
-        response = await self.send_request("GET", get_boards_url)
-        board_data = response.json().get("value", [])
-        logger.info(f"Found {len(board_data)} boards for project {project_id}")
-        return await self._enrich_boards(board_data, project_id)
+    async def _get_boards(
+        self, project_id: str
+    ) -> AsyncGenerator[list[dict[str, Any]], None]:
+        teams_url = f"{self._organization_base_url}/{API_URL_PREFIX}/projects/{project_id}/teams"
+        async for teams_in_project in self._get_paginated_by_top_and_skip(teams_url):
+            for team in teams_in_project:
+                get_boards_url = f"{self._organization_base_url}/{project_id}/{team['id']}/{API_URL_PREFIX}/work/boards"
+                response = await self.send_request("GET", get_boards_url)
+                board_data = response.json().get("value", [])
+                logger.info(f"Found {len(board_data)} boards for project {project_id}")
+                yield await self._enrich_boards(board_data, project_id, team["id"])

     @cache_iterator_result()
     async def get_boards_in_organization(
@@ -313,7 +316,8 @@ async def get_boards_in_organization(
         yield [
             {**board, "__project": project}
             for project in projects
-            for board in await self._get_boards(project["id"])
+            async for boards in self._get_boards(project["id"])
+            for board in boards
         ]

     async def generate_subscriptions_webhook_events(self) -> list[WebhookEvent]:
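The fix reflects that Azure DevOps boards are team-scoped resources: teams are listed from `{organization}/_apis/projects/{projectId}/teams`, and boards are then fetched per team from `{organization}/{project}/{team}/_apis/work/boards`, which also picks up boards of non-default teams that the old project-level call missed. Below is a minimal standalone sketch of that call pattern, assuming `httpx` and a personal access token; `ORG_URL`, `PAT`, and the project id are illustrative placeholders, not values from this commit:

```python
# Standalone sketch of the team-scoped boards flow (illustrative, not the
# integration's real client). Assumes an Azure DevOps PAT with read access.
import asyncio
from typing import Any, AsyncGenerator

import httpx

ORG_URL = "https://dev.azure.com/my-org"  # assumption: your organization URL
PAT = "..."  # assumption: a personal access token


async def iter_team_boards(
    client: httpx.AsyncClient, project_id: str
) -> AsyncGenerator[dict[str, Any], None]:
    # 1. List the project's teams (the real client pages this with $top/$skip).
    teams_resp = await client.get(
        f"{ORG_URL}/_apis/projects/{project_id}/teams",
        params={"api-version": "7.1"},  # api-version may need adjusting
    )
    for team in teams_resp.json().get("value", []):
        # 2. Boards live under /{project}/{team}/_apis/work/boards.
        boards_resp = await client.get(
            f"{ORG_URL}/{project_id}/{team['id']}/_apis/work/boards",
            params={"api-version": "7.1"},
        )
        for board in boards_resp.json().get("value", []):
            yield board


async def main() -> None:
    # PATs are sent as basic auth with an empty username.
    async with httpx.AsyncClient(auth=("", PAT)) as client:
        async for board in iter_team_boards(client, "my-project-id"):
            print(board["id"], board["name"])


asyncio.run(main())
```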
2 changes: 1 addition & 1 deletion integrations/azure-devops/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "azure-devops"
-version = "0.1.78"
+version = "0.1.79"
 description = "An Azure Devops Ocean integration"
 authors = ["Matan Geva <[email protected]>"]

@@ -572,8 +572,10 @@ async def test_get_boards_in_organization(mock_event_context: MagicMock) -> None
     async def mock_generate_projects() -> AsyncGenerator[List[Dict[str, Any]], None]:
         yield [{"id": "proj1", "name": "Project One"}]

-    async def mock_get_boards(project_id: str) -> List[Dict[str, Any]]:
-        return [
+    async def mock_get_boards(
+        project_id: str,
+    ) -> AsyncGenerator[List[Dict[str, Any]], None]:
+        yield [
             {"id": "board1", "name": "Board One"},
             {"id": "board2", "name": "Board Two"},
         ]
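The mock changes shape because `_get_boards` is now an async generator rather than a plain coroutine, and the two are consumed differently. A minimal standalone illustration, independent of this codebase:

```python
# A coroutine is awaited once for a single result; an async generator is
# iterated with `async for` and may yield several batches.
import asyncio
from typing import AsyncGenerator


async def get_items_coro() -> list[int]:
    return [1, 2, 3]


async def get_items_gen() -> AsyncGenerator[list[int], None]:
    yield [1, 2]
    yield [3]


async def main() -> None:
    items = await get_items_coro()  # one awaited result
    print(items)

    async for batch in get_items_gen():  # zero or more yielded batches
        print(batch)


asyncio.run(main())
```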
1 change: 0 additions & 1 deletion integrations/gitlab/CHANGELOG.md
@@ -14,7 +14,6 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Bumped ocean version to ^0.12.8


0.1.135 (2024-10-31)
====================

1 change: 0 additions & 1 deletion port_ocean/clients/port/mixins/integrations.py
@@ -3,7 +3,6 @@

import httpx
from loguru import logger

from port_ocean.clients.port.authentication import PortAuthentication
from port_ocean.clients.port.utils import handle_status_code
from port_ocean.log.sensetive import sensitive_log_filter
25 changes: 23 additions & 2 deletions port_ocean/log/handlers.py
@@ -11,16 +11,22 @@

 from port_ocean import Ocean
 from port_ocean.context.ocean import ocean
+from copy import deepcopy
+from traceback import format_exception


 def _serialize_record(record: logging.LogRecord) -> dict[str, Any]:
+    extra = {**deepcopy(record.__dict__["extra"])}
+    if isinstance(extra.get("exc_info"), Exception):
+        serialized_exception = "".join(format_exception(extra.get("exc_info")))
+        extra["exc_info"] = serialized_exception
     return {
         "message": record.msg,
         "level": record.levelname,
         "timestamp": datetime.utcfromtimestamp(record.created).strftime(
             "%Y-%m-%dT%H:%M:%S.%fZ"
         ),
-        "extra": record.__dict__["extra"],
+        "extra": extra,
     }


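For context, `traceback.format_exception` (used here in the single-argument form available on Python 3.10+) renders the exception object stored under `exc_info` as the familiar traceback lines, which join into a JSON-safe string. A minimal illustration:

```python
# format_exception turns an exception object into traceback lines; joining
# them yields a string that can be shipped in a JSON log payload.
from traceback import format_exception

try:
    raise ValueError("boom")
except ValueError as exc:
    serialized = "".join(format_exception(exc))  # single-arg form: Python 3.10+

print(serialized)
# Traceback (most recent call last):
#   ...
# ValueError: boom
```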
@@ -37,6 +43,7 @@ def __init__(
         self.flush_size = flush_size
         self.last_flush_time = time.time()
         self._serialized_buffer: list[dict[str, Any]] = []
+        self._thread_pool: list[threading.Thread] = []

     @property
     def ocean(self) -> Ocean | None:
@@ -46,6 +53,7 @@ def ocean(self) -> Ocean | None:
         return None

     def emit(self, record: logging.LogRecord) -> None:
+
         self._serialized_buffer.append(_serialize_record(record))
         super().emit(record)

@@ -61,6 +69,11 @@ def shouldFlush(self, record: logging.LogRecord) -> bool:
             return True
         return False

+    def wait_for_lingering_threads(self) -> None:
+        for thread in self._thread_pool:
+            if thread.is_alive():
+                thread.join()
+
     def flush(self) -> None:
         if self.ocean is None or not self.buffer:
             return
@@ -70,13 +83,21 @@ def _wrap_event_loop(_ocean: Ocean, logs_to_send: list[dict[str, Any]]) -> None:
             loop.run_until_complete(self.send_logs(_ocean, logs_to_send))
             loop.close()

+        def clear_thread_pool() -> None:
+            for thread in self._thread_pool:
+                if not thread.is_alive():
+                    self._thread_pool.remove(thread)
+
         self.acquire()
         logs = list(self._serialized_buffer)
         if logs:
             self.buffer.clear()
             self._serialized_buffer.clear()
             self.last_flush_time = time.time()
-            threading.Thread(target=_wrap_event_loop, args=(self.ocean, logs)).start()
+            clear_thread_pool()
+            thread = threading.Thread(target=_wrap_event_loop, args=(self.ocean, logs))
+            thread.start()
+            self._thread_pool.append(thread)
         self.release()

     async def send_logs(
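Taken together, these changes make each flush hand its batch to a background thread, keep track of that thread, and give shutdown a way to join anything still running. Without the final join, a process exiting right after a fatal error could tear down before the last batch reached Port — the "Integration logs not being ingested" symptom from the changelog. A stripped-down sketch of the pattern, with illustrative names rather than the handler's real API:

```python
# Flush-on-a-thread plus join-on-shutdown, reduced to its essentials.
import threading
import time


class BackgroundSender:
    def __init__(self) -> None:
        self._threads: list[threading.Thread] = []

    def flush(self, batch: list[str]) -> None:
        # Drop references to finished threads before starting a new one.
        self._threads = [t for t in self._threads if t.is_alive()]
        thread = threading.Thread(target=self._send, args=(batch,))
        thread.start()
        self._threads.append(thread)

    def _send(self, batch: list[str]) -> None:
        time.sleep(0.1)  # stand-in for the async HTTP call that ships logs
        print(f"sent {len(batch)} log records")

    def wait_for_lingering_threads(self) -> None:
        # Called at shutdown, mirroring the handler's signal-handler hook.
        for thread in self._threads:
            if thread.is_alive():
                thread.join()


sender = BackgroundSender()
sender.flush(["a", "b"])
sender.wait_for_lingering_threads()  # blocks until the batch is sent
```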
1 change: 1 addition & 0 deletions port_ocean/log/logger_setup.py
@@ -55,6 +55,7 @@ def _http_loguru_handler(level: LogLevelType) -> None:
     logger.configure(patcher=exception_deserializer)

     http_memory_handler = HTTPMemoryHandler()
+    signal_handler.register(http_memory_handler.wait_for_lingering_threads)
     signal_handler.register(http_memory_handler.flush)

     queue_listener = QueueListener(queue, http_memory_handler)
1 change: 1 addition & 0 deletions port_ocean/ocean.py
@@ -123,6 +123,7 @@ async def lifecycle(_: FastAPI) -> AsyncIterator[None]:
             yield None
         except Exception:
             logger.exception("Integration had a fatal error. Shutting down.")
+            logger.complete()
             sys.exit("Server stopped")
         finally:
             signal_handler.exit()
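`logger.complete()` is loguru's way of waiting for records queued by `enqueue=True` sinks to be processed, so here the fatal-error message is written out before `sys.exit` tears the process down. A minimal demo of that behavior:

```python
# With enqueue=True the sink runs on a background worker; complete() blocks
# until everything queued so far has actually been written.
import sys
from loguru import logger

logger.remove()
logger.add(sys.stderr, enqueue=True)

logger.error("fatal error about to exit")
logger.complete()  # block until the queued record has been written
```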
71 changes: 71 additions & 0 deletions port_ocean/tests/log/test_handlers.py
@@ -0,0 +1,71 @@
from port_ocean.log.handlers import _serialize_record
from loguru import logger
from logging import LogRecord
from queue import Queue
from logging.handlers import QueueHandler
from typing import Callable, Any


log_message = "This is a test log message."
exception_group_message = "Test Exception group"
exception_message = "Test Exception"
expected_keys = ["message", "level", "timestamp", "extra"]


def test_serialize_record_log_shape() -> None:
record = log_record(
lambda: logger.exception(
log_message,
exc_info=None,
)
)
serialized_record = _serialize_record(record)
assert all(key in serialized_record for key in expected_keys)
assert log_message in serialized_record.get("message", None)


def test_serialize_record_exc_info_group_exception() -> None:
record = log_record(
lambda: logger.exception(
log_message,
            exc_info=ExceptionGroup(
                exception_group_message, [Exception(exception_message)]
),
)
)
serialized_record = _serialize_record(record)
exc_info = assert_extra(serialized_record.get("extra", {}))
    assert exception_group_message in exc_info
assert exception_message in exc_info


def test_serialize_record_exc_info_single_exception() -> None:
record = log_record(
lambda: logger.exception(log_message, exc_info=Exception(exception_message))
)
serialized_record = _serialize_record(record)
exc_info = assert_extra(serialized_record.get("extra", {}))
assert exception_message in exc_info


def assert_extra(extra: dict[str, Any]) -> str:
exc_info = extra.get("exc_info", None)
assert type(exc_info) is str
return exc_info


def log_record(cb: Callable[[], None]) -> LogRecord:
queue = Queue[LogRecord]()
queue_handler = QueueHandler(queue)
logger_id = logger.add(
queue_handler,
level="DEBUG",
format="{message}",
diagnose=False,
enqueue=True,
)
cb()
logger.complete()
logger.remove(logger_id)
record = queue.get()
return record

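Note that the bare `ExceptionGroup` builtin used in these tests requires Python 3.11+. Assuming a standard pytest setup, they should run with something like `pytest port_ocean/tests/log/test_handlers.py`.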