Skip to content

Commit

Permalink
Defer import of aio_pika
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 11, 2024
1 parent 4611154 commit 2eb7194
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
10 changes: 8 additions & 2 deletions src/plumpy/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
from typing import Optional

from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed

__all__ = ['ClosedError', 'InvalidStateError', 'KilledError', 'PersistenceError', 'UnsuccessfulResult']


Expand All @@ -9,8 +11,7 @@ class KilledError(Exception):


class InvalidStateError(Exception):
"""
Raised when an operation is attempted that requires the process to be in a state
"""Raised when an operation is attempted that requires the process to be in a state
that is different from the current state
"""

Expand All @@ -33,3 +34,8 @@ class PersistenceError(Exception):

class ClosedError(Exception):
"""Raised when an mutable operation is attempted on a closed process"""


# Alias aio_pika
CommunicatorConnectionClosed = ConnectionClosed
CommunicatorChannelInvalidStateError = ChannelInvalidStateError
5 changes: 3 additions & 2 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import kiwipy
import yaml
from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed

from . import events, exceptions, futures, persistence, ports, process_comms, process_states, utils
from .base import state_machine
Expand Down Expand Up @@ -697,6 +696,8 @@ def on_entering(self, state: process_states.State) -> None:
call_with_super_check(self.on_except, state.get_exc_info()) # type: ignore

def on_entered(self, from_state: Optional[process_states.State]) -> None:
from plumpy.exceptions import CommunicatorChannelInvalidStateError, CommunicatorConnectionClosed

# Map these onto direct functions that the subclass can implement
state_label = self._state.LABEL
if state_label == process_states.ProcessState.RUNNING:
Expand All @@ -716,7 +717,7 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None:
self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
try:
self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject)
except (ConnectionClosed, ChannelInvalidStateError):
except (CommunicatorConnectionClosed, CommunicatorChannelInvalidStateError):
message = 'Process<%s>: no connection available to broadcast state change from %s to %s'
self.logger.warning(message, self.pid, from_label, self.state.value)
except kiwipy.TimeoutError:
Expand Down

0 comments on commit 2eb7194

Please sign in to comment.