-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEventStream.mjs
61 lines (57 loc) · 1.95 KB
/
EventStream.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class EventStream {
constructor() {
this.stream = new Map()
this.groups = new Map()
}
async addMessage(id, message) {
if (!this.stream.has(id)) {
this.stream.set(id, [])
}
this.stream.get(id).push(message)
for await (const groupId of this.groups.keys()) {
await this.notifyConsumers(groupId)
}
}
async createGroup(groupName, streamKey, callback) {
if (!this.groups.has(groupName)) {
this.groups.set(groupName, new Map())
}
this.groups.get(groupName).set(streamKey, { position: 0, callback })
await this.notifyConsumers(groupName)
}
deleteGroup(groupName, streamKey) {
const group = this.groups.get(groupName)
if (group) {
group.delete(streamKey)
}
}
async notifyConsumers(groupName) {
const group = this.groups.get(groupName)
if (!group) {
return
}
for await (const [streamId, { position, callback }] of group) {
const messages = this.stream.get(streamId) || []
if (position >= messages.length) {
group.set(streamId, { position: 0, callback })
continue
}
const newMessages = messages.slice(position);
// console.log(`Consumer in group ${groupName} received messages: `, newMessages)
!(callback instanceof Function) || await callback(newMessages)
!(callback?.update) || await callback.update(newMessages)
group.set(streamId, { position: position + newMessages.length, callback })
}
}
}
class KeyValueEvent {
constructor(key, value) {
this.key = key;
this.value = value;
this.kind = 'KeyValueEvent'
this.version = '2023-05-06'
this.occurredAt = new Date().toISOString()
this.recordedAt = new Date().toISOString()
}
}
export { EventStream, KeyValueEvent }