Skip to content

Commit

Permalink
gopls: allow for asynchronous request handling
Browse files Browse the repository at this point in the history
As described in golang/go#69937, we need a mechanism that allows for
concurrent request handling in gopls. However, this cannot be
implemented entirely within the jsonrpc2 layer, because we need gopls to
observe requests in the order they arrive, so it can handle them with
the correct logical state.

This CL adds such a concurrency mechanism using a trick similar to
t.Parallel. Specifically, a new jsonrpc2.Async method is introduced
which, when invoked on the request context, signals the
jsonrpc2.AsyncHandler to start handling the next request.

Initially, we use this new mechanism within gopls to allow certain
long-running commands to execute asynchronously, once they have acquired
a cache.Snapshot representing the current logical state. This solves a
long-standing awkwardness in the govulncheck integration, which required
an additional gopls.fetch_vulncheck_result command to fetch an
asynchronous result.

This enables some code deletion and simplification, though we could
simplify further. Notably, change the code_action subcommand to
eliminate the progress handler registration, since we don't need
progress to know when a command is complete. Instead, use -v as a signal
to log progress reports to stderr.

Fixes golang/go#69937

Change-Id: I8736a445084cfa093f37c479d419294d5a1acbce
Reviewed-on: https://go-review.googlesource.com/c/tools/+/621055
Reviewed-by: Alan Donovan <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
  • Loading branch information
findleyr committed Oct 18, 2024
1 parent 8ecf757 commit a199121
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 210 deletions.
59 changes: 14 additions & 45 deletions gopls/internal/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"flag"
"fmt"
"log"
"math/rand"
"os"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -391,35 +390,13 @@ type connection struct {
client *cmdClient
}

// registerProgressHandler registers a handler for progress notifications.
// The caller must call unregister when the handler is no longer needed.
func (cli *cmdClient) registerProgressHandler(handler func(*protocol.ProgressParams)) (token protocol.ProgressToken, unregister func()) {
token = fmt.Sprintf("tok%d", rand.Uint64())

// register
cli.progressHandlersMu.Lock()
if cli.progressHandlers == nil {
cli.progressHandlers = make(map[protocol.ProgressToken]func(*protocol.ProgressParams))
}
cli.progressHandlers[token] = handler
cli.progressHandlersMu.Unlock()

unregister = func() {
cli.progressHandlersMu.Lock()
delete(cli.progressHandlers, token)
cli.progressHandlersMu.Unlock()
}
return token, unregister
}

