Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include input and output file sizes in the trace #5450

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,17 @@ class BashWrapperBuilder {
binding.stdout_file = TaskRun.CMD_OUTFILE
binding.stderr_file = TaskRun.CMD_ERRFILE
binding.trace_file = TaskRun.CMD_TRACE
binding.sizes_file = TaskRun.CMD_SIZES
if( bean.inputFiles.size() > 0){
binding.inputs_list = bean.inputFiles.keySet().join(" ")
} else {
binding.inputs_list = ""
}
if( bean.outputFiles.size() > 0 ){
binding.outputs_list = bean.outputFiles.join(" ")
} else {
binding.outputs_list = ""
}

binding.trace_cmd = getTraceCommand(interpreter)
binding.launch_cmd = getLaunchCommand(interpreter,env)
Expand Down Expand Up @@ -755,6 +766,7 @@ class BashWrapperBuilder {

protected String getUnstageCommand() { 'nxf_unstage' }


protected String getUnstageControls() {
def result = copyFileToWorkDir(TaskRun.CMD_OUTFILE) + ' || true' + ENDL
result += copyFileToWorkDir(TaskRun.CMD_ERRFILE) + ' || true' + ENDL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ abstract class TaskHandler {
catch( NoSuchFileException e ) {
// ignore it
}
def sizesFile = task.workDir?.resolve(TaskRun.CMD_SIZES)
try {
if(sizesFile) record.parseSizesFile(sizesFile)
}
catch( NoSuchFileException e ) {
// ignore it
}

catch( IOException e ) {
log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ class TaskRun implements Cloneable {
static final public String CMD_RUN = '.command.run'
static final public String CMD_STAGE = '.command.stage'
static final public String CMD_TRACE = '.command.trace'
static final public String CMD_SIZES = '.command.filesize.yaml'
static final public String CMD_ENV = '.command.env'


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import nextflow.processor.TaskProcessor
@CompileStatic
class TraceFileObserver implements TraceObserver {

enum RENDERER {TEXT, JSON}

public static final String DEF_FILE_NAME = "trace-${TraceHelper.launchTimestampFmt()}.txt"

/**
Expand All @@ -57,7 +59,9 @@ class TraceFileObserver implements TraceObserver {
'peak_rss',
'peak_vmem',
'rchar',
'wchar'
'wchar',
'inputs',
'outputs'
]

List<String> formats
Expand Down Expand Up @@ -92,6 +96,10 @@ class TraceFileObserver implements TraceObserver {

private boolean useRawNumber

private RENDERER renderer = RENDERER.TEXT

private int numRecords = 0

void setFields( List<String> entries ) {

def names = TraceRecord.FIELDS.keySet()
Expand Down Expand Up @@ -184,6 +192,9 @@ class TraceFileObserver implements TraceObserver {
*/
TraceFileObserver( Path traceFile ) {
this.tracePath = traceFile
if( tracePath.extension.toUpperCase() == RENDERER.JSON.toString() ){
this.renderer = RENDERER.JSON
}
}

/** ONLY FOR TESTING PURPOSE */
Expand All @@ -207,7 +218,8 @@ class TraceFileObserver implements TraceObserver {

// launch the agent
writer = new Agent<PrintWriter>(traceFile)
writer.send { traceFile.println(fields.join(separator)); traceFile.flush() }
initTraceFile()

}

/**
Expand All @@ -221,11 +233,26 @@ class TraceFileObserver implements TraceObserver {
writer.await()

// write the remaining records
current.values().each { record -> traceFile.println(render(record)) }
current.values().each { record -> render(traceFile, record) }
finishTraceFile()
traceFile.flush()
traceFile.close()
}

private void initTraceFile(){
if(renderer == RENDERER.JSON) {
writer.send{ traceFile.print("[")}
} else {
// When renderer is TEXT we need to write the header line with the fields
writer.send { traceFile.println(fields.join(separator)); traceFile.flush() }
}
}

private void finishTraceFile(){
if (renderer == RENDERER.JSON){
traceFile.print("]")
}
}

@Override
void onProcessCreate(TaskProcessor process) {
Expand Down Expand Up @@ -267,7 +294,7 @@ class TraceFileObserver implements TraceObserver {
current.remove(taskId)

// save to the file
writer.send { PrintWriter it -> it.println(render(trace)); it.flush() }
writer.send { PrintWriter it -> render(it, trace); it.flush() }
}

@Override
Expand All @@ -278,7 +305,7 @@ class TraceFileObserver implements TraceObserver {
}

// save to the file
writer.send { PrintWriter it -> it.println(render( trace )); it.flush() }
writer.send { PrintWriter it -> render(it, trace); it.flush() }
}

/**
Expand All @@ -287,13 +314,27 @@ class TraceFileObserver implements TraceObserver {
* @param trace
* @return
*/
String render(TraceRecord trace) {
void render(PrintWriter it, TraceRecord trace) {
assert trace
trace.renderText(fields, formats, separator)
if ( this.renderer == RENDERER.JSON) {
StringBuilder sb = new StringBuilder()
if (numRecords > 0){
it.println(",")
}
it.print(trace.renderJson(sb, fields, formats))
} else {
if (numRecords > 0){
it.println("")
}
it.print(trace.renderText(fields, formats, separator))
}

}

@Override
boolean enableMetrics() {
return true
}


}
38 changes: 33 additions & 5 deletions modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package nextflow.trace

import groovy.json.JsonOutput
import groovy.yaml.YamlSlurper

import java.nio.file.Path
import java.util.regex.Pattern

Expand Down Expand Up @@ -101,7 +104,9 @@ class TraceRecord implements Serializable {
vol_ctxt: 'num', // -- /proc/$pid/status field 'voluntary_ctxt_switches'
inv_ctxt: 'num', // -- /proc/$pid/status field 'nonvoluntary_ctxt_switches'
hostname: 'str',
cpu_model: 'str'
cpu_model: 'str',
inputs: 'obj',
outputs: 'obj'
]

static public Map<String,Closure<String>> FORMATTER = [
Expand Down Expand Up @@ -317,7 +322,7 @@ class TraceRecord implements Serializable {
* @param converter A converter string
* @return A string value formatted according the specified converter
*/
String getFmtStr( String name, String converter = null ) {
String getFmtStr( String name, String converter = null, boolean json = false) {
assert name
def val = store.get(name)

Expand All @@ -338,13 +343,26 @@ class TraceRecord implements Serializable {
if( !type )
throw new IllegalArgumentException("Not a valid trace field name: '$name'")

if( type == 'obj') {
if (json) {
// render object in json
return JsonOutput.toJson(val)
} else {
return val.toString()
}
}

def formatter = FORMATTER.get(type)
if( !formatter )
throw new IllegalArgumentException("Not a valid trace formatter for field: '$name' with type: '$type'")

try {
return formatter.call(val,sFormat)
def result = formatter.call(val,sFormat)
if( json ){
return '"' + StringEscapeUtils.escapeJavaScript(result ?: NA) + '"'
} else {
return result
}
}
catch( Throwable e ) {
log.debug "Not a valid trace value -- field: '$name'; value: '$val'; format: '$sFormat'"
Expand Down Expand Up @@ -387,8 +405,8 @@ class TraceRecord implements Serializable {
if( i ) result << ','
String name = fields[i]
String format = i<formats?.size() ? formats[i] : null
String value = StringEscapeUtils.escapeJavaScript(getFmtStr(name, format) ?: NA)
result << QUOTE << name << QUOTE << ":" << QUOTE << value << QUOTE
String value = getFmtStr(name, format, true)
result << QUOTE << name << QUOTE << ":" << value
}
result << "}"
return result
Expand Down Expand Up @@ -465,6 +483,16 @@ class TraceRecord implements Serializable {
return this
}

TraceRecord parseSizesFile(Path file){
def yaml = new YamlSlurper().parse(file)
if( yaml.getAt('inputs') != null ) {
this.put('inputs', yaml.getAt('inputs'))
}
if( yaml.getAt('outputs') != null ) {
this.put('outputs', yaml.getAt('outputs'))
}
}

private TraceRecord parseLegacy( Path file, List<String> lines) {
String[] header = null
for( int count=0; count<lines.size(); count++ ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,27 @@ nxf_write_trace() {
echo "write_bytes=${io_stat1[5]}" >> $trace_file
}

nxf_files_size() {
local sizes_file={{sizes_file}}
local inputs="{{inputs_list}}"
local outputs="{{outputs_list}}"
if [ ! -z "$outputs" ]; then
echo "outputs: " >> $sizes_file
for f in $outputs ; do
echo -e "- name: $f " >> $sizes_file
echo -e " size: $( du -s -b $f | awk '{print $1}')" >> $sizes_file
done
fi
if [ ! -z "$inputs" ]; then
echo "inputs: " >> $sizes_file
for f in $inputs ; do
echo -e "- name: $f " >> $sizes_file
echo -e " size: $( du -s -L -b $f | awk '{print $1}')" >> $sizes_file
done
fi
}


nxf_trace_mac() {
local start_millis=$(nxf_date)

Expand Down Expand Up @@ -227,4 +248,5 @@ nxf_trace() {
else
nxf_trace_linux
fi
nxf_files_size
}
Loading