
Commit

fixes
kostasrim committed Nov 11, 2024
1 parent 14e0f6c · commit ef6fa2c
Showing 4 changed files with 20 additions and 8 deletions.
src/server/db_slice.cc (3 changes: 1 addition & 2 deletions)
@@ -811,7 +811,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
 }

 void DbSlice::FlushDb(DbIndex db_ind) {
-  std::unique_lock<LocalBlockingCounter> lk(block_counter_);
+  block_counter_.Wait();
   // clear client tracking map.
   client_tracking_map_.clear();

Expand Down Expand Up @@ -1370,7 +1370,6 @@ void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key,
}

void DbSlice::UnregisterConnectionWatches(const ConnectionState::ExecInfo* exec_info) {
// Because we might remove while another fiber is preempted and miss a notification
for (const auto& [db_indx, key] : exec_info->watched_keys) {
auto& watched_keys = db_arr_[db_indx]->watched_keys;
if (auto it = watched_keys.find(key); it != watched_keys.end()) {
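For context on the FlushDb hunk above: dropping the std::unique_lock<LocalBlockingCounter> in favor of block_counter_.Wait() suggests the flush now only waits for in-flight operations tracked by the counter to drain, rather than keeping the counter acquired as a lock for the rest of the function. A minimal sketch of a blocking counter with that kind of Wait() follows; the Increase/Decrease names and the std::mutex/std::condition_variable implementation are assumptions for illustration only, not Dragonfly's actual LocalBlockingCounter.

    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    // Hypothetical blocking counter (illustration only). Writers bump the count
    // around work that may preempt; Wait() blocks until the count returns to zero.
    class BlockingCounter {
     public:
      void Increase() {
        std::lock_guard<std::mutex> lk(mu_);
        ++count_;
      }

      void Decrease() {
        std::lock_guard<std::mutex> lk(mu_);
        if (count_ > 0 && --count_ == 0) {
          cond_.notify_all();
        }
      }

      // Unlike acquiring the counter as a lock for the whole flush, Wait() only
      // synchronizes once: it returns as soon as no operation is in flight.
      void Wait() {
        std::unique_lock<std::mutex> lk(mu_);
        cond_.wait(lk, [this] { return count_ == 0; });
      }

     private:
      std::mutex mu_;
      std::condition_variable cond_;
      uint64_t count_ = 0;
    };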
src/server/generic_family.cc (6 changes: 4 additions & 2 deletions)
@@ -584,8 +584,10 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
   PrimeTable::Cursor cur = *cursor;
   auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_cntx.db_index);
   string scratch;
-  cur = prime_table->Traverse(
-      cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); });
+  do {
+    cur = prime_table->Traverse(
+        cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); });
+  } while (cur && cnt < scan_opts.limit);
   VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value();
   *cursor = cur.value();
 }
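The do/while added to OpScan keeps calling Traverse from the returned cursor until either the table is fully walked (the cursor becomes falsy) or the callback has accumulated at least scan_opts.limit entries, so a single SCAN step no longer stops after one Traverse call with a nearly empty result. A rough, self-contained sketch of that bounded-traversal pattern follows; ToyTable, BoundedScan, and the bucket layout are illustrative stand-ins, not the real PrimeTable/ScanCb API.

    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <string>
    #include <vector>

    // Illustrative stand-in for a bucket-based table with cursor traversal.
    // One Traverse() call visits a single bucket and returns the next cursor;
    // a zero cursor means the whole table has been visited.
    struct ToyTable {
      std::vector<std::vector<std::string>> buckets;

      uint64_t Traverse(uint64_t cursor,
                        const std::function<void(const std::string&)>& cb) const {
        if (cursor >= buckets.size()) return 0;
        for (const auto& key : buckets[cursor]) cb(key);
        uint64_t next = cursor + 1;
        return next < buckets.size() ? next : 0;
      }
    };

    // Bounded scan mirroring the new do/while: keep traversing until the cursor
    // is exhausted or at least `limit` entries have been collected.
    uint64_t BoundedScan(const ToyTable& table, uint64_t cursor, size_t limit,
                         std::vector<std::string>* out) {
      size_t cnt = 0;
      do {
        cursor = table.Traverse(cursor, [&](const std::string& key) {
          out->push_back(key);  // a real ScanCb would also apply MATCH/TYPE filters
          ++cnt;
        });
      } while (cursor != 0 && cnt < limit);
      return cursor;  // value handed back to the client as the next SCAN cursor
    }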
tests/dragonfly/instance.py (7 changes: 5 additions & 2 deletions)
@@ -124,8 +124,11 @@ def __init__(self, params: DflyParams, args):
         if threads > 1:
             self.args["num_shards"] = threads - 1

-        # Add 1 byte limit for big values
-        self.args["serialization_max_chunk_size"] = 1
+        if "disable_serialization_max_chunk_size" not in self.args:
+            # Add 1 byte limit for big values
+            self.args["serialization_max_chunk_size"] = 1
+        else:
+            self.args.pop("disable_serialization_max_chunk_size")

     def __del__(self):
         assert self.proc == None
tests/dragonfly/replication_test.py (12 changes: 10 additions & 2 deletions)
@@ -2407,9 +2407,17 @@ async def test_replicate_old_master(

     dfly_version = "v1.19.2"
     released_dfly_path = download_dragonfly_release(dfly_version)
-    master = df_factory.create(version=1.19, path=released_dfly_path, cluster_mode=cluster_mode)
+    master = df_factory.create(
+        disable_serialization_max_chunk_size=0,
+        version=1.19,
+        path=released_dfly_path,
+        cluster_mode=cluster_mode,
+    )
     replica = df_factory.create(
-        cluster_mode=cluster_mode, cluster_announce_ip=announce_ip, announce_port=announce_port
+        disable_serialization_max_chunk_size=0,
+        cluster_mode=cluster_mode,
+        cluster_announce_ip=announce_ip,
+        announce_port=announce_port,
     )

     df_factory.start_all([master, replica])
