Skip to content

Commit aa47587

Browse files
committed
[fix](cloud) Fix read-from-peer to use a thread pool instead of AsyncIO
1 parent e4c029b commit aa47587

File tree

10 files changed

+134
-84
lines changed

10 files changed

+134
-84
lines changed

be/src/cloud/cloud_internal_service.cpp

Lines changed: 116 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -170,110 +170,152 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
170170
<< ", response=" << response->DebugString();
171171
}
172172

173-
void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller
174-
[[maybe_unused]],
175-
const PFetchPeerDataRequest* request,
176-
PFetchPeerDataResponse* response,
177-
google::protobuf::Closure* done) {
178-
// TODO(dx): use async thread pool to handle the request, not AsyncIO
179-
brpc::ClosureGuard closure_guard(done);
180-
g_file_cache_get_by_peer_num << 1;
181-
if (!config::enable_file_cache) {
182-
LOG_WARNING("try to access file cache data, but file cache not enabled");
183-
return;
184-
}
185-
int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
186-
std::chrono::steady_clock::now().time_since_epoch())
187-
.count();
188-
const auto type = request->type();
189-
const auto& path = request->path();
190-
response->mutable_status()->set_status_code(TStatusCode::OK);
191-
if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
173+
namespace {
174+
// Helper functions for fetch_peer_data
175+
176+
Status handle_peer_file_range_request(const std::string& path,
177+
PFetchPeerDataResponse* response) {
192178
// Read specific range [file_offset, file_offset+file_size) across cached blocks
193179
auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
194180
for (auto& cb : datas) {
195181
*(response->add_datas()) = std::move(cb);
196182
}
197-
} else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
198-
// Multiple specific blocks
183+
return Status::OK();
184+
}
185+
186+
void set_error_response(PFetchPeerDataResponse* response, const std::string& error_msg) {
187+
response->mutable_status()->add_error_msgs(error_msg);
188+
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
189+
}
190+
191+
Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block,
192+
doris::CacheBlockPB* output) {
193+
std::string data;
194+
data.resize(file_block->range().size());
195+
196+
auto begin_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
197+
std::chrono::steady_clock::now().time_since_epoch())
198+
.count();
199+
200+
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
201+
Slice slice(data.data(), data.size());
202+
Status read_st = file_block->read(slice, /*read_offset=*/0);
203+
204+
auto end_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
205+
std::chrono::steady_clock::now().time_since_epoch())
206+
.count();
207+
g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - begin_read_file_ts);
208+
209+
if (read_st.ok()) {
210+
output->set_data(std::move(data));
211+
return Status::OK();
212+
} else {
213+
g_file_cache_get_by_peer_failed_num << 1;
214+
LOG(WARNING) << "read cache block failed: " << read_st;
215+
return read_st;
216+
}
217+
}
218+
219+
Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest* request,
220+
PFetchPeerDataResponse* response) {
221+
const auto& path = request->path();
199222
auto hash = io::BlockFileCache::hash(path);
200223
auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
201224
if (cache == nullptr) {
202225
g_file_cache_get_by_peer_failed_num << 1;
203-
response->mutable_status()->add_error_msgs("can't get file cache instance");
204-
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
205-
return;
226+
set_error_response(response, "can't get file cache instance");
227+
return Status::InternalError("can't get file cache instance");
206228
}
229+
207230
io::CacheContext ctx {};
208-
// ensure a valid stats pointer is provided to cache layer
209231
io::ReadStatistics local_stats;
210232
ctx.stats = &local_stats;
233+
211234
for (const auto& cb_req : request->cache_req()) {
212235
size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
213236
size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
214237
auto holder = cache->get_or_set(hash, offset, size, ctx);
238+
215239
for (auto& fb : holder.file_blocks) {
216-
auto state = fb->state();
217-
if (state != io::FileBlock::State::DOWNLOADED) {
240+
if (fb->state() != io::FileBlock::State::DOWNLOADED) {
218241
g_file_cache_get_by_peer_failed_num << 1;
219-
LOG(WARNING) << "read cache block failed, state=" << state;
220-
response->mutable_status()->add_error_msgs("read cache file error");
221-
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
222-
return;
242+
LOG(WARNING) << "read cache block failed, state=" << fb->state();
243+
set_error_response(response, "read cache file error");
244+
return Status::InternalError("cache block not downloaded");
223245
}
246+
224247
g_file_cache_get_by_peer_blocks_num << 1;
225248
doris::CacheBlockPB* out = response->add_datas();
226249
out->set_block_offset(static_cast<int64_t>(fb->offset()));
227250
out->set_block_size(static_cast<int64_t>(fb->range().size()));
228-
std::string data;
229-
data.resize(fb->range().size());
230-
// Offload the file block read to a dedicated OS thread to avoid bthread IO
231-
Status read_st = Status::OK();
232-
// due to file_reader.cpp:33] Check failed: bthread_self() == 0
233-
int64_t begin_read_file_ts =
234-
std::chrono::duration_cast<std::chrono::microseconds>(
235-
std::chrono::steady_clock::now().time_since_epoch())
236-
.count();
237-
auto task = [&] {
238-
// Current thread not exist ThreadContext, usually after the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext and bind a Task.
239-
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
240-
Slice slice(data.data(), data.size());
241-
read_st = fb->read(slice, /*read_offset=*/0);
242-
};
243-
AsyncIO::run_task(task, io::FileSystemType::LOCAL);
244-
int64_t end_read_file_ts =
245-
std::chrono::duration_cast<std::chrono::microseconds>(
246-
std::chrono::steady_clock::now().time_since_epoch())
247-
.count();
248-
g_file_cache_get_by_peer_read_cache_file_latency
249-
<< (end_read_file_ts - begin_read_file_ts);
250-
if (read_st.ok()) {
251-
out->set_data(std::move(data));
252-
} else {
253-
g_file_cache_get_by_peer_failed_num << 1;
254-
LOG(WARNING) << "read cache block failed: " << read_st;
255-
response->mutable_status()->add_error_msgs("read cache file error");
256-
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
257-
return;
251+
252+
Status read_status = read_file_block(fb, out);
253+
if (!read_status.ok()) {
254+
set_error_response(response, "read cache file error");
255+
return read_status;
258256
}
259257
}
260258
}
259+
260+
return Status::OK();
261261
}
262-
DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
263-
int st_us = dp->param<int>("sleep", 1000);
264-
LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
265-
// sleep us
266-
bthread_usleep(st_us);
267-
});
262+
} // namespace
263+
264+
void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller
265+
[[maybe_unused]],
266+
const PFetchPeerDataRequest* request,
267+
PFetchPeerDataResponse* response,
268+
google::protobuf::Closure* done) {
269+
bool ret = _heavy_work_pool.try_offer([request, response, done]() {
270+
brpc::ClosureGuard closure_guard(done);
271+
g_file_cache_get_by_peer_num << 1;
268272

269-
int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
270-
std::chrono::steady_clock::now().time_since_epoch())
271-
.count();
272-
g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
273-
g_file_cache_get_by_peer_success_num << 1;
273+
if (!config::enable_file_cache) {
274+
LOG_WARNING("try to access file cache data, but file cache not enabled");
275+
return;
276+
}
274277

275-
VLOG_DEBUG << "fetch cache request=" << request->DebugString()
276-
<< ", response=" << response->DebugString();
278+
auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
279+
std::chrono::steady_clock::now().time_since_epoch())
280+
.count();
281+
282+
const auto type = request->type();
283+
const auto& path = request->path();
284+
response->mutable_status()->set_status_code(TStatusCode::OK);
285+
286+
Status status = Status::OK();
287+
if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
288+
status = handle_peer_file_range_request(path, response);
289+
} else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
290+
status = handle_peer_file_cache_block_request(request, response);
291+
}
292+
293+
if (!status.ok()) {
294+
LOG(WARNING) << "fetch peer data failed: " << status.to_string();
295+
set_error_response(response, status.to_string());
296+
}
297+
298+
DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
299+
int st_us = dp->param<int>("sleep", 1000);
300+
LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
301+
bthread_usleep(st_us);
302+
});
303+
304+
auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
305+
std::chrono::steady_clock::now().time_since_epoch())
306+
.count();
307+
g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
308+
g_file_cache_get_by_peer_success_num << 1;
309+
310+
VLOG_DEBUG << "fetch cache request=" << request->DebugString()
311+
<< ", response=" << response->DebugString();
312+
});
313+
314+
if (!ret) {
315+
brpc::ClosureGuard closure_guard(done);
316+
LOG(WARNING) << "fail to offer fetch peer data request to the work pool, pool="
317+
<< _heavy_work_pool.get_info();
318+
}
277319
}
278320

