Skip to content

Commit

Permalink
Merge pull request #340 from cult-of-coders/feature/extreme-speed
Browse files Browse the repository at this point in the history
[R] Opt-out of race condition protection
  • Loading branch information
theodorDiaconu authored Oct 4, 2019
2 parents 100e59c + 1bdb5f4 commit 29d89fa
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 46 deletions.
13 changes: 7 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
sudo: required
services:
- redis-server

addons:
chrome: stable

dist: xenial

services:
- xvfb
- redis-server

language: node_js

node_js:
Expand All @@ -23,12 +26,10 @@ notifications:
email: false

before_script:
- "export DISPLAY=:99.0"
- "sh -e /etc/init.d/xvfb start"
- sleep 3

script:
- meteor create --release 1.8.1-rc.1 --bare test
- cd test
- meteor npm i --save [email protected] [email protected] simpl-schema
- meteor npm i --save [email protected] [email protected] simpl-schema chai
- METEOR_PACKAGE_DIRS="../" TEST_BROWSER_DRIVER=chrome meteor test-packages --once --driver-package meteortesting:mocha ../
7 changes: 6 additions & 1 deletion docs/finetuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,14 @@ Tasks.configureRedisOplog({
namespace: `company::${companyId}`
});
},

// Optional boolean that determines whether you would like to include
// the entire previous document in your redis events
shouldIncludePrevDocument: true,
shouldIncludePrevDocument: false,

