Threaded operation #34

Open
wants to merge 19 commits into
base: main

dmopalmer
Contributor

This is a renewal of an old pull request. (Previously not accepted because @lpsinger intended to implement async instead of threading.)

This supports threaded operation by providing a handler that queues messages.
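
For illustration, a minimal sketch of a handler that queues messages, assuming the usual `handler(payload, root)` callback signature. The name `make_queue_handler` and the stand-in `root` dictionary are hypothetical and are not taken from this pull request:

```python
import queue


def make_queue_handler(message_queue):
    """Return a handler that enqueues each (payload, root) pair."""
    def handler(payload, root):
        # Called once per received notice; put() on an unbounded
        # queue.Queue returns immediately.
        message_queue.put((payload, root))
    return handler


message_queue = queue.Queue()
handler = make_queue_handler(message_queue)

# Stand-in for the listener invoking the handler on one notice.
# A real root would be a parsed XML element, not a dict.
handler(b"<VOEvent/>", {"ivorn": "ivo://example/test#1"})
received_payload, received_root = message_queue.get_nowait()
```

The consumer can then read from `message_queue` in whichever thread it likes, which is what decouples receiving from processing.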

@lpsinger
Member

@dmopalmer, I'm sorry, but I am still convinced that asyncio is a better direction for this package than threading, because it is so I/O-intensive.

@dmopalmer
Contributor Author

Asyncio is probably a better direction in theory, but threading is available with this pull request in time for the next LIGO-VIRGO-ETC run. This request doesn't affect the core functionality in any major way, and won't prevent asyncio from being implemented later.

@lpsinger
Member

lpsinger commented Feb 1, 2023

This PR really contains several mostly-separable changes that could be broken into multiple PRs:

  1. Addition of a termination mechanism (beyond mere signals) to listen().
  2. Addition of a handler that inserts events into a queue.
  3. An extra listener CLI tool that uses multithreading.
  4. Sample code in the README file for calling listen() in a thread.

Here are my reactions to each of these:

  1. I would accept this in principle, although there are a few problems with this implementation:
    • It assumes that the calling program is using threading. I expect the termination mechanism to be agnostic toward the calling program's style of concurrency, because that decision is a higher-level concern that should not impact this library.
    • It modifies the semantics of the socket timeouts. The impact should be neutral on the protocol timing.
    • The termination mechanism is only checked in the receiving loop. It is not checked during the connect loop.
    • I would prefer (but I do not require) an implementation with asyncio so that it could use task cancellation. This is the same comment that I had in Allow threaded operation #6.
  2. I would readily accept this.
  3. I require further justification for why the new CLI tool complements the existing pygcn-listen script.
  4. I would readily accept this.

@dmopalmer
Contributor Author

  1. You are right, using a {thread,process}.terminate() is cleaner than the stopevent-sentinel approach and means that the reader doesn't have to time out to check the sentinel and then jump back to reading.
  2. I will keep this.
  3. The new CLI is more of an example implementation than a useful utility. Instead I can fold it into pygcn-listen with a --threaded command line argument to avoid burdening the command-line namespace, or I can remove its entry point from the setup.py file and leave it in place in the .py file with a comment on how to put it back in setup.py.
  4. OK

So if I

  1. remove the sentinel code and
  2. no change
  3. update the threaded_listen_main() to remove the sentinel; use .terminate(); and remove its entry point
  4. update the README to use terminate

would that be a pull request you are happy with?

For an eventual asyncio upgrade, I haven't found in the documentation how you get individual VOEvents as they come in. All the tutorials and documentation I have seen assume you want to create a bunch of tasks and await until they complete before using the complete results.

@lpsinger
Member

lpsinger commented Feb 2, 2023

> 1. You are right, using a {thread,process}.terminate() is cleaner than the stopevent-sentinel approach and means that the reader doesn't have to time out to check the sentinel and then jump back to reading.
> 2. I will keep this.
> 3. The new CLI is more of an example implementation than a useful utility. Instead I can fold it into pygcn-listen with a --threaded command line argument to avoid burdening the command-line namespace, or I can remove its entry point from the setup.py file and leave it in place in the .py file with a comment on how to put it back in setup.py.
> 4. OK
>
> So if I
>
> 1. remove the sentinel code and
> 2. no change

If 2 will help your application, I would accept it as a self-contained PR.

> 3. update the threaded_listen_main() to remove the sentinel; use .terminate(); and remove its entry point

What is the value of introducing a thread here? The pygcn-listen CLI entry point will terminate gracefully anyway if the process receives SIGINT (C-c).

> 4. update the README to use terminate

What is terminate? Python's threading.Thread class doesn't have a terminate method.

> For eventual asyncio upgrade, I haven't found in the documentation how you get individual voevents as they come in. All the tutorials and documentation I have seen assume you want to create a bunch of tasks and await until they complete before using the complete results.

I can think of at least two possible styles of async APIs. The first is that the listen method could return an asynchronous iterator suitable for use in an async for loop in the calling program. The second is that the calling program simply passes a handler callback, as it currently does. The callback need not be asynchronous.
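
The two styles can be sketched together in a few lines. This is only an illustration under stated assumptions: `listen` here is a hypothetical stand-in, an `asyncio.Queue` plays the role of the network connection, and `None` is an invented end-of-stream marker.

```python
import asyncio


async def listen(source, handler=None):
    """Yield each message as it arrives; optionally call a plain callback too."""
    while True:
        message = await source.get()
        if message is None:  # hypothetical end-of-stream marker
            return
        if handler is not None:
            handler(message)  # style 2: ordinary synchronous callback
        yield message  # style 1: asynchronous iterator


async def main():
    source = asyncio.Queue()  # stand-in for the network connection
    for item in ("event-1", "event-2", None):
        source.put_nowait(item)

    seen_by_callback = []
    seen_by_loop = []
    async for message in listen(source, handler=seen_by_callback.append):
        seen_by_loop.append(message)
    return seen_by_loop, seen_by_callback


seen_by_loop, seen_by_callback = asyncio.run(main())
```

In both styles each message is delivered individually as soon as it is read, rather than after a batch of tasks completes.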

I have a working asyncio VOEvent Transport Protocol client here: https://github.com/nasa-gcn/gcn-classic-to-kafka/blob/main/gcn_classic_to_kafka/socket.py

@dmopalmer
Contributor Author

dmopalmer commented Feb 2, 2023

Yeah, it appears that the only way to gracefully terminate a thread is cooperatively, such as by the sentinel method, which requires the socket reads to time out so the sentinel can be checked.

Probably the way forward is to not have any way to stop the thread (apart from program termination). YAGNI. The documentation should suggest using processes instead if killing the listener will be necessary.
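
For reference, the cooperative pattern described above looks roughly like this. It is a sketch and not the code from this pull request: `reader` is a hypothetical function, a `socket.socketpair()` stands in for the broker connection, and the 0.1 s timeout is an arbitrary choice.

```python
import socket
import threading
import time


def reader(sock, stop_event, received):
    """Read until asked to stop, waking periodically to check the flag."""
    sock.settimeout(0.1)  # short timeout so the flag is checked regularly
    while not stop_event.is_set():
        try:
            data = sock.recv(4096)
        except socket.timeout:
            continue  # nothing arrived; loop back and re-check the flag
        if not data:
            break  # peer closed the connection
        received.append(data)


server_side, client_side = socket.socketpair()  # stand-in for a broker link
stop_event = threading.Event()
received = []
thread = threading.Thread(target=reader,
                          args=(client_side, stop_event, received))
thread.start()

server_side.sendall(b"notice")
deadline = time.monotonic() + 2
while not received and time.monotonic() < deadline:
    time.sleep(0.01)  # wait until the reader has seen the data

stop_event.set()  # request shutdown; the reader notices within one timeout
thread.join(timeout=2)
server_side.close()
client_side.close()
```

The cost is the one noted in the review: the read timeout now exists only to service the stop flag, which changes the socket timeout semantics.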

For 3. the value of introducing a thread is to show the user how to use threads. Probably the extra section in the README is sufficient for that and we can remove threaded_listen_main() altogether. I also wrote that function as a test case for debugging.

@lpsinger
Member

lpsinger commented Feb 2, 2023

> For 3. the value of introducing a thread is to show the user how to use threads. Probably the extra section in the README is sufficient for that and we can remove threaded_listen_main() altogether. I also wrote that function as a test case for debugging.

But why is it useful to show the user how to use threads? I do not see how this is any different from placing any other Python code in a thread. Surely users can read the Python standard library documentation on threads.

@dmopalmer
Contributor Author

The only use in showing the user how to use threads is showing how to use the gcn.handlers.queuehandlerfor(), which is covered in the addition to the README.md.

> I have a working asyncio VOEvent Transport Protocol client here: https://github.com/nasa-gcn/gcn-classic-to-kafka/blob/main/gcn_classic_to_kafka/socket.py

That client immediately awaits on process() or read(), so I don't see how it provides a non-blocking way to check for the next message.

@lpsinger
Member

lpsinger commented Feb 6, 2023

> That client immediately awaits on process() or read(), so I don't see how it provides a non-blocking way to check for the next message.

I think you'd just use task cancellation.
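
A minimal sketch of what task cancellation means in asyncio terms, with a hypothetical `listen_forever` coroutine and an `asyncio.Queue` standing in for the socket (neither is taken from the linked client):

```python
import asyncio


async def listen_forever(inbox, handled):
    """Stand-in listener: wait for messages until cancelled."""
    try:
        while True:
            handled.append(await inbox.get())
    except asyncio.CancelledError:
        handled.append("cleanup")  # release resources here, then re-raise
        raise


async def main():
    inbox = asyncio.Queue()
    handled = []
    task = asyncio.create_task(listen_forever(inbox, handled))
    inbox.put_nowait("event-1")
    await asyncio.sleep(0.01)  # give the listener a chance to run

    task.cancel()  # raises CancelledError inside the pending await
    try:
        await task
    except asyncio.CancelledError:
        pass
    return handled, task.cancelled()


handled, was_cancelled = asyncio.run(main())
```

Cancellation interrupts the listener at its current `await`, so no polling timeout is needed to stop it.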

@dmopalmer
Contributor Author

I don't want to cancel the task and then restart it and lose the messages that came in when it was cancelled.

My use case, which is probably common, is to have a telescope make observations of yesterday's GRB while keeping a VOEvent socket open to quickly change observations to a new LIGO event coming in. So after each exposure I check the message queue for something more interesting.
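
That polling pattern can be sketched as follows. The `drain` helper, the exposure loop, and the message strings are all hypothetical stand-ins; in real use a listener thread would be the one calling `put()`.

```python
import queue


def drain(message_queue):
    """Return every message currently queued, without blocking."""
    messages = []
    while True:
        try:
            messages.append(message_queue.get_nowait())
        except queue.Empty:
            return messages


message_queue = queue.Queue()
log = []
for exposure in range(3):
    log.append(f"exposure {exposure}")  # stand-in for taking one exposure
    if exposure == 1:
        message_queue.put("new-alert")  # pretend the listener queued this
    for alert in drain(message_queue):
        log.append(f"retarget for {alert}")
```

Because the listener keeps running while the main loop is busy, messages that arrive mid-exposure wait in the queue instead of being lost.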

@dmopalmer
Contributor Author

I got back to this in preparation for the LIGO/et al. run.

I have stripped out the stopevent capability. The changes consist of a queue handler implementation and a description in the README of how to use it.

@dmopalmer
Contributor Author

Just re-pinging this pull request.

Member

@lpsinger lpsinger left a comment

I would be content with adding the new handler, but not the threading code sample.

@@ -28,6 +28,7 @@ classifiers =
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
Member

This is a nice addition, but not related to the topic of this PR.

Suggested change
Programming Language :: Python :: 3.11

Member

I don't know why GitHub is showing this in the diff. It's already on the main branch, as of #35. Would you please rebase?

queue.put((payload, root))


def queuehandlerfor(queue):
Member

The other handlers have function names that are verbs or verb phrases. Please rename this one to be consistent. Perhaps `enqueue` or `put_queue`?

Comment on lines +91 to +129
## Threading

You can run the listener in a separate thread or process and pass the packets back in a `Queue`,
allowing the main program to continue operating while waiting for an event.
Here is an example:

```python
#!/usr/bin/env python
import gcn
import threading
import queue

# Set up communications:
messagequeue = queue.Queue()
# Create a listen handler to enqueue the (payload, root) tuple
handler = gcn.handlers.queuehandlerfor(messagequeue)

# Create and start the thread.
thread = threading.Thread(target=gcn.listen,
                          kwargs=dict(handler=handler))
thread.start()

# Wait for messages to come in, but do other things if they don't.
nothingcount = 0
while True:
    try:
        # Use block=False if you want to timeout immediately
        payload, root = messagequeue.get(timeout=10)
        print(root.attrib['ivorn'])
        nothingcount = 0
    except queue.Empty:
        # Do idle stuff here.
        print("Nothing...")
        nothingcount += 1
        if nothingcount > 10:
            print("Quitting due to inactivity")
            break
```

Member

Please remove this. If someone asked how to run the listener in a thread, this isn't how I would suggest to them that they do it. (I would suggest to them that they use an ordinary handler and just launch gcn.listen in a thread or a subprocess.)

Also, this is missing calls to queue.task_done().
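
For context, `task_done()` only matters when something calls `Queue.join()`, which blocks until every item that was put has been marked done. A small sketch of that pairing, with a hypothetical `worker` function and an invented `None` shutdown marker:

```python
import queue
import threading


def worker(message_queue, processed):
    """Consume items, marking each one done exactly once."""
    while True:
        item = message_queue.get()
        try:
            if item is None:  # hypothetical shutdown marker
                return
            processed.append(item.upper())
        finally:
            message_queue.task_done()  # one call per successful get()


message_queue = queue.Queue()
processed = []
thread = threading.Thread(target=worker, args=(message_queue, processed))
thread.start()
for item in ("a", "b", None):
    message_queue.put(item)
message_queue.join()  # returns once every queued item is marked done
thread.join()
```

Without the `task_done()` calls, `message_queue.join()` would block forever.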

@lpsinger
Member

lpsinger commented Aug 4, 2023

FYI, passing the lxml root object through a queue might not be compatible with queues under multiprocessing.
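
The underlying issue is that a `multiprocessing` queue pickles every item it carries, so each item must be picklable. One possible workaround, sketched here as an assumption and not as anything in this PR, is to send only the raw payload bytes and re-parse them on the receiving side. The example uses the standard library's `xml.etree.ElementTree` as a stand-in for lxml and simulates the queue transfer with `pickle` directly:

```python
import pickle
import xml.etree.ElementTree as ET

# Illustrative payload; a real notice would be a full VOEvent document.
payload = b'<VOEvent ivorn="ivo://example/test#1" role="test"/>'

# A multiprocessing queue pickles each item; bytes survive this unchanged.
wire = pickle.dumps(payload)

# Consumer side: unpickle, then rebuild the element tree locally.
root = ET.fromstring(pickle.loads(wire))
ivorn = root.attrib["ivorn"]
```

The trade-off is that the document is parsed twice, once by the listener and once by the consumer.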
