From 437e40a19986d7c3f945708abd68c2d6e6dae9d9 Mon Sep 17 00:00:00 2001 From: ingvord Date: Tue, 18 Feb 2020 17:43:34 +0100 Subject: [PATCH 01/12] Progress #35 --- .../org/tango/server/admin/AdminDevice.java | 26 ++++---- .../org/tango/server/events/EventManager.java | 58 ++-------------- .../network/NetworkInterfacesExtractor.java | 66 +++++++++++++++++++ .../server/transport/TransportManager.java | 41 ++++++++++++ .../tango/server/transport/TransportMeta.java | 29 ++++++++ 5 files changed, 155 insertions(+), 65 deletions(-) create mode 100644 server/src/main/java/org/tango/server/network/NetworkInterfacesExtractor.java create mode 100644 server/src/main/java/org/tango/server/transport/TransportManager.java create mode 100644 server/src/main/java/org/tango/server/transport/TransportMeta.java diff --git a/server/src/main/java/org/tango/server/admin/AdminDevice.java b/server/src/main/java/org/tango/server/admin/AdminDevice.java index c2becc38f..2a99a2eb5 100644 --- a/server/src/main/java/org/tango/server/admin/AdminDevice.java +++ b/server/src/main/java/org/tango/server/admin/AdminDevice.java @@ -40,14 +40,7 @@ import org.tango.server.ExceptionMessages; import org.tango.server.PolledObjectType; import org.tango.server.ServerManager; -import org.tango.server.annotation.Attribute; -import org.tango.server.annotation.Command; -import org.tango.server.annotation.Device; -import org.tango.server.annotation.DeviceProperty; -import org.tango.server.annotation.Init; -import org.tango.server.annotation.StateMachine; -import org.tango.server.annotation.Status; -import org.tango.server.annotation.TransactionType; +import org.tango.server.annotation.*; import org.tango.server.attribute.AttributeImpl; import org.tango.server.attribute.ForwardedAttribute; import org.tango.server.build.DeviceClassBuilder; @@ -62,15 +55,11 @@ import org.tango.server.properties.ClassPropertyImpl; import org.tango.server.properties.DevicePropertyImpl; import org.tango.server.servant.DeviceImpl; +import org.tango.server.transport.TransportManager; import org.tango.utils.DevFailedUtils; import org.tango.utils.TangoUtil; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Set; +import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -817,6 +806,15 @@ private DevVarLongStringArray subscribeEvent(final EventType eventType, final St return result; } + private final TransportManager transportManager = new TransportManager(); + + @Command(name = "UpgradeTransport") + public DevVarLongStringArray upgradeTransport() { + return transportManager + .upgradeTransport() + .toDevVarLongStringArray(); + } + /** * @param argin * @throws DevFailed diff --git a/server/src/main/java/org/tango/server/events/EventManager.java b/server/src/main/java/org/tango/server/events/EventManager.java index 5a1789df8..35c1e42f6 100644 --- a/server/src/main/java/org/tango/server/events/EventManager.java +++ b/server/src/main/java/org/tango/server/events/EventManager.java @@ -24,9 +24,6 @@ */ package org.tango.server.events; -import com.google.common.collect.Collections2; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import fr.esrf.Tango.*; @@ -40,6 +37,7 @@ import org.tango.server.attribute.AttributeImpl; import org.tango.server.attribute.ForwardedAttribute; import org.tango.server.idl.TangoIDLUtil; +import org.tango.server.network.NetworkInterfacesExtractor; import org.tango.server.pipe.PipeImpl; import org.tango.server.pipe.PipeValue; import org.tango.server.servant.DeviceImpl; @@ -47,17 +45,13 @@ import org.zeromq.ZContext; import org.zeromq.ZMQ; -import java.net.InterfaceAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static org.tango.orb.ORBManager.OAI_ADDR; /** * Set of ZMQ low level utilities @@ -100,7 +94,8 @@ public final class EventManager { }; private EventManager() { - List ipAddresses = getIp4Addresses(); + NetworkInterfacesExtractor networkInterfacesExtractor = new NetworkInterfacesExtractor(); + List ipAddresses = networkInterfacesExtractor.getIp4Addresses(); bindEndpoints(createSocket(), ipAddresses, heartbeatEndpoints, SocketType.HEARTBEAT); bindEndpoints(createEventSocket(), ipAddresses, eventEndpoints, SocketType.EVENTS); @@ -201,46 +196,7 @@ private ZMQ.Socket createEventSocket() { return socket; } - private List getIp4Addresses() { - if (OAI_ADDR != null && !OAI_ADDR.isEmpty()) { - return Lists.newArrayList(OAI_ADDR); - } else { - Iterable networkInterfaces = null; - try { - networkInterfaces = Collections.list(NetworkInterface.getNetworkInterfaces()); - } catch (SocketException e) { - logger.error("Failed to get NICs due to " + e.getMessage(), e); - return Collections.emptyList(); - } - java.util.function.Predicate filter = networkInterface -> { - try { - return !networkInterface.isLoopback() && !networkInterface.isVirtual() && networkInterface.isUp(); - } catch (SocketException e) { - logger.warn("Ignoring NetworkInterface({}) due to an exception: {}", networkInterface.getName(), e); - return false; - } - }; - - Function interfaceAddressToString = interfaceAddress -> interfaceAddress.getAddress().getHostAddress(); - - Iterable filteredNICs = Iterables.filter(networkInterfaces, filter::test); - - List result = Lists.newArrayList(); - //TODO #17 - for (NetworkInterface nic : filteredNICs) { - result.addAll( - Collections2.filter( - nic.getInterfaceAddresses() - .stream() - .map(interfaceAddressToString::apply) - .collect(Collectors.toList()), - s -> s.split("\\.").length == 4) - ); - } - return result; - } - } /** diff --git a/server/src/main/java/org/tango/server/network/NetworkInterfacesExtractor.java b/server/src/main/java/org/tango/server/network/NetworkInterfacesExtractor.java new file mode 100644 index 000000000..45a9f9baf --- /dev/null +++ b/server/src/main/java/org/tango/server/network/NetworkInterfacesExtractor.java @@ -0,0 +1,66 @@ +package org.tango.server.network; + +import com.google.common.collect.Collections2; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InterfaceAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.tango.orb.ORBManager.OAI_ADDR; + +/** + * @author Igor Khokhriakov + * @since 18.02.2020 + */ +public class NetworkInterfacesExtractor { + private final Logger logger = LoggerFactory.getLogger(NetworkInterfacesExtractor.class); + + public List getIp4Addresses() { + if (OAI_ADDR != null && !OAI_ADDR.isEmpty()) { + return Lists.newArrayList(OAI_ADDR); + } else { + Iterable networkInterfaces = null; + try { + networkInterfaces = Collections.list(NetworkInterface.getNetworkInterfaces()); + } catch (SocketException e) { + logger.error("Failed to get NICs due to " + e.getMessage(), e); + return Collections.emptyList(); + } + + java.util.function.Predicate filter = networkInterface -> { + try { + return !networkInterface.isLoopback() && !networkInterface.isVirtual() && networkInterface.isUp(); + } catch (SocketException e) { + logger.warn("Ignoring NetworkInterface({}) due to an exception: {}", networkInterface.getName(), e); + return false; + } + }; + + Function interfaceAddressToString = interfaceAddress -> interfaceAddress.getAddress().getHostAddress(); + + Iterable filteredNICs = Iterables.filter(networkInterfaces, filter::test); + + List result = Lists.newArrayList(); + //TODO #17 + for (NetworkInterface nic : filteredNICs) { + result.addAll( + Collections2.filter( + nic.getInterfaceAddresses() + .stream() + .map(interfaceAddressToString::apply) + .collect(Collectors.toList()), + s -> s.split("\\.").length == 4) + ); + } + return result; + } + } +} diff --git a/server/src/main/java/org/tango/server/transport/TransportManager.java b/server/src/main/java/org/tango/server/transport/TransportManager.java new file mode 100644 index 000000000..6343a947b --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/TransportManager.java @@ -0,0 +1,41 @@ +package org.tango.server.transport; + +import org.tango.server.network.NetworkInterfacesExtractor; +import org.zeromq.ZContext; +import org.zeromq.ZMQ; + +import java.util.List; + +/** + * This class maintains ZMQ REQ/REP required data + * + * @author Igor Khokhriakov + * @since 18.02.2020 + */ +public class TransportManager { + private final ZContext context = new ZContext(); + + + public TransportMeta upgradeTransport() { + ZMQ.Socket socket = createZMQSocket(); + int port = socket.bindToRandomPort("tcp://*"); + + List connectionPoints = new NetworkInterfacesExtractor().getIp4Addresses(); + + TransportMeta result = new TransportMeta(); + + connectionPoints.stream() + .map(s -> s + ":" + port) + .forEach(result::addConnectionPoint); + + return result; + } + + + public ZMQ.Socket createZMQSocket() { + final ZMQ.Socket socket = context.createSocket(ZMQ.REP); + socket.setLinger(0); + socket.setReconnectIVL(-1); + return socket; + } +} diff --git a/server/src/main/java/org/tango/server/transport/TransportMeta.java b/server/src/main/java/org/tango/server/transport/TransportMeta.java new file mode 100644 index 000000000..789ae3431 --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/TransportMeta.java @@ -0,0 +1,29 @@ +package org.tango.server.transport; + +import com.google.common.collect.Lists; +import fr.esrf.Tango.DevVarLongStringArray; + +import java.util.List; + +/** + * @author Igor Khokhriakov + * @since 18.02.2020 + */ +public class TransportMeta { + private final List connectionPoints = Lists.newArrayList(); + + public TransportMeta() { + } + + + public void addConnectionPoint(String connectionPoint) { + connectionPoints.add(connectionPoint); + } + + public DevVarLongStringArray toDevVarLongStringArray() { + DevVarLongStringArray result = new DevVarLongStringArray(new int[0], + connectionPoints.toArray(new String[0])); + + return result; + } +} From e683e3ede6065ff95f4bca155d1af5ff4af2cd0d Mon Sep 17 00:00:00 2001 From: ingvord Date: Wed, 19 Feb 2020 13:16:39 +0100 Subject: [PATCH 02/12] Progress #35 --- .../fr/esrf/Tango/factory/ITangoFactory.java | 2 + .../fr/esrf/Tango/factory/TangoFactory.java | 5 ++ .../java/fr/esrf/TangoApi/DeviceProxy.java | 52 +++++++++++++++++++ .../network/EndpointAvailabilityChecker.java | 41 +++++++++++++++ .../java/org/tango/transport/Transport.java | 18 +++++++ .../org/tango/transport/TransportMeta.java | 43 +++++++++++++++ .../TangoApi/events/ZmqEventConsumer.java | 28 ++-------- .../factory/DefaultTangoFactoryImpl.java | 5 ++ .../org/tango/transport/ZmqTransport.java | 46 ++++++++++++++++ .../server/transport/TransportManager.java | 1 + .../tango/server/transport/TransportMeta.java | 29 ----------- 11 files changed, 218 insertions(+), 52 deletions(-) create mode 100644 common/src/main/java/org/tango/network/EndpointAvailabilityChecker.java create mode 100644 common/src/main/java/org/tango/transport/Transport.java create mode 100644 common/src/main/java/org/tango/transport/TransportMeta.java create mode 100644 dao/src/main/java/org/tango/transport/ZmqTransport.java delete mode 100644 server/src/main/java/org/tango/server/transport/TransportMeta.java diff --git a/common/src/main/java/fr/esrf/Tango/factory/ITangoFactory.java b/common/src/main/java/fr/esrf/Tango/factory/ITangoFactory.java index c0d5641e9..410fbd6a5 100644 --- a/common/src/main/java/fr/esrf/Tango/factory/ITangoFactory.java +++ b/common/src/main/java/fr/esrf/Tango/factory/ITangoFactory.java @@ -45,6 +45,7 @@ package fr.esrf.Tango.factory; import fr.esrf.TangoApi.*; +import org.tango.transport.Transport; public interface ITangoFactory { @@ -70,4 +71,5 @@ public interface ITangoFactory { public String getFactoryName(); + Transport newTransport(); } diff --git a/common/src/main/java/fr/esrf/Tango/factory/TangoFactory.java b/common/src/main/java/fr/esrf/Tango/factory/TangoFactory.java index ff984833d..59fefa14c 100644 --- a/common/src/main/java/fr/esrf/Tango/factory/TangoFactory.java +++ b/common/src/main/java/fr/esrf/Tango/factory/TangoFactory.java @@ -48,6 +48,7 @@ package fr.esrf.Tango.factory; import fr.esrf.TangoApi.*; +import org.tango.transport.Transport; import java.io.BufferedInputStream; import java.io.InputStream; @@ -203,4 +204,8 @@ public boolean isDefaultFactory() { public void setDefaultFactory(final boolean isDefaultFactory) { this.isDefaultFactory = isDefaultFactory; } + + public Transport newTransport() { + return tangoFactory.newTransport(); + } } diff --git a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java index 6c7036785..84552d0ef 100644 --- a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java +++ b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java @@ -42,7 +42,13 @@ import fr.esrf.TangoDs.Except; import fr.esrf.TangoDs.TangoConst; import org.omg.CORBA.Request; +import org.tango.network.EndpointAvailabilityChecker; +import org.tango.transport.Transport; +import org.tango.transport.TransportMeta; +import org.tango.utils.DevFailedUtils; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -80,6 +86,8 @@ public class DeviceProxy extends Connection implements ApiDefs { private IDeviceProxyDAO deviceProxyDAO = null; + private final Transport transport = TangoFactory.getSingleton().newTransport(); + static final private boolean check_idl = false; /** @@ -791,6 +799,13 @@ public void set_attribute_config(AttributeInfo[] attr) throws DevFailed { */ // ========================================================================== public DeviceAttribute read_attribute(String attname) throws DevFailed { + if (transport.isConnected()) { + try { + transport.send(String.format("read:%s", attname).getBytes(StandardCharsets.UTF_8)); + } catch (IOException e) { + throw DevFailedUtils.newDevFailed(e); + } + } return deviceProxyDAO.read_attribute(this, attname); } @@ -818,6 +833,19 @@ public DeviceAttribute[] read_attribute(String[] attnames) throws DevFailed { return deviceProxyDAO.read_attribute(this, attnames); } + //TODO extract and decorate + public void writeAttribute(String name, double value) throws DevFailed { + if (transport.isConnected()) { + try { + transport.send(String.format("write:%s;type:double;value:%f", name, value).getBytes(StandardCharsets.UTF_8)); + } catch (IOException e) { + throw DevFailedUtils.newDevFailed(e); + } + } + DeviceAttribute deviceAttribute = new DeviceAttribute(name, value); + write_attribute(deviceAttribute); + } + // ========================================================================== /** * Write the attribute value for the specified device. @@ -1825,6 +1853,26 @@ public int subscribe_event(int event, int max_size, boolean stateless) throws De return deviceProxyDAO.subscribe_event(this, event, max_size, stateless); } + public void upgradeProtocol() throws DevFailed { + DeviceData response = this.get_adm_dev().command_inout("UpgradeProtocol"); + + DevVarLongStringArray array = response.extractLongStringArray(); + + TransportMeta transportMeta = TransportMeta.fromDevVarLongStringArray(array); + + EndpointAvailabilityChecker predicate = new EndpointAvailabilityChecker(); + String endpoint = transportMeta.getEndPoints().stream() + .filter(predicate) + .findFirst() + .orElseThrow(() -> new RuntimeException("No reachable connection points were found")); + + try { + transport.connect(endpoint); + } catch (IOException e) { + throw DevFailedUtils.newDevFailed(e); + } + } + // ========================================================================== // ========================================================================== public void setEventQueue(EventQueue eq) { @@ -2020,6 +2068,10 @@ private static void checkDuplication(String[] list, String orig) throws DevFaile */ //========================================================================== protected void finalize() { + try { + transport.close(); + } catch (IOException ignored) { + } if (proxy_lock_cnt>0) { try { unlock(); diff --git a/common/src/main/java/org/tango/network/EndpointAvailabilityChecker.java b/common/src/main/java/org/tango/network/EndpointAvailabilityChecker.java new file mode 100644 index 000000000..5bd33f740 --- /dev/null +++ b/common/src/main/java/org/tango/network/EndpointAvailabilityChecker.java @@ -0,0 +1,41 @@ +package org.tango.network; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.function.Predicate; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class EndpointAvailabilityChecker implements Predicate { + private final Logger logger = LoggerFactory.getLogger(EndpointAvailabilityChecker.class); + + @Override + public boolean test(String endpoint) { + logger.debug("Check endpoint: {}", endpoint); + URI uri = null; + try { + uri = new URI(endpoint); + } catch (URISyntaxException e) { + logger.debug("Bad endpoint: " + endpoint, e); + return false; + } + + // Try to connect + InetSocketAddress ip = new InetSocketAddress(uri.getHost(), uri.getPort()); + try (Socket socket = new Socket()) { + socket.connect(ip, 10); + return true; + } catch (IOException e) { + logger.debug("Failed to connect to " + ip, e); + return false; + } + } +} diff --git a/common/src/main/java/org/tango/transport/Transport.java b/common/src/main/java/org/tango/transport/Transport.java new file mode 100644 index 000000000..f9667b467 --- /dev/null +++ b/common/src/main/java/org/tango/transport/Transport.java @@ -0,0 +1,18 @@ +package org.tango.transport; + +import java.io.Closeable; +import java.io.IOException; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public interface Transport extends Closeable { + boolean isConnected(); + + void connect(String endpoint) throws IOException; + + void disconnect(String endpoint) throws IOException; + + byte[] send(byte[] data) throws IOException; +} diff --git a/common/src/main/java/org/tango/transport/TransportMeta.java b/common/src/main/java/org/tango/transport/TransportMeta.java new file mode 100644 index 000000000..2a9cba58c --- /dev/null +++ b/common/src/main/java/org/tango/transport/TransportMeta.java @@ -0,0 +1,43 @@ +package org.tango.transport; + +import com.google.common.collect.Lists; +import fr.esrf.Tango.DevVarLongStringArray; + +import java.util.Arrays; +import java.util.List; + +/** + * @author Igor Khokhriakov + * @since 18.02.2020 + */ +public class TransportMeta { + private List endPoints = Lists.newArrayList(); + + public TransportMeta() { + } + + public static TransportMeta fromDevVarLongStringArray(DevVarLongStringArray array) { + TransportMeta transportMeta = new TransportMeta(); + transportMeta.setEndPoints(Arrays.asList(array.svalue)); + return transportMeta; + } + + public List getEndPoints() { + return endPoints; + } + + public void setEndPoints(List endPoints) { + this.endPoints = endPoints; + } + + public void addConnectionPoint(String connectionPoint) { + endPoints.add(connectionPoint); + } + + public DevVarLongStringArray toDevVarLongStringArray() { + DevVarLongStringArray result = new DevVarLongStringArray(new int[0], + endPoints.toArray(new String[0])); + + return result; + } +} diff --git a/dao/src/main/java/fr/esrf/TangoApi/events/ZmqEventConsumer.java b/dao/src/main/java/fr/esrf/TangoApi/events/ZmqEventConsumer.java index d0e55d1b7..8ec59c8db 100755 --- a/dao/src/main/java/fr/esrf/TangoApi/events/ZmqEventConsumer.java +++ b/dao/src/main/java/fr/esrf/TangoApi/events/ZmqEventConsumer.java @@ -48,10 +48,10 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.tango.network.EndpointAvailabilityChecker; import org.tango.utils.DevFailedUtils; -import java.io.IOException; -import java.net.*; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -379,7 +379,7 @@ private DeviceData checkWithHostAddress(DeviceData deviceData, DeviceProxy devic logger.debug("{} ---> {}", wrongAdd, lsa.svalue[0]); deviceData = new DeviceData(); deviceData.insert(lsa); - isEndpointAvailable(lsa.svalue[0]); + new EndpointAvailabilityChecker().test(lsa.svalue[0]); } } } catch (UnknownHostException e) { @@ -402,12 +402,13 @@ private DeviceData checkZmqAddress(DeviceData deviceData, DeviceProxy deviceProx logger.trace("Inside checkZmqAddress()"); DevVarLongStringArray lsa = deviceData.extractLongStringArray(); + EndpointAvailabilityChecker predicate = new EndpointAvailabilityChecker(); Pair validEndpoints = Optional.ofNullable(Observable.zip( Observable.fromArray(lsa.svalue), Observable.fromArray(lsa.svalue).skip(1), ImmutablePair::of ) - .filter(pair -> isEndpointAvailable(pair.left)) + .filter(pair -> predicate.test(pair.left)) .blockingFirst(null)) .orElseGet(() -> { DeviceData checkedWithHostAddress = checkWithHostAddress(deviceData, deviceProxy); @@ -425,26 +426,7 @@ private DeviceData checkZmqAddress(DeviceData deviceData, DeviceProxy deviceProx return overridden; } - private boolean isEndpointAvailable(String endpoint) { - logger.debug("Check endpoint: {}", endpoint); - URI uri = null; - try { - uri = new URI(endpoint); - } catch (URISyntaxException e) { - logger.debug("Bad endpoint: " + endpoint, e); - return false; - } - // Try to connect - InetSocketAddress ip = new InetSocketAddress(uri.getHost(), uri.getPort()); - try (Socket socket = new Socket()) { - socket.connect(ip, 10); - return true; - } catch (IOException e) { - logger.debug("Failed to connect to " + ip, e); - return false; - } - } private void checkDeviceConnection(DeviceProxy deviceProxy, String attribute, DeviceData deviceData, String event_name) throws DevFailed { diff --git a/dao/src/main/java/fr/esrf/TangoApi/factory/DefaultTangoFactoryImpl.java b/dao/src/main/java/fr/esrf/TangoApi/factory/DefaultTangoFactoryImpl.java index 8fe2c029c..411e45313 100644 --- a/dao/src/main/java/fr/esrf/TangoApi/factory/DefaultTangoFactoryImpl.java +++ b/dao/src/main/java/fr/esrf/TangoApi/factory/DefaultTangoFactoryImpl.java @@ -36,6 +36,7 @@ import fr.esrf.Tango.factory.ITangoFactory; import fr.esrf.TangoApi.*; +import org.tango.transport.ZmqTransport; public class DefaultTangoFactoryImpl implements ITangoFactory { @@ -136,4 +137,8 @@ public String getFactoryName() return "TANGORB Default"; } + public ZmqTransport newTransport() { + return new ZmqTransport(); + } + } diff --git a/dao/src/main/java/org/tango/transport/ZmqTransport.java b/dao/src/main/java/org/tango/transport/ZmqTransport.java new file mode 100644 index 000000000..dfe4bf649 --- /dev/null +++ b/dao/src/main/java/org/tango/transport/ZmqTransport.java @@ -0,0 +1,46 @@ +package org.tango.transport; + +import org.zeromq.ZContext; +import org.zeromq.ZMQ; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +//@ThreadSafe +public class ZmqTransport implements Transport { + private final ZContext zcontext = new ZContext(); + private ZMQ.Socket socket; + + @Override + public synchronized boolean isConnected() { + return socket != null; + } + + @Override + public synchronized void connect(String endpoint) { + ZMQ.Socket socket = zcontext.createSocket(ZMQ.REQ); + + socket.connect(endpoint); + + this.socket = socket; + } + + @Override + public synchronized void disconnect(String endpoint) { + zcontext.getSockets() + .forEach(socket -> socket.disconnect(endpoint)); + } + + @Override + public synchronized byte[] send(byte[] data) { + socket.send(data); + + return socket.recv(); + } + + @Override + public synchronized void close() { + zcontext.close(); + } +} diff --git a/server/src/main/java/org/tango/server/transport/TransportManager.java b/server/src/main/java/org/tango/server/transport/TransportManager.java index 6343a947b..0c226f307 100644 --- a/server/src/main/java/org/tango/server/transport/TransportManager.java +++ b/server/src/main/java/org/tango/server/transport/TransportManager.java @@ -1,6 +1,7 @@ package org.tango.server.transport; import org.tango.server.network.NetworkInterfacesExtractor; +import org.tango.transport.TransportMeta; import org.zeromq.ZContext; import org.zeromq.ZMQ; diff --git a/server/src/main/java/org/tango/server/transport/TransportMeta.java b/server/src/main/java/org/tango/server/transport/TransportMeta.java deleted file mode 100644 index 789ae3431..000000000 --- a/server/src/main/java/org/tango/server/transport/TransportMeta.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.tango.server.transport; - -import com.google.common.collect.Lists; -import fr.esrf.Tango.DevVarLongStringArray; - -import java.util.List; - -/** - * @author Igor Khokhriakov - * @since 18.02.2020 - */ -public class TransportMeta { - private final List connectionPoints = Lists.newArrayList(); - - public TransportMeta() { - } - - - public void addConnectionPoint(String connectionPoint) { - connectionPoints.add(connectionPoint); - } - - public DevVarLongStringArray toDevVarLongStringArray() { - DevVarLongStringArray result = new DevVarLongStringArray(new int[0], - connectionPoints.toArray(new String[0])); - - return result; - } -} From dd454e6d112f368415bcdc304a29fb7c12c64cfd Mon Sep 17 00:00:00 2001 From: ingvord Date: Wed, 19 Feb 2020 15:31:07 +0100 Subject: [PATCH 03/12] Progress #35 --- common/pom.xml | 5 ++ .../java/fr/esrf/TangoApi/DeviceProxy.java | 16 +++- .../java/org/tango/transport/Message.java | 75 +++++++++++++++++++ .../org/tango/server/admin/AdminDevice.java | 28 ++++++- .../transport/ReadMessageProcessor.java | 38 ++++++++++ .../server/transport/TransportManager.java | 10 ++- .../transport/WriteMessageProcessor.java | 37 +++++++++ .../server/transport/ZmqMessageProcessor.java | 11 +++ .../transport/ZmqMessageProcessorImpl.java | 41 ++++++++++ .../transport/ZmqTransportListener.java | 37 +++++++++ 10 files changed, 290 insertions(+), 8 deletions(-) create mode 100644 common/src/main/java/org/tango/transport/Message.java create mode 100644 server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java create mode 100644 server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java create mode 100644 server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java create mode 100644 server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java create mode 100644 server/src/main/java/org/tango/server/transport/ZmqTransportListener.java diff --git a/common/pom.xml b/common/pom.xml index 7ddb806ac..483dde0d7 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -97,5 +97,10 @@ org.apache.commons commons-lang3 + + com.google.code.gson + gson + 2.8.6 + diff --git a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java index 84552d0ef..b8549b6cb 100644 --- a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java +++ b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java @@ -43,6 +43,7 @@ import fr.esrf.TangoDs.TangoConst; import org.omg.CORBA.Request; import org.tango.network.EndpointAvailabilityChecker; +import org.tango.transport.Message; import org.tango.transport.Transport; import org.tango.transport.TransportMeta; import org.tango.utils.DevFailedUtils; @@ -801,7 +802,14 @@ public void set_attribute_config(AttributeInfo[] attr) throws DevFailed { public DeviceAttribute read_attribute(String attname) throws DevFailed { if (transport.isConnected()) { try { - transport.send(String.format("read:%s", attname).getBytes(StandardCharsets.UTF_8)); + Message message = new Message("read", this.get_name() + "/" + attname, null, null); + + //TODO marshaller + + Message response = Message.fromString( + new String(transport.send(message.toString().getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8)); + System.out.println(response.value); + return new DeviceAttribute(attname, response.value); } catch (IOException e) { throw DevFailedUtils.newDevFailed(e); } @@ -837,7 +845,11 @@ public DeviceAttribute[] read_attribute(String[] attnames) throws DevFailed { public void writeAttribute(String name, double value) throws DevFailed { if (transport.isConnected()) { try { - transport.send(String.format("write:%s;type:double;value:%f", name, value).getBytes(StandardCharsets.UTF_8)); + Message message = new Message("write", this.get_name() + "/" + name, "double", String.valueOf(value)); + + //TODO marshaller + + transport.send(message.toString().getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { throw DevFailedUtils.newDevFailed(e); } diff --git a/common/src/main/java/org/tango/transport/Message.java b/common/src/main/java/org/tango/transport/Message.java new file mode 100644 index 000000000..b72fef748 --- /dev/null +++ b/common/src/main/java/org/tango/transport/Message.java @@ -0,0 +1,75 @@ +package org.tango.transport; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class Message { + public static final String PATTERN_STR = "(read|write|exec|response):([\\w\\W]+);(double|float|int|long|String|null):([\\w.]+)"; + public static final Pattern PATTERN = Pattern.compile(PATTERN_STR); + + public static final String TARGET_PATTERN_STR = "(\\w+)/(\\w+)/(\\w+)/(\\w+)"; + public static final Pattern TARGET_PATTERN = Pattern.compile(TARGET_PATTERN_STR); + + public String action; + public String target; + public String dataType; + public String value; + + public Message(String action, String target, String dataType, String value) { + this.action = action; + this.target = target; + this.dataType = dataType; + this.value = value; + } + + public static Message fromString(String str) { + Matcher matcher = PATTERN.matcher(str); + + if (matcher.matches()) + return new Message(matcher.group(1), matcher.group(2), matcher.group(3), matcher.group(4)); + else + throw new IllegalArgumentException("Unrecognized message: " + str); + } + + @Override + public String toString() { + return String.format( + "%s:%s;%s:%s", action, target, dataType, value + ); + } + + public static class Target { + public final String device; + public final String member; + + public Target(String device, String member) { + this.device = device; + this.member = member; + } + + public static Target fromString(String str) { + Matcher matcher = TARGET_PATTERN.matcher(str); + + if (matcher.matches()) + return new Target(String.format("%s/%s/%s", matcher.group(1), matcher.group(2), matcher.group(3)), matcher.group(4)); + else + throw new IllegalArgumentException("Unrecognized message: " + str); + } + } + + public static class Error extends Message { + public Error(String value) { + super("response", "error", "String", value); + } + } + + public static class Ok extends Message { + public Ok() { + super("response", "ok", null, null); + } + } +} diff --git a/server/src/main/java/org/tango/server/admin/AdminDevice.java b/server/src/main/java/org/tango/server/admin/AdminDevice.java index 2a99a2eb5..998833373 100644 --- a/server/src/main/java/org/tango/server/admin/AdminDevice.java +++ b/server/src/main/java/org/tango/server/admin/AdminDevice.java @@ -24,6 +24,7 @@ */ package org.tango.server.admin; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import fr.esrf.Tango.ClntIdent; import fr.esrf.Tango.DevFailed; import fr.esrf.Tango.DevVarLongStringArray; @@ -56,10 +57,14 @@ import org.tango.server.properties.DevicePropertyImpl; import org.tango.server.servant.DeviceImpl; import org.tango.server.transport.TransportManager; +import org.tango.server.transport.ZmqTransportListener; import org.tango.utils.DevFailedUtils; import org.tango.utils.TangoUtil; +import org.zeromq.ZMQ; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -430,6 +435,13 @@ public String[] getPolledDevice() { return pollDevices.toArray(new String[pollDevices.size()]); } + private final ExecutorService executorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("ZmqTransportListener-%d") + .build() + ); + /** * @param dvlsa Lg[0]=Upd period. Str[0]=Device name. Str[1]=Object * type(COMMAND or ATTRIBUTE). Str[2]=Object name @@ -808,11 +820,21 @@ private DevVarLongStringArray subscribeEvent(final EventType eventType, final St private final TransportManager transportManager = new TransportManager(); + public DeviceImpl getDeviceImpl(String device) { + return classList.stream() + .map(deviceClassBuilder -> deviceClassBuilder.getDeviceImpl(device)) + .findFirst() + .orElseThrow(() -> new NoSuchElementException(device)); + } + @Command(name = "UpgradeTransport") public DevVarLongStringArray upgradeTransport() { - return transportManager - .upgradeTransport() - .toDevVarLongStringArray(); + + ZMQ.Socket socket = transportManager + .upgradeTransport(); + + executorService.submit(new ZmqTransportListener(socket, this)); + return transportManager.getTransportMeta().toDevVarLongStringArray(); } /** diff --git a/server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java b/server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java new file mode 100644 index 000000000..300f5b133 --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java @@ -0,0 +1,38 @@ +package org.tango.server.transport; + +import fr.esrf.Tango.AttrDataFormat; +import fr.esrf.Tango.DevFailed; +import org.omg.CORBA.Any; +import org.tango.server.idl.CleverAnyAttribute; +import org.tango.server.servant.DeviceImpl; +import org.tango.transport.Message; + +import java.util.Arrays; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class ReadMessageProcessor implements ZmqMessageProcessor { + private final DeviceImpl device; + private final String attributeName; + private final String dataType; + + public ReadMessageProcessor(DeviceImpl device, String attributeName, String dataType) { + this.device = device; + this.attributeName = attributeName; + this.dataType = dataType; + } + + @Override + public Message process() { + try { + Any any = Arrays.stream(device.read_attributes(new String[]{attributeName})).map(attributeValue -> attributeValue.value).findFirst().get(); + + Object result = CleverAnyAttribute.get(any, Integer.parseInt(dataType), AttrDataFormat.SCALAR); + return new Message("response", attributeName, dataType, String.valueOf(result)); + } catch (DevFailed devFailed) { + return new Message.Error(devFailed.errors[0].reason); + } + } +} diff --git a/server/src/main/java/org/tango/server/transport/TransportManager.java b/server/src/main/java/org/tango/server/transport/TransportManager.java index 0c226f307..18a29e042 100644 --- a/server/src/main/java/org/tango/server/transport/TransportManager.java +++ b/server/src/main/java/org/tango/server/transport/TransportManager.java @@ -16,11 +16,15 @@ public class TransportManager { private final ZContext context = new ZContext(); + private String port; - public TransportMeta upgradeTransport() { + public ZMQ.Socket upgradeTransport() { ZMQ.Socket socket = createZMQSocket(); - int port = socket.bindToRandomPort("tcp://*"); + port = String.valueOf(socket.bindToRandomPort("tcp://*")); + return socket; + } + public TransportMeta getTransportMeta() { List connectionPoints = new NetworkInterfacesExtractor().getIp4Addresses(); TransportMeta result = new TransportMeta(); @@ -32,11 +36,11 @@ public TransportMeta upgradeTransport() { return result; } - public ZMQ.Socket createZMQSocket() { final ZMQ.Socket socket = context.createSocket(ZMQ.REP); socket.setLinger(0); socket.setReconnectIVL(-1); return socket; } + } diff --git a/server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java b/server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java new file mode 100644 index 000000000..e6e9be94c --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java @@ -0,0 +1,37 @@ +package org.tango.server.transport; + +import fr.esrf.Tango.AttributeValue; +import fr.esrf.Tango.DevFailed; +import org.tango.server.idl.TangoIDLAttributeUtil; +import org.tango.server.servant.DeviceImpl; +import org.tango.transport.Message; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class WriteMessageProcessor implements ZmqMessageProcessor { + private final DeviceImpl device; + private final String attributeName; + private final Object value; + + public WriteMessageProcessor(DeviceImpl device, String attributeName, Object value) { + this.device = device; + this.attributeName = attributeName; + this.value = value; + } + + @Override + public Message process() { + try { + device.write_attributes( + new AttributeValue[]{ + TangoIDLAttributeUtil.toAttributeValue( + device.getAttributeImpl(attributeName).get(), + new org.tango.server.attribute.AttributeValue(value))}); + return new Message.Ok(); + } catch (DevFailed devFailed) { + return new Message.Error(devFailed.errors[0].reason); + } + } +} diff --git a/server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java new file mode 100644 index 000000000..11c341637 --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java @@ -0,0 +1,11 @@ +package org.tango.server.transport; + +import org.tango.transport.Message; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public interface ZmqMessageProcessor { + Message process(); +} diff --git a/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java new file mode 100644 index 000000000..70aa35bdd --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java @@ -0,0 +1,41 @@ +package org.tango.server.transport; + +import com.google.gson.Gson; +import org.tango.server.admin.AdminDevice; +import org.tango.server.servant.DeviceImpl; +import org.tango.transport.Message; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class ZmqMessageProcessorImpl implements ZmqMessageProcessor { + private final String msg; + private final AdminDevice admin; + + public ZmqMessageProcessorImpl(String msg, AdminDevice admin) { + this.msg = msg; + this.admin = admin; + } + + @Override + public Message process() { + Message message = Message.fromString(msg); + + Message.Target target = Message.Target.fromString(message.target); + + DeviceImpl device = admin.getDeviceImpl(target.device); + + switch (message.action) { + case "read": + return new ReadMessageProcessor(device, target.member, message.dataType).process(); + case "write": + return new WriteMessageProcessor(device, target.member, new Gson().fromJson(message.value, Object.class)).process(); + case "exec": + default: + return new Message.Error("Unsupported message action - " + message.action); + } + } + + +} diff --git a/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java new file mode 100644 index 000000000..5d5bd4492 --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java @@ -0,0 +1,37 @@ +package org.tango.server.transport; + +import org.tango.server.admin.AdminDevice; +import org.zeromq.ZMQ; + +import java.nio.charset.StandardCharsets; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class ZmqTransportListener implements Runnable { + private final ZMQ.Socket socket; + private final AdminDevice admin; + + public ZmqTransportListener(ZMQ.Socket socket, AdminDevice admin) { + this.socket = socket; + this.admin = admin; + } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + byte[] data = socket.recv(); + String msg = new String(data, StandardCharsets.UTF_8); + + //TODO non blocking + //TODO thread pool + socket.send( + new ZmqMessageProcessorImpl(msg, admin) + .process() + .toString() + .getBytes(StandardCharsets.UTF_8)); + + } + } +} From 74cc3c68a5d5417c50dc474adf59a9eb94b1c8ce Mon Sep 17 00:00:00 2001 From: ingvord Date: Wed, 19 Feb 2020 15:46:02 +0100 Subject: [PATCH 04/12] Progress #35 --- server/src/main/java/org/tango/server/admin/AdminDevice.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/tango/server/admin/AdminDevice.java b/server/src/main/java/org/tango/server/admin/AdminDevice.java index 998833373..b4bd00d09 100644 --- a/server/src/main/java/org/tango/server/admin/AdminDevice.java +++ b/server/src/main/java/org/tango/server/admin/AdminDevice.java @@ -827,8 +827,8 @@ public DeviceImpl getDeviceImpl(String device) { .orElseThrow(() -> new NoSuchElementException(device)); } - @Command(name = "UpgradeTransport") - public DevVarLongStringArray upgradeTransport() { + @Command(name = "UpgradeProtocol") + public DevVarLongStringArray upgradeProtocol() { ZMQ.Socket socket = transportManager .upgradeTransport(); From db59992e4d0f8300db08ae468400e6363bd2c8a3 Mon Sep 17 00:00:00 2001 From: ingvord Date: Wed, 19 Feb 2020 15:51:04 +0100 Subject: [PATCH 05/12] Progress #35 --- common/src/main/java/org/tango/transport/TransportMeta.java | 2 +- .../java/org/tango/server/transport/TransportManager.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/tango/transport/TransportMeta.java b/common/src/main/java/org/tango/transport/TransportMeta.java index 2a9cba58c..18a73badf 100644 --- a/common/src/main/java/org/tango/transport/TransportMeta.java +++ b/common/src/main/java/org/tango/transport/TransportMeta.java @@ -30,7 +30,7 @@ public void setEndPoints(List endPoints) { this.endPoints = endPoints; } - public void addConnectionPoint(String connectionPoint) { + public void addEndpoint(String connectionPoint) { endPoints.add(connectionPoint); } diff --git a/server/src/main/java/org/tango/server/transport/TransportManager.java b/server/src/main/java/org/tango/server/transport/TransportManager.java index 18a29e042..415dca2c9 100644 --- a/server/src/main/java/org/tango/server/transport/TransportManager.java +++ b/server/src/main/java/org/tango/server/transport/TransportManager.java @@ -30,8 +30,8 @@ public TransportMeta getTransportMeta() { TransportMeta result = new TransportMeta(); connectionPoints.stream() - .map(s -> s + ":" + port) - .forEach(result::addConnectionPoint); + .map(s -> "tcp://" + s + ":" + port) + .forEach(result::addEndpoint); return result; } From 26adec8846f9f01d1cd79ab93aff1130b36f5f38 Mon Sep 17 00:00:00 2001 From: ingvord Date: Wed, 19 Feb 2020 16:09:41 +0100 Subject: [PATCH 06/12] Progress #35 --- server/src/main/java/org/tango/server/admin/AdminDevice.java | 1 + .../org/tango/server/transport/ZmqTransportListener.java | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/server/src/main/java/org/tango/server/admin/AdminDevice.java b/server/src/main/java/org/tango/server/admin/AdminDevice.java index b4bd00d09..b9d176863 100644 --- a/server/src/main/java/org/tango/server/admin/AdminDevice.java +++ b/server/src/main/java/org/tango/server/admin/AdminDevice.java @@ -823,6 +823,7 @@ private DevVarLongStringArray subscribeEvent(final EventType eventType, final St public DeviceImpl getDeviceImpl(String device) { return classList.stream() .map(deviceClassBuilder -> deviceClassBuilder.getDeviceImpl(device)) + .filter(Objects::nonNull) .findFirst() .orElseThrow(() -> new NoSuchElementException(device)); } diff --git a/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java index 5d5bd4492..c4cb05198 100644 --- a/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java +++ b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java @@ -1,5 +1,7 @@ package org.tango.server.transport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tango.server.admin.AdminDevice; import org.zeromq.ZMQ; @@ -10,6 +12,8 @@ * @since 19.02.2020 */ public class ZmqTransportListener implements Runnable { + private final Logger logger = LoggerFactory.getLogger(ZmqTransportListener.class); + private final ZMQ.Socket socket; private final AdminDevice admin; @@ -20,6 +24,7 @@ public ZmqTransportListener(ZMQ.Socket socket, AdminDevice admin) { @Override public void run() { + logger.debug("Starting ZmqTransportListener"); while (!Thread.currentThread().isInterrupted()) { byte[] data = socket.recv(); String msg = new String(data, StandardCharsets.UTF_8); From d6920fd387c82cd082ee3e9db5751ff746aae5fc Mon Sep 17 00:00:00 2001 From: ingvord Date: Wed, 19 Feb 2020 16:52:15 +0100 Subject: [PATCH 07/12] Progress #35 --- .../java/org/tango/server/admin/AdminDevice.java | 10 +++++++--- .../tango/server/transport/TransportManager.java | 13 ++++++++----- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/tango/server/admin/AdminDevice.java b/server/src/main/java/org/tango/server/admin/AdminDevice.java index b9d176863..d05f93325 100644 --- a/server/src/main/java/org/tango/server/admin/AdminDevice.java +++ b/server/src/main/java/org/tango/server/admin/AdminDevice.java @@ -820,6 +820,13 @@ private DevVarLongStringArray subscribeEvent(final EventType eventType, final St private final TransportManager transportManager = new TransportManager(); + { + ZMQ.Socket socket = transportManager + .bindZmqTransport(); + + executorService.execute(new ZmqTransportListener(socket, this)); + } + public DeviceImpl getDeviceImpl(String device) { return classList.stream() .map(deviceClassBuilder -> deviceClassBuilder.getDeviceImpl(device)) @@ -831,10 +838,7 @@ public DeviceImpl getDeviceImpl(String device) { @Command(name = "UpgradeProtocol") public DevVarLongStringArray upgradeProtocol() { - ZMQ.Socket socket = transportManager - .upgradeTransport(); - executorService.submit(new ZmqTransportListener(socket, this)); return transportManager.getTransportMeta().toDevVarLongStringArray(); } diff --git a/server/src/main/java/org/tango/server/transport/TransportManager.java b/server/src/main/java/org/tango/server/transport/TransportManager.java index 415dca2c9..e35291d33 100644 --- a/server/src/main/java/org/tango/server/transport/TransportManager.java +++ b/server/src/main/java/org/tango/server/transport/TransportManager.java @@ -6,6 +6,7 @@ import org.zeromq.ZMQ; import java.util.List; +import java.util.Optional; /** * This class maintains ZMQ REQ/REP required data @@ -16,11 +17,15 @@ public class TransportManager { private final ZContext context = new ZContext(); + private ZMQ.Socket socket; private String port; - public ZMQ.Socket upgradeTransport() { - ZMQ.Socket socket = createZMQSocket(); - port = String.valueOf(socket.bindToRandomPort("tcp://*")); + public ZMQ.Socket bindZmqTransport() { + this.socket = Optional.ofNullable(socket).orElseGet(() -> { + ZMQ.Socket socket = createZMQSocket(); + port = String.valueOf(socket.bindToRandomPort("tcp://*")); + return socket; + }); return socket; } @@ -38,8 +43,6 @@ public TransportMeta getTransportMeta() { public ZMQ.Socket createZMQSocket() { final ZMQ.Socket socket = context.createSocket(ZMQ.REP); - socket.setLinger(0); - socket.setReconnectIVL(-1); return socket; } From 09f15710ac333ccad4fe2390d721ac86dbbc478e Mon Sep 17 00:00:00 2001 From: ingvord Date: Thu, 20 Feb 2020 13:31:30 +0100 Subject: [PATCH 08/12] Progress #35 --- common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java | 1 - .../main/java/org/tango/transport/ZmqTransport.java | 10 +++++----- .../server/transport/ZmqMessageProcessorImpl.java | 2 -- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java index b8549b6cb..b6080b2b9 100644 --- a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java +++ b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java @@ -805,7 +805,6 @@ public DeviceAttribute read_attribute(String attname) throws DevFailed { Message message = new Message("read", this.get_name() + "/" + attname, null, null); //TODO marshaller - Message response = Message.fromString( new String(transport.send(message.toString().getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8)); System.out.println(response.value); diff --git a/dao/src/main/java/org/tango/transport/ZmqTransport.java b/dao/src/main/java/org/tango/transport/ZmqTransport.java index dfe4bf649..991643d11 100644 --- a/dao/src/main/java/org/tango/transport/ZmqTransport.java +++ b/dao/src/main/java/org/tango/transport/ZmqTransport.java @@ -13,12 +13,12 @@ public class ZmqTransport implements Transport { private ZMQ.Socket socket; @Override - public synchronized boolean isConnected() { + public boolean isConnected() { return socket != null; } @Override - public synchronized void connect(String endpoint) { + public void connect(String endpoint) { ZMQ.Socket socket = zcontext.createSocket(ZMQ.REQ); socket.connect(endpoint); @@ -27,20 +27,20 @@ public synchronized void connect(String endpoint) { } @Override - public synchronized void disconnect(String endpoint) { + public void disconnect(String endpoint) { zcontext.getSockets() .forEach(socket -> socket.disconnect(endpoint)); } @Override - public synchronized byte[] send(byte[] data) { + public byte[] send(byte[] data) { socket.send(data); return socket.recv(); } @Override - public synchronized void close() { + public void close() { zcontext.close(); } } diff --git a/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java index 70aa35bdd..0e6213261 100644 --- a/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java +++ b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java @@ -36,6 +36,4 @@ public Message process() { return new Message.Error("Unsupported message action - " + message.action); } } - - } From 483b84b22816dec0b184e74878764c9b449f18db Mon Sep 17 00:00:00 2001 From: ingvord Date: Thu, 20 Feb 2020 15:34:27 +0100 Subject: [PATCH 09/12] Progress #35 --- .../src/main/java/fr/esrf/TangoApi/DeviceProxy.java | 12 +++--------- .../main/java/org/tango/transport/TransportMeta.java | 4 ++++ .../java/org/tango/server/admin/AdminDevice.java | 6 ++---- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java index b6080b2b9..27fc59bed 100644 --- a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java +++ b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java @@ -45,15 +45,11 @@ import org.tango.network.EndpointAvailabilityChecker; import org.tango.transport.Message; import org.tango.transport.Transport; -import org.tango.transport.TransportMeta; import org.tango.utils.DevFailedUtils; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Vector; +import java.util.*; /** * Class Description: This class manage device connection for Tango objects. It @@ -1867,12 +1863,10 @@ public int subscribe_event(int event, int max_size, boolean stateless) throws De public void upgradeProtocol() throws DevFailed { DeviceData response = this.get_adm_dev().command_inout("UpgradeProtocol"); - DevVarLongStringArray array = response.extractLongStringArray(); - - TransportMeta transportMeta = TransportMeta.fromDevVarLongStringArray(array); + String[] array = response.extractStringArray(); EndpointAvailabilityChecker predicate = new EndpointAvailabilityChecker(); - String endpoint = transportMeta.getEndPoints().stream() + String endpoint = Arrays.stream(array) .filter(predicate) .findFirst() .orElseThrow(() -> new RuntimeException("No reachable connection points were found")); diff --git a/common/src/main/java/org/tango/transport/TransportMeta.java b/common/src/main/java/org/tango/transport/TransportMeta.java index 18a73badf..f19146ebd 100644 --- a/common/src/main/java/org/tango/transport/TransportMeta.java +++ b/common/src/main/java/org/tango/transport/TransportMeta.java @@ -40,4 +40,8 @@ public DevVarLongStringArray toDevVarLongStringArray() { return result; } + + public String[] toStringArray() { + return endPoints.toArray(new String[0]); + } } diff --git a/server/src/main/java/org/tango/server/admin/AdminDevice.java b/server/src/main/java/org/tango/server/admin/AdminDevice.java index d05f93325..9fb25679b 100644 --- a/server/src/main/java/org/tango/server/admin/AdminDevice.java +++ b/server/src/main/java/org/tango/server/admin/AdminDevice.java @@ -836,10 +836,8 @@ public DeviceImpl getDeviceImpl(String device) { } @Command(name = "UpgradeProtocol") - public DevVarLongStringArray upgradeProtocol() { - - - return transportManager.getTransportMeta().toDevVarLongStringArray(); + public String[] upgradeProtocol() { + return transportManager.getTransportMeta().toStringArray(); } /** From 08551dd267aec02ae2d22614bf2a2ce24e855884 Mon Sep 17 00:00:00 2001 From: ingvord Date: Fri, 21 Feb 2020 14:28:48 +0100 Subject: [PATCH 10/12] Progress #35 --- .../fr/esrf/Tango/factory/ITangoFactory.java | 2 +- .../fr/esrf/Tango/factory/TangoFactory.java | 4 +- .../java/fr/esrf/TangoApi/DeviceProxy.java | 46 ++++----- .../network/EndpointAvailabilityChecker.java | 41 -------- .../java/org/tango/network/NetworkUtils.java | 94 +++++++++++++++++++ .../org/tango/transport/DefaultTransport.java | 34 +++++++ .../org/tango/transport/GsonTangoMessage.java | 40 ++++++++ .../java/org/tango/transport/Message.java | 75 --------------- .../tango/transport/StringTangoMessage.java | 68 ++++++++++++++ .../org/tango/transport/TangoMessage.java | 35 +++++++ .../transport/TangoMessageMarshaller.java | 9 ++ .../transport/TangoMessageUnmarshaller.java | 15 +++ .../org/tango/transport/TransportMeta.java | 47 ---------- .../TangoApi/events/ZmqEventConsumer.java | 7 +- .../factory/DefaultTangoFactoryImpl.java | 13 ++- .../org/tango/transport/HttpTransport.java | 56 +++++++++++ parent/pom.xml | 3 +- server/pom.xml | 5 + .../org/tango/server/admin/AdminDevice.java | 27 +++--- .../org/tango/server/events/EventManager.java | 5 +- .../network/NetworkInterfacesExtractor.java | 66 ------------- .../transport/HttpTransportListener.java | 90 ++++++++++++++++++ .../transport/NaiveTangoMessageProcessor.java | 59 ++++++++++++ .../transport/ReadMessageProcessor.java | 38 -------- .../transport/TangoMessageProcessor.java | 12 +++ .../server/transport/TransportListener.java | 17 ++++ .../server/transport/TransportManager.java | 46 ++++----- .../transport/WriteMessageProcessor.java | 37 -------- .../server/transport/ZmqMessageProcessor.java | 11 --- .../transport/ZmqMessageProcessorImpl.java | 39 -------- .../transport/ZmqTransportListener.java | 69 +++++++++++--- 31 files changed, 665 insertions(+), 445 deletions(-) delete mode 100644 common/src/main/java/org/tango/network/EndpointAvailabilityChecker.java create mode 100644 common/src/main/java/org/tango/network/NetworkUtils.java create mode 100644 common/src/main/java/org/tango/transport/DefaultTransport.java create mode 100644 common/src/main/java/org/tango/transport/GsonTangoMessage.java delete mode 100644 common/src/main/java/org/tango/transport/Message.java create mode 100644 common/src/main/java/org/tango/transport/StringTangoMessage.java create mode 100644 common/src/main/java/org/tango/transport/TangoMessage.java create mode 100644 common/src/main/java/org/tango/transport/TangoMessageMarshaller.java create mode 100644 common/src/main/java/org/tango/transport/TangoMessageUnmarshaller.java delete mode 100644 common/src/main/java/org/tango/transport/TransportMeta.java create mode 100644 dao/src/main/java/org/tango/transport/HttpTransport.java delete mode 100644 server/src/main/java/org/tango/server/network/NetworkInterfacesExtractor.java create mode 100644 server/src/main/java/org/tango/server/transport/HttpTransportListener.java create mode 100644 server/src/main/java/org/tango/server/transport/NaiveTangoMessageProcessor.java delete mode 100644 server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java create mode 100644 server/src/main/java/org/tango/server/transport/TangoMessageProcessor.java create mode 100644 server/src/main/java/org/tango/server/transport/TransportListener.java delete mode 100644 server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java delete mode 100644 server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java delete mode 100644 server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java diff --git a/common/src/main/java/fr/esrf/Tango/factory/ITangoFactory.java b/common/src/main/java/fr/esrf/Tango/factory/ITangoFactory.java index 410fbd6a5..b2da5a11c 100644 --- a/common/src/main/java/fr/esrf/Tango/factory/ITangoFactory.java +++ b/common/src/main/java/fr/esrf/Tango/factory/ITangoFactory.java @@ -71,5 +71,5 @@ public interface ITangoFactory { public String getFactoryName(); - Transport newTransport(); + Transport newTransport(String targetProtocol); } diff --git a/common/src/main/java/fr/esrf/Tango/factory/TangoFactory.java b/common/src/main/java/fr/esrf/Tango/factory/TangoFactory.java index 59fefa14c..b8bb1710d 100644 --- a/common/src/main/java/fr/esrf/Tango/factory/TangoFactory.java +++ b/common/src/main/java/fr/esrf/Tango/factory/TangoFactory.java @@ -205,7 +205,7 @@ public void setDefaultFactory(final boolean isDefaultFactory) { this.isDefaultFactory = isDefaultFactory; } - public Transport newTransport() { - return tangoFactory.newTransport(); + public Transport newTransport(String targetProtocol) { + return tangoFactory.newTransport(targetProtocol); } } diff --git a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java index 27fc59bed..8f760c59b 100644 --- a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java +++ b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java @@ -42,13 +42,14 @@ import fr.esrf.TangoDs.Except; import fr.esrf.TangoDs.TangoConst; import org.omg.CORBA.Request; -import org.tango.network.EndpointAvailabilityChecker; -import org.tango.transport.Message; +import org.tango.network.NetworkUtils; +import org.tango.transport.DefaultTransport; +import org.tango.transport.StringTangoMessage; +import org.tango.transport.TangoMessage; import org.tango.transport.Transport; import org.tango.utils.DevFailedUtils; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.*; /** @@ -83,7 +84,7 @@ public class DeviceProxy extends Connection implements ApiDefs { private IDeviceProxyDAO deviceProxyDAO = null; - private final Transport transport = TangoFactory.getSingleton().newTransport(); + private final StringTangoMessage marshaller = new StringTangoMessage(); static final private boolean check_idl = false; @@ -787,7 +788,10 @@ public void set_attribute_config(AttributeInfo[] attr) throws DevFailed { deviceProxyDAO.set_attribute_info(this, attr); } + private Transport transport = new DefaultTransport(); + // ========================================================================== + /** * Read the attribute value for the specified device. * @@ -798,13 +802,11 @@ public void set_attribute_config(AttributeInfo[] attr) throws DevFailed { public DeviceAttribute read_attribute(String attname) throws DevFailed { if (transport.isConnected()) { try { - Message message = new Message("read", this.get_name() + "/" + attname, null, null); + TangoMessage message = new TangoMessage("read", this.get_name(), attname, -1, null); + + message = marshaller.unmarshal(transport.send(marshaller.marshal(message))); - //TODO marshaller - Message response = Message.fromString( - new String(transport.send(message.toString().getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8)); - System.out.println(response.value); - return new DeviceAttribute(attname, response.value); + return new DeviceAttribute(attname, String.valueOf(message.value));//TODO respect dataType } catch (IOException e) { throw DevFailedUtils.newDevFailed(e); } @@ -840,11 +842,9 @@ public DeviceAttribute[] read_attribute(String[] attnames) throws DevFailed { public void writeAttribute(String name, double value) throws DevFailed { if (transport.isConnected()) { try { - Message message = new Message("write", this.get_name() + "/" + name, "double", String.valueOf(value)); + TangoMessage message = new TangoMessage("write", this.get_name(), name, TangoConst.Tango_DEV_DOUBLE, value); - //TODO marshaller - - transport.send(message.toString().getBytes(StandardCharsets.UTF_8)); + marshaller.unmarshal(transport.send(marshaller.marshal(message))); } catch (IOException e) { throw DevFailedUtils.newDevFailed(e); } @@ -1860,18 +1860,20 @@ public int subscribe_event(int event, int max_size, boolean stateless) throws De return deviceProxyDAO.subscribe_event(this, event, max_size, stateless); } - public void upgradeProtocol() throws DevFailed { - DeviceData response = this.get_adm_dev().command_inout("UpgradeProtocol"); + public void upgradeProtocol(String targetProtocol) throws DevFailed { + DeviceData argin = new DeviceData(); + argin.insert(targetProtocol); - String[] array = response.extractStringArray(); + DeviceData response = this.get_adm_dev().command_inout("UpgradeProtocol", argin); - EndpointAvailabilityChecker predicate = new EndpointAvailabilityChecker(); - String endpoint = Arrays.stream(array) - .filter(predicate) - .findFirst() - .orElseThrow(() -> new RuntimeException("No reachable connection points were found")); + String[] array = response.extractStringArray(); try { + String endpoint = Arrays.stream(array) + .filter(NetworkUtils.getInstance()::checkEndpoint) + .findFirst() + .orElseThrow(() -> new IOException("No reachable connection points were found")); + transport = TangoFactory.getSingleton().newTransport(targetProtocol); transport.connect(endpoint); } catch (IOException e) { throw DevFailedUtils.newDevFailed(e); diff --git a/common/src/main/java/org/tango/network/EndpointAvailabilityChecker.java b/common/src/main/java/org/tango/network/EndpointAvailabilityChecker.java deleted file mode 100644 index 5bd33f740..000000000 --- a/common/src/main/java/org/tango/network/EndpointAvailabilityChecker.java +++ /dev/null @@ -1,41 +0,0 @@ -package org.tango.network; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.function.Predicate; - -/** - * @author Igor Khokhriakov - * @since 19.02.2020 - */ -public class EndpointAvailabilityChecker implements Predicate { - private final Logger logger = LoggerFactory.getLogger(EndpointAvailabilityChecker.class); - - @Override - public boolean test(String endpoint) { - logger.debug("Check endpoint: {}", endpoint); - URI uri = null; - try { - uri = new URI(endpoint); - } catch (URISyntaxException e) { - logger.debug("Bad endpoint: " + endpoint, e); - return false; - } - - // Try to connect - InetSocketAddress ip = new InetSocketAddress(uri.getHost(), uri.getPort()); - try (Socket socket = new Socket()) { - socket.connect(ip, 10); - return true; - } catch (IOException e) { - logger.debug("Failed to connect to " + ip, e); - return false; - } - } -} diff --git a/common/src/main/java/org/tango/network/NetworkUtils.java b/common/src/main/java/org/tango/network/NetworkUtils.java new file mode 100644 index 000000000..c3da3e5b1 --- /dev/null +++ b/common/src/main/java/org/tango/network/NetworkUtils.java @@ -0,0 +1,94 @@ +package org.tango.network; + +import com.google.common.collect.Collections2; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.*; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * @author Igor Khokhriakov + * @since 18.02.2020 + */ +public class NetworkUtils { + private static final NetworkUtils INSTANCE = new NetworkUtils(); + private final Logger logger = LoggerFactory.getLogger(NetworkUtils.class); + + private NetworkUtils() { + } + + public static NetworkUtils getInstance() { + return INSTANCE; + } + + public int getRandomPort() { + //TODO check if available + return new Random().ints(5000, 65535).findFirst().getAsInt(); + } + + public List getIp4Addresses() { + Iterable networkInterfaces = null; + try { + networkInterfaces = Collections.list(NetworkInterface.getNetworkInterfaces()); + } catch (SocketException e) { + logger.error("Failed to get NICs due to " + e.getMessage(), e); + return Collections.emptyList(); + } + + java.util.function.Predicate filter = networkInterface -> { + try { + return !networkInterface.isLoopback() && !networkInterface.isVirtual() && networkInterface.isUp(); + } catch (SocketException e) { + logger.warn("Ignoring NetworkInterface({}) due to an exception: {}", networkInterface.getName(), e); + return false; + } + }; + + Function interfaceAddressToString = interfaceAddress -> interfaceAddress.getAddress().getHostAddress(); + + Iterable filteredNICs = Iterables.filter(networkInterfaces, filter::test); + + List result = Lists.newArrayList(); + //TODO #17 + for (NetworkInterface nic : filteredNICs) { + result.addAll( + Collections2.filter( + nic.getInterfaceAddresses() + .stream() + .map(interfaceAddressToString::apply) + .collect(Collectors.toList()), + s -> s.split("\\.").length == 4) + ); + } + return result; + } + + public boolean checkEndpoint(String endpoint) { + logger.debug("Check endpoint: {}", endpoint); + URI uri = null; + try { + uri = new URI(endpoint); + } catch (URISyntaxException e) { + logger.debug("Bad endpoint: " + endpoint, e); + return false; + } + + // Try to connect + InetSocketAddress ip = new InetSocketAddress(uri.getHost(), uri.getPort()); + try (Socket socket = new Socket()) { + socket.connect(ip, 10); + return true; + } catch (IOException e) { + logger.debug("Failed to connect to " + ip, e); + return false; + } + } +} diff --git a/common/src/main/java/org/tango/transport/DefaultTransport.java b/common/src/main/java/org/tango/transport/DefaultTransport.java new file mode 100644 index 000000000..4d3936d4f --- /dev/null +++ b/common/src/main/java/org/tango/transport/DefaultTransport.java @@ -0,0 +1,34 @@ +package org.tango.transport; + +import java.io.IOException; + +/** + * @author Igor Khokhriakov + * @since 21.02.2020 + */ +public class DefaultTransport implements Transport { + @Override + public boolean isConnected() { + return false; + } + + @Override + public void connect(String endpoint) throws IOException { + throw new UnsupportedOperationException("This method is not supported in " + this.getClass()); + } + + @Override + public void disconnect(String endpoint) throws IOException { + throw new UnsupportedOperationException("This method is not supported in " + this.getClass()); + } + + @Override + public byte[] send(byte[] data) throws IOException { + throw new UnsupportedOperationException("This method is not supported in " + this.getClass()); + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException("This method is not supported in " + this.getClass()); + } +} diff --git a/common/src/main/java/org/tango/transport/GsonTangoMessage.java b/common/src/main/java/org/tango/transport/GsonTangoMessage.java new file mode 100644 index 000000000..98a2689ca --- /dev/null +++ b/common/src/main/java/org/tango/transport/GsonTangoMessage.java @@ -0,0 +1,40 @@ +package org.tango.transport; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +/** + * @author Igor Khokhriakov + * @since 21.02.2020 + */ +public class GsonTangoMessage implements TangoMessageMarshaller, TangoMessageUnmarshaller { + private final Gson gson = new GsonBuilder() + .serializeNulls() + .create(); + + + @Override + public byte[] marshal(TangoMessage tangoMessage) { + return gson.toJson(tangoMessage).getBytes(StandardCharsets.UTF_8); + } + + @Override + public TangoMessage unmarshal(InputStream stream) { + return gson.fromJson(new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)), TangoMessage.class); + } + + @Override + public TangoMessage unmarshal(byte[] stream) { + return unmarshal(new String(stream, StandardCharsets.UTF_8)); + } + + @Override + public TangoMessage unmarshal(String stream) { + return gson.fromJson(stream, TangoMessage.class); + } +} diff --git a/common/src/main/java/org/tango/transport/Message.java b/common/src/main/java/org/tango/transport/Message.java deleted file mode 100644 index b72fef748..000000000 --- a/common/src/main/java/org/tango/transport/Message.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.tango.transport; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * @author Igor Khokhriakov - * @since 19.02.2020 - */ -public class Message { - public static final String PATTERN_STR = "(read|write|exec|response):([\\w\\W]+);(double|float|int|long|String|null):([\\w.]+)"; - public static final Pattern PATTERN = Pattern.compile(PATTERN_STR); - - public static final String TARGET_PATTERN_STR = "(\\w+)/(\\w+)/(\\w+)/(\\w+)"; - public static final Pattern TARGET_PATTERN = Pattern.compile(TARGET_PATTERN_STR); - - public String action; - public String target; - public String dataType; - public String value; - - public Message(String action, String target, String dataType, String value) { - this.action = action; - this.target = target; - this.dataType = dataType; - this.value = value; - } - - public static Message fromString(String str) { - Matcher matcher = PATTERN.matcher(str); - - if (matcher.matches()) - return new Message(matcher.group(1), matcher.group(2), matcher.group(3), matcher.group(4)); - else - throw new IllegalArgumentException("Unrecognized message: " + str); - } - - @Override - public String toString() { - return String.format( - "%s:%s;%s:%s", action, target, dataType, value - ); - } - - public static class Target { - public final String device; - public final String member; - - public Target(String device, String member) { - this.device = device; - this.member = member; - } - - public static Target fromString(String str) { - Matcher matcher = TARGET_PATTERN.matcher(str); - - if (matcher.matches()) - return new Target(String.format("%s/%s/%s", matcher.group(1), matcher.group(2), matcher.group(3)), matcher.group(4)); - else - throw new IllegalArgumentException("Unrecognized message: " + str); - } - } - - public static class Error extends Message { - public Error(String value) { - super("response", "error", "String", value); - } - } - - public static class Ok extends Message { - public Ok() { - super("response", "ok", null, null); - } - } -} diff --git a/common/src/main/java/org/tango/transport/StringTangoMessage.java b/common/src/main/java/org/tango/transport/StringTangoMessage.java new file mode 100644 index 000000000..18cbdcc31 --- /dev/null +++ b/common/src/main/java/org/tango/transport/StringTangoMessage.java @@ -0,0 +1,68 @@ +package org.tango.transport; + +import fr.esrf.TangoDs.TangoConst; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * @author Igor Khokhriakov + * @since 21.02.2020 + */ +public class StringTangoMessage implements TangoMessageMarshaller, TangoMessageUnmarshaller { + public static final String PATTERN_STR = "(read|write|exec|response);([\\w\\W]+)/([\\w\\W]+);(\\d+):([\\w.]+)"; + public static final Pattern PATTERN = Pattern.compile(PATTERN_STR); + + @Override + public byte[] marshal(TangoMessage tangoMessage) { + return toString(tangoMessage).getBytes(StandardCharsets.UTF_8); + } + + @Override + public TangoMessage unmarshal(InputStream stream) { + BufferedReader br = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)); + return fromString(br.lines().collect(Collectors.joining(System.lineSeparator()))); + } + + @Override + public TangoMessage unmarshal(byte[] stream) { + return unmarshal(new ByteArrayInputStream(stream)); + } + + @Override + public TangoMessage unmarshal(String stream) { + return fromString(stream); + } + + private String toString(TangoMessage message) { + return String.format( + "%s;%s/%s;%d:%s", message.action, message.device, message.target, message.dataType, String.valueOf(message.value) + ); + } + + private TangoMessage fromString(String str) { + Matcher matcher = PATTERN.matcher(str); + + if (matcher.matches()) { + int dataType = Integer.parseInt(matcher.group(4)); + return new TangoMessage(matcher.group(1), matcher.group(2), matcher.group(3), dataType, parseValue(dataType, matcher.group(5))); + } else + throw new IllegalArgumentException("Unrecognized message: " + str); + } + + private Object parseValue(int dataType, String value) { + switch (dataType) { + case TangoConst + .Tango_DEV_DOUBLE: + return Double.valueOf(value); + default: + return value; + } + } +} diff --git a/common/src/main/java/org/tango/transport/TangoMessage.java b/common/src/main/java/org/tango/transport/TangoMessage.java new file mode 100644 index 000000000..40490a6d9 --- /dev/null +++ b/common/src/main/java/org/tango/transport/TangoMessage.java @@ -0,0 +1,35 @@ +package org.tango.transport; + +import fr.esrf.TangoDs.TangoConst; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class TangoMessage { + public String action; + public String device; + public String target; + public int dataType; + public Object value; + + public TangoMessage(String action, String device, String target, int dataType, Object value) { + this.action = action; + this.device = device; + this.target = target; + this.dataType = dataType; + this.value = value; + } + + public static class Error extends TangoMessage { + public Error(String value) { + super("response", null, "error", TangoConst.Tango_DEV_STRING, value); + } + } + + public static class Ok extends TangoMessage { + public Ok() { + super("response", null, "ok", TangoConst.Tango_DEV_VOID, null); + } + } +} diff --git a/common/src/main/java/org/tango/transport/TangoMessageMarshaller.java b/common/src/main/java/org/tango/transport/TangoMessageMarshaller.java new file mode 100644 index 000000000..ccad03db3 --- /dev/null +++ b/common/src/main/java/org/tango/transport/TangoMessageMarshaller.java @@ -0,0 +1,9 @@ +package org.tango.transport; + +/** + * @author Igor Khokhriakov + * @since 21.02.2020 + */ +public interface TangoMessageMarshaller { + byte[] marshal(TangoMessage tangoMessage); +} diff --git a/common/src/main/java/org/tango/transport/TangoMessageUnmarshaller.java b/common/src/main/java/org/tango/transport/TangoMessageUnmarshaller.java new file mode 100644 index 000000000..db7d463e2 --- /dev/null +++ b/common/src/main/java/org/tango/transport/TangoMessageUnmarshaller.java @@ -0,0 +1,15 @@ +package org.tango.transport; + +import java.io.InputStream; + +/** + * @author Igor Khokhriakov + * @since 21.02.2020 + */ +public interface TangoMessageUnmarshaller { + TangoMessage unmarshal(InputStream stream); + + TangoMessage unmarshal(byte[] stream); + + TangoMessage unmarshal(String stream); +} diff --git a/common/src/main/java/org/tango/transport/TransportMeta.java b/common/src/main/java/org/tango/transport/TransportMeta.java deleted file mode 100644 index f19146ebd..000000000 --- a/common/src/main/java/org/tango/transport/TransportMeta.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.tango.transport; - -import com.google.common.collect.Lists; -import fr.esrf.Tango.DevVarLongStringArray; - -import java.util.Arrays; -import java.util.List; - -/** - * @author Igor Khokhriakov - * @since 18.02.2020 - */ -public class TransportMeta { - private List endPoints = Lists.newArrayList(); - - public TransportMeta() { - } - - public static TransportMeta fromDevVarLongStringArray(DevVarLongStringArray array) { - TransportMeta transportMeta = new TransportMeta(); - transportMeta.setEndPoints(Arrays.asList(array.svalue)); - return transportMeta; - } - - public List getEndPoints() { - return endPoints; - } - - public void setEndPoints(List endPoints) { - this.endPoints = endPoints; - } - - public void addEndpoint(String connectionPoint) { - endPoints.add(connectionPoint); - } - - public DevVarLongStringArray toDevVarLongStringArray() { - DevVarLongStringArray result = new DevVarLongStringArray(new int[0], - endPoints.toArray(new String[0])); - - return result; - } - - public String[] toStringArray() { - return endPoints.toArray(new String[0]); - } -} diff --git a/dao/src/main/java/fr/esrf/TangoApi/events/ZmqEventConsumer.java b/dao/src/main/java/fr/esrf/TangoApi/events/ZmqEventConsumer.java index 8ec59c8db..c1c6961eb 100755 --- a/dao/src/main/java/fr/esrf/TangoApi/events/ZmqEventConsumer.java +++ b/dao/src/main/java/fr/esrf/TangoApi/events/ZmqEventConsumer.java @@ -48,7 +48,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tango.network.EndpointAvailabilityChecker; +import org.tango.network.NetworkUtils; import org.tango.utils.DevFailedUtils; import java.net.UnknownHostException; @@ -379,7 +379,7 @@ private DeviceData checkWithHostAddress(DeviceData deviceData, DeviceProxy devic logger.debug("{} ---> {}", wrongAdd, lsa.svalue[0]); deviceData = new DeviceData(); deviceData.insert(lsa); - new EndpointAvailabilityChecker().test(lsa.svalue[0]); + NetworkUtils.getInstance().checkEndpoint(lsa.svalue[0]); } } } catch (UnknownHostException e) { @@ -402,13 +402,12 @@ private DeviceData checkZmqAddress(DeviceData deviceData, DeviceProxy deviceProx logger.trace("Inside checkZmqAddress()"); DevVarLongStringArray lsa = deviceData.extractLongStringArray(); - EndpointAvailabilityChecker predicate = new EndpointAvailabilityChecker(); Pair validEndpoints = Optional.ofNullable(Observable.zip( Observable.fromArray(lsa.svalue), Observable.fromArray(lsa.svalue).skip(1), ImmutablePair::of ) - .filter(pair -> predicate.test(pair.left)) + .filter(pair -> NetworkUtils.getInstance().checkEndpoint(pair.left)) .blockingFirst(null)) .orElseGet(() -> { DeviceData checkedWithHostAddress = checkWithHostAddress(deviceData, deviceProxy); diff --git a/dao/src/main/java/fr/esrf/TangoApi/factory/DefaultTangoFactoryImpl.java b/dao/src/main/java/fr/esrf/TangoApi/factory/DefaultTangoFactoryImpl.java index 411e45313..0b2707109 100644 --- a/dao/src/main/java/fr/esrf/TangoApi/factory/DefaultTangoFactoryImpl.java +++ b/dao/src/main/java/fr/esrf/TangoApi/factory/DefaultTangoFactoryImpl.java @@ -36,6 +36,8 @@ import fr.esrf.Tango.factory.ITangoFactory; import fr.esrf.TangoApi.*; +import org.tango.transport.HttpTransport; +import org.tango.transport.Transport; import org.tango.transport.ZmqTransport; public class DefaultTangoFactoryImpl implements ITangoFactory { @@ -137,8 +139,15 @@ public String getFactoryName() return "TANGORB Default"; } - public ZmqTransport newTransport() { - return new ZmqTransport(); + public Transport newTransport(String targetProtocol) { + switch (targetProtocol) { + case "zmq": + return new ZmqTransport(); + case "http": + return new HttpTransport(); + default: + throw new IllegalArgumentException("Unsupported target transport: " + targetProtocol); + } } } diff --git a/dao/src/main/java/org/tango/transport/HttpTransport.java b/dao/src/main/java/org/tango/transport/HttpTransport.java new file mode 100644 index 000000000..02dfb4b01 --- /dev/null +++ b/dao/src/main/java/org/tango/transport/HttpTransport.java @@ -0,0 +1,56 @@ +package org.tango.transport; + + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; + +import static java.net.http.HttpClient.Version.HTTP_2; + +/** + * @author Igor Khokhriakov + * @since 20.02.2020 + */ +public class HttpTransport implements Transport { + private HttpClient client; + private URI uri; + + @Override + public boolean isConnected() { + return uri != null; + } + + @Override + public void connect(String endpoint) throws IOException { + uri = URI.create(endpoint); + client = HttpClient.newBuilder() + // just to show off; HTTP/2 is the default + .version(HTTP_2) + .connectTimeout(Duration.ofSeconds(5)) + .build(); + } + + @Override + public void disconnect(String endpoint) throws IOException { + } + + @Override + public byte[] send(byte[] data) throws IOException { + HttpRequest req = HttpRequest.newBuilder() + .GET() + .uri(uri) + .POST(HttpRequest.BodyPublishers.ofByteArray(data)) + .build(); + + client.sendAsync(req, HttpResponse.BodyHandlers.ofString()); + return new byte[0]; + } + + @Override + public void close() throws IOException { + + } +} diff --git a/parent/pom.xml b/parent/pom.xml index e75b8e07c..fcbcc55df 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -223,8 +223,7 @@ maven-compiler-plugin 3.8.1 - 1.8 - 1.8 + 11 UTF-8 true true diff --git a/server/pom.xml b/server/pom.xml index 0a7057a08..1dde70816 100755 --- a/server/pom.xml +++ b/server/pom.xml @@ -157,6 +157,11 @@ 2.2.0 test + + io.undertow + undertow-core + 2.0.29.Final + diff --git a/server/src/main/java/org/tango/server/admin/AdminDevice.java b/server/src/main/java/org/tango/server/admin/AdminDevice.java index 9fb25679b..5d1df5b5e 100644 --- a/server/src/main/java/org/tango/server/admin/AdminDevice.java +++ b/server/src/main/java/org/tango/server/admin/AdminDevice.java @@ -24,7 +24,6 @@ */ package org.tango.server.admin; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import fr.esrf.Tango.ClntIdent; import fr.esrf.Tango.DevFailed; import fr.esrf.Tango.DevVarLongStringArray; @@ -56,15 +55,13 @@ import org.tango.server.properties.ClassPropertyImpl; import org.tango.server.properties.DevicePropertyImpl; import org.tango.server.servant.DeviceImpl; +import org.tango.server.transport.HttpTransportListener; import org.tango.server.transport.TransportManager; import org.tango.server.transport.ZmqTransportListener; import org.tango.utils.DevFailedUtils; import org.tango.utils.TangoUtil; -import org.zeromq.ZMQ; import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -435,12 +432,6 @@ public String[] getPolledDevice() { return pollDevices.toArray(new String[pollDevices.size()]); } - private final ExecutorService executorService = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("ZmqTransportListener-%d") - .build() - ); /** * @param dvlsa Lg[0]=Upd period. Str[0]=Device name. Str[1]=Object @@ -821,10 +812,16 @@ private DevVarLongStringArray subscribeEvent(final EventType eventType, final St private final TransportManager transportManager = new TransportManager(); { - ZMQ.Socket socket = transportManager - .bindZmqTransport(); + transportManager.registerTransport(new ZmqTransportListener(this)); + transportManager.registerTransport(new HttpTransportListener(this)); - executorService.execute(new ZmqTransportListener(socket, this)); + transportManager.bind(); + transportManager.listen(); + } + + @Attribute + public String[] getSupportedTransports() { + return transportManager.getTransports().toArray(new String[0]); } public DeviceImpl getDeviceImpl(String device) { @@ -836,8 +833,8 @@ public DeviceImpl getDeviceImpl(String device) { } @Command(name = "UpgradeProtocol") - public String[] upgradeProtocol() { - return transportManager.getTransportMeta().toStringArray(); + public String[] upgradeProtocol(String transport) { + return transportManager.getEndpoints(transport).toArray(new String[0]); } /** diff --git a/server/src/main/java/org/tango/server/events/EventManager.java b/server/src/main/java/org/tango/server/events/EventManager.java index 35c1e42f6..3d67c0e4c 100644 --- a/server/src/main/java/org/tango/server/events/EventManager.java +++ b/server/src/main/java/org/tango/server/events/EventManager.java @@ -33,11 +33,11 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; import org.tango.client.database.DatabaseFactory; +import org.tango.network.NetworkUtils; import org.tango.server.ServerManager; import org.tango.server.attribute.AttributeImpl; import org.tango.server.attribute.ForwardedAttribute; import org.tango.server.idl.TangoIDLUtil; -import org.tango.server.network.NetworkInterfacesExtractor; import org.tango.server.pipe.PipeImpl; import org.tango.server.pipe.PipeValue; import org.tango.server.servant.DeviceImpl; @@ -94,8 +94,7 @@ public final class EventManager { }; private EventManager() { - NetworkInterfacesExtractor networkInterfacesExtractor = new NetworkInterfacesExtractor(); - List ipAddresses = networkInterfacesExtractor.getIp4Addresses(); + List ipAddresses = NetworkUtils.getInstance().getIp4Addresses(); bindEndpoints(createSocket(), ipAddresses, heartbeatEndpoints, SocketType.HEARTBEAT); bindEndpoints(createEventSocket(), ipAddresses, eventEndpoints, SocketType.EVENTS); diff --git a/server/src/main/java/org/tango/server/network/NetworkInterfacesExtractor.java b/server/src/main/java/org/tango/server/network/NetworkInterfacesExtractor.java deleted file mode 100644 index 45a9f9baf..000000000 --- a/server/src/main/java/org/tango/server/network/NetworkInterfacesExtractor.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.tango.server.network; - -import com.google.common.collect.Collections2; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InterfaceAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.util.Collections; -import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static org.tango.orb.ORBManager.OAI_ADDR; - -/** - * @author Igor Khokhriakov - * @since 18.02.2020 - */ -public class NetworkInterfacesExtractor { - private final Logger logger = LoggerFactory.getLogger(NetworkInterfacesExtractor.class); - - public List getIp4Addresses() { - if (OAI_ADDR != null && !OAI_ADDR.isEmpty()) { - return Lists.newArrayList(OAI_ADDR); - } else { - Iterable networkInterfaces = null; - try { - networkInterfaces = Collections.list(NetworkInterface.getNetworkInterfaces()); - } catch (SocketException e) { - logger.error("Failed to get NICs due to " + e.getMessage(), e); - return Collections.emptyList(); - } - - java.util.function.Predicate filter = networkInterface -> { - try { - return !networkInterface.isLoopback() && !networkInterface.isVirtual() && networkInterface.isUp(); - } catch (SocketException e) { - logger.warn("Ignoring NetworkInterface({}) due to an exception: {}", networkInterface.getName(), e); - return false; - } - }; - - Function interfaceAddressToString = interfaceAddress -> interfaceAddress.getAddress().getHostAddress(); - - Iterable filteredNICs = Iterables.filter(networkInterfaces, filter::test); - - List result = Lists.newArrayList(); - //TODO #17 - for (NetworkInterface nic : filteredNICs) { - result.addAll( - Collections2.filter( - nic.getInterfaceAddresses() - .stream() - .map(interfaceAddressToString::apply) - .collect(Collectors.toList()), - s -> s.split("\\.").length == 4) - ); - } - return result; - } - } -} diff --git a/server/src/main/java/org/tango/server/transport/HttpTransportListener.java b/server/src/main/java/org/tango/server/transport/HttpTransportListener.java new file mode 100644 index 000000000..a8c59d134 --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/HttpTransportListener.java @@ -0,0 +1,90 @@ +package org.tango.server.transport; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.undertow.Undertow; +import io.undertow.UndertowOptions; +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.Headers; +import org.tango.network.NetworkUtils; +import org.tango.server.admin.AdminDevice; +import org.tango.transport.StringTangoMessage; +import org.tango.transport.TangoMessage; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * @author Igor Khokhriakov + * @since 20.02.2020 + */ +public class HttpTransportListener implements TransportListener { + private final StringTangoMessage marshaller = new StringTangoMessage(); + private final TangoMessageProcessor tangoMessageProcessor = new NaiveTangoMessageProcessor(); + private final AdminDevice adminDevice; + private final ExecutorService executorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("HttpTransportListener-%d") + .build() + ); + private Undertow server; + + + public HttpTransportListener(AdminDevice adminDevice) { + this.adminDevice = adminDevice; + } + + @Override + public void bind() { + int port = NetworkUtils.getInstance().getRandomPort(); + + Undertow.Builder builder = Undertow.builder() + .setIoThreads(200) + .setServerOption(UndertowOptions.ENABLE_HTTP2, true) + .setHandler(new HttpHandler() { + @Override + public void handleRequest(HttpServerExchange exchange) { + if (exchange.isInIoThread()) { + exchange.dispatch(this); + return; + } + + exchange.startBlocking(); + + exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json"); + + TangoMessage message = marshaller.unmarshal(exchange.getInputStream()); + + message = tangoMessageProcessor.process(message, adminDevice); + + exchange.getResponseSender().send(ByteBuffer.wrap(marshaller.marshal(message))); + } + } + ); + + NetworkUtils.getInstance().getIp4Addresses().forEach(s -> builder.addHttpListener(port, s)); + + this.server = builder.build(); + } + + @Override + public void listen() { + executorService.submit(() -> server.start()); + } + + @Override + public String getName() { + return "http"; + } + + @Override + public List endpoints() { + return server.getListenerInfo().stream() + .map(listenerInfo -> listenerInfo.getProtcol() + ":/" + listenerInfo.getAddress().toString()) + .collect(Collectors.toList()); + } +} diff --git a/server/src/main/java/org/tango/server/transport/NaiveTangoMessageProcessor.java b/server/src/main/java/org/tango/server/transport/NaiveTangoMessageProcessor.java new file mode 100644 index 000000000..b869747ee --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/NaiveTangoMessageProcessor.java @@ -0,0 +1,59 @@ +package org.tango.server.transport; + +import fr.esrf.Tango.AttrDataFormat; +import fr.esrf.Tango.AttributeValue; +import fr.esrf.Tango.DevFailed; +import org.omg.CORBA.Any; +import org.tango.server.admin.AdminDevice; +import org.tango.server.idl.CleverAnyAttribute; +import org.tango.server.idl.TangoIDLAttributeUtil; +import org.tango.server.servant.DeviceImpl; +import org.tango.transport.TangoMessage; + +import java.util.Arrays; + +/** + * @author Igor Khokhriakov + * @since 21.02.2020 + */ +public class NaiveTangoMessageProcessor implements TangoMessageProcessor { + @Override + public TangoMessage process(TangoMessage input, AdminDevice locator) { + DeviceImpl device = locator.getDeviceImpl(input.device); + + switch (input.action) { + case "read": + return processRead(input, device); + case "write": + return processWrite(input, device); + case "exec": + default: + return new TangoMessage.Error("Unsupported message action - " + input.action); + } + } + + private TangoMessage processRead(TangoMessage message, DeviceImpl device) { + try { + Any any = Arrays.stream(device.read_attributes(new String[]{message.target})).map(attributeValue -> attributeValue.value).findFirst().get(); + + Object result = CleverAnyAttribute.get(any, message.dataType, AttrDataFormat.SCALAR); + return new TangoMessage("response", message.device, message.target, message.dataType, result); + } catch (DevFailed devFailed) { + return new TangoMessage.Error(devFailed.errors[0].reason); + } + } + + private TangoMessage processWrite(TangoMessage message, DeviceImpl device) { + try { + device.write_attributes( + new AttributeValue[]{ + TangoIDLAttributeUtil.toAttributeValue( + device.getAttributeImpl(message.target).get(), + new org.tango.server.attribute.AttributeValue(message.value))}); + return new TangoMessage.Ok(); + } catch (DevFailed devFailed) { + return new TangoMessage.Error(devFailed.errors[0].reason); + } + } + +} diff --git a/server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java b/server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java deleted file mode 100644 index 300f5b133..000000000 --- a/server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.tango.server.transport; - -import fr.esrf.Tango.AttrDataFormat; -import fr.esrf.Tango.DevFailed; -import org.omg.CORBA.Any; -import org.tango.server.idl.CleverAnyAttribute; -import org.tango.server.servant.DeviceImpl; -import org.tango.transport.Message; - -import java.util.Arrays; - -/** - * @author Igor Khokhriakov - * @since 19.02.2020 - */ -public class ReadMessageProcessor implements ZmqMessageProcessor { - private final DeviceImpl device; - private final String attributeName; - private final String dataType; - - public ReadMessageProcessor(DeviceImpl device, String attributeName, String dataType) { - this.device = device; - this.attributeName = attributeName; - this.dataType = dataType; - } - - @Override - public Message process() { - try { - Any any = Arrays.stream(device.read_attributes(new String[]{attributeName})).map(attributeValue -> attributeValue.value).findFirst().get(); - - Object result = CleverAnyAttribute.get(any, Integer.parseInt(dataType), AttrDataFormat.SCALAR); - return new Message("response", attributeName, dataType, String.valueOf(result)); - } catch (DevFailed devFailed) { - return new Message.Error(devFailed.errors[0].reason); - } - } -} diff --git a/server/src/main/java/org/tango/server/transport/TangoMessageProcessor.java b/server/src/main/java/org/tango/server/transport/TangoMessageProcessor.java new file mode 100644 index 000000000..f7437842e --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/TangoMessageProcessor.java @@ -0,0 +1,12 @@ +package org.tango.server.transport; + +import org.tango.server.admin.AdminDevice; +import org.tango.transport.TangoMessage; + +/** + * @author Igor Khokhriakov + * @since 21.02.2020 + */ +public interface TangoMessageProcessor { + TangoMessage process(TangoMessage input, AdminDevice locator); +} diff --git a/server/src/main/java/org/tango/server/transport/TransportListener.java b/server/src/main/java/org/tango/server/transport/TransportListener.java new file mode 100644 index 000000000..9809378fe --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/TransportListener.java @@ -0,0 +1,17 @@ +package org.tango.server.transport; + +import java.util.List; + +/** + * @author Igor Khokhriakov + * @since 21.02.2020 + */ +public interface TransportListener { + void bind(); + + void listen(); + + String getName(); + + List endpoints(); +} diff --git a/server/src/main/java/org/tango/server/transport/TransportManager.java b/server/src/main/java/org/tango/server/transport/TransportManager.java index e35291d33..451938f86 100644 --- a/server/src/main/java/org/tango/server/transport/TransportManager.java +++ b/server/src/main/java/org/tango/server/transport/TransportManager.java @@ -1,12 +1,10 @@ package org.tango.server.transport; -import org.tango.server.network.NetworkInterfacesExtractor; -import org.tango.transport.TransportMeta; -import org.zeromq.ZContext; -import org.zeromq.ZMQ; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.util.List; -import java.util.Optional; +import java.util.Map; /** * This class maintains ZMQ REQ/REP required data @@ -15,35 +13,25 @@ * @since 18.02.2020 */ public class TransportManager { - private final ZContext context = new ZContext(); - - private ZMQ.Socket socket; - private String port; - - public ZMQ.Socket bindZmqTransport() { - this.socket = Optional.ofNullable(socket).orElseGet(() -> { - ZMQ.Socket socket = createZMQSocket(); - port = String.valueOf(socket.bindToRandomPort("tcp://*")); - return socket; - }); - return socket; - } - - public TransportMeta getTransportMeta() { - List connectionPoints = new NetworkInterfacesExtractor().getIp4Addresses(); + private final Map transports = Maps.newHashMap(); - TransportMeta result = new TransportMeta(); + public void registerTransport(TransportListener transportListener) { + transports.put(transportListener.getName(), transportListener); + } - connectionPoints.stream() - .map(s -> "tcp://" + s + ":" + port) - .forEach(result::addEndpoint); + public List getEndpoints(String transport) { + return transports.get(transport).endpoints(); + } - return result; + public List getTransports() { + return Lists.newArrayList(transports.keySet()); } - public ZMQ.Socket createZMQSocket() { - final ZMQ.Socket socket = context.createSocket(ZMQ.REP); - return socket; + public void bind() { + transports.values().forEach(TransportListener::bind); } + public void listen() { + transports.values().forEach(TransportListener::listen); + } } diff --git a/server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java b/server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java deleted file mode 100644 index e6e9be94c..000000000 --- a/server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.tango.server.transport; - -import fr.esrf.Tango.AttributeValue; -import fr.esrf.Tango.DevFailed; -import org.tango.server.idl.TangoIDLAttributeUtil; -import org.tango.server.servant.DeviceImpl; -import org.tango.transport.Message; - -/** - * @author Igor Khokhriakov - * @since 19.02.2020 - */ -public class WriteMessageProcessor implements ZmqMessageProcessor { - private final DeviceImpl device; - private final String attributeName; - private final Object value; - - public WriteMessageProcessor(DeviceImpl device, String attributeName, Object value) { - this.device = device; - this.attributeName = attributeName; - this.value = value; - } - - @Override - public Message process() { - try { - device.write_attributes( - new AttributeValue[]{ - TangoIDLAttributeUtil.toAttributeValue( - device.getAttributeImpl(attributeName).get(), - new org.tango.server.attribute.AttributeValue(value))}); - return new Message.Ok(); - } catch (DevFailed devFailed) { - return new Message.Error(devFailed.errors[0].reason); - } - } -} diff --git a/server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java deleted file mode 100644 index 11c341637..000000000 --- a/server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.tango.server.transport; - -import org.tango.transport.Message; - -/** - * @author Igor Khokhriakov - * @since 19.02.2020 - */ -public interface ZmqMessageProcessor { - Message process(); -} diff --git a/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java deleted file mode 100644 index 0e6213261..000000000 --- a/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.tango.server.transport; - -import com.google.gson.Gson; -import org.tango.server.admin.AdminDevice; -import org.tango.server.servant.DeviceImpl; -import org.tango.transport.Message; - -/** - * @author Igor Khokhriakov - * @since 19.02.2020 - */ -public class ZmqMessageProcessorImpl implements ZmqMessageProcessor { - private final String msg; - private final AdminDevice admin; - - public ZmqMessageProcessorImpl(String msg, AdminDevice admin) { - this.msg = msg; - this.admin = admin; - } - - @Override - public Message process() { - Message message = Message.fromString(msg); - - Message.Target target = Message.Target.fromString(message.target); - - DeviceImpl device = admin.getDeviceImpl(target.device); - - switch (message.action) { - case "read": - return new ReadMessageProcessor(device, target.member, message.dataType).process(); - case "write": - return new WriteMessageProcessor(device, target.member, new Gson().fromJson(message.value, Object.class)).process(); - case "exec": - default: - return new Message.Error("Unsupported message action - " + message.action); - } - } -} diff --git a/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java index c4cb05198..23bea4356 100644 --- a/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java +++ b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java @@ -1,42 +1,89 @@ package org.tango.server.transport; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.tango.network.NetworkUtils; import org.tango.server.admin.AdminDevice; +import org.tango.transport.StringTangoMessage; +import org.tango.transport.TangoMessage; +import org.zeromq.ZContext; import org.zeromq.ZMQ; import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; /** * @author Igor Khokhriakov * @since 19.02.2020 */ -public class ZmqTransportListener implements Runnable { - private final Logger logger = LoggerFactory.getLogger(ZmqTransportListener.class); +public class ZmqTransportListener implements TransportListener, Runnable { + private final StringTangoMessage marshaller = new StringTangoMessage(); + private final TangoMessageProcessor tangoMessageProcessor = new NaiveTangoMessageProcessor(); + private final ExecutorService executorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("ZmqTransportListener-%d") + .build() + ); + private final ZContext ctx = new ZContext(); + + private final Logger logger = LoggerFactory.getLogger(ZmqTransportListener.class); + private String port; private final ZMQ.Socket socket; private final AdminDevice admin; - public ZmqTransportListener(ZMQ.Socket socket, AdminDevice admin) { - this.socket = socket; + public ZmqTransportListener(AdminDevice admin) { this.admin = admin; + this.socket = createZMQSocket(ZMQ.REP); + } + + @Override + public void bind() { + port = String.valueOf(socket.bindToRandomPort("tcp://*")); } + @Override + public String getName() { + return "zmq"; + } + + @Override + public List endpoints() { + List ip4Addresses = NetworkUtils.getInstance().getIp4Addresses(); + + return ip4Addresses.stream() + .map(s -> "tcp://" + s + ":" + port) + .collect(Collectors.toList()); + } + + public void listen() { + executorService.submit(this); + } + + @Override public void run() { logger.debug("Starting ZmqTransportListener"); while (!Thread.currentThread().isInterrupted()) { byte[] data = socket.recv(); - String msg = new String(data, StandardCharsets.UTF_8); - //TODO non blocking - //TODO thread pool + TangoMessage message = marshaller.unmarshal(new String(data, StandardCharsets.UTF_8)); + socket.send( - new ZmqMessageProcessorImpl(msg, admin) - .process() - .toString() - .getBytes(StandardCharsets.UTF_8)); + marshaller.marshal( + tangoMessageProcessor.process(message, admin))); } } + + private ZMQ.Socket createZMQSocket(int type) { + ZMQ.Socket socket = ctx.createSocket(type); + socket.setLinger(0); + return socket; + } } From ce7e3ea30904ce393548887d5d73c6f9504f7594 Mon Sep 17 00:00:00 2001 From: ingvord Date: Fri, 21 Feb 2020 15:30:29 +0100 Subject: [PATCH 11/12] Progress #35 --- .../java/fr/esrf/TangoApi/DeviceProxy.java | 29 +++++++++++-------- .../org/tango/transport/TangoMessage.java | 6 ++-- .../org/tango/transport/HttpTransport.java | 8 +++-- .../transport/ZmqTransportListener.java | 2 +- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java index 8f760c59b..9da7da2ea 100644 --- a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java +++ b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java @@ -792,25 +792,30 @@ public void set_attribute_config(AttributeInfo[] attr) throws DevFailed { // ========================================================================== - /** - * Read the attribute value for the specified device. - * - * @param attname attribute name to request value. - * @return the attribute value. - */ - // ========================================================================== - public DeviceAttribute read_attribute(String attname) throws DevFailed { + public double readAttributeDouble(String name) throws DevFailed { if (transport.isConnected()) { try { - TangoMessage message = new TangoMessage("read", this.get_name(), attname, -1, null); + TangoMessage message = new TangoMessage<>("read", this.get_name(), name, TangoConst.Tango_DEV_DOUBLE, 0D); message = marshaller.unmarshal(transport.send(marshaller.marshal(message))); - return new DeviceAttribute(attname, String.valueOf(message.value));//TODO respect dataType + return message.value; } catch (IOException e) { throw DevFailedUtils.newDevFailed(e); } + } else { + return read_attribute(name).extractDouble(); } + } + + /** + * Read the attribute value for the specified device. + * + * @param attname attribute name to request value. + * @return the attribute value. + */ + // ========================================================================== + public DeviceAttribute read_attribute(String attname) throws DevFailed { return deviceProxyDAO.read_attribute(this, attname); } @@ -842,9 +847,9 @@ public DeviceAttribute[] read_attribute(String[] attnames) throws DevFailed { public void writeAttribute(String name, double value) throws DevFailed { if (transport.isConnected()) { try { - TangoMessage message = new TangoMessage("write", this.get_name(), name, TangoConst.Tango_DEV_DOUBLE, value); + TangoMessage message = new TangoMessage<>("write", this.get_name(), name, TangoConst.Tango_DEV_DOUBLE, value); - marshaller.unmarshal(transport.send(marshaller.marshal(message))); + transport.send(marshaller.marshal(message)); } catch (IOException e) { throw DevFailedUtils.newDevFailed(e); } diff --git a/common/src/main/java/org/tango/transport/TangoMessage.java b/common/src/main/java/org/tango/transport/TangoMessage.java index 40490a6d9..2405a643a 100644 --- a/common/src/main/java/org/tango/transport/TangoMessage.java +++ b/common/src/main/java/org/tango/transport/TangoMessage.java @@ -6,14 +6,14 @@ * @author Igor Khokhriakov * @since 19.02.2020 */ -public class TangoMessage { +public class TangoMessage { public String action; public String device; public String target; public int dataType; - public Object value; + public T value; - public TangoMessage(String action, String device, String target, int dataType, Object value) { + public TangoMessage(String action, String device, String target, int dataType, T value) { this.action = action; this.device = device; this.target = target; diff --git a/dao/src/main/java/org/tango/transport/HttpTransport.java b/dao/src/main/java/org/tango/transport/HttpTransport.java index 02dfb4b01..0735f02f9 100644 --- a/dao/src/main/java/org/tango/transport/HttpTransport.java +++ b/dao/src/main/java/org/tango/transport/HttpTransport.java @@ -6,6 +6,7 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; import java.time.Duration; import static java.net.http.HttpClient.Version.HTTP_2; @@ -45,8 +46,11 @@ public byte[] send(byte[] data) throws IOException { .POST(HttpRequest.BodyPublishers.ofByteArray(data)) .build(); - client.sendAsync(req, HttpResponse.BodyHandlers.ofString()); - return new byte[0]; + try { + return client.send(req, HttpResponse.BodyHandlers.ofString()).body().getBytes(StandardCharsets.UTF_8); + } catch (InterruptedException e) { + throw new IOException(e); + } } @Override diff --git a/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java index 23bea4356..7f81860d3 100644 --- a/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java +++ b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java @@ -83,7 +83,7 @@ public void run() { private ZMQ.Socket createZMQSocket(int type) { ZMQ.Socket socket = ctx.createSocket(type); - socket.setLinger(0); +// socket.setLinger(0); return socket; } } From a4dc9944bb1c05e3d9803b5642c71f4b3f1b435b Mon Sep 17 00:00:00 2001 From: ingvord Date: Fri, 21 Feb 2020 15:47:07 +0100 Subject: [PATCH 12/12] Progress #35 --- common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java | 4 ++-- .../org/tango/server/transport/HttpTransportListener.java | 4 ++-- .../java/org/tango/server/transport/ZmqTransportListener.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java index 9da7da2ea..2a9258ab4 100644 --- a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java +++ b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java @@ -44,7 +44,7 @@ import org.omg.CORBA.Request; import org.tango.network.NetworkUtils; import org.tango.transport.DefaultTransport; -import org.tango.transport.StringTangoMessage; +import org.tango.transport.GsonTangoMessage; import org.tango.transport.TangoMessage; import org.tango.transport.Transport; import org.tango.utils.DevFailedUtils; @@ -84,7 +84,7 @@ public class DeviceProxy extends Connection implements ApiDefs { private IDeviceProxyDAO deviceProxyDAO = null; - private final StringTangoMessage marshaller = new StringTangoMessage(); + private final GsonTangoMessage marshaller = new GsonTangoMessage(); static final private boolean check_idl = false; diff --git a/server/src/main/java/org/tango/server/transport/HttpTransportListener.java b/server/src/main/java/org/tango/server/transport/HttpTransportListener.java index a8c59d134..c2f7efe0e 100644 --- a/server/src/main/java/org/tango/server/transport/HttpTransportListener.java +++ b/server/src/main/java/org/tango/server/transport/HttpTransportListener.java @@ -8,7 +8,7 @@ import io.undertow.util.Headers; import org.tango.network.NetworkUtils; import org.tango.server.admin.AdminDevice; -import org.tango.transport.StringTangoMessage; +import org.tango.transport.GsonTangoMessage; import org.tango.transport.TangoMessage; import java.nio.ByteBuffer; @@ -22,7 +22,7 @@ * @since 20.02.2020 */ public class HttpTransportListener implements TransportListener { - private final StringTangoMessage marshaller = new StringTangoMessage(); + private final GsonTangoMessage marshaller = new GsonTangoMessage(); private final TangoMessageProcessor tangoMessageProcessor = new NaiveTangoMessageProcessor(); private final AdminDevice adminDevice; private final ExecutorService executorService = Executors.newSingleThreadExecutor( diff --git a/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java index 7f81860d3..bc4b553ec 100644 --- a/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java +++ b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java @@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory; import org.tango.network.NetworkUtils; import org.tango.server.admin.AdminDevice; -import org.tango.transport.StringTangoMessage; +import org.tango.transport.GsonTangoMessage; import org.tango.transport.TangoMessage; import org.zeromq.ZContext; import org.zeromq.ZMQ; @@ -21,7 +21,7 @@ * @since 19.02.2020 */ public class ZmqTransportListener implements TransportListener, Runnable { - private final StringTangoMessage marshaller = new StringTangoMessage(); + private final GsonTangoMessage marshaller = new GsonTangoMessage(); private final TangoMessageProcessor tangoMessageProcessor = new NaiveTangoMessageProcessor(); private final ExecutorService executorService = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder()