Skip to content

Commit

Permalink
core: Refactor sync modules lifecycle management (#1865)
Browse files Browse the repository at this point in the history
We found out that we were not catching all errors coming from the remote watcher thus not alerting the user and not completely stopping the synchronization process.

We want to make sure this does not happen anymore and that other similar situations cannot happen.
To do so, we need to move away from the complex lifecycle management we have today in the watchers and sync modules.
  • Loading branch information
taratatach authored Apr 24, 2020
2 parents bf4dfb7 + 8501a4a commit 5afb3be
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 159 deletions.
22 changes: 13 additions & 9 deletions core/local/atom/watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,27 +141,31 @@ class AtomWatcher {

async start() {
log.debug('starting...')

await stepsInitialState(this.state, this)
const scanDone = new Promise(resolve => {
this.events.on('initial-scan-done', resolve)
})
await this.producer.start()
await scanDone

this.running = new Promise((resolve, reject) => {
this._runningResolve = resolve
// XXX: This rejecter is never used. How can the watcher fail? How to
// catch those errors and feed them to this rejecter?
this._runningReject = reject
})
await stepsInitialState(this.state, this)
let rejectScan
const scanDone = new Promise((resolve, reject) => {
rejectScan = reject
this.events.on('initial-scan-done', resolve)
})
this.producer.start().catch(err => rejectScan(err))
return scanDone
}

async stop() /*: Promise<*> */ {
log.debug('stopping...')

this.producer.stop()

if (this._runningResolve) {
this._runningResolve()
this._runningResolve = null
}
this.producer.stop()
}
}

Expand Down
6 changes: 1 addition & 5 deletions core/remote/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ class Remote /*:: implements Reader, Writer */ {
}

start() {
const { started, running } = this.watcher.start()
return {
started: started.then(() => this.warningsPoller.start()),
running
}
return this.watcher.start().then(() => this.warningsPoller.start())
}

stop() {
Expand Down
27 changes: 11 additions & 16 deletions core/remote/watcher/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class RemoteWatcher {
remoteCozy: RemoteCozy
events: EventEmitter
runningResolve: ?() => void
runningReject: ?() => void
running: Promise
*/

constructor(
Expand All @@ -56,26 +56,21 @@ class RemoteWatcher {
this.prep = prep
this.remoteCozy = remoteCozy
this.events = events
this.running = new Promise(() => {})

autoBind(this)
}

start() {
const started /*: Promise<void> */ = this.watch()
const running /*: Promise<void> */ = started.then(() =>
Promise.race([
// run until either stop is called or watchLoop reject
new Promise(resolve => {
this.runningResolve = resolve
}),
this.watchLoop()
])
)
async start() {
await this.watch()

return {
started: started,
running: running
}
this.running = Promise.race([
// run until either stop is called or watchLoop reject
new Promise(resolve => {
this.runningResolve = resolve
}),
this.watchLoop()
])
}

stop() {
Expand Down
186 changes: 84 additions & 102 deletions core/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { HEARTBEAT } = require('./remote/watcher')
const { otherSide } = require('./side')
const logger = require('./utils/logger')
const measureTime = require('./utils/perfs')
const { LifeCycle } = require('./utils/lifecycle')

/*::
import type EventEmitter from 'events'
Expand Down Expand Up @@ -80,9 +81,7 @@ class Sync {
pouch: Pouch
remote: Remote
moveTo: ?string
stopRequested: Promise
started: ?Promise<boolean>
running: ?Promise
lifecycle: LifeCycle
diskUsage: () => Promise<*>
*/
Expand All @@ -103,7 +102,7 @@ class Sync {
this.events = events
this.local.other = this.remote
this.remote.other = this.local
this.stopRequested = false
this.lifecycle = new LifeCycle(log)

autoBind(this)
}
Expand All @@ -112,73 +111,51 @@ class Sync {
// First, start metadata synchronization in pouch, with the watchers
// Then, when a stable state is reached, start applying changes from pouch
async start() /*: Promise<void> */ {
if (this.started && (await this.started)) return

let runningResolve, runningReject
this.running = new Promise((resolve, reject) => {
runningResolve = resolve
runningReject = reject
})

this.started = new Promise(async (resolve, reject) => {
if (this.stopRequested) {
reject()
runningReject()
return
}

log.info('Starting Sync...')
if (this.lifecycle.willStop()) {
await this.lifecycle.stopped()
} else {
return
}

try {
await this.local.start()
const localRunning = this.local.watcher.running
const {
running: remoteRunning,
started: remoteStarted
} = this.remote.start()
await remoteStarted

Promise.all([localRunning, remoteRunning]).catch(err => {
throw err
})
} catch (err) {
log.error({ err }, 'Could not start watchers')
this.local.stop()
this.remote.stop()
reject(err)
runningReject(err)
}
resolve(true)
try {
this.lifecycle.begin('start')
} catch (err) {
return
}
try {
await this.local.start()
await this.remote.start()
} catch (err) {
this.error(err)
this.lifecycle.end('start')
await this.stop()
return
}
this.lifecycle.end('start')

Promise.all([
this.local.watcher.running,
this.remote.watcher.running
]).catch(err => {
this.error(err)
this.stop()
return
})

await this.started

log.info('Sync started')

try {
// eslint-disable-next-line no-constant-condition
while (!this.stopRequested) {
while (!this.lifecycle.willStop()) {
await this.sync()
}
} catch (err) {
log.error({ err }, 'Sync error')
throw err
} finally {
if (this.changes) {
this.changes.cancel()
this.changes = null
}

await Promise.all([this.local.stop(), this.remote.stop()])

this.started = null
if (runningResolve) {
runningResolve()
}
log.info('Sync stopped')
this.error(err)
await this.stop()
}
}

async started() {
await this.lifecycle.started()
}

// Manually force a full synchronization
async forceSync() {
await this.stop()
Expand All @@ -187,21 +164,33 @@ class Sync {

// Stop the synchronization
async stop() /*: Promise<void> */ {
try {
if (this.started) await this.started
} catch (err) {
log.error({ err }, 'started errored out during Sync stop')
if (this.lifecycle.willStart()) {
await this.lifecycle.started()
} else {
return
}
log.info('Stopping Sync...')
this.stopRequested = true
this.events.emit('stopRequested')
const stopped = this.running || Promise.resolve()

try {
await stopped
this.stopRequested = false
this.lifecycle.begin('stop')
} catch (err) {
log.error({ err }, 'running errored out during Sync stop')
return
}
if (this.changes) {
this.changes.cancel()
this.changes = null
}

await Promise.all([this.local.stop(), this.remote.stop()])
this.lifecycle.end('stop')
}

async stopped() {
await this.lifecycle.stopped()
}

error(err /*: Error */) {
log.error({ err }, 'sync error')
this.events.emit('sync-error', err)
}

// TODO: remove waitForNewChanges to .start while(true)
Expand All @@ -226,7 +215,7 @@ class Sync {
async syncBatch() {
let seq = null
// eslint-disable-next-line no-constant-condition
while (!this.stopRequested) {
while (!this.lifecycle.willStop()) {
seq = await this.pouch.getLocalSeqAsync()
// TODO: Prevent infinite loop
const change = await this.getNextChange(seq)
Expand All @@ -236,7 +225,7 @@ class Sync {
await this.apply(change)
// XXX: apply should call setLocalSeqAsync
} catch (err) {
if (!this.stopRequested) throw err
if (!this.lifecycle.willStop()) throw err
}
}
}
Expand All @@ -260,29 +249,23 @@ class Sync {
const opts = await this.baseChangeOptions(seq)
opts.live = true
return new Promise((resolve, reject) => {
const resolver = data => {
this.events.off('stopRequested', resolver)
resolve(data)
}
const rejecter = err => {
this.events.off('stopRequested', resolver)
reject(err)
}
this.events.on('stopRequested', resolver)
this.lifecycle.once('will-stop', resolve)
this.changes = this.pouch.db
.changes(opts)
.on('change', c => {
.on('change', data => {
this.lifecycle.off('will-stop', resolve)
if (this.changes) {
this.changes.cancel()
this.changes = null
resolver(c)
resolve(data)
}
})
.on('error', err => {
this.lifecycle.off('will-stop', resolve)
if (this.changes) {
// FIXME: pas de cancel ici ??
this.changes.cancel()
this.changes = null
rejecter(err)
reject(err)
}
})
})
Expand All @@ -293,22 +276,21 @@ class Sync {
const opts = await this.baseChangeOptions(seq)
opts.include_docs = true
const p = new Promise((resolve, reject) => {
const resolver = data => {
this.events.off('stopRequested', resolver)
resolve(data)
}
const rejecter = err => {
this.events.off('stopRequested', resolver)
reject(err)
}
this.events.on('stopRequested', resolver)
this.pouch.db
this.lifecycle.once('will-stop', resolve)
this.changes = this.pouch.db
.changes(opts)
.on('change', info => resolver(info))
.on('error', err => rejecter(err))
.on('complete', info => {
if (info.results == null || info.results.length === 0) {
resolver(null)
.on('change', data => {
this.lifecycle.off('will-stop', resolve)
resolve(data)
})
.on('error', err => {
this.lifecycle.off('will-stop', resolve)
reject(err)
})
.on('complete', data => {
this.lifecycle.off('will-stop', resolve)
if (data.results == null || data.results.length === 0) {
resolve(null)
}
})
})
Expand Down
Loading

0 comments on commit 5afb3be

Please sign in to comment.