From c92b21f1e1529e4082fd005abe9bb06ce4a3b03d Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 11 Jul 2024 12:46:09 -0500 Subject: [PATCH] FIx race condition with workflow events Signed-off-by: Ben Sherman --- .../main/nextflow/prov/ProvObserver.groovy | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/plugins/nf-prov/src/main/nextflow/prov/ProvObserver.groovy b/plugins/nf-prov/src/main/nextflow/prov/ProvObserver.groovy index 658de6d..f508959 100644 --- a/plugins/nf-prov/src/main/nextflow/prov/ProvObserver.groovy +++ b/plugins/nf-prov/src/main/nextflow/prov/ProvObserver.groovy @@ -19,6 +19,8 @@ package nextflow.prov import java.nio.file.FileSystems import java.nio.file.Path import java.nio.file.PathMatcher +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -50,6 +52,8 @@ class ProvObserver implements TraceObserver { private Map workflowOutputs = [:] + private Lock lock = new ReentrantLock() + ProvObserver(Map formats, List patterns) { this.renderers = formats.collect( (name, config) -> createRenderer(name, config) ) this.matchers = patterns.collect( pattern -> @@ -82,24 +86,27 @@ class ProvObserver implements TraceObserver { if( !task.isSuccess() ) return - tasks << task + lock.withLock { + tasks << task + } } @Override void onProcessCached(TaskHandler handler, TraceRecord trace) { - tasks << handler.task + lock.withLock { + tasks << handler.task + } } @Override void onFilePublish(Path destination, Path source) { - boolean match = matchers.isEmpty() || matchers.any { matcher -> - matcher.matches(destination) - } - + final match = matchers.isEmpty() || matchers.any { matcher -> matcher.matches(destination) } if( !match ) return - workflowOutputs[source] = destination + lock.withLock { + workflowOutputs[source] = destination + } } @Override @@ -107,9 +114,8 @@ class ProvObserver implements TraceObserver { if( !session.isSuccess() ) return - renderers.each( renderer -> + for( final renderer : renderers ) renderer.render(session, tasks, workflowOutputs) - ) } }