Skip to content

Commit

Permalink
[#136]: fix: always initialize tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Dec 6, 2024
2 parents 9bfa49b + 13d7705 commit 82d1bf2
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 12 deletions.
5 changes: 5 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
Expand All @@ -309,6 +310,7 @@ golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8=
golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE=
golang.org/x/oauth2 v0.12.0 h1:smVPGxink+n1ZI5pkQa8y6fZT0RW0MgCO5bFpepy4B4=
Expand All @@ -318,6 +320,7 @@ golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA=
golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand All @@ -337,6 +340,7 @@ golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
Expand All @@ -357,6 +361,7 @@ golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
Expand Down
8 changes: 7 additions & 1 deletion pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jobs

import (
"encoding/json"
"math"
"strconv"
"unsafe"
)
Expand Down Expand Up @@ -90,12 +91,17 @@ func (p Pipeline) Int(name string, d int) int {
switch v := value.(type) {
// the most probable case
case string:
res, err := strconv.ParseInt(v, 10, 64)
res, err := strconv.ParseInt(v, 10, 32)
if err != nil {
// return default on failure
return d
}

if res > math.MaxInt32 || res < math.MinInt32 {
// return default if out of bounds
return d
}

return int(res)
case int:
return v
Expand Down
1 change: 1 addition & 0 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (p *Plugin) Init(cfg Configurer, log Logger, server Server) error {
p.log = log.NamedLogger(PluginName)
p.jobsProcessor = newPipesProc(p.log, &p.consumers, &p.consume, p.cfg.CfgOptions.Parallelism)
p.experimental = cfg.Experimental()
p.tracer = sdktrace.NewTracerProvider()

// collector
p.metrics = newStatsExporter(p)
Expand Down
43 changes: 43 additions & 0 deletions tests/configs/.rr-issue2085.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
version: '3'

rpc:
listen: tcp://127.0.0.1:6001

server:
command: "php php_test_files/jobs/jobs_ok.php"
on_init:
command: "php php_test_files/jobs/on-init.php"
exit_on_error: true
relay: "pipes"


logs:
level: debug
encoding: console
mode: development

jobs:
# num logical cores by default
num_pollers: 12
# 1mi by default
pipeline_size: 100000
# worker pool configuration
pool:
num_workers: 10
allocate_timeout: 60s
destroy_timeout: 60s

# list of broker pipelines associated with endpoints
pipelines:
test-1-memory:
driver: memory
config:
priority: 10
prefetch: 10000


# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
consume: [
"test-1-memory",
]

73 changes: 73 additions & 0 deletions tests/jobs_general_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,79 @@ func TestJobsInit(t *testing.T) {
require.Equal(t, 9, oLogger.FilterMessageSnippet("job was processed successfully").Len())
}

func TestIssue2085(t *testing.T) {
cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Version: "2024.2.0",
Path: "configs/.rr-issue2085.yaml",
}

err := cont.RegisterAll(
&logger.Plugin{},
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
&memory.Plugin{},
)
assert.NoError(t, err)

err = cont.Init()
if err != nil {
t.Fatal(err)
}

ch, err := cont.Serve()
if err != nil {
t.Fatal(err)
}

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

wg := &sync.WaitGroup{}
wg.Add(1)

stopCh := make(chan struct{}, 1)

go func() {
defer wg.Done()
for {
select {
case e := <-ch:
assert.Fail(t, "error", e.Error.Error())
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
case <-sig:
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
return
case <-stopCh:
// timeout
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
return
}
}
}()

time.Sleep(time.Second * 5)

stopCh <- struct{}{}
wg.Wait()

time.Sleep(time.Second)
}

func TestJOBSMetrics(t *testing.T) {
cont := endure.New(slog.LevelDebug)

Expand Down
16 changes: 5 additions & 11 deletions tests/php_test_files/jobs/jobs_ok.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,15 @@
* @var Goridge\RelayInterface $relay
*/

use Spiral\Goridge;
use Spiral\RoadRunner;
use Spiral\Goridge\StreamRelay;
use Spiral\RoadRunner\Jobs\Consumer;
use Spiral\RoadRunner\Jobs\Serializer\JsonSerializer;

ini_set('display_errors', 'stderr');
require dirname(__DIR__) . "/vendor/autoload.php";

$consumer = new Spiral\RoadRunner\Jobs\Consumer();

while ($task = $consumer->waitTask()) {
try {
$task->complete();
} catch (\Throwable $e) {
$task->error((string)$e);
}
try {
$task->ack();
} catch (\Throwable $e) {
$task->error((string)$e);
}
}
21 changes: 21 additions & 0 deletions tests/php_test_files/jobs/on-init.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

<?php

/**
* @var Goridge\RelayInterface $relay
*/

use Spiral\RoadRunner\Jobs\Jobs;
use Spiral\Goridge\RPC\RPC;
use Spiral\RoadRunner\Environment;

ini_set('display_errors', 'stderr');
require dirname(__DIR__) . "/vendor/autoload.php";

$jobs = new Jobs(
RPC::create(
Environment::fromGlobals()->getRPCAddress(),
),
);

$jobs->count();

0 comments on commit 82d1bf2

Please sign in to comment.