Skip to content

Commit

Permalink
more connection callback prints
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Nov 27, 2023
1 parent 1d063f2 commit c3f31ae
Showing 1 changed file with 55 additions and 38 deletions.
93 changes: 55 additions & 38 deletions bin/mqtt5_canary/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,10 @@ static void s_ParseOptions(int argc, char **argv, struct AppCtx &ctx, struct Aws
static void s_Mqtt5CanaryUpdateTpsSleepTime(struct AwsMqtt5CanaryTesterOptions *testerOptions)
{

testerOptions->tpsSleepTime = testerOptions->tps == 0 ? 0 :
(aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL) / testerOptions->tps);
testerOptions->tpsSleepTime =
testerOptions->tps == 0
? 0
: (aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL) / testerOptions->tps);
}

static void s_AwsMqtt5CanaryInitTesterOptions(struct AwsMqtt5CanaryTesterOptions *testerOptions)
Expand Down Expand Up @@ -272,7 +274,7 @@ struct AwsMqtt5CanaryStatistic
uint64_t unsub_attempt;
uint64_t unsub_succeed;
uint64_t unsub_failed;
}g_statistic;
} g_statistic;

struct AwsMqtt5CanaryTestClient
{
Expand Down Expand Up @@ -386,6 +388,7 @@ static int s_AwsMqtt5CanaryOperationStart(struct AwsMqtt5CanaryTestClient *testC
// If the connection operation failed eventually, "withClientConnectionFailureCallback"
// will set the flag to false.
testClient->isConnected = true;
fprintf(stderr, "ID:%s client started ... ", testClient->clientId.c_str());
return AWS_OP_SUCCESS;
}
fprintf(stderr, "ID:%s client start failed ", testClient->clientId.c_str());
Expand All @@ -410,8 +413,8 @@ static int s_AwsMqtt5CanaryOperationStop(struct AwsMqtt5CanaryTestClient *testCl
return AWS_OP_ERR;
}

OnSubscribeCompletionHandler subscribe_completion = [](int errorcode, std::shared_ptr<SubAckPacket>){
if(errorcode != 0)
OnSubscribeCompletionHandler subscribe_completion = [](int errorcode, std::shared_ptr<SubAckPacket>) {
if (errorcode != 0)
{
++g_statistic.subscribe_failed;
fprintf(stderr, "Subscribe failed with errorcode: %d, %s\n", errorcode, aws_error_str(errorcode));
Expand All @@ -420,7 +423,6 @@ OnSubscribeCompletionHandler subscribe_completion = [](int errorcode, std::share
++g_statistic.subscribe_succeed;
};


static int s_AwsMqtt5CanaryOperationSubscribe(struct AwsMqtt5CanaryTestClient *testClient, Allocator *allocator)
{
if (!testClient->isConnected)
Expand Down Expand Up @@ -455,7 +457,7 @@ static int s_AwsMqtt5CanaryOperationSubscribe(struct AwsMqtt5CanaryTestClient *t

++g_statistic.totalOperations;
++g_statistic.subscribe_attempt;
if (testClient->client->Subscribe(packet,subscribe_completion))
if (testClient->client->Subscribe(packet, subscribe_completion))
{
return AWS_OP_SUCCESS;
}
Expand Down Expand Up @@ -500,7 +502,7 @@ static int s_AwsMqtt5CanaryOperationUnsubscribeBad(struct AwsMqtt5CanaryTestClie
AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Unsubscribe Bad", testClient->clientId.c_str());
return AWS_OP_SUCCESS;
}
++g_statistic.unsub_failed;
++g_statistic.unsub_failed;
AWS_LOGF_ERROR(AWS_LS_MQTT5_CANARY, "ID:%s Unsubscribe Bad Operation Failed", testClient->clientId.c_str());
return AWS_OP_ERR;
}
Expand Down Expand Up @@ -542,15 +544,14 @@ static int s_AwsMqtt5CanaryOperationUnsubscribe(struct AwsMqtt5CanaryTestClient
return AWS_OP_ERR;
}

OnPublishCompletionHandler publish_completion = [](int errorcode, std::shared_ptr<PublishResult>){
if(errorcode != 0)
OnPublishCompletionHandler publish_completion = [](int errorcode, std::shared_ptr<PublishResult>) {
if (errorcode != 0)
{
++g_statistic.publish_failed;
fprintf(stderr, "Publish failed with errorcode: %d, %s\n", errorcode, aws_error_str(errorcode));
return;
}
++g_statistic.publish_succeed;

};

/* Help function for Publish Operation. Do not call it directly for operations. */
Expand Down Expand Up @@ -592,7 +593,6 @@ static int s_AwsMqtt5CanaryOperationPublish(
.WithUserProperty(std::move(up2))
.WithUserProperty(std::move(up3));


++g_statistic.totalOperations;
++g_statistic.publish_attempt;
if (testClient->client->Publish(packetPublish, publish_completion))
Expand Down Expand Up @@ -933,19 +933,24 @@ int main(int argc, char **argv)
eventData.negotiatedSettings->getClientId().length());
clients[i].settings = eventData.negotiatedSettings;

fprintf(stderr, "ID:%s client Connection Success ", clients[i].clientId.c_str());
AWS_LOGF_INFO(
AWS_LS_MQTT5_CANARY, "ID:%s Lifecycle Event: Connection Success", clients[i].clientId.c_str());
});

mqtt5Options.WithClientConnectionFailureCallback([&clients, i](const OnConnectionFailureEventData &eventData) {
clients[i].isConnected = false;
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CANARY,
"ID:%s Connection failed with Error Code: %d(%s)",
clients[i].clientId.c_str(),
eventData.errorCode,
aws_error_debug_str(eventData.errorCode));
});
mqtt5Options.WithClientConnectionFailureCallback(
[&clients, i](const OnConnectionFailureEventData &eventData) {
clients[i].isConnected = false;
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CANARY,
"ID:%s Connection failed with Error Code: %d(%s)",
clients[i].clientId.c_str(),
eventData.errorCode,
aws_error_debug_str(eventData.errorCode));

fprintf(stderr, "ID:%s client Connection failed with error code %d(%s)", clients[i].clientId.c_str(),eventData.errorCode,
aws_error_debug_str(eventData.errorCode));
});

mqtt5Options.WithClientDisconnectionCallback([&clients, i](const OnDisconnectionEventData &) {
clients[i].isConnected = false;
Expand Down Expand Up @@ -1004,7 +1009,10 @@ int main(int argc, char **argv)

if (now > timeTestFinish)
{
fprintf(stderr," Operating TPS average over test: %zu\n\n", operationsExecuted / testerOptions.testRunSeconds);
fprintf(
stderr,
" Operating TPS average over test: %zu\n\n",
operationsExecuted / testerOptions.testRunSeconds);
done = true;
}

Expand Down Expand Up @@ -1033,22 +1041,31 @@ int main(int argc, char **argv)
}
}

fprintf(stderr, "Final Statistic: \n"
"total operations: %" PRId64 "\n"
"tps: %" PRId64 "\n"
"subscribe attempt: %" PRId64 "\n"
"subscribe succeed: %" PRId64 "\n"
"subscribe failed: %" PRId64 "\n"
"publish attempt: %" PRId64 "\n"
"publish succeed: %" PRId64 "\n"
"publish failed: %" PRId64 "\n"
"unsub attempt: %" PRId64 "\n"
"unsub succeed: %" PRId64 "\n"
"unsub failed: %" PRId64 "\n",
g_statistic.totalOperations, g_statistic.totalOperations/testerOptions.testRunSeconds,
g_statistic.subscribe_attempt, g_statistic.subscribe_succeed, g_statistic.subscribe_failed,
g_statistic.publish_attempt, g_statistic.publish_succeed, g_statistic.publish_failed,
g_statistic.unsub_attempt, g_statistic.unsub_succeed, g_statistic.unsub_failed);
fprintf(
stderr,
"Final Statistic: \n"
"total operations: %" PRId64 "\n"
"tps: %" PRId64 "\n"
"subscribe attempt: %" PRId64 "\n"
"subscribe succeed: %" PRId64 "\n"
"subscribe failed: %" PRId64 "\n"
"publish attempt: %" PRId64 "\n"
"publish succeed: %" PRId64 "\n"
"publish failed: %" PRId64 "\n"
"unsub attempt: %" PRId64 "\n"
"unsub succeed: %" PRId64 "\n"
"unsub failed: %" PRId64 "\n",
g_statistic.totalOperations,
g_statistic.totalOperations / testerOptions.testRunSeconds,
g_statistic.subscribe_attempt,
g_statistic.subscribe_succeed,
g_statistic.subscribe_failed,
g_statistic.publish_attempt,
g_statistic.publish_succeed,
g_statistic.publish_failed,
g_statistic.unsub_attempt,
g_statistic.unsub_succeed,
g_statistic.unsub_failed);
}

aws_mem_tracer_destroy(allocator);
Expand Down

0 comments on commit c3f31ae

Please sign in to comment.