// cmdClient defines the protocol.Client interface behavior of the gopls CLI tool.
type cmdClient struct {
app *Application

progressHandlersMu sync.Mutex
progressHandlers map[protocol.ProgressToken]func(*protocol.ProgressParams)
iwlToken protocol.ProgressToken
iwlDone chan struct{}
progressMu sync.Mutex
iwlToken protocol.ProgressToken
iwlDone chan struct{}

filesMu sync.Mutex // guards files map
files map[protocol.DocumentURI]*cmdFile
Expand Down Expand Up @@ -698,41 +675,33 @@ func (c *cmdClient) PublishDiagnostics(ctx context.Context, p *protocol.PublishD
}

func (c *cmdClient) Progress(_ context.Context, params *protocol.ProgressParams) error {
token, ok := params.Token.(string)
if !ok {
if _, ok := params.Token.(string); !ok {
return fmt.Errorf("unexpected progress token: %[1]T %[1]v", params.Token)
}

c.progressHandlersMu.Lock()
handler := c.progressHandlers[token]
c.progressHandlersMu.Unlock()
if handler == nil {
handler = c.defaultProgressHandler
}
handler(params)
return nil
}

// defaultProgressHandler is the default handler of progress messages,
// used during the initialize request.
func (c *cmdClient) defaultProgressHandler(params *protocol.ProgressParams) {
switch v := params.Value.(type) {
case *protocol.WorkDoneProgressBegin:
if v.Title == server.DiagnosticWorkTitle(server.FromInitialWorkspaceLoad) {
c.progressHandlersMu.Lock()
c.progressMu.Lock()
c.iwlToken = params.Token
c.progressHandlersMu.Unlock()
c.progressMu.Unlock()
}

case *protocol.WorkDoneProgressReport:
if c.app.Verbose {
fmt.Fprintln(os.Stderr, v.Message)
}

case *protocol.WorkDoneProgressEnd:
c.progressHandlersMu.Lock()
c.progressMu.Lock()
iwlToken := c.iwlToken
c.progressHandlersMu.Unlock()
c.progressMu.Unlock()

if params.Token == iwlToken {
close(c.iwlDone)
}
}
return nil
}

func (c *cmdClient) ShowDocument(ctx context.Context, params *protocol.ShowDocumentParams) (*protocol.ShowDocumentResult, error) {
Expand Down
35 changes: 3 additions & 32 deletions gopls/internal/cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ import (
"flag"
"fmt"
"log"
"os"
"slices"

"golang.org/x/tools/gopls/internal/protocol"
"golang.org/x/tools/gopls/internal/protocol/command"
"golang.org/x/tools/gopls/internal/server"
"golang.org/x/tools/internal/tool"
)

Expand Down Expand Up @@ -98,38 +96,11 @@ func (e *execute) Run(ctx context.Context, args ...string) error {

// executeCommand executes a protocol.Command, displaying progress
// messages and awaiting completion of asynchronous commands.
//
// TODO(rfindley): inline away all calls, ensuring they inline idiomatically.
func (conn *connection) executeCommand(ctx context.Context, cmd *protocol.Command) (any, error) {
endStatus := make(chan string, 1)
token, unregister := conn.client.registerProgressHandler(func(params *protocol.ProgressParams) {
switch v := params.Value.(type) {
case *protocol.WorkDoneProgressReport:
fmt.Fprintln(os.Stderr, v.Message) // combined std{out,err}

case *protocol.WorkDoneProgressEnd:
endStatus <- v.Message // = canceled | failed | completed
}
})
defer unregister()

res, err := conn.ExecuteCommand(ctx, &protocol.ExecuteCommandParams{
return conn.ExecuteCommand(ctx, &protocol.ExecuteCommandParams{
Command: cmd.Command,
Arguments: cmd.Arguments,
WorkDoneProgressParams: protocol.WorkDoneProgressParams{
WorkDoneToken: token,
},
})
if err != nil {
return nil, err
}

// Some commands are asynchronous, so clients
// must wait for the "end" progress notification.
if command.Command(cmd.Command).IsAsync() {
status := <-endStatus
if status != server.CommandCompleted {
return nil, fmt.Errorf("command %s", status)
}
}

return res, nil
}
2 changes: 1 addition & 1 deletion gopls/internal/cmd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestFail(t *testing.T) { t.Fatal("fail") }
}
// run the passing test
{
res := gopls(t, tree, "codelens", "-exec", "./a/a_test.go:3", "run test")
res := gopls(t, tree, "-v", "codelens", "-exec", "./a/a_test.go:3", "run test")
res.checkExit(true)
res.checkStderr(`PASS: TestPass`) // from go test
res.checkStderr("Info: all tests passed") // from gopls.test
Expand Down
8 changes: 8 additions & 0 deletions gopls/internal/protocol/command/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,15 @@ type VulncheckArgs struct {
type RunVulncheckResult struct {
// Token holds the progress token for LSP workDone reporting of the vulncheck
// invocation.
//
// Deprecated: previously, this was used as a signal to retrieve the result
// using gopls.fetch_vulncheck_result. Clients should ignore this field:
// gopls.vulncheck now runs synchronously, and returns a result in the Result
// field.
Token protocol.ProgressToken

// Result holds the result of running vulncheck.
Result *vulncheck.Result
}

// MemStatsResult holds selected fields from runtime.MemStats.
Expand Down
12 changes: 0 additions & 12 deletions gopls/internal/protocol/command/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,6 @@ type Command string

func (c Command) String() string { return string(c) }

// IsAsync reports whether the command is asynchronous:
// clients must wait for the "end" progress notification.
func (c Command) IsAsync() bool {
switch string(c) {
// TODO(adonovan): derive this list from interface.go somewhow.
// Unfortunately we can't even reference the enum from here...
case "gopls.run_tests", "gopls.run_govulncheck", "gopls.test":
return true
}
return false
}

// MarshalArgs encodes the given arguments to json.RawMessages. This function
// is used to construct arguments to a protocol.Command.
//
Expand Down
45 changes: 16 additions & 29 deletions gopls/internal/server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"regexp"
Expand Down Expand Up @@ -41,6 +40,7 @@ import (
"golang.org/x/tools/internal/diff"
"golang.org/x/tools/internal/event"
"golang.org/x/tools/internal/gocommand"
"golang.org/x/tools/internal/jsonrpc2"
"golang.org/x/tools/internal/tokeninternal"
"golang.org/x/tools/internal/xcontext"
)
Expand Down Expand Up @@ -278,7 +278,7 @@ func (*commandHandler) AddTelemetryCounters(_ context.Context, args command.AddT
// commandConfig configures common command set-up and execution.
type commandConfig struct {
requireSave bool // whether all files must be saved for the command to work
progress string // title to use for progress reporting. If empty, no progress will be reported. Mandatory for async commands.
progress string // title to use for progress reporting. If empty, no progress will be reported.
forView string // view to resolve to a snapshot; incompatible with forURI
forURI protocol.DocumentURI // URI to resolve to a snapshot. If unset, snapshot will be nil.
}
Expand Down Expand Up @@ -370,18 +370,6 @@ func (c *commandHandler) run(ctx context.Context, cfg commandConfig, run command
return err
}

if enum := command.Command(c.params.Command); enum.IsAsync() {
if cfg.progress == "" {
log.Fatalf("asynchronous command %q does not enable progress reporting",
enum)
}
go func() {
if err := runcmd(); err != nil {
showMessage(ctx, c.s.client, protocol.Error, err.Error())
}
}()
return nil
}
return runcmd()
}

Expand Down Expand Up @@ -725,6 +713,7 @@ func (c *commandHandler) RunTests(ctx context.Context, args command.RunTestsArgs
requireSave: true, // go test honors overlays, but tests themselves cannot
forURI: args.URI,
}, func(ctx context.Context, deps commandDeps) error {
jsonrpc2.Async(ctx) // don't block RPCs behind this command, since it can take a while
return c.runTests(ctx, deps.snapshot, deps.work, args.URI, args.Tests, args.Benchmarks)
})
}
Expand Down Expand Up @@ -1233,23 +1222,25 @@ func (c *commandHandler) FetchVulncheckResult(ctx context.Context, arg command.U
return ret, err
}

const GoVulncheckCommandTitle = "govulncheck"

func (c *commandHandler) RunGovulncheck(ctx context.Context, args command.VulncheckArgs) (command.RunVulncheckResult, error) {
if args.URI == "" {
return command.RunVulncheckResult{}, errors.New("VulncheckArgs is missing URI field")
}

// Return the workdone token so that clients can identify when this
// vulncheck invocation is complete.
//
// Since the run function executes asynchronously, we use a channel to
// synchronize the start of the run and return the token.
tokenChan := make(chan protocol.ProgressToken, 1)
var commandResult command.RunVulncheckResult
err := c.run(ctx, commandConfig{
progress: "govulncheck", // (asynchronous)
requireSave: true, // govulncheck cannot honor overlays
progress: GoVulncheckCommandTitle,
requireSave: true, // govulncheck cannot honor overlays
forURI: args.URI,
}, func(ctx context.Context, deps commandDeps) error {
tokenChan <- deps.work.Token()
// For compatibility with the legacy asynchronous API, return the workdone
// token that clients used to use to identify when this vulncheck
// invocation is complete.
commandResult.Token = deps.work.Token()

jsonrpc2.Async(ctx) // run this in parallel with other requests: vulncheck can be slow.

workDoneWriter := progress.NewWorkDoneWriter(ctx, deps.work)
dir := filepath.Dir(args.URI.Path())
Expand All @@ -1259,6 +1250,7 @@ func (c *commandHandler) RunGovulncheck(ctx context.Context, args command.Vulnch
if err != nil {
return err
}
commandResult.Result = result

snapshot, release, err := c.s.session.InvalidateView(ctx, deps.snapshot.View(), cache.StateChange{
Vulns: map[protocol.DocumentURI]*vulncheck.Result{args.URI: result},
Expand Down Expand Up @@ -1295,12 +1287,7 @@ func (c *commandHandler) RunGovulncheck(ctx context.Context, args command.Vulnch
if err != nil {
return command.RunVulncheckResult{}, err
}
select {
case <-ctx.Done():
return command.RunVulncheckResult{}, ctx.Err()
case token := <-tokenChan:
return command.RunVulncheckResult{Token: token}, nil
}
return commandResult, nil
}

// MemStats implements the MemStats command. It returns an error as a
Expand Down
4 changes: 2 additions & 2 deletions gopls/internal/test/integration/codelens/codelens_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ require golang.org/x/hello v1.2.3
if !found {
t.Fatalf("found no command with the title %s", commandTitle)
}
if _, err := env.Editor.ExecuteCommand(env.Ctx, &protocol.ExecuteCommandParams{
if err := env.Editor.ExecuteCommand(env.Ctx, &protocol.ExecuteCommandParams{
Command: lens.Command.Command,
Arguments: lens.Command.Arguments,
}); err != nil {
}, nil); err != nil {
t.Fatal(err)
}
env.AfterChange()
Expand Down
30 changes: 20 additions & 10 deletions gopls/internal/test/integration/expectation.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,17 +452,27 @@ type WorkStatus struct {
EndMsg string
}

// CompletedProgress expects that workDone progress is complete for the given
// progress token. When non-nil WorkStatus is provided, it will be filled
// when the expectation is met.
// CompletedProgress expects that there is exactly one workDone progress with
// the given title, and is satisfied when that progress completes. If it is
// met, the corresponding status is written to the into argument.
//
// If the token is not a progress token that the client has seen, this
// expectation is Unmeetable.
func CompletedProgress(token protocol.ProgressToken, into *WorkStatus) Expectation {
// TODO(rfindley): refactor to eliminate the redundancy with CompletedWork.
// This expectation is a vestige of older workarounds for asynchronous command
// execution.
func CompletedProgress(title string, into *WorkStatus) Expectation {
check := func(s State) Verdict {
work, ok := s.work[token]
if !ok {
return Unmeetable // TODO(rfindley): refactor to allow the verdict to explain this result
var work *workProgress
for _, w := range s.work {
if w.title == title {
if work != nil {
// TODO(rfindley): refactor to allow the verdict to explain this result
return Unmeetable // multiple matches
}
work = w
}
}
if work == nil {
return Unmeetable // zero matches
}
if work.complete {
if into != nil {
Expand All @@ -473,7 +483,7 @@ func CompletedProgress(token protocol.ProgressToken, into *WorkStatus) Expectati
}
return Unmet
}
desc := fmt.Sprintf("completed work for token %v", token)
desc := fmt.Sprintf("exactly 1 completed workDoneProgress with title %v", title)
return Expectation{
Check: check,
Description: desc,
Expand Down
Loading

0 comments on commit a199121

Please sign in to comment.