Skip to content

Commit

Permalink
Update sdk example and consumer message
Browse files Browse the repository at this point in the history
  • Loading branch information
LimJunxue committed Oct 29, 2023
1 parent 0562262 commit 31c9bc5
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 14 deletions.
20 changes: 12 additions & 8 deletions sdk/py/meca_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,23 @@ async def initiateConnection(did: str, vc: str) -> None:

async def offload_task_and_get_result(
task_id: str,
containerRef: str,
container_ref: str,
data: str,
callback: Callable[[str], None],
resource: dict = None,
resource: str = None,
runtime: str = None
) -> str:
print('Offloading task...', containerRef, data)
payload = {
'did': global_did,
'task_id': task_id,
'container_reference': containerRef,
'container_reference': container_ref,
'content': data
}
if resource:
payload['resource'] = resource
if runtime:
payload['runtime'] = runtime
print('Offloading task... ', payload)
try:
r = requests.post(
f'{DISCOVERY_URL}/offloading/offload_task_and_get_result',
Expand All @@ -59,29 +59,33 @@ async def offload_task_and_get_result(
r.raise_for_status()
except Exception as e:
raise SystemExit(e)
status, response, error, task_id = r.json().values()
jsonR = r.json()
status = jsonR['status']
response = jsonR['response']
error = jsonR['error']
task_id = jsonR['task_id']
print("Received in test:", task_id, status, response, error)
callback(task_id, status, response, error)

async def offload_task(
task_id: str,
containerRef: str,
container_ref: str,
data: str,
callback: Callable[[str], None],
resource: dict = None,
runtime: str = None
) -> str:
print('Offloading task...', containerRef, data)
payload = {
'did': global_did,
'task_id': task_id,
'container_reference': containerRef,
'container_reference': container_ref,
'content': data
}
if resource:
payload['resource'] = resource
if runtime:
payload['runtime'] = runtime
print('Offloading task... ', payload)
try:
r = requests.post(
f'{DISCOVERY_URL}/offloading/offload_task',
Expand Down
10 changes: 7 additions & 3 deletions sdk/test-offload.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,18 @@ def callback_on_receive(id, status, response, err):
sliced_dataset = dataset[start_index:end_index].clone().detach()

await meca_api.offload_task_and_get_result(
str(i),
'jyume/meca:0.0.5',
json.dumps({
task_id=str(i),
container_ref='jyume/meca:0.0.6',
data=json.dumps({
"dataset": sliced_dataset.tolist(),
"point": input_data.tolist(),
"k": k,
"num_processes": num_processes
}),
resource={
"cpu": 1,
"memory": 256
},
callback=callback_on_receive)

# ==================== ====================
Expand Down
8 changes: 6 additions & 2 deletions sdk/test_basic_offload.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ async def main():
str(i),
'sampleserver:latest',
"{\"name\": \"meca dev " + str(i) + "\"}",
callback=callback_on_receive
callback=callback_on_receive,
resource={
"cpu": 1,
"memory": 256
},
)

tried = 0
while tried <= 1 and len(results) < NUMBER_OF_TASKS:
for corr_id, task_id in task_corr_ids.items():
time.sleep(2)
api_response = await meca_api.poll_result(corr_id)
status = api_response['status']
response = api_response['response']
Expand All @@ -41,7 +46,6 @@ async def main():
else:
print(error, "for task: ", task_id, "corr_id:", corr_id)
tried += 1
time.sleep(2)
break

print("All results received:", results)
Expand Down
7 changes: 6 additions & 1 deletion src/worker_renderer/task_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,14 @@ class Consumer {
task.runtime
);

let resourceConsumed = 0.1;
if (task.resource != null) {
resourceConsumed = task.resource.cpu * task.resource.memory;
}
console.log(` [con] Resource consumed: ${resourceConsumed}`);
const transactionEndDatetime = new Date().getTime();
const duration = transactionEndDatetime - transactionStartDatetime;
const reply = { id: task.id, content: result, resourceConsumed: 0.1, transactionStartDatetime, transactionEndDatetime, duration };
const reply = { id: task.id, content: result, resourceConsumed, transactionStartDatetime, transactionEndDatetime, duration };

console.log(` [con] Result: ${JSON.stringify(reply)}`);

Expand Down

0 comments on commit 31c9bc5

Please sign in to comment.