279321
#include "common/compile_check_end.h"

be/src/cloud/config.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ DEFINE_mBool(enable_standby_passive_compaction, "true");
133133

134134
DEFINE_mDouble(standby_compaction_version_ratio, "0.8");
135135

136-
DEFINE_mBool(enable_cache_read_from_peer, "false");
136+
DEFINE_mBool(enable_cache_read_from_peer, "true");
137137

138138
// Cache the expiration time of the peer address.
139139
// This can be configured to be less than the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration.

be/src/io/cache/block_file_cache_downloader.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,13 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
318318

319319
std::unique_ptr<char[]> buffer(new char[one_single_task_size]);
320320

321+
DBUG_EXECUTE_IF("FileCacheBlockDownloader::download_segment_file_sleep", {
322+
auto sleep_time = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
323+
"FileCacheBlockDownloader::download_segment_file_sleep", "sleep_time", 3);
324+
LOG(INFO) << "FileCacheBlockDownloader::download_segment_file_sleep: sleep_time=" << sleep_time;
325+
sleep(sleep_time);
326+
});
327+
321328
size_t task_offset = 0;
322329
for (size_t i = 0; i < task_num; i++) {
323330
size_t offset = meta.offset + task_offset;

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3339,7 +3339,7 @@ public static int metaServiceRpcRetryTimes() {
33393339
@ConfField(mutable = true, masterOnly = true)
33403340
public static int cloud_min_balance_tablet_num_per_run = 2;
33413341

3342-
@ConfField(description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
3342+
@ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
33433343
+ "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
33443344
+ "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
33453345
+ "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -980,7 +980,7 @@ private void handleWarmupCompletion(InfightTask task, String clusterId, boolean
980980
sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe);
981981
} catch (Exception e) {
982982
LOG.warn("Failed to preheat tablet {} from {} to {}, "
983-
+ "help msg turn off fe config enable_cloud_warm_up_for_rebalance",
983+
+ "help msg change fe config cloud_warm_up_for_rebalance_type to without_warmup, ",
984984
task.pickedTablet.getId(), task.srcBe, task.destBe, e);
985985
}
986986
}
@@ -1278,7 +1278,7 @@ private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long destBe
12781278
sendPreHeatingRpc(pickedTablet, srcBe, destBe);
12791279
} catch (Exception e) {
12801280
LOG.warn("Failed to preheat tablet {} from {} to {}, "
1281-
+ "help msg turn off fe config enable_cloud_warm_up_for_rebalance",
1281+
+ "help msg change fe config cloud_warm_up_for_rebalance_type to without_warmup ",
12821282
pickedTablet.getId(), srcBe, destBe, e);
12831283
return;
12841284
}

regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ suite('test_balance_metrics', 'docker') {
2929
'sys_log_verbose_modules=org',
3030
'heartbeat_interval_second=1',
3131
'rehash_tablet_after_be_dead_seconds=3600',
32-
'enable_cloud_warm_up_for_rebalance=false'
32+
'cloud_warm_up_for_rebalance_type=without_warmup'
3333
]
3434
options.beConfigs += [
3535
'report_tablet_interval_seconds=1',

regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ suite('test_peer_read_async_warmup', 'docker') {
3838
'schedule_sync_tablets_interval_s=18000',
3939
'disable_auto_compaction=true',
4040
'sys_log_verbose_modules=*',
41+
'enable_cache_read_from_peer=true',
4142
]
4243
options.setFeNum(1)
4344
options.setBeNum(1)

regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ suite('test_drop_cluster_clean_metrics', 'docker') {
2929
'sys_log_verbose_modules=org',
3030
'heartbeat_interval_second=1',
3131
'rehash_tablet_after_be_dead_seconds=3600',
32-
'enable_cloud_warm_up_for_rebalance=false'
32+
'cloud_warm_up_for_rebalance_type=without_warmup'
3333
]
3434
options.beConfigs += [
3535
'report_tablet_interval_seconds=1',

regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
107107
def options = new ClusterOptions()
108108
options.feConfigs += [
109109
'cloud_cluster_check_interval_second=1',
110-
'enable_cloud_warm_up_for_rebalance=true',
110+
'cloud_warm_up_for_rebalance_type=async_warmup',
111111
'cloud_tablet_rebalancer_interval_second=1',
112112
'cloud_balance_tablet_percent_per_run=1.0',
113113
]

regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
4040
'sys_log_verbose_modules=org',
4141
]
4242
}
43-
clusterOptions[0].feConfigs += ['enable_cloud_warm_up_for_rebalance=true', 'cloud_pre_heating_time_limit_sec=300']
44-
clusterOptions[1].feConfigs += ['enable_cloud_warm_up_for_rebalance=false']
43+
clusterOptions[0].feConfigs += ['cloud_warm_up_for_rebalance_type=sync_warmup','cloud_pre_heating_time_limit_sec=300']
44+
clusterOptions[1].feConfigs += ['cloud_warm_up_for_rebalance_type=without_warmup']
4545

4646

4747
for (int i = 0; i < clusterOptions.size(); i++) {
@@ -178,7 +178,7 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
178178
// add a be
179179
cluster.addBackend(1, null)
180180
// warm up
181-
sql """admin set frontend config("enable_cloud_warm_up_for_rebalance"="true")"""
181+
sql """admin set frontend config("cloud_warm_up_for_rebalance_type"="sync_warmup")"""
182182

183183
// test rebalance thread still work
184184
awaitUntil(30) {

0 commit comments

Comments
 (0)