Skip to content

Commit

Permalink
[INLONG-10838][SDK] Optimize the ability to send data for DataProxy C…
Browse files Browse the repository at this point in the history
…++ SDK (apache#10850)
  • Loading branch information
doleyzi committed Aug 21, 2024
1 parent b57d631 commit 9ac4e5a
Show file tree
Hide file tree
Showing 15 changed files with 362 additions and 325 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef INLONG_SDK_API_H
#define INLONG_SDK_API_H

#include <clocale>
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>

namespace inlong {

typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t,
const int64_t, const char *);

class ApiImp;

class InLongApi {
public:
InLongApi();
~InLongApi();
int32_t InitApi(const char *config_path);

int32_t AddInLongGroupId(const std::vector<std::string> &group_ids);

int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
UserCallBack call_back = nullptr);

int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
int64_t data_time, UserCallBack call_back = nullptr);

int32_t CloseApi(int32_t max_waitms);

private:
std::shared_ptr<ApiImp> api_impl_;
};
} // namespace inlong
#endif // INLONG_SDK_API_H
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void TcpClient::BeginWrite() {
}
last_update_time_ = Utils::getCurrentMsTime();
status_ = kWriting;
asio::async_write(*socket_, asio::buffer(sendBuffer_->content(), sendBuffer_->len()),
asio::async_write(*socket_, asio::buffer(sendBuffer_->GetData(), sendBuffer_->GetDataLen()),
std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, std::placeholders::_2));
}
void TcpClient::OnWroten(const asio::error_code error, std::size_t bytes_transferred) {
Expand Down Expand Up @@ -390,8 +390,8 @@ void TcpClient::ParseHeartBeat(size_t total_length) {

void TcpClient::ParseGenericResponse() {
if (sendBuffer_ != nullptr) {
std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner + sendBuffer_->getStreamId();
stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->msgCnt());
std::string stat_key = sendBuffer_->GetInlongGroupId() + kStatJoiner + sendBuffer_->GetInlongStreamId();
stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->GetMsgCnt());
stat_map_[stat_key].AddSendSuccessPackNum(1);
stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);

Expand Down Expand Up @@ -448,8 +448,8 @@ void TcpClient::ResetSendBuffer() {
}
retry_times_++;
if (retry_times_ > SdkConfig::getInstance()->retry_times_) {
std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner + sendBuffer_->getStreamId();
stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->msgCnt());
std::string stat_key = sendBuffer_->GetInlongGroupId() + kStatJoiner + sendBuffer_->GetInlongStreamId();
stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->GetMsgCnt());
stat_map_[stat_key].AddSendFailPackNum(1);
stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,12 @@ void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
} else {
retry_times_ = constants::kRetryTimes;
}
if (doc.HasMember("proxy_repeat_times") && doc["proxy_repeat_times"].IsInt() && doc["proxy_repeat_times"].GetInt() >= 0) {
const rapidjson::Value &obj = doc["proxy_repeat_times"];
proxy_repeat_times_ = obj.GetInt();
} else {
proxy_repeat_times_ = constants::kProxyRepeatTimes;
}
}
void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
// auth settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class SdkConfig {
bool enable_balance_;
bool enable_local_cache_;
uint32_t retry_times_;

uint32_t proxy_repeat_times_;

// auth settings
bool need_auth_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,31 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
return DoInit();
}

int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len,
int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
UserCallBack call_back) {
int32_t code=ValidateParams(group_id, stream_id, msg, msg_len);
int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
if(code !=SdkCode::kSuccess){
return code;
}

return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back);
return this->SendBase(inlong_group_id, inlong_stream_id, {msg, msg_len}, call_back);
}
int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len,
int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
int64_t data_time, UserCallBack call_back) {
int32_t code=ValidateParams(group_id, stream_id, msg, msg_len);
int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
if(code !=SdkCode::kSuccess){
return code;
}

return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back, data_time);
return this->SendBase(inlong_group_id, inlong_stream_id, {msg, msg_len}, call_back, data_time);
}

