Skip to content

Commit

Permalink
Merge pull request #688 from iExecBlockchainComputing/release/8.4.1
Browse files Browse the repository at this point in the history
Release/8.4.1
  • Loading branch information
jbern0rd authored Apr 3, 2024
2 parents 7c93c40 + 3dce7c7 commit 2935a0c
Show file tree
Hide file tree
Showing 50 changed files with 2,311 additions and 2,214 deletions.
36 changes: 33 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,36 @@

All notable changes to this project will be documented in this file.

## [[8.4.1]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.4.1) 2024-04-03

### New Features

- Add `ConsensusReachedTaskDetector` to detect missed `TaskConsensus` on-chain events. (#683 #684)
- Generate enclave challenge with `Authorization` header after on-chain task has been initialized. (#686)

### Bug Fixes

- Keep a single `updateReplicateStatus` method in `ReplicatesService`. (#670)
- Check result has been uploaded for TEE tasks. (#672)
- Check for consensus early if a worker has already `CONTRIBUTED` when the task is updated to `RUNNING`. (#673)
- Always provide a `WorkerpoolAuthorization` to a worker during its recovery. (#674)
- Move task metrics from `TaskUpdateManager` to `TaskService`. (#676)
- Fail fast when tasks are detected past their contribution or final deadline. (#677)
- Mitigate potential race conditions by enforcing `currentStatus` value when updating a task. (#681)
- Use semaphores in `TaskUpdateRequestManager` to avoid blocking task update threads. (#685)

### Quality

- Prepare migration to `java.time` package by building `Date` objects from `Instant` objects. (#671)
- Add logs for better traceability. (#675)
- Remove code only used in tests from `TaskService` and `Task`. (#678 #679)
- Implement each task status transition in a single method. (#680)
- Execute `TaskUpdateManager` tests on a running MongoDB container. (#682)

### Dependency Upgrades

- Upgrade to `iexec-sms-library` 8.5.1. (#687)

## [[8.4.0]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.4.0) 2024-02-29

### New Features
Expand All @@ -21,9 +51,9 @@ All notable changes to this project will be documented in this file.
### Dependency Upgrades

- Upgrade to `iexec-common` 8.4.0. (#666)
- Upgrade to `iexec-blockchain-adapter` 8.4.0. (#667)
- Upgrade to `iexec-result-proxy` 8.4.0. (#667)
- Upgrade to `iexec-sms` 8.5.0. (#667)
- Upgrade to `iexec-blockchain-adapter-library` 8.4.0. (#667)
- Upgrade to `iexec-result-proxy-library` 8.4.0. (#667)
- Upgrade to `iexec-sms-library` 8.5.0. (#667)

## [[8.3.0]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.3.0) 2024-01-11

Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
version=8.4.0
version=8.4.1
iexecCommonVersion=8.4.0
iexecCommonsPocoVersion=3.2.0
iexecBlockchainAdapterVersion=8.4.0
iexecResultVersion=8.4.0
iexecSmsVersion=8.5.0
iexecSmsVersion=8.5.1

nexusUser
nexusPassword
34 changes: 17 additions & 17 deletions src/main/java/com/iexec/core/chain/IexecHubService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
import org.web3j.protocol.core.methods.response.TransactionReceipt;

import java.math.BigInteger;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Optional;
Expand All @@ -38,7 +39,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import static com.iexec.common.utils.DateTimeUtils.now;
import static com.iexec.commons.poco.chain.ChainContributionStatus.CONTRIBUTED;
import static com.iexec.commons.poco.chain.ChainContributionStatus.REVEALED;
import static com.iexec.commons.poco.contract.generated.IexecHubContract.*;
Expand Down Expand Up @@ -160,18 +160,17 @@ public Date getChainDealFinalDeadline(ChainDeal chainDeal) {
}

public boolean canFinalize(String chainTaskId) {
Optional<ChainTask> optional = getChainTask(chainTaskId);
if (optional.isEmpty()) {
final ChainTask chainTask = getChainTask(chainTaskId).orElse(null);
if (chainTask == null) {
return false;
}
ChainTask chainTask = optional.get();

boolean isChainTaskStatusRevealing = chainTask.getStatus().equals(ChainTaskStatus.REVEALING);
boolean isFinalDeadlineInFuture = now() < chainTask.getFinalDeadline();
boolean hasEnoughRevealors = (chainTask.getRevealCounter() == chainTask.getWinnerCounter())
|| (chainTask.getRevealCounter() > 0 && chainTask.getRevealDeadline() <= now());
final boolean isChainTaskStatusRevealing = chainTask.getStatus() == ChainTaskStatus.REVEALING;
final boolean isFinalDeadlineInFuture = Instant.now().toEpochMilli() < chainTask.getFinalDeadline();
final boolean hasEnoughRevealors = chainTask.getRevealCounter() == chainTask.getWinnerCounter()
|| (chainTask.getRevealCounter() > 0 && chainTask.getRevealDeadline() <= Instant.now().toEpochMilli());
final boolean ret = isChainTaskStatusRevealing && isFinalDeadlineInFuture && hasEnoughRevealors;

boolean ret = isChainTaskStatusRevealing && isFinalDeadlineInFuture && hasEnoughRevealors;
if (ret) {
log.info("Finalizable onchain [chainTaskId:{}]", chainTaskId);
} else {
Expand All @@ -183,15 +182,14 @@ public boolean canFinalize(String chainTaskId) {
}

public boolean canReopen(String chainTaskId) {
Optional<ChainTask> optional = getChainTask(chainTaskId);
if (optional.isEmpty()) {
final ChainTask chainTask = getChainTask(chainTaskId).orElse(null);
if (chainTask == null) {
return false;
}
ChainTask chainTask = optional.get();

boolean isChainTaskStatusRevealing = chainTask.getStatus().equals(ChainTaskStatus.REVEALING);
boolean isBeforeFinalDeadline = now() < chainTask.getFinalDeadline();
boolean isAfterRevealDeadline = chainTask.getRevealDeadline() <= now();
boolean isChainTaskStatusRevealing = chainTask.getStatus() == ChainTaskStatus.REVEALING;
boolean isBeforeFinalDeadline = Instant.now().toEpochMilli() < chainTask.getFinalDeadline();
boolean isAfterRevealDeadline = chainTask.getRevealDeadline() <= Instant.now().toEpochMilli();
boolean revealCounterEqualsZero = chainTask.getRevealCounter() == 0;

boolean check = isChainTaskStatusRevealing && isBeforeFinalDeadline && isAfterRevealDeadline
Expand Down Expand Up @@ -247,7 +245,9 @@ Flowable<IexecHubContract.SchedulerNoticeEventResponse> getDealEventObservable(E
}

public boolean hasEnoughGas() {
return hasEnoughGas(credentialsService.getCredentials().getAddress());
final boolean hasEnoughGas = hasEnoughGas(credentialsService.getCredentials().getAddress());
log.debug("Gas status [hasEnoughGas:{}]", hasEnoughGas);
return hasEnoughGas;
}

private ChainReceipt buildChainReceipt(TransactionReceipt receipt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public class CronConfiguration {
@Value("${cron.detector.chain.contribute}")
private int contribute;

@Value("${cron.detector.chain.consensus-reached}")
private int consensusReached;

@Value("${cron.detector.chain.reveal}")
private int reveal;

Expand All @@ -42,5 +45,5 @@ public class CronConfiguration {
private int revealTimeout;

@Value("${cron.detector.timeout.result-upload}")
private int resultUploadTimeout;
private int resultUploadTimeout;
}
5 changes: 3 additions & 2 deletions src/main/java/com/iexec/core/detector/WorkerLostDetector.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import com.google.common.collect.ImmutableSet;
import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusUpdate;
import com.iexec.core.replicate.ReplicatesService;
import com.iexec.core.task.TaskService;
import com.iexec.core.worker.Worker;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void detect() {
replicatesService.updateReplicateStatus(
chainTaskId,
workerWallet,
WORKER_LOST
ReplicateStatusUpdate.poolManagerRequest(WORKER_LOST)
);
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package com.iexec.core.detector.replicate;

import com.iexec.common.replicate.ReplicateStatusUpdate;
import com.iexec.core.detector.Detector;
import com.iexec.core.replicate.Replicate;
import com.iexec.core.replicate.ReplicatesService;
Expand All @@ -27,11 +28,10 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.Optional;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static com.iexec.common.replicate.ReplicateStatus.RESULT_UPLOAD_FAILED;
import static com.iexec.common.utils.DateTimeUtils.addMinutesToDate;

@Slf4j
@Service
Expand Down Expand Up @@ -62,14 +62,13 @@ public void detect() {
String chainTaskId = task.getChainTaskId();
String uploadingWallet = task.getUploadingWorkerWalletAddress();

Optional<Replicate> oUploadingReplicate = replicatesService.getReplicate(chainTaskId, uploadingWallet);
if (oUploadingReplicate.isEmpty()) {
final Replicate uploadingReplicate = replicatesService.getReplicate(chainTaskId, uploadingWallet).orElse(null);
if (uploadingReplicate == null) {
return;
}

Replicate uploadingReplicate = oUploadingReplicate.get();

boolean startedUploadLongAgo = new Date().after(addMinutesToDate(task.getLatestStatusChange().getDate(), 2));
boolean startedUploadLongAgo = Instant.now().isAfter(
Instant.ofEpochMilli(task.getLatestStatusChange().getDate().getTime()).plus(2L, ChronoUnit.MINUTES));
boolean hasReplicateAlreadyFailedToUpload = uploadingReplicate.containsStatus(RESULT_UPLOAD_FAILED);

if (!startedUploadLongAgo) {
Expand All @@ -84,8 +83,8 @@ public void detect() {
log.info("detected replicate with resultUploadTimeout [chainTaskId:{}, replicate:{}, currentStatus:{}]",
chainTaskId, uploadingReplicate.getWalletAddress(), uploadingReplicate.getCurrentStatus());

replicatesService.updateReplicateStatus(chainTaskId, uploadingReplicate.getWalletAddress(),
RESULT_UPLOAD_FAILED);
replicatesService.updateReplicateStatus(
chainTaskId, uploadingReplicate.getWalletAddress(), ReplicateStatusUpdate.poolManagerRequest(RESULT_UPLOAD_FAILED));
taskUpdateRequestManager.publishRequest(task.getChainTaskId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusDetails;
import com.iexec.common.replicate.ReplicateStatusUpdate;
import com.iexec.commons.poco.chain.ChainContributionStatus;
import com.iexec.commons.poco.chain.ChainReceipt;
import com.iexec.core.chain.IexecHubService;
Expand Down Expand Up @@ -185,7 +186,8 @@ private void updateReplicateStatuses(Task task, Replicate replicate) {
// by default, no need to retrieve anything
break;
}
replicatesService.updateReplicateStatus(chainTaskId, wallet, statusToUpdate, details);
final ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.poolManagerRequest(statusToUpdate, details);
replicatesService.updateReplicateStatus(chainTaskId, wallet, statusUpdate);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.iexec.core.detector.task;

import com.iexec.core.chain.IexecHubService;
import com.iexec.core.detector.Detector;
import com.iexec.core.task.Task;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.TaskStatus;
import com.iexec.core.task.update.TaskUpdateRequestManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import static com.iexec.commons.poco.chain.ChainTaskStatus.REVEALING;
import static com.iexec.core.task.TaskStatus.RUNNING;

@Slf4j
@Service
public class ConsensusReachedTaskDetector implements Detector {

private final IexecHubService iexecHubService;
private final TaskService taskService;
private final TaskUpdateRequestManager taskUpdateRequestManager;

public ConsensusReachedTaskDetector(IexecHubService iexecHubService,
TaskService taskService,
TaskUpdateRequestManager taskUpdateRequestManager) {
this.iexecHubService = iexecHubService;
this.taskService = taskService;
this.taskUpdateRequestManager = taskUpdateRequestManager;
}

@Scheduled(fixedRateString = "#{@cronConfiguration.getConsensusReached()}")
@Override
public void detect() {
log.debug("Trying to detect running tasks with on-chain consensus");
taskService.findByCurrentStatus(RUNNING).stream()
.filter(this::isConsensusReached)
.forEach(this::publishTaskUpdateRequest);
}

private boolean isConsensusReached(Task task) {
return iexecHubService.getChainTask(task.getChainTaskId()).stream()
.allMatch(chainTask -> chainTask.getStatus() == REVEALING);
}

private void publishTaskUpdateRequest(Task task) {
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
task.getCurrentStatus(), TaskStatus.CONSENSUS_REACHED, task.getChainTaskId());
taskUpdateRequestManager.publishRequest(task.getChainTaskId());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,40 +17,45 @@
package com.iexec.core.detector.task;

import com.iexec.core.detector.Detector;
import com.iexec.core.task.Task;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.TaskStatus;
import com.iexec.core.task.update.TaskUpdateRequestManager;
import com.iexec.core.task.TaskStatusChange;
import com.iexec.core.task.event.ContributionTimeoutEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.Date;
import java.time.Instant;

@Slf4j
@Service
public class ContributionTimeoutTaskDetector implements Detector {

private final TaskService taskService;
private final TaskUpdateRequestManager taskUpdateRequestManager;
private final ApplicationEventPublisher applicationEventPublisher;

public ContributionTimeoutTaskDetector(TaskService taskService,
TaskUpdateRequestManager taskUpdateRequestManager) {
public ContributionTimeoutTaskDetector(TaskService taskService, ApplicationEventPublisher applicationEventPublisher) {
this.taskService = taskService;
this.taskUpdateRequestManager = taskUpdateRequestManager;
this.applicationEventPublisher = applicationEventPublisher;
}

@Scheduled(fixedRateString = "#{@cronConfiguration.getContribute()}")
@Override
public void detect() {
log.debug("Trying to detect contribution timeout");
for (Task task : taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))) {
Date now = new Date();
if (now.after(task.getContributionDeadline())) {
log.info("Task with contribution timeout found [chainTaskId:{}]", task.getChainTaskId());
taskUpdateRequestManager.publishRequest(task.getChainTaskId());
}
}
log.debug("Trying to detect tasks after contribution deadline");
final Instant now = Instant.now();
final Query query = Query.query(Criteria.where("currentStatus").in(TaskStatus.INITIALIZED, TaskStatus.RUNNING)
.and("contributionDeadline").lte(now)
.and("finalDeadline").gt(now));
final Update update = Update.update("currentStatus", TaskStatus.FAILED)
.push("dateStatusList").each(
TaskStatusChange.builder().status(TaskStatus.CONTRIBUTION_TIMEOUT).build(),
TaskStatusChange.builder().status(TaskStatus.FAILED).build());
taskService.updateMultipleTasksByQuery(query, update)
.forEach(id -> applicationEventPublisher.publishEvent(new ContributionTimeoutEvent(id)));
}
}
Loading

0 comments on commit 2935a0c

Please sign in to comment.