Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added support for emitting connection errors in WireAsset #5479

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ Import-Package: org.apache.logging.log4j;version="2.8.2",
Require-Capability: osgi.ee;filter:="(&(osgi.ee=JavaSE)(version=1.8))"
Bundle-ActivationPolicy: lazy
Service-Component: OSGI-INF/*.xml
Export-Package: org.eclipse.kura.internal.wire.asset;version="1.0.0"
Export-Package: org.eclipse.kura.internal.wire.asset;version="1.1.0";x-internal:=true
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.eclipse.kura.channel.ChannelType.WRITE;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -27,6 +28,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.kura.KuraException;
import org.eclipse.kura.asset.AssetConfiguration;
import org.eclipse.kura.asset.provider.BaseAsset;
import org.eclipse.kura.channel.Channel;
Expand All @@ -36,6 +38,7 @@
import org.eclipse.kura.channel.listener.ChannelListener;
import org.eclipse.kura.core.configuration.metatype.Tad;
import org.eclipse.kura.core.configuration.metatype.Tocd;
import org.eclipse.kura.driver.Driver.ConnectionException;
import org.eclipse.kura.driver.PreparedRead;
import org.eclipse.kura.type.TypedValue;
import org.eclipse.kura.type.TypedValues;
Expand Down Expand Up @@ -249,10 +252,36 @@ private void emitAllReadChannels() {
emitChannelRecords(readAllChannels());
} catch (final Exception e) {
logger.error("Error while performing read from the Wire Asset: {}", getKuraServicePid(), e);
if (this.options.emitErrors() && this.options.emitConnectionErrors()) {
emitAssetError(e);
}
}
}
}

private void emitAssetError(final Throwable e) {
if (e instanceof KuraException && e.getCause() instanceof ConnectionException) {
emitAssetError(e.getCause());
return;
}

final Map<String, TypedValue<?>> wireRecordProperties = new HashMap<>();

final String message = e.getMessage();

wireRecordProperties.put(WireAssetConstants.PROP_ASSET_NAME.value(),
TypedValues.newStringValue(getKuraServicePid()));
wireRecordProperties.put(WireAssetConstants.PROP_ASSET_ERROR.value(),
TypedValues.newStringValue(message != null ? message : e.getClass().getSimpleName()));

if (options.getTimestampMode() != TimestampMode.NO_TIMESTAMPS) {
wireRecordProperties.put(WireAssetConstants.PROP_SINGLE_TIMESTAMP_NAME.value(),
TypedValues.newLongValue(System.currentTimeMillis()));
}

this.wireSupport.emit(Collections.singletonList(new WireRecord(wireRecordProperties)));
}

/**
* Determine the channels to write
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2018, 2021 Eurotech and/or its affiliates and others
* Copyright (c) 2018, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -23,6 +23,7 @@ public enum WireAssetConstants {
PROP_SINGLE_TIMESTAMP_NAME("assetTimestamp"),
PROP_ASSET_NAME("assetName"),
PROP_SUFFIX_TIMESTAMP(PROPERTY_SEPARATOR.value() + "timestamp"),
PROP_ASSET_ERROR("assetError"),

PROP_SUFFIX_UNIT(PROPERTY_SEPARATOR.value() + "unit"),

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2018, 2022 Eurotech and/or its affiliates and others
* Copyright (c) 2018, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand Down Expand Up @@ -38,8 +38,18 @@ public class WireAssetOCD extends BaseAssetOCD {
+ "a single driver generated timestamp being respectively"
+ " the max (most recent) or min (oldest) among the timestamps of the channels.";

private static final String EMIT_ERRORS_DESCRIPTION = "Specifies wheter errors should be included or not "
+ "in the emitted envelope";
private static final String EMIT_ERRORS_DESCRIPTION = "Specifies whether channel specific errors should be included or not "
+ "in emitted envelopes. If enabled, the component will add an additional property per channel, named \"<channel_name>_error\"."
+ " If the channel operation fails, the property value will be an error message reported by the Driver,"
+ " if the operation succeeds the property value will be the empty string.";

private static final String EMIT_CONNECTION_ERRORS_DESCRIPTION = "Specifies whether the component should emit an envelope"
+ " in case of a general connection exception reported by the Driver "
+ "(for example due to the fact that the connection with a remote device cannot be established)."
+ " The error message associated with the exception will be emitted in a property named \"assetError\"."
+ " In case of connection exception, channel values are not available and no channel related properties will be emitted."
+ " If the \"timestamp.mode\" property is set to a value other than \"NO_TIMESTAMPS\", the component will also emit a \"assetTimestamp\""
+ " property reporting current system time. This property will be ignored if \"emit.errors\" is disabled.";

private static final String EMIT_ON_CHANGE_DESCRIPTION = "If set to true, this component will include"
+ " a channel value in the output emitted in Kura Wires"
Expand Down Expand Up @@ -98,6 +108,17 @@ public WireAssetOCD() {

addAD(emitErrorsAd);

final Tad emitConnectionErrorsAd = new Tad();
emitConnectionErrorsAd.setId(WireAssetOptions.EMIT_CONNECTION_ERRORS_PROP_NAME);
emitConnectionErrorsAd.setName(WireAssetOptions.EMIT_CONNECTION_ERRORS_PROP_NAME);
emitConnectionErrorsAd.setCardinality(0);
emitConnectionErrorsAd.setType(Tscalar.BOOLEAN);
emitConnectionErrorsAd.setDescription(EMIT_CONNECTION_ERRORS_DESCRIPTION);
emitConnectionErrorsAd.setRequired(true);
emitConnectionErrorsAd.setDefault(FALSE);

addAD(emitConnectionErrorsAd);

final Tad emitOnChangeAd = new Tad();
emitOnChangeAd.setId(WireAssetOptions.EMIT_ON_CHANGE_PROP_NAME);
emitOnChangeAd.setName(WireAssetOptions.EMIT_ON_CHANGE_PROP_NAME);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2018, 2022 Eurotech and/or its affiliates and others
* Copyright (c) 2018, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -20,12 +20,14 @@ class WireAssetOptions {
public static final String EMIT_ALL_CHANNELS_PROP_NAME = "emit.all.channels";
public static final String TIMESTAMP_MODE_PROP_NAME = "timestamp.mode";
public static final String EMIT_ERRORS_PROP_NAME = "emit.errors";
public static final String EMIT_CONNECTION_ERRORS_PROP_NAME = "emit.connection.errors";
public static final String EMIT_ON_CHANGE_PROP_NAME = "emit.on.change";
public static final String EMIT_EMPTY_ENVELOPES_PROP_NAME = "emit.empty.envelopes";

private boolean emitAllChannels;
private TimestampMode timestampMode;
private boolean emitErrors;
private boolean emitConnectionErrors;
private boolean emitOnChange;
private boolean emitEmptyEnvelopes;

Expand All @@ -35,11 +37,13 @@ public WireAssetOptions() {
public WireAssetOptions(Map<String, Object> properties) {
final Object emitAllChannelsProp = properties.get(EMIT_ALL_CHANNELS_PROP_NAME);
final Object emitErrorsProp = properties.get(EMIT_ERRORS_PROP_NAME);
final Object emitConnectionErrorsProp = properties.get(EMIT_CONNECTION_ERRORS_PROP_NAME);
final Object emitOnChangeProp = properties.get(EMIT_ON_CHANGE_PROP_NAME);
final Object emitEmptyEnvelopesProp = properties.get(EMIT_EMPTY_ENVELOPES_PROP_NAME);

this.emitAllChannels = emitAllChannelsProp instanceof Boolean && (Boolean) emitAllChannelsProp;
this.emitErrors = emitErrorsProp instanceof Boolean && (Boolean) emitErrorsProp;
this.emitConnectionErrors = emitConnectionErrorsProp instanceof Boolean && (Boolean) emitConnectionErrorsProp;
this.emitOnChange = emitOnChangeProp instanceof Boolean && (Boolean) emitOnChangeProp;
this.emitEmptyEnvelopes = !(emitEmptyEnvelopesProp instanceof Boolean) || (Boolean) emitEmptyEnvelopesProp;

Expand All @@ -58,6 +62,10 @@ public boolean emitErrors() {
return this.emitErrors;
}

public boolean emitConnectionErrors() {
return this.emitConnectionErrors;
}

public boolean emitOnChange() {
return this.emitOnChange;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*******************************************************************************
* Copyright (c) 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
*******************************************************************************/
package org.eclipse.kura.internal.wire.asset.test;

