-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
#32 Provide endpoint for engine status and update analysis status whe…
…n disconnected engine reconnects
- Loading branch information
dmitrys
committed
Jul 25, 2024
1 parent
f3ef5f0
commit b70be7e
Showing
17 changed files
with
331 additions
and
256 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
68 changes: 68 additions & 0 deletions
68
datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/EngineStatusService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright 2024 Odysseus Data Services, Inc. | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.odysseusinc.arachne.datanode.engine; | ||
|
||
import com.odysseusinc.arachne.datanode.util.JpaSugar; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.springframework.stereotype.Service; | ||
import org.springframework.transaction.annotation.Transactional; | ||
|
||
import javax.persistence.EntityManager; | ||
import javax.persistence.PersistenceContext; | ||
import java.time.Instant; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.UUID; | ||
|
||
/** | ||
* Some statistics on the execution engine | ||
*/ | ||
@Slf4j | ||
@Service | ||
public class EngineStatusService { | ||
@PersistenceContext | ||
private EntityManager em; | ||
|
||
@Transactional | ||
public void update(Instant timestamp, String error, Runnable onChange) { | ||
getStatus().filter(status -> | ||
Objects.equals(status.getError(), error) | ||
).orElseGet(() -> { | ||
ExecutionEngineStatus status = new ExecutionEngineStatus(); | ||
status.setId(UUID.randomUUID()); | ||
status.setSince(timestamp); | ||
status.setError(error); | ||
em.persist(status); | ||
onChange.run(); | ||
return status; | ||
}).setSeenLast(timestamp); | ||
} | ||
|
||
@Transactional | ||
public EngineStatusDTO getStatusInfo() { | ||
return getStatus().map(status -> | ||
EngineStatusDTO.of(status.getError() == null ? "OK" : "ERROR", status.getSince(), status.getSince(), status.getError()) | ||
).orElseGet(() -> | ||
EngineStatusDTO.of("UNKNOWN", null, null, null) | ||
); | ||
} | ||
|
||
private Optional<ExecutionEngineStatus> getStatus() { | ||
return JpaSugar.select(em, ExecutionEngineStatus.class, (cb, query) -> root -> | ||
query.orderBy(cb.desc(root.get(ExecutionEngineStatus_.since))) | ||
).getResultStream().findFirst(); | ||
} | ||
|
||
} |
55 changes: 55 additions & 0 deletions
55
datanode/src/main/java/com/odysseusinc/arachne/datanode/engine/ExecutionEngineStatus.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright 2024 Odysseus Data Services, Inc. | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.odysseusinc.arachne.datanode.engine; | ||
|
||
import lombok.Getter; | ||
import lombok.NoArgsConstructor; | ||
import lombok.Setter; | ||
|
||
import javax.persistence.Column; | ||
import javax.persistence.Entity; | ||
import javax.persistence.Id; | ||
import javax.persistence.Table; | ||
import java.time.Instant; | ||
import java.util.UUID; | ||
|
||
@Getter | ||
@Setter | ||
@Entity | ||
@Table(name = "engine_status") | ||
@NoArgsConstructor | ||
public class ExecutionEngineStatus { | ||
@Id | ||
@Column(name = "id") | ||
private UUID id; | ||
|
||
/** | ||
* Moment when the status was observed for the first time | ||
*/ | ||
@Column(name = "since") | ||
private Instant since; | ||
|
||
/** | ||
* Moment when the status was observed last time. | ||
*/ | ||
@Column(name = "seen_last") | ||
private Instant seenLast; | ||
|
||
/** | ||
* Error observed. Null if the record marks interval when EE was connected without errors | ||
*/ | ||
@Column(name = "error") | ||
private String error; | ||
} |
98 changes: 98 additions & 0 deletions
98
...ode/src/main/java/com/odysseusinc/arachne/datanode/engine/ExecutionEngineSyncService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
/* | ||
* Copyright 2024 Odysseus Data Services, Inc. | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.odysseusinc.arachne.datanode.engine; | ||
|
||
import com.odysseusinc.arachne.datanode.service.AnalysisService; | ||
import com.odysseusinc.arachne.datanode.service.AnalysisStateService; | ||
import com.odysseusinc.arachne.datanode.service.client.engine.ExecutionEngineClient; | ||
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisResultStatusDTO; | ||
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.EngineStatus; | ||
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.ExecutionOutcome; | ||
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.scheduling.annotation.Scheduled; | ||
import org.springframework.stereotype.Service; | ||
|
||
import java.time.Instant; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
@Slf4j | ||
@Service | ||
public class ExecutionEngineSyncService { | ||
public static final String UNAVAILABLE = "UNAVAILABLE"; | ||
|
||
@Autowired | ||
private ExecutionEngineClient engineClient; | ||
@Autowired | ||
private EngineStatusService engine; | ||
@Autowired | ||
private AnalysisService analysisService; | ||
@Autowired | ||
private AnalysisStateService analysisStateService; | ||
|
||
@Scheduled(fixedDelayString = "${executionEngine.status.period}") | ||
public void checkStatus() { | ||
Instant now = Instant.now(); | ||
List<Long> incomplete = analysisService.getIncompleteIds(); | ||
EngineStatus status; | ||
try { | ||
status = engineClient.status(incomplete); | ||
engine.update(now, null, () -> log.info("EE status: CONNECTED")); | ||
} catch (Exception e) { | ||
engine.update(now, e.getMessage(), () -> log.warn("EE status: [ERROR] ({})", e.getMessage())); | ||
return; | ||
} | ||
update(status, incomplete); | ||
|
||
} | ||
|
||
private void update(EngineStatus engineStatus, List<Long> incomplete) { | ||
Map<Long, ExecutionOutcome> submissions = engineStatus.getSubmissions(); | ||
incomplete.forEach(id -> { | ||
ExecutionOutcome status = submissions.get(id); | ||
analysisService.update(id, analysis -> { | ||
if (status != null) { | ||
String error = status.getError(); | ||
String stage = status.getStage(); | ||
if (!Objects.equals(error, analysis.getError()) || !Objects.equals(stage, analysis.getStage())) { | ||
// TODO stdout handling. The challenge is that any modification here will interfere with normal incremental writing. | ||
// So that normal writing will have to be changed as well. | ||
analysis.setStage(stage); | ||
analysisStateService.handleStateFromEE(analysis, stage, error); | ||
} | ||
} else { | ||
if (analysis.getStage() == null) { | ||
AnalysisResultStatusDTO dbStatus = analysis.getStatus(); | ||
if (dbStatus == null) { | ||
analysis.setStage(Stage.INITIALIZE); | ||
analysisStateService.handleStateFromEE(analysis, Stage.INITIALIZE, UNAVAILABLE); | ||
} else if (dbStatus == AnalysisResultStatusDTO.EXECUTED) { | ||
analysis.setStage(Stage.COMPLETED); | ||
} else if (dbStatus == AnalysisResultStatusDTO.FAILED) { | ||
analysis.setStage(Stage.EXECUTE); | ||
analysisStateService.handleStateFromEE(analysis, Stage.EXECUTE, "Failed"); | ||
} | ||
} else { | ||
analysisStateService.handleStateFromEE(analysis, analysis.getStage(), UNAVAILABLE); | ||
analysis.setError(UNAVAILABLE); | ||
} | ||
} | ||
}); | ||
}); | ||
} | ||
} |
28 changes: 0 additions & 28 deletions
28
...ode/src/main/java/com/odysseusinc/arachne/datanode/repository/AnalysisFileRepository.java
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.