[Core] [Logging] | Integration logs not being ingested (#1120)
# Description

**What:**
- Fix logs not being ingested when an integration fails or raises an exception.

**Why:**

- The logger's flush method spawns a new thread that is not awaited
when the program exits.
- The logger accepts 'exc_info', which cannot be serialized.
- On shutdown, the logger writes a message that must be awaited for it
to be sent to the Port API.

**How:**
- Added method 'wait_for_lingering_threads' to 'HTTPMemoryHandler',
which waits for running threads to finish.
'wait_for_lingering_threads' is invoked via the 'SignalHandler' upon
exit.
- Modified method '_serialize_record' to look for 'exc_info' in each
log and serialize it using 'traceback'.
- Added a call to 'logger.complete' so that the log written when the
program exits is sent to the Port API.
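
The thread-tracking part of the fix can be sketched in isolation. This is a minimal illustration, not the actual handler: `TrackedFlusher`, `_send` and the simulated delay are hypothetical names and values, and only the method name `wait_for_lingering_threads` comes from this change.

```python
import threading
import time


class TrackedFlusher:
    """Hypothetical stand-in for a handler that ships logs on worker threads."""

    def __init__(self) -> None:
        self._threads: list[threading.Thread] = []
        self.sent: list[list[str]] = []

    def _send(self, batch: list[str]) -> None:
        time.sleep(0.05)  # simulate a slow HTTP call (assumed latency)
        self.sent.append(batch)

    def flush(self, batch: list[str]) -> None:
        thread = threading.Thread(target=self._send, args=(batch,))
        thread.start()
        # Keep a reference so the thread can be joined on shutdown.
        self._threads.append(thread)

    def wait_for_lingering_threads(self) -> None:
        # Block until every in-flight send has finished.
        for thread in self._threads:
            if thread.is_alive():
                thread.join()


flusher = TrackedFlusher()
flusher.flush(["fatal error"])
flusher.wait_for_lingering_threads()
```

After `wait_for_lingering_threads` returns, the batch handed to `flush` has been delivered, which is the guarantee the exit path needs.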

## Type of change

Please leave one option from the following and delete the rest:

- [x] Bug fix (non-breaking change which fixes an issue)

<h4> All tests should be run against the port production
environment(using a testing org). </h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

---------

Co-authored-by: Ivan Kalinovski <[email protected]>
Co-authored-by: Shalev Avhar <[email protected]>
Co-authored-by: Shalev Avhar <[email protected]>
Co-authored-by: Tom Tankilevitch <[email protected]>
Co-authored-by: Port Bot <[email protected]>
Co-authored-by: GitHub Action <[email protected]>
7 people authored Nov 7, 2024
1 parent ddf2d8b commit 0faaf0d
Showing 7 changed files with 106 additions and 4 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,16 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

 <!-- towncrier release notes start -->

+## 0.12.9 (2024-11-07)
+
+
+### Bug Fixes
+
+- Await logger writing exception on exit (Integration logs not being ingested)
+- Await logger thread on exit (Integration logs not being ingested)
+- Serialize exception (Integration logs not being ingested)
+
+
 ## 0.12.8 (2024-11-04)


1 change: 0 additions & 1 deletion integrations/gitlab/CHANGELOG.md
@@ -14,7 +14,6 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Bumped ocean version to ^0.12.8


0.1.135 (2024-10-31)
====================

1 change: 0 additions & 1 deletion port_ocean/clients/port/mixins/integrations.py
@@ -3,7 +3,6 @@

import httpx
from loguru import logger

from port_ocean.clients.port.authentication import PortAuthentication
from port_ocean.clients.port.utils import handle_status_code
from port_ocean.log.sensetive import sensitive_log_filter
25 changes: 23 additions & 2 deletions port_ocean/log/handlers.py
@@ -11,16 +11,22 @@

 from port_ocean import Ocean
 from port_ocean.context.ocean import ocean
+from copy import deepcopy
+from traceback import format_exception


 def _serialize_record(record: logging.LogRecord) -> dict[str, Any]:
+    extra = {**deepcopy(record.__dict__["extra"])}
+    if isinstance(extra.get("exc_info"), Exception):
+        serialized_exception = "".join(format_exception(extra.get("exc_info")))
+        extra["exc_info"] = serialized_exception
     return {
         "message": record.msg,
         "level": record.levelname,
         "timestamp": datetime.utcfromtimestamp(record.created).strftime(
             "%Y-%m-%dT%H:%M:%S.%fZ"
         ),
-        "extra": record.__dict__["extra"],
+        "extra": extra,
     }


@@ -37,6 +43,7 @@ def __init__(
         self.flush_size = flush_size
         self.last_flush_time = time.time()
         self._serialized_buffer: list[dict[str, Any]] = []
+        self._thread_pool: list[threading.Thread] = []

     @property
     def ocean(self) -> Ocean | None:
@@ -46,6 +53,7 @@ def ocean(self) -> Ocean | None:
         return None

     def emit(self, record: logging.LogRecord) -> None:
+
         self._serialized_buffer.append(_serialize_record(record))
         super().emit(record)

@@ -61,6 +69,11 @@ def shouldFlush(self, record: logging.LogRecord) -> bool:
             return True
         return False

+    def wait_for_lingering_threads(self) -> None:
+        for thread in self._thread_pool:
+            if thread.is_alive():
+                thread.join()
+
     def flush(self) -> None:
         if self.ocean is None or not self.buffer:
             return
@@ -70,13 +83,21 @@ def _wrap_event_loop(_ocean: Ocean, logs_to_send: list[dict[str, Any]]) -> None:
             loop.run_until_complete(self.send_logs(_ocean, logs_to_send))
             loop.close()

+        def clear_thread_pool() -> None:
+            for thread in self._thread_pool:
+                if not thread.is_alive():
+                    self._thread_pool.remove(thread)
+
         self.acquire()
         logs = list(self._serialized_buffer)
         if logs:
             self.buffer.clear()
             self._serialized_buffer.clear()
             self.last_flush_time = time.time()
-            threading.Thread(target=_wrap_event_loop, args=(self.ocean, logs)).start()
+            clear_thread_pool()
+            thread = threading.Thread(target=_wrap_event_loop, args=(self.ocean, logs))
+            thread.start()
+            self._thread_pool.append(thread)
         self.release()

     async def send_logs(
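
One note on `clear_thread_pool`: it removes items from the list it is iterating over, and in Python that can skip the element right after each removal, so some finished threads may stay in the pool until a later flush. A possible alternative, sketched here with a hypothetical helper name (`prune_finished`), is to rebuild the list instead of mutating it in place.

```python
import threading


def prune_finished(threads: list[threading.Thread]) -> list[threading.Thread]:
    """Return a new list holding only the threads that are still running."""
    return [thread for thread in threads if thread.is_alive()]


stop = threading.Event()
finished = [threading.Thread(target=lambda: None) for _ in range(3)]
running = threading.Thread(target=stop.wait)  # blocks until `stop` is set

for thread in [*finished, running]:
    thread.start()
for thread in finished:
    thread.join()  # make sure these three are really done

pool = prune_finished([*finished, running])
remaining = len(pool)

# Let the blocked thread finish so the example exits cleanly.
stop.set()
running.join()
```

In the handler this would amount to reassigning `self._thread_pool` with the filtered list, under the same lock that `flush` already takes.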
1 change: 1 addition & 0 deletions port_ocean/log/logger_setup.py
@@ -55,6 +55,7 @@ def _http_loguru_handler(level: LogLevelType) -> None:
     logger.configure(patcher=exception_deserializer)

     http_memory_handler = HTTPMemoryHandler()
+    signal_handler.register(http_memory_handler.wait_for_lingering_threads)
     signal_handler.register(http_memory_handler.flush)

     queue_listener = QueueListener(queue, http_memory_handler)
1 change: 1 addition & 0 deletions port_ocean/ocean.py
@@ -123,6 +123,7 @@ async def lifecycle(_: FastAPI) -> AsyncIterator[None]:
                 yield None
             except Exception:
                 logger.exception("Integration had a fatal error. Shutting down.")
+                logger.complete()
                 sys.exit("Server stopped")
             finally:
                 signal_handler.exit()
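
The reason for draining the logger before `sys.exit` can be illustrated with a standard-library analogue. This sketch does not use loguru: it assumes a queue-backed setup built from `QueueHandler` and `QueueListener`, where `listener.stop()` plays the role that `logger.complete()` plays in this hunk. `ListHandler` and `demo_logger` are hypothetical names.

```python
import logging
import queue
from logging.handlers import QueueHandler, QueueListener


class ListHandler(logging.Handler):
    """Hypothetical sink that records every message it receives."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


log_queue: queue.Queue = queue.Queue()
sink = ListHandler()
listener = QueueListener(log_queue, sink)
listener.start()

demo_logger = logging.getLogger("shutdown_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(QueueHandler(log_queue))

# The record is only placed on the queue here; the sink has not necessarily seen it.
demo_logger.error("Integration had a fatal error. Shutting down.")

# Drain the queue before exiting so the final message reaches the sink.
listener.stop()
```

Without the drain step, the process could exit while the last record is still sitting in the queue, which matches the symptom this commit addresses.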
71 changes: 71 additions & 0 deletions port_ocean/tests/log/test_handlers.py
@@ -0,0 +1,71 @@
+from port_ocean.log.handlers import _serialize_record
+from loguru import logger
+from logging import LogRecord
+from queue import Queue
+from logging.handlers import QueueHandler
+from typing import Callable, Any
+
+
+log_message = "This is a test log message."
+exception_grouop_message = "Test Exception group"
+exception_message = "Test Exception"
+expected_keys = ["message", "level", "timestamp", "extra"]
+
+
+def test_serialize_record_log_shape() -> None:
+    record = log_record(
+        lambda: logger.exception(
+            log_message,
+            exc_info=None,
+        )
+    )
+    serialized_record = _serialize_record(record)
+    assert all(key in serialized_record for key in expected_keys)
+    assert log_message in serialized_record.get("message", None)
+
+
+def test_serialize_record_exc_info_single_exception() -> None:
+    record = log_record(
+        lambda: logger.exception(
+            log_message,
+            exc_info=ExceptionGroup(
+                exception_grouop_message, [Exception(exception_message)]
+            ),
+        )
+    )
+    serialized_record = _serialize_record(record)
+    exc_info = assert_extra(serialized_record.get("extra", {}))
+    assert exception_grouop_message in exc_info
+    assert exception_message in exc_info
+
+
+def test_serialize_record_exc_info_group_exception() -> None:
+    record = log_record(
+        lambda: logger.exception(log_message, exc_info=Exception(exception_message))
+    )
+    serialized_record = _serialize_record(record)
+    exc_info = assert_extra(serialized_record.get("extra", {}))
+    assert exception_message in exc_info
+
+
+def assert_extra(extra: dict[str, Any]) -> str:
+    exc_info = extra.get("exc_info", None)
+    assert type(exc_info) is str
+    return exc_info
+
+
+def log_record(cb: Callable[[], None]) -> LogRecord:
+    queue = Queue[LogRecord]()
+    queue_handler = QueueHandler(queue)
+    logger_id = logger.add(
+        queue_handler,
+        level="DEBUG",
+        format="{message}",
+        diagnose=False,
+        enqueue=True,
+    )
+    cb()
+    logger.complete()
+    logger.remove(logger_id)
+    record = queue.get()
+    return record
