diff --git a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java index 2b9d9e45eb..ba776ce51c 100644 --- a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java +++ b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java @@ -22,6 +22,7 @@ import org.apache.streampipes.model.extensions.configuration.SpServiceConfiguration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.function.FunctionDefinition; +import org.apache.streampipes.model.migration.ModelMigratorConfig; import java.util.List; @@ -39,5 +40,7 @@ public interface IAdminApi { void deregisterFunction(String functionId); + void registerMigrations(List migrationConfigs, String serviceId); + MessagingSettings getMessagingSettings(); } diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java index b502632839..970eb62249 100644 --- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java +++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java @@ -24,6 +24,7 @@ import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.function.FunctionDefinition; import org.apache.streampipes.model.message.SuccessMessage; +import org.apache.streampipes.model.migration.ModelMigratorConfig; import java.util.List; @@ -66,6 +67,15 @@ public void deregisterFunction(String functionId) { delete(getDeleteFunctionPath(functionId), SuccessMessage.class); } + /** + * Register migration configs {@link ModelMigratorConfig} at the StreamPipes Core service. + * @param migrationConfigs list of migration configs to be registered + */ + @Override + public void registerMigrations(List migrationConfigs, String serviceId) { + post(getMigrationPath().addToPath(serviceId), migrationConfigs); + } + @Override public MessagingSettings getMessagingSettings() { return getSingle(getMessagingSettingsPath(), MessagingSettings.class); @@ -98,4 +108,10 @@ private StreamPipesApiPath getFunctionsPath() { private StreamPipesApiPath getDeleteFunctionPath(String functionId) { return getFunctionsPath().addToPath(functionId); } + + private StreamPipesApiPath getMigrationPath() { + return StreamPipesApiPath + .fromBaseApiPath() + .addToPath("migrations"); + } } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java new file mode 100644 index 0000000000..26f158a5b7 --- /dev/null +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.connect.management.management; + +import org.apache.streampipes.commons.exceptions.connect.AdapterException; +import org.apache.streampipes.manager.migration.AbstractMigrationManager; +import org.apache.streampipes.manager.migration.IMigrationHandler; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.storage.api.IAdapterStorage; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class AdapterMigrationManager extends AbstractMigrationManager implements IMigrationHandler { + + private static final Logger LOG = LoggerFactory.getLogger(AdapterMigrationManager.class); + + private final IAdapterStorage adapterStorage; + + public AdapterMigrationManager(IAdapterStorage adapterStorage) { + this.adapterStorage = adapterStorage; + } + + @Override + public void handleMigrations(SpServiceRegistration extensionsServiceConfig, + List migrationConfigs) { + + LOG.info("Received {} migrations from extension service {}.", + migrationConfigs.size(), + extensionsServiceConfig.getServiceUrl()); + LOG.info("Updating adapter descriptions by replacement..."); + updateDescriptions(migrationConfigs, extensionsServiceConfig.getServiceUrl()); + LOG.info("Adapter descriptions are up to date."); + + LOG.info("Checking migrations for existing adapters in StreamPipes Core ..."); + for (var migrationConfig : migrationConfigs) { + LOG.info("Searching for assets of '{}'", migrationConfig.targetAppId()); + LOG.debug("Searching for assets of '{}' with config {}", migrationConfig.targetAppId(), migrationConfig); + var adapterDescriptions = adapterStorage.getAdaptersByAppId(migrationConfig.targetAppId()); + LOG.info("Found {} instances for appId '{}'", adapterDescriptions.size(), migrationConfig.targetAppId()); + for (var adapterDescription : adapterDescriptions) { + + var adapterVersion = adapterDescription.getVersion(); + + if (adapterVersion == migrationConfig.fromVersion()) { + LOG.info("Migration is required for adapter '{}'. Migrating from version '{}' to '{}' ...", + adapterDescription.getElementId(), + adapterVersion, migrationConfig.toVersion() + ); + + var migrationResult = performMigration( + adapterDescription, + migrationConfig, + String.format("%s/%s/adapter", + extensionsServiceConfig.getServiceUrl(), + MIGRATION_ENDPOINT + ) + ); + + if (migrationResult.success()) { + LOG.info("Migration successfully performed by extensions service. Updating adapter description ..."); + LOG.debug( + "Migration was performed by extensions service '{}'", + extensionsServiceConfig.getServiceUrl()); + + adapterStorage.updateAdapter(migrationResult.element()); + LOG.info("Adapter description is updated - Migration successfully completed at Core."); + } else { + LOG.error("Migration failed with the following reason: {}", migrationResult.message()); + LOG.error( + "Migration for adapter '{}' failed - Stopping adapter ...", + migrationResult.element().getElementId() + ); + try { + WorkerRestClient.stopStreamAdapter(extensionsServiceConfig.getServiceUrl(), adapterDescription); + } catch (AdapterException e) { + LOG.error("Stopping adapter failed: {}", StringUtils.join(e.getStackTrace(), "\n")); + } + LOG.info("Adapter successfully stopped."); + } + } else { + LOG.info( + "Migration is not applicable for adapter '{}' because of a version mismatch - " + + "adapter version: '{}', migration starts at: '{}'", + adapterDescription.getElementId(), + adapterVersion, + migrationConfig.fromVersion() + ); + } + } + } + } +} diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java index bbb92edc71..c45506d0f5 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java @@ -63,8 +63,7 @@ public static void stopStreamAdapter(String baseUrl, AdapterDescription adapterStreamDescription) throws AdapterException { String url = baseUrl + WorkerPaths.getStreamStopPath(); - var ad = - getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId()); + var ad = getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId()); stopAdapter(ad, url); updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false); @@ -74,8 +73,8 @@ public static List getAllRunningAdapterInstanceDescriptions( try { LOG.info("Requesting all running adapter description instances: " + url); var responseString = ExtensionServiceExecutions - .extServiceGetRequest(url) - .execute().returnContent().asString(); + .extServiceGetRequest(url) + .execute().returnContent().asString(); return JacksonSerializer.getObjectMapper().readValue(responseString, List.class); } catch (IOException e) { @@ -84,15 +83,15 @@ public static List getAllRunningAdapterInstanceDescriptions( } } - public static void startAdapter(String url, - AdapterDescription ad) throws AdapterException { + private static void startAdapter(String url, + AdapterDescription ad) throws AdapterException { LOG.info("Trying to start adapter on endpoint {} ", url); triggerAdapterStateChange(ad, url, "started"); } - public static void stopAdapter(AdapterDescription ad, - String url) throws AdapterException { + private static void stopAdapter(AdapterDescription ad, + String url) throws AdapterException { LOG.info("Trying to stop adapter on endpoint {} ", url); triggerAdapterStateChange(ad, url, "stopped"); @@ -134,14 +133,14 @@ private static HttpResponse triggerPost(String url, public static RuntimeOptionsResponse getConfiguration(String workerEndpoint, String appId, RuntimeOptionsRequest runtimeOptionsRequest) - throws AdapterException, SpConfigurationException { + throws AdapterException, SpConfigurationException { String url = workerEndpoint + WorkerPaths.getRuntimeResolvablePath(appId); try { String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest); var response = ExtensionServiceExecutions.extServicePostRequest(url, payload) - .execute() - .returnResponse(); + .execute() + .returnResponse(); String responseString = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); @@ -163,9 +162,9 @@ public static String getAssets(String workerPath) throws AdapterException { try { return Request.Get(url) - .connectTimeout(1000) - .socketTimeout(100000) - .execute().returnContent().asString(); + .connectTimeout(1000) + .socketTimeout(100000) + .execute().returnContent().asString(); } catch (IOException e) { LOG.error(e.getMessage()); throw new AdapterException("Could not get assets endpoint: " + url); @@ -178,9 +177,9 @@ public static byte[] getIconAsset(String baseUrl) throws AdapterException { try { byte[] responseString = Request.Get(url) - .connectTimeout(1000) - .socketTimeout(100000) - .execute().returnContent().asBytes(); + .connectTimeout(1000) + .socketTimeout(100000) + .execute().returnContent().asBytes(); return responseString; } catch (IOException e) { LOG.error(e.getMessage()); @@ -193,9 +192,9 @@ public static String getDocumentationAsset(String baseUrl) throws AdapterExcepti try { return Request.Get(url) - .connectTimeout(1000) - .socketTimeout(100000) - .execute().returnContent().asString(); + .connectTimeout(1000) + .socketTimeout(100000) + .execute().returnContent().asString(); } catch (IOException e) { LOG.error(e.getMessage()); throw new AdapterException("Could not get documentation endpoint: " + url); diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IDataProcessorParameterExtractor.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IDataProcessorParameterExtractor.java index 30e9e52525..986c0d0810 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IDataProcessorParameterExtractor.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IDataProcessorParameterExtractor.java @@ -18,11 +18,9 @@ package org.apache.streampipes.extensions.api.extractor; -import org.apache.streampipes.model.graph.DataProcessorInvocation; - import java.util.List; -public interface IDataProcessorParameterExtractor extends IParameterExtractor { +public interface IDataProcessorParameterExtractor extends IParameterExtractor { String outputTopic(); List outputKeySelectors(); diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IDataSinkParameterExtractor.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IDataSinkParameterExtractor.java index 16c69c572c..fac25b1ca2 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IDataSinkParameterExtractor.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IDataSinkParameterExtractor.java @@ -18,7 +18,5 @@ package org.apache.streampipes.extensions.api.extractor; -import org.apache.streampipes.model.graph.DataSinkInvocation; - -public interface IDataSinkParameterExtractor extends IParameterExtractor { +public interface IDataSinkParameterExtractor extends IParameterExtractor { } diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java index 9c091b7d2e..7b3e7539b6 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java @@ -19,7 +19,6 @@ package org.apache.streampipes.extensions.api.extractor; import org.apache.streampipes.commons.exceptions.SpRuntimeException; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventPropertyPrimitive; import org.apache.streampipes.model.schema.PropertyScope; @@ -30,7 +29,7 @@ import java.io.InputStream; import java.util.List; -public interface IParameterExtractor { +public interface IParameterExtractor { String measurementUnit(String runtimeName, Integer streamIndex); String inputTopic(Integer streamIndex); diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IStaticPropertyExtractor.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IStaticPropertyExtractor.java index 017c3ad777..67692b45d9 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IStaticPropertyExtractor.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IStaticPropertyExtractor.java @@ -18,7 +18,5 @@ package org.apache.streampipes.extensions.api.extractor; -import org.apache.streampipes.model.graph.DataSinkInvocation; - -public interface IStaticPropertyExtractor extends IParameterExtractor { +public interface IStaticPropertyExtractor extends IParameterExtractor { } diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/DataSinkMigrator.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/DataSinkMigrator.java new file mode 100644 index 0000000000..e30ab00c2a --- /dev/null +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/DataSinkMigrator.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.migration; + +import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor; +import org.apache.streampipes.model.graph.DataSinkInvocation; + +public interface DataSinkMigrator extends IModelMigrator { +} diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IAdapterMigrator.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IAdapterMigrator.java new file mode 100644 index 0000000000..e7a0996dd9 --- /dev/null +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IAdapterMigrator.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.migration; + +import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; + +public interface IAdapterMigrator extends IModelMigrator { +} diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IDataProcessorMigrator.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IDataProcessorMigrator.java new file mode 100644 index 0000000000..c3932d0e20 --- /dev/null +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IDataProcessorMigrator.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.migration; + +import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor; +import org.apache.streampipes.model.graph.DataProcessorInvocation; + +public interface IDataProcessorMigrator + extends IModelMigrator { +} diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IModelMigrator.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IModelMigrator.java new file mode 100644 index 0000000000..77e2e3f3c0 --- /dev/null +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IModelMigrator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.migration; + +import org.apache.streampipes.extensions.api.extractor.IParameterExtractor; +import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; + +public interface IModelMigrator< + T extends VersionedNamedStreamPipesEntity, + ExT extends IParameterExtractor + > extends Comparable { + + ModelMigratorConfig config(); + + /** + * Defines the migration to be performed. + * + * @param element Entity to be transformed. + * @param extractor Extractor that allows to handle static properties. + * @return Result of the migration that describes both outcomes: successful and failed migrations + * @throws RuntimeException in case any unexpected error occurs + */ + MigrationResult migrate(T element, ExT extractor) throws RuntimeException; + + @Override + default int compareTo(Object o) { + if (!(o instanceof IModelMigrator)) { + throw new ClassCastException("Given object is not an instance of `IModelMigrator` - " + + "only instances of `IModelMigrator` can be compared."); + } else { + return config().compareTo(((IModelMigrator) o).config()); + } + } +} diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/MigrationComparison.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/MigrationComparison.java new file mode 100644 index 0000000000..ef275a4702 --- /dev/null +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/MigrationComparison.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.api.migration; + +public class MigrationComparison { + + public static boolean isEqual(IModelMigrator first, IModelMigrator second) { + return first.config().equals(second.config()); + } +} + diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/param/IParameterGenerator.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/param/IParameterGenerator.java index fef867b047..a9edb8b48e 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/param/IParameterGenerator.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/param/IParameterGenerator.java @@ -23,7 +23,7 @@ public interface IParameterGenerator< IvT extends InvocableStreamPipesEntity, - PeT extends IParameterExtractor, + PeT extends IParameterExtractor, K extends IPipelineElementParameters> { diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/param/IPipelineElementParameters.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/param/IPipelineElementParameters.java index 2ef9dc3d1a..39a1a3be37 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/param/IPipelineElementParameters.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/param/IPipelineElementParameters.java @@ -28,7 +28,7 @@ public interface IPipelineElementParameters< IvT extends InvocableStreamPipesEntity, - ExT extends IParameterExtractor> { + ExT extends IParameterExtractor> { IvT getModel(); diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/runtime/ResolvesContainerProvidedOutputStrategy.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/runtime/ResolvesContainerProvidedOutputStrategy.java index b37871daff..2db19a880a 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/runtime/ResolvesContainerProvidedOutputStrategy.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/runtime/ResolvesContainerProvidedOutputStrategy.java @@ -23,7 +23,7 @@ import org.apache.streampipes.model.schema.EventSchema; public interface ResolvesContainerProvidedOutputStrategy> { + extends IParameterExtractor> { EventSchema resolveOutputStrategy(T processingElement, K parameterExtractor) throws SpConfigurationException; } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinition.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinition.java index ca22193907..3c7f3af96e 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinition.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinition.java @@ -20,12 +20,15 @@ import org.apache.streampipes.dataformat.SpDataFormatFactory; import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.api.declarer.IStreamPipesFunctionDeclarer; +import org.apache.streampipes.extensions.api.migration.IModelMigrator; +import org.apache.streampipes.extensions.api.migration.MigrationComparison; import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement; import org.apache.streampipes.extensions.api.pe.runtime.IStreamPipesRuntimeProvider; import org.apache.streampipes.messaging.SpProtocolDefinitionFactory; import org.apache.streampipes.model.extensions.configuration.ConfigItem; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -47,6 +50,7 @@ public class SpServiceDefinition { private List kvConfigs; private List runtimeProviders; + private List> migrators; public SpServiceDefinition() { this.serviceId = UUID.randomUUID().toString(); @@ -57,6 +61,7 @@ public SpServiceDefinition() { this.functions = new ArrayList<>(); this.adapters = new ArrayList<>(); this.runtimeProviders = new ArrayList<>(); + this.migrators = new ArrayList<>(); } public String getServiceGroup() { @@ -182,4 +187,24 @@ public List getRuntimeProviders() { public void addRuntimeProvider(IStreamPipesRuntimeProvider runtimeProvider) { this.runtimeProviders.add(runtimeProvider); } + + public List> getMigrators() { + return this.migrators; + } + + /** + * Add a list of migrations to the service definition. + * This inherently checks for duplicates and sorts the migrations as such that + * migrations affecting lower versions always come first. + * + * @param migrators migrators to add + */ + public void addMigrators(List> migrators) { + for (var migratorToAdd : migrators) { + if (this.migrators.stream().noneMatch(migrator -> MigrationComparison.isEqual(migrator, migratorToAdd))) { + this.migrators.add(migratorToAdd); + } + } + Collections.sort(this.migrators); + } } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilder.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilder.java index a350950c9e..e8e9220a0a 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilder.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilder.java @@ -20,6 +20,7 @@ import org.apache.streampipes.dataformat.SpDataFormatFactory; import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.api.declarer.IStreamPipesFunctionDeclarer; +import org.apache.streampipes.extensions.api.migration.IModelMigrator; import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement; import org.apache.streampipes.extensions.api.pe.runtime.IStreamPipesRuntimeProvider; import org.apache.streampipes.messaging.SpProtocolDefinitionFactory; @@ -36,7 +37,6 @@ public class SpServiceDefinitionBuilder { private static final Logger LOG = LoggerFactory.getLogger(SpServiceDefinitionBuilder.class); private SpServiceDefinition serviceDefinition; - //private SpConfig config; private SpServiceDefinitionBuilder(String serviceGroup, String serviceName, @@ -47,7 +47,6 @@ private SpServiceDefinitionBuilder(String serviceGroup, this.serviceDefinition.setServiceName(serviceName); this.serviceDefinition.setServiceDescription(serviceDescription); this.serviceDefinition.setDefaultPort(defaultPort); - //this.config = new ConsulSpConfig(serviceGroup); } public static SpServiceDefinitionBuilder create(String serviceGroup, @@ -123,9 +122,22 @@ public SpServiceDefinitionBuilder registerMessagingProtocols(SpProtocolDefinitio return this; } + /** + * Include migrations in the service definition. + *
+ * Please refrain from providing {@link IModelMigrator}s with overlapping version definitions for one application id. + * @param migrations List of migrations to be registered + * @return {@link SpServiceDefinitionBuilder} + */ + public SpServiceDefinitionBuilder registerMigrators(IModelMigrator... migrations) { + this.serviceDefinition.addMigrators(List.of(migrations)); + return this; + } + public SpServiceDefinitionBuilder merge(SpServiceDefinition other) { this.serviceDefinition.addDeclarers(other.getDeclarers()); this.serviceDefinition.addAdapters(other.getAdapters()); + this.serviceDefinition.addMigrators(other.getMigrators()); other.getKvConfigs().forEach(value -> { if (this.serviceDefinition.getKvConfigs().stream().anyMatch(c -> c.getKey().equals(value.getKey()))) { LOG.warn("Config key {} already exists and will be overridden by merge, which might lead to strange results.", diff --git a/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilderTest.java b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilderTest.java index 523d9c1f62..c66fd30c02 100644 --- a/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilderTest.java +++ b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/model/SpServiceDefinitionBuilderTest.java @@ -24,7 +24,13 @@ import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext; import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext; import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor; +import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; +import org.apache.streampipes.extensions.api.migration.IAdapterMigrator; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.guess.GuessSchema; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; import org.junit.Test; @@ -43,6 +49,27 @@ public void registerAdapter() { assertEquals(expected, result.getAdapters().get(0)); } + @Test + public void registerMigration() { + + var migration1 = new TestMigration("app-id", 1, 2); + var migration2 = new TestMigration("app-id", 0, 1); + var migration3 = new TestMigration("other-id", 0, 1); + var migration4 = new TestMigration("app-id", 0, 1); + + var result = SpServiceDefinitionBuilder.create("", "", "", 1) + .registerMigrators(migration1, migration2, migration3, migration4) + .build(); + + // assert de-duplication (last migration should not be registered) + assertEquals(3, result.getMigrators().size()); + + // assert ordering + assertEquals(migration2, result.getMigrators().get(0)); + assertEquals(migration1, result.getMigrators().get(1)); + assertEquals(migration3, result.getMigrators().get(2)); + } + private static class TestAdapter implements StreamPipesAdapter { @Override @@ -67,4 +94,29 @@ public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor, return null; } } + + private static class TestMigration implements IAdapterMigrator { + + String appId; + int fromVersion; + int toVersion; + + public TestMigration(String appId, int fromVersion, int toVersion) { + this.appId = appId; + this.fromVersion = fromVersion; + this.toVersion = toVersion; + } + + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig(appId, SpServiceTagPrefix.ADAPTER, fromVersion, toVersion); + } + + @Override + public MigrationResult migrate( + AdapterDescription element, + IStaticPropertyExtractor extractor) throws RuntimeException { + return null; + } + } } diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java index 72270d31fd..fa651ea92d 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java @@ -30,6 +30,7 @@ import org.apache.streampipes.extensions.connectors.mqtt.adapter.MqttProtocol; import org.apache.streampipes.extensions.connectors.nats.adapter.NatsProtocol; import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter; +import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV1; import org.apache.streampipes.extensions.connectors.pulsar.adapter.PulsarProtocol; import org.apache.streampipes.extensions.connectors.rocketmq.adapter.RocketMQProtocol; import org.apache.streampipes.extensions.connectors.tubemq.adapter.TubeMQProtocol; @@ -63,6 +64,8 @@ public SpServiceDefinition provideServiceDefinition() { .registerAdapter(new RocketMQProtocol()) .registerAdapter(new HttpServerProtocol()) .registerAdapter(new TubeMQProtocol()) + + .registerMigrators(new OpcUaAdapterMigrationV1()) .build(); } } diff --git a/streampipes-extensions/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/netio/NetioRestAdapter.java b/streampipes-extensions/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/netio/NetioRestAdapter.java index e4898fb45e..02b7ee85ce 100644 --- a/streampipes-extensions/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/netio/NetioRestAdapter.java +++ b/streampipes-extensions/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/netio/NetioRestAdapter.java @@ -113,7 +113,7 @@ public PollingSettings getPollingInterval() { * * @param extractor StaticPropertyExtractor */ - private void applyConfiguration(IParameterExtractor extractor) { + private void applyConfiguration(IParameterExtractor extractor) { this.ip = extractor.singleValueParameter(NETIO_IP, String.class); this.username = extractor.singleValueParameter(NETIO_USERNAME, String.class); diff --git a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/InfluxConfigs.java b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/InfluxConfigs.java index d3eee716e1..4c6e1d4cd5 100644 --- a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/InfluxConfigs.java +++ b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/InfluxConfigs.java @@ -64,7 +64,7 @@ public static void appendSharedInfluxConfig(AbstractConfigurablePipelineElementB ); } - public static InfluxConnectionSettings fromExtractor(IParameterExtractor extractor) { + public static InfluxConnectionSettings fromExtractor(IParameterExtractor extractor) { String protocol = extractor.selectedSingleValueInternalName(DATABASE_PROTOCOL, String.class); String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY, String.class); Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY, Integer.class); diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java index aedbf922a0..6b4b4a7e55 100644 --- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java +++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java @@ -69,11 +69,11 @@ public static StaticPropertyAlternative getAlternativesTwo() { } - public static MqttConfig getMqttConfig(IParameterExtractor extractor) { + public static MqttConfig getMqttConfig(IParameterExtractor extractor) { return getMqttConfig(extractor, null); } - public static MqttConfig getMqttConfig(IParameterExtractor extractor, String topicInput) { + public static MqttConfig getMqttConfig(IParameterExtractor extractor, String topicInput) { MqttConfig mqttConfig; String brokerUrl = extractor.singleValueParameter(BROKER_URL, String.class); diff --git a/streampipes-extensions/streampipes-connectors-nats/src/main/java/org/apache/streampipes/extensions/connectors/nats/shared/NatsConfigUtils.java b/streampipes-extensions/streampipes-connectors-nats/src/main/java/org/apache/streampipes/extensions/connectors/nats/shared/NatsConfigUtils.java index 59194dd6f2..c23fba90a1 100644 --- a/streampipes-extensions/streampipes-connectors-nats/src/main/java/org/apache/streampipes/extensions/connectors/nats/shared/NatsConfigUtils.java +++ b/streampipes-extensions/streampipes-connectors-nats/src/main/java/org/apache/streampipes/extensions/connectors/nats/shared/NatsConfigUtils.java @@ -39,7 +39,7 @@ public class NatsConfigUtils { public static final String CONNECTION_PROPERTIES_GROUP = "connection-group"; public static final String PROPERTIES_KEY = "properties"; - public static NatsConfig from(IParameterExtractor extractor) { + public static NatsConfig from(IParameterExtractor extractor) { String subject = extractor.singleValueParameter(SUBJECT_KEY, String.class); String natsUrls = extractor.singleValueParameter(URLS_KEY, String.class); String authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE); diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java index bfa035c0cc..5df4f6329d 100644 --- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java +++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java @@ -231,7 +231,7 @@ public StaticProperty resolveConfiguration(String staticPropertyInternalName, @Override public IAdapterConfiguration declareConfig() { - var builder = AdapterConfigurationBuilder.create(ID, OpcUaAdapter::new) + var builder = AdapterConfigurationBuilder.create(ID, 1, OpcUaAdapter::new) .withAssets(Assets.DOCUMENTATION, Assets.ICON) .withLocales(Locales.EN) .withCategory(AdapterType.Generic, AdapterType.Manufacturing) diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java index 647e66fdb7..a5691da311 100644 --- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java +++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java @@ -45,13 +45,7 @@ public class SharedUserConfiguration { public static void appendSharedOpcUaConfig(AbstractConfigurablePipelineElementBuilder builder, boolean adapterConfig) { - var dependsOn = adapterConfig ? List.of( - ADAPTER_TYPE.name(), - ACCESS_MODE.name(), - OPC_HOST_OR_URL.name() - ) : List.of( - ACCESS_MODE.name(), - OPC_HOST_OR_URL.name()); + var dependsOn = getDependsOn(adapterConfig); builder .requiredAlternatives(Labels.withId(ACCESS_MODE), @@ -86,4 +80,14 @@ public static void appendSharedOpcUaConfig(AbstractConfigurablePipelineElementBu adapterConfig ); } + + public static List getDependsOn(boolean adapterConfig) { + return adapterConfig ? List.of( + ADAPTER_TYPE.name(), + ACCESS_MODE.name(), + OPC_HOST_OR_URL.name() + ) : List.of( + ACCESS_MODE.name(), + OPC_HOST_OR_URL.name()); + } } diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java index 4f461c9774..2c3e3b1383 100644 --- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java +++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java @@ -62,11 +62,11 @@ public static OpcUaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor e return config; } - public static OpcUaConfig extractSinkConfig(IParameterExtractor extractor) { + public static OpcUaConfig extractSinkConfig(IParameterExtractor extractor) { return extractSharedConfig(extractor, new OpcUaConfig()); } - public static T extractSharedConfig(IParameterExtractor extractor, + public static T extractSharedConfig(IParameterExtractor extractor, T config) { String selectedAlternativeConnection = diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV1.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV1.java new file mode 100644 index 0000000000..1269041f00 --- /dev/null +++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV1.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.connectors.opcua.migration; + +import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; +import org.apache.streampipes.extensions.api.migration.IAdapterMigrator; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; + +public class OpcUaAdapterMigrationV1 implements IAdapterMigrator { + + private static final String OldNamespaceIndexKey = "NAMESPACE_INDEX"; + private static final String OldNodeId = "NODE_ID"; + + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig( + "org.apache.streampipes.connect.iiot.adapters.opcua", + SpServiceTagPrefix.ADAPTER, + 0, + 1); + } + + @Override + public MigrationResult migrate(AdapterDescription element, + IStaticPropertyExtractor extractor) throws RuntimeException { + + element.getConfig().removeIf(c -> c.getInternalName().equals(OldNamespaceIndexKey)); + element.getConfig().removeIf(c -> c.getInternalName().equals(OldNodeId)); + + return MigrationResult.success(element); + } +} diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java index 20372678eb..3bb61d2654 100644 --- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java @@ -184,7 +184,7 @@ private List getColumnNames(String fileContents, List columnsToI .collect(Collectors.toList()); } - private String getFileContents(IParameterExtractor extractor) { + private String getFileContents(IParameterExtractor extractor) { String filename = extractor.selectedFilename(CSV_FILE_KEY); return getStreamPipesClientInstance().fileApi().getFileContentAsString(filename); } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java index 67be04100a..e2fcb83f7d 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/ConsumableStreamPipesEntity.java @@ -26,7 +26,7 @@ import java.util.ArrayList; import java.util.List; -public abstract class ConsumableStreamPipesEntity extends NamedStreamPipesEntity { +public abstract class ConsumableStreamPipesEntity extends VersionedNamedStreamPipesEntity { private static final long serialVersionUID = -6617391345752016449L; @@ -42,8 +42,8 @@ public ConsumableStreamPipesEntity() { this.staticProperties = new ArrayList<>(); } - public ConsumableStreamPipesEntity(String uri, String name, String description, String iconUrl) { - super(uri, name, description, iconUrl); + public ConsumableStreamPipesEntity(String elementId, String name, String description, String iconUrl) { + super(elementId, name, description, iconUrl); this.spDataStreams = new ArrayList<>(); this.staticProperties = new ArrayList<>(); } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java index b43c37d4af..2f1fc8fa12 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/InvocableStreamPipesEntity.java @@ -21,6 +21,7 @@ import org.apache.streampipes.commons.constants.InstanceIdExtractor; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.api.EndpointSelectable; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.model.grounding.EventGrounding; import org.apache.streampipes.model.monitoring.ElementStatusInfoSettings; import org.apache.streampipes.model.staticproperty.StaticProperty; @@ -30,7 +31,9 @@ import java.util.List; -public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity implements EndpointSelectable { +public abstract class InvocableStreamPipesEntity + extends VersionedNamedStreamPipesEntity + implements EndpointSelectable { protected List inputStreams; @@ -53,6 +56,7 @@ public abstract class InvocableStreamPipesEntity extends NamedStreamPipesEntity private boolean uncompleted; private String selectedEndpointUrl; + protected SpServiceTagPrefix serviceTagPrefix; public InvocableStreamPipesEntity() { super(); @@ -67,6 +71,7 @@ public InvocableStreamPipesEntity(InvocableStreamPipesEntity other) { this.uncompleted = other.isUncompleted(); this.correspondingUser = other.getCorrespondingUser(); this.selectedEndpointUrl = other.getSelectedEndpointUrl(); + this.serviceTagPrefix = other.serviceTagPrefix; if (other.getStreamRequirements() != null) { this.streamRequirements = new Cloner().streams(other.getStreamRequirements()); } @@ -79,8 +84,15 @@ public InvocableStreamPipesEntity(InvocableStreamPipesEntity other) { } } - public InvocableStreamPipesEntity(String uri, String name, String description, String iconUrl) { + public InvocableStreamPipesEntity( + String uri, + String name, + String description, + String iconUrl, + SpServiceTagPrefix serviceTagPrefix + ) { super(uri, name, description, iconUrl); + this.serviceTagPrefix = serviceTagPrefix; this.configured = false; } @@ -170,6 +182,10 @@ public void setUncompleted(boolean uncompleted) { this.uncompleted = uncompleted; } + public SpServiceTagPrefix getServiceTagPrefix() { + return serviceTagPrefix; + } + @Override public String getSelectedEndpointUrl() { return selectedEndpointUrl; diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/base/VersionedNamedStreamPipesEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/base/VersionedNamedStreamPipesEntity.java new file mode 100644 index 0000000000..4d533eaf3e --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/base/VersionedNamedStreamPipesEntity.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.streampipes.model.base; + +public abstract class VersionedNamedStreamPipesEntity extends NamedStreamPipesEntity { + + private int version; + + public VersionedNamedStreamPipesEntity() { + super(); + this.version = 0; + } + + public VersionedNamedStreamPipesEntity(VersionedNamedStreamPipesEntity other) { + super(other); + this.version = other.version; + } + + public VersionedNamedStreamPipesEntity(String elementId, String name, String description){ + super(elementId, name, description); + } + + public VersionedNamedStreamPipesEntity(String elementId, String name, String description, String iconUrl) { + super(elementId, name, description, iconUrl); + } + + public int getVersion(){ + return version; + } + + public void setVersion(int version) { + this.version = version; + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java index d77d8567a3..5fd6b21b3f 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java @@ -19,7 +19,7 @@ package org.apache.streampipes.model.connect.adapter; import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity; import org.apache.streampipes.model.connect.rules.TransformationRuleDescription; import org.apache.streampipes.model.connect.rules.schema.SchemaTransformationRuleDescription; import org.apache.streampipes.model.connect.rules.stream.StreamTransformationRuleDescription; @@ -35,7 +35,7 @@ import java.util.List; @TsModel -public class AdapterDescription extends NamedStreamPipesEntity { +public class AdapterDescription extends VersionedNamedStreamPipesEntity { protected SpDataStream dataStream; @@ -74,6 +74,16 @@ public AdapterDescription() { this.dataStream = new SpDataStream(); } + public AdapterDescription(int version) { + super(); + this.rules = new ArrayList<>(); + this.eventGrounding = new EventGrounding(); + this.config = new ArrayList<>(); + this.category = new ArrayList<>(); + this.dataStream = new SpDataStream(); + this.setVersion(version); + } + public AdapterDescription(String elementId, String name, String description) { super(elementId, name, description); this.rules = new ArrayList<>(); @@ -249,5 +259,4 @@ public boolean isRunning() { public void setRunning(boolean running) { this.running = running; } - } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/GenericAdapterConverter.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/GenericAdapterConverter.java index 8b1f17e72e..abd51c8187 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/GenericAdapterConverter.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/GenericAdapterConverter.java @@ -72,14 +72,21 @@ public JsonObject convert(JsonObject adapter) { helpers.updateFieldType(adapter); } - JsonObject formatDescription = getProperties(adapter).get(FORMAT_DESC_KEY).getAsJsonObject(); - JsonObject protocolDescription = getProperties(adapter).get(PROTOCOL_DESC_KEY).getAsJsonObject(); + var properties = getProperties(adapter); + if (!properties.has(CONFIG_KEY)) { + properties.add(CONFIG_KEY, new JsonArray()); + } + + JsonObject protocolDescription = properties.get(PROTOCOL_DESC_KEY).getAsJsonObject(); migrateProtocolDescription(adapter, protocolDescription); - migrateFormatDescription(adapter, formatDescription); + properties.remove(PROTOCOL_DESC_KEY); - getProperties(adapter).remove(FORMAT_DESC_KEY); - getProperties(adapter).remove(PROTOCOL_DESC_KEY); + if (properties.has(FORMAT_DESC_KEY)) { + JsonObject formatDescription = properties.get(FORMAT_DESC_KEY).getAsJsonObject(); + migrateFormatDescription(adapter, formatDescription); + properties.remove(FORMAT_DESC_KEY); + } return adapter; } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/migration/MigrationRequest.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/migration/MigrationRequest.java new file mode 100644 index 0000000000..13837d12e1 --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/migration/MigrationRequest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.extensions.migration; + +import org.apache.streampipes.model.migration.ModelMigratorConfig; + +/** + * Models a request that is sent from the core to an extensions service to mandate a migration. + * @param migrationElement element that needs to be migrated + * @param modelMigratorConfig migration config that describes the migration to be applied. + */ +public record MigrationRequest(T migrationElement, ModelMigratorConfig modelMigratorConfig) {} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java index 89cac11dff..642134bf1e 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java @@ -20,6 +20,7 @@ import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.model.output.OutputStrategy; import org.apache.streampipes.model.staticproperty.StaticProperty; import org.apache.streampipes.model.util.Cloner; @@ -57,6 +58,7 @@ public DataProcessorInvocation(DataProcessorDescription other) { this.setIncludesAssets(other.isIncludesAssets()); this.setIncludedAssets(other.getIncludedAssets()); this.setElementId(ElementIdGenerator.makeElementId(this)); + this.serviceTagPrefix = SpServiceTagPrefix.DATA_PROCESSOR; } public DataProcessorInvocation(DataProcessorInvocation other) { @@ -67,6 +69,7 @@ public DataProcessorInvocation(DataProcessorInvocation other) { } this.pathName = other.getPathName(); this.category = new Cloner().epaTypes(other.getCategory()); + this.serviceTagPrefix = SpServiceTagPrefix.DATA_PROCESSOR; } public DataProcessorInvocation(DataProcessorDescription sepa, String domId) { @@ -77,18 +80,19 @@ public DataProcessorInvocation(DataProcessorDescription sepa, String domId) { public DataProcessorInvocation() { super(); inputStreams = new ArrayList<>(); + this.serviceTagPrefix = SpServiceTagPrefix.DATA_PROCESSOR; } public DataProcessorInvocation(String uri, String name, String description, String iconUrl, String pathName, List spDataStreams, List staticProperties) { - super(uri, name, description, iconUrl); + super(uri, name, description, iconUrl, SpServiceTagPrefix.DATA_PROCESSOR); this.pathName = pathName; this.inputStreams = spDataStreams; this.staticProperties = staticProperties; } public DataProcessorInvocation(String uri, String name, String description, String iconUrl, String pathName) { - super(uri, name, description, iconUrl); + super(uri, name, description, iconUrl, SpServiceTagPrefix.DATA_PROCESSOR); this.pathName = pathName; inputStreams = new ArrayList<>(); staticProperties = new ArrayList<>(); diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java index f334778508..235ca33b44 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java @@ -19,6 +19,7 @@ package org.apache.streampipes.model.graph; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.model.staticproperty.StaticProperty; import org.apache.streampipes.model.util.ElementIdGenerator; @@ -34,6 +35,7 @@ public class DataSinkInvocation extends InvocableStreamPipesEntity { public DataSinkInvocation(DataSinkInvocation sec) { super(sec); this.category = sec.getCategory(); + this.serviceTagPrefix = SpServiceTagPrefix.DATA_SINK; } public DataSinkInvocation(DataSinkDescription other) { @@ -51,6 +53,7 @@ public DataSinkInvocation(DataSinkDescription other) { this.setIncludesAssets(other.isIncludesAssets()); this.setElementId(ElementIdGenerator.makeElementId(this)); this.setIncludedAssets(other.getIncludedAssets()); + this.serviceTagPrefix = SpServiceTagPrefix.DATA_SINK; } public DataSinkInvocation(DataSinkDescription sec, String domId) { @@ -61,6 +64,7 @@ public DataSinkInvocation(DataSinkDescription sec, String domId) { public DataSinkInvocation() { super(); inputStreams = new ArrayList<>(); + this.serviceTagPrefix = SpServiceTagPrefix.DATA_SINK; } public List getStaticProperties() { diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/migration/MigrationResult.java b/streampipes-model/src/main/java/org/apache/streampipes/model/migration/MigrationResult.java new file mode 100644 index 0000000000..805dc65ea4 --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/migration/MigrationResult.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.migration; + +/** + * Models the outcome of a migration. + * @param success whether the migration was successfully or not. + * @param element the migrated pipeline element or in case of a failure the original one + * @param message message that describes the outcome of the migration + * @param type of the migration element + */ +public record MigrationResult (boolean success, T element, String message){ + + public static MigrationResult failure(T element, String message) { + return new MigrationResult<>(false, element, message); + } + + public static MigrationResult success(T element) { + return new MigrationResult<>(true, element, "SUCCESS"); + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/migration/ModelMigratorConfig.java b/streampipes-model/src/main/java/org/apache/streampipes/model/migration/ModelMigratorConfig.java new file mode 100644 index 0000000000..23356e7502 --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/migration/ModelMigratorConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.migration; + +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; + +/** + * The configuration element for a 'ModelMigrator'. + * + * @param targetAppId 'appId' of the model to be migrated + * @param modelType Type of the model to be migrated, e.g., adapter + * @param fromVersion Base version from which the migration starts + * @param toVersion Target version that the migration aims to achieve + */ +public record ModelMigratorConfig(String targetAppId, SpServiceTagPrefix modelType, + int fromVersion, int toVersion) implements Comparable{ + + + @Override + public int compareTo(Object o) { + + if (o == null) { + throw new NullPointerException(); + } + + if (!(o instanceof ModelMigratorConfig)){ + throw new ClassCastException("Given object is not an instance of `ModelMigratorConfig` - " + + "only instances of `ModelMigratorConfig` can be compared."); + } + + if (targetAppId.equals(((ModelMigratorConfig) o).targetAppId())) { + return this.fromVersion() - ((ModelMigratorConfig) o).fromVersion(); + } + + return 0; + } +} diff --git a/streampipes-model/src/test/java/org/apache/streampipes/model/migration/ModelMigratorConfigTest.java b/streampipes-model/src/test/java/org/apache/streampipes/model/migration/ModelMigratorConfigTest.java new file mode 100644 index 0000000000..914e1ed4ee --- /dev/null +++ b/streampipes-model/src/test/java/org/apache/streampipes/model/migration/ModelMigratorConfigTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.migration; + +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +public class ModelMigratorConfigTest { + + static ModelMigratorConfig config = new ModelMigratorConfig("app-id", SpServiceTagPrefix.ADAPTER, 0, 1); + + @Test + public void compareToWithNullPointer() { + assertThrows(NullPointerException.class, () -> config.compareTo(null)); + } + + @Test + public void compareToWithClassCastException() { + assertThrows(ClassCastException.class, () -> config.compareTo(5)); + } + + @Test + public void compareTo() { + assertEquals(0, config.compareTo(new ModelMigratorConfig("other-app-id", SpServiceTagPrefix.ADAPTER, 0, 1))); + assertEquals(0, config.compareTo(new ModelMigratorConfig("app-id", SpServiceTagPrefix.ADAPTER, 0, 1))); + assertEquals(-1, config.compareTo(new ModelMigratorConfig("app-id", SpServiceTagPrefix.ADAPTER, 1, 2))); + assertEquals(0, config.compareTo(new ModelMigratorConfig("app-id", SpServiceTagPrefix.ADAPTER, 0, 3))); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java new file mode 100644 index 0000000000..3641f0b7c7 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.migration; + +import org.apache.streampipes.commons.exceptions.SepaParseException; +import org.apache.streampipes.manager.endpoint.HttpJsonParser; +import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity; +import org.apache.streampipes.model.extensions.migration.MigrationRequest; +import org.apache.streampipes.model.message.Notification; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.serializers.json.JacksonSerializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.streampipes.manager.migration.MigrationUtils.getRequestUrl; + +public abstract class AbstractMigrationManager { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractMigrationManager.class); + + protected static final String MIGRATION_ENDPOINT = "api/v1/migrations"; + + /** + * Performs the actual migration of a pipeline element. + * This includes the communication with the extensions service which runs the migration. + * + * @param pipelineElement pipeline element to be migrated + * @param migrationConfig config of the migration to be performed + * @param url url of the migration endpoint at the extensions service + * where the migration should be performed + * @param type of the processing element + * @return result of the migration + */ + protected MigrationResult performMigration( + T pipelineElement, + ModelMigratorConfig migrationConfig, + String url + ) { + + try { + + var migrationRequest = new MigrationRequest<>(pipelineElement, migrationConfig); + + String serializedRequest = JacksonSerializer.getObjectMapper().writeValueAsString(migrationRequest); + + var migrationResponse = ExtensionServiceExecutions.extServicePostRequest( + url, + serializedRequest + ).execute(); + + TypeReference> typeReference = new TypeReference<>() { + }; + + return JacksonSerializer + .getObjectMapper() + .readValue(migrationResponse.returnContent().asString(), typeReference); + } catch (JsonProcessingException e) { + LOG.error( + "Migration of pipeline element failed before sending to the extensions service, " + + "pipeline element is not migrated. Serialization of migration request failed: {}", + StringUtils.join(e.getStackTrace(), "\n") + ); + } catch (IOException e) { + LOG.error("Migration of pipeline element failed at the extensions service, pipeline element is not migrated: {}.", + StringUtils.join(e.getStackTrace(), "\n") + ); + } + return MigrationResult.failure(pipelineElement, "Internal error during migration at StreamPipes Core"); + } + + /** + * Update all descriptions of entities in the Core that are affected by migrations. + * + * @param migrationConfigs List of migrations to take in account + * @param serviceUrl Url of the extension service that provides the migrations. + */ + protected void updateDescriptions(List migrationConfigs, String serviceUrl) { + migrationConfigs + .stream() + .collect( + // We only need to update the description once per appId, + // because this is directly done with the newest version of the description and + // there is iterative migration required. + // To avoid unnecessary, multiple updates, + // we filter the migration configs such that every appId is unique. + // This ensures that every description is only updated once. + Collectors.toMap( + ModelMigratorConfig::targetAppId, + Function.identity(), + (existing, replacement) -> existing + ) + ) + .values() + .stream() + .peek(config -> { + var requestUrl = getRequestUrl(config.modelType(), config.targetAppId(), serviceUrl); + performUpdate(requestUrl); + }) + .toList(); + } + + /** + * Perform the update of the description based on the given requestUrl + * + * @param requestUrl URl that references the description to be updated at the extensions service. + */ + protected void performUpdate(String requestUrl) { + + try { + var entityPayload = HttpJsonParser.getContentFromUrl(URI.create(requestUrl)); + var updateResult = Operations.verifyAndUpdateElement(entityPayload); + if (!updateResult.isSuccess()) { + LOG.error( + "Updating the pipeline element description failed: {}", + StringUtils.join( + updateResult.getNotifications().stream().map(Notification::toString).toList(), + "\n") + ); + } + } catch (IOException | SepaParseException e) { + LOG.error("Updating the pipeline element description failed due to the following exception:\n{}", + StringUtils.join(e.getStackTrace(), "\n") + ); + } + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/IMigrationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/IMigrationHandler.java new file mode 100644 index 0000000000..7847e24324 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/IMigrationHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.migration; + +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; +import org.apache.streampipes.model.migration.ModelMigratorConfig; + +import java.util.List; + +public interface IMigrationHandler { + + void handleMigrations(SpServiceRegistration serviceRegistration, + List migrationConfigs); +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationUtils.java new file mode 100644 index 0000000000..8a96c3c1f4 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/MigrationUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.migration; + +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; + +import java.util.List; +import java.util.Optional; + +public class MigrationUtils { + + /** + * Filter the application migration for a pipeline definition. + * By definition, there is only one migration config that fulfills the requirements. + * Otherwise, it should have been detected as duplicate by the extensions service. + * + * @param pipelineElement pipeline element that should be migrated + * @param migrationConfigs available migration configs to pick the applicable from + * @return config that is applicable for the given pipeline element + */ + public static Optional getApplicableMigration( + InvocableStreamPipesEntity pipelineElement, + List migrationConfigs + ) { + return migrationConfigs + .stream() + .filter( + config -> config.modelType().equals(pipelineElement.getServiceTagPrefix()) + && config.targetAppId().equals(pipelineElement.getAppId()) + && config.fromVersion() == pipelineElement.getVersion() + ) + .findFirst(); + } + + /** + * Get the URL that provides the description for an entity. + * + * @param entityType Type of the entity to be updated. + * @param appId AppId of the entity to be updated + * @param serviceUrl URL of the extensions service to which the entity belongs + * @return URL of the endpoint that provides the description for the given entity + */ + public static String getRequestUrl(SpServiceTagPrefix entityType, String appId, String serviceUrl) { + + SpServiceUrlProvider urlProvider; + switch (entityType) { + case ADAPTER -> urlProvider = SpServiceUrlProvider.ADAPTER; + case DATA_PROCESSOR -> urlProvider = SpServiceUrlProvider.DATA_PROCESSOR; + case DATA_SINK -> urlProvider = SpServiceUrlProvider.DATA_SINK; + default -> throw new RuntimeException("Unexpected instance type."); + } + return urlProvider.getInvocationUrl(serviceUrl, appId); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java new file mode 100644 index 0000000000..28338e1ac0 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.migration; + +import org.apache.streampipes.manager.execution.PipelineExecutor; +import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.graph.DataSinkInvocation; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.model.pipeline.Pipeline; +import org.apache.streampipes.model.pipeline.PipelineHealthStatus; +import org.apache.streampipes.model.staticproperty.StaticProperty; +import org.apache.streampipes.storage.api.IDataProcessorStorage; +import org.apache.streampipes.storage.api.IDataSinkStorage; +import org.apache.streampipes.storage.api.IPipelineStorage; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.streampipes.manager.migration.MigrationUtils.getApplicableMigration; + +public class PipelineElementMigrationManager extends AbstractMigrationManager implements IMigrationHandler { + + private static final Logger LOG = LoggerFactory.getLogger(PipelineElementMigrationManager.class); + + private final IPipelineStorage pipelineStorage; + private final IDataProcessorStorage dataProcessorStorage; + private final IDataSinkStorage dataSinkStorage; + + public PipelineElementMigrationManager(IPipelineStorage pipelineStorage, + IDataProcessorStorage dataProcessorStorage, + IDataSinkStorage dataSinkStorage) { + this.pipelineStorage = pipelineStorage; + this.dataProcessorStorage = dataProcessorStorage; + this.dataSinkStorage = dataSinkStorage; + } + + @Override + public void handleMigrations(SpServiceRegistration extensionsServiceConfig, + List migrationConfigs) { + + LOG.info("Updating pipeline element descriptions by replacement..."); + updateDescriptions(migrationConfigs, extensionsServiceConfig.getServiceUrl()); + LOG.info("Pipeline element descriptions are up to date."); + + LOG.info("Received {} pipeline element migrations from extension service {}.", + migrationConfigs.size(), + extensionsServiceConfig.getServiceUrl()); + var availablePipelines = pipelineStorage.getAllPipelines(); + if (!availablePipelines.isEmpty()) { + LOG.info("Found {} available pipelines. Checking pipelines for applicable migrations...", + availablePipelines.size() + ); + } + + for (var pipeline : availablePipelines) { + List> failedMigrations = new ArrayList<>(); + + var migratedDataProcessors = pipeline.getSepas() + .stream() + .map(processor -> { + if (getApplicableMigration(processor, migrationConfigs).isPresent()) { + return migratePipelineElement( + processor, + migrationConfigs, + String.format("%s/%s/processor", + extensionsServiceConfig.getServiceUrl(), + MIGRATION_ENDPOINT + ), + failedMigrations + ); + } else { + LOG.info("No migration applicable for data processor '{}'.", processor.getElementId()); + return processor; + } + }) + .toList(); + pipeline.setSepas(migratedDataProcessors); + + var migratedDataSinks = pipeline.getActions() + .stream() + .map(sink -> { + if (getApplicableMigration(sink, migrationConfigs).isPresent()) { + return migratePipelineElement( + sink, + migrationConfigs, + String.format("%s/%s/sink", + extensionsServiceConfig.getServiceUrl(), + MIGRATION_ENDPOINT + ), + failedMigrations + ); + } else { + LOG.info("No migration applicable for data sink '{}'.", sink.getElementId()); + return sink; + } + }) + .toList(); + pipeline.setActions(migratedDataSinks); + + pipelineStorage.updatePipeline(pipeline); + + if (failedMigrations.isEmpty()) { + LOG.info("Migration for pipeline successfully completed."); + } else { + // pass most recent version of pipeline + handleFailedMigrations(pipelineStorage.getPipeline(pipeline.getPipelineId()), failedMigrations); + } + } + } + + /** + * Takes care about the failed migrations of pipeline elements. + * This includes the following steps: + *
    + *
  • logging of failed pipeline elements + *
  • setting migration results as pipeline notifications + *
  • updating pipeline health status + *
  • stopping the pipeline + *
+ * + * @param pipeline the pipeline affected by failed migrations + * @param failedMigrations the list of failed migrations + */ + protected void handleFailedMigrations(Pipeline pipeline, List> failedMigrations) { + LOG.error("Failures in migration detected - The following pipeline elements could to be migrated:\n" + + StringUtils.join(failedMigrations.stream().map(Record::toString).toList()), "\n"); + + pipeline.setPipelineNotifications(failedMigrations.stream().map( + failedMigration -> "Failed migration of pipeline element: %s".formatted(failedMigration.message()) + ).toList()); + pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION); + + pipelineStorage.updatePipeline(pipeline); + + // get updated version of pipeline after modification + pipeline = pipelineStorage.getPipeline(pipeline.getPipelineId()); + + stopPipeline(pipeline); + } + + + public void stopPipeline(Pipeline pipeline) { + var pipelineExecutor = new PipelineExecutor(pipeline, true); + var pipelineStopResult = pipelineExecutor.stopPipeline(); + + if (pipelineStopResult.isSuccess()) { + LOG.info("Pipeline successfully stopped."); + } else { + LOG.error("Pipeline stop failed."); + } + } + + /** + * Handle the migration of a pipeline element with respect to the given model migration configs. + * All applicable migrations found in the provided configs are executed for the given pipeline element. + * In case a migration fails, the related pipeline element receives the latest definition of its static properties, + * so that the pipeline element can be adapted by the user to resolve the failed migration. + * + * @param pipelineElement pipeline element to be migrated + * @param modelMigrations list of model migrations that might be applicable for this pipeline element + * @param url url of the extensions service endpoint that handles the migration + * @param failedMigrations collection of failed migrations which is extended by occurring migration failures + * @param type of the pipeline element (e.g., DataProcessorInvocation) + * @return the migrated (or - in case of a failure - updated) pipeline element + */ + protected T migratePipelineElement( + T pipelineElement, + List modelMigrations, + String url, + List> failedMigrations + ) { + + // loop until no migrations are available anymore + // this allows to apply multiple migrations for a pipeline element sequentially + // For example, first migration from 0 to 1 and the second migration from 1 to 2 + while (getApplicableMigration(pipelineElement, modelMigrations).isPresent() && failedMigrations.isEmpty()) { + + var migrationConfig = getApplicableMigration(pipelineElement, modelMigrations).get(); + LOG.info( + "Found applicable migration for pipeline element '{}': {}", + pipelineElement.getElementId(), + migrationConfig + ); + + var migrationResult = performMigration( + pipelineElement, + migrationConfig, + url + ); + + if (migrationResult.success()) { + LOG.info("Migration successfully performed by extensions service. Updating pipeline element invocation ..."); + LOG.debug("Migration was performed at extensions service endpoint '{}'", url); + pipelineElement = migrationResult.element(); + } else { + LOG.error("Migration failed with the following reason: {}", migrationResult.message()); + failedMigrations.add(migrationResult); + } + } + if (!failedMigrations.isEmpty()) { + updateFailedPipelineElement(pipelineElement); + LOG.info("Updated pipeline elements with new description where automatic migration failed."); + } + return pipelineElement; + } + + /** + * Update the static properties of the failed pipeline element with its description. + * This allows to adapt the failed pipeline element in the UI to overcome the failed migration. + * + * @param pipelineElement pipeline element with failed migration + */ + protected void updateFailedPipelineElement(InvocableStreamPipesEntity pipelineElement) { + List updatedStaticProperties = new ArrayList<>(); + if (pipelineElement instanceof DataProcessorInvocation) { + updatedStaticProperties = dataProcessorStorage + .getFirstDataProcessorByAppId(pipelineElement.getAppId()) + .getStaticProperties(); + } else if (pipelineElement instanceof DataSinkInvocation) { + updatedStaticProperties = dataSinkStorage + .getFirstDataSinkByAppId(pipelineElement.getAppId()) + .getStaticProperties(); + } + pipelineElement.setStaticProperties(updatedStaticProperties); + } +} diff --git a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/migration/MigrationUtilsTest.java b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/migration/MigrationUtilsTest.java new file mode 100644 index 0000000000..05545db197 --- /dev/null +++ b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/migration/MigrationUtilsTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.migration; + +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.graph.DataSinkInvocation; +import org.apache.streampipes.model.migration.ModelMigratorConfig; + +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MigrationUtilsTest { + + List migrationConfigs = List.of( + new ModelMigratorConfig("app-id", SpServiceTagPrefix.DATA_PROCESSOR, 0, 1), + new ModelMigratorConfig("app-id", SpServiceTagPrefix.DATA_PROCESSOR, 1, 2), + new ModelMigratorConfig("other-app-id", SpServiceTagPrefix.DATA_PROCESSOR, 0, 1), + new ModelMigratorConfig("other-app-id", SpServiceTagPrefix.DATA_SINK, 1, 2) + ); + + @Test + public void findMigrations() { + + var pipelineElement1 = new DataProcessorInvocation(); + pipelineElement1.setAppId("app-id"); + pipelineElement1.setVersion(0); + + var pipelineElement2 = new DataProcessorInvocation(); + pipelineElement2.setAppId("app-id"); + pipelineElement2.setVersion(1); + + var pipelineElement3 = new DataProcessorInvocation(); + pipelineElement3.setAppId("other-app-id"); + pipelineElement3.setVersion(0); + + var pipelineElement4 = new DataSinkInvocation(); + pipelineElement4.setAppId("other-app-id"); + pipelineElement4.setVersion(0); + + assertEquals( + migrationConfigs.get(0), + MigrationUtils.getApplicableMigration(pipelineElement1, migrationConfigs).get() + ); + assertEquals( + migrationConfigs.get(1), + MigrationUtils.getApplicableMigration(pipelineElement2, migrationConfigs).get() + ); + assertEquals( + migrationConfigs.get(2), + MigrationUtils.getApplicableMigration(pipelineElement3, migrationConfigs).get() + ); + assertTrue( + MigrationUtils.getApplicableMigration(pipelineElement4, migrationConfigs).isEmpty() + ); + } +} diff --git a/streampipes-rest-extensions/pom.xml b/streampipes-rest-extensions/pom.xml index b3a39e4319..211d2e73cb 100644 --- a/streampipes-rest-extensions/pom.xml +++ b/streampipes-rest-extensions/pom.xml @@ -32,12 +32,16 @@ + + org.apache.streampipes + streampipes-rest + 0.93.0-SNAPSHOT + org.apache.streampipes streampipes-rest-shared 0.93.0-SNAPSHOT - org.apache.streampipes streampipes-extensions-management @@ -64,6 +68,10 @@ + + org.springframework.security + spring-security-core + diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/AdapterMigrationResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/AdapterMigrationResource.java new file mode 100644 index 0000000000..f4e94691ce --- /dev/null +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/AdapterMigrationResource.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.rest.extensions.migration; + +import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; +import org.apache.streampipes.extensions.api.migration.IAdapterMigrator; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.extensions.migration.MigrationRequest; +import org.apache.streampipes.rest.security.AuthConstants; +import org.apache.streampipes.rest.shared.annotation.JacksonSerialized; +import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.ExampleObject; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import org.apache.http.HttpStatus; +import org.springframework.security.access.prepost.PreAuthorize; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + + +@Path("/api/v1/migrations/adapter") +public class AdapterMigrationResource extends MigrateExtensionsResource< + AdapterDescription, + IStaticPropertyExtractor, + IAdapterMigrator + > { + + @POST + @Consumes(MediaType.APPLICATION_JSON) + @JacksonSerialized + @PreAuthorize(AuthConstants.IS_ADMIN_ROLE) + @Operation( + summary = "Execute the migration for a specific adapter instance", tags = {"Extensions", "Migration"}, + responses = { + @ApiResponse( + responseCode = "" + HttpStatus.SC_OK, + description = "The migration was executed. Its result is described in the response. " + + "The Response needs to be handled accordingly.", + content = @Content( + examples = @ExampleObject( + name = "Successful migration", + value = "{\"success\": true,\"messages\": \"SUCCESS\", \"element\": {}}" + ), + mediaType = MediaType.APPLICATION_JSON + ) + ) + } + ) + public Response migrateAdapter( + @Parameter( + description = "request that encompasses the adapter description(AdapterDescription) and " + + "the configuration of the migration", + example = "{\"migrationElement\": {}, \"modelMigratorConfig\": {\"targetAppId\": \"app-id\"," + + "\"modelType\": \"adapter\", \"fromVersion\": 0, \"toVersion\": 1}}", + required = true + ) + MigrationRequest adapterMigrationRequest) { + return ok(handleMigration(adapterMigrationRequest)); + } + + @Override + protected IStaticPropertyExtractor getPropertyExtractor(AdapterDescription pipelineElementDescription) { + return StaticPropertyExtractor.from(pipelineElementDescription.getConfig()); + } +} diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataProcessorMigrationResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataProcessorMigrationResource.java new file mode 100644 index 0000000000..f8153b4640 --- /dev/null +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataProcessorMigrationResource.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.rest.extensions.migration; + +import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor; +import org.apache.streampipes.extensions.api.migration.IDataProcessorMigrator; +import org.apache.streampipes.model.extensions.migration.MigrationRequest; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.rest.security.AuthConstants; +import org.apache.streampipes.rest.shared.annotation.JacksonSerialized; +import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.ExampleObject; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import org.apache.http.HttpStatus; +import org.springframework.security.access.prepost.PreAuthorize; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + +@Path("api/v1/migrations/processor") +public class DataProcessorMigrationResource extends MigrateExtensionsResource< + DataProcessorInvocation, + IDataProcessorParameterExtractor, + IDataProcessorMigrator + > { + + @POST + @Consumes(MediaType.APPLICATION_JSON) + @JacksonSerialized + @PreAuthorize(AuthConstants.IS_ADMIN_ROLE) + @Operation( + summary = "Execute the migration for a specific data processor instance", tags = {"Extensions", "Migration"}, + responses = { + @ApiResponse( + responseCode = "" + HttpStatus.SC_OK, + description = "The migration was executed. It's result is described in the response. " + + "The Response needs to be handled accordingly.", + content = @Content( + examples = @ExampleObject( + name = "Successful migration", + value = "{\"success\": true,\"messages\": \"SUCCESS\", \"element\": {}}" + ), + mediaType = MediaType.APPLICATION_JSON + ) + ) + } + ) + public Response migrateDataProcessor( + @Parameter( + description = "Request that encompasses the data processor description (DataProcessorInvocation) and " + + "the configuration of the migration to be performed", + example = "{\"migrationElement\": {}, \"modelMigratorConfig\": {\"targetAppId\": \"app-id\", " + + "\"modelType\": \"dprocessor\", \"fromVersion\": 0, \"toVersion\": 1}}", + required = true + ) + MigrationRequest processorMigrationRequest) { + return ok(handleMigration(processorMigrationRequest)); + } + + @Override + protected IDataProcessorParameterExtractor getPropertyExtractor(DataProcessorInvocation pipelineElementDescription) { + return ProcessingElementParameterExtractor.from(pipelineElementDescription); + } +} diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java new file mode 100644 index 0000000000..98569ffae7 --- /dev/null +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.rest.extensions.migration; + +import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor; +import org.apache.streampipes.extensions.api.migration.DataSinkMigrator; +import org.apache.streampipes.model.extensions.migration.MigrationRequest; +import org.apache.streampipes.model.graph.DataSinkInvocation; +import org.apache.streampipes.rest.security.AuthConstants; +import org.apache.streampipes.rest.shared.annotation.JacksonSerialized; +import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.ExampleObject; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import org.apache.http.HttpStatus; +import org.springframework.security.access.prepost.PreAuthorize; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + +public class DataSinkMigrationResource extends MigrateExtensionsResource< + DataSinkInvocation, + IDataSinkParameterExtractor, + DataSinkMigrator + > { + @POST + @Consumes(MediaType.APPLICATION_JSON) + @JacksonSerialized + @PreAuthorize(AuthConstants.IS_ADMIN_ROLE) + @Operation( + summary = "Execute the migration for a specific data sink instance", tags = {"Extensions", "Migration"}, + responses = { + @ApiResponse( + responseCode = "" + HttpStatus.SC_OK, + description = "The migration was executed. It's result is described in the response. " + + "The Response needs to be handled accordingly.", + content = @Content( + examples = @ExampleObject( + name = "Successful migration", + value = "{\"success\": true,\"messages\": \"SUCCESS\", \"element\": {}}" + ), + mediaType = MediaType.APPLICATION_JSON + ) + ) + } + ) + public Response migrateDataSink( + @Parameter( + description = "Request that encompasses the data sink description (DataSinkInvocation) and " + + "the configuration of the migration to be performed", + example = "{\"migrationElement\": {}, \"modelMigratorConfig\": {\"targetAppId\": \"app-id\", " + + "\"modelType\": \"dsink\", \"fromVersion\": 0, \"toVersion\": 1}}", + required = true + ) + MigrationRequest sinkMigrationRequest) { + return ok(handleMigration(sinkMigrationRequest)); + } + + @Override + protected IDataSinkParameterExtractor getPropertyExtractor(DataSinkInvocation pipelineElementDescription) { + return DataSinkParameterExtractor.from(pipelineElementDescription); + } +} diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/MigrateExtensionsResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/MigrateExtensionsResource.java new file mode 100644 index 0000000000..07fc157f6c --- /dev/null +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/MigrateExtensionsResource.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.rest.extensions.migration; + +import org.apache.streampipes.extensions.api.extractor.IParameterExtractor; +import org.apache.streampipes.extensions.api.migration.IModelMigrator; +import org.apache.streampipes.extensions.management.init.DeclarersSingleton; +import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity; +import org.apache.streampipes.model.extensions.migration.MigrationRequest; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.rest.extensions.AbstractExtensionsResource; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + +public abstract class MigrateExtensionsResource< + T extends VersionedNamedStreamPipesEntity, + ExT extends IParameterExtractor, + MmT extends IModelMigrator> extends AbstractExtensionsResource { + + private static final Logger LOG = LoggerFactory.getLogger(MigrateExtensionsResource.class); + + /** + * Find and return the corresponding {@link IModelMigrator} instance within the registered migrators. + * This allows to pass the corresponding model migrator to a {@link ModelMigratorConfig} which is exchanged + * between Core and Extensions service. + * + * @param modelMigratorConfig config that describes the model migrator to be returned + * @return Optional model migrator which is empty in case no appropriate migrator is found among the registered. + */ + public Optional getMigrator(ModelMigratorConfig modelMigratorConfig) { + return DeclarersSingleton.getInstance().getServiceDefinition().getMigrators() + .stream() + .filter(modelMigrator -> modelMigrator.config().equals(modelMigratorConfig)) + .map(modelMigrator -> (MmT) modelMigrator) + .findFirst(); + } + + /** + * Migrates a pipeline element instance based on the provided {@link MigrationRequest}. + * The outcome of the migration is described in {@link MigrationResult}. + * The result is always part of the response. + * Independent, of the migration outcome, the returned response always has OK as status code. + * It is the responsibility of the recipient to interpret the migration result and act accordingly. + * @param migrationRequest Request that contains both the pipeline element to be migrated and the migration config. + * @return A response with status code ok, that contains a migration result reflecting the outcome of the operation. + */ + protected MigrationResult handleMigration(MigrationRequest migrationRequest) { + + var pipelineElementDescription = migrationRequest.migrationElement(); + var migrationConfig = migrationRequest.modelMigratorConfig(); + + LOG.info("Received migration request for pipeline element '{}' to migrate from version {} to {}", + pipelineElementDescription.getElementId(), + migrationConfig.fromVersion(), + migrationConfig.toVersion() + ); + + var migratorOptional = getMigrator(migrationConfig); + + if (migratorOptional.isPresent()) { + LOG.info("Migrator found for request, starting migration..."); + return executeMigration(migratorOptional.get(), pipelineElementDescription); + } + LOG.error("Migrator for migration config {} could not be found. Migration is cancelled.", migrationConfig); + return MigrationResult.failure( + pipelineElementDescription, + String.format( + "The given migration config '%s' could not be mapped to a registered migrator.", + migrationConfig + ) + ); + } + + /** + * Executes the migration for the given pipeline element based on the given migrator. + * @param migrator migrator that executes the migration + * @param pipelineElementDescription pipeline element to be migrated + * @return the migration result containing either the migrated element or the original one in case of a failure + */ + protected MigrationResult executeMigration( + MmT migrator, + T pipelineElementDescription + ) { + + var extractor = getPropertyExtractor(pipelineElementDescription); + + try { + var result = migrator.migrate(pipelineElementDescription, extractor); + + if (result.success()) { + LOG.info("Migration successfully finished."); + + // Since adapter migration was successful, version can be adapted to the target version. + // this step is explicitly performed here and not left to the migration itself to + // prevent leaving this step out + var migratedProcessor = result.element(); + migratedProcessor.setVersion(migrator.config().toVersion()); + return MigrationResult.success(migratedProcessor); + + } else { + LOG.error("Migration failed with the following reason: {}", result.message()); + // The failed migration is documented in the MigrationResult + // The core is expected to handle the response accordingly, so we can safely return a positive status code + return result; + } + } catch (RuntimeException e) { + LOG.error("An unexpected exception caused the migration to fail - " + + "sending exception report in migration result"); + return MigrationResult.failure( + pipelineElementDescription, + String.format( + "Migration failed due to an unexpected exception: %s", + StringUtils.join(e.getStackTrace(), "\n") + ) + ); + } + } + + protected abstract ExT getPropertyExtractor(T pipelineElementDescription); +} diff --git a/streampipes-rest-extensions/src/test/java/org/apache/streampipes/rest/extensions/migration/MigrateExtensionsResourceTest.java b/streampipes-rest-extensions/src/test/java/org/apache/streampipes/rest/extensions/migration/MigrateExtensionsResourceTest.java new file mode 100644 index 0000000000..c6e56e0f54 --- /dev/null +++ b/streampipes-rest-extensions/src/test/java/org/apache/streampipes/rest/extensions/migration/MigrateExtensionsResourceTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.streampipes.rest.extensions.migration; + +import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor; +import org.apache.streampipes.extensions.api.migration.IDataProcessorMigrator; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.sdk.StaticProperties; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.utils.Datatypes; + +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MigrateExtensionsResourceTest { + + @Test + public void executeMigration() { + var migrationsResource = new DataProcessorMigrationResource(); + + var migrator = new IDataProcessorMigrator() { + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig("app-id", SpServiceTagPrefix.DATA_PROCESSOR, 0, 1); + } + + @Override + public MigrationResult migrate( + DataProcessorInvocation element, + IDataProcessorParameterExtractor extractor + ) throws RuntimeException { + var properties = element.getStaticProperties(); + properties.add( + StaticProperties.freeTextProperty(Labels.empty(), Datatypes.String) + ); + element.setStaticProperties(properties); + return MigrationResult.success(element); + } + }; + + var dataProcessor = new DataProcessorInvocation(); + dataProcessor.setStaticProperties(new ArrayList<>()); + + var result = migrationsResource.executeMigration(migrator, dataProcessor); + + assertTrue(result.success()); + assertEquals("SUCCESS", result.message()); + assertEquals(1, result.element().getVersion()); + assertEquals(1, result.element().getStaticProperties().size()); + + } + + @Test + public void executeMigrationWithFailure() { + var migrationsResource = new DataProcessorMigrationResource(); + + var migrator = new IDataProcessorMigrator() { + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig("app-id", SpServiceTagPrefix.DATA_PROCESSOR, 0, 1); + } + + @Override + public MigrationResult migrate( + DataProcessorInvocation element, + IDataProcessorParameterExtractor extractor + ) throws RuntimeException { + return MigrationResult.failure(element, "This should fail"); + } + }; + + var dataProcessor = new DataProcessorInvocation(); + + var result = migrationsResource.executeMigration(migrator, dataProcessor); + + assertFalse(result.success()); + assertEquals("This should fail", result.message()); + assertEquals(0, result.element().getVersion()); + } + + @Test + public void executeMigrationWithUnknownFailure() { + var migrationsResource = new DataProcessorMigrationResource(); + + var migrator = new IDataProcessorMigrator() { + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig("app-id", SpServiceTagPrefix.DATA_PROCESSOR, 0, 1); + } + + @Override + public MigrationResult migrate( + DataProcessorInvocation element, + IDataProcessorParameterExtractor extractor + ) throws RuntimeException { + throw new NullPointerException(); + } + }; + + var dataProcessor = new DataProcessorInvocation(); + + var result = migrationsResource.executeMigration(migrator, dataProcessor); + + assertFalse(result.success()); + assertTrue(result.message().startsWith("Migration failed due to an unexpected exception:")); + assertEquals(0, result.element().getVersion()); + } +} diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java new file mode 100644 index 0000000000..13c87875fd --- /dev/null +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.rest.impl.admin; + +import org.apache.streampipes.connect.management.management.AdapterMigrationManager; +import org.apache.streampipes.manager.migration.PipelineElementMigrationManager; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; +import org.apache.streampipes.rest.security.AuthConstants; +import org.apache.streampipes.storage.api.CRUDStorage; +import org.apache.streampipes.storage.api.IAdapterStorage; +import org.apache.streampipes.storage.api.IDataProcessorStorage; +import org.apache.streampipes.storage.api.IDataSinkStorage; +import org.apache.streampipes.storage.api.IPipelineStorage; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.enums.ParameterIn; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import org.apache.http.HttpStatus; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.stereotype.Component; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + +import java.util.List; + +@Path("v2/migrations") +@Component +@PreAuthorize(AuthConstants.IS_ADMIN_ROLE) +public class MigrationResource extends AbstractAuthGuardedRestResource { + + private final CRUDStorage extensionsServiceStorage = + getNoSqlStorage().getExtensionsServiceStorage(); + + private final IAdapterStorage adapterStorage = getNoSqlStorage().getAdapterInstanceStorage(); + + private final IDataProcessorStorage dataProcessorStorage = getNoSqlStorage().getDataProcessorStorage(); + + private final IDataSinkStorage dataSinkStorage = getNoSqlStorage().getDataSinkStorage(); + private final IPipelineStorage pipelineStorage = getNoSqlStorage().getPipelineStorageAPI(); + + @POST + @Path("{serviceId}") + @Consumes(MediaType.APPLICATION_JSON) + @Operation( + summary = "Migrate adapters and pipeline elements based on migration configs", tags = {"Core", "Migration"}, + responses = { + @ApiResponse( + responseCode = "" + HttpStatus.SC_OK, + description = "All provided migrations are handled. If an error appeared, " + + "the corresponding actions are taken.") + } + ) + public Response performMigrations( + @Parameter( + in = ParameterIn.PATH, + description = "the id of the extensions service that requests migrations", + required = true + ) + @PathParam("serviceId") String serviceId, + @Parameter( + description = "list of configs (ModelMigratorConfig) that describe the requested migrations", + required = true + ) + List migrationConfigs) { + + var extensionsServiceConfig = extensionsServiceStorage.getElementById(serviceId); + var adapterMigrations = filterConfigs(migrationConfigs, List.of(SpServiceTagPrefix.ADAPTER)); + var pipelineElementMigrations = filterConfigs( + migrationConfigs, + List.of(SpServiceTagPrefix.DATA_PROCESSOR, SpServiceTagPrefix.DATA_SINK) + ); + + new AdapterMigrationManager(adapterStorage).handleMigrations(extensionsServiceConfig, adapterMigrations); + new PipelineElementMigrationManager( + pipelineStorage, + dataProcessorStorage, + dataSinkStorage) + .handleMigrations(extensionsServiceConfig, pipelineElementMigrations); + return ok(); + } + + private List filterConfigs(List migrationConfigs, + List modelTypes) { + return migrationConfigs + .stream() + .filter(config -> modelTypes.stream().anyMatch(modelType -> modelType == config.modelType())) + .toList(); + } +} diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java index 2a91e74ae0..8db25a0578 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractProcessingElementBuilder.java @@ -264,6 +264,10 @@ public K setStream2() { stream2 = true; return me(); } + public K withVersion(int version) { + this.elementDescription.setVersion(version); + return me(); + } @Override diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java index 057820a47a..c244a2cf11 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/ProcessingElementBuilder.java @@ -35,43 +35,88 @@ public class ProcessingElementBuilder private List outputStrategies; - private ProcessingElementBuilder(String id, String name, String description) { + private ProcessingElementBuilder(String id, String name, String description, int version) { super(id, name, description, new DataProcessorDescription()); this.outputStrategies = new ArrayList<>(); + this.elementDescription.setVersion(version); } - private ProcessingElementBuilder(String id) { + private ProcessingElementBuilder(String id, int version) { super(id, new DataProcessorDescription()); this.outputStrategies = new ArrayList<>(); + this.elementDescription.setVersion(version); } /** * Creates a new processing element using the builder pattern. - * + * @deprecated + * This method is no longer recommend since we rely on a version for migration purposes. + *

Please adopt {@link #create(String, String, String, int)} instead. * @param id A unique identifier of the new element, e.g., com.mycompany.processor.mynewdataprocessor * @param label A human-readable name of the element. * Will later be shown as the element name in the StreamPipes UI. * @param description A human-readable description of the element. */ + @Deprecated(since = "0.93.0", forRemoval = true) public static ProcessingElementBuilder create(String id, String label, String description) { - return new ProcessingElementBuilder(id, label, description); + return new ProcessingElementBuilder(id, label, description, 0); } + /** + * Creates a new processing element based on a label using the builder pattern. + * @param id A unique identifier of the new element, e.g., com.mycompany.processor.mynewdataprocessor + * @param label A human-readable name of the element. + * Will later be shown as the element name in the StreamPipes UI. + * @param description A human-readable description of the element. + * @param version version of the processing element for migration purposes. Should be 0 in standard cases. + * Only in case there exist migrations for the specific element the version needs to be aligned. + * @return Builder for the pre-defined processing element. + */ + public static ProcessingElementBuilder create(String id, String label, String description, int version) { + return new ProcessingElementBuilder(id, label, description, version); + } + + /** + * Creates a new processing element based on a label using the builder pattern. + * @deprecated + * This method is no longer recommend since we rely on a version for migration purposes. + *

Please adopt {@link #create(String, String, String, int)} instead. + */ + @Deprecated(since = "0.93.0", forRemoval = true) public static ProcessingElementBuilder create(Label label) { - return new ProcessingElementBuilder(label.getInternalId(), label.getLabel(), label.getDescription()); + return new ProcessingElementBuilder(label.getInternalId(), label.getLabel(), label.getDescription(), 0); } /** - * Creates a new processing element using the builder pattern. If no label and description is + * Creates a new processing element using the builder pattern. + * @deprecated + * This method is no longer recommend since we rely on a version for migration purposes. + *

Please adopt {@link #create(String, int)} instead. + * If no label and description is * given * for an element, * {@link org.apache.streampipes.sdk.builder.AbstractProcessingElementBuilder#withLocales(Locales...)} * must be called. * * @param id A unique identifier of the new element, e.g., com.mycompany.sink.mynewdatasink + * */ + @Deprecated(since = "0.93.0", forRemoval = true) public static ProcessingElementBuilder create(String id) { - return new ProcessingElementBuilder(id); + return new ProcessingElementBuilder(id, 0); + } + + /** + * Creates a new processing element using the builder pattern. If no label and description is + * given + * for an element, + * {@link org.apache.streampipes.sdk.builder.AbstractProcessingElementBuilder#withLocales(Locales...)} + * must be called. + * + * @param id A unique identifier of the new element, e.g., com.mycompany.sink.mynewdatasink + */ + public static ProcessingElementBuilder create(String id, int version) { + return new ProcessingElementBuilder(id, version); } /** diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterConfigurationBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterConfigurationBuilder.java index 3d626e2e59..fcebdbfaf6 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterConfigurationBuilder.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/AdapterConfigurationBuilder.java @@ -38,15 +38,38 @@ public class AdapterConfigurationBuilder extends private final Supplier supplier; protected AdapterConfigurationBuilder(String appId, + int version, Supplier supplier) { - super(appId, new AdapterDescription()); + super(appId, new AdapterDescription(version)); supportedParsers = new ArrayList<>(); this.supplier = supplier; } + /** + * Creates a new adapter configuration using the builder pattern. + * @deprecated + * This method is no longer recommend since we rely on a version for migration purposes. + *

Please adopt {@link #create(String, int, Supplier)} instead. + * @param appId A unique identifier of the new adapter, e.g., com.mycompany.processor.mynewdataprocessor + * @param supplier instance of the adapter to be described + */ + @Deprecated(since = "0.93.0", forRemoval = true) public static AdapterConfigurationBuilder create(String appId, Supplier supplier) { - return new AdapterConfigurationBuilder(appId, supplier); + return new AdapterConfigurationBuilder(appId, 0, supplier); + } + + /** + * Creates a new adapter configuration using the builder pattern. + * @param appId A unique identifier of the new adapter, e.g., com.mycompany.processor.mynewdataprocessor + * @param supplier instance of the adapter to be described + * @param version version of the processing element for migration purposes. Should be 0 in standard cases. + * Only in case there exist migrations for the specific element the version needs to be aligned. + */ + public static AdapterConfigurationBuilder create(String appId, + int version, + Supplier supplier) { + return new AdapterConfigurationBuilder(appId, version, supplier); } @Override @@ -87,5 +110,8 @@ public AdapterConfigurationBuilder withCategory(AdapterType... categories) { return me(); } - + public AdapterConfigurationBuilder withVersion(int version) { + this.elementDescription.setVersion(version); + return this; + } } diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java index 3ec7ee6299..a86a1ad587 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java @@ -64,7 +64,7 @@ import java.util.stream.Collectors; public abstract class AbstractParameterExtractor - implements IParameterExtractor { + implements IParameterExtractor { protected T sepaElement; private TypeParser typeParser; diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java index db54d3c3d0..cc628981b8 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java @@ -62,6 +62,7 @@ import org.apache.streampipes.rest.impl.admin.ExtensionsServiceEndpointResource; import org.apache.streampipes.rest.impl.admin.GeneralConfigurationResource; import org.apache.streampipes.rest.impl.admin.MessagingConfigurationResource; +import org.apache.streampipes.rest.impl.admin.MigrationResource; import org.apache.streampipes.rest.impl.admin.PermissionResource; import org.apache.streampipes.rest.impl.admin.PipelineElementImport; import org.apache.streampipes.rest.impl.admin.ServiceConfigurationResource; @@ -130,6 +131,7 @@ public Set> getClassesToRegister() { LabelResource.class, MeasurementUnitResource.class, MessagingConfigurationResource.class, + MigrationResource.class, Notification.class, OntologyMeasurementUnit.class, PermissionResource.class, diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java index 7e8d29824f..94d2d7eecd 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java @@ -31,11 +31,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import static org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels.GENERIC_STREAM; +import static org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels.isSetAdapter; public class AdapterMigration implements Migration { @@ -45,7 +45,7 @@ public class AdapterMigration implements Migration { private final CouchDbClient adapterInstanceClient; private final CouchDbClient adapterDescriptionClient; private final List adaptersToMigrate; - private final List adapterDescriptionsToDelete; + private final List adapterDescriptionsToMigrate; private final MigrationHelpers helpers; @@ -54,7 +54,7 @@ public AdapterMigration() { this.adapterInstanceClient = Utils.getCouchDbAdapterInstanceClient(); this.adapterDescriptionClient = Utils.getCouchDbAdapterDescriptionClient(); this.adaptersToMigrate = new ArrayList<>(); - this.adapterDescriptionsToDelete = new ArrayList<>(); + this.adapterDescriptionsToMigrate = new ArrayList<>(); this.helpers = new MigrationHelpers(); } @@ -64,9 +64,9 @@ public boolean shouldExecute() { var adapterDescriptionUri = getAllDocsUri(adapterDescriptionClient); findDocsToMigrate(adapterInstanceClient, adapterInstanceUri, adaptersToMigrate); - findDocsToMigrate(adapterDescriptionClient, adapterDescriptionUri, adapterDescriptionsToDelete); + findDocsToMigrate(adapterDescriptionClient, adapterDescriptionUri, adapterDescriptionsToMigrate); - return adaptersToMigrate.size() > 0 || adapterDescriptionsToDelete.size() > 0; + return !adaptersToMigrate.isEmpty() || !adapterDescriptionsToMigrate.isEmpty(); } private void findDocsToMigrate(CouchDbClient adapterClient, @@ -86,16 +86,19 @@ private void findDocsToMigrate(CouchDbClient adapterClient, } @Override - public void executeMigration() throws IOException { + public void executeMigration() { var adapterInstanceBackupClient = Utils.getCouchDbAdapterInstanceBackupClient(); - LOG.info("Deleting {} adapter descriptions, which will be regenerated after migration", - adapterDescriptionsToDelete.size()); - - adapterDescriptionsToDelete.forEach(ad -> { - String docId = helpers.getDocId(ad); - String rev = helpers.getRev(ad); - adapterDescriptionClient.remove(docId, rev); + adapterDescriptionsToMigrate.forEach(ad -> { + var adapterType = ad.get("type").getAsString(); + var appId = ad.get("appId"); + if (isSetAdapter(adapterType)) { + LOG.info("Deleting adapter description data set {}", appId); + adapterDescriptionClient.remove(helpers.getDocId(ad), helpers.getRev(ad)); + } else { + LOG.info("Migrating adapter description {} to new adapter model", appId); + getAdapterMigrator(adapterType).migrate(adapterDescriptionClient, ad); + } }); LOG.info("Migrating {} adapter models", adaptersToMigrate.size()); diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java index cbd3ffa8bf..cbdb3992f9 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java @@ -17,6 +17,9 @@ */ package org.apache.streampipes.service.extensions; +import org.apache.streampipes.client.StreamPipesClient; +import org.apache.streampipes.extensions.api.migration.IModelMigrator; +import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver; import org.apache.streampipes.extensions.management.init.DeclarersSingleton; import org.apache.streampipes.extensions.management.model.SpServiceDefinition; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; @@ -45,6 +48,13 @@ public void onExit() { @Override public void afterServiceRegistered(SpServiceDefinition serviceDef) { + StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance(); + + // register all migrations at StreamPipes Core + var migrationConfigs = serviceDef.getMigrators().stream().map(IModelMigrator::config).toList(); + client.adminApi().registerMigrations(migrationConfigs, serviceId()); + + // initialize all function instances StreamPipesFunctionHandler.INSTANCE.initializeFunctions(serviceDef.getServiceGroup()); } diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsResourceConfig.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsResourceConfig.java index e85be4593f..574d2d8e30 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsResourceConfig.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsResourceConfig.java @@ -25,6 +25,9 @@ import org.apache.streampipes.rest.extensions.connect.GuessResource; import org.apache.streampipes.rest.extensions.connect.HttpServerAdapterResource; import org.apache.streampipes.rest.extensions.connect.RuntimeResolvableResource; +import org.apache.streampipes.rest.extensions.migration.AdapterMigrationResource; +import org.apache.streampipes.rest.extensions.migration.DataProcessorMigrationResource; +import org.apache.streampipes.rest.extensions.migration.DataSinkMigrationResource; import org.apache.streampipes.rest.extensions.monitoring.MonitoringResource; import org.apache.streampipes.rest.extensions.pe.DataProcessorPipelineElementResource; import org.apache.streampipes.rest.extensions.pe.DataSinkPipelineElementResource; @@ -46,22 +49,23 @@ public class ExtensionsResourceConfig extends BaseResourceConfig { @Override public Set> getClassesToRegister() { return Set.of( - GuessResource.class, - RuntimeResolvableResource.class, - AdapterWorkerResource.class, - MultiPartFeature.class, AdapterAssetResource.class, AdapterDescriptionResource.class, - HttpServerAdapterResource.class, - - DataSinkPipelineElementResource.class, + AdapterMigrationResource.class, + AdapterWorkerResource.class, + DataProcessorMigrationResource.class, DataProcessorPipelineElementResource.class, + DataSinkMigrationResource.class, + DataSinkPipelineElementResource.class, DataStreamPipelineElementResource.class, - WelcomePage.class, - - ServiceHealthResource.class, + GuessResource.class, + HttpServerAdapterResource.class, JacksonSerializationProvider.class, - MonitoringResource.class + MonitoringResource.class, + MultiPartFeature.class, + RuntimeResolvableResource.class, + ServiceHealthResource.class, + WelcomePage.class ); } diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterStorage.java index ed88faf134..651d7274ba 100644 --- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterStorage.java +++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IAdapterStorage.java @@ -33,4 +33,8 @@ public interface IAdapterStorage extends CRUDStorage AdapterDescription getAdapter(String adapterId); void deleteAdapter(String adapterId); + + AdapterDescription getFirstAdapterByAppId(String appId); + + List getAdaptersByAppId(String appId); } diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java index dfc02b8649..c9f4d6630c 100644 --- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java +++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java @@ -19,7 +19,11 @@ import org.apache.streampipes.model.graph.DataProcessorDescription; +import java.util.List; + public interface IDataProcessorStorage extends CRUDStorage { - DataProcessorDescription getDataProcessorByAppId(String appId); + DataProcessorDescription getFirstDataProcessorByAppId(String appId); + + List getDataProcessorsByAppId(String appId); } diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java index 7f67c07304..e740816d70 100644 --- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java +++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java @@ -19,7 +19,11 @@ import org.apache.streampipes.model.graph.DataSinkDescription; +import java.util.List; + public interface IDataSinkStorage extends CRUDStorage { - DataSinkDescription getDataSinkByAppId(String appId); + DataSinkDescription getFirstDataSinkByAppId(String appId); + + List getDataSinksByAppId(String appId); } diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterDescriptionStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterDescriptionStorageImpl.java index 1fafc6e045..9a771a408b 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterDescriptionStorageImpl.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterDescriptionStorageImpl.java @@ -26,6 +26,7 @@ import org.apache.streampipes.storage.couchdb.utils.Utils; import java.util.List; +import java.util.NoSuchElementException; import java.util.Optional; public class AdapterDescriptionStorageImpl extends AbstractDao implements IAdapterStorage { @@ -64,6 +65,23 @@ public void deleteAdapter(String adapterId) { } + @Override + public AdapterDescription getFirstAdapterByAppId(String appId) { + return getAll() + .stream() + .filter(p -> p.getAppId().equals(appId)) + .findFirst() + .orElseThrow(NoSuchElementException::new); + } + + @Override + public List getAdaptersByAppId(String appId) { + return getAll() + .stream() + .filter(p -> p.getAppId().equals(appId)) + .toList(); + } + @Override public List getAll() { return findAll(); diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterInstanceStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterInstanceStorageImpl.java index 9dbdcab883..e8cc511374 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterInstanceStorageImpl.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/AdapterInstanceStorageImpl.java @@ -26,6 +26,7 @@ import org.apache.streampipes.storage.couchdb.utils.Utils; import java.util.List; +import java.util.NoSuchElementException; import java.util.Optional; public class AdapterInstanceStorageImpl extends AbstractDao implements IAdapterStorage { @@ -52,7 +53,7 @@ public void updateAdapter(AdapterDescription adapter) { @Override public AdapterDescription getAdapter(String adapterId) { DbCommand, AdapterDescription> cmd = - new FindCommand<>(couchDbClientSupplier, adapterId, AdapterDescription.class); + new FindCommand<>(couchDbClientSupplier, adapterId, AdapterDescription.class); return cmd.execute().orElse(null); } @@ -79,6 +80,23 @@ public AdapterDescription getElementById(String id) { return findWithNullIfEmpty(id); } + @Override + public AdapterDescription getFirstAdapterByAppId(String appId) { + return getAll() + .stream() + .filter(p -> p.getAppId().equals(appId)) + .findFirst() + .orElseThrow(NoSuchElementException::new); + } + + @Override + public List getAdaptersByAppId(String appId) { + return getAll() + .stream() + .filter(p -> p.getAppId().equals(appId)) + .toList(); + } + @Override public AdapterDescription updateElement(AdapterDescription element) { update(element); diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java index c046e314b0..cf013b56ea 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java @@ -60,7 +60,7 @@ public void deleteElement(DataProcessorDescription element) { } @Override - public DataProcessorDescription getDataProcessorByAppId(String appId) { + public DataProcessorDescription getFirstDataProcessorByAppId(String appId) { return getAll() .stream() .filter(p -> p.getAppId().equals(appId)) @@ -68,6 +68,14 @@ public DataProcessorDescription getDataProcessorByAppId(String appId) { .orElseThrow(NoSuchElementException::new); } + @Override + public List getDataProcessorsByAppId(String appId) { + return getAll() + .stream() + .filter(p -> p.getAppId().equals(appId)) + .toList(); + } + private String getCurrentRev(String elementId) { return find(elementId).get().getRev(); } diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java index 5f80ea77fb..f15149c1b1 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java @@ -59,7 +59,7 @@ public void deleteElement(DataSinkDescription element) { } @Override - public DataSinkDescription getDataSinkByAppId(String appId) { + public DataSinkDescription getFirstDataSinkByAppId(String appId) { return getAll() .stream() .filter(s -> s.getAppId().equals(appId)) @@ -67,6 +67,14 @@ public DataSinkDescription getDataSinkByAppId(String appId) { .orElseThrow(IllegalArgumentException::new); } + @Override + public List getDataSinksByAppId(String appId) { + return getAll() + .stream() + .filter(s -> s.getAppId().equals(appId)) + .toList(); + } + private String getCurrentRev(String elementId) { return find(elementId).get().getRev(); } diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/PipelineElementDescriptionStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/PipelineElementDescriptionStorageImpl.java index 92a7d5ad7b..0b8e26ca6a 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/PipelineElementDescriptionStorageImpl.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/PipelineElementDescriptionStorageImpl.java @@ -80,7 +80,7 @@ public DataProcessorDescription getDataProcessorById(String rdfId) { @Override public DataProcessorDescription getDataProcessorByAppId(String appId) { - return this.dataProcessorStorage.getDataProcessorByAppId(appId); + return this.dataProcessorStorage.getFirstDataProcessorByAppId(appId); } @Override @@ -90,7 +90,7 @@ public DataSinkDescription getDataSinkById(String rdfId) { @Override public DataSinkDescription getDataSinkByAppId(String appId) { - return this.dataSinkStorage.getDataSinkByAppId(appId); + return this.dataSinkStorage.getFirstDataSinkByAppId(appId); } @Override @@ -100,7 +100,7 @@ public AdapterDescription getAdapterById(String elementId) { @Override public AdapterDescription getAdapterByAppId(String appId) { - throw new IllegalArgumentException("Not yet implemented"); + return this.adapterStorage.getFirstAdapterByAppId(appId); } @Override diff --git a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java index d24e9a65af..9e9b0578ce 100644 --- a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java +++ b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java @@ -43,7 +43,7 @@ public abstract class DistributedRuntime< PeT extends IStreamPipesPipelineElement, IvT extends InvocableStreamPipesEntity, RcT extends RuntimeContext, - ExT extends IParameterExtractor, + ExT extends IParameterExtractor, PepT extends IPipelineElementParameters> extends PipelineElementRuntime implements IStreamPipesRuntime { diff --git a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java index af3afce599..5b77768f4d 100644 --- a/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java +++ b/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java @@ -69,7 +69,7 @@ public abstract class FlinkRuntime< PeT extends IStreamPipesPipelineElement, IvT extends InvocableStreamPipesEntity, RcT extends RuntimeContext, - ExT extends IParameterExtractor, + ExT extends IParameterExtractor, PepT extends IPipelineElementParameters, FpT extends IFlinkProgram> extends DistributedRuntime diff --git a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java index d9f22287ca..6ba4311e04 100644 --- a/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java +++ b/streampipes-wrapper-kafka-streams/src/main/java/org/apache/streampipes/wrapper/kafka/KafkaStreamsRuntime.java @@ -39,7 +39,7 @@ public abstract class KafkaStreamsRuntime< PeT extends IStreamPipesPipelineElement, IvT extends InvocableStreamPipesEntity, RcT extends RuntimeContext, - ExT extends IParameterExtractor, + ExT extends IParameterExtractor, PepT extends IPipelineElementParameters> extends DistributedRuntime implements IStreamPipesRuntime { diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java index d35317cda5..a8e48596c0 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java @@ -43,7 +43,7 @@ public abstract class StandalonePipelineElementRuntime< PeT extends IStreamPipesPipelineElement, IvT extends InvocableStreamPipesEntity, RcT extends RuntimeContext, - ExT extends IParameterExtractor, + ExT extends IParameterExtractor, PepT extends IPipelineElementParameters> extends PipelineElementRuntime implements RawDataProcessor { diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/PipelineElementParameters.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/PipelineElementParameters.java index 607b7f437b..6fcd1cfc11 100644 --- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/PipelineElementParameters.java +++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/params/PipelineElementParameters.java @@ -31,7 +31,7 @@ import java.util.List; import java.util.Map; -public class PipelineElementParameters> +public class PipelineElementParameters implements IPipelineElementParameters { private final List inputStreamParams; diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElementRuntime.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElementRuntime.java index 84d66579d0..0d33e47457 100644 --- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElementRuntime.java +++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/PipelineElementRuntime.java @@ -32,7 +32,7 @@ public abstract class PipelineElementRuntime< PeT extends IStreamPipesPipelineElement, IvT extends InvocableStreamPipesEntity, RcT extends RuntimeContext, - ExT extends IParameterExtractor, + ExT extends IParameterExtractor, PepT extends IPipelineElementParameters> implements IStreamPipesRuntime { diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts index 0f076fb290..fe8e13571f 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model-client.ts @@ -19,7 +19,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.1.1185 on 2023-05-15 15:33:05. +// Generated using typescript-generator version 3.2.1263 on 2023-10-27 10:44:54. export class ExtensionsServiceEndpointItem { appId: string; diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 1d04b839d6..47f06b81f2 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -16,18 +16,18 @@ * specific language governing permissions and limitations * under the License. */ - /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2023-10-24 20:02:02. +// Generated using typescript-generator version 3.2.1263 on 2023-10-27 10:43:45. export class NamedStreamPipesEntity { '@class': - | 'org.apache.streampipes.model.connect.adapter.AdapterDescription' | 'org.apache.streampipes.model.connect.grounding.ProtocolDescription' | 'org.apache.streampipes.model.template.PipelineTemplateDescription' | 'org.apache.streampipes.model.SpDataStream' + | 'org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity' + | 'org.apache.streampipes.model.connect.adapter.AdapterDescription' | 'org.apache.streampipes.model.base.InvocableStreamPipesEntity' | 'org.apache.streampipes.model.graph.DataProcessorInvocation' | 'org.apache.streampipes.model.graph.DataSinkInvocation'; @@ -82,7 +82,30 @@ export class NamedStreamPipesEntity { } } -export class AdapterDescription extends NamedStreamPipesEntity { +export class VersionedNamedStreamPipesEntity extends NamedStreamPipesEntity { + '@class': + | 'org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity' + | 'org.apache.streampipes.model.connect.adapter.AdapterDescription' + | 'org.apache.streampipes.model.base.InvocableStreamPipesEntity' + | 'org.apache.streampipes.model.graph.DataProcessorInvocation' + | 'org.apache.streampipes.model.graph.DataSinkInvocation'; + 'version': number; + + static 'fromData'( + data: VersionedNamedStreamPipesEntity, + target?: VersionedNamedStreamPipesEntity, + ): VersionedNamedStreamPipesEntity { + if (!data) { + return data; + } + const instance = target || new VersionedNamedStreamPipesEntity(); + super.fromData(data, instance); + instance.version = data.version; + return instance; + } +} + +export class AdapterDescription extends VersionedNamedStreamPipesEntity { '@class': 'org.apache.streampipes.model.connect.adapter.AdapterDescription'; 'category': string[]; 'config': StaticPropertyUnion[]; @@ -1159,7 +1182,7 @@ export class DataLakeMeasure { } export class InvocableStreamPipesEntity - extends NamedStreamPipesEntity + extends VersionedNamedStreamPipesEntity implements EndpointSelectable { '@class': @@ -1173,6 +1196,7 @@ export class InvocableStreamPipesEntity 'detachPath': string; 'inputStreams': SpDataStream[]; 'selectedEndpointUrl': string; + 'serviceTagPrefix': SpServiceTagPrefix; 'staticProperties': StaticPropertyUnion[]; 'statusInfoSettings': ElementStatusInfoSettings; 'streamRequirements': SpDataStream[]; @@ -1197,6 +1221,7 @@ export class InvocableStreamPipesEntity data.inputStreams, ); instance.selectedEndpointUrl = data.selectedEndpointUrl; + instance.serviceTagPrefix = data.serviceTagPrefix; instance.staticProperties = __getCopyArrayFn( StaticProperty.fromDataUnion, )(data.staticProperties);