Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve startup behaviour #2105

Merged
merged 63 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 61 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
d61b2bc
feat(#2002): Align adapter registration with other pipeline elements
dominikriemer Oct 9, 2023
dbf4cec
Fix checkstyle
dominikriemer Oct 9, 2023
d437edc
style: remove trailing whitespace
bossenti Oct 9, 2023
f74ae39
Merge branch 'dev' into 2002-harmonize-registration-of-adapters-and-p…
bossenti Oct 10, 2023
969d2ac
Add initial draft of migration concept
dominikriemer Oct 17, 2023
6f8ad83
refactor: fix logger configuration
bossenti Oct 19, 2023
01ec032
refactor: extend storage implementations by method to get all instanc…
bossenti Oct 19, 2023
a23df74
refactor: update generated typescript model
bossenti Oct 19, 2023
ecd20c4
feat: add version to models & builders
bossenti Oct 19, 2023
646e792
refactor: implement string representation of Notification
bossenti Oct 20, 2023
0792a01
feat: implement data model for migration
bossenti Oct 20, 2023
6bc8bc1
feat: register migrations at service
bossenti Oct 20, 2023
85a6b12
Merge remote-tracking branch 'origin/dev' into 2045-implement-migrati…
bossenti Oct 20, 2023
a86a7c3
Merge remote-tracking branch 'origin/2045-implement-migration-for-ada…
bossenti Oct 20, 2023
599c83b
Merge remote-tracking branch 'origin/dev' into 2045-implement-migrati…
bossenti Oct 23, 2023
9f88b9a
Revert "refactor: implement string representation of Notification"
bossenti Oct 23, 2023
879ced4
refactor: use correct Notification class
bossenti Oct 23, 2023
e65fe82
feat: introduce migrate extensions resource
bossenti Oct 23, 2023
5ae832a
feat: introduce migrate adapter endpoint
bossenti Oct 23, 2023
5696491
feat: implement adapter migration at the core
bossenti Oct 23, 2023
20a9ddf
remove data lake migration
bossenti Oct 23, 2023
cd0e41d
ensure order & uniqueness of migrations
bossenti Oct 23, 2023
18c1bc6
remove redundant exception
bossenti Oct 23, 2023
b31a3ea
remove redundant exception
bossenti Oct 23, 2023
7277ddb
add tests
bossenti Oct 23, 2023
37370f7
remove outdated test
bossenti Oct 23, 2023
86056b8
refactor: separate adapter migration from pipeline element migrations
bossenti Oct 24, 2023
4cbd9c4
refactor: move MigrationResult to StreamPipes model
bossenti Oct 24, 2023
06f349f
refactor: migration result
bossenti Oct 24, 2023
4656bb2
refactor: introduce generic migration request
bossenti Oct 25, 2023
d4390bb
feat: send migration requests to core
bossenti Oct 25, 2023
16dc3b2
feat: process migrations at core
bossenti Oct 25, 2023
0f9475a
refactor: remove legacy generic
bossenti Oct 25, 2023
c915440
refactor: introduce versioned StreamPipes entity
bossenti Oct 25, 2023
a63423a
refactor: remove deprecated generic type
bossenti Oct 25, 2023
8308971
feat: implement migration for processing elements & data sinks
bossenti Oct 25, 2023
796be9a
docs: add endpoint documentation
bossenti Oct 26, 2023
5b859d2
Merge remote-tracking branch 'origin/dev' into 2045-implement-migrati…
bossenti Oct 26, 2023
2658ae0
refactor: move to correct module
bossenti Oct 26, 2023
7129793
feature: add update for descriptions
bossenti Oct 26, 2023
540189b
Merge remote-tracking branch 'origin/dev' into 2045-implement-migrati…
bossenti Oct 27, 2023
509b6b4
refactor: adapt ProcessingElementBuilder to be capable of versions
bossenti Oct 27, 2023
203bd54
refactor: minor improvements
bossenti Oct 27, 2023
3f83ae2
test
bossenti Oct 27, 2023
1b1b4c0
style: fix checkstyle issues
bossenti Oct 27, 2023
542d632
refactor: remove legacy type definition
bossenti Oct 27, 2023
798ed3a
refactor: update generated TS models
bossenti Oct 27, 2023
d0d28c0
fix: add missing license header
bossenti Oct 27, 2023
672745e
Fix adapter model migration, add OPC adapter migration as sample
dominikriemer Oct 27, 2023
28b21c6
Fix typo
dominikriemer Oct 27, 2023
44055fe
Extract MigrationResource logic into smaller units
dominikriemer Oct 28, 2023
096dcf6
Use single request for submitting migrations from extensions to core
dominikriemer Oct 28, 2023
67fe516
Improve execution order of migrations and service startup tasks
dominikriemer Oct 29, 2023
38a3cfb
Merge branch 'dev' into improve-startup-behaviour
dominikriemer Oct 30, 2023
17a070b
Improve exception logging
dominikriemer Oct 30, 2023
bcbcf14
Improve pipeline health check
dominikriemer Oct 30, 2023
63cedf1
Properly execute migration of adapter models
dominikriemer Oct 31, 2023
67e93db
Fix adapter model migration
dominikriemer Oct 31, 2023
15c1ffe
Merge branch 'dev' into improve-startup-behaviour
dominikriemer Nov 1, 2023
e4ef56f
Improve service health check
dominikriemer Nov 1, 2023
d2e8cc3
Add more checks to adapter migration
dominikriemer Nov 1, 2023
3d1dfbb
Simplify migration request handling
dominikriemer Nov 2, 2023
51accf4
Fix registration
dominikriemer Nov 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.List;
import java.util.Map;

public class AdapterHealthCheck {
public class AdapterHealthCheck implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(AdapterHealthCheck.class);

Expand All @@ -52,6 +52,11 @@ public AdapterHealthCheck(IAdapterStorage adapterStorage,
this.adapterMasterManagement = adapterMasterManagement;
}

@Override
public void run() {
this.checkAndRestoreAdapters();
}

/**
* In this method it is checked which adapters are currently running.
* Then it calls all workers to validate if the adapter instance is
Expand Down Expand Up @@ -114,7 +119,7 @@ public Map<String, AdapterDescription> getAdaptersToRecover(
allRunningInstancesOfOneWorker.forEach(adapterDescription ->
allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId()));
} catch (AdapterException e) {
e.printStackTrace();
LOG.info("Could not recover adapter at endpoint {} due to {}", adapterEndpointUrl, e.getMessage());
}
});

Expand All @@ -130,10 +135,9 @@ public void recoverAdapters(Map<String, AdapterDescription> adaptersToRecover) {
this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
}
} catch (AdapterException e) {
LOG.warn("Could not start adapter {}", adapterDescription.getName(), e);
LOG.warn("Could not start adapter {} ({})", adapterDescription.getName(), e.getMessage());
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,12 @@ public static void stopStreamAdapter(String baseUrl,

public static List<AdapterDescription> getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException {
try {
LOG.info("Requesting all running adapter description instances: " + url);
var responseString = ExtensionServiceExecutions
.extServiceGetRequest(url)
.execute().returnContent().asString();

return JacksonSerializer.getObjectMapper().readValue(responseString, List.class);
} catch (IOException e) {
LOG.error("List of running adapters could not be fetched", e);
throw new AdapterException("List of running adapters could not be fetched from: " + url);
}
}
Expand Down Expand Up @@ -112,9 +110,6 @@ private static void triggerAdapterStateChange(AdapterDescription ad,
var exception = getSerializer().readValue(responseString, AdapterException.class);
throw new AdapterException(exception.getMessage(), exception.getCause());
}

LOG.info("Adapter {} on endpoint: " + url + " with Response: ", ad.getName() + responseString);

} catch (IOException e) {
LOG.error("Adapter was not {} successfully", action, e);
throw new AdapterException("Adapter was not " + action + " successfully with url " + url, e);
Expand Down Expand Up @@ -153,8 +148,7 @@ public static RuntimeOptionsResponse getConfiguration(String workerEndpoint,
throw new SpConfigurationException(exception.getMessage(), exception.getCause());
}
} catch (IOException e) {
e.printStackTrace();
throw new AdapterException("Could not resolve runtime configurations from " + url);
throw new AdapterException("Could not resolve runtime configurations from " + url, e);
}
}

Expand All @@ -178,11 +172,10 @@ public static byte[] getIconAsset(String baseUrl) throws AdapterException {
String url = baseUrl + "/assets/icon";

try {
byte[] responseString = Request.Get(url)
return Request.Get(url)
.connectTimeout(1000)
.socketTimeout(100000)
.execute().returnContent().asBytes();
return responseString;
} catch (IOException e) {
LOG.error(e.getMessage());
throw new AdapterException("Could not get icon endpoint: " + url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;

public interface IAdapterMigrator extends IModelMigrator<AdapterDescription, IStaticPropertyExtractor> {
public interface IAdapterMigrator extends IModelMigrator<AdapterDescription, IStaticPropertyExtractor> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
import org.apache.streampipes.model.graph.DataSinkInvocation;

public interface DataSinkMigrator extends IModelMigrator<DataSinkInvocation, IDataSinkParameterExtractor> {
public interface IDataSinkMigrator extends IModelMigrator<DataSinkInvocation, IDataSinkParameterExtractor> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class SpCoreConfiguration {
private GeneralConfig generalConfig;

private boolean isConfigured;
private SpCoreConfigurationStatus serviceStatus;

private String assetDir;
private String filesDir;
Expand Down Expand Up @@ -120,4 +121,12 @@ public EmailTemplateConfig getEmailTemplateConfig() {
public void setEmailTemplateConfig(EmailTemplateConfig emailTemplateConfig) {
this.emailTemplateConfig = emailTemplateConfig;
}

public SpCoreConfigurationStatus getServiceStatus() {
return this.serviceStatus;
}

public void setServiceStatus(SpCoreConfigurationStatus serviceStatus) {
this.serviceStatus = serviceStatus;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.configuration;

public enum SpCoreConfigurationStatus {
INSTALLING,
MIGRATING,
READY
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public String getRev(JsonObject adapter) {
return adapter.get(REV).getAsString();
}

public String getAppId(JsonObject adapter) {
return adapter.get("properties").getAsJsonObject().get(APP_ID).getAsString();
}

public void updateType(JsonObject adapter,
String typeFieldName) {
adapter.add(typeFieldName, new JsonPrimitive(AdapterModels.NEW_MODEL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class SpServiceRegistration {
private int port;
private List<SpServiceTag> tags;
private String healthCheckPath;
private boolean healthy = true;
private long firstTimeSeenUnhealthy = 0;
private SpServiceStatus status = SpServiceStatus.REGISTERED;

public SpServiceRegistration() {
}
Expand Down Expand Up @@ -133,14 +133,6 @@ public void setRev(String rev) {
this.rev = rev;
}

public boolean isHealthy() {
return healthy;
}

public void setHealthy(boolean healthy) {
this.healthy = healthy;
}

public String getScheme() {
return scheme;
}
Expand Down Expand Up @@ -168,4 +160,12 @@ public String getSvcType() {
public void setSvcType(String svcType) {
this.svcType = svcType;
}

public SpServiceStatus getStatus() {
return status;
}

public void setStatus(SpServiceStatus status) {
this.status = status;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.extensions.svcdiscovery;

public enum SpServiceStatus {
REGISTERED,
MIGRATING,
HEALTHY,
UNHEALTHY
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public DataProcessorInvocation(DataProcessorInvocation other) {
public DataProcessorInvocation(DataProcessorDescription sepa, String domId) {
this(sepa);
this.dom = domId;
this.serviceTagPrefix = SpServiceTagPrefix.DATA_PROCESSOR;
}

public DataProcessorInvocation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public DataSinkInvocation(DataSinkDescription other) {
public DataSinkInvocation(DataSinkDescription sec, String domId) {
this(sec);
this.setDom(domId);
this.serviceTagPrefix = SpServiceTagPrefix.DATA_SINK;
}

public DataSinkInvocation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private List<String> getServiceEndpoints() {

private String selectService() throws NoServiceEndpointsAvailableException {
List<String> serviceEndpoints = getServiceEndpoints();
if (serviceEndpoints.size() > 0) {
if (!serviceEndpoints.isEmpty()) {
return getServiceEndpoints().get(0);
} else {
LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.manager.health;

import org.apache.streampipes.model.configuration.SpCoreConfiguration;
import org.apache.streampipes.model.configuration.SpCoreConfigurationStatus;
import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoreServiceStatusManager {

private static final Logger LOG = LoggerFactory.getLogger(CoreServiceStatusManager.class);

private final ISpCoreConfigurationStorage storage;

public CoreServiceStatusManager(ISpCoreConfigurationStorage storage) {
this.storage = storage;
}

public boolean existsConfig() {
return storage.exists();
}

public boolean isCoreReady() {
return existsConfig() && storage.get().getServiceStatus() == SpCoreConfigurationStatus.READY;
}

public void updateCoreStatus(SpCoreConfigurationStatus status) {
var config = storage.get();
config.setServiceStatus(status);
storage.updateElement(config);
logService(config);
}

private void logService(SpCoreConfiguration coreConfig) {
LOG.info(
"Core is now in {} state",
coreConfig.getServiceStatus()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline;

public class PipelineHealthCheck implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class);
Expand All @@ -68,7 +70,7 @@ public void checkAndRestorePipelineElements() {
pipelinesStats.setRunningPipelines(runningPipelines.size());
pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines() - pipelinesStats.getRunningPipelines());

if (runningPipelines.size() > 0) {
if (!runningPipelines.isEmpty()) {
Map<String, List<InvocableStreamPipesEntity>> endpointMap = generateEndpointMap();
List<String> allRunningInstances = findRunningInstances(endpointMap.keySet());

Expand Down Expand Up @@ -115,15 +117,16 @@ public void checkAndRestorePipelineElements() {
}
});
if (shouldUpdatePipeline.get()) {
if (failedInstances.size() > 0) {
pipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
var currentPipeline = getPipeline(pipeline.getPipelineId());
if (!failedInstances.isEmpty()) {
currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
pipelinesStats.failedIncrease();
} else if (recoveredInstances.size() > 0) {
pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
} else if (!recoveredInstances.isEmpty()) {
currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
pipelinesStats.attentionRequiredIncrease();
}
pipeline.setPipelineNotifications(pipelineNotifications);
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
currentPipeline.setPipelineNotifications(pipelineNotifications);
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(currentPipeline);
}
});
int healthNum = pipelinesStats.getRunningPipelines() - pipelinesStats.getFailedPipelines()
Expand Down Expand Up @@ -233,13 +236,11 @@ private List<Pipeline> getRunningPipelines(List<Pipeline> allPipelines) {
}

private List<Pipeline> getAllPipelines() {
List<Pipeline> allPipelines = StorageDispatcher
return StorageDispatcher
.INSTANCE
.getNoSqlStore()
.getPipelineStorageAPI()
.getAllPipelines();

return allPipelines;
}

private int getElementsCount(List<Pipeline> allPipelines){
Expand Down
Loading
Loading