Skip to content

Commit

Permalink
Merge pull request #888 from ripienaar/early_filter_connz
Browse files Browse the repository at this point in the history
Adjust when connz responses are filtered
  • Loading branch information
ripienaar authored Sep 29, 2023
2 parents a278fce + 68bfc2e commit 962b031
Showing 1 changed file with 74 additions and 43 deletions.
117 changes: 74 additions & 43 deletions cli/server_report_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ package cli

import (
"encoding/json"
"errors"
"fmt"
"sort"

"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
"github.com/choria-io/fisk"
"github.com/dustin/go-humanize"
"github.com/fatih/color"
Expand Down Expand Up @@ -497,7 +499,7 @@ func (c *SrvReportCmd) reportConnections(_ *fisk.ParseContext) error {
return fmt.Errorf("did not get results from any servers")
}

conns := connz.flatConnInfo(c.filterExpression)
conns := connz.flatConnInfo()

if c.json {
printJSON(conns)
Expand Down Expand Up @@ -629,42 +631,15 @@ func (c *SrvReportCmd) renderConnections(report []connInfo) {

type connzList []*server.ServerAPIConnzResponse

func (c connzList) flatConnInfo(filter string) []connInfo {
func (c connzList) flatConnInfo() []connInfo {
var conns []connInfo
for _, conn := range c {
srv := structWithoutOmitEmpty(*conn.Server)

for _, conn := range c {
for _, c := range conn.Data.Conns {
if filter != "" {
ci := structWithoutOmitEmpty(*c)
env := map[string]any{
"server": srv,
"Server": conn.Server,
"conns": ci,
"Conns": c,
}

program, err := expr.Compile(filter, expr.Env(env), expr.AsBool())
fisk.FatalIfError(err, "Invalid expression: %v", err)

out, err := expr.Run(program, env)
if err != nil {
fisk.FatalIfError(err, "Invalid expression: %v", err)
}

should, ok := out.(bool)
if !ok {
fisk.FatalIfError(err, "expression did not return a boolean")
}

if !should {
continue
}
}

conns = append(conns, connInfo{c, conn.Server})
}
}

return conns
}

Expand All @@ -691,6 +666,45 @@ func (c *SrvReportCmd) getConnz(limit int, nc *nats.Conn) (connzList, error) {
result := connzList{}
found := 0

var program *vm.Program
var err error
env := map[string]any{}

if c.filterExpression != "" {
program, err = expr.Compile(c.filterExpression, expr.Env(map[string]any{}), expr.AsBool(), expr.AllowUndefinedVariables())
fisk.FatalIfError(err, "Invalid expression: %v", err)
}

removeFilteredConns := func(co *server.ServerAPIConnzResponse) error {
conns := make([]*server.ConnInfo, len(co.Data.Conns))
copy(conns, co.Data.Conns)
co.Data.Conns = []*server.ConnInfo{}
srv := structWithoutOmitEmpty(*co.Server)

for _, conn := range conns {
env["server"] = srv
env["Server"] = co.Server
env["conns"] = structWithoutOmitEmpty(*conn)
env["Conns"] = conn

out, err := expr.Run(program, env)
if err != nil {
fisk.FatalIfError(err, "Invalid expression: %v", err)
}

should, ok := out.(bool)
if !ok {
fisk.FatalIfError(err, "expression did not return a boolean")
}

if should {
co.Data.Conns = append(co.Data.Conns, conn)
}
}

return nil
}

req := &server.ConnzEventOptions{
ConnzOptions: server.ConnzOptions{
Subscriptions: true,
Expand All @@ -702,18 +716,28 @@ func (c *SrvReportCmd) getConnz(limit int, nc *nats.Conn) (connzList, error) {
},
EventFilterOptions: c.reqFilter(),
}
res, err := doReq(req, "$SYS.REQ.SERVER.PING.CONNZ", c.waitFor, nc)
results, err := doReq(req, "$SYS.REQ.SERVER.PING.CONNZ", c.waitFor, nc)
if err != nil {
return nil, err
}

for _, c := range res {
co, err := parseConnzResp(c)
for _, res := range results {
co, err := parseConnzResp(res)
if err != nil {
return nil, err
}
result = append(result, co)
found += len(co.Data.Conns)

if c.filterExpression != "" {
err = removeFilteredConns(co)
if err != nil {
return nil, err
}
}

if len(co.Data.Conns) > 0 {
result = append(result, co)
}
}

if limit != 0 && found > limit {
Expand Down Expand Up @@ -760,30 +784,37 @@ func (c *SrvReportCmd) getConnz(limit int, nc *nats.Conn) (connzList, error) {
}

res, err := doReq(req, "$SYS.REQ.SERVER.PING.CONNZ", c.waitFor, nc)
if err == nats.ErrNoResponders {
if errors.Is(err, nats.ErrNoResponders) {
return nil, fmt.Errorf("server request failed, ensure the account used has system privileges and appropriate permissions")
} else if err != nil {
return nil, err
}

offset = 0

for _, c := range res {
conn, err := parseConnzResp(c)
for _, res := range res {
co, err := parseConnzResp(res)
if err != nil {
return nil, err
}

found += len(conn.Data.Conns)
found += len(co.Data.Conns)

if len(conn.Data.Conns) == 0 {
if len(co.Data.Conns) == 0 {
continue
}

result = append(result, conn)
if c.filterExpression != "" {
err = removeFilteredConns(co)
if err != nil {
return nil, err
}
}

result = append(result, co)

if conn.Data.Offset+conn.Data.Limit < conn.Data.Total {
offset = conn.Data.Offset + conn.Data.Limit + 1
if co.Data.Offset+co.Data.Limit < co.Data.Total {
offset = co.Data.Offset + co.Data.Limit + 1
}
}
}
Expand Down

0 comments on commit 962b031

Please sign in to comment.