implement cursor
v9n committed Dec 8, 2024
1 parent c0ea62c commit 4aa7acc
Showing 11 changed files with 1,311 additions and 2,723 deletions.
13 changes: 13 additions & 0 deletions aggregator/rpc_server.go
@@ -169,6 +169,19 @@ func (r *RpcServer) ListTasks(ctx context.Context, payload *avsproto.ListTasksRe
}, nil
}

func (r *RpcServer) ListExecutions(ctx context.Context, payload *avsproto.ListExecutionsReq) (*avsproto.ListExecutionsResp, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "%s: %s", auth.InvalidAuthenticationKey, err.Error())
}

r.config.Logger.Info("process list executions",
"user", user.Address.String(),
"task_id", payload.Id,
)
return r.engine.ListExecutions(user, payload)
}

func (r *RpcServer) GetTask(ctx context.Context, payload *avsproto.IdReq) (*avsproto.Task, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
37 changes: 37 additions & 0 deletions core/taskengine/cursor.go
@@ -0,0 +1,37 @@
package taskengine

import (
"encoding/base64"
"encoding/json"
)

type CursorDirection string

const (
CursorDirectionNext = CursorDirection("next")
CursorDirectionPrevious = CursorDirection("prev")
)

type Cursor struct {
Direction CursorDirection `json:"direction"`
Position string `json:"mark"`
}

func NewCursor(direction CursorDirection, position string) *Cursor {
return &Cursor{
Direction: direction,
Position: position,
}
}

func (c *Cursor) String() string {
d, err := json.Marshal(c)
if err != nil {
return ""
}

return base64.StdEncoding.EncodeToString(d)
}
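
For reference, NewCursor(CursorDirectionNext, "123").String() produces the base64 encoding of {"direction":"next","mark":"123"}. Only the encoding half lands in this file; below is a minimal sketch of the inverse, assuming the same JSON/base64 layout (the helper name CursorFromString is hypothetical and not part of this commit):

// CursorFromString is a hypothetical inverse of Cursor.String: it base64-decodes
// the opaque token and unmarshals the JSON payload back into a Cursor.
func CursorFromString(s string) (*Cursor, error) {
    raw, err := base64.StdEncoding.DecodeString(s)
    if err != nil {
        return nil, err
    }

    c := &Cursor{}
    if err := json.Unmarshal(raw, c); err != nil {
        return nil, err
    }
    return c, nil
}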
42 changes: 40 additions & 2 deletions core/taskengine/engine.go
@@ -6,7 +6,6 @@ import (
"fmt"
"math/big"
"strconv"
"strings"
"sync"
"time"

@@ -30,6 +29,8 @@ import (

const (
ExecuteTask = "execute_task"
// TODO Change this before merge
ItemPerPage = 2
)

var (
@@ -456,13 +457,50 @@ func (n *Engine) GetTask(user *model.User, taskID string) (*model.Task, error) {
return nil, err
}

if strings.ToLower(task.Owner) != strings.ToLower(user.Address.Hex()) {
if !task.OwnedBy(user.Address) {
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

return task, nil
}

// ListExecutions returns the executions of the given task id
func (n *Engine) ListExecutions(user *model.User, payload *avsproto.ListExecutionsReq) (*avsproto.ListExecutionsResp, error) {
task, err := n.GetTaskByID(payload.Id)
if err != nil {
return nil, err
}

if !task.OwnedBy(user.Address) {
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

executionKVs, err := n.db.GetByPrefix([]byte(fmt.Sprintf("history:%s", task.Id)))

if err != nil {
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_StorageUnavailable), StorageUnavailableError)
}

executionResp := &avsproto.ListExecutionsResp{
Executions: make([]*avsproto.Execution, 0, ItemPerPage),
Cursor: "",
}
for _, kv := range executionKVs {
exec := avsproto.Execution{}
if err := protojson.Unmarshal(kv.Value, &exec); err != nil {
continue
}
executionResp.Executions = append(executionResp.Executions, &exec)
if len(executionResp.Executions) >= ItemPerPage {
break
}
}

if len(executionResp.Executions) >= ItemPerPage {
last := len(executionResp.Executions) - 1
executionResp.Cursor = NewCursor(CursorDirectionNext, fmt.Sprintf("%d", executionResp.Executions[last].Id)).String()
}
return executionResp, nil
}

func (n *Engine) DeleteTaskByUser(user *model.User, taskID string) (bool, error) {
task, err := n.GetTask(user, taskID)

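Taken together, ListExecutions reads every record under the history:&lt;task-id&gt; key prefix, decodes up to ItemPerPage executions, and returns an opaque next-page cursor built from the id of the last execution in the page. A sketch of how a caller might consume it, assuming only the fields shown above (Id on the request, Executions and Cursor on the response); the helper below is illustrative, not part of this commit:

// listFirstPage is an illustrative sketch: fetch the first page of executions
// for a task and return the pagination cursor for a follow-up request.
func listFirstPage(n *Engine, user *model.User, taskID string) (string, error) {
    resp, err := n.ListExecutions(user, &avsproto.ListExecutionsReq{Id: taskID})
    if err != nil {
        return "", err // NotFound for tasks the user does not own, StorageUnavailable on db errors
    }
    for _, exec := range resp.Executions {
        fmt.Println(exec.Id)
    }
    // Cursor is empty when the page holds fewer than ItemPerPage executions;
    // otherwise it is the base64 token pointing at the last execution id.
    return resp.Cursor, nil
}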
13 changes: 12 additions & 1 deletion core/taskengine/vm_runner_rest.go
@@ -34,14 +34,25 @@ func NewRestProrcessor() *RestProcessor {
}

func (r *RestProcessor) Execute(stepID string, node *avsproto.RestAPINode) (*avsproto.Execution_Step, error) {
t0 := time.Now().Unix()
s := &avsproto.Execution_Step{
NodeId: stepID,
Log: "",
OutputData: "",
Success: true,
Error: "",
StartAt: t0,
}

var err error
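// The deferred function records the final outcome of the step once the request
// finishes: end timestamp, success flag derived from err, and the error message.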
defer func() {
s.EndAt = time.Now().Unix()
s.Success = err == nil
if err != nil {
s.Error = err.Error()
}
}()

var log strings.Builder

request := r.client.R().
@@ -52,7 +63,6 @@ func (r *RestProcessor) Execute(stepID string, node *avsproto.RestAPINode) (*avs
}

var resp *resty.Response
var err error
if strings.EqualFold(node.Method, "post") {
resp, err = request.Post(node.Url)
} else if strings.EqualFold(node.Method, "get") {
@@ -63,6 +73,7 @@ func (r *RestProcessor) Execute(stepID string, node *avsproto.RestAPINode) (*avs

u, err := url.Parse(node.Url)
if err != nil {
s.Error = fmt.Sprintf("cannot parse url: %s", node.Url)
return nil, err
}

13 changes: 13 additions & 0 deletions examples/example.js
@@ -136,6 +136,16 @@ async function getTask(owner, token, taskId) {
console.log(util.inspect(result, { depth: 4, colors: true }));
}

async function listExecutions(owner, token, taskId) {
const metadata = new grpc.Metadata();
metadata.add("authkey", token);

const result = await asyncRPC(client, "ListExecutions", { id: taskId }, metadata);

console.log(util.inspect(result, { depth: 4, colors: true }));
}
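// Presumably wired to the CLI as: node example.js executions <task-id>,
// following the pattern of the other subcommands in the main switch below (argv[3] is the task id).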


async function cancel(owner, token, taskId) {
const metadata = new grpc.Metadata();
metadata.add("authkey", token);
@@ -329,6 +339,9 @@ const main = async (cmd) => {
await getTask(owner, token, process.argv[3]);
break;

case "executions":
await listExecutions(owner, token, process.argv[3]);
break;
case "cancel":
await cancel(owner, token, process.argv[3]);
break;
160 changes: 33 additions & 127 deletions examples/static_codegen/avs_grpc_pb.js
@@ -3,42 +3,8 @@
'use strict';
var grpc = require('@grpc/grpc-js');
var avs_pb = require('./avs_pb.js');
var google_protobuf_timestamp_pb = require('google-protobuf/google/protobuf/timestamp_pb.js');
var google_protobuf_wrappers_pb = require('google-protobuf/google/protobuf/wrappers_pb.js');

function serialize_aggregator_AckMessageReq(arg) {
if (!(arg instanceof avs_pb.AckMessageReq)) {
throw new Error('Expected argument of type aggregator.AckMessageReq');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_AckMessageReq(buffer_arg) {
return avs_pb.AckMessageReq.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_Checkin(arg) {
if (!(arg instanceof avs_pb.Checkin)) {
throw new Error('Expected argument of type aggregator.Checkin');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_Checkin(buffer_arg) {
return avs_pb.Checkin.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_CheckinResp(arg) {
if (!(arg instanceof avs_pb.CheckinResp)) {
throw new Error('Expected argument of type aggregator.CheckinResp');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_CheckinResp(buffer_arg) {
return avs_pb.CheckinResp.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_CreateTaskReq(arg) {
if (!(arg instanceof avs_pb.CreateTaskReq)) {
throw new Error('Expected argument of type aggregator.CreateTaskReq');
@@ -116,6 +82,28 @@ function deserialize_aggregator_KeyResp(buffer_arg) {
return avs_pb.KeyResp.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_ListExecutionsReq(arg) {
if (!(arg instanceof avs_pb.ListExecutionsReq)) {
throw new Error('Expected argument of type aggregator.ListExecutionsReq');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_ListExecutionsReq(buffer_arg) {
return avs_pb.ListExecutionsReq.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_ListExecutionsResp(arg) {
if (!(arg instanceof avs_pb.ListExecutionsResp)) {
throw new Error('Expected argument of type aggregator.ListExecutionsResp');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_ListExecutionsResp(buffer_arg) {
return avs_pb.ListExecutionsResp.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_ListTasksReq(arg) {
if (!(arg instanceof avs_pb.ListTasksReq)) {
throw new Error('Expected argument of type aggregator.ListTasksReq');
@@ -182,50 +170,6 @@ function deserialize_aggregator_NonceResp(buffer_arg) {
return avs_pb.NonceResp.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_NotifyTriggersReq(arg) {
if (!(arg instanceof avs_pb.NotifyTriggersReq)) {
throw new Error('Expected argument of type aggregator.NotifyTriggersReq');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_NotifyTriggersReq(buffer_arg) {
return avs_pb.NotifyTriggersReq.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_NotifyTriggersResp(arg) {
if (!(arg instanceof avs_pb.NotifyTriggersResp)) {
throw new Error('Expected argument of type aggregator.NotifyTriggersResp');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_NotifyTriggersResp(buffer_arg) {
return avs_pb.NotifyTriggersResp.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_SyncMessagesReq(arg) {
if (!(arg instanceof avs_pb.SyncMessagesReq)) {
throw new Error('Expected argument of type aggregator.SyncMessagesReq');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_SyncMessagesReq(buffer_arg) {
return avs_pb.SyncMessagesReq.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_SyncMessagesResp(arg) {
if (!(arg instanceof avs_pb.SyncMessagesResp)) {
throw new Error('Expected argument of type aggregator.SyncMessagesResp');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_aggregator_SyncMessagesResp(buffer_arg) {
return avs_pb.SyncMessagesResp.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_Task(arg) {
if (!(arg instanceof avs_pb.Task)) {
throw new Error('Expected argument of type aggregator.Task');
@@ -330,6 +274,17 @@ createTask: {
responseSerialize: serialize_aggregator_Task,
responseDeserialize: deserialize_aggregator_Task,
},
listExecutions: {
path: '/aggregator.Aggregator/ListExecutions',
requestStream: false,
responseStream: false,
requestType: avs_pb.ListExecutionsReq,
responseType: avs_pb.ListExecutionsResp,
requestSerialize: serialize_aggregator_ListExecutionsReq,
requestDeserialize: deserialize_aggregator_ListExecutionsReq,
responseSerialize: serialize_aggregator_ListExecutionsResp,
responseDeserialize: deserialize_aggregator_ListExecutionsResp,
},
cancelTask: {
path: '/aggregator.Aggregator/CancelTask',
requestStream: false,
@@ -355,52 +310,3 @@
};

exports.AggregatorClient = grpc.makeGenericClientConstructor(AggregatorService);
var NodeService = exports.NodeService = {
// Operator endpoint
ping: {
path: '/aggregator.Node/Ping',
requestStream: false,
responseStream: false,
requestType: avs_pb.Checkin,
responseType: avs_pb.CheckinResp,
requestSerialize: serialize_aggregator_Checkin,
requestDeserialize: deserialize_aggregator_Checkin,
responseSerialize: serialize_aggregator_CheckinResp,
responseDeserialize: deserialize_aggregator_CheckinResp,
},
syncMessages: {
path: '/aggregator.Node/SyncMessages',
requestStream: false,
responseStream: true,
requestType: avs_pb.SyncMessagesReq,
responseType: avs_pb.SyncMessagesResp,
requestSerialize: serialize_aggregator_SyncMessagesReq,
requestDeserialize: deserialize_aggregator_SyncMessagesReq,
responseSerialize: serialize_aggregator_SyncMessagesResp,
responseDeserialize: deserialize_aggregator_SyncMessagesResp,
},
ack: {
path: '/aggregator.Node/Ack',
requestStream: false,
responseStream: false,
requestType: avs_pb.AckMessageReq,
responseType: google_protobuf_wrappers_pb.BoolValue,
requestSerialize: serialize_aggregator_AckMessageReq,
requestDeserialize: deserialize_aggregator_AckMessageReq,
responseSerialize: serialize_google_protobuf_BoolValue,
responseDeserialize: deserialize_google_protobuf_BoolValue,
},
notifyTriggers: {
path: '/aggregator.Node/NotifyTriggers',
requestStream: false,
responseStream: false,
requestType: avs_pb.NotifyTriggersReq,
responseType: avs_pb.NotifyTriggersResp,
requestSerialize: serialize_aggregator_NotifyTriggersReq,
requestDeserialize: deserialize_aggregator_NotifyTriggersReq,
responseSerialize: serialize_aggregator_NotifyTriggersResp,
responseDeserialize: deserialize_aggregator_NotifyTriggersResp,
},
};

exports.NodeClient = grpc.makeGenericClientConstructor(NodeService);
