Skip to content

Commit

Permalink
Allow diagnostics endpoints command to receive more than one message (#…
Browse files Browse the repository at this point in the history
…13285)

The `linkerd diagnostics endpoints` command initiates a `Get` lookup to the destination controller to get the set of endpoints for a destination.  This is a streaming response API and the command takes only the first response message and displays it.  However, the full current state of endpoints may be split across multiple messages, resulting in an incomplete list of endpoints displayed.

We instead read continuously from the response stream for a short amount of time (5 seconds) before displaying the full set of endpoints received.

Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong authored Nov 14, 2024
1 parent 600e96a commit 09ee0d4
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions cli/cmd/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"

destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
netPb "github.com/linkerd/linkerd2-proxy-api/go/net"
Expand Down Expand Up @@ -142,15 +143,11 @@ destination.`,

func requestEndpointsFromAPI(client destinationPb.DestinationClient, token string, authorities []string) (endpointsInfo, error) {
info := make(endpointsInfo)
// buffered channels to avoid blocking
events := make(chan *destinationPb.Update, len(authorities))
errs := make(chan error, len(authorities))
var wg sync.WaitGroup
events := make(chan *destinationPb.Update, 1000)
errs := make(chan error, 1000)

for _, authority := range authorities {
wg.Add(1)
go func(authority string) {
defer wg.Done()
if len(errs) == 0 {
dest := &destinationPb.GetDestination{
Scheme: "http:",
Expand All @@ -164,22 +161,31 @@ func requestEndpointsFromAPI(client destinationPb.DestinationClient, token strin
return
}

event, err := rsp.Recv()
if err != nil {
if grpcError, ok := status.FromError(err); ok {
err = errors.New(grpcError.Message())
// Endpoint state may be sent in multiple messages so it's not
// sufficient to read only the first message. Instead, we
// continuously read from the stream. This goroutine will never
// terminate if there are no errors, but this is okay for a
// short lived CLI command.
for {
event, err := rsp.Recv()
if errors.Is(err, io.EOF) {
return
} else if err != nil {
if grpcError, ok := status.FromError(err); ok {
err = errors.New(grpcError.Message())
}
errs <- err
return
}
errs <- err
return
events <- event
}
events <- event
}
}(authority)
}
// Block till all goroutines above are done
wg.Wait()
// Wait an amount of time for some endpoint responses to be received.
timeout := time.NewTimer(5 * time.Second)

for i := 0; i < len(authorities); i++ {
for {
select {
case err := <-errs:
// we only care about the first error
Expand Down Expand Up @@ -210,10 +216,10 @@ func requestEndpointsFromAPI(client destinationPb.DestinationClient, token strin
http2: addr.GetHttp2(),
})
}
case <-timeout.C:
return info, nil
}
}

return info, nil
}

func getIP(tcpAddr *netPb.TcpAddress) string {
Expand Down

0 comments on commit 09ee0d4

Please sign in to comment.