Skip to content
This repository has been archived by the owner on Sep 27, 2021. It is now read-only.

sendmsg and recvmsg #43

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,25 @@
int fd;
short events;
short revents;
...;
};
struct nn_cmsghdr {
size_t cmsg_len;
int cmsg_level;
int cmsg_type;
...;
};
struct nn_iovec {
void * iov_base;
size_t iov_len;
...;
};
struct nn_msghdr {
struct nn_iovec *msg_iov;
int msg_iovlen;
void * msg_control;
size_t msg_controllen;
...;
};
'''

Expand Down Expand Up @@ -50,6 +69,13 @@ def functions(hfiles):
lines.append(ln)
cont = ln.strip()[-1]

lines.extend([
'struct nn_cmsghdr *NN_CMSG_FIRSTHDR(struct nn_msghdr *hdr);',
'struct nn_cmsghdr *NN_CMSG_NXTHDR(struct nn_msghdr * hdr, struct nn_cmsghdr *cmsg);',
'unsigned char * NN_CMSG_DATA(struct nn_cmsghdr * cmsg);',
'size_t NN_CMSG_SPACE(size_t len);',
'size_t NN_CMSG_LEN(size_t len);'
])
return ''.join(ln[10:] if ln.startswith('NN_') else ln for ln in lines)

def symbols(ffi, host_library):
Expand Down
2 changes: 1 addition & 1 deletion nnpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os

from .errors import NNError
from .socket import Socket
from .socket import Socket, MessageControl

class PollSet(object):

Expand Down
68 changes: 68 additions & 0 deletions nnpy/socket.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from . import errors, ffi, nanomsg
import sys
import collections

NN_MSG = int(ffi.cast("size_t", -1))

ustr = str if sys.version_info[0] > 2 else unicode

MessageControl = collections.namedtuple('MessageControl', ['level', 'type', 'data'])

class Socket(object):
"""
Nanomsg scalability protocols (SP) socket.
Expand Down Expand Up @@ -86,6 +89,71 @@ def recv(self, flags=0):
s = ffi.buffer(buf[0], rc)[:]
nanomsg.nn_freemsg(buf[0])
return s

def sendmsg(self, data, control, flags=0):
# Some data types can use a zero-copy buffer creation strategy when
# paired with new versions of CFFI. Namely, CFFI 1.8 supports `bytes`
# types with `from_buffer`, which is about 18% faster. We try the fast
# way first and degrade as needed for the platform.
hdr = ffi.new('struct nn_msghdr *')

def gen(control_):
chdr = ffi.new('struct nn_cmsghdr *')
for level, tp, data in control_:
chdr.cmsg_level = level
chdr.cmsg_type = tp
chdr.cmsg_len = nanomsg.NN_CMSG_SPACE(len(data))
payload = ffi.buffer(chdr)[:] + data
padding = b'\0' * (chdr.cmsg_len - len(payload))
yield payload + padding

control = b''.join(gen(control))

try:
control = ffi.from_buffer(control)
data = ffi.from_buffer(data)
except TypeError:
control = ffi.new('char[%i]' % len(control), control)
data = data.encode() if isinstance(data, ustr) else data
data = ffi,new('char[%i]' % len(data), data)
iov = ffi.new('struct nn_iovec *')
iov.iov_base = data
iov.iov_len = len(data)
hdr.msg_iov = iov
hdr.msg_iovlen = 1
hdr.msg_control = control
hdr.msg_controllen = len(control)

rc = nanomsg.nn_sendmsg(self.sock, hdr, flags)
return errors.convert(rc, rc)

def recvmsg(self, flags=0):
hdr = ffi.new('struct nn_msghdr *')
iov = ffi.new('struct nn_iovec *')
buf = ffi.new('char**')
control = ffi.new('char **')
iov.iov_base = buf
iov.iov_len = NN_MSG
hdr.msg_iov = iov
hdr.msg_iovlen = 1
hdr.msg_control = control
hdr.msg_controllen = NN_MSG
rc = nanomsg.nn_recvmsg(self.sock, hdr, flags)
errors.convert(rc)

def gen(hdr_):
chdr = nanomsg.NN_CMSG_FIRSTHDR(hdr_)
while chdr:
yield MessageControl(
chdr.cmsg_level, chdr.cmsg_type,
ffi.buffer(nanomsg.NN_CMSG_DATA(chdr), chdr.cmsg_len - ffi.sizeof(chdr[0]))[:])
chdr = nanomsg.NN_CMSG_NXTHDR(hdr_, chdr)

s = ffi.buffer(buf[0], rc)[:]
c = list(gen(hdr))
nanomsg.nn_freemsg(buf[0])
nanomsg.nn_freemsg(control[0])
return s, c

def get_statistic(self, statistic):
rc = nanomsg.nn_get_statistic(self.sock, statistic)
Expand Down