Skip to content

Commit

Permalink
Progress #35
Browse files Browse the repository at this point in the history
  • Loading branch information
Ingvord committed Feb 19, 2020
1 parent 437e40a commit e683e3e
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 52 deletions.
2 changes: 2 additions & 0 deletions common/src/main/java/fr/esrf/Tango/factory/ITangoFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
package fr.esrf.Tango.factory;

import fr.esrf.TangoApi.*;
import org.tango.transport.Transport;

public interface ITangoFactory {

Expand All @@ -70,4 +71,5 @@ public interface ITangoFactory {

public String getFactoryName();

Transport newTransport();
}
5 changes: 5 additions & 0 deletions common/src/main/java/fr/esrf/Tango/factory/TangoFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
package fr.esrf.Tango.factory;

import fr.esrf.TangoApi.*;
import org.tango.transport.Transport;

import java.io.BufferedInputStream;
import java.io.InputStream;
Expand Down Expand Up @@ -203,4 +204,8 @@ public boolean isDefaultFactory() {
public void setDefaultFactory(final boolean isDefaultFactory) {
this.isDefaultFactory = isDefaultFactory;
}

public Transport newTransport() {
return tangoFactory.newTransport();
}
}
52 changes: 52 additions & 0 deletions common/src/main/java/fr/esrf/TangoApi/DeviceProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@
import fr.esrf.TangoDs.Except;
import fr.esrf.TangoDs.TangoConst;
import org.omg.CORBA.Request;
import org.tango.network.EndpointAvailabilityChecker;
import org.tango.transport.Transport;
import org.tango.transport.TransportMeta;
import org.tango.utils.DevFailedUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -80,6 +86,8 @@ public class DeviceProxy extends Connection implements ApiDefs {

private IDeviceProxyDAO deviceProxyDAO = null;

private final Transport transport = TangoFactory.getSingleton().newTransport();

static final private boolean check_idl = false;

/**
Expand Down Expand Up @@ -791,6 +799,13 @@ public void set_attribute_config(AttributeInfo[] attr) throws DevFailed {
*/
// ==========================================================================
public DeviceAttribute read_attribute(String attname) throws DevFailed {
if (transport.isConnected()) {
try {
transport.send(String.format("read:%s", attname).getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw DevFailedUtils.newDevFailed(e);
}
}
return deviceProxyDAO.read_attribute(this, attname);
}

Expand Down Expand Up @@ -818,6 +833,19 @@ public DeviceAttribute[] read_attribute(String[] attnames) throws DevFailed {
return deviceProxyDAO.read_attribute(this, attnames);
}

//TODO extract and decorate
public void writeAttribute(String name, double value) throws DevFailed {
if (transport.isConnected()) {
try {
transport.send(String.format("write:%s;type:double;value:%f", name, value).getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw DevFailedUtils.newDevFailed(e);
}
}
DeviceAttribute deviceAttribute = new DeviceAttribute(name, value);
write_attribute(deviceAttribute);
}

// ==========================================================================
/**
* Write the attribute value for the specified device.
Expand Down Expand Up @@ -1825,6 +1853,26 @@ public int subscribe_event(int event, int max_size, boolean stateless) throws De
return deviceProxyDAO.subscribe_event(this, event, max_size, stateless);
}

public void upgradeProtocol() throws DevFailed {
DeviceData response = this.get_adm_dev().command_inout("UpgradeProtocol");

DevVarLongStringArray array = response.extractLongStringArray();

TransportMeta transportMeta = TransportMeta.fromDevVarLongStringArray(array);

EndpointAvailabilityChecker predicate = new EndpointAvailabilityChecker();
String endpoint = transportMeta.getEndPoints().stream()
.filter(predicate)
.findFirst()
.orElseThrow(() -> new RuntimeException("No reachable connection points were found"));

try {
transport.connect(endpoint);
} catch (IOException e) {
throw DevFailedUtils.newDevFailed(e);
}
}

// ==========================================================================
// ==========================================================================
public void setEventQueue(EventQueue eq) {
Expand Down Expand Up @@ -2020,6 +2068,10 @@ private static void checkDuplication(String[] list, String orig) throws DevFaile
*/
//==========================================================================
protected void finalize() {
try {
transport.close();
} catch (IOException ignored) {
}
if (proxy_lock_cnt>0) {
try {
unlock();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.tango.network;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.function.Predicate;

/**
* @author Igor Khokhriakov <[email protected]>
* @since 19.02.2020
*/
public class EndpointAvailabilityChecker implements Predicate<String> {
private final Logger logger = LoggerFactory.getLogger(EndpointAvailabilityChecker.class);

@Override
public boolean test(String endpoint) {
logger.debug("Check endpoint: {}", endpoint);
URI uri = null;
try {
uri = new URI(endpoint);
} catch (URISyntaxException e) {
logger.debug("Bad endpoint: " + endpoint, e);
return false;
}

// Try to connect
InetSocketAddress ip = new InetSocketAddress(uri.getHost(), uri.getPort());
try (Socket socket = new Socket()) {
socket.connect(ip, 10);
return true;
} catch (IOException e) {
logger.debug("Failed to connect to " + ip, e);
return false;
}
}
}
18 changes: 18 additions & 0 deletions common/src/main/java/org/tango/transport/Transport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.tango.transport;

import java.io.Closeable;
import java.io.IOException;

/**
* @author Igor Khokhriakov <[email protected]>
* @since 19.02.2020
*/
public interface Transport extends Closeable {
boolean isConnected();

void connect(String endpoint) throws IOException;

void disconnect(String endpoint) throws IOException;

byte[] send(byte[] data) throws IOException;
}
43 changes: 43 additions & 0 deletions common/src/main/java/org/tango/transport/TransportMeta.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.tango.transport;

import com.google.common.collect.Lists;
import fr.esrf.Tango.DevVarLongStringArray;

import java.util.Arrays;
import java.util.List;

/**
* @author Igor Khokhriakov <[email protected]>
* @since 18.02.2020
*/
public class TransportMeta {
private List<String> endPoints = Lists.newArrayList();

public TransportMeta() {
}

public static TransportMeta fromDevVarLongStringArray(DevVarLongStringArray array) {
TransportMeta transportMeta = new TransportMeta();
transportMeta.setEndPoints(Arrays.asList(array.svalue));
return transportMeta;
}

public List<String> getEndPoints() {
return endPoints;
}

public void setEndPoints(List<String> endPoints) {
this.endPoints = endPoints;
}

public void addConnectionPoint(String connectionPoint) {
endPoints.add(connectionPoint);
}

public DevVarLongStringArray toDevVarLongStringArray() {
DevVarLongStringArray result = new DevVarLongStringArray(new int[0],
endPoints.toArray(new String[0]));

return result;
}
}
28 changes: 5 additions & 23 deletions dao/src/main/java/fr/esrf/TangoApi/events/ZmqEventConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tango.network.EndpointAvailabilityChecker;
import org.tango.utils.DevFailedUtils;

import java.io.IOException;
import java.net.*;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -379,7 +379,7 @@ private DeviceData checkWithHostAddress(DeviceData deviceData, DeviceProxy devic
logger.debug("{} ---> {}", wrongAdd, lsa.svalue[0]);
deviceData = new DeviceData();
deviceData.insert(lsa);
isEndpointAvailable(lsa.svalue[0]);
new EndpointAvailabilityChecker().test(lsa.svalue[0]);
}
}
} catch (UnknownHostException e) {
Expand All @@ -402,12 +402,13 @@ private DeviceData checkZmqAddress(DeviceData deviceData, DeviceProxy deviceProx
logger.trace("Inside checkZmqAddress()");
DevVarLongStringArray lsa = deviceData.extractLongStringArray();

EndpointAvailabilityChecker predicate = new EndpointAvailabilityChecker();
Pair<String, String> validEndpoints = Optional.ofNullable(Observable.zip(
Observable.fromArray(lsa.svalue),
Observable.fromArray(lsa.svalue).skip(1),
ImmutablePair::of
)
.filter(pair -> isEndpointAvailable(pair.left))
.filter(pair -> predicate.test(pair.left))
.blockingFirst(null))
.orElseGet(() -> {
DeviceData checkedWithHostAddress = checkWithHostAddress(deviceData, deviceProxy);
Expand All @@ -425,26 +426,7 @@ private DeviceData checkZmqAddress(DeviceData deviceData, DeviceProxy deviceProx
return overridden;
}

private boolean isEndpointAvailable(String endpoint) {
logger.debug("Check endpoint: {}", endpoint);
URI uri = null;
try {
uri = new URI(endpoint);
} catch (URISyntaxException e) {
logger.debug("Bad endpoint: " + endpoint, e);
return false;
}

// Try to connect
InetSocketAddress ip = new InetSocketAddress(uri.getHost(), uri.getPort());
try (Socket socket = new Socket()) {
socket.connect(ip, 10);
return true;
} catch (IOException e) {
logger.debug("Failed to connect to " + ip, e);
return false;
}
}

private void checkDeviceConnection(DeviceProxy deviceProxy,
String attribute, DeviceData deviceData, String event_name) throws DevFailed {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import fr.esrf.Tango.factory.ITangoFactory;
import fr.esrf.TangoApi.*;
import org.tango.transport.ZmqTransport;

public class DefaultTangoFactoryImpl implements ITangoFactory {

Expand Down Expand Up @@ -136,4 +137,8 @@ public String getFactoryName()
return "TANGORB Default";
}

public ZmqTransport newTransport() {
return new ZmqTransport();
}

}
46 changes: 46 additions & 0 deletions dao/src/main/java/org/tango/transport/ZmqTransport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.tango.transport;

import org.zeromq.ZContext;
import org.zeromq.ZMQ;

/**
* @author Igor Khokhriakov <[email protected]>
* @since 19.02.2020
*/
//@ThreadSafe
public class ZmqTransport implements Transport {
private final ZContext zcontext = new ZContext();
private ZMQ.Socket socket;

@Override
public synchronized boolean isConnected() {
return socket != null;
}

@Override
public synchronized void connect(String endpoint) {
ZMQ.Socket socket = zcontext.createSocket(ZMQ.REQ);

socket.connect(endpoint);

this.socket = socket;
}

@Override
public synchronized void disconnect(String endpoint) {
zcontext.getSockets()
.forEach(socket -> socket.disconnect(endpoint));
}

@Override
public synchronized byte[] send(byte[] data) {
socket.send(data);

return socket.recv();
}

@Override
public synchronized void close() {
zcontext.close();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.tango.server.transport;

import org.tango.server.network.NetworkInterfacesExtractor;
import org.tango.transport.TransportMeta;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

Expand Down
29 changes: 0 additions & 29 deletions server/src/main/java/org/tango/server/transport/TransportMeta.java

This file was deleted.

0 comments on commit e683e3e

Please sign in to comment.