Skip to content

Commit

Permalink
feat: enable to use defaultDagBuilder with custom fileBuilder or dirB…
Browse files Browse the repository at this point in the history
…uilder
  • Loading branch information
clostao committed Sep 4, 2024
1 parent 3490930 commit 3037bd3
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 47 deletions.
2 changes: 1 addition & 1 deletion packages/ipfs-unixfs-importer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"hamt-sharding": "^3.0.6",
"interface-blockstore": "^5.2.10",
"interface-store": "^5.1.8",
"ipfs-unixfs": "^11.0.0",
"ipfs-unixfs": "^11.1.4",
"it-all": "^3.0.4",
"it-batch": "^3.0.4",
"it-first": "^3.0.4",
Expand Down
12 changes: 10 additions & 2 deletions packages/ipfs-unixfs-importer/src/dag-builder/dir.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import { encode, prepare } from '@ipld/dag-pb'
import { UnixFS } from 'ipfs-unixfs'
import { persist } from '../utils/persist.js'
import type { Directory, InProgressImportResult, WritableStorage } from '../index.js'
import type {
Directory,
InProgressImportResult,
WritableStorage
} from '../index.js'
import type { Version } from 'multiformats/cid'

export interface DirBuilderOptions {
cidVersion: Version
signal?: AbortSignal
}

