Skip to content

Commit

Permalink
refactor: Change blocked RPC to non-blocked RPC
Browse files Browse the repository at this point in the history
In the case of all remote workers which are not workable, blocked PRCs
not get RPC results until remote workers are workable. Therefore,
remote interface needs to change blocked RPC to non-blocked RPC with
10s. When time-out occurs, remote interface goes to run local dcurl.

Pass test with a broker and remote interface without remote worker.

Close DLTcollab#161
  • Loading branch information
ajblane committed Jun 5, 2019
1 parent 08e1cee commit b6f4c0a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 17 deletions.
39 changes: 25 additions & 14 deletions src/remote_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "remote_common.h"
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include "common.h"

bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
Expand All @@ -16,11 +17,11 @@ bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
return true;

case AMQP_RESPONSE_NONE:
ddprintf("%s: missing RPC reply type!\n", context);
ddprintf(MSG_PREFIX "%s: missing RPC reply type!\n", context);
break;

case AMQP_RESPONSE_LIBRARY_EXCEPTION:
ddprintf("%s: %s\n", context,
ddprintf(MSG_PREFIX "%s: %s\n", context,
amqp_error_string2(x.library_error));
break;

Expand All @@ -29,20 +30,20 @@ bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m =
(amqp_connection_close_t *) x.reply.decoded;
ddprintf("%s: server connection error %uh, message: %.*s\n",
ddprintf(MSG_PREFIX "%s: server connection error %uh, message: %.*s\n",
context, m->reply_code, (int) m->reply_text.len,
(char *) m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
ddprintf("%s: server channel error %uh, message: %.*s\n",
ddprintf(MSG_PREFIX "%s: server channel error %uh, message: %.*s\n",
context, m->reply_code, (int) m->reply_text.len,
(char *) m->reply_text.bytes);
break;
}
default:
ddprintf("%s: unknown server error, method id 0x%08X\n",
ddprintf(MSG_PREFIX "%s: unknown server error, method id 0x%08X\n",
context, x.reply.id);
break;
}
Expand All @@ -55,7 +56,7 @@ bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
bool die_on_error(int x, char const *context)
{
if (x < 0) {
ddprintf("%s: %s\n", context, amqp_error_string2(x));
ddprintf(MSG_PREFIX "%s: %s\n", context, amqp_error_string2(x));
return false;
}

Expand All @@ -70,7 +71,7 @@ bool connect_broker(amqp_connection_state_t *conn)
*conn = amqp_new_connection();
socket = amqp_tcp_socket_new(*conn);
if (amqp_socket_open(socket, HOSTNAME, 5672) != AMQP_STATUS_OK) {
ddprintf("%s\n", "The rabbitmq broker is closed");
ddprintf(MSG_PREFIX "%s\n", "The rabbitmq broker is closed");
goto destroy_connection;
}

Expand Down Expand Up @@ -186,7 +187,11 @@ bool wait_response_message(amqp_connection_state_t *conn,
* AMQP_FRAME_BODY*/
for (;;) {
amqp_maybe_release_buffers(*conn);
amqp_simple_wait_frame(*conn, &frame);

struct timeval t = {10, 0}; /* RPC timeout: 10s*/
if(!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), "RPC timeout"))
return false;

if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait method frame"))
return false;

Expand All @@ -209,12 +214,15 @@ bool wait_response_message(amqp_connection_state_t *conn,
#endif

amqp_maybe_release_buffers(*conn);
amqp_simple_wait_frame(*conn, &frame);

if(!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), "RPC timeout"))
return false;

if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait header frame"))
return false;

if (frame.frame_type != AMQP_FRAME_HEADER) {
ddprintf("Unexpected header!");
ddprintf(MSG_PREFIX "Unexpected header!");
return false;
}

Expand All @@ -225,25 +233,28 @@ bool wait_response_message(amqp_connection_state_t *conn,
(char *) p->content_type.bytes);
}
#endif
ddprintf("---\n");
ddprintf(MSG_PREFIX "---\n");

body_target = (size_t) frame.payload.properties.body_size;
body_received = 0;
while (body_received < body_target) {
amqp_simple_wait_frame(*conn, &frame);

if(!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), "RPC timeout"))
return false;

if (!die_on_amqp_error(amqp_get_rpc_reply(*conn),
"Wait body frame"))
return false;

if (frame.frame_type != AMQP_FRAME_BODY) {
ddprintf("Unexpected body");
ddprintf(MSG_PREFIX "Unexpected body");
return false;
}

body_received += frame.payload.body_fragment.len;
}
if (body_received != body_target) {
ddprintf("Received body is small than body target");
ddprintf(MSG_PREFIX "Received body is small than body target");
return false;
}

Expand Down
6 changes: 3 additions & 3 deletions src/remote_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,20 @@ bool PoWValidation(int8_t *output_trytes, int mwm)
{
Trytes_t *trytes_t = initTrytes(output_trytes, TRANSACTION_TRYTES_LENGTH);
if (!trytes_t) {
ddprintf("PoW Validation: Initialization of Trytes fails\n");
ddprintf(MSG_PREFIX "PoW Validation: Initialization of Trytes fails\n");
goto fail_to_inittrytes;
}

Trytes_t *hash_trytes = hashTrytes(trytes_t);
if (!hash_trytes) {
ddprintf("PoW Validation: Hashing trytes fails\n");
ddprintf(MSG_PREFIX "PoW Validation: Hashing trytes fails\n");
goto fail_to_hashtrytes;
}

Trits_t *ret_trits = trits_from_trytes(hash_trytes);
for (int i = 243 - 1; i >= 243 - mwm; i--) {
if (ret_trits->data[i] != 0) {
ddprintf("PoW Validation fails\n");
ddprintf(MSG_PREFIX "PoW Validation fails\n");
goto fail_to_validation;
}
}
Expand Down

0 comments on commit b6f4c0a

Please sign in to comment.