diff --git a/datanode/pom.xml b/datanode/pom.xml index 7187c0ad..3c516de4 100644 --- a/datanode/pom.xml +++ b/datanode/pom.xml @@ -86,7 +86,7 @@ com.odysseusinc.arachne execution-engine-commons - 2.0.0 + 2.2-DEV-SNAPSHOT arachne-sys-settings diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/controller/admin/AdminController.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/controller/admin/AdminController.java index 2b5147dc..66cde3d0 100644 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/controller/admin/AdminController.java +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/controller/admin/AdminController.java @@ -20,15 +20,19 @@ import com.odysseusinc.arachne.datanode.dto.converters.AnalysisToSubmissionDTOConverter; import com.odysseusinc.arachne.datanode.dto.submission.SubmissionDTO; import com.odysseusinc.arachne.datanode.dto.user.UserDTO; +import com.odysseusinc.arachne.datanode.engine.EngineStatusDTO; +import com.odysseusinc.arachne.datanode.engine.EngineStatusService; import com.odysseusinc.arachne.datanode.exception.PermissionDeniedException; import com.odysseusinc.arachne.datanode.model.analysis.Analysis; import com.odysseusinc.arachne.datanode.model.user.User; import com.odysseusinc.arachne.datanode.repository.AnalysisRepository; import com.odysseusinc.arachne.datanode.service.UserService; import io.swagger.annotations.ApiOperation; +import lombok.Getter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.convert.support.GenericConversionService; import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; @@ -55,11 +59,13 @@ public class AdminController extends BaseController { public static final int DEFAULT_PAGE_SIZE = 10; private final Map>> propertiesMap = new HashMap<>(); @Autowired + private EngineStatusService engine; + @Autowired private AnalysisToSubmissionDTOConverter analysisToSubmissionDTO; @Autowired - private GenericConversionService conversionService; + private GenericConversionService conversionService; @Autowired - private AnalysisRepository analysisRepository; + private AnalysisRepository analysisRepository; public AdminController(UserService userService) { super(userService); @@ -128,7 +134,9 @@ public Page list(@PageableDefault(value = DEFAULT_PAGE_SIZE, sort } else { analyses = analysisRepository.findAll(p); } - return analyses.map(analysis -> analysisToSubmissionDTO.convert(analysis)); + Page items = analyses.map(analysis -> analysisToSubmissionDTO.convert(analysis)); + EngineStatusDTO engineStatus = engine.getStatusInfo(); + return new PageWithStatus(items, engineStatus); } protected Pageable buildPageRequest(Pageable pageable) { @@ -196,4 +204,13 @@ protected void initProps() { propertiesMap.put("status", p -> p.add("journal.state")); } + @Getter + public static class PageWithStatus extends PageImpl { + private EngineStatusDTO engine; + + public PageWithStatus(Page page, EngineStatusDTO engine) { + super(page.getContent(), page.getPageable(), page.getTotalElements()); + this.engine = engine; + } + } } diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/ExecutionEngineStatus.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/EngineStatusDTO.java similarity index 58% rename from datanode/src/main/java/com/odysseusinc/arachne/datanode/service/ExecutionEngineStatus.java rename to datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/EngineStatusDTO.java index 47565fe5..b213a1ac 100644 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/ExecutionEngineStatus.java +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/EngineStatusDTO.java @@ -1,5 +1,5 @@ /* - * Copyright 2021, 2023 Odysseus Data Services, Inc. + * Copyright 2024 Odysseus Data Services, Inc. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -12,9 +12,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.odysseusinc.arachne.datanode.service; +package com.odysseusinc.arachne.datanode.engine; -public enum ExecutionEngineStatus { - ONLINE, - OFFLINE +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.time.Instant; + +@Getter +@RequiredArgsConstructor(staticName = "of") +public class EngineStatusDTO { + private final String status; + private final Instant since; + private final Instant seenLast; + private final String error; } diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/EngineStatusService.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/EngineStatusService.java new file mode 100644 index 00000000..534a6e8e --- /dev/null +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/EngineStatusService.java @@ -0,0 +1,68 @@ +/* + * Copyright 2024 Odysseus Data Services, Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.odysseusinc.arachne.datanode.engine; + +import com.odysseusinc.arachne.datanode.util.JpaSugar; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; + +/** + * Some statistics on the execution engine + */ +@Slf4j +@Service +public class EngineStatusService { + @PersistenceContext + private EntityManager em; + + @Transactional + public void update(Instant timestamp, String error, Runnable onChange) { + getStatus().filter(status -> + Objects.equals(status.getError(), error) + ).orElseGet(() -> { + ExecutionEngineStatus status = new ExecutionEngineStatus(); + status.setId(UUID.randomUUID()); + status.setSince(timestamp); + status.setError(error); + em.persist(status); + onChange.run(); + return status; + }).setSeenLast(timestamp); + } + + @Transactional + public EngineStatusDTO getStatusInfo() { + return getStatus().map(status -> + EngineStatusDTO.of(status.getError() == null ? "OK" : "ERROR", status.getSince(), status.getSince(), status.getError()) + ).orElseGet(() -> + EngineStatusDTO.of("UNKNOWN", null, null, null) + ); + } + + private Optional getStatus() { + return JpaSugar.select(em, ExecutionEngineStatus.class, (cb, query) -> root -> + query.orderBy(cb.desc(root.get(ExecutionEngineStatus_.since))) + ).getResultStream().findFirst(); + } + +} diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/ExecutionEngineStatus.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/ExecutionEngineStatus.java new file mode 100644 index 00000000..5304b838 --- /dev/null +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/ExecutionEngineStatus.java @@ -0,0 +1,55 @@ +/* + * Copyright 2024 Odysseus Data Services, Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.odysseusinc.arachne.datanode.engine; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; +import java.time.Instant; +import java.util.UUID; + +@Getter +@Setter +@Entity +@Table(name = "engine_status") +@NoArgsConstructor +public class ExecutionEngineStatus { + @Id + @Column(name = "id") + private UUID id; + + /** + * Moment when the status was observed for the first time + */ + @Column(name = "since") + private Instant since; + + /** + * Moment when the status was observed last time. + */ + @Column(name = "seen_last") + private Instant seenLast; + + /** + * Error observed. Null if the record marks interval when EE was connected without errors + */ + @Column(name = "error") + private String error; +} diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/ExecutionEngineSyncService.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/ExecutionEngineSyncService.java new file mode 100644 index 00000000..eca350d4 --- /dev/null +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/ExecutionEngineSyncService.java @@ -0,0 +1,98 @@ +/* + * Copyright 2024 Odysseus Data Services, Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.odysseusinc.arachne.datanode.engine; + +import com.odysseusinc.arachne.datanode.service.AnalysisService; +import com.odysseusinc.arachne.datanode.service.AnalysisStateService; +import com.odysseusinc.arachne.datanode.service.client.engine.ExecutionEngineClient; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisResultStatusDTO; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.EngineStatus; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.ExecutionOutcome; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +@Slf4j +@Service +public class ExecutionEngineSyncService { + public static final String UNAVAILABLE = "UNAVAILABLE"; + + @Autowired + private ExecutionEngineClient engineClient; + @Autowired + private EngineStatusService engine; + @Autowired + private AnalysisService analysisService; + @Autowired + private AnalysisStateService analysisStateService; + + @Scheduled(fixedDelayString = "${executionEngine.status.period}") + public void checkStatus() { + Instant now = Instant.now(); + List incomplete = analysisService.getIncompleteIds(); + EngineStatus status; + try { + status = engineClient.status(incomplete); + engine.update(now, null, () -> log.info("EE status: CONNECTED")); + } catch (Exception e) { + engine.update(now, e.getMessage(), () -> log.warn("EE status: [ERROR] ({})", e.getMessage())); + return; + } + update(status, incomplete); + + } + + private void update(EngineStatus engineStatus, List incomplete) { + Map submissions = engineStatus.getSubmissions(); + incomplete.forEach(id -> { + ExecutionOutcome status = submissions.get(id); + analysisService.update(id, analysis -> { + if (status != null) { + String error = status.getError(); + String stage = status.getStage(); + if (!Objects.equals(error, analysis.getError()) || !Objects.equals(stage, analysis.getStage())) { + // TODO stdout handling. The challenge is that any modification here will interfere with normal incremental writing. + // So that normal writing will have to be changed as well. + analysis.setStage(stage); + analysisStateService.handleStateFromEE(analysis, stage, error); + } + } else { + if (analysis.getStage() == null) { + AnalysisResultStatusDTO dbStatus = analysis.getStatus(); + if (dbStatus == null) { + analysis.setStage(Stage.INITIALIZE); + analysisStateService.handleStateFromEE(analysis, Stage.INITIALIZE, UNAVAILABLE); + } else if (dbStatus == AnalysisResultStatusDTO.EXECUTED) { + analysis.setStage(Stage.COMPLETED); + } else if (dbStatus == AnalysisResultStatusDTO.FAILED) { + analysis.setStage(Stage.EXECUTE); + analysisStateService.handleStateFromEE(analysis, Stage.EXECUTE, "Failed"); + } + } else { + analysisStateService.handleStateFromEE(analysis, analysis.getStage(), UNAVAILABLE); + analysis.setError(UNAVAILABLE); + } + } + }); + }); + } +} diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/repository/AnalysisFileRepository.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/repository/AnalysisFileRepository.java deleted file mode 100644 index 94aa3aa5..00000000 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/repository/AnalysisFileRepository.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2018, 2023 Odysseus Data Services, Inc. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.odysseusinc.arachne.datanode.repository; - -import com.odysseusinc.arachne.datanode.model.analysis.AnalysisFile; -import com.odysseusinc.arachne.datanode.model.analysis.AnalysisFileStatus; -import com.odysseusinc.arachne.datanode.model.analysis.AnalysisFileType; -import java.util.List; -import org.springframework.data.jpa.repository.JpaRepository; - -public interface AnalysisFileRepository extends JpaRepository { - - List findAllByAnalysisIdAndTypeAndStatus(Long analysisId, AnalysisFileType type, AnalysisFileStatus status); - List findAllByAnalysisIdAndType(Long analysisId, AnalysisFileType type); -} diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/repository/AnalysisRepository.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/repository/AnalysisRepository.java index e6c96cb4..9d485ab2 100644 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/repository/AnalysisRepository.java +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/repository/AnalysisRepository.java @@ -20,47 +20,13 @@ import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; -import org.springframework.data.repository.query.Param; -import java.util.Date; -import java.util.List; import java.util.Optional; public interface AnalysisRepository extends JpaRepository { Optional findById(Long id); - @Query(nativeQuery = true, value = - "SELECT analyses.* FROM analyses WHERE analyses.id = :id AND analyses.callback_password = :password") - Optional findOneExecuting(@Param("id") Long id, @Param("password") String password); - - @Query(nativeQuery = true, value = - "SELECT analyses.* " - + "FROM analyses " - + " JOIN analysis_state_journal AS journal ON journal.analysis_id = analyses.id " - + " JOIN (SELECT analysis_id, max(date) AS latest FROM analysis_state_journal " - + " GROUP BY analysis_id) AS FOO ON journal.date = FOO.latest AND journal.analysis_id=FOO.analysis_id " - + " WHERE journal.state = :state AND analyses.central_id IS NOT NULL") - List findAllByState(@Param("state") String state); - - @Query(nativeQuery = true, value = - "SELECT analyses.* " - + "FROM analyses " - + " JOIN analysis_state_journal AS journal ON journal.analysis_id = analyses.id " - + " JOIN (SELECT analysis_id, max(date) AS latest FROM analysis_state_journal " - + " GROUP BY analysis_id) AS FOO ON journal.date = FOO.latest AND journal.analysis_id=FOO.analysis_id " - + " WHERE journal.state NOT IN (:states)") - List findAllByNotStateIn(@Param("states") List states); - - @Query(nativeQuery = true, value = - "SELECT analyses.* " - + " FROM analyses " - + " JOIN analysis_state_journal AS journal ON journal.analysis_id = analyses.id " - + " JOIN (SELECT analysis_id, max(date) AS latest FROM analysis_state_journal " - + " GROUP BY analysis_id) AS FOO ON journal.date = FOO.latest AND journal.analysis_id=FOO.analysis_id " - + " WHERE journal.state = :state AND journal.date < :time") - List findAllExecutingMoreThan(@Param("state") String state, @Param("time") Date time); - @Query(nativeQuery = true, value = "select a.* from analyses a " + " JOIN analysis_state_journal AS journal ON journal.analysis_id = a.id " diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisResultsService.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisResultsService.java index 813b9dbd..81256669 100644 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisResultsService.java +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisResultsService.java @@ -21,8 +21,10 @@ import com.odysseusinc.arachne.datanode.model.analysis.Analysis; import com.odysseusinc.arachne.datanode.model.analysis.AnalysisFile; import com.odysseusinc.arachne.datanode.model.analysis.AnalysisFileType; -import com.odysseusinc.arachne.datanode.repository.AnalysisFileRepository; +import com.odysseusinc.arachne.datanode.model.analysis.AnalysisFile_; +import com.odysseusinc.arachne.datanode.model.analysis.Analysis_; import com.odysseusinc.arachne.datanode.repository.AnalysisRepository; +import com.odysseusinc.arachne.datanode.util.JpaSugar; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage; import lombok.extern.slf4j.Slf4j; import net.lingala.zip4j.model.FileHeader; @@ -32,6 +34,8 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; @@ -57,18 +61,20 @@ @Slf4j public class AnalysisResultsService { - @Autowired - private AnalysisFileRepository analysisFileRepository; @Autowired private AnalysisRepository analysisRepository; @Autowired private AnalysisStateService stateService; + @PersistenceContext + private EntityManager em; public List getAnalysisResults(Analysis analysis) { - - return analysisFileRepository.findAllByAnalysisIdAndType( - analysis.getId(), - AnalysisFileType.ANALYSYS_RESULT); + return JpaSugar.select(em, AnalysisFile.class, (cb, q) -> root -> + q.select(root).where( + cb.equal(root.get(AnalysisFile_.analysis).get(Analysis_.id), analysis.getId()), + cb.equal(root.get(AnalysisFile_.type), AnalysisFileType.ANALYSYS_RESULT) + ) + ).getResultList(); } public List getAnalysisResults(Long analysisId) { @@ -175,17 +181,15 @@ private static boolean isValidZipOrSplit(net.lingala.zip4j.ZipFile file) { public Analysis markExecuted(Long id, File resultDir, String stage, String error, String stdout) { return analysisRepository.findById(id).map(analysis -> { File[] files = resultDir.listFiles(); - List resultFiles = Stream.of(files).map(file -> + Stream.of(files).map(file -> new AnalysisFile(file.getAbsolutePath(), AnalysisFileType.ANALYSYS_RESULT, analysis) - ).collect(Collectors.toList()); - analysisFileRepository.saveAll(resultFiles); + ).forEach(em::persist); analysis.setAnalysisFolder(resultDir.getAbsolutePath()); + String resultError = Optional.ofNullable(error).orElseGet(() -> evaluateErrorStatus(stage, resultDir)); analysis.setStage(stage); - analysis.setError(Optional.ofNullable(error).orElseGet(() -> evaluateErrorStatus(stage, resultDir))); analysis.setStdout(stdout); - analysisRepository.save(analysis); - stateService.handleStateFromEE(analysis, stage, analysis.getError()); + stateService.handleStateFromEE(analysis, stage, resultError); return analysis; }).orElseGet(() -> { diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisService.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisService.java index caed546d..324e0281 100644 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisService.java +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisService.java @@ -30,13 +30,14 @@ import com.odysseusinc.arachne.datanode.model.analysis.AnalysisFileType; import com.odysseusinc.arachne.datanode.model.analysis.AnalysisOrigin; import com.odysseusinc.arachne.datanode.model.analysis.AnalysisState; +import com.odysseusinc.arachne.datanode.model.analysis.Analysis_; import com.odysseusinc.arachne.datanode.model.datasource.DataSource; import com.odysseusinc.arachne.datanode.model.user.User; -import com.odysseusinc.arachne.datanode.repository.AnalysisRepository; import com.odysseusinc.arachne.datanode.repository.AnalysisStateJournalRepository; import com.odysseusinc.arachne.datanode.service.impl.AnalysisPreprocessorService; import com.odysseusinc.arachne.datanode.service.impl.DataSourceServiceImpl; import com.odysseusinc.arachne.datanode.util.AnalysisUtils; +import com.odysseusinc.arachne.datanode.util.JpaSugar; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestStatusDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisResultDTO; @@ -45,8 +46,6 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -54,6 +53,7 @@ import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; +import javax.persistence.criteria.Path; import java.io.File; import java.util.Arrays; import java.util.Comparator; @@ -70,8 +70,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; -import java.util.stream.Stream; - @Slf4j @Service @@ -82,11 +80,6 @@ public class AnalysisService { ); private static final Comparator BY_STAGE_ORDER = Comparator.comparing(STAGE_ORDER::indexOf); - private static final List FINISHED_STATES = Stream.concat( - Stream.of(AnalysisState.ABORTED), - Stream.of(AnalysisState.values()).filter(AnalysisState::isTerminal) - ).map(AnalysisState::toString).collect(Collectors.toList()); - private final ExecutorService executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); @Autowired @@ -96,8 +89,6 @@ public class AnalysisService { @Autowired private AnalysisPreprocessorService preprocessorService; @Autowired - private AnalysisRepository analysisRepository; - @Autowired private AnalysisStateJournalRepository analysisStateJournalRepository; @Autowired private AnalysisStateService stateService; @@ -120,31 +111,9 @@ public class AnalysisService { @Value("${files.store.path}") private String filesStorePath; - @Autowired - private EnvironmentDescriptorService descriptorService; - - @PersistenceContext private EntityManager em; - @EventListener(ApplicationReadyEvent.class) - @Transactional - public void init() { - List closing = analysisRepository.findAllByState(AnalysisState.ABORTING.name()); - log.info("Found {} submissions pending to be aborted", closing.size()); - closing.forEach(analysis -> executor.submit(() -> sendToEngineCancel(analysis.getId()))); - } - - @Transactional - public Integer invalidateAllUnfinishedAnalyses(final User user) { - List unfinished = analysisRepository.findAllByNotStateIn(FINISHED_STATES); - unfinished.forEach(analysis -> { - stateService.updateState(analysis, AnalysisState.ABORTING, "Cancelled by [" + user.getTitle() + "] (mass invalidation)"); - executor.submit(() -> sendToEngineCancel(analysis.getId())); - }); - return unfinished.size(); - } - @Transactional public void cancel(Long id, User user) { Analysis analysis = find(id); @@ -170,7 +139,6 @@ public void sendToEngineCancel(Long id) { String stage = result.getStage(); String stdout = result.getStdout(); analysis.setStage(stage); - analysis.setError(error); analysis.setStdout(stdout); log.info("Analysis {} state after abort: [{}], errors: [{}] (stdout {} bytes)", id, stage, error, StringUtils.length(stdout)); stateService.handleStateFromEE(analysis, stage, error); @@ -187,7 +155,7 @@ public Long rerun( ) { Analysis original = find(id); Analysis analysis = toAnalysis(dto, user, original.getSourceFolder()); - analysisRepository.save(analysis); + em.persist(analysis); stateService.updateState(analysis, AnalysisState.CREATED, "Rerun analysis [" + id + "] by [" + user.getTitle() + "]"); log.info("Request [{}] sending to engine for DS [{}] (manual upload by [{}], reruns analysis {})", analysis.getId(), analysis.getDataSource().getId(), user.getTitle(), id @@ -220,7 +188,7 @@ public Long run( analysis.setAnalysisFiles(analysisFiles); } - analysisRepository.save(analysis); + em.persist(analysis); stateService.updateState(analysis, AnalysisState.CREATED, "Created by [" + user.getTitle() + "]"); log.info("Request [{}] sending to engine for DS [{}] (manual upload by [{}])", analysis.getId(), analysis.getDataSource().getId(), user.getTitle() @@ -254,20 +222,16 @@ public void sendToEngine(Analysis analysis) { } @Transactional - public Analysis persist(Analysis analysis) { - - Analysis exists = Objects.nonNull(analysis.getId()) ? analysisRepository.getOne(analysis.getId()) : null; - if (exists == null) { - log.debug("Analysis with id: '{}' is not exist. Saving...", analysis.getId()); - return analysisRepository.save(analysis); - } else { - return exists; - } + public void update(Long id, Consumer updates) { + Analysis analysis = em.find(Analysis.class, id); + updates.accept(analysis); } @Transactional public void updateStatus(Long id, String password, String stage, String stdoutDiff) { - Analysis analysis = analysisRepository.findOneExecuting(id, password).orElseThrow(() -> + Analysis analysis = findAnalysis(id).filter(entity -> + Objects.equals(entity.getCallbackPassword(), password) + ).orElseThrow(() -> new ValidationException("Submission [" + id + "] not found or password invalid") ); String currentStage = analysis.getStage(); @@ -276,11 +240,23 @@ public void updateStatus(Long id, String password, String stage, String stdoutDi } else { analysis.setStdout(StringUtils.join(analysis.getStdout(), stdoutDiff)); if (BY_STAGE_ORDER.compare(currentStage, stage) < 0) { + analysis.setStage(stage); stateService.handleStateFromEE(analysis, stage, null); } } } + @Transactional + public List getIncompleteIds() { + return JpaSugar.select(em, Analysis.class, (cb, q) -> root -> { + Path stage = root.get(Analysis_.stage); + return q.where( + cb.isNull(root.get(Analysis_.error)), + cb.or(stage.isNull(), stage.in(Stage.EXECUTE, Stage.INITIALIZE, Stage.ABORT)) + ); + }).getResultStream().map(Analysis::getId).collect(Collectors.toList()); + } + @Transactional public com.odysseusinc.arachne.datanode.dto.analysis.AnalysisRequestDTO get(Long id) { Analysis analysis = find(id); @@ -302,12 +278,11 @@ public String getStdout(Long id) { @Transactional public Optional findAnalysis(Long id) { - - return analysisRepository.findById(id); + return Optional.ofNullable(em.find(Analysis.class, id)); } private Analysis find(Long id) { - return analysisRepository.findById(id).orElseThrow(() -> new ValidationException("Analysis not found: " + id)); + return findAnalysis(id).orElseThrow(() -> new ValidationException("Analysis not found: " + id)); } private Analysis toAnalysis(com.odysseusinc.arachne.datanode.dto.analysis.AnalysisRequestDTO dto, User user, String sourceFolder) { @@ -368,7 +343,7 @@ public AnalysisAuthor toAuthor(User user) { } private EnvironmentDescriptor findEnvironment(Long descriptorId) { - EnvironmentDescriptor descriptor = Optional.ofNullable(descriptorService.byId(descriptorId)).orElseThrow(() -> + EnvironmentDescriptor descriptor = Optional.ofNullable(environmentService.byId(descriptorId)).orElseThrow(() -> new BadRequestException("Invalid environment id: " + descriptorId) ); if (descriptor.getTerminated() != null) { diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisStateService.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisStateService.java index e4cc12c7..75c48ff5 100644 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisStateService.java +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/AnalysisStateService.java @@ -14,6 +14,7 @@ */ package com.odysseusinc.arachne.datanode.service; +import com.odysseusinc.arachne.datanode.engine.ExecutionEngineSyncService; import com.odysseusinc.arachne.datanode.model.analysis.Analysis; import com.odysseusinc.arachne.datanode.model.analysis.AnalysisState; import com.odysseusinc.arachne.datanode.model.analysis.AnalysisStateEntry; @@ -36,6 +37,9 @@ public class AnalysisStateService { @Transactional public void handleStateFromEE(Analysis analysis, String stage, String error) { + if (error != null) { + analysis.setError(error); + } updateState(analysis, toState(error, stage), Optional.ofNullable(error).orElse("Update from Execution Engine")); } @@ -53,7 +57,9 @@ public void updateState(Analysis analysis, AnalysisState state, String reason) { } private static AnalysisState toState(String error, String stage) { - if (Objects.equals(stage, Stage.ABORTED)) { + if (Objects.equals(error, ExecutionEngineSyncService.UNAVAILABLE)) { + return AnalysisState.DEAD; + } else if (Objects.equals(stage, Stage.ABORTED)) { return (error == null) ? AnalysisState.ABORTED : AnalysisState.ABORT_FAILURE; } else if (Objects.equals(stage, Stage.ABORT)) { return (error == null) ? AnalysisState.ABORTING : AnalysisState.ABORT_FAILURE; diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/ExecutionEngineIntegrationService.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/ExecutionEngineIntegrationService.java index b811b798..f66bff0d 100644 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/ExecutionEngineIntegrationService.java +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/ExecutionEngineIntegrationService.java @@ -29,5 +29,4 @@ AnalysisRequestStatusDTO sendAnalysisRequest(AnalysisRequestDTO requestDTO, AnalysisResultDTO sendCancel(Long analysisId); - ExecutionEngineStatus getExecutionEngineStatus(); } diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/EngineClient.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/EngineClient.java deleted file mode 100644 index b8846e88..00000000 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/EngineClient.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2018, 2023 Odysseus Data Services, Inc. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.odysseusinc.arachne.datanode.service.client.engine; - -import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestDTO; -import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestStatusDTO; -import feign.Headers; -import feign.Param; -import feign.RequestLine; -import org.springframework.http.MediaType; -import org.springframework.web.multipart.MultipartFile; - -public interface EngineClient { - - @RequestLine("POST /api/v1/analyze") - @Headers({ - "Content-Type: " + MediaType.MULTIPART_FORM_DATA_VALUE, - "arachne-compressed: true", - "arachne-waiting-compressed-result: {compressed}", - "arachne-datasource-check: {healthCheck}" - }) - AnalysisRequestStatusDTO sendAnalysisRequest(@Param("analysisRequest") AnalysisRequestDTO analysisRequest, - @Param("file") MultipartFile file, - @Param("compressed") Boolean compressedResult, - @Param("healthCheck") Boolean healthCheck); - - @RequestLine("GET /api/v1/metrics") - @Headers({ - "Content-Type: " + MediaType.TEXT_PLAIN_VALUE - }) - String checkStatus(); -} diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/EngineClientConfig.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/EngineClientConfig.java index 9971e13a..91a5cdd9 100644 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/EngineClientConfig.java +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/EngineClientConfig.java @@ -15,18 +15,9 @@ package com.odysseusinc.arachne.datanode.service.client.engine; import com.odysseusinc.arachne.datanode.service.client.ArachneHttpClientBuilder; -import com.odysseusinc.arachne.datanode.service.client.FeignSpringFormEncoder; -import com.odysseusinc.arachne.datanode.util.RestUtils; -import feign.Feign; -import feign.codec.Decoder; -import feign.codec.StringDecoder; -import feign.jackson.JacksonDecoder; -import feign.slf4j.Slf4jLogger; import lombok.Getter; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Primary; @Getter @Configuration @@ -54,26 +45,4 @@ public EngineClientConfig(ArachneHttpClientBuilder arachneHttpClientBuilder) { this.arachneHttpClientBuilder = arachneHttpClientBuilder; } - @Bean - @Primary - public EngineClient engineClient(){ - return getEngineClient(new JacksonDecoder()); - } - - @Bean("engineStatusClient") - public EngineClient engineStatusClient(){ - return getEngineClient(new StringDecoder()); - } - - private EngineClient getEngineClient(Decoder decoder) { - String url = String.format("%s://%s:%s", protocol, host, port); - return Feign.builder() - .client(arachneHttpClientBuilder.build(proxyEnabledForEngine)) - .encoder(new FeignSpringFormEncoder()) - .decoder(decoder) - .requestInterceptor(rt -> rt.header("Authorization", RestUtils.checkCredentials(token))) - .logger(new Slf4jLogger(EngineClient.class)) - .logLevel(feign.Logger.Level.FULL) - .target(EngineClient.class, url); - } } diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/ExecutionEngineClient.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/ExecutionEngineClient.java index c9e90805..304dcefd 100644 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/ExecutionEngineClient.java +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/client/engine/ExecutionEngineClient.java @@ -20,6 +20,7 @@ import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestStatusDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisResultDTO; +import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.EngineStatus; import com.odysseusinc.arachne.execution_engine_common.descriptor.dto.RuntimeEnvironmentDescriptorsDTO; import dev.failsafe.RetryPolicy; import dev.failsafe.okhttp.FailsafeCall; @@ -38,9 +39,11 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.time.temporal.ChronoUnit; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; +import java.util.stream.Collectors; @Component @Slf4j @@ -60,6 +63,13 @@ public ExecutionEngineClient( this.objectMapper = objectMapper; } + public EngineStatus status(List ids) { + String url = buildUrl("/api/v1/status" + ids.stream().map(id -> "id=" + id).collect(Collectors.joining("&", "?", ""))); + Request request = new Request.Builder().get().url(url).build(); + Call call = httpClient.newCall(request); + return executeRequest(url, FailsafeCall.with(noRetry()).compose(call), EngineStatus.class); + } + public AnalysisResultDTO cancel(long id) { String url = buildUrl("/api/v1/abort/" + id); // Yes, this is the weird way to make okhttp send empty body @@ -116,7 +126,7 @@ private T executeRequest(String url, FailsafeCall call, Class valueType) try (Response response = call.execute()) { if (!response.isSuccessful()) { log.error("Failed to execute to [{}]. Response code: [{}]", url, response.code()); - throw new AnalysisExecutionException("Failed to get descriptors. Response code: " + response.code()); + throw new AnalysisExecutionException("Failed to request [" + url + ". Response code: " + response.code()); } ResponseBody responseBody = response.body(); if (responseBody != null && Objects.equals(responseBody.contentType(), APPLICATION_JSON)) { @@ -130,7 +140,7 @@ private T executeRequest(String url, FailsafeCall call, Class valueType) log.warn("Request to [{}] timed out: {}", url, e.getMessage()); throw new AnalysisExecutionException("Failed to request [" + url + "]: " + e.getMessage(), e); } catch (IOException e) { - log.error("Request to [{}] failed: {}", url, e.getMessage(), e); + log.debug("Request to [{}] failed: {}", url, e.getMessage(), e); throw new AnalysisExecutionException("Failed to request [" + url + "]: " + e.getMessage(), e); } } @@ -148,6 +158,10 @@ private String toJson(AnalysisRequestDTO analysisRequest) { } } + private RetryPolicy noRetry() { + return RetryPolicy.builder().withMaxAttempts(1).build(); + } + private RetryPolicy retryPolicy() { return RetryPolicy.builder() .withMaxAttempts(2) diff --git a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/impl/ExecutionEngineIntegrationServiceImpl.java b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/impl/ExecutionEngineIntegrationServiceImpl.java index 76656fd9..0c5dad04 100644 --- a/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/impl/ExecutionEngineIntegrationServiceImpl.java +++ b/datanode/src/main/java/com/odysseusinc/arachne/datanode/service/impl/ExecutionEngineIntegrationServiceImpl.java @@ -18,19 +18,14 @@ import com.odysseusinc.arachne.datanode.exception.ArachneSystemRuntimeException; import com.odysseusinc.arachne.datanode.exception.ValidationException; import com.odysseusinc.arachne.datanode.service.ExecutionEngineIntegrationService; -import com.odysseusinc.arachne.datanode.service.ExecutionEngineStatus; -import com.odysseusinc.arachne.datanode.service.client.engine.EngineClient; import com.odysseusinc.arachne.datanode.service.client.engine.ExecutionEngineClient; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisRequestStatusDTO; import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisResultDTO; import com.odysseusinc.arachne.execution_engine_common.util.CommonFileUtils; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.web.client.ResourceAccessException; @@ -39,23 +34,11 @@ import java.io.UncheckedIOException; import java.nio.file.Files; -import static com.odysseusinc.arachne.datanode.service.ExecutionEngineStatus.OFFLINE; -import static com.odysseusinc.arachne.datanode.service.ExecutionEngineStatus.ONLINE; - +@Slf4j @Service public class ExecutionEngineIntegrationServiceImpl implements ExecutionEngineIntegrationService { - private static final Logger logger = LoggerFactory.getLogger(ExecutionEngineIntegrationServiceImpl.class); - @Autowired private ExecutionEngineClient engineClient; - private final EngineClient engineStatusClient; - - private volatile ExecutionEngineStatus executionEngineStatus = OFFLINE; - - @Autowired - public ExecutionEngineIntegrationServiceImpl(@Qualifier("engineStatusClient") EngineClient engineStatusClient) { - this.engineStatusClient = engineStatusClient; - } @Override public AnalysisRequestStatusDTO sendAnalysisRequest( @@ -65,7 +48,7 @@ public AnalysisRequestStatusDTO sendAnalysisRequest( try { final File archive = new File(analysisTempDir.toString(), "request.zip"); CommonFileUtils.compressAndSplit(analysisFolder, archive, null); - logger.info("Request [{}] with files for [{}], sending now", requestDTO.getId(), analysisFolder.getName()); + log.info("Request [{}] with files for [{}], sending now", requestDTO.getId(), analysisFolder.getName()); return engineClient.sendAnalysisRequest(requestDTO, archive, compressedResult, healthCheck); } catch (ResourceAccessException exception) { throw new ValidationException("Cannot establish connection to the execution engine"); @@ -85,28 +68,6 @@ private File getTempDirectory(String prefix) { } } - @Scheduled(fixedDelayString = "${executionEngine.status.period}") - public void checkStatus() { - try { - engineStatusClient.checkStatus(); - - if (OFFLINE.equals(this.executionEngineStatus)) { - logger.info("Execution engine is online"); - } - executionEngineStatus = ONLINE; - } catch (Exception e) { - if (ONLINE.equals(this.executionEngineStatus)) { - logger.info("Execution engine is offline"); - } - executionEngineStatus = OFFLINE; - } - } - - @Override - public ExecutionEngineStatus getExecutionEngineStatus() { - return this.executionEngineStatus; - } - @Override public AnalysisResultDTO sendCancel(Long analysisId) { return engineClient.cancel(analysisId); diff --git a/datanode/src/main/resources/db/migration/V20240719000000__ee_connection_log.sql b/datanode/src/main/resources/db/migration/V20240719000000__ee_connection_log.sql new file mode 100644 index 00000000..24b237ce --- /dev/null +++ b/datanode/src/main/resources/db/migration/V20240719000000__ee_connection_log.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS engine_status ( + id UUID PRIMARY KEY, + since TIMESTAMP WITHOUT TIME ZONE, + seen_last TIMESTAMP WITHOUT TIME ZONE, + error VARCHAR +);