Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel queues flowtyped #64

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 2 additions & 274 deletions src/simperium/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { format, inherits } from 'util'
import { EventEmitter } from 'events'
import { parseMessage, parseVersionMessage, change as change_util } from './util'
import { v4 as uuid } from 'uuid'
import NetworkQueue from './queues/network-queue';
import LocalQueue from './queues/local-queue';

const UNKNOWN_CV = '?';
const CODE_INVALID_VERSION = 405;
Expand Down Expand Up @@ -262,87 +264,6 @@ internal.indexingComplete = function() {
this.emit( 'ready' )
}

/**
* A ghost represents a version of a bucket object as known by Simperium
*
* Generally a client will keep the last known ghost stored locally for efficient
* diffing and patching of Simperium change operations.
*
* @typedef {Object} Ghost
* @property {Number} version - the ghost's version
* @property {String} key - the simperium bucket object id this ghost is for
* @property {Object} data - the data for the given ghost version
*/

/**
* Callback function used by the ghost store to iterate over existing ghosts
*
* @callback ghostIterator
* @param {Ghost} - the current ghost
*/

/**
* A GhostStore provides the store mechanism for ghost data that the Channel
* uses to maintain syncing state and producing change operations for
* Bucket objects.
*
* @interface GhostStore
*/

/**
* Retrieve a Ghost for the given bucket object id
*
* @function
* @name GhostStore#get
* @param {String} id - bucket object id
* @returns {Promise<Ghost>} - the ghost for this object
*/

/**
* Save a ghost in the store.
*
* @function
* @name GhostStore#put
* @param {String} id - bucket object id
* @param {Number} version - version of ghost data
* @param {Object} data - object literal to save as this ghost's data for this version
* @returns {Promise<Ghost>} - the ghost for this object
*/

/**
* Delete a Ghost from the store.
*
* @function
* @name GhostStore#remove
* @param {String} id - bucket object id
* @returns {Promise<Ghost>} - the ghost for this object
*/

/**
* Iterate over existing Ghost objects with the given callback.
*
* @function
* @name GhostStore#eachGhost
* @param {ghostIterator} - function to run against each ghost
*/

/**
* Get the current change version (cv) that this channel has synced.
*
* @function
* @name GhostStore#getChangeVersion
* @returns {Promise<String>} - the current change version for the bucket
*/

/**
* Set the current change version.
*
* @function
* @name GhostStore#setChangeVersion
* @returns {Promise<Void>} - resolves once the change version is saved
*/


/**
* Maintains syncing state for a Simperium bucket.
*
Expand Down Expand Up @@ -662,199 +583,6 @@ Channel.prototype.onVersion = function( data ) {
this.emit( 'version.' + ghost.id + '.' + ghost.version, ghost.data );
};

function NetworkQueue() {
this.queues = {};
}

NetworkQueue.prototype.queueFor = function( id ) {
var queues = this.queues,
queue = queues[id];

if ( !queue ) {
queue = new Queue();
queue.on( 'finish', function() {
delete queues[id];
} );
queues[id] = queue;
}

return queue;
};

function Queue() {
this.queue = [];
this.running = false;
}

inherits( Queue, EventEmitter );

// Add a function at the end of the queue
Queue.prototype.add = function( fn ) {
this.queue.push( fn );
this.start();
return this;
};

Queue.prototype.start = function() {
if ( this.running ) return;
this.running = true;
this.emit( 'start' );
setImmediate( this.run.bind( this ) );
}

Queue.prototype.run = function() {
var fn;
this.running = true;

if ( this.queue.length === 0 ) {
this.running = false;
this.emit( 'finish' );
return;
}

fn = this.queue.shift();
fn( this.run.bind( this ) );
}

function LocalQueue( store ) {
this.store = store;
this.sent = {};
this.queues = {};
this.ready = false;
}

inherits( LocalQueue, EventEmitter );

LocalQueue.prototype.start = function() {
var queueId;
this.ready = true;
for ( queueId in this.queues ) {
this.processQueue( queueId );
}
}

LocalQueue.prototype.pause = function() {
this.ready = false;
};

LocalQueue.prototype.acknowledge = function( change ) {
if ( this.sent[change.id] === change ) {
delete this.sent[change.id];
}

this.processQueue( change.id );
}

LocalQueue.prototype.queue = function( change ) {
var queue = this.queues[change.id];

if ( !queue ) {
queue = [];
this.queues[change.id] = queue;
}

queue.push( change );

this.emit( 'queued', change.id, change, queue );

if ( !this.ready ) return;

this.processQueue( change.id );
};

LocalQueue.prototype.hasChanges = function() {
return Object.keys( this.queues ).length > 0;
};

LocalQueue.prototype.dequeueChangesFor = function( id ) {
var changes = [], sent = this.sent[id], queue = this.queues[id];

if ( sent ) {
delete this.sent[id];
changes.push( sent );
}

if ( queue ) {
delete this.queues[id];
changes = changes.concat( queue );
}

return changes;
};

LocalQueue.prototype.processQueue = function( id ) {
var queue = this.queues[id];
var compressAndSend = this.compressAndSend.bind( this, id );

// there is no queue, don't do anything
if ( !queue ) return;

// queue is empty, delete it from memory
if ( queue.length === 0 ) {
delete this.queues[id];
return;
}

// waiting for a previous sent change to get acknowledged
if ( this.sent[id] ) {
this.emit( 'wait', id );
return;
}

this.store.get( id ).then( compressAndSend );
}

LocalQueue.prototype.compressAndSend = function( id, ghost ) {
var changes = this.queues[id];
var change;
var target = ghost.data;
var c;
var type;

// a change was sent before we could compress and send
if ( this.sent[id] ) {
this.emit( 'wait', id );
return;
}

if ( changes.length === 1 ) {
change = changes.shift();
this.sent[id] = change;
this.emit( 'send', change );
return;
}

if ( changes.length > 1 && changes[0].type === change_util.type.REMOVE ) {
change = changes.shift();
changes.splice( 0, changes.length - 1 );
this.sent[id] = change;
this.emit( 'send', change );
}

while ( changes.length > 0 ) {
c = changes.shift();

if ( c.o === change_util.type.REMOVE ) {
changes.unshift( c );
break;
}

target = change_util.apply( c.v, target );
}

type = target === null ? change_util.type.REMOVE : change_util.type.MODIFY;
change = change_util.buildChange( type, id, target, ghost );

this.sent[id] = change;
this.emit( 'send', change );
}

LocalQueue.prototype.resendSentChanges = function() {
for ( let ccid in this.sent ) {
this.emit( 'send', this.sent[ccid] )
}
}

/**
* Since revision data is basically immutable we can prevent the
* need to refetch it after it has been loaded once.
Expand Down
25 changes: 25 additions & 0 deletions src/simperium/queues/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Channel Queues

These queues were originally in the Channel module. They have been extracted
and flowtyped to improve code quality and clarify the API.

## LocalQueue

Each `Channel` instance has a single `LocalQueue` that tracks changes that are sent
are pending to be sent to simperium.

As bucket objects are updated, the `Channel` will reference this queue to determine
when an object should be sent. It also uses the `LocalQueue` to report if a bucket object
is currently being synced or not.

## NetworkQueue

Each `Channel` instance has a single `NetworkQueue`. As changes are received from simperium,
the channel will apply the changes in sequence. Together with the `LocalQueue` the channel
will be able to determine when pending changes in the `LocalQueue` have been accepted or
rejected by the server.

## Queue

A generic queue object used by `LocalQueue` and `RemoteQueue` that sequences tasks as
first-in-first-out execution order.
Loading