Skip to content

Commit

Permalink
Logprep on Mac Testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Neumann authored and ekneg54 committed Dec 12, 2024
1 parent 474d906 commit 7567837
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ examples/k8s/charts
target
wheelhouse
requirements.*
.DS_Store
3 changes: 2 additions & 1 deletion examples/exampledata/config/http_pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ process_count: 4
config_refresh_interval: 5
profile_pipelines: false
restart_count: 3
error_backlog_size: 150
logger:
level: INFO
loggers:
Expand All @@ -29,7 +30,7 @@ metrics:
input:
httpinput:
type: http_input
message_backlog_size: 1500000
message_backlog_size: 150
collect_meta: true
metafield_name: "@metadata"
uvicorn_config:
Expand Down
4 changes: 2 additions & 2 deletions examples/exampledata/config/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ process_count: 2
timeout: 0.1
restart_count: 2
config_refresh_interval: 5
error_backlog_size: 1500000
error_backlog_size: 150
logger:
level: INFO
level: DEBUG
format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
loggers:
Expand Down
2 changes: 1 addition & 1 deletion logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ def _get_asgi_app(endpoints_config: dict) -> falcon.asgi.App:

def _get_event(self, timeout: float) -> Tuple:
"""Returns the first message from the queue"""
self.metrics.message_backlog_size += self.messages.qsize()
#self.metrics.message_backlog_size += self.messages.qsize()
try:
message = self.messages.get(timeout=timeout)
raw_message = str(message).encode("utf8")
Expand Down
2 changes: 1 addition & 1 deletion logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def _drain_input_queues(self) -> None:
if not hasattr(self._input, "messages"):
return
if isinstance(self._input.messages, multiprocessing.queues.Queue):
while self._input.messages.qsize():
while not self._input.messages.empty():
self.process_pipeline()

def stop(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def throttle(self, batch_size=1):

def put(self, obj, block=True, timeout=None, batch_size=1):
"""Put an obj into the queue."""
self.throttle(batch_size)
#self.throttle(batch_size)
super().put(obj, block=block, timeout=timeout)


Expand Down
4 changes: 2 additions & 2 deletions logprep/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ def _iterate(self):
self.exit_code = EXITCODES.PIPELINE_ERROR.value
self._logger.error("Restart count exceeded. Exiting.")
sys.exit(self.exit_code)
if self._manager.error_queue is not None:
self.metrics.number_of_events_in_error_queue += self._manager.error_queue.qsize()
#if self._manager.error_queue is not None:
# self.metrics.number_of_events_in_error_queue += self._manager.error_queue.qsize()
self._manager.restart_failed_pipeline()

def reload_configuration(self):
Expand Down
6 changes: 6 additions & 0 deletions logprep/util/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
import multiprocessing as mp
from logging.handlers import QueueListener
from socket import gethostname
import platform
if platform.system() != "Linux":
from multiprocessing import set_start_method
set_start_method("fork")



logqueue = mp.Queue(-1)

Expand Down

0 comments on commit 7567837

Please sign in to comment.