From de0f98b293ec0cbf0475f173c51823622bc1b38d Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 23 Mar 2014 20:03:42 -0700 Subject: [PATCH 01/15] Make this point at my repo instead of upstream. --- ae_install.sh | 2 +- codec/codec.go | 2 +- doc.go | 2 +- example_ae/app/server.go | 2 +- example_ae/client/client.go | 4 ++-- example_ae/whoami/whoami.ae.go | 2 +- example_ae/whoami/whoami.pb.go | 2 +- examples/add/add.go | 2 +- examples/add/addservice/addservice.pb.go | 4 ++-- examples/add/client/client.go | 2 +- examples/echo/client/client.go | 2 +- examples/echo/echo.go | 2 +- examples/echo/echo_test.go | 2 +- examples/echo/echoservice/echoservice.pb.go | 4 ++-- examples/remote/client/client.go | 4 ++-- examples/remote/offload/offload.pb.go | 4 ++-- examples/remote/remote.go | 4 ++-- examples/remote/remote_test.go | 4 ++-- plugin/plugin.go | 6 +++--- protoc-gen-go/main.go | 2 +- protoc-gen-go/testdata/service.pb.go | 4 ++-- test.sh | 2 +- 22 files changed, 32 insertions(+), 32 deletions(-) diff --git a/ae_install.sh b/ae_install.sh index d3364d2..8a6f35f 100755 --- a/ae_install.sh +++ b/ae_install.sh @@ -2,7 +2,7 @@ ROOT="." RPCGEN=$(dirname "$0") -PREFIX="github.com/kylelemons/go-rpcgen" +PREFIX="github.com/bradhe/go-rpcgen" set -e diff --git a/codec/codec.go b/codec/codec.go index d1d8b93..81ebf51 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -15,7 +15,7 @@ import ( "net/rpc" "code.google.com/p/goprotobuf/proto" - "github.com/kylelemons/go-rpcgen/plugin/wire" + "github.com/bradhe/go-rpcgen/plugin/wire" ) // ServerCodec implements the rpc.ServerCodec interface for generic protobufs. diff --git a/doc.go b/doc.go index ba61a18..b3e6b16 100644 --- a/doc.go +++ b/doc.go @@ -20,7 +20,7 @@ // Installation // // To install, run the following command: -// go get -v -u github.com/kylelemons/go-rpcgen/protoc-gen-go +// go get -v -u github.com/bradhe/go-rpcgen/protoc-gen-go // // Usage // diff --git a/example_ae/app/server.go b/example_ae/app/server.go index 8982a5d..8790818 100644 --- a/example_ae/app/server.go +++ b/example_ae/app/server.go @@ -12,7 +12,7 @@ import ( "net/http" "whoami" - _ "github.com/kylelemons/go-rpcgen/webrpc" + _ "github.com/bradhe/go-rpcgen/webrpc" ) type server struct{} diff --git a/example_ae/client/client.go b/example_ae/client/client.go index b5cce82..f25df11 100644 --- a/example_ae/client/client.go +++ b/example_ae/client/client.go @@ -9,8 +9,8 @@ package main import ( - "github.com/kylelemons/go-rpcgen/example_ae/whoami" - "github.com/kylelemons/go-rpcgen/webrpc" + "github.com/bradhe/go-rpcgen/example_ae/whoami" + "github.com/bradhe/go-rpcgen/webrpc" "log" "net/url" "os" diff --git a/example_ae/whoami/whoami.ae.go b/example_ae/whoami/whoami.ae.go index cf6b3e2..0a88bcb 100644 --- a/example_ae/whoami/whoami.ae.go +++ b/example_ae/whoami/whoami.ae.go @@ -11,7 +11,7 @@ import math "math" import "net/url" import "net/http" -import "github.com/kylelemons/go-rpcgen/webrpc" +import "github.com/bradhe/go-rpcgen/webrpc" // Reference proto, json, and math imports to suppress error if they are not otherwise used. var _ = &json.SyntaxError{} diff --git a/example_ae/whoami/whoami.pb.go b/example_ae/whoami/whoami.pb.go index a852c95..55d85f4 100644 --- a/example_ae/whoami/whoami.pb.go +++ b/example_ae/whoami/whoami.pb.go @@ -12,7 +12,7 @@ import math "math" import "net/url" import "net/http" -import "github.com/kylelemons/go-rpcgen/webrpc" +import "github.com/bradhe/go-rpcgen/webrpc" // Reference proto, json, and math imports to suppress error if they are not otherwise used. var _ = proto.Marshal diff --git a/examples/add/add.go b/examples/add/add.go index ec094dc..1140ea7 100644 --- a/examples/add/add.go +++ b/examples/add/add.go @@ -14,7 +14,7 @@ import ( "log" "net" - "github.com/kylelemons/go-rpcgen/examples/add/addservice" + "github.com/bradhe/go-rpcgen/examples/add/addservice" ) var ( diff --git a/examples/add/addservice/addservice.pb.go b/examples/add/addservice/addservice.pb.go index 35cc8b2..4318d2d 100644 --- a/examples/add/addservice/addservice.pb.go +++ b/examples/add/addservice/addservice.pb.go @@ -10,10 +10,10 @@ import math "math" import "net" import "net/rpc" -import "github.com/kylelemons/go-rpcgen/codec" +import "github.com/bradhe/go-rpcgen/codec" import "net/url" import "net/http" -import "github.com/kylelemons/go-rpcgen/webrpc" +import "github.com/bradhe/go-rpcgen/webrpc" // Reference proto, json, and math imports to suppress error if they are not otherwise used. var _ = proto.Marshal diff --git a/examples/add/client/client.go b/examples/add/client/client.go index dca4161..e52ff7b 100644 --- a/examples/add/client/client.go +++ b/examples/add/client/client.go @@ -13,7 +13,7 @@ import ( "fmt" "log" - "github.com/kylelemons/go-rpcgen/examples/add/addservice" + "github.com/bradhe/go-rpcgen/examples/add/addservice" ) var ( diff --git a/examples/echo/client/client.go b/examples/echo/client/client.go index 59ea035..9814af8 100644 --- a/examples/echo/client/client.go +++ b/examples/echo/client/client.go @@ -13,7 +13,7 @@ import ( "log" "os" - "github.com/kylelemons/go-rpcgen/examples/echo/echoservice" + "github.com/bradhe/go-rpcgen/examples/echo/echoservice" ) var server = flag.String("server", "localhost:9999", "RPC server address") diff --git a/examples/echo/echo.go b/examples/echo/echo.go index 85e63d0..636e70f 100644 --- a/examples/echo/echo.go +++ b/examples/echo/echo.go @@ -10,7 +10,7 @@ import ( "flag" "log" - "github.com/kylelemons/go-rpcgen/examples/echo/echoservice" + "github.com/bradhe/go-rpcgen/examples/echo/echoservice" ) var ( diff --git a/examples/echo/echo_test.go b/examples/echo/echo_test.go index 71ba1c3..78ba0ca 100644 --- a/examples/echo/echo_test.go +++ b/examples/echo/echo_test.go @@ -10,7 +10,7 @@ import ( "flag" "testing" - "github.com/kylelemons/go-rpcgen/examples/echo/echoservice" + "github.com/bradhe/go-rpcgen/examples/echo/echoservice" ) var server = flag.String("server", "localhost:9999", "RPC server address") diff --git a/examples/echo/echoservice/echoservice.pb.go b/examples/echo/echoservice/echoservice.pb.go index 7ab1602..39dd8dc 100644 --- a/examples/echo/echoservice/echoservice.pb.go +++ b/examples/echo/echoservice/echoservice.pb.go @@ -10,10 +10,10 @@ import math "math" import "net" import "net/rpc" -import "github.com/kylelemons/go-rpcgen/codec" +import "github.com/bradhe/go-rpcgen/codec" import "net/url" import "net/http" -import "github.com/kylelemons/go-rpcgen/webrpc" +import "github.com/bradhe/go-rpcgen/webrpc" // Reference proto, json, and math imports to suppress error if they are not otherwise used. var _ = proto.Marshal diff --git a/examples/remote/client/client.go b/examples/remote/client/client.go index a731b6c..3351ef2 100644 --- a/examples/remote/client/client.go +++ b/examples/remote/client/client.go @@ -11,8 +11,8 @@ import ( "log" "net/url" - "github.com/kylelemons/go-rpcgen/examples/remote/offload" - "github.com/kylelemons/go-rpcgen/webrpc" + "github.com/bradhe/go-rpcgen/examples/remote/offload" + "github.com/bradhe/go-rpcgen/webrpc" ) var base = flag.String("base", "http://localhost:9999/", "RPC server base URL") diff --git a/examples/remote/offload/offload.pb.go b/examples/remote/offload/offload.pb.go index 077447f..f585f6c 100644 --- a/examples/remote/offload/offload.pb.go +++ b/examples/remote/offload/offload.pb.go @@ -10,10 +10,10 @@ import math "math" import "net" import "net/rpc" -import "github.com/kylelemons/go-rpcgen/codec" +import "github.com/bradhe/go-rpcgen/codec" import "net/url" import "net/http" -import "github.com/kylelemons/go-rpcgen/webrpc" +import "github.com/bradhe/go-rpcgen/webrpc" // Reference proto, json, and math imports to suppress error if they are not otherwise used. var _ = proto.Marshal diff --git a/examples/remote/remote.go b/examples/remote/remote.go index c1257fc..a129ec9 100644 --- a/examples/remote/remote.go +++ b/examples/remote/remote.go @@ -11,8 +11,8 @@ import ( "log" "net/http" - "github.com/kylelemons/go-rpcgen/examples/remote/offload" - "github.com/kylelemons/go-rpcgen/webrpc" + "github.com/bradhe/go-rpcgen/examples/remote/offload" + "github.com/bradhe/go-rpcgen/webrpc" ) var addr = flag.String("addr", ":9999", "RPC server bind address") diff --git a/examples/remote/remote_test.go b/examples/remote/remote_test.go index 09381b0..c88bbdd 100644 --- a/examples/remote/remote_test.go +++ b/examples/remote/remote_test.go @@ -12,8 +12,8 @@ import ( "net/url" "testing" - "github.com/kylelemons/go-rpcgen/examples/remote/offload" - "github.com/kylelemons/go-rpcgen/webrpc" + "github.com/bradhe/go-rpcgen/examples/remote/offload" + "github.com/bradhe/go-rpcgen/webrpc" ) var base = flag.String("base", "http://localhost:9999/", "RPC server base URL") diff --git a/plugin/plugin.go b/plugin/plugin.go index 903e188..5f2a403 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -8,7 +8,7 @@ // RPC stubs for use with the the net/rpc package. // // To register the plugin, import this package as follows: -// import _ "github.com/kylelemons/go-rpcgen/plugin" +// import _ "github.com/bradhe/go-rpcgen/plugin" package plugin import ( @@ -84,12 +84,12 @@ func (p *Plugin) GenerateImports(file *generator.FileDescriptor) { if p.rpcImports { p.P(`import "net"`) p.P(`import "net/rpc"`) - p.P(`import "github.com/kylelemons/go-rpcgen/codec"`) + p.P(`import "github.com/bradhe/go-rpcgen/codec"`) } if p.webImports { p.P(`import "net/url"`) p.P(`import "net/http"`) - p.P(`import "github.com/kylelemons/go-rpcgen/webrpc"`) + p.P(`import "github.com/bradhe/go-rpcgen/webrpc"`) } } diff --git a/protoc-gen-go/main.go b/protoc-gen-go/main.go index 330cf66..ab039a3 100644 --- a/protoc-gen-go/main.go +++ b/protoc-gen-go/main.go @@ -50,7 +50,7 @@ import ( "code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/protoc-gen-go/generator" - _ "github.com/kylelemons/go-rpcgen/plugin" + _ "github.com/bradhe/go-rpcgen/plugin" ) func main() { diff --git a/protoc-gen-go/testdata/service.pb.go b/protoc-gen-go/testdata/service.pb.go index 76b314d..56ca33e 100644 --- a/protoc-gen-go/testdata/service.pb.go +++ b/protoc-gen-go/testdata/service.pb.go @@ -10,10 +10,10 @@ import math "math" import "net" import "net/rpc" -import "github.com/kylelemons/go-rpcgen/codec" +import "github.com/bradhe/go-rpcgen/codec" import "net/url" import "net/http" -import "github.com/kylelemons/go-rpcgen/webrpc" +import "github.com/bradhe/go-rpcgen/webrpc" // Reference proto, json, and math imports to suppress error if they are not otherwise used. var _ = proto.Marshal diff --git a/test.sh b/test.sh index 990b206..74c97c9 100755 --- a/test.sh +++ b/test.sh @@ -1,6 +1,6 @@ #!/bin/bash -REPO="github.com/kylelemons/go-rpcgen" +REPO="github.com/bradhe/go-rpcgen" function err { echo "$@" From 6f5a9f4e51c766a36e88662476f2699e0e6998d5 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 23 Mar 2014 20:09:36 -0700 Subject: [PATCH 02/15] Don't blow up when messages are empty. --- codec/codec.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/codec/codec.go b/codec/codec.go index 81ebf51..fd60733 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -159,7 +159,10 @@ func NewClientCodec(conn net.Conn) *ClientCodec { func (c *ClientCodec) WriteRequest(req *rpc.Request, obj interface{}) error { pb, ok := obj.(proto.Message) if !ok { - return fmt.Errorf("%T does not implement proto.Message", obj) + fmt.Errorf("%T does not implement proto.Message", obj) + + // Empty message. + pb = Message{} } // Write the header From 6dfd8889a8573435bde6cb4e242ad503e1c3dd93 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Sun, 23 Mar 2014 20:19:34 -0700 Subject: [PATCH 03/15] When we can't write, don't write. --- codec/codec.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/codec/codec.go b/codec/codec.go index fd60733..9919298 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -115,9 +115,11 @@ func (s *ServerCodec) ReadRequestBody(obj interface{}) error { func (s *ServerCodec) WriteResponse(resp *rpc.Response, obj interface{}) error { pb, ok := obj.(proto.Message) if !ok { - return fmt.Errorf("%T does not implement proto.Message", obj) + fmt.Errorf("%T does not implement proto.Message", obj) } + shouldWriteResponse := ok + // Write the header header := wire.Header{ Method: &resp.ServiceMethod, @@ -131,7 +133,11 @@ func (s *ServerCodec) WriteResponse(resp *rpc.Response, obj interface{}) error { } // Write the proto - return WriteProto(s.w, pb) + if shouldWriteResponse { + return WriteProto(s.w, pb) + } + + return nil } // Close closes the underlying conneciton. @@ -160,11 +166,10 @@ func (c *ClientCodec) WriteRequest(req *rpc.Request, obj interface{}) error { pb, ok := obj.(proto.Message) if !ok { fmt.Errorf("%T does not implement proto.Message", obj) - - // Empty message. - pb = Message{} } + shouldWriteBody := ok + // Write the header header := wire.Header{ Method: &req.ServiceMethod, @@ -174,7 +179,12 @@ func (c *ClientCodec) WriteRequest(req *rpc.Request, obj interface{}) error { return err } - return WriteProto(c.w, pb) + if shouldWriteBody { + return WriteProto(c.w, pb) + } + + // Nothing to see here. + return nil } // ReadResponseHeader reads the header protobuf (which is prefixed by a uvarint From ac2fb445fd307c3d16e841c469f44c05c9aff48a Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Tue, 28 Jul 2015 22:25:22 -0700 Subject: [PATCH 04/15] Update to use new protobuf upstream --- Makefile | 11 +++ codec/codec.go | 6 +- codec/codec_test.go | 2 +- example_ae/whoami/whoami.ae.go | 84 +++++--------------- example_ae/whoami/whoami.pb.go | 87 +++++---------------- examples/add/addservice/addservice.pb.go | 46 ++++++----- examples/echo/echoservice/echoservice.pb.go | 27 ++++--- examples/remote/offload/offload.pb.go | 40 ++++++---- plugin/common.go | 4 +- plugin/common_test.go | 6 +- plugin/plugin.go | 2 +- plugin/plugin_test.go | 4 +- plugin/rpc.go | 4 +- plugin/rpc_test.go | 6 +- plugin/web.go | 4 +- plugin/web_test.go | 6 +- plugin/wire/wire.pb.go | 39 +++++---- protoc-gen-go/main.go | 6 +- protoc-gen-go/testdata/service.pb.go | 46 ++++++----- webrpc/proto.go | 2 +- 20 files changed, 199 insertions(+), 233 deletions(-) create mode 100644 Makefile diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..fda5578 --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +SHELL = /bin/bash +GO = /usr/local/go/bin/go + +build: + $(GO) build ./... + +setup: + $(GO) get ./... + +install: + $(GO) install -v ./... diff --git a/codec/codec.go b/codec/codec.go index 9919298..59fd576 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -14,8 +14,8 @@ import ( "net" "net/rpc" - "code.google.com/p/goprotobuf/proto" "github.com/bradhe/go-rpcgen/plugin/wire" + "github.com/golang/protobuf/proto" ) // ServerCodec implements the rpc.ServerCodec interface for generic protobufs. @@ -215,6 +215,10 @@ func (c *ClientCodec) ReadResponseHeader(resp *rpc.Response) error { // is zero, nothing is done (this indicates an error condition, which was // encapsulated in the header) func (c *ClientCodec) ReadResponseBody(obj interface{}) error { + if obj == nil { + + } + pb, ok := obj.(proto.Message) if !ok { return fmt.Errorf("%T does not implement proto.Message", obj) diff --git a/codec/codec_test.go b/codec/codec_test.go index 1ce9a4e..e1042ca 100644 --- a/codec/codec_test.go +++ b/codec/codec_test.go @@ -12,7 +12,7 @@ import ( "reflect" "testing" - "code.google.com/p/goprotobuf/proto" + "github.com/golang/protobuf/proto" ) type InvalidRequest struct{} diff --git a/example_ae/whoami/whoami.ae.go b/example_ae/whoami/whoami.ae.go index 0a88bcb..fe00376 100644 --- a/example_ae/whoami/whoami.ae.go +++ b/example_ae/whoami/whoami.ae.go @@ -4,89 +4,47 @@ // source: whoami/whoami.proto // DO NOT EDIT! +/* +Package whoami is a generated protocol buffer package. + +It is generated from these files: + whoami/whoami.proto + +It has these top-level messages: + Empty + YouAre +*/ package whoami -import json "encoding/json" +import proto "github.com/golang/protobuf/proto" import math "math" -import "net/url" -import "net/http" -import "github.com/bradhe/go-rpcgen/webrpc" - -// Reference proto, json, and math imports to suppress error if they are not otherwise used. -var _ = &json.SyntaxError{} +// Reference imports to suppress errors if they are not otherwise used. var _ = math.Inf type Empty struct { XXX_unrecognized []byte `json:"-"` } -func (this *Empty) Reset() { *this = Empty{} } -func (*Empty) ProtoMessage() {} +func (m *Empty) Reset() { *m = Empty{} } +func (m *Empty) String() string { return proto.CompactTextString(m) } +func (*Empty) ProtoMessage() {} type YouAre struct { IpAddr *string `protobuf:"bytes,1,req,name=ip_addr" json:"ip_addr,omitempty"` XXX_unrecognized []byte `json:"-"` } -func (this *YouAre) Reset() { *this = YouAre{} } -func (*YouAre) ProtoMessage() {} +func (m *YouAre) Reset() { *m = YouAre{} } +func (m *YouAre) String() string { return proto.CompactTextString(m) } +func (*YouAre) ProtoMessage() {} -func (this *YouAre) GetIpAddr() string { - if this != nil && this.IpAddr != nil { - return *this.IpAddr +func (m *YouAre) GetIpAddr() string { + if m != nil && m.IpAddr != nil { + return *m.IpAddr } return "" } func init() { } - -// WhoamiService is an interface satisfied by the generated client and -// which must be implemented by the object wrapped by the server. -type WhoamiService interface { - Whoami(in *Empty, out *YouAre) error -} - -// WhoamiServiceWeb is the web-based RPC version of the interface which -// must be implemented by the object wrapped by the webrpc server. -type WhoamiServiceWeb interface { - Whoami(r *http.Request, in *Empty, out *YouAre) error -} - -// internal wrapper for type-safe webrpc calling -type rpcWhoamiServiceWebClient struct { - remote *url.URL - protocol webrpc.Protocol -} - -func (this rpcWhoamiServiceWebClient) Whoami(in *Empty, out *YouAre) error { - return webrpc.Post(this.protocol, this.remote, "/WhoamiService/Whoami", in, out) -} - -// Register a WhoamiServiceWeb implementation with the given webrpc ServeMux. -// If mux is nil, the default webrpc.ServeMux is used. -func RegisterWhoamiServiceWeb(this WhoamiServiceWeb, mux webrpc.ServeMux) error { - if mux == nil { - mux = webrpc.DefaultServeMux - } - if err := mux.Handle("/WhoamiService/Whoami", func(c *webrpc.Call) error { - in, out := new(Empty), new(YouAre) - if err := c.ReadRequest(in); err != nil { - return err - } - if err := this.Whoami(c.Request, in, out); err != nil { - return err - } - return c.WriteResponse(out) - }); err != nil { - return err - } - return nil -} - -// NewWhoamiServiceWebClient returns a webrpc wrapper for calling the methods of WhoamiService -// remotely via the web. The remote URL is the base URL of the webrpc server. -func NewWhoamiServiceWebClient(pro webrpc.Protocol, remote *url.URL) WhoamiService { - return rpcWhoamiServiceWebClient{remote, pro} -} diff --git a/example_ae/whoami/whoami.pb.go b/example_ae/whoami/whoami.pb.go index 55d85f4..f2be81b 100644 --- a/example_ae/whoami/whoami.pb.go +++ b/example_ae/whoami/whoami.pb.go @@ -4,93 +4,48 @@ // source: whoami/whoami.proto // DO NOT EDIT! +/* +Package whoami is a generated protocol buffer package. + +It is generated from these files: + whoami/whoami.proto + +It has these top-level messages: + Empty + YouAre +*/ package whoami -import proto "code.google.com/p/goprotobuf/proto" -import json "encoding/json" +import proto "github.com/golang/protobuf/proto" import math "math" -import "net/url" -import "net/http" -import "github.com/bradhe/go-rpcgen/webrpc" - -// Reference proto, json, and math imports to suppress error if they are not otherwise used. +// Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = &json.SyntaxError{} var _ = math.Inf type Empty struct { XXX_unrecognized []byte `json:"-"` } -func (this *Empty) Reset() { *this = Empty{} } -func (this *Empty) String() string { return proto.CompactTextString(this) } -func (*Empty) ProtoMessage() {} +func (m *Empty) Reset() { *m = Empty{} } +func (m *Empty) String() string { return proto.CompactTextString(m) } +func (*Empty) ProtoMessage() {} type YouAre struct { IpAddr *string `protobuf:"bytes,1,req,name=ip_addr" json:"ip_addr,omitempty"` XXX_unrecognized []byte `json:"-"` } -func (this *YouAre) Reset() { *this = YouAre{} } -func (this *YouAre) String() string { return proto.CompactTextString(this) } -func (*YouAre) ProtoMessage() {} +func (m *YouAre) Reset() { *m = YouAre{} } +func (m *YouAre) String() string { return proto.CompactTextString(m) } +func (*YouAre) ProtoMessage() {} -func (this *YouAre) GetIpAddr() string { - if this != nil && this.IpAddr != nil { - return *this.IpAddr +func (m *YouAre) GetIpAddr() string { + if m != nil && m.IpAddr != nil { + return *m.IpAddr } return "" } func init() { } - -// WhoamiService is an interface satisfied by the generated client and -// which must be implemented by the object wrapped by the server. -type WhoamiService interface { - Whoami(in *Empty, out *YouAre) error -} - -// WhoamiServiceWeb is the web-based RPC version of the interface which -// must be implemented by the object wrapped by the webrpc server. -type WhoamiServiceWeb interface { - Whoami(r *http.Request, in *Empty, out *YouAre) error -} - -// internal wrapper for type-safe webrpc calling -type rpcWhoamiServiceWebClient struct { - remote *url.URL - protocol webrpc.Protocol -} - -func (this rpcWhoamiServiceWebClient) Whoami(in *Empty, out *YouAre) error { - return webrpc.Post(this.protocol, this.remote, "/WhoamiService/Whoami", in, out) -} - -// Register a WhoamiServiceWeb implementation with the given webrpc ServeMux. -// If mux is nil, the default webrpc.ServeMux is used. -func RegisterWhoamiServiceWeb(this WhoamiServiceWeb, mux webrpc.ServeMux) error { - if mux == nil { - mux = webrpc.DefaultServeMux - } - if err := mux.Handle("/WhoamiService/Whoami", func(c *webrpc.Call) error { - in, out := new(Empty), new(YouAre) - if err := c.ReadRequest(in); err != nil { - return err - } - if err := this.Whoami(c.Request, in, out); err != nil { - return err - } - return c.WriteResponse(out) - }); err != nil { - return err - } - return nil -} - -// NewWhoamiServiceWebClient returns a webrpc wrapper for calling the methods of WhoamiService -// remotely via the web. The remote URL is the base URL of the webrpc server. -func NewWhoamiServiceWebClient(pro webrpc.Protocol, remote *url.URL) WhoamiService { - return rpcWhoamiServiceWebClient{remote, pro} -} diff --git a/examples/add/addservice/addservice.pb.go b/examples/add/addservice/addservice.pb.go index 4318d2d..728b5fd 100644 --- a/examples/add/addservice/addservice.pb.go +++ b/examples/add/addservice/addservice.pb.go @@ -2,10 +2,19 @@ // source: examples/add/addservice/addservice.proto // DO NOT EDIT! +/* +Package addservice is a generated protocol buffer package. + +It is generated from these files: + examples/add/addservice/addservice.proto + +It has these top-level messages: + AddMessage + SumMessage +*/ package addservice -import proto "code.google.com/p/goprotobuf/proto" -import json "encoding/json" +import proto "github.com/golang/protobuf/proto" import math "math" import "net" @@ -15,9 +24,8 @@ import "net/url" import "net/http" import "github.com/bradhe/go-rpcgen/webrpc" -// Reference proto, json, and math imports to suppress error if they are not otherwise used. +// Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = &json.SyntaxError{} var _ = math.Inf type AddMessage struct { @@ -26,20 +34,20 @@ type AddMessage struct { XXX_unrecognized []byte `json:"-"` } -func (this *AddMessage) Reset() { *this = AddMessage{} } -func (this *AddMessage) String() string { return proto.CompactTextString(this) } -func (*AddMessage) ProtoMessage() {} +func (m *AddMessage) Reset() { *m = AddMessage{} } +func (m *AddMessage) String() string { return proto.CompactTextString(m) } +func (*AddMessage) ProtoMessage() {} -func (this *AddMessage) GetX() int32 { - if this != nil && this.X != nil { - return *this.X +func (m *AddMessage) GetX() int32 { + if m != nil && m.X != nil { + return *m.X } return 0 } -func (this *AddMessage) GetY() int32 { - if this != nil && this.Y != nil { - return *this.Y +func (m *AddMessage) GetY() int32 { + if m != nil && m.Y != nil { + return *m.Y } return 0 } @@ -49,13 +57,13 @@ type SumMessage struct { XXX_unrecognized []byte `json:"-"` } -func (this *SumMessage) Reset() { *this = SumMessage{} } -func (this *SumMessage) String() string { return proto.CompactTextString(this) } -func (*SumMessage) ProtoMessage() {} +func (m *SumMessage) Reset() { *m = SumMessage{} } +func (m *SumMessage) String() string { return proto.CompactTextString(m) } +func (*SumMessage) ProtoMessage() {} -func (this *SumMessage) GetZ() int32 { - if this != nil && this.Z != nil { - return *this.Z +func (m *SumMessage) GetZ() int32 { + if m != nil && m.Z != nil { + return *m.Z } return 0 } diff --git a/examples/echo/echoservice/echoservice.pb.go b/examples/echo/echoservice/echoservice.pb.go index 39dd8dc..0338797 100644 --- a/examples/echo/echoservice/echoservice.pb.go +++ b/examples/echo/echoservice/echoservice.pb.go @@ -2,10 +2,18 @@ // source: examples/echo/echoservice/echoservice.proto // DO NOT EDIT! +/* +Package echoservice is a generated protocol buffer package. + +It is generated from these files: + examples/echo/echoservice/echoservice.proto + +It has these top-level messages: + Payload +*/ package echoservice -import proto "code.google.com/p/goprotobuf/proto" -import json "encoding/json" +import proto "github.com/golang/protobuf/proto" import math "math" import "net" @@ -15,9 +23,8 @@ import "net/url" import "net/http" import "github.com/bradhe/go-rpcgen/webrpc" -// Reference proto, json, and math imports to suppress error if they are not otherwise used. +// Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = &json.SyntaxError{} var _ = math.Inf type Payload struct { @@ -25,13 +32,13 @@ type Payload struct { XXX_unrecognized []byte `json:"-"` } -func (this *Payload) Reset() { *this = Payload{} } -func (this *Payload) String() string { return proto.CompactTextString(this) } -func (*Payload) ProtoMessage() {} +func (m *Payload) Reset() { *m = Payload{} } +func (m *Payload) String() string { return proto.CompactTextString(m) } +func (*Payload) ProtoMessage() {} -func (this *Payload) GetMessage() string { - if this != nil && this.Message != nil { - return *this.Message +func (m *Payload) GetMessage() string { + if m != nil && m.Message != nil { + return *m.Message } return "" } diff --git a/examples/remote/offload/offload.pb.go b/examples/remote/offload/offload.pb.go index f585f6c..67767a2 100644 --- a/examples/remote/offload/offload.pb.go +++ b/examples/remote/offload/offload.pb.go @@ -2,10 +2,19 @@ // source: examples/remote/offload/offload.proto // DO NOT EDIT! +/* +Package offload is a generated protocol buffer package. + +It is generated from these files: + examples/remote/offload/offload.proto + +It has these top-level messages: + DataSet + ResultSet +*/ package offload -import proto "code.google.com/p/goprotobuf/proto" -import json "encoding/json" +import proto "github.com/golang/protobuf/proto" import math "math" import "net" @@ -15,9 +24,8 @@ import "net/url" import "net/http" import "github.com/bradhe/go-rpcgen/webrpc" -// Reference proto, json, and math imports to suppress error if they are not otherwise used. +// Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = &json.SyntaxError{} var _ = math.Inf type DataSet struct { @@ -25,13 +33,13 @@ type DataSet struct { XXX_unrecognized []byte `json:"-"` } -func (this *DataSet) Reset() { *this = DataSet{} } -func (this *DataSet) String() string { return proto.CompactTextString(this) } -func (*DataSet) ProtoMessage() {} +func (m *DataSet) Reset() { *m = DataSet{} } +func (m *DataSet) String() string { return proto.CompactTextString(m) } +func (*DataSet) ProtoMessage() {} -func (this *DataSet) GetData() string { - if this != nil && this.Data != nil { - return *this.Data +func (m *DataSet) GetData() string { + if m != nil && m.Data != nil { + return *m.Data } return "" } @@ -41,13 +49,13 @@ type ResultSet struct { XXX_unrecognized []byte `json:"-"` } -func (this *ResultSet) Reset() { *this = ResultSet{} } -func (this *ResultSet) String() string { return proto.CompactTextString(this) } -func (*ResultSet) ProtoMessage() {} +func (m *ResultSet) Reset() { *m = ResultSet{} } +func (m *ResultSet) String() string { return proto.CompactTextString(m) } +func (*ResultSet) ProtoMessage() {} -func (this *ResultSet) GetResult() string { - if this != nil && this.Result != nil { - return *this.Result +func (m *ResultSet) GetResult() string { + if m != nil && m.Result != nil { + return *m.Result } return "" } diff --git a/plugin/common.go b/plugin/common.go index 98799f7..350df52 100644 --- a/plugin/common.go +++ b/plugin/common.go @@ -7,8 +7,8 @@ package plugin import ( - descriptor "code.google.com/p/goprotobuf/protoc-gen-go/descriptor" - "code.google.com/p/goprotobuf/protoc-gen-go/generator" + descriptor "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/golang/protobuf/protoc-gen-go/generator" ) // GenerateCommonStubs is the core of the plugin package. diff --git a/plugin/common_test.go b/plugin/common_test.go index 585cf6d..798b1f2 100644 --- a/plugin/common_test.go +++ b/plugin/common_test.go @@ -11,9 +11,9 @@ import ( "strings" "testing" - "code.google.com/p/goprotobuf/proto" - descriptor "code.google.com/p/goprotobuf/protoc-gen-go/descriptor" - "code.google.com/p/goprotobuf/protoc-gen-go/generator" + "github.com/golang/protobuf/proto" + descriptor "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/golang/protobuf/protoc-gen-go/generator" ) func TestGenerateCommonStubs(t *testing.T) { diff --git a/plugin/plugin.go b/plugin/plugin.go index 5f2a403..85a85d4 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -15,7 +15,7 @@ import ( "os" "strings" - "code.google.com/p/goprotobuf/protoc-gen-go/generator" + "github.com/golang/protobuf/protoc-gen-go/generator" ) // Fail to compile if Plugin doesn't implement the generator.Plugin interface diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go index 1ab5de3..c3469d3 100644 --- a/plugin/plugin_test.go +++ b/plugin/plugin_test.go @@ -7,8 +7,8 @@ package plugin import ( - descriptor "code.google.com/p/goprotobuf/protoc-gen-go/descriptor" - "code.google.com/p/goprotobuf/protoc-gen-go/generator" + descriptor "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/golang/protobuf/protoc-gen-go/generator" ) type fakeObject string diff --git a/plugin/rpc.go b/plugin/rpc.go index a747291..3758385 100644 --- a/plugin/rpc.go +++ b/plugin/rpc.go @@ -7,8 +7,8 @@ package plugin import ( - descriptor "code.google.com/p/goprotobuf/protoc-gen-go/descriptor" - "code.google.com/p/goprotobuf/protoc-gen-go/generator" + descriptor "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/golang/protobuf/protoc-gen-go/generator" ) // TODO: Use io.ReadWriteCloser instead of net.Conn? diff --git a/plugin/rpc_test.go b/plugin/rpc_test.go index 5ba80a0..67a58cc 100644 --- a/plugin/rpc_test.go +++ b/plugin/rpc_test.go @@ -11,9 +11,9 @@ import ( "strings" "testing" - "code.google.com/p/goprotobuf/proto" - descriptor "code.google.com/p/goprotobuf/protoc-gen-go/descriptor" - "code.google.com/p/goprotobuf/protoc-gen-go/generator" + "github.com/golang/protobuf/proto" + descriptor "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/golang/protobuf/protoc-gen-go/generator" ) func TestGenerateRPCStubs(t *testing.T) { diff --git a/plugin/web.go b/plugin/web.go index d9eb56e..3d80db3 100644 --- a/plugin/web.go +++ b/plugin/web.go @@ -7,8 +7,8 @@ package plugin import ( - descriptor "code.google.com/p/goprotobuf/protoc-gen-go/descriptor" - "code.google.com/p/goprotobuf/protoc-gen-go/generator" + descriptor "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/golang/protobuf/protoc-gen-go/generator" ) // GenerateWebStubs generates the webrpc stubs. diff --git a/plugin/web_test.go b/plugin/web_test.go index cc1ca98..5734c43 100644 --- a/plugin/web_test.go +++ b/plugin/web_test.go @@ -11,9 +11,9 @@ import ( "strings" "testing" - "code.google.com/p/goprotobuf/proto" - descriptor "code.google.com/p/goprotobuf/protoc-gen-go/descriptor" - "code.google.com/p/goprotobuf/protoc-gen-go/generator" + "github.com/golang/protobuf/proto" + descriptor "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/golang/protobuf/protoc-gen-go/generator" ) func TestGenerateWebStubs(t *testing.T) { diff --git a/plugin/wire/wire.pb.go b/plugin/wire/wire.pb.go index 55a6b06..c8cd179 100644 --- a/plugin/wire/wire.pb.go +++ b/plugin/wire/wire.pb.go @@ -2,15 +2,22 @@ // source: plugin/wire/wire.proto // DO NOT EDIT! +/* +Package wire is a generated protocol buffer package. + +It is generated from these files: + plugin/wire/wire.proto + +It has these top-level messages: + Header +*/ package wire -import proto "code.google.com/p/goprotobuf/proto" -import json "encoding/json" +import proto "github.com/golang/protobuf/proto" import math "math" -// Reference proto, json, and math imports to suppress error if they are not otherwise used. +// Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = &json.SyntaxError{} var _ = math.Inf type Header struct { @@ -20,27 +27,27 @@ type Header struct { XXX_unrecognized []byte `json:"-"` } -func (this *Header) Reset() { *this = Header{} } -func (this *Header) String() string { return proto.CompactTextString(this) } -func (*Header) ProtoMessage() {} +func (m *Header) Reset() { *m = Header{} } +func (m *Header) String() string { return proto.CompactTextString(m) } +func (*Header) ProtoMessage() {} -func (this *Header) GetMethod() string { - if this != nil && this.Method != nil { - return *this.Method +func (m *Header) GetMethod() string { + if m != nil && m.Method != nil { + return *m.Method } return "" } -func (this *Header) GetSeq() uint64 { - if this != nil && this.Seq != nil { - return *this.Seq +func (m *Header) GetSeq() uint64 { + if m != nil && m.Seq != nil { + return *m.Seq } return 0 } -func (this *Header) GetError() string { - if this != nil && this.Error != nil { - return *this.Error +func (m *Header) GetError() string { + if m != nil && m.Error != nil { + return *m.Error } return "" } diff --git a/protoc-gen-go/main.go b/protoc-gen-go/main.go index ab039a3..21d501c 100644 --- a/protoc-gen-go/main.go +++ b/protoc-gen-go/main.go @@ -1,7 +1,7 @@ // Go support for Protocol Buffers - Google's data interchange format // // Copyright 2010 Google Inc. All rights reserved. -// http://code.google.com/p/goprotobuf/ +// http://github.com/golang/protobuf/ // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are @@ -47,8 +47,8 @@ import ( "io/ioutil" "os" - "code.google.com/p/goprotobuf/proto" - "code.google.com/p/goprotobuf/protoc-gen-go/generator" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/protoc-gen-go/generator" _ "github.com/bradhe/go-rpcgen/plugin" ) diff --git a/protoc-gen-go/testdata/service.pb.go b/protoc-gen-go/testdata/service.pb.go index 56ca33e..324b217 100644 --- a/protoc-gen-go/testdata/service.pb.go +++ b/protoc-gen-go/testdata/service.pb.go @@ -2,10 +2,19 @@ // source: protoc-gen-go/testdata/service.proto // DO NOT EDIT! +/* +Package svc is a generated protocol buffer package. + +It is generated from these files: + protoc-gen-go/testdata/service.proto + +It has these top-level messages: + Args + Return +*/ package svc -import proto "code.google.com/p/goprotobuf/proto" -import json "encoding/json" +import proto "github.com/golang/protobuf/proto" import math "math" import "net" @@ -15,9 +24,8 @@ import "net/url" import "net/http" import "github.com/bradhe/go-rpcgen/webrpc" -// Reference proto, json, and math imports to suppress error if they are not otherwise used. +// Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = &json.SyntaxError{} var _ = math.Inf type Args struct { @@ -26,20 +34,20 @@ type Args struct { XXX_unrecognized []byte `json:"-"` } -func (this *Args) Reset() { *this = Args{} } -func (this *Args) String() string { return proto.CompactTextString(this) } -func (*Args) ProtoMessage() {} +func (m *Args) Reset() { *m = Args{} } +func (m *Args) String() string { return proto.CompactTextString(m) } +func (*Args) ProtoMessage() {} -func (this *Args) GetA() string { - if this != nil && this.A != nil { - return *this.A +func (m *Args) GetA() string { + if m != nil && m.A != nil { + return *m.A } return "" } -func (this *Args) GetB() string { - if this != nil && this.B != nil { - return *this.B +func (m *Args) GetB() string { + if m != nil && m.B != nil { + return *m.B } return "" } @@ -49,13 +57,13 @@ type Return struct { XXX_unrecognized []byte `json:"-"` } -func (this *Return) Reset() { *this = Return{} } -func (this *Return) String() string { return proto.CompactTextString(this) } -func (*Return) ProtoMessage() {} +func (m *Return) Reset() { *m = Return{} } +func (m *Return) String() string { return proto.CompactTextString(m) } +func (*Return) ProtoMessage() {} -func (this *Return) GetC() string { - if this != nil && this.C != nil { - return *this.C +func (m *Return) GetC() string { + if m != nil && m.C != nil { + return *m.C } return "" } diff --git a/webrpc/proto.go b/webrpc/proto.go index 9a35ddd..77e260b 100644 --- a/webrpc/proto.go +++ b/webrpc/proto.go @@ -13,7 +13,7 @@ import ( "io" "io/ioutil" - "code.google.com/p/goprotobuf/proto" + "github.com/golang/protobuf/proto" ) // ProtoBuf implements the Google Protocol Buffer implementation of the From fecc636c6bc39d0e74c8c80fb05623f20b9f1b15 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Wed, 19 Aug 2015 13:35:22 -0700 Subject: [PATCH 05/15] Create a wrapper around rpc.Client that is pooled. --- client/client.go | 165 +++++++++++++++++++++++++++++++++++++++++++++++ plugin/plugin.go | 3 +- plugin/rpc.go | 23 ++----- 3 files changed, 172 insertions(+), 19 deletions(-) create mode 100644 client/client.go diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..1c86028 --- /dev/null +++ b/client/client.go @@ -0,0 +1,165 @@ +package client + +import ( + "errors" + "github.com/bradhe/go-rpcgen/codec" + "math" + "net" + "net/rpc" + "sync" + "sync/atomic" + "time" +) + +var ( + ErrConnectionFailure = errors.New("failed to connect") + ErrClosed = errors.New("closed") + ErrInvalidPoolObject = errors.New("invalid pool object") + ErrPermanentlyShutdown = errors.New("permenantly shutdown") +) + +const ( + DefaultRetryCount = 6 +) + +type Client struct { + wg sync.WaitGroup + + addr string + pool sync.Pool + shutdown int32 +} + +func backoff(i int) time.Duration { + if i < 1 { + i = 1 + } + + ms := int(math.Exp2(float64(i))) + return time.Duration(ms) * time.Millisecond +} + +func (c *Client) Close() { + // If someone else called this, we'll just wait a bit for it all to close down. + if atomic.LoadInt32(&c.shutdown) > 0 { + c.wg.Wait() + return + } + + atomic.SwapInt32(&c.shutdown, 1) + c.wg.Wait() + + // By adding a second number here that means that we're completely shut down. + atomic.AddInt32(&c.shutdown, 1) + + // Now let's close down all of the connections in the poo in the pool. + for { + client, ok := c.pool.Get().(*rpc.Client) + + if !ok { + // we closed them all + break + } + + client.Close() + } +} + +func (c *Client) create() interface{} { + // If the service is shutdown, let's wait to kill it all. + if atomic.LoadInt32(&c.shutdown) > 1 { + return nil + } + + conn, err := net.Dial("tcp", c.addr) + + // If the connection failed, there's nothin' we can really do about it. + if err != nil { + return nil + } + + return rpc.NewClientWithCodec(codec.NewClientCodec(conn)) +} + +func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { + // If we're shut down, let's tell the user. + if c.shutdown > 0 { + return ErrClosed + } + + // Signal that something is going on. + c.wg.Add(1) + defer c.wg.Done() + + // Number of times we've retried + var retry int + + for { + client, ok := c.pool.Get().(*rpc.Client) + + if !ok { + return ErrInvalidPoolObject + } + + if client == nil { + return ErrConnectionFailure + } + + err := client.Call(serviceMethod, args, reply) + + // No error, so let's relinquish this back to the pool and get outta here. + if err == nil { + c.pool.Put(client) + break + } + + // If we got here, let's see what type of error it is. + if err == rpc.ErrShutdown { + retry += 1 + + if retry > DefaultRetryCount { + return ErrPermanentlyShutdown + } + + // Let's try again! + time.Sleep(backoff(retry)) + } else { + // This means err != nil, so we just report the error. + return err + } + } + + // We win the day! + return nil +} + +func (c *Client) doCall(call *rpc.Call, serviceMethod string, args interface{}, reply interface{}, done chan *rpc.Call) { + err := c.Call(serviceMethod, args, reply) + call.Error = err + done <- call +} + +func (c *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *rpc.Call) *rpc.Call { + call := new(rpc.Call) + call.ServiceMethod = serviceMethod + call.Args = args + call.Reply = reply + call.Done = done + + // If we're shut down, let's tell the user. + if c.shutdown > 0 { + call.Error = ErrClosed + return call + } + + // If we made it here, we're good. + go c.doCall(call, serviceMethod, args, reply, done) + return call +} + +func NewClient(addr string) *Client { + c := new(Client) + c.addr = addr + c.pool.New = c.create + return c +} diff --git a/plugin/plugin.go b/plugin/plugin.go index 85a85d4..e09de69 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -84,7 +84,8 @@ func (p *Plugin) GenerateImports(file *generator.FileDescriptor) { if p.rpcImports { p.P(`import "net"`) p.P(`import "net/rpc"`) - p.P(`import "github.com/bradhe/go-rpcgen/codec"`) + p.P(`import "github.com/bradhe/go-rpcgen/client"`) + p.P(`import "errors"`) } if p.webImports { p.P(`import "net/url"`) diff --git a/plugin/rpc.go b/plugin/rpc.go index 3758385..1986e84 100644 --- a/plugin/rpc.go +++ b/plugin/rpc.go @@ -25,25 +25,25 @@ func (p *Plugin) GenerateRPCStubs(svc *descriptor.ServiceDescriptorProto) { p.P("// internal wrapper for type-safe RPC calling") p.P("type rpc", name, "Client struct {") p.In() - p.P("*rpc.Client") + p.P("*client.Client") p.Out() p.P("}") for _, m := range svc.Method { method := generator.CamelCase(*m.Name) iType := p.ObjectNamed(*m.InputType) oType := p.ObjectNamed(*m.OutputType) - p.P("func (this rpc", name, "Client) ", method, "(in *", p.TypeName(iType), ", out *", p.TypeName(oType), ") error {") + p.P("func (c rpc", name, "Client) ", method, "(in *", p.TypeName(iType), ", out *", p.TypeName(oType), ") error {") p.In() - p.P(`return this.Call("`, name, ".", method, `", in, out)`) + p.P(`return c.Call("`, name, ".", method, `", in, out)`) p.Out() p.P("}") } p.P() p.P("// New", name, "Client returns an *rpc.Client wrapper for calling the methods of") p.P("// ", name, " remotely.") - p.P("func New", name, "Client(conn net.Conn) ", name, " {") + p.P("func New", name, "Client(addr string) ", name, " {") p.In() - p.P("return rpc", name, "Client{rpc.NewClientWithCodec(codec.NewClientCodec(conn))}") + p.P("return rpc", name, "Client{client.NewClient(addr)}") p.Out() p.P("}") p.P() @@ -61,19 +61,6 @@ func (p *Plugin) GenerateRPCStubs(svc *descriptor.ServiceDescriptorProto) { p.Out() p.P("}") p.P() - p.P("// Dial", name, " returns a ", name, " for calling the ", name, " servince at addr (TCP).") - p.P("func Dial", name, "(addr string) (", name, ", error) {") - p.In() - p.P(`conn, err := net.Dial("tcp", addr)`) - p.P("if err != nil {") - p.In() - p.P("return nil, err") - p.Out() - p.P("}") - p.P("return New", name, "Client(conn), nil") - p.Out() - p.P("}") - p.P() p.P("// ListenAndServe", name, " serves the given ", name, " backend implementation") p.P("// on all connections accepted as a result of listening on addr (TCP).") p.P("func ListenAndServe", name, "(addr string, backend ", name, ") error {") From 7597ed7000f412f50f40af1d42b5e4046f637e02 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Wed, 19 Aug 2015 14:54:26 -0700 Subject: [PATCH 06/15] Include codec, or course --- plugin/plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/plugin.go b/plugin/plugin.go index e09de69..38b5ca9 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -84,8 +84,8 @@ func (p *Plugin) GenerateImports(file *generator.FileDescriptor) { if p.rpcImports { p.P(`import "net"`) p.P(`import "net/rpc"`) + p.P(`import "github.com/bradhe/go-rpcgen/codec"`) p.P(`import "github.com/bradhe/go-rpcgen/client"`) - p.P(`import "errors"`) } if p.webImports { p.P(`import "net/url"`) From 2094fc5942536c27dad58e60cbacdd1937098f7d Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Tue, 20 Oct 2015 16:21:49 -0700 Subject: [PATCH 07/15] Add logging to go-rpcgen clients (conditionally). --- client/client.go | 3 ++- client/logging.go | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 client/logging.go diff --git a/client/client.go b/client/client.go index 1c86028..29c1fb5 100644 --- a/client/client.go +++ b/client/client.go @@ -75,6 +75,7 @@ func (c *Client) create() interface{} { // If the connection failed, there's nothin' we can really do about it. if err != nil { + logMessage("[go-rpcgen/client] Failed to open connection to %s: %v", c.addr, err) return nil } @@ -87,7 +88,7 @@ func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) return ErrClosed } - // Signal that something is going on. + // Signal that something is goig on. c.wg.Add(1) defer c.wg.Done() diff --git a/client/logging.go b/client/logging.go new file mode 100644 index 0000000..3eeba90 --- /dev/null +++ b/client/logging.go @@ -0,0 +1,18 @@ +package client + +import ( + "log" +) + +var ( + // Set this to something non-null if you want log messages to flow. + Logger *log.Logger +) + +func logMessage(format string, args ...interface{}) { + if Logger == nil { + return + } + + Logger.Printf(format, args...) +} From 64cd301fa0b7f2048ed54e3cdb658f8bebe36609 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Tue, 20 Oct 2015 18:58:05 -0700 Subject: [PATCH 08/15] Add support for timing out gracefully. --- client/client.go | 33 ++++++++++++++++++++++++--------- client/utils.go | 15 +++++++++++++++ 2 files changed, 39 insertions(+), 9 deletions(-) create mode 100644 client/utils.go diff --git a/client/client.go b/client/client.go index 29c1fb5..1cda1bc 100644 --- a/client/client.go +++ b/client/client.go @@ -11,15 +11,21 @@ import ( "time" ) +const ( + DefaultRetryCount = 6 + + // Default number of seconds to timeout for all connections + DefaultTimeout = 200 * time.Millisecond +) + var ( ErrConnectionFailure = errors.New("failed to connect") ErrClosed = errors.New("closed") ErrInvalidPoolObject = errors.New("invalid pool object") ErrPermanentlyShutdown = errors.New("permenantly shutdown") -) -const ( - DefaultRetryCount = 6 + // Number of seconds to use when timing out. + ConnectionTimeout = DefaultTimeout ) type Client struct { @@ -71,15 +77,24 @@ func (c *Client) create() interface{} { return nil } - conn, err := net.Dial("tcp", c.addr) + for { + conn, err := net.DialTimeout("tcp", c.addr, ConnectionTimeout) - // If the connection failed, there's nothin' we can really do about it. - if err != nil { - logMessage("[go-rpcgen/client] Failed to open connection to %s: %v", c.addr, err) - return nil + // If the connection failed, there's nothin' we can really do about it. + if IsTimeoutError(err) { + logMessage("[go-rpcgen/client] Connection to %s timed out. Retrying.", c.addr) + continue + } + + if err != nil { + logMessage("[go-rpcgen/client] Failed to open connection to %s: %v", c.addr, err) + return nil + } + + return rpc.NewClientWithCodec(codec.NewClientCodec(conn)) } - return rpc.NewClientWithCodec(codec.NewClientCodec(conn)) + panic("uncreachable") } func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { diff --git a/client/utils.go b/client/utils.go new file mode 100644 index 0000000..071e518 --- /dev/null +++ b/client/utils.go @@ -0,0 +1,15 @@ +package client + +import ( + "strings" +) + +// Given an error, makes a crappy attempt to determine if the error is a +// timeout error. +func IsTimeoutError(err error) bool { + if err == nil { + return false + } + + return strings.HasSuffix(err.Error(), "connection timed out") +} From 6b23c3e56fe6b40c75015b06fc9715aaa199b46b Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Tue, 20 Oct 2015 20:54:50 -0700 Subject: [PATCH 09/15] Seccond timeout error type. WTF? --- client/utils.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/client/utils.go b/client/utils.go index 071e518..5a5c35f 100644 --- a/client/utils.go +++ b/client/utils.go @@ -11,5 +11,13 @@ func IsTimeoutError(err error) bool { return false } - return strings.HasSuffix(err.Error(), "connection timed out") + if strings.HasSuffix(err.Error(), "timed out") { + return true + } + + if strings.HasSuffix(err.Error(), "i/o timeout") { + return true + } + + return false } From 6e8f09953f834dc78a1f5656af8143d5f42114cc Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Tue, 27 Oct 2015 22:02:02 -0700 Subject: [PATCH 10/15] Add channel-based connection pooling. --- client/client.go | 51 +++++++++++++------------------ client/connection_pool.go | 63 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 31 deletions(-) create mode 100644 client/connection_pool.go diff --git a/client/client.go b/client/client.go index 1cda1bc..9c7298f 100644 --- a/client/client.go +++ b/client/client.go @@ -26,13 +26,16 @@ var ( // Number of seconds to use when timing out. ConnectionTimeout = DefaultTimeout + + // Number of clients to open for this connection pool. + PoolSize = 100 ) type Client struct { wg sync.WaitGroup addr string - pool sync.Pool + pool *ConnectionPool shutdown int32 } @@ -46,6 +49,8 @@ func backoff(i int) time.Duration { } func (c *Client) Close() { + logMessage("[go-rpcgen/client] Closing RPC client.") + // If someone else called this, we'll just wait a bit for it all to close down. if atomic.LoadInt32(&c.shutdown) > 0 { c.wg.Wait() @@ -60,41 +65,24 @@ func (c *Client) Close() { // Now let's close down all of the connections in the poo in the pool. for { - client, ok := c.pool.Get().(*rpc.Client) - - if !ok { - // we closed them all - break - } - - client.Close() + c.pool.Close() } } -func (c *Client) create() interface{} { +func (c *Client) create() (*rpc.Client, error) { // If the service is shutdown, let's wait to kill it all. if atomic.LoadInt32(&c.shutdown) > 1 { - return nil + return nil, nil } - for { - conn, err := net.DialTimeout("tcp", c.addr, ConnectionTimeout) + conn, err := net.DialTimeout("tcp", c.addr, ConnectionTimeout) - // If the connection failed, there's nothin' we can really do about it. - if IsTimeoutError(err) { - logMessage("[go-rpcgen/client] Connection to %s timed out. Retrying.", c.addr) - continue - } - - if err != nil { - logMessage("[go-rpcgen/client] Failed to open connection to %s: %v", c.addr, err) - return nil - } - - return rpc.NewClientWithCodec(codec.NewClientCodec(conn)) + if err != nil { + return nil, err } - panic("uncreachable") + co := rpc.NewClientWithCodec(codec.NewClientCodec(conn)) + return co, nil } func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { @@ -111,11 +99,7 @@ func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) var retry int for { - client, ok := c.pool.Get().(*rpc.Client) - - if !ok { - return ErrInvalidPoolObject - } + client := c.pool.Get() if client == nil { return ErrConnectionFailure @@ -131,6 +115,8 @@ func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) // If we got here, let's see what type of error it is. if err == rpc.ErrShutdown { + client.Close() + retry += 1 if retry > DefaultRetryCount { @@ -140,6 +126,8 @@ func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) // Let's try again! time.Sleep(backoff(retry)) } else { + client.Close() + // This means err != nil, so we just report the error. return err } @@ -176,6 +164,7 @@ func (c *Client) Go(serviceMethod string, args interface{}, reply interface{}, d func NewClient(addr string) *Client { c := new(Client) c.addr = addr + c.pool = NewConnectionPool(PoolSize) c.pool.New = c.create return c } diff --git a/client/connection_pool.go b/client/connection_pool.go new file mode 100644 index 0000000..d19fbfc --- /dev/null +++ b/client/connection_pool.go @@ -0,0 +1,63 @@ +package client + +import ( + "net/rpc" +) + +type ConnectionPool struct { + conns chan *rpc.Client + + New func() (*rpc.Client, error) +} + +func (p *ConnectionPool) open() *rpc.Client { + for { + c, err := p.New() + + if IsTimeoutError(err) { + continue + } + + if err != nil { + logMessage("[go-rpcgen/connection_pool] Error opening connection. %v", err) + return nil + } + + return c + } + + panic("unreachable") +} + +func (p *ConnectionPool) Get() *rpc.Client { + select { + case c := <-p.conns: + return c + default: + return p.open() + } +} + +func (p *ConnectionPool) Put(c *rpc.Client) { + select { + case p.conns <- c: + // Do nothing. + return + default: + c.Close() + } +} + +func (p *ConnectionPool) Close() { + close(p.conns) + + for c := range p.conns { + c.Close() + } +} + +func NewConnectionPool(capacity int) *ConnectionPool { + p := new(ConnectionPool) + p.conns = make(chan *rpc.Client, capacity) + return p +} From 29c189217735b809fa246bf4c86ae63b2c71018a Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Wed, 4 Nov 2015 10:35:40 -0800 Subject: [PATCH 11/15] Add "test" target to Makefile --- Makefile | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index fda5578..e531a60 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,11 @@ build: $(GO) build ./... setup: - $(GO) get ./... + $(GO) get -d ./... + +test: + $(GO) generate ./... + $(GO) test ./... install: $(GO) install -v ./... From dab49a8565b35d178dc07f10f092833c551eed3e Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Wed, 4 Nov 2015 10:35:58 -0800 Subject: [PATCH 12/15] Updated test fixture. --- plugin/rpc_test.go | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/plugin/rpc_test.go b/plugin/rpc_test.go index 67a58cc..b124e5c 100644 --- a/plugin/rpc_test.go +++ b/plugin/rpc_test.go @@ -40,19 +40,19 @@ func TestGenerateRPCStubs(t *testing.T) { Output: ` // internal wrapper for type-safe RPC calling type rpcMathClient struct { - *rpc.Client + *client.Client } -func (this rpcMathClient) Sqrt(in *SqrtInput, out *SqrtOutput) error { - return this.Call("Math.Sqrt", in, out) +func (c rpcMathClient) Sqrt(in *SqrtInput, out *SqrtOutput) error { + return c.Call("Math.Sqrt", in, out) } -func (this rpcMathClient) Add(in *AddInput, out *AddOutput) error { - return this.Call("Math.Add", in, out) +func (c rpcMathClient) Add(in *AddInput, out *AddOutput) error { + return c.Call("Math.Add", in, out) } // NewMathClient returns an *rpc.Client wrapper for calling the methods of // Math remotely. -func NewMathClient(conn net.Conn) Math { - return rpcMathClient{rpc.NewClientWithCodec(codec.NewClientCodec(conn))} +func NewMathClient(addr string) Math { + return rpcMathClient{client.NewClient(addr)} } // ServeMath serves the given Math backend implementation on conn. @@ -65,15 +65,6 @@ func ServeMath(conn net.Conn, backend Math) error { return nil } -// DialMath returns a Math for calling the Math servince at addr (TCP). -func DialMath(addr string) (Math, error) { - conn, err := net.Dial("tcp", addr) - if err != nil { - return nil, err - } - return NewMathClient(conn), nil -} - // ListenAndServeMath serves the given Math backend implementation // on all connections accepted as a result of listening on addr (TCP). func ListenAndServeMath(addr string, backend Math) error { From c7b1b8f4cef70ea0fc617d14355a340f8dc3362c Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Wed, 4 Nov 2015 10:36:07 -0800 Subject: [PATCH 13/15] Add generate step and all that. --- example_ae/client/client.go | 1 + example_ae/whoami/whoami.pb.go | 109 +++++++++++++++++++++++++++++++-- 2 files changed, 106 insertions(+), 4 deletions(-) diff --git a/example_ae/client/client.go b/example_ae/client/client.go index f25df11..45a559a 100644 --- a/example_ae/client/client.go +++ b/example_ae/client/client.go @@ -1,3 +1,4 @@ +//go:generate protoc --proto_path=../whoami --go_out=plugins=go-rpcgen:../whoami ../whoami/whoami.proto // Copyright 2013 Google. All rights reserved. // // Use of this source code is governed by a BSD-style diff --git a/example_ae/whoami/whoami.pb.go b/example_ae/whoami/whoami.pb.go index f2be81b..5d4967a 100644 --- a/example_ae/whoami/whoami.pb.go +++ b/example_ae/whoami/whoami.pb.go @@ -1,14 +1,12 @@ -// +build !appengine - // Code generated by protoc-gen-go. -// source: whoami/whoami.proto +// source: whoami.proto // DO NOT EDIT! /* Package whoami is a generated protocol buffer package. It is generated from these files: - whoami/whoami.proto + whoami.proto It has these top-level messages: Empty @@ -19,6 +17,14 @@ package whoami import proto "github.com/golang/protobuf/proto" import math "math" +import "net" +import "net/rpc" +import "github.com/bradhe/go-rpcgen/codec" +import "github.com/bradhe/go-rpcgen/client" +import "net/url" +import "net/http" +import "github.com/bradhe/go-rpcgen/webrpc" + // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = math.Inf @@ -49,3 +55,98 @@ func (m *YouAre) GetIpAddr() string { func init() { } + +// WhoamiService is an interface satisfied by the generated client and +// which must be implemented by the object wrapped by the server. +type WhoamiService interface { + Whoami(in *Empty, out *YouAre) error +} + +// internal wrapper for type-safe RPC calling +type rpcWhoamiServiceClient struct { + *client.Client +} + +func (c rpcWhoamiServiceClient) Whoami(in *Empty, out *YouAre) error { + return c.Call("WhoamiService.Whoami", in, out) +} + +// NewWhoamiServiceClient returns an *rpc.Client wrapper for calling the methods of +// WhoamiService remotely. +func NewWhoamiServiceClient(addr string) WhoamiService { + return rpcWhoamiServiceClient{client.NewClient(addr)} +} + +// ServeWhoamiService serves the given WhoamiService backend implementation on conn. +func ServeWhoamiService(conn net.Conn, backend WhoamiService) error { + srv := rpc.NewServer() + if err := srv.RegisterName("WhoamiService", backend); err != nil { + return err + } + srv.ServeCodec(codec.NewServerCodec(conn)) + return nil +} + +// ListenAndServeWhoamiService serves the given WhoamiService backend implementation +// on all connections accepted as a result of listening on addr (TCP). +func ListenAndServeWhoamiService(addr string, backend WhoamiService) error { + clients, err := net.Listen("tcp", addr) + if err != nil { + return err + } + srv := rpc.NewServer() + if err := srv.RegisterName("WhoamiService", backend); err != nil { + return err + } + for { + conn, err := clients.Accept() + if err != nil { + return err + } + go srv.ServeCodec(codec.NewServerCodec(conn)) + } + panic("unreachable") +} + +// WhoamiServiceWeb is the web-based RPC version of the interface which +// must be implemented by the object wrapped by the webrpc server. +type WhoamiServiceWeb interface { + Whoami(r *http.Request, in *Empty, out *YouAre) error +} + +// internal wrapper for type-safe webrpc calling +type rpcWhoamiServiceWebClient struct { + remote *url.URL + protocol webrpc.Protocol +} + +func (this rpcWhoamiServiceWebClient) Whoami(in *Empty, out *YouAre) error { + return webrpc.Post(this.protocol, this.remote, "/WhoamiService/Whoami", in, out) +} + +// Register a WhoamiServiceWeb implementation with the given webrpc ServeMux. +// If mux is nil, the default webrpc.ServeMux is used. +func RegisterWhoamiServiceWeb(this WhoamiServiceWeb, mux webrpc.ServeMux) error { + if mux == nil { + mux = webrpc.DefaultServeMux + } + if err := mux.Handle("/WhoamiService/Whoami", func(c *webrpc.Call) error { + in, out := new(Empty), new(YouAre) + if err := c.ReadRequest(in); err != nil { + return err + } + if err := this.Whoami(c.Request, in, out); err != nil { + return err + } + return c.WriteResponse(out) + }); err != nil { + return err + } + return nil +} + +// NewWhoamiServiceWebClient returns a webrpc wrapper for calling the methods of WhoamiService +// remotely via the web. The remote URL is the base URL of the webrpc server. +func NewWhoamiServiceWebClient(pro webrpc.Protocol, remote *url.URL) WhoamiService { + return rpcWhoamiServiceWebClient{remote, pro} +} From c1971eb5b7c9ad26d40c93435f7d7eeaab7ab396 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Wed, 4 Nov 2015 10:40:55 -0800 Subject: [PATCH 14/15] Cleanup --- codec/codec.go | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/codec/codec.go b/codec/codec.go index 59fd576..9e41223 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -113,21 +113,22 @@ func (s *ServerCodec) ReadRequestBody(obj interface{}) error { // the response was invalid, the size of the body of the resp is reported as // having size zero and is not sent. func (s *ServerCodec) WriteResponse(resp *rpc.Response, obj interface{}) error { - pb, ok := obj.(proto.Message) - if !ok { - fmt.Errorf("%T does not implement proto.Message", obj) - } - - shouldWriteResponse := ok + // If we can't actually do the cast it means that (most likely) obj is nil, + // indicating that we don't actually have a response to send back (e.g. it's + // a "void" method call). This is OK because the metadata about the response + // object will indicate that everything is A-OK. + pb, shouldWriteResponse := obj.(proto.Message) // Write the header header := wire.Header{ Method: &resp.ServiceMethod, Seq: &resp.Seq, } + if resp.Error != "" { header.Error = &resp.Error } + if err := WriteProto(s.w, &header); err != nil { return nil } @@ -163,27 +164,22 @@ func NewClientCodec(conn net.Conn) *ClientCodec { // WriteRequest writes the appropriate header protobuf and the given protobuf // to the connection (each prefixed with a uvarint indicating its size). func (c *ClientCodec) WriteRequest(req *rpc.Request, obj interface{}) error { - pb, ok := obj.(proto.Message) - if !ok { - fmt.Errorf("%T does not implement proto.Message", obj) - } - - shouldWriteBody := ok + pb, hasMessage := obj.(proto.Message) // Write the header header := wire.Header{ Method: &req.ServiceMethod, Seq: &req.Seq, } + if err := WriteProto(c.w, &header); err != nil { return err } - if shouldWriteBody { + if hasMessage { return WriteProto(c.w, pb) } - // Nothing to see here. return nil } @@ -215,11 +211,8 @@ func (c *ClientCodec) ReadResponseHeader(resp *rpc.Response) error { // is zero, nothing is done (this indicates an error condition, which was // encapsulated in the header) func (c *ClientCodec) ReadResponseBody(obj interface{}) error { - if obj == nil { - - } - pb, ok := obj.(proto.Message) + if !ok { return fmt.Errorf("%T does not implement proto.Message", obj) } From 20751a87170a811952a71a39fe2a815b242b6e49 Mon Sep 17 00:00:00 2001 From: Brad Heller Date: Tue, 15 Dec 2015 17:50:27 -0800 Subject: [PATCH 15/15] Add some logging to codecs for instances where THAT fails. --- codec/codec.go | 5 +++++ codec/logging.go | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+) create mode 100644 codec/logging.go diff --git a/codec/codec.go b/codec/codec.go index 9e41223..3b78f1f 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -39,11 +39,13 @@ func ReadProto(r ProtoReader, pb proto.Message) error { if err != nil { return err } + // TODO max size? buf := make([]byte, size) if _, err := io.ReadFull(r, buf); err != nil { return err } + return proto.Unmarshal(buf, pb) } @@ -57,15 +59,18 @@ func WriteProto(w io.Writer, pb proto.Message) error { // Marshal the protobuf data, err := proto.Marshal(pb) if err != nil { + logMessage("[go-rpcgen/codec] Marshal failed. %v", err) return err } // Write the size and data n := binary.PutUvarint(size[:], uint64(len(data))) if _, err = w.Write(size[:n]); err != nil { + logMessage("[go-rpcgen/codec] Writing message length failed. %v", err) return err } if _, err = w.Write(data); err != nil { + logMessage("[go-rpcgen/codec] Writing message data failed. %v", err) return err } return nil diff --git a/codec/logging.go b/codec/logging.go new file mode 100644 index 0000000..a58f319 --- /dev/null +++ b/codec/logging.go @@ -0,0 +1,18 @@ +package codec + +import ( + "log" +) + +var ( + // Set this to something non-null if you want log messages to flow. + Logger *log.Logger +) + +func logMessage(format string, args ...interface{}) { + if Logger == nil { + return + } + + Logger.Printf(format, args...) +}