
[BUG] Mars execute failed when the groupby specified method is shuffle and the output is series #36

ChengjieLi28 opened this issue Oct 8, 2022 · 0 comments


Describe the bug

Mars execution fails when a groupby aggregation uses method="shuffle" and the output is a Series.

To Reproduce

To help us reproduce this bug, please provide the information below:

  1. Your Python version: 3.9.12
  2. The version of Mars you use: latest
  3. Versions of crucial packages, such as numpy, scipy and pandas: the versions required by Mars
  4. Full stack of the error:
IndexError                                Traceback (most recent call last)
Input In [7], in <cell line: 1>()
----> 1 mdf.groupby("b", sort=False)["a"].sum(method="shuffle").execute()

File ~/Projects/workspace/mars/mars/core/entity/tileables.py:462, in HasShapeTileable.execute(self, session, **kw)
    461 def execute(self, session=None, **kw):
--> 462     result = self.data.execute(session=session, **kw)
    463     if isinstance(result, TILEABLE_TYPE):
    464         return self

File ~/Projects/workspace/mars/mars/core/entity/executable.py:144, in _ExecutableMixin.execute(self, session, **kw)
    141 from ...deploy.oscar.session import execute
    143 session = _get_session(self, session)
--> 144 return execute(self, session=session, **kw)

File ~/Projects/workspace/mars/mars/deploy/oscar/session.py:1888, in execute(tileable, session, wait, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
   1886     session = get_default_or_create(**(new_session_kwargs or dict()))
   1887 session = _ensure_sync(session)
-> 1888 return session.execute(
   1889     tileable,
   1890     *tileables,
   1891     wait=wait,
   1892     show_progress=show_progress,
   1893     progress_update_interval=progress_update_interval,
   1894     **kwargs,
   1895 )

File ~/Projects/workspace/mars/mars/deploy/oscar/session.py:1682, in SyncSession.execute(self, tileable, show_progress, warn_duplicated_execution, *tileables, **kwargs)
   1680 fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
   1681 try:
-> 1682     execution_info: ExecutionInfo = fut.result(
   1683         timeout=self._isolated_session.timeout
   1684     )
   1685 except KeyboardInterrupt:  # pragma: no cover
   1686     logger.warning("Cancelling running task")

File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/Projects/workspace/mars/mars/deploy/oscar/session.py:1868, in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
   1865     else:
   1866         # set cancelled to avoid wait task leak
   1867         cancelled.set()
-> 1868     await execution_info
   1869 else:
   1870     return execution_info

File ~/Projects/workspace/mars/mars/deploy/oscar/session.py:105, in ExecutionInfo._ensure_future.<locals>.wait()
    104 async def wait():
--> 105     return await self._aio_task

File ~/Projects/workspace/mars/mars/deploy/oscar/session.py:953, in _IsolatedSession._run_in_background(self, tileables, task_id, progress, profiling)
    947         logger.warning(
    948             "Profile task %s execution result:\n%s",
    949             task_id,
    950             json.dumps(task_result.profiling, indent=4),
    951         )
    952     if task_result.error:
--> 953         raise task_result.error.with_traceback(task_result.traceback)
    954 if cancelled:
    955     return

File ~/Projects/workspace/mars/mars/services/task/supervisor/processor.py:373, in TaskProcessor.run(self)
    371     async with self._executor:
    372         async for stage_args in self._iter_stage_chunk_graph():
--> 373             await self._process_stage_chunk_graph(*stage_args)
    374 except Exception as ex:
    375     self.result.error = ex

File ~/Projects/workspace/mars/mars/services/task/supervisor/processor.py:216, in TaskProcessor._process_stage_chunk_graph(self, stage_id, stage_profiler, chunk_graph)
    212 shuffle_fetch_type = (
    213     self._executor.get_execution_config().get_shuffle_fetch_type()
    214 )
    215 with Timer() as timer:
--> 216     subtask_graph = await asyncio.to_thread(
    217         self._preprocessor.analyze,
    218         chunk_graph,
    219         self._chunk_to_subtasks,
    220         available_bands,
    221         stage_id=stage_id,
    222         op_to_bands=fetch_op_to_bands,
    223         shuffle_fetch_type=shuffle_fetch_type,
    224     )
    225     if self._dump_subtask_graph:
    226         self._subtask_graphs.append(subtask_graph)

File ~/miniconda3/lib/python3.9/asyncio/threads.py:25, in to_thread(func, *args, **kwargs)
     23 ctx = contextvars.copy_context()
     24 func_call = functools.partial(ctx.run, func, *args, **kwargs)
---> 25 return await loop.run_in_executor(None, func_call)

File ~/miniconda3/lib/python3.9/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~/Projects/workspace/mars/mars/services/task/supervisor/preprocessor.py:230, in TaskPreprocessor.analyze(self, chunk_graph, chunk_to_subtasks, available_bands, stage_id, op_to_bands, shuffle_fetch_type)
    220 task = self._task
    221 analyzer = GraphAnalyzer(
    222     chunk_graph,
    223     available_bands,
   (...)
    228     shuffle_fetch_type=shuffle_fetch_type,
    229 )
--> 230 graph = analyzer.gen_subtask_graph(op_to_bands)
    231 logger.debug(
    232     "Generated subtask graph of %s subtasks for task %s",
    233     len(graph),
    234     self._task.task_id,
    235 )
    236 return graph

File ~/Projects/workspace/mars/mars/core/mode.py:77, in _EnterModeFuncWrapper.__call__.<locals>._inner(*args, **kwargs)
     74 @functools.wraps(func)
     75 def _inner(*args, **kwargs):
     76     with enter_mode(**mode_name_to_value):
---> 77         return func(*args, **kwargs)

File ~/Projects/workspace/mars/mars/services/task/analyzer/analyzer.py:466, in GraphAnalyzer.gen_subtask_graph(self, op_to_bands)
    463 if all(isinstance(c.op, Fetch) for c in same_color_chunks):
    464     # all fetch ops, no need to gen subtask
    465     continue
--> 466 subtask, inp_subtasks, is_shuffle_proxy = self._gen_subtask_info(
    467     same_color_chunks,
    468     chunk_to_subtask,
    469     chunk_to_bands,
    470     chunk_to_fetch_chunk,
    471 )
    472 subtask_graph.add_node(subtask)
    473 if is_shuffle_proxy:

File ~/Projects/workspace/mars/mars/services/task/analyzer/analyzer.py:244, in GraphAnalyzer._gen_subtask_info(self, chunks, chunk_to_subtask, chunk_to_bands, chunk_to_fetch_chunk)
    239 copied_op = chunk.op.copy()
    240 copied_op._key = chunk.op.key
    241 out_chunks = [
    242     c.data
    243     for c in copied_op.new_chunks(
--> 244         inp_chunks, kws=[c.params.copy() for c in chunk.op.outputs]
    245     )
    246 ]
    247 for src_chunk, out_chunk in zip(chunk.op.outputs, out_chunks):
    248     processed.add(src_chunk)

File ~/Projects/workspace/mars/mars/services/task/analyzer/analyzer.py:244, in <listcomp>(.0)
    239 copied_op = chunk.op.copy()
    240 copied_op._key = chunk.op.key
    241 out_chunks = [
    242     c.data
    243     for c in copied_op.new_chunks(
--> 244         inp_chunks, kws=[c.params.copy() for c in chunk.op.outputs]
    245     )
    246 ]
    247 for src_chunk, out_chunk in zip(chunk.op.outputs, out_chunks):
    248     processed.add(src_chunk)

File ~/Projects/workspace/mars/mars/dataframe/core.py:1854, in BaseDataFrameChunkData._get_params(self)
   1850 def _get_params(self) -> Dict[str, Any]:
   1851     # params return the properties which useful to rebuild a new chunk
   1852     return {
   1853         "shape": self.shape,
-> 1854         "dtypes": self.dtypes,
   1855         "dtypes_value": self.dtypes_value,
   1856         "index": self.index,
   1857         "index_value": self.index_value,
   1858         "columns_value": self.columns_value,
   1859     }

File ~/Projects/workspace/mars/mars/dataframe/core.py:1907, in BaseDataFrameChunkData.dtypes(self)
   1905 @property
   1906 def dtypes(self):
-> 1907     dt = getattr(self, "_dtypes", None)
   1908     if dt is not None:
   1909         return dt

File ~/Projects/workspace/mars/mars/dataframe/core.py:523, in ChunkDtypesField.__get__(self, instance, owner)
    520     return super().__get__(instance, owner)
    522 # get dtypes lazily
--> 523 index = instance.index[1]
    524 dtypes = self._gen_chunk_dtypes(instance, index)
    525 # cache dtypes

IndexError: tuple index out of range
  5. Minimized code to reproduce the error:
import numpy as np
import pandas as pd

import mars.dataframe as md

rs = np.random.RandomState(0)
data_size = 100
data_dict = {
    "a": rs.randint(0, 10, size=(data_size,)),
    "b": rs.choice(list("abcd"), size=(data_size,)),
    "c": rs.choice(list("abcd"), size=(data_size,)),
}
df = pd.DataFrame(data_dict)
mdf = md.DataFrame(df, chunk_size=13)
mdf.groupby("b", sort=False)["a"].sum(method="shuffle").execute()
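
Looking at the bottom of the traceback, the failure happens in ChunkDtypesField.__get__ at instance.index[1], reached from BaseDataFrameChunkData._get_params. My reading (not yet verified against the code) is that selecting the single column "a" makes the output a Series, whose chunks carry a one-element chunk index, while this code path still assumes the two-element (row, column) index of a DataFrame chunk. A minimal sketch of that failure shape, independent of Mars:

# Hypothetical illustration only, not Mars code: a Series chunk's index is a
# one-element tuple, so reading position 1 (the column position a DataFrame
# chunk would have) raises the same IndexError as in the traceback above.
series_chunk_index = (0,)
try:
    series_chunk_index[1]
except IndexError as exc:
    print(exc)  # -> tuple index out of range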

Additional context

sort=False, method="shuffle": fails with the IndexError above
sort=True, method="shuffle": succeeds
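
For anyone hitting this before a fix lands, the only workaround I have found so far (based on the comparison above) is to keep sort=True when asking for the shuffle method:

mdf.groupby("b", sort=True)["a"].sum(method="shuffle").execute()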
