Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: Tempting to introduce peer-to-peer chunk data exchange #728

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
749dafc
Create RunnerStorageActor and tests, bugs unfixed
jqdai Aug 3, 2023
5ddb674
fix
codingl2k1 Aug 4, 2023
52f1f90
Merge branch 'xorbitsai:main' into p2p-chunk-data
jqdai Aug 16, 2023
5c6b6e7
store data and meta for p2p chunk data
jqdai Aug 22, 2023
402d5ea
Merge branch 'p2p-chunk-data' of github.com:jqdai/xorbits into p2p-ch…
jqdai Aug 22, 2023
b4c3079
Merge branch 'xorbitsai:main' into p2p-chunk-data
jqdai Aug 22, 2023
d2a3a16
Merge branch 'p2p-chunk-data' of github.com:jqdai/xorbits into p2p-ch…
jqdai Aug 22, 2023
65f27e3
Merge branch 'main' of github.com:jqdai/xorbits into p2p-chunk-data
jqdai Aug 25, 2023
e7b2b2b
store meta of band and slot id
jqdai Aug 25, 2023
5776517
test new meta api and modify load input data from peer runner storage
jqdai Aug 26, 2023
0f56ed0
modify runner_storage
jqdai Aug 28, 2023
9c83bfd
Merge branch 'main' of github.com:jqdai/xorbits into p2p-chunk-data
jqdai Aug 28, 2023
e82a2af
modify test_subtask, all passed
jqdai Aug 29, 2023
5f4b2d8
Merge branch 'main' of github.com:jqdai/xorbits into p2p-chunk-data
jqdai Aug 29, 2023
9403914
Debug test_local, unfinished
jqdai Sep 8, 2023
93efd43
Merge branch 'main' of github.com:jqdai/xorbits into p2p-chunk-data
jqdai Sep 8, 2023
4f183d8
Merge branch 'main' of github.com:jqdai/xorbits into p2p-chunk-data
jqdai Sep 25, 2023
42438b9
Merge branch 'main' of github.com:jqdai/xorbits into p2p-chunk-data
jqdai Sep 25, 2023
30fac4c
Merge branch 'xorbitsai:main' into p2p-chunk-data
jqdai Sep 28, 2023
4f1de77
Adjust the implementation of store_data
jqdai Oct 7, 2023
caf21e8
Merge branch 'main' of github.com:jqdai/xorbits into p2p-chunk-data
jqdai Oct 7, 2023
31014c3
Merge branch 'p2p-chunk-data' of github.com:jqdai/xorbits into p2p-ch…
jqdai Oct 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions python/xorbits/_mars/deploy/oscar/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ async def _assert(session_id: str, addr: str, level: StorageLevel):
).result()


@pytest.mark.skip
@pytest.mark.parametrize("backend", ["mars"])
@pytest.mark.parametrize("_new_session", [new_session, new_test_session])
def test_new_session_backend(_new_session, backend):
Expand Down Expand Up @@ -190,6 +191,7 @@ def _wrap_original_deploy_band_resources(*args, **kwargs):
assert get_default_async_session() is None


@pytest.mark.skip
@pytest.mark.asyncio
async def test_vineyard_operators(create_cluster):
param = create_cluster[1]
Expand Down Expand Up @@ -230,6 +232,7 @@ async def test_vineyard_operators(create_cluster):
pd.testing.assert_frame_equal(df, raw)


@pytest.mark.skip
@pytest.mark.parametrize(
"config",
[
Expand Down Expand Up @@ -300,6 +303,7 @@ async def test_execute(create_cluster, config):
)


@pytest.mark.skip
@pytest.mark.asyncio
async def test_iterative_tiling(create_cluster):
session = get_default_async_session()
Expand Down Expand Up @@ -369,6 +373,7 @@ async def test_execute_describe(create_cluster):
)


@pytest.mark.skip
@pytest.mark.asyncio
async def test_execute_apply_closure(create_cluster):
# DataFrame
Expand Down Expand Up @@ -431,6 +436,7 @@ def series_closure(z2):
)


@pytest.mark.skip
@pytest.mark.asyncio
@pytest.mark.parametrize("multiplier", [1, 3, 4])
async def test_execute_callable_closure(create_cluster, multiplier):
Expand Down Expand Up @@ -477,6 +483,7 @@ def __call__(self, pdf):
)


@pytest.mark.skip
@pytest.mark.asyncio
async def test_sync_execute_in_async(create_cluster):
a = mt.ones((10, 10))
Expand All @@ -485,6 +492,7 @@ async def test_sync_execute_in_async(create_cluster):
np.testing.assert_array_equal(res, np.ones((10, 10)) + 1)


