diff --git a/tangogateway/gateway.py b/tangogateway/gateway.py index f551313..9c75741 100644 --- a/tangogateway/gateway.py +++ b/tangogateway/gateway.py @@ -349,18 +349,27 @@ def check_zmq(raw_body, bind_address, loop): for endpoint in endpoints: host, port = giop.decode_zmq_endpoint(endpoint) # Start port forwarding - _, _, server_port = yield from get_forwarding( + _, zmq_bind_address, server_port = yield from get_forwarding( host, port, HandlerType.ZMQ, bind_address, loop=loop) # Make new endpoints - new_endpoint = giop.encode_zmq_endpoint(bind_address, server_port) + new_endpoint = giop.encode_zmq_endpoint(zmq_bind_address, server_port) new_endpoints.append(new_endpoint) + # Exctract event sources + # For tango >= 9.3.0 (ZMQ Topics are now returned by the server) + (tango_names, _) = giop.find_tango_names(raw_body) + for tango_name in tango_names: + host, port, name = giop.decode_tango_name(tango_name) + # Make new names + new_tango_name = giop.encode_tango_name( + bind_address, loop.server_port, name) + new_endpoints.append(new_tango_name) # Repack body return giop.repack_zmq_endpoints(raw_body, new_endpoints, start) # Run server -def run_gateway_server(bind_address, server_port, tango_host, debug=False): +def run_gateway_server(bind_address, server_port, tango_host, debug=True): """Run a Tango gateway server.""" # Configure logger if debug: diff --git a/tangogateway/giop.py b/tangogateway/giop.py index 03ca71f..264f86e 100644 --- a/tangogateway/giop.py +++ b/tangogateway/giop.py @@ -18,6 +18,7 @@ HEXA_DIGIT_SET = set(b'0123456789abcdef') ZMQ_STRUCT = 'I{:d}sI{:d}s' ZMQ_TOKEN = b'tcp://' +TANGO_TOKEN = b'tango://' STRING_TERM = b'\x00' DEVVARSTRINGARRAY_TOKEN = b'DevVarStringArray\x00' CSD_OFFSET = 48 @@ -258,9 +259,45 @@ def encode_zmq_endpoint(host, port): return ZMQ_TOKEN + encoded + STRING_TERM -def repack_zmq_endpoints(body, zmqs, start): - form = 'I' + 'I{:d}s' * len(zmqs) - pattern = form.format(*map(len, zmqs)) - values = [x for zmq in zmqs for x in (len(zmq), zmq)] - string = struct.pack(pattern, len(zmqs), *values) - return body[:start] + string +def find_tango_names(body): + strings = [] + pattern = 'I' + index = body.find(TANGO_TOKEN, 4) - 8 + sub_body = body[index:] + for i in range(2): + pattern += 'I' + size = struct.unpack_from(pattern, sub_body)[-1] + pattern += '{:d}s'.format(size) + string = struct.unpack_from(pattern, sub_body)[-1] + strings.append(string) + return strings, index + + +def decode_tango_name(encoded): + host, rest = encoded[:-1].lstrip(TANGO_TOKEN).decode().split(':') + port, devname = rest.split('/', 1) + port = int(port) + return host, port, devname + + +def encode_tango_name(host, port, devname): + encoded = '{}:{}/{}'.format(host, str(port), devname).encode() + return TANGO_TOKEN + encoded + STRING_TERM + + +def repack_zmq_endpoints(body, endpoints, start): + n_eps = len(endpoints) + lens = [] + encoded_lens = [] + for endpoint in endpoints: + eplen = len(endpoint) + if endpoint != endpoints[-1]: + while len(endpoint) % 4 > 0: + endpoint = endpoint + STRING_TERM + lens.append(eplen) + encoded_lens.append(len(endpoint)) + form = 'I' + 'I{:d}s' * n_eps + pattern = form.format(*encoded_lens) + values = [x for y in zip(lens, endpoints) for x in y] + string_zmq = struct.pack(pattern, n_eps, *values) + return body[:start] + string_zmq