Skip to content

Commit

Permalink
Port 5475 ocean kill the integration if no partition assignment was m…
Browse files Browse the repository at this point in the history
…ade for the consumer (#392)

Description
What - If there are two instances sharing the same identifier for the integration type with a Kafka event listener, the second instance will remain outdated and unassigned to any partitions. Therefore, it will not receive any resyncs. We want to terminate the redundant instance since Ocean does not support multiple instances running concurrently.
Why - Ocean does not support multiple instances for the same integration & the kafka topic has only one partition.
How - Checking if the integration received any partitions and shutting down the integration if not

Type of change
 New feature (non-breaking change which adds functionality)
  • Loading branch information
yairsimantov20 authored Feb 20, 2024
1 parent 9d6605c commit 70fd236
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 59 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,23 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.5.1 (2024-02-20)


### Features

- Added handling for kafka consumer empty partition assignment and shutting the application down with an error (PORT-5475)
- Added QOL decorator to help with caching the third party response (PORT-5475_2)

### Improvements

- Changed the Kafka consumer to run in the event loop in async instead of sync in another thread (PORT-5475)

### Bug Fixes

- Fixed an issue causing all the characters to be redacted when passing an empty string to a sensitive field


## 0.5.0 (2024-02-18)


Expand Down
58 changes: 41 additions & 17 deletions port_ocean/consumers/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import threading
from typing import Any, Callable
import functools
import signal
from asyncio import get_running_loop, ensure_future
from typing import Any, Callable, Awaitable

from confluent_kafka import Consumer, KafkaException, Message # type: ignore
from loguru import logger
Expand All @@ -20,11 +22,12 @@ class KafkaConsumerConfig(BaseModel):
class KafkaConsumer:
def __init__(
self,
msg_process: Callable[[Message], None],
msg_process: Callable[[Message], Awaitable[None]],
config: KafkaConsumerConfig,
org_id: str,
) -> None:
self.running = False
self._assigned_partitions = False
self.org_id = org_id
self.config = config

Expand All @@ -38,6 +41,11 @@ def __init__(
"sasl.password": config.password,
"group.id": f"{self.org_id}.{config.group_name}",
"enable.auto.commit": "false",
# We use the cooperative-sticky assignment strategy to ensure that the same instance of the integration
# is always assigned the same partitions. This is important as only one instance of the integration
# can be assigned to a partition at a time and we don't want a running instance to lose its partitions
# when a new instance starts and causes a rebalance.
"partition.assignment.strategy": "cooperative-sticky",
}
else:
kafka_config = {
Expand All @@ -48,21 +56,37 @@ def __init__(

self.consumer = Consumer(kafka_config)

def start(self, event: threading.Event) -> None:
def _handle_partitions_assignment(self, _: Any, partitions: list[str]) -> None:
    """
    Callback invoked with the partitions assigned to this consumer.

    Logs the assignment. If the assignment is empty and this consumer has
    never been given a partition before, an error is logged and SIGINT is
    raised in the current process. Otherwise the consumer is marked as
    having been assigned partitions.

    :param _: Unused first callback argument.
    :param partitions: The partitions handed to this consumer.
    """
    logger.info(f"Assigned partitions: {partitions}")

    # Either we got partitions now, or we already had some before: in both
    # cases the consumer is considered assigned and nothing else happens.
    if partitions or self._assigned_partitions:
        self._assigned_partitions = True
        return

    logger.error(
        "No partitions assigned. "
        "This usually means that there is already another integration "
        "from the same type and with the same identifier running. "
        "Two integrations of the same type and identifier "
        "cannot run at the same time."
    )
    signal.raise_signal(signal.SIGINT)

async def start(self) -> None:
self.running = True
try:
logger.info("Start consumer...")
logger.info("Starting kafka consumer...")
topics = [f"{self.org_id}.change.log"]
self.consumer.subscribe(
topics,
on_assign=self._handle_partitions_assignment,
)
logger.info(f"Subscribed to topics: {topics}")

self.consumer.subscribe(
[f"{self.org_id}.change.log"],
on_assign=lambda _, partitions: logger.info(
f"Assignment: {partitions}"
),
)
logger.info("Subscribed to topics")
while self.running and not event.is_set():
loop = get_running_loop()
poll = functools.partial(
self.consumer.poll, timeout=self.config.consumer_poll_timeout
)
try:
while self.running:
try:
msg = self.consumer.poll(timeout=self.config.consumer_poll_timeout)
msg = await loop.run_in_executor(None, poll)
if msg is None:
continue
if msg.error():
Expand All @@ -73,7 +97,7 @@ def start(self, event: threading.Event) -> None:
"Process message "
f"from topic {msg.topic()}, partition {msg.partition()}, offset {msg.offset()}"
)
self.msg_process(msg)
ensure_future(self.msg_process(msg))

except Exception as process_error:
logger.exception(
Expand All @@ -89,6 +113,6 @@ def start(self, event: threading.Event) -> None:
self.exit_gracefully()

def exit_gracefully(self, *_: Any) -> None:
logger.info("Exiting gracefully...")
logger.info("Closing the kafka consumer gracefully...")
self.running = False
self.consumer.close()
61 changes: 21 additions & 40 deletions port_ocean/core/event_listener/kafka.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import json
import sys
import threading
from asyncio import ensure_future, Task
from typing import Any, Literal

from confluent_kafka import Message # type: ignore
Expand Down Expand Up @@ -70,8 +70,8 @@ def __init__(
self.org_id = org_id
self.integration_identifier = integration_identifier
self.integration_type = integration_type
self._consumer_kill_event = threading.Event()
self._running_task = None
self._running_task: Task[Any] | None = None
self.consumer: KafkaConsumer | None = None

async def _get_kafka_config(self) -> KafkaConsumerConfig:
"""
Expand Down Expand Up @@ -104,28 +104,7 @@ def _should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool:

return False

def _resync_in_new_event_loop(self, message: dict[Any, Any]) -> None:
"""
A private method that handles incoming Kafka messages in a separate thread.
It triggers the `on_resync` event handler.
"""

async def try_wrapper() -> None:
try:
await self.events["on_resync"](message)
except Exception as e:
_type, _, tb = sys.exc_info()
logger.opt(exception=(_type, None, tb)).error(
f"Failed to process message: {str(e)}"
)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
running_task = loop.create_task(try_wrapper())
self._tasks_to_close.append(running_task)
loop.run_until_complete(running_task)

def _handle_message(self, raw_msg: Message) -> None:
async def _handle_message(self, raw_msg: Message) -> None:
"""
A private method that handles incoming Kafka messages.
If the message should be processed (determined by `_should_be_processed`), it triggers the corresponding event handler.
Expand All @@ -140,31 +119,33 @@ def _handle_message(self, raw_msg: Message) -> None:
return

if "change.log" in topic and message is not None:
thread_name = f"ocean_event_handler_{raw_msg.offset()}"
logger.info(f"spawning thread {thread_name} to start resync")
threading.Thread(
name=thread_name,
target=self._resync_in_new_event_loop,
args=(message,),
).start()
logger.info(f"thread {thread_name} started")
try:
await self.events["on_resync"](message)
except Exception as e:
_type, _, tb = sys.exc_info()
logger.opt(exception=(_type, None, tb)).error(
f"Failed to process message: {str(e)}"
)

async def _start(self) -> None:
"""
The main method that starts the Kafka consumer.
It creates a KafkaConsumer instance with the given configuration and starts it as a background asyncio task.
"""
consumer = KafkaConsumer(
self.consumer = KafkaConsumer(
msg_process=self._handle_message,
config=await self._get_kafka_config(),
org_id=self.org_id,
)
logger.info("Starting Kafka consumer")
threading.Thread(
name="ocean_kafka_consumer",
target=consumer.start,
args=(self._consumer_kill_event,),
).start()

# We are running the consumer with asyncio.create_task to ensure that it runs in the background and not blocking
# the integration's main event loop from finishing the startup process.
self._running_task = asyncio.create_task(self.consumer.start())
ensure_future(self._running_task)

def _stop(self) -> None:
self._consumer_kill_event.set()
if self.consumer:
self.consumer.exit_gracefully()
if self._running_task:
self._running_task.cancel()
4 changes: 3 additions & 1 deletion port_ocean/log/sensetive.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class SensitiveLogFilter:
compiled_patterns = [re.compile(pattern) for pattern in secret_patterns.values()]

def hide_sensitive_strings(self, *tokens: str) -> None:
self.compiled_patterns.extend([re.compile(token) for token in tokens])
self.compiled_patterns.extend(
[re.compile(token) for token in tokens if token.strip()]
)

def create_filter(self, full_hide: bool = False) -> Callable[["Record"], bool]:
def _filter(record: "Record") -> bool:
Expand Down
49 changes: 49 additions & 0 deletions port_ocean/utils/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import functools
from typing import Callable, AsyncIterator, Any

from port_ocean.context.event import event

AsyncIteratorCallable = Callable[..., AsyncIterator[list[Any]]]


def cache_iterator_result(
    cache_key: str,
) -> Callable[[AsyncIteratorCallable], AsyncIteratorCallable]:
    """
    Cache the results of an async iterator function under ``cache_key``.

    On each call the decorator first looks for ``cache_key`` in
    ``event.attributes``. If a value is stored there, it is yielded as a single
    list and the wrapped function is not called. Otherwise the wrapped function
    is iterated, every batch is yielded to the caller as it arrives, and once the
    iteration completes the concatenation of all batches is stored in
    ``event.attributes[cache_key]``.

    The cache is stored in the scope of the running event. For example, you can
    use this to cache data coming back from a third-party API to avoid making the
    same request multiple times for each kind.

    Notes:
        * The key is fixed at decoration time; call arguments are not part of it.
        * Nothing is cached if the consumer stops iterating before the wrapped
          iterator is exhausted.
        * An empty result is a valid cache entry and is served from the cache.

    Usage:
    ```python
    @cache_iterator_result("my_cache_key")
    async def my_async_iterator_function():
        # Your code here
    ```

    :param cache_key: Key under which the collected results are stored.
    :return: A decorator for async iterator functions yielding lists.
    """

    def decorator(func: AsyncIteratorCallable) -> AsyncIteratorCallable:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Compare against None rather than relying on truthiness: an empty
            # list is a legitimate cached result and must not trigger a refetch.
            cached = event.attributes.get(cache_key)
            if cached is not None:
                yield cached
                return

            # Cache miss: stream batches through while accumulating them.
            collected: list[Any] = []
            async for batch in func(*args, **kwargs):
                collected.extend(batch)
                yield batch

            # Only reached when the source iterator was fully consumed, so a
            # partial result is never stored.
            event.attributes[cache_key] = collected

        return wrapper

    return decorator
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.5.0"
version = "0.5.1"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit 70fd236

Please sign in to comment.