Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<okhttp.version>3.13.1</okhttp.version>
<opencsv.version>5.9</opencsv.version>
<opennlp.version>2.3.1</opennlp.version>
<plc4x.version>0.12.0</plc4x.version>
<plc4x.version>0.13.1</plc4x.version>
<plexus-component-annotations.version>2.2.0</plexus-component-annotations.version>
<plexus-interactivity-api.version>1.1</plexus-interactivity-api.version>
<plexus-utils.version>4.0.0</plexus-utils.version>
Expand Down Expand Up @@ -1179,8 +1179,6 @@
</profiles>

<!-- Build Settings -->


<build>

<!-- Plugin Management -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -47,8 +48,8 @@ public void schedule(IPullAdapter pullAdapter,
final Runnable task = () -> {
try {
pullAdapter.pullData();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
LOG.error("Error while pulling data", e);
} catch (ExecutionException | InterruptedException | TimeoutException | CompletionException e) {
LOG.error("Error while pulling data: {}", e.getMessage());
SpMonitoringManager.INSTANCE.addErrorMessage(
adapterElementId,
SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e))
Expand Down
6 changes: 5 additions & 1 deletion streampipes-extensions/streampipes-connectors-plc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-spi</artifactId>
<version>${plc4x.version}</version>
</dependency>
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-driver-all</artifactId>
Expand All @@ -78,7 +83,6 @@
<version>${plc4x.version}</version>
</dependency>


<!-- Test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.apache.streampipes.extensions.connectors.plc.adapter.migration.Plc4xS7AdapterMigrationV1;
import org.apache.streampipes.extensions.connectors.plc.adapter.modbus.Plc4xModbusAdapter;
import org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapter;
import org.apache.streampipes.extensions.connectors.plc.cache.SpCachedPlcConnectionManager;

import org.apache.plc4x.java.api.PlcDriverManager;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -43,7 +43,7 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport {
public List<StreamPipesAdapter> adapters() {
var env = Environments.getEnvironment();
var driverManager = PlcDriverManager.getDefault();
var cachedConnectionManager = CachedPlcConnectionManager
var cachedConnectionManager = SpCachedPlcConnectionManager
.getBuilder(driverManager.getConnectionManager())
.withMaxWaitTime(Duration.ofMillis(env.getPlc4xMaxWaitTimeMs().getValueOrDefault()))
.withMaxLeaseTime(Duration.ofMillis(env.getPlc4xMaxLeaseTimeMs().getValueOrDefault()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,19 @@ public void pullData() throws RuntimeException {

private void connectAndReadPlcData() {
try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) {
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var readResponse = readRequest.execute()
.get(5000, TimeUnit.MILLISECONDS);
processPlcReadResponse(readResponse);
if (plcConnection.isConnected()) {
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
var readResponse = readRequest.execute()
.get(5000, TimeUnit.MILLISECONDS);
processPlcReadResponse(readResponse);
} else {
LOG.info("Not connected");
}
} catch (Exception e) {
handleFailingPlcRead(e);
}
}

private void processPlcReadResponse(PlcReadResponse readResponse) {
var event = eventGenerator.makeEvent(readResponse);
collector.collect(event);
this.resetIdlePulls();
}

private void handleFailingPlcRead(Exception e) {
// ensure that the cached connection manager removes the broken connection
if (connectionManager instanceof CachedPlcConnectionManager) {
Expand All @@ -98,13 +96,19 @@ private void handleFailingPlcRead(Exception e) {
}

LOG.error(
"Error while reading from PLC with connection string {}. Setting adapter to idle for {} attemtps. {} ",
"Error while reading from PLC with connection string {}. Setting adapter to idle for {} attempts. {} ",
settings.connectionString(), idlePullsBeforeNextAttempt, e.getMessage()
);

currentIdlePulls = 0;
}

private void processPlcReadResponse(PlcReadResponse readResponse) {
var event = eventGenerator.makeEvent(readResponse);
collector.collect(event);
this.resetIdlePulls();
}

private void idleRead() {
LOG.debug("Skipping pullData call for {}. Idle pulls left: {}",
settings.connectionString(), idlePullsBeforeNextAttempt - currentIdlePulls);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PlcEventGenerator {
Expand All @@ -43,9 +42,8 @@ public Map<String, Object> makeEvent(PlcReadResponse response) {

for (String key : nodes.keySet()) {
if (response.getResponseCode(key) == PlcResponseCode.OK) {

// if the response is a list, add each element to the result
if (response.getObject(key) instanceof List) {
if (response.getAsPlcValue().getValue(key).isList()) {
event.put(key,
response.getAsPlcValue()
.getValue(key)
Expand All @@ -57,8 +55,7 @@ public Map<String, Object> makeEvent(PlcReadResponse response) {
event.put(key, response.getObject(key));
}
} else {
LOG.error("Error[" + key + "]: "
+ response.getResponseCode(key).name());
LOG.error("Error[{}]: {}", key, response.getResponseCode(key).name());
}
}
return event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,28 @@ public Plc4xConnectionExtractor(IStaticPropertyExtractor extractor,
}

public Plc4xConnectionSettings makeSettings() {
var modifiedProtocolCode = checkAndOverrideProtocolCode();
var host = extractHost();
var transportCode = extractTransportCode();
var transportConfigs = extractTransportMetadata(transportCode);
var protocolConfigs = extractProtocolMetadata();
var configParameters = makeConfigParameters(transportConfigs, protocolConfigs);

return new Plc4xConnectionSettings(
getConnectionString(host, transportCode, protocolCode, configParameters),
getConnectionString(host, transportCode, modifiedProtocolCode, configParameters),
extractor.singleValueParameter(Plc4xLabels.PLC_POLLING_INTERVAL, Integer.class),
extractNodes()
);
}

private String checkAndOverrideProtocolCode() {
if (protocolCode.equals("s7")) {
return "s7-light";
} else {
return protocolCode;
}
}

private String getConnectionString(String host,
String transportCode,
String protocolCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class Plc4xS7Adapter implements StreamPipesAdapter {
*/
public static final String ID = "org.apache.streampipes.connect.iiot.adapters.plc4x.s7";

private static final String S7_URL = "s7://";
private static final String S7_URL = "s7-light://";

/**
* Keys of user configuration parameters
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.connectors.plc.cache;

import org.apache.plc4x.java.DefaultPlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.utils.cache.exceptions.PlcConnectionManagerClosedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class SpCachedPlcConnectionManager implements PlcConnectionManager, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(SpCachedPlcConnectionManager.class);

private final PlcConnectionManager connectionManager;
private final Duration maxLeaseTime;
private final Duration maxWaitTime;
private final Duration maxIdleTime;

private final Map<String, SpConnectionContainer> connectionContainers;

private final AtomicBoolean closed = new AtomicBoolean(false);

public static SpCachedPlcConnectionManager.Builder getBuilder() {
return new SpCachedPlcConnectionManager.Builder(new DefaultPlcDriverManager());
}

public static SpCachedPlcConnectionManager.Builder getBuilder(PlcConnectionManager connectionManager) {
return new SpCachedPlcConnectionManager.Builder(connectionManager);
}

public SpCachedPlcConnectionManager(PlcConnectionManager connectionManager, Duration maxLeaseTime, Duration maxWaitTime, Duration maxIdleTime) {
this.connectionManager = connectionManager;
this.maxLeaseTime = maxLeaseTime;
this.maxWaitTime = maxWaitTime;
this.maxIdleTime = maxIdleTime;
this.connectionContainers = new HashMap<>();
}

/**
* @return set of connection-urls the CachedPlcConnectionManager is currently managing.
*/
public Set<String> getCachedConnections() {
synchronized (connectionContainers) {
return connectionContainers.keySet();
}
}

/**
* Removes a given connection from the cache (Should only be used in order to remove somehow broken connections).
*
* @param url url of the connection that should be removed.
*/
public void removeCachedConnection(String url) {
synchronized (connectionContainers) {
// Make sure the connection is closed before removing it.
if (connectionContainers.containsKey(url)) {
connectionContainers.get(url).close();
}
connectionContainers.remove(url);
}
}

public PlcConnection getConnection(String url) throws PlcConnectionException {
// If the connection manager is already closed, abort.
if (closed.get()) {
throw new PlcConnectionManagerClosedException();
}

// Get a connection container for the given url.
SpConnectionContainer connectionContainer;
synchronized (connectionContainers) {
connectionContainer = connectionContainers.get(url);
if (connectionContainer == null) {
LOG.debug("Creating new connection");

// Crate a connection container to manage handling this connection
connectionContainer = new SpConnectionContainer(connectionManager, url, maxLeaseTime, maxIdleTime,
closeConnection -> {
removeCachedConnection(closeConnection);
return null;
});
connectionContainers.put(url, connectionContainer);
} else {
LOG.debug("Reusing exising connection");
}
}

// Get a lease (a future for a connection)
Future<PlcConnection> leaseFuture = connectionContainer.lease();
try {
return leaseFuture.get(this.maxWaitTime.toMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
throw new PlcConnectionException("Error acquiring lease for connection");
}
}

public PlcConnection getConnection(String url, PlcAuthentication authentication) throws PlcConnectionException {
throw new PlcConnectionException("the cached driver manager currently doesn't support authentication");
}

@Override
public void close() throws Exception {
// Set the cache to "closed" so no new connections can be requested.
closed.set(true);

// Tell all connections to close themselves.
connectionContainers.forEach((connectionString, connectionContainer) -> {
connectionContainer.close();
});
}

public static class Builder {

private final PlcConnectionManager connectionManager;
private Duration maxLeaseTime;
private Duration maxWaitTime;
private Duration maxIdleTime;

public Builder(PlcConnectionManager connectionManager) {
this.connectionManager = connectionManager;
this.maxLeaseTime = Duration.ofSeconds(4);
this.maxWaitTime = Duration.ofSeconds(20);
this.maxIdleTime = Duration.ofMinutes(5);
}

public SpCachedPlcConnectionManager build() {
return new SpCachedPlcConnectionManager(
this.connectionManager, this.maxLeaseTime, this.maxWaitTime, this.maxIdleTime);
}

public SpCachedPlcConnectionManager.Builder withMaxLeaseTime(Duration maxLeaseTime) {
this.maxLeaseTime = maxLeaseTime;
return this;
}

public SpCachedPlcConnectionManager.Builder withMaxWaitTime(Duration maxWaitTime) {
this.maxWaitTime = maxWaitTime;
return this;
}

public SpCachedPlcConnectionManager.Builder withMaxIdleTime(Duration maxIdleTime) {
this.maxIdleTime = maxIdleTime;
return this;
}
}

}

Loading
Loading