int32_t ApiImp::ValidateParams(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len){
int32_t ApiImp::ValidateParams(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len){
if (msg_len > max_msg_length_) {
MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1);
MetricManager::GetInstance()->AddTooLongMsgCount(inlong_group_id, inlong_stream_id, 1);
return SdkCode::kMsgTooLong;
}
if (group_id == nullptr || stream_id == nullptr || msg == nullptr || msg_len <= 0) {
if (inlong_group_id == nullptr || inlong_stream_id == nullptr || msg == nullptr || msg_len <= 0) {
return SdkCode::kInvalidInput;
}

Expand All @@ -80,9 +80,9 @@ int32_t ApiImp::ValidateParams(const char *group_id, const char *stream_id, cons
return SdkCode::kSuccess;
}

int32_t ApiImp::SendBase(const std::string& inlong_group_id, const std::string& stream_id, const std::string& msg,
int32_t ApiImp::SendBase(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg,
UserCallBack call_back, int64_t report_time) {
int32_t check_ret = CheckData(inlong_group_id, stream_id, msg);
int32_t check_ret = CheckData(inlong_group_id, inlong_stream_id, msg);
if (check_ret != SdkCode::kSuccess) {
return check_ret;
}
Expand All @@ -91,11 +91,11 @@ int32_t ApiImp::SendBase(const std::string& inlong_group_id, const std::string&

auto recv_group = recv_manager_->GetRecvGroup(inlong_group_id);
if (recv_group == nullptr) {
LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id << ",getStreamId:" << stream_id);
LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id << ",getStreamId:" << inlong_stream_id);
return SdkCode::kFailGetRevGroup;
}

return recv_group->SendData(msg, inlong_group_id, stream_id, report_time, call_back);
return recv_group->SendData(msg, inlong_group_id, inlong_stream_id, report_time, call_back);
}

int32_t ApiImp::CloseApi(int32_t max_waitms) {
Expand Down Expand Up @@ -125,19 +125,19 @@ int32_t ApiImp::DoInit() {
return InitManager();
}

int32_t ApiImp::CheckData(const std::string& group_id, const std::string& stream_id, const std::string& msg) {
int32_t ApiImp::CheckData(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg) {
if (init_succeed_ == 0 || user_exit_flag_.get() == 1) {
LOG_ERROR("capi has been closed, Init first and then send");
return SdkCode::kSendAfterClose;
}

if (msg.empty() || group_id.empty() || stream_id.empty()) {
LOG_ERROR("invalid input, group id:" << group_id << " stream id:" << stream_id << "msg" << msg);
if (msg.empty() || inlong_group_id.empty() || inlong_stream_id.empty()) {
LOG_ERROR("invalid input, group id:" << inlong_group_id << " stream id:" << inlong_stream_id << "msg" << msg);
return SdkCode::kInvalidInput;
}

if (msg.size() > SdkConfig::getInstance()->max_msg_size_) {
MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1);
MetricManager::GetInstance()->AddTooLongMsgCount(inlong_group_id, inlong_stream_id, 1);
LOG_ERROR("msg DataLen is too long, cur msg_len" << msg.size() << " ext_pack_size"
<< SdkConfig::getInstance()->max_msg_size_);
return SdkCode::kMsgTooLong;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ class ApiImp {
~ApiImp();
int32_t InitApi(const char *config_file_path);

int32_t Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len,
int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
UserCallBack call_back = nullptr);

int32_t Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len,
int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
int64_t report_time, UserCallBack call_back = nullptr);
int32_t CloseApi(int32_t max_waitms);

Expand All @@ -46,12 +46,12 @@ class ApiImp {
private:
int32_t DoInit();
int32_t InitManager();
int32_t SendBase(const std::string& inlong_group_id, const std::string& stream_id, const std::string& msg, UserCallBack call_back,
int32_t SendBase(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg, UserCallBack call_back,
int64_t report_time = 0);

int32_t CheckData(const std::string& group_id, const std::string& stream_id, const std::string& msg);
int32_t CheckData(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg);

int32_t ValidateParams(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len);
int32_t ValidateParams(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len);

AtomicInt user_exit_flag_{0};
volatile bool init_flag_ = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,25 @@
* limitations under the License.
*/

#include "inlong_api.h"
#include "../../include/inlong_api.h"
#include "../core/api_imp.h"
namespace inlong {

InLongApi::InLongApi() { api_impl_ = std::make_shared<ApiImp>(); };
InLongApi::InLongApi() { api_impl_ = std::make_shared<ApiImp>(); }
InLongApi::~InLongApi() { api_impl_->CloseApi(10); }

int32_t InLongApi::InitApi(const char *config_path) {
return api_impl_->InitApi(config_path);
}
int32_t InLongApi::InitApi(const char *config_path) { return api_impl_->InitApi(config_path); }

int32_t InLongApi::Send(const char *inlong_group_id,
const char *inlong_stream_id, const char *msg,
int32_t msg_len, UserCallBack call_back) {
return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len,
call_back);
int32_t InLongApi::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
UserCallBack call_back) {
return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len, call_back);
}

int32_t InLongApi::CloseApi(int32_t max_waitms) {
return api_impl_->CloseApi(max_waitms);
}
int32_t InLongApi::AddBid(const std::vector<std::string> &groupids) {
return api_impl_->AddInLongGroupId(groupids);
int32_t InLongApi::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
int64_t data_time, UserCallBack call_back) {
return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len, data_time, call_back);
}
} // namespace inlong

int32_t InLongApi::CloseApi(int32_t max_waitms) { return api_impl_->CloseApi(max_waitms); }
int32_t InLongApi::AddInLongGroupId(const std::vector<std::string> &bids) { return api_impl_->AddInLongGroupId(bids); }
} // namespace inlong
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef INLONG_SDK_API_H
Expand All @@ -34,21 +32,23 @@ typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t,
class ApiImp;

class InLongApi {
public:
public:
InLongApi();
~InLongApi();
int32_t InitApi(const char *config_path);

int32_t AddBid(const std::vector<std::string> &groupids);
int32_t AddGroupId(const std::vector<std::string> &group_ids);

int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
const char *msg, int32_t msg_len,
int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
UserCallBack call_back = nullptr);

int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len,
int64_t data_time, UserCallBack call_back = nullptr);

int32_t CloseApi(int32_t max_waitms);

private:
private:
std::shared_ptr<ApiImp> api_impl_;
};
} // namespace inlong
#endif // INLONG_SDK_API_H
} // namespace inlong
#endif // INLONG_SDK_API_H
Loading

0 comments on commit 9ac4e5a

Please sign in to comment.