diff --git a/CHANGELOG.md b/CHANGELOG.md index f25c7d33..7cf60b9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. +## [[8.2.3]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.3) 2023-12-14 + +### Bug Fixes + +- Check if Worker can still accept more work right before giving it a new replicate. (#644) + ## [[8.2.2]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.2) 2023-12-13 ### Bug Fixes diff --git a/gradle.properties b/gradle.properties index bbad4cdc..d3c77ac1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=8.2.2 +version=8.2.3 iexecCommonVersion=8.3.0 iexecCommonsPocoVersion=3.1.0 iexecBlockchainAdapterVersion=8.2.0 diff --git a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java index d1d5d370..4da8738c 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java @@ -86,14 +86,9 @@ public ReplicateSupplyService(ReplicatesService replicatesService, */ @Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 5) Optional getAvailableReplicateTaskSummary(long workerLastBlock, String walletAddress) { - // return empty if max computing task is reached or if the worker is not found - if (!workerService.canAcceptMoreWorks(walletAddress)) { - return Optional.empty(); - } - // return empty if the worker is not sync //TODO Check if worker node is sync - boolean isWorkerLastBlockAvailable = workerLastBlock > 0; + final boolean isWorkerLastBlockAvailable = workerLastBlock > 0; if (!isWorkerLastBlockAvailable) { return Optional.empty(); } @@ -102,13 +97,16 @@ Optional getAvailableReplicateTaskSummary(long workerLastB return Optional.empty(); } - // TODO : Remove this, the optional can never be empty - // This is covered in workerService.canAcceptMoreWorks - Optional optional = workerService.getWorker(walletAddress); + final Optional optional = workerService.getWorker(walletAddress); if (optional.isEmpty()) { return Optional.empty(); } - Worker worker = optional.get(); + final Worker worker = optional.get(); + + // return empty if max computing task is reached or if the worker is not found + if (!workerService.canAcceptMoreWorks(worker)) { + return Optional.empty(); + } return getReplicateTaskSummaryForAnyAvailableTask( walletAddress, @@ -161,8 +159,8 @@ private Optional getReplicateTaskSummary(Task task, String chainTaskId, task.getEnclaveChallenge()); ReplicateTaskSummaryBuilder replicateTaskSummary = ReplicateTaskSummary.builder() - .workerpoolAuthorization(authorization); - if(task.isTeeTask()){ + .workerpoolAuthorization(authorization); + if (task.isTeeTask()) { replicateTaskSummary.smsUrl(task.getSmsUrl()); } return Optional.of(replicateTaskSummary.build()); @@ -173,7 +171,7 @@ private Optional getReplicateTaskSummary(Task task, String * tries to accept the task - i.e. create a new {@link Replicate} * for that task on that worker. * - * @param task {@link Task} needing at least one new {@link Replicate}. + * @param task {@link Task} needing at least one new {@link Replicate}. * @param walletAddress Wallet address of a worker looking for new {@link Task}. * @return {@literal true} if the task has been accepted, * {@literal false} otherwise. @@ -184,22 +182,6 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) { } final String chainTaskId = task.getChainTaskId(); - final Optional oReplicatesList = replicatesService.getReplicatesList(chainTaskId); - // Check is only here to prevent - // "`Optional.get()` without `isPresent()` warning". - // This case should not happen. - if (oReplicatesList.isEmpty()) { - return false; - } - - final ReplicatesList replicatesList = oReplicatesList.get(); - - final boolean hasWorkerAlreadyParticipated = - replicatesList.hasWorkerAlreadyParticipated(walletAddress); - if (hasWorkerAlreadyParticipated) { - return false; - } - final Lock lock = taskAccessForNewReplicateLocks .computeIfAbsent(chainTaskId, k -> new ReentrantLock()); if (!lock.tryLock()) { @@ -209,33 +191,56 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) { } try { - final boolean taskNeedsMoreContributions = ConsensusHelper.doesTaskNeedMoreContributionsForConsensus( - chainTaskId, - replicatesList.getReplicates(), - task.getTrust(), - task.getMaxExecutionTime()); - - if (!taskNeedsMoreContributions - || taskService.isConsensusReached(replicatesList)) { - return false; - } - - replicatesService.addNewReplicate(chainTaskId, walletAddress); - workerService.addChainTaskIdToWorker(chainTaskId, walletAddress); + return replicatesService.getReplicatesList(chainTaskId) + .map(replicatesList -> acceptOrRejectTask(task, walletAddress, replicatesList)) + .orElse(false); } finally { // We should always unlock the task // so that it could be taken by another replicate // if there's any issue. lock.unlock(); } + } - return true; + /** + * Given a {@link Task}, a {@code walletAddress} of a worker and a {@link ReplicatesList}, + * tries to accept the task - i.e. create a new {@link Replicate} + * for that task on that worker. + * + * @param task {@link Task} needing at least one new {@link Replicate}. + * @param walletAddress Wallet address of a worker looking for new {@link Task}. + * @param replicatesList Replicates of given {@link Task}. + * @return {@literal true} if the task has been accepted, + * {@literal false} otherwise. + */ + boolean acceptOrRejectTask(Task task, String walletAddress, ReplicatesList replicatesList) { + final boolean hasWorkerAlreadyParticipated = + replicatesList.hasWorkerAlreadyParticipated(walletAddress); + if (hasWorkerAlreadyParticipated) { + return false; + } + + final String chainTaskId = replicatesList.getChainTaskId(); + final boolean taskNeedsMoreContributions = ConsensusHelper.doesTaskNeedMoreContributionsForConsensus( + chainTaskId, + replicatesList.getReplicates(), + task.getTrust(), + task.getMaxExecutionTime()); + + if (!taskNeedsMoreContributions + || taskService.isConsensusReached(replicatesList)) { + return false; + } + + return workerService.addChainTaskIdToWorker(chainTaskId, walletAddress) + .map(worker -> replicatesService.addNewReplicate(replicatesList, walletAddress)) + .orElse(false); } /** * Get notifications missed by the worker during the time it was absent. - * - * @param blockNumber last seen blocknumber by the worker + * + * @param blockNumber last seen blocknumber by the worker * @param walletAddress of the worker * @return list of missed notifications. Can be empty if no notification is found */ @@ -264,7 +269,7 @@ public List getMissedTaskNotifications(long blockNumber, Strin continue; } TaskNotificationExtra taskNotificationExtra = - getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge); + getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge); TaskNotification taskNotification = TaskNotification.builder() .chainTaskId(chainTaskId) @@ -286,7 +291,7 @@ public List getMissedTaskNotifications(long blockNumber, Strin private TaskNotificationExtra getTaskNotificationExtra(Task task, TaskNotificationType taskNotificationType, String walletAddress, String enclaveChallenge) { TaskNotificationExtra taskNotificationExtra = TaskNotificationExtra.builder().build(); - switch (taskNotificationType){ + switch (taskNotificationType) { case PLEASE_CONTRIBUTE: WorkerpoolAuthorization authorization = signatureService.createAuthorization( walletAddress, task.getChainTaskId(), enclaveChallenge); @@ -312,7 +317,7 @@ public Optional getTaskNotificationType(Task task, Replica // CONTRIBUTION_TIMEOUT or CONSENSUS_REACHED without contribution if (task.getCurrentStatus().equals(TaskStatus.CONTRIBUTION_TIMEOUT) || (task.getCurrentStatus().equals(TaskStatus.CONSENSUS_REACHED) - && !replicate.containsContributedStatus())) { + && !replicate.containsContributedStatus())) { return Optional.of(TaskNotificationType.PLEASE_ABORT); } diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesService.java b/src/main/java/com/iexec/core/replicate/ReplicatesService.java index ea793cea..04dd9995 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesService.java @@ -71,22 +71,21 @@ public ReplicatesService(ReplicatesRepository replicatesRepository, this.taskLogsService = taskLogsService; } - public void addNewReplicate(String chainTaskId, String walletAddress) { - if (getReplicate(chainTaskId, walletAddress).isEmpty()) { - Optional optional = getReplicatesList(chainTaskId); - if (optional.isPresent()) { - ReplicatesList replicatesList = optional.get(); - Replicate replicate = new Replicate(walletAddress, chainTaskId); - replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate - replicatesList.getReplicates().add(replicate); - - replicatesRepository.save(replicatesList); - log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress); - } + public boolean addNewReplicate(ReplicatesList replicatesList, String walletAddress) { + final String chainTaskId = replicatesList.getChainTaskId(); + if (replicatesList.getReplicateOfWorker(walletAddress).isEmpty()) { + Replicate replicate = new Replicate(walletAddress, chainTaskId); + replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate + replicatesList.getReplicates().add(replicate); + + replicatesRepository.save(replicatesList); + log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress); } else { log.error("Replicate already saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress); + return false; } + return true; } public synchronized void createEmptyReplicateList(String chainTaskId) { @@ -635,4 +634,4 @@ public void setRevealTimeoutStatusIfNeeded(String chainTaskId, Replicate replica updateReplicateStatus(chainTaskId, replicate.getWalletAddress(), statusUpdate); } } -} \ No newline at end of file +} diff --git a/src/main/java/com/iexec/core/worker/WorkerService.java b/src/main/java/com/iexec/core/worker/WorkerService.java index ee92158d..bddb3ecd 100644 --- a/src/main/java/com/iexec/core/worker/WorkerService.java +++ b/src/main/java/com/iexec/core/worker/WorkerService.java @@ -69,10 +69,10 @@ public Optional getWorker(String walletAddress) { return workerRepository.findByWalletAddress(walletAddress); } - public boolean isAllowedToJoin(String workerAddress){ + public boolean isAllowedToJoin(String workerAddress) { List whitelist = workerConfiguration.getWhitelist(); // if the whitelist is empty, there is no restriction on the workers - if (whitelist.isEmpty()){ + if (whitelist.isEmpty()) { return true; } return whitelist.contains(workerAddress); @@ -133,19 +133,13 @@ public List getAliveWorkers() { return workerRepository.findByWalletAddressIn(aliveWorkers); } - public boolean canAcceptMoreWorks(String walletAddress) { - Optional optionalWorker = getWorker(walletAddress); - if (optionalWorker.isEmpty()){ - return false; - } - - Worker worker = optionalWorker.get(); + public boolean canAcceptMoreWorks(Worker worker) { int workerMaxNbTasks = worker.getMaxNbTasks(); int runningReplicateNb = worker.getComputingChainTaskIds().size(); if (runningReplicateNb >= workerMaxNbTasks) { log.debug("Worker asking for too many replicates [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]", - walletAddress, runningReplicateNb, workerMaxNbTasks); + worker.getWalletAddress(), runningReplicateNb, workerMaxNbTasks); return false; } @@ -154,7 +148,7 @@ public boolean canAcceptMoreWorks(String walletAddress) { public int getAliveAvailableCpu() { int availableCpus = 0; - for (Worker worker: getAliveWorkers()) { + for (Worker worker : getAliveWorkers()) { if (worker.isGpuEnabled()) { continue; } @@ -162,18 +156,18 @@ public int getAliveAvailableCpu() { int workerCpuNb = worker.getCpuNb(); int computingReplicateNb = worker.getComputingChainTaskIds().size(); int availableCpu = workerCpuNb - computingReplicateNb; - availableCpus+= availableCpu; + availableCpus += availableCpu; } return availableCpus; } public int getAliveTotalCpu() { int totalCpus = 0; - for (Worker worker: getAliveWorkers()){ - if(worker.isGpuEnabled()) { + for (Worker worker : getAliveWorkers()) { + if (worker.isGpuEnabled()) { continue; } - totalCpus+= worker.getCpuNb(); + totalCpus += worker.getCpuNb(); } return totalCpus; } @@ -181,7 +175,7 @@ public int getAliveTotalCpu() { // We suppose for now that 1 Gpu enabled worker has only one GPU public int getAliveTotalGpu() { int totalGpus = 0; - for(Worker worker: getAliveWorkers()) { + for (Worker worker : getAliveWorkers()) { if (worker.isGpuEnabled()) { totalGpus++; } @@ -189,9 +183,9 @@ public int getAliveTotalGpu() { return totalGpus; } - public int getAliveAvailableGpu () { + public int getAliveAvailableGpu() { int availableGpus = getAliveTotalGpu(); - for (Worker worker: getAliveWorkers()) { + for (Worker worker : getAliveWorkers()) { if (worker.isGpuEnabled()) { boolean isWorking = !worker.getComputingChainTaskIds().isEmpty(); if (isWorking) { @@ -246,13 +240,20 @@ public Optional addChainTaskIdToWorker(String chainTaskId, String wallet } private Optional addChainTaskIdToWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) { - Optional optional = workerRepository.findByWalletAddress(walletAddress); + final Optional optional = workerRepository.findByWalletAddress(walletAddress); if (optional.isPresent()) { - Worker worker = optional.get(); + final Worker worker = optional.get(); + if (!canAcceptMoreWorks(worker)) { + log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerName:{}]", + chainTaskId, walletAddress); + return Optional.empty(); + } worker.addChainTaskId(chainTaskId); log.info("Added chainTaskId to worker [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress); return Optional.of(workerRepository.save(worker)); } + log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerName:{}]", + chainTaskId, walletAddress); return Optional.empty(); } diff --git a/src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java b/src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java index 37621391..790c56db 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java @@ -83,7 +83,7 @@ void shouldCreateNewReplicate() { ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, list); when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(replicatesRepository.save(any())).thenReturn(replicatesList); - replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_3); + replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_3); Mockito.verify(replicatesRepository, Mockito.times(1)) .save(any()); } @@ -103,11 +103,11 @@ void shouldNotCreateNewReplicate() { when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(replicatesRepository.save(any())).thenReturn(replicatesList); - replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(replicatesRepository, Mockito.times(0)) .save(any()); - replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_2); + replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_2); Mockito.verify(replicatesRepository, Mockito.times(0)) .save(any()); } @@ -1505,4 +1505,4 @@ void computeUpdateReplicateStatusArgsResultUploadFailed() { .build()); } -} \ No newline at end of file +} diff --git a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java index 2a28f281..0bd5d703 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java @@ -58,7 +58,7 @@ class ReplicateSupplyServiceTests { private final static String WALLET_WORKER_1 = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; private final static String WALLET_WORKER_2 = "0xdcfeffee1443fbf9277e6fa3b50cf3b38f7101af"; - private final static String CHAIN_TASK_ID = "0x65bc5e94ed1486b940bd6cc0013c418efad58a0a52a3d08cee89faaa21970426"; + private final static String CHAIN_TASK_ID = "0x65bc5e94ed1486b940bd6cc0013c418efad58a0a52a3d08cee89faaa21970426"; private final static String CHAIN_TASK_ID_2 = "0xc536af16737e02bb28100452a932056d499be3c462619751a9ed36515de64d50"; private final static String DAPP_NAME = "dappName"; @@ -69,12 +69,18 @@ class ReplicateSupplyServiceTests { private final static long maxExecutionTime = 60000; long workerLastBlock = 12; - @Mock private ReplicatesService replicatesService; - @Mock private SignatureService signatureService; - @Mock private TaskService taskService; - @Mock private TaskUpdateRequestManager taskUpdateRequestManager; - @Mock private WorkerService workerService; - @Mock private Web3jService web3jService; + @Mock + private ReplicatesService replicatesService; + @Mock + private SignatureService signatureService; + @Mock + private TaskService taskService; + @Mock + private TaskUpdateRequestManager taskUpdateRequestManager; + @Mock + private WorkerService workerService; + @Mock + private Web3jService web3jService; @Spy @InjectMocks @@ -85,11 +91,6 @@ void init() { MockitoAnnotations.openMocks(this); } - void workerCanWorkAndHasGas(String workerAddress) { - when(workerService.canAcceptMoreWorks(workerAddress)).thenReturn(true); - when(web3jService.hasEnoughGas(workerAddress)).thenReturn(true); - } - // Tests on getAuthOfAvailableReplicate() // If worker does not exist, canAcceptMoreWorks return false @@ -97,16 +98,17 @@ void workerCanWorkAndHasGas(String workerAddress) { // in getAuthOfAvailableReplicate method @Test void shouldNotGetAnyReplicateSinceWorkerDoesNotExist() { + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(workerService.getWorker(Mockito.anyString())).thenReturn(Optional.empty()); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); assertThat(replicateTaskSummary).isEmpty(); - Mockito.verifyNoInteractions(web3jService, taskService, taskUpdateRequestManager, replicatesService, signatureService); + Mockito.verifyNoInteractions(taskService, taskUpdateRequestManager, replicatesService, signatureService); } @Test void shouldNotGetReplicateSinceWorkerLastBlockNotAvailable() { - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(0, WALLET_WORKER_1); assertThat(replicateTaskSummary).isEmpty(); @@ -115,7 +117,18 @@ void shouldNotGetReplicateSinceWorkerLastBlockNotAvailable() { @Test void shouldNotGetReplicateSinceNoRunningTask() { - workerCanWorkAndHasGas(WALLET_WORKER_1); + final Worker worker = Worker.builder() + .id("1") + .walletAddress(WALLET_WORKER_1) + .cpuNb(4) + .maxNbTasks(3) + .teeEnabled(false) + .build(); + + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); + when(workerService.getWorker(WALLET_WORKER_1)) + .thenReturn(Optional.ofNullable(worker)); + when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())).thenReturn(Optional.empty()); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); @@ -130,6 +143,7 @@ void shouldNotGetReplicateSinceNoReplicatesList() { .id("1") .walletAddress(WALLET_WORKER_2) .cpuNb(4) + .maxNbTasks(3) .teeEnabled(false) .build(); @@ -140,7 +154,7 @@ void shouldNotGetReplicateSinceNoReplicatesList() { runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60)); runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); - workerCanWorkAndHasGas(WALLET_WORKER_2); + when(web3jService.hasEnoughGas(WALLET_WORKER_2)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_2)).thenReturn(Optional.of(worker)); @@ -160,6 +174,7 @@ void shouldNotGetReplicateSinceConsensusReachedOnChain() { .id("1") .walletAddress(WALLET_WORKER_2) .cpuNb(4) + .maxNbTasks(3) .teeEnabled(false) .build(); final Replicate replicate = new Replicate(WALLET_WORKER_2, CHAIN_TASK_ID); @@ -176,10 +191,11 @@ void shouldNotGetReplicateSinceConsensusReachedOnChain() { runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60)); runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); - workerCanWorkAndHasGas(WALLET_WORKER_2); + when(web3jService.hasEnoughGas(WALLET_WORKER_2)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_2)).thenReturn(Optional.of(worker)); + when(workerService.canAcceptMoreWorks(worker)).thenReturn(true); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(taskService.isConsensusReached(replicatesList)).thenReturn(true); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_2)).thenReturn(false); @@ -196,7 +212,14 @@ void shouldNotGetReplicateSinceConsensusReachedOnChain() { @Test void shouldNotGetAnyReplicateSinceWorkerIsFull() { - when(workerService.canAcceptMoreWorks(WALLET_WORKER_1)).thenReturn(false); + final Worker worker = Worker.builder() + .walletAddress(WALLET_WORKER_1) + .cpuNb(2) + .maxNbTasks(1) + .build(); + when(workerService.getWorker(WALLET_WORKER_1)) + .thenReturn(Optional.of(worker)); + when(workerService.canAcceptMoreWorks(worker)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); assertThat(replicateTaskSummary).isEmpty(); @@ -205,7 +228,6 @@ void shouldNotGetAnyReplicateSinceWorkerIsFull() { @Test void shouldNotGetAnyReplicateSinceWorkerDoesNotHaveEnoughGas() { - when(workerService.canAcceptMoreWorks(WALLET_WORKER_1)).thenReturn(true); when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); @@ -220,6 +242,7 @@ void shouldNotGetAnyReplicateSinceWorkerAlreadyParticipated() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .build(); Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID); @@ -234,7 +257,8 @@ void shouldNotGetAnyReplicateSinceWorkerAlreadyParticipated() { Collections.singletonList(new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID)) )); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); @@ -256,6 +280,7 @@ void shouldNotGetReplicateSinceDoesNotNeedMoreContributionsForConsensus() { .id("1") .walletAddress(WALLET_WORKER_2) .cpuNb(2) + .maxNbTasks(1) .build(); int trust = 5; @@ -277,11 +302,14 @@ void shouldNotGetReplicateSinceDoesNotNeedMoreContributionsForConsensus() { ); // Try to see if a replicate of the task can be scheduled on worker2 - workerCanWorkAndHasGas(WALLET_WORKER_2); + when(web3jService.hasEnoughGas(WALLET_WORKER_2)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_2)).thenReturn(Optional.of(existingWorker)); + when(workerService.canAcceptMoreWorks(existingWorker)).thenReturn(true); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_2); @@ -297,6 +325,7 @@ void shouldNotGetReplicateSinceEnclaveChallengeNeededButNotGenerated() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(true) .build(); @@ -311,7 +340,7 @@ void shouldNotGetReplicateSinceEnclaveChallengeNeededButNotGenerated() { new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) ); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); @@ -323,7 +352,7 @@ void shouldNotGetReplicateSinceEnclaveChallengeNeededButNotGenerated() { assertThat(replicateTaskSummary).isEmpty(); - Mockito.verify(replicatesService, Mockito.never()).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + Mockito.verify(replicatesService, Mockito.never()).addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(workerService, Mockito.never()).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); Mockito.verifyNoInteractions(signatureService); assertTaskAccessForNewReplicateLockNeverUsed(CHAIN_TASK_ID); @@ -335,6 +364,7 @@ void shouldGetOnlyOneReplicateSinceOtherOneReachedConsensusDeadline() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(4) + .maxNbTasks(3) .teeEnabled(false) .build(); @@ -355,22 +385,27 @@ void shouldGetOnlyOneReplicateSinceOtherOneReachedConsensusDeadline() { ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, List.of(CHAIN_TASK_ID))) .thenReturn(Optional.of(taskDeadlineReached)); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(task1)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); + when(workerService.canAcceptMoreWorks(existingWorker)).thenReturn(true); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, BytesUtils.EMPTY_ADDRESS)) .thenReturn(WorkerpoolAuthorization.builder().chainTaskId(CHAIN_TASK_ID).build()); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); + when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) + .thenReturn(true); final Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); assertThat(replicateTaskSummary).isPresent(); assertThat(replicateTaskSummary.get().getWorkerpoolAuthorization().getChainTaskId()).isEqualTo(CHAIN_TASK_ID); - Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); Mockito.verify(signatureService, times(0)).createAuthorization(any(), eq(CHAIN_TASK_ID_2), any()); assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID); @@ -382,6 +417,7 @@ void shouldNotGetReplicateWhenTaskAlreadyAccessed() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(false) .build(); @@ -392,7 +428,7 @@ void shouldNotGetReplicateWhenTaskAlreadyAccessed() { runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60)); runningTask.setEnclaveChallenge(BytesUtils.EMPTY_ADDRESS); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); @@ -413,6 +449,7 @@ void shouldGetReplicateWithNoTee() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(false) .build(); @@ -427,20 +464,25 @@ void shouldGetReplicateWithNoTee() { new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) ); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); + when(workerService.canAcceptMoreWorks(existingWorker)).thenReturn(true); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, BytesUtils.EMPTY_ADDRESS)) .thenReturn(new WorkerpoolAuthorization()); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); + when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) + .thenReturn(true); Optional replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1); assertThat(replicateTaskSummary).isPresent(); - Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); Mockito.verify(signatureService).createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, BytesUtils.EMPTY_ADDRESS); assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID); @@ -452,6 +494,7 @@ void shouldGetReplicateWithTee() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(true) .build(); @@ -466,13 +509,18 @@ void shouldGetReplicateWithTee() { new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) ); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); + when(workerService.canAcceptMoreWorks(existingWorker)).thenReturn(true); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, ENCLAVE_CHALLENGE)) .thenReturn(new WorkerpoolAuthorization()); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); + when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) + .thenReturn(true); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = @@ -480,7 +528,7 @@ void shouldGetReplicateWithTee() { assertThat(replicateTaskSummary).isPresent(); - Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID); } @@ -491,6 +539,7 @@ void shouldTeeNeededTaskNotBeGivenToTeeDisabledWorker() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(false) .build(); @@ -500,7 +549,7 @@ void shouldTeeNeededTaskNotBeGivenToTeeDisabledWorker() { runningTask.setTag(TEE_TAG); runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60)); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(true, Collections.emptyList())) .thenReturn(Optional.empty()); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); @@ -519,6 +568,7 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { .id("1") .walletAddress(WALLET_WORKER_1) .cpuNb(2) + .maxNbTasks(1) .teeEnabled(true) .build(); @@ -533,13 +583,18 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { new ReplicatesList(CHAIN_TASK_ID, Collections.emptyList()) ); - workerCanWorkAndHasGas(WALLET_WORKER_1); + when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true); when(taskService.getPrioritizedInitializedOrRunningTask(false, Collections.emptyList())) .thenReturn(Optional.of(runningTask)); when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker)); + when(workerService.canAcceptMoreWorks(existingWorker)).thenReturn(true); when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList)); when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, ENCLAVE_CHALLENGE)) .thenReturn(new WorkerpoolAuthorization()); + when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1)) + .thenReturn(Optional.of(existingWorker)); + when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1)) + .thenReturn(true); when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false); Optional replicateTaskSummary = @@ -547,7 +602,7 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { assertThat(replicateTaskSummary).isPresent(); - Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1); + Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1); Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1); assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID); } @@ -559,6 +614,7 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() { */ private void assertTaskAccessForNewReplicateNotDeadLocking(String chainTaskId) { final Lock lock = replicateSupplyService.taskAccessForNewReplicateLocks.get(chainTaskId); + System.out.println("Task: " + chainTaskId + " ; lock : " + lock); final Boolean successfulLock = CompletableFuture.supplyAsync(() -> { final boolean locked = lock.tryLock(); if (!locked) { @@ -574,7 +630,8 @@ private void assertTaskAccessForNewReplicateNotDeadLocking(String chainTaskId) { private void assertTaskAccessForNewReplicateLockNeverUsed(String chainTaskId) { final Lock lock = replicateSupplyService.taskAccessForNewReplicateLocks.get(chainTaskId); - assertThat(lock).isNull();; + assertThat(lock).isNull(); + ; } // Tests on getMissedTaskNotifications() @@ -612,12 +669,12 @@ void shouldNotGetInterruptedReplicateSinceEnclaveChallengeNeededButNotGenerated( assertThat(taskNotifications).isEmpty(); Mockito.verify(replicatesService, times(0)) - .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); + .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); } @Test - // CREATED, ..., CAN_CONTRIBUTE => RecoveryAction.CONTRIBUTE + // CREATED, ..., CAN_CONTRIBUTE => RecoveryAction.CONTRIBUTE void shouldTellReplicateToContributeWhenComputing() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RUNNING); @@ -641,7 +698,7 @@ void shouldTellReplicateToContributeWhenComputing() { } @Test - // CONTRIBUTING + !onChain => RecoveryAction.CONTRIBUTE + // CONTRIBUTING + !onChain => RecoveryAction.CONTRIBUTE void shouldTellReplicateToContributeSinceNotDoneOnchain() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RUNNING); @@ -668,8 +725,8 @@ void shouldTellReplicateToContributeSinceNotDoneOnchain() { } @Test - // CONTRIBUTING + done onChain => updateStatus to CONTRIBUTED - // Task not in CONSENSUS_REACHED => RecoveryAction.WAIT + // CONTRIBUTING + done onChain => updateStatus to CONTRIBUTED + // Task not in CONSENSUS_REACHED => RecoveryAction.WAIT void shouldTellReplicateToWaitSinceContributedOnchain() { long blockNumber = 3; // ChainReceipt chainReceipt = new ChainReceipt(blockNumber, ""); @@ -707,8 +764,8 @@ void shouldTellReplicateToWaitSinceContributedOnchain() { } @Test - // CONTRIBUTING + done onChain => updateStatus to CONTRIBUTED - // Task in CONSENSUS_REACHED => RecoveryAction.REVEAL + // CONTRIBUTING + done onChain => updateStatus to CONTRIBUTED + // Task in CONSENSUS_REACHED => RecoveryAction.REVEAL void shouldTellReplicateToRevealSinceConsensusReached() { long blockNumber = 3; // ChainReceipt chainReceipt = new ChainReceipt(blockNumber, ""); @@ -746,7 +803,7 @@ void shouldTellReplicateToRevealSinceConsensusReached() { } @Test - // any status + Task in CONTRIBUTION_TIMEOUT => RecoveryAction.ABORT_CONTRIBUTION_TIMEOUT + // any status + Task in CONTRIBUTION_TIMEOUT => RecoveryAction.ABORT_CONTRIBUTION_TIMEOUT void shouldTellReplicateToAbortSinceContributionTimeout() { long blockNumber = 3; List ids = List.of(CHAIN_TASK_ID); @@ -773,7 +830,7 @@ void shouldTellReplicateToAbortSinceContributionTimeout() { } @Test - // !CONTRIBUTED + Task in CONSENSUS_REACHED => RecoveryAction.ABORT_CONSENSUS_REACHED + // !CONTRIBUTED + Task in CONSENSUS_REACHED => RecoveryAction.ABORT_CONSENSUS_REACHED void shouldTellReplicateToWaitSinceConsensusReachedAndItDidNotContribute() { long blockNumber = 3; List ids = List.of(CHAIN_TASK_ID); @@ -801,7 +858,7 @@ void shouldTellReplicateToWaitSinceConsensusReachedAndItDidNotContribute() { } @Test - // CONTRIBUTED + Task in REVEAL phase => RecoveryAction.REVEAL + // CONTRIBUTED + Task in REVEAL phase => RecoveryAction.REVEAL void shouldTellReplicateToRevealSinceContributed() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.AT_LEAST_ONE_REVEALED); @@ -825,7 +882,7 @@ void shouldTellReplicateToRevealSinceContributed() { } @Test - // REVEALING + !onChain => RecoveryAction.REVEAL + // REVEALING + !onChain => RecoveryAction.REVEAL void shouldTellReplicateToRevealSinceNotDoneOnchain() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.AT_LEAST_ONE_REVEALED); @@ -852,8 +909,8 @@ void shouldTellReplicateToRevealSinceNotDoneOnchain() { } @Test - // REVEALING + done onChain => updateStatus to REVEALED - // no RESULT_UPLOAD_REQUESTED => RecoveryAction.WAIT + // REVEALING + done onChain => updateStatus to REVEALED + // no RESULT_UPLOAD_REQUESTED => RecoveryAction.WAIT void shouldTellReplicateToWaitSinceRevealed() { long blockNumber = 3; // ChainReceipt chainReceipt = new ChainReceipt(blockNumber, ""); @@ -891,8 +948,8 @@ void shouldTellReplicateToWaitSinceRevealed() { } @Test - // REVEALING + done onChain => updateStatus to REVEALED - // RESULT_UPLOAD_REQUESTED => RecoveryAction.UPLOAD_RESULT + // REVEALING + done onChain => updateStatus to REVEALED + // RESULT_UPLOAD_REQUESTED => RecoveryAction.UPLOAD_RESULT void shouldTellReplicateToUploadResultSinceRequestedAfterRevealing() { long blockNumber = 3; // ChainReceipt chainReceipt = new ChainReceipt(blockNumber, ""); @@ -929,7 +986,7 @@ void shouldTellReplicateToUploadResultSinceRequestedAfterRevealing() { } @Test - // RESULT_UPLOAD_REQUESTED => RecoveryAction.UPLOAD_RESULT + // RESULT_UPLOAD_REQUESTED => RecoveryAction.UPLOAD_RESULT void shouldTellReplicateToUploadResultSinceRequested() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RESULT_UPLOADING); @@ -953,7 +1010,7 @@ void shouldTellReplicateToUploadResultSinceRequested() { } @Test - // RESULT_UPLOADING + not done yet => RecoveryAction.UPLOAD_RESULT + // RESULT_UPLOADING + not done yet => RecoveryAction.UPLOAD_RESULT void shouldTellReplicateToUploadResultSinceNotDoneYet() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RESULT_UPLOADING); @@ -979,8 +1036,8 @@ void shouldTellReplicateToUploadResultSinceNotDoneYet() { } @Test - // RESULT_UPLOADING + done => update to ReplicateStatus.RESULT_UPLOADED - // RecoveryAction.WAIT + // RESULT_UPLOADING + done => update to ReplicateStatus.RESULT_UPLOADED + // RecoveryAction.WAIT void shouldTellReplicateToWaitSinceDetectedResultUpload() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RESULT_UPLOADING); @@ -1009,7 +1066,7 @@ void shouldTellReplicateToWaitSinceDetectedResultUpload() { } @Test - // RESULT_UPLOADED => RecoveryAction.WAIT + // RESULT_UPLOADED => RecoveryAction.WAIT void shouldTellReplicateToWaitSinceItUploadedResult() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.RESULT_UPLOADING); @@ -1038,7 +1095,7 @@ void shouldTellReplicateToWaitSinceItUploadedResult() { } @Test - // REVEALED + Task in completion phase => RecoveryAction.WAIT + // REVEALED + Task in completion phase => RecoveryAction.WAIT void shouldTellReplicateToWaitForCompletionSinceItRevealed() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.FINALIZING); @@ -1064,7 +1121,7 @@ void shouldTellReplicateToWaitForCompletionSinceItRevealed() { } @Test - // REVEALED + RESULT_UPLOADED + Task in completion phase => RecoveryAction.WAIT + // REVEALED + RESULT_UPLOADED + Task in completion phase => RecoveryAction.WAIT void shouldTellReplicateToWaitForCompletionSinceItRevealedAndUploaded() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.FINALIZING); @@ -1122,7 +1179,7 @@ void shouldTellReplicateToCompleteSinceItRevealed() { } @Test - // !REVEALED + Task in completion phase => null / nothing + // !REVEALED + Task in completion phase => null / nothing void shouldNotTellReplicateToWaitForCompletionSinceItDidNotReveal() { List ids = List.of(CHAIN_TASK_ID); List taskList = getStubTaskList(TaskStatus.FINALIZING); @@ -1215,4 +1272,4 @@ Optional getStubReplicate(ReplicateStatus status) { WorkerpoolAuthorization getStubAuth() { return new WorkerpoolAuthorization(); } -} \ No newline at end of file +} diff --git a/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java b/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java index 989206c8..cab83cba 100644 --- a/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java +++ b/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java @@ -37,7 +37,9 @@ import java.util.Date; import java.util.List; import java.util.Optional; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -74,13 +76,14 @@ void init() { */ @Test void addMultipleTaskIds() { + final int nThreads = 10; workerService.addWorker( Worker.builder() .walletAddress(WALLET_WORKER_1) + .maxNbTasks(nThreads) .build() ); - final int nThreads = 10; final ExecutorService executor = Executors.newFixedThreadPool(nThreads); final List>> futures = IntStream.range(0, nThreads) diff --git a/src/test/java/com/iexec/core/worker/WorkerServiceTests.java b/src/test/java/com/iexec/core/worker/WorkerServiceTests.java index 9d7a2201..c5d59fd5 100644 --- a/src/test/java/com/iexec/core/worker/WorkerServiceTests.java +++ b/src/test/java/com/iexec/core/worker/WorkerServiceTests.java @@ -19,7 +19,10 @@ import com.iexec.core.configuration.WorkerConfiguration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.*; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -194,7 +197,7 @@ void shouldWorkerBeAllowedToAskReplicateSinceFirstTime() { void shouldWorkerNotBeAllowedToAskReplicateSinceTooSoon() { String wallet = "wallet"; workerService.getWorkerStatsMap().computeIfAbsent(wallet, WorkerService.WorkerStats::new) - .setLastReplicateDemandDate(Date.from(Instant.now().minusSeconds(1))); + .setLastReplicateDemandDate(Date.from(Instant.now().minusSeconds(1))); when(workerConfiguration.getAskForReplicatePeriod()).thenReturn(5000L); assertThat(workerService.isWorkerAllowedToAskReplicate(wallet)).isFalse(); @@ -219,7 +222,7 @@ void shouldUpdateLastReplicateDemand() { // addChainTaskIdToWorker @Test - void shouldAddTaskIdToWorker(){ + void shouldAddTaskIdToWorker() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; Worker existingWorker = Worker.builder() @@ -229,6 +232,7 @@ void shouldAddTaskIdToWorker(){ .os("Linux") .cpu("x86") .cpuNb(8) + .maxNbTasks(7) .participatingChainTaskIds(new ArrayList<>(Arrays.asList("task1", "task2"))) .computingChainTaskIds(new ArrayList<>(Arrays.asList("task1", "task2"))) .build(); @@ -246,12 +250,35 @@ void shouldAddTaskIdToWorker(){ } @Test - void shouldNotAddTaskIdToWorker(){ + void shouldNotAddTaskIdToWorkerSinceUnknownWorker() { when(workerRepository.findByWalletAddress(Mockito.anyString())).thenReturn(Optional.empty()); Optional addedWorker = workerService.addChainTaskIdToWorker("task1", "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"); assertThat(addedWorker).isEmpty(); } + @Test + void shouldNotAddTaskIdToWorkerSinceCantAcceptMoreWorker() { + String workerName = "worker1"; + String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; + Worker existingWorker = Worker.builder() + .id("1") + .name(workerName) + .walletAddress(walletAddress) + .os("Linux") + .cpu("x86") + .cpuNb(3) + .maxNbTasks(2) + .participatingChainTaskIds(new ArrayList<>(Arrays.asList("task1", "task2"))) + .computingChainTaskIds(new ArrayList<>(Arrays.asList("task1", "task2"))) + .build(); + + when(workerRepository.findByWalletAddress(walletAddress)).thenReturn(Optional.of(existingWorker)); + when(workerRepository.save(existingWorker)).thenReturn(existingWorker); + + Optional addedWorker = workerService.addChainTaskIdToWorker("task3", walletAddress); + assertThat(addedWorker).isEmpty(); + } + // getChainTaskIds @Test @@ -294,11 +321,11 @@ void shouldNotGetComputingTaskIdsSinceNoWorker() { assertThat(workerService.getComputingTaskIds(wallet)).isEmpty(); } - + // removeChainTaskIdFromWorker @Test - void shouldRemoveTaskIdFromWorker(){ + void shouldRemoveTaskIdFromWorker() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; Worker existingWorker = Worker.builder() @@ -325,14 +352,14 @@ void shouldRemoveTaskIdFromWorker(){ } @Test - void shouldNotRemoveTaskIdWorkerNotFound(){ + void shouldNotRemoveTaskIdWorkerNotFound() { when(workerRepository.findByWalletAddress(Mockito.anyString())).thenReturn(Optional.empty()); Optional addedWorker = workerService.removeChainTaskIdFromWorker("task1", "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"); assertThat(addedWorker).isEmpty(); } @Test - void shouldNotRemoveAnythingSinceTaskIdNotFound(){ + void shouldNotRemoveAnythingSinceTaskIdNotFound() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; List participatingIds = new ArrayList<>(Arrays.asList("task1", "task2")); @@ -362,7 +389,7 @@ void shouldNotRemoveAnythingSinceTaskIdNotFound(){ } @Test - void shouldRemoveComputedChainTaskIdFromWorker(){ + void shouldRemoveComputedChainTaskIdFromWorker() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; List participatingIds = new ArrayList<>(Arrays.asList("task1", "task2")); @@ -392,7 +419,7 @@ void shouldRemoveComputedChainTaskIdFromWorker(){ } @Test - void shouldNotRemoveComputedChainTaskIdFromWorkerSinceWorkerNotFound(){ + void shouldNotRemoveComputedChainTaskIdFromWorkerSinceWorkerNotFound() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; List participatingIds = new ArrayList<>(Arrays.asList("task1", "task2")); @@ -416,7 +443,7 @@ void shouldNotRemoveComputedChainTaskIdFromWorkerSinceWorkerNotFound(){ } @Test - void shouldNotRemoveComputedChainTaskIdFromWorkerSinceChainTaskIdNotFound(){ + void shouldNotRemoveComputedChainTaskIdFromWorkerSinceChainTaskIdNotFound() { String workerName = "worker1"; String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; List participatingIds = new ArrayList<>(Arrays.asList("task1", "task2")); @@ -495,19 +522,8 @@ void shouldAcceptMoreWorks() { 3, Arrays.asList("task1", "task2", "task3", "task4", "task5"), Arrays.asList("task1", "task3")); - when(workerRepository.findByWalletAddress(walletAddress)).thenReturn(Optional.of(worker)); - - assertThat(workerService.canAcceptMoreWorks(walletAddress)).isTrue(); - } - - @Test - void shouldNotAcceptMoreWorksSinceWorkerNotFound() { - String walletAddress = "0x1a69b2eb604db8eba185df03ea4f5288dcbbd248"; - - when(workerRepository.findByWalletAddress(Mockito.any())).thenReturn(Optional.empty()); - boolean canAccept = workerService.canAcceptMoreWorks(walletAddress); - assertThat(canAccept).isFalse(); + assertThat(workerService.canAcceptMoreWorks(worker)).isTrue(); } @Test @@ -518,9 +534,8 @@ void shouldNotAcceptMoreWorksSinceSaturatedCpus() { 2, Arrays.asList("task1", "task2", "task3", "task4"), Arrays.asList("task1", "task3")); - when(workerRepository.findByWalletAddress(Mockito.anyString())).thenReturn(Optional.of(worker)); - assertThat(workerService.canAcceptMoreWorks(walletAddress)).isFalse(); + assertThat(workerService.canAcceptMoreWorks(worker)).isFalse(); } List getDummyWorkers() {