From c2e2b81f7eb9a3a439aeb1d5187e1b60afca6c23 Mon Sep 17 00:00:00 2001 From: Sven Sterbling Date: Mon, 5 Feb 2024 10:21:35 +0100 Subject: [PATCH] fix xadd inconsistency with redis implementation This commit aligns id generation with the official "specification/docu": > Both quantities are 64-bit numbers. When an ID is auto-generated, the > first part is the Unix time in milliseconds of the Redis instance > generating the ID. The second part is just a sequence number and is > used in order to distinguish IDs generated in the same millisecond. Main motivation is that the current implementation was flawed when using capped streams by creating invalid ids (e.g. generating the same id twice) as well as re-using mocked tests and run them in an e2e fashion. Tests were added to cover the additional funcionality. Links: https://redis.io/commands/xadd/ --- src/commands/xadd.js | 32 +++++++-- test/integration/commands/xadd.js | 105 ++++++++++++++++++++++++++--- test/integration/commands/xread.js | 90 +++++++++++++++++-------- 3 files changed, 184 insertions(+), 43 deletions(-) diff --git a/src/commands/xadd.js b/src/commands/xadd.js index f5ed186a4..55a32be33 100644 --- a/src/commands/xadd.js +++ b/src/commands/xadd.js @@ -25,18 +25,40 @@ export function xadd(stream, id, ...args) { keyId = args.shift() } - const eventId = `${ - keyId === '*' ? this.data.get(stream).length + 1 : keyId - }-0` const list = this.data.get(stream) + // Last timestamp is relevant for auto generating valid ids + let lastTimestamp = 0 + let lastCounter = 0 + if (list.length > 0) { + ;[lastTimestamp, lastCounter] = list[list.length - 1][0].split('-') + } + + let [unixTimestamp, counter] = keyId.split('-') + if (unixTimestamp === '*') { + unixTimestamp = + Number(lastTimestamp) > Date.now() ? Number(lastTimestamp) : Date.now() + } + if (counter === undefined || counter === '*') { + counter = + Number(lastTimestamp) === Number(unixTimestamp) + ? Number(lastCounter) + 1 + : 0 + } - if (list.length > 0 && list[0][0] === `${eventId}`) { + const sequentialIdProvided = + Number(unixTimestamp) > Number(lastTimestamp) || + (Number(unixTimestamp) === Number(lastTimestamp) && + Number(counter) >= Number(lastCounter)) + if (!sequentialIdProvided) { throw new Error( 'ERR The ID specified in XADD is equal or smaller than the target stream top item' ) } - this.data.set(`stream:${stream}:${eventId}`, { polled: false }) + const eventId = `${unixTimestamp}-${counter}` + this.data.set(`stream:${stream}:${eventId}`, { + polled: false, + }) let newData = list.concat([[`${eventId}`, [...args]]]) if (threshold && newData.length > threshold) { diff --git a/test/integration/commands/xadd.js b/test/integration/commands/xadd.js index 9e93fd3ad..779d0a7b9 100644 --- a/test/integration/commands/xadd.js +++ b/test/integration/commands/xadd.js @@ -2,27 +2,87 @@ import Redis from 'ioredis' describe('xadd', () => { const redis = new Redis() + if (!process.env.IS_E2E) jest.useFakeTimers() + const fixedTimestamp = Date.now() + afterAll(() => { redis.disconnect() + if (!process.env.IS_E2E) jest.useRealTimers() + }) + + it('should add events with a given and sequential id to a stream', () => { + return redis + .xadd('stream', '0-1', 'key', 'val') + .then(id => { + expect(id).toBe('0-1') + }) + .then(() => redis.xadd('stream', '0-2', 'key', 'val')) + .then(id => { + expect(id).toBe('0-2') + }) + .then(() => redis.xadd('stream', '1-0', 'key', 'val')) + .then(id => { + expect(id).toBe('1-0') + }) + .then(() => redis.xadd('stream', '1337-1337', 'key', 'val')) + .then(id => { + expect(id).toBe('1337-1337') + }) + }) + + it('should only increment counter when last timestamp is greater than current date', () => { + return redis + .xadd('stream', `${fixedTimestamp + 100000}-0`, 'key', 'val') + .then(id => { + expect(id).toBe(`${fixedTimestamp + 100000}-0`) + }) + .then(() => redis.xadd('stream', '*', 'key', 'val')) + .then(id => { + expect(id).toBe(`${fixedTimestamp + 100000}-1`) + }) + }) + + it('should correctly generate ids on capped streams', () => { + return redis + .xadd('stream', 'MAXLEN', '=', '2', '*', 'key', 'val') + .then(id => { + expect(id).toBe(`${fixedTimestamp}-0`) + }) + .then(() => redis + .xadd('stream', 'MAXLEN', '=', '1', '*', 'key', 'val')) + .then(id => { + expect(id).toBe(`${fixedTimestamp}-1`) + }).then(() => redis + .xadd('stream', 'MAXLEN', '=', '2', '*', 'key', 'val')) + .then(id => { + expect(id).toBe(`${fixedTimestamp}-2`) + }).then(() => redis + .xadd('stream', 'MAXLEN', '=', '1', '*', 'key', 'val')) + .then(id => { + expect(id).toBe(`${fixedTimestamp}-3`) + }) }) // @TODO Rewrite test so it runs on a real Redis instance - ;(process.env.IS_E2E ? it.skip : it)('should add events to a stream', () => { + ; + (process.env.IS_E2E ? it.skip : it)('should add events to a stream', () => { return redis .xadd('stream', '*', 'key', 'val') .then(id => { - expect(id).toBe('1-0') - expect(redis.data.get('stream')).toEqual([['1-0', ['key', 'val']]]) + expect(id).toBe(`${fixedTimestamp}-0`) + expect(redis.data.get('stream')).toEqual([ + [`${fixedTimestamp}-0`, ['key', 'val']], + ]) expect(redis.data.get(`stream:stream:${id}`)).toEqual({ polled: false, }) }) .then(() => redis.xadd('stream', '*', 'key', 'val')) .then(id => { - expect(id).toBe('2-0') + expect(id).toBe(`${fixedTimestamp}-1`) expect(redis.data.get('stream')).toEqual([ - ['1-0', ['key', 'val']], - ['2-0', ['key', 'val']], + [`${fixedTimestamp}-0`, ['key', 'val']], + [`${fixedTimestamp}-1`, ['key', 'val']], ]) expect(redis.data.get(`stream:stream:${id}`)).toEqual({ polled: false, @@ -40,10 +100,24 @@ describe('xadd', () => { ) ) .then(id => { - expect(id).toBe('3-0') + expect(id).toBe(`${fixedTimestamp}-2`) + expect(redis.data.get('stream')).toEqual([ + [`${fixedTimestamp}-1`, ['key', 'val']], + [`${fixedTimestamp}-2`, ['reading', '{"key": "value"}']], + ]) + expect(redis.data.get(`stream:stream:${id}`)).toEqual({ + polled: false, + }) + // Advancing the time will reset the counter to 0 + jest.advanceTimersByTime(1) + }) + .then(() => redis.xadd('stream', '*', 'key', 'val')) + .then(id => { + expect(id).toBe(`${fixedTimestamp + 1}-0`) expect(redis.data.get('stream')).toEqual([ - ['2-0', ['key', 'val']], - ['3-0', ['reading', '{"key": "value"}']], + [`${fixedTimestamp}-1`, ['key', 'val']], + [`${fixedTimestamp}-2`, ['reading', '{"key": "value"}']], + [`${fixedTimestamp + 1}-0`, ['key', 'val']], ]) expect(redis.data.get(`stream:stream:${id}`)).toEqual({ polled: false, @@ -74,4 +148,15 @@ describe('xadd', () => { ) ) }) -}) + + it('should throw when adding a smaller id', () => { + redis + .xadd('stream', '0-2', 'key', 'value') + .then(() => redis.xadd('stream', '0-1', 'key', 'value')) + .catch(err => + expect(err.message).toBe( + 'ERR The ID specified in XADD is equal or smaller than the target stream top item' + ) + ) + }) +}) \ No newline at end of file diff --git a/test/integration/commands/xread.js b/test/integration/commands/xread.js index a6259dd80..2a0042e1a 100644 --- a/test/integration/commands/xread.js +++ b/test/integration/commands/xread.js @@ -2,8 +2,14 @@ import Redis from 'ioredis' describe('xread', () => { const redis = new Redis() + if (!process.env.IS_E2E) jest.useFakeTimers({ + doNotFake: ['setTimeout', 'performance'], + }) + const fixedTimestamp = Date.now() + afterAll(() => { redis.disconnect() + if (!process.env.IS_E2E) jest.useRealTimers() }) // @TODO Rewrite test so it runs on a real Redis instance @@ -13,26 +19,34 @@ describe('xread', () => { const redis = new Redis({ data: { stream: [ - ['1-0', ['key', 'val']], - ['2-0', ['key', 'val']], - ['3-0', ['key', 'val']], - ['4-0', ['key', 'val']], + [`${fixedTimestamp}-0`, ['key', 'val']], + [`${fixedTimestamp}-1`, ['key', 'val']], + [`${fixedTimestamp}-2`, ['key', 'val']], + [`${fixedTimestamp}-3`, ['key', 'val']], ], - 'stream:stream:1-0': { polled: false }, - 'stream:stream:2-0': { polled: false }, - 'stream:stream:3-0': { polled: false }, - 'stream:stream:4-0': { polled: false }, + [`stream:stream:${fixedTimestamp}-0`]: { + polled: false, + }, + [`stream:stream:${fixedTimestamp}-1`]: { + polled: false, + }, + [`stream:stream:${fixedTimestamp}-2`]: { + polled: false, + }, + [`stream:stream:${fixedTimestamp}-3`]: { + polled: false, + }, }, }) return redis - .xread('COUNT', '2', 'STREAMS', 'stream', '2-0') + .xread('COUNT', '2', 'STREAMS', 'stream', `${fixedTimestamp}-1`) .then(events => expect(events).toEqual([ [ 'stream', [ - ['2-0', ['key', 'val']], - ['3-0', ['key', 'val']], + [`${fixedTimestamp}-1`, ['key', 'val']], + [`${fixedTimestamp}-2`, ['key', 'val']], ], ], ]) @@ -52,10 +66,18 @@ describe('xread', () => { ['3-0', ['key', 'val']], ], 'other-stream': [['1-0', ['key', 'val']]], - 'stream:stream:1-0': { polled: false }, - 'stream:stream:2-0': { polled: false }, - 'stream:stream:3-0': { polled: false }, - 'stream:other-stream:1-0': { polled: false }, + 'stream:stream:1-0': { + polled: false, + }, + 'stream:stream:2-0': { + polled: false, + }, + 'stream:stream:3-0': { + polled: false, + }, + 'stream:other-stream:1-0': { + polled: false, + }, }, }) return redis @@ -87,10 +109,18 @@ describe('xread', () => { ['3-0', ['key', 'val']], ], 'other-stream': [['1-0', ['key', 'val']]], - 'stream:stream:1-0': { polled: false }, - 'stream:stream:2-0': { polled: false }, - 'stream:stream:3-0': { polled: false }, - 'stream:other-stream:1-0': { polled: false }, + 'stream:stream:1-0': { + polled: false, + }, + 'stream:stream:2-0': { + polled: false, + }, + 'stream:stream:3-0': { + polled: false, + }, + 'stream:other-stream:1-0': { + polled: false, + }, }, }) return redis @@ -119,7 +149,7 @@ describe('xread', () => { .then(row => { const [[stream, [[id, values]]]] = row expect(stream).toBe('stream') - expect(id).toBe('1-0') + expect(id).toBe(`${fixedTimestamp}-0`) expect(values).toEqual(['key', 'val']) }) return redis.xadd('stream', '*', 'key', 'val').then(() => op) @@ -131,11 +161,11 @@ describe('xread', () => { 'should block reads till data becomes available since an id', () => { const op = redis - .xread('BLOCK', '0', 'STREAMS', 'stream', '2-0') + .xread('BLOCK', '0', 'STREAMS', 'stream', `${fixedTimestamp}-1`) .then(row => { const [[stream, [[id, values]]]] = row expect(stream).toBe('stream') - expect(id).toBe('2-0') + expect(id).toBe(`${fixedTimestamp}-1`) expect(values).toEqual(['key', 'val']) }) return redis @@ -146,7 +176,6 @@ describe('xread', () => { ) it('should block reads on a stream with a time out', () => { - const redis = new Redis() const before = performance.now() return redis .xread('BLOCK', '500', 'STREAMS', 'empty-stream', '$') @@ -158,7 +187,6 @@ describe('xread', () => { }) it('should block reads on multiple streams with a time out', () => { - const redis = new Redis() const before = performance.now() return redis .xread( @@ -202,7 +230,7 @@ describe('xread', () => { const after = performance.now() expect(after - before >= 100).toBe(true) expect(row).toEqual([ - ['empty-stream-2', [['1-0', ['key', 'val']]]], + ['empty-stream-2', [[`${fixedTimestamp}-0`, ['key', 'val']]]], ['empty-stream', []], ]) }) @@ -220,9 +248,15 @@ describe('xread', () => { ['2-0', ['key', 'val']], ['3-0', ['key', 'val']], ], - 'stream:stream:1-0': { polled: false }, - 'stream:stream:2-0': { polled: false }, - 'stream:stream:3-0': { polled: false }, + 'stream:stream:1-0': { + polled: false, + }, + 'stream:stream:2-0': { + polled: false, + }, + 'stream:stream:3-0': { + polled: false, + }, }, }) return redis.xread('STREAMS', 'stream', '2-0').then(events =>