Skip to content

Commit

Permalink
[INLONG-10861][SDK] Optimize the coredump caused by the DataProxy C++…
Browse files Browse the repository at this point in the history
… SDK (apache#10862)
  • Loading branch information
doleyzi authored Aug 23, 2024
1 parent 2bb4c0e commit 73881b4
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
return SdkCode::kMultiInit;
}

user_exit_flag_.getAndSet(0);

if (!SdkConfig::getInstance()->ParseConfig(config_file_path)) {
return SdkCode::kErrorInit;
}
Expand All @@ -48,6 +46,9 @@ int32_t ApiImp::InitApi(const char *config_file_path) {

int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
UserCallBack call_back) {
if (inited_ == false || exit_flag_) {
return SdkCode::kSendBeforeInit;
}
int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
if(code !=SdkCode::kSuccess){
return code;
Expand All @@ -57,6 +58,9 @@ int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id,
}
int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
int64_t data_time, UserCallBack call_back) {
if (inited_ == false || exit_flag_) {
return SdkCode::kSendBeforeInit;
}
int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
if(code !=SdkCode::kSuccess){
return code;
Expand All @@ -73,10 +77,6 @@ int32_t ApiImp::ValidateParams(const char *inlong_group_id, const char *inlong_s
if (inlong_group_id == nullptr || inlong_stream_id == nullptr || msg == nullptr || msg_len <= 0) {
return SdkCode::kInvalidInput;
}

if (inited_ == false) {
return SdkCode::kSendBeforeInit;
}
return SdkCode::kSuccess;
}

Expand All @@ -99,10 +99,7 @@ int32_t ApiImp::SendBase(const std::string& inlong_group_id, const std::string&
}

int32_t ApiImp::CloseApi(int32_t max_waitms) {
if (!__sync_bool_compare_and_swap(&init_flag_, false, true)) {
LOG_ERROR("sdk has been closed! .");
return SdkCode::kMultiExits;
}
exit_flag_ = true;
std::this_thread::sleep_for(std::chrono::milliseconds(max_waitms));
return SdkCode::kSuccess;
}
Expand All @@ -126,8 +123,7 @@ int32_t ApiImp::DoInit() {
}

int32_t ApiImp::CheckData(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg) {
if (init_succeed_ == 0 || user_exit_flag_.get() == 1) {
LOG_ERROR("capi has been closed, Init first and then send");
if (!init_succeed_) {
return SdkCode::kSendAfterClose;
}

Expand All @@ -154,14 +150,13 @@ int32_t ApiImp::InitManager() {

recv_manager_ = std::make_shared<RecvManager>(send_manager_);
if (!recv_manager_) {
LOG_ERROR("fail to Init global packqueue");
return SdkCode::kErrorInit;
}
init_succeed_ = true;
return SdkCode::kSuccess;
}
int32_t ApiImp::AddInLongGroupId(const std::vector<std::string> &group_ids) {
if (inited_ == false) {
if (!inited_) {
return SdkCode::kSendBeforeInit;
}
for (auto group_id : group_ids) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ class ApiImp {

int32_t ValidateParams(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len);

AtomicInt user_exit_flag_{0};
volatile bool init_flag_ = false;
volatile bool inited_ = false;
volatile bool init_succeed_ = false;
AtomicInt buf_full_{0};
volatile bool exit_flag_ = false;

uint32_t max_msg_length_;
std::string local_ip_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class BufferManager {
std::queue<SendBufferPtrT> buffer_queue_;
mutable std::mutex mutex_;
uint32_t queue_limit_;
bool exit_= false;
BufferManager() {
uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
SdkConfig::getInstance()->pack_size_);
Expand All @@ -54,13 +55,21 @@ class BufferManager {
AddSendBuffer(send_buffer);
}
}
~BufferManager(){
std::lock_guard<std::mutex> lck(mutex_);
exit_ = true;
LOG_INFO("Buffer manager exited");
}

public:
static BufferManager *GetInstance() {
static BufferManager instance;
return &instance;
}
SendBufferPtrT GetSendBuffer() {
if(exit_){
return nullptr;
}
std::lock_guard<std::mutex> lck(mutex_);
if (buffer_queue_.empty()) {
return nullptr;
Expand All @@ -70,7 +79,7 @@ class BufferManager {
return buf;
}
void AddSendBuffer(const SendBufferPtrT &send_buffer) {
if (nullptr == send_buffer) {
if (nullptr == send_buffer || exit_) {
return;
}
send_buffer->releaseBuf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "../config/sdk_conf.h"
#include "../metric/environment.h"
#include "../metric/metric.h"
#include "../utils/logger.h"

#ifndef INLONG_METRIC_MANAGER_H
#define INLONG_METRIC_MANAGER_H
Expand All @@ -40,10 +41,6 @@ class MetricManager {
Environment environment_;
std::string coreParma_;

MetricManager() {

}

public:
static MetricManager *GetInstance() {
static MetricManager instance;
Expand All @@ -54,23 +51,35 @@ class MetricManager {
void PrintMetric();
void Run();
void UpdateMetric(const std::string &stat_key, Metric &stat) {
if(!running_){
return;
}
std::lock_guard<std::mutex> lck(mutex_);
stat_map_[stat_key].Update(stat);
}

void AddReceiveBufferFullCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) {
if(!running_){
return;
}
std::lock_guard<std::mutex> lck(mutex_);
std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
stat_map_[stat_key].AddReceiveBufferFullCount(count);
}

void AddTooLongMsgCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) {
if (!running_) {
return;
}
std::lock_guard<std::mutex> lck(mutex_);
std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
stat_map_[stat_key].AddTooLongMsgCount(count);
}

void AddMetadataFailCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) {
if (!running_) {
return;
}
std::lock_guard<std::mutex> lck(mutex_);
std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
stat_map_[stat_key].AddMetadataFailCount(count);
Expand All @@ -87,6 +96,7 @@ class MetricManager {
if (update_thread_.joinable()) {
update_thread_.join();
}
LOG_INFO("Metric manager exited");
}
};
} // namespace inlong
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class MsgManager {
mutable std::mutex mutex_;
uint32_t queue_limit_;
bool enable_share_msg_;
bool exit_= false;
MsgManager() {
uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, SdkConfig::getInstance()->pack_size_);
uint32_t buffer_num = SdkConfig::getInstance()->recv_buf_size_ / data_capacity_;
Expand All @@ -48,14 +49,19 @@ class MsgManager {
AddMsg(msg_ptr);
}
}
~MsgManager(){
std::lock_guard<std::mutex> lck(mutex_);
exit_ = true;
LOG_INFO("Msg manager exited");
}

public:
static MsgManager *GetInstance() {
static MsgManager instance;
return &instance;
}
SdkMsgPtr GetMsg() {
if (!enable_share_msg_) {
if (!enable_share_msg_ || exit_) {
return nullptr;
}
std::lock_guard<std::mutex> lck(mutex_);
Expand All @@ -67,7 +73,7 @@ class MsgManager {
return buf;
}
void AddMsg(const SdkMsgPtr &msg_ptr) {
if (nullptr == msg_ptr || !enable_share_msg_) {
if (nullptr == msg_ptr || !enable_share_msg_ || exit_) {
return;
}
std::lock_guard<std::mutex> lck(mutex_);
Expand All @@ -78,7 +84,7 @@ class MsgManager {
}

void AddMsg(const std::vector<SdkMsgPtr> &user_msg_vector) {
if (!enable_share_msg_) {
if (!enable_share_msg_ || exit_) {
return;
}
std::lock_guard<std::mutex> lck(mutex_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "api_code.h"
#include <fstream>
#include <curl/curl.h>

#include "../config/ini_help.h"
#include "../utils/capi_constant.h"
Expand All @@ -41,11 +42,14 @@ ProxyManager::~ProxyManager() {
if (update_conf_thread_.joinable()) {
update_conf_thread_.join();
}

curl_global_cleanup();
}
void ProxyManager::Init() {
timeout_ = SdkConfig::getInstance()->manager_url_timeout_;
last_update_time_ = Utils::getCurrentMsTime();
if (__sync_bool_compare_and_swap(&inited_, false, true)) {
curl_global_init(CURL_GLOBAL_ALL);
ReadLocalCache();
update_conf_thread_ = std::thread(&ProxyManager::Update, this);
}
Expand Down
42 changes: 10 additions & 32 deletions inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ int32_t Utils::requestUrl(const std::string &url, std::string &urlByDNS,
}

CURL *curl = NULL;
curl_global_init(CURL_GLOBAL_ALL);

curl = curl_easy_init();
if (!curl) {
Expand All @@ -330,9 +329,7 @@ int32_t Utils::requestUrl(const std::string &url, std::string &urlByDNS,
LOG_INFO("request from tdm:" << res);
if (ret != 0) {
LOG_ERROR("failed to request data from " << urlByDNS);
if (curl)
curl_easy_cleanup(curl);
curl_global_cleanup();
if (curl) curl_easy_cleanup(curl);

return SdkCode::kErrorCURL;
}
Expand All @@ -341,25 +338,17 @@ int32_t Utils::requestUrl(const std::string &url, std::string &urlByDNS,
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
if (code != 200) {
LOG_ERROR("tdm responsed with code " << code);
if (curl)
curl_easy_cleanup(curl);
curl_global_cleanup();

if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}

if (res.empty()) {
LOG_ERROR("tdm return empty data");
if (curl)
curl_easy_cleanup(curl);
curl_global_cleanup();

if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}

if (curl)
curl_easy_cleanup(curl);
curl_global_cleanup();
if (curl) curl_easy_cleanup(curl);

return 0;
}
Expand Down Expand Up @@ -406,17 +395,14 @@ int32_t Utils::requestUrl(std::string &res, const HttpRequest *request) {
CURL *curl = NULL;
struct curl_slist *list = NULL;

curl_global_init(CURL_GLOBAL_ALL);

curl = curl_easy_init();
if (!curl) {
LOG_ERROR("failed to init curl object");
return SdkCode::kErrorCURL;
}

// http header
list = curl_slist_append(list,
"Content-Type: application/x-www-form-urlencoded");
list = curl_slist_append(list,"Content-Type: application/x-www-form-urlencoded");

if (request->need_auth && !request->auth_id.empty() &&
!request->auth_key.empty()) {
Expand Down Expand Up @@ -446,36 +432,28 @@ int32_t Utils::requestUrl(std::string &res, const HttpRequest *request) {
if (ret != 0) {
LOG_ERROR(curl_easy_strerror(ret));
LOG_ERROR("failed to request data from " << request->url.c_str());
if (curl)
curl_easy_cleanup(curl);
curl_global_cleanup();
if (curl) curl_easy_cleanup(curl);

return SdkCode::kErrorCURL;
}

int32_t code;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
if (code != 200) {
LOG_ERROR("tdm responsed with code " << code);
if (curl)
curl_easy_cleanup(curl);
curl_global_cleanup();
if (curl) curl_easy_cleanup(curl);

return SdkCode::kErrorCURL;
}

if (res.empty()) {
LOG_ERROR("tdm return empty data");
if (curl)
curl_easy_cleanup(curl);
curl_global_cleanup();
LOG_ERROR("Empty response");
if (curl) curl_easy_cleanup(curl);

return SdkCode::kErrorCURL;
}

// clean work
// Clean work
curl_easy_cleanup(curl);
curl_global_cleanup();

return 0;
}
Expand Down

0 comments on commit 73881b4

Please sign in to comment.