Skip to content

Commit

Permalink
WQ RemoteTask Cleanup (#3600)
Browse files Browse the repository at this point in the history
* Do not check in serverless_function.py which is generated by poncho_package_serverize

* Split out functions for send/recv message.

* Use send_message for sending responses.

* Only encode to utf-8 in one place.

* Increase invocation line to 4KB just in case.

* - Use rpipe and wpipe for pipe FDs.
- Create rpipestream and wpipestream for buffered I/O.

* json.dumps

* use binary mode for string, just convert data portion

* Run on Darwin

* Fix up stream types correctly.

* Fix up handling of coprocess invocation in several ways:
- Consistent handling of timeouts with seconds.
- Fixed gonzo bug in work_queue_process_write_link that sent duplicate data.
- Failure to connect or invoke now results in task failure instead of worker failure.

* Only write output if it was returned.

* format

* Make coprocess resource errors a notice.  (This was the silent error on osx.)

* Go back to skip on osx.

* format

* Increase resource consumption for remotetask test.

* Spelling

* Listen on any port for greater flexibility.
  • Loading branch information
dthain authored Jan 12, 2024
1 parent 2bcd138 commit c53286b
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 324 deletions.
160 changes: 94 additions & 66 deletions poncho/src/poncho/wq_network_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@ def wq_network_code():
import json
import os
import sys

# If enabled, coprocess will print to stdout
debug_mode = False

# Send a message on a binary I/O stream by sending the message length and then the (string) message.
def send_message(stream, data):
    # Write one length-prefixed message to a text stream:
    # the payload length on its own line, followed by the payload itself.
    # The caller is responsible for flushing the stream.
    stream.write(f"{len(data)}\n")
    stream.write(data)

# Receive a standard message from a binary I/O stream by reading length and then returning the (string) message
def recv_message(stream):
    # Read one length-prefixed message from a text stream: a line carrying
    # the payload length, then exactly that many characters of payload.
    length = int(stream.readline())
    return stream.read(length)

    # Decorator for remotely executed functions to package things as json.
def remote_execute(func):
def remote_wrapper(event):
kwargs = event["fn_kwargs"]
Expand All @@ -27,96 +45,106 @@ def remote_wrapper(event):
return response
return remote_wrapper

# Main loop of coprocess for executing network functions.
# NOTE(review): this span of the scraped diff interleaved pre- and post-change
# lines; this is the reconstructed post-change version with duplicates removed.
def main():
    # Listen on an arbitrary port (port 0) to be reported to the worker.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(('localhost', 0))
    except Exception as e:
        s.close()
        print(e, file=sys.stderr)
        sys.exit(1)

    # Inform the worker of name and port for later connection.
    config = {
        "name": name(),  # noqa: F821
        "port": s.getsockname()[1],
    }
    send_message(sys.stdout, json.dumps(config))
    sys.stdout.flush()

    # Remember original working directory b/c we change for each invocation.
    abs_working_dir = os.getcwd()

    # Create pipe for communication with forked child processes.
    rpipe, wpipe = os.pipe()
    rpipestream = os.fdopen(rpipe, "r")

    while True:
        s.listen()
        conn, addr = s.accept()
        # Buffered text-mode stream over the socket for line/length-based messages.
        connstream = conn.makefile("rw", encoding="utf-8")

        if debug_mode:
            print('Network function: connection from {}'.format(addr), file=sys.stderr)

        while True:
            # Read the invocation header from the worker:
            # "<function_name> <task_id> <event_size>"
            line = connstream.readline()

            # If end of file, then break out and accept again.
            if not line:
                break

            # Parse the invocation header.
            input_spec = line.split()
            function_name = input_spec[0]
            task_id = int(input_spec[1])
            event_size = int(input_spec[2])

            # Then read the contents of the event itself.
            event_str = connstream.read(event_size)
            event = json.loads(event_str)
            # See if the user specified an execution method.
            exec_method = event.get("remote_task_exec_method", None)

            try:
                # First move to target directory (is undone in finally block).
                os.chdir(os.path.join(abs_working_dir, f't.{task_id}'))

                # Invoke the function by the desired method, resulting in
                # response containing the JSON text representation of the result.
                if exec_method == "direct":
                    response = json.dumps(globals()[function_name](event))
                else:
                    p = os.fork()
                    if p == 0:
                        # Child: run the function and send the result up the pipe.
                        response = globals()[function_name](event)
                        wpipestream = os.fdopen(wpipe, "w")
                        send_message(wpipestream, json.dumps(response))
                        wpipestream.flush()
                        os._exit(0)
                    elif p < 0:
                        if debug_mode:
                            print(f'Network function: unable to fork to execute {function_name}', file=sys.stderr)
                        response = {
                            "Result": "unable to fork",
                            "StatusCode": 500
                        }
                        response = json.dumps(response)
                    else:
                        # Parent: get response string from child process.
                        response = recv_message(rpipestream)
                        # Wait for child process to complete.
                        os.waitpid(p, 0)

                # At this point, response is set to a value one way or the other.

            except Exception as e:
                if debug_mode:
                    print("Network function encountered exception ", str(e), file=sys.stderr)
                response = {
                    'Result': f'network function encountered exception {e}',
                    'Status Code': 500
                }
                response = json.dumps(response)
            finally:
                # Restore the working directory, no matter how the function ended.
                os.chdir(abs_working_dir)

            # Send response string back to parent worker process.
            send_message(connstream, response)
            connstream.flush()

    return 0
148 changes: 48 additions & 100 deletions work_queue/src/work_queue_coprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,80 +26,41 @@ See the file COPYING for details.
#include "stringtools.h"
#include "xxmalloc.h"

static time_t coprocess_connect_timeout = 60;    // max time to connect, one minute
static time_t coprocess_execute_timeout = 3600;  // max time to execute, one hour

/*
Read back a fixed size message consisting of a length header line,
then the data itself.  Returns a malloc'd, NUL-terminated buffer that
the caller must free, or null on EOF, timeout, or malformed header.
*/

char * work_queue_coprocess_read_message( struct link *link, time_t stoptime )
{
	char line[WORK_QUEUE_LINE_MAX];
	int length;

	if(!link_readline(link, line, WORK_QUEUE_LINE_MAX, stoptime)) {
		return 0;
	}

	/* Reject a header that is not a non-negative integer, to avoid a
	bogus malloc/read size from a corrupted or truncated stream. */
	if(sscanf(line, "%d", &length)!=1 || length<0) {
		return 0;
	}

	char *buffer = malloc(length+1);
	if(!buffer) {
		return 0;
	}

	int result = link_read(link,buffer,length,stoptime);

	if(result==length) {
		buffer[length] = 0;
		return buffer;
	} else {
		free(buffer);
		return 0;
	}
}

int work_queue_coprocess_setup(struct work_queue_coprocess *coprocess)
{
int bytes_read = 0;
char buffer[WORK_QUEUE_LINE_MAX];
char *name = NULL;
char *name = NULL;

bytes_read = work_queue_coprocess_read_from_link(buffer, WORK_QUEUE_LINE_MAX, coprocess_max_timeout, coprocess->read_link);
if (bytes_read < 0) {
char * buffer = work_queue_coprocess_read_message(coprocess->read_link,time(0)+coprocess_connect_timeout);
if(!buffer) {
fatal("Unable to get information from coprocess\n");
}

Expand Down Expand Up @@ -134,6 +95,8 @@ int work_queue_coprocess_setup(struct work_queue_coprocess *coprocess)

coprocess->name = name;

free(buffer);

return 0;
}

Expand Down Expand Up @@ -199,52 +162,37 @@ int work_queue_coprocess_check(struct work_queue_coprocess *coprocess)
return 1;
}

/*
Invoke a function by connecting to the coprocess, sending the
invocation header and input, and reading back the result string.
Returns a malloc'd result string the caller must free, or null on
failure, in which case the link is closed so the next call reconnects.
*/

char *work_queue_coprocess_run(const char *function_name, const char *function_input, struct work_queue_coprocess *coprocess, int task_id)
{
	/* Use time_t (not int) so stoptimes do not truncate. */
	time_t connect_stoptime = time(0) + coprocess_connect_timeout;
	time_t execute_stoptime = time(0) + coprocess_execute_timeout;

	/* Connect to the coprocess if we haven't already done so. */
	if(!coprocess->network_link) {
		coprocess->network_link = link_connect("127.0.0.1",coprocess->port,connect_stoptime);
		if(!coprocess->network_link) {
			debug(D_WQ,"failed to connect to coprocess: %s",strerror(errno));
			return 0;
		}
	}

	/* Send the invocation header indicating the function name and length of input. */
	link_printf(coprocess->network_link, connect_stoptime, "%s %d %ld\n", function_name, task_id, strlen(function_input));
	/* Followed by the function_input itself. */
	link_write(coprocess->network_link, function_input, strlen(function_input), connect_stoptime );

	/* Read back the result buffer with a longer timeout. */
	char *result = work_queue_coprocess_read_message(coprocess->network_link,execute_stoptime);

	/* If the invocation did not work, close the link so the next call reconnects. */
	if(!result) {
		link_close(coprocess->network_link);
		coprocess->network_link = 0;
	}

	return result;
}

struct work_queue_coprocess *work_queue_coprocess_find_state(struct work_queue_coprocess *coprocess_info, int number_of_coprocesses, work_queue_coprocess_state_t state) {
Expand All @@ -256,7 +204,7 @@ struct work_queue_coprocess *work_queue_coprocess_find_state(struct work_queue_c
return NULL;
}

struct work_queue_coprocess *work_queue_coprocess_initalize_all_coprocesses(int coprocess_cores, int coprocess_memory, int coprocess_disk, int coprocess_gpus, struct work_queue_resources *total_resources, struct work_queue_resources *coprocess_resources, char *coprocess_command, int number_of_coprocess_instances) {
struct work_queue_coprocess *work_queue_coprocess_initialize_all_coprocesses(int coprocess_cores, int coprocess_memory, int coprocess_disk, int coprocess_gpus, struct work_queue_resources *total_resources, struct work_queue_resources *coprocess_resources, char *coprocess_command, int number_of_coprocess_instances) {
if (number_of_coprocess_instances <= 0) return NULL;
int coprocess_cores_normalized = ( (coprocess_cores > 0) ? coprocess_cores : total_resources->cores.total);
int coprocess_memory_normalized = ( (coprocess_memory > 0) ? coprocess_memory : total_resources->memory.total);
Expand Down
Loading

0 comments on commit c53286b

Please sign in to comment.