Skip to content

Commit

Permalink
Merge pull request #53 from bosch-io/feature/search-protocol
Browse files Browse the repository at this point in the history
Add search support and restructure messaging providers.
  • Loading branch information
Yufei Cai authored Apr 16, 2020
2 parents 21efb53 + 9c1743f commit e9d3720
Show file tree
Hide file tree
Showing 67 changed files with 5,405 additions and 3,369 deletions.
44 changes: 44 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@
<mockito.version>3.1.0</mockito.version>
<jsonassert.version>1.2.3</jsonassert.version>

<!-- reactive streams versions -->
<reactive-streams.version>1.0.3</reactive-streams.version>
<reactive-streams-tck.version>${reactive-streams.version}</reactive-streams-tck.version>
<!-- carefully set testng version to one whose junit dependency is equal to ${junit.version} -->
<reactive-streams-tck.testng.version>6.14.3</reactive-streams-tck.testng.version>

<json-unit.version>1.28.1</json-unit.version>
<pax.exam.version>4.13.0</pax.exam.version>
<felix.version>6.0.3</felix.version>
Expand Down Expand Up @@ -622,6 +628,10 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
Expand Down Expand Up @@ -759,6 +769,11 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>${reactive-streams.version}</version>
</dependency>

<!-- ### Compile - OSGi ### -->
<dependency>
Expand Down Expand Up @@ -799,6 +814,11 @@
<artifactId>ditto-model-things</artifactId>
<version>${ditto.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-model-thingsearch</artifactId>
<version>${ditto.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
Expand All @@ -825,6 +845,11 @@
<artifactId>ditto-signals-commands-messages</artifactId>
<version>${ditto.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-signals-commands-thingsearch</artifactId>
<version>${ditto.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-signals-events-base</artifactId>
Expand All @@ -835,6 +860,11 @@
<artifactId>ditto-signals-events-things</artifactId>
<version>${ditto.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-signals-events-thingsearch</artifactId>
<version>${ditto.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
Expand Down Expand Up @@ -945,6 +975,20 @@
<scope>test</scope>
</dependency>

<!-- Testing - reactive streams compatibility -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>${reactive-streams-tck.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>${reactive-streams-tck.testng.version}</version>
<scope>test</scope>
</dependency>

<!-- ### Testing - Ditto artifacts ### -->
<dependency>
<groupId>org.eclipse.ditto</groupId>
Expand Down
3 changes: 3 additions & 0 deletions java/src/main/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@
<include>org.eclipse.ditto:ditto-model-policies</include>
<include>org.eclipse.ditto:ditto-model-messages</include>
<include>org.eclipse.ditto:ditto-model-things</include>
<include>org.eclipse.ditto:ditto-model-thingsearch</include>
<include>org.eclipse.ditto:ditto-signals-base</include>
<include>org.eclipse.ditto:ditto-signals-commands-base</include>
<include>org.eclipse.ditto:ditto-signals-commands-thingsearch</include>
<include>org.eclipse.ditto:ditto-signals-commands-things</include>
<include>org.eclipse.ditto:ditto-signals-commands-policies</include>
<include>org.eclipse.ditto:ditto-signals-commands-messages</include>
<include>org.eclipse.ditto:ditto-signals-commands-live</include>
<include>org.eclipse.ditto:ditto-signals-events-base</include>
<include>org.eclipse.ditto:ditto-signals-events-things</include>
<include>org.eclipse.ditto:ditto-signals-events-thingsearch</include>
<include>org.eclipse.ditto:ditto-protocol-adapter</include>
<include>org.eclipse.ditto:ditto-client</include>
</includes>
Expand Down
20 changes: 12 additions & 8 deletions java/src/main/java/org/eclipse/ditto/client/DittoClients.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.client.internal.DefaultDittoClient;
import org.eclipse.ditto.client.internal.ResponseForwarder;
import org.eclipse.ditto.client.live.internal.MessageSerializerFactory;
import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry;
import org.eclipse.ditto.client.messaging.MessagingProvider;
Expand Down Expand Up @@ -83,11 +82,13 @@ public static DittoClient newInstance(final MessagingProvider twinMessagingProvi
* @since 1.1.0
*/
public static DittoClient newInstance(final MessagingProvider twinMessagingProvider,
final MessagingProvider liveMessagingProvider, final MessagingProvider policyMessagingProvider) {
final MessagingProvider liveMessagingProvider,
final MessagingProvider policyMessagingProvider) {

final MessageSerializerRegistry messageSerializerRegistry =
MessageSerializerFactory.newInstance().getMessageSerializerRegistry();
return newInstance(twinMessagingProvider, liveMessagingProvider, policyMessagingProvider, messageSerializerRegistry);
return newInstance(twinMessagingProvider, liveMessagingProvider, policyMessagingProvider,
messageSerializerRegistry);
}

/**
Expand All @@ -103,9 +104,11 @@ public static DittoClient newInstance(final MessagingProvider twinMessagingProvi
* could not be established
*/
public static DittoClient newInstance(final MessagingProvider twinMessagingProvider,
final MessagingProvider liveMessagingProvider, final MessageSerializerRegistry messageSerializerRegistry) {
final MessagingProvider liveMessagingProvider,
final MessageSerializerRegistry messageSerializerRegistry) {

return newInstance(twinMessagingProvider, liveMessagingProvider, twinMessagingProvider, messageSerializerRegistry);
return newInstance(twinMessagingProvider, liveMessagingProvider, twinMessagingProvider,
messageSerializerRegistry);
}

/**
Expand All @@ -126,9 +129,10 @@ public static DittoClient newInstance(final MessagingProvider twinMessagingProvi
final MessagingProvider liveMessagingProvider, final MessagingProvider policyMessagingProvider,
final MessageSerializerRegistry messageSerializerRegistry) {

final ResponseForwarder responseForwarder = ResponseForwarder.getInstance();
return DefaultDittoClient.newInstance(twinMessagingProvider, liveMessagingProvider, policyMessagingProvider,
responseForwarder, messageSerializerRegistry);
return DefaultDittoClient.newInstance(twinMessagingProvider,
liveMessagingProvider,
policyMessagingProvider,
messageSerializerRegistry);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ private static JsonValue getJsonValueForThing(@Nullable final Thing thing) {
* deleted.
* @param revision the revision (change counter) of the change.
* @param timestamp the timestamp of the change.
* @param extra the extra data to be included in the change.
* @throws NullPointerException if any required argument is {@code null}.
*/
public ImmutableThingChange(final ThingId thingId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.client.configuration;

import java.net.URI;
import java.time.Duration;
import java.util.Optional;

import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
Expand All @@ -24,6 +25,13 @@
*/
public interface MessagingConfiguration {

/**
* Returns how long to wait for a response before giving up.
*
* @return the timeout.
*/
Duration getTimeout();

/**
* Returns the JSON schema version to use for messaging.
*
Expand Down Expand Up @@ -63,6 +71,14 @@ public interface MessagingConfiguration {
*/
interface Builder {

/**
* Set the timeout waiting for a response.
*
* @param timeout the timeout.
* @return this builder.
*/
Builder timeout(Duration timeout);

/**
* Sets the {@code JSON schema version}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
*/
package org.eclipse.ditto.client.configuration;

import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkArgument;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import javax.annotation.Nullable;
import java.net.URI;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkArgument;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.json.JsonSchemaVersion;

/**
* Provides Ditto WebSocket messaging specific configuration.
Expand All @@ -33,15 +35,18 @@
*/
public final class WebSocketMessagingConfiguration implements MessagingConfiguration {

private final Duration timeout;
private final JsonSchemaVersion jsonSchemaVersion;
private final URI endpointUri;
private final boolean reconnectEnabled;
@Nullable private final ProxyConfiguration proxyConfiguration;
@Nullable private final TrustStoreConfiguration trustStoreConfiguration;

private WebSocketMessagingConfiguration(final JsonSchemaVersion jsonSchemaVersion, final URI endpointUri,
private WebSocketMessagingConfiguration(final Duration timeout, final JsonSchemaVersion jsonSchemaVersion,
final URI endpointUri,
final boolean reconnectEnabled, @Nullable final ProxyConfiguration proxyConfiguration,
@Nullable final TrustStoreConfiguration trustStoreConfiguration) {
this.timeout = timeout;
this.jsonSchemaVersion = jsonSchemaVersion;
this.endpointUri = endpointUri;
this.reconnectEnabled = reconnectEnabled;
Expand All @@ -53,6 +58,11 @@ public static MessagingConfiguration.Builder newBuilder() {
return new WebSocketMessagingConfigurationBuilder();
}

@Override
public Duration getTimeout() {
return timeout;
}

@Override
public JsonSchemaVersion getJsonSchemaVersion() {
return jsonSchemaVersion;
Expand Down Expand Up @@ -82,14 +92,21 @@ private static final class WebSocketMessagingConfigurationBuilder implements Mes

private static final List<String> ALLOWED_URI_SCHEME = Arrays.asList("wss", "ws");
private static final String WS_PATH = "/ws/";
private static final String WS_PATH_REGEX = "/ws/(1|2)/?";
private static final String WS_PATH_REGEX = "/ws/([12])/?";

private Duration timeout = Duration.ofSeconds(60L);
private JsonSchemaVersion jsonSchemaVersion = JsonSchemaVersion.LATEST;
private URI endpointUri;
private boolean reconnectEnabled = true;
private ProxyConfiguration proxyConfiguration;
private TrustStoreConfiguration trustStoreConfiguration;

@Override
public Builder timeout(final Duration timeout) {
this.timeout = timeout;
return this;
}

@Override
public MessagingConfiguration.Builder jsonSchemaVersion(final JsonSchemaVersion jsonSchemaVersion) {
this.jsonSchemaVersion = checkNotNull(jsonSchemaVersion, "jsonSchemaVersion");
Expand Down Expand Up @@ -130,8 +147,8 @@ public MessagingConfiguration.Builder trustStoreConfiguration(

@Override
public MessagingConfiguration build() {
final URI wsEndpointUri= appendWsPathIfNecessary(this.endpointUri, jsonSchemaVersion);
return new WebSocketMessagingConfiguration(jsonSchemaVersion, wsEndpointUri, reconnectEnabled,
final URI wsEndpointUri = appendWsPathIfNecessary(this.endpointUri, jsonSchemaVersion);
return new WebSocketMessagingConfiguration(timeout, jsonSchemaVersion, wsEndpointUri, reconnectEnabled,
proxyConfiguration, trustStoreConfiguration);
}

Expand All @@ -152,13 +169,15 @@ private static boolean needToAppendWsPath(final URI baseUri) {
return !matcher.find();
}

private static void checkIfBaseUriAndSchemaVersionMatch(final URI baseUri, final JsonSchemaVersion schemaVersion) {
private static void checkIfBaseUriAndSchemaVersionMatch(final URI baseUri,
final JsonSchemaVersion schemaVersion) {
final String path = removeTrailingSlashFromPath(baseUri.getPath());
final String apiVersion = path.substring(path.length() - 1, path.length());
final String apiVersion = path.substring(path.length() - 1);
if (!schemaVersion.toString().equals(apiVersion)) {
throw new IllegalArgumentException("The jsonSchemaVersion and apiVersion of the endpoint do not match. " +
"Either remove the ws path from the endpoint or " +
"use the same jsonSchemaVersion as in the ws path of the endpoint.");
throw new IllegalArgumentException(
"The jsonSchemaVersion and apiVersion of the endpoint do not match. " +
"Either remove the ws path from the endpoint or " +
"use the same jsonSchemaVersion as in the ws path of the endpoint.");
}
}

Expand Down
Loading

0 comments on commit e9d3720

Please sign in to comment.