diff --git a/src/redis/t_stream.c b/src/redis/t_stream.c index f922476e665c..e38b1d646b9a 100644 --- a/src/redis/t_stream.c +++ b/src/redis/t_stream.c @@ -479,7 +479,8 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) { /* Trims a stream by length. Returns the number of deleted items. */ int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id) { - return streamTrimByLengthLimited(s, maxlen, approx, approx ? 100 * server.stream_node_max_entries : 0, last_id); + long long limit = approx ? 100 * server.stream_node_max_entries : 0; + return streamTrimByLengthLimited(s, maxlen, approx, limit, last_id); } /* Trims a stream by length. Returns the number of deleted items. */ @@ -495,7 +496,8 @@ int64_t streamTrimByLengthLimited(stream *s, long long maxlen, int approx, long /* Trims a stream by minimum ID. Returns the number of deleted items. */ int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id) { - return streamTrimByIDLimited(s, minid, approx, approx ? 100 * server.stream_node_max_entries : 0, last_id); + long long limit = approx ? 100 * server.stream_node_max_entries : 0; + return streamTrimByIDLimited(s, minid, approx, limit, last_id); } /* Trims a stream by minimum ID. Returns the number of deleted items. */ diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 7295cb3a621e..19281b602221 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -48,6 +48,14 @@ struct Record { using RecordVec = vector; +using nonstd::make_unexpected; + +template using ParseResult = io::Result; + +nonstd::unexpected_type CreateSyntaxError(std::string_view message) { + return make_unexpected(ErrorReply{message, kSyntaxErrType}); +} + struct ParsedStreamId { streamID val; @@ -90,6 +98,7 @@ struct TrimOpts { int32_t limit = kNoLimit; bool approx = false; }; + struct AddOpts { std::optional trim_opts; ParsedStreamId parsed_id; @@ -177,6 +186,8 @@ struct ReadOpts { bool noack = false; }; +const char kTrimOptionConflictErr[] = + "MAXLEN and MINID options at the same time are not compatible"; const char kInvalidStreamId[] = "Invalid stream ID specified as stream command argument"; const char kXGroupKeyNotFound[] = "The XGROUP subcommand requires the key to exist. " @@ -644,8 +655,8 @@ bool JournalAsMinId(const TrimOpts& opts) { return opts.approx || opts.IsMaxLen(); } -OpResult OpAdd(const OpArgs& op_args, string_view key, const ShardArgs& shard_args, - const AddOpts& opts, CmdArgList args) { +OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& opts, + bool journal_as_minid, CmdArgList args) { DCHECK(!args.empty() && args.size() % 2 == 0); auto& db_slice = op_args.GetDbSlice(); @@ -696,18 +707,21 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const ShardArgs mem_tracker.UpdateStreamSize(it->second); - if (op_args.shard->journal()) { - if (opts.trim_opts && JournalAsMinId(opts.trim_opts.value())) { - // We need to set exact MinId in the journal. - std::string last_id = StreamsIdToString(trim_result.second); - std::vector journal_args = {key, "MINID"sv, "="sv, last_id}; - if (opts.no_mkstream) { - journal_args.push_back("NOMKSTREAM"sv); - } - RecordJournal(op_args, "XADD"sv, journal_args); - } else { - RecordJournal(op_args, "XADD"sv, shard_args); + if (op_args.shard->journal() && journal_as_minid) { + // We need to set exact MinId in the journal. + std::string last_id = StreamsIdToString(trim_result.second); + std::vector journal_args = {key, "MINID"sv, "="sv, last_id}; + journal_args.reserve(args.size()); + + if (opts.no_mkstream) { + journal_args.push_back("NOMKSTREAM"sv); + } + + for (size_t i = 0; i < args.size(); i++) { + journal_args.push_back(args[i]); } + + RecordJournal(op_args, "XADD"sv, journal_args); } auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id()); @@ -1970,8 +1984,8 @@ void XGroupHelp(CmdArgList args, const CommandContext& cmd_cntx) { return rb->SendSimpleStrArr(help_arr); } -OpResult OpTrim(const OpArgs& op_args, std::string_view key, const ShardArgs& shard_args, - const TrimOpts& opts) { +OpResult OpTrim(const OpArgs& op_args, std::string_view key, const TrimOpts& opts, + bool journal_as_minid) { auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) { @@ -1990,21 +2004,15 @@ OpResult OpTrim(const OpArgs& op_args, std::string_view key, const Shar mem_tracker.UpdateStreamSize(cobj); - if (op_args.shard->journal()) { - if (JournalAsMinId(opts)) { - // We need to set exact MinId in the journal. - std::string last_id = StreamsIdToString(res.second); - RecordJournal(op_args, "XTRIM"sv, ArgSlice{key, "MINID"sv, "="sv, last_id}); - } else { - RecordJournal(op_args, "XTRIM"sv, shard_args); - } + if (op_args.shard->journal() && journal_as_minid) { + std::string last_id = StreamsIdToString(res.second); + RecordJournal(op_args, "XTRIM"sv, ArgSlice{key, "MINID"sv, "="sv, last_id}); } return res.first; } -optional ParseTrimOptsOrReply(bool max_len, CmdArgParser* parser, - SinkReplyBuilder* builder) { +ParseResult ParseTrimOpts(bool max_len, CmdArgParser* parser) { TrimOpts opts; opts.approx = parser->Check("~"); if (!opts.approx) { @@ -2016,8 +2024,7 @@ optional ParseTrimOptsOrReply(bool max_len, CmdArgParser* parser, } else { ParsedStreamId parsed_id; if (!ParseID(parser->Next(), false, 0, &parsed_id)) { - builder->SendError(kSyntaxErr); - return std::nullopt; + return CreateSyntaxError(kSyntaxErr); } opts.length_or_id = parsed_id; // trivial copy @@ -2025,8 +2032,7 @@ optional ParseTrimOptsOrReply(bool max_len, CmdArgParser* parser, if (parser->Check("LIMIT")) { if (!opts.approx) { - builder->SendError(kSyntaxErr); - return std::nullopt; + return CreateSyntaxError(kSyntaxErr); } opts.limit = parser->Next(); @@ -2035,15 +2041,22 @@ optional ParseTrimOptsOrReply(bool max_len, CmdArgParser* parser, return opts; } -optional ParseTrimOptsOrReply(CmdArgParser* parser, SinkReplyBuilder* builder) { +ParseResult ParseTrimOpts(CmdArgParser* parser) { bool max_len = parser->Check("MAXLEN"); if (!max_len) { parser->ExpectTag("MINID"); } - return ParseTrimOptsOrReply(max_len, parser, builder); + + auto res = ParseTrimOpts(max_len, parser); + + if (parser->Check("MAXLEN") || parser->Check("MINID")) { + return CreateSyntaxError(kTrimOptionConflictErr); + } + + return res; } -optional ParseAddOptsOrReply(CmdArgParser* parser, SinkReplyBuilder* builder) { +ParseResult ParseAddOpts(CmdArgParser* parser) { AddOpts opts; while (parser->HasNext()) { if (parser->Check("NOMKSTREAM")) { @@ -2053,13 +2066,22 @@ optional ParseAddOptsOrReply(CmdArgParser* parser, SinkReplyBuilder* bu bool max_len = parser->Check("MAXLEN"); if (max_len || parser->Check("MINID")) { - auto trim_opts = ParseTrimOptsOrReply(max_len, parser, builder); + if (opts.trim_opts) { + return CreateSyntaxError(kTrimOptionConflictErr); + } + + auto trim_opts = ParseTrimOpts(max_len, parser); if (!trim_opts) { - return std::nullopt; + return make_unexpected(trim_opts.error()); } opts.trim_opts = trim_opts.value(); // trivial copy } else { + // It is StreamId + std::string_view id = parser->Next(); + if (!ParseID(id, true, 0, &opts.parsed_id)) { + return CreateSyntaxError(kInvalidStreamId); + } break; } } @@ -2597,32 +2619,29 @@ void StreamFamily::XAdd(CmdArgList args, const CommandContext& cmd_cntx) { string_view key = parser.Next(); - auto parsed_add_opts = ParseAddOptsOrReply(&parser, rb); - if (!parsed_add_opts) { - return; - } + auto parsed_add_opts = ParseAddOpts(&parser); - if (auto err = parser.Error(); err) { - rb->SendError(err->MakeReply()); + if (auto err = parser.Error(); err || !parsed_add_opts) { + rb->SendError(!parsed_add_opts ? parsed_add_opts.error() : err->MakeReply()); return; } - auto& add_opts = parsed_add_opts.value(); - - std::string_view id = parser.Next(); - if (!ParseID(id, true, 0, &add_opts.parsed_id)) { - return rb->SendError(kInvalidStreamId, kSyntaxErrType); - } - CmdArgList fields = parser.Tail(); if (fields.empty() || fields.size() % 2 != 0) { return rb->SendError(WrongNumArgsError("XADD"), kSyntaxErrType); } + auto& add_opts = parsed_add_opts.value(); + + // We can auto-journal if we are not trimming approximately or by maxlen + const bool enable_auto_journaling = + !(add_opts.trim_opts && JournalAsMinId(add_opts.trim_opts.value())); + if (enable_auto_journaling) { + cmd_cntx.tx->ReviveAutoJournal(); + } + auto cb = [&](Transaction* t, EngineShard* shard) { - auto op_args = t->GetOpArgs(shard); - ShardArgs shard_args = t->GetShardArgs(shard->shard_id()); - return OpAdd(op_args, key, shard_args, add_opts, fields); + return OpAdd(t->GetOpArgs(shard), key, add_opts, !enable_auto_journaling, fields); }; OpResult add_result = cmd_cntx.tx->ScheduleSingleHopT(cb); @@ -3229,23 +3248,22 @@ void StreamFamily::XTrim(CmdArgList args, const CommandContext& cmd_cntx) { std::string_view key = parser.Next(); - auto parse_trim_opts = ParseTrimOptsOrReply(&parser, rb); - if (!parse_trim_opts) { + auto parsed_trim_opts = ParseTrimOpts(&parser); + if (!parsed_trim_opts || !parser.Finalize()) { + rb->SendError(!parsed_trim_opts ? parsed_trim_opts.error() : parser.Error()->MakeReply()); return; } - if (!parser.Finalize()) { - auto err = parser.Error(); - rb->SendError(err->MakeReply()); - return; - } + auto& trim_opts = parsed_trim_opts.value(); - auto& trim_opts = parse_trim_opts.value(); + // We can auto-journal if we are not trimming approximately or by maxlen + const bool enable_auto_journaling = !JournalAsMinId(trim_opts); + if (enable_auto_journaling) { + cmd_cntx.tx->ReviveAutoJournal(); + } auto cb = [&](Transaction* t, EngineShard* shard) { - auto op_args = t->GetOpArgs(shard); - ShardArgs shard_args = t->GetShardArgs(shard->shard_id()); - return OpTrim(op_args, key, shard_args, trim_opts); + return OpTrim(t->GetOpArgs(shard), key, trim_opts, !enable_auto_journaling); }; OpResult trim_result = cmd_cntx.tx->ScheduleSingleHopT(cb); @@ -3394,7 +3412,8 @@ void StreamFamily::Register(CommandRegistry* registry) { << CI{"XREAD", kReadFlags, -3, 3, 3, acl::kXRead}.HFUNC(XRead) << CI{"XREADGROUP", kReadFlags, -6, 6, 6, acl::kXReadGroup}.HFUNC(XReadGroup) << CI{"XSETID", CO::WRITE, 3, 1, 1, acl::kXSetId}.HFUNC(XSetId) - << CI{"XTRIM", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXTrim}.HFUNC(XTrim) + << CI{"XTRIM", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -4, 1, 1, acl::kXTrim}.HFUNC( + XTrim) << CI{"_XGROUP_HELP", CO::NOSCRIPT | CO::HIDDEN, 2, 0, 0, acl::kXGroupHelp}.SetHandler( XGroupHelp) << CI{"XACK", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXAck}.HFUNC(XAck) diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index 5cde6b0153eb..56e05f22e996 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -687,9 +687,9 @@ TEST_F(StreamFamilyTest, XTrimInvalidArgs) { // Include both maxlen and minid. resp = Run({"xtrim", "foo", "maxlen", "2", "minid", "1-1"}); - EXPECT_THAT(resp, ErrArg("syntax error")); + EXPECT_THAT(resp, ErrArg("MAXLEN and MINID options at the same time are not compatible")); resp = Run({"xtrim", "foo", "minid", "1-1", "maxlen", "2"}); - EXPECT_THAT(resp, ErrArg("syntax error")); + EXPECT_THAT(resp, ErrArg("MAXLEN and MINID options at the same time are not compatible")); // Invalid limit. resp = Run({"xtrim", "foo", "maxlen", "~", "2", "limit", "nan"});