diff --git a/tests/Makefile b/tests/Makefile index 5b8f51ca..64b8ae23 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,7 +1,7 @@ # Tests that should be run each time -RUNTESTS := qf ordering metadata tls unresponsive dummy oneway config correctable +RUNTESTS := qf ordering metadata tls unresponsive dummy oneway config correctable all2all -.PHONY: all qf ordering metadata tls unresponsive dummy oneway config correctable +.PHONY: all qf ordering metadata tls unresponsive dummy oneway config correctable all2all all: $(RUNTESTS) @@ -23,6 +23,8 @@ config: config/config.pb.go config/config_gorums.pb.go correctable: correctable/correctable.pb.go correctable/correctable_gorums.pb.go +all2all: all2all/all2all.pb.go all2all/all2all_gorums.pb.go + %.pb.go : %.proto @protoc -I=..:. --go_out=paths=source_relative:. $< diff --git a/tests/all2all/all2all.pb.go b/tests/all2all/all2all.pb.go new file mode 100644 index 00000000..2cafba4b --- /dev/null +++ b/tests/all2all/all2all.pb.go @@ -0,0 +1,224 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.19.4 +// source: all2all/all2all.proto + +package all2all + +import ( + _ "github.com/relab/gorums" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type WriteResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + New bool `protobuf:"varint,1,opt,name=New,proto3" json:"New,omitempty"` +} + +func (x *WriteResponse) Reset() { + *x = WriteResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_all2all_all2all_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WriteResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteResponse) ProtoMessage() {} + +func (x *WriteResponse) ProtoReflect() protoreflect.Message { + mi := &file_all2all_all2all_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteResponse.ProtoReflect.Descriptor instead. +func (*WriteResponse) Descriptor() ([]byte, []int) { + return file_all2all_all2all_proto_rawDescGZIP(), []int{0} +} + +func (x *WriteResponse) GetNew() bool { + if x != nil { + return x.New + } + return false +} + +type WriteRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=Value,proto3" json:"Value,omitempty"` +} + +func (x *WriteRequest) Reset() { + *x = WriteRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_all2all_all2all_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WriteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteRequest) ProtoMessage() {} + +func (x *WriteRequest) ProtoReflect() protoreflect.Message { + mi := &file_all2all_all2all_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteRequest.ProtoReflect.Descriptor instead. +func (*WriteRequest) Descriptor() ([]byte, []int) { + return file_all2all_all2all_proto_rawDescGZIP(), []int{1} +} + +func (x *WriteRequest) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *WriteRequest) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +var File_all2all_all2all_proto protoreflect.FileDescriptor + +var file_all2all_all2all_proto_rawDesc = []byte{ + 0x0a, 0x15, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, 0x6c, 0x2f, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, + 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, 0x6c, + 0x1a, 0x0c, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x21, + 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x10, 0x0a, 0x03, 0x4e, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x4e, 0x65, + 0x77, 0x22, 0x36, 0x0a, 0x0c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x10, 0x0a, 0x03, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x48, 0x0a, 0x06, 0x53, 0x61, 0x6d, + 0x70, 0x6c, 0x65, 0x12, 0x3e, 0x0a, 0x07, 0x57, 0x72, 0x69, 0x74, 0x65, 0x51, 0x43, 0x12, 0x15, + 0x2e, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, 0x6c, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, 0x6c, 0x2e, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x04, 0xa0, + 0xb5, 0x18, 0x01, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x62, 0x2f, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2f, 0x74, + 0x65, 0x73, 0x74, 0x73, 0x2f, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, 0x6c, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_all2all_all2all_proto_rawDescOnce sync.Once + file_all2all_all2all_proto_rawDescData = file_all2all_all2all_proto_rawDesc +) + +func file_all2all_all2all_proto_rawDescGZIP() []byte { + file_all2all_all2all_proto_rawDescOnce.Do(func() { + file_all2all_all2all_proto_rawDescData = protoimpl.X.CompressGZIP(file_all2all_all2all_proto_rawDescData) + }) + return file_all2all_all2all_proto_rawDescData +} + +var file_all2all_all2all_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_all2all_all2all_proto_goTypes = []interface{}{ + (*WriteResponse)(nil), // 0: all2all.WriteResponse + (*WriteRequest)(nil), // 1: all2all.WriteRequest +} +var file_all2all_all2all_proto_depIdxs = []int32{ + 1, // 0: all2all.Sample.WriteQC:input_type -> all2all.WriteRequest + 0, // 1: all2all.Sample.WriteQC:output_type -> all2all.WriteResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_all2all_all2all_proto_init() } +func file_all2all_all2all_proto_init() { + if File_all2all_all2all_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_all2all_all2all_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WriteResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_all2all_all2all_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WriteRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_all2all_all2all_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_all2all_all2all_proto_goTypes, + DependencyIndexes: file_all2all_all2all_proto_depIdxs, + MessageInfos: file_all2all_all2all_proto_msgTypes, + }.Build() + File_all2all_all2all_proto = out.File + file_all2all_all2all_proto_rawDesc = nil + file_all2all_all2all_proto_goTypes = nil + file_all2all_all2all_proto_depIdxs = nil +} diff --git a/tests/all2all/all2all.proto b/tests/all2all/all2all.proto new file mode 100644 index 00000000..2415c7f0 --- /dev/null +++ b/tests/all2all/all2all.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package all2all; + +option go_package = "github.com/relab/gorums/tests/all2all"; + +import "gorums.proto"; + +service Sample { + rpc WriteQC(WriteRequest) returns (WriteResponse) { + option (gorums.quorumcall) = true; + } +} + +message WriteResponse { + bool New = 1; +} +message WriteRequest { + string Key = 1; + string Value = 2; +} diff --git a/tests/all2all/all2all_gorums.pb.go b/tests/all2all/all2all_gorums.pb.go new file mode 100644 index 00000000..12f94016 --- /dev/null +++ b/tests/all2all/all2all_gorums.pb.go @@ -0,0 +1,198 @@ +// Code generated by protoc-gen-gorums. DO NOT EDIT. +// versions: +// protoc-gen-gorums v0.7.0-devel +// protoc v3.19.4 +// source: all2all/all2all.proto + +package all2all + +import ( + context "context" + fmt "fmt" + gorums "github.com/relab/gorums" + encoding "google.golang.org/grpc/encoding" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = gorums.EnforceVersion(7 - gorums.MinVersion) + // Verify that the gorums runtime is sufficiently up-to-date. + _ = gorums.EnforceVersion(gorums.MaxVersion - 7) +) + +// A Configuration represents a static set of nodes on which quorum remote +// procedure calls may be invoked. +type Configuration struct { + gorums.RawConfiguration + nodes []*Node + qspec QuorumSpec +} + +// ConfigurationFromRaw returns a new Configuration from the given raw configuration and QuorumSpec. +// +// This function may for example be used to "clone" a configuration but install a different QuorumSpec: +// cfg1, err := mgr.NewConfiguration(qspec1, opts...) +// cfg2 := ConfigurationFromRaw(cfg1.RawConfig, qspec2) +func ConfigurationFromRaw(rawCfg gorums.RawConfiguration, qspec QuorumSpec) *Configuration { + // return an error if the QuorumSpec interface is not empty and no implementation was provided. + var test interface{} = struct{}{} + if _, empty := test.(QuorumSpec); !empty && qspec == nil { + panic("QuorumSpec may not be nil") + } + return &Configuration{ + RawConfiguration: rawCfg, + qspec: qspec, + } +} + +// Nodes returns a slice of each available node. IDs are returned in the same +// order as they were provided in the creation of the Manager. +// +// NOTE: mutating the returned slice is not supported. +func (c *Configuration) Nodes() []*Node { + if c.nodes == nil { + c.nodes = make([]*Node, 0, c.Size()) + for _, n := range c.RawConfiguration { + c.nodes = append(c.nodes, &Node{n}) + } + } + return c.nodes +} + +// And returns a NodeListOption that can be used to create a new configuration combining c and d. +func (c Configuration) And(d *Configuration) gorums.NodeListOption { + return c.RawConfiguration.And(d.RawConfiguration) +} + +// Except returns a NodeListOption that can be used to create a new configuration +// from c without the nodes in rm. +func (c Configuration) Except(rm *Configuration) gorums.NodeListOption { + return c.RawConfiguration.Except(rm.RawConfiguration) +} + +func init() { + if encoding.GetCodec(gorums.ContentSubtype) == nil { + encoding.RegisterCodec(gorums.NewCodec()) + } +} + +// Manager maintains a connection pool of nodes on +// which quorum calls can be performed. +type Manager struct { + *gorums.RawManager +} + +// NewManager returns a new Manager for managing connection to nodes added +// to the manager. This function accepts manager options used to configure +// various aspects of the manager. +func NewManager(opts ...gorums.ManagerOption) (mgr *Manager) { + mgr = &Manager{} + mgr.RawManager = gorums.NewRawManager(opts...) + return mgr +} + +// NewConfiguration returns a configuration based on the provided list of nodes (required) +// and an optional quorum specification. The QuorumSpec is necessary for call types that +// must process replies. For configurations only used for unicast or multicast call types, +// a QuorumSpec is not needed. The QuorumSpec interface is also a ConfigOption. +// Nodes can be supplied using WithNodeMap or WithNodeList, or WithNodeIDs. +// A new configuration can also be created from an existing configuration, +// using the And, WithNewNodes, Except, and WithoutNodes methods. +func (m *Manager) NewConfiguration(opts ...gorums.ConfigOption) (c *Configuration, err error) { + if len(opts) < 1 || len(opts) > 2 { + return nil, fmt.Errorf("wrong number of options: %d", len(opts)) + } + c = &Configuration{} + for _, opt := range opts { + switch v := opt.(type) { + case gorums.NodeListOption: + c.RawConfiguration, err = gorums.NewRawConfiguration(m.RawManager, v) + if err != nil { + return nil, err + } + case QuorumSpec: + // Must be last since v may match QuorumSpec if it is interface{} + c.qspec = v + default: + return nil, fmt.Errorf("unknown option type: %v", v) + } + } + // return an error if the QuorumSpec interface is not empty and no implementation was provided. + var test interface{} = struct{}{} + if _, empty := test.(QuorumSpec); !empty && c.qspec == nil { + return nil, fmt.Errorf("missing required QuorumSpec") + } + return c, nil +} + +// Nodes returns a slice of available nodes on this manager. +// IDs are returned in the order they were added at creation of the manager. +func (m *Manager) Nodes() []*Node { + gorumsNodes := m.RawManager.Nodes() + nodes := make([]*Node, 0, len(gorumsNodes)) + for _, n := range gorumsNodes { + nodes = append(nodes, &Node{n}) + } + return nodes +} + +// Node encapsulates the state of a node on which a remote procedure call +// can be performed. +type Node struct { + *gorums.RawNode +} + +// QuorumSpec is the interface of quorum functions for Sample. +type QuorumSpec interface { + gorums.ConfigOption + + // WriteQCQF is the quorum function for the WriteQC + // quorum call method. The in parameter is the request object + // supplied to the WriteQC method at call time, and may or may not + // be used by the quorum function. If the in parameter is not needed + // you should implement your quorum function with '_ *WriteRequest'. + WriteQCQF(in *WriteRequest, replies map[uint32]*WriteResponse) (*WriteResponse, bool) +} + +// WriteQC is a quorum call invoked on all nodes in configuration c, +// with the same argument in, and returns a combined result. +func (c *Configuration) WriteQC(ctx context.Context, in *WriteRequest) (resp *WriteResponse, err error) { + cd := gorums.QuorumCallData{ + Message: in, + Method: "all2all.Sample.WriteQC", + } + cd.QuorumFunction = func(req protoreflect.ProtoMessage, replies map[uint32]protoreflect.ProtoMessage) (protoreflect.ProtoMessage, bool) { + r := make(map[uint32]*WriteResponse, len(replies)) + for k, v := range replies { + r[k] = v.(*WriteResponse) + } + return c.qspec.WriteQCQF(req.(*WriteRequest), r) + } + + res, err := c.RawConfiguration.QuorumCall(ctx, cd) + if err != nil { + return nil, err + } + return res.(*WriteResponse), err +} + +// Sample is the server-side API for the Sample Service +type Sample interface { + WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response *WriteResponse, err error) +} + +func RegisterSampleServer(srv *gorums.Server, impl Sample) { + srv.RegisterHandler("all2all.Sample.WriteQC", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { + req := in.Message.(*WriteRequest) + defer ctx.Release() + resp, err := impl.WriteQC(ctx, req) + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) + }) +} + +type internalWriteResponse struct { + nid uint32 + reply *WriteResponse + err error +} diff --git a/tests/all2all/alltoallconfig_test.go b/tests/all2all/alltoallconfig_test.go new file mode 100644 index 00000000..4c2c05a6 --- /dev/null +++ b/tests/all2all/alltoallconfig_test.go @@ -0,0 +1,134 @@ +package all2all + +import ( + "context" + "flag" + "fmt" + "net" + "testing" + "time" + + "github.com/relab/gorums" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var replicaCount = flag.Int("replicas", 10, "number of replicas to create all-to-all communication") + +func TestAllToAllConfiguration(t *testing.T) { + replicas, err := createReplicas(*replicaCount) + if err != nil { + t.Fatal(err) + } + defer func() { + for _, replica := range replicas { + replica.stopServer() + } + }() + nodeMap := make(map[string]uint32) + for _, replica := range replicas { + nodeMap[replica.address] = replica.id + } + for _, replica := range replicas { + if err := replica.createConfiguration(nodeMap); err != nil { + t.Error(err) + } + } +} + +// createReplicas returns a slice of replicas. +// The function waits for all serve goroutines to start and one additional +// second to allow the servers to start before returning. +func createReplicas(numReplicas int) ([]*replica, error) { + replicas := make([]*replica, numReplicas) + errChan := make(chan error, numReplicas) + startedChan := make(chan struct{}, numReplicas) + for i := range replicas { + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, err + } + replica := &replica{ + address: lis.Addr().String(), + id: uint32(i), + lis: lis, + server: gorums.NewServer(), + } + RegisterSampleServer(replica.server, replica) + replicas[i] = replica + go func() { + startedChan <- struct{}{} + if err := replica.serve(); err != nil { + errChan <- fmt.Errorf("failed to serve at %q: %w", replica.address, err) + } + }() + } + for range replicas { + <-startedChan + } + + select { + case err := <-errChan: + return nil, err + case <-time.After(1000 * time.Millisecond): + // slept for a bit to allow replica serve goroutines to fail + } + return replicas, nil +} + +type qspec struct{} + +func (q qspec) WriteQCQF(in *WriteRequest, replies map[uint32]*WriteResponse) (*WriteResponse, bool) { + return &WriteResponse{New: true}, true +} + +type replica struct { + address string + lis net.Listener + id uint32 + server *gorums.Server // the replica's gRPC server + mgr *Manager // the replica's Gorums manager (used as a client) + conn *grpc.ClientConn +} + +func (r replica) WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response *WriteResponse, err error) { + return &WriteResponse{New: true}, nil +} + +// createConfiguration creates a configuration for the replica, allowing +// this replica to communicate with the other replicas in the configuration. +func (r *replica) createConfiguration(nodeMap map[string]uint32) error { + r.mgr = NewManager(gorums.WithDialTimeout(100*time.Millisecond), + gorums.WithGrpcDialOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + ), + ) + _, err := r.mgr.NewConfiguration(qspec{}, gorums.WithNodeMap(nodeMap)) + return err +} + +func (r *replica) serve() error { + return r.server.Serve(r.lis) +} + +func (r *replica) stopServer() { + r.mgr.Close() + r.server.Stop() +} + +func TestGrpcDial(t *testing.T) { + replicas, err := createReplicas(*replicaCount) + if err != nil { + t.Fatal(err) + } + for _, replica := range replicas { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10000)*time.Millisecond) + defer cancel() + replica.conn, err = grpc.DialContext(ctx, replica.address, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + t.Fatal(err) + } + } +}