import static org.junit.Assert.assertTrue;

import java.util.Map;
import java.util.Optional;

import org.eclipse.kura.channel.ChannelType;
import org.eclipse.kura.driver.Driver.ConnectionException;
import org.eclipse.kura.type.DataType;
import org.eclipse.kura.type.LongValue;
import org.eclipse.kura.type.TypedValue;
import org.eclipse.kura.wire.WireEnvelope;
import org.junit.Test;

public class AssetErrorTest extends WireAssetTestBase {

@Test
public void shouldNotEmitAssetErrorIfNotEnabled() {
givenAssetConfig(map("emit.errors", false, "emit.connection.errors", true));
givenConnectionException(new ConnectionException());

whenAssetReceivesEnvelope();

thenTotalEmittedEnvelopeCountAfter1SecIs(0);
}

@Test
public void shouldNotEmitAssetErrorIfEmitErrorsIsNotEnabled() {
givenAssetConfig(map("emit.errors", false, "emit.connection.errors", true));
givenConnectionException(new ConnectionException());

whenAssetReceivesEnvelope();

thenTotalEmittedEnvelopeCountAfter1SecIs(0);
}

@Test
public void shouldNotEmitAssetErrorIfEnabledButEmitErrorsIsNotEnabled() {
givenAssetConfig(map("emit.errors", true, "emit.connection.errors", false));
givenConnectionException(new ConnectionException());

whenAssetReceivesEnvelope();

thenTotalEmittedEnvelopeCountAfter1SecIs(0);
}

@Test
public void shouldEmitErrorEnvelopeProperty() {
givenAssetConfig(map("emit.errors", true, "emit.connection.errors", true));
givenConnectionException(new ConnectionException("exception message"));

whenAssetReceivesEnvelope();

thenTotalEmittedEnvelopeCountAfter1SecIs(1);
thenAssetOutputContains(0, "assetError", "exception message");
thenAssetOutputContains(0, "assetName", "testAsset");
thenAssetOutputContainsAssetTimestamp();
}

@Test
public void shouldTolerateNullMessage() {
givenAssetConfig(map("emit.errors", true, "emit.connection.errors", true));
givenConnectionException(new ConnectionException((String) null));

whenAssetReceivesEnvelope();

thenTotalEmittedEnvelopeCountAfter1SecIs(1);
thenAssetOutputContains(0, "assetError", "ConnectionException");
thenAssetOutputContainsAssetTimestamp();
}

@Test
public void shouldNotEmitTimestampIfDisabled() {
givenAssetConfig(map("emit.errors", true, "emit.connection.errors", true, "timestamp.mode", "NO_TIMESTAMPS"));
givenConnectionException(new ConnectionException("exception message"));

whenAssetReceivesEnvelope();

thenTotalEmittedEnvelopeCountAfter1SecIs(1);
thenAssetOutputContains(0, "assetError", "exception message");
thenAssetOutputContains(0, "assetName", "testAsset");
thenAssetOutputDoesNotContain(0, "assetTimestamp");
}

@Override
protected void givenAssetConfig(Map<String, Object> assetConfig) {

assetConfig.put("foo#+name", "foo");
assetConfig.put("foo#+type", ChannelType.READ.name());
assetConfig.put("foo#+value.type", DataType.STRING.name());

super.givenAssetConfig(assetConfig);
}

private void thenAssetOutputContainsAssetTimestamp() {
final WireEnvelope envelope = awaitEnvelope(0);

final long timestamp = Optional.ofNullable(envelope.getRecords().get(0).getProperties())
.map(p -> p.get("assetTimestamp")).filter(LongValue.class::isInstance).map(LongValue.class::cast)
.map(TypedValue::getValue)
.orElseThrow(() -> new IllegalStateException("asset timestamp property not found"));

assertTrue(Math.abs(timestamp - System.currentTimeMillis()) < 30000);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2022, 2023 Eurotech and/or its affiliates and others
* Copyright (c) 2022, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand Down Expand Up @@ -37,6 +37,7 @@ class MockDriver implements Driver {
private final Map<String, List<TypedValue<?>>> values = new HashMap<>();
final Map<String, ChannelListener> listeners = new HashMap<>();
CompletableFuture<Void> preparedReadCalled = new CompletableFuture<>();
Optional<ConnectionException> connectionException = Optional.empty();

@Override
public void connect() throws ConnectionException {
Expand All @@ -60,6 +61,12 @@ public Object getDescriptor() {

@Override
public void read(List<ChannelRecord> records) throws ConnectionException {
final Optional<ConnectionException> ex = this.connectionException;

if (ex.isPresent()) {
throw ex.get();
}

for (final ChannelRecord record : records) {
final Optional<TypedValue<?>> value = Optional.ofNullable(values.get(record.getChannelName()))
.flatMap(l -> {
Expand Down Expand Up @@ -110,6 +117,10 @@ public void unregisterChannelListener(ChannelListener listener) throws Connectio
public void write(List<ChannelRecord> records) throws ConnectionException {
}

synchronized void throwConnectionException(final Optional<ConnectionException> connectionException) {
this.connectionException = connectionException;
}

synchronized void addReadResult(final String channelName, final TypedValue<?> value) {
this.values.computeIfAbsent(channelName, a -> new ArrayList<>()).add(value);
}
Expand Down
Loading
Loading