From 4dacf0ab468fdccf33524759cbd9663e906841f1 Mon Sep 17 00:00:00 2001 From: laizy Date: Thu, 11 Jul 2024 14:12:37 +0800 Subject: [PATCH] use custom http handler (#1445) --- http/base/rpc/rpc.go | 63 +++++++---------------------- http/jsonrpc/rpc_server.go | 76 +++++++++++++++++++---------------- http/localrpc/local_server.go | 22 +++++----- http/nodeinfo/server.go | 8 ++-- p2pserver/protocols/rpcapi.go | 5 ++- 5 files changed, 75 insertions(+), 99 deletions(-) diff --git a/http/base/rpc/rpc.go b/http/base/rpc/rpc.go index 1fe8bb9cfd..206b53e3fd 100644 --- a/http/base/rpc/rpc.go +++ b/http/base/rpc/rpc.go @@ -21,12 +21,8 @@ package rpc import ( "encoding/json" - "fmt" "io" - "io/ioutil" "net/http" - "os" - "strings" "sync" "github.com/ontio/ontology/common/log" @@ -41,35 +37,32 @@ type JReq struct { ID interface{} `json:"id"` } -func init() { - mainMux.m = make(map[string]func([]interface{}) map[string]interface{}) -} - -//an instance of the multiplexer -var mainMux ServeMux - -//multiplexer that keeps track of every function to be called on specific rpc call +// multiplexer that keeps track of every function to be called on specific rpc call type ServeMux struct { sync.RWMutex m map[string]func([]interface{}) map[string]interface{} defaultFunction func(http.ResponseWriter, *http.Request) } -//a function to register functions to be called for specific rpc calls -func HandleFunc(pattern string, handler func([]interface{}) map[string]interface{}) { - mainMux.Lock() - defer mainMux.Unlock() - mainMux.m[pattern] = handler +func NewServeMux() *ServeMux { + return &ServeMux{ + m: make(map[string]func([]interface{}) map[string]interface{}), + } } -//a function to be called if the request is not a HTTP JSON RPC call -func SetDefaultFunc(def func(http.ResponseWriter, *http.Request)) { - mainMux.defaultFunction = def +func (self *ServeMux) HandleFunc(pattern string, handler func([]interface{}) map[string]interface{}) { + self.Lock() + defer self.Unlock() + self.m[pattern] = handler +} + +func (self *ServeMux) SetDefaultFunc(def func(http.ResponseWriter, *http.Request)) { + self.defaultFunction = def } // this is the function that should be called in order to answer an rpc call // should be registered like "http.HandleFunc("/", httpjsonrpc.Handle)" -func Handle(w http.ResponseWriter, r *http.Request) { +func (mainMux *ServeMux) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method == "OPTIONS" { w.Header().Add("Access-Control-Allow-Headers", "Content-Type") w.Header().Set("content-type", "application/json;charset=utf-8") @@ -153,31 +146,3 @@ func Handle(w http.ResponseWriter, r *http.Request) { w.Write(data) } } - -// Call sends RPC request to server -func Call(address string, method string, id interface{}, params []interface{}) ([]byte, error) { - data, err := json.Marshal(map[string]interface{}{ - "method": method, - "id": id, - "params": params, - }) - if err != nil { - fmt.Fprintf(os.Stderr, "Marshal JSON request: %v\n", err) - return nil, err - } - - resp, err := http.Post(address, "application/json", strings.NewReader(string(data))) - if err != nil { - fmt.Fprintf(os.Stderr, "POST request: %v\n", err) - return nil, err - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - fmt.Fprintf(os.Stderr, "GET response: %v\n", err) - return nil, err - } - - return body, nil -} diff --git a/http/jsonrpc/rpc_server.go b/http/jsonrpc/rpc_server.go index 5d316c9f24..a042f4ab0a 100644 --- a/http/jsonrpc/rpc_server.go +++ b/http/jsonrpc/rpc_server.go @@ -29,46 +29,54 @@ import ( "github.com/ontio/ontology/http/base/rpc" ) -func StartRPCServer() error { - log.Debug() - http.HandleFunc("/", rpc.Handle) - rpc.HandleFunc("getbestblockhash", GetBestBlockHash) - rpc.HandleFunc("getblock", GetBlock) - rpc.HandleFunc("getblockcount", GetBlockCount) - rpc.HandleFunc("getblockhash", GetBlockHash) - rpc.HandleFunc("getconnectioncount", GetConnectionCount) - rpc.HandleFunc("getsyncstatus", GetSyncStatus) +func NewRPCHandler() *rpc.ServeMux { + mux := rpc.NewServeMux() + mux.HandleFunc("getbestblockhash", GetBestBlockHash) + mux.HandleFunc("getblock", GetBlock) + mux.HandleFunc("getblockcount", GetBlockCount) + mux.HandleFunc("getblockhash", GetBlockHash) + mux.HandleFunc("getconnectioncount", GetConnectionCount) + mux.HandleFunc("getsyncstatus", GetSyncStatus) //HandleFunc("getrawmempool", GetRawMemPool) - rpc.HandleFunc("getrawtransaction", GetRawTransaction) - rpc.HandleFunc("sendrawtransaction", SendRawTransaction) - rpc.HandleFunc("getstorage", GetStorage) - rpc.HandleFunc("getversion", GetNodeVersion) - rpc.HandleFunc("getnetworkid", GetNetworkId) + mux.HandleFunc("getrawtransaction", GetRawTransaction) + mux.HandleFunc("sendrawtransaction", SendRawTransaction) + mux.HandleFunc("getstorage", GetStorage) + mux.HandleFunc("getversion", GetNodeVersion) + mux.HandleFunc("getnetworkid", GetNetworkId) - rpc.HandleFunc("getcontractstate", GetContractState) - rpc.HandleFunc("getmempooltxcount", GetMemPoolTxCount) - rpc.HandleFunc("getmempooltxstate", GetMemPoolTxState) - rpc.HandleFunc("getmempooltxhashlist", GetMemPoolTxHashList) - rpc.HandleFunc("getsmartcodeevent", GetSmartCodeEvent) - rpc.HandleFunc("getblockheightbytxhash", GetBlockHeightByTxHash) + mux.HandleFunc("getcontractstate", GetContractState) + mux.HandleFunc("getmempooltxcount", GetMemPoolTxCount) + mux.HandleFunc("getmempooltxstate", GetMemPoolTxState) + mux.HandleFunc("getmempooltxhashlist", GetMemPoolTxHashList) + mux.HandleFunc("getsmartcodeevent", GetSmartCodeEvent) + mux.HandleFunc("getblockheightbytxhash", GetBlockHeightByTxHash) - rpc.HandleFunc("getbalance", GetBalance) - rpc.HandleFunc("getbalancev2", GetBalanceV2) - rpc.HandleFunc("getoep4balance", GetOep4Balance) - rpc.HandleFunc("getallowance", GetAllowance) - rpc.HandleFunc("getallowancev2", GetAllowanceV2) - rpc.HandleFunc("getmerkleproof", GetMerkleProof) - rpc.HandleFunc("getblocktxsbyheight", GetBlockTxsByHeight) - rpc.HandleFunc("getgasprice", GetGasPrice) - rpc.HandleFunc("getunboundong", GetUnboundOng) - rpc.HandleFunc("getgrantong", GetGrantOng) + mux.HandleFunc("getbalance", GetBalance) + mux.HandleFunc("getbalancev2", GetBalanceV2) + mux.HandleFunc("getoep4balance", GetOep4Balance) + mux.HandleFunc("getallowance", GetAllowance) + mux.HandleFunc("getallowancev2", GetAllowanceV2) + mux.HandleFunc("getmerkleproof", GetMerkleProof) + mux.HandleFunc("getblocktxsbyheight", GetBlockTxsByHeight) + mux.HandleFunc("getgasprice", GetGasPrice) + mux.HandleFunc("getunboundong", GetUnboundOng) + mux.HandleFunc("getgrantong", GetGrantOng) - rpc.HandleFunc("getcrosschainmsg", GetCrossChainMsg) - rpc.HandleFunc("getcrossstatesproof", GetCrossStatesProof) - rpc.HandleFunc("getcrossstatesleafhashes", GetCrossStatesLeafHashes) + mux.HandleFunc("getcrosschainmsg", GetCrossChainMsg) + mux.HandleFunc("getcrossstatesproof", GetCrossStatesProof) + mux.HandleFunc("getcrossstatesleafhashes", GetCrossStatesLeafHashes) + + return mux +} + +func StartRPCServer() error { + log.Debug() - err := http.ListenAndServe(":"+strconv.Itoa(int(cfg.DefConfig.Rpc.HttpJsonPort)), nil) + rpcMux := NewRPCHandler() + mux := http.NewServeMux() + mux.Handle("/", rpcMux) + err := http.ListenAndServe(":"+strconv.Itoa(int(cfg.DefConfig.Rpc.HttpJsonPort)), mux) if err != nil { return fmt.Errorf("ListenAndServe error:%s", err) } diff --git a/http/localrpc/local_server.go b/http/localrpc/local_server.go index 246232c635..44020afa5b 100644 --- a/http/localrpc/local_server.go +++ b/http/localrpc/local_server.go @@ -26,26 +26,28 @@ import ( cfg "github.com/ontio/ontology/common/config" "github.com/ontio/ontology/common/log" - "github.com/ontio/ontology/http/base/rpc" + "github.com/ontio/ontology/http/jsonrpc" ) const ( LOCAL_HOST string = "127.0.0.1" - LOCAL_DIR string = "/local" ) +var LocalRpcMux = jsonrpc.NewRPCHandler() + func StartLocalServer() error { log.Debug() - http.HandleFunc(LOCAL_DIR, rpc.Handle) + rpcMux := LocalRpcMux - rpc.HandleFunc("getneighbor", GetNeighbor) - rpc.HandleFunc("getnodestate", GetNodeState) - rpc.HandleFunc("startconsensus", StartConsensus) - rpc.HandleFunc("stopconsensus", StopConsensus) - rpc.HandleFunc("setdebuginfo", SetDebugInfo) + rpcMux.HandleFunc("getneighbor", GetNeighbor) + rpcMux.HandleFunc("getnodestate", GetNodeState) + rpcMux.HandleFunc("startconsensus", StartConsensus) + rpcMux.HandleFunc("stopconsensus", StopConsensus) + rpcMux.HandleFunc("setdebuginfo", SetDebugInfo) - // TODO: only listen to local host - err := http.ListenAndServe(LOCAL_HOST+":"+strconv.Itoa(int(cfg.DefConfig.Rpc.HttpLocalPort)), nil) + mux := http.NewServeMux() + mux.Handle("/", rpcMux) + err := http.ListenAndServe(LOCAL_HOST+":"+strconv.Itoa(int(cfg.DefConfig.Rpc.HttpLocalPort)), mux) if err != nil { return fmt.Errorf("ListenAndServe error:%s", err) } diff --git a/http/nodeinfo/server.go b/http/nodeinfo/server.go index abf2ac2736..1e1cdb2214 100644 --- a/http/nodeinfo/server.go +++ b/http/nodeinfo/server.go @@ -114,15 +114,15 @@ func viewHandler(w http.ResponseWriter, r *http.Request) { func StartServer(n p2p.P2P) { node = n port := int(config.DefConfig.P2PNode.HttpInfoPort) - - http.HandleFunc("/info", viewHandler) + mux := http.NewServeMux() + mux.HandleFunc("/info", viewHandler) // prom related if err := initMetric(); err != nil { panic("init prometheus metrics fail") } - http.Handle("/metrics", promhttp.Handler()) + mux.Handle("/metrics", promhttp.Handler()) go updateMetric(n) - http.ListenAndServe(":"+strconv.Itoa(port), nil) + http.ListenAndServe(":"+strconv.Itoa(port), mux) } diff --git a/p2pserver/protocols/rpcapi.go b/p2pserver/protocols/rpcapi.go index 8faddf57ff..d4a8afc065 100644 --- a/p2pserver/protocols/rpcapi.go +++ b/p2pserver/protocols/rpcapi.go @@ -21,12 +21,13 @@ package protocols import ( "github.com/ontio/ontology/http/base/error" "github.com/ontio/ontology/http/base/rpc" + "github.com/ontio/ontology/http/localrpc" "github.com/ontio/ontology/p2pserver/protocols/subnet" ) func RegisterProposeOfflineVote(subnet *subnet.SubNet) { // curl http://localhost:20337/local -v -d '{"method":"proposeOfflineVote", "params":["pubkey1", "pubkey2"]}' - rpc.HandleFunc("proposeOfflineVote", func(params []interface{}) map[string]interface{} { + localrpc.LocalRpcMux.HandleFunc("proposeOfflineVote", func(params []interface{}) map[string]interface{} { var nodes []string for _, key := range params { switch pubKey := key.(type) { @@ -46,7 +47,7 @@ func RegisterProposeOfflineVote(subnet *subnet.SubNet) { }) // curl http://localhost:20337/local -v -d '{"method":"getOfflineVotes", "params":[]}' - rpc.HandleFunc("getOfflineVotes", func(params []interface{}) map[string]interface{} { + localrpc.LocalRpcMux.HandleFunc("getOfflineVotes", func(params []interface{}) map[string]interface{} { votes := subnet.GetOfflineVotes() return rpc.ResponseSuccess(votes)