diff --git a/python/ray/data/_internal/planner/plan_read_op.py b/python/ray/data/_internal/planner/plan_read_op.py
index fa5fa03fdb426..511b8c3e26b8d 100644
--- a/python/ray/data/_internal/planner/plan_read_op.py
+++ b/python/ray/data/_internal/planner/plan_read_op.py
@@ -26,16 +26,13 @@
 READ_FILE_MAX_ATTEMPTS = 10
 READ_FILE_RETRY_MAX_BACKOFF_SECONDS = 32
 
-import time
-import logging
-logger = logging.getLogger(__name__)
 
 # Defensively compute the size of the block as the max size reported by the
 # datasource and the actual read task size. This is to guard against issues
 # with bad metadata reporting.
 def cleaned_metadata(read_task: ReadTask):
     block_meta = read_task.get_metadata()
-    task_size = 0
+    task_size = len(cloudpickle.dumps(read_task))
     if block_meta.size_bytes is None or task_size > block_meta.size_bytes:
         if task_size > TASK_SIZE_WARN_THRESHOLD_BYTES:
             print(
@@ -60,15 +57,10 @@ def get_input_data(target_max_block_size) -> List[RefBundle]:
         assert (
             parallelism is not None
         ), "Read parallelism must be set by the optimizer before execution"
-        start = time.time()
         read_tasks = op._datasource_or_legacy_reader.get_read_tasks(parallelism)
-        end = time.time()
-        logger.info(f"Getting read tasks took: {end-start} seconds")
         _warn_on_high_parallelism(parallelism, len(read_tasks))
 
-        # Follow up on the comment below
-        start = time.time()
-        bundle = [
+        return [
             RefBundle(
                 [
                     (
@@ -85,9 +77,6 @@
             )
             for read_task in read_tasks
         ]
-        end = time.time()
-        logger.info(f"Generating reference bundle took: {end-start} seconds")
-        return bundle
 
     inputs = InputDataBuffer(
         input_data_factory=get_input_data,
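
Note on the cleaned_metadata change above: a minimal sketch of the defensive size computation, runnable outside Ray. cloudpickle is the same serializer Ray uses for read tasks; SimpleMeta, cleaned_size, the sample task, and the warn threshold are hypothetical stand-ins for ReadTask/BlockMetadata and TASK_SIZE_WARN_THRESHOLD_BYTES, not the actual Ray API.

    from dataclasses import dataclass
    from typing import Callable, Optional

    import cloudpickle  # pip install cloudpickle

    WARN_THRESHOLD_BYTES = 100 * 1024  # hypothetical threshold

    @dataclass
    class SimpleMeta:
        # Stand-in for Ray's BlockMetadata; only the field used here.
        size_bytes: Optional[int]

    def cleaned_size(meta: SimpleMeta, read_fn: Callable) -> int:
        # The serialized size of the task is a floor on its real footprint,
        # so prefer it whenever the datasource reported nothing or reported
        # a smaller number (i.e. take the max of the two, as the comment in
        # the diff describes).
        task_size = len(cloudpickle.dumps(read_fn))
        if meta.size_bytes is None or task_size > meta.size_bytes:
            if task_size > WARN_THRESHOLD_BYTES:
                print(f"read task is large: {task_size} bytes")
            return task_size
        return meta.size_bytes

    # A closure capturing a large payload inflates the serialized task size:
    payload = b"x" * (1 << 20)
    print(cleaned_size(SimpleMeta(size_bytes=None), lambda: payload))   # ~1 MiB
    print(cleaned_size(SimpleMeta(size_bytes=10**9), lambda: payload))  # 10**9

This also illustrates why the old `task_size = 0` was a bug: with 0, the `task_size > block_meta.size_bytes` branch could never fire, so undersized datasource metadata was never corrected.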