diff --git a/iwsn-portal/pom.xml b/iwsn-portal/pom.xml index dbf1f2156..7bec4e4bb 100644 --- a/iwsn-portal/pom.xml +++ b/iwsn-portal/pom.xml @@ -79,6 +79,12 @@ propconf + + de.uniluebeck.itm + eventstore + 1.0-SNAPSHOT + + de.uniluebeck.itm.netty-protocols netty-protocols diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalEventStoreService.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalEventStoreService.java deleted file mode 100644 index 904e0defe..000000000 --- a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalEventStoreService.java +++ /dev/null @@ -1,7 +0,0 @@ -package de.uniluebeck.itm.tr.iwsn.portal; - -import com.google.common.util.concurrent.Service; - -public interface PortalEventStoreService extends Service { - -} diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalEventStoreServiceImpl.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalEventStoreServiceImpl.java deleted file mode 100644 index c1c7ba73c..000000000 --- a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalEventStoreServiceImpl.java +++ /dev/null @@ -1,56 +0,0 @@ -package de.uniluebeck.itm.tr.iwsn.portal; - -import com.google.common.eventbus.Subscribe; -import com.google.common.util.concurrent.AbstractService; -import com.google.inject.Inject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PortalEventStoreServiceImpl extends AbstractService implements PortalEventStoreService { - - private static final Logger log = LoggerFactory.getLogger(PortalEventStoreServiceImpl.class); - - private final PortalEventBus portalEventBus; - - @Inject - public PortalEventStoreServiceImpl(final PortalEventBus portalEventBus) { - this.portalEventBus = portalEventBus; - } - - @Override - protected void doStart() { - log.trace("PortalEventStoreServiceImpl.doStart()"); - try { - // TODO implement - portalEventBus.register(this); - notifyStarted(); - } catch (Exception e) { - notifyFailed(e); - } - } - - @Override - protected void doStop() { - log.trace("PortalEventStoreServiceImpl.doStop()"); - try { - // TODO implement - portalEventBus.unregister(this); - notifyStopped(); - } catch (Exception e) { - notifyFailed(e); - } - } - - @Subscribe - public void onReservationStarted(final ReservationStartedEvent event) { - log.trace("PortalEventStoreServiceImpl.onReservationStarted()"); // TODO remove when working - final ReservationEventBus reservationEventBus = event.getReservation().getReservationEventBus(); - // TODO implement - } - - @Subscribe - public void onReservationEnded(final ReservationEndedEvent event) { - log.trace("PortalEventStoreServiceImpl.onReservationEnded()"); // TODO remove when working - // TODO implement - } -} diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalModule.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalModule.java index dd269b58a..c80d171b7 100644 --- a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalModule.java +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalModule.java @@ -26,6 +26,7 @@ import de.uniluebeck.itm.tr.iwsn.common.ResponseTrackerModule; import de.uniluebeck.itm.tr.iwsn.portal.api.rest.v1.RestApiModule; import de.uniluebeck.itm.tr.iwsn.portal.api.soap.v3.SoapApiModule; +import de.uniluebeck.itm.tr.iwsn.portal.eventstore.PortalEventStoreModule; import de.uniluebeck.itm.tr.iwsn.portal.externalplugins.ExternalPluginModule; import de.uniluebeck.itm.tr.iwsn.portal.externalplugins.ExternalPluginServiceConfig; import de.uniluebeck.itm.tr.iwsn.portal.netty.NettyServerModule; @@ -49,160 +50,161 @@ public class PortalModule extends AbstractModule { - private final DeviceDBConfig deviceDBConfig; - - private final PortalServerConfig portalServerConfig; - - private final CommonConfig commonConfig; - - private final RSServiceConfig rsServiceConfig; - - private final SNAAServiceConfig snaaServiceConfig; - - private final WiseGuiServiceConfig wiseGuiServiceConfig; - - private final WisemlProviderConfig wisemlProviderConfig; - - private final ExternalPluginServiceConfig externalPluginServiceConfig; - - @Inject - public PortalModule(final CommonConfig commonConfig, - final DeviceDBConfig deviceDBConfig, - final PortalServerConfig portalServerConfig, - final RSServiceConfig rsServiceConfig, - final SNAAServiceConfig snaaServiceConfig, - final WiseGuiServiceConfig wiseGuiServiceConfig, - final WisemlProviderConfig wisemlProviderConfig, - final ExternalPluginServiceConfig externalPluginServiceConfig) { - this.commonConfig = commonConfig; - this.deviceDBConfig = deviceDBConfig; - this.portalServerConfig = portalServerConfig; - this.rsServiceConfig = rsServiceConfig; - this.snaaServiceConfig = snaaServiceConfig; - this.wiseGuiServiceConfig = wiseGuiServiceConfig; - this.wisemlProviderConfig = wisemlProviderConfig; - this.externalPluginServiceConfig = externalPluginServiceConfig; - } - - @Override - protected void configure() { - - bind(CommonConfig.class).toProvider(of(commonConfig)); - bind(PortalServerConfig.class).toProvider(of(portalServerConfig)); - bind(DeviceDBConfig.class).toProvider(of(deviceDBConfig)); - bind(RSServiceConfig.class).toProvider(of(rsServiceConfig)); - bind(SNAAServiceConfig.class).toProvider(of(snaaServiceConfig)); - bind(WiseGuiServiceConfig.class).toProvider(of(wiseGuiServiceConfig)); - bind(WisemlProviderConfig.class).toProvider(of(wisemlProviderConfig)); - bind(ExternalPluginServiceConfig.class).toProvider(of(externalPluginServiceConfig)); - - install(new SNAAServiceModule(commonConfig, snaaServiceConfig)); - install(new RSServiceModule(commonConfig, rsServiceConfig)); - install(new DeviceDBServiceModule(deviceDBConfig)); - - bind(ServedNodeUrnsProvider.class).to(DeviceDBServedNodeUrnsProvider.class); - bind(ServedNodeUrnPrefixesProvider.class).to(CommonConfigServedNodeUrnPrefixesProvider.class); - - bind(EventBusFactory.class).to(EventBusFactoryImpl.class); - bind(PortalEventBus.class).to(PortalEventBusImpl.class).in(Singleton.class); - bind(ReservationManager.class).to(ReservationManagerImpl.class).in(Singleton.class); - bind(PortalEventStoreService.class).to(PortalEventStoreServiceImpl.class).in(Singleton.class); - - install(new FactoryModuleBuilder() - .implement(Reservation.class, ReservationImpl.class) - .build(ReservationFactory.class) - ); - - install(new NettyServerModule( - new ThreadFactoryBuilder().setNameFormat("Portal-OverlayBossExecutor %d").build(), - new ThreadFactoryBuilder().setNameFormat("Portal-OverlayWorkerExecutor %d").build() - ) - ); - - install(new FactoryModuleBuilder() - .implement(ReservationEventBus.class, ReservationEventBusImpl.class) - .build(ReservationEventBusFactory.class) - ); - - install(new SchedulerServiceModule()); - install(new ServicePublisherCxfModule()); - install(new ResponseTrackerModule()); - install(new NettyProtocolsModule()); - - install(new DeviceDBRestServiceModule()); - install(new SoapApiModule()); - install(new RestApiModule(false)); - install(new WiseGuiServiceModule()); - - install(new PortalPluginModule()); - install(new ExternalPluginModule()); - } - - @Provides - @Singleton - SchedulerService provideSchedulerService(SchedulerServiceFactory factory) { - return factory.create(-1, "PortalScheduler"); - } - - @Provides - @Singleton - ServicePublisher provideServicePublisher(final ServicePublisherFactory factory, final CommonConfig commonConfig) { - return factory.create(new ServicePublisherConfig(commonConfig.getPort(), commonConfig.getShiroIni())); - } - - @Provides - TimeLimiter provideTimeLimiter() { - final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("TimeLimiter %d").build(); - return new SimpleTimeLimiter( - getExitingExecutorService((ThreadPoolExecutor) newCachedThreadPool(threadFactory)) - ); - } - - @Provides - EndpointManager provideEndpointManager(final PortalServerConfig portalServerConfig) { - - return new EndpointManager() { - - @Override - public URI getSnaaEndpointUri() { - return assertNonEmpty( - portalServerConfig.getConfigurationSnaaEndpointUri(), - CONFIGURATION_SNAA_ENDPOINT_URI - ); - } - - @Override - public URI getRsEndpointUri() { - return assertNonEmpty( - portalServerConfig.getConfigurationRsEndpointUri(), - CONFIGURATION_RS_ENDPOINT_URI - ); - } - - @Override - public URI getSmEndpointUri() { - return assertNonEmpty( - portalServerConfig.getConfigurationSmEndpointUri(), - CONFIGURATION_SM_ENDPOINT_URI - ); - } - - @Override - public URI getWsnEndpointUriBase() { - return assertNonEmpty( - portalServerConfig.getConfigurationWsnEndpointUriBase(), - CONFIGURATION_WSN_ENDPOINT_URI_BASE - ); - } - - private URI assertNonEmpty(final URI uri, final String paramName) { - if (uri == null || uri.toString().isEmpty()) { - throw new IllegalArgumentException( - "Configuration parameter " + paramName + " must be set!" - ); - } - return uri; - } - }; - } + private final DeviceDBConfig deviceDBConfig; + + private final PortalServerConfig portalServerConfig; + + private final CommonConfig commonConfig; + + private final RSServiceConfig rsServiceConfig; + + private final SNAAServiceConfig snaaServiceConfig; + + private final WiseGuiServiceConfig wiseGuiServiceConfig; + + private final WisemlProviderConfig wisemlProviderConfig; + + private final ExternalPluginServiceConfig externalPluginServiceConfig; + + @Inject + public PortalModule(final CommonConfig commonConfig, + final DeviceDBConfig deviceDBConfig, + final PortalServerConfig portalServerConfig, + final RSServiceConfig rsServiceConfig, + final SNAAServiceConfig snaaServiceConfig, + final WiseGuiServiceConfig wiseGuiServiceConfig, + final WisemlProviderConfig wisemlProviderConfig, + final ExternalPluginServiceConfig externalPluginServiceConfig) { + this.commonConfig = commonConfig; + this.deviceDBConfig = deviceDBConfig; + this.portalServerConfig = portalServerConfig; + this.rsServiceConfig = rsServiceConfig; + this.snaaServiceConfig = snaaServiceConfig; + this.wiseGuiServiceConfig = wiseGuiServiceConfig; + this.wisemlProviderConfig = wisemlProviderConfig; + this.externalPluginServiceConfig = externalPluginServiceConfig; + } + + @Override + protected void configure() { + + bind(CommonConfig.class).toProvider(of(commonConfig)); + bind(PortalServerConfig.class).toProvider(of(portalServerConfig)); + bind(DeviceDBConfig.class).toProvider(of(deviceDBConfig)); + bind(RSServiceConfig.class).toProvider(of(rsServiceConfig)); + bind(SNAAServiceConfig.class).toProvider(of(snaaServiceConfig)); + bind(WiseGuiServiceConfig.class).toProvider(of(wiseGuiServiceConfig)); + bind(WisemlProviderConfig.class).toProvider(of(wisemlProviderConfig)); + bind(ExternalPluginServiceConfig.class).toProvider(of(externalPluginServiceConfig)); + + install(new SNAAServiceModule(commonConfig, snaaServiceConfig)); + install(new RSServiceModule(commonConfig, rsServiceConfig)); + install(new DeviceDBServiceModule(deviceDBConfig)); + + bind(ServedNodeUrnsProvider.class).to(DeviceDBServedNodeUrnsProvider.class); + bind(ServedNodeUrnPrefixesProvider.class).to(CommonConfigServedNodeUrnPrefixesProvider.class); + + bind(EventBusFactory.class).to(EventBusFactoryImpl.class); + bind(PortalEventBus.class).to(PortalEventBusImpl.class).in(Singleton.class); + bind(ReservationManager.class).to(ReservationManagerImpl.class).in(Singleton.class); + + install(new PortalEventStoreModule()); + + install(new FactoryModuleBuilder() + .implement(Reservation.class, ReservationImpl.class) + .build(ReservationFactory.class) + ); + + install(new NettyServerModule( + new ThreadFactoryBuilder().setNameFormat("Portal-OverlayBossExecutor %d").build(), + new ThreadFactoryBuilder().setNameFormat("Portal-OverlayWorkerExecutor %d").build() + ) + ); + + install(new FactoryModuleBuilder() + .implement(ReservationEventBus.class, ReservationEventBusImpl.class) + .build(ReservationEventBusFactory.class) + ); + + install(new SchedulerServiceModule()); + install(new ServicePublisherCxfModule()); + install(new ResponseTrackerModule()); + install(new NettyProtocolsModule()); + + install(new DeviceDBRestServiceModule()); + install(new SoapApiModule()); + install(new RestApiModule(false)); + install(new WiseGuiServiceModule()); + + install(new PortalPluginModule()); + install(new ExternalPluginModule()); + } + + @Provides + @Singleton + SchedulerService provideSchedulerService(SchedulerServiceFactory factory) { + return factory.create(-1, "PortalScheduler"); + } + + @Provides + @Singleton + ServicePublisher provideServicePublisher(final ServicePublisherFactory factory, final CommonConfig commonConfig) { + return factory.create(new ServicePublisherConfig(commonConfig.getPort(), commonConfig.getShiroIni())); + } + + @Provides + TimeLimiter provideTimeLimiter() { + final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("TimeLimiter %d").build(); + return new SimpleTimeLimiter( + getExitingExecutorService((ThreadPoolExecutor) newCachedThreadPool(threadFactory)) + ); + } + + @Provides + EndpointManager provideEndpointManager(final PortalServerConfig portalServerConfig) { + + return new EndpointManager() { + + @Override + public URI getSnaaEndpointUri() { + return assertNonEmpty( + portalServerConfig.getConfigurationSnaaEndpointUri(), + CONFIGURATION_SNAA_ENDPOINT_URI + ); + } + + @Override + public URI getRsEndpointUri() { + return assertNonEmpty( + portalServerConfig.getConfigurationRsEndpointUri(), + CONFIGURATION_RS_ENDPOINT_URI + ); + } + + @Override + public URI getSmEndpointUri() { + return assertNonEmpty( + portalServerConfig.getConfigurationSmEndpointUri(), + CONFIGURATION_SM_ENDPOINT_URI + ); + } + + @Override + public URI getWsnEndpointUriBase() { + return assertNonEmpty( + portalServerConfig.getConfigurationWsnEndpointUriBase(), + CONFIGURATION_WSN_ENDPOINT_URI_BASE + ); + } + + private URI assertNonEmpty(final URI uri, final String paramName) { + if (uri == null || uri.toString().isEmpty()) { + throw new IllegalArgumentException( + "Configuration parameter " + paramName + " must be set!" + ); + } + return uri; + } + }; + } } diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalServer.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalServer.java index 320322d8c..3f5652e2e 100644 --- a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalServer.java +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/PortalServer.java @@ -12,6 +12,7 @@ import de.uniluebeck.itm.tr.devicedb.DeviceDBService; import de.uniluebeck.itm.tr.iwsn.portal.api.rest.v1.RestApiService; import de.uniluebeck.itm.tr.iwsn.portal.api.soap.v3.SoapApiService; +import de.uniluebeck.itm.tr.iwsn.portal.eventstore.PortalEventStoreService; import de.uniluebeck.itm.tr.iwsn.portal.externalplugins.ExternalPluginServiceConfig; import de.uniluebeck.itm.tr.iwsn.portal.plugins.PortalPluginService; import de.uniluebeck.itm.tr.rs.RSService; diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/api/rest/v1/resources/EventStoreResourceImpl.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/api/rest/v1/resources/EventStoreResourceImpl.java index 312fa96ce..b35afbdb2 100644 --- a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/api/rest/v1/resources/EventStoreResourceImpl.java +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/api/rest/v1/resources/EventStoreResourceImpl.java @@ -1,7 +1,7 @@ package de.uniluebeck.itm.tr.iwsn.portal.api.rest.v1.resources; import com.google.inject.Inject; -import de.uniluebeck.itm.tr.iwsn.portal.PortalEventStoreService; +import de.uniluebeck.itm.tr.iwsn.portal.eventstore.PortalEventStoreService; import javax.ws.rs.GET; import javax.ws.rs.Path; diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreHelper.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreHelper.java new file mode 100644 index 000000000..319717e5d --- /dev/null +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreHelper.java @@ -0,0 +1,10 @@ +package de.uniluebeck.itm.tr.iwsn.portal.eventstore; + +import eventstore.IEventStore; + +import java.io.FileNotFoundException; + +public interface PortalEventStoreHelper { + + IEventStore createAndConfigureEventStore(String serializedReservationKey) throws FileNotFoundException; +} diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreHelperImpl.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreHelperImpl.java new file mode 100644 index 000000000..a202df28c --- /dev/null +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreHelperImpl.java @@ -0,0 +1,110 @@ +package de.uniluebeck.itm.tr.iwsn.portal.eventstore; + +import com.google.common.base.Function; +import com.google.inject.Inject; +import com.google.protobuf.InvalidProtocolBufferException; +import de.uniluebeck.itm.tr.iwsn.messages.Message; +import de.uniluebeck.itm.tr.iwsn.portal.PortalServerConfig; +import de.uniluebeck.itm.tr.iwsn.portal.ReservationEndedEvent; +import de.uniluebeck.itm.tr.iwsn.portal.ReservationManager; +import de.uniluebeck.itm.tr.iwsn.portal.ReservationStartedEvent; +import eventstore.ChronicleBasedEventStore; +import eventstore.IEventStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileNotFoundException; +import java.util.HashMap; +import java.util.Map; + +public class PortalEventStoreHelperImpl implements PortalEventStoreHelper { + + private static final Logger log = LoggerFactory.getLogger(PortalEventStoreHelperImpl.class); + private final ReservationManager reservationManager; + private final PortalServerConfig portalServerConfig; + + @Inject + public PortalEventStoreHelperImpl(final ReservationManager reservationManager, final PortalServerConfig portalServerConfig) { + this.reservationManager = reservationManager; + this.portalServerConfig = portalServerConfig; + } + + @Override + public IEventStore createAndConfigureEventStore(String serializedReservationKey) throws FileNotFoundException { + + Map, Function> serializers = new HashMap, Function>(); + Map, Function> deserializers = new HashMap, Function>(); + serializers.put(Message.class, new Function() { + @Nullable + @Override + public byte[] apply(@Nullable Message message) { + return message.toByteArray(); + } + }); + + serializers.put(ReservationStartedEvent.class, new Function() { + @Nullable + @Override + public byte[] apply(@Nullable ReservationStartedEvent input) { + return input.getReservation().getSerializedKey().getBytes(); + } + }); + + serializers.put(ReservationEndedEvent.class, new Function() { + @Nullable + @Override + public byte[] apply(@Nullable ReservationEndedEvent input) { + return input.getReservation().getSerializedKey().getBytes(); + } + }); + + deserializers.put(ReservationStartedEvent.class, new Function() { + @Nullable + @Override + public ReservationStartedEvent apply(@Nullable byte[] input) { + try { + String json = new String(input, "UTF-8"); + return new ReservationStartedEvent(reservationManager.getReservation(json)); + } catch (Exception e) { + log.error("Can't deserialize the ReservationStartedEvent", e); + return null; + } + + } + }); + + deserializers.put(ReservationEndedEvent.class, new Function() { + @Nullable + @Override + public ReservationEndedEvent apply(@Nullable byte[] input) { + try { + String json = new String(input, "UTF-8"); + return new ReservationEndedEvent(reservationManager.getReservation(json)); + } catch (Exception e) { + log.error("Can't deserialize the ReservationEndedEvent", e); + return null; + } + + } + }); + + deserializers.put(Message.class, new Function() { + @Nullable + @Override + public Object apply(@Nullable byte[] input) { + Message message = Message.getDefaultInstance(); + try { + return message.newBuilderForType().mergeFrom(input).build(); + } catch (InvalidProtocolBufferException e) { + log.error("Can't deserialize protobuf message", e); + return null; + } + } + }); + + String baseName = new File(serializedReservationKey, portalServerConfig.getEventStorePath()).getAbsolutePath(); + return new ChronicleBasedEventStore(baseName, serializers, deserializers); + } +} diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreModule.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreModule.java new file mode 100644 index 000000000..0eaae6ed3 --- /dev/null +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreModule.java @@ -0,0 +1,21 @@ +package de.uniluebeck.itm.tr.iwsn.portal.eventstore; + +import com.google.inject.PrivateModule; +import com.google.inject.Singleton; +import com.google.inject.assistedinject.FactoryModuleBuilder; + +public class PortalEventStoreModule extends PrivateModule { + + @Override + protected void configure() { + + bind(PortalEventStoreService.class).to(PortalEventStoreServiceImpl.class).in(Singleton.class); + install(new FactoryModuleBuilder() + .implement(ReservationEventStore.class, ReservationEventStoreImpl.class) + .build(ReservationEventStoreFactory.class) + ); + + bind(PortalEventStoreHelper.class).to(PortalEventStoreHelperImpl.class).in(Singleton.class); + expose(PortalEventStoreService.class); + } +} diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreService.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreService.java new file mode 100644 index 000000000..930799539 --- /dev/null +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreService.java @@ -0,0 +1,12 @@ +package de.uniluebeck.itm.tr.iwsn.portal.eventstore; + +import com.google.common.util.concurrent.Service; +import eventstore.IEventContainer; + +import java.io.IOException; +import java.util.Iterator; + +public interface PortalEventStoreService extends Service { + public Iterator getEvents(final String serializedReservationKey) throws IOException; + public Iterator getEventsBetween(final String serializedReservationKey, long startTime, long endTime) throws IOException; +} diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreServiceImpl.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreServiceImpl.java new file mode 100644 index 000000000..cba08dc98 --- /dev/null +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreServiceImpl.java @@ -0,0 +1,104 @@ +package de.uniluebeck.itm.tr.iwsn.portal.eventstore; + +import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.AbstractService; +import com.google.inject.Inject; +import de.uniluebeck.itm.tr.iwsn.portal.PortalEventBus; +import de.uniluebeck.itm.tr.iwsn.portal.PortalServerConfig; +import de.uniluebeck.itm.tr.iwsn.portal.ReservationEndedEvent; +import de.uniluebeck.itm.tr.iwsn.portal.ReservationStartedEvent; +import eventstore.IEventContainer; +import eventstore.IEventStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; + +class PortalEventStoreServiceImpl extends AbstractService implements PortalEventStoreService { + + private static final Logger log = LoggerFactory.getLogger(PortalEventStoreServiceImpl.class); + private final HashMap reservationStores = new HashMap(); + private final PortalEventBus portalEventBus; + private final ReservationEventStoreFactory reservationEventStoreFactory; + private final PortalEventStoreHelper portalEventStoreHelper; + + @Inject + public PortalEventStoreServiceImpl(final PortalEventBus portalEventBus, + final ReservationEventStoreFactory reservationEventStoreFactory, final PortalEventStoreHelper portalEventStoreHelper) { + this.portalEventBus = portalEventBus; + this.reservationEventStoreFactory = reservationEventStoreFactory; + this.portalEventStoreHelper = portalEventStoreHelper; + } + + @Override + protected void doStart() { + log.trace("PortalEventStoreServiceImpl.doStart()"); + try { + portalEventBus.register(this); + notifyStarted(); + } catch (Exception e) { + notifyFailed(e); + } + } + + @Override + protected void doStop() { + log.trace("PortalEventStoreServiceImpl.doStop()"); + try { + for(ReservationEventStore store : reservationStores.values()) { + store.stop(); + } + portalEventBus.unregister(this); + notifyStopped(); + } catch (Exception e) { + notifyFailed(e); + } + } + + @Subscribe + public void onReservationStarted(final ReservationStartedEvent event) { + log.trace("PortalEventStoreServiceImpl.onReservationStarted()"); // TODO remove when working + ReservationEventStore reservationEventStore = reservationEventStoreFactory.create(event.getReservation()); + reservationStores.put(event.getReservation().getSerializedKey(), reservationEventStore); + reservationEventStore.reservationStarted(event); + } + + @Subscribe + public void onReservationEnded(final ReservationEndedEvent event) { + log.trace("PortalEventStoreServiceImpl.onReservationEnded()"); // TODO remove when working + ReservationEventStore reservationEventStore = reservationStores.remove(event.getReservation().getSerializedKey()); + if (reservationEventStore != null) { + reservationEventStore.reservationEnded(event); + } + } + + private IEventStore getEventStore(String serializedReservationKey) throws IOException { + ReservationEventStore reservationEventStore = reservationStores.get(serializedReservationKey); + IEventStore eventStore = null; + if (reservationEventStore != null) { + eventStore = reservationEventStore.getEventStore(); + } else { + eventStore = portalEventStoreHelper.createAndConfigureEventStore(serializedReservationKey); + } + + if (eventStore == null) { + throw new IOException("Can't open event store for key " + serializedReservationKey); + } + + return eventStore; + } + + @Override + public Iterator getEventsBetween(String serializedReservationKey, long startTime, long endTime) throws IOException{ + IEventStore store = getEventStore(serializedReservationKey); + return store.getEventsBetweenTimestamps(startTime, endTime); + } + + @Override + public Iterator getEvents(String serializedReservationKey) throws IOException { + IEventStore store = getEventStore(serializedReservationKey); + return store.getAllEvents(); + } +} diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/ReservationEventStore.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/ReservationEventStore.java new file mode 100644 index 000000000..bc9ed6edf --- /dev/null +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/ReservationEventStore.java @@ -0,0 +1,16 @@ +package de.uniluebeck.itm.tr.iwsn.portal.eventstore; + +import de.uniluebeck.itm.tr.iwsn.portal.ReservationEndedEvent; +import de.uniluebeck.itm.tr.iwsn.portal.ReservationStartedEvent; +import eventstore.IEventStore; + +public interface ReservationEventStore { + + public void reservationEnded(final ReservationEndedEvent event); + + public void reservationStarted(final ReservationStartedEvent event); + + public void stop(); + + public IEventStore getEventStore(); +} diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/ReservationEventStoreFactory.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/ReservationEventStoreFactory.java new file mode 100644 index 000000000..c7a4df999 --- /dev/null +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/ReservationEventStoreFactory.java @@ -0,0 +1,10 @@ +package de.uniluebeck.itm.tr.iwsn.portal.eventstore; + +import com.google.inject.assistedinject.Assisted; +import de.uniluebeck.itm.tr.iwsn.portal.Reservation; + +public interface ReservationEventStoreFactory { + + ReservationEventStore create(@Assisted Reservation reservation); + +} diff --git a/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/ReservationEventStoreImpl.java b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/ReservationEventStoreImpl.java new file mode 100644 index 000000000..f448264f4 --- /dev/null +++ b/iwsn-portal/src/main/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/ReservationEventStoreImpl.java @@ -0,0 +1,80 @@ +package de.uniluebeck.itm.tr.iwsn.portal.eventstore; + +import com.google.common.eventbus.Subscribe; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import de.uniluebeck.itm.tr.iwsn.messages.Message; +import de.uniluebeck.itm.tr.iwsn.portal.Reservation; +import de.uniluebeck.itm.tr.iwsn.portal.ReservationEndedEvent; +import de.uniluebeck.itm.tr.iwsn.portal.ReservationStartedEvent; +import eventstore.IEventStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; + +class ReservationEventStoreImpl implements ReservationEventStore { + private static final Logger log = LoggerFactory.getLogger(ReservationEventStoreImpl.class); + + private Reservation reservation; + private IEventStore eventStore; + + @Inject + public ReservationEventStoreImpl( + @Assisted final Reservation reservation, final PortalEventStoreHelper portalEventStoreHelper) { + this.reservation = reservation; + try { + eventStore = portalEventStoreHelper.createAndConfigureEventStore(reservation.getSerializedKey()); + } catch (FileNotFoundException e) { + log.error("Can't create event store at this location!", e); + } + } + + + @Override + public void reservationStarted(final ReservationStartedEvent event) { + try { + eventStore.storeEvent(event); + reservation.getReservationEventBus().register(this); + } catch (IOException e) { + log.error("Can't store event", e); + } + } + + @Subscribe + public void on(Message message) { + if (message.getType() != Message.Type.PROGRESS) { + storeEvent(message); + } + } + + private void storeEvent(Message event) { + try { + eventStore.storeEvent(event, Message.class); + } catch (IOException e) { + log.error("Failed to store event", e); + } + } + + @Override + public IEventStore getEventStore() { + return eventStore; + } + + @Override + public void reservationEnded(ReservationEndedEvent event) { + try { + eventStore.storeEvent(event); + } catch (IOException e) { + log.error("Error on reservationEnded()", e); + } + stop(); + } + + @Override + public void stop() { + this.reservation.getReservationEventBus().unregister(this); + eventStore.close(); + } +} diff --git a/iwsn-portal/src/test/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreTest.java b/iwsn-portal/src/test/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreTest.java new file mode 100644 index 000000000..505d593bc --- /dev/null +++ b/iwsn-portal/src/test/java/de/uniluebeck/itm/tr/iwsn/portal/eventstore/PortalEventStoreTest.java @@ -0,0 +1,54 @@ +package de.uniluebeck.itm.tr.iwsn.portal.eventstore; + +import de.uniluebeck.itm.tr.iwsn.portal.PortalEventBus; +import de.uniluebeck.itm.tr.iwsn.portal.PortalServerConfig; +import de.uniluebeck.itm.tr.iwsn.portal.Reservation; +import de.uniluebeck.itm.tr.iwsn.portal.ReservationStartedEvent; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.junit.Assert.*; + +@RunWith(MockitoJUnitRunner.class) +public class PortalEventStoreTest { + + @Mock + private PortalEventBus portalEventBus; + + @Mock + private PortalServerConfig portalServerConfig; + + @Mock + private ReservationEventStoreFactory reservationEventStoreFactory; + + @Mock + private PortalEventStoreHelper portalEventStoreHelper; + + private PortalEventStoreServiceImpl store; + + + @Before + public void setUp() throws Exception { + store = new PortalEventStoreServiceImpl(portalEventBus, reservationEventStoreFactory, portalEventStoreHelper); + } + + @Test + public void testIfReservationStartedEventIsPersisted() throws Exception { + + final Reservation reservation = mock(Reservation.class); + when(reservation.getSerializedKey()).thenReturn("abc"); + + //when(portalEventStoreHelper.createAndConfigureEventStore("abc")).thenReturn(new ChronicleBasedEventStore()) + + store.onReservationStarted(new ReservationStartedEvent(reservation)); + + final ReservationStartedEvent loaded = ((ReservationStartedEvent) store.getEvents("abc").next().getEvent()); + + assertNotNull(loaded); + } +}