Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync): Reconnect the websocket on failures. #2235

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 49 additions & 7 deletions umap/static/umap/js/modules/sync/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import { HybridLogicalClock } from './hlc.js'
import { DataLayerUpdater, FeatureUpdater, MapUpdater } from './updaters.js'
import { WebSocketTransport } from './websocket.js'

// Start reconnecting after 2 seconds, then double the delay each time
// maxing out at 32 seconds.
const RECONNECT_DELAY = 2000;
const RECONNECT_DELAY_FACTOR = 2;
const MAX_RECONNECT_DELAY = 32000;

/**
* The syncEngine exposes an API to sync messages between peers over the network.
*
Expand Down Expand Up @@ -42,32 +48,67 @@ import { WebSocketTransport } from './websocket.js'
* ```
*/
export class SyncEngine {
constructor(map) {
constructor(map, websocketTokenURI, websocketURI, server) {
this.updaters = {
map: new MapUpdater(map),
feature: new FeatureUpdater(map),
datalayer: new DataLayerUpdater(map),
}
this.transport = undefined
this._operations = new Operations()
this._server = server

this._websocketTokenURI = websocketTokenURI
this._websocketURI = websocketURI

this._reconnectTimeout = null;
this._reconnectDelay = RECONNECT_DELAY;
this.websocketConnected = false;
}

async authenticate(tokenURI, webSocketURI, server) {
const [response, _, error] = await server.get(tokenURI)
/**
* Authenticate with the server and start the transport layer.
*/
async authenticate() {
console.log("authenticating")
const [response, _, error] = await this._server.get(this._websocketTokenURI)
if (!error) {
this.start(webSocketURI, response.token)
this.start(response.token)
}
}

start(webSocketURI, authToken) {
this.transport = new WebSocketTransport(webSocketURI, authToken, this)
start(authToken) {
this.transport = new WebSocketTransport(this._websocketURI, authToken, this)
}

stop() {
if (this.transport) this.transport.close()
if (this.transport){
this.transport.close()
}
this.transport = undefined
}

onConnection() {
this._reconnectTimeout = null;
this._reconnectDelay = RECONNECT_DELAY;
this.websocketConnected = true;
this.updaters.map.update({ key: 'numberOfConnectedPeers' })
}

reconnect() {
this.websocketConnected = false;
this.updaters.map.update({ key: 'numberOfConnectedPeers' })

console.log("reconnecting in ", this._reconnectDelay, " ms")
this._reconnectTimeout = setTimeout(() => {
if (this._reconnectDelay < MAX_RECONNECT_DELAY) {
this._reconnectDelay = this._reconnectDelay * RECONNECT_DELAY_FACTOR
}
console.log("reconnecting now")
this.authenticate()
}, this._reconnectDelay);
}

upsert(subject, metadata, value) {
this._send({ verb: 'upsert', subject, metadata, value })
}
Expand Down Expand Up @@ -448,3 +489,4 @@ export class Operations {
function debug(...args) {
console.debug('SYNC ⇆', ...args)
}

49 changes: 47 additions & 2 deletions umap/static/umap/js/modules/sync/websocket.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,59 @@
const PONG_TIMEOUT = 5000;
const PING_INTERVAL = 30000;
const FIRST_CONNECTION_TIMEOUT = 2000;

export class WebSocketTransport {
constructor(webSocketURI, authToken, messagesReceiver) {
this.receiver = messagesReceiver
this.closeRequested = false

this.websocket = new WebSocket(webSocketURI)

this.websocket.onopen = () => {
this.send('JoinRequest', { token: authToken })
this.receiver.onConnection()
}
this.websocket.addEventListener('message', this.onMessage.bind(this))
this.receiver = messagesReceiver
this.websocket.onclose = () => {
console.log("websocket closed")
if (!this.closeRequested) {
console.log("Not requested, reconnecting...")
this.receiver.reconnect()
}
}

this.ensureOpen = setInterval(() => {
if (this.websocket.readyState !== WebSocket.OPEN) {
this.websocket.close()
clearInterval(this.ensureOpen)
}
}, FIRST_CONNECTION_TIMEOUT)

// To ensure the connection is still alive, we send ping and expect pong back.
// Websocket provides a `ping` method to keep the connection alive, but it's
// unfortunately not possible to access it from the WebSocket object.
// See https://making.close.com/posts/reliable-websockets/ for more details.
this.pingInterval = setInterval(() => {
if (this.websocket.readyState === WebSocket.OPEN) {
this.websocket.send('ping');
this.pongReceived = false;
setTimeout(() => {
if (!this.pongReceived) {
console.warn('No pong received, reconnecting...');
this.websocket.close()
clearInterval(this.pingInterval)
}
}, PONG_TIMEOUT);
}
}, PING_INTERVAL);
}

onMessage(wsMessage) {
this.receiver.receive(JSON.parse(wsMessage.data))
if (wsMessage.data === 'pong') {
this.pongReceived = true;
} else {
this.receiver.receive(JSON.parse(wsMessage.data))
}
}

send(kind, payload) {
Expand All @@ -20,6 +64,7 @@ export class WebSocketTransport {
}

close() {
this.closeRequested = true
this.websocket.close()
}
}
57 changes: 35 additions & 22 deletions umap/static/umap/js/umap.controls.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,30 +661,43 @@ const ControlsMixin = {
})
}

const connectedPeers = this.sync.getNumberOfConnectedPeers()
if (connectedPeers !== 0) {
const connectedPeersCount = L.DomUtil.createButton(
'leaflet-control-connected-peers',
rightContainer,
''
)
L.DomEvent.on(connectedPeersCount, 'mouseover', () => {
this.tooltip.open({
content: L._('{connectedPeers} peer(s) currently connected to this map', {
connectedPeers: connectedPeers,
}),
anchor: connectedPeersCount,
position: 'bottom',
delay: 500,
duration: 5000,
})
})
if (this.options.syncEnabled) {
const connectedPeers = this.sync.getNumberOfConnectedPeers()
let hoverMessage = ''

if (connectedPeers !== 0 || !this.sync.websocketConnected) {
const connectedPeersCount = L.DomUtil.createButton(
'leaflet-control-connected-peers',
rightContainer,
''
)
if (this.sync.websocketConnected) {
connectedPeersCount.innerHTML =
'<span>' + this.sync.getNumberOfConnectedPeers() + '</span>'
hoverMessage = L._(
'{connectedPeers} peer(s) currently connected to this map',
{
connectedPeers: connectedPeers,
}
)
} else {
connectedPeersCount.innerHTML = '<span>' + L._('Disconnected') + '</span>'
connectedPeersCount.classList.add('disconnected')
hoverMessage = L._('Reconnecting in {seconds} seconds', {
seconds: this.sync._reconnectDelay / 1000,
})
}

const updateConnectedPeersCount = () => {
connectedPeersCount.innerHTML =
'<span>' + this.sync.getNumberOfConnectedPeers() + '</span>'
L.DomEvent.on(connectedPeersCount, 'mouseover', () => {
this.tooltip.open({
content: hoverMessage,
anchor: connectedPeersCount,
position: 'bottom',
delay: 500,
duration: 5000,
})
})
}
updateConnectedPeersCount()
}

this.help.getStartedLink(rightContainer)
Expand Down
47 changes: 40 additions & 7 deletions umap/static/umap/js/umap.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ L.Map.mergeOptions({
// we cannot rely on this because of the y is overriden by Leaflet
// See https://github.com/Leaflet/Leaflet/pull/9201
// And let's remove this -y when this PR is merged and released.
demoTileInfos: { s: 'a', z: 9, x: 265, y: 181, '-y': 181, r: '' },
demoTileInfos: { 's': 'a', 'z': 9, 'x': 265, 'y': 181, '-y': 181, 'r': '' },
licences: [],
licence: '',
enableMarkerDraw: true,
Expand All @@ -30,8 +30,6 @@ U.Map = L.Map.extend({
includes: [ControlsMixin],

initialize: async function (el, geojson) {
this.sync_engine = new U.SyncEngine(this)
this.sync = this.sync_engine.proxy(this)
// Locale name (pt_PT, en_US…)
// To be used for Django localization
if (geojson.properties.locale) L.setLocale(geojson.properties.locale)
Expand Down Expand Up @@ -70,6 +68,21 @@ U.Map = L.Map.extend({
this.server = new U.ServerRequest()
this.request = new U.Request()

// Store URIs to avoid persisting the map
// mainly to ensure separation of concerns.
const websocketTokenURI = this.urls.get('map_websocket_auth_token', {
map_id: this.options.umap_id,
})
const websocketURI = this.options.websocketURI

this.sync_engine = new U.SyncEngine(
this,
websocketTokenURI,
websocketURI,
this.server
)
this.sync = this.sync_engine.proxy(this)

this.initLoader()
this.name = this.options.name
this.description = this.options.description
Expand Down Expand Up @@ -205,14 +218,13 @@ U.Map = L.Map.extend({
},

initSyncEngine: async function () {
// This.options.websocketEnabled is set by the server admin
if (this.options.websocketEnabled === false) return
// This.options.syncEnabled is set by the user in the map settings
if (this.options.syncEnabled !== true) {
this.sync.stop()
} else {
const ws_token_uri = this.urls.get('map_websocket_auth_token', {
map_id: this.options.umap_id,
})
await this.sync.authenticate(ws_token_uri, this.options.websocketURI, this.server)
await this.sync.authenticate()
}
},

Expand All @@ -224,6 +236,27 @@ U.Map = L.Map.extend({
},

render: function (fields) {
console.log("sync.websocketConnected", this.sync.websocketConnected)
console.log("options.syncEnabled", this.options.syncEnabled)
if (this.options.syncEnabled === true) {
if (this.sync.websocketConnected !== true) {
const template = `
<h3><i class="icon icon-16"></i><span>${L._('Disconnected')}</span></h3>
<p>
${L._('This map has enabled real-time synchronization with other users, but you are currently disconnected.It will automatically reconnect when ready.')}
</p>
`
this.dialog.open({
template: template,
className: 'dark',
cancel: false,
accept: false,
})
} else {
this.dialog.close()
}
}

if (fields.includes('numberOfConnectedPeers')) {
this.renderEditToolbar()
this.propagate()
Expand Down
6 changes: 6 additions & 0 deletions umap/static/umap/map.css
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,12 @@ ul.photon-autocomplete {
background-color: var(--color-lightCyan);
color: var(--color-dark);
}
.leaflet-container .leaflet-control-connected-peers.disconnected,
.leaflet-container .leaflet-control-connected-peers.disconnected:hover
{
background-color: var(--color-red);
color: var(--color-darkGray);
}

.leaflet-container .leaflet-control-edit-disable:before,
.leaflet-container .leaflet-control-edit-save:before,
Expand Down
5 changes: 4 additions & 1 deletion umap/static/umap/unittests/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import { MapUpdater } from '../js/modules/sync/updaters.js'
import { SyncEngine, Operations } from '../js/modules/sync/engine.js'

describe('SyncEngine', () => {
const websocketTokenURI = 'http://localhost:8000/api/v1/maps/1/websocket_auth_token/'
const websocketURI = 'ws://localhost:8000/ws/maps/1/'

it('should initialize methods even before start', () => {
const engine = new SyncEngine({})
const engine = new SyncEngine({}, websocketTokenURI, websocketURI, {})
engine.upsert()
engine.update()
engine.delete()
Expand Down
9 changes: 8 additions & 1 deletion umap/websocket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ async def join_and_listen(

try:
async for raw_message in websocket:
if raw_message == "ping":
await websocket.send("pong")
continue

# recompute the peers list at the time of message-sending.
# as doing so beforehand would miss new connections
other_peers = connections.get_other_peers(websocket)
Expand Down Expand Up @@ -192,4 +196,7 @@ async def _serve():
logging.debug(f"Waiting for connections on {host}:{port}")
await asyncio.Future() # run forever

asyncio.run(_serve())
try:
asyncio.run(_serve())
except KeyboardInterrupt:
print("Closing WebSocket server")