Skip to content

Commit

Permalink
remove executionTimestamp from task in favor of headers
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay authored and Jay committed Jul 8, 2024
1 parent 8755453 commit 502d8ee
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 114 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
[![Go](https://github.com/ehsaniara/scheduler/actions/workflows/go.yml/badge.svg?branch=main)](https://github.com/ehsaniara/scheduler/actions/workflows/go.yml)


![scheduler-logo.png](docs/scheduler-logo.png)
<p align="center">
<img src="docs/scheduler-logo.png" alt="scheduler logo"/>
</p>

Scheduler is a high-throughput distributed task scheduler: an advanced system designed to manage and execute a vast number of tasks across a distributed network of servers.
Built on top of the **Redis** database, it leverages Redis's high-speed in-memory data store for quick access and efficient task management.
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"os"
)

// ExecutionTimestamp is the header key under which a task's execution
// timestamp is carried.
const ExecutionTimestamp = "executionTimestamp"

type Config struct {
HttpServer HttpServerConfig
Storage StorageConfig
Expand Down
40 changes: 22 additions & 18 deletions core/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package core

import (
"context"
"fmt"
"github.com/IBM/sarama"
"github.com/ehsaniara/scheduler/config"
"github.com/ehsaniara/scheduler/kafka"
Expand All @@ -28,8 +27,6 @@ type scheduler struct {
config *config.Config
}

const headerName = "executionTimestamp"

func NewScheduler(ctx context.Context, storage storage.TaskStorage, producer kafka.SyncProducer, config *config.Config) Scheduler {
s := &scheduler{
quit: make(chan struct{}),
Expand All @@ -50,10 +47,10 @@ func (s *scheduler) PublishNewTask(task *_pb.Task) {
log.Print("--------- publish task ----------")

headers := make([]sarama.RecordHeader, 0)
if task.ExecutionTimestamp > 0 {
for k, v := range task.Header {
headers = append(headers, sarama.RecordHeader{
Key: []byte(headerName),
Value: []byte(fmt.Sprintf("%f", task.ExecutionTimestamp)),
Key: []byte(k),
Value: v,
})
}

Expand All @@ -75,28 +72,35 @@ func (s *scheduler) Dispatcher(message *sarama.ConsumerMessage) {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s\n", string(message.Value), message.Timestamp, message.Topic)

if message.Headers == nil {
log.Printf("Headers are missing")
return
//message.Headers = make([]sarama.RecordHeader, 0, 1)
}

header := make(map[string][]byte)

// if executionTimestamp missing, it will be run right away
var executionTimestamp float64 = 0
for _, header := range message.Headers {
if header != nil && string(header.Key) == headerName {
floatVal, _ := strconv.ParseFloat(string(header.Value), 64)
executionTimestamp = floatVal
hasExecutionTimestamp := false
for _, h := range message.Headers {
header[string(h.Key)] = h.Value

if string(h.Key) == config.ExecutionTimestamp {
hasExecutionTimestamp = true
}
}
if executionTimestamp == 0 {

if !hasExecutionTimestamp {
unixMilli := time.Now().UnixMilli()
log.Printf("executionTimestamp is missing the message header so it's : %v", unixMilli)
executionTimestamp = float64(unixMilli)
log.Printf("executionTimestamp is missing the message h so it's : %v", unixMilli)
header[config.ExecutionTimestamp] = []byte(strconv.FormatInt(unixMilli, 10))
}

for _, h := range message.Headers {
header[string(h.Key)] = h.Value
}

task := _pb.Task{
ExecutionTimestamp: executionTimestamp,
Header: make(map[string][]byte),
Pyload: message.Value,
Header: header,
Pyload: message.Value,
}
s.storage.SetNewTask(s.ctx, &task)
}
Expand Down
27 changes: 9 additions & 18 deletions core/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ehsaniara/scheduler/storage/storagefakes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"strconv"
"testing"
"time"
)
Expand Down Expand Up @@ -48,28 +49,21 @@ func Test_Dispatcher(t *testing.T) {
newScheduler.Dispatcher(&sarama.ConsumerMessage{
Key: []byte(key),
Headers: []*sarama.RecordHeader{{
Key: []byte("executionTimestamp"),
Key: []byte(config.ExecutionTimestamp),
Value: []byte(fmt.Sprintf("%v", executionTime)),
}},
Value: payloadMarshal,
Topic: schedulerTopic,
})

task := &_pb.Task{
ExecutionTimestamp: executionTime,
Header: make(map[string][]byte),
Pyload: payloadMarshal,
Header: map[string][]byte{config.ExecutionTimestamp: []byte(fmt.Sprintf("%v", executionTime))},
Pyload: payloadMarshal,
}

// create task from kafka message
//taskMarshal, err := proto.Marshal(task)
//assert.NoError(t, err)
//assert.NotNil(t, taskMarshal)

_ctx, _task := fakeStorage.SetNewTaskArgsForCall(0)
assert.Equal(t, ctx, _ctx)
assert.Equal(t, task.Header, _task.Header)
assert.Equal(t, task.ExecutionTimestamp, _task.ExecutionTimestamp)
assert.Equal(t, task.Pyload, _task.Pyload)

}
Expand Down Expand Up @@ -122,9 +116,8 @@ func Test_scheduler_run_Eval_with_value(t *testing.T) {
// create task
var tasks []*_pb.Task
task := &_pb.Task{
ExecutionTimestamp: float64(executionTime),
Header: make(map[string][]byte),
Pyload: []byte("some task"),
Header: map[string][]byte{config.ExecutionTimestamp: []byte(strconv.FormatInt(executionTime, 10))},
Pyload: []byte("some task"),
}

tasks = append(tasks, task)
Expand Down Expand Up @@ -182,18 +175,16 @@ func Test_scheduler_PublishNewTask(t *testing.T) {
}

// 2 seconds from now
executionTime := float64(time.Now().Add(2 * time.Second).UnixMilli())
executionTime := time.Now().Add(2 * time.Second).UnixMilli()

payloadMarshal := []byte("some payload data")

task := &_pb.Task{
ExecutionTimestamp: executionTime,
Header: make(map[string][]byte),
Pyload: payloadMarshal,
Header: map[string][]byte{config.ExecutionTimestamp: []byte(strconv.FormatInt(executionTime, 10))},
Pyload: payloadMarshal,
}
// create kafka message from payload
newScheduler := NewScheduler(ctx, fakeStorage, fakeSyncProducer, &c)
newScheduler.PublishNewTask(task)
assert.Equal(t, 1, fakeSyncProducer.SendMessageCallCount())

}
Binary file modified docs/diagram1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 3 additions & 5 deletions e2e/e2e_with_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@ func TestPositiveIntegrationTest(t *testing.T) {
// Schedule 1/4 seconds from now
executionTime := float64(time.Now().Add(time.Millisecond * 250).UnixMilli())
task := _task.Task{
TaskUuid: uuid.NewString(),
ExecutionTimestamp: executionTime,
Header: make(map[string][]byte),
Pyload: []byte("some payload data"),
TaskUuid: uuid.NewString(),
Header: map[string][]byte{config.ExecutionTimestamp: []byte(fmt.Sprintf("%v", executionTime))},
Pyload: []byte("some payload data"),
}
m, e := proto.Marshal(&task)
assert.NoError(t, e)
Expand Down Expand Up @@ -167,7 +166,6 @@ func consumerForTaskExecutionTopic(t *testing.T, ctx context.Context, wg *sync.W
err := proto.Unmarshal(message.Value, &task)
assert.NoError(t, err)

assert.Equal(t, expectedTask.ExecutionTimestamp, task.ExecutionTimestamp)
assert.Equal(t, expectedTask.TaskType, task.TaskType)
assert.Equal(t, expectedTask.TaskUuid, task.TaskUuid)

Expand Down
85 changes: 49 additions & 36 deletions httpserver/server.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package httpserver

// You only need **one** of these per package!
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate

import (
"context"
"encoding/base64"
Expand All @@ -17,18 +20,18 @@ import (
"time"
)

// You only need **one** of these per package!
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate

type server struct {
httpPort int
httpServer HTTPServer
taskStorage storage.TaskStorage
scheduler core.Scheduler
config *config.Config
quit chan struct{}
ready chan bool
stop sync.Once
httpPort int
httpServer HTTPServer
taskStorage storage.TaskStorage
scheduler core.Scheduler
config *config.Config
quit chan struct{}
ready chan bool
stop sync.Once
stringToTaskType StringToTaskTypeFn
convertParameterToHeader ConvertParameterToHeaderFn
convertHeaderToParameter ConvertHeaderToParameterFn
}

// HTTPServer is an interface that abstracts the http.Server methods needed by our server.
Expand All @@ -43,13 +46,16 @@ var _ HTTPServer = (*http.Server)(nil)
// NewServer should be the last service to be run so K8s know application is fully up
func NewServer(ctx context.Context, httpServer HTTPServer, taskStorage storage.TaskStorage, scheduler core.Scheduler, config *config.Config) func() {
s := &server{
httpServer: httpServer,
taskStorage: taskStorage,
config: config,
scheduler: scheduler,
quit: make(chan struct{}),
ready: make(chan bool, 1),
httpPort: config.HttpServer.Port,
httpServer: httpServer,
taskStorage: taskStorage,
config: config,
scheduler: scheduler,
quit: make(chan struct{}),
ready: make(chan bool, 1),
httpPort: config.HttpServer.Port,
stringToTaskType: stringToTaskType,
convertParameterToHeader: convertParameterToHeader,
convertHeaderToParameter: convertHeaderToParameter,
}
go s.runServer(ctx)

Expand Down Expand Up @@ -128,7 +134,7 @@ type Task struct {
TaskUuid string `json:"taskUuid"`
ExecutionTimestamp float64 `json:"executionTimestamp"`
TaskType string `json:"taskType"`
Header map[string]string `json:"header"`
Parameter map[string]string `json:"parameter,omitempty"`
Pyload string `json:"pyload"`
}

Expand All @@ -155,11 +161,10 @@ func (s *server) GetAllTasksHandler(c *gin.Context) {
var tasks []Task
for _, task := range s.taskStorage.GetAllTasksPagination(c.Request.Context(), int32(offset), int32(limit)) {
tasks = append(tasks, Task{
TaskUuid: task.TaskUuid,
ExecutionTimestamp: task.ExecutionTimestamp,
TaskType: task.TaskType.String(),
Header: ConvertProtoHeaderToHeader(task.Header),
Pyload: base64.StdEncoding.EncodeToString(task.Pyload),
TaskUuid: task.TaskUuid,
TaskType: task.TaskType.String(),
Parameter: s.convertHeaderToParameter(task.Header),
Pyload: base64.StdEncoding.EncodeToString(task.Pyload),
})
}
if len(tasks) > 0 {
Expand All @@ -178,7 +183,7 @@ func (s *server) SetNewTaskHandler(c *gin.Context) {
return
}

taskType, err := StringToTaskType(task.TaskType)
taskType, err := s.stringToTaskType(task.TaskType)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
Expand All @@ -192,11 +197,10 @@ func (s *server) SetNewTaskHandler(c *gin.Context) {
}

pbTask := &_pb.Task{
TaskUuid: task.TaskUuid,
ExecutionTimestamp: task.ExecutionTimestamp,
TaskType: taskType,
Header: ConvertHeaderToProtoHeader(task.Header),
Pyload: byteArray,
TaskUuid: task.TaskUuid,
TaskType: taskType,
Header: s.convertParameterToHeader(task.Parameter),
Pyload: byteArray,
}

// enable to use kafka or direct write to redis
Expand All @@ -212,17 +216,23 @@ func (s *server) SetNewTaskHandler(c *gin.Context) {
})
}

// StringToTaskType converts a taskStr to a Task_Type enum.
func StringToTaskType(taskStr string) (_pb.Task_Type, error) {
// StringToTaskTypeFn is the signature of the task-type name parser stored on
// the server as stringToTaskType; declaring it as a type lets the
// implementation be injected.
type StringToTaskTypeFn func(taskStr string) (_pb.Task_Type, error)

// stringToTaskType resolves a task type name to its _pb.Task_Type enum value.
//
// The name is looked up in _pb.Task_Type_value. When it is not present, the
// function returns _pb.Task_PUB_SUB together with a non-nil error, so callers
// must check the error before using the returned value.
func stringToTaskType(taskStr string) (_pb.Task_Type, error) {
	number, known := _pb.Task_Type_value[taskStr]
	if !known {
		return _pb.Task_PUB_SUB, fmt.Errorf("invalid enum value: %s", taskStr)
	}
	return _pb.Task_Type(number), nil
}

// ConvertHeaderToProtoHeader return map[string][]byte
func ConvertHeaderToProtoHeader(header map[string]string) map[string][]byte {
// ConvertParameterToHeaderFn is the signature of the converter from a task's
// string parameters (map[string]string) to its header (map[string][]byte);
// declaring it as a type lets the implementation be injected.
type ConvertParameterToHeaderFn func(header map[string]string) map[string][]byte

// convertParameterToHeader returns the given parameters as a map[string][]byte.
func convertParameterToHeader(header map[string]string) map[string][]byte {
var pbHeader = make(map[string][]byte)
for k, v := range header {
byteArray, err := base64.StdEncoding.DecodeString(v)
Expand All @@ -235,8 +245,11 @@ func ConvertHeaderToProtoHeader(header map[string]string) map[string][]byte {
return pbHeader
}

// ConvertProtoHeaderToHeader return map[string]string
func ConvertProtoHeaderToHeader(header map[string][]byte) map[string]string {
// ConvertHeaderToParameterFn is the signature of the converter from a task's
// header (map[string][]byte) to its string parameters (map[string]string);
// declaring it as a type lets the implementation be injected.
type ConvertHeaderToParameterFn func(header map[string][]byte) map[string]string

// convertHeaderToParameter returns the given header as a map[string]string.
func convertHeaderToParameter(header map[string][]byte) map[string]string {
var pbHeader = make(map[string]string)
for k, v := range header {
// Encode the serialized protobuf message to a Base64 encoded string
Expand Down
Loading

0 comments on commit 502d8ee

Please sign in to comment.