From 0a0da7d58062a8a578f5924b05d60aa9d31ae086 Mon Sep 17 00:00:00 2001 From: aalu1418 <50029043+aalu1418@users.noreply.github.com> Date: Fri, 13 Sep 2024 16:46:09 -0600 Subject: [PATCH 1/3] som: handle pruned transactions for slowly updating feeds --- pkg/monitoring/source_envelope.go | 35 ++++++++++++++++++++------ pkg/monitoring/source_envelope_test.go | 25 ++++++++++++++++-- 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/pkg/monitoring/source_envelope.go b/pkg/monitoring/source_envelope.go index 30648ecef..7b970c197 100644 --- a/pkg/monitoring/source_envelope.go +++ b/pkg/monitoring/source_envelope.go @@ -43,9 +43,9 @@ func (s *envelopeSourceFactory) NewSource( return nil, fmt.Errorf("expected feedConfig to be of type config.SolanaFeedConfig not %T", feedConfig) } return &envelopeSource{ - s.client, - solanaFeedConfig, - s.log, + client: s.client, + feedConfig: solanaFeedConfig, + log: s.log, }, nil } @@ -57,6 +57,25 @@ type envelopeSource struct { client ChainReader feedConfig config.SolanaFeedConfig log commonMonitoring.Logger + + // these values are cached because transactions can be pruned from an RPC node + // if no transactions are found and without a cache, the `getJuelsPerLamport` call will block the entire source.Fetch + lock sync.RWMutex + cacheJuelsPerLamport uint64 +} + +func (s *envelopeSource) setJuelsPerLamport(v uint64) error { + s.lock.Lock() + s.cacheJuelsPerLamport = v + s.lock.Unlock() + + return nil +} + +func (s *envelopeSource) readJuelsPerLamport() uint64 { + s.lock.RLock() + s.lock.RUnlock() + return s.cacheJuelsPerLamport } func (s *envelopeSource) Fetch(ctx context.Context) (interface{}, error) { @@ -163,7 +182,9 @@ func (s *envelopeSource) getJuelsPerLamport(ctx context.Context) (*big.Int, erro return nil, fmt.Errorf("failed to fetch tx signatures for state account '%s': %w", s.feedConfig.StateAccountBase58, err) } if len(txSigs) == 0 { - return nil, fmt.Errorf("found no transactions from state account '%s'", s.feedConfig.StateAccountBase58) + val := s.readJuelsPerLamport() + s.log.Warnw("no transactions found, falling back to cached value - history may have been pruned (cached_value=0 indicates pruned txs encountered on startup)", "state_account", s.feedConfig.StateAccountBase58, "cached_value", val) + return new(big.Int).SetUint64(val), nil } for _, txSig := range txSigs { if txSig.Err != nil { @@ -197,11 +218,11 @@ func (s *envelopeSource) getJuelsPerLamport(ctx context.Context) (*big.Int, erro if !isNewTransmission { continue } + // don't block on zero value - handling zero values should happen on the consumer if newTransmission.JuelsPerLamport == 0 { - s.log.Infow("zero value for juels/lamport feed is not supported") - continue + s.log.Warnw("zero value for juels/lamport feed is not supported") } - return new(big.Int).SetUint64(newTransmission.JuelsPerLamport), nil + return new(big.Int).SetUint64(newTransmission.JuelsPerLamport), s.setJuelsPerLamport(newTransmission.JuelsPerLamport) } } return nil, fmt.Errorf("no correct NewTransmission event found in the last %d transactions on contract state '%s'", txSigsPageSize, s.feedConfig.StateAccountBase58) diff --git a/pkg/monitoring/source_envelope_test.go b/pkg/monitoring/source_envelope_test.go index d16db09b9..f4f58847e 100644 --- a/pkg/monitoring/source_envelope_test.go +++ b/pkg/monitoring/source_envelope_test.go @@ -10,11 +10,14 @@ import ( bin "github.com/gagliardetto/binary" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/libocr/offchainreporting2/types" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" commonMonitoring "github.com/smartcontractkit/chainlink-common/pkg/monitoring" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/mocks" "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/testutils" @@ -144,7 +147,7 @@ func TestEnvelopeSource(t *testing.T) { mock.Anything, // ctx feedConfig.StateAccount, mock.Anything, // // because it's hard to mock pointer values! - ).Return(fakeTxSignatures, nil) + ).Return(fakeTxSignatures, nil).Once() chainReader.On("GetTransaction", mock.Anything, // ctx fakeTxSignatures[0].Signature, @@ -155,7 +158,8 @@ func TestEnvelopeSource(t *testing.T) { ).Return(fakeTxResult, nil) // Call Fetch - factory := NewEnvelopeSourceFactory(chainReader, testutils.NewNullLogger()) + lgr, logs := logger.TestLoggerObserved(t, zapcore.DebugLevel) + factory := NewEnvelopeSourceFactory(chainReader, lgr) source, err := factory.NewSource(chainConfig, feedConfig) require.NoError(t, err) rawEnvelope, err := source.Fetch(context.Background()) @@ -221,6 +225,23 @@ func TestEnvelopeSource(t *testing.T) { AggregatorRoundID: 0x13841a, } require.Equal(t, expectedEnvelope, envelope) + + t.Run("empty tx list should return cached JuelsPerLamport", func(t *testing.T) { + chainReader.On("GetSignaturesForAddressWithOpts", + mock.Anything, + feedConfig.StateAccount, + mock.Anything, + ).Return([]*rpc.TransactionSignature{}, nil).Once() // return empty + + envSource, ok := source.(*envelopeSource) + require.True(t, ok) + cacheValue := envSource.readJuelsPerLamport() + + v, err := envSource.getJuelsPerLamport(tests.Context(t)) + require.NoError(t, err) + require.Equal(t, cacheValue, v.Uint64()) + tests.AssertLogEventually(t, logs.FilterLevelExact(zapcore.WarnLevel), "no transactions found, falling back to cached value - history may have been pruned (cached_value=0 indicates pruned txs encountered on startup)") + }) } func TestGetLinkAvailableForPayment(t *testing.T) { From 5db105672bc386daaad9d8905648f1127f594ea1 Mon Sep 17 00:00:00 2001 From: Aaron Lu <50029043+aalu1418@users.noreply.github.com> Date: Mon, 16 Sep 2024 07:29:08 -0600 Subject: [PATCH 2/3] Update pkg/monitoring/source_envelope.go Co-authored-by: Jordan Krage --- pkg/monitoring/source_envelope.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/monitoring/source_envelope.go b/pkg/monitoring/source_envelope.go index 7b970c197..8b0850ed6 100644 --- a/pkg/monitoring/source_envelope.go +++ b/pkg/monitoring/source_envelope.go @@ -74,7 +74,7 @@ func (s *envelopeSource) setJuelsPerLamport(v uint64) error { func (s *envelopeSource) readJuelsPerLamport() uint64 { s.lock.RLock() - s.lock.RUnlock() + defer s.lock.RUnlock() return s.cacheJuelsPerLamport } From 2a8ded29f8e4c47e94d9499e5b42dd029ad9d6a6 Mon Sep 17 00:00:00 2001 From: aalu1418 <50029043+aalu1418@users.noreply.github.com> Date: Mon, 16 Sep 2024 08:38:35 -0600 Subject: [PATCH 3/3] fix: logger import --- pkg/monitoring/source_envelope_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/monitoring/source_envelope_test.go b/pkg/monitoring/source_envelope_test.go index f4f58847e..08abb79a2 100644 --- a/pkg/monitoring/source_envelope_test.go +++ b/pkg/monitoring/source_envelope_test.go @@ -10,12 +10,12 @@ import ( bin "github.com/gagliardetto/binary" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/libocr/offchainreporting2/types" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" + "github.com/smartcontractkit/chainlink-common/pkg/logger" commonMonitoring "github.com/smartcontractkit/chainlink-common/pkg/monitoring" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" @@ -158,7 +158,7 @@ func TestEnvelopeSource(t *testing.T) { ).Return(fakeTxResult, nil) // Call Fetch - lgr, logs := logger.TestLoggerObserved(t, zapcore.DebugLevel) + lgr, logs := logger.TestObserved(t, zapcore.DebugLevel) factory := NewEnvelopeSourceFactory(chainReader, lgr) source, err := factory.NewSource(chainConfig, feedConfig) require.NoError(t, err)