diff --git a/core/comm/mock/new_semaphore.go b/core/comm/mock/new_semaphore.go
new file mode 100644
index 00000000000..a76242f0d4e
--- /dev/null
+++ b/core/comm/mock/new_semaphore.go
@@ -0,0 +1,108 @@
+// Code generated by counterfeiter. DO NOT EDIT.
+package mock
+
+import (
+	sync "sync"
+
+	comm "github.com/hyperledger/fabric/core/comm"
+)
+
+type NewSemaphore struct {
+	Stub func(int) comm.Semaphore
+	mutex sync.RWMutex
+	argsForCall []struct {
+		arg1 int
+	}
+	returns struct {
+		result1 comm.Semaphore
+	}
+	returnsOnCall map[int]struct {
+		result1 comm.Semaphore
+	}
+	invocations map[string][][]interface{}
+	invocationsMutex sync.RWMutex
+}
+
+func (fake *NewSemaphore) Spy(arg1 int) comm.Semaphore {
+	fake.mutex.Lock()
+	ret, specificReturn := fake.returnsOnCall[len(fake.argsForCall)]
+	fake.argsForCall = append(fake.argsForCall, struct {
+		arg1 int
+	}{arg1})
+	fake.recordInvocation("NewSemaphoreFunc", []interface{}{arg1})
+	fake.mutex.Unlock()
+	if fake.Stub != nil {
+		return fake.Stub(arg1)
+	}
+	if specificReturn {
+		return ret.result1
+	}
+	return fake.returns.result1
+}
+
+func (fake *NewSemaphore) CallCount() int {
+	fake.mutex.RLock()
+	defer fake.mutex.RUnlock()
+	return len(fake.argsForCall)
+}
+
+func (fake *NewSemaphore) Calls(stub func(int) comm.Semaphore) {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	fake.Stub = stub
+}
+
+func (fake *NewSemaphore) ArgsForCall(i int) int {
+	fake.mutex.RLock()
+	defer fake.mutex.RUnlock()
+	return fake.argsForCall[i].arg1
+}
+
+func (fake *NewSemaphore) Returns(result1 comm.Semaphore) {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	fake.Stub = nil
+	fake.returns = struct {
+		result1 comm.Semaphore
+	}{result1}
+}
+
+func (fake *NewSemaphore) ReturnsOnCall(i int, result1 comm.Semaphore) {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	fake.Stub = nil
+	if fake.returnsOnCall == nil {
+		fake.returnsOnCall = make(map[int]struct {
+			result1 comm.Semaphore
+		})
+	}
+	fake.returnsOnCall[i] = struct {
+		result1 comm.Semaphore
+	}{result1}
+}
+
+func (fake *NewSemaphore) Invocations() map[string][][]interface{} {
+	fake.invocationsMutex.RLock()
+	defer fake.invocationsMutex.RUnlock()
+	fake.mutex.RLock()
+	defer fake.mutex.RUnlock()
+	copiedInvocations := map[string][][]interface{}{}
+	for key, value := range fake.invocations {
+		copiedInvocations[key] = value
+	}
+	return copiedInvocations
+}
+
+func (fake *NewSemaphore) recordInvocation(key string, args []interface{}) {
+	fake.invocationsMutex.Lock()
+	defer fake.invocationsMutex.Unlock()
+	if fake.invocations == nil {
+		fake.invocations = map[string][][]interface{}{}
+	}
+	if fake.invocations[key] == nil {
+		fake.invocations[key] = [][]interface{}{}
+	}
+	fake.invocations[key] = append(fake.invocations[key], args)
+}
+
+var _ comm.NewSemaphoreFunc = new(NewSemaphore).Spy
diff --git a/core/comm/mock/semaphore.go b/core/comm/mock/semaphore.go
new file mode 100644
index 00000000000..58a893d4715
--- /dev/null
+++ b/core/comm/mock/semaphore.go
@@ -0,0 +1,140 @@
+// Code generated by counterfeiter. DO NOT EDIT.
+package mock
+
+import (
+	context "context"
+	sync "sync"
+
+	comm "github.com/hyperledger/fabric/core/comm"
+)
+
+type Semaphore struct {
+	AcquireStub func(context.Context) error
+	acquireMutex sync.RWMutex
+	acquireArgsForCall []struct {
+		arg1 context.Context
+	}
+	acquireReturns struct {
+		result1 error
+	}
+	acquireReturnsOnCall map[int]struct {
+		result1 error
+	}
+	ReleaseStub func()
+	releaseMutex sync.RWMutex
+	releaseArgsForCall []struct {
+	}
+	invocations map[string][][]interface{}
+	invocationsMutex sync.RWMutex
+}
+
+func (fake *Semaphore) Acquire(arg1 context.Context) error {
+	fake.acquireMutex.Lock()
+	ret, specificReturn := fake.acquireReturnsOnCall[len(fake.acquireArgsForCall)]
+	fake.acquireArgsForCall = append(fake.acquireArgsForCall, struct {
+		arg1 context.Context
+	}{arg1})
+	fake.recordInvocation("Acquire", []interface{}{arg1})
+	fake.acquireMutex.Unlock()
+	if fake.AcquireStub != nil {
+		return fake.AcquireStub(arg1)
+	}
+	if specificReturn {
+		return ret.result1
+	}
+	fakeReturns := fake.acquireReturns
+	return fakeReturns.result1
+}
+
+func (fake *Semaphore) AcquireCallCount() int {
+	fake.acquireMutex.RLock()
+	defer fake.acquireMutex.RUnlock()
+	return len(fake.acquireArgsForCall)
+}
+
+func (fake *Semaphore) AcquireCalls(stub func(context.Context) error) {
+	fake.acquireMutex.Lock()
+	defer fake.acquireMutex.Unlock()
+	fake.AcquireStub = stub
+}
+
+func (fake *Semaphore) AcquireArgsForCall(i int) context.Context {
+	fake.acquireMutex.RLock()
+	defer fake.acquireMutex.RUnlock()
+	argsForCall := fake.acquireArgsForCall[i]
+	return argsForCall.arg1
+}
+
+func (fake *Semaphore) AcquireReturns(result1 error) {
+	fake.acquireMutex.Lock()
+	defer fake.acquireMutex.Unlock()
+	fake.AcquireStub = nil
+	fake.acquireReturns = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *Semaphore) AcquireReturnsOnCall(i int, result1 error) {
+	fake.acquireMutex.Lock()
+	defer fake.acquireMutex.Unlock()
+	fake.AcquireStub = nil
+	if fake.acquireReturnsOnCall == nil {
+		fake.acquireReturnsOnCall = make(map[int]struct {
+			result1 error
+		})
+	}
+	fake.acquireReturnsOnCall[i] = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *Semaphore) Release() {
+	fake.releaseMutex.Lock()
+	fake.releaseArgsForCall = append(fake.releaseArgsForCall, struct {
+	}{})
+	fake.recordInvocation("Release", []interface{}{})
+	fake.releaseMutex.Unlock()
+	if fake.ReleaseStub != nil {
+		fake.ReleaseStub()
+	}
+}
+
+func (fake *Semaphore) ReleaseCallCount() int {
+	fake.releaseMutex.RLock()
+	defer fake.releaseMutex.RUnlock()
+	return len(fake.releaseArgsForCall)
+}
+
+func (fake *Semaphore) ReleaseCalls(stub func()) {
+	fake.releaseMutex.Lock()
+	defer fake.releaseMutex.Unlock()
+	fake.ReleaseStub = stub
+}
+
+func (fake *Semaphore) Invocations() map[string][][]interface{} {
+	fake.invocationsMutex.RLock()
+	defer fake.invocationsMutex.RUnlock()
+	fake.acquireMutex.RLock()
+	defer fake.acquireMutex.RUnlock()
+	fake.releaseMutex.RLock()
+	defer fake.releaseMutex.RUnlock()
+	copiedInvocations := map[string][][]interface{}{}
+	for key, value := range fake.invocations {
+		copiedInvocations[key] = value
+	}
+	return copiedInvocations
+}
+
+func (fake *Semaphore) recordInvocation(key string, args []interface{}) {
+	fake.invocationsMutex.Lock()
+	defer fake.invocationsMutex.Unlock()
+	if fake.invocations == nil {
+		fake.invocations = map[string][][]interface{}{}
+	}
+	if fake.invocations[key] == nil {
+		fake.invocations[key] = [][]interface{}{}
+	}
+	fake.invocations[key] = append(fake.invocations[key], args)
args) +} + +var _ comm.Semaphore = new(Semaphore) diff --git a/core/comm/mock/server_stream.go b/core/comm/mock/server_stream.go new file mode 100644 index 00000000000..07c5ad4c220 --- /dev/null +++ b/core/comm/mock/server_stream.go @@ -0,0 +1,430 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mock + +import ( + context "context" + sync "sync" + + metadata "google.golang.org/grpc/metadata" +) + +type ServerStream struct { + ContextStub func() context.Context + contextMutex sync.RWMutex + contextArgsForCall []struct { + } + contextReturns struct { + result1 context.Context + } + contextReturnsOnCall map[int]struct { + result1 context.Context + } + RecvMsgStub func(interface{}) error + recvMsgMutex sync.RWMutex + recvMsgArgsForCall []struct { + arg1 interface{} + } + recvMsgReturns struct { + result1 error + } + recvMsgReturnsOnCall map[int]struct { + result1 error + } + SendHeaderStub func(metadata.MD) error + sendHeaderMutex sync.RWMutex + sendHeaderArgsForCall []struct { + arg1 metadata.MD + } + sendHeaderReturns struct { + result1 error + } + sendHeaderReturnsOnCall map[int]struct { + result1 error + } + SendMsgStub func(interface{}) error + sendMsgMutex sync.RWMutex + sendMsgArgsForCall []struct { + arg1 interface{} + } + sendMsgReturns struct { + result1 error + } + sendMsgReturnsOnCall map[int]struct { + result1 error + } + SetHeaderStub func(metadata.MD) error + setHeaderMutex sync.RWMutex + setHeaderArgsForCall []struct { + arg1 metadata.MD + } + setHeaderReturns struct { + result1 error + } + setHeaderReturnsOnCall map[int]struct { + result1 error + } + SetTrailerStub func(metadata.MD) + setTrailerMutex sync.RWMutex + setTrailerArgsForCall []struct { + arg1 metadata.MD + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *ServerStream) Context() context.Context { + fake.contextMutex.Lock() + ret, specificReturn := fake.contextReturnsOnCall[len(fake.contextArgsForCall)] + fake.contextArgsForCall = append(fake.contextArgsForCall, struct { + }{}) + fake.recordInvocation("Context", []interface{}{}) + fake.contextMutex.Unlock() + if fake.ContextStub != nil { + return fake.ContextStub() + } + if specificReturn { + return ret.result1 + } + fakeReturns := fake.contextReturns + return fakeReturns.result1 +} + +func (fake *ServerStream) ContextCallCount() int { + fake.contextMutex.RLock() + defer fake.contextMutex.RUnlock() + return len(fake.contextArgsForCall) +} + +func (fake *ServerStream) ContextCalls(stub func() context.Context) { + fake.contextMutex.Lock() + defer fake.contextMutex.Unlock() + fake.ContextStub = stub +} + +func (fake *ServerStream) ContextReturns(result1 context.Context) { + fake.contextMutex.Lock() + defer fake.contextMutex.Unlock() + fake.ContextStub = nil + fake.contextReturns = struct { + result1 context.Context + }{result1} +} + +func (fake *ServerStream) ContextReturnsOnCall(i int, result1 context.Context) { + fake.contextMutex.Lock() + defer fake.contextMutex.Unlock() + fake.ContextStub = nil + if fake.contextReturnsOnCall == nil { + fake.contextReturnsOnCall = make(map[int]struct { + result1 context.Context + }) + } + fake.contextReturnsOnCall[i] = struct { + result1 context.Context + }{result1} +} + +func (fake *ServerStream) RecvMsg(arg1 interface{}) error { + fake.recvMsgMutex.Lock() + ret, specificReturn := fake.recvMsgReturnsOnCall[len(fake.recvMsgArgsForCall)] + fake.recvMsgArgsForCall = append(fake.recvMsgArgsForCall, struct { + arg1 interface{} + }{arg1}) + fake.recordInvocation("RecvMsg", 
+	fake.recvMsgMutex.Unlock()
+	if fake.RecvMsgStub != nil {
+		return fake.RecvMsgStub(arg1)
+	}
+	if specificReturn {
+		return ret.result1
+	}
+	fakeReturns := fake.recvMsgReturns
+	return fakeReturns.result1
+}
+
+func (fake *ServerStream) RecvMsgCallCount() int {
+	fake.recvMsgMutex.RLock()
+	defer fake.recvMsgMutex.RUnlock()
+	return len(fake.recvMsgArgsForCall)
+}
+
+func (fake *ServerStream) RecvMsgCalls(stub func(interface{}) error) {
+	fake.recvMsgMutex.Lock()
+	defer fake.recvMsgMutex.Unlock()
+	fake.RecvMsgStub = stub
+}
+
+func (fake *ServerStream) RecvMsgArgsForCall(i int) interface{} {
+	fake.recvMsgMutex.RLock()
+	defer fake.recvMsgMutex.RUnlock()
+	argsForCall := fake.recvMsgArgsForCall[i]
+	return argsForCall.arg1
+}
+
+func (fake *ServerStream) RecvMsgReturns(result1 error) {
+	fake.recvMsgMutex.Lock()
+	defer fake.recvMsgMutex.Unlock()
+	fake.RecvMsgStub = nil
+	fake.recvMsgReturns = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *ServerStream) RecvMsgReturnsOnCall(i int, result1 error) {
+	fake.recvMsgMutex.Lock()
+	defer fake.recvMsgMutex.Unlock()
+	fake.RecvMsgStub = nil
+	if fake.recvMsgReturnsOnCall == nil {
+		fake.recvMsgReturnsOnCall = make(map[int]struct {
+			result1 error
+		})
+	}
+	fake.recvMsgReturnsOnCall[i] = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *ServerStream) SendHeader(arg1 metadata.MD) error {
+	fake.sendHeaderMutex.Lock()
+	ret, specificReturn := fake.sendHeaderReturnsOnCall[len(fake.sendHeaderArgsForCall)]
+	fake.sendHeaderArgsForCall = append(fake.sendHeaderArgsForCall, struct {
+		arg1 metadata.MD
+	}{arg1})
+	fake.recordInvocation("SendHeader", []interface{}{arg1})
+	fake.sendHeaderMutex.Unlock()
+	if fake.SendHeaderStub != nil {
+		return fake.SendHeaderStub(arg1)
+	}
+	if specificReturn {
+		return ret.result1
+	}
+	fakeReturns := fake.sendHeaderReturns
+	return fakeReturns.result1
+}
+
+func (fake *ServerStream) SendHeaderCallCount() int {
+	fake.sendHeaderMutex.RLock()
+	defer fake.sendHeaderMutex.RUnlock()
+	return len(fake.sendHeaderArgsForCall)
+}
+
+func (fake *ServerStream) SendHeaderCalls(stub func(metadata.MD) error) {
+	fake.sendHeaderMutex.Lock()
+	defer fake.sendHeaderMutex.Unlock()
+	fake.SendHeaderStub = stub
+}
+
+func (fake *ServerStream) SendHeaderArgsForCall(i int) metadata.MD {
+	fake.sendHeaderMutex.RLock()
+	defer fake.sendHeaderMutex.RUnlock()
+	argsForCall := fake.sendHeaderArgsForCall[i]
+	return argsForCall.arg1
+}
+
+func (fake *ServerStream) SendHeaderReturns(result1 error) {
+	fake.sendHeaderMutex.Lock()
+	defer fake.sendHeaderMutex.Unlock()
+	fake.SendHeaderStub = nil
+	fake.sendHeaderReturns = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *ServerStream) SendHeaderReturnsOnCall(i int, result1 error) {
+	fake.sendHeaderMutex.Lock()
+	defer fake.sendHeaderMutex.Unlock()
+	fake.SendHeaderStub = nil
+	if fake.sendHeaderReturnsOnCall == nil {
+		fake.sendHeaderReturnsOnCall = make(map[int]struct {
+			result1 error
+		})
+	}
+	fake.sendHeaderReturnsOnCall[i] = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *ServerStream) SendMsg(arg1 interface{}) error {
+	fake.sendMsgMutex.Lock()
+	ret, specificReturn := fake.sendMsgReturnsOnCall[len(fake.sendMsgArgsForCall)]
+	fake.sendMsgArgsForCall = append(fake.sendMsgArgsForCall, struct {
+		arg1 interface{}
+	}{arg1})
+	fake.recordInvocation("SendMsg", []interface{}{arg1})
+	fake.sendMsgMutex.Unlock()
+	if fake.SendMsgStub != nil {
+		return fake.SendMsgStub(arg1)
+	}
+	if specificReturn {
+		return ret.result1
+	}
+	fakeReturns := fake.sendMsgReturns
+	return fakeReturns.result1
+}
+
+func (fake *ServerStream) SendMsgCallCount() int {
+	fake.sendMsgMutex.RLock()
+	defer fake.sendMsgMutex.RUnlock()
+	return len(fake.sendMsgArgsForCall)
+}
+
+func (fake *ServerStream) SendMsgCalls(stub func(interface{}) error) {
+	fake.sendMsgMutex.Lock()
+	defer fake.sendMsgMutex.Unlock()
+	fake.SendMsgStub = stub
+}
+
+func (fake *ServerStream) SendMsgArgsForCall(i int) interface{} {
+	fake.sendMsgMutex.RLock()
+	defer fake.sendMsgMutex.RUnlock()
+	argsForCall := fake.sendMsgArgsForCall[i]
+	return argsForCall.arg1
+}
+
+func (fake *ServerStream) SendMsgReturns(result1 error) {
+	fake.sendMsgMutex.Lock()
+	defer fake.sendMsgMutex.Unlock()
+	fake.SendMsgStub = nil
+	fake.sendMsgReturns = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *ServerStream) SendMsgReturnsOnCall(i int, result1 error) {
+	fake.sendMsgMutex.Lock()
+	defer fake.sendMsgMutex.Unlock()
+	fake.SendMsgStub = nil
+	if fake.sendMsgReturnsOnCall == nil {
+		fake.sendMsgReturnsOnCall = make(map[int]struct {
+			result1 error
+		})
+	}
+	fake.sendMsgReturnsOnCall[i] = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *ServerStream) SetHeader(arg1 metadata.MD) error {
+	fake.setHeaderMutex.Lock()
+	ret, specificReturn := fake.setHeaderReturnsOnCall[len(fake.setHeaderArgsForCall)]
+	fake.setHeaderArgsForCall = append(fake.setHeaderArgsForCall, struct {
+		arg1 metadata.MD
+	}{arg1})
+	fake.recordInvocation("SetHeader", []interface{}{arg1})
+	fake.setHeaderMutex.Unlock()
+	if fake.SetHeaderStub != nil {
+		return fake.SetHeaderStub(arg1)
+	}
+	if specificReturn {
+		return ret.result1
+	}
+	fakeReturns := fake.setHeaderReturns
+	return fakeReturns.result1
+}
+
+func (fake *ServerStream) SetHeaderCallCount() int {
+	fake.setHeaderMutex.RLock()
+	defer fake.setHeaderMutex.RUnlock()
+	return len(fake.setHeaderArgsForCall)
+}
+
+func (fake *ServerStream) SetHeaderCalls(stub func(metadata.MD) error) {
+	fake.setHeaderMutex.Lock()
+	defer fake.setHeaderMutex.Unlock()
+	fake.SetHeaderStub = stub
+}
+
+func (fake *ServerStream) SetHeaderArgsForCall(i int) metadata.MD {
+	fake.setHeaderMutex.RLock()
+	defer fake.setHeaderMutex.RUnlock()
+	argsForCall := fake.setHeaderArgsForCall[i]
+	return argsForCall.arg1
+}
+
+func (fake *ServerStream) SetHeaderReturns(result1 error) {
+	fake.setHeaderMutex.Lock()
+	defer fake.setHeaderMutex.Unlock()
+	fake.SetHeaderStub = nil
+	fake.setHeaderReturns = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *ServerStream) SetHeaderReturnsOnCall(i int, result1 error) {
+	fake.setHeaderMutex.Lock()
+	defer fake.setHeaderMutex.Unlock()
+	fake.SetHeaderStub = nil
+	if fake.setHeaderReturnsOnCall == nil {
+		fake.setHeaderReturnsOnCall = make(map[int]struct {
+			result1 error
+		})
+	}
+	fake.setHeaderReturnsOnCall[i] = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *ServerStream) SetTrailer(arg1 metadata.MD) {
+	fake.setTrailerMutex.Lock()
+	fake.setTrailerArgsForCall = append(fake.setTrailerArgsForCall, struct {
+		arg1 metadata.MD
+	}{arg1})
+	fake.recordInvocation("SetTrailer", []interface{}{arg1})
+	fake.setTrailerMutex.Unlock()
+	if fake.SetTrailerStub != nil {
+		fake.SetTrailerStub(arg1)
+	}
+}
+
+func (fake *ServerStream) SetTrailerCallCount() int {
+	fake.setTrailerMutex.RLock()
+	defer fake.setTrailerMutex.RUnlock()
+	return len(fake.setTrailerArgsForCall)
+}
+
+func (fake *ServerStream) SetTrailerCalls(stub func(metadata.MD)) {
+	fake.setTrailerMutex.Lock()
+	defer fake.setTrailerMutex.Unlock()
+	fake.SetTrailerStub = stub
+}
+
+func (fake *ServerStream) SetTrailerArgsForCall(i int) metadata.MD {
+	fake.setTrailerMutex.RLock()
+	defer fake.setTrailerMutex.RUnlock()
+	argsForCall := fake.setTrailerArgsForCall[i]
+	return argsForCall.arg1
+}
+
+func (fake *ServerStream) Invocations() map[string][][]interface{} {
+	fake.invocationsMutex.RLock()
+	defer fake.invocationsMutex.RUnlock()
+	fake.contextMutex.RLock()
+	defer fake.contextMutex.RUnlock()
+	fake.recvMsgMutex.RLock()
+	defer fake.recvMsgMutex.RUnlock()
+	fake.sendHeaderMutex.RLock()
+	defer fake.sendHeaderMutex.RUnlock()
+	fake.sendMsgMutex.RLock()
+	defer fake.sendMsgMutex.RUnlock()
+	fake.setHeaderMutex.RLock()
+	defer fake.setHeaderMutex.RUnlock()
+	fake.setTrailerMutex.RLock()
+	defer fake.setTrailerMutex.RUnlock()
+	copiedInvocations := map[string][][]interface{}{}
+	for key, value := range fake.invocations {
+		copiedInvocations[key] = value
+	}
+	return copiedInvocations
+}
+
+func (fake *ServerStream) recordInvocation(key string, args []interface{}) {
+	fake.invocationsMutex.Lock()
+	defer fake.invocationsMutex.Unlock()
+	if fake.invocations == nil {
+		fake.invocations = map[string][][]interface{}{}
+	}
+	if fake.invocations[key] == nil {
+		fake.invocations[key] = [][]interface{}{}
+	}
+	fake.invocations[key] = append(fake.invocations[key], args)
+}
diff --git a/core/comm/mock/stream_handler.go b/core/comm/mock/stream_handler.go
new file mode 100644
index 00000000000..511b982686c
--- /dev/null
+++ b/core/comm/mock/stream_handler.go
@@ -0,0 +1,108 @@
+// Code generated by counterfeiter. DO NOT EDIT.
+package mock
+
+import (
+	sync "sync"
+
+	grpc "google.golang.org/grpc"
+)
+
+type StreamHandler struct {
+	Stub func(interface{}, grpc.ServerStream) error
+	mutex sync.RWMutex
+	argsForCall []struct {
+		arg1 interface{}
+		arg2 grpc.ServerStream
+	}
+	returns struct {
+		result1 error
+	}
+	returnsOnCall map[int]struct {
+		result1 error
+	}
+	invocations map[string][][]interface{}
+	invocationsMutex sync.RWMutex
+}
+
+func (fake *StreamHandler) Spy(arg1 interface{}, arg2 grpc.ServerStream) error {
+	fake.mutex.Lock()
+	ret, specificReturn := fake.returnsOnCall[len(fake.argsForCall)]
+	fake.argsForCall = append(fake.argsForCall, struct {
+		arg1 interface{}
+		arg2 grpc.ServerStream
+	}{arg1, arg2})
+	fake.recordInvocation("streamHandler", []interface{}{arg1, arg2})
+	fake.mutex.Unlock()
+	if fake.Stub != nil {
+		return fake.Stub(arg1, arg2)
+	}
+	if specificReturn {
+		return ret.result1
+	}
+	return fake.returns.result1
+}
+
+func (fake *StreamHandler) CallCount() int {
+	fake.mutex.RLock()
+	defer fake.mutex.RUnlock()
+	return len(fake.argsForCall)
+}
+
+func (fake *StreamHandler) Calls(stub func(interface{}, grpc.ServerStream) error) {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	fake.Stub = stub
+}
+
+func (fake *StreamHandler) ArgsForCall(i int) (interface{}, grpc.ServerStream) {
+	fake.mutex.RLock()
+	defer fake.mutex.RUnlock()
+	return fake.argsForCall[i].arg1, fake.argsForCall[i].arg2
+}
+
+func (fake *StreamHandler) Returns(result1 error) {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	fake.Stub = nil
+	fake.returns = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *StreamHandler) ReturnsOnCall(i int, result1 error) {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	fake.Stub = nil
+	if fake.returnsOnCall == nil {
+		fake.returnsOnCall = make(map[int]struct {
+			result1 error
+		})
+	}
+	fake.returnsOnCall[i] = struct {
+		result1 error
+	}{result1}
+}
+
+func (fake *StreamHandler) Invocations() map[string][][]interface{} {
+	fake.invocationsMutex.RLock()
+	defer fake.invocationsMutex.RUnlock()
+	fake.mutex.RLock()
+	defer fake.mutex.RUnlock()
+	copiedInvocations := map[string][][]interface{}{}
+	for key, value := range fake.invocations {
+		copiedInvocations[key] = value
+	}
+	return copiedInvocations
+}
+
+func (fake *StreamHandler) recordInvocation(key string, args []interface{}) {
+	fake.invocationsMutex.Lock()
+	defer fake.invocationsMutex.Unlock()
+	if fake.invocations == nil {
+		fake.invocations = map[string][][]interface{}{}
+	}
+	if fake.invocations[key] == nil {
+		fake.invocations[key] = [][]interface{}{}
+	}
+	fake.invocations[key] = append(fake.invocations[key], args)
+}
diff --git a/core/comm/mock/unary_handler.go b/core/comm/mock/unary_handler.go
new file mode 100644
index 00000000000..a2820781ed9
--- /dev/null
+++ b/core/comm/mock/unary_handler.go
@@ -0,0 +1,112 @@
+// Code generated by counterfeiter. DO NOT EDIT.
+package mock
+
+import (
+	context "context"
+	sync "sync"
+)
+
+type UnaryHandler struct {
+	Stub func(context.Context, interface{}) (interface{}, error)
+	mutex sync.RWMutex
+	argsForCall []struct {
+		arg1 context.Context
+		arg2 interface{}
+	}
+	returns struct {
+		result1 interface{}
+		result2 error
+	}
+	returnsOnCall map[int]struct {
+		result1 interface{}
+		result2 error
+	}
+	invocations map[string][][]interface{}
+	invocationsMutex sync.RWMutex
+}
+
+func (fake *UnaryHandler) Spy(arg1 context.Context, arg2 interface{}) (interface{}, error) {
+	fake.mutex.Lock()
+	ret, specificReturn := fake.returnsOnCall[len(fake.argsForCall)]
+	fake.argsForCall = append(fake.argsForCall, struct {
+		arg1 context.Context
+		arg2 interface{}
+	}{arg1, arg2})
+	fake.recordInvocation("unaryHandler", []interface{}{arg1, arg2})
+	fake.mutex.Unlock()
+	if fake.Stub != nil {
+		return fake.Stub(arg1, arg2)
+	}
+	if specificReturn {
+		return ret.result1, ret.result2
+	}
+	return fake.returns.result1, fake.returns.result2
+}
+
+func (fake *UnaryHandler) CallCount() int {
+	fake.mutex.RLock()
+	defer fake.mutex.RUnlock()
+	return len(fake.argsForCall)
+}
+
+func (fake *UnaryHandler) Calls(stub func(context.Context, interface{}) (interface{}, error)) {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	fake.Stub = stub
+}
+
+func (fake *UnaryHandler) ArgsForCall(i int) (context.Context, interface{}) {
+	fake.mutex.RLock()
+	defer fake.mutex.RUnlock()
+	return fake.argsForCall[i].arg1, fake.argsForCall[i].arg2
+}
+
+func (fake *UnaryHandler) Returns(result1 interface{}, result2 error) {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	fake.Stub = nil
+	fake.returns = struct {
+		result1 interface{}
+		result2 error
+	}{result1, result2}
+}
+
+func (fake *UnaryHandler) ReturnsOnCall(i int, result1 interface{}, result2 error) {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	fake.Stub = nil
+	if fake.returnsOnCall == nil {
+		fake.returnsOnCall = make(map[int]struct {
+			result1 interface{}
+			result2 error
+		})
+	}
+	fake.returnsOnCall[i] = struct {
+		result1 interface{}
+		result2 error
+	}{result1, result2}
+}
+
+func (fake *UnaryHandler) Invocations() map[string][][]interface{} {
+	fake.invocationsMutex.RLock()
+	defer fake.invocationsMutex.RUnlock()
+	fake.mutex.RLock()
+	defer fake.mutex.RUnlock()
+	copiedInvocations := map[string][][]interface{}{}
+	for key, value := range fake.invocations {
+		copiedInvocations[key] = value
+	}
+	return copiedInvocations
+}
+
+func (fake *UnaryHandler) recordInvocation(key string, args []interface{}) {
+	fake.invocationsMutex.Lock()
+	defer fake.invocationsMutex.Unlock()
+	if fake.invocations == nil {
+		fake.invocations = map[string][][]interface{}{}
+	}
+	if fake.invocations[key] == nil {
+		fake.invocations[key] = [][]interface{}{}
+	}
+	fake.invocations[key] = append(fake.invocations[key], args)
+}
diff --git a/core/comm/throttle.go b/core/comm/throttle.go
new file mode 100644
index 00000000000..846758bdace
--- /dev/null
+++ b/core/comm/throttle.go
@@ -0,0 +1,65 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package comm
+
+import (
+	"context"
+
+	"github.com/hyperledger/fabric/common/semaphore"
+	"google.golang.org/grpc"
+)
+
+type Semaphore interface {
+	Acquire(ctx context.Context) error
+	Release()
+}
+
+type Throttle struct {
+	newSemaphore NewSemaphoreFunc
+	semaphore Semaphore
+}
+
+type ThrottleOption func(t *Throttle)
+type NewSemaphoreFunc func(size int) Semaphore
+
+func WithNewSemaphore(newSemaphore NewSemaphoreFunc) ThrottleOption {
+	return func(t *Throttle) {
+		t.newSemaphore = newSemaphore
+	}
+}
+
+func NewThrottle(maxConcurrency int, options ...ThrottleOption) *Throttle {
+	t := &Throttle{
+		newSemaphore: func(count int) Semaphore { return semaphore.New(count) },
+	}
+
+	for _, optionFunc := range options {
+		optionFunc(t)
+	}
+
+	t.semaphore = t.newSemaphore(maxConcurrency)
+	return t
+}
+
+func (t *Throttle) UnaryServerIntercptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+	if err := t.semaphore.Acquire(ctx); err != nil {
+		return nil, err
+	}
+	defer t.semaphore.Release()
+
+	return handler(ctx, req)
+}
+
+func (t *Throttle) StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+	ctx := ss.Context()
+	if err := t.semaphore.Acquire(ctx); err != nil {
+		return err
+	}
+	defer t.semaphore.Release()
+
+	return handler(srv, ss)
+}
diff --git a/core/comm/throttle_test.go b/core/comm/throttle_test.go
new file mode 100644
index 00000000000..5e8a74667db
--- /dev/null
+++ b/core/comm/throttle_test.go
@@ -0,0 +1,181 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package comm_test
+
+import (
+	"context"
+	"sync"
+	"testing"
+
+	"github.com/hyperledger/fabric/core/comm"
+	"github.com/hyperledger/fabric/core/comm/mock"
+	. "github.com/onsi/gomega"
+	"github.com/pkg/errors"
+	"google.golang.org/grpc"
+)
+
+//go:generate counterfeiter -o mock/semaphore.go -fake-name Semaphore . Semaphore
+//go:generate counterfeiter -o mock/new_semaphore.go -fake-name NewSemaphore . NewSemaphoreFunc
+//go:generate counterfeiter -o mock/server_stream.go -fake-name ServerStream . serverStream
+//go:generate counterfeiter -o mock/stream_handler.go -fake-name StreamHandler . streamHandler
+//go:generate counterfeiter -o mock/unary_handler.go -fake-name UnaryHandler . unaryHandler
+
+type serverStream interface{ grpc.ServerStream }
+type streamHandler grpc.StreamHandler
+type unaryHandler grpc.UnaryHandler
+
+func TestThrottle(t *testing.T) {
+	gt := NewGomegaWithT(t)
+
+	wg := sync.WaitGroup{}
+	done := make(chan struct{})
+	unaryHandler := &mock.UnaryHandler{}
+	unaryHandler.Stub = func(context.Context, interface{}) (interface{}, error) {
+		wg.Done()
+		<-done
+		return nil, nil
+	}
+	streamHandler := &mock.StreamHandler{}
+	streamHandler.Stub = func(interface{}, grpc.ServerStream) error {
+		wg.Done()
+		<-done
+		return nil
+	}
+	serverStream := &mock.ServerStream{}
+	serverStream.ContextReturns(context.Background())
+
+	throttle := comm.NewThrottle(2)
+
+	// run requests to the throttle point
+	wg.Add(2)
+	go throttle.UnaryServerIntercptor(context.Background(), nil, nil, unaryHandler.Spy)
+	go throttle.StreamServerInterceptor(nil, serverStream, nil, streamHandler.Spy)
+	wg.Wait()
+
+	unaryComplete := make(chan struct{})
+	blockedUnaryHandler := &mock.UnaryHandler{}
+	blockedUnaryHandler.Stub = func(context.Context, interface{}) (interface{}, error) {
+		close(unaryComplete)
+		return nil, nil
+	}
+	go throttle.UnaryServerIntercptor(context.Background(), nil, nil, blockedUnaryHandler.Spy)
+	gt.Consistently(unaryComplete).ShouldNot(BeClosed())
+
+	streamComplete := make(chan struct{})
+	blockedStreamHandler := &mock.StreamHandler{}
+	blockedStreamHandler.Stub = func(interface{}, grpc.ServerStream) error {
+		close(streamComplete)
+		return nil
+	}
+	go throttle.StreamServerInterceptor(nil, serverStream, nil, blockedStreamHandler.Spy)
+	gt.Consistently(streamComplete).ShouldNot(BeClosed())
+
+	close(done)
+	gt.Eventually(unaryComplete).Should(BeClosed())
+	gt.Eventually(streamComplete).Should(BeClosed())
+}
+
+func TestUnaryServerInterceptor(t *testing.T) {
+	gt := NewGomegaWithT(t)
+
+	semaphore := &mock.Semaphore{}
+	newSemaphore := &mock.NewSemaphore{}
+	newSemaphore.Returns(semaphore)
+
+	key := struct{}{}
+	ctx := context.WithValue(context.Background(), key, "value")
+	handler := &mock.UnaryHandler{}
+	handler.Returns("result", nil)
+
+	throttle := comm.NewThrottle(3, comm.WithNewSemaphore(newSemaphore.Spy))
+
+	result, err := throttle.UnaryServerIntercptor(ctx, "request", nil, handler.Spy)
+	gt.Expect(err).NotTo(HaveOccurred())
+	gt.Expect(result).To(Equal("result"))
+
+	gt.Expect(handler.CallCount()).To(Equal(1))
+	c, r := handler.ArgsForCall(0)
+	gt.Expect(c).To(BeIdenticalTo(ctx))
+	gt.Expect(r).To(Equal("request"))
+
+	gt.Expect(semaphore.AcquireCallCount()).To(Equal(1))
+
+	gt.Expect(semaphore.ReleaseCallCount()).To(Equal(1))
+}
+
+func TestUnaryServerInterceptorAcquireFail(t *testing.T) {
+	gt := NewGomegaWithT(t)
+
+	semaphore := &mock.Semaphore{}
+	semaphore.AcquireReturns(errors.New("you're-dead-to-me"))
+	newSemaphore := &mock.NewSemaphore{}
+	newSemaphore.Returns(semaphore)
+
+	throttle := comm.NewThrottle(3, comm.WithNewSemaphore(newSemaphore.Spy))
+	ctx := context.Background()
+
+	_, err := throttle.UnaryServerIntercptor(ctx, "request", nil, nil)
+	gt.Expect(err).To(MatchError("you're-dead-to-me"))
+
+	gt.Expect(semaphore.AcquireCallCount()).To(Equal(1))
+	c := semaphore.AcquireArgsForCall(0)
+	gt.Expect(c).To(Equal(ctx))
+	gt.Expect(semaphore.ReleaseCallCount()).To(Equal(0))
+}
+
+func TestStreamServerInterceptor(t *testing.T) {
+	gt := NewGomegaWithT(t)
+
+	semaphore := &mock.Semaphore{}
+	newSemaphore := &mock.NewSemaphore{}
+	newSemaphore.Returns(semaphore)
+
+	key := struct{}{}
+	expectedSrv := struct{ string }{"server"}
+	ctx := context.WithValue(context.Background(), key, "value")
+
+	serverStream := &mock.ServerStream{}
+	serverStream.ContextReturns(ctx)
+
+	handler := &mock.StreamHandler{}
+
+	throttle := comm.NewThrottle(3, comm.WithNewSemaphore(newSemaphore.Spy))
+	err := throttle.StreamServerInterceptor(expectedSrv, serverStream, nil, handler.Spy)
+	gt.Expect(err).NotTo(HaveOccurred())
+
+	gt.Expect(handler.CallCount()).To(Equal(1))
+	srv, ss := handler.ArgsForCall(0)
+	gt.Expect(srv).To(Equal(expectedSrv))
+	gt.Expect(ss).To(Equal(serverStream))
+
+	gt.Expect(serverStream.ContextCallCount()).To(Equal(1))
+	gt.Expect(semaphore.AcquireCallCount()).To(Equal(1))
+	c := semaphore.AcquireArgsForCall(0)
+	gt.Expect(c).To(Equal(ctx))
+	gt.Expect(semaphore.ReleaseCallCount()).To(Equal(1))
+}
+
+func TestStreamServerInterceptorAcquireFail(t *testing.T) {
+	gt := NewGomegaWithT(t)
+
+	semaphore := &mock.Semaphore{}
+	semaphore.AcquireReturns(errors.New("your-name-is-mud"))
+	newSemaphore := &mock.NewSemaphore{}
+	newSemaphore.Returns(semaphore)
+
+	throttle := comm.NewThrottle(3, comm.WithNewSemaphore(newSemaphore.Spy))
+	ctx := context.Background()
+
+	serverStream := &mock.ServerStream{}
+	serverStream.ContextReturns(ctx)
+
+	err := throttle.StreamServerInterceptor(nil, serverStream, nil, nil)
+	gt.Expect(err).To(MatchError("your-name-is-mud"))
+
+	gt.Expect(semaphore.AcquireCallCount()).To(Equal(1))
+	gt.Expect(semaphore.ReleaseCallCount()).To(Equal(0))
+}
diff --git a/peer/node/start.go b/peer/node/start.go
index 5d4ba394a1b..818a9f76e45 100644
--- a/peer/node/start.go
+++ b/peer/node/start.go
@@ -31,7 +31,7 @@ import (
 	"github.com/hyperledger/fabric/core/aclmgmt"
 	"github.com/hyperledger/fabric/core/aclmgmt/resources"
 	"github.com/hyperledger/fabric/core/admin"
-	"github.com/hyperledger/fabric/core/cclifecycle"
+	cc "github.com/hyperledger/fabric/core/cclifecycle"
 	"github.com/hyperledger/fabric/core/chaincode"
 	"github.com/hyperledger/fabric/core/chaincode/accesscontrol"
 	"github.com/hyperledger/fabric/core/chaincode/lifecycle"
@@ -53,7 +53,7 @@ import (
 	endorsement2 "github.com/hyperledger/fabric/core/handlers/endorsement/api"
 	endorsement3 "github.com/hyperledger/fabric/core/handlers/endorsement/api/identities"
 	"github.com/hyperledger/fabric/core/handlers/library"
-	"github.com/hyperledger/fabric/core/handlers/validation/api"
+	validation "github.com/hyperledger/fabric/core/handlers/validation/api"
 	"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
 	"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
 	"github.com/hyperledger/fabric/core/operations"
@@ -93,6 +93,7 @@ const (
 	chaincodeAddrKey       = "peer.chaincodeAddress"
 	chaincodeListenAddrKey = "peer.chaincodeListenAddress"
 	defaultChaincodePort   = 7052
+	grpcMaxConcurrency     = 2500
 )
 
 var chaincodeDevMode bool
@@ -209,17 +210,20 @@ func serve(args []string) error {
 		logger.Fatalf("Error loading secure config for peer (%s)", err)
 	}
 
+	throttle := comm.NewThrottle(grpcMaxConcurrency)
 	serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
 	serverConfig.MetricsProvider = metricsProvider
 	serverConfig.UnaryInterceptors = append(
 		serverConfig.UnaryInterceptors,
 		grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
 		grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
+		throttle.UnaryServerIntercptor,
 	)
 	serverConfig.StreamInterceptors = append(
 		serverConfig.StreamInterceptors,
 		grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
 		grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
+		throttle.StreamServerInterceptor,
 	)
 
 	peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
@@ -777,17 +781,20 @@ func startAdminServer(peerListenAddr string, peerServer *grpc.Server, metricsPro
 	if err != nil {
 		logger.Fatalf("Error loading secure config for admin service (%s)", err)
 	}
+	throttle := comm.NewThrottle(grpcMaxConcurrency)
 	serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "AdminServer")
 	serverConfig.MetricsProvider = metricsProvider
 	serverConfig.UnaryInterceptors = append(
 		serverConfig.UnaryInterceptors,
 		grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
 		grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
+		throttle.UnaryServerIntercptor,
 	)
 	serverConfig.StreamInterceptors = append(
 		serverConfig.StreamInterceptors,
 		grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
 		grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
+		throttle.StreamServerInterceptor,
 	)
 	adminServer, err := peer.NewPeerServer(adminListenAddress, serverConfig)
 	if err != nil {
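
Reviewer note, not part of the patch: throttle.go delegates admission control to github.com/hyperledger/fabric/common/semaphore. For readers unfamiliar with that package, a buffered channel is enough to satisfy the comm.Semaphore interface shown above. The sketch below is illustrative only and assumes nothing beyond the Acquire/Release contract used in throttle.go; package and function names are hypothetical.

package semaphoresketch

import "context"

// Semaphore is a weighted semaphore backed by a buffered channel.
type Semaphore chan struct{}

// New returns a semaphore that admits up to size concurrent holders.
func New(size int) Semaphore {
	if size <= 0 {
		panic("semaphore size must be greater than zero")
	}
	return make(Semaphore, size)
}

// Acquire blocks until a slot is free or the context is cancelled.
func (s Semaphore) Acquire(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s <- struct{}{}:
		return nil
	}
}

// Release frees a slot obtained by a previous Acquire.
func (s Semaphore) Release() {
	select {
	case <-s:
	default:
		panic("release called without a matching acquire")
	}
}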
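
The patch attaches the interceptors through Fabric's comm.ServerConfig. For comparison, the same throttle could be wired onto a plain *grpc.Server with the standard server options; this is hypothetical usage only, reusing the 2500 bound from grpcMaxConcurrency and keeping the method names exactly as exported by throttle.go.

package main

import (
	"google.golang.org/grpc"

	"github.com/hyperledger/fabric/core/comm"
)

// newThrottledServer shows how the same interceptors could be attached to a
// vanilla gRPC server; the peer itself goes through comm.ServerConfig instead.
func newThrottledServer() *grpc.Server {
	throttle := comm.NewThrottle(2500) // same limit as grpcMaxConcurrency in start.go
	return grpc.NewServer(
		grpc.UnaryInterceptor(throttle.UnaryServerIntercptor),
		grpc.StreamInterceptor(throttle.StreamServerInterceptor),
	)
}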