From 552f3fed38abb53c8ea2657b3e4bc242215a67fa Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Thu, 2 Nov 2023 18:53:17 +0100 Subject: [PATCH] Improve startup behaviour (#2105) * feat(#2002): Align adapter registration with other pipeline elements * Fix checkstyle * style: remove trailing whitespace * Add initial draft of migration concept * refactor: fix logger configuration * refactor: extend storage implementations by method to get all instances by the app id * refactor: update generated typescript model * feat: add version to models & builders * refactor: implement string representation of Notification * feat: implement data model for migration * feat: register migrations at service * Revert "refactor: implement string representation of Notification" This reverts commit 646e792d50b3022748e99be738c390270b87c345. * refactor: use correct Notification class * feat: introduce migrate extensions resource * feat: introduce migrate adapter endpoint * feat: implement adapter migration at the core * remove data lake migration * ensure order & uniqueness of migrations * remove redundant exception * remove redundant exception * add tests * remove outdated test * refactor: separate adapter migration from pipeline element migrations * refactor: move MigrationResult to StreamPipes model * refactor: migration result * refactor: introduce generic migration request * feat: send migration requests to core * feat: process migrations at core * refactor: remove legacy generic * refactor: introduce versioned StreamPipes entity * refactor: remove deprecated generic type * feat: implement migration for processing elements & data sinks * docs: add endpoint documentation * refactor: move to correct module * feature: add update for descriptions * refactor: adapt ProcessingElementBuilder to be capable of versions * refactor: minor improvements * style: fix checkstyle issues * refactor: remove legacy type definition * refactor: update generated TS models * fix: add missing license header * Fix adapter model migration, add OPC adapter migration as sample * Fix typo * Extract MigrationResource logic into smaller units * Use single request for submitting migrations from extensions to core * Improve execution order of migrations and service startup tasks * Improve exception logging * Improve pipeline health check * Properly execute migration of adapter models * Fix adapter model migration * Improve service health check * Add more checks to adapter migration * Simplify migration request handling * Fix registration --------- Co-authored-by: bossenti --- .../management/health/AdapterHealthCheck.java | 12 +- .../management/WorkerRestClient.java | 11 +- .../api/migration/IAdapterMigrator.java | 2 +- ...nkMigrator.java => IDataSinkMigrator.java} | 2 +- .../configuration/SpCoreConfiguration.java | 9 ++ .../SpCoreConfigurationStatus.java | 25 +++ .../adapter/migration/MigrationHelpers.java | 4 + .../svcdiscovery/SpServiceRegistration.java | 18 +-- .../svcdiscovery/SpServiceStatus.java | 26 ++++ .../model/graph/DataProcessorInvocation.java | 1 + .../model/graph/DataSinkInvocation.java | 1 + .../ExtensionsServiceEndpointGenerator.java | 2 +- .../health/CoreServiceStatusManager.java | 59 ++++++++ .../manager/health/PipelineHealthCheck.java | 21 +-- .../manager/health/ServiceHealthCheck.java | 31 ++-- .../health/ServiceRegistrationManager.java | 104 +++++++++++++ .../AdapterDescriptionMigration093.java | 73 +++++++++ ...dapterDescriptionMigration093Provider.java | 45 ++++++ .../PipelineElementMigrationManager.java | 143 ++++++++++-------- .../setup/SpCoreConfigurationStep.java | 10 +- .../manager/setup/StreamPipesEnvChecker.java | 2 +- .../migration/DataSinkMigrationResource.java | 4 +- .../rest/impl/admin/MigrationResource.java | 60 ++++++-- .../admin/ServiceRegistrationResource.java | 12 +- .../service/core/PostStartupTask.java | 7 +- .../core/StreamPipesCoreApplication.java | 53 ++++--- .../migrations/v093/AdapterMigration.java | 29 ++-- .../v093/ConsulConfigMigration.java | 2 + .../svcdiscovery/SpServiceDiscoveryCore.java | 3 +- .../extensions/CoreRequestSubmitter.java | 54 +++++++ .../extensions/ExtensionsModelSubmitter.java | 8 +- .../StreamPipesExtensionsServiceBase.java | 30 ++-- .../api/ISpCoreConfigurationStorage.java | 2 + .../impl/CoreConfigurationStorageImpl.java | 5 + .../src/lib/model/gen/streampipes-model.ts | 13 +- ...istered-extensions-services.component.html | 7 +- 36 files changed, 692 insertions(+), 198 deletions(-) rename streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/{DataSinkMigrator.java => IDataSinkMigrator.java} (90%) create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpCoreConfigurationStatus.java create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceStatus.java create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/CoreServiceStatusManager.java create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093.java create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093Provider.java create mode 100644 streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CoreRequestSubmitter.java diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java index 07d70cf7dd..e59187ea69 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java @@ -34,7 +34,7 @@ import java.util.List; import java.util.Map; -public class AdapterHealthCheck { +public class AdapterHealthCheck implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(AdapterHealthCheck.class); @@ -52,6 +52,11 @@ public AdapterHealthCheck(IAdapterStorage adapterStorage, this.adapterMasterManagement = adapterMasterManagement; } + @Override + public void run() { + this.checkAndRestoreAdapters(); + } + /** * In this method it is checked which adapters are currently running. * Then it calls all workers to validate if the adapter instance is @@ -114,7 +119,7 @@ public Map getAdaptersToRecover( allRunningInstancesOfOneWorker.forEach(adapterDescription -> allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId())); } catch (AdapterException e) { - e.printStackTrace(); + LOG.info("Could not recover adapter at endpoint {} due to {}", adapterEndpointUrl, e.getMessage()); } }); @@ -130,10 +135,9 @@ public void recoverAdapters(Map adaptersToRecover) { this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId()); } } catch (AdapterException e) { - LOG.warn("Could not start adapter {}", adapterDescription.getName(), e); + LOG.warn("Could not start adapter {} ({})", adapterDescription.getName(), e.getMessage()); } } } - } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java index a551201e29..c715459070 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java @@ -73,14 +73,12 @@ public static void stopStreamAdapter(String baseUrl, public static List getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException { try { - LOG.info("Requesting all running adapter description instances: " + url); var responseString = ExtensionServiceExecutions .extServiceGetRequest(url) .execute().returnContent().asString(); return JacksonSerializer.getObjectMapper().readValue(responseString, List.class); } catch (IOException e) { - LOG.error("List of running adapters could not be fetched", e); throw new AdapterException("List of running adapters could not be fetched from: " + url); } } @@ -112,9 +110,6 @@ private static void triggerAdapterStateChange(AdapterDescription ad, var exception = getSerializer().readValue(responseString, AdapterException.class); throw new AdapterException(exception.getMessage(), exception.getCause()); } - - LOG.info("Adapter {} on endpoint: " + url + " with Response: ", ad.getName() + responseString); - } catch (IOException e) { LOG.error("Adapter was not {} successfully", action, e); throw new AdapterException("Adapter was not " + action + " successfully with url " + url, e); @@ -153,8 +148,7 @@ public static RuntimeOptionsResponse getConfiguration(String workerEndpoint, throw new SpConfigurationException(exception.getMessage(), exception.getCause()); } } catch (IOException e) { - e.printStackTrace(); - throw new AdapterException("Could not resolve runtime configurations from " + url); + throw new AdapterException("Could not resolve runtime configurations from " + url, e); } } @@ -178,11 +172,10 @@ public static byte[] getIconAsset(String baseUrl) throws AdapterException { String url = baseUrl + "/assets/icon"; try { - byte[] responseString = Request.Get(url) + return Request.Get(url) .connectTimeout(1000) .socketTimeout(100000) .execute().returnContent().asBytes(); - return responseString; } catch (IOException e) { LOG.error(e.getMessage()); throw new AdapterException("Could not get icon endpoint: " + url); diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IAdapterMigrator.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IAdapterMigrator.java index e7a0996dd9..64ead7f632 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IAdapterMigrator.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IAdapterMigrator.java @@ -21,5 +21,5 @@ import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.model.connect.adapter.AdapterDescription; -public interface IAdapterMigrator extends IModelMigrator { +public interface IAdapterMigrator extends IModelMigrator { } diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/DataSinkMigrator.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IDataSinkMigrator.java similarity index 90% rename from streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/DataSinkMigrator.java rename to streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IDataSinkMigrator.java index e30ab00c2a..d1ce71f892 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/DataSinkMigrator.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/migration/IDataSinkMigrator.java @@ -21,5 +21,5 @@ import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor; import org.apache.streampipes.model.graph.DataSinkInvocation; -public interface DataSinkMigrator extends IModelMigrator { +public interface IDataSinkMigrator extends IModelMigrator { } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpCoreConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpCoreConfiguration.java index 3e3bd251d0..6aad62a172 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpCoreConfiguration.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpCoreConfiguration.java @@ -34,6 +34,7 @@ public class SpCoreConfiguration { private GeneralConfig generalConfig; private boolean isConfigured; + private SpCoreConfigurationStatus serviceStatus; private String assetDir; private String filesDir; @@ -120,4 +121,12 @@ public EmailTemplateConfig getEmailTemplateConfig() { public void setEmailTemplateConfig(EmailTemplateConfig emailTemplateConfig) { this.emailTemplateConfig = emailTemplateConfig; } + + public SpCoreConfigurationStatus getServiceStatus() { + return this.serviceStatus; + } + + public void setServiceStatus(SpCoreConfigurationStatus serviceStatus) { + this.serviceStatus = serviceStatus; + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpCoreConfigurationStatus.java b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpCoreConfigurationStatus.java new file mode 100644 index 0000000000..394b9a8aba --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/configuration/SpCoreConfigurationStatus.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.configuration; + +public enum SpCoreConfigurationStatus { + INSTALLING, + MIGRATING, + READY +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/MigrationHelpers.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/MigrationHelpers.java index d518086e9e..2343d41bb0 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/MigrationHelpers.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/MigrationHelpers.java @@ -40,6 +40,10 @@ public String getRev(JsonObject adapter) { return adapter.get(REV).getAsString(); } + public String getAppId(JsonObject adapter) { + return adapter.get("properties").getAsJsonObject().get(APP_ID).getAsString(); + } + public void updateType(JsonObject adapter, String typeFieldName) { adapter.add(typeFieldName, new JsonPrimitive(AdapterModels.NEW_MODEL)); diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceRegistration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceRegistration.java index b1490adfaa..8edd9932fa 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceRegistration.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceRegistration.java @@ -36,8 +36,8 @@ public class SpServiceRegistration { private int port; private List tags; private String healthCheckPath; - private boolean healthy = true; private long firstTimeSeenUnhealthy = 0; + private SpServiceStatus status = SpServiceStatus.REGISTERED; public SpServiceRegistration() { } @@ -133,14 +133,6 @@ public void setRev(String rev) { this.rev = rev; } - public boolean isHealthy() { - return healthy; - } - - public void setHealthy(boolean healthy) { - this.healthy = healthy; - } - public String getScheme() { return scheme; } @@ -168,4 +160,12 @@ public String getSvcType() { public void setSvcType(String svcType) { this.svcType = svcType; } + + public SpServiceStatus getStatus() { + return status; + } + + public void setStatus(SpServiceStatus status) { + this.status = status; + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceStatus.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceStatus.java new file mode 100644 index 0000000000..56b388d166 --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceStatus.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.extensions.svcdiscovery; + +public enum SpServiceStatus { + REGISTERED, + MIGRATING, + HEALTHY, + UNHEALTHY +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java index 642134bf1e..64aed3b49b 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java @@ -75,6 +75,7 @@ public DataProcessorInvocation(DataProcessorInvocation other) { public DataProcessorInvocation(DataProcessorDescription sepa, String domId) { this(sepa); this.dom = domId; + this.serviceTagPrefix = SpServiceTagPrefix.DATA_PROCESSOR; } public DataProcessorInvocation() { diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java index 235ca33b44..4476c6651b 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java @@ -59,6 +59,7 @@ public DataSinkInvocation(DataSinkDescription other) { public DataSinkInvocation(DataSinkDescription sec, String domId) { this(sec); this.setDom(domId); + this.serviceTagPrefix = SpServiceTagPrefix.DATA_SINK; } public DataSinkInvocation() { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java index 970d81a0d3..cb2d28408a 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java @@ -72,7 +72,7 @@ private List getServiceEndpoints() { private String selectService() throws NoServiceEndpointsAvailableException { List serviceEndpoints = getServiceEndpoints(); - if (serviceEndpoints.size() > 0) { + if (!serviceEndpoints.isEmpty()) { return getServiceEndpoints().get(0); } else { LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/CoreServiceStatusManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/CoreServiceStatusManager.java new file mode 100644 index 0000000000..3d2c407ded --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/CoreServiceStatusManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.health; + +import org.apache.streampipes.model.configuration.SpCoreConfiguration; +import org.apache.streampipes.model.configuration.SpCoreConfigurationStatus; +import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CoreServiceStatusManager { + + private static final Logger LOG = LoggerFactory.getLogger(CoreServiceStatusManager.class); + + private final ISpCoreConfigurationStorage storage; + + public CoreServiceStatusManager(ISpCoreConfigurationStorage storage) { + this.storage = storage; + } + + public boolean existsConfig() { + return storage.exists(); + } + + public boolean isCoreReady() { + return existsConfig() && storage.get().getServiceStatus() == SpCoreConfigurationStatus.READY; + } + + public void updateCoreStatus(SpCoreConfigurationStatus status) { + var config = storage.get(); + config.setServiceStatus(status); + storage.updateElement(config); + logService(config); + } + + private void logService(SpCoreConfiguration coreConfig) { + LOG.info( + "Core is now in {} state", + coreConfig.getServiceStatus() + ); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java index d8064055a2..66a448be02 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java @@ -45,6 +45,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.apache.streampipes.manager.pipeline.PipelineManager.getPipeline; + public class PipelineHealthCheck implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class); @@ -68,7 +70,7 @@ public void checkAndRestorePipelineElements() { pipelinesStats.setRunningPipelines(runningPipelines.size()); pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines() - pipelinesStats.getRunningPipelines()); - if (runningPipelines.size() > 0) { + if (!runningPipelines.isEmpty()) { Map> endpointMap = generateEndpointMap(); List allRunningInstances = findRunningInstances(endpointMap.keySet()); @@ -115,15 +117,16 @@ public void checkAndRestorePipelineElements() { } }); if (shouldUpdatePipeline.get()) { - if (failedInstances.size() > 0) { - pipeline.setHealthStatus(PipelineHealthStatus.FAILURE); + var currentPipeline = getPipeline(pipeline.getPipelineId()); + if (!failedInstances.isEmpty()) { + currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE); pipelinesStats.failedIncrease(); - } else if (recoveredInstances.size() > 0) { - pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION); + } else if (!recoveredInstances.isEmpty()) { + currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION); pipelinesStats.attentionRequiredIncrease(); } - pipeline.setPipelineNotifications(pipelineNotifications); - StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline); + currentPipeline.setPipelineNotifications(pipelineNotifications); + StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(currentPipeline); } }); int healthNum = pipelinesStats.getRunningPipelines() - pipelinesStats.getFailedPipelines() @@ -233,13 +236,11 @@ private List getRunningPipelines(List allPipelines) { } private List getAllPipelines() { - List allPipelines = StorageDispatcher + return StorageDispatcher .INSTANCE .getNoSqlStore() .getPipelineStorageAPI() .getAllPipelines(); - - return allPipelines; } private int getElementsCount(List allPipelines){ diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java index 71842d5925..46d2792eaa 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceHealthCheck.java @@ -21,7 +21,7 @@ import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; -import org.apache.streampipes.storage.api.CRUDStorage; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.http.HttpStatus; @@ -35,12 +35,13 @@ public class ServiceHealthCheck implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ServiceHealthCheck.class); - private static final int MAX_UNHEALTHY_DURATION_BEFORE_REMOVAL_MS = 60000; + private static final int MAX_UNHEALTHY_DURATION_BEFORE_REMOVAL_MS = 20000; - private final CRUDStorage storage; + private final ServiceRegistrationManager serviceRegistrationManager; public ServiceHealthCheck() { - this.storage = StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage(); + var storage = StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage(); + this.serviceRegistrationManager = new ServiceRegistrationManager(storage); } @Override @@ -58,9 +59,8 @@ private void checkServiceHealth(SpServiceRegistration service) { if (response.returnResponse().getStatusLine().getStatusCode() != HttpStatus.SC_OK) { processUnhealthyService(service); } else { - if (!service.isHealthy()) { - service.setHealthy(true); - updateService(service); + if (service.getStatus() == SpServiceStatus.UNHEALTHY) { + serviceRegistrationManager.applyServiceStatus(service.getSvcId(), SpServiceStatus.HEALTHY); } } } catch (IOException e) { @@ -69,15 +69,16 @@ private void checkServiceHealth(SpServiceRegistration service) { } private void processUnhealthyService(SpServiceRegistration service) { - if (service.isHealthy()) { - service.setHealthy(false); - service.setFirstTimeSeenUnhealthy(System.currentTimeMillis()); - updateService(service); + if (service.getStatus() == SpServiceStatus.HEALTHY) { + serviceRegistrationManager.applyServiceStatus( + service.getSvcId(), + SpServiceStatus.UNHEALTHY, + System.currentTimeMillis()); } if (shouldDeleteService(service)) { LOG.info("Removing service {} which has been unhealthy for more than {} seconds.", service.getSvcId(), MAX_UNHEALTHY_DURATION_BEFORE_REMOVAL_MS / 1000); - storage.deleteElement(service); + serviceRegistrationManager.removeService(service.getSvcId()); } } @@ -86,15 +87,11 @@ private boolean shouldDeleteService(SpServiceRegistration service) { return (currentTimeMillis - service.getFirstTimeSeenUnhealthy() > MAX_UNHEALTHY_DURATION_BEFORE_REMOVAL_MS); } - private void updateService(SpServiceRegistration service) { - storage.updateElement(service); - } - private String makeHealthCheckUrl(SpServiceRegistration service) { return service.getServiceUrl() + service.getHealthCheckPath(); } private List getRegisteredServices() { - return storage.getAll(); + return serviceRegistrationManager.getAllServices(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java new file mode 100644 index 0000000000..f29b39d11e --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/ServiceRegistrationManager.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.health; + +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; +import org.apache.streampipes.storage.api.CRUDStorage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ServiceRegistrationManager { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceRegistrationManager.class); + + private final CRUDStorage storage; + + public ServiceRegistrationManager(CRUDStorage storage) { + this.storage = storage; + } + + public void applyServiceStatus(String serviceId, + SpServiceStatus status, + long firstTimeSeenUnhealthy) { + var serviceRegistration = storage.getElementById(serviceId); + serviceRegistration.setFirstTimeSeenUnhealthy(firstTimeSeenUnhealthy); + applyServiceStatus(status, serviceRegistration); + } + + public void applyServiceStatus(String serviceId, + SpServiceStatus status) { + var serviceRegistration = storage.getElementById(serviceId); + applyServiceStatus(status, serviceRegistration); + } + + private void applyServiceStatus(SpServiceStatus status, + SpServiceRegistration serviceRegistration) { + serviceRegistration.setStatus(status); + storage.updateElement(serviceRegistration); + logService(serviceRegistration); + } + + public void addService(SpServiceRegistration serviceRegistration, + SpServiceStatus status) { + serviceRegistration.setStatus(status); + storage.createElement(serviceRegistration); + logService(serviceRegistration); + } + + public List getAllServices() { + return storage.getAll(); + } + + public SpServiceRegistration getService(String serviceId) { + return storage.getElementById(serviceId); + } + + public boolean isAnyServiceMigrating() { + return storage.getAll() + .stream() + .anyMatch(service -> service.getStatus() == SpServiceStatus.MIGRATING); + } + + public void removeService(String serviceId) { + var serviceRegistration = storage.getElementById(serviceId); + storage.deleteElement(serviceRegistration); + LOG.info( + "Service {} (id={}) has been removed", + serviceRegistration.getSvcGroup(), + serviceRegistration.getSvcId()) + ; + } + + public SpServiceStatus getServiceStatus(String serviceId) { + return storage.getElementById(serviceId).getStatus(); + } + + private void logService(SpServiceRegistration serviceRegistration) { + LOG.info( + "Service {} (id={}) is now in {} state", + serviceRegistration.getSvcGroup(), + serviceRegistration.getSvcId(), + serviceRegistration.getStatus() + ); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093.java new file mode 100644 index 0000000000..d10d279993 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.migration; + +import org.apache.streampipes.commons.exceptions.SepaParseException; +import org.apache.streampipes.manager.endpoint.HttpJsonParser; +import org.apache.streampipes.manager.operations.Operations; +import org.apache.streampipes.manager.util.AuthTokenUtils; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.storage.api.IAdapterStorage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +import static org.apache.streampipes.manager.migration.MigrationUtils.getRequestUrl; + +public class AdapterDescriptionMigration093 extends AbstractMigrationManager { + + private static final Logger LOG = LoggerFactory.getLogger(AdapterDescriptionMigration093.class); + + private final IAdapterStorage adapterDescriptionStorage; + + public AdapterDescriptionMigration093(IAdapterStorage adapterDescriptionStorage) { + this.adapterDescriptionStorage = adapterDescriptionStorage; + } + + public void reinstallAdapters(SpServiceRegistration extensionsServiceConfig) { + var migrationProvider = AdapterDescriptionMigration093Provider.INSTANCE; + if (migrationProvider.hasAppIdsToReinstall()) { + var appIdsToReinstall = migrationProvider.getAppIdsToReinstall(); + var serviceUrl = extensionsServiceConfig.getServiceUrl(); + extensionsServiceConfig.getTags() + .stream() + .filter(tag -> tag.getPrefix() == SpServiceTagPrefix.ADAPTER) + .filter(tag -> appIdsToReinstall.contains(tag.getValue())) + .forEach(tag -> { + var appId = tag.getValue(); + try { + if (adapterDescriptionStorage.getAdaptersByAppId(appId).isEmpty()) { + var requestUrl = getRequestUrl(SpServiceTagPrefix.ADAPTER, appId, serviceUrl); + var entityPayload = HttpJsonParser.getContentFromUrl(URI.create(requestUrl)); + Operations.verifyAndAddElement( + entityPayload, + AuthTokenUtils.getAuthTokenForCurrentUser(), + true); + } + } catch (IOException | SepaParseException e) { + LOG.warn("Could not reinstall adapter description {}", appId); + } + }); + } + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093Provider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093Provider.java new file mode 100644 index 0000000000..8ad025d5c4 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093Provider.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.migration; + +import java.util.ArrayList; +import java.util.List; + +public enum AdapterDescriptionMigration093Provider { + + INSTANCE; + + private final List appIdsToReinstall; + + AdapterDescriptionMigration093Provider() { + this.appIdsToReinstall = new ArrayList<>(); + } + + public void addAppId(String appId) { + this.appIdsToReinstall.add(appId); + } + + public List getAppIdsToReinstall() { + return appIdsToReinstall; + } + + public boolean hasAppIdsToReinstall() { + return !appIdsToReinstall.isEmpty(); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java index 28338e1ac0..166d24c8e3 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.stream.Stream; import static org.apache.streampipes.manager.migration.MigrationUtils.getApplicableMigration; @@ -60,77 +61,89 @@ public PipelineElementMigrationManager(IPipelineStorage pipelineStorage, @Override public void handleMigrations(SpServiceRegistration extensionsServiceConfig, List migrationConfigs) { + if (!migrationConfigs.isEmpty()) { + LOG.info("Updating pipeline element descriptions by replacement..."); + updateDescriptions(migrationConfigs, extensionsServiceConfig.getServiceUrl()); + LOG.info("Pipeline element descriptions are up to date."); - LOG.info("Updating pipeline element descriptions by replacement..."); - updateDescriptions(migrationConfigs, extensionsServiceConfig.getServiceUrl()); - LOG.info("Pipeline element descriptions are up to date."); - - LOG.info("Received {} pipeline element migrations from extension service {}.", - migrationConfigs.size(), - extensionsServiceConfig.getServiceUrl()); - var availablePipelines = pipelineStorage.getAllPipelines(); - if (!availablePipelines.isEmpty()) { - LOG.info("Found {} available pipelines. Checking pipelines for applicable migrations...", - availablePipelines.size() - ); - } + LOG.info("Received {} pipeline element migrations from extension service {}.", + migrationConfigs.size(), + extensionsServiceConfig.getServiceUrl()); + var availablePipelines = pipelineStorage.getAllPipelines(); + if (!availablePipelines.isEmpty()) { + LOG.info("Found {} available pipelines. Checking pipelines for applicable migrations...", + availablePipelines.size() + ); + } - for (var pipeline : availablePipelines) { - List> failedMigrations = new ArrayList<>(); - - var migratedDataProcessors = pipeline.getSepas() - .stream() - .map(processor -> { - if (getApplicableMigration(processor, migrationConfigs).isPresent()) { - return migratePipelineElement( - processor, - migrationConfigs, - String.format("%s/%s/processor", - extensionsServiceConfig.getServiceUrl(), - MIGRATION_ENDPOINT - ), - failedMigrations - ); - } else { - LOG.info("No migration applicable for data processor '{}'.", processor.getElementId()); - return processor; - } - }) - .toList(); - pipeline.setSepas(migratedDataProcessors); - - var migratedDataSinks = pipeline.getActions() - .stream() - .map(sink -> { - if (getApplicableMigration(sink, migrationConfigs).isPresent()) { - return migratePipelineElement( - sink, - migrationConfigs, - String.format("%s/%s/sink", - extensionsServiceConfig.getServiceUrl(), - MIGRATION_ENDPOINT - ), - failedMigrations - ); - } else { - LOG.info("No migration applicable for data sink '{}'.", sink.getElementId()); - return sink; - } - }) - .toList(); - pipeline.setActions(migratedDataSinks); - - pipelineStorage.updatePipeline(pipeline); - - if (failedMigrations.isEmpty()) { - LOG.info("Migration for pipeline successfully completed."); - } else { - // pass most recent version of pipeline - handleFailedMigrations(pipelineStorage.getPipeline(pipeline.getPipelineId()), failedMigrations); + for (var pipeline : availablePipelines) { + if (shouldMigratePipeline(pipeline, migrationConfigs)) { + List> failedMigrations = new ArrayList<>(); + + var migratedDataProcessors = pipeline.getSepas() + .stream() + .map(processor -> { + if (getApplicableMigration(processor, migrationConfigs).isPresent()) { + return migratePipelineElement( + processor, + migrationConfigs, + String.format("%s/%s/processor", + extensionsServiceConfig.getServiceUrl(), + MIGRATION_ENDPOINT + ), + failedMigrations + ); + } else { + LOG.info("No migration applicable for data processor '{}'.", processor.getElementId()); + return processor; + } + }) + .toList(); + pipeline.setSepas(migratedDataProcessors); + + var migratedDataSinks = pipeline.getActions() + .stream() + .map(sink -> { + if (getApplicableMigration(sink, migrationConfigs).isPresent()) { + return migratePipelineElement( + sink, + migrationConfigs, + String.format("%s/%s/sink", + extensionsServiceConfig.getServiceUrl(), + MIGRATION_ENDPOINT + ), + failedMigrations + ); + } else { + LOG.info("No migration applicable for data sink '{}'.", sink.getElementId()); + return sink; + } + }) + .toList(); + pipeline.setActions(migratedDataSinks); + + pipelineStorage.updatePipeline(pipeline); + + if (failedMigrations.isEmpty()) { + LOG.info("Migration for pipeline successfully completed."); + } else { + // pass most recent version of pipeline + handleFailedMigrations(pipelineStorage.getPipeline(pipeline.getPipelineId()), failedMigrations); + } + } } + } else { + LOG.info("No pipeline element migrations to perform"); } } + private boolean shouldMigratePipeline(Pipeline pipeline, + List migrationConfigs) { + return Stream + .concat(pipeline.getSepas().stream(), pipeline.getActions().stream()) + .anyMatch(element -> getApplicableMigration(element, migrationConfigs).isPresent()); + } + /** * Takes care about the failed migrations of pipeline elements. * This includes the following steps: diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/SpCoreConfigurationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/SpCoreConfigurationStep.java index 187d677efc..b5e0a8bd3b 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/SpCoreConfigurationStep.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/SpCoreConfigurationStep.java @@ -19,14 +19,22 @@ package org.apache.streampipes.manager.setup; import org.apache.streampipes.model.configuration.DefaultSpCoreConfiguration; +import org.apache.streampipes.model.configuration.SpCoreConfigurationStatus; import org.apache.streampipes.storage.management.StorageDispatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class SpCoreConfigurationStep extends InstallationStep { + + private static final Logger LOG = LoggerFactory.getLogger(SpCoreConfigurationStep.class); + @Override public void install() { var coreCfg = new DefaultSpCoreConfiguration().make(); - + coreCfg.setServiceStatus(SpCoreConfigurationStatus.INSTALLING); StorageDispatcher.INSTANCE.getNoSqlStore().getSpCoreConfigurationStorage().createElement(coreCfg); + LOG.info("Core is now in {} state", coreCfg.getServiceStatus()); new StreamPipesEnvChecker().updateEnvironmentVariables(); } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/StreamPipesEnvChecker.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/StreamPipesEnvChecker.java index c4fc1bba08..70e5a8aa59 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/StreamPipesEnvChecker.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/StreamPipesEnvChecker.java @@ -52,7 +52,7 @@ public void updateEnvironmentVariables() { .getNoSqlStore() .getSpCoreConfigurationStorage(); - if (configStorage.getAll().size() > 0) { + if (configStorage.exists()) { this.coreConfig = configStorage.get(); LOG.info("Checking and updating environment variables..."); diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java index 98569ffae7..6d1844743a 100644 --- a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/migration/DataSinkMigrationResource.java @@ -19,7 +19,7 @@ package org.apache.streampipes.rest.extensions.migration; import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor; -import org.apache.streampipes.extensions.api.migration.DataSinkMigrator; +import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator; import org.apache.streampipes.model.extensions.migration.MigrationRequest; import org.apache.streampipes.model.graph.DataSinkInvocation; import org.apache.streampipes.rest.security.AuthConstants; @@ -42,7 +42,7 @@ public class DataSinkMigrationResource extends MigrateExtensionsResource< DataSinkInvocation, IDataSinkParameterExtractor, - DataSinkMigrator + IDataSinkMigrator > { @POST @Consumes(MediaType.APPLICATION_JSON) diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java index 13c87875fd..6b28afe0a3 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java @@ -18,9 +18,14 @@ package org.apache.streampipes.rest.impl.admin; +import org.apache.streampipes.config.backend.BackendConfig; import org.apache.streampipes.connect.management.management.AdapterMigrationManager; +import org.apache.streampipes.manager.health.CoreServiceStatusManager; +import org.apache.streampipes.manager.health.ServiceRegistrationManager; +import org.apache.streampipes.manager.migration.AdapterDescriptionMigration093; import org.apache.streampipes.manager.migration.PipelineElementMigrationManager; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.model.migration.ModelMigratorConfig; import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; @@ -36,6 +41,8 @@ import io.swagger.v3.oas.annotations.enums.ParameterIn; import io.swagger.v3.oas.annotations.responses.ApiResponse; import org.apache.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.stereotype.Component; @@ -53,9 +60,12 @@ @PreAuthorize(AuthConstants.IS_ADMIN_ROLE) public class MigrationResource extends AbstractAuthGuardedRestResource { + private static final Logger LOG = LoggerFactory.getLogger(MigrationResource.class); + private final CRUDStorage extensionsServiceStorage = getNoSqlStorage().getExtensionsServiceStorage(); + private final IAdapterStorage adapterDescriptionStorage = getNoSqlStorage().getAdapterDescriptionStorage(); private final IAdapterStorage adapterStorage = getNoSqlStorage().getAdapterInstanceStorage(); private final IDataProcessorStorage dataProcessorStorage = getNoSqlStorage().getDataProcessorStorage(); @@ -63,6 +73,10 @@ public class MigrationResource extends AbstractAuthGuardedRestResource { private final IDataSinkStorage dataSinkStorage = getNoSqlStorage().getDataSinkStorage(); private final IPipelineStorage pipelineStorage = getNoSqlStorage().getPipelineStorageAPI(); + private final CoreServiceStatusManager coreServiceStatusManager = new CoreServiceStatusManager( + getNoSqlStorage().getSpCoreConfigurationStorage() + ); + @POST @Path("{serviceId}") @Consumes(MediaType.APPLICATION_JSON) @@ -88,24 +102,42 @@ public Response performMigrations( ) List migrationConfigs) { - var extensionsServiceConfig = extensionsServiceStorage.getElementById(serviceId); - var adapterMigrations = filterConfigs(migrationConfigs, List.of(SpServiceTagPrefix.ADAPTER)); - var pipelineElementMigrations = filterConfigs( - migrationConfigs, - List.of(SpServiceTagPrefix.DATA_PROCESSOR, SpServiceTagPrefix.DATA_SINK) - ); - - new AdapterMigrationManager(adapterStorage).handleMigrations(extensionsServiceConfig, adapterMigrations); - new PipelineElementMigrationManager( - pipelineStorage, - dataProcessorStorage, - dataSinkStorage) - .handleMigrations(extensionsServiceConfig, pipelineElementMigrations); + var serviceManager = new ServiceRegistrationManager(extensionsServiceStorage); + var extensionsServiceConfig = serviceManager.getService(serviceId); + if (BackendConfig.INSTANCE.isConfigured()) { + new AdapterDescriptionMigration093(adapterDescriptionStorage).reinstallAdapters(extensionsServiceConfig); + if (!migrationConfigs.isEmpty()) { + if (serviceManager.isAnyServiceMigrating() || !isCoreReady()) { + LOG.info("Refusing migration request since precondition is not met."); + return Response.status(HttpStatus.SC_CONFLICT).build(); + } else { + serviceManager.applyServiceStatus(serviceId, SpServiceStatus.MIGRATING); + var adapterMigrations = filterConfigs(migrationConfigs, List.of(SpServiceTagPrefix.ADAPTER)); + var pipelineElementMigrations = filterConfigs( + migrationConfigs, + List.of(SpServiceTagPrefix.DATA_PROCESSOR, SpServiceTagPrefix.DATA_SINK) + ); + + new AdapterMigrationManager(adapterStorage).handleMigrations(extensionsServiceConfig, adapterMigrations); + new PipelineElementMigrationManager( + pipelineStorage, + dataProcessorStorage, + dataSinkStorage) + .handleMigrations(extensionsServiceConfig, pipelineElementMigrations); + } + } + } + new ServiceRegistrationManager(extensionsServiceStorage) + .applyServiceStatus(extensionsServiceConfig.getSvcId(), SpServiceStatus.HEALTHY); return ok(); } + private boolean isCoreReady() { + return coreServiceStatusManager.isCoreReady(); + } + private List filterConfigs(List migrationConfigs, - List modelTypes) { + List modelTypes) { return migrationConfigs .stream() .filter(config -> modelTypes.stream().anyMatch(modelType -> modelType == config.modelType())) diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java index f89cf34ace..d97653d866 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ServiceRegistrationResource.java @@ -18,11 +18,15 @@ package org.apache.streampipes.rest.impl.admin; +import org.apache.streampipes.manager.health.ServiceRegistrationManager; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; import org.apache.streampipes.rest.security.AuthConstants; import org.apache.streampipes.storage.api.CRUDStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.stereotype.Component; @@ -40,6 +44,8 @@ @PreAuthorize(AuthConstants.IS_ADMIN_ROLE) public class ServiceRegistrationResource extends AbstractAuthGuardedRestResource { + private static final Logger LOG = LoggerFactory.getLogger(ServiceRegistrationResource.class); + private final CRUDStorage extensionsServiceStorage = getNoSqlStorage().getExtensionsServiceStorage(); @@ -52,7 +58,8 @@ public Response getRegisteredServices() { @POST @Consumes(MediaType.APPLICATION_JSON) public Response registerService(SpServiceRegistration serviceRegistration) { - extensionsServiceStorage.createElement(serviceRegistration); + new ServiceRegistrationManager(extensionsServiceStorage) + .addService(serviceRegistration, SpServiceStatus.REGISTERED); return ok(); } @@ -60,8 +67,7 @@ public Response registerService(SpServiceRegistration serviceRegistration) { @Path("/{serviceId}") public Response unregisterService(@PathParam("serviceId") String serviceId) { try { - var serviceRegistration = extensionsServiceStorage.getElementById(serviceId); - extensionsServiceStorage.deleteElement(serviceRegistration); + new ServiceRegistrationManager(extensionsServiceStorage).removeService(serviceId); return ok(); } catch (IllegalArgumentException e) { return badRequest("Could not find registered service with id " + serviceId); diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java index 9355c6b895..2442816746 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java @@ -44,13 +44,13 @@ public class PostStartupTask implements Runnable { private static final int MAX_PIPELINE_START_RETRIES = 3; private static final int WAIT_TIME_AFTER_FAILURE_IN_SECONDS = 10; - private final List allPipelines; + private final IPipelineStorage pipelineStorage; private final Map failedPipelines = new HashMap<>(); private final ScheduledExecutorService executorService; private final WorkerAdministrationManagement workerAdministrationManagement; - public PostStartupTask(List allPipelines) { - this.allPipelines = allPipelines; + public PostStartupTask(IPipelineStorage pipelineStorage) { + this.pipelineStorage = pipelineStorage; this.executorService = Executors.newSingleThreadScheduledExecutor(); this.workerAdministrationManagement = new WorkerAdministrationManagement(); } @@ -76,6 +76,7 @@ private void startAdapters() { } private void startAllPreviouslyStoppedPipelines() { + var allPipelines = pipelineStorage.getAllPipelines(); LOG.info("Checking for orphaned pipelines..."); List orphanedPipelines = allPipelines .stream() diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java index e03d3291d1..49b218f738 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java @@ -18,6 +18,8 @@ package org.apache.streampipes.service.core; import org.apache.streampipes.config.backend.BackendConfig; +import org.apache.streampipes.connect.management.health.AdapterHealthCheck; +import org.apache.streampipes.manager.health.CoreServiceStatusManager; import org.apache.streampipes.manager.health.PipelineHealthCheck; import org.apache.streampipes.manager.health.ServiceHealthCheck; import org.apache.streampipes.manager.monitoring.pipeline.ExtensionsServiceLogExecutor; @@ -30,6 +32,7 @@ import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory; import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory; import org.apache.streampipes.messaging.pulsar.SpPulsarProtocolFactory; +import org.apache.streampipes.model.configuration.SpCoreConfigurationStatus; import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.PipelineOperationStatus; import org.apache.streampipes.rest.security.SpPermissionEvaluator; @@ -37,6 +40,7 @@ import org.apache.streampipes.service.base.StreamPipesServiceBase; import org.apache.streampipes.service.core.migrations.MigrationsHandler; import org.apache.streampipes.storage.api.IPipelineStorage; +import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage; import org.apache.streampipes.storage.couchdb.utils.CouchDbViewGenerator; import org.apache.streampipes.storage.management.StorageDispatcher; @@ -72,11 +76,13 @@ public class StreamPipesCoreApplication extends StreamPipesServiceBase { private static final int LOG_FETCH_INTERVAL = 60; private static final TimeUnit LOG_FETCH_UNIT = TimeUnit.SECONDS; - private static final int HEALTH_CHECK_INTERVAL = 60; + private static final int HEALTH_CHECK_INTERVAL = 30; private static final TimeUnit HEALTH_CHECK_UNIT = TimeUnit.SECONDS; - private static final int SERVICE_HEALTH_CHECK_INTERVAL = 60; - private static final TimeUnit SERVICE_HEALTH_CHECK_UNIT = TimeUnit.SECONDS; + private final ISpCoreConfigurationStorage coreConfigStorage = StorageDispatcher.INSTANCE + .getNoSqlStore().getSpCoreConfigurationStorage(); + + private final CoreServiceStatusManager coreStatusManager = new CoreServiceStatusManager(coreConfigStorage); public static void main(String[] args) { StreamPipesCoreApplication application = new StreamPipesCoreApplication(); @@ -109,9 +115,7 @@ protected void registerProtocols(SupportedProtocols protocols) { @PostConstruct public void init() { var executorService = Executors.newSingleThreadScheduledExecutor(); - var healthCheckExecutorService = Executors.newSingleThreadScheduledExecutor(); var logCheckExecutorService = Executors.newSingleThreadScheduledExecutor(); - var serviceHealthCheckExecutorService = Executors.newSingleThreadScheduledExecutor(); new StreamPipesEnvChecker().updateEnvironmentVariables(); new CouchDbViewGenerator().createGenericDatabaseIfNotExists(); @@ -119,22 +123,20 @@ public void init() { if (!isConfigured()) { doInitialSetup(); } else { + // Check needs to be present since core configuration is part of migration + if (coreConfigStorage.exists()) { + coreStatusManager.updateCoreStatus(SpCoreConfigurationStatus.MIGRATING); + } new MigrationsHandler().performMigrations(); } + coreStatusManager.updateCoreStatus(SpCoreConfigurationStatus.READY); - executorService.schedule(new PostStartupTask(getAllPipelines()), 10, TimeUnit.SECONDS); + executorService.schedule(new PostStartupTask(getPipelineStorage()), 10, TimeUnit.SECONDS); - LOG.info("Service health check will run every {} seconds", SERVICE_HEALTH_CHECK_INTERVAL); - serviceHealthCheckExecutorService.scheduleAtFixedRate(new ServiceHealthCheck(), - SERVICE_HEALTH_CHECK_INTERVAL, - SERVICE_HEALTH_CHECK_INTERVAL, - SERVICE_HEALTH_CHECK_UNIT); - - LOG.info("Pipeline health check will run every {} seconds", HEALTH_CHECK_INTERVAL); - healthCheckExecutorService.scheduleAtFixedRate(new PipelineHealthCheck(), - HEALTH_CHECK_INTERVAL, - HEALTH_CHECK_INTERVAL, - HEALTH_CHECK_UNIT); + scheduleHealthChecks(List.of( + new ServiceHealthCheck(), + new PipelineHealthCheck(), + new AdapterHealthCheck())); LOG.info("Extensions logs will be fetched every {} seconds", LOG_FETCH_INTERVAL); logCheckExecutorService.scheduleAtFixedRate(new ExtensionsServiceLogExecutor(), @@ -143,6 +145,21 @@ public void init() { LOG_FETCH_UNIT); } + private void scheduleHealthChecks(List checks) { + var healthCheckExecutorService = Executors.newSingleThreadScheduledExecutor(); + checks.forEach(check -> { + LOG.info( + "Health check {} configured to run every {} {}", + check.getClass().getCanonicalName(), + HEALTH_CHECK_INTERVAL, + HEALTH_CHECK_UNIT); + healthCheckExecutorService.scheduleAtFixedRate(check, + HEALTH_CHECK_INTERVAL, + HEALTH_CHECK_INTERVAL, + HEALTH_CHECK_UNIT); + }); + } + private boolean isConfigured() { return BackendConfig.INSTANCE.isConfigured(); } @@ -163,8 +180,6 @@ private void doInitialSetup() { } } - - @PreDestroy public void onExit() { LOG.info("Shutting down StreamPipes..."); diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java index 94d2d7eecd..29fd205ab3 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java @@ -18,6 +18,7 @@ package org.apache.streampipes.service.core.migrations.v093; +import org.apache.streampipes.manager.migration.AdapterDescriptionMigration093Provider; import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; import org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels; import org.apache.streampipes.service.core.migrations.Migration; @@ -45,7 +46,7 @@ public class AdapterMigration implements Migration { private final CouchDbClient adapterInstanceClient; private final CouchDbClient adapterDescriptionClient; private final List adaptersToMigrate; - private final List adapterDescriptionsToMigrate; + private final List adapterDescriptionsToRemove; private final MigrationHelpers helpers; @@ -54,7 +55,7 @@ public AdapterMigration() { this.adapterInstanceClient = Utils.getCouchDbAdapterInstanceClient(); this.adapterDescriptionClient = Utils.getCouchDbAdapterDescriptionClient(); this.adaptersToMigrate = new ArrayList<>(); - this.adapterDescriptionsToMigrate = new ArrayList<>(); + this.adapterDescriptionsToRemove = new ArrayList<>(); this.helpers = new MigrationHelpers(); } @@ -64,9 +65,9 @@ public boolean shouldExecute() { var adapterDescriptionUri = getAllDocsUri(adapterDescriptionClient); findDocsToMigrate(adapterInstanceClient, adapterInstanceUri, adaptersToMigrate); - findDocsToMigrate(adapterDescriptionClient, adapterDescriptionUri, adapterDescriptionsToMigrate); + findDocsToMigrate(adapterDescriptionClient, adapterDescriptionUri, adapterDescriptionsToRemove); - return !adaptersToMigrate.isEmpty() || !adapterDescriptionsToMigrate.isEmpty(); + return !adaptersToMigrate.isEmpty() || !adapterDescriptionsToRemove.isEmpty(); } private void findDocsToMigrate(CouchDbClient adapterClient, @@ -89,15 +90,19 @@ private void findDocsToMigrate(CouchDbClient adapterClient, public void executeMigration() { var adapterInstanceBackupClient = Utils.getCouchDbAdapterInstanceBackupClient(); - adapterDescriptionsToMigrate.forEach(ad -> { + LOG.info("Deleting {} adapter descriptions, which will be regenerated after migration", + adapterDescriptionsToRemove.size()); + + adapterDescriptionsToRemove.forEach(ad -> { + String docId = helpers.getDocId(ad); var adapterType = ad.get("type").getAsString(); - var appId = ad.get("appId"); - if (isSetAdapter(adapterType)) { - LOG.info("Deleting adapter description data set {}", appId); - adapterDescriptionClient.remove(helpers.getDocId(ad), helpers.getRev(ad)); - } else { - LOG.info("Migrating adapter description {} to new adapter model", appId); - getAdapterMigrator(adapterType).migrate(adapterDescriptionClient, ad); + String rev = helpers.getRev(ad); + String appId = helpers.getAppId(ad); + if (!isSetAdapter(adapterType)) { + AdapterDescriptionMigration093Provider.INSTANCE.addAppId(appId); + } + if (docId != null && rev != null) { + adapterDescriptionClient.remove(docId, rev); } }); diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/ConsulConfigMigration.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/ConsulConfigMigration.java index 5da0d0bef7..53857b12ca 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/ConsulConfigMigration.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/ConsulConfigMigration.java @@ -22,6 +22,7 @@ import org.apache.streampipes.config.backend.BackendConfigKeys; import org.apache.streampipes.model.configuration.DefaultMessagingSettings; import org.apache.streampipes.model.configuration.SpCoreConfiguration; +import org.apache.streampipes.model.configuration.SpCoreConfigurationStatus; import org.apache.streampipes.service.core.migrations.Migration; import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage; import org.apache.streampipes.storage.management.StorageDispatcher; @@ -71,6 +72,7 @@ public void executeMigration() { newConf.setFilesDir(currConf.getFilesDir()); newConf.setMessagingSettings(messagingSettings); + newConf.setServiceStatus(SpCoreConfigurationStatus.MIGRATING); storage.createElement(newConf); } diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java index 8170c36ad9..0455dc1d8d 100644 --- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java +++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java @@ -19,6 +19,7 @@ package org.apache.streampipes.svcdiscovery; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; import org.apache.streampipes.storage.api.CRUDStorage; import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery; @@ -62,7 +63,7 @@ public List getServiceEndpoints(String serviceGroup, .stream() .filter(service -> allFiltersSupported(service, filterByTags)) .filter(service -> !restrictToHealthy - || service.isHealthy()) + || service.getStatus() != SpServiceStatus.UNHEALTHY) .map(this::makeServiceUrl) .collect(Collectors.toList()); } diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CoreRequestSubmitter.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CoreRequestSubmitter.java new file mode 100644 index 0000000000..ab7f631d02 --- /dev/null +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CoreRequestSubmitter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.service.extensions; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class CoreRequestSubmitter { + + private static final Logger LOG = LoggerFactory.getLogger(CoreRequestSubmitter.class); + + private static final int RETRY_INTERVAL_SECONDS = 3; + + public void submitRepeatedRequest(Supplier request, + String successMessage, + String failureMessage) { + try { + request.get(); + LOG.info(successMessage); + } catch (SpRuntimeException e) { + LOG.warn( + failureMessage + " Trying again in {} seconds", + RETRY_INTERVAL_SECONDS + ); + try { + TimeUnit.SECONDS.sleep(RETRY_INTERVAL_SECONDS); + submitRepeatedRequest(request, successMessage, failureMessage); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } +} diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java index cbdb3992f9..24f128cc47 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java @@ -52,7 +52,13 @@ public void afterServiceRegistered(SpServiceDefinition serviceDef) { // register all migrations at StreamPipes Core var migrationConfigs = serviceDef.getMigrators().stream().map(IModelMigrator::config).toList(); - client.adminApi().registerMigrations(migrationConfigs, serviceId()); + new CoreRequestSubmitter().submitRepeatedRequest( + () -> { + client.adminApi().registerMigrations(migrationConfigs, serviceId()); + return true; + }, + "Successfully sent migration request", + "Core currently doesn't accept migration requests."); // initialize all function instances StreamPipesFunctionHandler.INSTANCE.initializeFunctions(serviceDef.getServiceGroup()); diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java index c13b1153a8..0913fd2587 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java @@ -19,7 +19,6 @@ package org.apache.streampipes.service.extensions; import org.apache.streampipes.client.StreamPipesClient; -import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver; import org.apache.streampipes.extensions.management.init.DeclarersSingleton; import org.apache.streampipes.extensions.management.model.SpServiceDefinition; @@ -40,7 +39,6 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServiceBase { @@ -96,23 +94,17 @@ public void startExtensionsService(Class serviceClass, } private void registerService(SpServiceRegistration serviceRegistration) { - StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance(); - try { - client.adminApi().registerService(serviceRegistration); - LOG.info("Successfully registered service at core."); - } catch (SpRuntimeException e) { - LOG.warn( - "Could not register at core at url {}. Trying again in {} seconds", - client.getConnectionConfig().getBaseUrl(), - RETRY_INTERVAL_SECONDS - ); - try { - TimeUnit.SECONDS.sleep(RETRY_INTERVAL_SECONDS); - registerService(serviceRegistration); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } + var client = new StreamPipesClientResolver().makeStreamPipesClientInstance(); + new CoreRequestSubmitter().submitRepeatedRequest( + () -> { + client.adminApi().registerService(serviceRegistration); + return true; + }, + "Successfully registered service at core.", + String.format( + "Could not register service at core at url %s", + client.getConnectionConfig().getBaseUrl() + )); } protected List getServiceTags() { diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/ISpCoreConfigurationStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/ISpCoreConfigurationStorage.java index 829dc09ce9..2d0f8ae657 100644 --- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/ISpCoreConfigurationStorage.java +++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/ISpCoreConfigurationStorage.java @@ -24,6 +24,8 @@ public interface ISpCoreConfigurationStorage { + boolean exists(); + List getAll(); void createElement(SpCoreConfiguration element); diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/CoreConfigurationStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/CoreConfigurationStorageImpl.java index 7ff0a99c04..9b25b5ac80 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/CoreConfigurationStorageImpl.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/CoreConfigurationStorageImpl.java @@ -33,6 +33,11 @@ public CoreConfigurationStorageImpl() { super(Utils::getCouchDbGeneralConfigStorage, SpCoreConfiguration.class); } + @Override + public boolean exists() { + return !findAll().isEmpty(); + } + @Override public List getAll() { return findAll(); diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 47f06b81f2..85f9a230b6 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. */ + /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2023-10-27 10:43:45. +// Generated using typescript-generator version 3.2.1263 on 2023-10-30 22:49:29. export class NamedStreamPipesEntity { '@class': @@ -3599,12 +3600,12 @@ export class SpServiceConfiguration { export class SpServiceRegistration { firstTimeSeenUnhealthy: number; healthCheckPath: string; - healthy: boolean; host: string; port: number; rev: string; scheme: string; serviceUrl: string; + status: SpServiceStatus; svcGroup: string; svcId: string; svcType: string; @@ -3620,12 +3621,12 @@ export class SpServiceRegistration { const instance = target || new SpServiceRegistration(); instance.firstTimeSeenUnhealthy = data.firstTimeSeenUnhealthy; instance.healthCheckPath = data.healthCheckPath; - instance.healthy = data.healthy; instance.host = data.host; instance.port = data.port; instance.rev = data.rev; instance.scheme = data.scheme; instance.serviceUrl = data.serviceUrl; + instance.status = data.status; instance.svcGroup = data.svcGroup; instance.svcId = data.svcId; instance.svcType = data.svcType; @@ -4105,6 +4106,12 @@ export type SpProtocol = 'KAFKA' | 'JMS' | 'MQTT' | 'NATS' | 'PULSAR'; export type SpQueryStatus = 'OK' | 'TOO_MUCH_DATA'; +export type SpServiceStatus = + | 'REGISTERED' + | 'MIGRATING' + | 'HEALTHY' + | 'UNHEALTHY'; + export type SpServiceTagPrefix = | 'SYSTEM' | 'SP_GROUP' diff --git a/ui/src/app/configuration/extensions-service-management/registered-extensions-services/registered-extensions-services.component.html b/ui/src/app/configuration/extensions-service-management/registered-extensions-services/registered-extensions-services.component.html index 32f1534086..c0a1e74b5e 100644 --- a/ui/src/app/configuration/extensions-service-management/registered-extensions-services/registered-extensions-services.component.html +++ b/ui/src/app/configuration/extensions-service-management/registered-extensions-services/registered-extensions-services.component.html @@ -38,11 +38,14 @@ mat-cell *matCellDef="let element" > - + lens lens