Skip to content

Commit

Permalink
Add fields to task message and fix sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
LimJunxue committed Oct 27, 2023
1 parent b99a0c8 commit 8bfe0ef
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 10 deletions.
11 changes: 8 additions & 3 deletions sdk/py/meca_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
access_token = None
auth_header = None

DISCOVERY_URL = 'http://sbip-g2.d2.comp.nus.edu.sg:11000/fn-discovery'
DISCOVERY_URL = 'http://localhost:7000'
# DISCOVERY_URL = 'http://sbip-g2.d2.comp.nus.edu.sg:11000/fn-discovery'

# TODO: change to signed vc
async def initiateConnection(did: str, vc: str) -> None:
Expand Down Expand Up @@ -87,7 +88,11 @@ async def offload_task(
r.raise_for_status()
except Exception as e:
raise SystemExit(e)
status, response, error, task_id = r.json().values()
jsonR = r.json()
status = jsonR['status']
response = jsonR['response']
error = jsonR['error']
task_id = jsonR['task_id']
print("Offload response:", task_id, status, response, error)
callback(task_id, status, response, error)

Expand All @@ -102,7 +107,7 @@ async def poll_result(corr_id: str):
r.raise_for_status()
except Exception as e:
raise SystemExit(e)
return r.json().values()
return r.json()

async def disconnect():
print('Disconnecting...')
Expand Down
5 changes: 4 additions & 1 deletion sdk/test_basic_offload.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ async def main():
tried = 0
while tried <= 1 and len(results) < NUMBER_OF_TASKS:
for corr_id, task_id in task_corr_ids.items():
status, response, error, task_id = await meca_api.poll_result(corr_id)
api_response = await meca_api.poll_result(corr_id)
status = api_response['status']
response = api_response['response']
error = api_response['error']
if status == 1:
results[task_id] = response
print("Received result for task", task_id, ":", response)
Expand Down
4 changes: 4 additions & 0 deletions src/worker_renderer/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ message Task {
message TaskResult {
string id = 1;
string content = 2;
float resource_consumed = 3;
int32 transaction_start_datetime = 4;
int32 transaction_end_datetime = 5;
int32 duration = 6;
}
25 changes: 19 additions & 6 deletions src/worker_renderer/task_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Consumer {

this.startConsumer = async function startConsumer() {
connection = await amqp.connect(MQ_URL);
console.log(' [con] Connected to ', MQ_URL);
channel = await connection.createChannel();
await channel.assertQueue(queueName, {
durable: true,
Expand All @@ -72,16 +73,23 @@ class Consumer {
});
};

this.close = async function close() {
await channel.close();
await connection.close();
this.close = function close() {
if (channel != null) {
channel.close();
channel = null;
}
if (connection != null) {
connection.close();
connection = null;
}
console.log(' [con] Connection closed');
delete Consumer.openQueues[queueName];
};

this.handleMsgContent = async function handleMsgContent(content) {
const task = parseTaskFromProto(content);
const transactionStartDatetime = new Date().getTime();

const task = parseTaskFromProto(content);
ipcRenderer.send(
'job-received',
task.id,
Expand All @@ -99,8 +107,13 @@ class Consumer {
task.runtime
);

ipcRenderer.send('job-results-received', task.id, result);
return { id: task.id, content: result };
const transactionEndDatetime = new Date().getTime();
const duration = transactionEndDatetime - transactionStartDatetime;
const reply = { id: task.id, content: result, resourceConsumed: 0.1, transactionStartDatetime, transactionEndDatetime, duration };

console.log(` [con] Result: ${JSON.stringify(reply)}`);

return reply;
};
}
}
Expand Down

0 comments on commit 8bfe0ef

Please sign in to comment.