Skip to content

Commit

Permalink
chore: Remove support for non-RTC model; Preparing processing_loop fo…
Browse files Browse the repository at this point in the history
…r new version
  • Loading branch information
fgmacedo committed Dec 6, 2024
1 parent e25a88d commit d6c33e4
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 221 deletions.
68 changes: 5 additions & 63 deletions docs/processing_model.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,8 @@ In the literature, It's expected that all state-machine events should execute on
The main point is: What should happen if the state machine triggers nested events while processing a parent event?

```{hint}
The importance of this decision depends on your state machine definition. Also the difference between RTC
and non-RTC processing models is more pronounced in a multi-threaded system than in a single-threaded system.
In other words, even if you run in {ref}`Non-RTC model`, only one external {ref}`event` will be
handled at a time and all internal events will run before the next external event is called,
so you only notice the difference if your state machine definition has nested event triggers while
processing these external events.
```

There are two distinct models for processing events in the library. The default is to run in
{ref}`RTC model` to be compliant with the specs, where the {ref}`event` is put on a
queue before processing. You can also configure your state machine to run in
{ref}`Non-RTC model`, where the {ref}`event` will be run immediately.
This library atheres to the {ref}`RTC model` to be compliant with the specs, where the {ref}`event` is put on a
queue before processing.

Consider this state machine:

Expand Down Expand Up @@ -60,13 +49,13 @@ Consider this state machine:

In a run-to-completion (RTC) processing model (**default**), the state machine executes each event to completion before processing the next event. This means that the state machine completes all the actions associated with an event before moving on to the next event. This guarantees that the system is always in a consistent state.

If the machine is in `rtc` mode, the event is put on a queue.
Internally, the events are put on a queue before processing.

```{note}
While processing the queue items, if others events are generated, they will be processed sequentially.
While processing the queue items, if others events are generated, they will be processed sequentially in FIFO order.
```

Running the above state machine will give these results on the RTC model:
Running the above state machine will give these results:

```py
>>> sm = ServerConnection()
Expand All @@ -89,50 +78,3 @@ after 'connection_succeed' from 'connecting' to 'connected'
Note that the events `connect` and `connection_succeed` are executed sequentially, and the `connect.after` runs on the expected order.
```

## Non-RTC model

```{deprecated} 2.3.2
`StateMachine.rtc` option is deprecated. We'll keep only the **run-to-completion** (RTC) model.
```

In contrast, in a non-RTC (synchronous) processing model, the state machine starts executing nested events
while processing a parent event. This means that when an event is triggered, the state machine
chains the processing when another event was triggered as a result of the first event.

```{warning}
This can lead to complex and unpredictable behavior in the system if your state-machine definition triggers **nested
events**.
```

If your state machine does not trigger nested events while processing a parent event,
and you plan to use the API in an _imperative programming style_, you can consider using the synchronous mode (non-RTC).

In this model, you can think of events as analogous to simple method calls.

```{note}
While processing the {ref}`event`, if others events are generated, they will also be processed immediately, so a **nested** behavior happens.
```

Running the above state machine will give these results on the non-RTC (synchronous) model:

```py
>>> sm = ServerConnection(rtc=False)
enter 'disconnected' from '' given '__initial__'

>>> sm.send("connect")
exit 'disconnected' to 'connecting' given 'connect'
on 'connect' from 'disconnected' to 'connecting'
enter 'connecting' from 'disconnected' given 'connect'
exit 'connecting' to 'connected' given 'connection_succeed'
on 'connection_succeed' from 'connecting' to 'connected'
enter 'connected' from 'connecting' given 'connection_succeed'
after 'connection_succeed' from 'connecting' to 'connected'
after 'connect' from 'disconnected' to 'connecting'
['on_transition', 'on_connect']

```

```{note}
Note that the events `connect` and `connection_succeed` are nested, and the `connect.after`
unexpectedly only runs after `connection_succeed.after`.
```
18 changes: 1 addition & 17 deletions statemachine/engines/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,15 @@

from ..event_data import EventData
from ..event_data import TriggerData
from ..exceptions import InvalidDefinition
from ..exceptions import TransitionNotAllowed
from ..i18n import _
from .base import BaseEngine

