Skip to content

Commit a99290d

Browse files
authored
Merge pull request #134 from ssbc/preferred-live-crash
Be more forgiving in getPreferredEpoch stream
2 parents d4d7079 + bb64b76 commit a99290d

File tree

2 files changed

+70
-25
lines changed

2 files changed

+70
-25
lines changed

lib/epochs.js

+66-21
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// SPDX-License-Identifier: LGPL-3.0-only
44

55
const { promisify: p } = require('util')
6-
const { fromMessageSigil } = require('ssb-uri2')
6+
const { fromMessageSigil, toMessageSigil } = require('ssb-uri2')
77
const pull = require('pull-stream')
88
const pullDefer = require('pull-defer')
99
const pullFlatMerge = require('pull-flat-merge')
@@ -12,7 +12,15 @@ const OverwriteFields = require('@tangle/overwrite-fields')
1212
const clarify = require('clarify-error')
1313
const Butt64 = require('butt64')
1414
const isCanonicalBase64 = require('is-canonical-base64')
15-
const { where, and, type, live, toPullStream } = require('ssb-db2/operators')
15+
const {
16+
where,
17+
and,
18+
type,
19+
live,
20+
key,
21+
isDecrypted,
22+
toPullStream,
23+
} = require('ssb-db2/operators')
1624
const {
1725
validator: {
1826
group: {
@@ -28,6 +36,7 @@ const isSubsetOf = require('set.prototype.issubsetof')
2836

2937
const { groupRecp } = require('./operators')
3038
const getTangleUpdates = require('./tangles/get-tangle-updates')
39+
const pullMany = require('pull-many')
3140

3241
const msgPattern = toPattern(new Butt64('ssb:message/[a-zA-Z0-9-]+/', null, 32))
3342
const feedPattern = toPattern(new Butt64('ssb:feed/[a-zA-Z0-9-]+/', null, 32))
@@ -268,14 +277,16 @@ function Epochs(ssb) {
268277
// then skip all the preferrentEpochs until we get up to the current
269278
// This is important for listMembers to not send confusing results
270279
getPreferredEpoch(groupId, (err, preferredEpoch) => {
271-
if (err) return deferredSource.abort(clarify(err, 'failed to get initial preferred epoch'))
272-
280+
// if we're live and this fails we don't really mind, just go straight to live
273281
if (!live) {
274-
deferredSource.resolve(pull.once(preferredEpoch))
282+
if (err) deferredSource.abort(clarify(err, 'failed to get initial preferred epoch'))
283+
else deferredSource.resolve(pull.once(preferredEpoch))
284+
275285
return
276286
}
277287

278-
var sync = false
288+
// if we couldn't get current preferred, we'll just go live
289+
var sync = !!err
279290
const source = pull(
280291
epochsReduce.stream(groupId, { getters: allGetters, live }),
281292
pull.asyncMap(buildPreferredEpoch),
@@ -509,24 +520,58 @@ function epochNodeStream(ssb, groupId, opts = {}) {
509520

510521
return deferredSource
511522
}
512-
function getGroupInit(ssb, groupId, cb) {
513-
ssb.box2.getGroupInfo(groupId, (err, info) => {
514-
// prettier-ignore
515-
if (err) return cb(clarify(err, 'Failed to get group info for ' + groupId))
516-
if (!info) return cb(new Error('Unknown group'))
517523

518-
// Fetch the tangle root
519-
ssb.db.get(info.root, (err, rootVal) => {
520-
// prettier-ignore
521-
if (err) return cb(clarify(err, 'Failed to load group root with id ' + info.root))
524+
function getRootVal(ssb, msgId, cb) {
525+
pull(
526+
pullMany([
527+
ssb.db.query(
528+
where(and(isDecrypted('box2'), key(toMessageSigil(msgId)))),
529+
live({ old: true }),
530+
toPullStream()
531+
),
532+
pull(
533+
ssb.db.reindexed(),
534+
pull.filter((msg) => fromMessageSigil(msg.key) === msgId)
535+
),
536+
]),
537+
pull.take(1),
538+
pull.drain(
539+
(msg) => cb(null, msg.value),
540+
(err) => {
541+
if (err) cb(Error('Failed getting root msg async', { cause: err }))
542+
}
543+
)
544+
)
545+
}
522546

523-
if (!isInitRoot(rootVal))
547+
function getGroupInit(ssb, groupId, cb) {
548+
pull(
549+
ssb.box2.getGroupInfoUpdates(groupId),
550+
pull.take(1),
551+
pull.drain(
552+
(info) => {
553+
if (!info) return cb(new Error('Unknown group'))
554+
555+
// Fetch the tangle root
556+
// This is based on a live stream since sometimes the group info comes in very quick, before the root msg has had time to get put into the db
557+
// and sometimes it might take ages (we haven't gotten that feed yet)
558+
getRootVal(ssb, info.root, (err, rootVal) => {
559+
// prettier-ignore
560+
if (err) return cb(clarify(err, 'Failed to load group root with id ' + info.root))
561+
562+
if (!isInitRoot(rootVal))
563+
// prettier-ignore
564+
return cb(clarify(new Error(isInitRoot.string), 'Malformed group/init root message'))
565+
566+
cb(null, { key: info.root, value: rootVal })
567+
})
568+
},
569+
(err) => {
524570
// prettier-ignore
525-
return cb(clarify(new Error(isInitRoot.string), 'Malformed group/init root message'))
526-
527-
cb(null, { key: info.root, value: rootVal })
528-
})
529-
})
571+
if (err) return cb(clarify(err, 'Failed to get group info for ' + groupId))
572+
}
573+
)
574+
)
530575
}
531576

532577
/* HELPERS */

test/lib/epochs.test.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ test('lib/epochs (getEpochs, getMembers)', async (t) => {
2626
async function sync(label) {
2727
return run(`(sync ${label})`, replicate(peers), { isTest: false })
2828
}
29-
t.teardown(() => peers.forEach((peer) => peer.close(true)))
29+
t.teardown(() => Promise.all(peers.map((peer) => p(peer.close)(true))))
3030

3131
const [aliceId, bobId, oscarId] = await getRootIds(peers)
3232
await run(
@@ -153,7 +153,7 @@ test('lib/epochs (getMissingMembers)', async (t) => {
153153
{ isTest: false }
154154
)
155155
}
156-
t.teardown(() => peers.forEach((peer) => peer.close(true)))
156+
t.teardown(() => Promise.all(peers.map((peer) => p(peer.close)(true))))
157157

158158
await run(
159159
'start tribes',
@@ -260,7 +260,7 @@ test('lib/epochs (getPreferredEpoch - 4.4. same membership)', async (t) => {
260260
Server({ name: 'bob' }),
261261
Server({ name: 'oscar' }),
262262
]
263-
t.teardown(() => peers.forEach((peer) => peer.close(true)))
263+
t.teardown(() => Promise.all(peers.map((peer) => p(peer.close)(true))))
264264

265265
const [alice, bob, oscar] = peers
266266
const [bobId, oscarId] = await getRootIds([bob, oscar])
@@ -381,7 +381,7 @@ test('lib/epochs (getPreferredEpoch - 4.5. subset membership)', async (t) => {
381381
Server({ name: 'carol' }),
382382
Server({ name: 'oscar' }),
383383
]
384-
t.teardown(() => peers.forEach((peer) => peer.close(true)))
384+
t.teardown(() => Promise.all(peers.map((peer) => p(peer.close)(true))))
385385

386386
const [alice, bob, carol, oscar] = peers
387387
const [bobId, carolId, oscarId] = await getRootIds([bob, carol, oscar])

0 commit comments

Comments
 (0)