Skip to content

Commit 2dd04af

Browse files
committed
fix: add parallel operation
1 parent 3f21dbc commit 2dd04af

File tree

7 files changed

+610
-377
lines changed

7 files changed

+610
-377
lines changed

parsers/docx_parser.py

Lines changed: 68 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -74,31 +74,64 @@ async def parse(self, file_path: Path) -> DocumentData:
7474
result = await loop.run_in_executor(None, self._converter.convert, file_path)
7575
doc_data = result.document
7676

77-
title = self._extract_title(doc_data)
78-
images = self._extract_images(doc_data.pictures)
79-
tables = self._extract_tables(doc_data.tables)
80-
texts = self._extract_texts(doc_data.texts)
77+
# 并行处理不同类型的内容
78+
document_data = await self._process_content_parallel(doc_data)
8179

8280
processing_time = time.time() - start_time
81+
document_data.processing_time = processing_time
8382
logger.info(f"Successfully parsed DOCX: {file_path} (took {processing_time:.2f}s)")
84-
return DocumentData(
85-
title=title,
86-
texts=texts,
87-
tables=tables,
88-
images=images,
89-
processing_time=processing_time,
90-
success=True
91-
)
83+
return document_data
9284

9385
except Exception as e:
94-
processing_time = time.time() - start_time
95-
error_msg = f"Failed to parse DOCX file {file_path}: {type(e).__name__}: {e}"
96-
logger.exception(error_msg) # 记录完整堆栈
97-
return DocumentData(
98-
success=False,
99-
error_message=str(e),
100-
processing_time=processing_time
101-
)
86+
raise Exception(f"Failed to parse DOCX file {file_path}") from e
87+
88+
async def _process_content_parallel(self, doc_data: DoclingDocument) -> DocumentData:
    """Extract images, tables and texts concurrently and bundle them with the title.

    One extraction coroutine is scheduled for each non-empty collection on
    ``doc_data`` (``pictures``, ``tables``, ``texts``). A failed extraction is
    logged and leaves its field as an empty list, so the remaining content is
    still returned.

    Args:
        doc_data: The converted document whose ``pictures``, ``tables`` and
            ``texts`` collections are processed.

    Returns:
        A ``DocumentData`` with ``success=True`` holding the title and the
        extracted chunks. ``processing_time`` is not set here.
    """
    # Pair every scheduled coroutine with the field it feeds. The jobs are
    # added conditionally, so a positional index alone cannot tell which
    # content type a given result (or failure) belongs to.
    jobs = []
    if doc_data.pictures:
        jobs.append(("images", self._extract_images_async(doc_data.pictures)))
    if doc_data.tables:
        jobs.append(("tables", self._extract_tables_async(doc_data.tables)))
    if doc_data.texts:
        jobs.append(("texts", self._extract_texts_async(doc_data.texts)))

    # return_exceptions=True keeps one failing extractor from discarding the
    # output of the others; failures come back as result values instead.
    outcomes = await asyncio.gather(
        *(coro for _, coro in jobs),
        return_exceptions=True,
    )

    extracted = {"images": [], "tables": [], "texts": []}
    for (label, _), outcome in zip(jobs, outcomes):
        # BaseException (not Exception) so a returned CancelledError is
        # reported instead of being silently ignored.
        if isinstance(outcome, BaseException):
            logger.error(
                f"Error processing content type {label}: {outcome}",
                exc_info=outcome,
            )
            continue
        # Route by the job's label rather than by inspecting the first chunk,
        # so a list is never dropped because of its first element's type.
        if isinstance(outcome, list):
            extracted[label] = outcome

    title = self._extract_title(doc_data)

    return DocumentData(
        title=title,
        texts=extracted["texts"],
        tables=extracted["tables"],
        images=extracted["images"],
        success=True
    )
102135

