diff --git a/compiler/orchestrator_runtime/speculative/speculative_runtime.sh b/compiler/orchestrator_runtime/speculative/speculative_runtime.sh index b9a188484..c07a13159 100644 --- a/compiler/orchestrator_runtime/speculative/speculative_runtime.sh +++ b/compiler/orchestrator_runtime/speculative/speculative_runtime.sh @@ -15,13 +15,12 @@ pash_redir_output echo "$$: (1) Bash variables saved in: $pash_runtime_shell_var ## Once the scheduler determines if there are environment changes, it can then ## decide to rerun or not the speculated commands with the new environment. - ## Determine all current loop iterations and send them to the scheduler pash_loop_iter_counters=${pash_loop_iters:-None} pash_redir_output echo "$$: Loop node iteration counters: $pash_loop_iter_counters" ## Send and receive from daemon -msg="Wait:${pash_speculative_command_id}|Loop iters:${pash_loop_iter_counters}" +msg="Wait:${pash_speculative_command_id}|Loop iters:${pash_loop_iter_counters}|Variables file:${pash_runtime_shell_variables_file}" daemon_response=$(pash_spec_communicate_scheduler "$msg") # Blocking step, daemon will not send response until it's safe to continue ## Receive an exit code @@ -32,6 +31,25 @@ if [[ "$daemon_response" == *"OK:"* ]]; then cmd_exit_code=${response_args[1]} output_variable_file=${response_args[2]} stdout_file=${response_args[3]} + + ## TODO: Restore the variables (doesn't work currently because variables are printed using `env`) + pash_redir_output echo "$$: (2) Recovering script variables from: $output_variable_file" + # source "$RUNTIME_DIR/pash_source_declare_vars.sh" "$output_variable_file" + + pash_redir_output echo "$$: (2) Recovering stdout from: $stdout_file" + cat "${stdout_file}" +elif [[ "$daemon_response" == *"UNSAFE:"* ]]; then + pash_redir_output echo "$$: (2) Scheduler responded: $daemon_response" + pash_redir_output echo "$$: (2) Executing command: $pash_speculative_command_id" + ## Execute the command. + ## KK 2023-06-01 Does `eval` work in general? We need to be precise + ## about which commands are unsafe to determine how to execute them. + cmd=$(cat "$PASH_SPEC_NODE_DIRECTORY/$pash_speculative_command_id") + ## KK 2023-06-01 Not sure if this shellcheck warning must be resolved: + ## > note: Double quote to prevent globbing and word splitting. + # shellcheck disable=SC2086 + eval $cmd + cmd_exit_code=$? elif [ -z "$daemon_response" ]; then ## Trouble... Daemon crashed, rip pash_redir_output echo "$$: ERROR: (2) Scheduler crashed!" @@ -47,11 +65,5 @@ pash_redir_output echo "$$: (2) Scheduler returned exit code: ${cmd_exit_code} f pash_runtime_final_status=${cmd_exit_code} -## TODO: Restore the variables (doesn't work currently because variables are printed using `env`) -pash_redir_output echo "$$: (2) Recovering script variables from: $output_variable_file" -# source "$RUNTIME_DIR/pash_source_declare_vars.sh" "$output_variable_file" - -pash_redir_output echo "$$: (2) Recovering stdout from: $stdout_file" -cat "${stdout_file}" ## TODO: Also need to use wrap_vars maybe to `set` properly etc diff --git a/runtime/r_split.c b/runtime/r_split.c index 92331fe61..ebe59d31a 100644 --- a/runtime/r_split.c +++ b/runtime/r_split.c @@ -39,13 +39,13 @@ int find_new_line_pivot(char *buffer, int start_pos, int end_pos, bool backward) if (backward) { for (int i = end_pos; i >= start_pos; i--) { if (buffer[i] == '\n') - return i; + return i; } } else { for (int i = start_pos; i <= end_pos; i++) { if (buffer[i] == '\n') - return i; + return i; } } return -1; @@ -73,7 +73,7 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned if (len < full_payload) { init_batch_size = len/numOutputFiles; } - + for (current_file_id = 0; current_file_id < numOutputFiles; current_file_id++) { outputFile = outputFiles[current_file_id]; int next_start = 0; @@ -89,7 +89,7 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned next_start = pivot + 1; blockSize = next_start - start_pos; } - + } else { // Process output for last node blockSize = len - start_pos; @@ -98,14 +98,14 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned } else { is_last = false; } - + } - + if (add_header) writeHeader(outputFile, id, blockSize, is_last); safeWriteWithFlush(init_buffer + start_pos, 1, blockSize, outputFile); - + start_pos = next_start; if (is_last) { @@ -113,7 +113,7 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned } } - // This will wrap around if the last chunk was complete + // This will wrap around if the last chunk was complete // otherwise keep pointer on the last file if (is_last) { current_file_id = 0; @@ -146,7 +146,7 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned headSize = len; blockSize = prevRestSize + headSize; if (add_header) { - if (feof(inputFile)) + if (feof(inputFile)) writeHeader(outputFile, id, blockSize, 1); else writeHeader(outputFile, id, blockSize, 0); @@ -283,8 +283,12 @@ int main(int argc, char *argv[]) // flags: -b to use bytes (batch_size will be exact number of bytes instead of approximating to the closest line) if (argc < 4) { - // TODO print usage string - fprintf(stderr, "missing input!\n"); + // TODO: document -r flag + fprintf(stderr, + "\n" + "Usage: %s [-b] [-r] input_file batch_size output_file_1 output_file_2 [output_file_3 ...]\n\n" + " -b: use bytes (batch_size will be exact number of bytes instead of approximating to the closest line)\n\n", + argv[0]); exit(1); } bool useBytes = 0, offset = 0, raw = 0;