Skip to content

Commit

Permalink
#32 Provide endpoint for engine status and update analysis status whe…
Browse files Browse the repository at this point in the history
…n disconnected engine reconnects
  • Loading branch information
dmitrys committed Jul 25, 2024
1 parent f3ef5f0 commit e0f908c
Show file tree
Hide file tree
Showing 17 changed files with 331 additions and 256 deletions.
2 changes: 1 addition & 1 deletion datanode/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
<dependency>
<groupId>com.odysseusinc.arachne</groupId>
<artifactId>execution-engine-commons</artifactId>
<version>2.0.0</version>
<version>2.2-DEV-SNAPSHOT</version>
</dependency>
<dependency>
<artifactId>arachne-sys-settings</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@
import com.odysseusinc.arachne.datanode.dto.converters.AnalysisToSubmissionDTOConverter;
import com.odysseusinc.arachne.datanode.dto.submission.SubmissionDTO;
import com.odysseusinc.arachne.datanode.dto.user.UserDTO;
import com.odysseusinc.arachne.datanode.engine.EngineStatusDTO;
import com.odysseusinc.arachne.datanode.engine.EngineStatusService;
import com.odysseusinc.arachne.datanode.exception.PermissionDeniedException;
import com.odysseusinc.arachne.datanode.model.analysis.Analysis;
import com.odysseusinc.arachne.datanode.model.user.User;
import com.odysseusinc.arachne.datanode.repository.AnalysisRepository;
import com.odysseusinc.arachne.datanode.service.UserService;
import io.swagger.annotations.ApiOperation;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
Expand All @@ -55,11 +59,13 @@ public class AdminController extends BaseController {
public static final int DEFAULT_PAGE_SIZE = 10;
private final Map<String, Consumer<List<String>>> propertiesMap = new HashMap<>();
@Autowired
private EngineStatusService engine;
@Autowired
private AnalysisToSubmissionDTOConverter analysisToSubmissionDTO;
@Autowired
private GenericConversionService conversionService;
private GenericConversionService conversionService;
@Autowired
private AnalysisRepository analysisRepository;
private AnalysisRepository analysisRepository;

public AdminController(UserService userService) {
super(userService);
Expand Down Expand Up @@ -128,7 +134,9 @@ public Page<SubmissionDTO> list(@PageableDefault(value = DEFAULT_PAGE_SIZE, sort
} else {
analyses = analysisRepository.findAll(p);
}
return analyses.map(analysis -> analysisToSubmissionDTO.convert(analysis));
Page<SubmissionDTO> items = analyses.map(analysis -> analysisToSubmissionDTO.convert(analysis));
EngineStatusDTO engineStatus = engine.getStatusInfo();
return new PageWithStatus(items, engineStatus);
}

protected Pageable buildPageRequest(Pageable pageable) {
Expand Down Expand Up @@ -196,4 +204,13 @@ protected void initProps() {
propertiesMap.put("status", p -> p.add("journal.state"));
}

/**
 * A page of submissions that additionally carries the current execution engine status,
 * so that a single response delivers both the listing and the engine state.
 */
public static class PageWithStatus extends PageImpl<SubmissionDTO> {
    /** Engine status attached to this page; exposed through the generated getter. */
    @Getter
    private EngineStatusDTO engine;

    /**
     * Copies content, paging information and total count from the given page
     * and attaches the engine status to the copy.
     *
     * @param page   page of submissions to copy
     * @param engine engine status to expose alongside the page
     */
    public PageWithStatus(Page<SubmissionDTO> page, EngineStatusDTO engine) {
        super(
                page.getContent(),
                page.getPageable(),
                page.getTotalElements()
        );
        this.engine = engine;
    }
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021, 2023 Odysseus Data Services, Inc.
* Copyright 2024 Odysseus Data Services, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -12,9 +12,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.odysseusinc.arachne.datanode.service;
package com.odysseusinc.arachne.datanode.engine;

public enum ExecutionEngineStatus {
ONLINE,
OFFLINE
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.time.Instant;

/**
 * Immutable snapshot of the execution engine status as reported to API clients.
 * Instances are created through the static factory {@code EngineStatusDTO.of(...)},
 * which takes the fields in declaration order.
 */
@RequiredArgsConstructor(staticName = "of")
@Getter
public class EngineStatusDTO {
    /** Status label; {@code EngineStatusService} produces "OK", "ERROR" or "UNKNOWN". */
    private final String status;

    /** Moment the current status was first observed; null when no status is known. */
    private final Instant since;

    /** Intended to carry the moment the current status was last observed; null when no status is known. */
    private final Instant seenLast;

    /** Error text associated with the status; null when there is no error. */
    private final String error;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2024 Odysseus Data Services, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.odysseusinc.arachne.datanode.engine;

import com.odysseusinc.arachne.datanode.util.JpaSugar;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

/**
 * Keeps a persisted history of execution engine status intervals.
 * <p>
 * Each {@link ExecutionEngineStatus} record describes one interval during which the same
 * error (or absence of error) was observed. The record with the latest {@code since}
 * value is treated as the current status.
 */
@Slf4j
@Service
public class EngineStatusService {
    @PersistenceContext
    private EntityManager em;

    /**
     * Records an observation of the engine status.
     * <p>
     * If the latest record has the same error (both null counts as the same), only its
     * {@code seenLast} is moved forward. Otherwise a new record starting at
     * {@code timestamp} is persisted and {@code onChange} is invoked.
     *
     * @param timestamp moment of the observation
     * @param error     observed error, or null when the engine responded without errors
     * @param onChange  callback run only when a new status record is created
     */
    @Transactional
    public void update(Instant timestamp, String error, Runnable onChange) {
        getStatus().filter(status ->
                Objects.equals(status.getError(), error)
        ).orElseGet(() -> {
            ExecutionEngineStatus status = new ExecutionEngineStatus();
            status.setId(UUID.randomUUID());
            status.setSince(timestamp);
            status.setError(error);
            em.persist(status);
            onChange.run();
            return status;
        }).setSeenLast(timestamp);
    }

    /**
     * Builds a DTO describing the current engine status.
     *
     * @return "OK" or "ERROR" with the interval timestamps of the latest record,
     *         or "UNKNOWN" with null fields when no record exists yet
     */
    @Transactional
    public EngineStatusDTO getStatusInfo() {
        return getStatus().map(status ->
                EngineStatusDTO.of(
                        status.getError() == null ? "OK" : "ERROR",
                        status.getSince(),
                        // Report when the status was last observed, not when it started.
                        status.getSeenLast(),
                        status.getError()
                )
        ).orElseGet(() ->
                EngineStatusDTO.of("UNKNOWN", null, null, null)
        );
    }

    /**
     * Finds the most recent status record, i.e. the one with the greatest {@code since}.
     * NOTE(review): no explicit row limit is visible here; if the history grows large,
     * consider limiting the query to a single row.
     */
    private Optional<ExecutionEngineStatus> getStatus() {
        return JpaSugar.select(em, ExecutionEngineStatus.class, (cb, query) -> root ->
                query.orderBy(cb.desc(root.get(ExecutionEngineStatus_.since)))
        ).getResultStream().findFirst();
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 Odysseus Data Services, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.odysseusinc.arachne.datanode.engine;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.time.Instant;
import java.util.UUID;

/**
 * Persistent record of one interval during which the execution engine stayed
 * in the same state. Stored in the {@code engine_status} table.
 */
@Entity
@Table(name = "engine_status")
@NoArgsConstructor
@Getter
@Setter
public class ExecutionEngineStatus {
    /** Primary key; assigned by the code that creates the record. */
    @Id
    @Column(name = "id")
    private UUID id;

    /** First moment at which this status was observed. */
    @Column(name = "since")
    private Instant since;

    /** Most recent moment at which this status was observed. */
    @Column(name = "seen_last")
    private Instant seenLast;

    /**
     * Observed error text. Null when the record covers an interval in which
     * the execution engine was connected without errors.
     */
    @Column(name = "error")
    private String error;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2024 Odysseus Data Services, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.odysseusinc.arachne.datanode.engine;

import com.odysseusinc.arachne.datanode.service.AnalysisService;
import com.odysseusinc.arachne.datanode.service.AnalysisStateService;
import com.odysseusinc.arachne.datanode.service.client.engine.ExecutionEngineClient;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.AnalysisResultStatusDTO;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.EngineStatus;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.ExecutionOutcome;
import com.odysseusinc.arachne.execution_engine_common.api.v1.dto.Stage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Service
public class ExecutionEngineSyncService {
public static final String UNAVAILABLE = "UNAVAILABLE";

@Autowired
private ExecutionEngineClient engineClient;
@Autowired
private EngineStatusService engine;
@Autowired
private AnalysisService analysisService;
@Autowired
private AnalysisStateService analysisStateService;

@Scheduled(fixedDelayString = "${executionEngine.status.period}")
public void checkStatus() {
Instant now = Instant.now();
List<Long> incomplete = analysisService.getIncompleteIds();
EngineStatus status;
try {
status = engineClient.status(incomplete);
engine.update(now, null, () -> log.info("EE status: CONNECTED"));
} catch (Exception e) {
engine.update(now, e.getMessage(), () -> log.warn("EE status: [ERROR] ({})", e.getMessage()));
return;
}
update(status, incomplete);

}

private void update(EngineStatus engineStatus, List<Long> incomplete) {
Map<Long, ExecutionOutcome> submissions = engineStatus.getSubmissions();
incomplete.forEach(id -> {
ExecutionOutcome status = submissions.get(id);
analysisService.update(id, analysis -> {
if (status != null) {
String error = status.getError();
String stage = status.getStage();
if (!Objects.equals(error, analysis.getError()) || !Objects.equals(stage, analysis.getStage())) {
// TODO stdout handling. The challenge is that any modification here will interfere with normal incremental writing.
// So that normal writing will have to be changed as well.
analysis.setStage(stage);
analysisStateService.handleStateFromEE(analysis, stage, error);
}
} else {
if (analysis.getStage() == null) {
AnalysisResultStatusDTO dbStatus = analysis.getStatus();
if (dbStatus == null) {
analysis.setStage(Stage.INITIALIZE);
analysisStateService.handleStateFromEE(analysis, Stage.INITIALIZE, UNAVAILABLE);
} else if (dbStatus == AnalysisResultStatusDTO.EXECUTED) {
analysis.setStage(Stage.COMPLETED);
} else if (dbStatus == AnalysisResultStatusDTO.FAILED) {
analysis.setStage(Stage.EXECUTE);
analysisStateService.handleStateFromEE(analysis, Stage.EXECUTE, "Failed");
}
} else {
analysisStateService.handleStateFromEE(analysis, analysis.getStage(), UNAVAILABLE);
analysis.setError(UNAVAILABLE);
}
}
});
});
}
}

This file was deleted.

Loading

0 comments on commit e0f908c

Please sign in to comment.