Skip to content

Commit

Permalink
address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pleasew8t committed Dec 6, 2024
1 parent ad10691 commit a923d97
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 23 deletions.
6 changes: 3 additions & 3 deletions node/pkg/governor/governor_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func (gov *ChainGovernor) CollectMetrics(ctx context.Context, hb *gossipv1.Heart
}

if startTime.After(gov.nextStatusPublishTime) {
gov.publishStatus(hb, sendC, startTime, guardianSigner, ourAddr)
gov.publishStatus(ctx, hb, sendC, startTime, guardianSigner, ourAddr)
gov.nextStatusPublishTime = startTime.Add(time.Minute)
}
}
Expand Down Expand Up @@ -621,7 +621,7 @@ func (gov *ChainGovernor) publishConfig(ctx context.Context, hb *gossipv1.Heartb
sendC <- b
}

func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, guardianSigner guardiansigner.GuardianSigner, ourAddr ethCommon.Address) {
func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, guardianSigner guardiansigner.GuardianSigner, ourAddr ethCommon.Address) {
chains := make([]*gossipv1.ChainGovernorStatus_Chain, 0)
numEnqueued := 0
for chainId, ce := range gov.chains {
Expand Down Expand Up @@ -686,7 +686,7 @@ func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []b

digest := ethCrypto.Keccak256Hash(append(governorMessagePrefixStatus, b...))

sig, err := guardianSigner.Sign(context.Background(), digest.Bytes())
sig, err := guardianSigner.Sign(ctx, digest.Bytes())
if err != nil {
panic(err)
}
Expand Down
12 changes: 9 additions & 3 deletions node/pkg/guardiansigner/amazonkms.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewAmazonKmsSigner(ctx context.Context, unsafeDevMode bool, keyPath string)
// an error. This is why the region is first extracted from the keyPath.
cfg, err := config.LoadDefaultConfig(timeoutCtx, config.WithDefaultRegion(amazonKmsSigner.region))
if err != nil {
return nil, errors.New("Failed to load default config")
return nil, errors.New("Failed to load KMS default config")
}

amazonKmsSigner.client = kms.NewFromConfig(cfg)
Expand All @@ -118,6 +118,12 @@ func NewAmazonKmsSigner(ctx context.Context, unsafeDevMode bool, keyPath string)
return nil, fmt.Errorf("Failed to unmarshal KMS public key: %w", err)
}

// It is possible to use `ethcrypto.UnmarshalPubkey(asn1Pubkey.PublicKey.Bytes)`` to get the public key,
// but `UnmarshalPubkey()` uses elliptic.Unmarshal() internally, which has been marked as deprecated.
// The following code implements similar logic, with the indexes meaning the following:
// 0: The first byte is the prefix byte, which is 0x04 for uncompressed keys.
// 1-32: The next 32 bytes are the X coordinate.
// 33-64: The next 32 bytes are the Y coordinate.
ecdsaPubkey := ecdsa.PublicKey{
X: new(big.Int).SetBytes(asn1Pubkey.PublicKey.Bytes[1 : 1+32]),
Y: new(big.Int).SetBytes(asn1Pubkey.PublicKey.Bytes[1+32:]),
Expand Down Expand Up @@ -192,7 +198,7 @@ func (a *AmazonKms) PublicKey(ctx context.Context) ecdsa.PublicKey {
}

func (a *AmazonKms) Verify(ctx context.Context, sig []byte, hash []byte) (bool, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*15)
timeoutCtx, cancel := context.WithTimeout(ctx, KMS_TIMEOUT)
defer cancel()

// Use ethcrypto to recover the public key
Expand Down Expand Up @@ -234,7 +240,7 @@ func derSignatureToRS(signature []byte) ([]byte, []byte, error) {
}

// adjustBufferSize takes an input buffer and
// a) trims it down to 32 bytes, if the input length is greater than 32, or
// a) trims it down to 32 bytes, starting at the most significant byte, if the input length is greater than 32, or
// b) returns the input as-is, if the input length is equal to 32, or
// c) left-pads it to 32 bytes, if the input length is less than 32.
func adjustBufferSize(b []byte) []byte {
Expand Down
12 changes: 6 additions & 6 deletions node/pkg/guardiansigner/benchmarksigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ func (b *BenchmarkSigner) Sign(ctx context.Context, hash []byte) ([]byte, error)
sig, err := b.innerSigner.Sign(ctx, hash)
duration := time.Since(start)

// Add Observation to histogram
guardianSignerSigningLatency.Observe(float64(duration.Microseconds()))

// If an error occured, increment the error counter
if err != nil {
guardianSignerSigningErrorCount.Inc()
} else {
// Add Observation to histogram only if no errors occured
guardianSignerSigningLatency.Observe(float64(duration.Microseconds()))
}

return sig, err
Expand All @@ -99,12 +99,12 @@ func (b *BenchmarkSigner) Verify(ctx context.Context, sig []byte, hash []byte) (
valid, err := b.innerSigner.Verify(ctx, sig, hash)
duration := time.Since(start)

// Add observation to histogram
guardianSignerVerifyLatency.Observe(float64(duration.Microseconds()))

// If an error occured, increment the error counter
if err != nil {
guardianSignerVerifyErrorCount.Inc()
} else {
// Add observation to histogram only if no errors occured
guardianSignerVerifyLatency.Observe(float64(duration.Microseconds()))
}

return valid, err
Expand Down
5 changes: 2 additions & 3 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func Run(params *RunParams) func(ctx context.Context) error {

msg := gossipv1.GossipMessage{
Message: &gossipv1.GossipMessage_SignedHeartbeat{
SignedHeartbeat: createSignedHeartbeat(params.guardianSigner, heartbeat),
SignedHeartbeat: createSignedHeartbeat(ctx, params.guardianSigner, heartbeat),
},
}

Expand Down Expand Up @@ -983,8 +983,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}

func createSignedHeartbeat(guardianSigner guardiansigner.GuardianSigner, heartbeat *gossipv1.Heartbeat) *gossipv1.SignedHeartbeat {
ctx := context.Background()
func createSignedHeartbeat(ctx context.Context, guardianSigner guardiansigner.GuardianSigner, heartbeat *gossipv1.Heartbeat) *gossipv1.SignedHeartbeat {
ourAddr := ethcrypto.PubkeyToAddress(guardianSigner.PublicKey(ctx))

b, err := proto.Marshal(heartbeat)
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestSignedHeartbeat(t *testing.T) {
P2PNodeId: tc.p2pNodeId,
}

s := createSignedHeartbeat(guardianSigner, heartbeat)
s := createSignedHeartbeat(context.Background(), guardianSigner, heartbeat)
gs := &node_common.GuardianSet{
Keys: []common.Address{addr},
Index: 1,
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/processor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func BenchmarkHandleObservation(b *testing.B) {
for count := 0; count < NumObservations; count++ {
k := pd.createMessagePublication(b, uint64(count))
start := time.Now()
p.handleMessage(k)
p.handleMessage(ctx, k)
handleMsgTime += time.Since(start)

for guardianIdx := 1; guardianIdx < 19; guardianIdx++ {
Expand Down Expand Up @@ -108,7 +108,7 @@ func BenchmarkProfileHandleObservation(b *testing.B) {

for count := 0; count < NumObservations; count++ {
k := pd.createMessagePublication(b, uint64(count))
p.handleMessage(k)
p.handleMessage(ctx, k)

for guardianIdx := 1; guardianIdx < 19; guardianIdx++ {
p.handleSingleObservation(pd.guardianAddrs[guardianIdx], pd.createObservation(b, guardianIdx, k))
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/processor/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (

// handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An
// event may be received multiple times and must be handled in an idempotent fashion.
func (p *Processor) handleMessage(k *common.MessagePublication) {
func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublication) {
if p.gs == nil {
p.logger.Warn("dropping observation since we haven't initialized our guardian set yet",
zap.String("message_id", k.MessageIDString()),
Expand Down Expand Up @@ -70,7 +70,7 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
hash := hex.EncodeToString(digest.Bytes())

// Sign the digest using the node's GuardianSigner
signature, err := p.guardianSigner.Sign(context.Background(), digest.Bytes())
signature, err := p.guardianSigner.Sign(ctx, digest.Bytes())
if err != nil {
panic(err)
}
Expand Down
6 changes: 3 additions & 3 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (p *Processor) Run(ctx context.Context) error {
continue
}
}
p.handleMessage(k)
p.handleMessage(ctx, k)

case k := <-p.acctReadC:
if p.acct == nil {
Expand All @@ -318,7 +318,7 @@ func (p *Processor) Run(ctx context.Context) error {
if !p.acct.IsMessageCoveredByAccountant(k) {
return fmt.Errorf("accountant published a message that is not covered by it: `%s`", k.MessageIDString())
}
p.handleMessage(k)
p.handleMessage(ctx, k)
case m := <-p.obsvC:
observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
p.handleObservation(m)
Expand Down Expand Up @@ -352,7 +352,7 @@ func (p *Processor) Run(ctx context.Context) error {
continue
}
}
p.handleMessage(k)
p.handleMessage(ctx, k)
}
}
}
Expand Down

0 comments on commit a923d97

Please sign in to comment.