Commit
[Feature] improve CsvLoader & clean code (#3830)
* Improve CSV Loader

* Improve S3 Loaders

---------

Co-authored-by: Henry <[email protected]>
JJK801 and HenryHengZJ authored Jan 14, 2025
1 parent cc87d85 commit 24eb437
Showing 7 changed files with 229 additions and 206 deletions.
125 changes: 42 additions & 83 deletions packages/components/nodes/documentloaders/Csv/Csv.ts
@@ -1,7 +1,6 @@
import { omit } from 'lodash'
import { TextSplitter } from 'langchain/text_splitter'
import { CSVLoader } from '@langchain/community/document_loaders/fs/csv'
import { getFileFromStorage, handleEscapeCharacters } from '../../../src'
import { CSVLoader } from './CsvLoader'
import { getFileFromStorage, handleDocumentLoaderDocuments, handleDocumentLoaderMetadata, handleDocumentLoaderOutput } from '../../../src'
import { ICommonObject, IDocument, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'

class Csv_DocumentLoaders implements INode {
@@ -19,7 +18,7 @@ class Csv_DocumentLoaders implements INode {
constructor() {
this.label = 'Csv File'
this.name = 'csvFile'
this.version = 2.0
this.version = 3.0
this.type = 'Document'
this.icon = 'csv.svg'
this.category = 'Document Loaders'
@@ -82,21 +81,11 @@
]
}

async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
getFiles(nodeData: INodeData) {
const csvFileBase64 = nodeData.inputs?.csvFile as string
const columnName = nodeData.inputs?.columnName as string
const metadata = nodeData.inputs?.metadata
const output = nodeData.outputs?.output as string
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string

let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}

let docs: IDocument[] = []
let files: string[] = []
let fromStorage: boolean = true

if (csvFileBase64.startsWith('FILE-STORAGE::')) {
const fileName = csvFileBase64.replace('FILE-STORAGE::', '')
@@ -105,86 +94,56 @@
} else {
files = [fileName]
}
const chatflowid = options.chatflowid

for (const file of files) {
if (!file) continue
const fileData = await getFileFromStorage(file, chatflowid)
const blob = new Blob([fileData])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())

if (textSplitter) {
docs = await loader.load()
docs = await textSplitter.splitDocuments(docs)
} else {
docs.push(...(await loader.load()))
}
}
} else {
if (csvFileBase64.startsWith('[') && csvFileBase64.endsWith(']')) {
files = JSON.parse(csvFileBase64)
} else {
files = [csvFileBase64]
}

for (const file of files) {
if (!file) continue
const splitDataURI = file.split(',')
splitDataURI.pop()
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
const blob = new Blob([bf])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())

if (textSplitter) {
docs = await loader.load()
docs = await textSplitter.splitDocuments(docs)
} else {
docs.push(...(await loader.load()))
}
}
fromStorage = false
}

if (metadata) {
const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata)
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {
...parsedMetadata
}
: omit(
{
...doc.metadata,
...parsedMetadata
},
omitMetadataKeys
)
}))
} else {
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {}
: omit(
{
...doc.metadata
},
omitMetadataKeys
)
}))
}

if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
}

return { files, fromStorage }
}

async getFileData(file: string, { chatflowid }: { chatflowid: string }, fromStorage?: boolean) {
if (fromStorage) {
return getFileFromStorage(file, chatflowid)
} else {
const splitDataURI = file.split(',')
splitDataURI.pop()
return Buffer.from(splitDataURI.pop() || '', 'base64')
}
}

async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
const columnName = nodeData.inputs?.columnName as string
const metadata = nodeData.inputs?.metadata
const output = nodeData.outputs?.output as string
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string

let docs: IDocument[] = []

const chatflowid = options.chatflowid

const { files, fromStorage } = this.getFiles(nodeData)

for (const file of files) {
if (!file) continue

const fileData = await this.getFileData(file, { chatflowid }, fromStorage)
const blob = new Blob([fileData])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())

// reassign with spread instead of docs.push(...), which raises RangeError: Maximum call stack size exceeded when there are too many docs
docs = [...docs, ...(await handleDocumentLoaderDocuments(loader, textSplitter))]
}

docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata)

return handleDocumentLoaderOutput(docs, output)
}
}

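The splitting, metadata, and output logic deleted above is now factored into shared helpers imported from ../../../src. Reconstructed from the deleted code, a minimal sketch of what they plausibly encapsulate (the names match the imports; the bodies and import paths are assumptions, not the committed implementation):

import { omit } from 'lodash'
import { TextSplitter } from 'langchain/text_splitter'
import { IDocument } from './Interface' // assumed path
import { handleEscapeCharacters } from './utils' // existing Flowise helper, assumed path

// Load documents, splitting them when a TextSplitter is connected.
export async function handleDocumentLoaderDocuments(loader: { load(): Promise<IDocument[]> }, textSplitter?: TextSplitter) {
    const docs = await loader.load()
    return textSplitter ? await textSplitter.splitDocuments(docs) : docs
}

// Merge user-supplied metadata and honour the omit list ('*' drops all loader metadata).
export function handleDocumentLoaderMetadata(docs: IDocument[], _omitMetadataKeys?: string, metadata?: any) {
    const omitMetadataKeys = _omitMetadataKeys ? _omitMetadataKeys.split(',').map((key) => key.trim()) : []
    const parsedMetadata = metadata ? (typeof metadata === 'object' ? metadata : JSON.parse(metadata)) : {}
    return docs.map((doc) => ({
        ...doc,
        metadata:
            _omitMetadataKeys === '*'
                ? { ...parsedMetadata }
                : omit({ ...doc.metadata, ...parsedMetadata }, omitMetadataKeys)
    }))
}

// Return Document[] or the concatenated page contents, depending on the node's output pin.
export function handleDocumentLoaderOutput(docs: IDocument[], output: string) {
    if (output === 'document') return docs
    let finaltext = ''
    for (const doc of docs) {
        finaltext += `${doc.pageContent}\n`
    }
    return handleEscapeCharacters(finaltext, false)
}
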
74 changes: 74 additions & 0 deletions packages/components/nodes/documentloaders/Csv/CsvLoader.ts
@@ -0,0 +1,74 @@
import { TextLoader } from 'langchain/document_loaders/fs/text'
import Papa from 'papaparse'

type CSVLoaderOptions = {
// Return a specific column by key (string) or index (integer)
column?: string | number
// Force separator (default: auto detect)
separator?: string
}

/**
* A class that extends the TextLoader class. It represents a document
* loader that loads documents from a CSV file. It has a constructor that
* takes a `filePathOrBlob` parameter representing the path to the CSV
* file or a Blob object, and an optional `options` parameter of type
* `CSVLoaderOptions` or a string representing the column to use as the
* document's pageContent.
*/
export class CSVLoader extends TextLoader {
protected options: CSVLoaderOptions = {}

constructor(filePathOrBlob: ConstructorParameters<typeof TextLoader>[0], options?: CSVLoaderOptions | string) {
super(filePathOrBlob)

if (typeof options === 'string') {
this.options = { column: options }
} else {
this.options = options ?? this.options
}
}
/**
* A protected method that parses the raw CSV data and returns an array of
* strings representing the pageContent of each document. It uses
* `papaparse` to parse the CSV data. If
* the `column` option is specified, it checks if the column exists in the
* CSV file and returns the values of that column as the pageContent. If
* the `column` option is not specified, it converts each row of the CSV
* data into key/value pairs and joins them with newline characters.
* @param raw The raw CSV data to be parsed.
* @returns An array of strings representing the pageContent of each document.
*/
async parse(raw: string): Promise<string[]> {
const { column, separator } = this.options

const {
data: parsed,
meta: { fields = [] }
} = Papa.parse<{ [K: string]: string }>(raw.trim(), {
delimiter: separator,
header: true
})

if (column !== undefined) {
if (!fields.length) {
throw new Error(`Unable to resolve fields from header.`)
}

let searchIdx = column

if (typeof column == 'number') {
searchIdx = fields[column]
}

if (!fields.includes(searchIdx as string)) {
throw new Error(`Column ${column} not found in CSV file.`)
}

// Note TextLoader will raise an exception if the value is null.
return parsed.map((row) => row[searchIdx])
}

return parsed.map((row) => fields.map((key) => `${key.trim() || '_0'}: ${row[key]?.trim()}`).join('\n'))
}
}
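
A hedged usage sketch of the new loader (the sample data is illustrative; the string shorthand preserves compatibility with the old @langchain/community constructor, while the options object is the new capability):

import { CSVLoader } from './CsvLoader'

async function demo() {
    const blob = new Blob(['name;age\nAda;36\nAlan;41'])

    // No column option: each row becomes "key: value" pairs joined by newlines.
    const rows = await new CSVLoader(blob).load()
    // rows[0].pageContent === 'name: Ada\nage: 36' (separator auto-detected by papaparse)

    // String shorthand: pageContent is the named column's value.
    const names = await new CSVLoader(blob, 'name').load()

    // Options object: select a column by index and force the separator.
    const ages = await new CSVLoader(blob, { column: 1, separator: ';' }).load()
}
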
@@ -1,6 +1,11 @@
import { omit } from 'lodash'
import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeOutputsValue, INodeParams } from '../../../src/Interface'
import { getCredentialData, getCredentialParam, handleEscapeCharacters } from '../../../src/utils'
import {
getCredentialData,
getCredentialParam,
handleDocumentLoaderDocuments,
handleDocumentLoaderMetadata,
handleDocumentLoaderOutput
} from '../../../src/utils'
import { S3Client, GetObjectCommand, S3ClientConfig, ListObjectsV2Command, ListObjectsV2Output } from '@aws-sdk/client-s3'
import { getRegions, MODEL_TYPE } from '../../../src/modelLoader'
import { Readable } from 'node:stream'
@@ -10,12 +15,13 @@ import * as os from 'node:os'

import { DirectoryLoader } from 'langchain/document_loaders/fs/directory'
import { JSONLoader } from 'langchain/document_loaders/fs/json'
import { CSVLoader } from '@langchain/community/document_loaders/fs/csv'
import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf'
import { DocxLoader } from '@langchain/community/document_loaders/fs/docx'
import { TextLoader } from 'langchain/document_loaders/fs/text'
import { TextSplitter } from 'langchain/text_splitter'

import { CSVLoader } from '../Csv/CsvLoader'

class S3_DocumentLoaders implements INode {
label: string
name: string
@@ -151,11 +157,6 @@ class S3_DocumentLoaders implements INode {
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string
const output = nodeData.outputs?.output as string

let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}

let credentials: S3ClientConfig['credentials'] | undefined

if (nodeData.credential) {
@@ -241,11 +242,11 @@
'.csv': (path) => new CSVLoader(path),
'.docx': (path) => new DocxLoader(path),
'.pdf': (path) =>
pdfUsage === 'perFile'
? // @ts-ignore
new PDFLoader(path, { splitPages: false, pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') })
: // @ts-ignore
new PDFLoader(path, { pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') }),
new PDFLoader(path, {
splitPages: pdfUsage !== 'perFile',
// @ts-ignore
pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js')
}),
'.aspx': (path) => new TextLoader(path),
'.asp': (path) => new TextLoader(path),
'.cpp': (path) => new TextLoader(path), // C++
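
The two PDFLoader branches replaced above differed only in splitPages, so they collapse into one construction. Assuming PDFLoader defaults splitPages to true (as it does in @langchain/community), behavior is unchanged; a condensed sketch with the pdfjs override omitted:

import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf'

// 'perFile' => splitPages: false (one Document per file);
// any other value, e.g. 'perPage' => splitPages: true, the loader's default.
const makePdfLoader = (path: string, pdfUsage: string) =>
    new PDFLoader(path, { splitPages: pdfUsage !== 'perFile' })
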
@@ -284,63 +285,16 @@
true
)

let docs = []

if (textSplitter) {
let splittedDocs = await loader.load()
splittedDocs = await textSplitter.splitDocuments(splittedDocs)
docs.push(...splittedDocs)
} else {
docs = await loader.load()
}

if (metadata) {
const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata)
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {
...parsedMetadata
}
: omit(
{
...doc.metadata,
...parsedMetadata
},
omitMetadataKeys
)
}))
} else {
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {}
: omit(
{
...doc.metadata
},
omitMetadataKeys
)
}))
}
let docs = await handleDocumentLoaderDocuments(loader, textSplitter)

docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata)

// remove the temp directory before returning docs
fsDefault.rmSync(tempDir, { recursive: true })

if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
}

return handleDocumentLoaderOutput(docs, output)
} catch (e: any) {
fsDefault.rmSync(tempDir, { recursive: true })
throw new Error(`Failed to load data from bucket ${bucketName}: ${e.message}`)
} finally {
// remove the temp directory before returning docs
fsDefault.rmSync(tempDir, { recursive: true })
}
}
}
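
With cleanup moved into a finally block, the temp directory is removed on both the success and the error path, and the duplicated rmSync call in catch goes away. A generic sketch of the pattern (withTempDir is illustrative, not part of the commit):

import * as fsDefault from 'node:fs'
import * as path from 'node:path'
import * as os from 'node:os'

async function withTempDir<T>(prefix: string, work: (dir: string) => Promise<T>): Promise<T> {
    const tempDir = fsDefault.mkdtempSync(path.join(os.tmpdir(), prefix))
    try {
        return await work(tempDir)
    } catch (e: any) {
        throw new Error(`Failed while working in ${tempDir}: ${e.message}`)
    } finally {
        // Runs after both return and throw, replacing per-branch cleanup.
        fsDefault.rmSync(tempDir, { recursive: true })
    }
}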