Skip to content

Commit

Permalink
implement migration for adapter and pipeline element configurations (#…
Browse files Browse the repository at this point in the history
…2077)

* feat(#2002): Align adapter registration with other pipeline elements

* Fix checkstyle

* style: remove trailing whitespace

* Add initial draft of migration concept

* refactor: fix logger configuration

* refactor: extend storage implementations by method to get all instances by the app id

* refactor: update generated typescript model

* feat: add version to models & builders

* refactor: implement string representation of Notification

* feat: implement data model for migration

* feat: register migrations at service

* Revert "refactor: implement string representation of Notification"

This reverts commit 646e792.

* refactor: use correct Notification class

* feat: introduce migrate extensions resource

* feat: introduce migrate adapter endpoint

* feat: implement adapter migration at the core

* remove data lake migration

* ensure order & uniqueness of migrations

* remove redundant exception

* remove redundant exception

* add tests

* remove outdated test

* refactor: separate adapter migration from pipeline element migrations

* refactor: move MigrationResult to StreamPipes model

* refactor: migration result

* refactor: introduce generic migration request

* feat: send migration requests to core

* feat: process migrations at core

* refactor: remove legacy generic

* refactor: introduce versioned StreamPipes entity

* refactor: remove deprecated generic type

* feat: implement migration for processing elements & data sinks

* docs: add endpoint documentation

* refactor: move to correct module

* feature: add update for descriptions

* refactor: adapt ProcessingElementBuilder to be capable of versions

* refactor: minor improvements

* style: fix checkstyle issues

* refactor: remove legacy type definition

* refactor: update generated TS models

* fix: add missing license header

* Fix adapter model migration, add OPC adapter migration as sample

* Fix typo

* Improvements to extension migration (#2101)

* Extract MigrationResource logic into smaller units

* Use single request for submitting migrations from extensions to core

* refactor: remove dead code

* style: change formatting

---------

Co-authored-by: bossenti <[email protected]>

* refactor: remove redundant assignments

* refactor: provide migrators as interface

---------

Co-authored-by: Dominik Riemer <[email protected]>
  • Loading branch information
bossenti and dominikriemer committed Oct 30, 2023
1 parent 1a7a17e commit a34a4dd
Show file tree
Hide file tree
Showing 76 changed files with 2,225 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.streampipes.model.extensions.configuration.SpServiceConfiguration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.function.FunctionDefinition;
import org.apache.streampipes.model.migration.ModelMigratorConfig;

import java.util.List;

Expand All @@ -39,5 +40,7 @@ public interface IAdminApi {

void deregisterFunction(String functionId);

void registerMigrations(List<ModelMigratorConfig> migrationConfigs, String serviceId);

MessagingSettings getMessagingSettings();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.function.FunctionDefinition;
import org.apache.streampipes.model.message.SuccessMessage;
import org.apache.streampipes.model.migration.ModelMigratorConfig;

import java.util.List;

Expand Down Expand Up @@ -66,6 +67,15 @@ public void deregisterFunction(String functionId) {
delete(getDeleteFunctionPath(functionId), SuccessMessage.class);
}

/**
* Register migration configs {@link ModelMigratorConfig} at the StreamPipes Core service.
* @param migrationConfigs list of migration configs to be registered
*/
@Override
public void registerMigrations(List<ModelMigratorConfig> migrationConfigs, String serviceId) {
post(getMigrationPath().addToPath(serviceId), migrationConfigs);
}

@Override
public MessagingSettings getMessagingSettings() {
return getSingle(getMessagingSettingsPath(), MessagingSettings.class);
Expand Down Expand Up @@ -98,4 +108,10 @@ private StreamPipesApiPath getFunctionsPath() {
private StreamPipesApiPath getDeleteFunctionPath(String functionId) {
return getFunctionsPath().addToPath(functionId);
}

private StreamPipesApiPath getMigrationPath() {
return StreamPipesApiPath
.fromBaseApiPath()
.addToPath("migrations");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.connect.management.management;

import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.manager.migration.AbstractMigrationManager;
import org.apache.streampipes.manager.migration.IMigrationHandler;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import org.apache.streampipes.storage.api.IAdapterStorage;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class AdapterMigrationManager extends AbstractMigrationManager implements IMigrationHandler {

private static final Logger LOG = LoggerFactory.getLogger(AdapterMigrationManager.class);

private final IAdapterStorage adapterStorage;

public AdapterMigrationManager(IAdapterStorage adapterStorage) {
this.adapterStorage = adapterStorage;
}

@Override
public void handleMigrations(SpServiceRegistration extensionsServiceConfig,
List<ModelMigratorConfig> migrationConfigs) {

LOG.info("Received {} migrations from extension service {}.",
migrationConfigs.size(),
extensionsServiceConfig.getServiceUrl());
LOG.info("Updating adapter descriptions by replacement...");
updateDescriptions(migrationConfigs, extensionsServiceConfig.getServiceUrl());
LOG.info("Adapter descriptions are up to date.");

LOG.info("Checking migrations for existing adapters in StreamPipes Core ...");
for (var migrationConfig : migrationConfigs) {
LOG.info("Searching for assets of '{}'", migrationConfig.targetAppId());
LOG.debug("Searching for assets of '{}' with config {}", migrationConfig.targetAppId(), migrationConfig);
var adapterDescriptions = adapterStorage.getAdaptersByAppId(migrationConfig.targetAppId());
LOG.info("Found {} instances for appId '{}'", adapterDescriptions.size(), migrationConfig.targetAppId());
for (var adapterDescription : adapterDescriptions) {

var adapterVersion = adapterDescription.getVersion();

if (adapterVersion == migrationConfig.fromVersion()) {
LOG.info("Migration is required for adapter '{}'. Migrating from version '{}' to '{}' ...",
adapterDescription.getElementId(),
adapterVersion, migrationConfig.toVersion()
);

var migrationResult = performMigration(
adapterDescription,
migrationConfig,
String.format("%s/%s/adapter",
extensionsServiceConfig.getServiceUrl(),
MIGRATION_ENDPOINT
)
);

if (migrationResult.success()) {
LOG.info("Migration successfully performed by extensions service. Updating adapter description ...");
LOG.debug(
"Migration was performed by extensions service '{}'",
extensionsServiceConfig.getServiceUrl());

adapterStorage.updateAdapter(migrationResult.element());
LOG.info("Adapter description is updated - Migration successfully completed at Core.");
} else {
LOG.error("Migration failed with the following reason: {}", migrationResult.message());
LOG.error(
"Migration for adapter '{}' failed - Stopping adapter ...",
migrationResult.element().getElementId()
);
try {
WorkerRestClient.stopStreamAdapter(extensionsServiceConfig.getServiceUrl(), adapterDescription);
} catch (AdapterException e) {
LOG.error("Stopping adapter failed: {}", StringUtils.join(e.getStackTrace(), "\n"));
}
LOG.info("Adapter successfully stopped.");
}
} else {
LOG.info(
"Migration is not applicable for adapter '{}' because of a version mismatch - "
+ "adapter version: '{}', migration starts at: '{}'",
adapterDescription.getElementId(),
adapterVersion,
migrationConfig.fromVersion()
);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public static void stopStreamAdapter(String baseUrl,
AdapterDescription adapterStreamDescription) throws AdapterException {
String url = baseUrl + WorkerPaths.getStreamStopPath();

var ad =
getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId());
var ad = getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId());

stopAdapter(ad, url);
updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false);
Expand All @@ -74,8 +73,8 @@ public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(
try {
LOG.info("Requesting all running adapter description instances: " + url);
var responseString = ExtensionServiceExecutions
.extServiceGetRequest(url)
.execute().returnContent().asString();
.extServiceGetRequest(url)
.execute().returnContent().asString();

return JacksonSerializer.getObjectMapper().readValue(responseString, List.class);
} catch (IOException e) {
Expand All @@ -84,15 +83,15 @@ public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(
}
}

public static void startAdapter(String url,
AdapterDescription ad) throws AdapterException {
private static void startAdapter(String url,
AdapterDescription ad) throws AdapterException {
LOG.info("Trying to start adapter on endpoint {} ", url);
triggerAdapterStateChange(ad, url, "started");
}


public static void stopAdapter(AdapterDescription ad,
String url) throws AdapterException {
private static void stopAdapter(AdapterDescription ad,
String url) throws AdapterException {

LOG.info("Trying to stop adapter on endpoint {} ", url);
triggerAdapterStateChange(ad, url, "stopped");
Expand Down Expand Up @@ -134,14 +133,14 @@ private static HttpResponse triggerPost(String url,
public static RuntimeOptionsResponse getConfiguration(String workerEndpoint,
String appId,
RuntimeOptionsRequest runtimeOptionsRequest)
throws AdapterException, SpConfigurationException {
throws AdapterException, SpConfigurationException {
String url = workerEndpoint + WorkerPaths.getRuntimeResolvablePath(appId);

try {
String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest);
var response = ExtensionServiceExecutions.extServicePostRequest(url, payload)
.execute()
.returnResponse();
.execute()
.returnResponse();

String responseString = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);

Expand All @@ -163,9 +162,9 @@ public static String getAssets(String workerPath) throws AdapterException {

try {
return Request.Get(url)
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asString();
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asString();
} catch (IOException e) {
LOG.error(e.getMessage());
throw new AdapterException("Could not get assets endpoint: " + url);
Expand All @@ -178,9 +177,9 @@ public static byte[] getIconAsset(String baseUrl) throws AdapterException {

try {
byte[] responseString = Request.Get(url)
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asBytes();
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asBytes();
return responseString;
} catch (IOException e) {
LOG.error(e.getMessage());
Expand All @@ -193,9 +192,9 @@ public static String getDocumentationAsset(String baseUrl) throws AdapterExcepti

try {
return Request.Get(url)
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asString();
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asString();
} catch (IOException e) {
LOG.error(e.getMessage());
throw new AdapterException("Could not get documentation endpoint: " + url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@

package org.apache.streampipes.extensions.api.extractor;

import org.apache.streampipes.model.graph.DataProcessorInvocation;

import java.util.List;

public interface IDataProcessorParameterExtractor extends IParameterExtractor<DataProcessorInvocation> {
public interface IDataProcessorParameterExtractor extends IParameterExtractor {
String outputTopic();

List<String> outputKeySelectors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,5 @@

package org.apache.streampipes.extensions.api.extractor;

import org.apache.streampipes.model.graph.DataSinkInvocation;

public interface IDataSinkParameterExtractor extends IParameterExtractor<DataSinkInvocation> {
public interface IDataSinkParameterExtractor extends IParameterExtractor {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.streampipes.extensions.api.extractor;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.PropertyScope;
Expand All @@ -30,7 +29,7 @@
import java.io.InputStream;
import java.util.List;

public interface IParameterExtractor<T extends InvocableStreamPipesEntity> {
public interface IParameterExtractor {
String measurementUnit(String runtimeName, Integer streamIndex);

String inputTopic(Integer streamIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,5 @@

package org.apache.streampipes.extensions.api.extractor;

import org.apache.streampipes.model.graph.DataSinkInvocation;

public interface IStaticPropertyExtractor extends IParameterExtractor<DataSinkInvocation> {
public interface IStaticPropertyExtractor extends IParameterExtractor {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.api.migration;

import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
import org.apache.streampipes.model.graph.DataSinkInvocation;

public interface DataSinkMigrator extends IModelMigrator<DataSinkInvocation, IDataSinkParameterExtractor> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.api.migration;

import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;

public interface IAdapterMigrator extends IModelMigrator<AdapterDescription, IStaticPropertyExtractor> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.api.migration;

import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor;
import org.apache.streampipes.model.graph.DataProcessorInvocation;

public interface IDataProcessorMigrator
extends IModelMigrator<DataProcessorInvocation, IDataProcessorParameterExtractor> {
}
Loading

0 comments on commit a34a4dd

Please sign in to comment.