From 7ef2ee4a6be34772324f8fc4c8f077bd553e3866 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Fri, 9 Feb 2018 12:12:40 -0800 Subject: [PATCH 1/5] Adds flow, modernizes Auth component --- .babelrc | 3 +- .flowconfig | 12 ++++ package.json | 9 ++- src/simperium/auth.js | 119 ++++++++++++++++++++++-------------- src/simperium/index.js | 3 +- src/simperium/user.js | 9 --- test/simperium/auth_test.js | 54 ++++++++-------- 7 files changed, 120 insertions(+), 89 deletions(-) create mode 100644 .flowconfig delete mode 100644 src/simperium/user.js diff --git a/.babelrc b/.babelrc index 8a87b4d..9121666 100644 --- a/.babelrc +++ b/.babelrc @@ -1,3 +1,4 @@ { - "presets": ["es2015"] + "presets": ["es2015", "flow"], + "plugins": [ "transform-object-rest-spread" ] } \ No newline at end of file diff --git a/.flowconfig b/.flowconfig new file mode 100644 index 0000000..a3067bf --- /dev/null +++ b/.flowconfig @@ -0,0 +1,12 @@ +[ignore] + +[include] +src/ + +[libs] + +[lints] + +[options] + +[strict] diff --git a/package.json b/package.json index 7e3e991..5e44392 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "url": "git://github.com/Simperium/node-simperium.git" }, "scripts": { + "flow": "flow", "test": "mocha --compilers js:babel-core/register --require test/helper test/**", "prepublish": "babel -q -d lib/ src/" }, @@ -21,8 +22,12 @@ "babel-cli": "^6.2.0", "babel-core": "^6.2.0", "babel-eslint": "^8.2.1", - "babel-preset-es2015": "^6.2.0", + "babel-plugin-transform-object-rest-spread": "^6.26.0", + "babel-preset-es2015": "^6.24.1", + "babel-preset-flow": "^6.23.0", "eslint": "^4.17.0", - "mocha": "^2.3.4" + "flow": "^0.2.3", + "flow-bin": "^0.65.0", + "mocha": "^2.5.3" } } diff --git a/src/simperium/auth.js b/src/simperium/auth.js index a61acee..5fd472e 100644 --- a/src/simperium/auth.js +++ b/src/simperium/auth.js @@ -1,67 +1,94 @@ -import { EventEmitter } from 'events' -import User from './user' -import { format, inherits } from 'util' +// @flow +import events from 'events' import https from 'https' import url from 'url' +// @flow +type User = {}; + +const fromJSON = ( json: string ) => { + const data = JSON.parse( json ); + return { + options: data, + access_token: data.access_token + }; +}; + +const { EventEmitter } = events; + const URL = 'https://auth.simperium.com/1'; -export default function Auth( appId, appSecret ) { - this.appId = appId; - this.appSecret = appSecret; +export class AuthError extends Error { + underlyingError: Error + + constructor( underlyingError: Error ) { + super( 'Failed to authenticate user.' ); + this.underlyingError = underlyingError; + } } -inherits( Auth, EventEmitter ); +export class Auth extends EventEmitter { + appId: string + appSecret: string -Auth.prototype.authorize = function( username, password ) { - var body = JSON.stringify( { username: username, password: password } ), - promise = this.request( 'authorize/', body ); + constructor( appId: string, appSecret: string ) { + super(); + this.appId = appId; + this.appSecret = appSecret; + } - return promise; -} + authorize( username: string, password: string ) { + const body = JSON.stringify( { username: username, password: password } ); + return this.request( 'authorize/', body ); + } -Auth.prototype.create = function( username, password, provider ) { - var userData = { username, password }; - if ( provider ) { - userData.provider = provider; + create( username: String, password: String, provider: ?String ) { + const userData: { username: String, password: String, provider?: String } = { username, password }; + if ( provider ) { + userData.provider = provider; + } + const body = JSON.stringify( userData ); + return this.request( 'create/', body ); } - var body = JSON.stringify( userData ), - promise = this.request( 'create/', body ); - return promise; -} + getUrlOptions( path: string ) { + const options = url.parse( `${URL}/${ this.appId }/${ path}` ); + return { + ... options, + method: 'POST', + headers: {'X-Simperium-API-Key': this.appSecret } + }; + } -Auth.prototype.getUrlOptions = function( path ) { - const options = url.parse( format( '%s/%s/%s', URL, this.appId, path ) ); - return Object.assign( options, { method: 'POST', headers: {'X-Simperium-API-Key': this.appSecret}} ); -} + request( endpoint: string, body: string ): Promise { + return new Promise( ( resolve, reject ) => { + const req = https.request( this.getUrlOptions( endpoint ), ( res ) => { + let responseData = ''; -Auth.prototype.request = function( endpoint, body ) { - return new Promise( ( resolve, reject ) => { - const req = https.request( this.getUrlOptions( endpoint ), ( res ) => { - var responseData = ''; + res.on( 'data', ( data ) => { + responseData += data.toString(); + } ); - res.on( 'data', ( data ) => { - responseData += data.toString(); + res.on( 'end', () => { + try { + const user = fromJSON( responseData ); + resolve( user ); + this.emit( 'authorize', user ); + } catch ( error ) { + return reject( new AuthError( error ) ); + } + } ); } ); - res.on( 'end', () => { - var user; - - try { - user = User.fromJSON( responseData ); - } catch ( e ) { - return reject( new Error( responseData ) ); - } - this.emit( 'authorize', user ); - resolve( user ); + req.on( 'error', ( e ) => { + reject( e ); } ); - } ); - req.on( 'error', ( e ) => { - reject( e ); + req.end( body ); } ); + } +}; - req.end( body ); - } ); -} +export default ( appId: string, appSecret: string ) => { + return new Auth( appId, appSecret ); +}; diff --git a/src/simperium/index.js b/src/simperium/index.js index 7ceba4b..1b66006 100644 --- a/src/simperium/index.js +++ b/src/simperium/index.js @@ -1,4 +1,3 @@ -import User from './user' import Client from './client' import Auth from './auth' import * as util from './util' @@ -7,4 +6,4 @@ export default function( appId, token, options ) { return new Client( appId, token, options ); } -export { Auth, User, Client, util } +export { Auth, Client, util } diff --git a/src/simperium/user.js b/src/simperium/user.js deleted file mode 100644 index 7e94d88..0000000 --- a/src/simperium/user.js +++ /dev/null @@ -1,9 +0,0 @@ -export default function User( options ) { - this.options = options; - this.access_token = options.access_token; -} - -User.fromJSON = function( json ) { - var data = JSON.parse( json ); - return new User( data ); -} diff --git a/test/simperium/auth_test.js b/test/simperium/auth_test.js index 6d9b683..d820fae 100644 --- a/test/simperium/auth_test.js +++ b/test/simperium/auth_test.js @@ -1,4 +1,4 @@ -import Auth from '../../src/simperium/auth' +import buildAuth from '../../src/simperium/auth' import https from 'https' import { equal, deepEqual } from 'assert' import { EventEmitter } from 'events' @@ -19,11 +19,11 @@ const stubResponse = ( data ) => stub( ( body, handler ) => { } ) describe( 'Auth', () => { - var auth + let auth; beforeEach( () => { - auth = new Auth( 'token', 'secret' ); - } ) + auth = buildAuth( 'token', 'secret' ); + } ); it( 'getUrlOptions', () => { const { hostname, headers, pathname, method } = auth.getUrlOptions( 'path' ) @@ -33,7 +33,7 @@ describe( 'Auth', () => { deepEqual( headers, { 'X-Simperium-API-Key': 'secret' } ) } ) - it( 'should request auth token', ( done ) => { + it( 'should request auth token', () => { stub( ( data, handler ) => { const { username, password } = JSON.parse( data ) const response = new EventEmitter() @@ -45,24 +45,22 @@ describe( 'Auth', () => { response.emit( 'end' ); } ) - auth.authorize( 'username', 'password' ) - .then( ( user ) => { - equal( user.access_token, 'secret-token' ) - done() - } ) - } ) + return auth.authorize( 'username', 'password' ) + .then( ( user ) => { + equal( user.access_token, 'secret-token' ); + } ); + } ); - it( 'should fail to auth with invalid credentials', ( done ) => { + it( 'should fail to auth with invalid credentials', () => { stubResponse( 'this is not json' ) - auth.authorize( 'username', 'bad-password' ) - .catch( ( e ) => { - equal( e.message, 'this is not json' ) - done() - } ) + return auth.authorize( 'username', 'bad-password' ) + .catch( ( e ) => { + equal( e.message, 'Failed to authenticate user.' ); + } ); } ) - it( 'should create an account with valid credentials', ( done ) => { + it( 'should create an account with valid credentials', () => { stub( ( data, handler ) => { const { username, password } = JSON.parse( data ) const response = new EventEmitter() @@ -74,20 +72,18 @@ describe( 'Auth', () => { response.emit( 'end' ); } ) - auth.create( 'username', 'password' ) - .then( ( user ) => { - equal( user.access_token, 'secret-token' ) - done() - } ) + return auth.create( 'username', 'password' ) + .then( user => { + equal( user.access_token, 'secret-token' ) + } ); } ) - it( 'should fail to create an account with invalid credentials', ( done ) => { + it( 'should fail to create an account with invalid credentials', () => { stubResponse( 'this is not json' ) - auth.create( 'username', 'bad-password' ) - .catch( ( e ) => { - equal( e.message, 'this is not json' ) - done() - } ) + return auth.create( 'username', 'bad-password' ) + .catch( ( e ) => { + equal( e.message, 'Failed to authenticate user.' ); + } ); } ) } ) From 223bc03a1b43d1a75bac547faa824b3f3b22902e Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Fri, 9 Feb 2018 12:13:47 -0800 Subject: [PATCH 2/5] change import to just https import --- src/simperium/auth.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/simperium/auth.js b/src/simperium/auth.js index 5fd472e..f4d3171 100644 --- a/src/simperium/auth.js +++ b/src/simperium/auth.js @@ -1,6 +1,6 @@ // @flow import events from 'events' -import https from 'https' +import { request } from 'https' import url from 'url' // @flow @@ -62,7 +62,7 @@ export class Auth extends EventEmitter { request( endpoint: string, body: string ): Promise { return new Promise( ( resolve, reject ) => { - const req = https.request( this.getUrlOptions( endpoint ), ( res ) => { + const req = request( this.getUrlOptions( endpoint ), ( res ) => { let responseData = ''; res.on( 'data', ( data ) => { From 082ae408db887f6936cbfda366ac7d83ec5b51c8 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Mon, 12 Feb 2018 11:18:28 -0800 Subject: [PATCH 3/5] Organizes utility classes to declutter Client module --- src/simperium/auth.js | 30 ++++++++++++++++--- src/simperium/heartbeat.js | 46 +++++++++++++++++++++++++++++ src/simperium/reconnection-timer.js | 41 +++++++++++++++++++++++++ test/simperium/auth_test.js | 9 ++++++ 4 files changed, 122 insertions(+), 4 deletions(-) create mode 100644 src/simperium/heartbeat.js create mode 100644 src/simperium/reconnection-timer.js diff --git a/src/simperium/auth.js b/src/simperium/auth.js index f4d3171..27991cb 100644 --- a/src/simperium/auth.js +++ b/src/simperium/auth.js @@ -4,13 +4,19 @@ import { request } from 'https' import url from 'url' // @flow -type User = {}; +type User = { + options: {}, + access_token: string, +}; -const fromJSON = ( json: string ) => { - const data = JSON.parse( json ); +const fromJSON = ( json: string ): User => { + const data: {} = JSON.parse( json ); + if ( ! data.access_token && typeof data.access_token !== 'string' ) { + throw new Error( 'access_token not present' ); + } return { options: data, - access_token: data.access_token + access_token: new String( data.access_token ).toString() }; }; @@ -27,16 +33,32 @@ export class AuthError extends Error { } } +/** + * Client for creating and authenticating Simperium.com user accounts. + */ export class Auth extends EventEmitter { appId: string appSecret: string + /** + * Creates an instance of the Auth client + * + * @param {string} appId - Simperium.com application ID + * @param {string} appSecret - Simperium.com application secret + */ constructor( appId: string, appSecret: string ) { super(); this.appId = appId; this.appSecret = appSecret; } + /** + * Authorizes a user account with username and password + * + * @param {string} username account username + * @param {string} password account password + * @returns {Promise} user account data + */ authorize( username: string, password: string ) { const body = JSON.stringify( { username: username, password: password } ); return this.request( 'authorize/', body ); diff --git a/src/simperium/heartbeat.js b/src/simperium/heartbeat.js new file mode 100644 index 0000000..57ce2f8 --- /dev/null +++ b/src/simperium/heartbeat.js @@ -0,0 +1,46 @@ +// @flow + +export default class Heartbeat { + count: number + seconds: number + timeout: TimeoutID + timer: TimeoutID + onBeat: number => void + onTimeout: () => void + + constructor( seconds: number, onBeat: number => void, onTimeout: () => void ) { + this.count = 0; + this.seconds = seconds; + this.onBeat = onBeat; + this.onTimeout = onTimeout; + } + + beat() { + this.count ++; + + this.timeout = setTimeout( this.onTimeout.bind( this ), this.seconds * 1000 * 2 ); + this.onBeat( this.count ); + } + + onTimeout() { + this.onTimeout(); + this.stop(); + } + + tick( count?: number ) { + if ( count && count > 0 ) { + this.count = count; + } + this.start(); + } + + start() { + this.stop(); + this.timer = setTimeout( this.beat.bind( this ), this.seconds * 1000 ); + } + + stop() { + clearTimeout( this.timer ); + clearTimeout( this.timeout ); + } +} \ No newline at end of file diff --git a/src/simperium/reconnection-timer.js b/src/simperium/reconnection-timer.js new file mode 100644 index 0000000..6295b66 --- /dev/null +++ b/src/simperium/reconnection-timer.js @@ -0,0 +1,41 @@ +export default class ReconnectionTimer { + started: boolean + interval: number => number; + timer: TimeoutID; + attempt: number; + onTripped: number => void; + + constructor( interval: number => number, onTripped: number => void ) { + this.started = false; + + this.interval = interval || ( () => 1000 ); + + this.onTripped = onTripped; + + this.reset(); + } + + onInterval() { + this.onTripped( this.attempt ); + this.attempt ++; + }; + + start() { + this.started = true; + this.timer = setTimeout( this.onInterval.bind( this ), this.interval( this.attempt ) ); + }; + + restart() { + this.reset(); + this.start(); + }; + + stop() { + this.attempt = 0; + this.started = false; + clearTimeout( this.timer ); + } + reset() { + this.stop(); + } +} \ No newline at end of file diff --git a/test/simperium/auth_test.js b/test/simperium/auth_test.js index d820fae..c1e3062 100644 --- a/test/simperium/auth_test.js +++ b/test/simperium/auth_test.js @@ -51,6 +51,15 @@ describe( 'Auth', () => { } ); } ); + it( 'should fail if missing access_token', () => { + stubResponse( '{"hello":"world"}' ); + return auth.authorize( 'username', 'password' ) + .catch( error => { + equal( error.message, 'Failed to authenticate user.' ); + equal( error.underlyingError.message, 'access_token not present' ); + } ); + } ); + it( 'should fail to auth with invalid credentials', () => { stubResponse( 'this is not json' ) From 9d14a1545ffd34d19f504351a322dcdbc03c97a4 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Mon, 19 Feb 2018 10:20:50 -0800 Subject: [PATCH 4/5] Flow typed --- src/simperium/bucket.js | 558 +++++---- src/simperium/channel.js | 1242 ++++++++----------- src/simperium/client.js | 1 + src/simperium/ghost/default.js | 1 + src/simperium/ghost/store.js | 1 + src/simperium/index.js | 1 + src/simperium/jsondiff/index.js | 2 +- src/simperium/reconnection-timer.js | 1 + src/simperium/storage/default.js | 1 + src/simperium/util/change.js | 5 +- src/simperium/util/index.js | 1 + src/simperium/util/parse_message.js | 5 +- src/simperium/util/parse_version_message.js | 2 +- test/helper.js | 2 +- test/simperium/bucket_test.js | 4 +- test/simperium/channel_test.js | 37 +- test/simperium/mock_bucket_store.js | 60 +- 17 files changed, 941 insertions(+), 983 deletions(-) diff --git a/src/simperium/bucket.js b/src/simperium/bucket.js index 955c581..9904bab 100644 --- a/src/simperium/bucket.js +++ b/src/simperium/bucket.js @@ -1,22 +1,26 @@ -import { EventEmitter } from 'events' -import { inherits } from 'util' +// @flow +import events from 'events' import { v4 as uuid } from 'uuid'; -/** - * @callback taskCallback - * @param {?Error} - if an error occurred it will be provided, otherwise null - * @param {Any} - the result of task - */ +const { EventEmitter } = events; + +type TaskCallback = ( ?Error, ?T ) => void; /** * Convenience function to turn a function that uses a callback into a function * that returns a Promise. * - * @param {taskCallback} task - function that expects a single callback argument + * @param {TaskCallback} task - function that expects a single callback argument * @returns {Promise} callback wrapped in a promise interface */ -const callbackAsPromise = ( task ) => new Promise( ( resolve, reject ) => { - task( ( error, result ) => error ? reject( error ) : resolve( result ) ); +const callbackAsPromise = ( task: ( TaskCallback ) => void ): Promise => new Promise( ( resolve, reject ) => { + task( ( error: ?Error, result: ?T ) => { + if ( error ) { + reject( error ); + return; + } + resolve( result ); + } ); } ); /** @@ -27,16 +31,17 @@ const callbackAsPromise = ( task ) => new Promise( ( resolve, reject ) => { * @param {Promise} promise - promise to run, executes callback if provieded * @returns {Promise} promise is passed through */ -const deprecateCallback = ( callback, promise ) => { - if ( typeof callback === 'function' ) { +const deprecateCallback = ( callback: ?TaskCallback, promise: Promise ): Promise => { + if ( callback ) { + const perform = callback; // Potentially could warn here if we decide to remove callback API return promise.then( - result => { - callback( null, result ); + ( result ) => { + perform( null, result ); return result; }, - error => { - callback( error ); + ( error ) => { + perform( error, null ); return error; } ); @@ -44,6 +49,76 @@ const deprecateCallback = ( callback, promise ) => { return promise; }; +export type BucketObject = { + id: string, + data: {}, + isIndexing?: boolean +} + +export interface BucketStore { + /** + * Retrieve a bucket object from the store + * @function + * @name BucketStore#get + * @param {String} id - the bucket object id to fetch + * @param {bucketStoreGetCallback} - callback once the object is fetched + */ + get( id: string, callback: ( ?Error, ?BucketObject ) => void ): void; + + /** + * Updates the data for the given object id. + * + * @function + * @name BucketStore#update + * @param {String} id - to of object to update + * @param {Object} data - data to update the object to + * @param {Boolean} isIndexing - indicates the object is being downloaded during an index + * @param {bucketStoreGetCallback} + */ + update( id: string, data: {}, isIndexing: boolean, callback: ( ?Error, ?BucketObject ) => void ): void; + + /** + * Deletes the object at id from the datastore. + * + * @function + * @name BucketStore#remove + * @param {String} id - object to delete from the bucket + * @param {bucketStoreRemoveCallback} - called once the object is deleted + */ + remove( id: string, callback: ( ?Error ) => void ): void; + + /** + * Fetchs all bucket objects from the datastore. + * + * @function + * @name BucketStore#find + * @param {?Object} query - currently undefined + * @param {bucketStoreFindCallback} - called with results + */ + find( query: ?any, callback: ( ?Error, ?BucketObject[] ) => void ): void; +} + +type BucketStoreProvider = ( Bucket ) => BucketStore; + +export type BucketObjectRevision = { + id: string, + version: number, + data: {} +}; + +type RevisionList = BucketObjectRevision[]; + +interface Channel { + removeListener( eventName: string, listener: ?any ) : Channel; + on( eventName: string, listener: any ) : Channel; + reload(): void; + update( object: BucketObject, sync: boolean ): void; + remove( id: string ): void; + hasLocalChanges(): Promise; + getVersion( id: string ): Promise; + getRevisions( id: string ): Promise; +} + /** * A bucket object represents the data stored in Simperium for the given id * @@ -58,6 +133,7 @@ const deprecateCallback = ( callback, promise ) => { * @param {?Error} * @param {?BucketObject} */ +type bucketStoreGetCallback = ( ?Error, ?BucketObject ) => void /** * @callback bucketStoreRemoveCallback @@ -70,56 +146,19 @@ const deprecateCallback = ( callback, promise ) => { * @param {?BucketObject[]} */ -/** - * Used by a bucket to store bucket object data. - * - * @interface BucketStore - */ - -/** - * Retrieve a bucket object from the store - * @function - * @name BucketStore#get - * @param {String} id - the bucket object id to fetch - * @param {bucketStoreGetCallback} - callback once the object is fetched - */ - -/** - * Updates the data for the given object id. - * - * @function - * @name BucketStore#update - * @param {String} id - to of object to update - * @param {Object} data - data to update the object to - * @param {Boolean} isIndexing - indicates the object is being downloaded during an index - * @param {bucketStoreGetCallback} - */ - -/** - * Deletes the object at id from the datastore. - * - * @function - * @name BucketStore#remove - * @param {String} id - object to delete from the bucket - * @param {bucketStoreRemoveCallback} - called once the object is deleted - */ - -/** - * Fetchs all bucket objects from the datastore. - * - * @function - * @name BucketStore#find - * @param {?Object} query - currently undefined - * @param {bucketStoreFindCallback} - called with results - */ - + interface BucketStoreAPI { + get: ( id: string ) => Promise; + update: ( id: string, data: {}, isIndexing: boolean ) => Promise; + remove: ( id: string ) => Promise; + find: ( query: ?any ) => Promise; + } /** * Turns existing bucket storage provider callback api into a promise based API * * @param {BucketStore} store - a bucket storage object * @returns {Object} store api methods that use Promises instead of callbacks */ -const promiseAPI = store => ( { +const promiseAPI = ( store: BucketStore ): BucketStoreAPI => ( { get: id => callbackAsPromise( store.get.bind( store, id ) ), update: ( id, object, isIndexing ) => @@ -130,212 +169,245 @@ const promiseAPI = store => ( { callbackAsPromise( store.find.bind( store, query ) ) } ); -/** - * A bucket that syncs data with Simperium. - * - * @param {String} name - Simperium bucket name - * @param {bucketStoreProvider} storeProvider - a factory function that provides a bucket store - * @param {Channel} channel - a channel instance used for syncing Simperium data - */ -export default function Bucket( name, storeProvider, channel ) { - EventEmitter.call( this ); - this.name = name; - this.store = storeProvider( this ); - this.storeAPI = promiseAPI( this.store ); - this.isIndexing = false; +type updateOptions = { sync: boolean } | bucketStoreGetCallback + +export default class Bucket extends EventEmitter { + name: string; + store: BucketStore; + storeAPI: BucketStoreAPI; + isIndexing: boolean; + channel: Channel; + + onChannelIndex: any; + onChannelError: any; + onChannelUpdate: any; + onChannelIndexingStateChange: any; + onChannelRemove: any; /** - * Listeners for channel events that will be added to Channel instance + * A bucket that syncs data with Simperium. + * + * @param {String} name - Simperium bucket name + * @param {bucketStoreProvider} storeProvider - a factory function that provides a bucket store + * @param {Channel} channel - a channel instance used for syncing Simperium data */ - this.onChannelIndex = this.emit.bind( this, 'index' ); - this.onChannelError = this.emit.bind( this, 'error' ); - this.onChannelUpdate = ( id, data ) => { - this.update( id, data, { sync: false } ); - }; - - this.onChannelIndexingStateChange = ( isIndexing ) => { - this.isIndexing = isIndexing; - if ( isIndexing ) { - this.emit( 'indexing' ); + constructor( name: string, storeProvider: BucketStoreProvider, channel: Channel ) { + super(); + this.name = name; + this.store = storeProvider( this ); + this.storeAPI = promiseAPI( this.store ); + this.isIndexing = false; + + /** + * Listeners for channel events that will be added to Channel instance + */ + this.onChannelIndex = this.emit.bind( this, 'index' ); + this.onChannelError = this.emit.bind( this, 'changeError' ); + this.onChannelUpdate = ( id, data ) => { + this.update( id, data, { sync: false } ); + }; + + this.onChannelIndexingStateChange = ( isIndexing ) => { + this.isIndexing = isIndexing; + if ( isIndexing ) { + this.emit( 'indexing' ); + } } - }; - this.onChannelRemove = ( id ) => this.remove( id ); + this.onChannelRemove = ( id ) => this.remove( id ); - if ( channel ) { - this.setChannel( channel ); + if ( channel ) { + this.setChannel( channel ); + } } -} -inherits( Bucket, EventEmitter ); + // NOTE: for backwards compatibility, listeners for `error` will + // be subscribed to `changeError` + on( eventName: string, listener: Function ): Bucket { + if ( eventName === 'error' ) { + // TODO: deprecation warning + return super.on( 'changeError', listener ); + } + return super.on( eventName, listener ); + } -/** - * Sets the channel the Bucket will use to sync changes. - * - * This exists to allow the Client to provide a backwards compatible API. There - * is probably no reason to change the Channel once it's already set. - * - * @param {Channel} channel - channel instance to use for syncing - */ -Bucket.prototype.setChannel = function( channel ) { - if ( this.channel ) { - this.channel - .removeListener( 'index', this.onChannelIndex ) - .removeListener( 'error', this.onChannelError ) - .removeListener( 'update', this.onChannelUpdate ) - .removeListener( 'indexingStateChange', this.onChannelIndexingStateChange ) - .removeListener( 'remove', this.onChannelRemove ); + removeListener( eventName: string, listener: Function ): Bucket { + if ( eventName === 'error' ) { + // TODO: deprecation warning + return super.removeListener( 'changeError', listener); + } + return super.removeListener( eventName, listener ); } - this.channel = channel; - channel - // forward the index and error events from the channel - .on( 'index', this.onChannelIndex ) - .on( 'error', this.onChannelError ) - // when the channel updates or removes data, the bucket should apply - // the same updates - .on( 'update', this.onChannelUpdate ) - .on( 'indexingStateChange', this.onChannelIndexingStateChange ) - .on( 'remove', this.onChannelRemove ); -}; -/** - * Reloads all the data from the currently cached set of ghost data - */ -Bucket.prototype.reload = function() { - this.channel.reload(); -}; + /** + * Sets the channel the Bucket will use to sync changes. + * + * This exists to allow the Client to provide a backwards compatible API. There + * is probably no reason to change the Channel once it's already set. + * + * @param {Channel} channel - channel instance to use for syncing + */ + setChannel( channel: Channel ) { + const existing = this.channel; + if ( existing ) { + existing + .removeListener( 'index', this.onChannelIndex ) + .removeListener( 'changeError', this.onChannelError ) + .removeListener( 'update', this.onChannelUpdate ) + .removeListener( 'indexingStateChange', this.onChannelIndexingStateChange ) + .removeListener( 'remove', this.onChannelRemove ); + } + this.channel = channel; + channel + // forward the index and error events from the channel + .on( 'index', this.onChannelIndex ) + .on( 'changeError', this.onChannelError ) + // when the channel updates or removes data, the bucket should apply + // the same updates + .on( 'update', this.onChannelUpdate ) + .on( 'indexingStateChange', this.onChannelIndexingStateChange ) + .on( 'remove', this.onChannelRemove ); + }; -/** - * Stores an object in the bucket and syncs it to simperium. Generates an - * object ID to represent the object in simperium. - * - * @param {Object} object - plain js object literal to be saved/synced - * @param {?bucketStoreGetCallback} callback - runs when object has been saved - * @return {Promise} data stored in the bucket - */ -Bucket.prototype.add = function( object, callback ) { - var id = uuid(); - return this.update( id, object, callback ); -}; + /** + * Reloads all the data from the currently cached set of ghost data + */ + reload() { + this.channel.reload(); + }; -/** - * Requests the object data stored in the bucket for the given id. - * - * @param {String} id - bucket object id - * @param {?bucketStoreGetCallback} callback - with the data stored in the bucket - * @return {Promise} the object id, data and indexing status - */ -Bucket.prototype.get = function( id, callback ) { - return deprecateCallback( callback, this.storeAPI.get( id ) ); -}; + /** + * Stores an object in the bucket and syncs it to simperium. Generates an + * object ID to represent the object in simperium. + * + * @param {Object} object - plain js object literal to be saved/synced + * @param {?bucketStoreGetCallback} callback - runs when object has been saved + * @return {Promise} data stored in the bucket + */ + add( object: {}, callback: ?bucketStoreGetCallback ) { + const id = uuid(); + return this.update( id, object, callback ); + }; -/** - * Update the bucket object of `id` with the given data. - * - * @param {String} id - the bucket id for the object to update - * @param {Object} data - object literal to replace the object data with - * @param {Object} [options] - optional settings - * @param {Boolean} [options.sync=true] - false if object should not be synced with this update - * @param {?bucketStoreGetCallback} callback - executed when object is updated localy - * @returns {Promise} - update data - */ -Bucket.prototype.update = function( id, data, options, callback ) { - if ( typeof options === 'function' ) { - callback = options; - options = { sync: true }; - } + /** + * Requests the object data stored in the bucket for the given id. + * + * @param {String} id - bucket object id + * @param {?bucketStoreGetCallback} callback - with the data stored in the bucket + * @return {Promise} the object id, data and indexing status + */ + get( id: string, callback: ?bucketStoreGetCallback ): Promise { + return deprecateCallback( callback, this.storeAPI.get( id ) ); + }; - if ( !! options === false ) { - options = { sync: true }; - } + /** + * Update the bucket object of `id` with the given data. + * + * @param {String} id - the bucket id for the object to update + * @param {Object} data - object literal to replace the object data with + * @param {Object} [options] - optional settings + * @param {Boolean} [options.sync=true] - false if object should not be synced with this update + * @param {?bucketStoreGetCallback} callback - executed when object is updated localy + * @returns {Promise} - update data + */ + update( id: string, data: {}, options: ?updateOptions, callback: ?bucketStoreGetCallback ): Promise { + if ( typeof options === 'function' ) { + callback = options; + options = { sync: true }; + } - const task = this.storeAPI.update( id, data, this.isIndexing ) - .then( bucketObject => { - this.emit( 'update', id, bucketObject.data ); - this.channel.update( bucketObject, options.sync ); - return bucketObject; - } ); - return deprecateCallback( callback, task ); -}; + if ( !! options === false ) { + options = { sync: true }; + } -/** - * @callback bucketHasLocalChanges - * @param {?Error} - * @param {?Boolean} - */ + const task = this.storeAPI.update( id, data, this.isIndexing ) + .then( bucketObject => { + this.emit( 'update', id, bucketObject.data ); + this.channel.update( bucketObject, options ? options.sync : true ); + return bucketObject; + } ); + return deprecateCallback( callback, task ); + }; -/** - * Check if the bucket has pending changes that have not yet been synced. - * - * @param {?bucketHasLocalChanges} callback - optional callback to receive response - * @returns {Promise} resolves to true if their are still changes to sync - */ -Bucket.prototype.hasLocalChanges = function( callback ) { - return deprecateCallback( callback, this.channel.hasLocalChanges() ); -}; + /** + * @callback bucketHasLocalChanges + * @param {?Error} + * @param {?Boolean} + */ -/** - * @callback bucketGetVersion - * @param {?Error} - * @param {Number} - */ + /** + * Check if the bucket has pending changes that have not yet been synced. + * + * @param {?bucketHasLocalChanges} callback - optional callback to receive response + * @returns {Promise} resolves to true if their are still changes to sync + */ + hasLocalChanges( callback: ?( ?Error, ?boolean ) => void ): Promise { + return deprecateCallback( callback, this.channel.hasLocalChanges() ); + }; -/** - * Gets the currently synced version number for the specified object id. - * - * A version of `0` indicates that an object has not been added to simperium yet. - * - * @param {String} id - object to get the version for - * @param {?bucketGetVersionCallback} callback - optional callback - * @returns {Promise} - resolves to the current synced version - */ -Bucket.prototype.getVersion = function( id, callback ) { - return deprecateCallback( callback, this.channel.getVersion( id ) ); -}; + /** + * @callback bucketGetVersion + * @param {?Error} + * @param {Number} + */ -/** - * Attempts to sync the object specified by `id` using whatever data - * is locally stored for the object - * - * @param {String} id - object to sync - * @param {?bucketStoreGetCallback} callback - optional callback - * @returns {Promise} - object id, data - */ -Bucket.prototype.touch = function( id, callback ) { - const task = this.storeAPI.get( id ) - .then( object => this.update( object.id, object.data ) ); + /** + * Gets the currently synced version number for the specified object id. + * + * A version of `0` indicates that an object has not been added to simperium yet. + * + * @param {String} id - object to get the version for + * @param {?bucketGetVersionCallback} callback - optional callback + * @returns {Promise} - resolves to the current synced version + */ + getVersion( id: string, callback: ?( ?Error, ?number ) => void ): Promise { + return deprecateCallback( callback, this.channel.getVersion( id ) ); + }; - return deprecateCallback( callback, task ); -}; + /** + * Attempts to sync the object specified by `id` using whatever data + * is locally stored for the object + * + * @param {String} id - object to sync + * @param {?bucketStoreGetCallback} callback - optional callback + * @returns {Promise} - object id, data + */ + touch( id: string, callback: ?bucketStoreGetCallback ): Promise { + const task = this.storeAPI.get( id ) + .then( object => this.update( object.id, object.data ) ); -/** - * Deletes the object from the bucket - * - * @param {String} id - object to delete - * @param {?bucketStoreRemoveCallback} callback - optional callback - * @returns {Promise} - resolves when object has been deleted - */ -Bucket.prototype.remove = function( id, callback ) { - const task = this.storeAPI.remove( id ) - .then( ( result ) => { - this.emit( 'remove', id ); - this.channel.remove( id ); - return result; - } ) - return deprecateCallback( callback, task ); -}; + return deprecateCallback( callback, task ); + }; -Bucket.prototype.find = function( query, callback ) { - return deprecateCallback( callback, this.storeAPI.find( query ) ); -}; + /** + * Deletes the object from the bucket + * + * @param {String} id - object to delete + * @param {?bucketStoreRemoveCallback} callback - optional callback + * @returns {Promise} - resolves when object has been deleted + */ + remove( id: string, callback: ?( ?Error, ?void ) => void ): Promise { + const task = this.storeAPI.remove( id ) + .then( ( result ) => { + this.emit( 'remove', id ); + this.channel.remove( id ); + return result; + } ) + return deprecateCallback( callback, task ); + }; -/** - * Gets all known past versions of an object - * - * @param {String} id - object to fetch revisions for - * @param {Function} [callback] - optional callback - * @returns {Promise>} - list of objects with id, data and version - */ -Bucket.prototype.getRevisions = function( id, callback ) { - return deprecateCallback( callback, this.channel.getRevisions( id ) ); + find( query: ?any, callback: ?( ?Error, ?BucketObject[] ) => void ): Promise { + return deprecateCallback( callback, this.storeAPI.find( query ) ); + }; + + /** + * Gets all known past versions of an object + * + * @param {String} id - object to fetch revisions for + * @param {Function} [callback] - optional callback + * @returns {Promise>} - list of objects with id, data and version + */ + getRevisions( id: string, callback: ?( ?Error, ?RevisionList ) => void ): Promise { + return deprecateCallback( callback, this.channel.getRevisions( id ) ); + } } diff --git a/src/simperium/channel.js b/src/simperium/channel.js index 5cb5403..e4f5522 100644 --- a/src/simperium/channel.js +++ b/src/simperium/channel.js @@ -1,9 +1,105 @@ +// @flow /*eslint no-shadow: 0*/ import { format, inherits } from 'util' -import { EventEmitter } from 'events' +import events from 'events' import { parseMessage, parseVersionMessage, change as change_util } from './util' import JSONDiff from './jsondiff' import { v4 as uuid } from 'uuid' +import type { BucketObjectRevision, BucketObject } from './bucket'; +import { LocalQueue, NetworkQueue } from './queues'; + +/** + * A ghost represents a version of a bucket object as known by Simperium + * + * Generally a client will keep the last known ghost stored locally for efficient + * diffing and patching of Simperium change operations. + * + * @typedef {Object} Ghost + * @property {Number} version - the ghost's version + * @property {String} key - the simperium bucket object id this ghost is for + * @property {Object} data - the data for the given ghost version + */ +type Ghost = { version: number, key: string, data: {} } + +/** + * Callback function used by the ghost store to iterate over existing ghosts + * + * @callback ghostIterator + * @param {Ghost} - the current ghost + */ + + +/** + * A GhostStore provides the store mechanism for ghost data that the Channel + * uses to maintain syncing state and producing change operations for + * Bucket objects. + * + * @interface GhostStore + */ +export interface GhostStore { + + /** + * Retrieve a Ghost for the given bucket object id + * + * @function + * @name GhostStore#get + * @param {String} id - bucket object id + * @returns {Promise} - the ghost for this object + */ + get( id: string ): Promise; + + /** + * Save a ghost in the store. + * + * @function + * @name GhostStore#put + * @param {String} id - bucket object id + * @param {Number} version - version of ghost data + * @param {Object} data - object literal to save as this ghost's data for this version + * @returns {Promise} - the ghost for this object + */ + put( id: string, version: number, data: {} ): Promise; + + /** + * Delete a Ghost from the store. + * + * @function + * @name GhostStore#remove + * @param {String} id - bucket object id + * @returns {Promise} - the ghost for this object + */ + remove( id: string ): Promise; + + /** + * Iterate over existing Ghost objects with the given callback. + * + * @function + * @name GhostStore#eachGhost + * @param {ghostIterator} - function to run against each ghost + */ + eachGhost( ghostIterator: Ghost => void ): void; + + /** + * Get the current change version (cv) that this channel has synced. + * + * @function + * @name GhostStore#getChangeVersion + * @returns {Promise} - the current change version for the bucket + */ + getChangeVersion(): Promise; + + /** + * Set the current change version. + * + * @function + * @name GhostStore#setChangeVersion + * @param {String} changeVersion - the new version no set + * @returns {Promise} - resolves once the change version is saved + */ + setChangeVersion( changeVersion: ?string ): Promise; +} + +const { EventEmitter } = events; const jsondiff = new JSONDiff( {list_diff: false} ); @@ -12,101 +108,99 @@ const CODE_INVALID_VERSION = 405; const CODE_EMPTY_RESPONSE = 412; const CODE_INVALID_DIFF = 440; -const operation = { +type Modify = 'M'; +type Remove = '-'; +type OperationType = Modify | Remove; + +const operation: { [string]: OperationType } = { MODIFY: 'M', REMOVE: '-' }; -// internal methods used as instance methods on a Channel instance -const internal = {}; +type Change = { id: string }; -/** - * Updates the currently known synced `cv`. - * - * @param {String} cv - the change version synced - * @returns {Promise} the saved `cv` - */ -internal.updateChangeVersion = function( cv ) { - return this.store.setChangeVersion( cv ).then( () => { - // A unit test currently relies on this event, otherwise we can remove it - this.emit( 'change-version', cv ); - return cv; - } ); +const updateAcknowledged = ( channel: Channel, change: Change ) => { + const id = change.id; + if ( channel.localQueue.sent[id] === change ) { + channel.localQueue.acknowledge( change ); + channel.emit( 'acknowledge', id, change ); + } }; -/** - * Called when receive a change from the network. Attempt to apply the change - * to the ghost object and notify. - * - * @param {String} id - id of the object changed - * @param {Object} change - the change to apply to the object - */ -internal.changeObject = function( id, change ) { - // pull out the object from the store and apply the change delta - var applyChange = internal.performChange.bind( this, change ); - - this.networkQueue.queueFor( id ).add( function( done ) { - return applyChange().then( done, done ); +const requestObjectVersion = ( channel: Channel, id: string, version: number ): Promise => { + return new Promise( resolve => { + channel.once( `version.${ id }.${ version }`, ( data: any ) => { + resolve( data ); + } ); + channel.send( `e:${ id }.${ version }` ); } ); }; -/** - * Creates a change operation for the object of `id` that changes - * from the date stored in the `ghost` into the data of `object`. - * - * Queues the change for syncing. - * - * @param {String} id - object id - * @param {Object} object - object literal of the data that the change should produce - * @param {Object} ghost - the ghost version used to produce the change object - */ -internal.buildModifyChange = function( id, object, ghost ) { - var payload = change_util.buildChange( change_util.type.MODIFY, id, object, ghost ), - empty = true, - key; +const handleChangeError = ( channel, err, change, acknowledged ) => { + switch ( err.code ) { + case CODE_INVALID_VERSION: + case CODE_INVALID_DIFF: // Invalid version or diff, send full object back to server + if ( ! change.hasSentFullObject ) { + channel.store.get( change.id ).then( object => { + change.d = object; + change.hasSentFullObject = true; + channel.localQueue.queue( change ); + } ); + } else { + channel.localQueue.dequeueChangesFor( change.id ); + } - for ( key in payload.v ) { - if ( key ) { - empty = false; break; - } - } - - if ( empty ) { - this.emit( 'unmodified', id, object, ghost ); - return; + case CODE_EMPTY_RESPONSE: // Change causes no change, just acknowledge it + updateAcknowledged( channel, acknowledged ); + break; + default: + channel.emit( 'changeError', err, change ); } - - // if the change v is an empty object, do not send, notify? - this.localQueue.queue( payload ); -}; +} /** - * Creates a change object that deletes an object from a bucket. - * - * Queues the change for syncing. + * Updates the currently known synced `cv`. * - * @param {String} id - object to remove - * @param {Object} ghost - current ghost object for the given id + * @param {Channel} channel - channel to perform operation on + * @param {String} cv - the change version synced + * @returns {Promise} the saved `cv` */ -internal.buildRemoveChange = function( id, ghost ) { - var payload = change_util.buildChange( change_util.type.REMOVE, id, {}, ghost ); - this.localQueue.queue( payload ); +const updateChangeVersion = ( channel: Channel, cv: ?string ) => { + return channel.store.setChangeVersion( cv ).then( () => { + // A unit test currently relies on this event, otherwise we can remove it + channel.emit( 'change-version', cv ); + return cv; + } ); }; -internal.diffAndSend = function( id, object ) { - var modify = internal.buildModifyChange.bind( this, id, object ); - return this.store.get( id ).then( modify ); +const findAcknowledgedChange = ( channel: Channel, change: Change ) => { + const possibleChange = channel.localQueue.sent[change.id]; + if ( possibleChange ) { + if ( ( change.ccids || [] ).indexOf( possibleChange.ccid ) > -1 ) { + return possibleChange; + } + } }; -internal.removeAndSend = function( id ) { - var remove = internal.buildRemoveChange.bind( this, id ); - return this.store.get( id ).then( remove ); +const removeObject = ( channel: Channel, id: string, acknowledged: Change ) => { + let notify; + if ( !acknowledged ) { + notify = () => { + channel.emit( 'remove', id ); + } + } else { + notify = () => { + updateAcknowledged( channel, acknowledged ); + } + } + + return channel.store.remove( id ).then( notify ); }; // We've receive a full object from the network. Update the local instance and // notify of the new object version -internal.updateObjectVersion = function( id, version, data, original, patch, acknowledged ) { +const updateObjectVersion = ( channel: Channel, id: string, version: number, data: {}, original, patch, acknowledged ) => { var notify, changes, change, @@ -119,10 +213,11 @@ internal.updateObjectVersion = function( id, version, data, original, patch, ack // we need to provide a way for the current client to respond to // a potential conflict if it has modifications that have not been synced if ( !acknowledged ) { - changes = this.localQueue.dequeueChangesFor( id ); + changes = channel.localQueue.dequeueChangesFor( id ); localModifications = change_util.compressChanges( changes, original ); remoteModifications = patch; transformed = change_util.transform( localModifications, remoteModifications, original ); + console.log( 'shall we transform?', localModifications, remoteModifications, transformed ); update = data; // apply the transformed patch and emit the update @@ -131,62 +226,24 @@ internal.updateObjectVersion = function( id, version, data, original, patch, ack update = jsondiff.apply_object_diff( data, transformed ); // queue up the new change change = change_util.modify( id, version, patch ); - this.localQueue.queue( change ); + console.log( 'new transformed change to queue', change ); + channel.localQueue.queue( change ); } - notify = this.emit.bind( this, 'update', id, update, original, patch, this.isIndexing ); - } else { - notify = internal.updateAcknowledged.bind( this, acknowledged ); - } - - return this.store.put( id, version, data ).then( notify ); -}; - -internal.removeObject = function( id, acknowledged ) { - var notify; - if ( !acknowledged ) { - notify = this.emit.bind( this, 'remove', id ); + notify = () => { + channel.emit( 'update', id, update, original, patch, channel.isIndexing ); + } } else { - notify = internal.updateAcknowledged.bind( this, acknowledged ); - } - - return this.store.remove( id ).then( notify ); -}; - -internal.updateAcknowledged = function( change ) { - var id = change.id; - if ( this.localQueue.sent[id] === change ) { - this.localQueue.acknowledge( change ); - this.emit( 'acknowledge', id, change ); - } -}; - -internal.performChange = function( change ) { - var success = internal.applyChange.bind( this, change ); - return this.store.get( change.id ).then( success ); -}; - -internal.findAcknowledgedChange = function( change ) { - var possibleChange = this.localQueue.sent[change.id]; - if ( possibleChange ) { - if ( ( change.ccids || [] ).indexOf( possibleChange.ccid ) > -1 ) { - return possibleChange; + notify = () => { + updateAcknowledged( channel, acknowledged ); } } -}; -internal.requestObjectVersion = function( id, version ) { - return new Promise( resolve => { - this.once( `version.${ id }.${ version }`, data => { - resolve( data ); - } ); - this.send( `e:${ id }.${ version }` ); - } ); + return channel.store.put( id, version, data ).then( notify ); }; -internal.applyChange = function( change, ghost ) { - const acknowledged = internal.findAcknowledgedChange.bind( this )( change ), - updateChangeVersion = internal.updateChangeVersion.bind( this, change.cv ); +const applyChange = ( channel: Channel, change: Change, ghost: Ghost ) => { + const acknowledged = findAcknowledgedChange( channel, change ); let error, original, @@ -204,14 +261,14 @@ internal.applyChange = function( change, ghost ) { error.code = change.error; error.change = change; error.ghost = ghost; - internal.handleChangeError.call( this, error, change, acknowledged ); + handleChangeError( channel, error, change, acknowledged ); return; } if ( change.o === operation.MODIFY ) { if ( ghost && ( ghost.version !== change.sv ) ) { - internal.requestObjectVersion.call( this, change.id, change.sv ).then( data => { - internal.applyChange.call( this, change, { version: change.sv, data } ) + requestObjectVersion( channel, change.id, change.sv ).then( data => { + applyChange( channel, change, { version: change.sv, data } ) } ); return; } @@ -219,642 +276,443 @@ internal.applyChange = function( change, ghost ) { original = ghost.data; patch = change.v; modified = jsondiff.apply_object_diff( original, patch ); - return internal.updateObjectVersion.call( this, change.id, change.ev, modified, original, patch, acknowledged ) - .then( updateChangeVersion ); + return updateObjectVersion( channel, change.id, change.ev, modified, original, patch, acknowledged ) + .then( () => { + return updateChangeVersion( channel, change.cv ); + } ); } else if ( change.o === operation.REMOVE ) { - return internal.removeObject.bind( this )( change.id, acknowledged ).then( updateChangeVersion ); - } -} - -internal.handleChangeError = function( err, change, acknowledged ) { - switch ( err.code ) { - case CODE_INVALID_VERSION: - case CODE_INVALID_DIFF: // Invalid version or diff, send full object back to server - if ( ! change.hasSentFullObject ) { - this.store.get( change.id ).then( object => { - change.d = object; - change.hasSentFullObject = true; - this.localQueue.queue( change ); - } ); - } else { - this.localQueue.dequeueChangesFor( change.id ); - } - - break; - case CODE_EMPTY_RESPONSE: // Change causes no change, just acknowledge it - internal.updateAcknowledged.call( this, acknowledged ); - break; - default: - this.emit( 'error', err, change ); - } -} - -internal.indexingComplete = function() { - // Indexing has finished - this.setIsIndexing( false ); - - internal.updateChangeVersion.call( this, this.index_cv ) - .then( () => { - this.localQueue.start(); + return removeObject( channel, change.id, acknowledged ).then( () => { + return updateChangeVersion( channel, change.cv ); } ); - - this.emit( 'index', this.index_cv ); - - this.index_last_id = null; - this.index_cv = null; - this.emit( 'ready' ) + } } /** - * A ghost represents a version of a bucket object as known by Simperium - * - * Generally a client will keep the last known ghost stored locally for efficient - * diffing and patching of Simperium change operations. - * - * @typedef {Object} Ghost - * @property {Number} version - the ghost's version - * @property {String} key - the simperium bucket object id this ghost is for - * @property {Object} data - the data for the given ghost version - */ - -/** - * Callback function used by the ghost store to iterate over existing ghosts - * - * @callback ghostIterator - * @param {Ghost} - the current ghost - */ - -/** - * A GhostStore provides the store mechanism for ghost data that the Channel - * uses to maintain syncing state and producing change operations for - * Bucket objects. - * - * @interface GhostStore - */ - -/** - * Retrieve a Ghost for the given bucket object id - * - * @function - * @name GhostStore#get - * @param {String} id - bucket object id - * @returns {Promise} - the ghost for this object - */ - -/** - * Save a ghost in the store. - * - * @function - * @name GhostStore#put - * @param {String} id - bucket object id - * @param {Number} version - version of ghost data - * @param {Object} data - object literal to save as this ghost's data for this version - * @returns {Promise} - the ghost for this object - */ - -/** - * Delete a Ghost from the store. - * - * @function - * @name GhostStore#remove - * @param {String} id - bucket object id - * @returns {Promise} - the ghost for this object - */ - -/** - * Iterate over existing Ghost objects with the given callback. - * - * @function - * @name GhostStore#eachGhost - * @param {ghostIterator} - function to run against each ghost - */ - -/** - * Get the current change version (cv) that this channel has synced. - * - * @function - * @name GhostStore#getChangeVersion - * @returns {Promise} - the current change version for the bucket - */ - -/** - * Set the current change version. - * - * @function - * @name GhostStore#setChangeVersion - * @returns {Promise} - resolves once the change version is saved - */ - - -/** - * Maintains syncing state for a Simperium bucket. - * - * A bucket uses a channel to listen for updates that come from simperium while - * sending updates that are made on the client. - * - * The channel can handle incoming simperium commands via `handleMessage`. These - * messages are stripped of their channel number that separates bucket operations. - * The `Client` maintains which commands should be routed to which channel. - * - * The channel is responsible for creating all change operations and downloading - * bucket data. + * Called when receive a change from the network. Attempt to apply the change + * to the ghost object and notify. * - * @param {String} appid - Simperium app id, used for authenticating - * @param {String} access_token - Simperium user access token - * @param {GhostStore} store - data storage for ghost objects - * @param {String} name - the name of the bucket on Simperium.com + * @param {Channel} channel - channel to apply change + * @param {String} id - id of the object changed + * @param {Object} change - the change to apply to the object */ -export default function Channel( appid, access_token, store, name ) { - // Uses an event emitter to handle different Simperium commands - const message = this.message = new EventEmitter(); - - this.name = name; - this.isIndexing = false; - this.appid = appid; - this.store = store; - this.access_token = access_token; - - this.session_id = 'node-' + uuid(); - - // These are the simperium bucket commands the channel knows how to handle - message.on( 'auth', this.onAuth.bind( this ) ); - message.on( 'i', this.onIndex.bind( this ) ); - message.on( 'c', this.onChanges.bind( this ) ); - message.on( 'e', this.onVersion.bind( this ) ); - message.on( 'cv', this.onChangeVersion.bind( this ) ); - message.on( 'o', function() {} ); - - // Maintain a queue of operations that come from simperium commands - // so that the can be applied to the ghost data. - this.networkQueue = new NetworkQueue(); - // Maintain a queue of operations that originate from this client - // to track their status. - this.localQueue = new LocalQueue( this.store ); - - // When a local queue has indicatie that it should send a change operation - // emit a simperium command. The Client instance will know how to route that - // command correctly to simperium - this.localQueue.on( 'send', ( data ) => { - this.emit( 'send', `c:${ JSON.stringify( data ) }` ); +const changeObject = ( channel, id, change ) => { + channel.networkQueue.queueFor( id ).add( ( done ) => { + channel.store.get( id ) + .then( + ghost => { + applyChange( channel, change, ghost ); + done(); + }, + done + ); } ); - - // Handle change errors caused by changes originating from this client - this.localQueue.on( 'error', internal.handleChangeError.bind( this ) ); -} - -inherits( Channel, EventEmitter ); - -/** - * Called by a bucket when a bucket object has been updated. - * - * The channel uses this method to initiate change operations when objects are updated. - * - * It also uses this method during indexing to track which objects have been successfully - * downloaded. - * - * @param {BucketObject} object - the bucket object - * @param {Boolean} [sync=true] - if the object should be synced - */ -Channel.prototype.update = function( object, sync = true ) { - this.onBucketUpdate( object.id ); - if ( sync === true ) { - internal.diffAndSend.call( this, object.id, object.data ); - } }; /** - * Tracks indexing state and emits `indexingStateChange` - * - * @private - * @param {Boolean} isIndexing - updates indexing state to this value - */ -Channel.prototype.setIsIndexing = function( isIndexing ) { - this.isIndexing = isIndexing; - this.emit( 'indexingStateChange', this.isIndexing ); -} - -/** - * Removes an object from Simperium. Called by a bucket when an object is deleted. - * - * @param {String} id - the id of the object to remove - */ -Channel.prototype.remove = function( id ) { - internal.removeAndSend.call( this, id ) -} - -/** - * Retrieves revisions for a given object from Simperium. - * - * @typedef {Object} BucketObjectRevision - * @property {String} id - bucket object id - * @property {Number} version - revision version - * @property {Object} data - object literal data at given version + * Creates a change operation for the object of `id` that changes + * from the date stored in the `ghost` into the data of `object`. * - * @param {String} id - the bucket object id - * @returns {Promise>} list of known object versions - */ -Channel.prototype.getRevisions = function( id ) { - return new Promise( ( resolve, reject ) => { - collectionRevisions( this, id, ( error, revisions ) => { - if ( error ) { - reject( error ); - return; - } - resolve( revisions ); - } ); - } ); -} - -/** - * Checks if there are unsynced changes. + * Queues the change for syncing. * - * @returns {Promise} true if there are still changes to sync + * @param {Channel} channel - + * @param {String} id - object id + * @param {Object} object - object literal of the data that the change should produce + * @param {Object} ghost - the ghost version used to produce the change object */ -Channel.prototype.hasLocalChanges = function() { - return Promise.resolve( this.localQueue.hasChanges() ); -} +const buildModifyChange = ( channel, id, object, ghost ) => { + var payload = change_util.buildChange( change_util.type.MODIFY, id, object, ghost ), + empty = true, + key; -/** - * Retrieves the currently stored version number for a given object - * - * @param {String} id - object id to get the version for - * @returns {Promise} version number for the object - */ -Channel.prototype.getVersion = function( id ) { - return this.store.get( id ).then( ( ghost ) => { - if ( ghost && ghost.version ) { - return ghost.version; + for ( key in payload.v ) { + if ( key ) { + empty = false; + break; } - return 0; - } ); -} - -/** - * Receives incoming messages from Simperium - * - * Called by a client that strips the channel number prefix before - * seding to a specific channel. - * - * @param {String} data - the message from Simperium - */ -Channel.prototype.handleMessage = function( data ) { - var message = parseMessage( data ); - this.message.emit( message.command, message.data ); -}; - -/** - * Used to send a message from this channel to Simperium - * The client listens for `send` events and correctly sends them to Simperium - * - * @emits Channel#send - * @private - * @param {String} data - the message to send - */ -Channel.prototype.send = function( data ) { - /** - * Send event - * - * @event Channel#send - * @type {String} - the message to send to Simperium - */ - this.emit( 'send', data ); -}; - -/** - * Restores a buckets data to what is currently stored in the ghost data. - */ -Channel.prototype.reload = function() { - this.store.eachGhost( ghost => { - this.emit( 'update', ghost.key, ghost.data ); - } ); -}; - -/** - * Called after a bucket updates an object. - * - * Wile indexing keeps track of which objects have been retrieved. - * - * @param {String} id - object that was updated - */ -Channel.prototype.onBucketUpdate = function( id ) { - if ( ! this.isIndexing ) { - return; - } - if ( this.index_last_id == null || this.index_cv == null ) { - return; - } else if ( this.index_last_id === id ) { - internal.indexingComplete.call( this ); } -}; - -Channel.prototype.onAuth = function( data ) { - var auth; - var init; - try { - auth = JSON.parse( data ); - this.emit( 'unauthorized', auth ); - return; - } catch ( error ) { - // request cv and then send method - this.once( 'ready', () => { - this.localQueue.resendSentChanges(); - } ) - init = ( cv ) => { - if ( cv ) { - this.localQueue.start(); - this.sendChangeVersionRequest( cv ); - } else { - this.startIndexing(); - } - }; - - this.store.getChangeVersion().then( init ); + if ( empty ) { + channel.emit( 'unmodified', id, object, ghost ); return; } -}; -/** - * Re-downloads all Simperium bucket data - */ -Channel.prototype.startIndexing = function() { - this.localQueue.pause(); - this.setIsIndexing( true ); - this.sendIndexRequest(); + // if the change v is an empty object, do not send, notify? + channel.localQueue.queue( payload ); }; /** - * Called when a channel's socket has been connected + * Creates a change object that deletes an object from a bucket. + * + * Queues the change for syncing. + * + * @param {String} id - object to remove + * @param {Object} ghost - current ghost object for the given id */ -Channel.prototype.onConnect = function() { - var init = { - name: this.name, - clientid: this.session_id, - api: '1.1', - token: this.access_token, - app_id: this.appid, - library: 'node-simperium', - version: '0.0.1' - }; - - this.send( format( 'init:%s', JSON.stringify( init ) ) ); +const buildRemoveChange = ( channel, id, ghost ) => { + let payload = change_util.buildChange( change_util.type.REMOVE, id, {}, ghost ); + channel.localQueue.queue( payload ); }; -Channel.prototype.onIndex = function( data ) { - const page = JSON.parse( data ), - objects = page.index, - mark = page.mark, - cv = page.current, - update = internal.updateObjectVersion.bind( this ); - - let objectId; - objects.forEach( function( object ) { - objectId = object.id; - update( object.id, object.v, object.d ); +const diffAndSend = ( channel, id, object ) => { + return channel.store.get( id ).then( ghost => { + buildModifyChange( channel, id, object, ghost ) } ); - - if ( !mark ) { - if ( objectId ) { - this.index_last_id = objectId; - } - if ( !this.index_last_id ) { - internal.indexingComplete.call( this ) - } - this.index_cv = cv; - } else { - this.sendIndexRequest( mark ); - } }; -Channel.prototype.sendIndexRequest = function( mark ) { - this.send( format( 'i:1:%s::10', mark ? mark : '' ) ); -}; - -Channel.prototype.sendChangeVersionRequest = function( cv ) { - this.send( format( 'cv:%s', cv ) ); -}; - -Channel.prototype.onChanges = function( data ) { - var changes = JSON.parse( data ), - onChange = internal.changeObject.bind( this ); - - changes.forEach( function( change ) { - onChange( change.id, change ); +const removeAndSend = ( channel, id ) => { + return channel.store.get( id ).then( ghost => { + buildRemoveChange( channel, id, ghost ); } ); - // emit ready after all server changes have been applied - this.emit( 'ready' ); -}; - -Channel.prototype.onChangeVersion = function( data ) { - if ( data === UNKNOWN_CV ) { - this.store.setChangeVersion( null ) - .then( () => this.startIndexing() ); - } -}; - -Channel.prototype.onVersion = function( data ) { - // invalid version, give up without emitting - if ( data.slice( -2 ) === '\n?' ) { - return; - } - - const ghost = parseVersionMessage( data ); - - this.emit( 'version', ghost.id, ghost.version, ghost.data ); - this.emit( 'version.' + ghost.id, ghost.id, ghost.version, ghost.data ); - this.emit( 'version.' + ghost.id + '.' + ghost.version, ghost.data ); }; -function NetworkQueue() { - this.queues = {}; -} - -NetworkQueue.prototype.queueFor = function( id ) { - var queues = this.queues, - queue = queues[id]; +const indexingComplete = ( channel ) => { + // Indexing has finished + channel.setIsIndexing( false ); - if ( !queue ) { - queue = new Queue(); - queue.on( 'finish', function() { - delete queues[id]; + updateChangeVersion( channel, channel.index_cv ) + .then( () => { + channel.localQueue.start(); } ); - queues[id] = queue; - } - return queue; -}; + channel.emit( 'index', channel.index_cv ); -function Queue() { - this.queue = []; - this.running = false; + channel.index_last_id = null; + channel.index_cv = null; + channel.emit( 'ready' ) } -inherits( Queue, EventEmitter ); +export default class Channel extends EventEmitter { + appid: string + name: string + isIndexing: boolean + access_token: string + store: GhostStore + session_id: string + message: EventEmitter -// Add a function at the end of the queue -Queue.prototype.add = function( fn ) { - this.queue.push( fn ); - this.start(); - return this; -}; + networkQueue: NetworkQueue + localQueue: LocalQueue -Queue.prototype.start = function() { - if ( this.running ) return; - this.running = true; - this.emit( 'start' ); - setImmediate( this.run.bind( this ) ); -} + index_last_id: ?string + index_cv: ?string -Queue.prototype.run = function() { - var fn; - this.running = true; + /** + * Maintains syncing state for a Simperium bucket. + * + * A bucket uses a channel to listen for updates that come from simperium while + * sending updates that are made on the client. + * + * The channel can handle incoming simperium commands via `handleMessage`. These + * messages are stripped of their channel number that separates bucket operations. + * The `Client` maintains which commands should be routed to which channel. + * + * The channel is responsible for creating all change operations and downloading + * bucket data. + * + * @param {String} appid - Simperium app id, used for authenticating + * @param {String} access_token - Simperium user access token + * @param {GhostStore} store - data storage for ghost objects + * @param {String} name - the name of the bucket on Simperium.com + */ + constructor( appid: string, access_token: string, store: GhostStore, name: string ) { + super(); + // Uses an event emitter to handle different Simperium commands + const message = this.message = new EventEmitter(); + + this.name = name; + this.isIndexing = false; + this.appid = appid; + this.store = store; + this.access_token = access_token; + + this.session_id = 'node-' + uuid(); + + // These are the simperium bucket commands the channel knows how to handle + message.on( 'auth', this.onAuth.bind( this ) ); + message.on( 'i', this.onIndex.bind( this ) ); + message.on( 'c', this.onChanges.bind( this ) ); + message.on( 'e', this.onVersion.bind( this ) ); + message.on( 'cv', this.onChangeVersion.bind( this ) ); + message.on( 'o', function() {} ); + + // Maintain a queue of operations that come from simperium commands + // so that the can be applied to the ghost data. + this.networkQueue = new NetworkQueue(); + // Maintain a queue of operations that originate from this client + // to track their status. + this.localQueue = new LocalQueue( this.store ); + + // When a local queue has indicated that it should send a change operation + // emit a simperium command. The Client instance will know how to route that + // command correctly to simperium + this.localQueue.on( 'send', ( data ) => { + this.emit( 'send', `c:${ JSON.stringify( data ) }` ); + } ); - if ( this.queue.length === 0 ) { - this.running = false; - this.emit( 'finish' ); - return; + // Handle change errors caused by changes originating from this client + this.localQueue.on( 'error', handleChangeError.bind( null, this ) ); } - fn = this.queue.shift(); - fn( this.run.bind( this ) ); -} - -function LocalQueue( store ) { - this.store = store; - this.sent = {}; - this.queues = {}; - this.ready = false; -} - -inherits( LocalQueue, EventEmitter ); + /** + * Called by a bucket when a bucket object has been updated. + * + * The channel uses this method to initiate change operations when objects are updated. + * + * It also uses this method during indexing to track which objects have been successfully + * downloaded. + * + * @param {BucketObject} object - the bucket object + * @param {Boolean} [sync=true] - if the object should be synced + */ + update( object: BucketObject, sync: boolean = true ) { + this.onBucketUpdate( object.id ); + if ( sync === true ) { + console.log( 'diff and send', object ); + diffAndSend( this, object.id, object.data ); + } + }; -LocalQueue.prototype.start = function() { - var queueId; - this.ready = true; - for ( queueId in this.queues ) { - this.processQueue( queueId ); + /** + * Tracks indexing state and emits `indexingStateChange` + * + * @private + * @param {Boolean} isIndexing - updates indexing state to this value + */ + setIsIndexing( isIndexing: boolean ) { + this.isIndexing = isIndexing; + this.emit( 'indexingStateChange', this.isIndexing ); } -} - -LocalQueue.prototype.pause = function() { - this.ready = false; -}; -LocalQueue.prototype.acknowledge = function( change ) { - if ( this.sent[change.id] === change ) { - delete this.sent[change.id]; + /** + * Removes an object from Simperium. Called by a bucket when an object is deleted. + * + * @param {String} id - the id of the object to remove + */ + remove( id: string ) { + removeAndSend( this, id ) } - this.processQueue( change.id ); -} - -LocalQueue.prototype.queue = function( change ) { - var queue = this.queues[change.id]; - - if ( !queue ) { - queue = []; - this.queues[change.id] = queue; + /** + * Retrieves revisions for a given object from Simperium. + * + * @typedef {Object} BucketObjectRevision + * @property {String} id - bucket object id + * @property {Number} version - revision version + * @property {Object} data - object literal data at given version + * + * @param {String} id - the bucket object id + * @returns {Promise>} list of known object versions + */ + getRevisions( id: string ): Promise { + return new Promise( ( resolve, reject ) => { + collectionRevisions( this, id, ( error, revisions ) => { + if ( error ) { + reject( error ); + return; + } + if ( revisions ) { + resolve( revisions ); + } + } ); + } ); } - queue.push( change ); - - this.emit( 'queued', change.id, change, queue ); + /** + * Checks if there are unsynced changes. + * + * @returns {Promise} true if there are still changes to sync + */ + hasLocalChanges(): Promise { + return Promise.resolve( this.localQueue.hasChanges() ); + } - if ( !this.ready ) return; + /** + * Retrieves the currently stored version number for a given object + * + * @param {String} id - object id to get the version for + * @returns {Promise} version number for the object + */ + getVersion( id: string ): Promise { + return this.store.get( id ).then( ( ghost ) => { + if ( ghost && ghost.version ) { + return ghost.version; + } + return 0; + } ); + } - this.processQueue( change.id ); -}; + /** + * Receives incoming messages from Simperium + * + * Called by a client that strips the channel number prefix before + * seding to a specific channel. + * + * @param {String} data - the message from Simperium + */ + handleMessage( data: string ) { + const message = parseMessage( data ); + this.message.emit( message.command, message.data ); + }; -LocalQueue.prototype.hasChanges = function() { - return Object.keys( this.queues ).length > 0; -}; + /** + * Used to send a message from this channel to Simperium + * The client listens for `send` events and correctly sends them to Simperium + * + * @emits Channel#send + * @private + * @param {String} data - the message to send + */ + send( data: string ) { + /** + * Send event + * + * @event Channel#send + * @type {String} - the message to send to Simperium + */ + this.emit( 'send', data ); + }; -LocalQueue.prototype.dequeueChangesFor = function( id ) { - var changes = [], sent = this.sent[id], queue = this.queues[id]; + /** + * Restores a buckets data to what is currently stored in the ghost data. + */ + reload() { + this.store.eachGhost( ghost => { + this.emit( 'update', ghost.key, ghost.data ); + } ); + }; - if ( sent ) { - delete this.sent[id]; - changes.push( sent ); - } + /** + * Called after a bucket updates an object. + * + * Wile indexing keeps track of which objects have been retrieved. + * + * @param {String} id - object that was updated + */ + onBucketUpdate( id: string ) { + if ( ! this.isIndexing ) { + return; + } + if ( this.index_last_id == null || this.index_cv == null ) { + return; + } else if ( this.index_last_id === id ) { + indexingComplete( this ); + } + }; - if ( queue ) { - delete this.queues[id]; - changes = changes.concat( queue ); - } + onAuth( data: string ) { + let auth; + let init; + try { + auth = JSON.parse( data ); + this.emit( 'unauthorized', auth ); + return; + } catch ( error ) { + // request cv and then send method + this.once( 'ready', () => { + this.localQueue.resendSentChanges(); + } ) + init = ( cv ) => { + if ( cv ) { + this.localQueue.start(); + this.sendChangeVersionRequest( cv ); + } else { + this.startIndexing(); + } + }; + + this.store.getChangeVersion().then( init ); - return changes; -}; + return; + } + }; -LocalQueue.prototype.processQueue = function( id ) { - var queue = this.queues[id]; - var compressAndSend = this.compressAndSend.bind( this, id ); + /** + * Re-downloads all Simperium bucket data + */ + startIndexing() { + this.localQueue.pause(); + this.setIsIndexing( true ); + this.sendIndexRequest(); + }; - // there is no queue, don't do anything - if ( !queue ) return; + /** + * Called when a channel's socket has been connected + */ + onConnect() { + var init = { + name: this.name, + clientid: this.session_id, + api: '1.1', + token: this.access_token, + app_id: this.appid, + library: 'node-simperium', + version: '0.0.1' + }; - // queue is empty, delete it from memory - if ( queue.length === 0 ) { - delete this.queues[id]; - return; - } + this.send( format( 'init:%s', JSON.stringify( init ) ) ); + }; - // waiting for a previous sent change to get acknowledged - if ( this.sent[id] ) { - this.emit( 'wait', id ); - return; - } + onIndex( data: string ) { + const page = JSON.parse( data ), + objects = page.index, + mark = page.mark, + cv = page.current; - this.store.get( id ).then( compressAndSend ); -} + let objectId; + objects.forEach( ( object ) => { + objectId = object.id; + updateObjectVersion( this, object.id, object.v, object.d ); + } ); -LocalQueue.prototype.compressAndSend = function( id, ghost ) { - var changes = this.queues[id]; - var change; - var target = ghost.data; - var c; - var type; + if ( !mark ) { + if ( objectId ) { + this.index_last_id = objectId; + } + if ( !this.index_last_id ) { + indexingComplete( this ); + } + this.index_cv = cv; + } else { + this.sendIndexRequest( mark ); + } + }; - // a change was sent before we could compress and send - if ( this.sent[id] ) { - this.emit( 'wait', id ); - return; - } + sendIndexRequest( mark: ?string ) { + this.send( format( 'i:1:%s::10', mark ? mark : '' ) ); + }; - if ( changes.length === 1 ) { - change = changes.shift(); - this.sent[id] = change; - this.emit( 'send', change ); - return; - } + sendChangeVersionRequest( cv: string ) { + this.send( format( 'cv:%s', cv ) ); + }; - if ( changes.length > 1 && changes[0].type === change_util.type.REMOVE ) { - change = changes.shift(); - changes.splice( 0, changes.length - 1 ); - this.sent[id] = change; - this.emit( 'send', change ); - } + onChanges( data: string ) { + const changes: any[] = JSON.parse( data ); - while ( changes.length > 0 ) { - c = changes.shift(); + changes.forEach( ( change: any ) => { + changeObject( this, change.id, change ); + } ); + // emit ready after all server changes have been applied + this.emit( 'ready' ); + }; - if ( c.o === change_util.type.REMOVE ) { - changes.unshift( c ); - break; + onChangeVersion( data: string ) { + if ( data === UNKNOWN_CV ) { + this.store.setChangeVersion( null ) + .then( () => this.startIndexing() ); } - - target = jsondiff.apply_object_diff( target, c.v ); } - type = target === null ? change_util.type.REMOVE : change_util.type.MODIFY; - change = change_util.buildChange( type, id, target, ghost ); + onVersion( data: string ) { + // invalid version, give up without emitting + if ( data.slice( -2 ) === '\n?' ) { + return; + } - this.sent[id] = change; - this.emit( 'send', change ); -} + const ghost = parseVersionMessage( data ); -LocalQueue.prototype.resendSentChanges = function() { - for ( let ccid in this.sent ) { - this.emit( 'send', this.sent[ccid] ) + this.emit( 'version', ghost.id, ghost.version, ghost.data ); + this.emit( 'version.' + ghost.id, ghost.id, ghost.version, ghost.data ); + this.emit( 'version.' + ghost.id + '.' + ghost.version, ghost.data ); } } @@ -866,7 +724,7 @@ LocalQueue.prototype.resendSentChanges = function() { * * @type {Map} stores specific revisions as a cache */ -export const revisionCache = new Map(); +export const revisionCache: Map = new Map(); /** * Attempts to fetch an entity's revisions @@ -888,15 +746,15 @@ export const revisionCache = new Map(); * @param {String} id entity id for which to fetch revisions * @param {Function} callback called on error or when finished */ -function collectionRevisions( channel, id, callback ) { +function collectionRevisions( channel: Channel, id: string, callback: ( ?Error, ?BucketObjectRevision[] ) => void ) { /** @type {Number} ms delay arbitrarily chosen to give up on fetch */ const TIMEOUT = 200; /** @type {Set} tracks requested revisions */ - const requestedVersions = new Set(); + const requestedVersions: Set = new Set(); /** @type {Array} contains the revisions and associated data */ - const versions = []; + const versions: BucketObjectRevision[] = []; /** @type {Number} remembers newest version of an entity */ let latestVersion; @@ -912,7 +770,7 @@ function collectionRevisions( channel, id, callback ) { * @param {Number} version version of returned entity * @param {Object} data value of entity at revision */ - function onVersion( id, version, data ) { + function onVersion( id: string, version: number, data: {} ) { revisionCache.set( `${ id }.${ version }`, data ); versions.push( { id, version, data } ); @@ -965,9 +823,9 @@ function collectionRevisions( channel, id, callback ) { requestedVersions.add( version ); - // fetch from server or local cache - if ( revisionCache.has( `${ id }.${ version }` ) ) { - onVersion( id, version, revisionCache.get( `${ id }.${ version }` ) ); + let cached = revisionCache.get( `${ id }.${ version }` ); + if ( cached ) { + onVersion( id, version, cached ); } else { channel.send( `e:${ id }.${ version }` ); } diff --git a/src/simperium/client.js b/src/simperium/client.js index 7bfadb8..cdd990a 100644 --- a/src/simperium/client.js +++ b/src/simperium/client.js @@ -1,3 +1,4 @@ +// @flow import { format, inherits } from 'util' import { EventEmitter } from 'events' import Bucket from './bucket' diff --git a/src/simperium/ghost/default.js b/src/simperium/ghost/default.js index 6c81772..738ae4d 100644 --- a/src/simperium/ghost/default.js +++ b/src/simperium/ghost/default.js @@ -1,3 +1,4 @@ +// @flow import Store from './store' /** diff --git a/src/simperium/ghost/store.js b/src/simperium/ghost/store.js index 212d9c3..249240e 100644 --- a/src/simperium/ghost/store.js +++ b/src/simperium/ghost/store.js @@ -1,3 +1,4 @@ +// @flow /** * An in memory implementation of GhostStore * diff --git a/src/simperium/index.js b/src/simperium/index.js index 1b66006..6164f93 100644 --- a/src/simperium/index.js +++ b/src/simperium/index.js @@ -1,3 +1,4 @@ +// @flow import Client from './client' import Auth from './auth' import * as util from './util' diff --git a/src/simperium/jsondiff/index.js b/src/simperium/jsondiff/index.js index 568cfa5..ca2dd2d 100644 --- a/src/simperium/jsondiff/index.js +++ b/src/simperium/jsondiff/index.js @@ -3,6 +3,6 @@ import diff_match_patch from './diff_match_patch' export { JSONDiff as jsondiff, diff_match_patch } -export default function init( options ) { +export default ( options ) => { return new JSONDiff( options ); } diff --git a/src/simperium/reconnection-timer.js b/src/simperium/reconnection-timer.js index 6295b66..aae67b5 100644 --- a/src/simperium/reconnection-timer.js +++ b/src/simperium/reconnection-timer.js @@ -1,3 +1,4 @@ +// @flow export default class ReconnectionTimer { started: boolean interval: number => number; diff --git a/src/simperium/storage/default.js b/src/simperium/storage/default.js index efa7bb9..56220ac 100644 --- a/src/simperium/storage/default.js +++ b/src/simperium/storage/default.js @@ -1,3 +1,4 @@ +// @flow export default function() { return new BucketStore(); }; diff --git a/src/simperium/util/change.js b/src/simperium/util/change.js index 96c8a72..e106b93 100644 --- a/src/simperium/util/change.js +++ b/src/simperium/util/change.js @@ -1,7 +1,10 @@ +// @flow import { v4 as uuid } from 'uuid' import jsondiff from '../jsondiff' -const changeTypes = { +type ChangeType = 'M' | '-' | '+' + +const changeTypes: { [string]: ChangeType } = { MODIFY: 'M', REMOVE: '-', ADD: '+' diff --git a/src/simperium/util/index.js b/src/simperium/util/index.js index 8f4ee73..11629ec 100644 --- a/src/simperium/util/index.js +++ b/src/simperium/util/index.js @@ -1,3 +1,4 @@ +// @flow import * as change from './change' import parseMessage from './parse_message' import parseVersionMessage from './parse_version_message' diff --git a/src/simperium/util/parse_message.js b/src/simperium/util/parse_message.js index 3ebd318..c4d107f 100644 --- a/src/simperium/util/parse_message.js +++ b/src/simperium/util/parse_message.js @@ -1,5 +1,6 @@ -export default function( data ) { - var marker = data.indexOf( ':' ); +// @flow +export default ( data: string ) => { + const marker = data.indexOf( ':' ); return { command: data.slice( 0, marker ), diff --git a/src/simperium/util/parse_version_message.js b/src/simperium/util/parse_version_message.js index 8a4a423..1fece97 100644 --- a/src/simperium/util/parse_version_message.js +++ b/src/simperium/util/parse_version_message.js @@ -1,4 +1,4 @@ - +// @flow export default function( data ) { var dataMark = data.indexOf( '\n' ), versionMark = data.indexOf( '.' ), diff --git a/test/helper.js b/test/helper.js index c69962d..b72525a 100644 --- a/test/helper.js +++ b/test/helper.js @@ -1,3 +1,3 @@ process.on( 'unhandledRejection', ( promise, reason ) => { - console.error( 'unhandled rejection', reason ); + console.error( 'unhandled rejection', promise, reason ); } ); diff --git a/test/simperium/bucket_test.js b/test/simperium/bucket_test.js index a62fd55..be60263 100644 --- a/test/simperium/bucket_test.js +++ b/test/simperium/bucket_test.js @@ -41,7 +41,7 @@ describe( 'Bucket', () => { bucket.update( id, object, function() { bucket.get( id, function( err, savedObject ) { - deepEqual( object, savedObject ); + deepEqual( object, savedObject.data ); done(); } ); } ); @@ -53,7 +53,7 @@ describe( 'Bucket', () => { bucket.update( id, object, {}, function() { bucket.get( id, function( err, savedObject ) { - deepEqual( object, savedObject ); + deepEqual( object, savedObject.data ); done(); } ) } ) diff --git a/test/simperium/channel_test.js b/test/simperium/channel_test.js index dd57405..4bedee6 100644 --- a/test/simperium/channel_test.js +++ b/test/simperium/channel_test.js @@ -243,7 +243,7 @@ describe( 'Channel', function() { return new Promise( ( resolve ) => { bucket.on( 'update', () => { bucket.get( 'object' ).then( ( object ) => { - equal( object.content, 'step 1' ); + equal( object.data.content, 'step 1' ); resolve(); } ); } ); @@ -326,7 +326,7 @@ describe( 'Channel', function() { // If receiving a remote change while there are unsent local modifications, // local changes should be rebased onto the new ghost and re-sent - it( 'should resolve applying patch to modified object', () => new Promise( ( resolve ) => { + it.only( 'should resolve applying patch to modified object', () => new Promise( ( resolve, reject ) => { // add an item to the index const key = 'hello', current = { title: 'Hello world' }, @@ -337,22 +337,29 @@ describe( 'Channel', function() { // when the channel is updated, it should be the result of // the local changes being rebased on top of changes coming from the // network which should ultimately be "Goodbye kansas" - channel.on( 'update', function( key, data ) { - equal( data.title, 'Goodbye kansas' ); - } ); - - channel.on( 'send', function() { - equal( channel.localQueue.sent[key].v.title.v, '-5\t+Goodbye\t=6' ); - resolve(); + channel.on( 'send', ( data ) => { + console.log( 'channel is sending', data ); + } ); + channel.on( 'send', () => { + console.log( 'requesting object' ); + bucket.get( key ).then( bucketObject => { + try { + equal( channel.localQueue.sent[key].v.title.v, '-5\t+Goodbye\t=7' ); + equal( bucketObject.data.title, 'Goodbye kansas' ); + resolve(); + } catch ( error ) { + reject( error ); + } + } ); } ); - // We receive a remote change from "Hello world" to "Hello kansas" - channel.handleMessage( 'c:' + JSON.stringify( [{ - o: 'M', ev: 2, sv: 1, cv: 'cv1', id: key, v: remoteDiff - }] ) ); - // We're changing "Hello world" to "Goodbye world" - bucket.update( key, {title: 'Goodbye world'} ); + bucket.update( key, {title: 'Goodbye world'} ).then( () => { + // We receive a remote change from "Hello world" to "Hello kansas" + channel.handleMessage( 'c:' + JSON.stringify( [{ + o: 'M', ev: 2, sv: 1, cv: 'cv1', id: key, v: remoteDiff + }] ) ); + } ); } ) ); it( 'should emit errors on the bucket instance', ( done ) => { diff --git a/test/simperium/mock_bucket_store.js b/test/simperium/mock_bucket_store.js index 6028bbe..88f6491 100644 --- a/test/simperium/mock_bucket_store.js +++ b/test/simperium/mock_bucket_store.js @@ -1,28 +1,38 @@ +// @flow -function BucketStore() { - this.objects = {}; +import type { BucketStore, BucketObject } from '../../src/simperium/bucket'; + +class MockStore implements BucketStore { + objects: { [string]: BucketObject }; + + constructor() { + this.objects = {}; + } + + get( id: string, callback: Function ) { + var objects = this.objects; + setImmediate( function() { + callback( null, objects[id] ); + } ); + } + + update( id: string, object: {}, isIndexing: boolean, callback: Function ) { + setImmediate( () => { + let updated = this.objects[id] = {id: id, data: object, isIndexing: isIndexing}; + if ( callback ) callback( null, updated ); + } ); + } + + remove( id: string, callback: Function ) { + setImmediate( () => { + delete this.objects[id]; + callback( null ); + } ); + } + + find() { + throw new Error( 'not implemeted' ); + } } -BucketStore.prototype.get = function( id, callback ) { - var objects = this.objects; - - setImmediate( function() { - callback( null, objects[id] ); - } ); -}; - -BucketStore.prototype.update = function( id, object, isIndexing, callback ) { - this.objects[id] = object; - setImmediate( function() { - if ( callback ) callback( null, {id: id, data: object, isIndexing: isIndexing} ); - } ); -}; - -BucketStore.prototype.remove = function( id, callback ) { - delete this.objects[id]; - setImmediate( function() { - callback( null ); - } ); -}; - -export default () => new BucketStore(); +export default () => new MockStore(); From 9b5dc18e6c1837ab4ae9adf29f6925b2bd9d2807 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Mon, 19 Feb 2018 11:46:31 -0800 Subject: [PATCH 5/5] Fixes BucketStoreAPI type, gets rid of clever callback to promise wrapper --- src/simperium/bucket.js | 110 ++++++++++++++++++++------------- src/simperium/channel.js | 3 - src/simperium/ghost/default.js | 2 +- test/simperium/channel_test.js | 10 +-- 4 files changed, 70 insertions(+), 55 deletions(-) diff --git a/src/simperium/bucket.js b/src/simperium/bucket.js index 9904bab..05b4b09 100644 --- a/src/simperium/bucket.js +++ b/src/simperium/bucket.js @@ -1,28 +1,11 @@ // @flow import events from 'events' -import { v4 as uuid } from 'uuid'; +import uuid from 'uuid/v4'; const { EventEmitter } = events; type TaskCallback = ( ?Error, ?T ) => void; -/** - * Convenience function to turn a function that uses a callback into a function - * that returns a Promise. - * - * @param {TaskCallback} task - function that expects a single callback argument - * @returns {Promise} callback wrapped in a promise interface - */ -const callbackAsPromise = ( task: ( TaskCallback ) => void ): Promise => new Promise( ( resolve, reject ) => { - task( ( error: ?Error, result: ?T ) => { - if ( error ) { - reject( error ); - return; - } - resolve( result ); - } ); -} ); - /** * Runs a promise with a callback (if one is provided) to support the old callback API. * NOTE: if the callback API is removed this is a place to warn users @@ -108,9 +91,7 @@ export type BucketObjectRevision = { type RevisionList = BucketObjectRevision[]; -interface Channel { - removeListener( eventName: string, listener: ?any ) : Channel; - on( eventName: string, listener: any ) : Channel; +interface Channel extends EventEmitter { reload(): void; update( object: BucketObject, sync: boolean ): void; remove( id: string ): void; @@ -146,7 +127,7 @@ type bucketStoreGetCallback = ( ?Error, ?BucketObject ) => void * @param {?BucketObject[]} */ - interface BucketStoreAPI { + type BucketStoreAPI = { get: ( id: string ) => Promise; update: ( id: string, data: {}, isIndexing: boolean ) => Promise; remove: ( id: string ) => Promise; @@ -159,14 +140,55 @@ type bucketStoreGetCallback = ( ?Error, ?BucketObject ) => void * @returns {Object} store api methods that use Promises instead of callbacks */ const promiseAPI = ( store: BucketStore ): BucketStoreAPI => ( { - get: id => - callbackAsPromise( store.get.bind( store, id ) ), - update: ( id, object, isIndexing ) => - callbackAsPromise( store.update.bind( store, id, object, isIndexing ) ), - remove: id => - callbackAsPromise( store.remove.bind( store, id ) ), - find: query => - callbackAsPromise( store.find.bind( store, query ) ) + get: ( id: string ) => new Promise( ( resolve, reject ) => { + store.get( id, ( error, bucketObject ) => { + if ( error ) { + reject( error ); + return; + } + if ( bucketObject ) { + resolve( bucketObject ); + return; + } + reject( new Error( 'BucketStore failed get without Error' ) ); + } ); + } ), + update: ( id: string, data: {}, isIndexing: boolean ) => new Promise( ( resolve, reject ) => { + store.update(id, data, isIndexing, ( error, bucketObject ) => { + if ( error ) { + reject( error ); + return; + } + if ( bucketObject ) { + resolve( bucketObject ); + return; + } + // this shouldn't happen, come up with good error message + reject( new Error( 'BucketStore failed update without error' ) ); + } ); + } ), + remove: ( id: string ) => new Promise( ( resolve, reject ) => { + store.remove( id, ( error ) => { + if ( error ) { + reject( error ); + return; + } + resolve(); + } ) + } ), + find: ( query: ?any ) => new Promise( ( resolve, reject ) => { + store.find( query, ( error, result ) => { + if ( error ) { + reject( error ); + return; + } + if ( result ) { + resolve( result ); + return; + } + reject( new Error( 'BucketStore failed find without error' ) ); + } ) + } ) } ); type updateOptions = { sync: boolean } | bucketStoreGetCallback @@ -178,11 +200,11 @@ export default class Bucket extends EventEmitter { isIndexing: boolean; channel: Channel; - onChannelIndex: any; - onChannelError: any; - onChannelUpdate: any; - onChannelIndexingStateChange: any; - onChannelRemove: any; + onChannelIndex: Function; + onChannelError: Function; + onChannelUpdate: Function; + onChannelIndexingStateChange: Function; + onChannelRemove: Function; /** * A bucket that syncs data with Simperium. @@ -191,7 +213,7 @@ export default class Bucket extends EventEmitter { * @param {bucketStoreProvider} storeProvider - a factory function that provides a bucket store * @param {Channel} channel - a channel instance used for syncing Simperium data */ - constructor( name: string, storeProvider: BucketStoreProvider, channel: Channel ) { + constructor( name: string, storeProvider: BucketStoreProvider, channel: ?Channel ) { super(); this.name = name; this.store = storeProvider( this ); @@ -203,18 +225,18 @@ export default class Bucket extends EventEmitter { */ this.onChannelIndex = this.emit.bind( this, 'index' ); this.onChannelError = this.emit.bind( this, 'changeError' ); - this.onChannelUpdate = ( id, data ) => { + this.onChannelUpdate = ( id: string, data: {} ) => { this.update( id, data, { sync: false } ); }; - this.onChannelIndexingStateChange = ( isIndexing ) => { + this.onChannelIndexingStateChange = ( isIndexing: boolean ) => { this.isIndexing = isIndexing; if ( isIndexing ) { this.emit( 'indexing' ); } } - this.onChannelRemove = ( id ) => this.remove( id ); + this.onChannelRemove = ( id: string ) => this.remove( id ); if ( channel ) { this.setChannel( channel ); @@ -223,18 +245,18 @@ export default class Bucket extends EventEmitter { // NOTE: for backwards compatibility, listeners for `error` will // be subscribed to `changeError` - on( eventName: string, listener: Function ): Bucket { + addListener( eventName: string, listener: ( ... args: any[] ) => void ): this { if ( eventName === 'error' ) { // TODO: deprecation warning - return super.on( 'changeError', listener ); + return super.addListener( 'changeError', listener ); } - return super.on( eventName, listener ); + return super.addListener( eventName, listener ); } - removeListener( eventName: string, listener: Function ): Bucket { + removeListener( eventName: string, listener: Function ): this { if ( eventName === 'error' ) { // TODO: deprecation warning - return super.removeListener( 'changeError', listener); + return super.removeListener( 'changeError', listener ); } return super.removeListener( eventName, listener ); } diff --git a/src/simperium/channel.js b/src/simperium/channel.js index e4f5522..e32f4e9 100644 --- a/src/simperium/channel.js +++ b/src/simperium/channel.js @@ -217,7 +217,6 @@ const updateObjectVersion = ( channel: Channel, id: string, version: number, dat localModifications = change_util.compressChanges( changes, original ); remoteModifications = patch; transformed = change_util.transform( localModifications, remoteModifications, original ); - console.log( 'shall we transform?', localModifications, remoteModifications, transformed ); update = data; // apply the transformed patch and emit the update @@ -226,7 +225,6 @@ const updateObjectVersion = ( channel: Channel, id: string, version: number, dat update = jsondiff.apply_object_diff( data, transformed ); // queue up the new change change = change_util.modify( id, version, patch ); - console.log( 'new transformed change to queue', change ); channel.localQueue.queue( change ); } @@ -467,7 +465,6 @@ export default class Channel extends EventEmitter { update( object: BucketObject, sync: boolean = true ) { this.onBucketUpdate( object.id ); if ( sync === true ) { - console.log( 'diff and send', object ); diffAndSend( this, object.id, object.data ); } }; diff --git a/src/simperium/ghost/default.js b/src/simperium/ghost/default.js index 738ae4d..60422bd 100644 --- a/src/simperium/ghost/default.js +++ b/src/simperium/ghost/default.js @@ -7,6 +7,6 @@ import Store from './store' * @param {Bucket} a bucket instance to store ghost objects for * @returns {GhostStore} an istance of a GhostStore used to save Ghost data */ -export default function( bucket ) { +export default function( bucket: Bucket ) { return new Store( bucket ); }; diff --git a/test/simperium/channel_test.js b/test/simperium/channel_test.js index 4bedee6..92dc046 100644 --- a/test/simperium/channel_test.js +++ b/test/simperium/channel_test.js @@ -326,7 +326,7 @@ describe( 'Channel', function() { // If receiving a remote change while there are unsent local modifications, // local changes should be rebased onto the new ghost and re-sent - it.only( 'should resolve applying patch to modified object', () => new Promise( ( resolve, reject ) => { + it( 'should resolve applying patch to modified object', () => new Promise( ( resolve, reject ) => { // add an item to the index const key = 'hello', current = { title: 'Hello world' }, @@ -337,15 +337,11 @@ describe( 'Channel', function() { // when the channel is updated, it should be the result of // the local changes being rebased on top of changes coming from the // network which should ultimately be "Goodbye kansas" - channel.on( 'send', ( data ) => { - console.log( 'channel is sending', data ); - } ); channel.on( 'send', () => { - console.log( 'requesting object' ); - bucket.get( key ).then( bucketObject => { + bucket.once( 'update', ( id, data ) => { try { + equal( data.title, 'Goodbye kansas' ); equal( channel.localQueue.sent[key].v.title.v, '-5\t+Goodbye\t=7' ); - equal( bucketObject.data.title, 'Goodbye kansas' ); resolve(); } catch ( error ) { reject( error );