diff --git a/packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp b/packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp index ad354fdd2..4162c0ce1 100644 --- a/packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp +++ b/packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp @@ -69,7 +69,7 @@ void ResultsClient::upload_result_data(const std::string &session_id, const std: if (!stream->Write(request)) { throw armonik::api::common::exceptions::ArmoniKApiException("Unable to continue upload result"); } - payload = payload.substr(maxChunkSize); + payload = payload.substr(chunk.size()); } if (!stream->WritesDone()) { diff --git a/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h b/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h index 8466f692a..44de3c50b 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h +++ b/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h @@ -34,8 +34,8 @@ class ComputePlane { * @param socket_address The socket address to set for the worker. */ void set_worker_address(std::string socket_address) { - if (socket_address.find("unix:") != 0) { - socket_address.insert(0, "unix:"); + if (socket_address.find("unix://") != 0) { + socket_address.insert(0, "unix://"); } worker_address_ = std::move(socket_address); } @@ -45,8 +45,8 @@ class ComputePlane { * @param agent_address The agent address to set for the agent. */ void set_agent_address(std::string agent_address) { - if (agent_address.find("unix:") != 0) { - agent_address.insert(0, "unix:"); + if (agent_address.find("unix://") != 0) { + agent_address.insert(0, "unix://"); } agent_address_ = std::move(agent_address); } diff --git a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp index 4391d1575..b24952028 100644 --- a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp +++ b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp @@ -45,7 +45,7 @@ namespace logger = armonik::api::common::logger; * @param channel The gRPC channel to communicate with the server. * @param default_task_options The default task options. */ -void init(std::shared_ptr &channel, TaskOptions &default_task_options) { +void init(std::shared_ptr &channel, TaskOptions &default_task_options, logger::ILogger &logger) { Configuration configuration; // auto server = std::make_shared(configuration_t); @@ -54,7 +54,7 @@ void init(std::shared_ptr &channel, TaskOptions &default_task_options) std::string server_address = configuration.get("Grpc__EndPoint"); - std::cout << " Server address " << server_address << std::endl; + logger.info(" Server address {address}", {{"address", server_address}}); channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()); @@ -77,6 +77,7 @@ void init(std::shared_ptr &channel, TaskOptions &default_task_options) TEST(testMock, createSession) { // MockStubInterface stub; std::shared_ptr channel; + logger::Logger log{logger::writer_console(), logger::formatter_plain(true)}; ClientContext context; CreateSessionReply reply; @@ -85,7 +86,7 @@ TEST(testMock, createSession) { const std::vector &partition_ids = {""}; TaskOptions task_options; - init(channel, task_options); + init(channel, task_options, log); ASSERT_EQ(task_options.partition_id(), ""); @@ -126,7 +127,7 @@ TEST(testMock, submitTask) { TaskOptions task_options; std::shared_ptr channel; - init(channel, task_options); + init(channel, task_options, log); // MockStubInterface stub; std::unique_ptr stub = Submitter::NewStub(channel); @@ -190,6 +191,7 @@ TEST(testMock, submitTask) { } TEST(testMock, testWorker) { + logger::Logger log{logger::writer_console(), logger::formatter_plain(true)}; std::shared_ptr channel; CreateSessionReply reply; @@ -199,7 +201,7 @@ TEST(testMock, testWorker) { TaskOptions task_options; - init(channel, task_options); + init(channel, task_options, log); auto stub = armonik::api::grpc::v1::results::Results::NewStub(channel); @@ -242,6 +244,7 @@ TEST(testMock, testWorker) { } TEST(testMock, getResult) { + logger::Logger log{logger::writer_console(), logger::formatter_plain(true)}; // MockStubInterface stub; std::shared_ptr channel; @@ -253,15 +256,17 @@ TEST(testMock, getResult) { TaskOptions task_options; armonik::api::grpc::v1::ResultRequest result_request; - init(channel, task_options); + init(channel, task_options, log); auto stub = armonik::api::grpc::v1::results::Results::NewStub(channel); grpc::ClientContext context; + log.debug("Creating Client"); std::unique_ptr stub_client = Submitter::NewStub(channel); armonik::api::client::SubmitterClient submitter(std::move(stub_client)); std::string session_id = submitter.create_session(task_options, partition_ids); + log.debug("Received session id {session_id}", {{"session_id", session_id}}); auto name = "test"; @@ -269,10 +274,13 @@ TEST(testMock, getResult) { request_create.set_session_id(session_id); armonik::api::client::ResultsClient results(armonik::api::grpc::v1::results::Results::NewStub(channel)); auto mapping = results.create_results(session_id, {name}); + log.debug("Created result {result_id}", {{"result_id", mapping[name]}}); ASSERT_TRUE(mapping.size() == 1); std::string payload = "TestPayload"; + results.upload_result_data(session_id, mapping[name], payload); + log.debug("Uploaded result {result_id}", {{"result_id", mapping[name]}}); // EXPECT_CALL(*stub, GetServiceConfiguration(_, _, _)).Times(AtLeast(1)); // EXPECT_CALL(*stub, TryGetResultStreamRaw(_, _)).Times(AtLeast(1)); @@ -281,6 +289,7 @@ TEST(testMock, getResult) { result_request.set_session(session_id); auto result = submitter.get_result_async(result_request).get(); + log.debug("Received result {result_id}", {{"result_id", mapping[name]}}); ASSERT_FALSE(result.empty()); ASSERT_EQ(payload, result); diff --git a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h index 3ca818957..7c8f21558 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h @@ -17,6 +17,7 @@ #include "Worker/ProcessStatus.h" #include "Worker/TaskHandler.h" +#include "exceptions/ArmoniKApiException.h" #include "logger/formatter.h" #include "logger/logger.h" #include "logger/writer.h" @@ -57,6 +58,9 @@ class WorkerServer { logger.info("Creating worker"); common::options::ComputePlane compute_plane(configuration); + logger.info("Worker address : " + compute_plane.get_server_address()); + logger.info("Agent address : " + compute_plane.get_agent_address()); + builder_.AddListeningPort(compute_plane.get_server_address(), ::grpc::InsecureServerCredentials()); builder_.SetMaxReceiveMessageSize(-1); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index 082a03873..411fb7892 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -69,7 +69,10 @@ armonik::api::worker::ArmoniKWorker::Process([[maybe_unused]] ::grpc::ServerCont } *response->mutable_output() = std::move(output); } catch (const std::exception &e) { - return {::grpc::StatusCode::UNAVAILABLE, "Error processing task", e.what()}; + logger_.error("Error processing task : {what}", {{"what", e.what()}}); + std::stringstream ss; + ss << "Error processing task : " << e.what(); + return {::grpc::StatusCode::UNAVAILABLE, ss.str(), e.what()}; } return ::grpc::Status::OK; diff --git a/packages/cpp/tools/Dockerfile.worker b/packages/cpp/tools/Dockerfile.worker index 66b02ce54..c2735e186 100644 --- a/packages/cpp/tools/Dockerfile.worker +++ b/packages/cpp/tools/Dockerfile.worker @@ -22,8 +22,7 @@ RUN apk update && apk add --no-cache \ grpc \ grpc-dev \ protobuf \ - protobuf-dev \ - libfmt-dev + protobuf-dev # Update the PATH environment variable to include the gRPC libraries and binaries ENV LD_LIBRARY_PATH="/app/install/lib:$LD_LIBRARY_PATH" @@ -39,6 +38,8 @@ COPY ./packages/cpp/ArmoniK.Api.Worker ./ArmoniK.Api.Worker COPY ./packages/cpp/ArmoniK.Api.Worker.Tests ./ArmoniK.Api.Worker.Tests COPY ./packages/cpp/CMakeLists.txt ./ COPY ./packages/cpp/Dependencies.cmake ./ +COPY ./packages/cpp/Packaging.cmake ./ +COPY ./packages/cpp/tools/packaging/. ./tools/packaging/. # Copy the Protocol Buffer definition files into the image WORKDIR /app/proto