103136
def _extract_images(self, pictures: list[PictureItem]) -> list[ChunkData]:
104137
"""提取文档中的图片
@@ -213,3 +246,18 @@ def _extract_texts(self, texts:list[TitleItem|SectionHeaderItem|ListItem|CodeIte
213246
)
214247
)
215248
return text_items
249+
250+
async def _extract_images_async(self, pictures: list[PictureItem]) -> list[ChunkData]:
    """Run ``_extract_images`` in the default executor and await its result.

    Args:
        pictures: Picture items passed through unchanged to ``_extract_images``.

    Returns:
        Whatever ``_extract_images`` returns for ``pictures``.
    """
    # get_running_loop() is the preferred way to obtain the loop from inside
    # a coroutine; it always returns the loop that is executing this call.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._extract_images, pictures)
254+
255+
async def _extract_tables_async(self, tables: list[TableItem]) -> list[ChunkData]:
    """Run ``_extract_tables`` in the default executor and await its result.

    Args:
        tables: Table items passed through unchanged to ``_extract_tables``.

    Returns:
        Whatever ``_extract_tables`` returns for ``tables``.
    """
    # get_running_loop() is the preferred way to obtain the loop from inside
    # a coroutine; it always returns the loop that is executing this call.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._extract_tables, tables)
259+
260+
async def _extract_texts_async(self, texts: list[TitleItem|SectionHeaderItem|ListItem|CodeItem|FormulaItem|TextItem]) -> list[ChunkData]:
    """Run ``_extract_texts`` in the default executor and await its result.

    Args:
        texts: Text-like items passed through unchanged to ``_extract_texts``.

    Returns:
        Whatever ``_extract_texts`` returns for ``texts``.
    """
    # get_running_loop() is the preferred way to obtain the loop from inside
    # a coroutine; it always returns the loop that is executing this call.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._extract_texts, texts)

parsers/excel_parser.py

Lines changed: 79 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
包括表格数据提取和图片处理。
66
"""
77

8+
import asyncio
89
import base64
910
import json
11+
import logging
1012
import time
1113
import warnings
1214
from dataclasses import dataclass
@@ -37,6 +39,7 @@
3739
CellValue = str|int|float|bool|None|datetime|date
3840
TableData = list[list[CellValue]]
3941

42+
logger = logging.getLogger(__name__)
4043

4144
@dataclass
4245
class ExcelParseConfig:
@@ -79,53 +82,88 @@ async def parse(self, file_path: Path) -> DocumentData:
7982

8083
try:
8184
# 初始化内容列表和图片列表
82-
texts: list[ChunkData] = []
83-
tables: list[ChunkData] = []
84-
images: list[ChunkData] = []
8585

8686
# 加载工作簿
8787
workbook = self._load_workbook(file_path)
8888

89-
# 处理每个工作表
90-
for sheet_index, sheet_name in enumerate(workbook.sheetnames):
91-
sheet = workbook[sheet_name]
92-
93-
# 添加工作表标题
94-
texts.append(ChunkData(
95-
type=ChunkType.TEXT,
96-
name=sheet_name,
97-
content=TextDataItem(
98-
text=f"工作表 {sheet_index + 1}: {sheet_name}",
99-
),
100-
))
101-
102-
# 处理图片
103-
sheet_images = self._extract_sheet_images(sheet)
104-
images.extend(sheet_images)
105-
106-
# 处理表格数据
107-
table_content = self._extract_table_data(sheet)
108-
tables.append(ChunkData(
109-
type=ChunkType.TABLE,
110-
name=f"#/tables/{sheet_index}",
111-
content=table_content
112-
))
89+
# 并行处理每个工作表
90+
document_data = await self._process_sheets_parallel(workbook, file_path)
91+
11392
processing_time = time.time() - start_time
114-
return DocumentData(
115-
title=Path(file_path).stem,
116-
texts=texts,
117-
tables=tables,
118-
images=images,
119-
processing_time=processing_time,
120-
success=True
121-
)
93+
document_data.processing_time = processing_time
94+
return document_data
12295
except Exception as e:
123-
processing_time = time.time() - start_time
124-
return DocumentData(
125-
success=False,
126-
error_message=str(e),
127-
processing_time=processing_time
128-
)
96+
raise Exception(f"Failed to parse Excel file {file_path}: {type(e).__name__}: {e}") from e
97+
98+
async def _process_sheets_parallel(self, workbook: Workbook, file_path: Path) -> DocumentData:
    """Process every worksheet concurrently and merge the per-sheet chunks.

    Args:
        workbook: The loaded workbook; each name in ``workbook.sheetnames``
            is processed with ``_process_sheet_async``.
        file_path: Path of the source file; its stem becomes the title.

    Returns:
        A ``DocumentData`` with ``success=True`` whose ``texts``, ``tables``
        and ``images`` are the concatenation of each sheet's results in
        sheet order. Sheets whose processing returned a falsy value are
        skipped. ``processing_time`` is not set here.
    """
    sheet_jobs = [
        self._process_sheet_async(workbook[sheet_name], sheet_index, sheet_name)
        for sheet_index, sheet_name in enumerate(workbook.sheetnames)
    ]

    # Always bind the result list: with no sheets there is nothing to await,
    # and the merge loop below must still see an (empty) sequence.
    sheet_results = await asyncio.gather(*sheet_jobs) if sheet_jobs else []

    texts: list[ChunkData] = []
    tables: list[ChunkData] = []
    images: list[ChunkData] = []

    for sheet_result in sheet_results:
        # _process_sheet_async returns None for a sheet that failed.
        if not sheet_result:
            continue
        texts.extend(sheet_result.get('texts', []))
        tables.extend(sheet_result.get('tables', []))
        images.extend(sheet_result.get('images', []))

    return DocumentData(
        title=Path(file_path).stem,
        texts=texts,
        tables=tables,
        images=images,
        success=True
    )
129+
130+
async def _process_sheet_async(self, sheet: Worksheet, sheet_index: int, sheet_name: str) -> dict|None:
    """Extract one worksheet's heading, table and images off the event loop.

    Args:
        sheet: The worksheet to read.
        sheet_index: Zero-based position of the sheet; used in the heading
            text (as ``sheet_index + 1``) and in the table chunk name.
        sheet_name: Name of the sheet; used as the heading chunk's name.

    Returns:
        A dict with the keys ``'texts'``, ``'tables'`` and ``'images'``, or
        ``None`` when any step raised. ``'tables'`` is an empty list when
        the extracted table content is falsy.
    """
    try:
        # get_running_loop() is the preferred way to obtain the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()

        # NOTE(review): both extractors receive the same worksheet object and
        # are submitted to the executor at the same time - presumably from
        # different threads; confirm the workbook library tolerates that.
        image_task = loop.run_in_executor(None, self._extract_sheet_images, sheet)
        table_task = loop.run_in_executor(None, self._extract_table_data, sheet)

        sheet_images, table_content = await asyncio.gather(image_task, table_task)

        # One heading chunk per sheet.
        texts = [ChunkData(
            type=ChunkType.TEXT,
            name=sheet_name,
            content=TextDataItem(
                text=f"工作表 {sheet_index + 1}: {sheet_name}",
            ),
        )]

        # Emit a table chunk only when the sheet produced table content.
        tables = [ChunkData(
            type=ChunkType.TABLE,
            name=f"#/tables/{sheet_index}",
            content=table_content
        )] if table_content else []

        return {
            'texts': texts,
            'tables': tables,
            'images': sheet_images
        }

    except Exception as e:
        # Best-effort per sheet: a failing sheet is reported and skipped so
        # the others still contribute. logger.exception keeps the traceback.
        logger.exception(f"Error processing sheet {sheet_name}: {e}")
        return None
129167

130168
def _load_workbook(self, excel_path: Path) -> Workbook:
131169
"""

0 commit comments

Comments
 (0)