diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java index aeda204da9..7b02dc4db4 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java @@ -62,27 +62,32 @@ public void registerDataStreams(Collection functio var client = new StreamPipesClientResolver().makeStreamPipesClientInstance(); functions.forEach(function -> { function.getFunctionConfig().getOutputDataStreams().values().forEach(ds -> { - DeclarersSingleton.getInstance().add(new IStreamPipesDataStream() { - @Override - public IDataStreamConfiguration declareConfig() { - return DataStreamConfiguration.create( - () -> this, - ds - ); + if (!DeclarersSingleton.getInstance().getDataStreams().containsKey(ds.getAppId())) { + DeclarersSingleton.getInstance().add(new IStreamPipesDataStream() { + @Override + public IDataStreamConfiguration declareConfig() { + return DataStreamConfiguration.create( + () -> this, + ds + ); + } + + @Override + public void executeStream() { + + } + + @Override + public boolean isExecutable() { + return false; + } + }); + if (client.streams().get(ds.getElementId()).isEmpty()) { + client.streams().create(ds); + } else { + client.streams().update(ds); } - - @Override - public void executeStream() { - - } - - @Override - public boolean isExecutable() { - return false; - } - }); - - client.streams().create(ds); + } }); }); }