Skip to content

Commit

Permalink
jsonrpc: request logging, cleanup, join BroadcastError
Browse files Browse the repository at this point in the history
  • Loading branch information
jchappelow committed May 24, 2024
1 parent d74a9cf commit 3a67ad3
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 141 deletions.
2 changes: 2 additions & 0 deletions core/rpc/client/user/jsonrpc/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (cl *Client) Broadcast(ctx context.Context, tx *transactions.Transaction, s
return nil, errors.Join(jsonErr, err)
}

err = errors.Join(berr, err)

switch transactions.TxCode(berr.TxCode) {
case transactions.CodeWrongChain:
return nil, errors.Join(transactions.ErrWrongChain, err)
Expand Down
78 changes: 0 additions & 78 deletions core/rpc/transport/grpc.go

This file was deleted.

51 changes: 0 additions & 51 deletions core/rpc/transport/timeout-conn.go

This file was deleted.

47 changes: 36 additions & 11 deletions internal/services/jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
// amazingly, exceeding the server's write timeout does not cancel request
// contexts: https://github.com/golang/go/issues/59602
// So, we add a timeout to the Request's context.
h = jsonRPCTimeoutHandler(h, cfg.timeout)
h = jsonRPCTimeoutHandler(h, cfg.timeout, log)
if cfg.enableCORS {
h = corsHandler(h)
}
Expand All @@ -184,12 +184,32 @@ func NewServer(addr string, log log.Logger, opts ...Opt) (*Server, error) {
return s, nil
}

func jsonRPCTimeoutHandler(h http.Handler, timeout time.Duration) http.Handler {
// jsonRPCTimeoutHandler runs the handler with a time limit. This middleware
// also logs the total request duration since it is a logical place that deals
// with request timing. This duration is the total elapsed duration that
// includes reading the full request, unmarshalling, running the corresponding
// method handler, marshalling the response, and transmitting the entire
// response to the client. This duration is at least as long as the time logged
// in processRequest, which pertains only to the handling of the request and
// thus reflects the server's computational burden, while this duration provides
// insight into the latencies introduced by bandwidth and marshalling.
func jsonRPCTimeoutHandler(h http.Handler, timeout time.Duration, logger log.Logger) http.Handler {
// We'll respond with a jsonrpc.Response type, but the request handler is
// downstream and we don't have the request ID.
resp := jsonrpc.NewErrorResponse(-1, jsonrpc.NewError(jsonrpc.ErrorTimeout, "RPC timeout", nil))
respMsg, _ := json.Marshal(resp)
return http.TimeoutHandler(h, timeout, string(respMsg))
h = http.TimeoutHandler(h, timeout, string(respMsg))

// Log total request handling time (including transfer).
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t0 := time.Now().UTC()
defer func() {
logger.Debug("request handling complete", log.Duration("total_elapsed", time.Since(t0)))
}()
// NOTE, to give downstream handlers access to t0 instead of a defer here:
// ctx := context.WithValue(r.Context(), CtxStartTime, t0); r = r.WithContext(ctx)
h.ServeHTTP(w, r)
})
}

// corsHandler adds CORS headers to the response. We don't need sophisticated
Expand All @@ -214,8 +234,8 @@ func corsHandler(h http.Handler) http.Handler {
// Other SIMPLE requests and non-cors requests
h.ServeHTTP(w, r)
})

}

func recoverer(h http.Handler, log log.Logger) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
Expand Down Expand Up @@ -339,7 +359,9 @@ func (s *Server) handlerV1(w http.ResponseWriter, r *http.Request) {
// appropriate function for the method, creates a response message, and writes
// it to the http.ResponseWriter.
func (s *Server) processRequest(ctx context.Context, w http.ResponseWriter, req *jsonrpc.Request) {
// Handle and time the request.
resp := s.handleRequest(ctx, req)

// Some conventions dictate 200 for everything, with the Response.Error
// being the only sign of issue. However, a certain set of errors warrant an
// http status code.
Expand All @@ -354,6 +376,8 @@ func (s *Server) processRequest(ctx context.Context, w http.ResponseWriter, req
statusCode = http.StatusInternalServerError // 500
}
}

// Write the response
s.writeJSON(w, resp, statusCode)
}

Expand Down Expand Up @@ -400,21 +424,22 @@ func (s *Server) handleRequest(ctx context.Context, req *jsonrpc.Request) *jsonr
return jsonrpc.NewErrorResponse(req.ID, rpcErr)
}

// Find the correct handler for this route. xxx now using a switch in handleMethod
// h, exists := routes[req.Method]
// if !exists {
// resp.Error = jsonrpc.NewError(jsonrpc.ErrorUnknownMethod, "unknown route", nil)
// return resp
// }

s.log.Debug("handling request", log.String("method", req.Method))
t0 := time.Now().UTC() // time only the handling (pertains to server utilization)

// call the method with the params
result, rpcErr := s.handleMethod(ctx, jsonrpc.Method(req.Method), req.Params)
if rpcErr != nil {
s.log.Info("request failure", log.String("method", req.Method),
log.Duration("elapsed", time.Since(t0)), log.Int("code", rpcErr.Code),
log.String("message", rpcErr.Message))

return jsonrpc.NewErrorResponse(req.ID, rpcErr)
}

s.log.Info("request success", log.String("method", req.Method),
log.Duration("elapsed", time.Since(t0)))

resp, err := jsonrpc.NewResponse(req.ID, result)
if err != nil { // failed to marshal result
s.log.Error("failed to marshal result", log.String("method", req.Method), log.Error(err))
Expand Down
3 changes: 2 additions & 1 deletion internal/services/jsonrpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/kwilteam/kwil-db/core/log"
jsonrpc "github.com/kwilteam/kwil-db/core/rpc/json"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -53,7 +54,7 @@ func Test_timeout(t *testing.T) {
})

// Wrap that handler with a 500ms timeout.
h = jsonRPCTimeoutHandler(h, 500*time.Millisecond)
h = jsonRPCTimeoutHandler(h, 500*time.Millisecond, log.NewStdOut(log.DebugLevel))
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodGet, "/", nil)
h.ServeHTTP(w, r)
Expand Down

0 comments on commit 3a67ad3

Please sign in to comment.