Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Comet-style server side filtering #48

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,24 @@ def handler(payload, root):
gcn.listen(handler=handler)
```

## Filtering
## Server-Side Filtering

VOEvent brokers that are powered by [Comet](https://comet.transientskp.org/) support [server-side filtering of alerts](https://comet.transientskp.org/en/stable/filtering.html). You configure the server-side filtering when you connect by supplying an [XPath expression](https://www.w3schools.com/xml/xpath_syntax.asp) in the optional `filter` argument for `gcn.listen`:

```python
gcn.listen(handler=handler, filter='insert-filter-here')
```

Here is a cheat sheet for some common filter expressions.

| Filter expression | What it does |
| - | - |
| `//Param[@name="Packet_Type" and @value="115"]` | Pass only alerts of notice type 115 (`FERMI_GBM_FIN_POS`) |
| `//Param[@name="Packet_Type" and @value="115"] and //Error2Radius<=6` | Pass only alerts of notice type 115 (`FERMI_GBM_FIN_POS`) with error radius less than or equal to 6° |
| `//Param[@name="Packet_Type" and (@value="112" or @value="115")]` | Pass only alerts of notice type 112 (`FERMI_GBM_GND_POS`) or 115 (`FERMI_GBM_FIN_POS`) |
| `starts-with(@ivorn, "ivo://gwnet/") and @role!="test"` | Pass only LIGO-Virgo-KAGRA gravitational-wave alerts that are not test alerts |

## Client-Side Filtering

You can also filter events by notice type using
`gcn.include_notice_types` or `gcn.exclude_notice_types`.
Expand Down
7 changes: 6 additions & 1 deletion gcn/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@
'provided, then loop over hosts until a connection '
'with one of them is established. '
'(default: %(default)s)')
parser.add_argument(
'--filter', help='Optional XPath expression for server-side event '
'filtering. Only supported by Comet brokers. See '
'https://comet.transientskp.org/en/stable/filtering.html for '
'supported syntax')
parser.add_argument('--version', action='version',
version='pygcn ' + __version__)
args = parser.parse_args(args)
Expand All @@ -85,7 +90,7 @@

# Listen for GCN notices (until interrupted or killed)
host, port = [list(_) for _ in zip(*args.addr)]
listen(host=host, port=port, handler=handlers.archive)
listen(host=host, port=port, handler=handlers.archive, filter=args.filter)

Check warning on line 93 in gcn/cmdline.py

View check run for this annotation

Codecov / codecov/patch

gcn/cmdline.py#L93

Added line #L93 was not covered by tests


def serve_main(args=None):
Expand Down
30 changes: 23 additions & 7 deletions gcn/voeventclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import struct
import time
import itertools
from xml.sax.saxutils import quoteattr

from lxml.etree import fromstring, XMLSyntaxError

Expand All @@ -42,7 +43,7 @@

def _get_now_iso8601():
"""Get current date-time in ISO 8601 format."""
return datetime.datetime.now().isoformat()
return datetime.datetime.now().isoformat() + "Z"


def _open_socket(hosts_ports, iamalive_timeout, max_reconnect_timeout, log):
Expand Down Expand Up @@ -121,7 +122,7 @@
sock.sendall(_size_struct.pack(len(payload)) + payload)


def _form_response(role, origin, response, timestamp):
def _form_response(role, origin, response, timestamp, meta=''):
"""Form a VOEvent Transport Protocol packet suitable for sending an `ack`
or `iamalive` response."""
return (
Expand All @@ -133,11 +134,12 @@
'Transport/v1.1 '
'http://telescope-networks.org/schema/Transport-v1.1.xsd"><Origin>' +
origin + '</Origin><Response>' + response +
'</Response><TimeStamp>' + timestamp +
'</TimeStamp></trn:Transport>').encode('UTF-8')
'</Response>' + '<Meta>' + meta + '</Meta>'
+ '<TimeStamp>' + timestamp + '</TimeStamp></trn:Transport>'
).encode('UTF-8')


def _ingest_packet(sock, ivorn, handler, log):
def _ingest_packet(sock, ivorn, handler, log, filter):
"""Ingest one VOEvent Transport Protocol packet and act on it, first
sending the appropriate response and then calling the handler if the
payload is a VOEvent."""
Expand All @@ -163,6 +165,13 @@
root.find("Origin").text, ivorn,
_get_now_iso8601()))
log.debug("sent iamalive response")
elif root.attrib["role"] == "authenticate" and filter is not None:
log.debug("received authenticate message")
_send_packet(sock, _form_response("authenticate",

Check warning on line 170 in gcn/voeventclient.py

View check run for this annotation

Codecov / codecov/patch

gcn/voeventclient.py#L168-L170

Added lines #L168 - L170 were not covered by tests
root.find("Origin").text, ivorn,
_get_now_iso8601(),
f'<Param name="xpath-filter" value={filter}/>'))
log.debug("sent authenticate response")

Check warning on line 174 in gcn/voeventclient.py

View check run for this annotation

Codecov / codecov/patch

gcn/voeventclient.py#L174

Added line #L174 was not covered by tests
else:
log.error(
'received transport message with unrecognized role: %s',
Expand Down Expand Up @@ -212,7 +221,7 @@

def listen(host=("45.58.43.186", "68.169.57.253"), port=8099,
ivorn="ivo://python_voeventclient/anonymous", iamalive_timeout=150,
max_reconnect_timeout=1024, handler=None, log=None):
max_reconnect_timeout=1024, handler=None, log=None, filter=None):
"""Connect to a VOEvent Transport Protocol server on the given `host` and
`port`, then listen for VOEvents until interrupted (i.e., by a keyboard
interrupt, `SIGINTR`, or `SIGTERM`).
Expand All @@ -233,10 +242,17 @@
used for reporting the client's status. If `log` is not provided, a default
logger will be used.

If `filter` is provided, then it is passed to the server as an
`XPath filtering expression
<https://comet.transientskp.org/en/stable/filtering.html>`_.

Note that this function does not return."""
if log is None:
log = logging.getLogger('gcn.listen')

if filter is not None:
filter = quoteattr(filter)

Check warning on line 254 in gcn/voeventclient.py

View check run for this annotation

Codecov / codecov/patch

gcn/voeventclient.py#L254

Added line #L254 was not covered by tests

hosts_ports = itertools.cycle(zip(*_validate_host_port(host, port)))

while True:
Expand All @@ -246,7 +262,7 @@

try:
while True:
_ingest_packet(sock, ivorn, handler, log)
_ingest_packet(sock, ivorn, handler, log, filter)
except socket.timeout:
log.warn("timed out")
except socket.error:
Expand Down
Loading