From ab94edb383181d80af1c8a452ec8139cae36844e Mon Sep 17 00:00:00 2001 From: ajubuntu Date: Tue, 4 Jun 2019 20:46:20 -0700 Subject: [PATCH] refactor: Change blocked RPC to non-blocked RPC In the case of all remote workers which are not workable, blocked PRCs not get RPC results until remote workers are workable. Therefore, remote interface needs to change blocked RPC to non-blocked RPC with 10s. When time-out occurs, remote interface goes to run local dcurl. Pass test with a broker and remote interface without remote worker. Close #161 --- src/remote_common.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/remote_common.c b/src/remote_common.c index d9dd902..9a39669 100644 --- a/src/remote_common.c +++ b/src/remote_common.c @@ -7,6 +7,7 @@ #include "remote_common.h" #include #include +#include #include "common.h" bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context) @@ -186,7 +187,12 @@ bool wait_response_message(amqp_connection_state_t *conn, * AMQP_FRAME_BODY*/ for (;;) { amqp_maybe_release_buffers(*conn); - amqp_simple_wait_frame(*conn, &frame); + + struct timeval t = {10, 0}; /* RPC timeout: 10s*/ + if (!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), + "RPC timeout")) + return false; + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait method frame")) return false; @@ -209,7 +215,11 @@ bool wait_response_message(amqp_connection_state_t *conn, #endif amqp_maybe_release_buffers(*conn); - amqp_simple_wait_frame(*conn, &frame); + + if (!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), + "RPC timeout")) + return false; + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait header frame")) return false; @@ -230,7 +240,10 @@ bool wait_response_message(amqp_connection_state_t *conn, body_target = (size_t) frame.payload.properties.body_size; body_received = 0; while (body_received < body_target) { - amqp_simple_wait_frame(*conn, &frame); + if (!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), + "RPC timeout")) + return false; + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait body frame")) return false;