From b6f4c0a4733384d88931a757cf7e7ec6a5f539a1 Mon Sep 17 00:00:00 2001 From: ajubuntu Date: Tue, 4 Jun 2019 19:08:20 -0700 Subject: [PATCH] refactor: Change blocked RPC to non-blocked RPC In the case of all remote workers which are not workable, blocked PRCs not get RPC results until remote workers are workable. Therefore, remote interface needs to change blocked RPC to non-blocked RPC with 10s. When time-out occurs, remote interface goes to run local dcurl. Pass test with a broker and remote interface without remote worker. Close #161 --- src/remote_common.c | 39 +++++++++++++++++++++++++-------------- src/remote_interface.c | 6 +++--- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/remote_common.c b/src/remote_common.c index d9dd902..a9c8b92 100644 --- a/src/remote_common.c +++ b/src/remote_common.c @@ -7,6 +7,7 @@ #include "remote_common.h" #include #include +#include #include "common.h" bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context) @@ -16,11 +17,11 @@ bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context) return true; case AMQP_RESPONSE_NONE: - ddprintf("%s: missing RPC reply type!\n", context); + ddprintf(MSG_PREFIX "%s: missing RPC reply type!\n", context); break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: - ddprintf("%s: %s\n", context, + ddprintf(MSG_PREFIX "%s: %s\n", context, amqp_error_string2(x.library_error)); break; @@ -29,20 +30,20 @@ bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context) case AMQP_CONNECTION_CLOSE_METHOD: { amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; - ddprintf("%s: server connection error %uh, message: %.*s\n", + ddprintf(MSG_PREFIX "%s: server connection error %uh, message: %.*s\n", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); break; } case AMQP_CHANNEL_CLOSE_METHOD: { amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; - ddprintf("%s: server channel error %uh, message: %.*s\n", + ddprintf(MSG_PREFIX "%s: server channel error %uh, message: %.*s\n", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); break; } default: - ddprintf("%s: unknown server error, method id 0x%08X\n", + ddprintf(MSG_PREFIX "%s: unknown server error, method id 0x%08X\n", context, x.reply.id); break; } @@ -55,7 +56,7 @@ bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context) bool die_on_error(int x, char const *context) { if (x < 0) { - ddprintf("%s: %s\n", context, amqp_error_string2(x)); + ddprintf(MSG_PREFIX "%s: %s\n", context, amqp_error_string2(x)); return false; } @@ -70,7 +71,7 @@ bool connect_broker(amqp_connection_state_t *conn) *conn = amqp_new_connection(); socket = amqp_tcp_socket_new(*conn); if (amqp_socket_open(socket, HOSTNAME, 5672) != AMQP_STATUS_OK) { - ddprintf("%s\n", "The rabbitmq broker is closed"); + ddprintf(MSG_PREFIX "%s\n", "The rabbitmq broker is closed"); goto destroy_connection; } @@ -186,7 +187,11 @@ bool wait_response_message(amqp_connection_state_t *conn, * AMQP_FRAME_BODY*/ for (;;) { amqp_maybe_release_buffers(*conn); - amqp_simple_wait_frame(*conn, &frame); + + struct timeval t = {10, 0}; /* RPC timeout: 10s*/ + if(!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), "RPC timeout")) + return false; + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait method frame")) return false; @@ -209,12 +214,15 @@ bool wait_response_message(amqp_connection_state_t *conn, #endif amqp_maybe_release_buffers(*conn); - amqp_simple_wait_frame(*conn, &frame); + + if(!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), "RPC timeout")) + return false; + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait header frame")) return false; if (frame.frame_type != AMQP_FRAME_HEADER) { - ddprintf("Unexpected header!"); + ddprintf(MSG_PREFIX "Unexpected header!"); return false; } @@ -225,25 +233,28 @@ bool wait_response_message(amqp_connection_state_t *conn, (char *) p->content_type.bytes); } #endif - ddprintf("---\n"); + ddprintf(MSG_PREFIX "---\n"); body_target = (size_t) frame.payload.properties.body_size; body_received = 0; while (body_received < body_target) { - amqp_simple_wait_frame(*conn, &frame); + + if(!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), "RPC timeout")) + return false; + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait body frame")) return false; if (frame.frame_type != AMQP_FRAME_BODY) { - ddprintf("Unexpected body"); + ddprintf(MSG_PREFIX "Unexpected body"); return false; } body_received += frame.payload.body_fragment.len; } if (body_received != body_target) { - ddprintf("Received body is small than body target"); + ddprintf(MSG_PREFIX "Received body is small than body target"); return false; } diff --git a/src/remote_interface.c b/src/remote_interface.c index 76f0c6f..08a8d28 100644 --- a/src/remote_interface.c +++ b/src/remote_interface.c @@ -66,20 +66,20 @@ bool PoWValidation(int8_t *output_trytes, int mwm) { Trytes_t *trytes_t = initTrytes(output_trytes, TRANSACTION_TRYTES_LENGTH); if (!trytes_t) { - ddprintf("PoW Validation: Initialization of Trytes fails\n"); + ddprintf(MSG_PREFIX "PoW Validation: Initialization of Trytes fails\n"); goto fail_to_inittrytes; } Trytes_t *hash_trytes = hashTrytes(trytes_t); if (!hash_trytes) { - ddprintf("PoW Validation: Hashing trytes fails\n"); + ddprintf(MSG_PREFIX "PoW Validation: Hashing trytes fails\n"); goto fail_to_hashtrytes; } Trits_t *ret_trits = trits_from_trytes(hash_trytes); for (int i = 243 - 1; i >= 243 - mwm; i--) { if (ret_trits->data[i] != 0) { - ddprintf("PoW Validation fails\n"); + ddprintf(MSG_PREFIX "PoW Validation fails\n"); goto fail_to_validation; } }