Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

for executor function call #596

Merged
merged 1 commit into from
Feb 27, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions data_juicer/core/executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from time import time
from typing import Optional
from typing import Optional, Union

from jsonargparse import Namespace
from loguru import logger
Expand All @@ -20,6 +20,7 @@
from ..ops.selector.topk_specified_field_selector import \
TopkSpecifiedFieldSelector
from .adapter import Adapter
from .data import NestedDataset
from .exporter import Exporter
from .tracer import Tracer

Expand Down Expand Up @@ -143,17 +144,21 @@ def sample_data(self,
raise ValueError(f'Unsupported sample_algo: {sample_algo}')

def run(self,
dataset: Union[Dataset, NestedDataset] = None,
load_data_np: Optional[PositiveInt] = None,
skip_return=False):
"""
Running the dataset process pipeline.

:param dataset: a Dataset object to be executed.
:param load_data_np: number of workers when loading the dataset.
:param skip_return: skip return for API called.
:return: processed dataset.
"""
# 1. format data
if self.cfg.use_checkpoint and self.ckpt_manager.ckpt_available:
if dataset is not None:
logger.info(f'Using existing dataset {dataset}')
elif self.cfg.use_checkpoint and self.ckpt_manager.ckpt_available:
logger.info('Loading dataset from checkpoint...')
dataset = self.ckpt_manager.load_ckpt()
else:
Expand Down
Loading