From ac04d04d615dc4518c69b39bdd1dc9fd5eaa9cc2 Mon Sep 17 00:00:00 2001 From: jorgee Date: Fri, 20 Dec 2024 20:24:02 +0100 Subject: [PATCH] fix minor issue Signed-off-by: jorgee --- .../src/main/groovy/nextflow/Session.groovy | 4 +-- .../processor/PublishOffloadManager.groovy | 28 ++++++++++--------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 6c5485f6a0..908faf9591 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -294,7 +294,7 @@ class Session implements ISession { FilePorter getFilePorter() { filePorter } - int publishOffloadBatchSize + private int publishOffloadBatchSize private PublishOffloadManager publishOffloadManager @@ -406,7 +406,7 @@ class Session implements ISession { if ( this.publishOffloadBatchSize ) { // -- publish offload manager config log.warn("Publish offload flag enabled. Creating Offload Manager") - this.publishOffloadManager = new PublishOffloadManager(this) + this.publishOffloadManager = new PublishOffloadManager(this, publishOffloadBatchSize) } } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy index 01aed46d6c..ff9cdf5e25 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy @@ -40,15 +40,17 @@ class PublishOffloadManager { static final String PUBLISH_FUNCTION = 'nxf_publish' private Session session private PublishTaskProcessor publishProcessor - private List commands = new LinkedList(); - private boolean closed = false; + private List commands = new LinkedList() + private boolean closed = false + private int batchSize /** * Unique offloaded index number */ final protected AtomicInteger indexCount = new AtomicInteger() - PublishOffloadManager(Session session) { + PublishOffloadManager(Session session, int batchSize) { this.session = session; + this.batchSize = batchSize; } @PackageScope TaskProcessor getPublishProcessor(){ publishProcessor } @@ -64,7 +66,7 @@ class PublishOffloadManager { } private boolean checkOffload(Path source, Path destination, String executor){ - return session.publishOffloadBatchSize > 0 && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor]; + return this.batchSize > 0 && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor]; } private void invokeProcessor(inputValue) { @@ -77,12 +79,12 @@ class PublishOffloadManager { publishProcessor.invokeTask(args.toArray()) } - private synchronized boolean tryOffload(String command, Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError){ + private synchronized boolean tryOffload(String command, Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError){ if (checkOffload(origin, destination, publishProcessor.executor.name)) { final id = indexCount.incrementAndGet() - runningPublications.put(id, Nextflow.tuple(origin, destination, failonError)) + runningPublications.put(id, Nextflow.tuple(origin, destination, failOnError)) commands.add(generateExecutionCommand(id, command, origin, destination, retryConfig)) - if (commands.size() == session.publishOffloadBatchSize){ + if (commands.size() == this.batchSize){ invokeProcessor(commands.join(";")) commands.clear() } @@ -112,20 +114,20 @@ class PublishOffloadManager { } } - boolean tryMoveOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError) { + boolean tryMoveOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError) { String command = 'mv' if ( useS5cmd() ) { command = 's5cmd mv' } - tryOffload(command, origin, destination, retryConfig, failonError) + tryOffload(command, origin, destination, retryConfig, failOnError) } - boolean tryCopyOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError) { + boolean tryCopyOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError) { String command = 'cp' if ( useS5cmd() ) { command = 's5cmd cp' } - tryOffload(command, origin, destination, retryConfig, failonError) + tryOffload(command, origin, destination, retryConfig, failOnError) } private PublishTaskProcessor createProcessor( String name, BodyDef body){ @@ -183,8 +185,8 @@ class PublishTaskProcessor extends TaskProcessor{ if (result.size() == 2) { final id = result[0] as Integer final tuple = manager.runningPublications.remove(id) - final exitcode = result[1] as Integer - if( exitcode == 0 ){ + final exitCode = result[1] as Integer + if( exitCode == 0 ){ session.notifyFilePublish((Path) tuple.get(0), (Path) tuple.get(1)) } else { if (tuple.get(2) as Boolean) {