fix(stream_family): Fix journaling in the XADD and XTRIM commands #4448

Open
wants to merge 3 commits into
base: main

BagritsevichStepan (Contributor) commented Jan 13, 2025

fixes #4202

  1. Fix journaling
  2. Clean up and refactor the parsing logic for the XADD and XTRIM commands

BagritsevichStepan self-assigned this Jan 13, 2025
@@ -300,7 +300,7 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i
* that should be trimmed, there is a chance we will still have entries with
* IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
*/
int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
Collaborator

Can you please explain where the randomness is?
If we run the same XADD command on master and replica (without your changes), how can it trim a different number of elements?

Contributor Author

There is no randomness here. Internally, a stream is a rax tree of listpacks. When the user trims with the ~ option, elements inside the listpacks before the specified entry are not trimmed individually; only whole rax tree nodes that lie entirely before the specified entry are removed.

However, I still believe it’s worth adding this type of journaling, as the Redis API doesn’t guarantee anything in this regard.

Btw, most of the changes fix the option parsing for the XADD and XTRIM commands, so the change to journaling itself is not significant.
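To make the difference between exact and approximate trimming concrete, here is a minimal sketch in C++. It is a toy model, not the stream implementation: `ToyStream`, `Node`, and `TrimByMinId` are hypothetical names, and each `Node` only stands in for one listpack hanging off the rax tree.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Toy model (assumption): each non-empty node stands in for one listpack
// and holds entry IDs in ascending order.
using Node = std::vector<uint64_t>;
using ToyStream = std::deque<Node>;

// Removes entries with id < min_id and returns how many were deleted.
// With approx == true only whole nodes that lie entirely below min_id are
// dropped; the node that contains the boundary is left untouched.
std::size_t TrimByMinId(ToyStream& s, uint64_t min_id, bool approx) {
  std::size_t deleted = 0;
  while (!s.empty()) {
    Node& head = s.front();
    if (head.back() < min_id) {  // every entry in this node is below the threshold
      deleted += head.size();
      s.pop_front();
      continue;
    }
    if (!approx) {  // exact trim also removes entries inside the boundary node
      auto first_kept = std::lower_bound(head.begin(), head.end(), min_id);
      deleted += static_cast<std::size_t>(first_kept - head.begin());
      head.erase(head.begin(), first_kept);
    }
    break;
  }
  return deleted;
}
```

In this model the result of an approximate trim is deterministic, but it depends on how entries happen to be grouped into nodes, which is the kind of internal state the discussion is about.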

Collaborator

If there is no randomness, I am not sure what exactly the bug is, since the replica should execute exactly the same algorithm as the master does. Can you explain how we can have a bug in replication? @romange

Collaborator

#4202 is marked as "enhancement", so it is not a bug. While in theory there should not be any discrepancies, in practice I do not know whether one could arise from a slightly different state of the underlying data structures or from slight differences in clock. As I wrote in the issue, I prefer that we do not replicate declarative commands but instead use commands that preserve the exact state, if possible.
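The preference for exact-state commands can be sketched as a rewrite step before journaling. This is only an illustration under assumptions: `ToyStreamId`, `IdToString`, and `BuildTrimJournalArgs` are hypothetical names, the rewrite is assumed to apply only to approximate trims, and `boundary_id` is assumed to be the ID the master reports after trimming.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for a stream ID of the form "<ms>-<seq>".
struct ToyStreamId {
  uint64_t ms;
  uint64_t seq;
};

std::string IdToString(const ToyStreamId& id) {
  return std::to_string(id.ms) + "-" + std::to_string(id.seq);
}

// Returns the XTRIM arguments to write to the journal.
// Assumption: an approximate trim is rewritten into an exact MINID trim at
// the boundary the master actually used, so the replica does not have to
// re-derive the boundary from its own node layout. Exact trims are
// journaled unchanged.
std::vector<std::string> BuildTrimJournalArgs(const std::string& key, bool approx,
                                              const ToyStreamId& boundary_id,
                                              const std::vector<std::string>& original_args) {
  if (approx) {
    return {key, "MINID", "=", IdToString(boundary_id)};
  }
  return original_args;
}
```

With this shape, the replica replays a command whose outcome depends only on the entry IDs, not on how the entries are packed.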

@@ -668,13 +668,13 @@ TEST_F(StreamFamilyTest, XTrimInvalidArgs) {

// Include both maxlen and minid.
resp = Run({"xtrim", "foo", "maxlen", "2", "minid", "1-1"});
-EXPECT_THAT(resp, ErrArg("MAXLEN and MINID options at the same time are not compatible"));
+EXPECT_THAT(resp, ErrArg("syntax error"));
Collaborator

The error that was sent before is compatible with the Redis error in this case. I suggest not changing it.

args.remove_prefix(id_indx);
if (args.size() < 2 || args.size() % 2 == 0) {
return cmd_cntx.rb->SendError(WrongNumArgsError("XADD"), kSyntaxErrType);
if (auto err = parser.Error(); err) {
Collaborator

It is a bit confusing that in some cases ParseAddOptsOrReply sends the error reply and in other cases it is sent here.
I suggest making ParseAddOptsOrReply a pure parser that does not receive the reply builder: it returns the parsed args or, on failure, the error, and the error is sent only here.
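The suggested split between parsing and replying could look roughly like the following C++17 sketch. It is not the project's parser: `ToyTrimOpts`, `ToyParseError`, `ParseTrimOpts`, and `HandleXTrim` are hypothetical names, the reply is modeled as a plain string, and options such as LIMIT are left out for brevity.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <cstddef>
#include <string>
#include <variant>
#include <vector>

struct ToyTrimOpts {
  bool by_maxlen = false;  // true: MAXLEN, false: MINID
  bool approx = false;     // true when "~" was given
  std::string threshold;   // raw threshold token
};

struct ToyParseError {
  std::string message;
};

using ToyParseResult = std::variant<ToyTrimOpts, ToyParseError>;

std::string ToUpper(std::string s) {
  std::transform(s.begin(), s.end(), s.begin(),
                 [](unsigned char c) { return static_cast<char>(std::toupper(c)); });
  return s;
}

bool IsStrategy(const std::string& tok) {
  return tok == "MAXLEN" || tok == "MINID";
}

// Pure parser: it never touches a reply builder and only returns a result.
// `args` holds the tokens that follow the key.
ToyParseResult ParseTrimOpts(const std::vector<std::string>& args) {
  if (args.empty()) return ToyParseError{"syntax error"};
  const std::string strategy = ToUpper(args[0]);
  if (!IsStrategy(strategy)) return ToyParseError{"syntax error"};

  ToyTrimOpts opts;
  opts.by_maxlen = (strategy == "MAXLEN");
  std::size_t idx = 1;
  if (idx < args.size() && (args[idx] == "~" || args[idx] == "=")) {
    opts.approx = (args[idx] == "~");
    ++idx;
  }
  if (idx >= args.size()) return ToyParseError{"syntax error"};
  opts.threshold = args[idx++];

  if (idx < args.size()) {
    if (IsStrategy(ToUpper(args[idx]))) {
      return ToyParseError{"MAXLEN and MINID options at the same time are not compatible"};
    }
    return ToyParseError{"syntax error"};
  }
  return opts;
}

// The command handler is the single place where an error reply is produced.
std::string HandleXTrim(const std::vector<std::string>& args) {
  ToyParseResult parsed = ParseTrimOpts(args);
  if (const auto* err = std::get_if<ToyParseError>(&parsed)) {
    return "-ERR " + err->message;
  }
  return "+OK";
}
```

Keeping the reply in one place also makes the parser easy to unit test, because a test can inspect the returned error without a reply builder.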


std::string_view id = parser.Next();
Collaborator

Add this to ParseAddOpts as well?


std::string_view key = parser.Next();

auto parse_trim_opts = ParseTrimOptsOrReply(&parser, rb);
Collaborator

Same here: the parser can parse the command and return the error.
Here we will send the error using the reply builder.

std::string last_id = StreamsIdToString(res.second);
RecordJournal(op_args, "XTRIM"sv, ArgSlice{key, "MINID"sv, "="sv, last_id});
} else {
RecordJournal(op_args, "XTRIM"sv, shard_args);
Collaborator

In cases where the journal does not need a rewrite, you can call
ReviveAutoJournal before you call OpTrim.
This way you will not need to pass shard_args.

await asyncio.sleep(1)

# Check replica data consistent
master_data = await StaticSeeder.capture(c_master)
Collaborator

I see that the static seeder does not work on the stream type; see the class StaticSeeder implementation.
Also, we do not have a hash function for the stream type in script-hashlib.lua.
You will need to add this functionality.
Please make a separate PR for this change, and validate that the static seeder capture fails when there is a diff by temporarily inserting a bug into the stream replication code.

return deleted;
}

/* Trims a stream by length. Returns the number of deleted items. */
-int64_t streamTrimByLength(stream *s, long long maxlen, int approx) {
+int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id) {
return streamTrimByLengthLimited(s, maxlen, approx, approx ? 100 * server.stream_node_max_entries : 0, last_id);
Contributor

Please add a variable that describes what "approx ? 100 * server.stream_node_max_entries : 0" is.
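One possible way to name that expression is sketched below in C++. The names `kApproxTrimNodeFactor` and `ApproxTrimLimit` are hypothetical, and the stated meaning (a cap on how many entries an approximate trim may evict, with 0 meaning no cap) is an assumption about the intent of the expression in the diff.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical name for the literal 100 used in the diff.
constexpr int64_t kApproxTrimNodeFactor = 100;

// Assumed meaning: the maximum number of entries an approximate trim may
// evict in one call. 0 is assumed to mean "no limit", used for exact trims.
int64_t ApproxTrimLimit(bool approx, int64_t stream_node_max_entries) {
  const int64_t no_limit = 0;
  return approx ? kApproxTrimNodeFactor * stream_node_max_entries : no_limit;
}
```

Both trim wrappers could then pass the result of one shared helper instead of repeating the ternary.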


/* Trims a stream by minimum ID. Returns the number of deleted items. */
int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id) {
return streamTrimByIDLimited(s, minid, approx, approx ? 100 * server.stream_node_max_entries : 0, last_id);
Contributor

I see you use the same approach here. I think you can move the skipped parameters to the end and give them default values, to avoid declaring two functions.

Successfully merging this pull request may close these issues.

streams XADD/XTRIM require journal rewrite during the replication
4 participants