Commit

Add consumer
cv65kr committed Feb 9, 2024
1 parent 4180ef7 commit 81394dd
Showing 11 changed files with 891 additions and 85 deletions.
12 changes: 7 additions & 5 deletions pubsubjobs/config.go
@@ -4,18 +4,20 @@ import "os"

// pipeline google pub/sub info
const (
exchangeKey string = "exchange"
pref string = "prefetch"
skipTopicDeclaration string = "skip_topic_declaration"
topic string = "topic"
)

// config is used to parse pipeline configuration
type config struct {
// global
ProjectID string `mapstructure:"project_id"`
Topic string `mapstructure:"topic"`
SkipTopicDeclaration bool `mapstructure:"skip_topic_declaration"`
ProjectID string `mapstructure:"project_id"`
Topic string `mapstructure:"topic"`
SkipTopicDeclaration bool `mapstructure:"skip_topic_declaration"`

// local
Prefetch int `mapstructure:"prefetch"`
Prefetch int32 `mapstructure:"prefetch"`
Priority int64 `mapstructure:"priority"`
Host string `mapstructure:"host"`
}
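
The struct above is what a pipeline's options decode into via the mapstructure tags; note that Prefetch is now an int32 so it can seed the consumer's in-flight limit directly. Below is a minimal, standalone sketch of that decoding, for illustration only: it uses github.com/mitchellh/mapstructure with placeholder values, whereas the plugin itself receives these options through RoadRunner's configurer.

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// illustrative copy of the plugin's pipeline config struct
type config struct {
	ProjectID            string `mapstructure:"project_id"`
	Topic                string `mapstructure:"topic"`
	SkipTopicDeclaration bool   `mapstructure:"skip_topic_declaration"`
	Prefetch             int32  `mapstructure:"prefetch"`
	Priority             int64  `mapstructure:"priority"`
}

func main() {
	// options as they might appear under a jobs pipeline section (placeholder values)
	raw := map[string]any{
		"project_id":             "my-project",
		"topic":                  "rr-jobs",
		"skip_topic_declaration": false,
		"prefetch":               10,
		"priority":               1,
	}

	var c config
	if err := mapstructure.Decode(raw, &c); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c) // {ProjectID:my-project Topic:rr-jobs SkipTopicDeclaration:false Prefetch:10 Priority:1}
}
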
85 changes: 60 additions & 25 deletions pubsubjobs/driver.go
@@ -2,12 +2,14 @@ package pubsubjobs

import (
"context"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"cloud.google.com/go/pubsub"
"github.com/goccy/go-json"
"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"github.com/roadrunner-server/errors"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
@@ -36,14 +38,16 @@ type Driver struct {
mu sync.Mutex
cond sync.Cond

log *zap.Logger
pq jobs.Queue
pipeline atomic.Pointer[jobs.Pipeline]
tracer *sdktrace.TracerProvider
prop propagation.TextMapPropagator
consumeAll bool
skipDeclare bool
topic string
log *zap.Logger
pq jobs.Queue
pipeline atomic.Pointer[jobs.Pipeline]
tracer *sdktrace.TracerProvider
prop propagation.TextMapPropagator
consumeAll bool
skipDeclare bool
topic string
msgInFlight *int64
msgInFlightLimit *int32

// if user invoke several resume operations
listeners uint32
@@ -93,14 +97,16 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
// PARSE CONFIGURATION END -------

jb := &Driver{
tracer: tracer,
prop: prop,
log: log,
skipDeclare: conf.SkipTopicDeclaration,
topic: conf.Topic,
pq: pq,
pauseCh: make(chan struct{}, 1),
cond: sync.Cond{L: &sync.Mutex{}},
tracer: tracer,
prop: prop,
log: log,
skipDeclare: conf.SkipTopicDeclaration,
topic: conf.Topic,
pq: pq,
pauseCh: make(chan struct{}, 1),
cond: sync.Cond{L: &sync.Mutex{}},
msgInFlightLimit: ptr(conf.Prefetch),
msgInFlight: ptr(int64(0)),
}

ctx := context.Background()
@@ -145,14 +151,22 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
// PARSE CONFIGURATION -------

jb := &Driver{
prop: prop,
tracer: tracer,
log: log,
pq: pq,
pauseCh: make(chan struct{}, 1),
skipDeclare: conf.SkipTopicDeclaration,
topic: conf.Topic,
cond: sync.Cond{L: &sync.Mutex{}},
prop: prop,
tracer: tracer,
log: log,
pq: pq,
pauseCh: make(chan struct{}, 1),
skipDeclare: pipe.Bool(skipTopicDeclaration, false),
topic: pipe.String(topic, "default"),
cond: sync.Cond{L: &sync.Mutex{}},
msgInFlightLimit: ptr(int32(pipe.Int(pref, 10))),
msgInFlight: ptr(int64(0)),
}

ctx := context.Background()
jb.client, err = pubsub.NewClient(ctx, conf.ProjectID)
if err != nil {
return nil, err
}

err = jb.manageTopic(context.Background())
@@ -174,7 +188,24 @@ func (d *Driver) Push(ctx context.Context, jb jobs.Message) error {
ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "google_pub_sub_push")
defer span.End()

result := d.client.Topic(d.topic).Publish(ctx, &pubsub.Message{Data: jb.Payload()})
job := fromJob(jb)

data, err := json.Marshal(job.headers)
if err != nil {
return err
}

result := d.client.Topic(d.topic).Publish(ctx, &pubsub.Message{
Data: jb.Payload(),
Attributes: map[string]string{
jobs.RRID: job.Ident,
jobs.RRJob: job.Job,
jobs.RRDelay: strconv.Itoa(int(job.Options.Delay)),
jobs.RRHeaders: string(data),
jobs.RRPriority: strconv.Itoa(int(job.Options.Priority)),
jobs.RRAutoAck: btos(job.Options.AutoAck),
},
})
id, err := result.Get(ctx)
if err != nil {
return err
@@ -325,3 +356,7 @@ func (d *Driver) manageTopic(ctx context.Context) error {

return nil
}

func ptr[T any](val T) *T {
return &val
}
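
Push now serializes the job's metadata into Pub/Sub message attributes (ident, job name, delay, JSON-encoded headers, priority, auto-ack) alongside the raw payload; these are the attributes that unpack in item.go reads back on the consumer side. A standalone sketch of an equivalent publish call against the cloud.google.com/go/pubsub API follows; the project and topic names are placeholders, and the rr_* attribute keys stand in for the jobs.RR* constants used in the diff (their exact string values may differ).

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()

	// placeholder project; the topic is assumed to already exist
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// mirrors the attribute set the driver publishes
	res := client.Topic("rr-jobs").Publish(ctx, &pubsub.Message{
		Data: []byte(`{"user_id":1}`),
		Attributes: map[string]string{
			"rr_id":       "uuid-1",            // job ident
			"rr_job":      "ping",              // job name
			"rr_delay":    strconv.Itoa(0),     // requested delay
			"rr_headers":  `{"trace":["abc"]}`, // JSON-encoded headers
			"rr_priority": strconv.Itoa(10),
			"rr_auto_ack": "false",
		},
	})

	// Get blocks until the server confirms the publish and returns the server-assigned ID
	id, err := res.Get(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("published message", id)
}

On the consuming side, unpack reverses this mapping attribute by attribute, logging and falling back to the pipeline's defaults when a key is missing or malformed.
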
146 changes: 130 additions & 16 deletions pubsubjobs/item.go
@@ -1,18 +1,18 @@
package pubsubjobs

import (
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"

"cloud.google.com/go/pubsub"
"github.com/goccy/go-json"
"go.uber.org/zap"

"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
)

var _ jobs.Acknowledger = (*Item)(nil)

const (
auto string = "deduced_by_rr"
"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"github.com/roadrunner-server/errors"
)

type Item struct {
@@ -23,7 +23,7 @@ type Item struct {
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-values pairs
Headers map[string][]string `json:"headers"`
headers map[string][]string `json:"headers"`
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
@@ -41,6 +41,11 @@ type Options struct {
AutoAck bool `json:"auto_ack"`
// Queue name
Queue string `json:"queue,omitempty"`
// Private ================
cond *sync.Cond
message *pubsub.Message
msgInFlight *int64
stopped *uint64
}

// DelayDuration returns delay duration in a form of time.Duration.
@@ -56,13 +61,17 @@ func (i *Item) Priority() int64 {
return i.Options.Priority
}

func (i *Item) GroupID() string {
return i.Options.Pipeline
}

// Body packs job payload into binary payload.
func (i *Item) Body() []byte {
return strToBytes(i.Payload)
}

func (i *Item) Metadata() map[string][]string {
return i.Headers
func (i *Item) Headers() map[string][]string {
return i.headers
}

// Context packs job context (job, id) into binary payload.
@@ -80,7 +89,7 @@ func (i *Item) Context() ([]byte, error) {
ID: i.Ident,
Job: i.Job,
Driver: pluginName,
Headers: i.Headers,
Headers: i.headers,
Queue: i.Options.Queue,
Pipeline: i.Options.Pipeline,
},
@@ -94,10 +103,37 @@
}

func (i *Item) Ack() error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}
defer func() {
i.Options.cond.Signal()
atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
}()
// just return in case of auto-ack
if i.Options.AutoAck {
return nil
}

i.Options.message.Ack()
return nil
}

func (i *Item) Nack() error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}
defer func() {
i.Options.cond.Signal()
atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
}()
// message already deleted
if i.Options.AutoAck {
return nil
}

i.Options.message.Nack()

return nil
}

@@ -110,21 +146,20 @@ func (i *Item) Respond(_ []byte, _ string) error {
return nil
}

func fromJob(job jobs.Job) *Item {
func fromJob(job jobs.Message) *Item {
return &Item{
Job: job.Name(),
Ident: job.ID(),
Payload: job.Payload(),
Headers: job.Headers(),
Payload: string(job.Payload()),
headers: job.Headers(),
Options: &Options{
Priority: job.Priority(),
Pipeline: job.Pipeline(),
Pipeline: job.GroupID(),
Delay: job.Delay(),
AutoAck: job.AutoAck(),
},
}
}

func bytesToStr(data []byte) string {
if len(data) == 0 {
return ""
@@ -140,3 +175,82 @@ func strToBytes(data string) []byte {

return unsafe.Slice(unsafe.StringData(data), len(data))
}

func (c *Driver) unpack(message *pubsub.Message) *Item {
attributes := message.Attributes

var rrid string
if val, ok := attributes[jobs.RRID]; ok {
rrid = val
}

var rrj string
if val, ok := attributes[jobs.RRJob]; ok {
rrj = val
}

h := make(map[string][]string)
if val, ok := attributes[jobs.RRHeaders]; ok {
err := json.Unmarshal([]byte(val), &h)
if err != nil {
c.log.Debug("failed to unpack the headers, not a JSON", zap.Error(err))
}
}

var autoAck bool
if val, ok := attributes[jobs.RRAutoAck]; ok {
autoAck = stob(val)
}

var dl int
var err error
if val, ok := attributes[jobs.RRDelay]; ok {
dl, err = strconv.Atoi(val)
if err != nil {
c.log.Debug("failed to unpack the delay, not a number", zap.Error(err))
}
}

var priority int
if val, ok := attributes[jobs.RRPriority]; ok {
priority, err = strconv.Atoi(val)
if err != nil {
priority = int((*c.pipeline.Load()).Priority())
c.log.Debug("failed to unpack the priority; inheriting the pipeline's default priority", zap.Error(err))
}
}

return &Item{
Job: rrj,
Ident: rrid,
Payload: string(message.Data),
headers: h,
Options: &Options{
AutoAck: autoAck,
Delay: int64(dl),
Priority: int64(priority),
Pipeline: (*c.pipeline.Load()).Name(),
// private
message: message,
msgInFlight: c.msgInFlight,
cond: &c.cond,
stopped: &c.stopped,
},
}
}

func btos(b bool) string {
if b {
return "true"
}

return "false"
}

func stob(s string) bool {
if s != "" {
return s == "true"
}

return false
}
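
Ack and Nack both decrement the shared msgInFlight counter (adding ^int64(0), i.e. -1) and Signal the driver's condition variable, which presumably lets the consumer added by this commit admit another message once the count drops below the prefetch limit held in msgInFlightLimit. Below is a self-contained sketch of that throttling pattern; plain goroutines stand in for the Pub/Sub receive callback and for Ack, all names and timings are illustrative, and the decrement is taken under the lock so this standalone demo cannot lose a wakeup.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	limit := int32(2)    // plays the role of msgInFlightLimit (prefetch)
	inFlight := int64(0) // plays the role of msgInFlight
	cond := sync.Cond{L: &sync.Mutex{}}

	var wg sync.WaitGroup
	for n := 0; n < 5; n++ {
		// admit the next "message" only while fewer than `limit` are unacknowledged
		cond.L.Lock()
		for atomic.LoadInt64(&inFlight) >= int64(atomic.LoadInt32(&limit)) {
			cond.Wait()
		}
		cond.L.Unlock()

		atomic.AddInt64(&inFlight, 1)
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			time.Sleep(50 * time.Millisecond) // simulated processing
			// "Ack": decrement the in-flight counter and wake the admitting loop
			cond.L.Lock()
			atomic.AddInt64(&inFlight, ^int64(0)) // decrement by one
			cond.L.Unlock()
			cond.Signal()
			fmt.Println("acked", n)
		}(n)
	}
	wg.Wait()
}
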