From 15b40a813a91262aa41902f1b950c840305810d3 Mon Sep 17 00:00:00 2001 From: Tomasz Godzik Date: Thu, 2 Jan 2025 18:43:03 +0100 Subject: [PATCH] bugfix: Synchronize on target instead oforigin when copying This should help with the FileAlreadyExistsException that we are getting which is most likely caused by multiple threads writing at the same time. --- .../src/main/scala/bloop/io/ParallelOps.scala | 4 +- .../scala/bloop/io/ParallelOpsSuite.scala | 73 +++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 backend/src/test/scala/bloop/io/ParallelOpsSuite.scala diff --git a/backend/src/main/scala/bloop/io/ParallelOps.scala b/backend/src/main/scala/bloop/io/ParallelOps.scala index ae50e6c05..993213133 100644 --- a/backend/src/main/scala/bloop/io/ParallelOps.scala +++ b/backend/src/main/scala/bloop/io/ParallelOps.scala @@ -214,7 +214,7 @@ object ParallelOps { } } } finally { - takenByOtherCopyProcess.remove(originFile) + takenByOtherCopyProcess.remove(targetFile) // Complete successfully to unblock other tasks p.success(()) } @@ -223,7 +223,7 @@ object ParallelOps { def acquireFile: MonixTask[Unit] = { val currentPromise = Promise[Unit]() - val promiseInMap = takenByOtherCopyProcess.putIfAbsent(originFile, currentPromise) + val promiseInMap = takenByOtherCopyProcess.putIfAbsent(targetFile, currentPromise) if (promiseInMap == null) { triggerCopy(currentPromise) } else { diff --git a/backend/src/test/scala/bloop/io/ParallelOpsSuite.scala b/backend/src/test/scala/bloop/io/ParallelOpsSuite.scala new file mode 100644 index 000000000..00798caa8 --- /dev/null +++ b/backend/src/test/scala/bloop/io/ParallelOpsSuite.scala @@ -0,0 +1,73 @@ +package bloop.io + +import java.nio.file.Files +import bloop.io.ParallelOps.CopyMode +import org.junit.Test +import bloop.logging.RecordingLogger +import bloop.task.Task +import monix.execution.Scheduler +import java.nio.file.StandardOpenOption +import scala.concurrent.duration._ +import scala.concurrent.Await + +class ParallelOpsSuite { + + private def createRandomDirectory() = { + + val from = Files.createTempDirectory("parallel") + val inputFile = from.resolve("text.scala") + val text = "random\n" * 100 + Files.write(inputFile, text.getBytes, StandardOpenOption.CREATE, StandardOpenOption.APPEND) + } + @Test + def runMultipleCopies() = { + + val from = createRandomDirectory() + val to = Files.createTempDirectory("parallel") + + val config = + ParallelOps.CopyConfiguration(5, CopyMode.ReplaceExisting, Set.empty, Set.empty) + val logger = new RecordingLogger() + + val tasks = + for (_ <- 0 to 100) + yield ParallelOps + .copyDirectories(config)( + from, + to, + Scheduler.Implicits.global, + enableCancellation = false, + logger + ) + + val result = Task.gatherUnordered(tasks).runAsync(Scheduler.Implicits.global) + val res = Await.result(result, 15.seconds) + assert(res.size == 101) + } + + @Test + def runMultipleCopiesFromDifferentSource() = { + + val to = Files.createTempDirectory("parallel") + + val config = + ParallelOps.CopyConfiguration(100, CopyMode.ReplaceExisting, Set.empty, Set.empty) + val logger = new RecordingLogger() + + val tasks = + for (_ <- 0 to 100) + yield ParallelOps + .copyDirectories(config)( + createRandomDirectory(), + to, + Scheduler.Implicits.global, + enableCancellation = false, + logger + ) + + val result = Task.gatherUnordered(tasks).runAsync(Scheduler.Implicits.global) + val res = Await.result(result, 15.seconds) + assert(res.size == 101) + } + +}