Skip to content

Plugin Development

J Tseng edited this page Jul 13, 2023 · 2 revisions

How to write a plugin.

For information on getting set up to run snewpdag, see the Installation wiki page.

Branches

It's recommended that you fork snewpdag, develop code in your branch. Then, when you're ready to push your commits, push it to the branch, and make a pull request.

How to make a plugin

Useful imports

import logging
import numpy as np
import healpy as hp
from snewpdag.dag import Node

Define your class

The plugin derives from the Node class.

If your plugin takes arguments on construction, define them in the __init__ method, and then forward the rest of the keyword arguments to the parent initializer. In the example below, myarg is a required argument, while optarg is an optional argument.

class MyPlugin(Node):
  def __init__(self, myarg, **kwargs):
    self.myarg = myarg
    if 'optarg' in kwargs:
      self.optarg = kwargs.pop('optarg')
    self.map = {}
    super().__init__(**kwargs)

Here I've also initialized the instance variable map, which will be used to hold data from the plugin's inputs. The reason this is useful is because alerts typically come from one experiment, but calculations often involve the data of more than one. So an alert will trigger re-calculation based on the updated information from the alerting experiment, which will of course still depend on data previously sent by the other experiments.

The actions

There are four normal actions, and a fifth one for handling custom (or undefined) actions:

  • alert: this is for handling an individual experiment sending new information.
  • revoke: this is for handling an individual experiment revoking its previous information.
  • reset: this is for revoking all experiments' data. This is used when running MC trials, with a reset clearing the data associated with a single pseudo-experiment.
  • report: this prompts accumulating plugins to produce a summary or plot. Like reset, this is to be used in MC trials.
  • other: this handles actions which aren't any of the above four. By default, it prints an error message.

To implement an action, override the method in your plugin. Each method takes the data payload as an argument:

def alert(self, data):
  logging.info("{0}: alert called".format(self.name))

The return value of the action method determines what happens next. If you return False, the action is "consumed" by the plugin, i.e., it doesn't notify any downstream nodes. If you don't return anything, an error message is printed and the action is consumed.

If you return True, downstream plugins are notified using the same action verb. The history field if the payload is also updated by appending the name of your plugin. A (shallow) copy of the payload is also saved in self.last_data.

You can also return a modified payload, in which case downstream plugins are notified using that payload. If the payload omits the action field, the current action verb will be inserted. The history will also be updated by appending the name of your plugin to whatever history is already recorded in the payload. This is useful if your plugin combines data from multiple inputs, as you should save the history as a tuple of input histories. As in the case for True, a shallow copy of the payload is saved in self.last_data.

History

If your plugin only observes one other node, or the calculation it performs only depends on the input it's given, you won't need to do anything special to keep the history of your calculation in order. Just return True or False as above, or, if you've modified the payload, leave the history field alone. The Node infrastructure will add the name of the current node to the end of the history before notifying downstream observers.

If your plugin combines the results from different inputs, then you should combine the histories so someone looking at the history later can at least have a stab at reconstructing what happened. First, you'll probably want to record the payloads as they come in:

  self.map[self.last_source] = data.copy()
  self.map[self.last_source]['history'] = data['history'].copy()
  self.map[self.last_source]['valid'] = True

These will make copies of the payload and the history (if you're wondering why you need to copy both, see the "Digression on payloads" below). I also keep a flag which indicates whether the payload is valid, in case an experiment decides to revoke its data later on.

Let's say you've done the calculation and modified the payload. Then you update the history as follows:

  hlist = []
  for k in self.map:
    if self.map[k]['valid']:
      hlist.append(self.map[k]['history'])
  data['history'].combine(hlist)
  return data

This replaces the history (which initially just contains the history of the current alert) with a history which combines the histories of all the contributing alerts. Node will then append the name of the current node to the end before notifying downstream observers.

Registering your plugin

You're done with the code. Now add your class to plugins/__init__.py so it can be found by others.

Digression on payloads

This is a note on how payloads are copied. The main messages:

  • The payload takes the form of a python dictionary.
  • Go ahead and add, delete, and replace fields in the dictionary to your heart's content.
  • Don't modify the contents of an underlying list, dictionary, set, or byte array.

Now for the nerdy details, if you're interested (skip if it's not the case yet).

The action methods described here are called with their own, individual shallow copy of the payload. You can then modify the contents in the payload and use it to notify downstream plugins, without affecting the payloads of plugins which aren't downstream. The problem is that if the shallow copy isn't made, modifications you make may then show up in the payloads of plugins which aren't downstream, such as those observing the same upstream plugin but which happen to be executed later; the order of execution of these "sibling" plugins isn't guaranteed.

