Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Feb 27, 2025
1 parent 6ce2bb6 commit 6209bae
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ void replica_bulk_loader::check_ingestion_finish()
// checkpoint, to gurantee the condition above, we should pop all committed mutations in
// prepare list to gurantee learn type is LT_APP
mutation_ptr mu = _replica->new_mutation(invalid_decree);
mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
mu->add_client_request(nullptr);
_replica->init_prepare(mu, false, true);
_replica->_primary_states.ingestion_is_empty_prepare_sent = true;
}
Expand All @@ -727,7 +727,7 @@ void replica_bulk_loader::handle_bulk_load_succeed()
// send an empty prepare again to gurantee that learner should learn from checkpoint
if (status() == partition_status::PS_PRIMARY) {
mutation_ptr mu = _replica->new_mutation(invalid_decree);
mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
mu->add_client_request(nullptr);
_replica->init_prepare(mu, false, true);
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/replica/mutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ void mutation::copy_from(mutation_ptr &old)
}
}

void mutation::add_client_request(task_code code, dsn::message_ex *request)
void mutation::add_client_request(dsn::message_ex *request)
{
data.updates.push_back(mutation_update());
mutation_update &update = data.updates.back();
_appro_data_bytes += 32; // approximate code size

if (request != nullptr) {
update.code = code;
update.code = request->rpc_code();
update.serialization_type =
(dsn_msg_serialize_format)request->header->context.u.serialize_format;
update.__set_start_time_ns(dsn_now_ns());
Expand All @@ -184,12 +184,6 @@ void mutation::add_client_request(task_code code, dsn::message_ex *request)
CHECK_EQ(client_requests.size(), data.updates.size());
}

void mutation::add_client_request(dsn::message_ex *request)
{
CHECK_NOTNULL(request, "");
add_client_request(request->rpc_code(), request);
}

void mutation::write_to(const std::function<void(const blob &)> &inserter) const
{
binary_writer writer(1024);
Expand Down
8 changes: 7 additions & 1 deletion src/replica/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,14 @@ class mutation : public ref_counter
// state change
void set_id(ballot b, decree c);
void set_timestamp(int64_t timestamp) { data.header.timestamp = timestamp; }
void add_client_request(task_code code, dsn::message_ex *request);

// Append a write request to this mutation, and also hold it if it is from a client
// to build the response to the client later.
//
// Parameters:
// - request: it is from a client if non-null, otherwise it is an empty write.
void add_client_request(dsn::message_ex *request);

void copy_from(mutation_ptr &old);
void set_logged()
{
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ void replica::broadcast_group_check()
if (!FLAGS_empty_write_disabled &&
dsn_now_ms() >= _primary_states.last_prepare_ts_ms + FLAGS_group_check_interval_ms) {
mutation_ptr mu = new_mutation(invalid_decree);
mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
mu->add_client_request(nullptr);
init_prepare(mu, false);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void replica::async_trigger_manual_emergency_checkpoint(decree min_checkpoint_de
// the decree to at least 1, to ensure that the checkpoint would inevitably
// be created even if the replica is empty.
mutation_ptr mu = new_mutation(invalid_decree);
mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
mu->add_client_request(nullptr);
init_prepare(mu, false);

async_trigger_manual_emergency_checkpoint(
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ void replica::replay_prepare_list()
"copy mutation from mutation_tid={} to mutation_tid={}", old->tid(), mu->tid());
mu->copy_from(old);
} else {
mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
mu->add_client_request(nullptr);

LOG_INFO_PREFIX("emit empty mutation {} with mutation_tid={} when replay prepare list",
mu->name(),
Expand Down
2 changes: 1 addition & 1 deletion src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ void replica_split_manager::parent_handle_child_catch_up(
if (!FLAGS_empty_write_disabled) {
// empty wirte here to commit sync_point
mutation_ptr mu = _replica->new_mutation(invalid_decree);
mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
mu->add_client_request(nullptr);
_replica->init_prepare(mu, false);
CHECK_EQ_PREFIX_MSG(
sync_point, mu->data.header.decree, "sync_point should be equal to mutation's decree");
Expand Down

0 comments on commit 6209bae

Please sign in to comment.