if TYPE_CHECKING:
from ..statemachine import StateMachine
from ..transition import Transition


class AsyncEngine(BaseEngine):
def __init__(self, sm: "StateMachine", rtc: bool = True):
if not rtc:
raise InvalidDefinition(_("Only RTC is supported on async engine"))
super().__init__(sm=sm, rtc=rtc)

async def activate_initial_state(self):
"""
Activate the initial state.
Expand All @@ -35,16 +28,7 @@ async def activate_initial_state(self):
async def processing_loop(self):
"""Process event triggers.
The simplest implementation is the non-RTC (synchronous),
where the trigger will be run immediately and the result collected as the return.
.. note::
While processing the trigger, if others events are generated, they
will also be processed immediately, so a "nested" behavior happens.
If the machine is on ``rtc`` model (queued), the event is put on a queue, and only the
first event will have the result collected.
The event is put on a queue, and only the first event will have the result collected.
.. note::
While processing the queue items, if others events are generated, they
Expand Down
59 changes: 42 additions & 17 deletions statemachine/engines/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from itertools import chain
from queue import PriorityQueue
from queue import Queue
from threading import Lock
from typing import TYPE_CHECKING
from weakref import proxy

from statemachine.orderedset import OrderedSet

from ..event import BoundEvent
from ..event_data import EventData
from ..event_data import TriggerData
from ..exceptions import TransitionNotAllowed
from ..state import State
Expand All @@ -14,43 +18,64 @@
from ..statemachine import StateMachine


class EventQueue:
def __init__(self):
self.queue: Queue = PriorityQueue()

def empty(self):
return self.queue.qsize() == 0

def put(self, trigger_data: TriggerData):
"""Put the trigger on the queue without blocking the caller."""
self.queue.put(trigger_data)

def pop(self):
"""Pop a trigger from the queue without blocking the caller."""
return self.queue.get(block=False)

def clear(self):
with self.queue.mutex:
self.queue.queue.clear()

def remove(self, send_id: str):
# We use the internal `queue` to make thins faster as the mutex
# is protecting the block below
with self.queue.mutex:
self.queue.queue = [
trigger_data
for trigger_data in self.queue.queue
if trigger_data.send_id != send_id
]


class BaseEngine:
def __init__(self, sm: "StateMachine", rtc: bool = True):
def __init__(self, sm: "StateMachine"):
self.sm: StateMachine = proxy(sm)
self._external_queue: Queue = PriorityQueue()
self.external_queue = EventQueue()
self.internal_queue = EventQueue()
self._sentinel = object()
self._rtc = rtc
self._running = True
self._processing = Lock()

def empty(self):
return self._external_queue.qsize() == 0
return self.external_queue.empty()

def put(self, trigger_data: TriggerData):
"""Put the trigger on the queue without blocking the caller."""
if not self._running and not self.sm.allow_event_without_transition:
raise TransitionNotAllowed(trigger_data.event, self.sm.current_state)

self._external_queue.put(trigger_data)
self.external_queue.put(trigger_data)

def pop(self):
return self._external_queue.get(block=False)
return self.external_queue.pop()

def clear(self):
with self._external_queue.mutex:
self._external_queue.queue.clear()
self.external_queue.clear()

def cancel_event(self, send_id: str):
"""Cancel the event with the given send_id."""

# We use the internal `queue` to make thins faster as the mutex
# is protecting the block below
with self._external_queue.mutex:
self._external_queue.queue = [
trigger_data
for trigger_data in self._external_queue.queue
if trigger_data.send_id != send_id
]
self.external_queue.remove(send_id)

def start(self):
if self.sm.current_state_value is not None:
Expand Down
20 changes: 4 additions & 16 deletions statemachine/engines/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from time import time
from typing import TYPE_CHECKING

from statemachine.orderedset import OrderedSet

