Skip to content

Commit f38e1ac

Browse files
authored
Merge pull request #140 from ssbc/4.6_overlap_membership
4.6 overlap membership
2 parents 45bf6ab + 126976a commit f38e1ac

File tree

7 files changed

+613
-305
lines changed

7 files changed

+613
-305
lines changed

lib/epochs/get-members.js

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// SPDX-FileCopyrightText: 2022 Mix Irving
2+
//
3+
// SPDX-License-Identifier: LGPL-3.0-only
4+
5+
const { promisify: p } = require('util')
6+
const pull = require('pull-stream')
7+
const pullDefer = require('pull-defer')
8+
const clarify = require('clarify-error')
9+
const {
10+
validator: {
11+
group: { addMember: isAddMember, excludeMember: isExcludeMember },
12+
},
13+
} = require('private-group-spec')
14+
const { fromMessageSigil } = require('ssb-uri2')
15+
16+
const getTangleUpdates = require('../tangles/get-tangle-updates')
17+
18+
function toMsgURI(id) {
19+
return id.startsWith('%') ? fromMessageSigil(id) : id
20+
}
21+
22+
module.exports = function GetMembers(ssb) {
23+
function getMembers(epochRootId, cb) {
24+
if (cb === undefined) return p(getMembers)(epochRootId)
25+
26+
epochRootId = toMsgURI(epochRootId)
27+
const added = new Set()
28+
const toExclude = new Set()
29+
30+
pull(
31+
getTangleUpdates.stream(ssb, 'members', epochRootId),
32+
pull.filter((msg) => isAddMember(msg) || isExcludeMember(msg)),
33+
pull.through((msg) => {
34+
const { type, recps, excludes } = msg.value.content
35+
if (type === 'group/add-member')
36+
recps.slice(1).forEach((feedId) => added.add(feedId))
37+
else return excludes.forEach((feedId) => toExclude.add(feedId))
38+
}),
39+
pull.collect((err) => {
40+
if (err) return cb(clarify(err, 'Failed to resolve epoch membership'))
41+
42+
cb(null, {
43+
added: [...added],
44+
toExclude: [...toExclude],
45+
})
46+
})
47+
)
48+
}
49+
getMembers.stream = function getMembersStream(epochRootId, opts = {}) {
50+
const { live } = opts
51+
52+
const deferredSource = pullDefer.source()
53+
54+
getMembers(epochRootId, (err, res) => {
55+
// prettier-ignore
56+
if (err) return deferredSource.abort(clarify(err, 'error getting members'))
57+
58+
if (!live) {
59+
deferredSource.resolve(pull.once(res))
60+
return
61+
}
62+
63+
const added = new Set(res.added)
64+
const toExclude = new Set(res.toExclude)
65+
66+
const source = pull(
67+
// create a stream of "there is an update" events
68+
pull.values([
69+
// one event for current state
70+
pull.once(true),
71+
72+
// run a live stream, only emiting "true" if there is new info in the
73+
// message that comes in
74+
pull(
75+
getTangleUpdates.stream(ssb, 'members', epochRootId, { live }),
76+
pull.map((msg) => {
77+
if (isAddMember(msg)) {
78+
const initialSize = added.size
79+
msg.value.content.recps
80+
.slice(1)
81+
.forEach((feedId) => added.add(feedId))
82+
return added.size > initialSize
83+
}
84+
85+
if (isExcludeMember(msg)) {
86+
const initialSize = toExclude.size
87+
msg.value.content.excludes.forEach((feedId) =>
88+
toExclude.add(feedId)
89+
)
90+
return toExclude.size > initialSize
91+
}
92+
93+
return false
94+
}),
95+
pull.filter(Boolean)
96+
),
97+
]),
98+
pull.flatten(),
99+
100+
// for each "there is an update" event, map that to emitting the current
101+
// membereship state of the epoch
102+
pull.map(() => {
103+
return {
104+
added: [...added],
105+
toExclude: [...toExclude],
106+
}
107+
})
108+
)
109+
110+
return deferredSource.resolve(source)
111+
})
112+
113+
return deferredSource
114+
}
115+
116+
return getMembers
117+
}

0 commit comments

Comments
 (0)