Skip to content

Commit

Permalink
feat: instant job event notifications
Browse files Browse the repository at this point in the history
This commit introduces a more efficient mechanism for notifications
related to job events, such as creation, updating, and deletion. Unlike
the previous architecture, where clients had to constantly poll the
server for updates, the new implementation uses server-sent events (SSE)
to allow clients to receive updates from the server, improving
efficiency and reducing network overhead.

Key Features:

- Introduced a new SSE-enabled endpoint, `/jobs/events`, which accepts
  filter parameters to customize the stream of events received.
- Clients can subscribe to `/jobs/events` to receive updates on job
  events that meet specified filter criteria.
- Provided a client reference implementation through `wfxctl`.

Signed-off-by: Michael Adler <[email protected]>
  • Loading branch information
michaeladler authored and stormc committed Dec 13, 2023
1 parent c34417e commit 976875f
Show file tree
Hide file tree
Showing 82 changed files with 4,045 additions and 375 deletions.
3 changes: 2 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,15 @@ linters:
- gocritic
- gofumpt
- importas
- misspell
- nilnil
- prealloc
- reassign
- revive
- stylecheck
- tparallel
- usestdlibvars
- wrapcheck
- misspell

linters-settings:
staticcheck:
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Add optional `description` field to workflows
- Job event notifications via server-sent events (see #11)

### Fixed

Expand All @@ -21,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Refactored `wfxctl workflow delete` command to accept workflows as arguments instead of positional parameters
- Prefer cgroup CPU quota over host CPU count
- Empty or `null` arrays are omitted from JSON responses

### Removed

Expand Down
135 changes: 135 additions & 0 deletions api/job_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package api

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"context"
"encoding/json"
"io"
"net/http"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/siemens/wfx/generated/model"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/events"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/workflow/dau"
"github.com/steinfletcher/apitest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestJobEventsSubscribe(t *testing.T) {
log.Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.Stamp})

db := newInMemoryDB(t)
wf := dau.DirectWorkflow()
_, err := workflow.CreateWorkflow(context.Background(), db, wf)
require.NoError(t, err)

north, south := createNorthAndSouth(t, db)

