1- // Copyright 2020 The Go Language Server Authors.
21// SPDX-License-Identifier: BSD-3-Clause
2+ // SPDX-FileCopyrightText: Copyright 2021 The Go Language Server Authors
33
44package jsonrpc2
55
@@ -8,10 +8,13 @@ import (
88 "fmt"
99 "sync"
1010 "sync/atomic"
11+
12+ "go.lsp.dev/pkg/event"
13+ "go.lsp.dev/pkg/event/label"
14+ "go.lsp.dev/pkg/event/tag"
1115)
1216
1317// Conn is the common interface to jsonrpc clients and servers.
14- //
1518// Conn is bidirectional; it does not have a designated server or client end.
1619// It manages the jsonrpc2 protocol, connecting responses back to their calls.
1720type Conn interface {
@@ -52,7 +55,7 @@ type Conn interface {
5255}
5356
5457type conn struct {
55- seq int64 // access atomic
58+ seq int64 // access atomically
5659 writeMu sync.Mutex // protects writes to the stream
5760 stream Stream
5861 pendingMu sync.Mutex // protects the pending map
@@ -69,7 +72,6 @@ func NewConn(s Stream) Conn {
6972 pending : make (map [ID ]chan * Response ),
7073 done : make (chan struct {}),
7174 }
72-
7375 return conn
7476}
7577
@@ -79,41 +81,51 @@ func (c *conn) Notify(ctx context.Context, method string, params interface{}) (e
7981 if err != nil {
8082 return fmt .Errorf ("marshaling notify parameters: %w" , err )
8183 }
82-
83- _ , err = c .write (ctx , notify )
84+ ctx , done := event .Start (ctx , method ,
85+ tag .Method .Of (method ),
86+ tag .RPCDirection .Of (tag .Outbound ),
87+ )
88+ defer func () {
89+ recordStatus (ctx , err )
90+ done ()
91+ }()
92+
93+ event .Metric (ctx , tag .Started .Of (1 ))
94+ n , err := c .write (ctx , notify )
95+ event .Metric (ctx , tag .SentBytes .Of (n ))
8496 return err
8597}
8698
87- func (c * conn ) replier (req Message ) Replier {
88- reply := func (ctx context.Context , result interface {}, err error ) error {
89- call , ok := req .(* Request )
99+ func (c * conn ) replier (req Message , spanDone func ()) Replier {
100+ return func (ctx context.Context , result interface {}, err error ) error {
101+ defer func () {
102+ recordStatus (ctx , err )
103+ spanDone ()
104+ }()
105+ call , ok := req .(* Call )
90106 if ! ok {
91107 // request was a notify, no need to respond
92108 return nil
93109 }
94-
95110 response , err := NewResponse (call .id , result , err )
96111 if err != nil {
97112 return err
98113 }
99-
100- _ , err = c . write (ctx , response )
114+ n , err := c . write ( ctx , response )
115+ event . Metric (ctx , tag . SentBytes . Of ( n ) )
101116 if err != nil {
102117 // TODO(iancottrell): if a stream write fails, we really need to shut down
103118 // the whole stream
104119 return err
105120 }
106121 return nil
107122 }
108-
109- return reply
110123}
111124
112- func (c * conn ) write (ctx context.Context , msg Message ) (n int64 , err error ) {
125+ func (c * conn ) write (ctx context.Context , msg Message ) (int64 , error ) {
113126 c .writeMu .Lock ()
114- n , err = c .stream .Write (ctx , msg )
115- c .writeMu .Unlock ()
116- return
127+ defer c .writeMu .Unlock ()
128+ return c .stream .Write (ctx , msg )
117129}
118130
119131// Go implemens Conn.
@@ -123,24 +135,34 @@ func (c *conn) Go(ctx context.Context, handler Handler) {
123135
124136func (c * conn ) run (ctx context.Context , handler Handler ) {
125137 defer close (c .done )
126-
127138 for {
128139 // get the next message
129- msg , _ , err := c .stream .Read (ctx )
140+ msg , n , err := c .stream .Read (ctx )
130141 if err != nil {
131142 // The stream failed, we cannot continue.
132143 c .fail (err )
133144 return
134145 }
135-
136146 switch msg := msg .(type ) {
137- case Requester :
138- if err := handler (ctx , c .replier (msg ), msg ); err != nil {
147+ case Request :
148+ labels := []label.Label {
149+ tag .Method .Of (msg .Method ()),
150+ tag .RPCDirection .Of (tag .Inbound ),
151+ {}, // reserved for ID if present
152+ }
153+ if call , ok := msg .(* Call ); ok {
154+ labels [len (labels )- 1 ] = tag .RPCID .Of (fmt .Sprintf ("%q" , call .ID ()))
155+ } else {
156+ labels = labels [:len (labels )- 1 ]
157+ }
158+ reqCtx , spanDone := event .Start (ctx , msg .Method (), labels ... )
159+ event .Metric (reqCtx ,
160+ tag .Started .Of (1 ),
161+ tag .ReceivedBytes .Of (n ))
162+ if err := handler (reqCtx , c .replier (msg , spanDone ), msg ); err != nil {
139163 // delivery failed, not much we can do
140- c .fail (err )
141- return
164+ event .Error (reqCtx , "jsonrpc2 message delivery failed" , err )
142165 }
143-
144166 case * Response :
145167 // If method is not set, this should be a response, in which case we must
146168 // have an id to send the response back to the caller.
@@ -177,3 +199,11 @@ func (c *conn) fail(err error) {
177199 c .err .Store (err )
178200 c .stream .Close ()
179201}
202+
203+ func recordStatus (ctx context.Context , err error ) {
204+ if err != nil {
205+ event .Label (ctx , tag .StatusCode .Of ("ERROR" ))
206+ } else {
207+ event .Label (ctx , tag .StatusCode .Of ("OK" ))
208+ }
209+ }
0 commit comments