Skip to content

Commit

Permalink
som: handle pruned transactions for slower updating feeds (#857)
Browse files Browse the repository at this point in the history
* som: handle pruned transactions for slowly updating feeds

* Update pkg/monitoring/source_envelope.go

Co-authored-by: Jordan Krage <[email protected]>

* fix: logger import

---------

Co-authored-by: Jordan Krage <[email protected]>
  • Loading branch information
aalu1418 and jmank88 authored Sep 16, 2024
1 parent ac425a1 commit edd33df
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 9 deletions.
35 changes: 28 additions & 7 deletions pkg/monitoring/source_envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func (s *envelopeSourceFactory) NewSource(
return nil, fmt.Errorf("expected feedConfig to be of type config.SolanaFeedConfig not %T", feedConfig)
}
return &envelopeSource{
s.client,
solanaFeedConfig,
s.log,
client: s.client,
feedConfig: solanaFeedConfig,
log: s.log,
}, nil
}

Expand All @@ -57,6 +57,25 @@ type envelopeSource struct {
client ChainReader
feedConfig config.SolanaFeedConfig
log commonMonitoring.Logger

// these values are cached because transactions can be pruned from an RPC node
// if no transactions are found and without a cache, the `getJuelsPerLamport` call will block the entire source.Fetch
lock sync.RWMutex
cacheJuelsPerLamport uint64
}

func (s *envelopeSource) setJuelsPerLamport(v uint64) error {
s.lock.Lock()
s.cacheJuelsPerLamport = v
s.lock.Unlock()

return nil
}

func (s *envelopeSource) readJuelsPerLamport() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.cacheJuelsPerLamport
}

func (s *envelopeSource) Fetch(ctx context.Context) (interface{}, error) {
Expand Down Expand Up @@ -163,7 +182,9 @@ func (s *envelopeSource) getJuelsPerLamport(ctx context.Context) (*big.Int, erro
return nil, fmt.Errorf("failed to fetch tx signatures for state account '%s': %w", s.feedConfig.StateAccountBase58, err)
}
if len(txSigs) == 0 {
return nil, fmt.Errorf("found no transactions from state account '%s'", s.feedConfig.StateAccountBase58)
val := s.readJuelsPerLamport()
s.log.Warnw("no transactions found, falling back to cached value - history may have been pruned (cached_value=0 indicates pruned txs encountered on startup)", "state_account", s.feedConfig.StateAccountBase58, "cached_value", val)
return new(big.Int).SetUint64(val), nil
}
for _, txSig := range txSigs {
if txSig.Err != nil {
Expand Down Expand Up @@ -197,11 +218,11 @@ func (s *envelopeSource) getJuelsPerLamport(ctx context.Context) (*big.Int, erro
if !isNewTransmission {
continue
}
// don't block on zero value - handling zero values should happen on the consumer
if newTransmission.JuelsPerLamport == 0 {
s.log.Infow("zero value for juels/lamport feed is not supported")
continue
s.log.Warnw("zero value for juels/lamport feed is not supported")
}
return new(big.Int).SetUint64(newTransmission.JuelsPerLamport), nil
return new(big.Int).SetUint64(newTransmission.JuelsPerLamport), s.setJuelsPerLamport(newTransmission.JuelsPerLamport)
}
}
return nil, fmt.Errorf("no correct NewTransmission event found in the last %d transactions on contract state '%s'", txSigsPageSize, s.feedConfig.StateAccountBase58)
Expand Down
25 changes: 23 additions & 2 deletions pkg/monitoring/source_envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
commonMonitoring "github.com/smartcontractkit/chainlink-common/pkg/monitoring"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-solana/pkg/monitoring/mocks"
"github.com/smartcontractkit/chainlink-solana/pkg/monitoring/testutils"
Expand Down Expand Up @@ -144,7 +147,7 @@ func TestEnvelopeSource(t *testing.T) {
mock.Anything, // ctx
feedConfig.StateAccount,
mock.Anything, // // because it's hard to mock pointer values!
).Return(fakeTxSignatures, nil)
).Return(fakeTxSignatures, nil).Once()
chainReader.On("GetTransaction",
mock.Anything, // ctx
fakeTxSignatures[0].Signature,
Expand All @@ -155,7 +158,8 @@ func TestEnvelopeSource(t *testing.T) {
).Return(fakeTxResult, nil)

// Call Fetch
factory := NewEnvelopeSourceFactory(chainReader, testutils.NewNullLogger())
lgr, logs := logger.TestObserved(t, zapcore.DebugLevel)
factory := NewEnvelopeSourceFactory(chainReader, lgr)
source, err := factory.NewSource(chainConfig, feedConfig)
require.NoError(t, err)
rawEnvelope, err := source.Fetch(context.Background())
Expand Down Expand Up @@ -221,6 +225,23 @@ func TestEnvelopeSource(t *testing.T) {
AggregatorRoundID: 0x13841a,
}
require.Equal(t, expectedEnvelope, envelope)

t.Run("empty tx list should return cached JuelsPerLamport", func(t *testing.T) {
chainReader.On("GetSignaturesForAddressWithOpts",
mock.Anything,
feedConfig.StateAccount,
mock.Anything,
).Return([]*rpc.TransactionSignature{}, nil).Once() // return empty

envSource, ok := source.(*envelopeSource)
require.True(t, ok)
cacheValue := envSource.readJuelsPerLamport()

v, err := envSource.getJuelsPerLamport(tests.Context(t))
require.NoError(t, err)
require.Equal(t, cacheValue, v.Uint64())
tests.AssertLogEventually(t, logs.FilterLevelExact(zapcore.WarnLevel), "no transactions found, falling back to cached value - history may have been pruned (cached_value=0 indicates pruned txs encountered on startup)")
})
}

func TestGetLinkAvailableForPayment(t *testing.T) {
Expand Down

0 comments on commit edd33df

Please sign in to comment.