Skip to content

Commit

Permalink
use global JS settings in bench command (#853)
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Lloyd <[email protected]>
  • Loading branch information
caleblloyd authored Sep 1, 2023
1 parent 3e3bfee commit b15b464
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 25 deletions.
12 changes: 6 additions & 6 deletions cli/bench_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (c *benchCmd) bench(_ *fisk.ParseContext) error {
log.Fatalf("NATS connection failed: %v", err)
}

js, err = nc.JetStream(nats.MaxWait(c.jsTimeout))
js, err = nc.JetStream(append(jsOpts(), nats.MaxWait(c.jsTimeout))...)
if err != nil {
log.Fatalf("Couldn't get the JetStream context: %v", err)
}
Expand Down Expand Up @@ -507,7 +507,7 @@ func coreNATSPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg
}

func jsPublisher(c *benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int, idPrefix string, pubNumber string, offset int) {
js, err := nc.JetStream()
js, err := nc.JetStream(jsOpts()...)
if err != nil {
log.Fatalf("Couldn't get the JetStream context: %v", err)
}
Expand Down Expand Up @@ -562,7 +562,7 @@ func jsPublisher(c *benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byt
case <-time.After(c.jsTimeout):
c.retriesUsed = true
log.Printf("JS PubAsync ack timeout (pending=%d)", js.PublishAsyncPending())
js, err = nc.JetStream()
js, err = nc.JetStream(jsOpts()...)
if err != nil {
log.Fatalf("Couldn't get the JetStream context: %v", err)
}
Expand Down Expand Up @@ -597,7 +597,7 @@ func jsPublisher(c *benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byt
}

func kvPutter(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int, offset int) {
js, err := nc.JetStream()
js, err := nc.JetStream(jsOpts()...)
if err != nil {
log.Fatalf("Couldn't get the JetStream context: %v", err)
}
Expand Down Expand Up @@ -737,7 +737,7 @@ func (c *benchCmd) runSubscriber(bm *bench.Benchmark, nc *nats.Conn, startwg *sy
if c.js {
var js nats.JetStreamContext

js, err = nc.JetStream()
js, err = nc.JetStream(jsOpts()...)
if err != nil {
log.Fatalf("Couldn't get the JetStream context: %v", err)
}
Expand Down Expand Up @@ -800,7 +800,7 @@ func (c *benchCmd) runSubscriber(bm *bench.Benchmark, nc *nats.Conn, startwg *sy
if c.kv {
var js nats.JetStreamContext

js, err = nc.JetStream()
js, err = nc.JetStream(jsOpts()...)
if err != nil {
log.Fatalf("Couldn't get the JetStream context: %v", err)
}
Expand Down
42 changes: 23 additions & 19 deletions cli/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,28 @@ func natsOpts() []nats.Option {
}...)
}

func jsOpts() []nats.JSOpt {
jso := []nats.JSOpt{
nats.Domain(opts.JsDomain),
nats.APIPrefix(opts.JsApiPrefix),
nats.MaxWait(opts.Timeout),
}

if opts.Trace {
ct := &nats.ClientTrace{
RequestSent: func(subj string, payload []byte) {
log.Printf(">>> %s\n%s\n\n", subj, string(payload))
},
ResponseReceived: func(subj string, payload []byte, hdr nats.Header) {
log.Printf("<<< %s: %s", subj, string(payload))
},
}
jso = append(jso, ct)
}

return jso
}

func addCheat(name string, cmd *fisk.CmdClause) {
if opts.NoCheats {
return
Expand Down Expand Up @@ -427,25 +449,7 @@ func prepareJSHelper() (*nats.Conn, nats.JetStreamContext, error) {
return opts.Conn, opts.JSc, nil
}

jso := []nats.JSOpt{
nats.Domain(opts.JsDomain),
nats.APIPrefix(opts.JsApiPrefix),
nats.MaxWait(opts.Timeout),
}

if opts.Trace {
ct := &nats.ClientTrace{
RequestSent: func(subj string, payload []byte) {
log.Printf(">>> %s\n%s\n\n", subj, string(payload))
},
ResponseReceived: func(subj string, payload []byte, hdr nats.Header) {
log.Printf("<<< %s: %s", subj, string(payload))
},
}
jso = append(jso, ct)
}

opts.JSc, err = opts.Conn.JetStream(jso...)
opts.JSc, err = opts.Conn.JetStream(jsOpts()...)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit b15b464

Please sign in to comment.