Skip to content

Commit

Permalink
refactor: address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Stepan Bagritsevich <[email protected]>
  • Loading branch information
BagritsevichStepan committed Feb 4, 2025
1 parent 8c376ca commit d8282a2
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 66 deletions.
6 changes: 4 additions & 2 deletions src/redis/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,8 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) {

/* Trims a stream by length. Returns the number of deleted items. */
int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id) {
return streamTrimByLengthLimited(s, maxlen, approx, approx ? 100 * server.stream_node_max_entries : 0, last_id);
long long limit = approx ? 100 * server.stream_node_max_entries : 0;
return streamTrimByLengthLimited(s, maxlen, approx, limit, last_id);
}

/* Trims a stream by length. Returns the number of deleted items. */
Expand All @@ -495,7 +496,8 @@ int64_t streamTrimByLengthLimited(stream *s, long long maxlen, int approx, long

/* Trims a stream by minimum ID. Returns the number of deleted items. */
int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id) {
return streamTrimByIDLimited(s, minid, approx, approx ? 100 * server.stream_node_max_entries : 0, last_id);
long long limit = approx ? 100 * server.stream_node_max_entries : 0;
return streamTrimByIDLimited(s, minid, approx, limit, last_id);
}

/* Trims a stream by minimum ID. Returns the number of deleted items. */
Expand Down
143 changes: 81 additions & 62 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ struct Record {

using RecordVec = vector<Record>;

using nonstd::make_unexpected;

template <typename T> using ParseResult = io::Result<T, ErrorReply>;

nonstd::unexpected_type<ErrorReply> CreateSyntaxError(std::string_view message) {
return make_unexpected(ErrorReply{message, kSyntaxErrType});
}

struct ParsedStreamId {
streamID val;

Expand Down Expand Up @@ -90,6 +98,7 @@ struct TrimOpts {
int32_t limit = kNoLimit;
bool approx = false;
};

struct AddOpts {
std::optional<TrimOpts> trim_opts;
ParsedStreamId parsed_id;
Expand Down Expand Up @@ -177,6 +186,8 @@ struct ReadOpts {
bool noack = false;
};

const char kTrimOptionConflictErr[] =
"MAXLEN and MINID options at the same time are not compatible";
const char kInvalidStreamId[] = "Invalid stream ID specified as stream command argument";
const char kXGroupKeyNotFound[] =
"The XGROUP subcommand requires the key to exist. "
Expand Down Expand Up @@ -644,8 +655,8 @@ bool JournalAsMinId(const TrimOpts& opts) {
return opts.approx || opts.IsMaxLen();
}

OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const ShardArgs& shard_args,
const AddOpts& opts, CmdArgList args) {
OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts& opts,
bool journal_as_minid, CmdArgList args) {
DCHECK(!args.empty() && args.size() % 2 == 0);

auto& db_slice = op_args.GetDbSlice();
Expand Down Expand Up @@ -696,18 +707,21 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const ShardArgs

mem_tracker.UpdateStreamSize(it->second);

if (op_args.shard->journal()) {
if (opts.trim_opts && JournalAsMinId(opts.trim_opts.value())) {
// We need to set exact MinId in the journal.
std::string last_id = StreamsIdToString(trim_result.second);
std::vector<std::string_view> journal_args = {key, "MINID"sv, "="sv, last_id};
if (opts.no_mkstream) {
journal_args.push_back("NOMKSTREAM"sv);
}
RecordJournal(op_args, "XADD"sv, journal_args);
} else {
RecordJournal(op_args, "XADD"sv, shard_args);
if (op_args.shard->journal() && journal_as_minid) {
// We need to set exact MinId in the journal.
std::string last_id = StreamsIdToString(trim_result.second);
std::vector<std::string_view> journal_args = {key, "MINID"sv, "="sv, last_id};
journal_args.reserve(args.size());

if (opts.no_mkstream) {
journal_args.push_back("NOMKSTREAM"sv);
}

for (size_t i = 0; i < args.size(); i++) {
journal_args.push_back(args[i]);
}

RecordJournal(op_args, "XADD"sv, journal_args);
}

auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
Expand Down Expand Up @@ -1970,8 +1984,8 @@ void XGroupHelp(CmdArgList args, const CommandContext& cmd_cntx) {
return rb->SendSimpleStrArr(help_arr);
}

OpResult<int64_t> OpTrim(const OpArgs& op_args, std::string_view key, const ShardArgs& shard_args,
const TrimOpts& opts) {
OpResult<int64_t> OpTrim(const OpArgs& op_args, std::string_view key, const TrimOpts& opts,
bool journal_as_minid) {
auto& db_slice = op_args.GetDbSlice();
auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it) {
Expand All @@ -1990,21 +2004,15 @@ OpResult<int64_t> OpTrim(const OpArgs& op_args, std::string_view key, const Shar

mem_tracker.UpdateStreamSize(cobj);

if (op_args.shard->journal()) {
if (JournalAsMinId(opts)) {
// We need to set exact MinId in the journal.
std::string last_id = StreamsIdToString(res.second);
RecordJournal(op_args, "XTRIM"sv, ArgSlice{key, "MINID"sv, "="sv, last_id});
} else {
RecordJournal(op_args, "XTRIM"sv, shard_args);
}
if (op_args.shard->journal() && journal_as_minid) {
std::string last_id = StreamsIdToString(res.second);
RecordJournal(op_args, "XTRIM"sv, ArgSlice{key, "MINID"sv, "="sv, last_id});
}

return res.first;
}

optional<TrimOpts> ParseTrimOptsOrReply(bool max_len, CmdArgParser* parser,
SinkReplyBuilder* builder) {
ParseResult<TrimOpts> ParseTrimOpts(bool max_len, CmdArgParser* parser) {
TrimOpts opts;
opts.approx = parser->Check("~");
if (!opts.approx) {
Expand All @@ -2016,17 +2024,15 @@ optional<TrimOpts> ParseTrimOptsOrReply(bool max_len, CmdArgParser* parser,
} else {
ParsedStreamId parsed_id;
if (!ParseID(parser->Next(), false, 0, &parsed_id)) {
builder->SendError(kSyntaxErr);
return std::nullopt;
return CreateSyntaxError(kSyntaxErr);
}

opts.length_or_id = parsed_id; // trivial copy
}

if (parser->Check("LIMIT")) {
if (!opts.approx) {
builder->SendError(kSyntaxErr);
return std::nullopt;
return CreateSyntaxError(kSyntaxErr);
}

opts.limit = parser->Next<uint32_t>();
Expand All @@ -2035,15 +2041,22 @@ optional<TrimOpts> ParseTrimOptsOrReply(bool max_len, CmdArgParser* parser,
return opts;
}

optional<TrimOpts> ParseTrimOptsOrReply(CmdArgParser* parser, SinkReplyBuilder* builder) {
ParseResult<TrimOpts> ParseTrimOpts(CmdArgParser* parser) {
bool max_len = parser->Check("MAXLEN");
if (!max_len) {
parser->ExpectTag("MINID");
}
return ParseTrimOptsOrReply(max_len, parser, builder);

auto res = ParseTrimOpts(max_len, parser);

if (parser->Check("MAXLEN") || parser->Check("MINID")) {
return CreateSyntaxError(kTrimOptionConflictErr);
}

return res;
}

optional<AddOpts> ParseAddOptsOrReply(CmdArgParser* parser, SinkReplyBuilder* builder) {
ParseResult<AddOpts> ParseAddOpts(CmdArgParser* parser) {
AddOpts opts;
while (parser->HasNext()) {
if (parser->Check("NOMKSTREAM")) {
Expand All @@ -2053,13 +2066,22 @@ optional<AddOpts> ParseAddOptsOrReply(CmdArgParser* parser, SinkReplyBuilder* bu

bool max_len = parser->Check("MAXLEN");
if (max_len || parser->Check("MINID")) {
auto trim_opts = ParseTrimOptsOrReply(max_len, parser, builder);
if (opts.trim_opts) {
return CreateSyntaxError(kTrimOptionConflictErr);
}

auto trim_opts = ParseTrimOpts(max_len, parser);
if (!trim_opts) {
return std::nullopt;
return make_unexpected(trim_opts.error());
}

opts.trim_opts = trim_opts.value(); // trivial copy
} else {
// It is StreamId
std::string_view id = parser->Next();
if (!ParseID(id, true, 0, &opts.parsed_id)) {
return CreateSyntaxError(kInvalidStreamId);
}
break;
}
}
Expand Down Expand Up @@ -2597,32 +2619,29 @@ void StreamFamily::XAdd(CmdArgList args, const CommandContext& cmd_cntx) {

string_view key = parser.Next();

auto parsed_add_opts = ParseAddOptsOrReply(&parser, rb);
if (!parsed_add_opts) {
return;
}
auto parsed_add_opts = ParseAddOpts(&parser);

if (auto err = parser.Error(); err) {
rb->SendError(err->MakeReply());
if (auto err = parser.Error(); err || !parsed_add_opts) {
rb->SendError(!parsed_add_opts ? parsed_add_opts.error() : err->MakeReply());
return;
}

auto& add_opts = parsed_add_opts.value();

std::string_view id = parser.Next();
if (!ParseID(id, true, 0, &add_opts.parsed_id)) {
return rb->SendError(kInvalidStreamId, kSyntaxErrType);
}

CmdArgList fields = parser.Tail();
if (fields.empty() || fields.size() % 2 != 0) {
return rb->SendError(WrongNumArgsError("XADD"), kSyntaxErrType);
}

auto& add_opts = parsed_add_opts.value();

// We can auto-journal if we are not trimming approximately or by maxlen
const bool enable_auto_journaling =
!(add_opts.trim_opts && JournalAsMinId(add_opts.trim_opts.value()));
if (enable_auto_journaling) {
cmd_cntx.tx->ReviveAutoJournal();
}

auto cb = [&](Transaction* t, EngineShard* shard) {
auto op_args = t->GetOpArgs(shard);
ShardArgs shard_args = t->GetShardArgs(shard->shard_id());
return OpAdd(op_args, key, shard_args, add_opts, fields);
return OpAdd(t->GetOpArgs(shard), key, add_opts, !enable_auto_journaling, fields);
};

OpResult<streamID> add_result = cmd_cntx.tx->ScheduleSingleHopT(cb);
Expand Down Expand Up @@ -3229,23 +3248,22 @@ void StreamFamily::XTrim(CmdArgList args, const CommandContext& cmd_cntx) {

std::string_view key = parser.Next();

auto parse_trim_opts = ParseTrimOptsOrReply(&parser, rb);
if (!parse_trim_opts) {
auto parsed_trim_opts = ParseTrimOpts(&parser);
if (!parsed_trim_opts || !parser.Finalize()) {
rb->SendError(!parsed_trim_opts ? parsed_trim_opts.error() : parser.Error()->MakeReply());
return;
}

if (!parser.Finalize()) {
auto err = parser.Error();
rb->SendError(err->MakeReply());
return;
}
auto& trim_opts = parsed_trim_opts.value();

auto& trim_opts = parse_trim_opts.value();
// We can auto-journal if we are not trimming approximately or by maxlen
const bool enable_auto_journaling = !JournalAsMinId(trim_opts);
if (enable_auto_journaling) {
cmd_cntx.tx->ReviveAutoJournal();
}

auto cb = [&](Transaction* t, EngineShard* shard) {
auto op_args = t->GetOpArgs(shard);
ShardArgs shard_args = t->GetShardArgs(shard->shard_id());
return OpTrim(op_args, key, shard_args, trim_opts);
return OpTrim(t->GetOpArgs(shard), key, trim_opts, !enable_auto_journaling);
};

OpResult<int64_t> trim_result = cmd_cntx.tx->ScheduleSingleHopT(cb);
Expand Down Expand Up @@ -3394,7 +3412,8 @@ void StreamFamily::Register(CommandRegistry* registry) {
<< CI{"XREAD", kReadFlags, -3, 3, 3, acl::kXRead}.HFUNC(XRead)
<< CI{"XREADGROUP", kReadFlags, -6, 6, 6, acl::kXReadGroup}.HFUNC(XReadGroup)
<< CI{"XSETID", CO::WRITE, 3, 1, 1, acl::kXSetId}.HFUNC(XSetId)
<< CI{"XTRIM", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXTrim}.HFUNC(XTrim)
<< CI{"XTRIM", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -4, 1, 1, acl::kXTrim}.HFUNC(
XTrim)
<< CI{"_XGROUP_HELP", CO::NOSCRIPT | CO::HIDDEN, 2, 0, 0, acl::kXGroupHelp}.SetHandler(
XGroupHelp)
<< CI{"XACK", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXAck}.HFUNC(XAck)
Expand Down
4 changes: 2 additions & 2 deletions src/server/stream_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -687,9 +687,9 @@ TEST_F(StreamFamilyTest, XTrimInvalidArgs) {

// Include both maxlen and minid.
resp = Run({"xtrim", "foo", "maxlen", "2", "minid", "1-1"});
EXPECT_THAT(resp, ErrArg("syntax error"));
EXPECT_THAT(resp, ErrArg("MAXLEN and MINID options at the same time are not compatible"));
resp = Run({"xtrim", "foo", "minid", "1-1", "maxlen", "2"});
EXPECT_THAT(resp, ErrArg("syntax error"));
EXPECT_THAT(resp, ErrArg("MAXLEN and MINID options at the same time are not compatible"));

// Invalid limit.
resp = Run({"xtrim", "foo", "maxlen", "~", "2", "limit", "nan"});
Expand Down

0 comments on commit d8282a2

Please sign in to comment.