handlers := []http.Handler{north, south}
for i, name := range allAPIs {
handler := handlers[i]
t.Run(name, func(t *testing.T) {
clientID := "TestJobEventsSubscribe"

var jobID atomic.Pointer[string]

var wg sync.WaitGroup
expectedTags := []string{"tag1", "tag2"}
ch, _ := events.AddSubscriber(context.Background(), events.FilterParams{ClientIDs: []string{clientID}}, expectedTags)
wg.Add(1)
go func() {
defer wg.Done()

// wait for job created event
ev := <-ch
payload := ev.Args[0].(*events.JobEvent)
assert.Equal(t, events.ActionCreate, payload.Action)
assert.Equal(t, expectedTags, payload.Tags)
jobID.Store(&payload.Job.ID)

// wait for event created by our status.Update below
<-ch
// now our GET request should have received the response as well,
// add some extra time to be safe
time.Sleep(100 * time.Millisecond)
events.ShutdownSubscribers()
}()

_, err := job.CreateJob(context.Background(), db, &model.JobRequest{ClientID: clientID, Workflow: wf.Name})
require.NoError(t, err)

wg.Add(1)
go func() {
defer wg.Done()
// wait for subscriber which is created by our GET request below and our test goroutine above
for events.SubscriberCount() != 2 {
time.Sleep(20 * time.Millisecond)
}
// update job
_, err = status.Update(context.Background(), db, *jobID.Load(), &model.JobStatus{State: "INSTALLING"}, model.EligibleEnumCLIENT)
require.NoError(t, err)
}()

// wait for job id
for jobID.Load() == nil {
time.Sleep(20 * time.Millisecond)
}

result := apitest.New().
Handler(handler).
Get("/api/wfx/v1/jobs/events").Query("ids", *jobID.Load()).
Expect(t).
Status(http.StatusOK).
Header("Content-Type", "text/event-stream").
End()

data, _ := io.ReadAll(result.Response.Body)
body := string(data)
require.NotEmpty(t, body)

lines := strings.Split(body, "\n")

t.Log("HTTP resonse body:")
for _, line := range lines {
t.Logf(">> %s", line)
}

assert.Len(t, lines, 4)

// check body starts with data:
assert.True(t, strings.HasPrefix(lines[0], "data: "))

// check content is a job and state is INSTALLING
var ev events.JobEvent
err = json.Unmarshal([]byte(strings.TrimPrefix(lines[0], "data: ")), &ev)
require.NoError(t, err)
assert.Equal(t, events.ActionUpdateStatus, ev.Action)
assert.Equal(t, "INSTALLING", ev.Job.Status.State)
assert.Equal(t, wf.Name, ev.Job.Workflow.Name)
assert.Equal(t, clientID, ev.Job.ClientID)
assert.Equal(t, "id: 1", lines[1])

wg.Wait()
events.ShutdownSubscribers()
})
}
}
8 changes: 4 additions & 4 deletions api/job_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestJobStatusUpdate(t *testing.T) {
apitest.New().
Handler(south).
Put(statusPath).
Body(`{"clientId": "klaus", "state":"DOWNLOAD"}`).
Body(`{"clientId": "foo", "state":"DOWNLOAD"}`).
ContentType("application/json").
Expect(t).
Status(http.StatusBadRequest).
Expand All @@ -90,7 +90,7 @@ func TestJobStatusUpdate(t *testing.T) {
apitest.New().
Handler(north).
Put(statusPath).
Body(`{"clientId": "klaus", "state":"DOWNLOAD"}`).
Body(`{"clientId": "foo", "state":"DOWNLOAD"}`).
ContentType("application/json").
Expect(t).
Status(http.StatusOK).
Expand All @@ -101,7 +101,7 @@ func TestJobStatusUpdate(t *testing.T) {
apitest.New().
Handler(north).
Put(statusPath).
Body(`{"clientId": "klaus", "state":"DOWNLOADING"}`).
Body(`{"clientId": "foo", "state":"DOWNLOADING"}`).
ContentType("application/json").
Expect(t).
Status(http.StatusBadRequest).
Expand All @@ -111,7 +111,7 @@ func TestJobStatusUpdate(t *testing.T) {
apitest.New().
Handler(south).
Put(statusPath).
Body(`{"clientId":"klaus","state":"DOWNLOADING"}`).
Body(`{"clientId":"foo","state":"DOWNLOADING"}`).
ContentType("application/json").
Expect(t).
Status(http.StatusOK).
Expand Down
34 changes: 34 additions & 0 deletions api/northbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package api
import (
"fmt"
"net/http"
"strings"

"github.com/Southclaws/fault/ftag"
"github.com/go-openapi/loads"
Expand All @@ -21,10 +22,12 @@ import (
"github.com/siemens/wfx/generated/northbound/restapi/operations/northbound"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/definition"
"github.com/siemens/wfx/internal/handler/job/events"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/middleware/responder/sse"
"github.com/siemens/wfx/persistence"
)

Expand Down Expand Up @@ -266,5 +269,36 @@ func NewNorthboundAPI(storage persistence.Storage) *operations.WorkflowExecutorA
return northbound.NewDeleteJobsIDTagsOK().WithPayload(tags)
})

serverAPI.NorthboundGetJobsEventsHandler = northbound.GetJobsEventsHandlerFunc(
func(params northbound.GetJobsEventsParams) middleware.Responder {
ctx := params.HTTPRequest.Context()
filter := parseFilterParamsNorth(params)
var tags []string
if s := params.Tags; s != nil {
tags = strings.Split(*s, ",")
}
eventChan, err := events.AddSubscriber(ctx, filter, tags)
if err != nil {
return northbound.NewGetJobsEventsDefault(http.StatusInternalServerError)
}
return sse.Responder(ctx, eventChan)
})

return serverAPI
}

func parseFilterParamsNorth(params northbound.GetJobsEventsParams) events.FilterParams {
// same code as parseFilterParamsSouth but params is from a different package;
// this isn't pretty (DRY) but we have a conceptually clear distinction
var filter events.FilterParams
if ids := params.JobIds; ids != nil {
filter.JobIDs = strings.Split(*ids, ",")
}
if ids := params.ClientIds; ids != nil {
filter.ClientIDs = strings.Split(*ids, ",")
}
if wfs := params.Workflows; wfs != nil {
filter.Workflows = strings.Split(*wfs, ",")
}
return filter
}
Loading

0 comments on commit 976875f

Please sign in to comment.