diff --git a/CHANGELOG.md b/CHANGELOG.md index 21af602d97..427df2960e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.12.9 (2024-11-07) + + +### Bug Fixes + +- Await logger writing exception on exit (Integration logs not being ingested) +- Await logger thread on exit (Integration logs not being ingested) +- Serialize exception (Integration logs not being ingested) + + ## 0.12.8 (2024-11-04) diff --git a/integrations/azure-devops/CHANGELOG.md b/integrations/azure-devops/CHANGELOG.md index fb25c3a3d0..ed1272aefd 100644 --- a/integrations/azure-devops/CHANGELOG.md +++ b/integrations/azure-devops/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.1.79 (2024-11-07) + + +### Bug Fixes + +- Fixed the API endpoint used in the boards kind to iterate through all project teams, ensuring non-default team boards and columns are ingested + + ## 0.1.78 (2024-11-06) diff --git a/integrations/azure-devops/azure_devops/client/azure_devops_client.py b/integrations/azure-devops/azure_devops/client/azure_devops_client.py index 666110ce10..4e86886148 100644 --- a/integrations/azure-devops/azure_devops/client/azure_devops_client.py +++ b/integrations/azure-devops/azure_devops/client/azure_devops_client.py @@ -286,24 +286,27 @@ async def get_columns(self) -> AsyncGenerator[list[dict[str, Any]], None]: ] async def _enrich_boards( - self, boards: list[dict[str, Any]], project_id: str + self, boards: list[dict[str, Any]], project_id: str, team_id: str ) -> list[dict[str, Any]]: for board in boards: response = await self.send_request( "GET", - f"{self._organization_base_url}/{project_id}/{API_URL_PREFIX}/work/boards/{board['id']}", + f"{self._organization_base_url}/{project_id}/{team_id}/{API_URL_PREFIX}/work/boards/{board['id']}", ) board.update(response.json()) return boards - async def _get_boards(self, project_id: str) -> 
list[dict[str, Any]]: - get_boards_url = ( - f"{self._organization_base_url}/{project_id}/{API_URL_PREFIX}/work/boards" - ) - response = await self.send_request("GET", get_boards_url) - board_data = response.json().get("value", []) - logger.info(f"Found {len(board_data)} boards for project {project_id}") - return await self._enrich_boards(board_data, project_id) + async def _get_boards( + self, project_id: str + ) -> AsyncGenerator[list[dict[str, Any]], None]: + teams_url = f"{self._organization_base_url}/{API_URL_PREFIX}/projects/{project_id}/teams" + async for teams_in_project in self._get_paginated_by_top_and_skip(teams_url): + for team in teams_in_project: + get_boards_url = f"{self._organization_base_url}/{project_id}/{team['id']}/{API_URL_PREFIX}/work/boards" + response = await self.send_request("GET", get_boards_url) + board_data = response.json().get("value", []) + logger.info(f"Found {len(board_data)} boards for project {project_id}") + yield await self._enrich_boards(board_data, project_id, team["id"]) @cache_iterator_result() async def get_boards_in_organization( @@ -313,7 +316,8 @@ async def get_boards_in_organization( yield [ {**board, "__project": project} for project in projects - for board in await self._get_boards(project["id"]) + async for boards in self._get_boards(project["id"]) + for board in boards ] async def generate_subscriptions_webhook_events(self) -> list[WebhookEvent]: diff --git a/integrations/azure-devops/pyproject.toml b/integrations/azure-devops/pyproject.toml index 960386f154..e38f19bfb6 100644 --- a/integrations/azure-devops/pyproject.toml +++ b/integrations/azure-devops/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "azure-devops" -version = "0.1.78" +version = "0.1.79" description = "An Azure Devops Ocean integration" authors = ["Matan Geva "] diff --git a/integrations/azure-devops/tests/azure_devops/client/test_azure_devops_client.py b/integrations/azure-devops/tests/azure_devops/client/test_azure_devops_client.py index 
ab8a3ce55e..66f2003204 100644 --- a/integrations/azure-devops/tests/azure_devops/client/test_azure_devops_client.py +++ b/integrations/azure-devops/tests/azure_devops/client/test_azure_devops_client.py @@ -572,8 +572,10 @@ async def test_get_boards_in_organization(mock_event_context: MagicMock) -> None async def mock_generate_projects() -> AsyncGenerator[List[Dict[str, Any]], None]: yield [{"id": "proj1", "name": "Project One"}] - async def mock_get_boards(project_id: str) -> List[Dict[str, Any]]: - return [ + async def mock_get_boards( + project_id: str, + ) -> AsyncGenerator[List[Dict[str, Any]], None]: + yield [ {"id": "board1", "name": "Board One"}, {"id": "board2", "name": "Board Two"}, ] diff --git a/integrations/gitlab/CHANGELOG.md b/integrations/gitlab/CHANGELOG.md index bc7cec2836..0482c7ca9b 100644 --- a/integrations/gitlab/CHANGELOG.md +++ b/integrations/gitlab/CHANGELOG.md @@ -14,7 +14,6 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Bumped ocean version to ^0.12.8 - 0.1.135 (2024-10-31) ==================== diff --git a/port_ocean/clients/port/mixins/integrations.py b/port_ocean/clients/port/mixins/integrations.py index 9a7de7ffcb..9148dec519 100644 --- a/port_ocean/clients/port/mixins/integrations.py +++ b/port_ocean/clients/port/mixins/integrations.py @@ -3,7 +3,6 @@ import httpx from loguru import logger - from port_ocean.clients.port.authentication import PortAuthentication from port_ocean.clients.port.utils import handle_status_code from port_ocean.log.sensetive import sensitive_log_filter diff --git a/port_ocean/log/handlers.py b/port_ocean/log/handlers.py index 2c13243e1a..5d01727c05 100644 --- a/port_ocean/log/handlers.py +++ b/port_ocean/log/handlers.py @@ -11,16 +11,22 @@ from port_ocean import Ocean from port_ocean.context.ocean import ocean +from copy import deepcopy +from traceback import format_exception def _serialize_record(record: logging.LogRecord) -> dict[str, Any]: + extra = 
{**deepcopy(record.__dict__["extra"])} + if isinstance(extra.get("exc_info"), Exception): + serialized_exception = "".join(format_exception(extra.get("exc_info"))) + extra["exc_info"] = serialized_exception return { "message": record.msg, "level": record.levelname, "timestamp": datetime.utcfromtimestamp(record.created).strftime( "%Y-%m-%dT%H:%M:%S.%fZ" ), - "extra": record.__dict__["extra"], + "extra": extra, } @@ -37,6 +43,7 @@ def __init__( self.flush_size = flush_size self.last_flush_time = time.time() self._serialized_buffer: list[dict[str, Any]] = [] + self._thread_pool: list[threading.Thread] = [] @property def ocean(self) -> Ocean | None: @@ -46,6 +53,7 @@ def ocean(self) -> Ocean | None: return None def emit(self, record: logging.LogRecord) -> None: + self._serialized_buffer.append(_serialize_record(record)) super().emit(record) @@ -61,6 +69,11 @@ def shouldFlush(self, record: logging.LogRecord) -> bool: return True return False + def wait_for_lingering_threads(self) -> None: + for thread in self._thread_pool: + if thread.is_alive(): + thread.join() + def flush(self) -> None: if self.ocean is None or not self.buffer: return @@ -70,13 +83,21 @@ def _wrap_event_loop(_ocean: Ocean, logs_to_send: list[dict[str, Any]]) -> None: loop.run_until_complete(self.send_logs(_ocean, logs_to_send)) loop.close() + def clear_thread_pool() -> None: + for thread in self._thread_pool: + if not thread.is_alive(): + self._thread_pool.remove(thread) + self.acquire() logs = list(self._serialized_buffer) if logs: self.buffer.clear() self._serialized_buffer.clear() self.last_flush_time = time.time() - threading.Thread(target=_wrap_event_loop, args=(self.ocean, logs)).start() + clear_thread_pool() + thread = threading.Thread(target=_wrap_event_loop, args=(self.ocean, logs)) + thread.start() + self._thread_pool.append(thread) self.release() async def send_logs( diff --git a/port_ocean/log/logger_setup.py b/port_ocean/log/logger_setup.py index 789cbfe729..0ad692a188 100644 --- 
a/port_ocean/log/logger_setup.py +++ b/port_ocean/log/logger_setup.py @@ -55,6 +55,7 @@ def _http_loguru_handler(level: LogLevelType) -> None: logger.configure(patcher=exception_deserializer) http_memory_handler = HTTPMemoryHandler() + signal_handler.register(http_memory_handler.wait_for_lingering_threads) signal_handler.register(http_memory_handler.flush) queue_listener = QueueListener(queue, http_memory_handler) diff --git a/port_ocean/ocean.py b/port_ocean/ocean.py index 57232f2d50..eb0e3abca7 100644 --- a/port_ocean/ocean.py +++ b/port_ocean/ocean.py @@ -123,6 +123,7 @@ async def lifecycle(_: FastAPI) -> AsyncIterator[None]: yield None except Exception: logger.exception("Integration had a fatal error. Shutting down.") + logger.complete() sys.exit("Server stopped") finally: signal_handler.exit() diff --git a/port_ocean/tests/log/test_handlers.py b/port_ocean/tests/log/test_handlers.py new file mode 100644 index 0000000000..99ba9b90de --- /dev/null +++ b/port_ocean/tests/log/test_handlers.py @@ -0,0 +1,71 @@ +from port_ocean.log.handlers import _serialize_record +from loguru import logger +from logging import LogRecord +from queue import Queue +from logging.handlers import QueueHandler +from typing import Callable, Any + + +log_message = "This is a test log message." 
+exception_group_message = "Test Exception group" +exception_message = "Test Exception" +expected_keys = ["message", "level", "timestamp", "extra"] + + +def test_serialize_record_log_shape() -> None: + record = log_record( + lambda: logger.exception( + log_message, + exc_info=None, + ) + ) + serialized_record = _serialize_record(record) + assert all(key in serialized_record for key in expected_keys) + assert log_message in serialized_record.get("message", None) + + +def test_serialize_record_exc_info_group_exception() -> None: + record = log_record( + lambda: logger.exception( + log_message, + exc_info=ExceptionGroup( + exception_group_message, [Exception(exception_message)] + ), + ) + ) + serialized_record = _serialize_record(record) + exc_info = assert_extra(serialized_record.get("extra", {})) + assert exception_group_message in exc_info + assert exception_message in exc_info + + +def test_serialize_record_exc_info_single_exception() -> None: + record = log_record( + lambda: logger.exception(log_message, exc_info=Exception(exception_message)) + ) + serialized_record = _serialize_record(record) + exc_info = assert_extra(serialized_record.get("extra", {})) + assert exception_message in exc_info + + +def assert_extra(extra: dict[str, Any]) -> str: + exc_info = extra.get("exc_info", None) + assert type(exc_info) is str + return exc_info + + +def log_record(cb: Callable[[], None]) -> LogRecord: + queue = Queue[LogRecord]() + queue_handler = QueueHandler(queue) + logger_id = logger.add( + queue_handler, + level="DEBUG", + format="{message}", + diagnose=False, + enqueue=True, + ) + cb() + logger.complete() + logger.remove(logger_id) + record = queue.get() + return record