Skip to content

Commit

Permalink
fix: bad chunk size for result upload (#415)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrasseur-aneo authored Sep 19, 2023
2 parents 866938f + 73188b3 commit c8c4c5c
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void ResultsClient::upload_result_data(const std::string &session_id, const std:
if (!stream->Write(request)) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to continue upload result");
}
payload = payload.substr(maxChunkSize);
payload = payload.substr(chunk.size());
}

if (!stream->WritesDone()) {
Expand Down
8 changes: 4 additions & 4 deletions packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class ComputePlane {
* @param socket_address The socket address to set for the worker.
*/
void set_worker_address(std::string socket_address) {
if (socket_address.find("unix:") != 0) {
socket_address.insert(0, "unix:");
if (socket_address.find("unix://") != 0) {
socket_address.insert(0, "unix://");
}
worker_address_ = std::move(socket_address);
}
Expand All @@ -45,8 +45,8 @@ class ComputePlane {
* @param agent_address The agent address to set for the agent.
*/
void set_agent_address(std::string agent_address) {
if (agent_address.find("unix:") != 0) {
agent_address.insert(0, "unix:");
if (agent_address.find("unix://") != 0) {
agent_address.insert(0, "unix://");
}
agent_address_ = std::move(agent_address);
}
Expand Down
21 changes: 15 additions & 6 deletions packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace logger = armonik::api::common::logger;
* @param channel The gRPC channel to communicate with the server.
* @param default_task_options The default task options.
*/
void init(std::shared_ptr<Channel> &channel, TaskOptions &default_task_options) {
void init(std::shared_ptr<Channel> &channel, TaskOptions &default_task_options, logger::ILogger &logger) {

Configuration configuration;
// auto server = std::make_shared<EnvConfiguration>(configuration_t);
Expand All @@ -54,7 +54,7 @@ void init(std::shared_ptr<Channel> &channel, TaskOptions &default_task_options)

std::string server_address = configuration.get("Grpc__EndPoint");

std::cout << " Server address " << server_address << std::endl;
logger.info(" Server address {address}", {{"address", server_address}});

channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());

Expand All @@ -77,6 +77,7 @@ void init(std::shared_ptr<Channel> &channel, TaskOptions &default_task_options)
TEST(testMock, createSession) {
// MockStubInterface stub;
std::shared_ptr<Channel> channel;
logger::Logger log{logger::writer_console(), logger::formatter_plain(true)};

ClientContext context;
CreateSessionReply reply;
Expand All @@ -85,7 +86,7 @@ TEST(testMock, createSession) {
const std::vector<std::string> &partition_ids = {""};

TaskOptions task_options;
init(channel, task_options);
init(channel, task_options, log);

ASSERT_EQ(task_options.partition_id(), "");

Expand Down Expand Up @@ -126,7 +127,7 @@ TEST(testMock, submitTask) {
TaskOptions task_options;

std::shared_ptr<Channel> channel;
init(channel, task_options);
init(channel, task_options, log);

// MockStubInterface stub;
std::unique_ptr<Submitter::StubInterface> stub = Submitter::NewStub(channel);
Expand Down Expand Up @@ -190,6 +191,7 @@ TEST(testMock, submitTask) {
}

TEST(testMock, testWorker) {
logger::Logger log{logger::writer_console(), logger::formatter_plain(true)};
std::shared_ptr<Channel> channel;

CreateSessionReply reply;
Expand All @@ -199,7 +201,7 @@ TEST(testMock, testWorker) {

TaskOptions task_options;

init(channel, task_options);
init(channel, task_options, log);

auto stub = armonik::api::grpc::v1::results::Results::NewStub(channel);

Expand Down Expand Up @@ -242,6 +244,7 @@ TEST(testMock, testWorker) {
}

TEST(testMock, getResult) {
logger::Logger log{logger::writer_console(), logger::formatter_plain(true)};
// MockStubInterface stub;
std::shared_ptr<Channel> channel;

Expand All @@ -253,26 +256,31 @@ TEST(testMock, getResult) {
TaskOptions task_options;
armonik::api::grpc::v1::ResultRequest result_request;

init(channel, task_options);
init(channel, task_options, log);

auto stub = armonik::api::grpc::v1::results::Results::NewStub(channel);

grpc::ClientContext context;

log.debug("Creating Client");
std::unique_ptr<Submitter::StubInterface> stub_client = Submitter::NewStub(channel);
armonik::api::client::SubmitterClient submitter(std::move(stub_client));
std::string session_id = submitter.create_session(task_options, partition_ids);
log.debug("Received session id {session_id}", {{"session_id", session_id}});

auto name = "test";

armonik::api::grpc::v1::results::CreateResultsMetaDataRequest request_create;
request_create.set_session_id(session_id);
armonik::api::client::ResultsClient results(armonik::api::grpc::v1::results::Results::NewStub(channel));
auto mapping = results.create_results(session_id, {name});
log.debug("Created result {result_id}", {{"result_id", mapping[name]}});
ASSERT_TRUE(mapping.size() == 1);

std::string payload = "TestPayload";

results.upload_result_data(session_id, mapping[name], payload);
log.debug("Uploaded result {result_id}", {{"result_id", mapping[name]}});

// EXPECT_CALL(*stub, GetServiceConfiguration(_, _, _)).Times(AtLeast(1));
// EXPECT_CALL(*stub, TryGetResultStreamRaw(_, _)).Times(AtLeast(1));
Expand All @@ -281,6 +289,7 @@ TEST(testMock, getResult) {
result_request.set_session(session_id);

auto result = submitter.get_result_async(result_request).get();
log.debug("Received result {result_id}", {{"result_id", mapping[name]}});

ASSERT_FALSE(result.empty());
ASSERT_EQ(payload, result);
Expand Down
4 changes: 4 additions & 0 deletions packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "Worker/ProcessStatus.h"
#include "Worker/TaskHandler.h"
#include "exceptions/ArmoniKApiException.h"
#include "logger/formatter.h"
#include "logger/logger.h"
#include "logger/writer.h"
Expand Down Expand Up @@ -57,6 +58,9 @@ class WorkerServer {
logger.info("Creating worker");
common::options::ComputePlane compute_plane(configuration);

logger.info("Worker address : " + compute_plane.get_server_address());
logger.info("Agent address : " + compute_plane.get_agent_address());

builder_.AddListeningPort(compute_plane.get_server_address(), ::grpc::InsecureServerCredentials());
builder_.SetMaxReceiveMessageSize(-1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ armonik::api::worker::ArmoniKWorker::Process([[maybe_unused]] ::grpc::ServerCont
}
*response->mutable_output() = std::move(output);
} catch (const std::exception &e) {
return {::grpc::StatusCode::UNAVAILABLE, "Error processing task", e.what()};
logger_.error("Error processing task : {what}", {{"what", e.what()}});
std::stringstream ss;
ss << "Error processing task : " << e.what();
return {::grpc::StatusCode::UNAVAILABLE, ss.str(), e.what()};
}

return ::grpc::Status::OK;
Expand Down
5 changes: 3 additions & 2 deletions packages/cpp/tools/Dockerfile.worker
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ RUN apk update && apk add --no-cache \
grpc \
grpc-dev \
protobuf \
protobuf-dev \
libfmt-dev
protobuf-dev

# Update the PATH environment variable to include the gRPC libraries and binaries
ENV LD_LIBRARY_PATH="/app/install/lib:$LD_LIBRARY_PATH"
Expand All @@ -39,6 +38,8 @@ COPY ./packages/cpp/ArmoniK.Api.Worker ./ArmoniK.Api.Worker
COPY ./packages/cpp/ArmoniK.Api.Worker.Tests ./ArmoniK.Api.Worker.Tests
COPY ./packages/cpp/CMakeLists.txt ./
COPY ./packages/cpp/Dependencies.cmake ./
COPY ./packages/cpp/Packaging.cmake ./
COPY ./packages/cpp/tools/packaging/. ./tools/packaging/.

# Copy the Protocol Buffer definition files into the image
WORKDIR /app/proto
Expand Down

0 comments on commit c8c4c5c

Please sign in to comment.