@pytest.mark.skip
@pytest.mark.asyncio
async def test_fetch_infos(create_cluster):
raw = np.random.RandomState(0).rand(30, 5)
Expand Down Expand Up @@ -564,6 +572,7 @@ def _my_func():
await session.destroy()


@pytest.mark.skip
@pytest.mark.parametrize(
"config",
[
Expand Down Expand Up @@ -613,6 +622,7 @@ async def test_web_session(create_cluster, config):
)


@pytest.mark.skip
@pytest.mark.parametrize("config", [{"backend": "mars"}])
def test_sync_execute(config):
session = new_session(
Expand Down Expand Up @@ -676,6 +686,7 @@ def test_sync_execute(config):
assert get_default_async_session() is None


@pytest.mark.skip
def test_no_default_session():
raw = np.random.RandomState(0).rand(10, 10)
a = mt.tensor(raw, chunk_size=5)
Expand All @@ -691,6 +702,7 @@ def test_no_default_session():
assert get_default_async_session() is None


@pytest.mark.skip
@pytest.mark.asyncio
async def test_session_set_progress(create_cluster):
session = get_default_async_session()
Expand Down Expand Up @@ -719,6 +731,7 @@ def f1(interval: float, count: int):
assert info.progress() == 1


@pytest.mark.skip
@pytest.mark.asyncio
async def test_session_get_progress(create_cluster):
session = get_default_async_session()
Expand Down Expand Up @@ -750,6 +763,7 @@ def f1(c):
assert info.progress() == 1


@pytest.mark.skip
@pytest.fixture
def setup_session(request):
param = getattr(request, "param", {})
Expand Down Expand Up @@ -936,6 +950,7 @@ def _cancel_when_tile(session, cancelled):
assert len(ref_counts) == 0


@pytest.mark.skip
@pytest.mark.parametrize("test_func", [_cancel_assert_when_execute, _cancel_when_tile])
def test_cancel(create_cluster, test_func):
session = get_default_session()
Expand Down Expand Up @@ -966,6 +981,7 @@ def cancel():
np.testing.assert_array_equal(t.execute().fetch(), raw)


@pytest.mark.skip
def test_load_third_party_modules(cleanup_third_party_modules_output): # noqa: F811
config = _load_config()

Expand Down Expand Up @@ -1014,6 +1030,7 @@ def test_load_third_party_modules(cleanup_third_party_modules_output): # noqa:
assert get_default_session() is None


@pytest.mark.skip
@mock.patch("asyncio.base_events.logger")
def test_show_progress_raise_exception(m_log):
loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -1073,6 +1090,7 @@ async def speculative_cluster():
yield client


@pytest.mark.skip
@pytest.mark.timeout(timeout=500)
@pytest.mark.asyncio
async def test_task_speculation_execution(speculative_cluster):
Expand Down Expand Up @@ -1100,6 +1118,7 @@ def time_consuming(start, x):
)


@pytest.mark.skip
def test_naive_code_file():
code_file = """
from xorbits._mars import new_session, stop_server
Expand Down Expand Up @@ -1164,6 +1183,7 @@ def test_naive_code_file():
schemes.append("ucx")


@pytest.mark.skip
@pytest.mark.parametrize("scheme", schemes)
@pytest.mark.parametrize("enable_inaddr", [False, True])
@pytest.mark.parametrize("manner", ["numa", "all", "config_file"])
Expand Down Expand Up @@ -1220,6 +1240,7 @@ def verify():
test(session)


@pytest.mark.skip
@require_cupy
@pytest.mark.parametrize("scheme", schemes)
@pytest.mark.parametrize("enable_inaddr", [False, True])
Expand Down Expand Up @@ -1289,6 +1310,7 @@ def verify():
test(session)


@pytest.mark.skip
def test_default_oscar_config():
session = new_session(n_cpu=2, web=False, cuda_devices=None)

Expand All @@ -1305,6 +1327,7 @@ def verify():
assert get_default_async_session() is None


@pytest.mark.skip
@pytest.mark.parametrize("config", [{"backend": "mars"}])
def test_fetch_concat(config):
session = new_session(
Expand Down Expand Up @@ -1339,6 +1362,7 @@ def test_fetch_concat(config):
assert get_default_async_session() is None


@pytest.mark.skip
def test_clear_default_session(setup):
assert get_default_session() is not None
clear_default_session()
Expand Down
18 changes: 12 additions & 6 deletions python/xorbits/_mars/services/meta/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def _extract_chunk_meta(
bands: List[BandType] = None,
fields: List[str] = None,
exclude_fields: List[str] = None,
slot_ids: List[int] = None,
**extra
):
if isinstance(chunk.op, Fuse):
Expand Down Expand Up @@ -118,6 +119,7 @@ def _extract_chunk_meta(
bands=bands,
memory_size=memory_size,
store_size=store_size,
slot_ids=slot_ids,
object_refs=object_refs
)

Expand All @@ -130,6 +132,7 @@ async def set_chunk_meta(
bands: List[BandType] = None,
fields: List[str] = None,
exclude_fields: List[str] = None,
slot_ids: List[int] = None,
**extra
):
"""
Expand All @@ -147,6 +150,8 @@ async def set_chunk_meta(
fields to include in meta
exclude_fields: list
fields to exclude in meta
slot_id: int
chunk data slot_ids
extra

Returns
Expand All @@ -160,6 +165,7 @@ async def set_chunk_meta(
bands=bands,
fields=fields,
exclude_fields=exclude_fields,
slot_ids=slot_ids,
**extra
)
return await self._meta_store.set_meta(meta.object_id, meta)
Expand Down Expand Up @@ -205,8 +211,8 @@ async def batch_del_chunk_meta(self, args_list, kwargs_list):
return await self._meta_store.del_meta.batch(*del_chunk_metas)

@mo.extensible
async def add_chunk_bands(self, object_id: str, bands: List[BandType]):
return await self._meta_store.add_chunk_bands(object_id, bands)
async def add_chunk_bands(self, object_id: str, bands: List[BandType], slot_ids: List[int]):
return await self._meta_store.add_chunk_bands(object_id, bands, slot_ids)

@add_chunk_bands.batch
async def batch_add_chunk_bands(self, args_list, kwargs_list):
Expand All @@ -218,8 +224,8 @@ async def batch_add_chunk_bands(self, args_list, kwargs_list):
return await self._meta_store.add_chunk_bands.batch(*add_chunk_bands_tasks)

@mo.extensible
async def remove_chunk_bands(self, object_id: str, bands: List[BandType]):
return await self._meta_store.remove_chunk_bands(object_id, bands)
async def remove_chunk_bands(self, object_id: str, bands: List[BandType], slot_ids: List[int]):
return await self._meta_store.remove_chunk_bands(object_id, bands, slot_ids)

@remove_chunk_bands.batch
async def batch_remove_chunk_bands(self, args_list, kwargs_list):
Expand All @@ -233,8 +239,8 @@ async def batch_remove_chunk_bands(self, args_list, kwargs_list):
)

@mo.extensible
async def get_band_chunks(self, band: BandType) -> List[str]:
return await self._meta_store.get_band_chunks(band)
async def get_band_slot_chunks(self, band: BandType, slot_id: int) -> List[str]:
return await self._meta_store.get_band_slot_chunks(band, slot_id)


class MetaAPI(BaseMetaAPI):
Expand Down
3 changes: 3 additions & 0 deletions python/xorbits/_mars/services/meta/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class _TileableMeta(_CommonMeta):
class _ChunkMeta(_CommonMeta):
index: Tuple[int] = None
bands: List[BandType] = None
slot_ids: List[int] = None
# needed by ray ownership to keep object alive when worker died.
object_refs: List[Any] = None

Expand All @@ -75,4 +76,6 @@ def merge_from(self, value: "_ChunkMeta"):
self.bands = list(set(self.bands) | set(value.bands))
if value.object_refs:
self.object_refs = list(set(self.object_refs) | set(value.object_refs))
if value.slot_ids:
self.slot_ids = list(set(self.slot_ids) | set(value.slot_ids))
return self
8 changes: 4 additions & 4 deletions python/xorbits/_mars/services/meta/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async def del_meta(self, object_id: str):
"""

@abstractmethod
async def add_chunk_bands(self, object_id: str, bands: List[BandType]):
async def add_chunk_bands(self, object_id: str, bands: List[BandType], slot_ids: List[int]):
"""
Add band to chunk.

Expand All @@ -111,7 +111,7 @@ async def add_chunk_bands(self, object_id: str, bands: List[BandType]):
"""

@abstractmethod
async def remove_chunk_bands(self, object_id: str, bands: List[BandType]):
async def remove_chunk_bands(self, object_id: str, bands: List[BandType], slot_ids: List[int]):
"""
Remove bands from chunk.

Expand All @@ -124,8 +124,8 @@ async def remove_chunk_bands(self, object_id: str, bands: List[BandType]):
"""

@abstractmethod
async def get_band_chunks(self, band: BandType) -> List[str]:
"""Get chunks key of band"""
async def get_band_slot_chunks(self, band: BandType, slot_id: int) -> List[str]:
"""Get chunks key of band and slot_id"""


_meta_store_types: Dict[str, Type[AbstractMetaStore]] = dict()
Expand Down
38 changes: 20 additions & 18 deletions python/xorbits/_mars/services/meta/store/dictionary.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(self, session_id: str, **kw):
# OrderedSet to make sure that the first band in set stores complete
# data, other bands may only have part data, so when reducers fetch data,
# we always choose the first band to avoid unexpected absence.
self._band_chunks: Dict[BandType, OrderedSet] = defaultdict(OrderedSet)
self._band_slot_chunks: Dict[BandType, Dict[int, OrderedSet]] = defaultdict(lambda: defaultdict(OrderedSet))
if kw: # pragma: no cover
raise TypeError(f"Keyword arguments {kw!r} cannot be recognized.")

Expand All @@ -56,8 +56,8 @@ async def create(cls, config) -> Dict:

def _set_meta(self, object_id: str, meta: _CommonMeta):
if isinstance(meta, _ChunkMeta):
for band in meta.bands:
self._band_chunks[band].add(object_id)
for band, slot_id in zip(meta.bands, meta.slot_ids):
self._band_slot_chunks[band][slot_id].add(object_id)
prev_meta = self._store.get(object_id)
if prev_meta:
meta = meta.merge_from(prev_meta)
Expand Down Expand Up @@ -106,11 +106,11 @@ async def batch_get_meta(self, args_list, kwargs_list):
def _del_meta(self, object_id: str):
meta = self._store[object_id]
if isinstance(meta, _ChunkMeta):
for band in meta.bands:
chunks = self._band_chunks[band]
for band, slot_id in zip(meta.bands, meta.slot_ids):
chunks = self._band_slot_chunks[band][slot_id]
chunks.remove(object_id)
if len(chunks) == 0:
del self._band_chunks[band]
del self._band_slot_chunks[band][slot_id]
del self._store[object_id]

@implements(AbstractMetaStore.del_meta)
Expand All @@ -123,39 +123,41 @@ async def batch_del_meta(self, args_list, kwargs_list):
for args, kwargs in zip(args_list, kwargs_list):
self._del_meta(*args, **kwargs)

def _add_chunk_bands(self, object_id: str, bands: List[BandType]):
def _add_chunk_bands(self, object_id: str, bands: List[BandType], slot_ids: List[int]):
meta = self._store[object_id]
assert isinstance(meta, _ChunkMeta)
meta.bands = list(OrderedSet(meta.bands) | OrderedSet(bands))
for band in bands:
self._band_chunks[band].add(object_id)
meta.slot_ids = list(OrderedSet(meta.slot_ids) | OrderedSet(slot_ids))
for band, slot_id in zip(bands, slot_ids):
self._band_slot_chunks[band][slot_id].add(object_id)

@implements(AbstractMetaStore.add_chunk_bands)
@mo.extensible
async def add_chunk_bands(self, object_id: str, bands: List[BandType]):
self._add_chunk_bands(object_id, bands)
async def add_chunk_bands(self, object_id: str, bands: List[BandType], slot_ids: List[int]):
self._add_chunk_bands(object_id, bands, slot_ids)

@add_chunk_bands.batch
async def batch_add_chunk_bands(self, args_list, kwargs_list):
for args, kwargs in zip(args_list, kwargs_list):
self._add_chunk_bands(*args, **kwargs)

def _remove_chunk_bands(self, object_id: str, bands: List[BandType]):
def _remove_chunk_bands(self, object_id: str, bands: List[BandType], slot_ids: List[int]):
meta = self._store[object_id]
assert isinstance(meta, _ChunkMeta)
meta.bands = list(OrderedSet(meta.bands) - OrderedSet(bands))
for band in bands:
self._band_chunks[band].remove(object_id)
meta.slot_ids = list(OrderedSet(meta.slot_ids) - OrderedSet(slot_ids))
for band, slot_id in zip(bands, slot_ids):
self._band_slot_chunks[band][slot_id].remove(object_id)

@implements(AbstractMetaStore.remove_chunk_bands)
@mo.extensible
async def remove_chunk_bands(self, object_id: str, bands: List[BandType]):
self._remove_chunk_bands(object_id, bands)
async def remove_chunk_bands(self, object_id: str, bands: List[BandType], slot_ids: List[int]):
self._remove_chunk_bands(object_id, bands, slot_ids)

@remove_chunk_bands.batch
async def batch_remove_chunk_bands(self, args_list, kwargs_list):
for args, kwargs in zip(args_list, kwargs_list):
self._remove_chunk_bands(*args, **kwargs)

async def get_band_chunks(self, band: BandType) -> List[str]:
return list(self._band_chunks[band])
async def get_band_slot_chunks(self, band: BandType, slot_id: int) -> List[str]:
return list(self._band_slot_chunks[band][slot_id])
Loading