Skip to content

Commit

Permalink
add buff to cpm
Browse files Browse the repository at this point in the history
  • Loading branch information
robbietu committed Nov 21, 2024
1 parent e07edac commit 4e23a9e
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 22 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ endif ()
# set PKTMINERG_MAJOR_VERSION, PKTMINERG_MINOR_VERSION, etc.
set(PKTMINERG_MAJOR_VERSION "0")
set(PKTMINERG_MINOR_VERSION "8")
set(PKTMINERG_PATCH_VERSION "2")
set(PKTMINERG_PATCH_VERSION "3")
set(PKTMINERG_VERSION_STRING "${PKTMINERG_MAJOR_VERSION}.${PKTMINERG_MINOR_VERSION}.${PKTMINERG_PATCH_VERSION}")

if (WIN32)
Expand Down
33 changes: 26 additions & 7 deletions src/daemonManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ void DaemonManager::getDaemonImpl() {
std::stringstream jsonS;
jsonS << report_.toJson();
const auto json = jsonS.str();

if (report_.packetAgentLogsIsSet()) {
report_.unsetPacketAgentLogs();
}
Expand Down Expand Up @@ -510,7 +510,7 @@ int DaemonManager::startPA(io::swagger::server::model::Agent& body, std::strings
}
buffSize = body.getMemLimit()/stratigies;
}
capBuff_ = buffSize;
if(!(port = zmqPortAvlPop())) {
io::swagger::server::model::Error error;
error.setCode(0);
Expand Down Expand Up @@ -542,15 +542,33 @@ int DaemonManager::startPA(io::swagger::server::model::Agent& body, std::strings
if (data->containerIdsIsSet()) {
int id = 0;
for (auto & i: data->getContainerIds()) {
const auto command = boost::str(
std::vector<std::string> results;
boost::split(results, i, boost::is_any_of("_"), boost::token_compress_on);
if (results.size() > 1) {
for (unsigned int count = 1; count < results.size(); count++) {
const auto command = boost::str(
boost::format("%1%%2%%3%%4%")
%(std::string("-c ") + results[0])
%(std::string(" -i ") + results[count])
%getChannelArg(data, id)
%cmdParams);
commandStr.push_back(command);
str += " " + command;
id ++;
}
} else {
const auto command = boost::str(
boost::format("%1%%2%%3%")
%(std::string("-c ") + i)
%getChannelArg(data, id)
%cmdParams);
commandStr.push_back(command);
str += " " + command;
id ++;
commandStr.push_back(command);
str += " " + command;
id ++;
}
}
} else if (data->interfaceNamesIsSet()) {
int id = 0;
Expand Down Expand Up @@ -760,6 +778,7 @@ int DaemonManager::updateChannelStatusSync() {
packet_channel_metric_ptr->setCapDrop(p_status->total_cap_drop_count);
packet_channel_metric_ptr->setFwdBytes(p_status->total_fwd_bytes);
packet_channel_metric_ptr->setFwdPackets(p_status->total_fwd_count);
packet_channel_metric_ptr->setCapBuff(capBuff_);
} else {
ctx_.log("agent status query failed.", log4cpp::Priority::ERROR);
Expand Down Expand Up @@ -1118,7 +1137,7 @@ DaemonManager::DaemonManager(const boost::program_options::variables_map &vm, ti
}
}

daemon_.setClientVersion("0.8.2");
daemon_.setClientVersion("0.8.3");
std::vector<std::string> strs;
split(strs, SUPPORT_API_VERSIONS, boost::algorithm::is_any_of(","));
for (const auto& str:strs) {
Expand Down
1 change: 1 addition & 0 deletions src/daemonManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class DaemonManager
io::swagger::server::model::Daemon daemon_;
io::swagger::server::model::Report report_;
io::swagger::server::model::Agent agent_;
uint64_t capBuff_ = 0;
pid_t agentPid_ = 0;
uint16_t zmqPort_;
const uint32_t ZMQ_TIMEOUT_ = 1000; // millisecond unit
Expand Down
2 changes: 1 addition & 1 deletion src/pcaphandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ int PcapHandler::openPcap() {
}
}
pcapGuard.Dismiss();

_pcap_handle = pcap_handle;

return 0;
Expand Down Expand Up @@ -656,7 +657,6 @@ void PcapHandler::clearHandler() {
closeExport();
closePcapDumper();
closePcap();
setHandlerStatus(HANDLER_DOWN);
}

void PcapHandler::closeExport() {
Expand Down
8 changes: 3 additions & 5 deletions src/pcaphandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
#include "statislog.h"
#include "logfilecontext.h"

#define HANDLER_ACTIVE 1
#define HANDLER_DOWN 2


typedef struct PcapInit {
Expand Down Expand Up @@ -89,7 +87,7 @@ class PcapHandler {
IpPortAddr _addr;
u_int8_t _macAddr[ETH_ALEN] ={0};
bool _autoDirection = false;
int _handlerStatus = HANDLER_DOWN;
long long _handlerCheckTime = -1;
std::string output_buffer;
LogFileContext _ctx;
#ifdef _WIN32
Expand Down Expand Up @@ -117,8 +115,8 @@ class PcapHandler {

void setDirIPPorts(std::string str) {_addr.init(str);};
int openPcapDumper(pcap_t *pcap_handle);
void setHandlerStatus (int status) {_handlerStatus = status;};
int getHandlerStatus () {return _handlerStatus;};
void setHandlerCheckTime (long long t) {_handlerCheckTime = t;};
long long getHandlerCheckTime () {return _handlerCheckTime;};

LogFileContext& getLogFileContext() {return _ctx;};
};
Expand Down
25 changes: 19 additions & 6 deletions src/pktminerg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ Allowed options for each interface:");
("buffsize,b", boost::program_options::value<int>()->default_value(256)->value_name("SIZE"),
"set snoop buffer size; SIZE defaults 256 and units MB")
("priority,p", "set high priority mode")
("in_addr", boost::program_options::value<int>()->value_name("ID"), "set cpu affinity ID")
("cpu", boost::program_options::value<int>()->value_name("ID"), "set cpu affinity ID")
("expression", boost::program_options::value<std::vector<std::string>>()->value_name("FILTER"),
R"(filter packets with FILTER; FILTER as same as tcpdump BPF expression syntax)")
("dump", boost::program_options::value<std::string>()->default_value("./")->value_name("DUMP"),
Expand Down Expand Up @@ -653,7 +653,7 @@ Allowed options for each interface:");
}

// check options
if (vm.count("interface") + vm.count("pcapfile") + vm.count("container") + vm.count("kvm")!= 1) {
if ((vm.count("interface") || vm.count("container")) + vm.count("pcapfile") + vm.count("kvm")!= 1) {
ctx.log("Please choice only one snoop mode, from interface use -i or from pcap file use -f or from container use -c or from KVM use -m.",
log4cpp::Priority::ERROR);
std::cerr << StatisLogContext::getTimeString()
Expand Down Expand Up @@ -758,8 +758,9 @@ Allowed options for each interface:");
}
std::string processId = "";
if (vm.count("container")) {
// suppose that a container only has one interface named "eth0"
intface = "eth0";
if (intface == "") {
intface = "eth0";
}
processId = getProccssIdWithContainer(vm["container"].as<std::string>(), g_ctx);
if (processId.length() == 0) {
continue;
Expand Down Expand Up @@ -978,25 +979,37 @@ Allowed options for each interface:");
#endif
}
allHandlersDown = true;
auto now = std::chrono::steady_clock::now();

auto milliseconds = std::chrono::time_point_cast<std::chrono::milliseconds>(now);
auto epoch = milliseconds.time_since_epoch();
long long currentTime = std::chrono::duration_cast<std::chrono::milliseconds>(epoch).count();

for (auto iter = handlers.begin(); iter != handlers.end() && isLoop;++iter) {
if((*iter)->getHandlerStatus() == HANDLER_DOWN) {
if((*iter)->getHandlerCheckTime() > 0 && currentTime-(*iter)->getHandlerCheckTime() < 100) {
continue;
}
if((*iter)->getHandlerCheckTime() != 0) {
if((*iter)->openPcap() == -1) {
(*iter)->clearHandler();
(*iter)->setHandlerCheckTime(currentTime);
continue;
}
if ((*iter)->initExport() == -1) {
(*iter)->getLogFileContext().log("initExport failed.", log4cpp::Priority::ERROR);
std::cerr << StatisLogContext::getTimeString() << "initExport failed." << std::endl;
(*iter)->clearHandler();
(*iter)->setHandlerCheckTime(currentTime);
continue;
}
(*iter)->setHandlerStatus(HANDLER_ACTIVE);
(*iter)->setHandlerCheckTime(0);
}

if ((*iter)->handlePacket() == 0) {
allHandlersDown = false;
} else {
(*iter)->clearHandler();
(*iter)->setHandlerCheckTime(currentTime);
}
}
}
Expand Down
22 changes: 21 additions & 1 deletion src/restful/model/PacketChannelMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace io {
m_FwdBytes = 0L;
m_FwdBytesIsSet = false;
m_FwdPackets = 0L;
m_CapBuffIsSet = false;
m_CapBuff = 0L;
m_FwdPacketsIsSet = false;
m_StartTime = 0;
m_StartTimeIsSet = false;
Expand Down Expand Up @@ -90,7 +92,9 @@ namespace io {
if (m_FwdBytesIsSet) {
val["fwdPackets"] = m_FwdPackets;
}

if (m_CapBuffIsSet) {
val["capBuff"] = m_CapBuff;
}

return val;
}
Expand All @@ -108,6 +112,7 @@ namespace io {
MB_FSET(capDrop, CapDrop)
MB_FSET(fwdBytes, FwdBytes)
MB_FSET(fwdPackets, FwdPackets)
MB_FSET(capBuff, CapBuff)
}


Expand Down Expand Up @@ -291,7 +296,22 @@ namespace io {
void PacketAgentMetrics::unsetFwdPackets() {
m_FwdPacketsIsSet = false;
}
uint64_t PacketAgentMetrics::getCapBuff() const {
return m_CapBuff;
}

void PacketAgentMetrics::setCapBuff(uint64_t value) {
m_CapBuff = value;
m_CapBuffIsSet = true;
}

bool PacketAgentMetrics::capBuffIsSet() const {
return m_CapBuffIsSet;
}

void PacketAgentMetrics::unsetCapBuff() {
m_CapBuffIsSet = false;
}
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/restful/model/PacketChannelMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,13 @@ class PacketAgentMetrics
void setFwdPackets(uint64_t value);
bool fwdPacketsIsSet() const;
void unsetFwdPackets();

/// <summary>
/// Packet Agent 从启动开始转发的数据包数
/// </summary>
uint64_t getCapBuff() const;
void setCapBuff(uint64_t value);
bool capBuffIsSet() const;
void unsetCapBuff();
protected:
long m_SamplingTimestamp;
long m_SamplingMicroTimestamp;
Expand All @@ -148,6 +154,8 @@ class PacketAgentMetrics
bool m_FwdBytesIsSet;
uint64_t m_FwdPackets;
bool m_FwdPacketsIsSet;
uint64_t m_CapBuff;
bool m_CapBuffIsSet;
long m_StartTime;
bool m_StartTimeIsSet;
};
Expand Down

0 comments on commit 4e23a9e

Please sign in to comment.