From 9381e6bda95cf8e01c473a182e22936605edc5af Mon Sep 17 00:00:00 2001 From: Theodor Diaconu Date: Fri, 4 Oct 2019 10:02:24 +0300 Subject: [PATCH 1/3] Added ability to avoid protection against race conditions --- docs/finetuning.md | 7 +- lib/mongo/Mutator.js | 117 ++++++++++++++++++++------ lib/mongo/extendMongoCollection.js | 12 ++- lib/mongo/lib/dispatchers.js | 4 +- lib/redis/RedisSubscriptionManager.js | 7 ++ testing/boot.js | 14 ++- testing/include_prev_doc.js | 8 +- testing/main.client.js | 3 +- testing/synthetic_mutators.js | 4 + 9 files changed, 137 insertions(+), 39 deletions(-) diff --git a/docs/finetuning.md b/docs/finetuning.md index 03353d0b..744931a1 100644 --- a/docs/finetuning.md +++ b/docs/finetuning.md @@ -139,9 +139,14 @@ Tasks.configureRedisOplog({ namespace: `company::${companyId}` }); }, + // Optional boolean that determines whether you would like to include // the entire previous document in your redis events - shouldIncludePrevDocument: true, + shouldIncludePrevDocument: false, + + // If you set this to false, will offers you extreme speed when you have a lot of listeners, or when your listeners use a slave mongo database + // If may also have negative impact on performance if you have very large documents + protectAgainstRaceConditions: true, }) ``` diff --git a/lib/mongo/Mutator.js b/lib/mongo/Mutator.js index e5f680fb..78400e8b 100644 --- a/lib/mongo/Mutator.js +++ b/lib/mongo/Mutator.js @@ -1,17 +1,35 @@ -import getMutationConfig from './lib/getMutationConfig'; -import getFields from '../utils/getFields'; +import getMutationConfig from "./lib/getMutationConfig"; +import getFields from "../utils/getFields"; import { dispatchInsert, dispatchUpdate, - dispatchRemove, -} from './lib/dispatchers'; -import Config from '../config'; -import { Events } from '../constants'; + dispatchRemove +} from "./lib/dispatchers"; +import Config from "../config"; +import { Events } from "../constants"; function runCallbackInBackground(fn) { Meteor.defer(Meteor.bindEnvironment(fn)); } +function protectAgainstRaceConditions(collection) { + if (!collection._redisOplog) { + return true; + } + + return ( + collection._redisOplog && + collection._redisOplog.protectAgainstRaceConditions + ); +} + +function shouldIncludePrevDocument(collection) { + return ( + collection._redisOplog && + collection._redisOplog.shouldIncludePrevDocument + ); +} + /** * The Mutator is the interface that does the required updates */ @@ -22,8 +40,8 @@ export default class Mutator { // regardless of your choice, these 2 packages must passConfigDown // we do like this until we find a more elegant way if ( - Package['aldeed:collection2'] !== undefined || - Package['aldeed:collection2-core'] !== undefined + Package["aldeed:collection2"] !== undefined || + Package["aldeed:collection2-core"] !== undefined ) { Mutator.passConfigDown = true; } @@ -32,11 +50,15 @@ export default class Mutator { static insert(Originals, data, _config) { const config = getMutationConfig(this, _config, { doc: data, - event: Events.INSERT, + event: Events.INSERT }); if (canUseOriginalMethod(config)) { - return Originals.insert.call(this, data, _.isFunction(_config) ? _config : undefined); + return Originals.insert.call( + this, + data, + _.isFunction(_config) ? _config : undefined + ); } try { @@ -50,11 +72,17 @@ export default class Mutator { }); } + let doc = { _id: docId }; + + if (!protectAgainstRaceConditions(this)) { + doc = Originals.findOne.call(this, docId); + } + dispatchInsert( config.optimistic, this._name, config._channels, - docId + doc ); return docId; @@ -90,11 +118,17 @@ export default class Mutator { const config = getMutationConfig(this, _config, { event: Events.UPDATE, selector, - modifier, + modifier }); if (canUseOriginalMethod(config)) { - return Originals.update.call(this, selector, modifier, _config, callback); + return Originals.update.call( + this, + selector, + modifier, + _config, + callback + ); } // searching the elements that will get updated by id @@ -103,11 +137,13 @@ export default class Mutator { findOptions.limit = 1; } - const shouldIncludePrevDocument = this._redisOplog && this._redisOplog.shouldIncludePrevDocument; - if (shouldIncludePrevDocument) { - delete findOptions.fields; + let docs; + if (shouldIncludePrevDocument(this)) { + docs = this.find(selector, { ...findOptions, fields: {} }).fetch(); + } else { + docs = this.find(selector, findOptions).fetch(); } - const docs = this.find(selector, findOptions).fetch(); + let docIds = docs.map(doc => doc._id); if (config && config.upsert) { @@ -128,7 +164,7 @@ export default class Mutator { // and we extend the selector, because if between finding the docIds and updating // another matching insert sneaked in, it's update will not be pushed const updateSelector = _.extend({}, selector, { - _id: { $in: docIds }, + _id: { $in: docIds } }); try { @@ -147,6 +183,16 @@ export default class Mutator { }); } + if (!protectAgainstRaceConditions(this)) { + docs = this.find( + { _id: { $in: docIds } }, + { + ...findOptions, + fields: {} + } + ).fetch(); + } + const { fields } = getFields(modifier); dispatchUpdate( @@ -154,7 +200,7 @@ export default class Mutator { this._name, config._channels, docs, - fields, + fields ); return result; @@ -204,11 +250,19 @@ export default class Mutator { if (config.pushToRedis) { if (data.insertedId) { + doc = { + _id: data.insertedId + }; + + if (!protectAgainstRaceConditions(this)) { + doc = this.findOne(doc._id); + } + dispatchInsert( config.optimistic, this._name, config._channels, - data.insertedId + doc ); } else { // it means that we ran an upsert thinking there will be no docs @@ -222,10 +276,13 @@ export default class Mutator { // or a new document was added/modified to match selector before the actual update console.warn( - 'RedisOplog - Warning - A race condition occurred when running upsert.' + "RedisOplog - Warning - A race condition occurred when running upsert." ); } else { const { fields } = getFields(modifier); + + docs = this.find(selector).fetch(); + dispatchUpdate( config.optimistic, this._name, @@ -261,21 +318,24 @@ export default class Mutator { const config = getMutationConfig(this, _config, { selector, - event: Events.REMOVE, + event: Events.REMOVE }); if (canUseOriginalMethod(config)) { - return Originals.remove.call(this, selector, _.isFunction(_config) ? _config : undefined); + return Originals.remove.call( + this, + selector, + _.isFunction(_config) ? _config : undefined + ); } const removeSelector = _.extend({}, selector); const removeOptions = { fields: { _id: 1 }, - transform: null, + transform: null }; - const shouldIncludePrevDocument = this._redisOplog && this._redisOplog.shouldIncludePrevDocument; - if (shouldIncludePrevDocument) { + if (shouldIncludePrevDocument(this)) { delete removeOptions.fields; } @@ -329,5 +389,8 @@ function canUseOriginalMethod(mutationConfig) { // figure out what to publish to redis, and this update doesn't need // optimistic-ui processing, so we don't need to synchronously run // observers. - return !mutationConfig.pushToRedis || (Config.externalRedisPublisher && !mutationConfig.optimistic); + return ( + !mutationConfig.pushToRedis || + (Config.externalRedisPublisher && !mutationConfig.optimistic) + ); } diff --git a/lib/mongo/extendMongoCollection.js b/lib/mongo/extendMongoCollection.js index 4318dfa4..b9bc6e20 100644 --- a/lib/mongo/extendMongoCollection.js +++ b/lib/mongo/extendMongoCollection.js @@ -12,6 +12,7 @@ export default () => { update: Mongo.Collection.prototype.update, remove: Mongo.Collection.prototype.remove, find: Mongo.Collection.prototype.find, + findOne: Mongo.Collection.prototype.findOne, }; Mutator.init(); @@ -66,8 +67,13 @@ export default () => { * @param {function} cursor * @param {boolean} shouldIncludePrevDocument */ - configureRedisOplog({ mutation, cursor, shouldIncludePrevDocument = false }) { - this._redisOplog = {}; + configureRedisOplog({ mutation, cursor, ...rest }) { + this._redisOplog = { + shouldIncludePrevDocument: false, + protectAgainstRaceConditions: true, + ...rest + }; + if (mutation) { if (!_.isFunction(mutation)) { throw new Meteor.Error( @@ -86,8 +92,6 @@ export default () => { this._redisOplog.cursor = cursor; } - - this._redisOplog.shouldIncludePrevDocument = shouldIncludePrevDocument; }, }); }; diff --git a/lib/mongo/lib/dispatchers.js b/lib/mongo/lib/dispatchers.js index c5a42b0e..f49b1a72 100644 --- a/lib/mongo/lib/dispatchers.js +++ b/lib/mongo/lib/dispatchers.js @@ -75,12 +75,12 @@ const dispatchRemove = function(optimistic, collectionName, channels, docs) { dispatchEvents(optimistic, collectionName, channels, events); }; -const dispatchInsert = function(optimistic, collectionName, channels, docId) { +const dispatchInsert = function(optimistic, collectionName, channels, doc) { const uid = optimistic ? RedisSubscriptionManager.uid : null; const event = { [RedisPipe.EVENT]: Events.INSERT, - [RedisPipe.DOC]: { _id: docId }, + [RedisPipe.DOC]: doc, [RedisPipe.UID]: uid, }; diff --git a/lib/redis/RedisSubscriptionManager.js b/lib/redis/RedisSubscriptionManager.js index b07b4231..5ced6f27 100644 --- a/lib/redis/RedisSubscriptionManager.js +++ b/lib/redis/RedisSubscriptionManager.js @@ -193,6 +193,13 @@ class RedisSubscriptionManager { const event = data[RedisPipe.EVENT]; let doc = data[RedisPipe.DOC]; + if (collection._redisOplog && !collection._redisOplog.protectAgainstRaceConditions) { + // If there's no protection against race conditions + // It means we have received the full doc in doc + + return doc; + } + const fieldsOfInterest = getFieldsOfInterestFromAll(subscribers); if (fieldsOfInterest === true) { diff --git a/testing/boot.js b/testing/boot.js index 37652e6f..d6b70de5 100644 --- a/testing/boot.js +++ b/testing/boot.js @@ -17,13 +17,25 @@ const Standard = new Mongo.Collection('test_redis_collection'); const Channel = new Mongo.Collection('test_redis_collection_channel'); const Namespace = new Mongo.Collection('test_redis_collection_namespace'); -const Collections = { Standard, Channel, Namespace }; +const RaceConditionProne = new Mongo.Collection('test_redis_race_condition_prone'); +if (Meteor.isServer) { + RaceConditionProne.configureRedisOplog({ + protectAgainstRaceConditions: false + }); +} + +const Collections = { Standard, Channel, Namespace, RaceConditionProne }; const opts = { Standard: {}, + RaceConditionProne: {}, Channel: { channel: 'some_channel' }, Namespace: { namespace: 'some_namespace' }, }; const config = { + RaceConditionProne: { + suffix: 'race-condition-prone', + disableSyntheticTests: true, + }, Standard: { suffix: 'standard', channel: 'test_redis_collection' }, Channel: { suffix: 'channeled', channel: 'some_channel' }, Namespace: { diff --git a/testing/include_prev_doc.js b/testing/include_prev_doc.js index f89db5ea..21212e56 100644 --- a/testing/include_prev_doc.js +++ b/testing/include_prev_doc.js @@ -23,10 +23,12 @@ describe('PrevDocCollection Serverside', function () { } }); + const random = Random.id() + // trigger insert update and removed redis events - PrevDocCollection.insert({ _id: 'prev_doc_1', value: 'oldValue' }); - PrevDocCollection.update({ _id: 'prev_doc_1' }, { $set: { value: 'newValue' } }); - PrevDocCollection.remove({ _id: 'prev_doc_1' }); + PrevDocCollection.insert({ _id: `${random}`, value: 'oldValue' }); + PrevDocCollection.update({ _id: `${random}` }, { $set: { value: 'newValue' } }); + PrevDocCollection.remove({ _id: `${random}` }); }); it('Should receive an insert event without prev doc', async function (done) { diff --git a/testing/main.client.js b/testing/main.client.js index 49c1e4de..a63f45df 100644 --- a/testing/main.client.js +++ b/testing/main.client.js @@ -1155,7 +1155,7 @@ _.each(Collections, (Collection, key) => { }); it('Should work when updating deep array when it is specified as a field', async function(done) { - const context = 'deep-array-objects'; + const context = `deep-array-objects-${Random.id()}`; let handle = subscribe( { context }, @@ -1168,6 +1168,7 @@ _.each(Collections, (Collection, key) => { ); await waitForHandleToBeReady(handle); + const cursor = Collection.find({ context }); const observer = cursor.observeChanges({ diff --git a/testing/synthetic_mutators.js b/testing/synthetic_mutators.js index 56a3057e..7cc1a49c 100644 --- a/testing/synthetic_mutators.js +++ b/testing/synthetic_mutators.js @@ -13,6 +13,10 @@ _.each(Collections, (Collection, key) => { waitForHandleToBeReady, } = helperGenerator(config[key].suffix); + if (config[key].disableSyntheticTests) { + return; + } + describe('It should work with synthetic mutators: ' + key, function() { it('Should work with insert', async function(done) { let handle = subscribe({ From f55a8ddb59b22e1ee01fe04d5b9118a6a4f5a931 Mon Sep 17 00:00:00 2001 From: Theodor Diaconu Date: Fri, 4 Oct 2019 10:02:38 +0300 Subject: [PATCH 2/3] Bump to 2.0.4 --- package.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.js b/package.js index eb4944aa..d5d932bc 100644 --- a/package.js +++ b/package.js @@ -1,6 +1,6 @@ Package.describe({ name: 'cultofcoders:redis-oplog', - version: '2.0.3', + version: '2.0.4', // Brief, one-line summary of the package. summary: "Replacement for Meteor's MongoDB oplog implementation", // URL to the Git repository containing the source code for this package. From 1bdb5f46c9db45e23e21101a0f25ff8faba709b6 Mon Sep 17 00:00:00 2001 From: Theodor Diaconu Date: Fri, 4 Oct 2019 10:12:14 +0300 Subject: [PATCH 3/3] Updated travis --- .travis.yml | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index 45ca8401..abb0135e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,13 @@ sudo: required -services: - - redis-server - addons: chrome: stable +dist: xenial + +services: + - xvfb + - redis-server + language: node_js node_js: @@ -23,12 +26,10 @@ notifications: email: false before_script: - - "export DISPLAY=:99.0" - - "sh -e /etc/init.d/xvfb start" - sleep 3 script: - meteor create --release 1.8.1-rc.1 --bare test - cd test - - meteor npm i --save selenium-webdriver@3.6.0 chromedriver@2.36.0 simpl-schema + - meteor npm i --save selenium-webdriver@3.6.0 chromedriver@2.36.0 simpl-schema chai - METEOR_PACKAGE_DIRS="../" TEST_BROWSER_DRIVER=chrome meteor test-packages --once --driver-package meteortesting:mocha ../ \ No newline at end of file