Skip to content
This repository has been archived by the owner on Sep 30, 2023. It is now read-only.

Commit

Permalink
Merge pull request #213 from satazor/ipld
Browse files Browse the repository at this point in the history
Use the new dag api and use ipld links for both heads and next pointers
  • Loading branch information
shamb0t authored Jan 14, 2019
2 parents 1d567b5 + 496f9a5 commit f45f896
Show file tree
Hide file tree
Showing 20 changed files with 2,810 additions and 1,786 deletions.
3,118 changes: 1,823 additions & 1,295 deletions package-lock.json

Large diffs are not rendered by default.

14 changes: 9 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
"node": ">=8.0.0"
},
"dependencies": {
"cids": "^0.5.7",
"ipld-dag-pb": "^0.15.2",
"orbit-db-identity-provider": "github:orbitdb/orbit-db-identity-provider",
"p-map": "^1.1.1",
"p-whilst": "^1.0.0"
"p-whilst": "^1.0.0",
"pify": "^4.0.1"
},
"devDependencies": {
"@babel/cli": "~7.2.3",
Expand All @@ -28,17 +31,18 @@
"@orbitdb/eslint-config-orbitdb": "0.0.2",
"babel-loader": "~8.0.4",
"datastore-level": "~0.10.0",
"go-ipfs-dep": "~0.4.17",
"ipfs": "~0.31.4",
"ipfs-repo": "~0.23.1",
"ipfsd-ctl": "~0.39.1",
"go-ipfs-dep": "~0.4.18",
"ipfs": "~0.33.1",
"ipfs-repo": "~0.24.0",
"ipfsd-ctl": "~0.40.2",
"is-node": "~1.0.2",
"istanbul": "~0.4.5",
"json-loader": "~0.5.7",
"mocha": "~5.2.0",
"mocha-headless-chrome": "~2.0.1",
"orbit-db-keystore": "github:orbitdb/orbit-db-keystore",
"rimraf": "~2.6.1",
"sinon": "~7.2.2",
"standard": "~12.0.1",
"webpack": "~4.28.0",
"webpack-cli": "~3.1.2"
Expand Down
64 changes: 37 additions & 27 deletions src/entry-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,53 @@ const pMap = require('p-map')
const Entry = require('./entry')

class EntryIO {
// Fetch log graphs in parallel
static async fetchParallel (ipfs, hashes, length, exclude = [], concurrency, timeout, onProgressCallback) {
const fetchOne = (hash) => EntryIO.fetchAll(ipfs, hash, length, exclude, timeout, onProgressCallback)
/**
* Fetch log entries in parallel.
* @param {IPFS} ipfs An IPFS instance
* @param {string|Array<string>} cids CIDs of the entries to fetch
* @param {number} [amount=-1] How many entries to fetch
* @param {Array<Entry>} [exclude] Entries to not fetch
* @param {number} [concurrency=] Max concurrent fetch operations
* @param {number} [timeout] Maximum time to wait for each fetch operation, in ms
* @param {function(cid, entry, parent, depth)} onProgressCallback
* @returns {Promise<Array<Entry>>}
*/
static async fetchParallel (ipfs, cids, amount = -1, exclude = [], concurrency = null, timeout, onProgressCallback) {
const fetchOne = (cid) => EntryIO.fetchAll(ipfs, cid, amount, exclude, timeout, onProgressCallback)
const concatArrays = (arr1, arr2) => arr1.concat(arr2)
const flatten = (arr) => arr.reduce(concatArrays, [])
concurrency = Math.max(concurrency || hashes.length, 1)
const entries = await pMap(hashes, fetchOne, { concurrency: concurrency })
concurrency = Math.max(concurrency || cids.length, 1)
const entries = await pMap(cids, fetchOne, { concurrency: concurrency })
return flatten(entries) // Flatten the results
}

/**
* Fetch log entries sequentially
*
* @param {IPFS} [ipfs] An IPFS instance
* @param {string} [hash] Multihash of the entry to fetch
* @param {string} [parent] Parent of the node to be fetched
* @param {Object} [all] Entries to skip
* @param {Number} [amount=-1] How many entries to fetch
* @param {Number} [depth=0] Current depth of the recursion
* @param {function(hash, entry, parent, depth)} onProgressCallback
* Fetch log entries sequentially.
* @param {IPFS} ipfs An IPFS instance
* @param {string|Array<string>} cids CIDs of the entries to fetch
* @param {number} [amount=-1] How many entries to fetch
* @param {Array<Entry>} [exclude] Entries to not fetch
* @param {number} [timeout] Maximum time to wait for each fetch operation, in ms
* @param {function(cid, entry, parent, depth)} onProgressCallback
* @returns {Promise<Array<Entry>>}
*/
static async fetchAll (ipfs, hashes, amount, exclude = [], timeout = null, onProgressCallback) {
static async fetchAll (ipfs, cids, amount = -1, exclude = [], timeout = null, onProgressCallback) {
let result = []
let cache = {}
let loadingQueue = Array.isArray(hashes)
? hashes.slice()
: [hashes]
let loadingQueue = Array.isArray(cids)
? cids.slice()
: [cids]

// Add a multihash to the loading queue
// Add a CID to the loading queue
const addToLoadingQueue = e => loadingQueue.push(e)

// Add entries that we don't need to fetch to the "cache"
exclude = exclude && Array.isArray(exclude) ? exclude : []
var addToExcludeCache = e => {
if (Entry.isEntry(e)) {
result.push(e)
cache[e.hash] = e
cache[e.cid] = e
}
}
exclude.forEach(addToExcludeCache)
Expand All @@ -53,9 +62,9 @@ class EntryIO {
}

const fetchEntry = () => {
const hash = loadingQueue.shift()
const cid = loadingQueue.shift()

if (cache[hash]) {
if (cache[cid]) {
return Promise.resolve()
}

Expand All @@ -64,30 +73,31 @@ class EntryIO {
// not get stuck loading a block that is unreachable
const timer = timeout
? setTimeout(() => {
console.warn(`Warning: Couldn't fetch entry '${hash}', request timed out (${timeout}ms)`)
console.warn(`Warning: Couldn't fetch entry '${cid}', request timed out (${timeout}ms)`)
resolve()
}, timeout)
: null

const addToResults = (entry) => {
clearTimeout(timer)
if (Entry.isEntry(entry)) {
entry.next.forEach(addToLoadingQueue)
result.push(entry)
cache[hash] = entry
cache[cid] = entry
if (onProgressCallback) {
onProgressCallback(hash, entry, result.length)
onProgressCallback(cid, entry, result.length)
}
}
}

// Load the entry
try {
const entry = await Entry.fromMultihash(ipfs, hash)
const entry = await Entry.fromCID(ipfs, cid)
addToResults(entry)
resolve()
} catch (e) {
reject(e)
} finally {
clearTimeout(timer)
}
})
}
Expand Down
Loading

0 comments on commit f45f896

Please sign in to comment.