diff --git a/pkg/monitoring/source_envelope.go b/pkg/monitoring/source_envelope.go index 30648ecef..8b0850ed6 100644 --- a/pkg/monitoring/source_envelope.go +++ b/pkg/monitoring/source_envelope.go @@ -43,9 +43,9 @@ func (s *envelopeSourceFactory) NewSource( return nil, fmt.Errorf("expected feedConfig to be of type config.SolanaFeedConfig not %T", feedConfig) } return &envelopeSource{ - s.client, - solanaFeedConfig, - s.log, + client: s.client, + feedConfig: solanaFeedConfig, + log: s.log, }, nil } @@ -57,6 +57,25 @@ type envelopeSource struct { client ChainReader feedConfig config.SolanaFeedConfig log commonMonitoring.Logger + + // these values are cached because transactions can be pruned from an RPC node + // if no transactions are found and without a cache, the `getJuelsPerLamport` call will block the entire source.Fetch + lock sync.RWMutex + cacheJuelsPerLamport uint64 +} + +func (s *envelopeSource) setJuelsPerLamport(v uint64) error { + s.lock.Lock() + s.cacheJuelsPerLamport = v + s.lock.Unlock() + + return nil +} + +func (s *envelopeSource) readJuelsPerLamport() uint64 { + s.lock.RLock() + defer s.lock.RUnlock() + return s.cacheJuelsPerLamport } func (s *envelopeSource) Fetch(ctx context.Context) (interface{}, error) { @@ -163,7 +182,9 @@ func (s *envelopeSource) getJuelsPerLamport(ctx context.Context) (*big.Int, erro return nil, fmt.Errorf("failed to fetch tx signatures for state account '%s': %w", s.feedConfig.StateAccountBase58, err) } if len(txSigs) == 0 { - return nil, fmt.Errorf("found no transactions from state account '%s'", s.feedConfig.StateAccountBase58) + val := s.readJuelsPerLamport() + s.log.Warnw("no transactions found, falling back to cached value - history may have been pruned (cached_value=0 indicates pruned txs encountered on startup)", "state_account", s.feedConfig.StateAccountBase58, "cached_value", val) + return new(big.Int).SetUint64(val), nil } for _, txSig := range txSigs { if txSig.Err != nil { @@ -197,11 +218,11 @@ func (s *envelopeSource) getJuelsPerLamport(ctx context.Context) (*big.Int, erro if !isNewTransmission { continue } + // don't block on zero value - handling zero values should happen on the consumer if newTransmission.JuelsPerLamport == 0 { - s.log.Infow("zero value for juels/lamport feed is not supported") - continue + s.log.Warnw("zero value for juels/lamport feed is not supported") } - return new(big.Int).SetUint64(newTransmission.JuelsPerLamport), nil + return new(big.Int).SetUint64(newTransmission.JuelsPerLamport), s.setJuelsPerLamport(newTransmission.JuelsPerLamport) } } return nil, fmt.Errorf("no correct NewTransmission event found in the last %d transactions on contract state '%s'", txSigsPageSize, s.feedConfig.StateAccountBase58) diff --git a/pkg/monitoring/source_envelope_test.go b/pkg/monitoring/source_envelope_test.go index d16db09b9..08abb79a2 100644 --- a/pkg/monitoring/source_envelope_test.go +++ b/pkg/monitoring/source_envelope_test.go @@ -13,8 +13,11 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/types" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "github.com/smartcontractkit/chainlink-common/pkg/logger" commonMonitoring "github.com/smartcontractkit/chainlink-common/pkg/monitoring" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/mocks" "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/testutils" @@ -144,7 +147,7 @@ func TestEnvelopeSource(t *testing.T) { mock.Anything, // ctx feedConfig.StateAccount, mock.Anything, // // because it's hard to mock pointer values! - ).Return(fakeTxSignatures, nil) + ).Return(fakeTxSignatures, nil).Once() chainReader.On("GetTransaction", mock.Anything, // ctx fakeTxSignatures[0].Signature, @@ -155,7 +158,8 @@ func TestEnvelopeSource(t *testing.T) { ).Return(fakeTxResult, nil) // Call Fetch - factory := NewEnvelopeSourceFactory(chainReader, testutils.NewNullLogger()) + lgr, logs := logger.TestObserved(t, zapcore.DebugLevel) + factory := NewEnvelopeSourceFactory(chainReader, lgr) source, err := factory.NewSource(chainConfig, feedConfig) require.NoError(t, err) rawEnvelope, err := source.Fetch(context.Background()) @@ -221,6 +225,23 @@ func TestEnvelopeSource(t *testing.T) { AggregatorRoundID: 0x13841a, } require.Equal(t, expectedEnvelope, envelope) + + t.Run("empty tx list should return cached JuelsPerLamport", func(t *testing.T) { + chainReader.On("GetSignaturesForAddressWithOpts", + mock.Anything, + feedConfig.StateAccount, + mock.Anything, + ).Return([]*rpc.TransactionSignature{}, nil).Once() // return empty + + envSource, ok := source.(*envelopeSource) + require.True(t, ok) + cacheValue := envSource.readJuelsPerLamport() + + v, err := envSource.getJuelsPerLamport(tests.Context(t)) + require.NoError(t, err) + require.Equal(t, cacheValue, v.Uint64()) + tests.AssertLogEventually(t, logs.FilterLevelExact(zapcore.WarnLevel), "no transactions found, falling back to cached value - history may have been pruned (cached_value=0 indicates pruned txs encountered on startup)") + }) } func TestGetLinkAvailableForPayment(t *testing.T) {