-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add the `mdsthin.ext` package for experimental features and extensions. Add `GetManyMany` prototype class for accessing many things from many shots. Document this in `Experimental.md`
- Loading branch information
1 parent
0418c05
commit c6750a7
Showing
5 changed files
with
157 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
|
||
# Experimental Features and Extensions | ||
|
||
The `mdsplus.ext` package provides features that in development, or otherwise not ready for prime time. | ||
|
||
## GetManyMany | ||
|
||
The `GetManyMany` class provides an interface for accessing many things from many shots. It will create up to `num_workers` threads with one `mdsthin.Connection` object each, and series of `GetMany` objects to complete all the queries. You can then iterate over the results as they come in. | ||
|
||
```py | ||
from mdsthin.ext import GetManyMany | ||
|
||
gmm = GetManyMany(SERVER, num_workers=8) | ||
|
||
gmm.append('y', '_sig = \\IP') | ||
gmm.append('x', 'dim_of(_sig)') | ||
# ... | ||
|
||
gmm.add_shots(TREE, [ SHOTS...]) | ||
# ... | ||
|
||
# Process the results one at a time | ||
for result in gmm.execute(): | ||
print(result.tree, result.shot) | ||
print(result.get('y').data()) | ||
print(result.get('x').data()) | ||
|
||
# Or wait for them all to complete | ||
all_results = list(gmm.execute()) | ||
``` | ||
|
||
When using `ssh://` or `sshp://`, you may need to put a small delay in-between each connection to avoid getting flagged as "too many connections appearing too quickly". If you find your connections are failing in this way, you can use the `worker_delay` argument to provide a delay in seconds. | ||
|
||
```py | ||
gmm = GetManyMany(SERVER, worker_delay=0.1) | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
|
||
import time | ||
import queue | ||
import threading | ||
|
||
from ..connection import Connection | ||
from ..exceptions import getExceptionFromError | ||
|
||
class GetManyMany: | ||
|
||
class Result: | ||
|
||
def __init__(self, tree, shot, result): | ||
self._tree = tree | ||
self._shot = shot | ||
self._result = result | ||
|
||
@property | ||
def tree(self): | ||
return self._tree | ||
|
||
@property | ||
def shot(self): | ||
return self._shot | ||
|
||
def get(self, name): | ||
|
||
if name not in self._result: | ||
return None | ||
|
||
result = self._result[name] | ||
if 'value' in result: | ||
return result['value'] | ||
|
||
raise getExceptionFromError(result['error'].data()) | ||
|
||
class Worker(threading.Thread): | ||
|
||
def __init__(self, gmm): | ||
super().__init__() | ||
|
||
self._gmm = gmm | ||
|
||
def run(self): | ||
|
||
c = Connection(self._gmm._connection_url, **self._gmm._connection_kwargs) | ||
|
||
while True: | ||
try: | ||
tree, shot = self._gmm._shots.get_nowait() | ||
except queue.Empty: | ||
break | ||
|
||
gm = c.getMany() | ||
gm.append('_gmm_open', 'TreeOpen($,$)', tree, shot) | ||
|
||
for query in self._gmm._queries: | ||
gm.append(query['name'], query['exp'], *query['args']) | ||
|
||
result = gm.execute() | ||
|
||
self._gmm._results.put(GetManyMany.Result(tree, shot, result)) | ||
|
||
c.disconnect() | ||
|
||
def __init__(self, connection_url: str, num_workers: int = 8, worker_delay: float = 0.0, **connection_kwargs): | ||
|
||
self._connection_url = connection_url | ||
self._connection_kwargs = connection_kwargs | ||
self._num_workers = num_workers | ||
self._worker_delay = worker_delay | ||
|
||
self._total_shots = 0 | ||
self._shots = queue.Queue() | ||
self._results = queue.Queue() | ||
self._queries = [] | ||
self._workers = [] | ||
|
||
def add_shots(self, tree, shots): | ||
if not isinstance(shots, list): | ||
shots = [ shots ] | ||
|
||
for shot in shots: | ||
self._shots.put((tree, shot)) | ||
self._total_shots += 1 | ||
|
||
def append(self, name, exp, *args): | ||
self._queries.append({ | ||
'name': name, | ||
'exp': exp, | ||
'args': list(args), | ||
}) | ||
|
||
def execute(self): | ||
|
||
num_workers = min(self._num_workers, self._total_shots) | ||
for _ in range(num_workers): | ||
worker = self.Worker(self) | ||
worker.start() | ||
|
||
if self._worker_delay > 0: | ||
time.sleep(self._worker_delay) | ||
|
||
self._workers.append(worker) | ||
|
||
# If the previous threads have already finished, don't make new ones | ||
if self._shots.qsize() == 0: | ||
break | ||
|
||
for _ in range(self._total_shots): | ||
yield self._results.get() | ||
|
||
for worker in self._workers: | ||
worker.join() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
|
||
from .GetManyMany import GetManyMany |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters