mflow facilitates the handling of ZMQ data streams. It provides basic accounting and statistics on messages received/sent as well as an easy way to handle different types of messages within a stream.
Right now mflow comes with support for the following message types:
- array-1.0
- bsr_m-1.0
- dheader-1.0 (Dectris Eiger)
- dimage-1.0 (Dectris Eiger)
- dseries_end-1.0 (Dectris Eiger)
# Installation
The mflow package is available on https://pypi.python.org and can be installed via pip:
```bash
pip install mflow
```
The mflow package is also available on anaconda.org and can be installed as follows:
```bash
conda install -c https://conda.anaconda.org/paulscherrerinstitute mflow
```
# Usage
Connect/create a stream:
```python
stream = mflow.connect(address, conn_type=mflow.CONNECT, mode=mflow.PULL, receive_timeout=None, queue_size=100)
```
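For illustration, a minimal sketch of the two common setups: a source that binds and a sink that connects. The port is arbitrary, and it is an assumption of this sketch that mflow.BIND complements the mflow.CONNECT constant shown above and that receive_timeout is given in milliseconds (the usual ZMQ convention):
```python
import mflow

# Sketch: a source that binds locally and pushes messages
# (mflow.BIND is assumed as the counterpart of mflow.CONNECT).
sender = mflow.connect('tcp://*:9999', conn_type=mflow.BIND, mode=mflow.PUSH)

# Sketch: a sink that connects to the source and pulls messages;
# receive_timeout=1000 assumes a millisecond timeout.
receiver = mflow.connect('tcp://localhost:9999', conn_type=mflow.CONNECT,
                         mode=mflow.PULL, receive_timeout=1000)
```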
Receive a message:
```python
message = stream.receive(handler=None)
```
The returned message object contains the current receive statistics in `message.statistics` and the actual message data in `message.data`.
If the dynamic (htype-based) resolution of the message handler should be bypassed, an explicit handler can be passed to handle the incoming message.
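As a sketch (not from the package docs), a bounded receive loop that passes an explicit handler; the handler name is hypothetical and follows the handler signature shown in the custom-handler section below, and the timeout behaviour guarded against here is an assumption:
```python
import mflow

# Hypothetical handler following the signature used in the
# custom-handler section below: it simply drains all sub-messages.
def my_receive_function(receiver):
    data = []
    while receiver.has_more():
        data.append(receiver.next())
    return data

stream = mflow.connect('tcp://sf-lc:9999', receive_timeout=1000)

for _ in range(10):
    # An explicit handler bypasses the dynamic (htype-based) resolution.
    message = stream.receive(handler=my_receive_function)
    # Assumption of this sketch: on a receive timeout there is
    # no payload to process.
    if message is None or message.data is None:
        continue
    print(message.statistics.messages_received)

stream.disconnect()
```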
Disconnecting the stream:
```python
stream.disconnect()
```
Sending a message (ensure that you specified the correct mode!):
```python
stream.send('message content', send_more=True)
```
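To illustrate send_more, a minimal sketch, assuming a stream opened in PUSH mode as in the bind sketch above (mflow.BIND and the two-part message layout are assumptions here):
```python
import mflow

# Sketch: a PUSH source bound on an arbitrary port (mflow.BIND assumed).
stream = mflow.connect('tcp://*:9999', conn_type=mflow.BIND, mode=mflow.PUSH)

# send_more=True keeps the ZMQ multi-part message open; the final part
# is sent with the default send_more=False, which closes the message.
stream.send('header part', send_more=True)
stream.send('data part')

stream.disconnect()
```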
Register multiple custom (htype) handlers:
```python
def receive_function(receiver):
    header = receiver.next(as_json=True)
    return_value = {}
    data = []

    # Receiving data
    while receiver.has_more():
        raw_data = receiver.next()
        if raw_data:
            data.append(raw_data)
        else:
            data.append(None)

    return_value['header'] = header
    return_value['data'] = data
    return return_value

my_handlers = dict()
my_handlers['my_htype-1.0'] = receive_function
# ... register more handlers ...

# Set handlers
stream.handlers = my_handlers
```
Note: Handlers need to be registered before calling `receive()`.
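With the handlers set, a subsequent receive() dispatches on the htype found in the incoming message header. A sketch, assuming a source that emits the hypothetical my_htype-1.0 messages and that the handler's return value is what ends up in message.data:
```python
message = stream.receive()

# Assumption: message.data holds the dict returned by receive_function.
print(message.data['header'])     # header parsed with as_json=True
print(len(message.data['data']))  # number of received sub-messages
```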
Example:
```python
import mflow

stream = mflow.connect('tcp://sf-lc:9999')

# Receive "loop"
message = stream.receive()

print(message.statistics.messages_received)

stream.disconnect()
```
Manually register more handlers that are not provided by this package (after creating the stream):
```python
stream.handlers['id'] = myhandler
```
mflow provides a simple class to merge two or more streams. The default implementation merges the messages round robin, i.e. you will receive message 1 from stream 1, then message 1 from stream 2, then message 2 from stream 1, and so on.
```python
import mflow
import mflow.tools

stream_one = mflow.connect('tcp://source1:7777')
stream_two = mflow.connect('tcp://source2:7779')

stream = mflow.tools.Merge(stream_one, stream_two)

message = stream.receive()
stream.disconnect()
```
# Command Line Tools
The Anaconda mflow package comes with several command line tools useful for testing streaming.

## m_stats
Show statistics for incoming streams. Useful for measuring the maximum throughput of a given stream on a link.
```
usage: m_stats [-h] source

Stream statistic utility

positional arguments:
  source      Source address - format "tcp://<address>:<port>"

optional arguments:
  -h, --help  show this help message and exit
```
## m_generate
Generate a random stream. This is useful, together with m_stats, to measure the possible throughput.
```
usage: m_generate [-h] [-a ADDRESS] [-s SIZE]

Stream generation utility

optional arguments:
  -h, --help            show this help message and exit
  -a ADDRESS, --address ADDRESS
                        Address - format "tcp://<address>:<port>"
  -s SIZE, --size SIZE  Size of data to send (MB)
```
## m_dump
Dump an incoming stream to disk or screen. While dumping into files, m_dump saves all sub-messages into individual files. The option -s can be used if you are only interested in the first n sub-messages (e.g. the header).
```
usage: m_dump [-h] [-s SKIP] source [folder]

Stream dump utility

positional arguments:
  source                Source address - format "tcp://<address>:<port>"
  folder                Destination folder

optional arguments:
  -h, --help            show this help message and exit
  -s SKIP, --skip SKIP  Skip sub-messages starting from this number (including
                        this number)
```
## m_replay
Replay a stream recorded via m_dump.
```
usage: m_replay [-h] [-a ADDRESS] folder

Stream replay utility

positional arguments:
  folder                Folder containing the recorded stream

optional arguments:
  -h, --help            show this help message and exit
  -a ADDRESS, --address ADDRESS
                        Address - format "tcp://<address>:<port>" (default:
                        "tcp://*:9999")
```
## m_split
Split an incoming stream into multiple streams. Currently only the PUSH/PULL scheme is supported.
```
usage: m_split [-h] [-c CONFIG] [source] [streams [streams ...]]

Stream split utility

positional arguments:
  source                Source address - format "tcp://<address>:<port>"
  streams               Streams to generate - format "tcp://<address>:<port>"

optional arguments:
  -h, --help            show this help message and exit
  -c CONFIG, --config CONFIG
                        Configuration file
```
The -c / --config option accepts a configuration file as follows:
```json
{
    "source": {
        "address": "tcp://localhost:7777",
        "mode": "PULL",
        "queue_size": 10
    },
    "streams": [
        {
            "address": "tcp://*:8888",
            "mode": "PUSH"
        }
    ]
}
```
If an address is specified in the form 'tcp://*:<port>', the splitter binds to that address and opens the specified port. If a hostname is given, the splitter tries to connect to that address. Supported modes are PULL/SUB for the source and PUSH/PUB for outgoing streams.
The default mode (if omitted) is PULL for the source and PUSH for output streams. The default queue size (if omitted) is 100 for both source and output streams.
Output streams can be reduced by applying a modulo. This can be done by specifying the modulo attribute as follows:
```json
{
    "source": {
        "address": "tcp://localhost:7777",
        "mode": "PULL",
        "queue_size": 10
    },
    "streams": [
        {
            "address": "tcp://*:8888",
            "mode": "PUSH",
            "modulo": 1000
        }
    ]
}
```
With this configuration, only every 1000th message is sent out to the output stream.
# Development
## PyPi
Upload the package to pypi.python.org:
```bash
python setup.py sdist upload
```
## Anaconda
To build the anaconda package do:
```bash
conda build conda_recipe
```
Afterwards the package can be uploaded to anaconda.org via:
```bash
anaconda upload <path_to.tar.bz2_file>
```