Skip to content

Commit

Permalink
#3 Enhanced communication & processing components
Browse files Browse the repository at this point in the history
  • Loading branch information
pappist committed Aug 11, 2016
1 parent 8ba625c commit 83ef91b
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 41 deletions.
6 changes: 1 addition & 5 deletions lm-demo/com.incquerylabs.iot.commons/.classpath
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@
<classpath>
<classpathentry kind="src" path="src/main/java"/>
<classpathentry kind="src" path="src/main/test"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="output" path="target/classes"/>
</classpath>
8 changes: 4 additions & 4 deletions lm-demo/com.incquerylabs.iot.commons/.project
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>com.incquerylabs.iot.communication</name>
<name>com.incquerylabs.iot.commons</name>
<comment></comment>
<projects>
</projects>
Expand All @@ -11,17 +11,17 @@
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<name>org.eclipse.pde.ManifestBuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.pde.ManifestBuilder</name>
<name>org.eclipse.pde.SchemaBuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.pde.SchemaBuilder</name>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,2 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.7
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.source=1.7
2 changes: 1 addition & 1 deletion lm-demo/com.incquerylabs.iot.commons/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Communication
Bundle-SymbolicName: com.incquerylabs.iot.communication
Bundle-SymbolicName: com.incquerylabs.iot.commons
Bundle-Version: 1.0.0.qualifier
Export-Package: com.incquerylabs.iot.communication
Require-Bundle: com.google.guava;bundle-version="15.0.0"
4 changes: 2 additions & 2 deletions lm-demo/com.incquerylabs.iot.commons/pom.xml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.incquerylabs.iot</groupId>
<artifactId>communication</artifactId>
<artifactId>iot-commons</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<sourceDirectory>src</sourceDirectory>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

public interface ISubscriber {

public void subscribe(IAddress address, ISubscriberCallback callback);
public void registerCallback(IAddress address, ISubscriberCallback callback);

public void unregisterCallback(IAddress address, ISubscriberCallback callback);

public void disconnectAll();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ public static SubscriberPool getInstance() throws PoolNotInitializedException {

public void registerCallback(IAddress address, ISubscriberCallback callback) {
if(subscribers.containsKey(address.getFullAddress())) {
subscribers.get(address.getFullAddress()).subscribe(address, callback);
subscribers.get(address.getFullAddress()).registerCallback(address, callback);
} else {
ISubscriber subscriber = factory.createSubscriber(pool);
subscriber.subscribe(address, callback);
subscriber.registerCallback(address, callback);
subscribers.put(address.getFullAddress(), subscriber);
}
}
}

public void unregisterCallback(ISubscriberCallback callback) {
// TODO:
}

private static class DefaultExecutor extends ThreadPoolExecutor implements IExecutorPool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,34 @@

import com.incquerylabs.iot.communication.IAddress;
import com.incquerylabs.iot.communication.ISubscriberCallback;
import com.incquerylabs.iot.communication.PublisherPool;
import com.incquerylabs.iot.communication.SubscriberPool;
import com.incquerylabs.iot.communication.exception.PoolNotInitializedException;

public class AbstractProcessorComponent implements ISubscriberCallback {

IAddress sourceAddress;
IAddress targetAddress;

IDataProcessor processor;

public AbstractProcessorComponent(IAddress sourceAddress, IAddress targetAddress, IDataProcessor processor) throws PoolNotInitializedException {

public AbstractProcessorComponent(IAddress sourceAddress) {
this.sourceAddress = sourceAddress;
this.targetAddress = targetAddress;
}

processor = this.processor;

SubscriberPool.getInstance().registerCallback(sourceAddress, this);
protected void setProcessor(IDataProcessor processor) {
this.processor = processor;
}

@Override
public void messageArrived(IAddress address, byte[] data) {
try {
PublisherPool.getInstance().next(targetAddress).publish(processor.process(data), 0);
} catch (PoolNotInitializedException e) {
// TODO Logging ...
e.printStackTrace();
}
processor.process(data);
}

public void start() throws PoolNotInitializedException {
SubscriberPool.getInstance().registerCallback(sourceAddress, this);
}

public void stop() throws PoolNotInitializedException {
SubscriberPool.getInstance().unregisterCallback(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

public interface IDataProcessor {

public byte[] process(byte[] data);
public void process(byte[] data);

}

0 comments on commit 83ef91b

Please sign in to comment.