
[Core] [Logging] | Integration logs not being ingested #1120

Merged
10 changes: 10 additions & 0 deletions integrations/gitlab/CHANGELOG.md
@@ -7,6 +7,16 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

0.1.136 (2024-11-05)
====================

### Bug Fixes

- Await logger writing exception on exit (Integration logs not being ingested)
- Await logger thread on exit (Integration logs not being ingested)
- Sirealize exception (Integration logs not being ingested)
Member commented:
git grep -i sirealize



0.1.135 (2024-10-31)
====================

14 changes: 12 additions & 2 deletions port_ocean/clients/port/mixins/integrations.py
@@ -3,7 +3,8 @@

import httpx
from loguru import logger

import copy
from traceback import format_exception
from port_ocean.clients.port.authentication import PortAuthentication
from port_ocean.clients.port.utils import handle_status_code
from port_ocean.log.sensetive import sensitive_log_filter
@@ -99,15 +100,24 @@ async def patch_integration(
        handle_status_code(response)
        return response.json()["integration"]

    def sirealize_logs(self,logs: list[dict[str, Any]]):
Contributor commented:

This seems like a pretty isolated method; let's add a test for it.

        _logs = copy.deepcopy(logs)
        for log in _logs:
            if 'extra' in log.keys() and 'exc_info' in log['extra'].keys() and isinstance(log['extra']['exc_info'],Exception):
                sirealized_exception = ''.join(format_exception(log['extra']['exc_info']))
                log['extra']['exc_info'] = sirealized_exception
        return _logs

    async def ingest_integration_logs(self, logs: list[dict[str, Any]]) -> None:
        logger.debug("Ingesting logs")
        sirealized_logs=self.sirealize_logs(logs)
Contributor commented:

Suggested change:
-        sirealized_logs=self.sirealize_logs(logs)
+        serialized_logs=self.serialize_logs(logs)

        log_attributes = await self.get_log_attributes()
        headers = await self.auth.headers()
        response = await self.client.post(
            log_attributes["ingestUrl"],
            headers=headers,
            json={
                "logs": logs,
                "logs": sirealized_logs,
            },
        )
        handle_status_code(response, should_log=False)
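
Per the review request above, a rough pytest sketch for the new helper might look like this. It calls the method unbound, since the body in this diff never reads `self`, so no real client has to be constructed; the class name `IntegrationClientMixin` is an assumption about what `integrations.py` defines, so adjust the import if it differs.

```python
# Hypothetical test sketch (not part of the PR). IntegrationClientMixin is
# assumed to be the class defined in integrations.py; the unbound call works
# because the helper never uses `self`.
from traceback import format_exception

from port_ocean.clients.port.mixins.integrations import IntegrationClientMixin


def test_sirealize_logs_formats_exception_without_mutating_input() -> None:
    try:
        raise ValueError("boom")
    except ValueError as exc:
        caught = exc

    logs = [
        {"message": "plain", "extra": {}},
        {"message": "failed", "extra": {"exc_info": caught}},
    ]

    result = IntegrationClientMixin.sirealize_logs(None, logs)

    # The helper deep-copies its input, so the original exception object is untouched.
    assert isinstance(logs[1]["extra"]["exc_info"], ValueError)
    # The copy carries the formatted traceback text instead of the exception object.
    assert result[1]["extra"]["exc_info"] == "".join(format_exception(caught))
    assert result[0] == {"message": "plain", "extra": {}}
```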
17 changes: 16 additions & 1 deletion port_ocean/log/handlers.py
@@ -37,6 +37,7 @@ def __init__(
        self.flush_size = flush_size
        self.last_flush_time = time.time()
        self._serialized_buffer: list[dict[str, Any]] = []
        self._thread_pool: list[threading.Thread] = []

    @property
    def ocean(self) -> Ocean | None:
@@ -60,6 +61,11 @@ def shouldFlush(self, record: logging.LogRecord) -> bool:
        ):
            return True
        return False

    def wait_for_lingering_threads(self)->None:
        for thread in self._thread_pool:
            if thread.is_alive():
                thread.join()

    def flush(self) -> None:
        if self.ocean is None or not self.buffer:
@@ -70,15 +76,24 @@ def _wrap_event_loop(_ocean: Ocean, logs_to_send: list[dict[str, Any]]) -> None:
            loop.run_until_complete(self.send_logs(_ocean, logs_to_send))
            loop.close()

        def clear_thread_pool()->None:
            for thread in self._thread_pool:
                if not thread.is_alive():
                    self._thread_pool.remove(thread)

        self.acquire()
        logs = list(self._serialized_buffer)
        if logs:
            self.buffer.clear()
            self._serialized_buffer.clear()
            self.last_flush_time = time.time()
            threading.Thread(target=_wrap_event_loop, args=(self.ocean, logs)).start()
            clear_thread_pool()
            thread=threading.Thread(target=_wrap_event_loop, args=(self.ocean, logs))
            thread.start()
            self._thread_pool.append(thread)
Contributor commented:

Are we supporting multiple threads flushing logs concurrently? Let's test a long-running, scheduled resync to verify we didn't break anything.

        self.release()


    async def send_logs(
        self, _ocean: Ocean, logs_to_send: list[dict[str, Any]]
    ) -> None:
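
On the concurrency question above, the pattern this diff introduces — keep a handle on each flush thread, prune finished ones, and join whatever is still alive at shutdown — can be exercised in isolation. A minimal standalone sketch with illustrative names (not port_ocean APIs):

```python
# Standalone illustration of the thread bookkeeping added to the handler:
# overlapping flushes each run on their own thread, and joining lingering
# threads at shutdown ensures no batch is lost. Names here are illustrative.
import threading
import time
from typing import Any, Callable


class BufferedSender:
    def __init__(self, send: Callable[[list[Any]], None]) -> None:
        self._send = send
        self._thread_pool: list[threading.Thread] = []

    def flush(self, batch: list[Any]) -> None:
        # Drop references to finished threads so the pool doesn't grow unbounded.
        self._thread_pool = [t for t in self._thread_pool if t.is_alive()]
        thread = threading.Thread(target=self._send, args=(batch,))
        thread.start()
        self._thread_pool.append(thread)

    def wait_for_lingering_threads(self) -> None:
        # Called at shutdown so in-flight batches finish before the process exits.
        for thread in self._thread_pool:
            if thread.is_alive():
                thread.join()


if __name__ == "__main__":
    sent: list[list[Any]] = []

    def slow_send(batch: list[Any]) -> None:
        time.sleep(0.2)  # simulate a slow HTTP call
        sent.append(batch)

    sender = BufferedSender(slow_send)
    for i in range(3):  # several overlapping flushes
        sender.flush([f"log-{i}"])
    sender.wait_for_lingering_threads()
    assert len(sent) == 3  # nothing is lost once lingering threads are joined
```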
1 change: 1 addition & 0 deletions port_ocean/log/logger_setup.py
@@ -55,6 +55,7 @@ def _http_loguru_handler(level: LogLevelType) -> None:
    logger.configure(patcher=exception_deserializer)

    http_memory_handler = HTTPMemoryHandler()
    signal_handler.register(http_memory_handler.wait_for_lingering_threads)
    signal_handler.register(http_memory_handler.flush)

    queue_listener = QueueListener(queue, http_memory_handler)
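
One thing worth confirming here: `wait_for_lingering_threads` is registered before `flush`, so the thread spawned by the final flush only gets joined if the signal handler invokes callbacks in reverse registration order (LIFO, the way `atexit` does). That ordering is an assumption in the toy registry below, not something this diff states.

```python
# Illustrative only: a registry that runs exit callbacks LIFO, like atexit.
# Under that assumption, flush runs first on exit and the thread it spawns is
# then joined by wait_for_lingering_threads.
from typing import Callable


class ExitRegistry:
    def __init__(self) -> None:
        self._callbacks: list[Callable[[], None]] = []

    def register(self, callback: Callable[[], None]) -> None:
        self._callbacks.append(callback)

    def exit(self) -> None:
        for callback in reversed(self._callbacks):  # last registered runs first
            callback()


registry = ExitRegistry()
registry.register(lambda: print("2. join lingering flush threads"))
registry.register(lambda: print("1. flush remaining buffered logs"))
registry.exit()  # prints step 1, then step 2
```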
1 change: 1 addition & 0 deletions port_ocean/ocean.py
@@ -123,6 +123,7 @@ async def lifecycle(_: FastAPI) -> AsyncIterator[None]:
            yield None
        except Exception:
            logger.exception("Integration had a fatal error. Shutting down.")
            logger.complete()
            sys.exit("Server stopped")
        finally:
            signal_handler.exit()
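
For context on the added `logger.complete()` call: with handlers added via `enqueue=True`, loguru writes records from a background worker, so exiting immediately can drop the final message; `complete()` blocks until the queued records have been processed. A small standalone illustration, not the integration's actual logger setup:

```python
# Standalone sketch of why logger.complete() precedes sys.exit(): with
# enqueue=True, records are written by a background worker, and an immediate
# exit can lose the last message.
import sys

from loguru import logger

logger.remove()
logger.add(sys.stderr, enqueue=True)  # records pass through a background queue

try:
    raise RuntimeError("fatal error")
except RuntimeError:
    logger.exception("Integration had a fatal error. Shutting down.")
    logger.complete()  # wait until the queued record has actually been written
    sys.exit("Server stopped")
```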