// If you set this to false, will offers you extreme speed when you have a lot of listeners, or when your listeners use a slave mongo database
// If may also have negative impact on performance if you have very large documents
protectAgainstRaceConditions: true,
})
```

Expand Down
117 changes: 90 additions & 27 deletions lib/mongo/Mutator.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,35 @@
import getMutationConfig from './lib/getMutationConfig';
import getFields from '../utils/getFields';
import getMutationConfig from "./lib/getMutationConfig";
import getFields from "../utils/getFields";
import {
dispatchInsert,
dispatchUpdate,
dispatchRemove,
} from './lib/dispatchers';
import Config from '../config';
import { Events } from '../constants';
dispatchRemove
} from "./lib/dispatchers";
import Config from "../config";
import { Events } from "../constants";

function runCallbackInBackground(fn) {
Meteor.defer(Meteor.bindEnvironment(fn));
}

function protectAgainstRaceConditions(collection) {
if (!collection._redisOplog) {
return true;
}

return (
collection._redisOplog &&
collection._redisOplog.protectAgainstRaceConditions
);
}

function shouldIncludePrevDocument(collection) {
return (
collection._redisOplog &&
collection._redisOplog.shouldIncludePrevDocument
);
}

/**
* The Mutator is the interface that does the required updates
*/
Expand All @@ -22,8 +40,8 @@ export default class Mutator {
// regardless of your choice, these 2 packages must passConfigDown
// we do like this until we find a more elegant way
if (
Package['aldeed:collection2'] !== undefined ||
Package['aldeed:collection2-core'] !== undefined
Package["aldeed:collection2"] !== undefined ||
Package["aldeed:collection2-core"] !== undefined
) {
Mutator.passConfigDown = true;
}
Expand All @@ -32,11 +50,15 @@ export default class Mutator {
static insert(Originals, data, _config) {
const config = getMutationConfig(this, _config, {
doc: data,
event: Events.INSERT,
event: Events.INSERT
});

if (canUseOriginalMethod(config)) {
return Originals.insert.call(this, data, _.isFunction(_config) ? _config : undefined);
return Originals.insert.call(
this,
data,
_.isFunction(_config) ? _config : undefined
);
}

try {
Expand All @@ -50,11 +72,17 @@ export default class Mutator {
});
}

let doc = { _id: docId };

if (!protectAgainstRaceConditions(this)) {
doc = Originals.findOne.call(this, docId);
}

dispatchInsert(
config.optimistic,
this._name,
config._channels,
docId
doc
);

return docId;
Expand Down Expand Up @@ -90,11 +118,17 @@ export default class Mutator {
const config = getMutationConfig(this, _config, {
event: Events.UPDATE,
selector,
modifier,
modifier
});

if (canUseOriginalMethod(config)) {
return Originals.update.call(this, selector, modifier, _config, callback);
return Originals.update.call(
this,
selector,
modifier,
_config,
callback
);
}

// searching the elements that will get updated by id
Expand All @@ -103,11 +137,13 @@ export default class Mutator {
findOptions.limit = 1;
}

const shouldIncludePrevDocument = this._redisOplog && this._redisOplog.shouldIncludePrevDocument;
if (shouldIncludePrevDocument) {
delete findOptions.fields;
let docs;
if (shouldIncludePrevDocument(this)) {
docs = this.find(selector, { ...findOptions, fields: {} }).fetch();
} else {
docs = this.find(selector, findOptions).fetch();
}
const docs = this.find(selector, findOptions).fetch();

let docIds = docs.map(doc => doc._id);

if (config && config.upsert) {
Expand All @@ -128,7 +164,7 @@ export default class Mutator {
// and we extend the selector, because if between finding the docIds and updating
// another matching insert sneaked in, it's update will not be pushed
const updateSelector = _.extend({}, selector, {
_id: { $in: docIds },
_id: { $in: docIds }
});

try {
Expand All @@ -147,14 +183,24 @@ export default class Mutator {
});
}

if (!protectAgainstRaceConditions(this)) {
docs = this.find(
{ _id: { $in: docIds } },
{
...findOptions,
fields: {}
}
).fetch();
}

const { fields } = getFields(modifier);

dispatchUpdate(
config.optimistic,
this._name,
config._channels,
docs,
fields,
fields
);

return result;
Expand Down Expand Up @@ -204,11 +250,19 @@ export default class Mutator {

if (config.pushToRedis) {
if (data.insertedId) {
doc = {
_id: data.insertedId
};

if (!protectAgainstRaceConditions(this)) {
doc = this.findOne(doc._id);
}

dispatchInsert(
config.optimistic,
this._name,
config._channels,
data.insertedId
doc
);
} else {
// it means that we ran an upsert thinking there will be no docs
Expand All @@ -222,10 +276,13 @@ export default class Mutator {

// or a new document was added/modified to match selector before the actual update
console.warn(
'RedisOplog - Warning - A race condition occurred when running upsert.'
"RedisOplog - Warning - A race condition occurred when running upsert."
);
} else {
const { fields } = getFields(modifier);

docs = this.find(selector).fetch();

dispatchUpdate(
config.optimistic,
this._name,
Expand Down Expand Up @@ -261,21 +318,24 @@ export default class Mutator {

const config = getMutationConfig(this, _config, {
selector,
event: Events.REMOVE,
event: Events.REMOVE
});

if (canUseOriginalMethod(config)) {
return Originals.remove.call(this, selector, _.isFunction(_config) ? _config : undefined);
return Originals.remove.call(
this,
selector,
_.isFunction(_config) ? _config : undefined
);
}

const removeSelector = _.extend({}, selector);
const removeOptions = {
fields: { _id: 1 },
transform: null,
transform: null
};

const shouldIncludePrevDocument = this._redisOplog && this._redisOplog.shouldIncludePrevDocument;
if (shouldIncludePrevDocument) {
if (shouldIncludePrevDocument(this)) {
delete removeOptions.fields;
}

Expand Down Expand Up @@ -329,5 +389,8 @@ function canUseOriginalMethod(mutationConfig) {
// figure out what to publish to redis, and this update doesn't need
// optimistic-ui processing, so we don't need to synchronously run
// observers.
return !mutationConfig.pushToRedis || (Config.externalRedisPublisher && !mutationConfig.optimistic);
return (
!mutationConfig.pushToRedis ||
(Config.externalRedisPublisher && !mutationConfig.optimistic)
);
}
12 changes: 8 additions & 4 deletions lib/mongo/extendMongoCollection.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export default () => {
update: Mongo.Collection.prototype.update,
remove: Mongo.Collection.prototype.remove,
find: Mongo.Collection.prototype.find,
findOne: Mongo.Collection.prototype.findOne,
};

Mutator.init();
Expand Down Expand Up @@ -66,8 +67,13 @@ export default () => {
* @param {function} cursor
* @param {boolean} shouldIncludePrevDocument
*/
configureRedisOplog({ mutation, cursor, shouldIncludePrevDocument = false }) {
this._redisOplog = {};
configureRedisOplog({ mutation, cursor, ...rest }) {
this._redisOplog = {
shouldIncludePrevDocument: false,
protectAgainstRaceConditions: true,
...rest
};

if (mutation) {
if (!_.isFunction(mutation)) {
throw new Meteor.Error(
Expand All @@ -86,8 +92,6 @@ export default () => {

this._redisOplog.cursor = cursor;
}

this._redisOplog.shouldIncludePrevDocument = shouldIncludePrevDocument;
},
});
};
4 changes: 2 additions & 2 deletions lib/mongo/lib/dispatchers.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ const dispatchRemove = function(optimistic, collectionName, channels, docs) {
dispatchEvents(optimistic, collectionName, channels, events);
};

const dispatchInsert = function(optimistic, collectionName, channels, docId) {
const dispatchInsert = function(optimistic, collectionName, channels, doc) {
const uid = optimistic ? RedisSubscriptionManager.uid : null;

const event = {
[RedisPipe.EVENT]: Events.INSERT,
[RedisPipe.DOC]: { _id: docId },
[RedisPipe.DOC]: doc,
[RedisPipe.UID]: uid,
};

Expand Down
7 changes: 7 additions & 0 deletions lib/redis/RedisSubscriptionManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ class RedisSubscriptionManager {
const event = data[RedisPipe.EVENT];
let doc = data[RedisPipe.DOC];

if (collection._redisOplog && !collection._redisOplog.protectAgainstRaceConditions) {
// If there's no protection against race conditions
// It means we have received the full doc in doc

return doc;
}

const fieldsOfInterest = getFieldsOfInterestFromAll(subscribers);

if (fieldsOfInterest === true) {
Expand Down
2 changes: 1 addition & 1 deletion package.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package.describe({
name: 'cultofcoders:redis-oplog',
version: '2.0.3',
version: '2.0.4',
// Brief, one-line summary of the package.
summary: "Replacement for Meteor's MongoDB oplog implementation",
// URL to the Git repository containing the source code for this package.
Expand Down
14 changes: 13 additions & 1 deletion testing/boot.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,25 @@ const Standard = new Mongo.Collection('test_redis_collection');
const Channel = new Mongo.Collection('test_redis_collection_channel');
const Namespace = new Mongo.Collection('test_redis_collection_namespace');

const Collections = { Standard, Channel, Namespace };
const RaceConditionProne = new Mongo.Collection('test_redis_race_condition_prone');
if (Meteor.isServer) {
RaceConditionProne.configureRedisOplog({
protectAgainstRaceConditions: false
});
}

const Collections = { Standard, Channel, Namespace, RaceConditionProne };
const opts = {
Standard: {},
RaceConditionProne: {},
Channel: { channel: 'some_channel' },
Namespace: { namespace: 'some_namespace' },
};
const config = {
RaceConditionProne: {
suffix: 'race-condition-prone',
disableSyntheticTests: true,
},
Standard: { suffix: 'standard', channel: 'test_redis_collection' },
Channel: { suffix: 'channeled', channel: 'some_channel' },
Namespace: {
Expand Down
8 changes: 5 additions & 3 deletions testing/include_prev_doc.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ describe('PrevDocCollection Serverside', function () {
}
});

const random = Random.id()

// trigger insert update and removed redis events
PrevDocCollection.insert({ _id: 'prev_doc_1', value: 'oldValue' });
PrevDocCollection.update({ _id: 'prev_doc_1' }, { $set: { value: 'newValue' } });
PrevDocCollection.remove({ _id: 'prev_doc_1' });
PrevDocCollection.insert({ _id: `${random}`, value: 'oldValue' });
PrevDocCollection.update({ _id: `${random}` }, { $set: { value: 'newValue' } });
PrevDocCollection.remove({ _id: `${random}` });
});

it('Should receive an insert event without prev doc', async function (done) {
Expand Down
Loading

0 comments on commit 29d89fa

Please sign in to comment.