diff --git a/README.md b/README.md index 5bd568a..c08d77b 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,9 @@ [![Go](https://github.com/ehsaniara/scheduler/actions/workflows/go.yml/badge.svg?branch=main)](https://github.com/ehsaniara/scheduler/actions/workflows/go.yml) -![scheduler-logo.png](docs/scheduler-logo.png) +

+ scheduler logo +

Scheduler is a High Throughput Distributed Task Scheduler is an advanced system designed to manage and execute a vast number of tasks across a distributed network of servers. Built on top of the **Redis** database, it leverages Redis's high-speed in-memory data store for quick access and efficient task management. diff --git a/config/config.go b/config/config.go index baa717e..fda03c0 100644 --- a/config/config.go +++ b/config/config.go @@ -6,6 +6,10 @@ import ( "os" ) +const ( + ExecutionTimestamp = "executionTimestamp" +) + type Config struct { HttpServer HttpServerConfig Storage StorageConfig diff --git a/core/scheduler.go b/core/scheduler.go index 34f4b83..5715c4c 100644 --- a/core/scheduler.go +++ b/core/scheduler.go @@ -2,7 +2,6 @@ package core import ( "context" - "fmt" "github.com/IBM/sarama" "github.com/ehsaniara/scheduler/config" "github.com/ehsaniara/scheduler/kafka" @@ -28,8 +27,6 @@ type scheduler struct { config *config.Config } -const headerName = "executionTimestamp" - func NewScheduler(ctx context.Context, storage storage.TaskStorage, producer kafka.SyncProducer, config *config.Config) Scheduler { s := &scheduler{ quit: make(chan struct{}), @@ -50,10 +47,10 @@ func (s *scheduler) PublishNewTask(task *_pb.Task) { log.Print("--------- publish task ----------") headers := make([]sarama.RecordHeader, 0) - if task.ExecutionTimestamp > 0 { + for k, v := range task.Header { headers = append(headers, sarama.RecordHeader{ - Key: []byte(headerName), - Value: []byte(fmt.Sprintf("%f", task.ExecutionTimestamp)), + Key: []byte(k), + Value: v, }) } @@ -75,28 +72,35 @@ func (s *scheduler) Dispatcher(message *sarama.ConsumerMessage) { log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s\n", string(message.Value), message.Timestamp, message.Topic) if message.Headers == nil { + log.Printf("Headers are missing") return - //message.Headers = make([]sarama.RecordHeader, 0, 1) } + header := make(map[string][]byte) + // if executionTimestamp missing, it will be run right away - var executionTimestamp float64 = 0 - for _, header := range message.Headers { - if header != nil && string(header.Key) == headerName { - floatVal, _ := strconv.ParseFloat(string(header.Value), 64) - executionTimestamp = floatVal + hasExecutionTimestamp := false + for _, h := range message.Headers { + header[string(h.Key)] = h.Value + + if string(h.Key) == config.ExecutionTimestamp { + hasExecutionTimestamp = true } } - if executionTimestamp == 0 { + + if !hasExecutionTimestamp { unixMilli := time.Now().UnixMilli() - log.Printf("executionTimestamp is missing the message header so it's : %v", unixMilli) - executionTimestamp = float64(unixMilli) + log.Printf("executionTimestamp is missing the message h so it's : %v", unixMilli) + header[config.ExecutionTimestamp] = []byte(strconv.FormatInt(unixMilli, 10)) + } + + for _, h := range message.Headers { + header[string(h.Key)] = h.Value } task := _pb.Task{ - ExecutionTimestamp: executionTimestamp, - Header: make(map[string][]byte), - Pyload: message.Value, + Header: header, + Pyload: message.Value, } s.storage.SetNewTask(s.ctx, &task) } diff --git a/core/scheduler_test.go b/core/scheduler_test.go index 9b44317..37d881b 100644 --- a/core/scheduler_test.go +++ b/core/scheduler_test.go @@ -10,6 +10,7 @@ import ( "github.com/ehsaniara/scheduler/storage/storagefakes" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "strconv" "testing" "time" ) @@ -48,7 +49,7 @@ func Test_Dispatcher(t *testing.T) { newScheduler.Dispatcher(&sarama.ConsumerMessage{ Key: []byte(key), Headers: []*sarama.RecordHeader{{ - Key: []byte("executionTimestamp"), + Key: []byte(config.ExecutionTimestamp), Value: []byte(fmt.Sprintf("%v", executionTime)), }}, Value: payloadMarshal, @@ -56,20 +57,13 @@ func Test_Dispatcher(t *testing.T) { }) task := &_pb.Task{ - ExecutionTimestamp: executionTime, - Header: make(map[string][]byte), - Pyload: payloadMarshal, + Header: map[string][]byte{config.ExecutionTimestamp: []byte(fmt.Sprintf("%v", executionTime))}, + Pyload: payloadMarshal, } - // create task from kafka message - //taskMarshal, err := proto.Marshal(task) - //assert.NoError(t, err) - //assert.NotNil(t, taskMarshal) - _ctx, _task := fakeStorage.SetNewTaskArgsForCall(0) assert.Equal(t, ctx, _ctx) assert.Equal(t, task.Header, _task.Header) - assert.Equal(t, task.ExecutionTimestamp, _task.ExecutionTimestamp) assert.Equal(t, task.Pyload, _task.Pyload) } @@ -122,9 +116,8 @@ func Test_scheduler_run_Eval_with_value(t *testing.T) { // create task var tasks []*_pb.Task task := &_pb.Task{ - ExecutionTimestamp: float64(executionTime), - Header: make(map[string][]byte), - Pyload: []byte("some task"), + Header: map[string][]byte{config.ExecutionTimestamp: []byte(strconv.FormatInt(executionTime, 10))}, + Pyload: []byte("some task"), } tasks = append(tasks, task) @@ -182,18 +175,16 @@ func Test_scheduler_PublishNewTask(t *testing.T) { } // 2 seconds from now - executionTime := float64(time.Now().Add(2 * time.Second).UnixMilli()) + executionTime := time.Now().Add(2 * time.Second).UnixMilli() payloadMarshal := []byte("some payload data") task := &_pb.Task{ - ExecutionTimestamp: executionTime, - Header: make(map[string][]byte), - Pyload: payloadMarshal, + Header: map[string][]byte{config.ExecutionTimestamp: []byte(strconv.FormatInt(executionTime, 10))}, + Pyload: payloadMarshal, } // create kafka message from payload newScheduler := NewScheduler(ctx, fakeStorage, fakeSyncProducer, &c) newScheduler.PublishNewTask(task) assert.Equal(t, 1, fakeSyncProducer.SendMessageCallCount()) - } diff --git a/docs/diagram1.png b/docs/diagram1.png index 48763fa..e87b203 100644 Binary files a/docs/diagram1.png and b/docs/diagram1.png differ diff --git a/e2e/e2e_with_kafka_test.go b/e2e/e2e_with_kafka_test.go index bae20df..3761765 100644 --- a/e2e/e2e_with_kafka_test.go +++ b/e2e/e2e_with_kafka_test.go @@ -111,10 +111,9 @@ func TestPositiveIntegrationTest(t *testing.T) { // Schedule 1/4 seconds from now executionTime := float64(time.Now().Add(time.Millisecond * 250).UnixMilli()) task := _task.Task{ - TaskUuid: uuid.NewString(), - ExecutionTimestamp: executionTime, - Header: make(map[string][]byte), - Pyload: []byte("some payload data"), + TaskUuid: uuid.NewString(), + Header: map[string][]byte{config.ExecutionTimestamp: []byte(fmt.Sprintf("%v", executionTime))}, + Pyload: []byte("some payload data"), } m, e := proto.Marshal(&task) assert.NoError(t, e) @@ -167,7 +166,6 @@ func consumerForTaskExecutionTopic(t *testing.T, ctx context.Context, wg *sync.W err := proto.Unmarshal(message.Value, &task) assert.NoError(t, err) - assert.Equal(t, expectedTask.ExecutionTimestamp, task.ExecutionTimestamp) assert.Equal(t, expectedTask.TaskType, task.TaskType) assert.Equal(t, expectedTask.TaskUuid, task.TaskUuid) diff --git a/httpserver/server.go b/httpserver/server.go index 078f5bd..3dcbe19 100644 --- a/httpserver/server.go +++ b/httpserver/server.go @@ -1,5 +1,8 @@ package httpserver +// You only need **one** of these per package! +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate + import ( "context" "encoding/base64" @@ -17,18 +20,18 @@ import ( "time" ) -// You only need **one** of these per package! -//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate - type server struct { - httpPort int - httpServer HTTPServer - taskStorage storage.TaskStorage - scheduler core.Scheduler - config *config.Config - quit chan struct{} - ready chan bool - stop sync.Once + httpPort int + httpServer HTTPServer + taskStorage storage.TaskStorage + scheduler core.Scheduler + config *config.Config + quit chan struct{} + ready chan bool + stop sync.Once + stringToTaskType StringToTaskTypeFn + convertParameterToHeader ConvertParameterToHeaderFn + convertHeaderToParameter ConvertHeaderToParameterFn } // HTTPServer is an interface that abstracts the http.Server methods needed by our server. @@ -43,13 +46,16 @@ var _ HTTPServer = (*http.Server)(nil) // NewServer should be the last service to be run so K8s know application is fully up func NewServer(ctx context.Context, httpServer HTTPServer, taskStorage storage.TaskStorage, scheduler core.Scheduler, config *config.Config) func() { s := &server{ - httpServer: httpServer, - taskStorage: taskStorage, - config: config, - scheduler: scheduler, - quit: make(chan struct{}), - ready: make(chan bool, 1), - httpPort: config.HttpServer.Port, + httpServer: httpServer, + taskStorage: taskStorage, + config: config, + scheduler: scheduler, + quit: make(chan struct{}), + ready: make(chan bool, 1), + httpPort: config.HttpServer.Port, + stringToTaskType: stringToTaskType, + convertParameterToHeader: convertParameterToHeader, + convertHeaderToParameter: convertHeaderToParameter, } go s.runServer(ctx) @@ -128,7 +134,7 @@ type Task struct { TaskUuid string `json:"taskUuid"` ExecutionTimestamp float64 `json:"executionTimestamp"` TaskType string `json:"taskType"` - Header map[string]string `json:"header"` + Parameter map[string]string `json:"parameter,omitempty"` Pyload string `json:"pyload"` } @@ -155,11 +161,10 @@ func (s *server) GetAllTasksHandler(c *gin.Context) { var tasks []Task for _, task := range s.taskStorage.GetAllTasksPagination(c.Request.Context(), int32(offset), int32(limit)) { tasks = append(tasks, Task{ - TaskUuid: task.TaskUuid, - ExecutionTimestamp: task.ExecutionTimestamp, - TaskType: task.TaskType.String(), - Header: ConvertProtoHeaderToHeader(task.Header), - Pyload: base64.StdEncoding.EncodeToString(task.Pyload), + TaskUuid: task.TaskUuid, + TaskType: task.TaskType.String(), + Parameter: s.convertHeaderToParameter(task.Header), + Pyload: base64.StdEncoding.EncodeToString(task.Pyload), }) } if len(tasks) > 0 { @@ -178,7 +183,7 @@ func (s *server) SetNewTaskHandler(c *gin.Context) { return } - taskType, err := StringToTaskType(task.TaskType) + taskType, err := s.stringToTaskType(task.TaskType) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return @@ -192,11 +197,10 @@ func (s *server) SetNewTaskHandler(c *gin.Context) { } pbTask := &_pb.Task{ - TaskUuid: task.TaskUuid, - ExecutionTimestamp: task.ExecutionTimestamp, - TaskType: taskType, - Header: ConvertHeaderToProtoHeader(task.Header), - Pyload: byteArray, + TaskUuid: task.TaskUuid, + TaskType: taskType, + Header: s.convertParameterToHeader(task.Parameter), + Pyload: byteArray, } // enable to use kafka or direct write to redis @@ -212,8 +216,11 @@ func (s *server) SetNewTaskHandler(c *gin.Context) { }) } -// StringToTaskType converts a taskStr to a Task_Type enum. -func StringToTaskType(taskStr string) (_pb.Task_Type, error) { +// StringToTaskTypeFn use as injection +type StringToTaskTypeFn func(taskStr string) (_pb.Task_Type, error) + +// stringToTaskType converts a taskStr to a Task_Type enum. +func stringToTaskType(taskStr string) (_pb.Task_Type, error) { // Look up the enum value by name if enumVal, ok := _pb.Task_Type_value[taskStr]; ok { return _pb.Task_Type(enumVal), nil @@ -221,8 +228,11 @@ func StringToTaskType(taskStr string) (_pb.Task_Type, error) { return _pb.Task_PUB_SUB, fmt.Errorf("invalid enum value: %s", taskStr) } -// ConvertHeaderToProtoHeader return map[string][]byte -func ConvertHeaderToProtoHeader(header map[string]string) map[string][]byte { +// ConvertParameterToHeaderFn use as injection +type ConvertParameterToHeaderFn func(header map[string]string) map[string][]byte + +// convertParameterToHeader return map[string][]byte +func convertParameterToHeader(header map[string]string) map[string][]byte { var pbHeader = make(map[string][]byte) for k, v := range header { byteArray, err := base64.StdEncoding.DecodeString(v) @@ -235,8 +245,11 @@ func ConvertHeaderToProtoHeader(header map[string]string) map[string][]byte { return pbHeader } -// ConvertProtoHeaderToHeader return map[string]string -func ConvertProtoHeaderToHeader(header map[string][]byte) map[string]string { +// ConvertHeaderToParameterFn use as injection +type ConvertHeaderToParameterFn func(header map[string][]byte) map[string]string + +// ConvertHeaderToParameter return map[string]string +func convertHeaderToParameter(header map[string][]byte) map[string]string { var pbHeader = make(map[string]string) for k, v := range header { // Encode the serialized protobuf message to a Base64 encoded string diff --git a/proto/task.pb.go b/proto/task.pb.go index 0845051..e39bba0 100644 --- a/proto/task.pb.go +++ b/proto/task.pb.go @@ -71,11 +71,10 @@ type Task struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - TaskUuid string `protobuf:"bytes,1,opt,name=taskUuid,proto3" json:"taskUuid,omitempty"` - ExecutionTimestamp float64 `protobuf:"fixed64,2,opt,name=executionTimestamp,proto3" json:"executionTimestamp,omitempty"` - TaskType Task_Type `protobuf:"varint,3,opt,name=taskType,proto3,enum=com.github.ehsaniara.Task_Type" json:"taskType,omitempty"` - Header map[string][]byte `protobuf:"bytes,4,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Pyload []byte `protobuf:"bytes,5,opt,name=pyload,proto3" json:"pyload,omitempty"` + TaskUuid string `protobuf:"bytes,1,opt,name=taskUuid,proto3" json:"taskUuid,omitempty"` + TaskType Task_Type `protobuf:"varint,2,opt,name=taskType,proto3,enum=com.github.ehsaniara.Task_Type" json:"taskType,omitempty"` + Header map[string][]byte `protobuf:"bytes,3,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Pyload []byte `protobuf:"bytes,4,opt,name=pyload,proto3" json:"pyload,omitempty"` } func (x *Task) Reset() { @@ -117,13 +116,6 @@ func (x *Task) GetTaskUuid() string { return "" } -func (x *Task) GetExecutionTimestamp() float64 { - if x != nil { - return x.ExecutionTimestamp - } - return 0 -} - func (x *Task) GetTaskType() Task_Type { if x != nil { return x.TaskType @@ -150,20 +142,17 @@ var File_task_proto protoreflect.FileDescriptor var file_task_proto_rawDesc = []byte{ 0x0a, 0x0a, 0x74, 0x61, 0x73, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x14, 0x63, 0x6f, 0x6d, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x65, 0x68, 0x73, 0x61, 0x6e, 0x69, 0x61, - 0x72, 0x61, 0x22, 0xc6, 0x02, 0x0a, 0x04, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x1a, 0x0a, 0x08, 0x74, + 0x72, 0x61, 0x22, 0x96, 0x02, 0x0a, 0x04, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x55, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, - 0x61, 0x73, 0x6b, 0x55, 0x75, 0x69, 0x64, 0x12, 0x2e, 0x0a, 0x12, 0x65, 0x78, 0x65, 0x63, 0x75, - 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x01, 0x52, 0x12, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, - 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3b, 0x0a, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, - 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1f, 0x2e, 0x63, 0x6f, 0x6d, 0x2e, + 0x61, 0x73, 0x6b, 0x55, 0x75, 0x69, 0x64, 0x12, 0x3b, 0x0a, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, + 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1f, 0x2e, 0x63, 0x6f, 0x6d, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x65, 0x68, 0x73, 0x61, 0x6e, 0x69, 0x61, 0x72, 0x61, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x3e, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x04, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x3e, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x63, 0x6f, 0x6d, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x65, 0x68, 0x73, 0x61, 0x6e, 0x69, 0x61, 0x72, 0x61, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x68, 0x65, - 0x61, 0x64, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x05, + 0x61, 0x64, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x70, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x39, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, diff --git a/proto/task.proto b/proto/task.proto index 334540a..91c70a3 100644 --- a/proto/task.proto +++ b/proto/task.proto @@ -6,10 +6,9 @@ package com.github.ehsaniara; message Task{ string taskUuid = 1; - double executionTimestamp = 2; - Type taskType = 3; - map header = 4; - bytes pyload = 5; + Type taskType = 2; + map header = 3; + bytes pyload = 4; enum Type{ PUB_SUB = 0; diff --git a/storage/redis_client.go b/storage/redis_client.go index 382495b..024fbac 100644 --- a/storage/redis_client.go +++ b/storage/redis_client.go @@ -5,6 +5,7 @@ package storage import ( "context" + "encoding/binary" "fmt" "github.com/ehsaniara/scheduler/config" "github.com/ehsaniara/scheduler/interfaces" @@ -13,6 +14,7 @@ import ( "github.com/redis/go-redis/v9" "google.golang.org/protobuf/proto" "log" + "math" "time" ) @@ -90,8 +92,17 @@ func (c *taskRedisClient) SetNewTask(ctx context.Context, task *_pb.Task) { return } + //ExecutionTimestamp + var executionTimestamp float64 + for k, v := range task.Header { + if k == config.ExecutionTimestamp { + bits := binary.LittleEndian.Uint64(v) + executionTimestamp = math.Float64frombits(bits) + } + } + c.rdb.ZAdd(ctx, c.config.Storage.SchedulerKeyName, redis.Z{ - Score: task.ExecutionTimestamp, + Score: executionTimestamp, Member: marshal, }) } diff --git a/storage/redis_client_test.go b/storage/redis_client_test.go index 4e8e2b8..7e5ab05 100644 --- a/storage/redis_client_test.go +++ b/storage/redis_client_test.go @@ -14,6 +14,7 @@ import ( "go.uber.org/goleak" "google.golang.org/protobuf/proto" "log" + "strconv" "testing" "time" ) @@ -58,9 +59,8 @@ func Test_taskRedisClient_SetNewTask(t *testing.T) { // create task task := &_pb.Task{ - ExecutionTimestamp: float64(executionTime), - Header: make(map[string][]byte), - Pyload: []byte("some task"), + Header: map[string][]byte{config.ExecutionTimestamp: []byte(strconv.FormatInt(executionTime, 10))}, + Pyload: []byte("some task"), } // call the method @@ -150,9 +150,8 @@ func Test_taskRedisClient_GetAllTasksPagination(t *testing.T) { var ts []*_pb.Task ts = append(ts, &_pb.Task{ - ExecutionTimestamp: float64(executionTime), - Header: make(map[string][]byte), - Pyload: []byte("some task"), + Header: map[string][]byte{config.ExecutionTimestamp: []byte(strconv.FormatInt(executionTime, 10))}, + Pyload: []byte("some task"), }) tests := []struct { @@ -336,10 +335,9 @@ func TestConvertByteToTasks(t *testing.T) { func TestConvertByteToTasks_ValidProtobufMessages(t *testing.T) { executionTime := time.Now().Add(100 * time.Millisecond).UnixMilli() // Schedule 0.1 seconds from now task := _pb.Task{ - ExecutionTimestamp: float64(executionTime), - Header: make(map[string][]byte), - Pyload: []byte("----- some task ------"), // length should be grater than 32bit - TaskType: _pb.Task_PUB_SUB, + Header: map[string][]byte{config.ExecutionTimestamp: []byte(strconv.FormatInt(executionTime, 10))}, + Pyload: []byte("----- some task ------"), // length should be grater than 32bit + TaskType: _pb.Task_PUB_SUB, } // Serialize the task into bytes @@ -366,7 +364,7 @@ func TestConvertByteToTasks_ValidProtobufMessages(t *testing.T) { for i, actualTask := range tasks { assert.Equal(t, expectedTasks[i].TaskType, actualTask.TaskType) assert.Equal(t, expectedTasks[i].TaskUuid, actualTask.TaskUuid) - assert.Equal(t, expectedTasks[i].ExecutionTimestamp, actualTask.ExecutionTimestamp) + assert.Equal(t, expectedTasks[i].Header, actualTask.Header) } // Assert the result