Skip to content

Commit

Permalink
Add keyword args to get passed to path builder (#100)
Browse files Browse the repository at this point in the history
* added kwargs as optional args to make available for path_builder
  • Loading branch information
Geary-Layne authored Feb 27, 2025
1 parent 916744c commit 97a1d6b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
32 changes: 19 additions & 13 deletions python/idsse_common/idsse/common/protocol_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .path_builder import PathBuilder
from .utils import TimeDelta, datetime_gen


class ProtocolUtils(ABC):
"""Base Class - interface for DAS data discovery"""

Expand Down Expand Up @@ -53,27 +54,27 @@ def cp(self, path: str, dest: str) -> bool:
bool: Returns True if copy is successful
"""


def get_path(self, issue: datetime, valid: datetime) -> str:
def get_path(self, issue: datetime, valid: datetime, **kwargs) -> str:
"""Delegates to instant PathBuilder to get full path given issue and valid
Args:
issue (datetime): Issue date time
valid (datetime): Valid date time
kwargs: Additional arguments, e.g. region
Returns:
str: Absolute path to file or object
"""
lead = TimeDelta(valid-issue)
return self.path_builder.build_path(issue=issue, valid=valid, lead=lead)
return self.path_builder.build_path(issue=issue, valid=valid, lead=lead, **kwargs)


def check_for(self, issue: datetime, valid: datetime) -> tuple[datetime, str] | None:
def check_for(self, issue: datetime, valid: datetime, **kwargs) -> tuple[datetime, str] | None:
"""Checks if an object passed issue/valid exists
Args:
issue (datetime): The issue date/time used to format the path to the object's location
valid (datetime): The valid date/time used to format the path to the object's location
kwargs: Additional arguments, e.g. region
Returns:
[tuple[datetime, str] | None]: A tuple of the valid date/time (indicated by object's
Expand All @@ -84,7 +85,7 @@ def check_for(self, issue: datetime, valid: datetime) -> tuple[datetime, str] |
file_path = self.get_path(issue, valid)
dir_path = os.path.dirname(file_path)
filenames = self.ls(dir_path, prepend_path=False)
filename = self.path_builder.build_filename(issue=issue, valid=valid, lead=lead)
filename = self.path_builder.build_filename(issue=issue, valid=valid, lead=lead, **kwargs)

for fname in filenames:
# Support wildcard matches - used for '?' as a single wildcard character in
Expand All @@ -97,7 +98,8 @@ def get_issues(self,
num_issues: int = 1,
issue_start: datetime | None = None,
issue_end: datetime = datetime.now(UTC),
time_delta: timedelta = timedelta(hours=1)
time_delta: timedelta = timedelta(hours=1),
**kwargs
) -> Sequence[datetime]:
"""Determine the available issue date/times
Expand All @@ -106,6 +108,7 @@ def get_issues(self,
issue_start (datetime, optional): The oldest date/time to look for. Defaults to None.
issue_end (datetime): The newest date/time to look for. Defaults to now (UTC).
time_delta (timedelta): The time step size. Defaults to 1 hour.
kwargs: Additional arguments, e.g. region
Returns:
Sequence[datetime]: A sequence of issue date/times
Expand All @@ -126,7 +129,7 @@ def get_issues(self,
if issue_start and issue_dt < issue_start:
break
try:
dir_path = self.path_builder.build_dir(issue=issue_dt)
dir_path = self.path_builder.build_dir(issue=issue_dt, **kwargs)
issues_set.update(self._get_issues(dir_path, num_issues))
if num_issues and len(issues_set) >= num_issues:
break
Expand All @@ -139,7 +142,9 @@ def get_issues(self,
def get_valids(self,
issue: datetime,
valid_start: datetime | None = None,
valid_end: datetime | None = None) -> Sequence[tuple[datetime, str]]:
valid_end: datetime | None = None,
**kwargs
) -> Sequence[tuple[datetime, str]]:
"""Get all objects consistent with the passed issue date/time and filter by valid range
Args:
Expand All @@ -148,6 +153,7 @@ def get_valids(self,
valids >= valid_start. Defaults to None.
valid_end (datetime | None, optional): All returned objects will be for
valids <= valid_end. Defaults to None.
kwargs: Additional arguments, e.g. region
Returns:
Sequence[tuple[datetime, str]]: A sequence of tuples with valid date/time (indicated by
Expand All @@ -158,14 +164,14 @@ def get_valids(self,
valids_and_filenames = self.check_for(issue, valid_start)
return [valids_and_filenames] if valids_and_filenames is not None else []

dir_path = self.path_builder.build_dir(issue=issue)
valid_and_file =[]
dir_path = self.path_builder.build_dir(issue=issue, **kwargs)
valid_and_file = []
for file_path in self.ls(dir_path):
if file_path.endswith(self.path_builder.file_ext):
try:
if issue == self.path_builder.get_issue(file_path):
valid_and_file.append((self.path_builder.get_valid(file_path), file_path))
except ValueError: # Ignore invalid filepaths...
except ValueError: # Ignore invalid filepaths...
pass
valid_and_file = [(dt, path) for (dt, path) in valid_and_file if dt is not None]
# Remove any tuple that has "None" as the valid time
Expand Down Expand Up @@ -209,4 +215,4 @@ def _get_issues(self,
break
except ValueError: # Ignore invalid filepaths...
pass
return issues_set
return issues_set
2 changes: 1 addition & 1 deletion python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def blocking_publish(self,
route_key (str): Optional route key, overriding key provided during initialization
Returns:
bool: Returns True if no errors ocurred during publication. If this
bool: Returns True if no errors occurred during publication. If this
publisher is configured to confirm delivery will return False if
failed to confirm.
"""
Expand Down
9 changes: 4 additions & 5 deletions python/idsse_common/test/test_rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def mock_channel() -> Mock:
"""Mock pika.adapters.blocking_connection.BlockingChannel object"""
def mock_queue_declare(queue: str, **_kwargs) -> Method:
return Frame(Method(queue=queue)) # create a usable (mock) Frame using queue name passed

def mock_exch_declare(exchange: str, **_kwargs) -> Method:
return Frame(Method(exchange=exchange))

Expand Down Expand Up @@ -328,8 +329,8 @@ def test_send_requests_returns_none_on_error(rpc_thread: Rpc,
mock_connection: Mock,
monkeypatch: MonkeyPatch):
# pylint: disable=too-many-arguments
def mock_blocking_publish(channel, exch, message_params, queue = None, success_flag = None,
done_event = None):
def mock_blocking_publish(channel, exch, message_params, queue=None, success_flag=None,
done_event=None):
# cause exception for pending request Future
rpc_thread._pending_requests[EXAMPLE_UUID].set_exception(RuntimeError('Something broke'))

Expand Down Expand Up @@ -383,7 +384,6 @@ def mock_rmq_params_and_callback():
return RabbitMqParamsAndCallback(params=params, callback=callback)



@patch('idsse.common.rabbitmq_utils.BlockingConnection')
@patch('idsse.common.rabbitmq_utils.ThreadPoolExecutor')
def test_consumer_initialization(mock_executor, mock_blocking_connection, mock_conn_params,
Expand All @@ -406,8 +406,6 @@ def test_consumer_start(mock_executor, mock_blocking_connection, mock_conn_param
start_consuming.assert_called_once()




@patch('idsse.common.rabbitmq_utils.BlockingConnection')
@patch('idsse.common.rabbitmq_utils.ThreadPoolExecutor')
def test_on_message(mock_executor, mock_blocking_connection, mock_conn_params,
Expand All @@ -424,6 +422,7 @@ def test_on_message(mock_executor, mock_blocking_connection, mock_conn_params,
ANY,
b"Test Message")


@fixture
def mock_message():
return MagicMock(name='RabbitMqMessage', spec=dict)
Expand Down

0 comments on commit 97a1d6b

Please sign in to comment.