Skip to content

Commit

Permalink
Progress #35
Browse files Browse the repository at this point in the history
  • Loading branch information
Ingvord committed Feb 19, 2020
1 parent e683e3e commit dd454e6
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 8 deletions.
5 changes: 5 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
</dependencies>
</project>
16 changes: 14 additions & 2 deletions common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import fr.esrf.TangoDs.TangoConst;
import org.omg.CORBA.Request;
import org.tango.network.EndpointAvailabilityChecker;
import org.tango.transport.Message;
import org.tango.transport.Transport;
import org.tango.transport.TransportMeta;
import org.tango.utils.DevFailedUtils;
Expand Down Expand Up @@ -801,7 +802,14 @@ public void set_attribute_config(AttributeInfo[] attr) throws DevFailed {
public DeviceAttribute read_attribute(String attname) throws DevFailed {
if (transport.isConnected()) {
try {
transport.send(String.format("read:%s", attname).getBytes(StandardCharsets.UTF_8));
Message message = new Message("read", this.get_name() + "/" + attname, null, null);

//TODO marshaller

Message response = Message.fromString(
new String(transport.send(message.toString().getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8));
System.out.println(response.value);
return new DeviceAttribute(attname, response.value);
} catch (IOException e) {
throw DevFailedUtils.newDevFailed(e);
}
Expand Down Expand Up @@ -837,7 +845,11 @@ public DeviceAttribute[] read_attribute(String[] attnames) throws DevFailed {
public void writeAttribute(String name, double value) throws DevFailed {
if (transport.isConnected()) {
try {
transport.send(String.format("write:%s;type:double;value:%f", name, value).getBytes(StandardCharsets.UTF_8));
Message message = new Message("write", this.get_name() + "/" + name, "double", String.valueOf(value));

//TODO marshaller

transport.send(message.toString().getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw DevFailedUtils.newDevFailed(e);
}
Expand Down
75 changes: 75 additions & 0 deletions common/src/main/java/org/tango/transport/Message.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.tango.transport;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* @author Igor Khokhriakov <[email protected]>
* @since 19.02.2020
*/
public class Message {
public static final String PATTERN_STR = "(read|write|exec|response):([\\w\\W]+);(double|float|int|long|String|null):([\\w.]+)";
public static final Pattern PATTERN = Pattern.compile(PATTERN_STR);

public static final String TARGET_PATTERN_STR = "(\\w+)/(\\w+)/(\\w+)/(\\w+)";
public static final Pattern TARGET_PATTERN = Pattern.compile(TARGET_PATTERN_STR);

public String action;
public String target;
public String dataType;
public String value;

public Message(String action, String target, String dataType, String value) {
this.action = action;
this.target = target;
this.dataType = dataType;
this.value = value;
}

public static Message fromString(String str) {
Matcher matcher = PATTERN.matcher(str);

if (matcher.matches())
return new Message(matcher.group(1), matcher.group(2), matcher.group(3), matcher.group(4));
else
throw new IllegalArgumentException("Unrecognized message: " + str);
}

@Override
public String toString() {
return String.format(
"%s:%s;%s:%s", action, target, dataType, value
);
}

public static class Target {
public final String device;
public final String member;

public Target(String device, String member) {
this.device = device;
this.member = member;
}

public static Target fromString(String str) {
Matcher matcher = TARGET_PATTERN.matcher(str);

if (matcher.matches())
return new Target(String.format("%s/%s/%s", matcher.group(1), matcher.group(2), matcher.group(3)), matcher.group(4));
else
throw new IllegalArgumentException("Unrecognized message: " + str);
}
}

public static class Error extends Message {
public Error(String value) {
super("response", "error", "String", value);
}
}

public static class Ok extends Message {
public Ok() {
super("response", "ok", null, null);
}
}
}
28 changes: 25 additions & 3 deletions server/src/main/java/org/tango/server/admin/AdminDevice.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
*/
package org.tango.server.admin;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import fr.esrf.Tango.ClntIdent;
import fr.esrf.Tango.DevFailed;
import fr.esrf.Tango.DevVarLongStringArray;
Expand Down Expand Up @@ -56,10 +57,14 @@
import org.tango.server.properties.DevicePropertyImpl;
import org.tango.server.servant.DeviceImpl;
import org.tango.server.transport.TransportManager;
import org.tango.server.transport.ZmqTransportListener;
import org.tango.utils.DevFailedUtils;
import org.tango.utils.TangoUtil;
import org.zeromq.ZMQ;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -430,6 +435,13 @@ public String[] getPolledDevice() {
return pollDevices.toArray(new String[pollDevices.size()]);
}

private final ExecutorService executorService = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ZmqTransportListener-%d")
.build()
);

/**
* @param dvlsa Lg[0]=Upd period. Str[0]=Device name. Str[1]=Object
* type(COMMAND or ATTRIBUTE). Str[2]=Object name
Expand Down Expand Up @@ -808,11 +820,21 @@ private DevVarLongStringArray subscribeEvent(final EventType eventType, final St

private final TransportManager transportManager = new TransportManager();

public DeviceImpl getDeviceImpl(String device) {
return classList.stream()
.map(deviceClassBuilder -> deviceClassBuilder.getDeviceImpl(device))
.findFirst()
.orElseThrow(() -> new NoSuchElementException(device));
}

@Command(name = "UpgradeTransport")
public DevVarLongStringArray upgradeTransport() {
return transportManager
.upgradeTransport()
.toDevVarLongStringArray();

ZMQ.Socket socket = transportManager
.upgradeTransport();

executorService.submit(new ZmqTransportListener(socket, this));
return transportManager.getTransportMeta().toDevVarLongStringArray();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.tango.server.transport;

import fr.esrf.Tango.AttrDataFormat;
import fr.esrf.Tango.DevFailed;
import org.omg.CORBA.Any;
import org.tango.server.idl.CleverAnyAttribute;
import org.tango.server.servant.DeviceImpl;
import org.tango.transport.Message;

import java.util.Arrays;

/**
* @author Igor Khokhriakov <[email protected]>
* @since 19.02.2020
*/
public class ReadMessageProcessor implements ZmqMessageProcessor {
private final DeviceImpl device;
private final String attributeName;
private final String dataType;

public ReadMessageProcessor(DeviceImpl device, String attributeName, String dataType) {
this.device = device;
this.attributeName = attributeName;
this.dataType = dataType;
}

@Override
public Message process() {
try {
Any any = Arrays.stream(device.read_attributes(new String[]{attributeName})).map(attributeValue -> attributeValue.value).findFirst().get();

Object result = CleverAnyAttribute.get(any, Integer.parseInt(dataType), AttrDataFormat.SCALAR);
return new Message("response", attributeName, dataType, String.valueOf(result));
} catch (DevFailed devFailed) {
return new Message.Error(devFailed.errors[0].reason);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
public class TransportManager {
private final ZContext context = new ZContext();

private String port;

public TransportMeta upgradeTransport() {
public ZMQ.Socket upgradeTransport() {
ZMQ.Socket socket = createZMQSocket();
int port = socket.bindToRandomPort("tcp://*");
port = String.valueOf(socket.bindToRandomPort("tcp://*"));
return socket;
}

public TransportMeta getTransportMeta() {
List<String> connectionPoints = new NetworkInterfacesExtractor().getIp4Addresses();

TransportMeta result = new TransportMeta();
Expand All @@ -32,11 +36,11 @@ public TransportMeta upgradeTransport() {
return result;
}


public ZMQ.Socket createZMQSocket() {
final ZMQ.Socket socket = context.createSocket(ZMQ.REP);
socket.setLinger(0);
socket.setReconnectIVL(-1);
return socket;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.tango.server.transport;

import fr.esrf.Tango.AttributeValue;
import fr.esrf.Tango.DevFailed;
import org.tango.server.idl.TangoIDLAttributeUtil;
import org.tango.server.servant.DeviceImpl;
import org.tango.transport.Message;

/**
* @author Igor Khokhriakov <[email protected]>
* @since 19.02.2020
*/
public class WriteMessageProcessor implements ZmqMessageProcessor {
private final DeviceImpl device;
private final String attributeName;
private final Object value;

public WriteMessageProcessor(DeviceImpl device, String attributeName, Object value) {
this.device = device;
this.attributeName = attributeName;
this.value = value;
}

@Override
public Message process() {
try {
device.write_attributes(
new AttributeValue[]{
TangoIDLAttributeUtil.toAttributeValue(
device.getAttributeImpl(attributeName).get(),
new org.tango.server.attribute.AttributeValue(value))});
return new Message.Ok();
} catch (DevFailed devFailed) {
return new Message.Error(devFailed.errors[0].reason);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.tango.server.transport;

import org.tango.transport.Message;

/**
* @author Igor Khokhriakov <[email protected]>
* @since 19.02.2020
*/
public interface ZmqMessageProcessor {
Message process();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.tango.server.transport;

import com.google.gson.Gson;
import org.tango.server.admin.AdminDevice;
import org.tango.server.servant.DeviceImpl;
import org.tango.transport.Message;

/**
* @author Igor Khokhriakov <[email protected]>
* @since 19.02.2020
*/
public class ZmqMessageProcessorImpl implements ZmqMessageProcessor {
private final String msg;
private final AdminDevice admin;

public ZmqMessageProcessorImpl(String msg, AdminDevice admin) {
this.msg = msg;
this.admin = admin;
}

@Override
public Message process() {
Message message = Message.fromString(msg);

Message.Target target = Message.Target.fromString(message.target);

DeviceImpl device = admin.getDeviceImpl(target.device);

switch (message.action) {
case "read":
return new ReadMessageProcessor(device, target.member, message.dataType).process();
case "write":
return new WriteMessageProcessor(device, target.member, new Gson().fromJson(message.value, Object.class)).process();
case "exec":
default:
return new Message.Error("Unsupported message action - " + message.action);
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.tango.server.transport;

import org.tango.server.admin.AdminDevice;
import org.zeromq.ZMQ;

import java.nio.charset.StandardCharsets;

/**
* @author Igor Khokhriakov <[email protected]>
* @since 19.02.2020
*/
public class ZmqTransportListener implements Runnable {
private final ZMQ.Socket socket;
private final AdminDevice admin;

public ZmqTransportListener(ZMQ.Socket socket, AdminDevice admin) {
this.socket = socket;
this.admin = admin;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
byte[] data = socket.recv();
String msg = new String(data, StandardCharsets.UTF_8);

//TODO non blocking
//TODO thread pool
socket.send(
new ZmqMessageProcessorImpl(msg, admin)
.process()
.toString()
.getBytes(StandardCharsets.UTF_8));

}
}
}

0 comments on commit dd454e6

Please sign in to comment.