diff --git a/src/commands/xadd.js b/src/commands/xadd.js index f5ed186a..55a32be3 100644 --- a/src/commands/xadd.js +++ b/src/commands/xadd.js @@ -25,18 +25,40 @@ export function xadd(stream, id, ...args) { keyId = args.shift() } - const eventId = `${ - keyId === '*' ? this.data.get(stream).length + 1 : keyId - }-0` const list = this.data.get(stream) + // Last timestamp is relevant for auto generating valid ids + let lastTimestamp = 0 + let lastCounter = 0 + if (list.length > 0) { + ;[lastTimestamp, lastCounter] = list[list.length - 1][0].split('-') + } + + let [unixTimestamp, counter] = keyId.split('-') + if (unixTimestamp === '*') { + unixTimestamp = + Number(lastTimestamp) > Date.now() ? Number(lastTimestamp) : Date.now() + } + if (counter === undefined || counter === '*') { + counter = + Number(lastTimestamp) === Number(unixTimestamp) + ? Number(lastCounter) + 1 + : 0 + } - if (list.length > 0 && list[0][0] === `${eventId}`) { + const sequentialIdProvided = + Number(unixTimestamp) > Number(lastTimestamp) || + (Number(unixTimestamp) === Number(lastTimestamp) && + Number(counter) >= Number(lastCounter)) + if (!sequentialIdProvided) { throw new Error( 'ERR The ID specified in XADD is equal or smaller than the target stream top item' ) } - this.data.set(`stream:${stream}:${eventId}`, { polled: false }) + const eventId = `${unixTimestamp}-${counter}` + this.data.set(`stream:${stream}:${eventId}`, { + polled: false, + }) let newData = list.concat([[`${eventId}`, [...args]]]) if (threshold && newData.length > threshold) { diff --git a/test/integration/commands/xadd.js b/test/integration/commands/xadd.js index 9e93fd3a..779d0a7b 100644 --- a/test/integration/commands/xadd.js +++ b/test/integration/commands/xadd.js @@ -2,27 +2,87 @@ import Redis from 'ioredis' describe('xadd', () => { const redis = new Redis() + if (!process.env.IS_E2E) jest.useFakeTimers() + const fixedTimestamp = Date.now() + afterAll(() => { redis.disconnect() + if (!process.env.IS_E2E) jest.useRealTimers() + }) + + it('should add events with a given and sequential id to a stream', () => { + return redis + .xadd('stream', '0-1', 'key', 'val') + .then(id => { + expect(id).toBe('0-1') + }) + .then(() => redis.xadd('stream', '0-2', 'key', 'val')) + .then(id => { + expect(id).toBe('0-2') + }) + .then(() => redis.xadd('stream', '1-0', 'key', 'val')) + .then(id => { + expect(id).toBe('1-0') + }) + .then(() => redis.xadd('stream', '1337-1337', 'key', 'val')) + .then(id => { + expect(id).toBe('1337-1337') + }) + }) + + it('should only increment counter when last timestamp is greater than current date', () => { + return redis + .xadd('stream', `${fixedTimestamp + 100000}-0`, 'key', 'val') + .then(id => { + expect(id).toBe(`${fixedTimestamp + 100000}-0`) + }) + .then(() => redis.xadd('stream', '*', 'key', 'val')) + .then(id => { + expect(id).toBe(`${fixedTimestamp + 100000}-1`) + }) + }) + + it('should correctly generate ids on capped streams', () => { + return redis + .xadd('stream', 'MAXLEN', '=', '2', '*', 'key', 'val') + .then(id => { + expect(id).toBe(`${fixedTimestamp}-0`) + }) + .then(() => redis + .xadd('stream', 'MAXLEN', '=', '1', '*', 'key', 'val')) + .then(id => { + expect(id).toBe(`${fixedTimestamp}-1`) + }).then(() => redis + .xadd('stream', 'MAXLEN', '=', '2', '*', 'key', 'val')) + .then(id => { + expect(id).toBe(`${fixedTimestamp}-2`) + }).then(() => redis + .xadd('stream', 'MAXLEN', '=', '1', '*', 'key', 'val')) + .then(id => { + expect(id).toBe(`${fixedTimestamp}-3`) + }) }) // @TODO Rewrite test so it runs on a real Redis instance - ;(process.env.IS_E2E ? it.skip : it)('should add events to a stream', () => { + ; + (process.env.IS_E2E ? it.skip : it)('should add events to a stream', () => { return redis .xadd('stream', '*', 'key', 'val') .then(id => { - expect(id).toBe('1-0') - expect(redis.data.get('stream')).toEqual([['1-0', ['key', 'val']]]) + expect(id).toBe(`${fixedTimestamp}-0`) + expect(redis.data.get('stream')).toEqual([ + [`${fixedTimestamp}-0`, ['key', 'val']], + ]) expect(redis.data.get(`stream:stream:${id}`)).toEqual({ polled: false, }) }) .then(() => redis.xadd('stream', '*', 'key', 'val')) .then(id => { - expect(id).toBe('2-0') + expect(id).toBe(`${fixedTimestamp}-1`) expect(redis.data.get('stream')).toEqual([ - ['1-0', ['key', 'val']], - ['2-0', ['key', 'val']], + [`${fixedTimestamp}-0`, ['key', 'val']], + [`${fixedTimestamp}-1`, ['key', 'val']], ]) expect(redis.data.get(`stream:stream:${id}`)).toEqual({ polled: false, @@ -40,10 +100,24 @@ describe('xadd', () => { ) ) .then(id => { - expect(id).toBe('3-0') + expect(id).toBe(`${fixedTimestamp}-2`) + expect(redis.data.get('stream')).toEqual([ + [`${fixedTimestamp}-1`, ['key', 'val']], + [`${fixedTimestamp}-2`, ['reading', '{"key": "value"}']], + ]) + expect(redis.data.get(`stream:stream:${id}`)).toEqual({ + polled: false, + }) + // Advancing the time will reset the counter to 0 + jest.advanceTimersByTime(1) + }) + .then(() => redis.xadd('stream', '*', 'key', 'val')) + .then(id => { + expect(id).toBe(`${fixedTimestamp + 1}-0`) expect(redis.data.get('stream')).toEqual([ - ['2-0', ['key', 'val']], - ['3-0', ['reading', '{"key": "value"}']], + [`${fixedTimestamp}-1`, ['key', 'val']], + [`${fixedTimestamp}-2`, ['reading', '{"key": "value"}']], + [`${fixedTimestamp + 1}-0`, ['key', 'val']], ]) expect(redis.data.get(`stream:stream:${id}`)).toEqual({ polled: false, @@ -74,4 +148,15 @@ describe('xadd', () => { ) ) }) -}) + + it('should throw when adding a smaller id', () => { + redis + .xadd('stream', '0-2', 'key', 'value') + .then(() => redis.xadd('stream', '0-1', 'key', 'value')) + .catch(err => + expect(err.message).toBe( + 'ERR The ID specified in XADD is equal or smaller than the target stream top item' + ) + ) + }) +}) \ No newline at end of file diff --git a/test/integration/commands/xread.js b/test/integration/commands/xread.js index a6259dd8..2a0042e1 100644 --- a/test/integration/commands/xread.js +++ b/test/integration/commands/xread.js @@ -2,8 +2,14 @@ import Redis from 'ioredis' describe('xread', () => { const redis = new Redis() + if (!process.env.IS_E2E) jest.useFakeTimers({ + doNotFake: ['setTimeout', 'performance'], + }) + const fixedTimestamp = Date.now() + afterAll(() => { redis.disconnect() + if (!process.env.IS_E2E) jest.useRealTimers() }) // @TODO Rewrite test so it runs on a real Redis instance @@ -13,26 +19,34 @@ describe('xread', () => { const redis = new Redis({ data: { stream: [ - ['1-0', ['key', 'val']], - ['2-0', ['key', 'val']], - ['3-0', ['key', 'val']], - ['4-0', ['key', 'val']], + [`${fixedTimestamp}-0`, ['key', 'val']], + [`${fixedTimestamp}-1`, ['key', 'val']], + [`${fixedTimestamp}-2`, ['key', 'val']], + [`${fixedTimestamp}-3`, ['key', 'val']], ], - 'stream:stream:1-0': { polled: false }, - 'stream:stream:2-0': { polled: false }, - 'stream:stream:3-0': { polled: false }, - 'stream:stream:4-0': { polled: false }, + [`stream:stream:${fixedTimestamp}-0`]: { + polled: false, + }, + [`stream:stream:${fixedTimestamp}-1`]: { + polled: false, + }, + [`stream:stream:${fixedTimestamp}-2`]: { + polled: false, + }, + [`stream:stream:${fixedTimestamp}-3`]: { + polled: false, + }, }, }) return redis - .xread('COUNT', '2', 'STREAMS', 'stream', '2-0') + .xread('COUNT', '2', 'STREAMS', 'stream', `${fixedTimestamp}-1`) .then(events => expect(events).toEqual([ [ 'stream', [ - ['2-0', ['key', 'val']], - ['3-0', ['key', 'val']], + [`${fixedTimestamp}-1`, ['key', 'val']], + [`${fixedTimestamp}-2`, ['key', 'val']], ], ], ]) @@ -52,10 +66,18 @@ describe('xread', () => { ['3-0', ['key', 'val']], ], 'other-stream': [['1-0', ['key', 'val']]], - 'stream:stream:1-0': { polled: false }, - 'stream:stream:2-0': { polled: false }, - 'stream:stream:3-0': { polled: false }, - 'stream:other-stream:1-0': { polled: false }, + 'stream:stream:1-0': { + polled: false, + }, + 'stream:stream:2-0': { + polled: false, + }, + 'stream:stream:3-0': { + polled: false, + }, + 'stream:other-stream:1-0': { + polled: false, + }, }, }) return redis @@ -87,10 +109,18 @@ describe('xread', () => { ['3-0', ['key', 'val']], ], 'other-stream': [['1-0', ['key', 'val']]], - 'stream:stream:1-0': { polled: false }, - 'stream:stream:2-0': { polled: false }, - 'stream:stream:3-0': { polled: false }, - 'stream:other-stream:1-0': { polled: false }, + 'stream:stream:1-0': { + polled: false, + }, + 'stream:stream:2-0': { + polled: false, + }, + 'stream:stream:3-0': { + polled: false, + }, + 'stream:other-stream:1-0': { + polled: false, + }, }, }) return redis @@ -119,7 +149,7 @@ describe('xread', () => { .then(row => { const [[stream, [[id, values]]]] = row expect(stream).toBe('stream') - expect(id).toBe('1-0') + expect(id).toBe(`${fixedTimestamp}-0`) expect(values).toEqual(['key', 'val']) }) return redis.xadd('stream', '*', 'key', 'val').then(() => op) @@ -131,11 +161,11 @@ describe('xread', () => { 'should block reads till data becomes available since an id', () => { const op = redis - .xread('BLOCK', '0', 'STREAMS', 'stream', '2-0') + .xread('BLOCK', '0', 'STREAMS', 'stream', `${fixedTimestamp}-1`) .then(row => { const [[stream, [[id, values]]]] = row expect(stream).toBe('stream') - expect(id).toBe('2-0') + expect(id).toBe(`${fixedTimestamp}-1`) expect(values).toEqual(['key', 'val']) }) return redis @@ -146,7 +176,6 @@ describe('xread', () => { ) it('should block reads on a stream with a time out', () => { - const redis = new Redis() const before = performance.now() return redis .xread('BLOCK', '500', 'STREAMS', 'empty-stream', '$') @@ -158,7 +187,6 @@ describe('xread', () => { }) it('should block reads on multiple streams with a time out', () => { - const redis = new Redis() const before = performance.now() return redis .xread( @@ -202,7 +230,7 @@ describe('xread', () => { const after = performance.now() expect(after - before >= 100).toBe(true) expect(row).toEqual([ - ['empty-stream-2', [['1-0', ['key', 'val']]]], + ['empty-stream-2', [[`${fixedTimestamp}-0`, ['key', 'val']]]], ['empty-stream', []], ]) }) @@ -220,9 +248,15 @@ describe('xread', () => { ['2-0', ['key', 'val']], ['3-0', ['key', 'val']], ], - 'stream:stream:1-0': { polled: false }, - 'stream:stream:2-0': { polled: false }, - 'stream:stream:3-0': { polled: false }, + 'stream:stream:1-0': { + polled: false, + }, + 'stream:stream:2-0': { + polled: false, + }, + 'stream:stream:3-0': { + polled: false, + }, }, }) return redis.xread('STREAMS', 'stream', '2-0').then(events =>