Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass environment file in the Wait response to the scheduler #689

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions compiler/orchestrator_runtime/speculative/speculative_runtime.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ pash_redir_output echo "$$: (1) Bash variables saved in: $pash_runtime_shell_var
## Once the scheduler determines if there are environment changes, it can then
## decide to rerun or not the speculated commands with the new environment.


## Determine all current loop iterations and send them to the scheduler
pash_loop_iter_counters=${pash_loop_iters:-None}
pash_redir_output echo "$$: Loop node iteration counters: $pash_loop_iter_counters"

## Send and receive from daemon
msg="Wait:${pash_speculative_command_id}|Loop iters:${pash_loop_iter_counters}"
msg="Wait:${pash_speculative_command_id}|Loop iters:${pash_loop_iter_counters}|Variables file:${pash_runtime_shell_variables_file}"
daemon_response=$(pash_spec_communicate_scheduler "$msg") # Blocking step, daemon will not send response until it's safe to continue

## Receive an exit code
Expand All @@ -32,6 +31,25 @@ if [[ "$daemon_response" == *"OK:"* ]]; then
cmd_exit_code=${response_args[1]}
output_variable_file=${response_args[2]}
stdout_file=${response_args[3]}

## TODO: Restore the variables (doesn't work currently because variables are printed using `env`)
pash_redir_output echo "$$: (2) Recovering script variables from: $output_variable_file"
# source "$RUNTIME_DIR/pash_source_declare_vars.sh" "$output_variable_file"

pash_redir_output echo "$$: (2) Recovering stdout from: $stdout_file"
cat "${stdout_file}"
elif [[ "$daemon_response" == *"UNSAFE:"* ]]; then
pash_redir_output echo "$$: (2) Scheduler responded: $daemon_response"
pash_redir_output echo "$$: (2) Executing command: $pash_speculative_command_id"
## Execute the command.
## KK 2023-06-01 Does `eval` work in general? We need to be precise
## about which commands are unsafe to determine how to execute them.
cmd=$(cat "$PASH_SPEC_NODE_DIRECTORY/$pash_speculative_command_id")
## KK 2023-06-01 Not sure if this shellcheck warning must be resolved:
## > note: Double quote to prevent globbing and word splitting.
# shellcheck disable=SC2086
eval $cmd
cmd_exit_code=$?
elif [ -z "$daemon_response" ]; then
## Trouble... Daemon crashed, rip
pash_redir_output echo "$$: ERROR: (2) Scheduler crashed!"
Expand All @@ -47,11 +65,5 @@ pash_redir_output echo "$$: (2) Scheduler returned exit code: ${cmd_exit_code} f

pash_runtime_final_status=${cmd_exit_code}

## TODO: Restore the variables (doesn't work currently because variables are printed using `env`)
pash_redir_output echo "$$: (2) Recovering script variables from: $output_variable_file"
# source "$RUNTIME_DIR/pash_source_declare_vars.sh" "$output_variable_file"

pash_redir_output echo "$$: (2) Recovering stdout from: $stdout_file"
cat "${stdout_file}"

## TODO: Also need to use wrap_vars maybe to `set` properly etc
26 changes: 15 additions & 11 deletions runtime/r_split.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ int find_new_line_pivot(char *buffer, int start_pos, int end_pos, bool backward)
if (backward) {
for (int i = end_pos; i >= start_pos; i--) {
if (buffer[i] == '\n')
return i;
return i;
}
}
else {
for (int i = start_pos; i <= end_pos; i++) {
if (buffer[i] == '\n')
return i;
return i;
}
}
return -1;
Expand Down Expand Up @@ -73,7 +73,7 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned
if (len < full_payload) {
init_batch_size = len/numOutputFiles;
}

for (current_file_id = 0; current_file_id < numOutputFiles; current_file_id++) {
outputFile = outputFiles[current_file_id];
int next_start = 0;
Expand All @@ -89,7 +89,7 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned
next_start = pivot + 1;
blockSize = next_start - start_pos;
}

} else {
// Process output for last node
blockSize = len - start_pos;
Expand All @@ -98,22 +98,22 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned
} else {
is_last = false;
}

}

if (add_header)
writeHeader(outputFile, id, blockSize, is_last);

safeWriteWithFlush(init_buffer + start_pos, 1, blockSize, outputFile);

start_pos = next_start;

if (is_last) {
id += 1;
}
}

// This will wrap around if the last chunk was complete
// This will wrap around if the last chunk was complete
// otherwise keep pointer on the last file
if (is_last) {
current_file_id = 0;
Expand Down Expand Up @@ -146,7 +146,7 @@ void SplitByLines(FILE *inputFile, int batchSize, FILE *outputFiles[], unsigned
headSize = len;
blockSize = prevRestSize + headSize;
if (add_header) {
if (feof(inputFile))
if (feof(inputFile))
writeHeader(outputFile, id, blockSize, 1);
else
writeHeader(outputFile, id, blockSize, 0);
Expand Down Expand Up @@ -283,8 +283,12 @@ int main(int argc, char *argv[])
// flags: -b to use bytes (batch_size will be exact number of bytes instead of approximating to the closest line)
if (argc < 4)
{
// TODO print usage string
fprintf(stderr, "missing input!\n");
// TODO: document -r flag
fprintf(stderr,
"\n"
"Usage: %s [-b] [-r] input_file batch_size output_file_1 output_file_2 [output_file_3 ...]\n\n"
" -b: use bytes (batch_size will be exact number of bytes instead of approximating to the closest line)\n\n",
argv[0]);
exit(1);
}
bool useBytes = 0, offset = 0, raw = 0;
Expand Down