Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Update Python API: support upload decoupling #447

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,17 @@ jobs:
- name: Install dependencies
run: pip install "$(echo pkg/armonik*.whl)[tests]"

- name: Install .NET Core
uses: actions/setup-dotnet@3447fd6a9f9e57506b15f895c5b76d3b197dc7c2 # v3
with:
dotnet-version: 6.x

- name: Start Mock server
run: |
cd ../csharp/ArmoniK.Api.Mock
nohup dotnet run > /dev/null 2>&1 &
sleep 60

- name: Run tests
run: python -m pytest tests --cov=armonik --cov-config=.coveragerc --cov-report=term-missing --cov-report xml:coverage.xml --cov-report html:coverage_report

Expand Down
1 change: 1 addition & 0 deletions packages/python/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ build/
*.egg-info
**/_version.py
**/.pytest_cache
.ruff_cache
2 changes: 1 addition & 1 deletion packages/python/proto2python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ python -m pip install --upgrade pip
python -m venv $PYTHON_VENV
source $PYTHON_VENV/bin/activate
# We need to fix grpc to 1.56 until this bug is solved : https://github.com/grpc/grpc/issues/34305
python -m pip install build grpcio==1.56.2 grpcio-tools==1.56.2 click pytest setuptools_scm[toml]
python -m pip install build grpcio==1.56.2 grpcio-tools==1.56.2 click pytest setuptools_scm[toml] ruff requests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pytest, setuptool_scm, ruff and requests should not be here but in the pyproject.toml file


unset proto_files
for proto in ${armonik_worker_files[@]}; do
Expand Down
3 changes: 2 additions & 1 deletion packages/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ tests = [
'pytest',
'pytest-cov',
'pytest-benchmark[histogram]',
'requests'
]

[tool.pytest.ini_options]
addopts = [
"--import-mode=importlib",
]
]
18 changes: 17 additions & 1 deletion packages/python/src/armonik/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
from .partitions import ArmoniKPartitions
from .sessions import ArmoniKSessions
from .submitter import ArmoniKSubmitter
from .tasks import ArmoniKTasks
from .results import ArmoniKResult
from .results import ArmoniKResults
aneojgurhem marked this conversation as resolved.
Show resolved Hide resolved
from .versions import ArmoniKVersions
from .events import ArmoniKEvents
from .health_check import ArmoniKHealthChecks

__all__ = [
'ArmoniKPartitions',
'ArmoniKSessions',
'ArmoniKSubmitter',
'ArmoniKTasks',
'ArmoniKResults',
"ArmoniKVersions",
"ArmoniKEvents",
"ArmoniKHealthChecks"
]
86 changes: 86 additions & 0 deletions packages/python/src/armonik/client/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from typing import Any, Callable, cast, List

from grpc import Channel

from .results import ArmoniKResults
from ..common import EventTypes, Filter, NewTaskEvent, NewResultEvent, ResultOwnerUpdateEvent, ResultStatusUpdateEvent, TaskStatusUpdateEvent, ResultStatus, Event
from .results import ResultFieldFilter
from ..protogen.client.events_service_pb2_grpc import EventsStub
from ..protogen.common.events_common_pb2 import EventSubscriptionRequest, EventSubscriptionResponse
from ..protogen.common.results_filters_pb2 import Filters as rawResultFilters
from ..protogen.common.tasks_filters_pb2 import Filters as rawTaskFilters

class ArmoniKEvents:

_events_obj_mapping = {
"new_result": NewResultEvent,
"new_task": NewTaskEvent,
"result_owner_update": ResultOwnerUpdateEvent,
"result_status_update": ResultStatusUpdateEvent,
"task_status_update": TaskStatusUpdateEvent
}

def __init__(self, grpc_channel: Channel):
"""Events service client

Args:
grpc_channel: gRPC channel to use
"""
self._client = EventsStub(grpc_channel)
self._results_client = ArmoniKResults(grpc_channel)

def get_events(self, session_id: str, event_types: List[EventTypes], event_handlers: List[Callable[[str, EventTypes, Event], bool]], task_filter: Filter | None = None, result_filter: Filter | None = None) -> None:
"""Get events that represents updates of result and tasks data.

Args:
session_id: The ID of the session.
event_types: The list of the types of event to catch.
event_handlers: The list of handlers that process the events. Handlers are evaluated in he order they are provided.
An handler takes three positional arguments: the ID of the session, the type of event and the event as an object.
An handler returns a boolean, if True the process continues, otherwise the stream is closed and the service stops
listening to new events.
task_filter: A filter on tasks.
result_filter: A filter on results.

"""
request = EventSubscriptionRequest(
session_id=session_id,
returned_events=event_types
)
if task_filter:
request.tasks_filters=cast(rawTaskFilters, task_filter.to_disjunction().to_message()),
if result_filter:
request.results_filters=cast(rawResultFilters, result_filter.to_disjunction().to_message()),

