From 1df13cae4f08253e017b19e780246ee6a2df5e51 Mon Sep 17 00:00:00 2001 From: Anders Rune Jensen Date: Wed, 23 Mar 2022 00:47:39 +0100 Subject: [PATCH] Implement group by --- README.md | 7 +- index.js | 54 ++++++++- operators.js | 11 +- test/add.js | 140 ++++++++++++++-------- test/bump-version.js | 24 ++-- test/del.js | 17 ++- test/live.js | 20 +++- test/operators.js | 72 +++++++++++- test/prefix.js | 208 +++++++++++++++++++++------------ test/query.js | 204 +++++++++++++++++++++----------- test/reindex.js | 34 +++--- test/seq-index-not-uptodate.js | 22 ++-- test/slow-save.js | 26 +++-- 13 files changed, 594 insertions(+), 245 deletions(-) diff --git a/README.md b/README.md index f77b01b..4c3668c 100644 --- a/README.md +++ b/README.md @@ -487,7 +487,7 @@ First some terminology: offset refers to the byte position in the log of a message. Seq refers to the 0-based position of a message in the log. -### paginate(operation, seq, limit, descending, onlyOffset, sortBy, cb) +### paginate(operation, seq, limit, descending, onlyOffset, sortBy, groupBy, cb) Query the database returning paginated results. If one or more indexes doesn't exist or are outdated, the indexes will be updated before the @@ -497,7 +497,10 @@ ordering messages. Can take values `declared` or `arrival`. `declared` refers to the timestamp for when a message was created, while `arrival` refers to when a message was added to the database. This can be important for messages from other peers that might arrive out of -order compared when they were created. +order compared when they were created. `groupBy` if used, must be a +function that takes a buffer as input and returns an index in the +buffer of the value used for grouping. The idea is to only get 1 +result per grouped value. The result is an object with the fields: diff --git a/index.js b/index.js index 3fc1c00..844f0d5 100644 --- a/index.js +++ b/index.js @@ -1267,12 +1267,47 @@ module.exports = function (log, indexesPath) { descending, onlyOffset, sortBy, + groupBy, cb ) { seq = seq || 0 - const sorted = sortedBy(bitset, descending, sortBy) - const resultSize = sorted.size + let sortedBitset = sortedBy(bitset, descending, sortBy) + + if (groupBy) { + let seqs = sortedBitset.kSmallest(Infinity).map((x) => x.seq) + const uniqueByValue = new Map() + push( + push.values(seqs), + push.asyncMap(getRecord), + push.drain( + (record) => { + const fieldStart = groupBy(record.value) + if (fieldStart < 0) return true + const value = bipf.decode(record.value, fieldStart) + if (!uniqueByValue.has(value)) + uniqueByValue.set(value, record.value) + + if (uniqueByValue.size == seq + limit) return false + }, + (early) => { + if (early) return + + const results = Array.from(uniqueByValue.values()) + .slice(seq) + .map((x) => bipf.decode(x, 0)) + cb(null, { + results: results, + total: results.length, + }) + } + ) + ) + return + } + + const sorted = sortedBitset + const resultSize = sortedBitset.size // seq -> record buffer const recBufferCache = {} @@ -1339,7 +1374,16 @@ module.exports = function (log, indexesPath) { else return bitset.size() - seq } - function paginate(operation, seq, limit, descending, onlyOffset, sortBy, cb) { + function paginate( + operation, + seq, + limit, + descending, + onlyOffset, + sortBy, + groupBy, + cb + ) { onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { @@ -1353,6 +1397,7 @@ module.exports = function (log, indexesPath) { descending, onlyOffset, sortBy, + groupBy, (err1, answer) => { if (err1) cb(err1) else { @@ -1373,7 +1418,7 @@ module.exports = function (log, indexesPath) { }) } - function all(operation, seq, descending, onlyOffset, sortBy, cb) { + function all(operation, seq, descending, onlyOffset, sortBy, groupBy, cb) { onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { @@ -1387,6 +1432,7 @@ module.exports = function (log, indexesPath) { descending, onlyOffset, sortBy, + groupBy, (err1, answer) => { if (err1) cb(err1) else { diff --git a/operators.js b/operators.js index f425072..f67c9eb 100644 --- a/operators.js +++ b/operators.js @@ -391,6 +391,10 @@ function sortByArrival() { return (ops) => updateMeta(ops, 'sortBy', 'arrival') } +function groupBy(seek) { + return (ops) => updateMeta(ops, 'groupBy', seek) +} + function startFrom(seq) { return (ops) => updateMeta(ops, 'seq', seq) } @@ -484,7 +488,7 @@ function toCallback(cb) { if (end) return cb(end) const seq = meta.seq || 0 - const { pageSize, descending, asOffsets, sortBy } = meta + const { pageSize, descending, asOffsets, sortBy, groupBy } = meta if (meta.count) meta.jitdb.count(ops, seq, descending, cb) else if (pageSize) meta.jitdb.paginate( @@ -494,9 +498,10 @@ function toCallback(cb) { descending, asOffsets, sortBy, + groupBy, cb ) - else meta.jitdb.all(ops, seq, descending, asOffsets, sortBy, cb) + else meta.jitdb.all(ops, seq, descending, asOffsets, sortBy, groupBy, cb) }) } } @@ -530,6 +535,7 @@ function toPullStream() { meta.descending, meta.asOffsets, meta.sortBy, + meta.groupBy, (err, answer) => { if (err) return cb(err) else if (answer.total === 0) cb(true) @@ -602,6 +608,7 @@ module.exports = { descending, sortByArrival, + groupBy, count, startFrom, paginate, diff --git a/test/add.js b/test/add.js index 5d17025..8830717 100644 --- a/test/add.js +++ b/test/add.js @@ -45,6 +45,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 2) @@ -56,6 +57,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => { true, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 2) t.equal(results[0].value.author, keys2.id) @@ -67,6 +69,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 2) t.equal(results[0].value.author, keys.id) @@ -87,6 +90,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].id, msg1.id) @@ -99,6 +103,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].id, msg1.id) @@ -113,6 +118,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].id, msg1.id) @@ -143,6 +149,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 2) t.end() @@ -185,13 +192,13 @@ prepareAndRunTest('Update index', dir, (t, db, raf) => { t.equal(typeof db.status.value['type_post'], 'undefined') addMsg(state.queue[0].value, raf, (err, msg1) => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) t.equal(db.status.value['seq'], 0) t.equal(db.status.value['type_post'], 0) addMsg(state.queue[1].value, raf, (err, msg1) => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) t.equal(db.status.value['seq'], 352) t.equal(db.status.value['type_post'], 352) @@ -227,7 +234,7 @@ prepareAndRunTest('obsolete status parts disappear', dir, (t, db, raf) => { addMsg(q.value, raf, cb) }), push.collect(() => { - db.paginate(typeQuery, 0, 1, false, false, 'declared', () => { + db.paginate(typeQuery, 0, 1, false, false, 'declared', null, () => { t.pass(JSON.stringify(db.status.value)) t.ok(db.status.value['seq']) t.ok(db.status.value['type_post']) @@ -254,13 +261,22 @@ prepareAndRunTest('obsolete status parts disappear', dir, (t, db, raf) => { addMsg(q.value, raf, cb) }), push.collect(() => { - db.paginate(aboutQuery, 0, 1, false, false, 'declared', () => { - t.pass(JSON.stringify(db.status.value)) - t.ok(db.status.value['seq']) - t.notOk(db.status.value['type_post']) - t.ok(db.status.value['type_about']) - t.end() - }) + db.paginate( + aboutQuery, + 0, + 1, + false, + false, + 'declared', + null, + () => { + t.pass(JSON.stringify(db.status.value)) + t.ok(db.status.value['seq']) + t.notOk(db.status.value['type_post']) + t.ok(db.status.value['type_about']) + t.end() + } + ) }) ) }) @@ -300,6 +316,7 @@ prepareAndRunTest('grow', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing 31999') @@ -350,13 +367,21 @@ prepareAndRunTest('indexAll', dir, (t, db, raf) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { addMsg(state.queue[3].value, raf, (err, msg) => { - db.all(authorQuery, 0, false, false, 'declared', (err, results) => { - t.error(err) - t.equal(results.length, 1) - t.equal(results[0].value.content.text, 'Testing 1') - t.equal(Object.keys(db.indexes).length, 3 + 2 + 1 + 1) - t.end() - }) + db.all( + authorQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.error(err) + t.equal(results.length, 1) + t.equal(results[0].value.content.text, 'Testing 1') + t.equal(Object.keys(db.indexes).length, 3 + 2 + 1 + 1) + t.end() + } + ) }) }) }) @@ -389,40 +414,55 @@ prepareAndRunTest('indexAll multiple reindexes', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { - db.all(typeQuery('post'), 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 1) - t.equal(results[0].value.content.text, 'Testing 1') - - addMsg(state.queue[2].value, raf, (err, msg) => { - addMsg(state.queue[3].value, raf, (err, msg) => { - db.all( - typeQuery('about'), - 0, - false, - false, - 'declared', - (err, results) => { - t.equal(results.length, 1) - - db.all( - typeQuery('post'), - 0, - false, - false, - 'declared', - (err, results) => { - t.equal(results.length, 2) - t.deepEqual(db.indexes['type_post'].bitset.array(), [0, 2]) - t.deepEqual(db.indexes['type_contact'].bitset.array(), [1]) - t.deepEqual(db.indexes['type_about'].bitset.array(), [3]) - t.end() - } - ) - } - ) + db.all( + typeQuery('post'), + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 1) + t.equal(results[0].value.content.text, 'Testing 1') + + addMsg(state.queue[2].value, raf, (err, msg) => { + addMsg(state.queue[3].value, raf, (err, msg) => { + db.all( + typeQuery('about'), + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 1) + + db.all( + typeQuery('post'), + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 2) + t.deepEqual(db.indexes['type_post'].bitset.array(), [ + 0, + 2, + ]) + t.deepEqual(db.indexes['type_contact'].bitset.array(), [ + 1, + ]) + t.deepEqual(db.indexes['type_about'].bitset.array(), [3]) + t.end() + } + ) + } + ) + }) }) - }) - }) + } + ) }) }) }) diff --git a/test/bump-version.js b/test/bump-version.js index e334aa2..b3d8de2 100644 --- a/test/bump-version.js +++ b/test/bump-version.js @@ -40,7 +40,7 @@ prepareAndRunTest('Bitvector index version bumped', dir, (t, db, raf) => { } addMsg(state.queue[0].value, raf, (err, msg) => { - db.all(postQuery, 0, false, false, 'declared', (err, results) => { + db.all(postQuery, 0, false, false, 'declared', null, (err, results) => { t.error(err) t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing 1') @@ -56,7 +56,7 @@ prepareAndRunTest('Bitvector index version bumped', dir, (t, db, raf) => { }, } - db.all(postQuery2, 0, false, false, 'declared', (err, results) => { + db.all(postQuery2, 0, false, false, 'declared', null, (err, results) => { t.error(err) t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing 1') @@ -84,7 +84,7 @@ prepareAndRunTest('Prefix map index version bumped', dir, (t, db, raf) => { }, } - db.all(keyQuery, 0, false, false, 'declared', (err, results) => { + db.all(keyQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing post 2!') @@ -102,11 +102,19 @@ prepareAndRunTest('Prefix map index version bumped', dir, (t, db, raf) => { }, } - db.all(keyQuery2, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 1) - t.equal(results[0].value.content.text, 'Testing post 2!') - t.end() - }) + db.all( + keyQuery2, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 1) + t.equal(results[0].value.content.text, 'Testing post 2!') + t.end() + } + ) }) }) }) diff --git a/test/del.js b/test/del.js index dbd9e91..7d75b3f 100644 --- a/test/del.js +++ b/test/del.js @@ -46,13 +46,22 @@ prepareAndRunTest('Delete', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.deepEqual(results, [msg1, msg3]) - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { - t.deepEqual(results, [msg1, msg3]) - t.end() - }) + db.all( + typeQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.deepEqual(results, [msg1, msg3]) + t.end() + } + ) } ) }) diff --git a/test/live.js b/test/live.js index e23e305..9a4db8c 100644 --- a/test/live.js +++ b/test/live.js @@ -252,7 +252,7 @@ prepareAndRunTest('Live with initial values', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg1) => { // create index - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) pull( @@ -261,10 +261,18 @@ prepareAndRunTest('Live with initial values', dir, (t, db, raf) => { t.equal(result.key, state.queue[1].key) // rerun on updated index - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 2) - t.end() - }) + db.all( + typeQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 2) + t.end() + } + ) }) ) @@ -326,7 +334,7 @@ prepareAndRunTest('Live with seq values', dir, (t, db, raf) => { } addMsg(state.queue[0].value, raf, (err, msg1) => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) let liveI = 1 diff --git a/test/operators.js b/test/operators.js index f5074a1..e040802 100644 --- a/test/operators.js +++ b/test/operators.js @@ -10,6 +10,7 @@ const ssbKeys = require('ssb-keys') const { prepareAndRunTest, addMsg, helpers } = require('./common')() const rimraf = require('rimraf') const mkdirp = require('mkdirp') +const { seekKey } = require('bipf') const { query, and, @@ -36,6 +37,7 @@ const { count, descending, sortByArrival, + groupBy, asOffsets, toCallback, toPromise, @@ -446,13 +448,16 @@ prepareAndRunTest( sortByArrival() ) + const groupByFunc = (buf) => 1 + const queryTreeAll = query( fromDB(db), where(slowEqual('value.content.type', 'post')), startFrom(5), paginate(10), descending(), - sortByArrival() + sortByArrival(), + groupBy(groupByFunc) ) t.equal(queryTreePaginate.meta.pageSize, 10) @@ -464,6 +469,7 @@ prepareAndRunTest( t.equal(queryTreeAll.meta.seq, 5) t.equal(queryTreeAll.meta.descending, true) t.equal(queryTreeAll.meta.sortBy, 'arrival') + t.equal(queryTreeAll.meta.groupBy, groupByFunc) t.end() } @@ -972,6 +978,70 @@ prepareAndRunTest('operators toCallback with sortBy', dir, (t, db, raf) => { }) }) +prepareAndRunTest('operators toCallback with groupBy', dir, (t, db, raf) => { + const msg = { type: 'post', text: 'Test 1' } + const msg2 = { type: 'posty', text: 'Testing!' } + const msg3 = { type: 'post', text: 'Test 2' } + let state = validate.initial() + state = validate.appendNew(state, null, alice, msg, Date.now()) + state = validate.appendNew(state, null, alice, msg2, Date.now() + 1) + state = validate.appendNew(state, null, alice, msg3, Date.now() + 2) + state = validate.appendNew(state, null, bob, msg, Date.now() + 3) + + function groupByType(buffer) { + let p = 0 // note you pass in p! + p = seekKey(buffer, p, 'value') + if (p < 0) return + p = seekKey(buffer, p, 'content') + if (p < 0) return + return seekKey(buffer, p, 'type') + } + + // simulate messages was replicated out of order + addMsg(state.queue[0].value, raf, () => { + addMsg(state.queue[1].value, raf, () => { + addMsg(state.queue[2].value, raf, () => { + addMsg(state.queue[3].value, raf, () => { + query( + fromDB(db), + where(slowEqual('value.author', alice.id)), + descending(), + groupBy(groupByType), + toCallback((err, msgs) => { + console.log('first query done') + t.error(err, 'toCallback got no error') + t.equal(msgs.length, 2, 'toCallback got two messages') + t.equal(msgs[0].value.author, alice.id) + t.equal(msgs[0].value.content.type, 'post') + t.equal(msgs[0].value.content.text, 'Test 2') + t.equal(msgs[1].value.author, alice.id) + t.equal(msgs[1].value.content.type, 'posty') + + query( + fromDB(db), + where(slowEqual('value.author', alice.id)), + startFrom(1), + paginate(1), + groupBy(groupByType), + toCallback((err, result) => { + console.log('second query done') + t.error(err, 'toCallback got no error') + const msgs = result.results + t.equal(msgs.length, 1, 'toCallback got one message') + t.equal(msgs[0].value.author, alice.id) + t.equal(msgs[0].value.content.type, 'posty') + + t.end() + }) + ) + }) + ) + }) + }) + }) + }) +}) + prepareAndRunTest('support deferred operations', dir, (t, db, raf) => { const msg = { type: 'post', text: 'Testing!' } let state = validate.initial() diff --git a/test/prefix.js b/test/prefix.js index 6861598..df22695 100644 --- a/test/prefix.js +++ b/test/prefix.js @@ -41,7 +41,7 @@ prepareAndRunTest('Prefix equal', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) t.equal(results[0].value.content.type, 'post') t.equal(results[1].value.content.type, 'post') @@ -86,17 +86,33 @@ prepareAndRunTest('Normal index renamed to prefix', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(normalQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 2) - t.equal(results[0].value.content.type, 'post') - t.equal(results[1].value.content.type, 'post') - db.all(prefixQuery, 0, false, false, 'declared', (err, results2) => { - t.equal(results2.length, 2) - t.equal(results2[0].value.content.type, 'post') - t.equal(results2[1].value.content.type, 'post') - t.end() - }) - }) + db.all( + normalQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 2) + t.equal(results[0].value.content.type, 'post') + t.equal(results[1].value.content.type, 'post') + db.all( + prefixQuery, + 0, + false, + false, + 'declared', + null, + (err, results2) => { + t.equal(results2.length, 2) + t.equal(results2[0].value.content.type, 'post') + t.equal(results2[1].value.content.type, 'post') + t.end() + } + ) + } + ) }) }) }) @@ -138,27 +154,36 @@ prepareAndRunTest('Prefix index skips deleted records', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err1) => { addMsg(state.queue[1].value, raf, (err2) => { addMsg(state.queue[2].value, raf, (err3) => { - db.all(prefixQuery, 0, false, true, 'declared', (err4, offsets) => { - t.error(err4, 'no err4') - t.deepEqual(offsets, [0, 760]) - raf.del(760, (err5) => { - t.error(err5, 'no err5') - db.all( - prefixQuery2, - 0, - false, - false, - 'declared', - (err6, results) => { - t.error(err6, 'no err6') - t.equal(results.length, 1) - t.equal(results[0].value.content.type, 'post') - t.equal(results[0].value.content.text, 'Testing!') - t.end() - } - ) - }) - }) + db.all( + prefixQuery, + 0, + false, + true, + 'declared', + null, + (err4, offsets) => { + t.error(err4, 'no err4') + t.deepEqual(offsets, [0, 760]) + raf.del(760, (err5) => { + t.error(err5, 'no err5') + db.all( + prefixQuery2, + 0, + false, + false, + 'declared', + null, + (err6, results) => { + t.error(err6, 'no err6') + t.equal(results.length, 1) + t.equal(results[0].value.content.type, 'post') + t.equal(results[0].value.content.text, 'Testing!') + t.end() + } + ) + }) + } + ) }) }) }) @@ -188,11 +213,19 @@ prepareAndRunTest('Prefix larger than actual value', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(channelQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 1) - t.equal(results[0].value.content.text, 'First') - t.end() - }) + db.all( + channelQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 1) + t.equal(results[0].value.content.text, 'First') + t.end() + } + ) }) }) }) @@ -222,12 +255,20 @@ prepareAndRunTest('Prefix equal falsy', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(channelQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 2) - t.equal(results[0].value.content.text, 'Second') - t.equal(results[1].value.content.text, 'Third') - t.end() - }) + db.all( + channelQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 2) + t.equal(results[0].value.content.text, 'Second') + t.equal(results[1].value.content.text, 'Third') + t.end() + } + ) }) }) }) @@ -287,36 +328,45 @@ prepareAndRunTest('Prefix equal', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(voteQuery, 0, false, false, 'declared', (err, results) => { + db.all(voteQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) t.equal(results[0].value.content.type, 'vote') db = jitdb(raf, path.join(dir, 'indexes' + name)) db.onReady(() => { addMsg(state.queue[3].value, raf, (err, msg) => { - db.all(voteQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 2) - - db = jitdb(raf, path.join(dir, 'indexes' + name)) - db.onReady(() => { - addMsg(state.queue[4].value, raf, (err, msg) => { - db.all( - voteQuery, - 0, - false, - false, - 'declared', - (err, results) => { - t.equal(results.length, 3) - t.equal(results[0].value.content.type, 'vote') - t.equal(results[1].value.content.type, 'vote') - t.equal(results[2].value.content.type, 'vote') - t.end() - } - ) + db.all( + voteQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 2) + + db = jitdb(raf, path.join(dir, 'indexes' + name)) + db.onReady(() => { + addMsg(state.queue[4].value, raf, (err, msg) => { + db.all( + voteQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 3) + t.equal(results[0].value.content.type, 'vote') + t.equal(results[1].value.content.type, 'vote') + t.equal(results[2].value.content.type, 'vote') + t.end() + } + ) + }) }) - }) - }) + } + ) }) }) }) @@ -349,10 +399,18 @@ prepareAndRunTest('Prefix equal unknown value', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(authorQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 0) - t.end() - }) + db.all( + authorQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 0) + t.end() + } + ) }) }) }) @@ -383,7 +441,7 @@ prepareAndRunTest('Prefix map equal', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) t.equal(results[0].value.content.type, 'post') t.equal(results[1].value.content.type, 'post') @@ -420,7 +478,7 @@ prepareAndRunTest('Prefix offset', dir, (t, db, raf) => { }, } - db.all(keyQuery, 0, false, false, 'declared', (err, results) => { + db.all(keyQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing 2!') t.end() @@ -450,7 +508,7 @@ prepareAndRunTest('Prefix offset 1 on empty', dir, (t, db, raf) => { }, } - db.all(rootQuery, 0, false, false, 'declared', (err, results) => { + db.all(rootQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing!') t.end() @@ -502,6 +560,7 @@ prepareAndRunTest('Prefix delete', dir, (t, db, raf) => { false, false, 'declared', + null, (err, results) => { t.equal(results.length, 2) t.equal(results[0].value.content.type, 'post') @@ -518,6 +577,7 @@ prepareAndRunTest('Prefix delete', dir, (t, db, raf) => { false, false, 'declared', + null, (err, answer) => { t.equal(answer.results.length, 1) t.equal(answer.results[0].value.content.text, 'Testing 2!') diff --git a/test/query.js b/test/query.js index 535262c..bc9a435 100644 --- a/test/query.js +++ b/test/query.js @@ -51,17 +51,25 @@ prepareAndRunTest('Multiple types', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) t.equal(results[0].value.content.type, 'post') t.equal(results[1].value.content.type, 'post') - db.all(contactQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 1) - t.equal(results[0].value.content.type, 'contact') - - t.end() - }) + db.all( + contactQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 1) + t.equal(results[0].value.content.type, 'contact') + + t.end() + } + ) }) }) }) @@ -98,6 +106,7 @@ prepareAndRunTest('Top 1 multiple types', dir, (t, db, raf) => { true, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing 2!') @@ -133,6 +142,7 @@ prepareAndRunTest('Limit -1', dir, (t, db, raf) => { true, false, 'declared', + null, (err2, { results }) => { t.error(err2) t.equal(results.length, 0) @@ -166,6 +176,7 @@ prepareAndRunTest('Limit 0', dir, (t, db, raf) => { true, false, 'declared', + null, (err2, { results }) => { t.error(err2) t.equal(results.length, 0) @@ -198,13 +209,21 @@ prepareAndRunTest('Includes', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err1, msg) => { addMsg(state.queue[1].value, raf, (err2, msg) => { addMsg(state.queue[2].value, raf, (err3, msg) => { - db.all(typeQuery, 0, false, false, 'declared', (err4, results) => { - t.error(err4) - t.equal(results.length, 2) - t.equal(results[0].value.content.text, '1st') - t.equal(results[1].value.content.text, '2nd') - t.end() - }) + db.all( + typeQuery, + 0, + false, + false, + 'declared', + null, + (err4, results) => { + t.error(err4) + t.equal(results.length, 2) + t.equal(results[0].value.content.text, '1st') + t.equal(results[1].value.content.text, '2nd') + t.end() + } + ) }) }) }) @@ -246,13 +265,21 @@ prepareAndRunTest('Includes and pluck', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err1, msg) => { addMsg(state.queue[1].value, raf, (err2, msg) => { addMsg(state.queue[2].value, raf, (err3, msg) => { - db.all(typeQuery, 0, false, false, 'declared', (err4, results) => { - t.error(err4) - t.equal(results.length, 2) - t.equal(results[0].value.content.text, '1st') - t.equal(results[1].value.content.text, '2nd') - t.end() - }) + db.all( + typeQuery, + 0, + false, + false, + 'declared', + null, + (err4, results) => { + t.error(err4) + t.equal(results.length, 2) + t.equal(results[0].value.content.text, '1st') + t.equal(results[1].value.content.text, '2nd') + t.end() + } + ) }) }) }) @@ -288,6 +315,7 @@ prepareAndRunTest('Paginate many pages', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, '1st') @@ -298,6 +326,7 @@ prepareAndRunTest('Paginate many pages', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, '2nd') @@ -308,6 +337,7 @@ prepareAndRunTest('Paginate many pages', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, '3rd') @@ -350,6 +380,7 @@ prepareAndRunTest('Paginate empty', dir, (t, db, raf) => { false, false, 'declared', + null, (err3, { results }) => { t.error(err3) t.equal(results.length, 0) @@ -390,6 +421,7 @@ prepareAndRunTest('Seq', dir, (t, db, raf) => { true, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing!') @@ -425,6 +457,7 @@ prepareAndRunTest('Buffer', dir, (t, db, raf) => { true, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing!') @@ -464,6 +497,7 @@ prepareAndRunTest('Undefined', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 2) t.equal(results[0].value.content.text, 'Testing no root') @@ -506,6 +540,7 @@ prepareAndRunTest('Null', dir, (t, db, raf) => { false, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing root null') @@ -556,20 +591,19 @@ prepareAndRunTest('GT,GTE,LT,LTE', dir, (t, db, raf) => { addMsg(state.queue[1].value, raf, (err, dbMsg2) => { addMsg(state.queue[2].value, raf, (err, dbMsg3) => { addMsg(state.queue[3].value, raf, (err, dbMsg4) => { - db.all(filterQuery, 0, false, false, 'declared', (err, results) => { - t.error(err, 'no err') - t.equal(results.length, 3) - t.equal(results[0].value.content.text, '2') - - filterQuery.data[0].type = 'GTE' - // clone to force cache invalidation inside db.all: - filterQuery = Object.assign({}, filterQuery) - db.all(filterQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 4) - t.equal(results[0].value.content.text, '1') - - filterQuery.data[0].type = 'LT' - filterQuery.data[0].data.value = 3 + db.all( + filterQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.error(err, 'no err') + t.equal(results.length, 3) + t.equal(results[0].value.content.text, '2') + + filterQuery.data[0].type = 'GTE' // clone to force cache invalidation inside db.all: filterQuery = Object.assign({}, filterQuery) db.all( @@ -578,11 +612,13 @@ prepareAndRunTest('GT,GTE,LT,LTE', dir, (t, db, raf) => { false, false, 'declared', + null, (err, results) => { - t.equal(results.length, 2) + t.equal(results.length, 4) t.equal(results[0].value.content.text, '1') - filterQuery.data[0].type = 'LTE' + filterQuery.data[0].type = 'LT' + filterQuery.data[0].data.value = 3 // clone to force cache invalidation inside db.all: filterQuery = Object.assign({}, filterQuery) db.all( @@ -591,13 +627,12 @@ prepareAndRunTest('GT,GTE,LT,LTE', dir, (t, db, raf) => { false, false, 'declared', + null, (err, results) => { - t.equal(results.length, 3) + t.equal(results.length, 2) t.equal(results[0].value.content.text, '1') - filterQuery.data[0].type = 'GT' - filterQuery.data[0].data.indexName = 'timestamp' - filterQuery.data[0].data.value = dbMsg1.value.timestamp + filterQuery.data[0].type = 'LTE' // clone to force cache invalidation inside db.all: filterQuery = Object.assign({}, filterQuery) db.all( @@ -606,19 +641,39 @@ prepareAndRunTest('GT,GTE,LT,LTE', dir, (t, db, raf) => { false, false, 'declared', + null, (err, results) => { t.equal(results.length, 3) - t.equal(results[0].value.content.text, '2') - - t.end() + t.equal(results[0].value.content.text, '1') + + filterQuery.data[0].type = 'GT' + filterQuery.data[0].data.indexName = 'timestamp' + filterQuery.data[0].data.value = + dbMsg1.value.timestamp + // clone to force cache invalidation inside db.all: + filterQuery = Object.assign({}, filterQuery) + db.all( + filterQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 3) + t.equal(results[0].value.content.text, '2') + + t.end() + } + ) } ) } ) } ) - }) - }) + } + ) }) }) }) @@ -649,14 +704,22 @@ prepareAndRunTest('GTE Zero', dir, (t, db, raf) => { addMsg(state.queue[1].value, raf, (err, dbMsg2) => { addMsg(state.queue[2].value, raf, (err, dbMsg3) => { addMsg(state.queue[3].value, raf, (err, dbMsg4) => { - db.all(filterQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 4) - t.equal(results[0].value.content.text, '1') - t.equal(results[1].value.content.text, '2') - t.equal(results[2].value.content.text, '3') - t.equal(results[3].value.content.text, '4') - t.end() - }) + db.all( + filterQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 4) + t.equal(results[0].value.content.text, '1') + t.equal(results[1].value.content.text, '2') + t.equal(results[2].value.content.text, '3') + t.equal(results[3].value.content.text, '4') + t.end() + } + ) }) }) }) @@ -702,6 +765,7 @@ prepareAndRunTest('Data offsets', dir, (t, db, raf) => { true, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing no root') @@ -731,7 +795,7 @@ prepareAndRunTest('Data seqs simple', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(dataQuery, 0, false, false, 'declared', (err, results) => { + db.all(dataQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) t.equal(results[0].value.content.name, 'Test') t.equal(results[1].value.content.text, 'Testing no root') @@ -781,6 +845,7 @@ prepareAndRunTest('Data seqs', dir, (t, db, raf) => { true, false, 'declared', + null, (err, { results }) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing no root') @@ -838,7 +903,7 @@ prepareAndRunTest('Multiple ands', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(allQuery, 0, false, false, 'declared', (err, results) => { + db.all(allQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) t.equal(results[0].value.content.text, 'Testing 2!') t.end() @@ -911,7 +976,7 @@ prepareAndRunTest('Multiple ors', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(allQuery, 0, false, false, 'declared', (err, results) => { + db.all(allQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) t.equal(results[0].value.content.text, 'Testing!') t.equal(results[1].value.content.text, 'Testing 2!') @@ -949,13 +1014,21 @@ prepareAndRunTest('Timestamp discontinuity', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, m1) => { addMsg(state.queue[1].value, raf, (err, m2) => { addMsg(state.queue[2].value, raf, (err, m3) => { - db.all(authorQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 3) - t.equal(results[0].value.content.text, '3rd', '3rd ok') - t.equal(results[1].value.content.text, '2nd', '2nd ok') - t.equal(results[2].value.content.text, '1st', '1st ok') - t.end() - }) + db.all( + authorQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 3) + t.equal(results[0].value.content.text, '3rd', '3rd ok') + t.equal(results[1].value.content.text, '2nd', '2nd ok') + t.equal(results[2].value.content.text, '1st', '1st ok') + t.end() + } + ) }) }) }) @@ -992,7 +1065,7 @@ prepareAndRunTest('reindex corrupt indexes', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) const dir = '/tmp/jitdb-query/indexesreindex corrupt indexes/' @@ -1010,6 +1083,7 @@ prepareAndRunTest('reindex corrupt indexes', dir, (t, db, raf) => { false, false, 'declared', + null, (err, results) => { t.equal(results.length, 2) t.end() diff --git a/test/reindex.js b/test/reindex.js index b3ebe8e..6a00b16 100644 --- a/test/reindex.js +++ b/test/reindex.js @@ -79,22 +79,30 @@ prepareAndRunTest('reindex seq offset', dir, (t, db, raf) => { } addThreeMessages(raf, () => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) t.equal(results[0].value.content.type, 'post') t.equal(results[1].value.content.type, 'post') db.reindex(0, () => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) const secondMsgOffset = 352 db.reindex(secondMsgOffset, () => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 2) - - t.end() - }) + db.all( + typeQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 2) + + t.end() + } + ) }) }) }) @@ -116,13 +124,13 @@ prepareAndRunTest('reindex bitset', dir, (t, db, raf) => { } addThreeMessages(raf, () => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) t.equal(results[0].value.content.type, 'post') db.reindex(0, () => { removeFilter() - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) t.end() }) @@ -147,13 +155,13 @@ prepareAndRunTest('reindex prefix', dir, (t, db, raf) => { } addThreeMessages(raf, () => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) t.equal(results[0].value.content.type, 'post') db.reindex(0, () => { removeFilter() - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) t.end() }) @@ -179,13 +187,13 @@ prepareAndRunTest('reindex prefix map', dir, (t, db, raf) => { } addThreeMessages(raf, () => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) t.equal(results[0].value.content.type, 'post') db.reindex(0, () => { removeFilter() - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 2) t.end() }) diff --git a/test/seq-index-not-uptodate.js b/test/seq-index-not-uptodate.js index 537fa99..bb4dfc3 100644 --- a/test/seq-index-not-uptodate.js +++ b/test/seq-index-not-uptodate.js @@ -39,7 +39,7 @@ prepareAndRunTest('Ensure seq index is updated always', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (err, msg) => { addMsg(state.queue[1].value, raf, (err, msg) => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { + db.all(typeQuery, 0, false, false, 'declared', null, (err, results) => { t.equal(results.length, 1) t.equal(results[0].value.content.type, 'post') @@ -51,12 +51,20 @@ prepareAndRunTest('Ensure seq index is updated always', dir, (t, db, raf) => { } addMsg(state.queue[2].value, raf, (err, msg) => { - db.all(typeQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 2) - t.equal(results[0].value.content.type, 'post') - t.equal(results[1].value.content.type, 'post') - t.end() - }) + db.all( + typeQuery, + 0, + false, + false, + 'declared', + null, + (err, results) => { + t.equal(results.length, 2) + t.equal(results[0].value.content.type, 'post') + t.equal(results[1].value.content.type, 'post') + t.end() + } + ) }) }) }) diff --git a/test/slow-save.js b/test/slow-save.js index 9daca17..a6e675d 100644 --- a/test/slow-save.js +++ b/test/slow-save.js @@ -64,7 +64,7 @@ prepareAndRunTest('wip-index-save', dir, (t, db, raf) => { t.equal(results1.length, TOTAL) // Run some empty query to update the core indexes - db.all({}, 0, false, false, 'declared', (err2, results2) => { + db.all({}, 0, false, false, 'declared', null, (err2, results2) => { t.error(err2, 'indexed core with ' + TOTAL + ' msgs with no error') t.equal(results2.length, TOTAL) @@ -103,14 +103,22 @@ prepareAndRunTest('wip-index-save', dir, (t, db, raf) => { }, 65e3) // Run an actual query to check if it saves every 1min - db.all(typeQuery, 0, false, false, 'declared', (err3, results3) => { - t.error(err3, 'indexed ' + TOTAL + ' msgs no error') - t.equal(results3.length, TOTAL) - - t.true(savedAfter1min, 'saved after 1 min') - rimraf.sync(dir) // this folder is quite large, lets save space - t.end() - }) + db.all( + typeQuery, + 0, + false, + false, + 'declared', + null, + (err3, results3) => { + t.error(err3, 'indexed ' + TOTAL + ' msgs no error') + t.equal(results3.length, TOTAL) + + t.true(savedAfter1min, 'saved after 1 min') + rimraf.sync(dir) // this folder is quite large, lets save space + t.end() + } + ) }) }) )