Skip to content
This repository has been archived by the owner on Sep 30, 2023. It is now read-only.

Commit

Permalink
Add toCID and fromCID
Browse files Browse the repository at this point in the history
  • Loading branch information
satazor committed Jan 8, 2019
1 parent 2bb8587 commit 040ff46
Show file tree
Hide file tree
Showing 21 changed files with 1,045 additions and 518 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"node": ">=8.0.0"
},
"dependencies": {
"cids": "^0.5.7",
"ipld-dag-pb": "^0.15.2",
"orbit-db-identity-provider": "github:orbitdb/orbit-db-identity-provider",
"p-map": "^1.1.1",
Expand Down
64 changes: 37 additions & 27 deletions src/entry-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,53 @@ const pMap = require('p-map')
const Entry = require('./entry')

class EntryIO {
// Fetch log graphs in parallel
static async fetchParallel (ipfs, hashes, length, exclude = [], concurrency, timeout, onProgressCallback) {
const fetchOne = (hash) => EntryIO.fetchAll(ipfs, hash, length, exclude, timeout, onProgressCallback)
/**
* Fetch log entries in parallel.
* @param {IPFS} ipfs An IPFS instance
* @param {string|Array<string>} cids CIDs of the entries to fetch
* @param {number} [amount=-1] How many entries to fetch
* @param {Array<Entry>} [exclude] Entries to not fetch
* @param {number} [concurrency=] Max concurrent fetch operations
* @param {number} [timeout] Maximum time to wait for each fetch operation, in ms
* @param {function(cid, entry, parent, depth)} onProgressCallback
* @returns {Promise<Array<Entry>>}
*/
static async fetchParallel (ipfs, cids, amount = -1, exclude = [], concurrency = null, timeout, onProgressCallback) {
const fetchOne = (cid) => EntryIO.fetchAll(ipfs, cid, amount, exclude, timeout, onProgressCallback)
const concatArrays = (arr1, arr2) => arr1.concat(arr2)
const flatten = (arr) => arr.reduce(concatArrays, [])
concurrency = Math.max(concurrency || hashes.length, 1)
const entries = await pMap(hashes, fetchOne, { concurrency: concurrency })
concurrency = Math.max(concurrency || cids.length, 1)
const entries = await pMap(cids, fetchOne, { concurrency: concurrency })
return flatten(entries) // Flatten the results
}

/**
* Fetch log entries sequentially
*
* @param {IPFS} [ipfs] An IPFS instance
* @param {string} [hash] Multihash of the entry to fetch
* @param {string} [parent] Parent of the node to be fetched
* @param {Object} [all] Entries to skip
* @param {Number} [amount=-1] How many entries to fetch
* @param {Number} [depth=0] Current depth of the recursion
* @param {function(hash, entry, parent, depth)} onProgressCallback
* Fetch log entries sequentially.
* @param {IPFS} ipfs An IPFS instance
* @param {string|Array<string>} cids CIDs of the entries to fetch
* @param {number} [amount=-1] How many entries to fetch
* @param {Array<Entry>} [exclude] Entries to not fetch
* @param {number} [concurrency] Max concurrent fetch operations
* @param {number} [timeout] Maximum time to wait for each fetch operation, in ms
* @param {function(cid, entry, parent, depth)} onProgressCallback
* @returns {Promise<Array<Entry>>}
*/
static async fetchAll (ipfs, hashes, amount, exclude = [], timeout = null, onProgressCallback) {
static async fetchAll (ipfs, cids, amount = -1, exclude = [], timeout = null, onProgressCallback) {
let result = []
let cache = {}
let loadingQueue = Array.isArray(hashes)
? hashes.slice()
: [hashes]
let loadingQueue = Array.isArray(cids)
? cids.slice()
: [cids]

// Add a multihash to the loading queue
// Add a CID to the loading queue
const addToLoadingQueue = e => loadingQueue.push(e)

// Add entries that we don't need to fetch to the "cache"
exclude = exclude && Array.isArray(exclude) ? exclude : []
var addToExcludeCache = e => {
if (Entry.isEntry(e)) {
result.push(e)
cache[e.hash] = e
cache[e.cid] = e
}
}
exclude.forEach(addToExcludeCache)
Expand All @@ -53,9 +62,9 @@ class EntryIO {
}

const fetchEntry = () => {
const hash = loadingQueue.shift()
const cid = loadingQueue.shift()

if (cache[hash]) {
if (cache[cid]) {
return Promise.resolve()
}

Expand All @@ -64,30 +73,31 @@ class EntryIO {
// not get stuck loading a block that is unreachable
const timer = timeout
? setTimeout(() => {
console.warn(`Warning: Couldn't fetch entry '${hash}', request timed out (${timeout}ms)`)
console.warn(`Warning: Couldn't fetch entry '${cid}', request timed out (${timeout}ms)`)
resolve()
}, timeout)
: null

const addToResults = (entry) => {
clearTimeout(timer)
if (Entry.isEntry(entry)) {
entry.next.forEach(addToLoadingQueue)
result.push(entry)
cache[hash] = entry
cache[cid] = entry
if (onProgressCallback) {
onProgressCallback(hash, entry, result.length)
onProgressCallback(cid, entry, result.length)
}
}
}

// Load the entry
try {
const entry = await Entry.fromMultihash(ipfs, hash)
const entry = await Entry.fromCID(ipfs, cid)
addToResults(entry)
resolve()
} catch (e) {
reject(e)
} finally {
clearTimeout(timer)
}
})
}
Expand Down
Loading

0 comments on commit 040ff46

Please sign in to comment.