From d6ad2447d4130aa765ce3474188a742f1d1e45a8 Mon Sep 17 00:00:00 2001 From: Erwan Guyader Date: Thu, 23 Apr 2020 11:15:27 +0200 Subject: [PATCH 1/3] core/remote: Offer same API as local side The local side offers a simpler API than the actual remote side : - `local.start()` returns a promise that resolves when the local watcher is started (and the initial scan is done) - it sets a `running` proimse attributes that resolves when the side (i.e. the watcher and all other components) is stopped (and should reject when errors are not caught) On the other hand, the `remote.start()` method returns an object with 2 promises: - `started` that resolves when the watcher is all set and rejects on errors during the initial setup - `running` that resolves when the watcher is stopped and is supposed to reject when errors occur after the initial setup However, we've seen that the remote API, besides being different than the local one, does not enable us to catch errors thrown during the running phase while awaiting the end of the starting phase in the Sync module. This is an issue because we should alert the user in this situation and stop the synchronization process to avoid increasing the differences between the user's file system and their Cozy. The new API enables this by letting us await the call to `remote.start()` and then only reference the `running` promise within an error catching block. It's also simpler to reason about which should help us catch any other bug and make changes to this part of the application. --- core/remote/index.js | 6 +----- core/remote/watcher/index.js | 27 +++++++++++---------------- core/sync.js | 14 ++++++-------- test/unit/remote/offline.js | 2 +- test/unit/remote/watcher.js | 31 +++++++++++++++++++++++-------- test/unit/sync.js | 7 ++----- 6 files changed, 44 insertions(+), 43 deletions(-) diff --git a/core/remote/index.js b/core/remote/index.js index 33069c932..145c313e0 100644 --- a/core/remote/index.js +++ b/core/remote/index.js @@ -74,11 +74,7 @@ class Remote /*:: implements Reader, Writer */ { } start() { - const { started, running } = this.watcher.start() - return { - started: started.then(() => this.warningsPoller.start()), - running - } + return this.watcher.start().then(() => this.warningsPoller.start()) } stop() { diff --git a/core/remote/watcher/index.js b/core/remote/watcher/index.js index c16332f35..df906295d 100644 --- a/core/remote/watcher/index.js +++ b/core/remote/watcher/index.js @@ -43,7 +43,7 @@ class RemoteWatcher { remoteCozy: RemoteCozy events: EventEmitter runningResolve: ?() => void - runningReject: ?() => void + running: Promise */ constructor( @@ -56,26 +56,21 @@ class RemoteWatcher { this.prep = prep this.remoteCozy = remoteCozy this.events = events + this.running = new Promise(() => {}) autoBind(this) } - start() { - const started /*: Promise */ = this.watch() - const running /*: Promise */ = started.then(() => - Promise.race([ - // run until either stop is called or watchLoop reject - new Promise(resolve => { - this.runningResolve = resolve - }), - this.watchLoop() - ]) - ) + async start() { + await this.watch() - return { - started: started, - running: running - } + this.running = Promise.race([ + // run until either stop is called or watchLoop reject + new Promise(resolve => { + this.runningResolve = resolve + }), + this.watchLoop() + ]) } stop() { diff --git a/core/sync.js b/core/sync.js index a9bdfc310..b59070961 100644 --- a/core/sync.js +++ b/core/sync.js @@ -131,14 +131,12 @@ class Sync { try { await this.local.start() - const localRunning = this.local.watcher.running - const { - running: remoteRunning, - started: remoteStarted - } = this.remote.start() - await remoteStarted - - Promise.all([localRunning, remoteRunning]).catch(err => { + await this.remote.start() + + Promise.all([ + this.local.watcher.running, + this.remote.watcher.running + ]).catch(err => { throw err }) } catch (err) { diff --git a/test/unit/remote/offline.js b/test/unit/remote/offline.js index 5e39ec10a..827ba2978 100644 --- a/test/unit/remote/offline.js +++ b/test/unit/remote/offline.js @@ -41,7 +41,7 @@ describe('Remote', function() { .stub(global, 'fetch') .rejects(new FetchError('net::ERR_INTERNET_DISCONNECTED')) let eventsSpy = sinon.spy(this.events, 'emit') - await this.remote.start().started + await this.remote.start() eventsSpy.should.have.been.calledWith('offline') fetchStub.restore() // skip waiting for HEARTBEAT diff --git a/test/unit/remote/watcher.js b/test/unit/remote/watcher.js index 693a53b69..ceb33e054 100644 --- a/test/unit/remote/watcher.js +++ b/test/unit/remote/watcher.js @@ -8,6 +8,7 @@ const path = require('path') const sinon = require('sinon') const should = require('should') const CozyClient = require('cozy-client-js').Client +const { Promise } = require('bluebird') const configHelpers = require('../../support/helpers/config') const { posixifyPath } = require('../../support/helpers/context_dir') @@ -72,17 +73,30 @@ describe('RemoteWatcher', function() { }) describe('start', function() { - beforeEach(function() { + it('calls watch() a first time', async function() { sinon.stub(this.watcher, 'watch').returns(Promise.resolve()) - return this.watcher.start() + await this.watcher.start() + this.watcher.watch.callCount.should.equal(1) }) - afterEach(function() { - this.watcher.watch.restore() + async function fakeWatch() { + throw new Error('from watch') + } + + it('returns a promise that rejects on error during first watch()', async function() { + sinon.stub(this.watcher, 'watch').callsFake(fakeWatch) + await should(this.watcher.start()).be.rejectedWith('from watch') }) - it('calls watch() a first time', function() { - this.watcher.watch.callCount.should.equal(1) + it('sets a "running" promise that rejects on error during second watch()', async function() { + sinon + .stub(this.watcher, 'watch') + .onFirstCall() + .resolves() + .onSecondCall() + .callsFake(fakeWatch) + await this.watcher.start() + await should(this.watcher.running).be.rejectedWith('from watch') }) }) @@ -96,14 +110,15 @@ describe('RemoteWatcher', function() { }) it('ensures watch is not called anymore', async function() { - await this.watcher.start().started + await this.watcher.start() should(this.watcher.runningResolve).not.be.null() this.watcher.stop() should(this.watcher.runningResolve).be.null() + await should(this.watcher.running).be.fulfilled() }) it('does nothing when called again', async function() { - await this.watcher.start().started + await this.watcher.start() this.watcher.stop() this.watcher.stop() }) diff --git a/test/unit/sync.js b/test/unit/sync.js index 20708dc63..c53832367 100644 --- a/test/unit/sync.js +++ b/test/unit/sync.js @@ -43,14 +43,11 @@ describe('Sync', function() { describe('start', function() { beforeEach('instanciate sync', function() { - const ret = { - started: Promise.resolve(), - running: new Promise(() => {}) - } this.local.start = sinon.stub().resolves() this.local.watcher.running = sinon.stub().resolves() this.local.stop = sinon.stub().resolves() - this.remote.start = sinon.stub().returns(ret) + this.remote.start = sinon.stub().resolves() + this.remote.watcher.running = sinon.stub().resolves() this.remote.stop = sinon.stub().resolves() this.sync.sync = sinon.stub().rejects(new Error('stopped')) }) From 78da0c9cc34e582a8e0314802c3806811e0bd285 Mon Sep 17 00:00:00 2001 From: Erwan Guyader Date: Thu, 23 Apr 2020 10:56:50 +0200 Subject: [PATCH 2/3] core/local/atom/watcher: Simplify start With the same goal of reducing the number of promises and resolve/reject methods used in our lifecyle methods, we made a small refactoring of the Atom Watcher start method, relying slightly more on async/await and avoiding the unnecessary use of reject methods. We also made sure the `producer.stop()` method is called before we resolve the `running` promise when calling `stop()` since it seems more appropriate to actually stop a process before saying it's been stopped. --- core/local/atom/watcher.js | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/core/local/atom/watcher.js b/core/local/atom/watcher.js index f813bfa79..ee286d8b2 100644 --- a/core/local/atom/watcher.js +++ b/core/local/atom/watcher.js @@ -141,27 +141,31 @@ class AtomWatcher { async start() { log.debug('starting...') + + await stepsInitialState(this.state, this) + const scanDone = new Promise(resolve => { + this.events.on('initial-scan-done', resolve) + }) + await this.producer.start() + await scanDone + this.running = new Promise((resolve, reject) => { this._runningResolve = resolve + // XXX: This rejecter is never used. How can the watcher fail? How to + // catch those errors and feed them to this rejecter? this._runningReject = reject }) - await stepsInitialState(this.state, this) - let rejectScan - const scanDone = new Promise((resolve, reject) => { - rejectScan = reject - this.events.on('initial-scan-done', resolve) - }) - this.producer.start().catch(err => rejectScan(err)) - return scanDone } async stop() /*: Promise<*> */ { log.debug('stopping...') + + this.producer.stop() + if (this._runningResolve) { this._runningResolve() this._runningResolve = null } - this.producer.stop() } } From 8501a4a7b4415e34f9eeb9da6d2e7bd0ef08b3ed Mon Sep 17 00:00:00 2001 From: Erwan Guyader Date: Thu, 23 Apr 2020 11:21:43 +0200 Subject: [PATCH 3/3] core/sync: Use event based lifecyle management The synchronization module's lifecycle is a bit complex to manage because we want to make sure we can't start it more than once at a time, that it is fully started before we can stop it and that it is fully stopped before we can start it. Also, we need to be able to stop different infinite loops (e.g. waiting for changes from the PouchDB database) and react to errors coming the different parts started by this module. Managing all of this with promises is quite complex and error prone. We attempt to simplify this lifecyle management by switching to an event based process, allowing us to move away from promises to know whether the synchronization module is started, stopped, starting or stopping and to provide a single hook to subscribe to all error sources. This should help us catch more errors and alert the user that they happened. --- core/sync.js | 184 ++++++++++++++++------------------ core/utils/lifecycle.js | 90 +++++++++++++++++ gui/main.js | 4 +- test/support/helpers/index.js | 2 + test/unit/sync.js | 99 +++++++++++++++--- 5 files changed, 266 insertions(+), 113 deletions(-) create mode 100644 core/utils/lifecycle.js diff --git a/core/sync.js b/core/sync.js index b59070961..055206e09 100644 --- a/core/sync.js +++ b/core/sync.js @@ -15,6 +15,7 @@ const { HEARTBEAT } = require('./remote/watcher') const { otherSide } = require('./side') const logger = require('./utils/logger') const measureTime = require('./utils/perfs') +const { LifeCycle } = require('./utils/lifecycle') /*:: import type EventEmitter from 'events' @@ -80,9 +81,7 @@ class Sync { pouch: Pouch remote: Remote moveTo: ?string - stopRequested: Promise - started: ?Promise - running: ?Promise + lifecycle: LifeCycle diskUsage: () => Promise<*> */ @@ -103,7 +102,7 @@ class Sync { this.events = events this.local.other = this.remote this.remote.other = this.local - this.stopRequested = false + this.lifecycle = new LifeCycle(log) autoBind(this) } @@ -112,71 +111,51 @@ class Sync { // First, start metadata synchronization in pouch, with the watchers // Then, when a stable state is reached, start applying changes from pouch async start() /*: Promise */ { - if (this.started && (await this.started)) return - - let runningResolve, runningReject - this.running = new Promise((resolve, reject) => { - runningResolve = resolve - runningReject = reject - }) - - this.started = new Promise(async (resolve, reject) => { - if (this.stopRequested) { - reject() - runningReject() - return - } - - log.info('Starting Sync...') + if (this.lifecycle.willStop()) { + await this.lifecycle.stopped() + } else { + return + } - try { - await this.local.start() - await this.remote.start() - - Promise.all([ - this.local.watcher.running, - this.remote.watcher.running - ]).catch(err => { - throw err - }) - } catch (err) { - log.error({ err }, 'Could not start watchers') - this.local.stop() - this.remote.stop() - reject(err) - runningReject(err) - } - resolve(true) + try { + this.lifecycle.begin('start') + } catch (err) { + return + } + try { + await this.local.start() + await this.remote.start() + } catch (err) { + this.error(err) + this.lifecycle.end('start') + await this.stop() + return + } + this.lifecycle.end('start') + + Promise.all([ + this.local.watcher.running, + this.remote.watcher.running + ]).catch(err => { + this.error(err) + this.stop() + return }) - await this.started - - log.info('Sync started') - try { - // eslint-disable-next-line no-constant-condition - while (!this.stopRequested) { + while (!this.lifecycle.willStop()) { await this.sync() } } catch (err) { - log.error({ err }, 'Sync error') - throw err - } finally { - if (this.changes) { - this.changes.cancel() - this.changes = null - } - - await Promise.all([this.local.stop(), this.remote.stop()]) - - this.started = null - if (runningResolve) { - runningResolve() - } - log.info('Sync stopped') + this.error(err) + await this.stop() } } + async started() { + await this.lifecycle.started() + } + // Manually force a full synchronization async forceSync() { await this.stop() @@ -185,21 +164,33 @@ class Sync { // Stop the synchronization async stop() /*: Promise */ { - try { - if (this.started) await this.started - } catch (err) { - log.error({ err }, 'started errored out during Sync stop') + if (this.lifecycle.willStart()) { + await this.lifecycle.started() + } else { + return } - log.info('Stopping Sync...') - this.stopRequested = true - this.events.emit('stopRequested') - const stopped = this.running || Promise.resolve() + try { - await stopped - this.stopRequested = false + this.lifecycle.begin('stop') } catch (err) { - log.error({ err }, 'running errored out during Sync stop') + return } + if (this.changes) { + this.changes.cancel() + this.changes = null + } + + await Promise.all([this.local.stop(), this.remote.stop()]) + this.lifecycle.end('stop') + } + + async stopped() { + await this.lifecycle.stopped() + } + + error(err /*: Error */) { + log.error({ err }, 'sync error') + this.events.emit('sync-error', err) } // TODO: remove waitForNewChanges to .start while(true) @@ -224,7 +215,7 @@ class Sync { async syncBatch() { let seq = null // eslint-disable-next-line no-constant-condition - while (!this.stopRequested) { + while (!this.lifecycle.willStop()) { seq = await this.pouch.getLocalSeqAsync() // TODO: Prevent infinite loop const change = await this.getNextChange(seq) @@ -234,7 +225,7 @@ class Sync { await this.apply(change) // XXX: apply should call setLocalSeqAsync } catch (err) { - if (!this.stopRequested) throw err + if (!this.lifecycle.willStop()) throw err } } } @@ -258,29 +249,23 @@ class Sync { const opts = await this.baseChangeOptions(seq) opts.live = true return new Promise((resolve, reject) => { - const resolver = data => { - this.events.off('stopRequested', resolver) - resolve(data) - } - const rejecter = err => { - this.events.off('stopRequested', resolver) - reject(err) - } - this.events.on('stopRequested', resolver) + this.lifecycle.once('will-stop', resolve) this.changes = this.pouch.db .changes(opts) - .on('change', c => { + .on('change', data => { + this.lifecycle.off('will-stop', resolve) if (this.changes) { this.changes.cancel() this.changes = null - resolver(c) + resolve(data) } }) .on('error', err => { + this.lifecycle.off('will-stop', resolve) if (this.changes) { - // FIXME: pas de cancel ici ?? + this.changes.cancel() this.changes = null - rejecter(err) + reject(err) } }) }) @@ -291,22 +276,21 @@ class Sync { const opts = await this.baseChangeOptions(seq) opts.include_docs = true const p = new Promise((resolve, reject) => { - const resolver = data => { - this.events.off('stopRequested', resolver) - resolve(data) - } - const rejecter = err => { - this.events.off('stopRequested', resolver) - reject(err) - } - this.events.on('stopRequested', resolver) - this.pouch.db + this.lifecycle.once('will-stop', resolve) + this.changes = this.pouch.db .changes(opts) - .on('change', info => resolver(info)) - .on('error', err => rejecter(err)) - .on('complete', info => { - if (info.results == null || info.results.length === 0) { - resolver(null) + .on('change', data => { + this.lifecycle.off('will-stop', resolve) + resolve(data) + }) + .on('error', err => { + this.lifecycle.off('will-stop', resolve) + reject(err) + }) + .on('complete', data => { + this.lifecycle.off('will-stop', resolve) + if (data.results == null || data.results.length === 0) { + resolve(null) } }) }) diff --git a/core/utils/lifecycle.js b/core/utils/lifecycle.js new file mode 100644 index 000000000..23e261428 --- /dev/null +++ b/core/utils/lifecycle.js @@ -0,0 +1,90 @@ +/** + * @module core/utils/lifecycle + * @flow + */ + +const EventEmitter = require('events') + +/*:: +import type { Logger } from './logger' + +type State = 'done-stop' | 'will-start' | 'done-start' | 'will-stop' +*/ + +class LifeCycle extends EventEmitter { + /*:: + currentState: State + log: Logger + */ + + constructor(logger /*: Logger */) { + super() + this.currentState = 'done-stop' + this.log = logger + } + + canTransitionTo(state /*: State */) { + return this.currentState !== state + } + + transitionTo(newState /*: State */) { + this.currentState = newState + this.emit(newState) + this.log.info(newState) + } + + async transitionedTo(futureState /*: State */) { + return new Promise(resolve => { + if (this.currentState === futureState) resolve() + else this.once(futureState, resolve) + }) + } + + begin(endState /*: 'start' | 'stop' */) { + switch (endState) { + case 'start': + if (this.canTransitionTo('will-start')) { + this.transitionTo('will-start') + } else { + throw new Error(`Cannot begin ${endState}`) + } + break + case 'stop': + if (this.canTransitionTo('will-stop')) { + this.transitionTo('will-stop') + } else { + throw new Error(`Cannot begin ${endState}`) + } + break + } + } + + end(endState /*: 'start' | 'stop' */) { + switch (endState) { + case 'start': + this.transitionTo('done-start') + break + case 'stop': + this.transitionTo('done-stop') + break + } + } + + willStop() { + return ['will-stop', 'done-stop'].includes(this.currentState) + } + + async stopped() { + await this.transitionedTo('done-stop') + } + + willStart() { + return ['will-start', 'done-start'].includes(this.currentState) + } + + async started() { + await this.transitionedTo('done-start') + } +} + +module.exports = { LifeCycle } diff --git a/gui/main.js b/gui/main.js index ebcd968cd..ae54c817a 100644 --- a/gui/main.js +++ b/gui/main.js @@ -434,7 +434,7 @@ const startSync = async () => { }) desktop.events.on('delete-file', removeFile) - desktop.startSync().catch(err => { + desktop.events.on('sync-error', err => { if (err.status === 402) { // Only show notification popup on the first check (the GUI will // include a warning anyway). @@ -455,6 +455,8 @@ const startSync = async () => { } sendDiskUsage() }) + + desktop.startSync() sendDiskUsage() autoLaunch.isEnabled().then(enabled => { diff --git a/test/support/helpers/index.js b/test/support/helpers/index.js index 4711905a2..dde532894 100644 --- a/test/support/helpers/index.js +++ b/test/support/helpers/index.js @@ -75,7 +75,9 @@ class TestHelpers { } async syncAll() { + this._sync.lifecycle.end('start') await this._sync.sync(false) + this._sync.lifecycle.end('stop') } async pullAndSyncAll() { diff --git a/test/unit/sync.js b/test/unit/sync.js index c53832367..d39d1dc87 100644 --- a/test/unit/sync.js +++ b/test/unit/sync.js @@ -50,25 +50,98 @@ describe('Sync', function() { this.remote.watcher.running = sinon.stub().resolves() this.remote.stop = sinon.stub().resolves() this.sync.sync = sinon.stub().rejects(new Error('stopped')) + sinon.spy(this.sync, 'stop') + sinon.spy(this.sync.events, 'emit') }) it('starts the metadata replication of both sides', async function() { - await should(this.sync.start()).be.rejectedWith({ - message: 'stopped' + await this.sync.start() + should(this.local.start).be.calledOnce() + should(this.remote.start).be.calledOnce() + should(this.sync.sync).be.calledOnce() + }) + + context('if local watcher fails to start', () => { + beforeEach(function() { + this.local.start = sinon.stub().rejects(new Error('failed')) + }) + + it('does not start replication', async function() { + await this.sync.start() + should(this.sync.sync).not.be.called() + }) + + it('does not start remote watcher', async function() { + await this.sync.start() + should(this.remote.start).not.be.called() + }) + + it('stops local watcher', async function() { + await this.sync.start() + should(this.local.stop).be.calledOnce() + }) + + it('emits a sync error', async function() { + await this.sync.start() + should(this.sync.events.emit).have.been.calledWith('sync-error') }) - this.local.start.calledOnce.should.be.true() - this.remote.start.calledOnce.should.be.true() - this.sync.sync.calledOnce.should.be.true() }) - it('does not start sync if metadata replication fails', async function() { - this.local.start = sinon.stub().rejects(new Error('failed')) - await should(this.sync.start()).be.rejectedWith({ - message: 'failed' + context('if remote watcher fails to start', () => { + beforeEach(function() { + this.remote.start = sinon.stub().rejects(new Error('failed')) + }) + + it('does not start replication', async function() { + await this.sync.start() + should(this.sync.sync).not.be.called() + }) + + it('starts local watcher', async function() { + await this.sync.start() + should(this.local.start).be.calledOnce() + }) + + it('stops local watcher', async function() { + await this.sync.start() + should(this.local.stop).be.calledOnce() + }) + + it('stops remote watcher', async function() { + await this.sync.start() + should(this.remote.stop).be.calledOnce() + }) + + it('emits a sync error', async function() { + await this.sync.start() + should(this.sync.events.emit).have.been.calledWith('sync-error') + }) + }) + + context('if local watcher rejects while running', () => { + beforeEach(function() { + this.local.watcher.running = sinon.stub().rejects(new Error('failed')) + }) + + it('stops replication', async function() { + await this.sync.start() + should(this.sync.stop).be.calledOnce() + }) + + it('stops local watcher', async function() { + await this.sync.start() + should(this.local.stop).be.calledOnce() + }) + + it('stops remote watcher', async function() { + await this.sync.start() + should(this.remote.stop).be.calledOnce() + }) + + it('emits a sync error', async function() { + await this.sync.start() + should(this.sync.events.emit).have.been.calledWith('sync-error') }) - this.local.start.calledOnce.should.be.true() - this.remote.start.called.should.be.false() - this.sync.sync.calledOnce.should.be.false() }) }) @@ -77,10 +150,12 @@ describe('Sync', function() { describe('sync', function() { beforeEach('stub lifecycle', function() { this.sync.events = new EventEmitter() + this.sync.lifecycle.end('start') }) afterEach('restore lifecycle', function() { this.sync.events.emit('stopped') delete this.sync.events + this.sync.lifecycle.end('stop') }) it('waits for and applies available changes', async function() {