Skip to content

Commit

Permalink
Merge pull request hyperledger#101 from hyperledger/async-start
Browse files Browse the repository at this point in the history
Do not block startup to check chain head
  • Loading branch information
nguyer authored Sep 28, 2023
2 parents 4d55b8a + b112fc8 commit 25ea3f9
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 38 deletions.
35 changes: 34 additions & 1 deletion config.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
|maxConcurrentRequests|Maximum of concurrent requests to be submitted to the blockchain|`int`|`50`
|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0`
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
|passthroughHeadersEnabled|Enable passing through the set of allowed HTTP request headers|`boolean`|`false`
|requestTimeout|The maximum amount of time that a request is allowed to remain open|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
Expand Down Expand Up @@ -138,7 +139,36 @@

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|port|An HTTP port on which to enable the go debugger|`int`|`-1`
|address|Listener address|`int`|`127.0.0.1`
|enabled|Whether the debug HTTP endpoint is enabled|`boolean`|`true`
|port|An HTTP port on which to enable the go debugger|`int`|`0`
|publicURL|Externally available URL for the HTTP endpoint|`string`|`<nil>`
|readTimeout|HTTP server read timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s`
|shutdownTimeout|HTTP server shutdown timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10s`
|writeTimeout|HTTP server write timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s`

## debug.auth

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|type|The auth plugin to use for server side authentication of requests|`string`|`<nil>`

## debug.auth.basic

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|passwordfile|The path to a .htpasswd file to use for authenticating requests. Passwords should be hashed with bcrypt.|`string`|`<nil>`

## debug.tls

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|caFile|The path to the CA file for TLS on this API|`string`|`<nil>`
|certFile|The path to the certificate file for TLS on this API|`string`|`<nil>`
|clientAuth|Enables or disables client auth for TLS on this API|`string`|`<nil>`
|enabled|Enables or disables TLS on this API|`boolean`|`false`
|keyFile|The path to the private key file for TLS on this API|`string`|`<nil>`
|requiredDNAttributes|A set of required subject DN attributes. Each entry is a regular expression, and the subject certificate must have a matching attribute of the specified type (CN, C, O, OU, ST, L, STREET, POSTALCODE, SERIALNUMBER are valid attributes)|`map[string]string`|`<nil>`

## eventstreams

Expand Down Expand Up @@ -297,6 +327,7 @@
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0`
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
|method|Deprecated: Please use 'transactions.handler.simple.gasOracle.method' instead|`string`|`GET`
|mode|Deprecated: Please use 'transactions.handler.simple.gasOracle.mode' instead|'connector', 'restapi', 'fixed', or 'disabled'|`connector`
Expand Down Expand Up @@ -385,6 +416,7 @@
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0`
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
|method|The HTTP Method to use when invoking the Gas Oracle REST API|`string`|`GET`
|mode|The gas oracle mode|'connector', 'restapi', 'fixed', or 'disabled'|`connector`
Expand Down Expand Up @@ -445,6 +477,7 @@
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
|maxConnsPerHost|The max number of connections, per unique hostname. Zero means no limit|`int`|`0`
|maxIdleConns|The max number of idle connections to hold pooled|`int`|`100`
|passthroughHeadersEnabled|Enable passing through the set of allowed HTTP request headers|`boolean`|`false`
|requestTimeout|The maximum amount of time that a request is allowed to remain open|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ go 1.19

