From 08395e0c8cf992b7bcbfecea6f4d584b28087452 Mon Sep 17 00:00:00 2001 From: jorgee Date: Wed, 30 Oct 2024 08:13:56 +0100 Subject: [PATCH 1/3] Include file sizes after task execution Signed-off-by: jorgee --- .../executor/BashWrapperBuilder.groovy | 12 ++++++++++ .../nextflow/processor/TaskHandler.groovy | 8 +++++++ .../groovy/nextflow/processor/TaskRun.groovy | 1 + .../nextflow/trace/TraceFileObserver.groovy | 4 +++- .../groovy/nextflow/trace/TraceRecord.groovy | 16 +++++++++++++- .../nextflow/executor/command-trace.txt | 22 +++++++++++++++++++ 6 files changed, 61 insertions(+), 2 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy index 6637d7a9b7..37a5551838 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy @@ -367,6 +367,17 @@ class BashWrapperBuilder { binding.stdout_file = TaskRun.CMD_OUTFILE binding.stderr_file = TaskRun.CMD_ERRFILE binding.trace_file = TaskRun.CMD_TRACE + binding.sizes_file = TaskRun.CMD_SIZES + if( bean.inputFiles.size() > 0){ + binding.inputs_list = bean.inputFiles.keySet().join(" ") + } else { + binding.inputs_list = "" + } + if( bean.outputFiles.size() > 0 ){ + binding.outputs_list = bean.outputFiles.join(" ") + } else { + binding.outputs_list = "" + } binding.trace_cmd = getTraceCommand(interpreter) binding.launch_cmd = getLaunchCommand(interpreter,env) @@ -755,6 +766,7 @@ class BashWrapperBuilder { protected String getUnstageCommand() { 'nxf_unstage' } + protected String getUnstageControls() { def result = copyFileToWorkDir(TaskRun.CMD_OUTFILE) + ' || true' + ENDL result += copyFileToWorkDir(TaskRun.CMD_ERRFILE) + ' || true' + ENDL diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy index 1c0c4dc438..36afb794ec 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy @@ -219,6 +219,14 @@ abstract class TaskHandler { catch( NoSuchFileException e ) { // ignore it } + def sizesFile = task.workDir?.resolve(TaskRun.CMD_SIZES) + try { + if(sizesFile) record.parseSizesFile(sizesFile) + } + catch( NoSuchFileException e ) { + // ignore it + } + catch( IOException e ) { log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}" } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index f3926c0b60..930c0e9539 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -570,6 +570,7 @@ class TaskRun implements Cloneable { static final public String CMD_RUN = '.command.run' static final public String CMD_STAGE = '.command.stage' static final public String CMD_TRACE = '.command.trace' + static final public String CMD_SIZES = '.command.filesize.yaml' static final public String CMD_ENV = '.command.env' diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceFileObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceFileObserver.groovy index 2a37920e41..981b072d29 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceFileObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceFileObserver.groovy @@ -57,7 +57,9 @@ class TraceFileObserver implements TraceObserver { 'peak_rss', 'peak_vmem', 'rchar', - 'wchar' + 'wchar', + 'inputs', + 'outputs' ] List formats diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy index 33c4a5d510..4c20153117 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy @@ -16,6 +16,8 @@ package nextflow.trace +import groovy.yaml.YamlSlurper + import java.nio.file.Path import java.util.regex.Pattern @@ -101,7 +103,9 @@ class TraceRecord implements Serializable { vol_ctxt: 'num', // -- /proc/$pid/status field 'voluntary_ctxt_switches' inv_ctxt: 'num', // -- /proc/$pid/status field 'nonvoluntary_ctxt_switches' hostname: 'str', - cpu_model: 'str' + cpu_model: 'str', + inputs: 'str', + outputs: 'str' ] static public Map> FORMATTER = [ @@ -465,6 +469,16 @@ class TraceRecord implements Serializable { return this } + TraceRecord parseSizesFile(Path file){ + def yaml = new YamlSlurper().parse(file) + if( yaml.getAt('inputs') != null ) { + this.put('inputs', yaml.getAt('inputs')) + } + if( yaml.getAt('outputs') != null ) { + this.put('outputs', yaml.getAt('outputs')) + } + } + private TraceRecord parseLegacy( Path file, List lines) { String[] header = null for( int count=0; count> $trace_file } +nxf_files_size() { + local sizes_file={{sizes_file}} + local inputs="{{inputs_list}}" + local outputs="{{outputs_list}}" + if [ ! -z "$outputs" ]; then + echo "outputs: " >> $sizes_file + for f in $outputs ; do + echo -e "- name: $f " >> $sizes_file + echo -e " size: $( du -s -b $f | awk '{print $1}')" >> $sizes_file + done + fi + if [ ! -z "$inputs" ]; then + echo "inputs: " >> $sizes_file + for f in $inputs ; do + echo -e "- name: $f " >> $sizes_file + echo -e " size: $( du -s -L -b $f | awk '{print $1}')" >> $sizes_file + done + fi +} + + nxf_trace_mac() { local start_millis=$(nxf_date) @@ -227,4 +248,5 @@ nxf_trace() { else nxf_trace_linux fi + nxf_files_size } From dd38251f54936e37dea95e0f63dfa69a2762ce72 Mon Sep 17 00:00:00 2001 From: jorgee Date: Wed, 30 Oct 2024 09:52:02 +0100 Subject: [PATCH 2/3] include trace changes in tests Signed-off-by: jorgee --- .../trace/TraceFileObserverTest.groovy | 5 +++++ .../executor/test-bash-wrapper-with-trace.txt | 22 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/modules/nextflow/src/test/groovy/nextflow/trace/TraceFileObserverTest.groovy b/modules/nextflow/src/test/groovy/nextflow/trace/TraceFileObserverTest.groovy index af7c092094..a139336b13 100644 --- a/modules/nextflow/src/test/groovy/nextflow/trace/TraceFileObserverTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/trace/TraceFileObserverTest.groovy @@ -186,11 +186,14 @@ class TraceFileObserverTest extends Specification { record.peak_vmem = 30_000 * 1024 record.rchar = 30_000 * 1024 record.wchar = 10_000 * 1024 + record.inputs = [ [name: "in_file_1", size: 123456], [name: "in_file_2", size: 654321] ] + record.outputs = [ [name: "out_file_1", size: 123456] ] when: def trace = [:] as TraceFileObserver def result = trace.render(record).split('\t') then: + result.size() == 16 result[0] == '30' // task id result[1] == '43d7ef' // hash result[2] == '2000' // native id @@ -205,6 +208,8 @@ class TraceFileObserverTest extends Specification { result[11] == '29.3 MB' // peak_vmem result[12] == '29.3 MB' // rchar result[13] == '9.8 MB' // wchar + result[14] == '[[name:in_file_1, size:123456], [name:in_file_2, size:654321]]' // inputs + result[15] == '[[name:out_file_1, size:123456]]' // outputs } diff --git a/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper-with-trace.txt b/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper-with-trace.txt index ef1380e7cf..a62f20dc3f 100644 --- a/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper-with-trace.txt +++ b/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper-with-trace.txt @@ -120,6 +120,27 @@ nxf_write_trace() { echo "write_bytes=${io_stat1[5]}" >> $trace_file } +nxf_files_size() { + local sizes_file=.command.filesize.yaml + local inputs="" + local outputs="" + if [ ! -z "$outputs" ]; then + echo "outputs: " >> $sizes_file + for f in $outputs ; do + echo -e "- name: $f " >> $sizes_file + echo -e " size: $( du -s -b $f | awk '{print $1}')" >> $sizes_file + done + fi + if [ ! -z "$inputs" ]; then + echo "inputs: " >> $sizes_file + for f in $inputs ; do + echo -e "- name: $f " >> $sizes_file + echo -e " size: $( du -s -L -b $f | awk '{print $1}')" >> $sizes_file + done + fi +} + + nxf_trace_mac() { local start_millis=$(nxf_date) @@ -197,6 +218,7 @@ nxf_trace() { else nxf_trace_linux fi + nxf_files_size } nxf_sleep() { From 4202754630e31e83dfc1ad382c97c6d3d36bcdc1 Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 5 Nov 2024 12:23:16 +0100 Subject: [PATCH 3/3] include json format in trace file Signed-off-by: jorgee --- .../nextflow/trace/TraceFileObserver.groovy | 51 ++++++- .../groovy/nextflow/trace/TraceRecord.groovy | 26 +++- .../trace/TraceFileObserverTest.groovy | 144 +++++++++++++++++- 3 files changed, 206 insertions(+), 15 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceFileObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceFileObserver.groovy index 981b072d29..7c948d25f0 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceFileObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceFileObserver.groovy @@ -38,6 +38,8 @@ import nextflow.processor.TaskProcessor @CompileStatic class TraceFileObserver implements TraceObserver { + enum RENDERER {TEXT, JSON} + public static final String DEF_FILE_NAME = "trace-${TraceHelper.launchTimestampFmt()}.txt" /** @@ -94,6 +96,10 @@ class TraceFileObserver implements TraceObserver { private boolean useRawNumber + private RENDERER renderer = RENDERER.TEXT + + private int numRecords = 0 + void setFields( List entries ) { def names = TraceRecord.FIELDS.keySet() @@ -186,6 +192,9 @@ class TraceFileObserver implements TraceObserver { */ TraceFileObserver( Path traceFile ) { this.tracePath = traceFile + if( tracePath.extension.toUpperCase() == RENDERER.JSON.toString() ){ + this.renderer = RENDERER.JSON + } } /** ONLY FOR TESTING PURPOSE */ @@ -209,7 +218,8 @@ class TraceFileObserver implements TraceObserver { // launch the agent writer = new Agent(traceFile) - writer.send { traceFile.println(fields.join(separator)); traceFile.flush() } + initTraceFile() + } /** @@ -223,11 +233,26 @@ class TraceFileObserver implements TraceObserver { writer.await() // write the remaining records - current.values().each { record -> traceFile.println(render(record)) } + current.values().each { record -> render(traceFile, record) } + finishTraceFile() traceFile.flush() traceFile.close() } + private void initTraceFile(){ + if(renderer == RENDERER.JSON) { + writer.send{ traceFile.print("[")} + } else { + // When renderer is TEXT we need to write the header line with the fields + writer.send { traceFile.println(fields.join(separator)); traceFile.flush() } + } + } + + private void finishTraceFile(){ + if (renderer == RENDERER.JSON){ + traceFile.print("]") + } + } @Override void onProcessCreate(TaskProcessor process) { @@ -269,7 +294,7 @@ class TraceFileObserver implements TraceObserver { current.remove(taskId) // save to the file - writer.send { PrintWriter it -> it.println(render(trace)); it.flush() } + writer.send { PrintWriter it -> render(it, trace); it.flush() } } @Override @@ -280,7 +305,7 @@ class TraceFileObserver implements TraceObserver { } // save to the file - writer.send { PrintWriter it -> it.println(render( trace )); it.flush() } + writer.send { PrintWriter it -> render(it, trace); it.flush() } } /** @@ -289,13 +314,27 @@ class TraceFileObserver implements TraceObserver { * @param trace * @return */ - String render(TraceRecord trace) { + void render(PrintWriter it, TraceRecord trace) { assert trace - trace.renderText(fields, formats, separator) + if ( this.renderer == RENDERER.JSON) { + StringBuilder sb = new StringBuilder() + if (numRecords > 0){ + it.println(",") + } + it.print(trace.renderJson(sb, fields, formats)) + } else { + if (numRecords > 0){ + it.println("") + } + it.print(trace.renderText(fields, formats, separator)) + } + } @Override boolean enableMetrics() { return true } + + } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy index 4c20153117..35ea85c445 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy @@ -16,6 +16,7 @@ package nextflow.trace +import groovy.json.JsonOutput import groovy.yaml.YamlSlurper import java.nio.file.Path @@ -104,8 +105,8 @@ class TraceRecord implements Serializable { inv_ctxt: 'num', // -- /proc/$pid/status field 'nonvoluntary_ctxt_switches' hostname: 'str', cpu_model: 'str', - inputs: 'str', - outputs: 'str' + inputs: 'obj', + outputs: 'obj' ] static public Map> FORMATTER = [ @@ -321,7 +322,7 @@ class TraceRecord implements Serializable { * @param converter A converter string * @return A string value formatted according the specified converter */ - String getFmtStr( String name, String converter = null ) { + String getFmtStr( String name, String converter = null, boolean json = false) { assert name def val = store.get(name) @@ -342,13 +343,26 @@ class TraceRecord implements Serializable { if( !type ) throw new IllegalArgumentException("Not a valid trace field name: '$name'") + if( type == 'obj') { + if (json) { + // render object in json + return JsonOutput.toJson(val) + } else { + return val.toString() + } + } def formatter = FORMATTER.get(type) if( !formatter ) throw new IllegalArgumentException("Not a valid trace formatter for field: '$name' with type: '$type'") try { - return formatter.call(val,sFormat) + def result = formatter.call(val,sFormat) + if( json ){ + return '"' + StringEscapeUtils.escapeJavaScript(result ?: NA) + '"' + } else { + return result + } } catch( Throwable e ) { log.debug "Not a valid trace value -- field: '$name'; value: '$val'; format: '$sFormat'" @@ -391,8 +405,8 @@ class TraceRecord implements Serializable { if( i ) result << ',' String name = fields[i] String format = i> new Session() + task.processor.getName() >> 'x' + task.processor.getExecutor() >> Mock(Executor) + task.processor.getProcessEnvironment() >> [:] + + def handler = new NopeTaskHandler(task) + def now = System.currentTimeMillis() + + // the observer class under test + def observer = new TraceFileObserver(file) + + when: + observer.onFlowCreate(null) + then: + observer.current.isEmpty() + + when: + handler.status = TaskStatus.SUBMITTED + observer.onProcessSubmit( handler, handler.getTraceRecord() ) + def record = observer.current.get(TaskId.of(111)) + then: + observer.separator == '\t' + record.taskId == 111L + record.name == 'simple_task' + record.submit >= now + record.start == 0 + observer.current.containsKey(TaskId.of(111)) + + when: + sleep 50 + handler.status = TaskStatus.RUNNING + observer.onProcessStart( handler, handler.getTraceRecord() ) + record = observer.current.get(TaskId.of(111)) + then: + record.start >= record.submit + observer.current.containsKey(TaskId.of(111)) + + when: + sleep 50 + handler.status = TaskStatus.COMPLETED + handler.task.exitStatus = 127 + observer.onProcessComplete(handler, handler.getTraceRecord()) + then: + !(observer.current.containsKey(TaskId.of(111))) + + when: + record = handler.getTraceRecord() + observer.onFlowComplete() + def result = new JsonSlurper().parse(file) as List + + then: + result[0].task_id == '111' // task-id + result[0].hash == 'fe/ca28af' // hash + result[0].native_id == '-' // native-id + result[0].name == 'simple_task' // process name + result[0].status == TaskStatus.COMPLETED.toString() + result[0].exit == '127' // exist-status + result[0].submit == TraceRecord.fmtDate(record.submit,null) // submit time + result[0].duration == new Duration(record.complete -record.submit).toString() // wall-time + result[0].realtime == new Duration(record.complete -record.start).toString() // run-time + + cleanup: + testFolder.deleteDir() + + } + def 'test render'() { @@ -191,7 +269,9 @@ class TraceFileObserverTest extends Specification { when: def trace = [:] as TraceFileObserver - def result = trace.render(record).split('\t') + StringWriter out = new StringWriter() + trace.render(new PrintWriter(out), record) + def result = out.toString().split('\t') then: result.size() == 16 result[0] == '30' // task id @@ -213,6 +293,60 @@ class TraceFileObserverTest extends Specification { } + def 'test render json'() { + + given: + def record = new TraceRecord() + record.task_id = 30 + record.hash = '43d7ef' + record.native_id = '2000' + record.name = 'hello' + record.status = TaskStatus.COMPLETED + record.exit = 99 + record.start = 1408714875000 + record.submit = 1408714874000 + record.complete = 1408714912000 + record.duration = 1408714912000 - 1408714874000 + record.realtime = 1408714912000 - 1408714875000 + record.'%cpu' = 17.50f + record.peak_rss = 10_000 * 1024 + record.peak_vmem = 30_000 * 1024 + record.rchar = 30_000 * 1024 + record.wchar = 10_000 * 1024 + record.inputs = [ [name: "in_file_1", size: 123456], [name: "in_file_2", size: 654321] ] + record.outputs = [ [name: "out_file_1", size: 123456] ] + + when: + def trace = [renderer: TraceFileObserver.RENDERER.JSON] as TraceFileObserver + StringWriter out = new StringWriter() + trace.render(new PrintWriter(out), record) + Map result = new JsonSlurper().parseText(out.toString()) as Map + then: + result.task_id == '30' // task id + result.hash == '43d7ef' // hash + result.native_id == '2000' // native id + result.name == 'hello' // name + result.status == 'COMPLETED' // status + result.exit == '99' // exit status + result.submit == '2014-08-22 13:41:14.000' // submit + result.duration == '38s' // wall-time + result.realtime == '37s' // run-time + result['%cpu'] == '17.5%' // cpu + result.peak_rss == '9.8 MB' // peak_rss + result.peak_vmem == '29.3 MB' // peak_vmem + result.rchar == '29.3 MB' // rchar + result.wchar == '9.8 MB' // wchar + result.inputs.size() == 2 + result.inputs[0].name == 'in_file_1' + result.inputs[0].size == 123456 + result.inputs[1].name == 'in_file_2' + result.inputs[1].size == 654321 + result.outputs.size() == 1 + result.outputs[0].name == 'out_file_1' + result.outputs[0].size == 123456 + + } + def 'test custom render' () { @@ -226,7 +360,9 @@ class TraceFileObserverTest extends Specification { when: def trace = [:] as TraceFileObserver trace.setFieldsAndFormats( 'task_id,syscr,syscw,rss,rss:num' ) - def result = trace.render(record).split('\t') + StringWriter out = new StringWriter() + trace.render(new PrintWriter(out), record) + def result = out.toString().split('\t') then: result[0] == '5' result[1] == '10' @@ -266,7 +402,9 @@ class TraceFileObserverTest extends Specification { def trace = [:] as TraceFileObserver trace.setFieldsAndFormats('task_id,hash,native_id,name,status,exit,submit,duration,realtime,%cpu,rss,vmem,peak_rss,peak_vmem,rchar,wchar,syscr,syscw,duration:num,realtime:num,rss:num,vmem:num,peak_rss:num,peak_vmem:num,rchar:num,wchar:num,queue') - def result = trace.render(record).split('\t') + StringWriter out = new StringWriter() + trace.render(new PrintWriter(out), record) + def result = out.toString().split('\t') then: result[0] == '3'