export const dirBuilder = async (dir: Directory, blockstore: WritableStorage, options: DirBuilderOptions): Promise<InProgressImportResult> => {
export const defaultDirBuilder = async (
dir: Directory,
blockstore: WritableStorage,
options: DirBuilderOptions
): Promise<InProgressImportResult> => {
const unixfs = new UnixFS({
type: 'directory',
mtime: dir.mtime,
Expand Down
103 changes: 78 additions & 25 deletions packages/ipfs-unixfs-importer/src/dag-builder/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ import parallelBatch from 'it-parallel-batch'
import * as rawCodec from 'multiformats/codecs/raw'
import { CustomProgressEvent } from 'progress-events'
import { persist } from '../utils/persist.js'
import type { BufferImporter, File, InProgressImportResult, WritableStorage, SingleBlockImportResult, ImporterProgressEvents } from '../index.js'
import type {
BufferImporter,
File,
InProgressImportResult,
WritableStorage,
SingleBlockImportResult,
ImporterProgressEvents
} from '../index.js'
import type { FileLayout, Reducer } from '../layout/index.js'
import type { CID, Version } from 'multiformats/cid'
import type { ProgressOptions, ProgressEvent } from 'progress-events'
Expand All @@ -14,11 +21,18 @@ interface BuildFileBatchOptions {
blockWriteConcurrency: number
}

async function * buildFileBatch (file: File, blockstore: WritableStorage, options: BuildFileBatchOptions): AsyncGenerator<InProgressImportResult> {
async function * buildFileBatch (
file: File,
blockstore: WritableStorage,
options: BuildFileBatchOptions
): AsyncGenerator<InProgressImportResult> {
let count = -1
let previous: SingleBlockImportResult | undefined

for await (const entry of parallelBatch(options.bufferImporter(file, blockstore), options.blockWriteConcurrency)) {
for await (const entry of parallelBatch(
options.bufferImporter(file, blockstore),
options.blockWriteConcurrency
)) {
count++

if (count === 0) {
Expand All @@ -29,7 +43,7 @@ async function * buildFileBatch (file: File, blockstore: WritableStorage, option
}

continue
} else if (count === 1 && (previous != null)) {
} else if (count === 1 && previous != null) {
// we have the second block of a multiple block import so yield the first
yield {
...previous,
Expand Down Expand Up @@ -63,8 +77,10 @@ export interface LayoutLeafProgress {
path?: string
}

export type ReducerProgressEvents =
ProgressEvent<'unixfs:importer:progress:file:layout', LayoutLeafProgress>
export type ReducerProgressEvents = ProgressEvent<
'unixfs:importer:progress:file:layout',
LayoutLeafProgress
>

interface ReduceOptions extends ProgressOptions<ImporterProgressEvents> {
reduceSingleLeafToSelf: boolean
Expand All @@ -76,13 +92,24 @@ function isSingleBlockImport (result: any): result is SingleBlockImportResult {
return result.single === true
}

const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions): Reducer => {
const reduce = (
file: File,
blockstore: WritableStorage,
options: ReduceOptions
): Reducer => {
const reducer: Reducer = async function (leaves) {
if (leaves.length === 1 && isSingleBlockImport(leaves[0]) && options.reduceSingleLeafToSelf) {
if (
leaves.length === 1 &&
isSingleBlockImport(leaves[0]) &&
options.reduceSingleLeafToSelf
) {
const leaf = leaves[0]
let node: Uint8Array | PBNode = leaf.block

if (isSingleBlockImport(leaf) && (file.mtime !== undefined || file.mode !== undefined)) {
if (
isSingleBlockImport(leaf) &&
(file.mtime !== undefined || file.mode !== undefined)
) {
// only one leaf node which is a raw leaf - we have metadata so convert it into a
// UnixFS entry otherwise we'll have nowhere to store the metadata
leaf.unixfs = new UnixFS({
Expand All @@ -103,10 +130,15 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
leaf.size = BigInt(leaf.block.length)
}

options.onProgress?.(new CustomProgressEvent<LayoutLeafProgress>('unixfs:importer:progress:file:layout', {
cid: leaf.cid,
path: leaf.originalPath
}))
options.onProgress?.(
new CustomProgressEvent<LayoutLeafProgress>(
'unixfs:importer:progress:file:layout',
{
cid: leaf.cid,
path: leaf.originalPath
}
)
)

return {
cid: leaf.cid,
Expand All @@ -125,12 +157,16 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
})

const links: PBLink[] = leaves
.filter(leaf => {
.filter((leaf) => {
if (leaf.cid.code === rawCodec.code && leaf.size > 0) {
return true
}

if ((leaf.unixfs != null) && (leaf.unixfs.data == null) && leaf.unixfs.fileSize() > 0n) {
if (
leaf.unixfs != null &&
leaf.unixfs.data == null &&
leaf.unixfs.fileSize() > 0n
) {
return true
}

Expand All @@ -148,7 +184,7 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
}
}

if ((leaf.unixfs == null) || (leaf.unixfs.data == null)) {
if (leaf.unixfs == null || leaf.unixfs.data == null) {
// node is an intermediate node
f.addBlockSize(leaf.unixfs?.fileSize() ?? 0n)
} else {
Expand All @@ -170,16 +206,24 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
const block = encode(prepare(node))
const cid = await persist(block, blockstore, options)

options.onProgress?.(new CustomProgressEvent<LayoutLeafProgress>('unixfs:importer:progress:file:layout', {
cid,
path: file.originalPath
}))
options.onProgress?.(
new CustomProgressEvent<LayoutLeafProgress>(
'unixfs:importer:progress:file:layout',
{
cid,
path: file.originalPath
}
)
)

return {
cid,
path: file.path,
unixfs: f,
size: BigInt(block.length + node.Links.reduce((acc, curr) => acc + (curr.Tsize ?? 0), 0)),
size: BigInt(
block.length +
node.Links.reduce((acc, curr) => acc + (curr.Tsize ?? 0), 0)
),
originalPath: file.originalPath,
block
}
Expand All @@ -188,10 +232,19 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
return reducer
}

export interface FileBuilderOptions extends BuildFileBatchOptions, ReduceOptions {
export interface FileBuilderOptions
extends BuildFileBatchOptions,
ReduceOptions {
layout: FileLayout
}

export const fileBuilder = async (file: File, block: WritableStorage, options: FileBuilderOptions): Promise<InProgressImportResult> => {
return options.layout(buildFileBatch(file, block, options), reduce(file, block, options))
}
export const defaultFileBuilder = async (
file: File,
block: WritableStorage,
options: FileBuilderOptions
): Promise<InProgressImportResult> => {
return options.layout(
buildFileBatch(file, block, options),
reduce(file, block, options)
);
};
78 changes: 60 additions & 18 deletions packages/ipfs-unixfs-importer/src/dag-builder/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import errCode from 'err-code'
import { CustomProgressEvent } from 'progress-events'
import { dirBuilder, type DirBuilderOptions } from './dir.js'
import { fileBuilder, type FileBuilderOptions } from './file.js'
import { defaultDirBuilder, type DirBuilderOptions } from './dir.js'
import { defaultFileBuilder, type FileBuilderOptions } from './file.js'
import type { ChunkValidator } from './validate-chunks.js'
import type { Chunker } from '../chunker/index.js'
import type { Directory, File, FileCandidate, ImportCandidate, ImporterProgressEvents, InProgressImportResult, WritableStorage } from '../index.js'
import type {
Directory,
File,
FileCandidate,
ImportCandidate,
ImporterProgressEvents,
InProgressImportResult,
WritableStorage
} from '../index.js'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

/**
Expand All @@ -27,8 +35,10 @@ export interface ImportReadProgress {
path?: string
}

export type DagBuilderProgressEvents =
ProgressEvent<'unixfs:importer:progress:file:read', ImportReadProgress>
export type DagBuilderProgressEvents = ProgressEvent<
'unixfs:importer:progress:file:read',
ImportReadProgress
>

function isIterable (thing: any): thing is Iterable<any> {
return Symbol.iterator in thing
Expand All @@ -38,16 +48,18 @@ function isAsyncIterable (thing: any): thing is AsyncIterable<any> {
return Symbol.asyncIterator in thing
}

function contentAsAsyncIterable (content: Uint8Array | AsyncIterable<Uint8Array> | Iterable<Uint8Array>): AsyncIterable<Uint8Array> {
function contentAsAsyncIterable (
content: Uint8Array | AsyncIterable<Uint8Array> | Iterable<Uint8Array>
): AsyncIterable<Uint8Array> {
try {
if (content instanceof Uint8Array) {
return (async function * () {
yield content
}())
})()
} else if (isIterable(content)) {
return (async function * () {
yield * content
}())
})()
} else if (isAsyncIterable(content)) {
return content
}
Expand All @@ -58,16 +70,33 @@ function contentAsAsyncIterable (content: Uint8Array | AsyncIterable<Uint8Array>
throw errCode(new Error('Content was invalid'), 'ERR_INVALID_CONTENT')
}

export interface DagBuilderOptions extends FileBuilderOptions, DirBuilderOptions, ProgressOptions<ImporterProgressEvents> {
export interface DagBuilderOptions
extends FileBuilderOptions,
DirBuilderOptions,
ProgressOptions<ImporterProgressEvents> {
chunker: Chunker
chunkValidator: ChunkValidator
wrapWithDirectory: boolean
dirBuilder?(
dir: Directory,
blockstore: WritableStorage,
options: DirBuilderOptions
): Promise<InProgressImportResult>
fileBuilder?(
file: File,
blockstore: WritableStorage,
options: FileBuilderOptions
): Promise<InProgressImportResult>
}

export type ImporterSourceStream = AsyncIterable<ImportCandidate> | Iterable<ImportCandidate>
export type ImporterSourceStream =
| AsyncIterable<ImportCandidate>
| Iterable<ImportCandidate>

export interface DAGBuilder {
(source: ImporterSourceStream, blockstore: WritableStorage): AsyncIterable<() => Promise<InProgressImportResult>>
(source: ImporterSourceStream, blockstore: WritableStorage): AsyncIterable<
() => Promise<InProgressImportResult>
>
}

export function defaultDagBuilder (options: DagBuilderOptions): DAGBuilder {
Expand All @@ -79,7 +108,7 @@ export function defaultDagBuilder (options: DagBuilderOptions): DAGBuilder {
originalPath = entry.path
entry.path = entry.path
.split('/')
.filter(path => path != null && path !== '.')
.filter((path) => path != null && path !== '.')
.join('/')
}

Expand All @@ -91,22 +120,31 @@ export function defaultDagBuilder (options: DagBuilderOptions): DAGBuilder {
content: (async function * () {
let bytesRead = 0n

for await (const chunk of options.chunker(options.chunkValidator(contentAsAsyncIterable(entry.content)))) {
for await (const chunk of options.chunker(
options.chunkValidator(contentAsAsyncIterable(entry.content))
)) {
const currentChunkSize = BigInt(chunk.byteLength)
bytesRead += currentChunkSize

options.onProgress?.(new CustomProgressEvent<ImportReadProgress>('unixfs:importer:progress:file:read', {
bytesRead,
chunkSize: currentChunkSize,
path: entry.path
}))
options.onProgress?.(
new CustomProgressEvent<ImportReadProgress>(
'unixfs:importer:progress:file:read',
{
bytesRead,
chunkSize: currentChunkSize,
path: entry.path
}
)
)

yield chunk
}
})(),
originalPath
}

const fileBuilder = options.fileBuilder ?? defaultFileBuilder

yield async () => fileBuilder(file, blockstore, options)
} else if (entry.path != null) {
const dir: Directory = {
Expand All @@ -116,6 +154,10 @@ export function defaultDagBuilder (options: DagBuilderOptions): DAGBuilder {
originalPath
}

const dirBuilder =
options.dirBuilder ??
defaultDirBuilder

yield async () => dirBuilder(dir, blockstore, options)
} else {
throw new Error('Import candidate must have content or path or both')
Expand Down
1 change: 1 addition & 0 deletions packages/ipfs-unixfs-importer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ export async function * importer (source: ImportCandidateStream, blockstore: Wri
const fileImportConcurrency = options.fileImportConcurrency ?? 50
const blockWriteConcurrency = options.blockWriteConcurrency ?? 10
const reduceSingleLeafToSelf = options.reduceSingleLeafToSelf ?? true


const chunker = options.chunker ?? fixedSize()
const chunkValidator = options.chunkValidator ?? defaultChunkValidator()
Expand Down
3 changes: 2 additions & 1 deletion packages/ipfs-unixfs-importer/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"extends": "aegir/src/config/tsconfig.aegir.json",
"compilerOptions": {
"outDir": "dist"
"outDir": "dist",
"esModuleInterop": true
},
"include": [
"src",
Expand Down

0 comments on commit 3037bd3

Please sign in to comment.