Skip to content

Commit

Permalink
refactor: Change blocked RPC to non-blocked RPC
Browse files Browse the repository at this point in the history
In the case of all remote workers which are not workable, blocked PRCs
not get RPC results until remote workers are workable. Therefore,
remote interface needs to change blocked RPC to non-blocked RPC with
10s. When time-out occurs, remote interface goes to run local dcurl.

Pass test with a broker and remote interface without remote worker.

Close DLTcollab#161
  • Loading branch information
ajblane committed Jun 5, 2019
1 parent 08e1cee commit ab94edb
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions src/remote_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "remote_common.h"
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include "common.h"

bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
Expand Down Expand Up @@ -186,7 +187,12 @@ bool wait_response_message(amqp_connection_state_t *conn,
* AMQP_FRAME_BODY*/
for (;;) {
amqp_maybe_release_buffers(*conn);
amqp_simple_wait_frame(*conn, &frame);

struct timeval t = {10, 0}; /* RPC timeout: 10s*/
if (!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t),
"RPC timeout"))
return false;

if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait method frame"))
return false;

Expand All @@ -209,7 +215,11 @@ bool wait_response_message(amqp_connection_state_t *conn,
#endif

amqp_maybe_release_buffers(*conn);
amqp_simple_wait_frame(*conn, &frame);

if (!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t),
"RPC timeout"))
return false;

if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait header frame"))
return false;

Expand All @@ -230,7 +240,10 @@ bool wait_response_message(amqp_connection_state_t *conn,
body_target = (size_t) frame.payload.properties.body_size;
body_received = 0;
while (body_received < body_target) {
amqp_simple_wait_frame(*conn, &frame);
if (!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t),
"RPC timeout"))
return false;

if (!die_on_amqp_error(amqp_get_rpc_reply(*conn),
"Wait body frame"))
return false;
Expand Down

0 comments on commit ab94edb

Please sign in to comment.