Skip to content

Commit

Permalink
MON-63843-agent-linux-streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-christophe81 committed Jul 8, 2024
1 parent 8f1e7a1 commit 1575b36
Show file tree
Hide file tree
Showing 16 changed files with 1,837 additions and 21 deletions.
7 changes: 6 additions & 1 deletion agent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ add_custom_command(
add_library(centagent_lib STATIC
${SRC_DIR}/agent.grpc.pb.cc
${SRC_DIR}/agent.pb.cc
${SRC_DIR}/bireactor.cc
${SRC_DIR}/check.cc
${SRC_DIR}/check_exec.cc
${SRC_DIR}/opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc
Expand All @@ -110,6 +111,8 @@ add_library(centagent_lib STATIC
${SRC_DIR}/opentelemetry/proto/resource/v1/resource.pb.cc
${SRC_DIR}/config.cc
${SRC_DIR}/scheduler.cc
${SRC_DIR}/streaming_client.cc
${SRC_DIR}/streaming_server.cc
)

include_directories(
Expand All @@ -136,7 +139,9 @@ target_link_libraries(
centreon_process
-L${Boost_LIBRARY_DIR_RELEASE}
boost_program_options
fmt::fmt)
fmt::fmt
stdc++fs
)

target_precompile_headers(${CENTREON_AGENT} REUSE_FROM centagent_lib)

Expand Down
84 changes: 84 additions & 0 deletions agent/inc/com/centreon/agent/bireactor.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Copyright 2024 Centreon
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* For more information : [email protected]
*/

#ifndef CENTREON_AGENT_BIREACTOR_HH
#define CENTREON_AGENT_BIREACTOR_HH

#include "agent.grpc.pb.h"

namespace com::centreon::agent {

template <class bireactor_class>
class bireactor
: public bireactor_class,
public std::enable_shared_from_this<bireactor<bireactor_class>> {
private:
static std::set<std::shared_ptr<bireactor>> _instances;
static std::mutex _instances_m;

bool _write_pending;
std::deque<std::shared_ptr<MessageFromAgent>> _write_queue;
std::shared_ptr<MessageToAgent> _read_current;

const std::string_view _class_name;

const std::string _peer;

protected:
std::shared_ptr<boost::asio::io_context> _io_context;
std::shared_ptr<spdlog::logger> _logger;

bool _alive;
mutable std::mutex _protect;

public:
bireactor(const std::shared_ptr<boost::asio::io_context>& io_context,
const std::shared_ptr<spdlog::logger>& logger,
const std::string_view& class_name,
const std::string& peer);

virtual ~bireactor();

static void register_stream(const std::shared_ptr<bireactor>& strm);

void start_read();

void start_write();
void write(const std::shared_ptr<MessageFromAgent>& request);

// bireactor part
void OnReadDone(bool ok) override;

virtual void on_incomming_request(
const std::shared_ptr<MessageToAgent>& request) = 0;

virtual void on_error() = 0;

void OnWriteDone(bool ok) override;

// server version
void OnDone();
// client version
void OnDone(const ::grpc::Status& /*s*/);

virtual void shutdown();
};

} // namespace com::centreon::agent

#endif
109 changes: 109 additions & 0 deletions agent/inc/com/centreon/agent/streaming_client.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Copyright 2024 Centreon
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* For more information : [email protected]
*/

#ifndef CENTREON_AGENT_STREAMING_CLIENT_HH
#define CENTREON_AGENT_STREAMING_CLIENT_HH

#include "com/centreon/common/grpc/grpc_client.hh"

#include "bireactor.hh"
#include "scheduler.hh"

namespace com::centreon::agent {

class streaming_client;

class client_reactor
: public bireactor<
::grpc::ClientBidiReactor<MessageFromAgent, MessageToAgent>> {
std::weak_ptr<streaming_client> _parent;
::grpc::ClientContext _context;

public:
client_reactor(const std::shared_ptr<boost::asio::io_context>& io_context,
const std::shared_ptr<spdlog::logger>& logger,
const std::shared_ptr<streaming_client>& parent,
const std::string& peer);

std::shared_ptr<client_reactor> shared_from_this() {
return std::static_pointer_cast<client_reactor>(
bireactor<::grpc::ClientBidiReactor<MessageFromAgent, MessageToAgent>>::
shared_from_this());
}

::grpc::ClientContext& get_context() { return _context; }

void on_incomming_request(
const std::shared_ptr<MessageToAgent>& request) override;

void on_error() override;

void shutdown() override;
};

/**
* @brief this object not only manages connection to engine, but also embed
* check scheduler
*
*/
class streaming_client : public common::grpc::grpc_client_base,
public std::enable_shared_from_this<streaming_client> {
std::shared_ptr<boost::asio::io_context> _io_context;
std::shared_ptr<spdlog::logger> _logger;
std::string _supervised_host;

std::unique_ptr<AgentService::Stub> _stub;

std::shared_ptr<client_reactor> _reactor;
std::shared_ptr<scheduler> _sched;

std::mutex _protect;

void _create_reactor();

void _start();

void _send(const std::shared_ptr<MessageFromAgent>& request);

public:
streaming_client(const std::shared_ptr<boost::asio::io_context>& io_context,
const std::shared_ptr<spdlog::logger>& logger,
const std::shared_ptr<common::grpc::grpc_config>& conf,
const std::string& supervised_host);

static std::shared_ptr<streaming_client> load(
const std::shared_ptr<boost::asio::io_context>& io_context,
const std::shared_ptr<spdlog::logger>& logger,
const std::shared_ptr<common::grpc::grpc_config>& conf,
const std::string& supervised_host);

void on_incomming_request(const std::shared_ptr<client_reactor>& caller,
const std::shared_ptr<MessageToAgent>& request);
void on_error(const std::shared_ptr<client_reactor>& caller);

void shutdown();

// use only for tests
engine_to_agent_request_ptr get_last_message_to_agent() const {
return _sched->get_last_message_to_agent();
}
};

} // namespace com::centreon::agent

#endif
73 changes: 73 additions & 0 deletions agent/inc/com/centreon/agent/streaming_server.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Copyright 2024 Centreon
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* For more information : [email protected]
*/

#ifndef CENTREON_AGENT_STREAMING_SERVER_HH
#define CENTREON_AGENT_STREAMING_SERVER_HH

#include "com/centreon/common/grpc/grpc_server.hh"

#include "bireactor.hh"
#include "scheduler.hh"

namespace com::centreon::agent {

class server_reactor;

/**
* @brief grpc engine to agent server (reverse connection)
* It accept only one connection at a time
* If another connection occurs, previous connection is shutdown
* This object is both grpc server and grpc service
*/
class streaming_server : public common::grpc::grpc_server_base,
public std::enable_shared_from_this<streaming_server>,
public ReversedAgentService::Service {
std::shared_ptr<boost::asio::io_context> _io_context;
std::shared_ptr<spdlog::logger> _logger;
std::string _supervised_host;

/** active engine to agent connection*/
std::shared_ptr<server_reactor> _incoming;

mutable std::mutex _protect;

void _start();

public:
streaming_server(const std::shared_ptr<boost::asio::io_context>& io_context,
const std::shared_ptr<spdlog::logger>& logger,
const std::shared_ptr<common::grpc::grpc_config>& conf,
const std::string& supervised_host);

~streaming_server();

static std::shared_ptr<streaming_server> load(
const std::shared_ptr<boost::asio::io_context>& io_context,
const std::shared_ptr<spdlog::logger>& logger,
const std::shared_ptr<common::grpc::grpc_config>& conf,
const std::string& supervised_host);

::grpc::ServerBidiReactor<MessageToAgent, MessageFromAgent>* Import(
::grpc::CallbackServerContext* context);

void shutdown();
};

} // namespace com::centreon::agent

#endif
Loading

5 comments on commit 1575b36

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
29 8 0 37 78.38 23m35.239163s

Failed Tests

Name Message ⏱️ Duration Suite
BEEXTCMD_REVERSE_GRPC1 ((5.0,),) != ((10.0,),) 61.152 s External-Commands2
BEEXTCMD_REVERSE_GRPC2 ((5.0,),) != ((15.0,),) 36.624 s External-Commands2
BEEXTCMD_REVERSE_GRPC3 ((15.0,),) != ((10.0,),) 31.336 s External-Commands2
BEEXTCMD_REVERSE_GRPC4 centengine0 not correctly stopped (coredump generated) 81.786 s External-Commands2
BRGC1 Central Broker not correctly stopped (coredump generated) 124.688 s Reverse-Connection
BRCTS1 Central Broker not correctly stopped (coredump generated) 225.766 s Reverse-Connection
BRCTSMN Central Broker not correctly stopped (coredump generated) 193.416 s Reverse-Connection
BRCTSMNS Filters badly applied in Broker 131.922 s Reverse-Connection

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
29 8 0 37 78.38 23m46.063753s

Failed Tests

Name Message ⏱️ Duration Suite
BEEXTCMD_REVERSE_GRPC1 ((5.0,),) != ((10.0,),) 61.153 s External-Commands2
BEEXTCMD_REVERSE_GRPC2 ((5.0,),) != ((15.0,),) 36.629 s External-Commands2
BEEXTCMD_REVERSE_GRPC3 ((15.0,),) != ((10.0,),) 36.641 s External-Commands2
BEEXTCMD_REVERSE_GRPC4 centengine0 not correctly stopped (coredump generated) 81.457 s External-Commands2
BRGC1 Central Broker not correctly stopped (coredump generated) 124.413 s Reverse-Connection
BRCTS1 Central Broker not correctly stopped (coredump generated) 224.891 s Reverse-Connection
BRCTSMN Central Broker not correctly stopped (coredump generated) 192.928 s Reverse-Connection
BRCTSMNS Filters badly applied in Broker 125.541 s Reverse-Connection

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
58 16 0 74 78.38 47m32.589965s

Failed Tests

Name Message ⏱️ Duration Suite
BEEXTCMD_REVERSE_GRPC1 ((5.0,),) != ((10.0,),) 61.153 s External-Commands2
BEEXTCMD_REVERSE_GRPC2 ((5.0,),) != ((15.0,),) 36.629 s External-Commands2
BEEXTCMD_REVERSE_GRPC3 ((15.0,),) != ((10.0,),) 36.641 s External-Commands2
BEEXTCMD_REVERSE_GRPC4 centengine0 not correctly stopped (coredump generated) 81.457 s External-Commands2
BEEXTCMD_REVERSE_GRPC1 ((5.0,),) != ((10.0,),) 61.153 s External-Commands2
BEEXTCMD_REVERSE_GRPC2 ((5.0,),) != ((15.0,),) 36.696 s External-Commands2
BEEXTCMD_REVERSE_GRPC3 ((15.0,),) != ((10.0,),) 31.279 s External-Commands2
BEEXTCMD_REVERSE_GRPC4 centengine0 not correctly stopped (coredump generated) 81.513 s External-Commands2
BRGC1 Central Broker not correctly stopped (coredump generated) 124.413 s Reverse-Connection
BRCTS1 Central Broker not correctly stopped (coredump generated) 224.891 s Reverse-Connection
BRCTSMN Central Broker not correctly stopped (coredump generated) 192.928 s Reverse-Connection
BRCTSMNS Filters badly applied in Broker 125.541 s Reverse-Connection
BRGC1 Central Broker not correctly stopped (coredump generated) 124.459 s Reverse-Connection
BRCTS1 Central Broker not correctly stopped (coredump generated) 224.814 s Reverse-Connection
BRCTSMN Central Broker not correctly stopped (coredump generated) 192.866 s Reverse-Connection
BRCTSMNS Filters badly applied in Broker 132.010 s Reverse-Connection

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
35 9 0 44 79.55 27m8.346986s

Failed Tests

Name Message ⏱️ Duration Suite
BEEXTCMD_REVERSE_GRPC1 ((5.0,),) != ((10.0,),) 61.019 s External-Commands2
BEEXTCMD_REVERSE_GRPC2 ((5.0,),) != ((15.0,),) 36.490 s External-Commands2
BEEXTCMD_REVERSE_GRPC3 ((15.0,),) != ((10.0,),) 36.494 s External-Commands2
BEEXTCMD_REVERSE_GRPC4 centengine0 not correctly stopped (coredump generated) 82.383 s External-Commands2
EBNHGU4_BBDO3 hostgroup_1 not found in /tmp/lua-engine.log 75.649 s Hostgroups
BRGC1 Central Broker not correctly stopped (coredump generated) 125.170 s Reverse-Connection
BRCTS1 Central Broker not correctly stopped (coredump generated) 225.922 s Reverse-Connection
BRCTSMN Central Broker not correctly stopped (coredump generated) 193.429 s Reverse-Connection
BRCTSMNS Filters badly applied in Broker 131.610 s Reverse-Connection

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
29 8 0 37 78.38 22m28.24102s

Failed Tests

Name Message ⏱️ Duration Suite
BEEXTCMD_REVERSE_GRPC1 ((5.0,),) != ((10.0,),) 61.010 s External-Commands2
BEEXTCMD_REVERSE_GRPC2 ((5.0,),) != ((15.0,),) 36.557 s External-Commands2
BEEXTCMD_REVERSE_GRPC3 ((15.0,),) != ((10.0,),) 31.154 s External-Commands2
BEEXTCMD_REVERSE_GRPC4 centengine0 not correctly stopped (coredump generated) 87.666 s External-Commands2
BRGC1 Central Broker not correctly stopped (coredump generated) 127.101 s Reverse-Connection
BRCTS1 Central Broker not correctly stopped (coredump generated) 287.091 s Reverse-Connection
BRCTSMN Central Broker not correctly stopped (coredump generated) 195.600 s Reverse-Connection
BRCTSMNS OSError: [Errno 28] No space left on device 0.208 s Reverse-Connection

Please sign in to comment.