diff --git a/sdk/py/meca_api.py b/sdk/py/meca_api.py index d3dc7d6..b52141a 100644 --- a/sdk/py/meca_api.py +++ b/sdk/py/meca_api.py @@ -33,23 +33,23 @@ async def initiateConnection(did: str, vc: str) -> None: async def offload_task_and_get_result( task_id: str, - containerRef: str, + container_ref: str, data: str, callback: Callable[[str], None], - resource: dict = None, + resource: str = None, runtime: str = None ) -> str: - print('Offloading task...', containerRef, data) payload = { 'did': global_did, 'task_id': task_id, - 'container_reference': containerRef, + 'container_reference': container_ref, 'content': data } if resource: payload['resource'] = resource if runtime: payload['runtime'] = runtime + print('Offloading task... ', payload) try: r = requests.post( f'{DISCOVERY_URL}/offloading/offload_task_and_get_result', @@ -59,29 +59,33 @@ async def offload_task_and_get_result( r.raise_for_status() except Exception as e: raise SystemExit(e) - status, response, error, task_id = r.json().values() + jsonR = r.json() + status = jsonR['status'] + response = jsonR['response'] + error = jsonR['error'] + task_id = jsonR['task_id'] print("Received in test:", task_id, status, response, error) callback(task_id, status, response, error) async def offload_task( task_id: str, - containerRef: str, + container_ref: str, data: str, callback: Callable[[str], None], resource: dict = None, runtime: str = None ) -> str: - print('Offloading task...', containerRef, data) payload = { 'did': global_did, 'task_id': task_id, - 'container_reference': containerRef, + 'container_reference': container_ref, 'content': data } if resource: payload['resource'] = resource if runtime: payload['runtime'] = runtime + print('Offloading task... ', payload) try: r = requests.post( f'{DISCOVERY_URL}/offloading/offload_task', diff --git a/sdk/test-offload.py b/sdk/test-offload.py index c533f16..9394428 100644 --- a/sdk/test-offload.py +++ b/sdk/test-offload.py @@ -45,14 +45,18 @@ def callback_on_receive(id, status, response, err): sliced_dataset = dataset[start_index:end_index].clone().detach() await meca_api.offload_task_and_get_result( - str(i), - 'jyume/meca:0.0.5', - json.dumps({ + task_id=str(i), + container_ref='jyume/meca:0.0.6', + data=json.dumps({ "dataset": sliced_dataset.tolist(), "point": input_data.tolist(), "k": k, "num_processes": num_processes }), + resource={ + "cpu": 1, + "memory": 256 + }, callback=callback_on_receive) # ==================== ==================== diff --git a/sdk/test_basic_offload.py b/sdk/test_basic_offload.py index 05214bd..d2d6be6 100644 --- a/sdk/test_basic_offload.py +++ b/sdk/test_basic_offload.py @@ -25,12 +25,17 @@ async def main(): str(i), 'sampleserver:latest', "{\"name\": \"meca dev " + str(i) + "\"}", - callback=callback_on_receive + callback=callback_on_receive, + resource={ + "cpu": 1, + "memory": 256 + }, ) tried = 0 while tried <= 1 and len(results) < NUMBER_OF_TASKS: for corr_id, task_id in task_corr_ids.items(): + time.sleep(2) api_response = await meca_api.poll_result(corr_id) status = api_response['status'] response = api_response['response'] @@ -41,7 +46,6 @@ async def main(): else: print(error, "for task: ", task_id, "corr_id:", corr_id) tried += 1 - time.sleep(2) break print("All results received:", results) diff --git a/src/worker_renderer/task_consumer.js b/src/worker_renderer/task_consumer.js index fab50b9..faa1f94 100644 --- a/src/worker_renderer/task_consumer.js +++ b/src/worker_renderer/task_consumer.js @@ -107,9 +107,14 @@ class Consumer { task.runtime ); + let resourceConsumed = 0.1; + if (task.resource != null) { + resourceConsumed = task.resource.cpu * task.resource.memory; + } + console.log(` [con] Resource consumed: ${resourceConsumed}`); const transactionEndDatetime = new Date().getTime(); const duration = transactionEndDatetime - transactionStartDatetime; - const reply = { id: task.id, content: result, resourceConsumed: 0.1, transactionStartDatetime, transactionEndDatetime, duration }; + const reply = { id: task.id, content: result, resourceConsumed, transactionStartDatetime, transactionEndDatetime, duration }; console.log(` [con] Result: ${JSON.stringify(reply)}`);