Skip to content

Commit

Permalink
chore: update command interface for bitops, bloom, zset families (#4235)
Browse files Browse the repository at this point in the history
  • Loading branch information
romange authored Dec 1, 2024
1 parent 4a85c69 commit c857ff9
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 224 deletions.
59 changes: 30 additions & 29 deletions src/server/bitops_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ using BitsStrVec = vector<string>;

// The following is the list of the functions that would handle the
// commands that handle the bit operations
void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitFieldRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void GetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void SetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitPos(CmdArgList args, const CommandContext& cmd_cntx);
void BitCount(CmdArgList args, const CommandContext& cmd_cntx);
void BitField(CmdArgList args, const CommandContext& cmd_cntx);
void BitFieldRo(CmdArgList args, const CommandContext& cmd_cntx);
void BitOp(CmdArgList args, const CommandContext& cmd_cntx);
void GetBit(CmdArgList args, const CommandContext& cmd_cntx);
void SetBit(CmdArgList args, const CommandContext& cmd_cntx);

OpResult<string> ReadValue(const DbContext& context, string_view key, EngineShard* shard);
OpResult<bool> ReadValueBitsetAt(const OpArgs& op_args, string_view key, uint32_t offset);
Expand Down Expand Up @@ -513,10 +513,10 @@ void HandleOpValueResult(const OpResult<T>& result, SinkReplyBuilder* builder) {

// ------------------------------------------------------------------------- //
// Impl for the command functions
void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void BitPos(CmdArgList args, const CommandContext& cmd_cntx) {
// Support for the command BITPOS
// See details at https://redis.io/commands/bitpos/

auto* builder = cmd_cntx.rb;
if (args.size() < 1 || args.size() > 5) {
return builder->SendError(kSyntaxErr);
}
Expand Down Expand Up @@ -560,11 +560,11 @@ void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return FindFirstBitWithValue(t->GetOpArgs(shard), key, value, start, end, as_bit);
};
OpResult<int64_t> res = tx->ScheduleSingleHopT(std::move(cb));
OpResult<int64_t> res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void BitCount(CmdArgList args, const CommandContext& cmd_cntx) {
// Support for the command BITCOUNT
// See details at https://redis.io/commands/bitcount/
// Please note that if the key don't exists, it would return 0
Expand All @@ -577,14 +577,14 @@ void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
: std::pair<int64_t, int64_t>{0, std::numeric_limits<int64_t>::max()};

bool as_bit = parser.HasNext() ? parser.MapNext("BYTE", false, "BIT", true) : false;

auto* builder = cmd_cntx.rb;
if (!parser.Finalize()) {
return builder->SendError(parser.Error()->MakeReply());
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return CountBitsForValue(t->GetOpArgs(shard), key, start, end, as_bit);
};
OpResult<std::size_t> res = tx->ScheduleSingleHopT(std::move(cb));
OpResult<std::size_t> res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

Expand Down Expand Up @@ -1111,26 +1111,27 @@ void BitFieldGeneric(CmdArgList args, bool read_only, Transaction* tx, SinkReply
SendResults(*res, builder);
}

void BitField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
BitFieldGeneric(args, false, tx, builder);
void BitField(CmdArgList args, const CommandContext& cmd_cntx) {
BitFieldGeneric(args, false, cmd_cntx.tx, cmd_cntx.rb);
}

void BitFieldRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
BitFieldGeneric(args, true, tx, builder);
void BitFieldRo(CmdArgList args, const CommandContext& cmd_cntx) {
BitFieldGeneric(args, true, cmd_cntx.tx, cmd_cntx.rb);
}

#ifndef __clang__
#pragma GCC diagnostic pop
#endif

void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void BitOp(CmdArgList args, const CommandContext& cmd_cntx) {
static const std::array<string_view, 4> BITOP_OP_NAMES{OR_OP_NAME, XOR_OP_NAME, AND_OP_NAME,
NOT_OP_NAME};
string op = absl::AsciiStrToUpper(ArgS(args, 0));
string_view dest_key = ArgS(args, 1);
bool illegal = std::none_of(BITOP_OP_NAMES.begin(), BITOP_OP_NAMES.end(),
[&op](auto val) { return op == val; });

auto* builder = cmd_cntx.rb;
if (illegal || (op == NOT_OP_NAME && args.size() > 3)) {
return builder->SendError(kSyntaxErr); // too many arguments
}
Expand All @@ -1155,12 +1156,12 @@ void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpStatus::OK;
};

tx->Execute(std::move(shard_bitop), false); // we still have more work to do
cmd_cntx.tx->Execute(std::move(shard_bitop), false); // we still have more work to do
// All result from each shard
const auto joined_results = CombineResultOp(result_set, op);
// Second phase - save to target key if successful
if (!joined_results) {
tx->Conclude();
cmd_cntx.tx->Conclude();
builder->SendError(joined_results.status());
return;
} else {
Expand Down Expand Up @@ -1191,45 +1192,45 @@ void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpStatus::OK;
};

tx->Execute(std::move(store_cb), true);
cmd_cntx.tx->Execute(std::move(store_cb), true);
builder->SendLong(op_result.size());
}
}

void GetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void GetBit(CmdArgList args, const CommandContext& cmd_cntx) {
// Support for the command "GETBIT key offset"
// see https://redis.io/commands/getbit/

uint32_t offset{0};
string_view key = ArgS(args, 0);

if (!absl::SimpleAtoi(ArgS(args, 1), &offset)) {
return builder->SendError(kInvalidIntErr);
return cmd_cntx.rb->SendError(kInvalidIntErr);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return ReadValueBitsetAt(t->GetOpArgs(shard), key, offset);
};
OpResult<bool> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
OpResult<bool> res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cmd_cntx.rb);
}

void SetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void SetBit(CmdArgList args, const CommandContext& cmd_cntx) {
// Support for the command "SETBIT key offset new_value"
// see https://redis.io/commands/setbit/

CmdArgParser parser(args);
auto [key, offset, value] = parser.Next<string_view, uint32_t, FInt<0, 1>>();

if (auto err = parser.Error(); err) {
return builder->SendError(err->MakeReply());
return cmd_cntx.rb->SendError(err->MakeReply());
}

auto cb = [&](Transaction* t, EngineShard* shard) {
return BitNewValue(t->GetOpArgs(shard), key, offset, value != 0);
};

OpResult<bool> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
OpResult<bool> res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cmd_cntx.rb);
}

// ------------------------------------------------------------------------- //
Expand Down
45 changes: 23 additions & 22 deletions src/server/bloom_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,96 +91,97 @@ OpResult<ExistsResult> OpExists(const OpArgs& op_args, string_view key, CmdArgLi

} // namespace

void BloomFamily::Reserve(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void BloomFamily::Reserve(CmdArgList args, const CommandContext& cmd_cntx) {
CmdArgParser parser(args);
string_view key = parser.Next();
SbfParams params;

tie(params.error, params.init_capacity) = parser.Next<double, uint32_t>();

if (parser.Error())
return builder->SendError(kSyntaxErr);
return cmd_cntx.rb->SendError(kSyntaxErr);

if (!params.ok())
return builder->SendError("error rate is out of range", kSyntaxErrType);
return cmd_cntx.rb->SendError("error rate is out of range", kSyntaxErrType);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpReserve(params, t->GetOpArgs(shard), key);
};

OpStatus res = tx->ScheduleSingleHop(std::move(cb));
OpStatus res = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
if (res == OpStatus::KEY_EXISTS) {
return builder->SendError("item exists");
return cmd_cntx.rb->SendError("item exists");
}
return builder->SendError(res);
return cmd_cntx.rb->SendError(res);
}

void BloomFamily::Add(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void BloomFamily::Add(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, args);
};

OpResult res = tx->ScheduleSingleHopT(std::move(cb));
OpResult res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
OpStatus status = res.status();
if (res) {
if (res->front())
return builder->SendLong(*res->front());
return cmd_cntx.rb->SendLong(*res->front());
else
status = res->front().status();
}

return builder->SendError(status);
return cmd_cntx.rb->SendError(status);
}

void BloomFamily::Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void BloomFamily::Exists(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};

OpResult res = tx->ScheduleSingleHopT(std::move(cb));
return builder->SendLong(res ? res->front() : 0);
OpResult res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
return cmd_cntx.rb->SendLong(res ? res->front() : 0);
}

void BloomFamily::MAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void BloomFamily::MAdd(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, args);
};

OpResult res = tx->ScheduleSingleHopT(std::move(cb));
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
OpResult res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (!res) {
return builder->SendError(res.status());
return rb->SendError(res.status());
}
const AddResult& add_res = *res;
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(builder);

rb->StartArray(add_res.size());
for (const OpResult<bool>& val : add_res) {
if (val) {
builder->SendLong(*val);
rb->SendLong(*val);
} else {
builder->SendError(val.status());
rb->SendError(val.status());
}
}
}

void BloomFamily::MExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void BloomFamily::MExists(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};

OpResult res = tx->ScheduleSingleHopT(std::move(cb));
OpResult res = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));

RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(builder);
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
rb->StartArray(args.size());
for (size_t i = 0; i < args.size(); ++i) {
rb->SendLong(res ? res->at(i) : 0);
Expand Down
12 changes: 6 additions & 6 deletions src/server/bloom_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class SinkReplyBuilder;
namespace dfly {

class CommandRegistry;
class ConnectionContext;
struct CommandContext;

class BloomFamily {
public:
Expand All @@ -22,11 +22,11 @@ class BloomFamily {
private:
using SinkReplyBuilder = facade::SinkReplyBuilder;

static void Reserve(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Add(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void MAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Exists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void MExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void Reserve(CmdArgList args, const CommandContext& cmd_cntx);
static void Add(CmdArgList args, const CommandContext& cmd_cntx);
static void MAdd(CmdArgList args, const CommandContext& cmd_cntx);
static void Exists(CmdArgList args, const CommandContext& cmd_cntx);
static void MExists(CmdArgList args, const CommandContext& cmd_cntx);
};

} // namespace dfly
1 change: 0 additions & 1 deletion src/server/generic_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace dfly {
using facade::CmdArgList;
using facade::OpResult;

class ConnectionContext;
class CommandRegistry;
class Transaction;
struct CommandContext;
Expand Down
Loading

0 comments on commit c857ff9

Please sign in to comment.