Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consistent subscription values handling; derived new and cached values #149

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.fourthline.cling.model.meta.LocalService;
import org.fourthline.cling.model.meta.RemoteService;
import org.fourthline.cling.model.meta.Service;
import org.fourthline.cling.model.state.StateVariableValue;
import org.fourthline.cling.protocol.ProtocolCreationException;
import org.fourthline.cling.protocol.sync.SendingSubscribe;
import org.seamless.util.Exceptions;

import java.util.Collections;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -162,10 +164,10 @@ public void ended(CancelReason reason) {
}
}

public void eventReceived() {
public void eventReceived(Map<String, StateVariableValue<?>> changedValues) {
synchronized (SubscriptionCallback.this) {
log.fine("Local service state updated, notifying callback, sequence is: " + getCurrentSequence());
SubscriptionCallback.this.eventReceived(this);
SubscriptionCallback.this.eventReceived(this, changedValues);
incrementSequence();
}
}
Expand All @@ -178,7 +180,7 @@ public void eventReceived() {
localSubscription.establish();

log.fine("Simulating first initial event for local subscription callback, sequence: " + localSubscription.getCurrentSequence());
eventReceived(localSubscription);
eventReceived(localSubscription, getSubscription().getCurrentValues());
localSubscription.incrementSequence();

log.fine("Starting to monitor state changes of local service");
Expand Down Expand Up @@ -218,9 +220,9 @@ public void ended(CancelReason reason, UpnpResponse responseStatus) {
}
}

public void eventReceived() {
public void eventReceived(Map<String, StateVariableValue<?>> changedValues) {
synchronized (SubscriptionCallback.this) {
SubscriptionCallback.this.eventReceived(this);
SubscriptionCallback.this.eventReceived(this, changedValues);
}
}

Expand Down Expand Up @@ -312,7 +314,7 @@ protected void failed(GENASubscription subscription, UpnpResponse responseStatus
*
* @param subscription The established subscription with fresh state variable values.
*/
protected abstract void eventReceived(GENASubscription subscription);
protected abstract void eventReceived(GENASubscription subscription, Map<String, StateVariableValue<?>> changedValues);

/**
* Called when a received event was out of sequence, indicating that events have been missed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@

package org.fourthline.cling.model.gena;

import java.util.LinkedHashMap;
import java.util.Map;

import org.fourthline.cling.model.UserConstants;
import org.fourthline.cling.model.meta.Service;
import org.fourthline.cling.model.state.StateVariableValue;
import org.fourthline.cling.model.types.UnsignedIntegerFourBytes;

import java.util.LinkedHashMap;
import java.util.Map;

/**
* An established subscription, with identifer, expiration duration, sequence handling, and state variable values.
* <p>
Expand Down Expand Up @@ -87,7 +87,7 @@ synchronized public Map<String, StateVariableValue<S>> getCurrentValues() {
}

public abstract void established();
public abstract void eventReceived();
public abstract void eventReceived(Map<String, StateVariableValue<?>> changedValues);

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,7 @@
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.net.URL;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not change import orders.

import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -46,9 +39,9 @@
* {@link #registerOnService()} method is called next, and from this point forward all
* {@link org.fourthline.cling.model.ServiceManager#EVENTED_STATE_VARIABLES} property change
* events are detected by this subscription. After moderation of state variable values
* (frequency and range of changes), the {@link #eventReceived()} method is called.
* (frequency and range of changes), the {@link #eventReceived(Map)} method is called.
* Delivery of the event message to the subscriber is not part of this class, but the
* implementor of {@link #eventReceived()}.
* implementor of {@link #eventReceived(Map)}.
* </p>
*
* @author Christian Bauer
Expand Down Expand Up @@ -130,7 +123,7 @@ synchronized public void end(CancelReason reason) {

/**
* Moderates {@link org.fourthline.cling.model.ServiceManager#EVENTED_STATE_VARIABLES} events and state variable
* values, calls {@link #eventReceived()}.
* values, calls {@link #eventReceived(Map)}.
*/
synchronized public void propertyChange(PropertyChangeEvent e) {
if (!e.getPropertyName().equals(ServiceManager.EVENTED_STATE_VARIABLES)) return;
Expand All @@ -142,11 +135,12 @@ synchronized public void propertyChange(PropertyChangeEvent e) {
Collection<StateVariableValue> newValues = (Collection) e.getNewValue();
Set<String> excludedVariables = moderateStateVariables(currentTime, newValues);

currentValues.clear();
Map<String, StateVariableValue<?>> changedValues = new HashMap<>(newValues.size());
for (StateVariableValue newValue : newValues) {
String name = newValue.getStateVariable().getName();
if (!excludedVariables.contains(name)) {
log.fine("Adding state variable value to current values of event: " + newValue.getStateVariable() + " = " + newValue);
changedValues.put(newValue.getStateVariable().getName(), newValue);
currentValues.put(newValue.getStateVariable().getName(), newValue);

// Preserve "last sent" state for future moderation
Expand All @@ -162,7 +156,7 @@ synchronized public void propertyChange(PropertyChangeEvent e) {
// TODO: I'm not happy with this design, this dispatches to a separate thread which _then_
// is supposed to lock and read the values off this instance. That obviously doesn't work
// so it's currently a hack in SendingEvent.java
eventReceived();
eventReceived(changedValues);
} else {
log.fine("No state variable values for event (all moderated out?), not triggering event");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@

import java.beans.PropertyChangeSupport;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.*;

/**
* An outgoing subscription to a remote service.
* <p>
* Once established, calls its {@link #eventReceived()} method whenever an event has
* Once established, calls its {@link #eventReceived(Map)} method whenever an event has
* been received from the remote service.
* </p>
*
Expand Down Expand Up @@ -107,13 +105,14 @@ synchronized public void receive(UnsignedIntegerFourBytes sequence, Collection<S

this.currentSequence = sequence;

Map<String, StateVariableValue<?>> changedValues = new HashMap<>(newValues.size());
for (StateVariableValue newValue : newValues) {
changedValues.put(newValue.getStateVariable().getName(), newValue);
currentValues.put(newValue.getStateVariable().getName(), newValue);
}

eventReceived();
eventReceived(changedValues);
}

public abstract void invalidMessage(UnsupportedDataException ex);

public abstract void failed(UpnpResponse responseStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.fourthline.cling.model.message.gena.OutgoingSubscribeResponseMessage;
import org.fourthline.cling.model.meta.LocalService;
import org.fourthline.cling.model.resource.ServiceEventSubscriptionResource;
import org.fourthline.cling.model.state.StateVariableValue;
import org.fourthline.cling.protocol.ReceivingSync;
import org.fourthline.cling.transport.RouterException;
import org.seamless.util.Exceptions;

import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/**
Expand Down Expand Up @@ -135,13 +137,13 @@ protected OutgoingSubscribeResponseMessage processNewSubscription(LocalService s
return new OutgoingSubscribeResponseMessage(UpnpResponse.Status.PRECONDITION_FAILED);
}

Integer timeoutSeconds;
Integer timeoutSeconds;
if(getUpnpService().getConfiguration().isReceivedSubscriptionTimeoutIgnored()) {
timeoutSeconds = null; // Use default value
} else {
timeoutSeconds = requestMessage.getRequestedTimeoutSeconds();
}

try {
subscription = new LocalGENASubscription(service, timeoutSeconds, callbackURLs) {
public void established() {
Expand All @@ -150,7 +152,7 @@ public void established() {
public void ended(CancelReason reason) {
}

public void eventReceived() {
public void eventReceived(Map<String, StateVariableValue<?>> changedValues) {
// The only thing we are interested in, sending an event when the state changes
getUpnpService().getConfiguration().getSyncProtocolExecutorService().execute(
getUpnpService().getProtocolFactory().createSendingEvent(this)
Expand Down Expand Up @@ -208,4 +210,4 @@ public void responseException(Throwable t) {
log.fine("Response could not be send to subscriber, removing local GENA subscription: " + subscription);
getUpnpService().getRegistry().removeLocalSubscription(subscription);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void ended(GENASubscription sub,
}

@Override
public void eventReceived(GENASubscription sub) {
public void eventReceived(GENASubscription sub, Map<String, StateVariableValue<?>> changedValues) {

System.out.println("Event: " + sub.getCurrentSequence().getValue());

Expand Down Expand Up @@ -226,4 +226,4 @@ protected StreamResponseMessage createUnsubscribeResponseMessage() {
}


}
}
10 changes: 6 additions & 4 deletions core/src/test/java/example/localservice/EventProviderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.fourthline.cling.model.message.UpnpResponse;
import org.fourthline.cling.model.meta.LocalDevice;
import org.fourthline.cling.model.meta.LocalService;
import org.fourthline.cling.model.state.StateVariableValue;
import org.fourthline.cling.test.data.SampleData;
import org.seamless.util.Reflections;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.testng.Assert.*;

Expand Down Expand Up @@ -121,7 +123,7 @@ public void ended(GENASubscription subscription, CancelReason reason, UpnpRespon
testAssertions.add(true);
}

public void eventReceived(GENASubscription subscription) {
public void eventReceived(GENASubscription subscription, Map<String, StateVariableValue<?>> changedValues) {
if (subscription.getCurrentSequence().getValue() == 0) {
assertEquals(subscription.getCurrentValues().get("Status").toString(), "0");
testAssertions.add(true);
Expand Down Expand Up @@ -194,7 +196,7 @@ public void ended(GENASubscription subscription, CancelReason reason, UpnpRespon
testAssertions.add(true);
}

public void eventReceived(GENASubscription subscription) {
public void eventReceived(GENASubscription subscription, Map<String, StateVariableValue<?>> changedValues) {
if (subscription.getCurrentSequence().getValue() == 0) {
assertEquals(subscription.getCurrentValues().get("Target").toString(), "0");
assertEquals(subscription.getCurrentValues().get("Status").toString(), "0");
Expand Down Expand Up @@ -282,7 +284,7 @@ public void ended(GENASubscription subscription, CancelReason reason, UpnpRespon
testAssertions.add(true);
}

public void eventReceived(GENASubscription subscription) {
public void eventReceived(GENASubscription subscription, Map<String, StateVariableValue<?>> changedValues) {
if (subscription.getCurrentSequence().getValue() == 0) {

// Initial event contains all evented variables, snapshot of the service state
Expand Down Expand Up @@ -396,7 +398,7 @@ public void ended(GENASubscription subscription, CancelReason reason, UpnpRespon
testAssertions.add(true);
}

public void eventReceived(GENASubscription subscription) {
public void eventReceived(GENASubscription subscription, Map<String, StateVariableValue<?>> changedValues) {
if (subscription.getCurrentSequence().getValue() == 0) {

// Initial event contains all evented variables, snapshot of the service state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void writeReadRequest(MockUpnpService upnpService) throws Exception {
List<URL> urls = new ArrayList<URL>() {{
add(SampleData.getLocalBaseURL());
}};

LocalGENASubscription subscription =
new LocalGENASubscription(localService, 1800, urls) {
public void failed(Exception ex) {
Expand All @@ -111,7 +112,7 @@ public void established() {

}

public void eventReceived() {
public void eventReceived(Map<String, StateVariableValue<?>> changedValues) {

}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void eventsMissed(int numberOfMissedEvents) {
public void established() {
}

public void eventReceived() {
public void eventReceived(Map<String, StateVariableValue<?>> changedValues) {
}

public void invalidMessage(UnsupportedDataException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.fourthline.cling.model.types.UnsignedIntegerFourBytes;
import org.fourthline.cling.protocol.ReceivingSync;
import org.fourthline.cling.test.data.SampleData;
import org.fourthline.cling.transport.Router;
import org.seamless.util.URIUtil;
import org.testng.annotations.Test;

Expand All @@ -47,6 +46,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.testng.Assert.assertEquals;

Expand Down Expand Up @@ -109,7 +109,7 @@ public void ended(GENASubscription subscription, CancelReason reason, UpnpRespon
testAssertions.add(false);
}

public void eventReceived(GENASubscription subscription) {
public void eventReceived(GENASubscription subscription, Map<String, StateVariableValue<?>> changedValues) {
testAssertions.add(false);
}

Expand Down Expand Up @@ -177,7 +177,7 @@ public void ended(GENASubscription subscription, CancelReason reason, UpnpRespon
testAssertions.add(true);
}

public void eventReceived(GENASubscription subscription) {
public void eventReceived(GENASubscription subscription, Map<String, StateVariableValue<?>> changedValues) {
assertEquals(subscription.getCurrentValues().get("Status").toString(), "0");
assertEquals(subscription.getCurrentValues().get("Target").toString(), "1");
testAssertions.add(true);
Expand Down Expand Up @@ -272,4 +272,4 @@ protected IncomingEventRequestMessage createEventRequestMessage(UpnpService upnp
return new IncomingEventRequestMessage(outgoing, ((RemoteGENASubscription) callback.getSubscription()).getService());
}

}
}
Loading