The other benefit of the shallow copy is that only references to constituent objects are copied, so if one of your dictionary elements contains a long list, the shallow copy only copies the reference to the list rather than the whole list itself. This saves memory and time. However, this also means that any mutable elements can be modified underneath the payload, and these changes would be reflected in the payloads of subsequent (but not necessarily downstream) plugins. For reference, the mutable types in python are list, dict, set, and byte array. (This problem doesn't exist for immutable types like basic types, strings, and tuples.) On the other hand, there is little need to change these payload elements, so it's probably best to just leave them alone. Instead, do your calculations and just add to the payload.

Be careful to check the validity of the data in the payload before you use it.

Unit tests

With unit tests, you can construct contrived tests of your plugin by itself. This is better done in isolation, rather than by seeing it malfunction among a lot of other calculations.

Go to the tests directory and create a file named something like test_myplugin.py. Import the following:

import unittests
from snewpdag.dag.app import configure, inject

Of course, you may also want to import numpy and healpy as well.

Then define a test case, in which you then write the individual tests. The usual pattern is to set up the plugin as a one-node DAG, construct an input data dictionary, and inject it into the DAG. Then, extract the last data with which the plugin notified its children (of which there aren't any, but the last data is stored anyway), and make the usual test assertions against it.

class TestMyPlugin(unittest.TestCase):
  spec = [                              # specification for an array of nodes
           {                            # each node is specified with a dictionary
             'name': 'Node1',           # a name by which the node is referenced
             'class': 'MyPlugin',       # the class
             'kwargs': { 'myarg': 42 }  # initialization arguments (see MyPlugin.__init__)
           }
         ]
  nodes = configure(spec)               # construct the DAG

  data = [                              # an array of alerts
           {                            # each alert is specified with a dictionary
             'name': 'Node1',           # the node to be notified with this alert
             'action': 'alert',
           }
         ]
  inject(nodes, data)                   # send data through the DAG
  tdata = nodes['Node1'].last_data      # results of last calculation
  self.assertEqual(tdata['action'], 'alert')

Note that in the data dictionary, you don't have to give a history field, since there isn't any history at the point of injection. The notify method constructs a history field if there wasn't one there before. However, if your plugin needs to identify its sources, you'll need to pretend it comes from some input node:

data = [ { 'name': 'Node1', 'action': 'alert', 'history': ('Input1',) },
         { 'name': 'Node1', 'action': 'alert', 'history': ('Input2',) } ]
inject(nodes, data)
tdata = nodes['Node1'].last_data
self.assertEqual(tdata['history'], ( ('Input1',), ('Input2',), 'Node1' ) )

Note that the histories are tuples, which is why the extraneous-looking commas are necessary.

When you've written your test cases, you can use the command python -m unittest snewpdag.tests.test_myplugin from the root of your snewpdag directory. When you want it to become a regular part of the unittests, add it to the Makefile until the test target.

DAG tests

You can also set up a full graph. Examples can be found in the snewpdag/data subdirectory, such as test-liq-config.py.

If you find python-style dictionaries too fiddly to edit (as probably most will), you can also use a csv file which you can edit using a spreadsheet program. See test-csv.csv in the data subdirectory.

Further instructions will be written...

Sharing

When your plugin is ready, push your branch to the SNEWS repository and make a pull request.

(Deprecated material)

This section describes features which were useful in the past, but have been superseded by (hopefully) easier ways to do things. The information is kept here for reference.

The update() method (deprecated)

Note: this section describes the general update method, which is more finicky and requires the developer to handle more details. It's easier and preferred to use the individual action methods, described above.

Instead of using the action methods, the developer can instead override the update method. The signature is update(self, data), where data is the payload with which to update the calculation.

The payload dictionary will have the following fields:

field type description
action string alert action, either alert or revoke
history tuple list of plugin names. The last will be the source of the data.
other other fields to be transmitted
def update(self, data):
  action = data['action']       # the alert action, which is either 'alert' or 'revoke'
  source = data['history'][-1]  # the alert source
  if action == 'alert':
    # get set up for new data

    # update the data for this source and flag it as valid
    self.map[source] = data
    self.map[source]['valid'] = True

  elif action == `revoke':
    # get set up to revoke data (data from other experiments remains valid)

    # flag the data as invalid
    if source in self.map:
      self.map[source]['valid'] = False
    else:
      logging.error('[{}] revoke received for unknown source {}'.format(self.name, source))
      return

  else:
    logging.error('[{}] unknown action {}'.format(self.name, action)

At this point, start constructing the result of the calculation, which will be passed to the next plugin. Here I've called it ndata. Then check to see if there's any valid data with which to compute, in which case the action for the next plugin will be alert. If all the data for this plugin has been revoked, then one can pass revoke to the next plugin.

Remember to check the data from each source is valid before you use it.

To notify the downstream plugins, you need an action_verb (usually alert or revoke), a history, and ndata. Since most plugins will combine the results of previous plugins, the history will take the form of a nested tuple. (Don't add the name of your plugin; that will be done when you call the notify method to transmit your results to downstream plugins.)

hlist = []
for k in self.map:
  v = self.map[k]
  if v['valid']:
    hlist.append(v['history'])

# notify
self.notify(action_verb, tuple(hist), ndata)

(You can also copy the history from the input data into ndata, and then pass in None in place of the history tuple. notify() will then append the current plugin name to the history in ndata. But this is prone to error, so this feature will probably disappear soon.)

You're done with the code. Now add your class to plugins/__init__.py so it can be found by others.