From dd454e6d112f368415bcdc304a29fb7c12c64cfd Mon Sep 17 00:00:00 2001 From: ingvord Date: Wed, 19 Feb 2020 15:31:07 +0100 Subject: [PATCH] Progress #35 --- common/pom.xml | 5 ++ .../java/fr/esrf/TangoApi/DeviceProxy.java | 16 +++- .../java/org/tango/transport/Message.java | 75 +++++++++++++++++++ .../org/tango/server/admin/AdminDevice.java | 28 ++++++- .../transport/ReadMessageProcessor.java | 38 ++++++++++ .../server/transport/TransportManager.java | 10 ++- .../transport/WriteMessageProcessor.java | 37 +++++++++ .../server/transport/ZmqMessageProcessor.java | 11 +++ .../transport/ZmqMessageProcessorImpl.java | 41 ++++++++++ .../transport/ZmqTransportListener.java | 37 +++++++++ 10 files changed, 290 insertions(+), 8 deletions(-) create mode 100644 common/src/main/java/org/tango/transport/Message.java create mode 100644 server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java create mode 100644 server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java create mode 100644 server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java create mode 100644 server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java create mode 100644 server/src/main/java/org/tango/server/transport/ZmqTransportListener.java diff --git a/common/pom.xml b/common/pom.xml index 7ddb806a..483dde0d 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -97,5 +97,10 @@ org.apache.commons commons-lang3 + + com.google.code.gson + gson + 2.8.6 + diff --git a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java index 84552d0e..b8549b6c 100644 --- a/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java +++ b/common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java @@ -43,6 +43,7 @@ import fr.esrf.TangoDs.TangoConst; import org.omg.CORBA.Request; import org.tango.network.EndpointAvailabilityChecker; +import org.tango.transport.Message; import org.tango.transport.Transport; import org.tango.transport.TransportMeta; import org.tango.utils.DevFailedUtils; @@ -801,7 +802,14 @@ public void set_attribute_config(AttributeInfo[] attr) throws DevFailed { public DeviceAttribute read_attribute(String attname) throws DevFailed { if (transport.isConnected()) { try { - transport.send(String.format("read:%s", attname).getBytes(StandardCharsets.UTF_8)); + Message message = new Message("read", this.get_name() + "/" + attname, null, null); + + //TODO marshaller + + Message response = Message.fromString( + new String(transport.send(message.toString().getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8)); + System.out.println(response.value); + return new DeviceAttribute(attname, response.value); } catch (IOException e) { throw DevFailedUtils.newDevFailed(e); } @@ -837,7 +845,11 @@ public DeviceAttribute[] read_attribute(String[] attnames) throws DevFailed { public void writeAttribute(String name, double value) throws DevFailed { if (transport.isConnected()) { try { - transport.send(String.format("write:%s;type:double;value:%f", name, value).getBytes(StandardCharsets.UTF_8)); + Message message = new Message("write", this.get_name() + "/" + name, "double", String.valueOf(value)); + + //TODO marshaller + + transport.send(message.toString().getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { throw DevFailedUtils.newDevFailed(e); } diff --git a/common/src/main/java/org/tango/transport/Message.java b/common/src/main/java/org/tango/transport/Message.java new file mode 100644 index 00000000..b72fef74 --- /dev/null +++ b/common/src/main/java/org/tango/transport/Message.java @@ -0,0 +1,75 @@ +package org.tango.transport; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class Message { + public static final String PATTERN_STR = "(read|write|exec|response):([\\w\\W]+);(double|float|int|long|String|null):([\\w.]+)"; + public static final Pattern PATTERN = Pattern.compile(PATTERN_STR); + + public static final String TARGET_PATTERN_STR = "(\\w+)/(\\w+)/(\\w+)/(\\w+)"; + public static final Pattern TARGET_PATTERN = Pattern.compile(TARGET_PATTERN_STR); + + public String action; + public String target; + public String dataType; + public String value; + + public Message(String action, String target, String dataType, String value) { + this.action = action; + this.target = target; + this.dataType = dataType; + this.value = value; + } + + public static Message fromString(String str) { + Matcher matcher = PATTERN.matcher(str); + + if (matcher.matches()) + return new Message(matcher.group(1), matcher.group(2), matcher.group(3), matcher.group(4)); + else + throw new IllegalArgumentException("Unrecognized message: " + str); + } + + @Override + public String toString() { + return String.format( + "%s:%s;%s:%s", action, target, dataType, value + ); + } + + public static class Target { + public final String device; + public final String member; + + public Target(String device, String member) { + this.device = device; + this.member = member; + } + + public static Target fromString(String str) { + Matcher matcher = TARGET_PATTERN.matcher(str); + + if (matcher.matches()) + return new Target(String.format("%s/%s/%s", matcher.group(1), matcher.group(2), matcher.group(3)), matcher.group(4)); + else + throw new IllegalArgumentException("Unrecognized message: " + str); + } + } + + public static class Error extends Message { + public Error(String value) { + super("response", "error", "String", value); + } + } + + public static class Ok extends Message { + public Ok() { + super("response", "ok", null, null); + } + } +} diff --git a/server/src/main/java/org/tango/server/admin/AdminDevice.java b/server/src/main/java/org/tango/server/admin/AdminDevice.java index 2a99a2eb..99883337 100644 --- a/server/src/main/java/org/tango/server/admin/AdminDevice.java +++ b/server/src/main/java/org/tango/server/admin/AdminDevice.java @@ -24,6 +24,7 @@ */ package org.tango.server.admin; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import fr.esrf.Tango.ClntIdent; import fr.esrf.Tango.DevFailed; import fr.esrf.Tango.DevVarLongStringArray; @@ -56,10 +57,14 @@ import org.tango.server.properties.DevicePropertyImpl; import org.tango.server.servant.DeviceImpl; import org.tango.server.transport.TransportManager; +import org.tango.server.transport.ZmqTransportListener; import org.tango.utils.DevFailedUtils; import org.tango.utils.TangoUtil; +import org.zeromq.ZMQ; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -430,6 +435,13 @@ public String[] getPolledDevice() { return pollDevices.toArray(new String[pollDevices.size()]); } + private final ExecutorService executorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("ZmqTransportListener-%d") + .build() + ); + /** * @param dvlsa Lg[0]=Upd period. Str[0]=Device name. Str[1]=Object * type(COMMAND or ATTRIBUTE). Str[2]=Object name @@ -808,11 +820,21 @@ private DevVarLongStringArray subscribeEvent(final EventType eventType, final St private final TransportManager transportManager = new TransportManager(); + public DeviceImpl getDeviceImpl(String device) { + return classList.stream() + .map(deviceClassBuilder -> deviceClassBuilder.getDeviceImpl(device)) + .findFirst() + .orElseThrow(() -> new NoSuchElementException(device)); + } + @Command(name = "UpgradeTransport") public DevVarLongStringArray upgradeTransport() { - return transportManager - .upgradeTransport() - .toDevVarLongStringArray(); + + ZMQ.Socket socket = transportManager + .upgradeTransport(); + + executorService.submit(new ZmqTransportListener(socket, this)); + return transportManager.getTransportMeta().toDevVarLongStringArray(); } /** diff --git a/server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java b/server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java new file mode 100644 index 00000000..300f5b13 --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/ReadMessageProcessor.java @@ -0,0 +1,38 @@ +package org.tango.server.transport; + +import fr.esrf.Tango.AttrDataFormat; +import fr.esrf.Tango.DevFailed; +import org.omg.CORBA.Any; +import org.tango.server.idl.CleverAnyAttribute; +import org.tango.server.servant.DeviceImpl; +import org.tango.transport.Message; + +import java.util.Arrays; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class ReadMessageProcessor implements ZmqMessageProcessor { + private final DeviceImpl device; + private final String attributeName; + private final String dataType; + + public ReadMessageProcessor(DeviceImpl device, String attributeName, String dataType) { + this.device = device; + this.attributeName = attributeName; + this.dataType = dataType; + } + + @Override + public Message process() { + try { + Any any = Arrays.stream(device.read_attributes(new String[]{attributeName})).map(attributeValue -> attributeValue.value).findFirst().get(); + + Object result = CleverAnyAttribute.get(any, Integer.parseInt(dataType), AttrDataFormat.SCALAR); + return new Message("response", attributeName, dataType, String.valueOf(result)); + } catch (DevFailed devFailed) { + return new Message.Error(devFailed.errors[0].reason); + } + } +} diff --git a/server/src/main/java/org/tango/server/transport/TransportManager.java b/server/src/main/java/org/tango/server/transport/TransportManager.java index 0c226f30..18a29e04 100644 --- a/server/src/main/java/org/tango/server/transport/TransportManager.java +++ b/server/src/main/java/org/tango/server/transport/TransportManager.java @@ -16,11 +16,15 @@ public class TransportManager { private final ZContext context = new ZContext(); + private String port; - public TransportMeta upgradeTransport() { + public ZMQ.Socket upgradeTransport() { ZMQ.Socket socket = createZMQSocket(); - int port = socket.bindToRandomPort("tcp://*"); + port = String.valueOf(socket.bindToRandomPort("tcp://*")); + return socket; + } + public TransportMeta getTransportMeta() { List connectionPoints = new NetworkInterfacesExtractor().getIp4Addresses(); TransportMeta result = new TransportMeta(); @@ -32,11 +36,11 @@ public TransportMeta upgradeTransport() { return result; } - public ZMQ.Socket createZMQSocket() { final ZMQ.Socket socket = context.createSocket(ZMQ.REP); socket.setLinger(0); socket.setReconnectIVL(-1); return socket; } + } diff --git a/server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java b/server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java new file mode 100644 index 00000000..e6e9be94 --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/WriteMessageProcessor.java @@ -0,0 +1,37 @@ +package org.tango.server.transport; + +import fr.esrf.Tango.AttributeValue; +import fr.esrf.Tango.DevFailed; +import org.tango.server.idl.TangoIDLAttributeUtil; +import org.tango.server.servant.DeviceImpl; +import org.tango.transport.Message; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class WriteMessageProcessor implements ZmqMessageProcessor { + private final DeviceImpl device; + private final String attributeName; + private final Object value; + + public WriteMessageProcessor(DeviceImpl device, String attributeName, Object value) { + this.device = device; + this.attributeName = attributeName; + this.value = value; + } + + @Override + public Message process() { + try { + device.write_attributes( + new AttributeValue[]{ + TangoIDLAttributeUtil.toAttributeValue( + device.getAttributeImpl(attributeName).get(), + new org.tango.server.attribute.AttributeValue(value))}); + return new Message.Ok(); + } catch (DevFailed devFailed) { + return new Message.Error(devFailed.errors[0].reason); + } + } +} diff --git a/server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java new file mode 100644 index 00000000..11c34163 --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessor.java @@ -0,0 +1,11 @@ +package org.tango.server.transport; + +import org.tango.transport.Message; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public interface ZmqMessageProcessor { + Message process(); +} diff --git a/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java new file mode 100644 index 00000000..70aa35bd --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/ZmqMessageProcessorImpl.java @@ -0,0 +1,41 @@ +package org.tango.server.transport; + +import com.google.gson.Gson; +import org.tango.server.admin.AdminDevice; +import org.tango.server.servant.DeviceImpl; +import org.tango.transport.Message; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class ZmqMessageProcessorImpl implements ZmqMessageProcessor { + private final String msg; + private final AdminDevice admin; + + public ZmqMessageProcessorImpl(String msg, AdminDevice admin) { + this.msg = msg; + this.admin = admin; + } + + @Override + public Message process() { + Message message = Message.fromString(msg); + + Message.Target target = Message.Target.fromString(message.target); + + DeviceImpl device = admin.getDeviceImpl(target.device); + + switch (message.action) { + case "read": + return new ReadMessageProcessor(device, target.member, message.dataType).process(); + case "write": + return new WriteMessageProcessor(device, target.member, new Gson().fromJson(message.value, Object.class)).process(); + case "exec": + default: + return new Message.Error("Unsupported message action - " + message.action); + } + } + + +} diff --git a/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java new file mode 100644 index 00000000..5d5bd449 --- /dev/null +++ b/server/src/main/java/org/tango/server/transport/ZmqTransportListener.java @@ -0,0 +1,37 @@ +package org.tango.server.transport; + +import org.tango.server.admin.AdminDevice; +import org.zeromq.ZMQ; + +import java.nio.charset.StandardCharsets; + +/** + * @author Igor Khokhriakov + * @since 19.02.2020 + */ +public class ZmqTransportListener implements Runnable { + private final ZMQ.Socket socket; + private final AdminDevice admin; + + public ZmqTransportListener(ZMQ.Socket socket, AdminDevice admin) { + this.socket = socket; + this.admin = admin; + } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + byte[] data = socket.recv(); + String msg = new String(data, StandardCharsets.UTF_8); + + //TODO non blocking + //TODO thread pool + socket.send( + new ZmqMessageProcessorImpl(msg, admin) + .process() + .toString() + .getBytes(StandardCharsets.UTF_8)); + + } + } +}