require (
github.com/hashicorp/golang-lru v0.5.4
github.com/hyperledger/firefly-common v1.2.18
github.com/hyperledger/firefly-signer v1.1.9
github.com/hyperledger/firefly-transaction-manager v1.3.3
github.com/hyperledger/firefly-common v1.3.0
github.com/hyperledger/firefly-signer v1.1.10-0.20230928181245-a5e84e149fdb
github.com/hyperledger/firefly-transaction-manager v1.3.4
github.com/sirupsen/logrus v1.9.2
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.1
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huandu/xstrings v1.3.1 h1:4jgBlKK6tLKFvO8u5pmYjG91cqytmDCDvGh7ECVFfFs=
github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/hyperledger/firefly-common v1.2.18 h1:oMxmhVsVhitoEeZXJPVIM10RxwK0Z33GeR+VPXxULms=
github.com/hyperledger/firefly-common v1.2.18/go.mod h1:17lOH4YufiPy82LpKm8fPa/YXJ0pUyq01zK1CmklJwM=
github.com/hyperledger/firefly-signer v1.1.9 h1:Tx1iPTOLTpdFOLtkKFUWpjNsL/+oswnAaU4CTLMDw4Q=
github.com/hyperledger/firefly-signer v1.1.9/go.mod h1:vNbbROziwqkOmO0b+9ky3devjcFg0JIkR2M1KG7seTQ=
github.com/hyperledger/firefly-transaction-manager v1.3.3 h1:jOBlFljFgz/pn8g1DlSGMkuJyGmf/Scsu3SDCqIJlZ8=
github.com/hyperledger/firefly-transaction-manager v1.3.3/go.mod h1:Bbp4hDoOFbu463pTfyFY/MPDWyTq89uAFk4OcJS7UXY=
github.com/hyperledger/firefly-common v1.3.0 h1:eLFUJuPU8E5iZXYGHlXghQuN+opWG/qp7zvMKavKEPU=
github.com/hyperledger/firefly-common v1.3.0/go.mod h1:17lOH4YufiPy82LpKm8fPa/YXJ0pUyq01zK1CmklJwM=
github.com/hyperledger/firefly-signer v1.1.10-0.20230928181245-a5e84e149fdb h1:0tlOFV8x9NBYvHUfJp5dwS9cZdwqRXuipGLOreh67Gw=
github.com/hyperledger/firefly-signer v1.1.10-0.20230928181245-a5e84e149fdb/go.mod h1:0w6HjamOI21i9oGYDzW5p1A2ijLenM+liciGRzLhV5w=
github.com/hyperledger/firefly-transaction-manager v1.3.4 h1:L3KNuyVdOpw+wgS44gUBs+5dh3vxL921h1rlKGZFz6s=
github.com/hyperledger/firefly-transaction-manager v1.3.4/go.mod h1:Bbp4hDoOFbu463pTfyFY/MPDWyTq89uAFk4OcJS7UXY=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
Expand Down
8 changes: 5 additions & 3 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -404,7 +404,7 @@ func (bl *blockListener) addConsumer(c *blockUpdateConsumer) {
bl.consumers[*c.id] = c
}

func (bl *blockListener) getHighestBlock(ctx context.Context) int64 {
func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
bl.mux.Lock()
bl.checkStartedLocked()
highestBlock := bl.highestBlock
Expand All @@ -414,13 +414,15 @@ func (bl *blockListener) getHighestBlock(ctx context.Context) int64 {
select {
case <-bl.initialBlockHeightObtained:
case <-ctx.Done():
// Inform caller we timed out, or were closed
return -1, false
}
}
bl.mux.Lock()
highestBlock = bl.highestBlock
bl.mux.Unlock()
log.L(ctx).Debugf("ChainHead=%d", highestBlock)
return highestBlock
return highestBlock, true
}

func (bl *blockListener) waitClosed() {
Expand Down
8 changes: 6 additions & 2 deletions internal/ethereum/blocklistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func TestBlockListenerStartGettingHighestBlockRetry(t *testing.T) {
*hbh = *ethtypes.NewHexInteger64(12345)
})

assert.Equal(t, int64(12345), bl.getHighestBlock(bl.ctx))
h, ok := bl.getHighestBlock(bl.ctx)
assert.Equal(t, int64(12345), h)
assert.True(t, ok)
done() // Stop immediately in this case, while we're in the polling interval

<-bl.listenLoopDone
Expand All @@ -59,7 +61,9 @@ func TestBlockListenerStartGettingHighestBlockFailBeforeStop(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").
Return(&rpcbackend.RPCError{Message: "pop"}).Maybe()

assert.Equal(t, int64(-1), bl.getHighestBlock(bl.ctx))
h, ok := bl.getHighestBlock(bl.ctx)
assert.False(t, ok)
assert.Equal(t, int64(-1), h)

<-bl.listenLoopDone

Expand Down
26 changes: 8 additions & 18 deletions internal/ethereum/event_actions.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -44,30 +44,20 @@ func (c *ethConnector) EventStreamStart(ctx context.Context, req *ffcapi.EventSt
streamLoopDone: make(chan struct{}),
}

chainHead := c.blockListener.getHighestBlock(ctx)
for _, lReq := range req.InitialListeners {
l, err := es.addEventListener(ctx, lReq)
// We add all the initial event listeners, checking for errors, before kicking off the streamLoop().
for _, il := range req.InitialListeners {
// Add, but do NOT start, the listener.
// We do pre-startup processing on this special set in streamLoop() after we've established
// the chain head, then start them all after that.
_, err := es.addEventListener(ctx, il)
if err != nil {
return nil, "", err
}
// During initial start we move the "head" block forwards to be the highest of all the initial streams
if l.hwmBlock > es.headBlock {
if l.hwmBlock > chainHead {
es.headBlock = chainHead
} else {
es.headBlock = l.hwmBlock
}
}
}

// From this point we consider ourselves started
// From this point we consider ourselves as in started state, but we might not actually be ready
c.eventStreams[*req.ID] = es

// Start all the listeners
for _, l := range es.listeners {
es.startEventListener(l)
}

// Start the listener head routine, which reads events for all listeners that are not in catchup mode
go es.streamLoop()

Expand Down
4 changes: 2 additions & 2 deletions internal/ethereum/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func (cp *listenerCheckpoint) LessThan(b ffcapi.EventListenerCheckpoint) bool {
func (l *listener) getInitialBlock(ctx context.Context, fromBlockInstruction string) (int64, error) {
if fromBlockInstruction == ffcapi.FromBlockLatest || fromBlockInstruction == "" {
// Get the latest block number of the chain
chainHead := l.c.blockListener.getHighestBlock(ctx)
if chainHead < 0 {
chainHead, ok := l.c.blockListener.getHighestBlock(ctx)
if !ok {
return -1, i18n.NewError(ctx, msgs.MsgTimedOutQueryingChainHead)
}
return chainHead, nil
Expand Down
37 changes: 34 additions & 3 deletions internal/ethereum/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ func (es *eventStream) leadGroupCatchup() bool {
return true
}

chainHeadBlock := es.c.blockListener.getHighestBlock(es.ctx)
chainHeadBlock, ok := es.c.blockListener.getHighestBlock(es.ctx)
if !ok {
log.L(es.ctx).Debugf("Stream catchup exiting (closed checking block height)")
return true
}

// Build the aggregated listener list (doesn't matter if it's changed, as we build the list each time)
_ = es.buildReuseLeadGroupListener(&lastUpdate, &ag)
Expand Down Expand Up @@ -311,7 +315,8 @@ func (es *eventStream) leadGroupSteadyState() bool {

// High water mark is a point safely behind the head of the chain in this case,
// where re-orgs are not expected.
hwmBlock := es.c.blockListener.getHighestBlock(es.ctx) - es.c.checkpointBlockGap
bh, _ := es.c.blockListener.getHighestBlock(es.ctx) /* note we know we're initialized here and will not block */
hwmBlock := bh - es.c.checkpointBlockGap
if hwmBlock < 0 {
hwmBlock = 0
}
Expand All @@ -333,7 +338,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
}

// Check we're not outside of the steady state window, and need to fall back to catchup mode
chainHeadBlock := es.c.blockListener.getHighestBlock(es.ctx)
chainHeadBlock, _ := es.c.blockListener.getHighestBlock(es.ctx) /* note we know we're initialized here and will not block */
blockGapEstimate := (chainHeadBlock - fromBlock)
if blockGapEstimate > es.c.catchupThreshold {
log.L(es.ctx).Warnf("Block gap estimate reached %d (above threshold of %d) - reverting to catchup mode", blockGapEstimate, es.c.catchupThreshold)
Expand Down Expand Up @@ -405,9 +410,35 @@ func (es *eventStream) leadGroupSteadyState() bool {
}
}

func (es *eventStream) preStartProcessing() {
ctx := es.ctx
chainHead, ok := es.c.blockListener.getHighestBlock(ctx)
if !ok {
log.L(ctx).Warnf("Event stream closed before establishing block height")
return
}
for _, l := range es.listeners {
// During initial start we move the "head" block forwards to be the highest of all the initial streams
if l.hwmBlock > es.headBlock {
if l.hwmBlock > chainHead {
es.headBlock = chainHead
} else {
es.headBlock = l.hwmBlock
}
}
}

// Now we've done that, we can start all the listeners
for _, l := range es.listeners {
es.startEventListener(l)
}
}

func (es *eventStream) streamLoop() {
defer close(es.streamLoopDone)

es.preStartProcessing()

for {
// When we first start, we might find our leading pack of listeners are all way behind
// the head of the chain. So we run a catchup mode loop to ensure we don't ask the blockchain
Expand Down
3 changes: 3 additions & 0 deletions internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func testEventStreamExistingConnector(t *testing.T, ctx context.Context, done fu
es.c.eventFilterPollingInterval = 1 * time.Millisecond
es.c.retry.MaximumDelay = 1 * time.Microsecond
assert.NotNil(t, es)

es.preStartProcessing()

return es, events, mRPC, func() {
done()
_, _, err := c.EventStreamStopped(ctx, &ffcapi.EventStreamStoppedRequest{
Expand Down

0 comments on commit 25ea3f9

Please sign in to comment.