Skip to content

Commit 64a1cb7

Browse files
EastSun5566galaxian85
authored andcommitted
feat: add enableAwareness option to API and SocketIO server
1 parent bb53494 commit 64a1cb7

File tree

4 files changed

+40
-17
lines changed

4 files changed

+40
-17
lines changed

src/api.js

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ const decodeRedisRoomStreamName = (rediskey, expectedPrefix) => {
8484

8585
/**
8686
* @param {import('./storage.js').AbstractStorage} store
87-
* @param {{ redisPrefix?: string, redisUrl?: string }} opts
87+
* @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts
8888
*/
89-
export const createApiClient = async (store, { redisPrefix, redisUrl }) => {
90-
const a = new Api(store, redisPrefix, redisUrl)
89+
export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwareness = true }) => {
90+
const a = new Api(store, redisPrefix, redisUrl, { enableAwareness })
9191
await a.redis.connect()
9292
try {
9393
await a.redis.xGroupCreate(a.redisWorkerStreamName, a.redisWorkerGroupName, '0', { MKSTREAM: true })
@@ -100,10 +100,13 @@ export class Api {
100100
* @param {import('./storage.js').AbstractStorage} store
101101
* @param {string=} prefix
102102
* @param {string=} url
103+
* @param {Object} opts
104+
* @param {boolean=} opts.enableAwareness
103105
*/
104-
constructor (store, prefix = 'y', url = env.ensureConf('ysr-redis')) {
106+
constructor (store, prefix = 'y', url = env.ensureConf('ysr-redis'), { enableAwareness = true } = {}) {
105107
this.store = store
106108
this.prefix = prefix
109+
this.enableAwareness = enableAwareness
107110
this.consumername = random.uuidv4()
108111
/**
109112
* After this timeout, a new worker will pick up the task
@@ -240,8 +243,11 @@ export class Api {
240243
if (docMessages?.messages) logApi(`processing messages of length: ${docMessages?.messages.length} in room: ${room}`)
241244
const docstate = await this.store.retrieveDoc(room, docid)
242245
const ydoc = new Y.Doc()
243-
const awareness = new awarenessProtocol.Awareness(ydoc)
244-
awareness.setLocalState(null) // we don't want to propagate awareness state
246+
let awareness = null
247+
if (this.enableAwareness) {
248+
awareness = new awarenessProtocol.Awareness(ydoc)
249+
awareness.setLocalState(null) // we don't want to propagate awareness state
250+
}
245251
const now = performance.now()
246252
if (docstate) { Y.applyUpdateV2(ydoc, docstate.doc) }
247253
let changed = false
@@ -257,7 +263,9 @@ export class Api {
257263
break
258264
}
259265
case 1: { // awareness message
260-
awarenessProtocol.applyAwarenessUpdate(awareness, decoding.readVarUint8Array(decoder), null)
266+
if (this.enableAwareness && awareness) {
267+
awarenessProtocol.applyAwarenessUpdate(awareness, decoding.readVarUint8Array(decoder), null)
268+
}
261269
break
262270
}
263271
}
@@ -394,7 +402,7 @@ export class Api {
394402

395403
/**
396404
* @param {import('./storage.js').AbstractStorage} store
397-
* @param {{ redisPrefix?: string, redisUrl?: string }} opts
405+
* @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts
398406
*/
399407
export const createWorker = async (store, opts) => {
400408
const a = await createApiClient(store, opts)

src/socketio.js

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,16 @@ class YSocketIOServer {
3838
* @param {string} [conf.redisUrl]
3939
* @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate
4040
* @param {import('worker_threads').Worker=} [conf.persistWorker]
41+
* @param {boolean} [conf.enableAwareness]
4142
*/
42-
export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix, persistWorker }) => {
43-
const app = new YSocketIO(io, { authenticate })
43+
export const registerYSocketIOServer = async (io, store, {
44+
authenticate,
45+
redisUrl,
46+
redisPrefix,
47+
persistWorker,
48+
enableAwareness = true
49+
}) => {
50+
const app = new YSocketIO(io, { authenticate, enableAwareness })
4451
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker })
4552
return new YSocketIOServer(app, client, subscriber)
4653
}

src/subscriber.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const run = async subscriber => {
3232

3333
/**
3434
* @param {import('./storage.js').AbstractStorage} store
35-
* @param {{ redisPrefix?: string, redisUrl?: string }} opts
35+
* @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts
3636
*/
3737
export const createSubscriber = async (store, opts) => {
3838
const client = await api.createApiClient(store, opts)

src/y-socket-io/y-socket-io.js

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ process.on('SIGINT', function () {
5757
*
5858
* @prop {(socket: Socket)=> Promise<UserLike | null> | UserLike | null} authenticate
5959
* Callback to authenticate the client connection.
60+
* @prop {boolean=} enableAwareness
61+
* Enable/disable awareness functionality, defaults to true
6062
*/
6163

6264
/**
@@ -159,7 +161,10 @@ export class YSocketIO {
159161
*/
160162
constructor (io, configuration) {
161163
this.io = io
162-
this.configuration = configuration
164+
this.configuration = {
165+
enableAwareness: true,
166+
...configuration
167+
}
163168
}
164169

165170
/**
@@ -174,9 +179,10 @@ export class YSocketIO {
174179
* @public
175180
*/
176181
async initialize (store, { redisUrl, redisPrefix = 'y', persistWorker } = {}) {
182+
const { enableAwareness } = this.configuration
177183
const [client, subscriber] = await promise.all([
178-
api.createApiClient(store, { redisUrl, redisPrefix }),
179-
createSubscriber(store, { redisUrl, redisPrefix })
184+
api.createApiClient(store, { redisUrl, redisPrefix, enableAwareness }),
185+
createSubscriber(store, { redisUrl, redisPrefix, enableAwareness })
180186
])
181187
this.client = client
182188
this.subscriber = subscriber
@@ -243,7 +249,9 @@ export class YSocketIO {
243249
this.streamNamespaceMap.set(stream, namespace)
244250

245251
this.initSyncListeners(socket)
246-
this.initAwarenessListeners(socket)
252+
if (this.configuration.enableAwareness) {
253+
this.initAwarenessListeners(socket)
254+
}
247255
this.initSocketListeners(socket)
248256
;(async () => {
249257
assert(this.client)
@@ -404,7 +412,7 @@ export class YSocketIO {
404412
.catch(console.error)
405413
}
406414
)
407-
if (doc.awareness.states.size > 0) {
415+
if (this.configuration.enableAwareness && doc.awareness.states.size > 0) {
408416
socket.emit(
409417
'awareness-update',
410418
AwarenessProtocol.encodeAwarenessUpdate(
@@ -436,7 +444,7 @@ export class YSocketIO {
436444

437445
for (const m of messages) {
438446
const decoded = this.fromRedis(m)
439-
if (decoded.type === 'awareness-update') awareness.push(decoded.message)
447+
if (decoded.type === 'awareness-update' && this.configuration.enableAwareness) awareness.push(decoded.message)
440448
else updates.push(decoded.message)
441449
}
442450

0 commit comments

Comments
 (0)