From 09ee0d41fc924c905548158a4a9af0859f3adc56 Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Thu, 14 Nov 2024 09:30:49 -0800 Subject: [PATCH] Allow diagnostics endpoints command to receive more than one message (#13285) The `linkerd diagnostics endpoints` command initiates a `Get` lookup to the destination controller to get the set of endpoints for a destination. This is a streaming response API and the command takes only the first response message and displays it. However, the full current state of endpoints may be split across multiple messages, resulting in an incomplete list of endpoints displayed. We instead read continuously from the response stream for a short amount of time (5 seconds) before displaying the full set of endpoints received. Signed-off-by: Alex Leong --- cli/cmd/endpoints.go | 44 +++++++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/cli/cmd/endpoints.go b/cli/cmd/endpoints.go index 8c76b1983bb01..3fc88895e89cd 100644 --- a/cli/cmd/endpoints.go +++ b/cli/cmd/endpoints.go @@ -6,11 +6,12 @@ import ( "encoding/json" "errors" "fmt" + "io" "os" "sort" "strings" - "sync" "text/tabwriter" + "time" destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination" netPb "github.com/linkerd/linkerd2-proxy-api/go/net" @@ -142,15 +143,11 @@ destination.`, func requestEndpointsFromAPI(client destinationPb.DestinationClient, token string, authorities []string) (endpointsInfo, error) { info := make(endpointsInfo) - // buffered channels to avoid blocking - events := make(chan *destinationPb.Update, len(authorities)) - errs := make(chan error, len(authorities)) - var wg sync.WaitGroup + events := make(chan *destinationPb.Update, 1000) + errs := make(chan error, 1000) for _, authority := range authorities { - wg.Add(1) go func(authority string) { - defer wg.Done() if len(errs) == 0 { dest := &destinationPb.GetDestination{ Scheme: "http:", @@ -164,22 +161,31 @@ func requestEndpointsFromAPI(client destinationPb.DestinationClient, token strin return } - event, err := rsp.Recv() - if err != nil { - if grpcError, ok := status.FromError(err); ok { - err = errors.New(grpcError.Message()) + // Endpoint state may be sent in multiple messages so it's not + // sufficient to read only the first message. Instead, we + // continuously read from the stream. This goroutine will never + // terminate if there are no errors, but this is okay for a + // short lived CLI command. + for { + event, err := rsp.Recv() + if errors.Is(err, io.EOF) { + return + } else if err != nil { + if grpcError, ok := status.FromError(err); ok { + err = errors.New(grpcError.Message()) + } + errs <- err + return } - errs <- err - return + events <- event } - events <- event } }(authority) } - // Block till all goroutines above are done - wg.Wait() + // Wait an amount of time for some endpoint responses to be received. + timeout := time.NewTimer(5 * time.Second) - for i := 0; i < len(authorities); i++ { + for { select { case err := <-errs: // we only care about the first error @@ -210,10 +216,10 @@ func requestEndpointsFromAPI(client destinationPb.DestinationClient, token strin http2: addr.GetHttp2(), }) } + case <-timeout.C: + return info, nil } } - - return info, nil } func getIP(tcpAddr *netPb.TcpAddress) string {