from ..event_data import EventData
from ..event_data import TriggerData
from ..exceptions import TransitionNotAllowed
Expand Down Expand Up @@ -31,27 +33,13 @@ def activate_initial_state(self):
def processing_loop(self):
"""Process event triggers.
The simplest implementation is the non-RTC (synchronous),
where the trigger will be run immediately and the result collected as the return.
.. note::
While processing the trigger, if others events are generated, they
will also be processed immediately, so a "nested" behavior happens.
If the machine is on ``rtc`` model (queued), the event is put on a queue, and only the
first event will have the result collected.
The event is put on a queue, and only the first event will have the result collected.
.. note::
While processing the queue items, if others events are generated, they
will be processed sequentially (and not nested).
"""
if not self._rtc:
# The machine is in "synchronous" mode
trigger_data = self.pop()
return self._trigger(trigger_data)

# We make sure that only the first event enters the processing critical section,
# next events will only be put on the queue and processed by the same loop.
if not self._processing.acquire(blocking=False):
Expand Down Expand Up @@ -127,7 +115,7 @@ def _activate(self, trigger_data: TriggerData, transition: "Transition"): # noq

result += self.sm._callbacks.call(transition.on.key, *args, **kwargs)

self.sm.current_state = target
self.sm.configuration = OrderedSet([target])
event_data.state = target
kwargs["state"] = target

Expand Down
2 changes: 0 additions & 2 deletions statemachine/io/scxml/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,6 @@ def __init__(
model: Any = None,
state_field: str = "state",
start_value: Any = None,
rtc: bool = True,
allow_event_without_transition: bool = True,
listeners: "List[object] | None" = None,
):
Expand All @@ -431,7 +430,6 @@ def __init__(
model,
state_field=state_field,
start_value=start_value,
rtc=rtc,
allow_event_without_transition=allow_event_without_transition,
listeners=listeners,
)
Expand Down
105 changes: 105 additions & 0 deletions statemachine/orderedset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import itertools
from typing import Iterable
from typing import Iterator
from typing import MutableSet
from typing import TypeVar

T = TypeVar("T")


class OrderedSet(MutableSet[T]):
"""A set that preserves insertion order by internally using a dict.
>>> OrderedSet([1, 2, "foo"])
OrderedSet([1, 2, 'foo'])
>>> OrderedSet([1, 2, 3, 3, 2, 1, 'a', 'b', 'a', 'c'])
OrderedSet([1, 2, 3, 'a', 'b', 'c'])
>>> s = OrderedSet([1, 2, 3])
>>> s.add(4)
>>> s
OrderedSet([1, 2, 3, 4])
>>> s = OrderedSet([1, 2, 3])
>>> "foo" in s
False
>>> 1 in s
True
>>> list(s)
[1, 2, 3]
>>> s == OrderedSet([1, 2, 3])
True
>>> s > OrderedSet([1, 2]) # set is a superset of other
True
>>> s & {2}
OrderedSet([2])
>>> s | {4}
OrderedSet([1, 2, 3, 4])
>>> s - {2}
OrderedSet([1, 3])
>>> s - {1}
OrderedSet([2, 3])
>>> {1} - s
OrderedSet([])
>>> s ^ {2}
OrderedSet([1, 3])
>>> s[1]
2
>>> s[2]
3
>>> eval(repr(OrderedSet(['a', 'b', 'c'])))
OrderedSet(['a', 'b', 'c'])
"""

__slots__ = ("_d",)

def __init__(self, iterable: Iterable[T] | None = None):
self._d = dict.fromkeys(iterable) if iterable else {}

def add(self, x: T) -> None:
self._d[x] = None

def clear(self) -> None:
self._d.clear()

def discard(self, x: T) -> None:
self._d.pop(x, None)

def __getitem__(self, index) -> T:
try:
return next(itertools.islice(self._d, index, index + 1))
except StopIteration as e:
raise IndexError(f"index {index} out of range") from e

def __contains__(self, x: object) -> bool:
return self._d.__contains__(x)

def __len__(self) -> int:
return self._d.__len__()

def __iter__(self) -> Iterator[T]:
return self._d.__iter__()

def __str__(self):
return f"{{{', '.join(str(i) for i in self)}}}"

def __repr__(self):
return f"OrderedSet({list(self._d.keys())})"
Loading

0 comments on commit d6c33e4

Please sign in to comment.