From 0b2f38cb9d9d04bcd6a5d88932e6d8479bd78283 Mon Sep 17 00:00:00 2001 From: pappist Date: Wed, 10 Aug 2016 16:05:37 +0200 Subject: [PATCH] #3 Added abstract processor component --- .../processor/AbstractProcessorComponent.java | 36 +++++++++++++++++++ .../iot/processor/IDataProcessor.java | 7 ++++ 2 files changed, 43 insertions(+) create mode 100644 lm-demo/com.incquerylabs.iot.communication/src/main/java/com/incquerylabs/iot/processor/AbstractProcessorComponent.java create mode 100644 lm-demo/com.incquerylabs.iot.communication/src/main/java/com/incquerylabs/iot/processor/IDataProcessor.java diff --git a/lm-demo/com.incquerylabs.iot.communication/src/main/java/com/incquerylabs/iot/processor/AbstractProcessorComponent.java b/lm-demo/com.incquerylabs.iot.communication/src/main/java/com/incquerylabs/iot/processor/AbstractProcessorComponent.java new file mode 100644 index 0000000..9d8f25e --- /dev/null +++ b/lm-demo/com.incquerylabs.iot.communication/src/main/java/com/incquerylabs/iot/processor/AbstractProcessorComponent.java @@ -0,0 +1,36 @@ +package com.incquerylabs.iot.processor; + +import com.incquerylabs.iot.communication.IAddress; +import com.incquerylabs.iot.communication.ISubscriberCallback; +import com.incquerylabs.iot.communication.PublisherPool; +import com.incquerylabs.iot.communication.SubscriberPool; +import com.incquerylabs.iot.communication.exception.PoolNotInitializedException; + +public class AbstractProcessorComponent implements ISubscriberCallback { + + IAddress sourceAddress; + IAddress targetAddress; + + IDataProcessor processor; + + public AbstractProcessorComponent(IAddress sourceAddress, IAddress targetAddress, IDataProcessor processor) throws PoolNotInitializedException { + + this.sourceAddress = sourceAddress; + this.targetAddress = targetAddress; + + processor = this.processor; + + SubscriberPool.getInstance().registerCallback(sourceAddress, this); + } + + @Override + public void messageArrived(IAddress address, byte[] data) { + try { + PublisherPool.getInstance().next(targetAddress).publish(processor.process(data), 0); + } catch (PoolNotInitializedException e) { + // TODO Logging ... + e.printStackTrace(); + } + } + +} diff --git a/lm-demo/com.incquerylabs.iot.communication/src/main/java/com/incquerylabs/iot/processor/IDataProcessor.java b/lm-demo/com.incquerylabs.iot.communication/src/main/java/com/incquerylabs/iot/processor/IDataProcessor.java new file mode 100644 index 0000000..77f5583 --- /dev/null +++ b/lm-demo/com.incquerylabs.iot.communication/src/main/java/com/incquerylabs/iot/processor/IDataProcessor.java @@ -0,0 +1,7 @@ +package com.incquerylabs.iot.processor; + +public interface IDataProcessor { + + public byte[] process(byte[] data); + +}