diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4415a48a..28bddedd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,13 @@
All notable changes to this project will be documented in this file.
+## [[8.1.2]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.1.2) 2023-06-29
+
+## Bug fixes
+- Prevent race conditions in `WorkerService`. (#602)
+### Dependency Upgrades
+- Upgrade to `iexec-commons-poco` 3.0.5. (#602)
+
## [[8.1.1]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.1.1) 2023-06-23
### Dependency Upgrades
diff --git a/gradle.properties b/gradle.properties
index e8370103..dcee46a3 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,6 +1,6 @@
-version=8.1.1
+version=8.1.2
iexecCommonVersion=8.2.1
-iexecCommonsPocoVersion=3.0.4
+iexecCommonsPocoVersion=3.0.5
iexecBlockchainAdapterVersion=8.1.1
iexecResultVersion=8.1.1
iexecSmsVersion=8.1.1
diff --git a/src/main/java/com/iexec/core/worker/WorkerService.java b/src/main/java/com/iexec/core/worker/WorkerService.java
index 57bd9fb6..9563663f 100644
--- a/src/main/java/com/iexec/core/worker/WorkerService.java
+++ b/src/main/java/com/iexec/core/worker/WorkerService.java
@@ -16,6 +16,7 @@
package com.iexec.core.worker;
+import com.iexec.common.utils.ContextualLockRunner;
import com.iexec.core.configuration.WorkerConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -27,39 +28,33 @@
import static com.iexec.common.utils.DateTimeUtils.addMinutesToDate;
+/**
+ * Manage {@link Worker} objects.
+ *
+ * /!\ Private read-and-write methods are not thread-safe.
+ * They can sometime lead to race conditions.
+ * Please use the public, thread-safe, versions of these methods instead.
+ */
@Slf4j
@Service
public class WorkerService {
private final WorkerRepository workerRepository;
private final WorkerConfiguration workerConfiguration;
+ private final ContextualLockRunner contextualLockRunner;
public WorkerService(WorkerRepository workerRepository,
WorkerConfiguration workerConfiguration) {
this.workerRepository = workerRepository;
this.workerConfiguration = workerConfiguration;
+ this.contextualLockRunner = new ContextualLockRunner<>();
}
+ // region Read methods
public Optional getWorker(String walletAddress) {
return workerRepository.findByWalletAddress(walletAddress);
}
- public Worker addWorker(Worker worker) {
- Optional oWorker = workerRepository.findByWalletAddress(worker.getWalletAddress());
-
- if (oWorker.isPresent()) {
- Worker existingWorker = oWorker.get();
- log.info("The worker is already registered [workerId:{}]", existingWorker.getId());
- worker.setId(existingWorker.getId());
- worker.setParticipatingChainTaskIds(existingWorker.getParticipatingChainTaskIds());
- worker.setComputingChainTaskIds(existingWorker.getComputingChainTaskIds());
- } else {
- log.info("Registering new worker");
- }
-
- return workerRepository.save(worker);
- }
-
public boolean isAllowedToJoin(String workerAddress){
List whitelist = workerConfiguration.getWhitelist();
// if the whitelist is empty, there is no restriction on the workers
@@ -69,18 +64,6 @@ public boolean isAllowedToJoin(String workerAddress){
return whitelist.contains(workerAddress);
}
- public Optional updateLastAlive(String walletAddress) {
- Optional optional = workerRepository.findByWalletAddress(walletAddress);
- if (optional.isPresent()) {
- Worker worker = optional.get();
- worker.setLastAliveDate(new Date());
- workerRepository.save(worker);
- return Optional.of(worker);
- }
-
- return Optional.empty();
- }
-
public boolean isWorkerAllowedToAskReplicate(String walletAddress) {
Optional oDate = getLastReplicateDemand(walletAddress);
if (oDate.isEmpty()) {
@@ -105,29 +88,6 @@ public Optional getLastReplicateDemand(String walletAddress) {
return Optional.ofNullable(worker.getLastReplicateDemandDate());
}
- public Optional updateLastReplicateDemandDate(String walletAddress) {
- Optional optional = workerRepository.findByWalletAddress(walletAddress);
- if (optional.isPresent()) {
- Worker worker = optional.get();
- worker.setLastReplicateDemandDate(new Date());
- workerRepository.save(worker);
- return Optional.of(worker);
- }
-
- return Optional.empty();
- }
-
- public Optional addChainTaskIdToWorker(String chainTaskId, String walletAddress) {
- Optional optional = workerRepository.findByWalletAddress(walletAddress);
- if (optional.isPresent()) {
- Worker worker = optional.get();
- worker.addChainTaskId(chainTaskId);
- log.info("Added chainTaskId to worker [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress);
- return Optional.of(workerRepository.save(worker));
- }
- return Optional.empty();
- }
-
public List getChainTaskIds(String walletAddress) {
Optional optional = workerRepository.findByWalletAddress(walletAddress);
if (optional.isPresent()) {
@@ -146,28 +106,6 @@ public List getComputingTaskIds(String walletAddress) {
return Collections.emptyList();
}
- public Optional removeChainTaskIdFromWorker(String chainTaskId, String walletAddress) {
- Optional optional = workerRepository.findByWalletAddress(walletAddress);
- if (optional.isPresent()) {
- Worker worker = optional.get();
- worker.removeChainTaskId(chainTaskId);
- log.info("Removed chainTaskId from worker [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
- return Optional.of(workerRepository.save(worker));
- }
- return Optional.empty();
- }
-
- public Optional removeComputedChainTaskIdFromWorker(String chainTaskId, String walletAddress) {
- Optional optional = workerRepository.findByWalletAddress(walletAddress);
- if (optional.isPresent()) {
- Worker worker = optional.get();
- worker.removeComputedChainTaskId(chainTaskId);
- log.info("Removed computed chainTaskId from worker [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
- return Optional.of(workerRepository.save(worker));
- }
- return Optional.empty();
- }
-
// worker is considered lost if it didn't ping for 1 minute
public List getLostWorkers() {
@@ -249,4 +187,122 @@ public int getAliveAvailableGpu () {
}
return availableGpus;
}
+ // endregion
+
+ // region Read-and-write methods
+ public Worker addWorker(Worker worker) {
+ return contextualLockRunner.applyWithLock(
+ worker.getWalletAddress(),
+ address -> addWorkerWithoutThreadSafety(worker)
+ );
+ }
+
+ private Worker addWorkerWithoutThreadSafety(Worker worker) {
+ Optional oWorker = workerRepository.findByWalletAddress(worker.getWalletAddress());
+
+ if (oWorker.isPresent()) {
+ Worker existingWorker = oWorker.get();
+ log.info("The worker is already registered [workerId:{}]", existingWorker.getId());
+ worker.setId(existingWorker.getId());
+ worker.setParticipatingChainTaskIds(existingWorker.getParticipatingChainTaskIds());
+ worker.setComputingChainTaskIds(existingWorker.getComputingChainTaskIds());
+ } else {
+ log.info("Registering new worker");
+ }
+
+ return workerRepository.save(worker);
+ }
+
+ public Optional updateLastAlive(String walletAddress) {
+ return contextualLockRunner.applyWithLock(
+ walletAddress,
+ this::updateLastAliveWithoutThreadSafety
+ );
+ }
+
+ private Optional updateLastAliveWithoutThreadSafety(String walletAddress) {
+ Optional optional = workerRepository.findByWalletAddress(walletAddress);
+ if (optional.isPresent()) {
+ Worker worker = optional.get();
+ worker.setLastAliveDate(new Date());
+ workerRepository.save(worker);
+ return Optional.of(worker);
+ }
+
+ return Optional.empty();
+ }
+
+ public Optional updateLastReplicateDemandDate(String walletAddress) {
+ return contextualLockRunner.applyWithLock(
+ walletAddress,
+ this::updateLastReplicateDemandDateWithoutThreadSafety
+ );
+ }
+
+ private Optional updateLastReplicateDemandDateWithoutThreadSafety(String walletAddress) {
+ Optional optional = workerRepository.findByWalletAddress(walletAddress);
+ if (optional.isPresent()) {
+ Worker worker = optional.get();
+ worker.setLastReplicateDemandDate(new Date());
+ workerRepository.save(worker);
+ return Optional.of(worker);
+ }
+
+ return Optional.empty();
+ }
+
+ public Optional addChainTaskIdToWorker(String chainTaskId, String walletAddress) {
+ return contextualLockRunner.applyWithLock(
+ walletAddress,
+ address -> addChainTaskIdToWorkerWithoutThreadSafety(chainTaskId, address)
+ );
+ }
+
+ private Optional addChainTaskIdToWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
+ Optional optional = workerRepository.findByWalletAddress(walletAddress);
+ if (optional.isPresent()) {
+ Worker worker = optional.get();
+ worker.addChainTaskId(chainTaskId);
+ log.info("Added chainTaskId to worker [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress);
+ return Optional.of(workerRepository.save(worker));
+ }
+ return Optional.empty();
+ }
+
+ public Optional removeChainTaskIdFromWorker(String chainTaskId, String walletAddress) {
+ return contextualLockRunner.applyWithLock(
+ walletAddress,
+ address -> removeChainTaskIdFromWorkerWithoutThreadSafety(chainTaskId, address)
+ );
+ }
+
+ private Optional removeChainTaskIdFromWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
+ Optional optional = workerRepository.findByWalletAddress(walletAddress);
+ if (optional.isPresent()) {
+ Worker worker = optional.get();
+ worker.removeChainTaskId(chainTaskId);
+ log.info("Removed chainTaskId from worker [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
+ return Optional.of(workerRepository.save(worker));
+ }
+ return Optional.empty();
+ }
+
+ public Optional removeComputedChainTaskIdFromWorker(String chainTaskId, String walletAddress) {
+ return contextualLockRunner.applyWithLock(
+ walletAddress,
+ address -> removeComputedChainTaskIdFromWorkerWithoutThreadSafety(chainTaskId, address)
+ );
+ }
+
+ private Optional removeComputedChainTaskIdFromWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
+ Optional optional = workerRepository.findByWalletAddress(walletAddress);
+ if (optional.isPresent()) {
+ Worker worker = optional.get();
+ worker.removeComputedChainTaskId(chainTaskId);
+ log.info("Removed computed chainTaskId from worker [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
+ return Optional.of(workerRepository.save(worker));
+ }
+ return Optional.empty();
+ }
+ // endregion
}
diff --git a/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java b/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java
new file mode 100644
index 00000000..26db1183
--- /dev/null
+++ b/src/test/java/com/iexec/core/worker/WorkerServiceRealRepositoryTests.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2023-2023 IEXEC BLOCKCHAIN TECH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.iexec.core.worker;
+
+import com.iexec.core.configuration.WorkerConfiguration;
+import lombok.extern.slf4j.Slf4j;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
+import org.springframework.boot.test.mock.mockito.SpyBean;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static com.iexec.commons.poco.utils.TestUtils.WALLET_WORKER_1;
+
+@Slf4j
+@DataMongoTest
+@Testcontainers
+class WorkerServiceRealRepositoryTests {
+ @Container
+ private static final MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.2"));
+
+ @DynamicPropertySource
+ static void registerProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.data.mongodb.host", mongoDBContainer::getContainerIpAddress);
+ registry.add("spring.data.mongodb.port", () -> mongoDBContainer.getMappedPort(27017));
+ }
+
+ @SpyBean
+ private WorkerRepository workerRepository;
+ @Mock
+ private WorkerConfiguration workerConfiguration;
+ private WorkerService workerService;
+
+ @BeforeEach
+ void init() {
+ MockitoAnnotations.openMocks(this);
+ workerService = new WorkerService(workerRepository, workerConfiguration);
+ }
+
+ /**
+ * Try and add N tasks to a single worker at the same time.
+ * If everything goes right, the Worker should finally have been assigned N tasks.
+ */
+ @Test
+ void addMultipleTaskIds() {
+ workerService.addWorker(
+ Worker.builder()
+ .walletAddress(WALLET_WORKER_1)
+ .build()
+ );
+
+ final int nThreads = 10;
+ final ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+
+ final List>> futures = IntStream.range(0, nThreads)
+ .mapToObj(i -> executor.submit(() -> workerService.addChainTaskIdToWorker(new Date().getTime() + "", WALLET_WORKER_1)))
+ .collect(Collectors.toList());
+
+ Awaitility.await()
+ .atMost(Duration.ofMinutes(1))
+ .until(() -> futures.stream().map(Future::isDone).reduce(Boolean::logicalAnd).orElse(false));
+
+ Assertions.assertThat(workerService.getWorker(WALLET_WORKER_1).get().getComputingChainTaskIds())
+ .hasSize(nThreads);
+ }
+}