diff --git a/python/ray/data/_internal/batcher.py b/python/ray/data/_internal/batcher.py
index ace53efc2de93..5ad15a3dbc3c0 100644
--- a/python/ray/data/_internal/batcher.py
+++ b/python/ray/data/_internal/batcher.py
@@ -1,3 +1,6 @@
+import logging
+import os
+
 from typing import Optional
 
 from ray.data._internal.arrow_block import ArrowBlockAccessor
@@ -11,7 +14,8 @@
 # See https://github.com/ray-project/ray/issues/31108 for more details.
 # TODO(jjyao): remove this once
 # https://github.com/apache/arrow/issues/35126 is resolved.
-MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS = 2
+# Make this configurable to avoid unnecessary combine_chunks calls.
+MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS = int(os.getenv("RAY_DATA_MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS", 2))
 
 # Delay compaction until the shuffle buffer has reached this ratio over the min
 # shuffle buffer size. Setting this to 1 minimizes memory usage, at the cost of
@@ -20,6 +24,8 @@
 SHUFFLE_BUFFER_COMPACTION_RATIO = 1.5
 
+logger = logging.getLogger(__name__)
+
 
 class BatcherInterface:
     def add(self, block: Block):
         """Add a block to the block buffer.
@@ -142,6 +148,7 @@ def next_batch(self) -> Block:
             and block.column(0).num_chunks
             >= MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS
         ):
+            logger.warning(f"Detected combine_chunks in Batcher. Number of chunks: {block.column(0).num_chunks}, threshold: {MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS}")
             accessor = BlockAccessor.for_block(
                 transform_pyarrow.combine_chunks(block)
             )
@@ -313,6 +320,7 @@ def next_batch(self) -> Block:
             and self._shuffle_buffer.column(0).num_chunks
             >= MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS
         ):
+            logger.warning(f"Detected combine_chunks in ShuffleBatcher. Number of chunks: {self._shuffle_buffer.column(0).num_chunks}, threshold: {MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS}")
             self._shuffle_buffer = transform_pyarrow.combine_chunks(
                 self._shuffle_buffer
             )
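
Since `MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS` is now read from the environment once at module import time, any override has to be in place before Ray Data is imported. A minimal sketch of how the new knob could be exercised (the dataset, batch size, and threshold value below are illustrative, not part of this PR):

```python
import os

# Raise the threshold so combine_chunks only triggers for heavily
# chunked columns. This must be set before ray.data is imported,
# because batcher.py evaluates the constant at import time.
os.environ["RAY_DATA_MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS"] = "100"

import ray

ds = ray.data.range(100_000)
for batch in ds.iter_batches(batch_size=1024):
    pass  # the added logger.warning fires whenever a combine is triggered
```

With the added `logger.warning` calls, workloads that still hit the combine path show up in the logs, which makes it easier to judge whether the default threshold of 2 is too aggressive for a given pipeline.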