Skip to content

Commit

Permalink
[#386]: chore: pass RR version from the config plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Jul 6, 2023
2 parents e80e47c + 0a52c59 commit 432c8ad
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 34 deletions.
15 changes: 13 additions & 2 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ func (wp *Workflow) flushQueue() error {
defer wp.mh.Gauge(RrWorkflowsMetricName).Update(float64(wp.pool.QueueSize()))
}

// todo(rustatian) to sync.Pool
pld := &payload.Payload{}
pld := wp.getPld()
defer wp.putPld(pld)
err := wp.codec.Encode(wp.getContext(), pld, wp.mq.Messages()...)
if err != nil {
return err
Expand Down Expand Up @@ -415,3 +415,14 @@ func (wp *Workflow) runCommand(cmd any, payloads *commonpb.Payloads, header *com

return msgs[0], nil
}

func (wp *Workflow) getPld() *payload.Payload {
return wp.pldPool.Get().(*payload.Payload)
}

func (wp *Workflow) putPld(pld *payload.Payload) {
pld.Codec = 0
pld.Context = nil
pld.Body = nil
wp.pldPool.Put(pld)
}
16 changes: 16 additions & 0 deletions aggregatedpool/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package aggregatedpool

import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/roadrunner-server/sdk/v4/payload"
"github.com/temporalio/roadrunner-temporal/v4/canceller"
"github.com/temporalio/roadrunner-temporal/v4/common"
"github.com/temporalio/roadrunner-temporal/v4/internal"
Expand Down Expand Up @@ -54,13 +56,22 @@ type Workflow struct {

log *zap.Logger
mh temporalClient.MetricsHandler

// objects pool
pldPool *sync.Pool
}

// RoadRunner function
func NewWorkflowDefinition(codec common.Codec, pool common.Pool, log *zap.Logger) *Workflow {
return &Workflow{
log: log,
codec: codec,
pool: pool,
pldPool: &sync.Pool{
New: func() any {
return new(payload.Payload)
},
},
}
}

Expand All @@ -71,6 +82,11 @@ func (wp *Workflow) NewWorkflowDefinition() bindings.WorkflowDefinition {
pool: wp.pool,
codec: wp.codec,
log: wp.log,
pldPool: &sync.Pool{
New: func() any {
return new(payload.Payload)
},
},
}
}

Expand Down
3 changes: 0 additions & 3 deletions common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,10 @@ type Informer interface {
type Configurer interface {
// UnmarshalKey takes a single key and unmarshal it into a Struct.
UnmarshalKey(name string, out any) error

// Has checks if config section exists.
Has(name string) bool

// GracefulTimeout represents timeout for all servers registered in the endure
GracefulTimeout() time.Duration

// RRVersion returns running RR version
RRVersion() string
}
Expand Down
1 change: 0 additions & 1 deletion docs/temporal.drawio

This file was deleted.

6 changes: 6 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ const (
namespace = "rr_temporal"
)

func (p *Plugin) MetricsCollector() []prom.Collector {
// p - implements Exporter interface (workers)
// other - request duration and count
return []prom.Collector{p.statsExporter}
}

// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
Workers() []*process.State
Expand Down
46 changes: 22 additions & 24 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync"
"time"

prom "github.com/prometheus/client_golang/prometheus"
"github.com/roadrunner-server/endure/v2/dep"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v4/events"
Expand All @@ -37,23 +36,20 @@ import (

const (
// PluginName defines public service name.
PluginName string = "temporal"
pluginName string = "temporal"
metricsKey string = "temporal.metrics"

// RrMode env variable key
RrMode string = "RR_MODE"

// RrCodec env variable key
RrCodec string = "RR_CODEC"

// RrCodecVal - codec name, should be in sync with the PHP-SDK
RrCodecVal string = "protobuf"

// temporal, sync with https://github.com/temporalio/sdk-go/blob/master/internal/internal_utils.go#L44
clientNameHeaderName = "client-name"
clientNameHeaderValue = "roadrunner-temporal"
clientVersionHeaderName = "client-version"
clientVersionHeaderValue = "2.0.0"
clientNameHeaderName = "client-name"
clientNameHeaderValue = "roadrunner-temporal"
clientVersionHeaderName = "client-version"
)

type Logger interface {
Expand Down Expand Up @@ -97,11 +93,11 @@ type Plugin struct {
func (p *Plugin) Init(cfg common.Configurer, log Logger, server common.Server) error {
const op = errors.Op("temporal_plugin_init")

if !cfg.Has(PluginName) {
if !cfg.Has(pluginName) {
return errors.E(op, errors.Disabled)
}

err := cfg.UnmarshalKey(PluginName, &p.config)
err := cfg.UnmarshalKey(pluginName, &p.config)
if err != nil {
return errors.E(op, err)
}
Expand Down Expand Up @@ -137,7 +133,7 @@ func (p *Plugin) Init(cfg common.Configurer, log Logger, server common.Server) e

// CONFIG INIT END -----

p.log = log.NamedLogger(PluginName)
p.log = log.NamedLogger(pluginName)

p.server = server
p.rrVersion = cfg.RRVersion()
Expand Down Expand Up @@ -252,7 +248,7 @@ func (p *Plugin) Serve() chan error {
ConnectionOptions: temporalClient.ConnectionOptions{
TLS: p.tlsCfg,
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(rewriteNameAndVersion),
grpc.WithUnaryInterceptor(p.rewriteNameAndVersion),
},
},
}
Expand Down Expand Up @@ -344,9 +340,10 @@ func (p *Plugin) Stop(context.Context) error {

func (p *Plugin) Workers() []*process.State {
p.mu.RLock()
defer p.mu.RUnlock()

wfPw := p.wfP.Workers()
actPw := p.actP.Workers()
p.mu.RUnlock()

states := make([]*process.State, 0, len(wfPw)+len(actPw))

Expand Down Expand Up @@ -397,6 +394,7 @@ func (p *Plugin) ResetAP() error {

func (p *Plugin) Reset() error {
const op = errors.Op("temporal_reset")

p.mu.Lock()
defer p.mu.Unlock()

Expand Down Expand Up @@ -453,27 +451,27 @@ func (p *Plugin) Reset() error {
}

func (p *Plugin) Name() string {
return PluginName
return pluginName
}

func (p *Plugin) RPC() any {
return &rpc{srv: p, client: p.client}
}

func (p *Plugin) MetricsCollector() []prom.Collector {
// p - implements Exporter interface (workers)
// other - request duration and count
return []prom.Collector{p.statsExporter}
}

func rewriteNameAndVersion(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
func (p *Plugin) rewriteNameAndVersion(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption) error {
md, _, _ := metadata.FromOutgoingContextRaw(ctx)
if md == nil {
return invoker(ctx, method, req, reply, cc, opts...)
}

md.Set(clientNameHeaderName, clientNameHeaderValue)
md.Set(clientVersionHeaderName, clientVersionHeaderValue)
md.Set(clientVersionHeaderName, p.rrVersion)

ctx = metadata.NewOutgoingContext(ctx, md)

Expand All @@ -482,7 +480,7 @@ func rewriteNameAndVersion(ctx context.Context, method string, req, reply interf

func (p *Plugin) initPool() error {
var err error
ap, err := p.server.NewPool(context.Background(), p.config.Activities, map[string]string{RrMode: PluginName, RrCodec: RrCodecVal}, p.log)
ap, err := p.server.NewPool(context.Background(), p.config.Activities, map[string]string{RrMode: pluginName, RrCodec: RrCodecVal}, p.log)
if err != nil {
return err
}
Expand All @@ -500,7 +498,7 @@ func (p *Plugin) initPool() error {
// no supervisor for the workflow worker
Supervisor: nil,
},
map[string]string{RrMode: PluginName, RrCodec: RrCodecVal},
map[string]string{RrMode: pluginName, RrCodec: RrCodecVal},
nil,
)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,28 @@ func (mq *MessageQueue) AllocateMessage(cmd any, payloads *common.Payloads, head

func (mq *MessageQueue) PushCommand(cmd any, payloads *common.Payloads, header *common.Header) {
mq.mu.Lock()
defer mq.mu.Unlock()
mq.queue = append(mq.queue, &internal.Message{
ID: mq.SeqID(),
Command: cmd,
Payloads: payloads,
Header: header,
})
mq.mu.Unlock()
}

func (mq *MessageQueue) PushResponse(id uint64, payloads *common.Payloads) {
mq.mu.Lock()
defer mq.mu.Unlock()
mq.queue = append(mq.queue, &internal.Message{
ID: id,
Payloads: payloads,
})
mq.mu.Unlock()
}

func (mq *MessageQueue) PushError(id uint64, failure *failure.Failure) {
mq.mu.Lock()
defer mq.mu.Unlock()
mq.queue = append(mq.queue, &internal.Message{ID: id, Failure: failure})
mq.mu.Unlock()
}

func (mq *MessageQueue) Messages() []*internal.Message {
Expand Down

0 comments on commit 432c8ad

Please sign in to comment.