
Commit

fix(bulkload) fix unit test.
lupengfan1 committed Sep 24, 2024
1 parent 020b105 commit 40ae94b
Showing 3 changed files with 267 additions and 10 deletions.
261 changes: 261 additions & 0 deletions config-server.ini
@@ -0,0 +1,261 @@
; Licensed to the Apache Software Foundation (ASF) under one
; or more contributor license agreements. See the NOTICE file
; distributed with this work for additional information
; regarding copyright ownership. The ASF licenses this file
; to you under the Apache License, Version 2.0 (the
; "License"); you may not use this file except in compliance
; with the License. You may obtain a copy of the License at
;
; http://www.apache.org/licenses/LICENSE-2.0
;
; Unless required by applicable law or agreed to in writing,
; software distributed under the License is distributed on an
; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
; KIND, either express or implied. See the License for the
; specific language governing permissions and limitations
; under the License.
[apps..default]
run = true
count = 1

[apps.meta]
type = meta
ports = @META_PORT@
pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER,THREAD_POOL_META_STATE,THREAD_POOL_FD,THREAD_POOL_DLOCK,THREAD_POOL_BLOCK_SERVICE

[apps.replica]
type = replica
ports = @REPLICA_PORT@
pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION,THREAD_POOL_FD,THREAD_POOL_LOCAL_APP,THREAD_POOL_BLOCK_SERVICE,THREAD_POOL_COMPACT,THREAD_POOL_INGESTION,THREAD_POOL_PLOG,THREAD_POOL_SCAN

[apps.collector]
type = collector
ports = 34101
pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION

[core]
tool = nativerun
toollets = profiler
enable_default_app_mimic = true
logging_start_level = LOG_LEVEL_INFO

[network]
primary_interface = lpf-desk.

[block_service.local_service]
type = local_service
args = ../block_service/local_service

[tools.simple_logger]
short_header = false
stderr_start_level = LOG_LEVEL_ERROR

[threadpool..default]
worker_count = 4
worker_priority = THREAD_xPRIORITY_NORMAL
partitioned = false

[threadpool.THREAD_POOL_DEFAULT]
name = default
# The worker count in THREAD_POOL_DEFAULT must be >= 5.
# The info collector server runs four timer tasks (LPC_PEGASUS_APP_STAT_TIMER, LPC_PEGASUS_STORAGE_SIZE_STAT_TIMER,
# LPC_DETECT_AVAILABLE and LPC_PEGASUS_CAPACITY_UNIT_STAT_TIMER), each of which occupies a thread in THREAD_POOL_DEFAULT.
# Each timer task issues an rpc to the meta server (which produces a callback) and then waits for that callback to execute.
# If worker_count <= 4, the timer tasks occupy all of the threads, so the rpc callbacks can never get a thread to run.
# The result is a deadlock: the timer tasks wait for their rpc callbacks to execute, while the rpc callbacks wait for a
# timer task to release a thread. (A standalone sketch of this starvation pattern follows the end of this file.)
worker_count = 5

[threadpool.THREAD_POOL_REPLICATION]
name = replica
partitioned = true
worker_count = 2

[threadpool.THREAD_POOL_META_STATE]
name = meta_state
partitioned = true
worker_count = 1

[threadpool.THREAD_POOL_DLOCK]
name = dist_lock
partitioned = true
worker_count = 1

[threadpool.THREAD_POOL_FD]
name = fd
worker_count = 2

[threadpool.THREAD_POOL_LOCAL_APP]
name = local_app
worker_count = 2

[threadpool.THREAD_POOL_SCAN]
name = scan_query
worker_count = 2

[threadpool.THREAD_POOL_REPLICATION_LONG]
name = rep_long
worker_count = 2

[threadpool.THREAD_POOL_BLOCK_SERVICE]
name = block_service
worker_count = 1

[threadpool.THREAD_POOL_COMPACT]
name = compact
worker_count = 1

[threadpool.THREAD_POOL_INGESTION]
name = ingestion
partitioned = false
worker_count = 2

[threadpool.THREAD_POOL_PLOG]
name = plog
partitioned = true
worker_count = 4

[meta_server]
server_list = lpf-desk.:34601,lpf-desk.:34602,lpf-desk.:34603
cluster_root = /pegasus/onebox/lpf-desk.
distributed_lock_service_type = distributed_lock_service_zookeeper
distributed_lock_service_parameters = /pegasus/onebox/lpf-desk.
meta_state_service_type = meta_state_service_zookeeper
stable_rs_min_running_seconds = 0
server_load_balancer_type = greedy_load_balancer
min_live_node_count_for_unfreeze = 1
cold_backup_disabled = false
recover_from_replica_server = false

[replication]
mutation_2pc_min_replica_count = 1
disk_min_available_space_ratio = 10
cold_backup_root = onebox
cluster_name = onebox

[meta_server.apps.temp]
app_name = temp
app_type = pegasus
partition_count = 8

[meta_server.apps.stat]
app_name = stat
app_type = pegasus
partition_count = 4

[pegasus.server]
# Where the metrics are collected. If no value is given, no sink is used.
# Options:
# - falcon
# - prometheus
perf_counter_sink =
# The HTTP port exposed to Prometheus for pulling metrics from pegasus server.
prometheus_port = @PROMETHEUS_PORT@
encrypt_data_at_rest = false

[pegasus.collector]
available_detect_app = stat
available_detect_alert_script_dir = ./package/bin
usage_stat_app = stat
enable_detect_hotkey = false

[pegasus.clusters]
onebox = lpf-desk.:34601,lpf-desk.:34602,lpf-desk.:34603
onebox2 = 0.0.0.0:35601

# The group of clusters participating in duplication.
# Each cluster is assigned with a unique cluster id [1, 127] to identify which cluster
# the write comes from.
[duplication-group]
onebox = 1
onebox2 = 2

[zookeeper]
hosts_list = 127.0.0.1:22181
timeout_ms = 60000
logfile = zoo.log

[task..default]
is_trace = false
is_profile = false
allow_inline = false
fast_execution_in_network_thread = false
rpc_call_header_format = NET_HDR_DSN
rpc_call_channel = RPC_CHANNEL_TCP
rpc_timeout_milliseconds = 5000

[task.RPC_PREPARE]
is_profile = true

[task.RPC_PREPARE_ACK]
is_profile = true

[task.RPC_RRDB_RRDB_PUT]
is_profile = true
profiler::size.request.server = true
rpc_request_throttling_mode = TM_REJECT

[task.RPC_RRDB_RRDB_PUT_ACK]
is_profile = true

[task.RPC_RRDB_RRDB_MULTI_PUT]
is_profile = true
profiler::size.request.server = true
rpc_request_throttling_mode = TM_REJECT

[task.RPC_RRDB_RRDB_MULTI_PUT_ACK]
is_profile = true

[task.RPC_RRDB_RRDB_REMOVE]
is_profile = true
rpc_request_throttling_mode = TM_REJECT

[task.RPC_RRDB_RRDB_REMOVE_ACK]
is_profile = true

[task.RPC_RRDB_RRDB_MULTI_REMOVE]
is_profile = true
rpc_request_throttling_mode = TM_REJECT

[task.RPC_RRDB_RRDB_MULTI_REMOVE_ACK]
is_profile = true

[task.RPC_RRDB_RRDB_INCR]
is_profile = true
rpc_request_throttling_mode = TM_REJECT

[task.RPC_RRDB_RRDB_INCR_ACK]
is_profile = true

[task.RPC_RRDB_RRDB_CHECK_AND_SET]
is_profile = true
rpc_request_throttling_mode = TM_REJECT

[task.RPC_RRDB_RRDB_CHECK_AND_SET_ACK]
is_profile = true

[task.RPC_RRDB_RRDB_CHECK_AND_MUTATE]
is_profile = true
rpc_request_throttling_mode = TM_REJECT

[task.RPC_RRDB_RRDB_CHECK_AND_MUTATE_ACK]
is_profile = true

[task.RPC_RRDB_RRDB_GET]
is_profile = true
profiler::size.response.server = true
rpc_request_throttling_mode = TM_REJECT

[task.RPC_RRDB_RRDB_GET_ACK]
is_profile = true

[task.RPC_RRDB_RRDB_MULTI_GET]
is_profile = true
profiler::size.response.server = true
rpc_request_throttling_mode = TM_REJECT

[task.RPC_RRDB_RRDB_BATCH_GET]
is_profile = true
profiler::size.response.server = true

[task.RPC_RRDB_RRDB_BATCH_GET_ACK]
is_profile = true
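
The THREAD_POOL_DEFAULT comment above describes a pool-starvation deadlock: blocking timer tasks hold every worker while the callbacks they wait on need a worker from the same pool. Below is a minimal standalone sketch of that scenario. It is not Pegasus/rDSN code; it assumes boost::asio::thread_pool for the fixed-size pool and uses a bounded wait so the demo prints a result instead of hanging. With 4 workers every "callback" starves; with 5 they all run.

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <chrono>
#include <future>
#include <iostream>
#include <memory>
#include <vector>

int main()
{
    constexpr int kTimerTasks = 4;     // stands in for the four LPC_PEGASUS_*_TIMER tasks
    boost::asio::thread_pool pool(4);  // with 5 workers every callback would get a thread

    std::promise<void> start;
    std::shared_future<void> go = start.get_future().share();
    std::vector<std::future<bool>> results;

    for (int i = 0; i < kTimerTasks; ++i) {
        auto cb_done = std::make_shared<std::promise<void>>();
        auto cb_fut = std::make_shared<std::future<void>>(cb_done->get_future());
        auto ok = std::make_shared<std::promise<bool>>();
        results.push_back(ok->get_future());
        boost::asio::post(pool, [&pool, go, cb_done, cb_fut, ok] {
            // "timer task": wait until all timer tasks hold a worker, then issue the "rpc"
            go.wait();
            // the "rpc callback" must run on the same pool ...
            boost::asio::post(pool, [cb_done] { cb_done->set_value(); });
            // ... while the timer task blocks waiting for it (bounded so the demo terminates)
            ok->set_value(cb_fut->wait_for(std::chrono::seconds(1)) ==
                          std::future_status::ready);
        });
    }
    start.set_value();

    for (auto &r : results) {
        std::cout << (r.get() ? "callback ran\n" : "callback starved -> deadlock\n");
    }
    pool.join();
    return 0;
}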
14 changes: 5 additions & 9 deletions src/replica/bulk_load/replica_bulk_loader.cpp
@@ -530,10 +530,8 @@ void replica_bulk_loader::download_files(const std::string &provider_name,
     }
     if (!download_file_metas.empty()) {
         _download_files_task[download_file_metas.back().name] = tasking::enqueue(
-            LPC_BACKGROUND_BULK_LOAD,
-            tracker(),
-            [this, remote_dir, local_dir, download_file_metas, fs]() mutable {
-                this->download_sst_file(remote_dir, local_dir, download_file_metas, fs);
+            LPC_BACKGROUND_BULK_LOAD, tracker(), [=, file_metas = download_file_metas]() mutable {
+                this->download_sst_file(remote_dir, local_dir, std::move(file_metas), fs);
             });
     }
 }
@@ -542,7 +540,7 @@ void replica_bulk_loader::download_files(const std::string &provider_name,
 void replica_bulk_loader::download_sst_file(
     const std::string &remote_dir,
     const std::string &local_dir,
-    std::vector<::dsn::replication::file_meta> &download_file_metas,
+    std::vector<::dsn::replication::file_meta> &&download_file_metas,
     dist::block_service::block_filesystem *fs)
 {
     if (_status != bulk_load_status::BLS_DOWNLOADING) {
@@ -609,10 +607,8 @@ void replica_bulk_loader::download_sst_file(
     // download next file
     if (!download_file_metas.empty()) {
         _download_files_task[download_file_metas.back().name] = tasking::enqueue(
-            LPC_BACKGROUND_BULK_LOAD,
-            tracker(),
-            [this, remote_dir, local_dir, download_file_metas, fs]() mutable {
-                this->download_sst_file(remote_dir, local_dir, download_file_metas, fs);
+            LPC_BACKGROUND_BULK_LOAD, tracker(), [=, file_metas = download_file_metas]() mutable {
+                this->download_sst_file(remote_dir, local_dir, std::move(file_metas), fs);
             });
     }
 }
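Both hunks above use the same pattern: the init-capture `file_metas = download_file_metas` copies the vector into the closure once, and the mutable lambda then moves that copy into the callee's rvalue-reference parameter when the task runs, so the callee takes ownership without a second copy. Here is a minimal standalone sketch of that pattern; the types and helper below are illustrative stand-ins, not the real rDSN tasking API.

#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct file_meta
{
    std::string name;
};

// Takes ownership of the metas, mirroring the new download_sst_file() signature.
void consume_metas(const std::string &remote_dir, std::vector<file_meta> &&metas)
{
    std::cout << "downloading " << metas.size() << " files from " << remote_dir << "\n";
}

int main()
{
    std::string remote_dir = "<remote_provider_dir>";  // placeholder value
    std::vector<file_meta> download_file_metas = {{"1.sst"}, {"2.sst"}};

    // [=, file_metas = download_file_metas] copies the vector into the closure once;
    // `mutable` lets the closure's copy be moved out when the task eventually runs.
    auto task = [=, file_metas = download_file_metas]() mutable {
        consume_metas(remote_dir, std::move(file_metas));
    };

    task();  // in the real code this body runs later, on the LPC_BACKGROUND_BULK_LOAD queue
    std::cout << "caller still owns " << download_file_metas.size() << " entries\n";
    return 0;
}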
2 changes: 1 addition & 1 deletion src/replica/bulk_load/replica_bulk_loader.h
@@ -90,7 +90,7 @@ class replica_bulk_loader : replica_base
     // download sst files from remote provider
     void download_sst_file(const std::string &remote_dir,
                            const std::string &local_dir,
-                           std::vector<::dsn::replication::file_meta> &download_file_metas,
+                           std::vector<::dsn::replication::file_meta> &&download_file_metas,
                            dist::block_service::block_filesystem *fs);
 
     // \return ERR_PATH_NOT_FOUND: file not exist
