Skip to content

Commit

Permalink
[#15]: feat(streams): worker and pool stream api
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Mar 17, 2022
2 parents a4f184f + 6d696eb commit c1462e9
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- name: Set up Go 1.17
uses: actions/setup-go@v2 # action page: <https://github.com/actions/setup-go>
with:
go-version: 1.17.7
go-version: 1.17.8

- name: Run linter
uses: golangci/[email protected] # Action page: <https://github.com/golangci/golangci-lint-action>
Expand Down
7 changes: 1 addition & 6 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@ on:
push:
branches:
- master
- beta
- stable
tags-ignore:
- "**"
paths-ignore:
- "**.md"
- "**.yaml"
- "**.yml"
pull_request:
paths-ignore:
- "**.md"
- "**.yaml"
- "**.yml"

jobs:
golang:
Expand All @@ -26,7 +21,7 @@ jobs:
strategy:
fail-fast: true
matrix:
go: ["1.17.7"]
go: ["1.18"]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
module github.com/roadrunner-server/api/v2

go 1.17
go 1.18

require (
github.com/goccy/go-json v0.9.5
github.com/prometheus/client_golang v1.12.1
github.com/roadrunner-server/goridge/v3 v3.3.1
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/valyala/fasthttp v1.34.0
go.uber.org/zap v1.21.0
google.golang.org/protobuf v1.27.1
Expand All @@ -19,7 +19,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/klauspost/compress v1.15.0 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/kr/pretty v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -28,7 +28,7 @@ require (
github.com/prometheus/procfs v0.7.3 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/sys v0.0.0-20220315194320-039c03cc5b86 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
13 changes: 8 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-json v0.9.4/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/goccy/go-json v0.9.5 h1:ooSMW526ZjK+EaL5elrSyN2EzIfi/3V0m4+HJEDYLik=
github.com/goccy/go-json v0.9.5/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand Down Expand Up @@ -145,6 +144,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.15.0 h1:xqfchp4whNFxn5A4XFyyYtitiWI8Hy5EW59jEwcyL6U=
github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.1 h1:y9FcTHGyrebwfP0ZZqFiaxTaiDnUrGkJkI+f583BL1A=
github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
Expand Down Expand Up @@ -191,7 +192,6 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/roadrunner-server/errors v1.1.1/go.mod h1:MzHjhRZIZc1ooMyYllUhNs0aTqRUbwcgUSO0TN7kCII=
github.com/roadrunner-server/goridge/v3 v3.3.1 h1:IYdm+smDfKl09AfFgKJeSNpSTp7KTgO3XfGPKrxs0vQ=
github.com/roadrunner-server/goridge/v3 v3.3.1/go.mod h1:f7SPSt9HUw5kbCc6Ofk4eEUU1xh2qHf/NznrTaW+aLA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand All @@ -203,16 +203,15 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.34.0 h1:d3AAQJ2DRcxJYHm7OXNXtXt2as1vMDfxeIcFvhmGGm4=
github.com/valyala/fasthttp v1.34.0/go.mod h1:epZA5N+7pY6ZaEKRmstzOuYJx9HI8DI1oaCGZpdH4h0=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand All @@ -230,6 +229,8 @@ go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down Expand Up @@ -360,6 +361,8 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 h1:nhht2DYV/Sn3qOayu8lM+cU1ii9sTLUeBQwQQfUHtrs=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220315194320-039c03cc5b86 h1:A9i04dxx7Cribqbs8jf3FQLogkL/CV2YN7hj9KWJCkc=
golang.org/x/sys v0.0.0-20220315194320-039c03cc5b86/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
10 changes: 7 additions & 3 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ type Pool interface {
Destroy(ctx context.Context)
}

type Queuer interface {
// QueueSize can be implemented on the pool to provide the requests queue information
QueueSize() uint64
// Streamer managed set of inner worker processes.
type Streamer interface {
// ExecStream executes task with payload
ExecStream(rqs *payload.Payload, resp chan *payload.Payload) error

// ExecStreamWithTTL executes task with context which is used with timeout
ExecStreamWithTTL(ctx context.Context, rqs *payload.Payload, resp chan *payload.Payload) error
}
6 changes: 6 additions & 0 deletions pool/queuer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package pool

type Queuer interface {
// QueueSize can be implemented on the pool to provide the requests queue information
QueueSize() uint64
}
33 changes: 23 additions & 10 deletions worker/worker.go → worker/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,29 @@ import (
)

// Allocator is responsible for worker allocation in the pool
type Allocator func() (SyncWorker, error)
type Allocator func() (BaseProcess, error)

type Streamer interface {
// BaseProcess provides basic functionality for the SyncWorker
BaseProcess
// ExecStream used to execute payload on the SyncWorker, there is no TIMEOUTS
ExecStream(rqs *payload.Payload, resp chan *payload.Payload) error
// ExecStreamWithTTL used to handle Exec with TTL
ExecStreamWithTTL(ctx context.Context, p *payload.Payload, resp chan *payload.Payload) error
}

// Worker is a non-bc replacement for the SDK
type Worker = SyncWorker

// SyncWorker is not a good name, since it's just a sync executor, but not all worker is sync
type SyncWorker interface {
// BaseProcess provides basic functionality for the SyncWorker
BaseProcess
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs *payload.Payload) (*payload.Payload, error)
// ExecWithTTL used to handle Exec with TTL
ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
}

// State represents WorkerProcess status and updated time.
type State interface {
Expand Down Expand Up @@ -66,12 +88,3 @@ type BaseProcess interface {
// AttachRelay used to attach goridge relay to the worker process
AttachRelay(rl relay.Relay)
}

type SyncWorker interface {
// BaseProcess provides basic functionality for the SyncWorker
BaseProcess
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs *payload.Payload) (*payload.Payload, error)
// ExecWithTTL used to handle Exec with TTL
ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
}

0 comments on commit c1462e9

Please sign in to comment.