Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream_family): Fix journaling in the XADD and XTRIM commands #4448

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/facade/cmd_arg_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ struct CmdArgParser {
return cur_i_ + i <= args_.size() && !error_;
}

size_t GetCurrentIndex() const {
return cur_i_;
}

private:
template <class T, class... Cases>
std::optional<std::decay_t<T>> MapImpl(std::string_view arg, std::string_view tag, T&& value,
Expand Down
12 changes: 9 additions & 3 deletions src/redis/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ typedef struct {
/* Prototypes of exported APIs. */
// struct client;

// Use this to in streamTrimByLength and streamTrimByID
#define NO_TRIM_LIMIT (-1)

/* Flags for streamCreateConsumer */
#define SCC_DEFAULT 0
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
Expand Down Expand Up @@ -163,9 +166,12 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
int streamDeleteItem(stream *s, streamID *id);
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrim(stream *s, streamAddTrimArgs *args);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);
int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id);

// If you don't want to specify a limit, use NO_TRIM_LIMIT
int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id, long long limit);
int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id, long long limit);

void streamFreeCG(streamCG *cg);
void streamDelConsumer(streamCG *cg, streamConsumer *consumer);
void streamLastValidID(stream *s, streamID *maxid);
Expand Down
50 changes: 37 additions & 13 deletions src/redis/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i
* that should be trimmed, there is a chance we will still have entries with
* IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
*/
int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
BagritsevichStepan marked this conversation as resolved.
Show resolved Hide resolved
int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) {
size_t maxlen = args->maxlen;
streamID *id = &args->minid;
int approx = args->approx_trim;
Expand All @@ -315,6 +315,8 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
raxSeek(&ri,"^",NULL,0);

int64_t deleted = 0;
streamID last_deleted_id = {0, 0}; // Initialize last deleted ID

while (raxNext(&ri)) {
if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
break;
Expand All @@ -331,16 +333,24 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
streamID master_id = {0}; /* For MINID */
if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
remove_node = s->length - entries >= maxlen;
if (remove_node) {
streamDecodeID(ri.key, &master_id);
// Write last ID to last_deleted_id
lpGetEdgeStreamID(lp, 0, &master_id, &last_deleted_id);
}
} else {
/* Read the master ID from the radix tree key. */
streamDecodeID(ri.key, &master_id);

/* Read last ID. */
streamID last_id = {0, 0};
lpGetEdgeStreamID(lp, 0, &master_id, &last_id);

/* We can remove the entire node id its last ID < 'id' */
remove_node = streamCompareID(&last_id, id) < 0;
if (remove_node) {
last_deleted_id = last_id;
}
}

if (remove_node) {
Expand All @@ -356,6 +366,10 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
* stop here. */
if (approx) break;

if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
streamDecodeID(ri.key, &master_id);
}

/* Now we have to trim entries from within 'lp' */
int64_t deleted_from_lp = 0;

Expand Down Expand Up @@ -386,11 +400,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
int64_t seq_delta = lpGetInteger(p);
p = lpNext(lp, p); /* Skip ID seq delta */

streamID currid = {0}; /* For MINID */
if (trim_strategy == TRIM_STRATEGY_MINID) {
currid.ms = master_id.ms + ms_delta;
currid.seq = master_id.seq + seq_delta;
}
streamID currid = {master_id.ms + ms_delta, master_id.seq + seq_delta};

int stop;
if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
Expand Down Expand Up @@ -422,6 +432,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
deleted_from_lp++;
s->length--;
p = lp + delta;
last_deleted_id = currid;
}
}
deleted += deleted_from_lp;
Expand Down Expand Up @@ -458,29 +469,42 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
streamGetEdgeID(s,1,1,&s->first_id);
}

/* Set the last deleted ID, if applicable. */
if (last_id) {
*last_id = last_deleted_id;
}

return deleted;
}

/* Trims a stream by length. Returns the number of deleted items. */
int64_t streamTrimByLength(stream *s, long long maxlen, int approx) {
int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id, long long limit) {
if (limit == NO_TRIM_LIMIT) {
limit = approx ? 100 * server.stream_node_max_entries : 0;
}

streamAddTrimArgs args = {
.trim_strategy = TRIM_STRATEGY_MAXLEN,
.approx_trim = approx,
.limit = approx ? 100 * server.stream_node_max_entries : 0,
.limit = limit,
.maxlen = maxlen
};
return streamTrim(s, &args);
return streamTrim(s, &args, last_id);
}

/* Trims a stream by minimum ID. Returns the number of deleted items. */
int64_t streamTrimByID(stream *s, streamID minid, int approx) {
int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id, long long limit) {
if (limit == NO_TRIM_LIMIT) {
limit = approx ? 100 * server.stream_node_max_entries : 0;
}

streamAddTrimArgs args = {
.trim_strategy = TRIM_STRATEGY_MINID,
.approx_trim = approx,
.limit = approx ? 100 * server.stream_node_max_entries : 0,
.limit = limit,
.minid = minid
};
return streamTrim(s, &args);
return streamTrim(s, &args, last_id);
}

/* Initialize the stream iterator, so that we can call iterating functions
Expand Down
Loading
Loading