Skip to content
This repository has been archived by the owner on Sep 30, 2023. It is now read-only.

Use the new dag api and use ipld links for both heads and next pointers #213

Merged
merged 3 commits into from
Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,118 changes: 1,823 additions & 1,295 deletions package-lock.json

Large diffs are not rendered by default.

14 changes: 9 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
"node": ">=8.0.0"
},
"dependencies": {
"cids": "^0.5.7",
"ipld-dag-pb": "^0.15.2",
"orbit-db-identity-provider": "github:orbitdb/orbit-db-identity-provider",
"p-map": "^1.1.1",
"p-whilst": "^1.0.0"
"p-whilst": "^1.0.0",
"pify": "^4.0.1"
},
"devDependencies": {
"@babel/cli": "~7.2.3",
Expand All @@ -28,17 +31,18 @@
"@orbitdb/eslint-config-orbitdb": "0.0.2",
"babel-loader": "~8.0.4",
"datastore-level": "~0.10.0",
"go-ipfs-dep": "~0.4.17",
"ipfs": "~0.31.4",
"ipfs-repo": "~0.23.1",
"ipfsd-ctl": "~0.39.1",
"go-ipfs-dep": "~0.4.18",
"ipfs": "~0.33.1",
"ipfs-repo": "~0.24.0",
"ipfsd-ctl": "~0.40.2",
"is-node": "~1.0.2",
"istanbul": "~0.4.5",
"json-loader": "~0.5.7",
"mocha": "~5.2.0",
"mocha-headless-chrome": "~2.0.1",
"orbit-db-keystore": "github:orbitdb/orbit-db-keystore",
"rimraf": "~2.6.1",
"sinon": "~7.2.2",
"standard": "~12.0.1",
"webpack": "~4.28.0",
"webpack-cli": "~3.1.2"
Expand Down
64 changes: 37 additions & 27 deletions src/entry-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,53 @@ const pMap = require('p-map')
const Entry = require('./entry')

class EntryIO {
// Fetch log graphs in parallel
static async fetchParallel (ipfs, hashes, length, exclude = [], concurrency, timeout, onProgressCallback) {
const fetchOne = (hash) => EntryIO.fetchAll(ipfs, hash, length, exclude, timeout, onProgressCallback)
/**
* Fetch log entries in parallel.
* @param {IPFS} ipfs An IPFS instance
* @param {string|Array<string>} cids CIDs of the entries to fetch
* @param {number} [amount=-1] How many entries to fetch
* @param {Array<Entry>} [exclude] Entries to not fetch
* @param {number} [concurrency=] Max concurrent fetch operations
* @param {number} [timeout] Maximum time to wait for each fetch operation, in ms
* @param {function(cid, entry, parent, depth)} onProgressCallback
* @returns {Promise<Array<Entry>>}
*/
static async fetchParallel (ipfs, cids, amount = -1, exclude = [], concurrency = null, timeout, onProgressCallback) {
const fetchOne = (cid) => EntryIO.fetchAll(ipfs, cid, amount, exclude, timeout, onProgressCallback)
const concatArrays = (arr1, arr2) => arr1.concat(arr2)
const flatten = (arr) => arr.reduce(concatArrays, [])
concurrency = Math.max(concurrency || hashes.length, 1)
const entries = await pMap(hashes, fetchOne, { concurrency: concurrency })
concurrency = Math.max(concurrency || cids.length, 1)
const entries = await pMap(cids, fetchOne, { concurrency: concurrency })
return flatten(entries) // Flatten the results
}

/**
* Fetch log entries sequentially
*
* @param {IPFS} [ipfs] An IPFS instance
* @param {string} [hash] Multihash of the entry to fetch
* @param {string} [parent] Parent of the node to be fetched
* @param {Object} [all] Entries to skip
* @param {Number} [amount=-1] How many entries to fetch
* @param {Number} [depth=0] Current depth of the recursion
* @param {function(hash, entry, parent, depth)} onProgressCallback
* Fetch log entries sequentially.
* @param {IPFS} ipfs An IPFS instance
* @param {string|Array<string>} cids CIDs of the entries to fetch
* @param {number} [amount=-1] How many entries to fetch
* @param {Array<Entry>} [exclude] Entries to not fetch
* @param {number} [concurrency] Max concurrent fetch operations
* @param {number} [timeout] Maximum time to wait for each fetch operation, in ms
* @param {function(cid, entry, parent, depth)} onProgressCallback
* @returns {Promise<Array<Entry>>}
*/
static async fetchAll (ipfs, hashes, amount, exclude = [], timeout = null, onProgressCallback) {
static async fetchAll (ipfs, cids, amount = -1, exclude = [], timeout = null, onProgressCallback) {
let result = []
let cache = {}
let loadingQueue = Array.isArray(hashes)
? hashes.slice()
: [hashes]
let loadingQueue = Array.isArray(cids)
? cids.slice()
: [cids]

// Add a multihash to the loading queue
// Add a CID to the loading queue
const addToLoadingQueue = e => loadingQueue.push(e)

// Add entries that we don't need to fetch to the "cache"
exclude = exclude && Array.isArray(exclude) ? exclude : []
var addToExcludeCache = e => {
if (Entry.isEntry(e)) {
result.push(e)
cache[e.hash] = e
cache[e.cid] = e
}
}
exclude.forEach(addToExcludeCache)
Expand All @@ -53,9 +62,9 @@ class EntryIO {
}

const fetchEntry = () => {
const hash = loadingQueue.shift()
const cid = loadingQueue.shift()

if (cache[hash]) {
if (cache[cid]) {
return Promise.resolve()
}

Expand All @@ -64,30 +73,31 @@ class EntryIO {
// not get stuck loading a block that is unreachable
const timer = timeout
? setTimeout(() => {
console.warn(`Warning: Couldn't fetch entry '${hash}', request timed out (${timeout}ms)`)
console.warn(`Warning: Couldn't fetch entry '${cid}', request timed out (${timeout}ms)`)
resolve()
}, timeout)
: null

const addToResults = (entry) => {
clearTimeout(timer)
if (Entry.isEntry(entry)) {
entry.next.forEach(addToLoadingQueue)
result.push(entry)
cache[hash] = entry
cache[cid] = entry
if (onProgressCallback) {
onProgressCallback(hash, entry, result.length)
onProgressCallback(cid, entry, result.length)
}
}
}

// Load the entry
try {
const entry = await Entry.fromMultihash(ipfs, hash)
const entry = await Entry.fromCID(ipfs, cid)
addToResults(entry)
resolve()
} catch (e) {
reject(e)
} finally {
clearTimeout(timer)
}
})
}
Expand Down
Loading