Shared communicator #652

Merged 4 commits on Jul 30, 2024
12 changes: 2 additions & 10 deletions python/gui/maas_experiment/settings.py
@@ -1,13 +1,5 @@
"""
Django settings for maas_experiment project.

Generated by 'django-admin startproject' using Django 2.2.5.

For more information on this file, see
https://docs.djangoproject.com/en/2.2/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/2.2/ref/settings/
Django settings for maas_experiment project
"""

from .application_values import *
@@ -20,7 +12,7 @@
# See https://docs.djangoproject.com/en/2.2/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = os.environ.get("SECRET_KEY",'cm_v*vc*8s048%f46*@t7)hb9rtaa@%)#b!s(+$4+iw^tjt=s6')
SECRET_KEY = os.environ.get("SECRET_KEY", 'cm_v*vc*8s048%f46*@t7)hb9rtaa@%)#b!s(+$4+iw^tjt=s6')

# Must be set in production!
ALLOWED_HOSTS = ['*']
166 changes: 166 additions & 0 deletions python/lib/core/README.md
@@ -2,3 +2,169 @@
Python package for core DMOD types, both concrete and abstract, that are depended upon by other DMOD Python packages and themselves have no dependencies outside of Python and its standard library.

Classes belong here if placing them in a more specialized package would cause undesired consequences, such as circular dependencies or transitive dependency on otherwise unnecessary packages.

## `common`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common` package

### `collection`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.collection` module

### `failure`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.failure` module

### `helper_functions`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.helper_functions` module

### `protocols`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.protocols` module

### `reader`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.reader` module

### `tasks`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.tasks` module

### `types`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.types` module

## `decorators`

<b style="color: red">TODO:</b> Write information about the `dmod.core.decorators` package

### `decorator_constants`

<b style="color: red">TODO:</b> Write information about the `dmod.core.decorators.decorator_constants` module

### `decorator_functions`

<b style="color: red">TODO:</b> Write information about the `dmod.core.decorators.decorator_functions` module

### `message_handlers`

<b style="color: red">TODO:</b> Write information about the `dmod.core.decorators.message_handlers` module

## `context`

The `dmod.core.context` module provides the functionality needed to create automatic proxies for remote objects,
a DMOD-specific multiprocessed object manager, and a custom implementation of the object manager's server that
overcomes issues with the base functionality as of Python 3.8. If
`dmod.core.context.DMODObjectManager.register_class(NewClass)` is called after the class's definition, a proxy for
that type (`NewClass` in this example) is defined dynamically and may be constructed through code such as:

```python
from dmod.core import context

with context.get_object_manager() as manager:
    class_instance = manager.create_object('NewClass', 'one', 2, other_parameter=9)
```

where <code style="color: green">'NewClass'</code> is the name of the desired class and
<code style="color: green">'one'</code>, <code style="color: blue">2</code>, and <code>other_parameter</code>
are the parameters for `NewClass`'s constructor.
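
Under the hood, this registration pattern mirrors the standard library's `BaseManager.register`. The following is a self-contained sketch of the same idea using only `multiprocessing`; the `Counter` and `ToyManager` names are illustrative assumptions, not DMOD's API:

```python
from multiprocessing.managers import BaseManager


class Counter:
    """Hypothetical example class to be served through a manager."""
    def __init__(self, start: int = 0):
        self._value = start

    def increment(self) -> int:
        self._value += 1
        return self._value


class ToyManager(BaseManager):
    """Stand-in for an object manager that supports class registration."""


# Registering the class lets the manager construct proxies for it by name
ToyManager.register('Counter', Counter)

if __name__ == '__main__':
    with ToyManager() as manager:
        # The proxy lives here; the real Counter lives in the manager's server process
        counter = manager.Counter(10)
        print(counter.increment())
```

Calls made through `counter` are dispatched to the server process, which is the same mechanism the dynamically generated DMOD proxies rely on.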

Scopes may be created on the manager to track objects that are passed from one process to another. If a
proxy is instantiated within a called function, passed to a new process, and the function then returns, the
server's `decref` function will be called before its `incref` function, destroying the object before it can
be used. Creating the object through a scope keeps it alive, and assigning the process to the scope allows
the object manager to destroy the scope's objects when the process completes.

For example:

```python
from dmod.core import context
from concurrent import futures

def do_something(new_class: NewClass):
    ...

def start_process(manager: context.DMODObjectManager, pool: futures.ProcessPoolExecutor):
    scope = manager.establish_scope("example")
    example_object = scope.create_object('NewClass', 'one', 2, other_parameter=9)
    task = pool.submit(do_something, example_object)

    # The scope and everything within it will be deleted when `task.done()`
    manager.monitor_operation(scope, task)

# Tell the object manager to monitor scopes when creating it
with futures.ProcessPoolExecutor() as pool, context.get_object_manager(monitor_scope=True) as manager:
    start_process(manager, pool)
```

### <span style="color: red">Common Errors</span>

#### Remote Error in `Server.incref`

Sometimes you might encounter an error that reads like:

```shell
Traceback (most recent call last):
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/queues.py", line 358, in get
    return _ForkingPickler.loads(res)
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 959, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 809, in __init__
    self._incref()
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 864, in _incref
    dispatch(conn, None, 'incref', (self._id,))
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 91, in dispatch
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 210, in handle_request
    result = func(c, *args, **kwds)
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 456, in incref
    raise ke
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 443, in incref
    self.id_to_refcount[ident] += 1
KeyError: '3067171c0'
```

This sort of error occurs when an instantiated object falls out of scope _before_ another process has had
a chance to use it. The server (in this case the `dmod.core.context.DMODObjectServer`) that the manager (in this case
the `dmod.core.context.DMODObjectManager`) communicates with keeps track of objects via reference counters. When a
proxy is created, the real object is created on the server and its reference count increases. When the proxy leaves
scope, that reference count decreases, and when it reaches 0 the real object that the proxy refers to is removed.
If a proxy is created in the scope of one function and passed to another process, its reference count will be
decremented when that function exits unless the proxy was created within a scope that does not end when the
function does.
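
The failure mode can be sketched with a toy model of the server's bookkeeping. This is a deliberate simplification: `id_to_refcount` is the real attribute named in the traceback above, but `ToyServer` itself is hypothetical and stands in for the manager server:

```python
class ToyServer:
    """Simplified model of a manager server's reference counting (illustrative only)."""
    def __init__(self):
        self.id_to_obj = {}
        self.id_to_refcount = {}

    def create(self, ident, obj):
        # A freshly created object starts with a single reference
        self.id_to_obj[ident] = obj
        self.id_to_refcount[ident] = 1

    def incref(self, ident):
        if ident not in self.id_to_refcount:
            # This is the KeyError surfaced as the RemoteError in the traceback above
            raise KeyError(ident)
        self.id_to_refcount[ident] += 1

    def decref(self, ident):
        self.id_to_refcount[ident] -= 1
        if self.id_to_refcount[ident] == 0:
            # Refcount hit zero, so the real object is discarded
            del self.id_to_obj[ident]
            del self.id_to_refcount[ident]


server = ToyServer()
server.create('3067171c0', object())
server.decref('3067171c0')       # the creating function returns; the proxy leaves scope
try:
    server.incref('3067171c0')   # the other process finally tries to use the proxy
except KeyError as error:
    print(f"incref failed: {error}")
```

Creating the object through a scope, as shown earlier, amounts to holding an extra reference until the consuming process finishes, so `decref` cannot drop the count to zero prematurely.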

## `dataset`

<b style="color: red">TODO:</b> Write information about the `dmod.core.dataset` module

## `enum`

<b style="color: red">TODO:</b> Write information about the `dmod.core.enum` module

## `exception`

<b style="color: red">TODO:</b> Write information about the `dmod.core.exception` module

## `execution`

<b style="color: red">TODO:</b> Write information about the `dmod.core.execution` module

## `meta_data`

<b style="color: red">TODO:</b> Write information about the `dmod.core.meta_data` module

## `serializable`

<b style="color: red">TODO:</b> Write information about the `dmod.core.serializable` module
2 changes: 1 addition & 1 deletion python/lib/core/dmod/core/_version.py
@@ -1 +1 @@
__version__ = '0.17.0'
__version__ = '0.18.0'
4 changes: 4 additions & 0 deletions python/lib/core/dmod/core/common/__init__.py
@@ -4,6 +4,7 @@
from __future__ import annotations

from .failure import Failure

from .helper_functions import get_current_function_name
from .helper_functions import is_sequence_type
from .helper_functions import is_iterable_type
@@ -21,9 +22,12 @@
from .helper_functions import humanize_text
from .helper_functions import generate_identifier
from .helper_functions import generate_key
from .helper_functions import format_stack_trace

from .tasks import wait_on_task
from .tasks import cancel_task
from .tasks import cancel_tasks

from .collection import Bag
from .protocols import DBAPIConnection
from .protocols import DBAPICursor
168 changes: 168 additions & 0 deletions python/lib/core/dmod/core/common/collection.py
Member:

Feels a little weird to put this in the common subpackage. This feels like it makes a fair number of assumptions about how it will be used and should probably live closer to where it is being used, at least for now. Thoughts?

Contributor:

I have had the same thought about a lot of the new things in this set of changes and other similar ones involving dmod.core. On the one hand, we want to be able to reuse things. On the other, we don't want to pack too much into a "core" package that isn't really fundamental to what DMOD does. And on the third hand, having too many library packages doesn't make sense either.

If all these new things are general enough to belong in the dmod.core package, then, given how large this PR is, I'd like to see the dmod.core changes (or at least the bulk of them) broken out into a dedicated PR. That can be handled first in isolation and will make things much easier to review and discuss effectively.

There may also be other similar points of decoupling - e.g., perhaps GUI-related changes - that can also be separated into individual PRs to further help make all of this easier to digest.

Contributor Author:

The point is to have general tools. Sticking these structures in a purpose-built library makes it more difficult to find useful shared tools. If I buy a spark plug wrench and keep it in my truck, I probably won't remember I have one when it comes time to work on my lawn mower.

A Bag data structure or a map that triggers events don't bear any tight coupling to specific actions, either. An EventfulMap doesn't provide any functionality that performs operations like evaluations, but functionality that helps perform evaluations specifically can leverage it for easier object management operations.

A dmod.common library would work well to keep functionality separate. As it stands today, dmod.core is the only Python library that presents itself as general or cross-functional.

There are multiple issues when it comes to breaking out changes to common code and putting that in a separate PR:

  1. The changes that came to the common code only occurred because of the overarching goal of the tasks encompassed by the ticket for this PR
  2. Without the major functionality accomplished here, there is no ability to demonstrate the need for these changes or how they are intended to be used - they make no sense without context
  3. Putting this into a separate PR would require the creation of one or more new branches after all work has been completed
  4. Putting this into a separate PR would create a pipeline for sequential reviews that would hinder the completion of tasks
  5. Cherry picking code changes into a totally new branch provides a lot of potential for critical errors that would prevent a merge then produce merge conflicts when the code that enabled that to work needs to be added

Putting isolated tasks into their own PRs makes sense - if I'm specifically going in to add in a new feature or add in a bug fix or add documentation, a PR for each makes sense. Breaking off utilities into their own PRs does not provide that same benefit.

Contributor:

I totally agree with having general tools centralized for easy reuse. And part of what I was trying to say was that I don't want to add another library package. But I'm having trouble wrapping my head around the idea of tools that are general but make no sense without context.

The relationship you're describing sure sounds like things that are tightly coupled, in which case they'd probably belong in the same package. I'm not saying they do, but they should be separable otherwise (or at least the one not dependent on the other should be).

I'll think a bit more about this over the weekend; I don't want to make things more complicated just from trying to reduce complexity elsewhere. But I am a little worried about the notion of not being able to separate these things (as opposed to it not being worth the trouble).

Member:

Maybe we can land somewhere in the middle? The majority of the features introduced in the PR could make up an entire library or framework. I think we can and should use Python's subpackaging semantics to make that clearer than what we have now. I'm thinking we create dmod.core.multiprocessing or dmod.core.rpc and move the majority of this work into that subpackage. As for naming, I'm just throwing ideas out there; I'm certain there are better names than what I just suggested.

The main point of this is to group code in submodules that capture the expressed concepts and keep related or dependent concepts close to one another (A needs B, so A is close to B). I hear your point about classes like a Bag, but I think the classes added in this PR are more specialized and aren't really abstract data types like a bag is. Thoughts?

@@ -7,6 +7,9 @@
import enum
import inspect
import typing
import uuid
from datetime import datetime
from datetime import timedelta

import pydantic
from pydantic import PrivateAttr
@@ -235,6 +238,171 @@ def __contains__(self, obj: object) -> bool:
return obj in self.__data


class _OccurrenceTracker(typing.Generic[_T]):
Member:

Now with a little more understanding of how all the parts fit together, why can't this be a Communicator? From what I'm seeing, an OccurrenceTracker subtype will always be on the server side (e.g. it will track calls made through a prototype).

"""
Keeps track of occurrences of a type of value that have been encountered within a duration
"""
def __init__(self, key: _T, duration: timedelta, threshold: int, on_filled: typing.Callable[[_T], typing.Any]):
self.__key = key
self.__duration = duration
self.__threshold = threshold
self.__occurences: typing.List[datetime] = []
self.__on_filled = on_filled

def value_encountered(self):
"""
Inform the tracker that the value has been encountered again
"""
self.update_occurrences()
self.__occurences.append(datetime.now())
if len(self.__occurences) >= self.__threshold:
self.__on_filled(self.__key)

def update_occurrences(self) -> int:
"""
Update the list of occurrences to include only those within the current duration

Returns:
The number of occurrences still being tracked
"""
cutoff: datetime = datetime.now() - self.__duration
self.__occurences = [
occurrence
for occurrence in self.__occurences
if occurrence > cutoff
]
return len(self.__occurences)

@property
def key(self):
"""
The identifier that is being tracked
"""
return self.__key

def __len__(self):
return len(self.__occurences)

def __str__(self):
if len(self.__occurences) == 0:
occurrences_details = f"No Occurences within the last {self.__duration.total_seconds()} seconds."
else:
occurrences_details = (f"{len(self.__occurences)} occurrences since "
f"{self.__occurences[0].strftime('%Y-%m-%d %H:%M:%S')}")
return f"{self.key}: {occurrences_details}"


class TimedOccurrenceWatcher:
"""
Keeps track of the amount of occurrences of items within a range of time
"""
MINIMUM_TRACKING_SECONDS: typing.Final[float] = 0.1
"""
The lowest number of seconds to watch for multiple occurrences. Only acting when multiple occurrences are tracked
in under 100ms would create a scenario where the watcher will most likely never trigger an action, rendering
this the wrong tool for the job.
"""

@staticmethod
def default_key_function(obj: object) -> type:
"""
The function used to find a common identifier for an object if one is not provided
"""
return type(obj)

def __init__(
self,
duration: timedelta,
threshold: int,
on_filled: typing.Callable[[_T], typing.Any],
key_function: typing.Callable[[_VT], _KT] = None
):
if not isinstance(duration, timedelta):
raise ValueError(f"Cannot create a {self.__class__.__name__} - {duration} is not a timedelta object")

if duration.total_seconds() < self.MINIMUM_TRACKING_SECONDS:
raise ValueError(
f"Cannot create a {self.__class__.__name__} - the duration is too short ({duration.total_seconds()}s)"
)

self.__duration = duration

if not isinstance(key_function, typing.Callable):
key_function = self.default_key_function

self.__key_function = key_function
self.__entries: typing.Dict[uuid.UUID, _OccurrenceTracker] = {}
self.__threshold = threshold
self.__on_filled = on_filled

def value_encountered(self, value: _T):
"""
Add an occurrence of an object to track

Args:
value: The item to track
"""
self.__update_trackers()
self._get_tracker(value).value_encountered()

def _get_tracker(self, value: _T) -> _OccurrenceTracker[_T]:
"""
Get an occurrence tracker for the given value

Args:
value: The value to track

Returns:
A tracker for the value
"""
key = self.__key_function(value)

for tracker in self.__entries.values():
if tracker.key == key:
return tracker

new_tracker = _OccurrenceTracker(
key=key,
duration=self.__duration,
threshold=self.__threshold,
on_filled=self.__on_filled
)
self.__entries[uuid.uuid1()] = new_tracker
return new_tracker

def __update_trackers(self):
"""
Update the amount of items in each tracker

If a tracker becomes empty it will be removed
"""
for tracker_id, tracker in self.__entries.items():
amount_left = tracker.update_occurrences()
if amount_left == 0:
del self.__entries[tracker_id]

@property
def size(self) -> int:
"""
The number of items encountered within the duration
"""
self.__update_trackers()
return sum(len(tracker) for tracker in self.__entries.values())

@property
def duration(self) -> timedelta:
"""
The amount of time to track items for
"""
return self.__duration

def __str__(self):
return f"{self.__class__.__name__}: {self.size} items within the last {self.duration.total_seconds()} Seconds"

def __repr__(self):
return self.__str__()


class EventfulMap(abc.ABC, typing.MutableMapping[_KT, _VT], typing.Generic[_KT, _VT]):
@abc.abstractmethod
def get_handlers(self) -> typing.Dict[CollectionEvent, typing.MutableSequence[typing.Callable]]: