Skip to content

Commit

Permalink
Implement single shard use-case for rpoplpush. Some BLPOP related ref…
Browse files Browse the repository at this point in the history
…actoring
  • Loading branch information
romange committed Apr 28, 2022
1 parent d3764ef commit b36c16b
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 130 deletions.
19 changes: 17 additions & 2 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
1. To move lua_project to dragonfly from helio
1. To move lua_project to dragonfly from helio (DONE)
2. To limit lua stack to something reasonable like 4096.
3. To inject our own allocator to lua to track its memory.
3. To inject our own allocator to lua to track its memory.


## Object lifecycle and thread-safety.

Currently, our transactional and locking model is based on the assumption that any READ or WRITE
access to an object must be performed in the shard to which it belongs.

However, this assumption can be relaxed to get significant gains for read-only queries.

### Explanation
Our transactional framework prevents READ-locked objects from being mutated. It does not, of course, prevent their PrimaryTable from growing or changing, so these objects can move to different entries inside the table. However, our CompactObject maintains the following property: its reference, CompactObject.AsRef(), stays valid no matter where the master object moves, and it is valid and safe to read even from other threads. The exception regarding thread safety is SmallString, which uses a translation table for its pointers.

If we change the SmallString translation table to be global and thread-safe (it should not have much write contention anyway), we may access primetable keys and values from another thread and write them directly to sockets.

Use-cases: large strings that need to be copied, and sets that need to be serialized for SMEMBERS/HGETALL commands, etc. Additional complexity: we will need to lock those variables even for single-hop transactions and unlock them afterwards. The unlocking hop does not need to increase user-visible latency, since it can be done after we send the reply to the socket.
2 changes: 1 addition & 1 deletion src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view
return make_pair(it, ExpireIterator{});
}

OpResult<pair<PrimeIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, const ArgSlice& args) {
OpResult<pair<PrimeIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, ArgSlice args) {
DCHECK(!args.empty());

for (unsigned i = 0; i < args.size(); ++i) {
Expand Down
4 changes: 2 additions & 2 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ class DbSlice {
// Returns (value, expire) dict entries if key exists, null if it does not exist or has expired.
std::pair<PrimeIterator, ExpireIterator> FindExt(DbIndex db_ind, std::string_view key) const;

// Returns dictEntry, args-index if found, KEY_NOTFOUND otherwise.
// Returns (iterator, args-index) if found, KEY_NOTFOUND otherwise.
// If multiple keys are found, returns the first index in the ArgSlice.
OpResult<std::pair<PrimeIterator, unsigned>> FindFirst(DbIndex db_index, const ArgSlice& args);
OpResult<std::pair<PrimeIterator, unsigned>> FindFirst(DbIndex db_index, ArgSlice args);

// Returns .second=true if insertion occurred, false if we return the existing key.
std::pair<PrimeIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key);
Expand Down
182 changes: 145 additions & 37 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,75 @@ bool ElemCompare(const quicklistEntry& entry, string_view elem) {
return elem == an.Piece();
}

using FFResult = pair<PrimeKey, unsigned>; // key, argument index.

// Outcome of the cross-shard FindFirst() helper below: which key was picked and on
// which shard it was found.
struct ShardFFResult {
// Reference to the found key; FindFirst() fills it via PrimeKey::AsRef().
PrimeKey key;
// Id of the shard where the key was found. Stays kInvalidSid until a key is picked,
// which FindFirst() uses to detect the "nothing found" case.
ShardId sid = kInvalidSid;
};

// Runs a lookup hop over the shards of `trans` and, among the keys that were found,
// picks the one with the smallest index as reported by Transaction::ReverseArgIndex
// (presumably its position in the original command arguments - confirm against
// Transaction).
//
// Returns the picked key reference together with its shard id, OpStatus::WRONG_TYPE
// as soon as a shard slot with that status is encountered, or OpStatus::KEY_NOTFOUND
// if no shard found any key.
OpResult<ShardFFResult> FindFirst(Transaction* trans) {
  VLOG(2) << "FindFirst::Find " << trans->DebugId();

  // One slot per shard, indexed by shard id. Each slot holds either a status or a pair
  // of (reference to the found key, its index within that shard's arguments).
  // See DbSlice::FindFirst for more details. Spans all the shards for now.
  std::vector<OpResult<FFResult>> per_shard(trans->shard_set()->size());
  fill(per_shard.begin(), per_shard.end(), OpStatus::KEY_NOTFOUND);

  auto find_in_shard = [&per_shard](auto* t, EngineShard* shard) {
    const auto shard_id = shard->shard_id();
    auto shard_args = t->ShardArgsInShard(shard_id);
    OpResult<pair<PrimeIterator, unsigned>> lookup =
        shard->db_slice().FindFirst(t->db_index(), shard_args);

    if (!lookup) {
      // Propagate the failure status (e.g. not found) into this shard's slot.
      per_shard[shard_id] = lookup.status();
      return OpStatus::OK;
    }

    // Keep only a reference to the key plus its per-shard argument index.
    per_shard[shard_id] = FFResult(lookup->first->first.AsRef(), lookup->second);
    return OpStatus::OK;
  };

  trans->Execute(move(find_in_shard), false);

  uint32_t best_arg_index = UINT32_MAX;
  ShardFFResult winner;

  for (size_t sid = 0; sid < per_shard.size(); ++sid) {
    const auto& shard_res = per_shard[sid];

    switch (shard_res.status()) {
      case OpStatus::KEY_NOTFOUND:
        continue;  // nothing on this shard, move on to the next one.
      case OpStatus::WRONG_TYPE:
        return OpStatus::WRONG_TYPE;
      default:
        break;
    }

    // Any other status is expected to be a success carrying a value.
    CHECK(shard_res);

    const auto& [found_key, shard_arg_index] = shard_res.value();

    const size_t arg_index = trans->ReverseArgIndex(sid, shard_arg_index);
    if (arg_index >= best_arg_index)
      continue;

    best_arg_index = arg_index;
    winner.sid = sid;

    // We neither dereference the key nor extract its string value here, so it is
    // fine to just pass the reference along. Dereferencing is not possible at this
    // point because SmallString relies on a thread-local data structure for pointer
    // translation.
    winner.key = found_key.AsRef();
  }

  if (winner.sid == kInvalidSid)
    return OpStatus::KEY_NOTFOUND;

  return OpResult<ShardFFResult>{move(winner)};
}

class BPopper {
public:
explicit BPopper(ListDir dir);
Expand All @@ -122,22 +191,18 @@ class BPopper {
// If OK is returned then use result() to fetch the value.
OpStatus Run(Transaction* t, unsigned msec);


// returns (key, value) pair.
auto result() const {
return make_pair<string_view, string_view>(key_, value_);
}

bool found() const {
return found_;
}

private:
OpStatus Pop(Transaction* t, EngineShard* shard);

ListDir dir_;

bool found_ = false;
PrimeIterator find_it_;
ShardId find_sid_ = std::numeric_limits<ShardId>::max();
ShardFFResult ff_result_;

string key_;
string value_;
Expand All @@ -158,7 +223,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {

auto* stats = ServerState::tl_connection_stats();

OpResult<Transaction::FindFirstResult> result = t->FindFirst();
OpResult<ShardFFResult> result = FindFirst(t);

if (result.status() == OpStatus::KEY_NOTFOUND) {
if (is_multi) {
Expand All @@ -169,14 +234,16 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
return OpStatus::TIMED_OUT;
}

// Block
++stats->num_blocked_clients;
bool wait_succeeded = t->WaitOnWatch(tp);
--stats->num_blocked_clients;

if (!wait_succeeded)
return OpStatus::TIMED_OUT;

result = t->FindFirst(); // retry - must find something.
// Now we have something for sure.
result = FindFirst(t); // retry - must find something.
}

if (!result) {
Expand All @@ -185,9 +252,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
}

VLOG(1) << "Popping an element";
find_sid_ = result->sid;
find_it_ = result->find_res;
found_ = true;
ff_result_ = move(result.value());

auto cb = [this](Transaction* t, EngineShard* shard) { return Pop(t, shard); };
t->Execute(std::move(cb), true);
Expand All @@ -196,18 +261,20 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
}

OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
DCHECK(found());

if (shard->shard_id() == find_sid_) {
find_it_->first.GetString(&key_);
if (shard->shard_id() == ff_result_.sid) {
ff_result_.key.GetString(&key_);

quicklist* ql = GetQL(find_it_->second);
auto it_res = shard->db_slice().Find(t->db_index(), key_, OBJ_LIST);
CHECK(it_res); // must exist and must be ok.
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
value_ = ListPop(dir_, ql);

if (quicklistCount(ql) == 0) {
CHECK(shard->db_slice().Del(t->db_index(), find_it_));
CHECK(shard->db_slice().Del(t->db_index(), it));
}
}

return OpStatus::OK;
}

Expand Down Expand Up @@ -242,9 +309,10 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) {
string_view dest = ArgS(args, 2);

OpResult<string> result;
if (dest == src) {

if (cntx->transaction->unique_shard_cnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRPopLPushSingleKey(OpArgs{shard, t->db_index()}, src);
return OpRPopLPushSingleShard(OpArgs{shard, t->db_index()}, src, dest);
};

result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
Expand Down Expand Up @@ -446,11 +514,12 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
OpStatus result = popper.Run(transaction, unsigned(timeout * 1000));

if (result == OpStatus::OK) {
CHECK(popper.found());
VLOG(1) << "BLPop returned ";

auto res = popper.result();

VLOG(1) << "BLPop returned from " << res.first; // key.

std::string_view str_arr[2] = {res.first, res.second};

return (*cntx)->SendStringArr(str_arr);
}

Expand Down Expand Up @@ -550,7 +619,7 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
} else {
tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_ind, key);
}
quicklist* ql;
quicklist* ql = nullptr;

if (new_key) {
robj* o = createQuicklistObject();
Expand All @@ -572,10 +641,12 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos);
}

if (new_key && es->blocking_controller()) {
string tmp;
string_view key = it->first.GetSlice(&tmp);
es->blocking_controller()->AwakeWatched(op_args.db_ind, key);
if (new_key) {
if (es->blocking_controller()) {
string tmp;
string_view key = it->first.GetSlice(&tmp);
es->blocking_controller()->AwakeWatched(op_args.db_ind, key);
}
} else {
es->db_slice().PostUpdate(op_args.db_ind, it);
}
Expand Down Expand Up @@ -811,17 +882,54 @@ OpResult<StringVec> ListFamily::OpRange(const OpArgs& op_args, std::string_view
return str_vec;
}

OpResult<string> ListFamily::OpRPopLPushSingleKey(const OpArgs& op_args, std::string_view key) {
OpResult<string> ListFamily::OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,
string_view dest) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
auto src_res = db_slice.Find(op_args.db_ind, src, OBJ_LIST);
if (!src_res)
return src_res.status();

PrimeIterator src_it = *src_res;
quicklist* src_ql = GetQL(src_it->second);

if (src == dest) { // simple case.
db_slice.PreUpdate(op_args.db_ind, src_it);
string val = ListPop(ListDir::RIGHT, src_ql);

quicklistPushHead(src_ql, val.data(), val.size());
db_slice.PostUpdate(op_args.db_ind, src_it);

return val;
}

quicklist* dest_ql = nullptr;
auto [dest_it, created] = db_slice.AddOrFind(op_args.db_ind, dest);
if (created) {
robj* obj = createQuicklistObject();
dest_ql = (quicklist*)obj->ptr;
quicklistSetOptions(dest_ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth);
dest_it->second.ImportRObj(obj);

// Insertion of dest could invalidate src_it. Find it again.
src_it = db_slice.GetTables(op_args.db_ind).first->Find(src);
} else {
if (dest_it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;

dest_ql = GetQL(dest_it->second);
db_slice.PreUpdate(op_args.db_ind, dest_it);
}

db_slice.PreUpdate(op_args.db_ind, src_it);
string val = ListPop(ListDir::RIGHT, src_ql);
quicklistPushHead(dest_ql, val.data(), val.size());
db_slice.PostUpdate(op_args.db_ind, src_it);
db_slice.PostUpdate(op_args.db_ind, dest_it);

if (quicklistCount(src_ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, src_it));
}

PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
db_slice.PreUpdate(op_args.db_ind, it);
string val = ListPop(ListDir::RIGHT, ql);
quicklistPushHead(ql, val.data(), val.size());
return val;
}

Expand Down
3 changes: 2 additions & 1 deletion src/server/list_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class ListFamily {
static OpResult<StringVec> OpRange(const OpArgs& op_args, std::string_view key, long start,
long end);

static OpResult<std::string> OpRPopLPushSingleKey(const OpArgs& op_args, std::string_view key);
static OpResult<std::string> OpRPopLPushSingleShard(const OpArgs& op_args, std::string_view src,
std::string_view dest);
};

} // namespace dfly
Loading

0 comments on commit b36c16b

Please sign in to comment.