streaming_call = self._client.GetEvents(request)
for message in streaming_call:
event_type = message.WhichOneof("update")
if any([event_handler(session_id, EventTypes.from_string(event_type), self._events_obj_mapping[event_type].from_raw_event(getattr(message, event_type))) for event_handler in event_handlers]):
break

def wait_for_result_availability(self, result_id: str, session_id: str) -> None:
"""Wait until a result is ready i.e its status updates to COMPLETED.

Args:
result_id: The ID of the result.
session_id: The ID of the session.

Raises:
RuntimeError: If the result status is ABORTED.
"""
def handler(session_id, event_type, event):
ngruelaneo marked this conversation as resolved.
Show resolved Hide resolved
if not isinstance(event, ResultStatusUpdateEvent):
raise ValueError("Handler should receive event of type 'ResultStatusUpdateEvent'.")
if event.status == ResultStatus.COMPLETED:
return False
elif event.status == ResultStatus.ABORTED:
raise RuntimeError(f"Result {result.name} with ID {result_id} is aborted.")
return True

result = self._results_client.get_result(result_id)
if result.status == ResultStatus.COMPLETED:
return
elif result.status == ResultStatus.ABORTED:
raise RuntimeError(f"Result {result.name} with ID {result_id} is aborted.")

self.get_events(session_id, [EventTypes.RESULT_STATUS_UPDATE], [handler], result_filter=(ResultFieldFilter.RESULT_ID == result_id))
21 changes: 21 additions & 0 deletions packages/python/src/armonik/client/health_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import cast, List, Tuple
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used


from grpc import Channel

from ..common import HealthCheckStatus
from ..protogen.client.health_checks_service_pb2_grpc import HealthChecksServiceStub
from ..protogen.common.health_checks_common_pb2 import CheckHealthRequest, CheckHealthResponse


class ArmoniKHealthChecks:
def __init__(self, grpc_channel: Channel):
""" Result service client

Args:
grpc_channel: gRPC channel to use
"""
self._client = HealthChecksServiceStub(grpc_channel)

def check_health(self):
response: CheckHealthResponse = self._client.CheckHealth(CheckHealthRequest())
return {service.name: {"message": service.message, "status": service.healthy} for service in response.services}
66 changes: 66 additions & 0 deletions packages/python/src/armonik/client/partitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from typing import cast, List, Tuple

from grpc import Channel

from ..common import Direction, Partition
from ..common.filter import Filter, NumberFilter
from ..protogen.client.partitions_service_pb2_grpc import PartitionsStub
from ..protogen.common.partitions_common_pb2 import ListPartitionsRequest, ListPartitionsResponse, GetPartitionRequest, GetPartitionResponse
from ..protogen.common.partitions_fields_pb2 import PartitionField, PartitionRawField, PARTITION_RAW_ENUM_FIELD_PRIORITY
from ..protogen.common.partitions_filters_pb2 import Filters as rawFilters, FiltersAnd as rawFiltersAnd, FilterField as rawFilterField
from ..protogen.common.sort_direction_pb2 import SortDirection


class PartitionFieldFilter:
PRIORITY = NumberFilter(
PartitionField(partition_raw_field=PartitionRawField(field=PARTITION_RAW_ENUM_FIELD_PRIORITY)),
rawFilters,
rawFiltersAnd,
rawFilterField
)


class ArmoniKPartitions:
def __init__(self, grpc_channel: Channel):
""" Result service client

Args:
grpc_channel: gRPC channel to use
"""
self._client = PartitionsStub(grpc_channel)

def list_partitions(self, partition_filter: Filter | None = None, page: int = 0, page_size: int = 1000, sort_field: Filter = PartitionFieldFilter.PRIORITY, sort_direction: SortDirection = Direction.ASC) -> Tuple[int, List[Partition]]:
"""List partitions based on a filter.

Args:
partition_filter: Filter to apply when listing partitions
page: page number to request, useful for pagination, defaults to 0
page_size: size of a page, defaults to 1000
sort_field: field to sort the resulting list by, defaults to the status
sort_direction: direction of the sort, defaults to ascending

Returns:
A tuple containing :
- The total number of results for the given filter
- The obtained list of results
"""
request = ListPartitionsRequest(
page=page,
page_size=page_size,
sort=ListPartitionsRequest.Sort(field=cast(PartitionField, sort_field.field), direction=sort_direction),
)
if partition_filter:
request.filters = cast(rawFilters, partition_filter.to_disjunction().to_message()),
response: ListPartitionsResponse = self._client.ListPartitions(request)
return response.total, [Partition.from_message(p) for p in response.partitions]

def get_partition(self, partition_id: str) -> Partition:
"""Get a partition by its ID.

Args:
partition_id: The partition ID.

Return:
The partition summary.
"""
return Partition.from_message(self._client.GetPartition(GetPartitionRequest(id=partition_id)).partition)
Loading
Loading