Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement migration for adapter and pipeline element configurations #2077

Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
d61b2bc
feat(#2002): Align adapter registration with other pipeline elements
dominikriemer Oct 9, 2023
dbf4cec
Fix checkstyle
dominikriemer Oct 9, 2023
d437edc
style: remove trailing whitespace
bossenti Oct 9, 2023
f74ae39
Merge branch 'dev' into 2002-harmonize-registration-of-adapters-and-p…
bossenti Oct 10, 2023
969d2ac
Add initial draft of migration concept
dominikriemer Oct 17, 2023
6f8ad83
refactor: fix logger configuration
bossenti Oct 19, 2023
01ec032
refactor: extend storage implementations by method to get all instanc…
bossenti Oct 19, 2023
a23df74
refactor: update generated typescript model
bossenti Oct 19, 2023
ecd20c4
feat: add version to models & builders
bossenti Oct 19, 2023
646e792
refactor: implement string representation of Notification
bossenti Oct 20, 2023
0792a01
feat: implement data model for migration
bossenti Oct 20, 2023
6bc8bc1
feat: register migrations at service
bossenti Oct 20, 2023
85a6b12
Merge remote-tracking branch 'origin/dev' into 2045-implement-migrati…
bossenti Oct 20, 2023
a86a7c3
Merge remote-tracking branch 'origin/2045-implement-migration-for-ada…
bossenti Oct 20, 2023
599c83b
Merge remote-tracking branch 'origin/dev' into 2045-implement-migrati…
bossenti Oct 23, 2023
9f88b9a
Revert "refactor: implement string representation of Notification"
bossenti Oct 23, 2023
879ced4
refactor: use correct Notification class
bossenti Oct 23, 2023
e65fe82
feat: introduce migrate extensions resource
bossenti Oct 23, 2023
5ae832a
feat: introduce migrate adapter endpoint
bossenti Oct 23, 2023
5696491
feat: implement adapter migration at the core
bossenti Oct 23, 2023
20a9ddf
remove data lake migration
bossenti Oct 23, 2023
cd0e41d
ensure order & uniqueness of migrations
bossenti Oct 23, 2023
18c1bc6
remove redundant exception
bossenti Oct 23, 2023
b31a3ea
remove redundant exception
bossenti Oct 23, 2023
7277ddb
add tests
bossenti Oct 23, 2023
37370f7
remove outdated test
bossenti Oct 23, 2023
86056b8
refactor: separate adapter migration from pipeline element migrations
bossenti Oct 24, 2023
4cbd9c4
refactor: move MigrationResult to StreamPipes model
bossenti Oct 24, 2023
06f349f
refactor: migration result
bossenti Oct 24, 2023
4656bb2
refactor: introduce generic migration request
bossenti Oct 25, 2023
d4390bb
feat: send migration requests to core
bossenti Oct 25, 2023
16dc3b2
feat: process migrations at core
bossenti Oct 25, 2023
0f9475a
refactor: remove legacy generic
bossenti Oct 25, 2023
c915440
refactor: introduce versioned StreamPipes entity
bossenti Oct 25, 2023
a63423a
refactor: remove deprecated generic type
bossenti Oct 25, 2023
8308971
feat: implement migration for processing elements & data sinks
bossenti Oct 25, 2023
796be9a
docs: add endpoint documentation
bossenti Oct 26, 2023
5b859d2
Merge remote-tracking branch 'origin/dev' into 2045-implement-migrati…
bossenti Oct 26, 2023
2658ae0
refactor: move to correct module
bossenti Oct 26, 2023
7129793
feature: add update for descriptions
bossenti Oct 26, 2023
540189b
Merge remote-tracking branch 'origin/dev' into 2045-implement-migrati…
bossenti Oct 27, 2023
509b6b4
refactor: adapt ProcessingElementBuilder to be capable of versions
bossenti Oct 27, 2023
203bd54
refactor: minor improvements
bossenti Oct 27, 2023
3f83ae2
test
bossenti Oct 27, 2023
1b1b4c0
style: fix checkstyle issues
bossenti Oct 27, 2023
542d632
refactor: remove legacy type definition
bossenti Oct 27, 2023
798ed3a
refactor: update generated TS models
bossenti Oct 27, 2023
d0d28c0
fix: add missing license header
bossenti Oct 27, 2023
672745e
Fix adapter model migration, add OPC adapter migration as sample
dominikriemer Oct 27, 2023
28b21c6
Fix typo
dominikriemer Oct 27, 2023
10375f3
Improvements to extension migration (#2101)
dominikriemer Oct 30, 2023
f662872
Merge remote-tracking branch 'origin/dev' into 2045-implement-migrati…
bossenti Oct 30, 2023
765f8ca
refactor: remove redundant assignments
bossenti Oct 30, 2023
a61bf62
refactor: provide migrators as interface
bossenti Oct 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.streampipes.model.extensions.configuration.SpServiceConfiguration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.function.FunctionDefinition;
import org.apache.streampipes.model.migration.ModelMigratorConfig;

import java.util.List;

Expand All @@ -39,5 +40,9 @@ public interface IAdminApi {

void deregisterFunction(String functionId);

void registerAdapterMigrations(List<ModelMigratorConfig> migrationConfigs, String serviceId);

void registerPipelineElementMigrations(List<ModelMigratorConfig> migratorConfigs, String serviceId);

MessagingSettings getMessagingSettings();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.function.FunctionDefinition;
import org.apache.streampipes.model.message.SuccessMessage;
import org.apache.streampipes.model.migration.ModelMigratorConfig;

import java.util.List;

Expand Down Expand Up @@ -66,6 +67,19 @@ public void deregisterFunction(String functionId) {
delete(getDeleteFunctionPath(functionId), SuccessMessage.class);
}

/**
* Register migration configs {@link ModelMigratorConfig} at the StreamPipes Core service.
* @param migrationConfigs list of migration configs to be registered
*/
@Override
public void registerAdapterMigrations(List<ModelMigratorConfig> migrationConfigs, String serviceId) {
post(getAdapterMigrationPath().addToPath(serviceId), migrationConfigs);
}

public void registerPipelineElementMigrations(List<ModelMigratorConfig> migratorConfigs, String serviceId) {
post(getPipelineElementMigrationPath().addToPath(serviceId), migratorConfigs);
}

@Override
public MessagingSettings getMessagingSettings() {
return getSingle(getMessagingSettingsPath(), MessagingSettings.class);
Expand Down Expand Up @@ -98,4 +112,16 @@ private StreamPipesApiPath getFunctionsPath() {
private StreamPipesApiPath getDeleteFunctionPath(String functionId) {
return getFunctionsPath().addToPath(functionId);
}

private StreamPipesApiPath getAdapterMigrationPath() {
return StreamPipesApiPath
.fromBaseApiPath()
.addToPath("migrations/adapter");
dominikriemer marked this conversation as resolved.
Show resolved Hide resolved
}

private StreamPipesApiPath getPipelineElementMigrationPath() {
return StreamPipesApiPath
.fromBaseApiPath()
.addToPath("migrations/pipeline-element");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public static void stopStreamAdapter(String baseUrl,
AdapterDescription adapterStreamDescription) throws AdapterException {
String url = baseUrl + WorkerPaths.getStreamStopPath();

var ad =
getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId());
var ad = getAdapterDescriptionById(new AdapterInstanceStorageImpl(), adapterStreamDescription.getElementId());

stopAdapter(ad, url);
updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false);
Expand All @@ -74,8 +73,8 @@ public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(
try {
LOG.info("Requesting all running adapter description instances: " + url);
var responseString = ExtensionServiceExecutions
.extServiceGetRequest(url)
.execute().returnContent().asString();
.extServiceGetRequest(url)
.execute().returnContent().asString();

return JacksonSerializer.getObjectMapper().readValue(responseString, List.class);
} catch (IOException e) {
Expand All @@ -84,15 +83,15 @@ public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(
}
}

public static void startAdapter(String url,
AdapterDescription ad) throws AdapterException {
private static void startAdapter(String url,
AdapterDescription ad) throws AdapterException {
LOG.info("Trying to start adapter on endpoint {} ", url);
triggerAdapterStateChange(ad, url, "started");
}


public static void stopAdapter(AdapterDescription ad,
String url) throws AdapterException {
private static void stopAdapter(AdapterDescription ad,
String url) throws AdapterException {

LOG.info("Trying to stop adapter on endpoint {} ", url);
triggerAdapterStateChange(ad, url, "stopped");
Expand Down Expand Up @@ -134,14 +133,14 @@ private static HttpResponse triggerPost(String url,
public static RuntimeOptionsResponse getConfiguration(String workerEndpoint,
String appId,
RuntimeOptionsRequest runtimeOptionsRequest)
throws AdapterException, SpConfigurationException {
throws AdapterException, SpConfigurationException {
String url = workerEndpoint + WorkerPaths.getRuntimeResolvablePath(appId);

try {
String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest);
var response = ExtensionServiceExecutions.extServicePostRequest(url, payload)
.execute()
.returnResponse();
.execute()
.returnResponse();

String responseString = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);

Expand All @@ -163,9 +162,9 @@ public static String getAssets(String workerPath) throws AdapterException {

try {
return Request.Get(url)
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asString();
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asString();
} catch (IOException e) {
LOG.error(e.getMessage());
throw new AdapterException("Could not get assets endpoint: " + url);
Expand All @@ -178,9 +177,9 @@ public static byte[] getIconAsset(String baseUrl) throws AdapterException {

try {
byte[] responseString = Request.Get(url)
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asBytes();
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asBytes();
return responseString;
} catch (IOException e) {
LOG.error(e.getMessage());
Expand All @@ -193,9 +192,9 @@ public static String getDocumentationAsset(String baseUrl) throws AdapterExcepti

try {
return Request.Get(url)
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asString();
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asString();
} catch (IOException e) {
LOG.error(e.getMessage());
throw new AdapterException("Could not get documentation endpoint: " + url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@

package org.apache.streampipes.extensions.api.extractor;

import org.apache.streampipes.model.graph.DataProcessorInvocation;

import java.util.List;

public interface IDataProcessorParameterExtractor extends IParameterExtractor<DataProcessorInvocation> {
public interface IDataProcessorParameterExtractor extends IParameterExtractor {
String outputTopic();

List<String> outputKeySelectors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,5 @@

package org.apache.streampipes.extensions.api.extractor;

import org.apache.streampipes.model.graph.DataSinkInvocation;

public interface IDataSinkParameterExtractor extends IParameterExtractor<DataSinkInvocation> {
public interface IDataSinkParameterExtractor extends IParameterExtractor {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.streampipes.extensions.api.extractor;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.PropertyScope;
Expand All @@ -30,7 +29,7 @@
import java.io.InputStream;
import java.util.List;

public interface IParameterExtractor<T extends InvocableStreamPipesEntity> {
public interface IParameterExtractor {
String measurementUnit(String runtimeName, Integer streamIndex);

String inputTopic(Integer streamIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,5 @@

package org.apache.streampipes.extensions.api.extractor;

import org.apache.streampipes.model.graph.DataSinkInvocation;

public interface IStaticPropertyExtractor extends IParameterExtractor<DataSinkInvocation> {
public interface IStaticPropertyExtractor extends IParameterExtractor {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.api.migration;

import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;

public abstract class AdapterMigrator implements ModelMigrator<AdapterDescription, IStaticPropertyExtractor> {

@Override
public boolean equals(Object obj) {
if (obj instanceof ModelMigrator<?, ?>) {
return this.config().equals(((ModelMigrator<?, ?>) obj).config());
}
return false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity, where is this equals method used? Is this the only reason the AdapterMigrator has to be an abstract class instead of an interface?

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.api.migration;

import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor;
import org.apache.streampipes.model.graph.DataProcessorInvocation;

public abstract class DataProcessorMigrator
implements ModelMigrator<DataProcessorInvocation, IDataProcessorParameterExtractor> {

@Override
public boolean equals(Object obj) {
if (obj instanceof ModelMigrator<?, ?>) {
return this.config().equals(((ModelMigrator<?, ?>) obj).config());
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.api.migration;

import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
import org.apache.streampipes.model.graph.DataSinkInvocation;

public abstract class DataSinkMigrator implements ModelMigrator<DataSinkInvocation, IDataSinkParameterExtractor> {

@Override
public boolean equals(Object obj) {
if (obj instanceof ModelMigrator<?, ?>) {
return this.config().equals(((ModelMigrator<?, ?>) obj).config());
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.api.migration;

import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity;
import org.apache.streampipes.model.migration.MigrationResult;
import org.apache.streampipes.model.migration.ModelMigratorConfig;

public interface ModelMigrator<
T extends VersionedNamedStreamPipesEntity,
ExT extends IParameterExtractor
> extends Comparable<Object> {

ModelMigratorConfig config();

/**
* Defines the migration to be performed.
*
* @param element Entity to be transformed.
* @param extractor Extractor that allows to handle static properties.
* @return Result of the migration that describes both outcomes: successful and failed migrations
* @throws RuntimeException in case any unexpected error occurs
*/
MigrationResult<T> migrate(T element, ExT extractor) throws RuntimeException;

@Override
default int compareTo(Object o) {
if (!(o instanceof ModelMigrator<?, ?>)) {
throw new ClassCastException("Given object is not an instance of `ModelMigrator` - "
+ "only instances of `ModelMigrator` can be compared.");
} else {
return config().compareTo(((ModelMigrator<?, ?>) o).config());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public interface IParameterGenerator<
IvT extends InvocableStreamPipesEntity,
PeT extends IParameterExtractor<IvT>,
PeT extends IParameterExtractor,
K extends IPipelineElementParameters<IvT, PeT>> {


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public interface IPipelineElementParameters<
IvT extends InvocableStreamPipesEntity,
ExT extends IParameterExtractor<IvT>> {
ExT extends IParameterExtractor> {

IvT getModel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.streampipes.model.schema.EventSchema;

public interface ResolvesContainerProvidedOutputStrategy<T extends InvocableStreamPipesEntity, K
extends IParameterExtractor<T>> {
extends IParameterExtractor> {

EventSchema resolveOutputStrategy(T processingElement, K parameterExtractor) throws SpConfigurationException;
}
Loading
Loading