From b3349bb2a67690d0c66dbc673ca6a4f3743cb7b2 Mon Sep 17 00:00:00 2001 From: David Palmer Date: Sat, 1 Dec 2018 11:29:48 -0700 Subject: [PATCH 01/10] Allow threaded operation, including a handler that places the messages on the queue and an event to stop the thread. --- README.md | 45 ++++++++++++++++++++++++++++++ gcn/__init__.py | 2 +- gcn/cmdline.py | 66 ++++++++++++++++++++++++++++++++++++++++++++ gcn/handlers.py | 18 +++++++++++- gcn/voeventclient.py | 20 +++++++++++--- setup.cfg | 1 + 6 files changed, 146 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index a5aebb1..2a5e052 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,51 @@ def handler(payload, root): gcn.listen(handler=handler) ``` +## Threading + +You can run the listener in a separate thread and communicate with it +through `Event` and `Queue` instances. Here is an example: + +```python +#!/usr/bin/env python +import gcn +import threading +import queue # For python 2.7, import Queue + +# Set up communications: +stopevent = threading.Event() +messagequeue = queue.Queue() +# Create a listen handler that enqueue the (payload, root) tuple +handler = gcn.handlers.queuehandlerfor(messagequeue) + +# Create and start the thread. +thread = threading.Thread(target=gcn.listen, + kwargs=dict(handler=handler, stopevent=stopevent)) +thread.start() + +# Wait for messages to come in, but do other things if they don't. +nothingcount=0 +while thread.is_alive(): + try: + # Use block=False if you want to timeout immediately + payload,root = messagequeue.get(timeout=10) + print(root.attrib['ivorn']) + nothingcount = 0 + except queue.Empty: + # Do idle stuff here. + print("Nothing...") + nothingcount += 1 + if nothingcount > 10: + print("Quitting due to inactivity") + break + +# Send a stop event to the listen thread, then wait for it to quit. +stopevent.set() +thread.join() +print("Done") + +``` + [1]: http://gcn.gsfc.nasa.gov [2]: http://www.ivoa.net/documents/VOEvent diff --git a/gcn/__init__.py b/gcn/__init__.py index 072b64f..dffa54d 100755 --- a/gcn/__init__.py +++ b/gcn/__init__.py @@ -40,4 +40,4 @@ from .voeventclient import * __all__ = handlers.__all__ + notice_types.__all__ + voeventclient.__all__ -__version__ = '0.1.17' +__version__ = '0.1.18' diff --git a/gcn/cmdline.py b/gcn/cmdline.py index 8a1dd62..6cf95b3 100644 --- a/gcn/cmdline.py +++ b/gcn/cmdline.py @@ -20,6 +20,16 @@ import argparse import collections import logging +import datetime +import sys +import threading +import signal + +try: + import queue +except ImportError: + import Queue as queue # Python 2.7 + from . import handlers, listen, serve, __version__ @@ -80,6 +90,62 @@ def listen_main(args=None): listen(host=args.addr.host, port=args.addr.port, handler=handlers.archive) +def threaded_listen_main(args=None): + """Example VOEvent listener that demonstrates threaded operation""" + + # Command line interface + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('addr', default='68.169.57.253:8099', + action=HostPortAction, + help='Server host and port (default: %(default)s)') + parser.add_argument('--version', action='version', + version='pygcn ' + __version__) + parser.add_argument('--maxtime', default=None, help='Time to process until returning (s)') + args = parser.parse_args(args) + + if args.maxtime is not None: + args.maxtime = datetime.timedelta(seconds = float(args.maxtime)) + + # Set up logger + logging.basicConfig(level=logging.INFO) + + # Listen for GCN notices (until interrupted, killed, or maxtime reached) + # in a second thread, while counting up seconds in the main thread. + messagequeue = queue.Queue() + stopevent = threading.Event() + def inthandler(signum, frame): + stopevent.set() + signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt + try: + listenargs = dict(host=args.addr.host, port=args.addr.port, + handler=handlers.queuehandlerfor(messagequeue), + stopevent=stopevent) + thread = threading.Thread(target=listen, kwargs=listenargs) + starttime = datetime.datetime.utcnow() + lasttime = starttime + thread.start() + + while thread.is_alive(): + try: + payload,root = messagequeue.get(timeout=1) + print('\r{} {}' + .format(datetime.datetime.utcnow().strftime("%H:%M:%S"), root.attrib['ivorn'])) + lasttime = datetime.datetime.utcnow() + except queue.Empty: + print('\r{:.0f}' + .format((datetime.datetime.utcnow() - lasttime).total_seconds()), + end='\r') + if args.maxtime is not None: + if (datetime.datetime.utcnow() - starttime) > args.maxtime: + stopevent.set() + break + except Exception as e: + stopevent.set() + print(e, file=sys.stderr) + thread.join() + raise + print('\nFinishing') + thread.join() def serve_main(args=None): """Rudimentary GCN server, for testing purposes. Serves just one connection diff --git a/gcn/handlers.py b/gcn/handlers.py index bbcf4b9..4f14ff8 100755 --- a/gcn/handlers.py +++ b/gcn/handlers.py @@ -23,7 +23,7 @@ from six.moves.urllib.parse import quote_plus __all__ = ('get_notice_type', 'include_notice_types', 'exclude_notice_types', - 'archive') + 'archive', 'queuehandlerfor') def get_notice_type(root): @@ -86,3 +86,19 @@ def archive(payload, root): with open(filename, 'wb') as f: f.write(payload) logging.getLogger('gcn.handlers.archive').info("archived %s", ivorn) + +def _queuehandler(payload, root, queue=None): + """ Place (payload, root) on queue for threaded operation. + This can be used in the following manner: + gcn.listen(handler = functools.partial(partialize_queue, queue=a_queue)) + """ + if queue is None: + raise TypeError("The queue must be set (use queuehandlerfor())") + queue.put( (payload,root) ) + +def queuehandlerfor(queue): + """Create a handler that places (payload, root) on the given queue + This can be used in the following manner: + gcn.listen(handler = queuehandlerfor(queue)) + """ + return functools.partial(_queuehandler, queue=queue) diff --git a/gcn/voeventclient.py b/gcn/voeventclient.py index f0211a3..8969805 100644 --- a/gcn/voeventclient.py +++ b/gcn/voeventclient.py @@ -193,7 +193,8 @@ def _ingest_packet(sock, ivorn, handler, log): def listen(host="68.169.57.253", port=8099, ivorn="ivo://python_voeventclient/anonymous", iamalive_timeout=150, - max_reconnect_timeout=1024, handler=None, log=None): + max_reconnect_timeout=1024, handler=None, log=None, + stopevent=None, stopinterval=1): """Connect to a VOEvent Transport Protocol server on the given `host` and `port`, then listen for VOEvents until interrupted (i.e., by a keyboard interrupt, `SIGINTR`, or `SIGTERM`). @@ -214,17 +215,28 @@ def listen(host="68.169.57.253", port=8099, used for reporting the client's status. If `log` is not provided, a default logger will be used. - Note that this function does not return.""" + If stopevent is used, then the function returns when the event is set() + (with a delay of up to stopinterval seconds). + + Note that this function does not return unless the stopevent is used.""" if log is None: log = logging.getLogger('gcn.listen') while True: sock = _open_socket(host, port, iamalive_timeout, max_reconnect_timeout, log) - + if stopevent is not None: + sock.settimeout(stopinterval) try: while True: - _ingest_packet(sock, ivorn, handler, log) + try: + _ingest_packet(sock, ivorn, handler, log) + except socket.timeout: + if stopevent is None: + raise + else: + if stopevent.is_set(): + return # Stop listening. except socket.timeout: log.warn("timed out") except socket.error: diff --git a/setup.cfg b/setup.cfg index 39ef500..cd662cd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,7 @@ tests_require = console_scripts = pygcn-listen = gcn.cmdline:listen_main pygcn-serve = gcn.cmdline:serve_main + pygcn-threaded-listen = gcn.cmdline:threaded_listen_main [options.package_data] gcn.tests.data = *.xml From 6bcf1539864bba7e086f3ea97809adaf3a9d006a Mon Sep 17 00:00:00 2001 From: David Palmer Date: Sat, 1 Dec 2018 11:29:48 -0700 Subject: [PATCH 02/10] Allow threaded operation, including a handler that places the messages on the queue and an event to stop the thread. --- README.md | 45 ++++++++++++++++++++++++ gcn/__init__.py | 2 +- gcn/cmdline.py | 72 ++++++++++++++++++++++++++++++++++++++ gcn/handlers.py | 18 +++++++++- gcn/tests/test_handlers.py | 25 +++++++++++++ gcn/voeventclient.py | 20 ++++++++--- setup.cfg | 1 + 7 files changed, 177 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index a5aebb1..2a5e052 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,51 @@ def handler(payload, root): gcn.listen(handler=handler) ``` +## Threading + +You can run the listener in a separate thread and communicate with it +through `Event` and `Queue` instances. Here is an example: + +```python +#!/usr/bin/env python +import gcn +import threading +import queue # For python 2.7, import Queue + +# Set up communications: +stopevent = threading.Event() +messagequeue = queue.Queue() +# Create a listen handler that enqueue the (payload, root) tuple +handler = gcn.handlers.queuehandlerfor(messagequeue) + +# Create and start the thread. +thread = threading.Thread(target=gcn.listen, + kwargs=dict(handler=handler, stopevent=stopevent)) +thread.start() + +# Wait for messages to come in, but do other things if they don't. +nothingcount=0 +while thread.is_alive(): + try: + # Use block=False if you want to timeout immediately + payload,root = messagequeue.get(timeout=10) + print(root.attrib['ivorn']) + nothingcount = 0 + except queue.Empty: + # Do idle stuff here. + print("Nothing...") + nothingcount += 1 + if nothingcount > 10: + print("Quitting due to inactivity") + break + +# Send a stop event to the listen thread, then wait for it to quit. +stopevent.set() +thread.join() +print("Done") + +``` + [1]: http://gcn.gsfc.nasa.gov [2]: http://www.ivoa.net/documents/VOEvent diff --git a/gcn/__init__.py b/gcn/__init__.py index 072b64f..dffa54d 100755 --- a/gcn/__init__.py +++ b/gcn/__init__.py @@ -40,4 +40,4 @@ from .voeventclient import * __all__ = handlers.__all__ + notice_types.__all__ + voeventclient.__all__ -__version__ = '0.1.17' +__version__ = '0.1.18' diff --git a/gcn/cmdline.py b/gcn/cmdline.py index 8a1dd62..a4edcec 100644 --- a/gcn/cmdline.py +++ b/gcn/cmdline.py @@ -17,9 +17,20 @@ """ Utilities for command-line interface. """ +from __future__ import print_function import argparse import collections import logging +import datetime +import sys +import threading +import signal + +try: + import queue +except ImportError: + import Queue as queue # Python 2.7 + from . import handlers, listen, serve, __version__ @@ -81,6 +92,67 @@ def listen_main(args=None): handler=handlers.archive) +def threaded_listen_main(args=None): + """Example VOEvent listener that demonstrates threaded operation""" + + # Command line interface + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('addr', default='68.169.57.253:8099', + action=HostPortAction, + help='Server host and port (default: %(default)s)') + parser.add_argument('--version', action='version', + version='pygcn ' + __version__) + parser.add_argument('--maxtime', default=None, + help='Time to process until returning (s)') + args = parser.parse_args(args) + + if args.maxtime is not None: + args.maxtime = datetime.timedelta(seconds=float(args.maxtime)) + + # Set up logger + logging.basicConfig(level=logging.INFO) + + # Listen for GCN notices (until interrupted, killed, or maxtime reached) + # in a second thread, while counting up seconds in the main thread. + messagequeue = queue.Queue() + stopevent = threading.Event() + + def inthandler(signum, frame): + stopevent.set() + + signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt + try: + listenargs = dict(host=args.addr.host, port=args.addr.port, + handler=handlers.queuehandlerfor(messagequeue), + stopevent=stopevent) + thread = threading.Thread(target=listen, kwargs=listenargs) + starttime = datetime.datetime.utcnow() + lasttime = starttime + thread.start() + + while thread.is_alive(): + try: + payload, root = messagequeue.get(timeout=1) + print('\r{} {}' + .format(datetime.datetime.utcnow().strftime("%H:%M:%S"), + root.attrib['ivorn'])) + lasttime = datetime.datetime.utcnow() + except queue.Empty: + dt = (datetime.datetime.utcnow() - lasttime).total_seconds() + print('\r{:.0f}'.format(dt), end='\r') + if args.maxtime is not None: + if (datetime.datetime.utcnow() - starttime) > args.maxtime: + stopevent.set() + break + except Exception as e: + stopevent.set() + print(e, file=sys.stderr) + thread.join() + raise + print('\nFinishing') + thread.join() + + def serve_main(args=None): """Rudimentary GCN server, for testing purposes. Serves just one connection at a time, and repeats the same payloads in order, repeating, for each diff --git a/gcn/handlers.py b/gcn/handlers.py index bbcf4b9..7e1f244 100755 --- a/gcn/handlers.py +++ b/gcn/handlers.py @@ -23,7 +23,7 @@ from six.moves.urllib.parse import quote_plus __all__ = ('get_notice_type', 'include_notice_types', 'exclude_notice_types', - 'archive') + 'archive', 'queuehandlerfor') def get_notice_type(root): @@ -86,3 +86,19 @@ def archive(payload, root): with open(filename, 'wb') as f: f.write(payload) logging.getLogger('gcn.handlers.archive').info("archived %s", ivorn) + + +def _queuehandler(payload, root, queue=None): + """ Place (payload, root) on queue for threaded operation. + """ + if queue is None: + raise TypeError("The queue must be set (use queuehandlerfor())") + queue.put((payload, root)) + + +def queuehandlerfor(queue): + """Create a handler that places (payload, root) on the given queue + This can be used in the following manner: + gcn.listen(handler=queuehandlerfor(queue)) + """ + return functools.partial(_queuehandler, queue=queue) diff --git a/gcn/tests/test_handlers.py b/gcn/tests/test_handlers.py index 86ebc7f..20d7016 100644 --- a/gcn/tests/test_handlers.py +++ b/gcn/tests/test_handlers.py @@ -7,6 +7,10 @@ from .. import handlers from .. import notice_types +try: + import queue +except: + import Queue as queue payloads = [pkg_resources.resource_string(__name__, 'data/gbm_flt_pos.xml'), pkg_resources.resource_string(__name__, 'data/kill_socket.xml')] @@ -52,3 +56,24 @@ def test_archive(tmpdir): assert (tmpdir / filename).exists() finally: os.chdir(old_dir) + + +def test_queuehandler(): + queue_ = queue.Queue() + queuehandler = handlers.queuehandlerfor(queue_) + assert queue_.empty() + + for payload in payloads: + queuehandler(payload, fromstring(payload)) + qpayload, qtree = queue_.get() + assert qpayload == payload + + assert queue_.empty() + + for payload in payloads: + queuehandler(payload, fromstring(payload)) + for payload in payloads: + qpayload, qtree = queue_.get() + assert qpayload == payload + + assert queue_.empty() diff --git a/gcn/voeventclient.py b/gcn/voeventclient.py index f0211a3..8969805 100644 --- a/gcn/voeventclient.py +++ b/gcn/voeventclient.py @@ -193,7 +193,8 @@ def _ingest_packet(sock, ivorn, handler, log): def listen(host="68.169.57.253", port=8099, ivorn="ivo://python_voeventclient/anonymous", iamalive_timeout=150, - max_reconnect_timeout=1024, handler=None, log=None): + max_reconnect_timeout=1024, handler=None, log=None, + stopevent=None, stopinterval=1): """Connect to a VOEvent Transport Protocol server on the given `host` and `port`, then listen for VOEvents until interrupted (i.e., by a keyboard interrupt, `SIGINTR`, or `SIGTERM`). @@ -214,17 +215,28 @@ def listen(host="68.169.57.253", port=8099, used for reporting the client's status. If `log` is not provided, a default logger will be used. - Note that this function does not return.""" + If stopevent is used, then the function returns when the event is set() + (with a delay of up to stopinterval seconds). + + Note that this function does not return unless the stopevent is used.""" if log is None: log = logging.getLogger('gcn.listen') while True: sock = _open_socket(host, port, iamalive_timeout, max_reconnect_timeout, log) - + if stopevent is not None: + sock.settimeout(stopinterval) try: while True: - _ingest_packet(sock, ivorn, handler, log) + try: + _ingest_packet(sock, ivorn, handler, log) + except socket.timeout: + if stopevent is None: + raise + else: + if stopevent.is_set(): + return # Stop listening. except socket.timeout: log.warn("timed out") except socket.error: diff --git a/setup.cfg b/setup.cfg index 39ef500..cd662cd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,7 @@ tests_require = console_scripts = pygcn-listen = gcn.cmdline:listen_main pygcn-serve = gcn.cmdline:serve_main + pygcn-threaded-listen = gcn.cmdline:threaded_listen_main [options.package_data] gcn.tests.data = *.xml From 27257753e917b7514ab7127cefb0b579c39a9e0d Mon Sep 17 00:00:00 2001 From: "David M. Palmer" Date: Sat, 1 Dec 2018 14:54:42 -0700 Subject: [PATCH 03/10] PEP-8 cleanup. --- gcn/cmdline.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/gcn/cmdline.py b/gcn/cmdline.py index 51b7df7..c838b23 100644 --- a/gcn/cmdline.py +++ b/gcn/cmdline.py @@ -91,6 +91,7 @@ def listen_main(args=None): listen(host=args.addr.host, port=args.addr.port, handler=handlers.archive) + def threaded_listen_main(args=None): """Example VOEvent listener that demonstrates threaded operation""" @@ -101,11 +102,12 @@ def threaded_listen_main(args=None): help='Server host and port (default: %(default)s)') parser.add_argument('--version', action='version', version='pygcn ' + __version__) - parser.add_argument('--maxtime', default=None, help='Time to process until returning (s)') + parser.add_argument('--maxtime', default=None, + help='Time to process until returning (s)') args = parser.parse_args(args) if args.maxtime is not None: - args.maxtime = datetime.timedelta(seconds = float(args.maxtime)) + args.maxtime = datetime.timedelta(seconds=float(args.maxtime)) # Set up logger logging.basicConfig(level=logging.INFO) @@ -114,13 +116,15 @@ def threaded_listen_main(args=None): # in a second thread, while counting up seconds in the main thread. messagequeue = queue.Queue() stopevent = threading.Event() + def inthandler(signum, frame): stopevent.set() - signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt + + signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt try: listenargs = dict(host=args.addr.host, port=args.addr.port, - handler=handlers.queuehandlerfor(messagequeue), - stopevent=stopevent) + handler=handlers.queuehandlerfor(messagequeue), + stopevent=stopevent) thread = threading.Thread(target=listen, kwargs=listenargs) starttime = datetime.datetime.utcnow() lasttime = starttime @@ -128,14 +132,14 @@ def inthandler(signum, frame): while thread.is_alive(): try: - payload,root = messagequeue.get(timeout=1) + payload, root = messagequeue.get(timeout=1) print('\r{} {}' - .format(datetime.datetime.utcnow().strftime("%H:%M:%S"), root.attrib['ivorn'])) + .format(datetime.datetime.utcnow().strftime("%H:%M:%S"), + root.attrib['ivorn'])) lasttime = datetime.datetime.utcnow() except queue.Empty: - print('\r{:.0f}' - .format((datetime.datetime.utcnow() - lasttime).total_seconds()), - end='\r') + dt = (datetime.datetime.utcnow() - lasttime).total_seconds() + print('\r{:.0f}'.format(dt), end='\r') if args.maxtime is not None: if (datetime.datetime.utcnow() - starttime) > args.maxtime: stopevent.set() @@ -148,6 +152,7 @@ def inthandler(signum, frame): print('\nFinishing') thread.join() + def threaded_listen_main(args=None): """Example VOEvent listener that demonstrates threaded operation""" From fa06b7fb60901115f35c7d143ce831847a074111 Mon Sep 17 00:00:00 2001 From: David Palmer Date: Sat, 1 Dec 2018 11:29:48 -0700 Subject: [PATCH 04/10] Allow threaded operation, including a handler that places the messages on the queue and an event to stop the thread. --- README.md | 45 ++++++++++++++++++++++++++++++ gcn/__init__.py | 2 +- gcn/cmdline.py | 66 ++++++++++++++++++++++++++++++++++++++++++++ gcn/handlers.py | 18 +++++++++++- gcn/voeventclient.py | 20 +++++++++++--- setup.cfg | 1 + 6 files changed, 146 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index a5aebb1..2a5e052 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,51 @@ def handler(payload, root): gcn.listen(handler=handler) ``` +## Threading + +You can run the listener in a separate thread and communicate with it +through `Event` and `Queue` instances. Here is an example: + +```python +#!/usr/bin/env python +import gcn +import threading +import queue # For python 2.7, import Queue + +# Set up communications: +stopevent = threading.Event() +messagequeue = queue.Queue() +# Create a listen handler that enqueue the (payload, root) tuple +handler = gcn.handlers.queuehandlerfor(messagequeue) + +# Create and start the thread. +thread = threading.Thread(target=gcn.listen, + kwargs=dict(handler=handler, stopevent=stopevent)) +thread.start() + +# Wait for messages to come in, but do other things if they don't. +nothingcount=0 +while thread.is_alive(): + try: + # Use block=False if you want to timeout immediately + payload,root = messagequeue.get(timeout=10) + print(root.attrib['ivorn']) + nothingcount = 0 + except queue.Empty: + # Do idle stuff here. + print("Nothing...") + nothingcount += 1 + if nothingcount > 10: + print("Quitting due to inactivity") + break + +# Send a stop event to the listen thread, then wait for it to quit. +stopevent.set() +thread.join() +print("Done") + +``` + [1]: http://gcn.gsfc.nasa.gov [2]: http://www.ivoa.net/documents/VOEvent diff --git a/gcn/__init__.py b/gcn/__init__.py index a9fa786..5d32a97 100755 --- a/gcn/__init__.py +++ b/gcn/__init__.py @@ -40,4 +40,4 @@ from .voeventclient import * __all__ = handlers.__all__ + notice_types.__all__ + voeventclient.__all__ -__version__ = '0.1.19' +__version__ = '0.1.19a1' diff --git a/gcn/cmdline.py b/gcn/cmdline.py index 8a1dd62..6cf95b3 100644 --- a/gcn/cmdline.py +++ b/gcn/cmdline.py @@ -20,6 +20,16 @@ import argparse import collections import logging +import datetime +import sys +import threading +import signal + +try: + import queue +except ImportError: + import Queue as queue # Python 2.7 + from . import handlers, listen, serve, __version__ @@ -80,6 +90,62 @@ def listen_main(args=None): listen(host=args.addr.host, port=args.addr.port, handler=handlers.archive) +def threaded_listen_main(args=None): + """Example VOEvent listener that demonstrates threaded operation""" + + # Command line interface + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('addr', default='68.169.57.253:8099', + action=HostPortAction, + help='Server host and port (default: %(default)s)') + parser.add_argument('--version', action='version', + version='pygcn ' + __version__) + parser.add_argument('--maxtime', default=None, help='Time to process until returning (s)') + args = parser.parse_args(args) + + if args.maxtime is not None: + args.maxtime = datetime.timedelta(seconds = float(args.maxtime)) + + # Set up logger + logging.basicConfig(level=logging.INFO) + + # Listen for GCN notices (until interrupted, killed, or maxtime reached) + # in a second thread, while counting up seconds in the main thread. + messagequeue = queue.Queue() + stopevent = threading.Event() + def inthandler(signum, frame): + stopevent.set() + signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt + try: + listenargs = dict(host=args.addr.host, port=args.addr.port, + handler=handlers.queuehandlerfor(messagequeue), + stopevent=stopevent) + thread = threading.Thread(target=listen, kwargs=listenargs) + starttime = datetime.datetime.utcnow() + lasttime = starttime + thread.start() + + while thread.is_alive(): + try: + payload,root = messagequeue.get(timeout=1) + print('\r{} {}' + .format(datetime.datetime.utcnow().strftime("%H:%M:%S"), root.attrib['ivorn'])) + lasttime = datetime.datetime.utcnow() + except queue.Empty: + print('\r{:.0f}' + .format((datetime.datetime.utcnow() - lasttime).total_seconds()), + end='\r') + if args.maxtime is not None: + if (datetime.datetime.utcnow() - starttime) > args.maxtime: + stopevent.set() + break + except Exception as e: + stopevent.set() + print(e, file=sys.stderr) + thread.join() + raise + print('\nFinishing') + thread.join() def serve_main(args=None): """Rudimentary GCN server, for testing purposes. Serves just one connection diff --git a/gcn/handlers.py b/gcn/handlers.py index bbcf4b9..4f14ff8 100755 --- a/gcn/handlers.py +++ b/gcn/handlers.py @@ -23,7 +23,7 @@ from six.moves.urllib.parse import quote_plus __all__ = ('get_notice_type', 'include_notice_types', 'exclude_notice_types', - 'archive') + 'archive', 'queuehandlerfor') def get_notice_type(root): @@ -86,3 +86,19 @@ def archive(payload, root): with open(filename, 'wb') as f: f.write(payload) logging.getLogger('gcn.handlers.archive').info("archived %s", ivorn) + +def _queuehandler(payload, root, queue=None): + """ Place (payload, root) on queue for threaded operation. + This can be used in the following manner: + gcn.listen(handler = functools.partial(partialize_queue, queue=a_queue)) + """ + if queue is None: + raise TypeError("The queue must be set (use queuehandlerfor())") + queue.put( (payload,root) ) + +def queuehandlerfor(queue): + """Create a handler that places (payload, root) on the given queue + This can be used in the following manner: + gcn.listen(handler = queuehandlerfor(queue)) + """ + return functools.partial(_queuehandler, queue=queue) diff --git a/gcn/voeventclient.py b/gcn/voeventclient.py index f0211a3..8969805 100644 --- a/gcn/voeventclient.py +++ b/gcn/voeventclient.py @@ -193,7 +193,8 @@ def _ingest_packet(sock, ivorn, handler, log): def listen(host="68.169.57.253", port=8099, ivorn="ivo://python_voeventclient/anonymous", iamalive_timeout=150, - max_reconnect_timeout=1024, handler=None, log=None): + max_reconnect_timeout=1024, handler=None, log=None, + stopevent=None, stopinterval=1): """Connect to a VOEvent Transport Protocol server on the given `host` and `port`, then listen for VOEvents until interrupted (i.e., by a keyboard interrupt, `SIGINTR`, or `SIGTERM`). @@ -214,17 +215,28 @@ def listen(host="68.169.57.253", port=8099, used for reporting the client's status. If `log` is not provided, a default logger will be used. - Note that this function does not return.""" + If stopevent is used, then the function returns when the event is set() + (with a delay of up to stopinterval seconds). + + Note that this function does not return unless the stopevent is used.""" if log is None: log = logging.getLogger('gcn.listen') while True: sock = _open_socket(host, port, iamalive_timeout, max_reconnect_timeout, log) - + if stopevent is not None: + sock.settimeout(stopinterval) try: while True: - _ingest_packet(sock, ivorn, handler, log) + try: + _ingest_packet(sock, ivorn, handler, log) + except socket.timeout: + if stopevent is None: + raise + else: + if stopevent.is_set(): + return # Stop listening. except socket.timeout: log.warn("timed out") except socket.error: diff --git a/setup.cfg b/setup.cfg index 39ef500..cd662cd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,7 @@ tests_require = console_scripts = pygcn-listen = gcn.cmdline:listen_main pygcn-serve = gcn.cmdline:serve_main + pygcn-threaded-listen = gcn.cmdline:threaded_listen_main [options.package_data] gcn.tests.data = *.xml From 836f229e2df56c0133eae4f1f4a7ed0e4de94043 Mon Sep 17 00:00:00 2001 From: David Palmer Date: Sat, 1 Dec 2018 11:29:48 -0700 Subject: [PATCH 05/10] Allow threaded operation, including a handler that places the messages on the queue and an event to stop the thread. --- gcn/cmdline.py | 62 ++++++++++++++++++++++++++++++++++++++ gcn/handlers.py | 4 ++- gcn/tests/test_handlers.py | 25 +++++++++++++++ 3 files changed, 90 insertions(+), 1 deletion(-) diff --git a/gcn/cmdline.py b/gcn/cmdline.py index 6cf95b3..51b7df7 100644 --- a/gcn/cmdline.py +++ b/gcn/cmdline.py @@ -17,6 +17,7 @@ """ Utilities for command-line interface. """ +from __future__ import print_function import argparse import collections import logging @@ -147,6 +148,67 @@ def inthandler(signum, frame): print('\nFinishing') thread.join() +def threaded_listen_main(args=None): + """Example VOEvent listener that demonstrates threaded operation""" + + # Command line interface + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('addr', default='68.169.57.253:8099', + action=HostPortAction, + help='Server host and port (default: %(default)s)') + parser.add_argument('--version', action='version', + version='pygcn ' + __version__) + parser.add_argument('--maxtime', default=None, + help='Time to process until returning (s)') + args = parser.parse_args(args) + + if args.maxtime is not None: + args.maxtime = datetime.timedelta(seconds=float(args.maxtime)) + + # Set up logger + logging.basicConfig(level=logging.INFO) + + # Listen for GCN notices (until interrupted, killed, or maxtime reached) + # in a second thread, while counting up seconds in the main thread. + messagequeue = queue.Queue() + stopevent = threading.Event() + + def inthandler(signum, frame): + stopevent.set() + + signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt + try: + listenargs = dict(host=args.addr.host, port=args.addr.port, + handler=handlers.queuehandlerfor(messagequeue), + stopevent=stopevent) + thread = threading.Thread(target=listen, kwargs=listenargs) + starttime = datetime.datetime.utcnow() + lasttime = starttime + thread.start() + + while thread.is_alive(): + try: + payload, root = messagequeue.get(timeout=1) + print('\r{} {}' + .format(datetime.datetime.utcnow().strftime("%H:%M:%S"), + root.attrib['ivorn'])) + lasttime = datetime.datetime.utcnow() + except queue.Empty: + dt = (datetime.datetime.utcnow() - lasttime).total_seconds() + print('\r{:.0f}'.format(dt), end='\r') + if args.maxtime is not None: + if (datetime.datetime.utcnow() - starttime) > args.maxtime: + stopevent.set() + break + except Exception as e: + stopevent.set() + print(e, file=sys.stderr) + thread.join() + raise + print('\nFinishing') + thread.join() + + def serve_main(args=None): """Rudimentary GCN server, for testing purposes. Serves just one connection at a time, and repeats the same payloads in order, repeating, for each diff --git a/gcn/handlers.py b/gcn/handlers.py index 4f14ff8..f971117 100755 --- a/gcn/handlers.py +++ b/gcn/handlers.py @@ -87,6 +87,7 @@ def archive(payload, root): f.write(payload) logging.getLogger('gcn.handlers.archive').info("archived %s", ivorn) + def _queuehandler(payload, root, queue=None): """ Place (payload, root) on queue for threaded operation. This can be used in the following manner: @@ -94,7 +95,8 @@ def _queuehandler(payload, root, queue=None): """ if queue is None: raise TypeError("The queue must be set (use queuehandlerfor())") - queue.put( (payload,root) ) + queue.put((payload, root)) + def queuehandlerfor(queue): """Create a handler that places (payload, root) on the given queue diff --git a/gcn/tests/test_handlers.py b/gcn/tests/test_handlers.py index 86ebc7f..20d7016 100644 --- a/gcn/tests/test_handlers.py +++ b/gcn/tests/test_handlers.py @@ -7,6 +7,10 @@ from .. import handlers from .. import notice_types +try: + import queue +except: + import Queue as queue payloads = [pkg_resources.resource_string(__name__, 'data/gbm_flt_pos.xml'), pkg_resources.resource_string(__name__, 'data/kill_socket.xml')] @@ -52,3 +56,24 @@ def test_archive(tmpdir): assert (tmpdir / filename).exists() finally: os.chdir(old_dir) + + +def test_queuehandler(): + queue_ = queue.Queue() + queuehandler = handlers.queuehandlerfor(queue_) + assert queue_.empty() + + for payload in payloads: + queuehandler(payload, fromstring(payload)) + qpayload, qtree = queue_.get() + assert qpayload == payload + + assert queue_.empty() + + for payload in payloads: + queuehandler(payload, fromstring(payload)) + for payload in payloads: + qpayload, qtree = queue_.get() + assert qpayload == payload + + assert queue_.empty() From 19e8a5f33d38b184f807378c840c8ac626ed2c85 Mon Sep 17 00:00:00 2001 From: "David M. Palmer" Date: Sat, 1 Dec 2018 14:54:42 -0700 Subject: [PATCH 06/10] PEP-8 cleanup. --- gcn/cmdline.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/gcn/cmdline.py b/gcn/cmdline.py index 51b7df7..c838b23 100644 --- a/gcn/cmdline.py +++ b/gcn/cmdline.py @@ -91,6 +91,7 @@ def listen_main(args=None): listen(host=args.addr.host, port=args.addr.port, handler=handlers.archive) + def threaded_listen_main(args=None): """Example VOEvent listener that demonstrates threaded operation""" @@ -101,11 +102,12 @@ def threaded_listen_main(args=None): help='Server host and port (default: %(default)s)') parser.add_argument('--version', action='version', version='pygcn ' + __version__) - parser.add_argument('--maxtime', default=None, help='Time to process until returning (s)') + parser.add_argument('--maxtime', default=None, + help='Time to process until returning (s)') args = parser.parse_args(args) if args.maxtime is not None: - args.maxtime = datetime.timedelta(seconds = float(args.maxtime)) + args.maxtime = datetime.timedelta(seconds=float(args.maxtime)) # Set up logger logging.basicConfig(level=logging.INFO) @@ -114,13 +116,15 @@ def threaded_listen_main(args=None): # in a second thread, while counting up seconds in the main thread. messagequeue = queue.Queue() stopevent = threading.Event() + def inthandler(signum, frame): stopevent.set() - signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt + + signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt try: listenargs = dict(host=args.addr.host, port=args.addr.port, - handler=handlers.queuehandlerfor(messagequeue), - stopevent=stopevent) + handler=handlers.queuehandlerfor(messagequeue), + stopevent=stopevent) thread = threading.Thread(target=listen, kwargs=listenargs) starttime = datetime.datetime.utcnow() lasttime = starttime @@ -128,14 +132,14 @@ def inthandler(signum, frame): while thread.is_alive(): try: - payload,root = messagequeue.get(timeout=1) + payload, root = messagequeue.get(timeout=1) print('\r{} {}' - .format(datetime.datetime.utcnow().strftime("%H:%M:%S"), root.attrib['ivorn'])) + .format(datetime.datetime.utcnow().strftime("%H:%M:%S"), + root.attrib['ivorn'])) lasttime = datetime.datetime.utcnow() except queue.Empty: - print('\r{:.0f}' - .format((datetime.datetime.utcnow() - lasttime).total_seconds()), - end='\r') + dt = (datetime.datetime.utcnow() - lasttime).total_seconds() + print('\r{:.0f}'.format(dt), end='\r') if args.maxtime is not None: if (datetime.datetime.utcnow() - starttime) > args.maxtime: stopevent.set() @@ -148,6 +152,7 @@ def inthandler(signum, frame): print('\nFinishing') thread.join() + def threaded_listen_main(args=None): """Example VOEvent listener that demonstrates threaded operation""" From 2fe080b2bc377a8ee10c324384da60bf009ea705 Mon Sep 17 00:00:00 2001 From: David Palmer Date: Wed, 8 Jan 2020 11:00:49 -0700 Subject: [PATCH 07/10] Removed accidental duplication. Fixed pep-8 (required by Travis). --- gcn/cmdline.py | 62 ------------------------------------------------- gcn/handlers.py | 2 +- 2 files changed, 1 insertion(+), 63 deletions(-) diff --git a/gcn/cmdline.py b/gcn/cmdline.py index c838b23..7e9d12e 100644 --- a/gcn/cmdline.py +++ b/gcn/cmdline.py @@ -31,7 +31,6 @@ except ImportError: import Queue as queue # Python 2.7 - from . import handlers, listen, serve, __version__ @@ -153,67 +152,6 @@ def inthandler(signum, frame): thread.join() -def threaded_listen_main(args=None): - """Example VOEvent listener that demonstrates threaded operation""" - - # Command line interface - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument('addr', default='68.169.57.253:8099', - action=HostPortAction, - help='Server host and port (default: %(default)s)') - parser.add_argument('--version', action='version', - version='pygcn ' + __version__) - parser.add_argument('--maxtime', default=None, - help='Time to process until returning (s)') - args = parser.parse_args(args) - - if args.maxtime is not None: - args.maxtime = datetime.timedelta(seconds=float(args.maxtime)) - - # Set up logger - logging.basicConfig(level=logging.INFO) - - # Listen for GCN notices (until interrupted, killed, or maxtime reached) - # in a second thread, while counting up seconds in the main thread. - messagequeue = queue.Queue() - stopevent = threading.Event() - - def inthandler(signum, frame): - stopevent.set() - - signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt - try: - listenargs = dict(host=args.addr.host, port=args.addr.port, - handler=handlers.queuehandlerfor(messagequeue), - stopevent=stopevent) - thread = threading.Thread(target=listen, kwargs=listenargs) - starttime = datetime.datetime.utcnow() - lasttime = starttime - thread.start() - - while thread.is_alive(): - try: - payload, root = messagequeue.get(timeout=1) - print('\r{} {}' - .format(datetime.datetime.utcnow().strftime("%H:%M:%S"), - root.attrib['ivorn'])) - lasttime = datetime.datetime.utcnow() - except queue.Empty: - dt = (datetime.datetime.utcnow() - lasttime).total_seconds() - print('\r{:.0f}'.format(dt), end='\r') - if args.maxtime is not None: - if (datetime.datetime.utcnow() - starttime) > args.maxtime: - stopevent.set() - break - except Exception as e: - stopevent.set() - print(e, file=sys.stderr) - thread.join() - raise - print('\nFinishing') - thread.join() - - def serve_main(args=None): """Rudimentary GCN server, for testing purposes. Serves just one connection at a time, and repeats the same payloads in order, repeating, for each diff --git a/gcn/handlers.py b/gcn/handlers.py index f971117..988b27e 100755 --- a/gcn/handlers.py +++ b/gcn/handlers.py @@ -91,7 +91,7 @@ def archive(payload, root): def _queuehandler(payload, root, queue=None): """ Place (payload, root) on queue for threaded operation. This can be used in the following manner: - gcn.listen(handler = functools.partial(partialize_queue, queue=a_queue)) + gcn.listen(handler=functools.partial(partialize_queue, queue=a_queue)) """ if queue is None: raise TypeError("The queue must be set (use queuehandlerfor())") From 7b309e764e437da52a4186a6b5cb61d9e8c1029d Mon Sep 17 00:00:00 2001 From: David Palmer Date: Thu, 18 Nov 2021 13:07:11 -0700 Subject: [PATCH 08/10] Added extra and keyword args to the wrappers. --- gcn/handlers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/gcn/handlers.py b/gcn/handlers.py index 855ccb6..ee71ac7 100755 --- a/gcn/handlers.py +++ b/gcn/handlers.py @@ -45,9 +45,9 @@ def handle(payload, root): def decorate(handler): @functools.wraps(handler) - def handle(payload, root): + def handle(payload, root, *args, **kwargs): if get_notice_type(root) in notice_types: - handler(payload, root) + handler(payload, root, *args, **kwargs) return handle return decorate @@ -69,9 +69,9 @@ def handle(payload, root): def decorate(handler): @functools.wraps(handler) - def handle(payload, root): + def handle(payload, root, *args, **kwargs): if get_notice_type(root) not in notice_types: - handler(payload, root) + handler(payload, root, *args, **kwargs) return handle return decorate From 86c4e5cd80aa537d2ad4622e0998bb3d43ade4dd Mon Sep 17 00:00:00 2001 From: David Palmer Date: Tue, 4 Apr 2023 17:54:46 -0600 Subject: [PATCH 09/10] Removed stopevent capability to simplify threaded operation. --- README.md | 18 ++++------ gcn/cmdline.py | 71 -------------------------------------- gcn/handlers.py | 4 +-- gcn/tests/test_handlers.py | 12 +++---- gcn/voeventclient.py | 20 +++-------- setup.cfg | 2 +- 6 files changed, 16 insertions(+), 111 deletions(-) diff --git a/README.md b/README.md index 0b42b3c..7e73c59 100644 --- a/README.md +++ b/README.md @@ -90,24 +90,24 @@ gcn.listen(handler=handler) ## Threading -You can run the listener in a separate thread and communicate with it -through `Event` and `Queue` instances. Here is an example: +You can run the listener in a separate thread and pass the packets back in a `Queue`, +allowing the main program to continue operating while waiting for an event. +Here is an example: ```python #!/usr/bin/env python import gcn import threading -import queue # For python 2.7, import Queue +import queue # Set up communications: -stopevent = threading.Event() messagequeue = queue.Queue() -# Create a listen handler that enqueue the (payload, root) tuple +# Create a listen handler to enqueue the (payload, root) tuple handler = gcn.handlers.queuehandlerfor(messagequeue) # Create and start the thread. thread = threading.Thread(target=gcn.listen, - kwargs=dict(handler=handler, stopevent=stopevent)) + kwargs=dict(handler=handler)) thread.start() # Wait for messages to come in, but do other things if they don't. @@ -125,12 +125,6 @@ while thread.is_alive(): if nothingcount > 10: print("Quitting due to inactivity") break - -# Send a stop event to the listen thread, then wait for it to quit. -stopevent.set() -thread.join() -print("Done") - ``` diff --git a/gcn/cmdline.py b/gcn/cmdline.py index f626939..cad594b 100644 --- a/gcn/cmdline.py +++ b/gcn/cmdline.py @@ -16,19 +16,9 @@ """ Utilities for command-line interface. """ -from __future__ import print_function import argparse import collections import logging -import datetime -import sys -import threading -import signal - -try: - import queue -except ImportError: - import Queue as queue # Python 2.7 from . import handlers, listen, serve, __version__ @@ -98,67 +88,6 @@ def listen_main(args=None): listen(host=host, port=port, handler=handlers.archive) -def threaded_listen_main(args=None): - """Example VOEvent listener that demonstrates threaded operation""" - - # Command line interface - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument('addr', default='68.169.57.253:8099', - action=HostPortAction, - help='Server host and port (default: %(default)s)') - parser.add_argument('--version', action='version', - version='pygcn ' + __version__) - parser.add_argument('--maxtime', default=None, - help='Time to process until returning (s)') - args = parser.parse_args(args) - - if args.maxtime is not None: - args.maxtime = datetime.timedelta(seconds=float(args.maxtime)) - - # Set up logger - logging.basicConfig(level=logging.INFO) - - # Listen for GCN notices (until interrupted, killed, or maxtime reached) - # in a second thread, while counting up seconds in the main thread. - messagequeue = queue.Queue() - stopevent = threading.Event() - - def inthandler(signum, frame): - stopevent.set() - - signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt - try: - listenargs = dict(host=args.addr.host, port=args.addr.port, - handler=handlers.queuehandlerfor(messagequeue), - stopevent=stopevent) - thread = threading.Thread(target=listen, kwargs=listenargs) - starttime = datetime.datetime.utcnow() - lasttime = starttime - thread.start() - - while thread.is_alive(): - try: - payload, root = messagequeue.get(timeout=1) - print('\r{} {}' - .format(datetime.datetime.utcnow().strftime("%H:%M:%S"), - root.attrib['ivorn'])) - lasttime = datetime.datetime.utcnow() - except queue.Empty: - dt = (datetime.datetime.utcnow() - lasttime).total_seconds() - print('\r{:.0f}'.format(dt), end='\r') - if args.maxtime is not None: - if (datetime.datetime.utcnow() - starttime) > args.maxtime: - stopevent.set() - break - except Exception as e: - stopevent.set() - print(e, file=sys.stderr) - thread.join() - raise - print('\nFinishing') - thread.join() - - def serve_main(args=None): """Rudimentary GCN server, for testing purposes. Serves just one connection at a time, and repeats the same payloads in order, repeating, for each diff --git a/gcn/handlers.py b/gcn/handlers.py index 0331d1d..04b352c 100755 --- a/gcn/handlers.py +++ b/gcn/handlers.py @@ -87,13 +87,11 @@ def archive(payload, root): logging.getLogger('gcn.handlers.archive').info("archived %s", ivorn) -def _queuehandler(payload, root, queue=None): +def _queuehandler(payload, root, *, queue): """ Place (payload, root) on queue for threaded operation. This can be used in the following manner: gcn.listen(handler=functools.partial(partialize_queue, queue=a_queue)) """ - if queue is None: - raise TypeError("The queue must be set (use queuehandlerfor())") queue.put((payload, root)) diff --git a/gcn/tests/test_handlers.py b/gcn/tests/test_handlers.py index 297d05a..844d943 100644 --- a/gcn/tests/test_handlers.py +++ b/gcn/tests/test_handlers.py @@ -7,11 +7,7 @@ from . import data from .. import handlers from .. import notice_types - -try: - import queue -except: - import Queue as queue +import queue payloads = [resources.read_binary(data, 'gbm_flt_pos.xml'), resources.read_binary(data, 'kill_socket.xml')] @@ -61,18 +57,18 @@ def test_archive(tmpdir): def test_queuehandler(): queue_ = queue.Queue() - queuehandler = handlers.queuehandlerfor(queue_) + queuehandlerfor = handlers.queuehandlerfor(queue_) assert queue_.empty() for payload in payloads: - queuehandler(payload, fromstring(payload)) + queuehandlerfor(payload, fromstring(payload)) qpayload, qtree = queue_.get() assert qpayload == payload assert queue_.empty() for payload in payloads: - queuehandler(payload, fromstring(payload)) + queuehandlerfor(payload, fromstring(payload)) for payload in payloads: qpayload, qtree = queue_.get() assert qpayload == payload diff --git a/gcn/voeventclient.py b/gcn/voeventclient.py index a33cdc7..0e42b5b 100644 --- a/gcn/voeventclient.py +++ b/gcn/voeventclient.py @@ -212,8 +212,7 @@ def _validate_host_port(host, port): def listen(host=("45.58.43.186", "68.169.57.253"), port=8099, ivorn="ivo://python_voeventclient/anonymous", iamalive_timeout=150, - max_reconnect_timeout=1024, handler=None, log=None, - stopevent=None, stopinterval=1): + max_reconnect_timeout=1024, handler=None, log=None): """Connect to a VOEvent Transport Protocol server on the given `host` and `port`, then listen for VOEvents until interrupted (i.e., by a keyboard interrupt, `SIGINTR`, or `SIGTERM`). @@ -234,10 +233,7 @@ def listen(host=("45.58.43.186", "68.169.57.253"), port=8099, used for reporting the client's status. If `log` is not provided, a default logger will be used. - If stopevent is used, then the function returns when the event is set() - (with a delay of up to stopinterval seconds). - - Note that this function does not return unless the stopevent is used.""" + Note that this function does not return.""" if log is None: log = logging.getLogger('gcn.listen') @@ -247,18 +243,10 @@ def listen(host=("45.58.43.186", "68.169.57.253"), port=8099, sock = _open_socket(hosts_ports, iamalive_timeout, max_reconnect_timeout, log) - if stopevent is not None: - sock.settimeout(stopinterval) + try: while True: - try: - _ingest_packet(sock, ivorn, handler, log) - except socket.timeout: - if stopevent is None: - raise - else: - if stopevent.is_set(): - return # Stop listening. + _ingest_packet(sock, ivorn, handler, log) except socket.timeout: log.warn("timed out") except socket.error: diff --git a/setup.cfg b/setup.cfg index 75370a7..65c9431 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,6 +28,7 @@ classifiers = Programming Language :: Python :: 3.8 Programming Language :: Python :: 3.9 Programming Language :: Python :: 3.10 + Programming Language :: Python :: 3.11 Topic :: Internet Topic :: Scientific/Engineering :: Astronomy project_urls = @@ -46,7 +47,6 @@ tests_require = console_scripts = pygcn-listen = gcn.cmdline:listen_main pygcn-serve = gcn.cmdline:serve_main - pygcn-threaded-listen = gcn.cmdline:threaded_listen_main [options.package_data] gcn.tests.data = *.xml From 5db02d40dd0a90d2b6c974a5dddcd6678939a507 Mon Sep 17 00:00:00 2001 From: David Palmer Date: Tue, 4 Apr 2023 18:15:07 -0600 Subject: [PATCH 10/10] Readme change --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7e73c59..38aa2b1 100644 --- a/README.md +++ b/README.md @@ -90,7 +90,7 @@ gcn.listen(handler=handler) ## Threading -You can run the listener in a separate thread and pass the packets back in a `Queue`, +You can run the listener in a separate thread or process and pass the packets back in a `Queue`, allowing the main program to continue operating while waiting for an event. Here is an example: @@ -112,7 +112,7 @@ thread.start() # Wait for messages to come in, but do other things if they don't. nothingcount=0 -while thread.is_alive(): +while True: try: # Use block=False if you want to timeout immediately payload,root = messagequeue.get(timeout=10)