Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

som: handle pruned transactions for slower updating feeds #857

Merged
merged 3 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions pkg/monitoring/source_envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func (s *envelopeSourceFactory) NewSource(
return nil, fmt.Errorf("expected feedConfig to be of type config.SolanaFeedConfig not %T", feedConfig)
}
return &envelopeSource{
s.client,
solanaFeedConfig,
s.log,
client: s.client,
feedConfig: solanaFeedConfig,
log: s.log,
}, nil
}

Expand All @@ -57,6 +57,25 @@ type envelopeSource struct {
client ChainReader
feedConfig config.SolanaFeedConfig
log commonMonitoring.Logger

// these values are cached because transactions can be pruned from an RPC node
// if no transactions are found and without a cache, the `getJuelsPerLamport` call will block the entire source.Fetch
lock sync.RWMutex
cacheJuelsPerLamport uint64
}

func (s *envelopeSource) setJuelsPerLamport(v uint64) error {
s.lock.Lock()
s.cacheJuelsPerLamport = v
s.lock.Unlock()

return nil
}

func (s *envelopeSource) readJuelsPerLamport() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.cacheJuelsPerLamport
}

func (s *envelopeSource) Fetch(ctx context.Context) (interface{}, error) {
Expand Down Expand Up @@ -163,7 +182,9 @@ func (s *envelopeSource) getJuelsPerLamport(ctx context.Context) (*big.Int, erro
return nil, fmt.Errorf("failed to fetch tx signatures for state account '%s': %w", s.feedConfig.StateAccountBase58, err)
}
if len(txSigs) == 0 {
return nil, fmt.Errorf("found no transactions from state account '%s'", s.feedConfig.StateAccountBase58)
val := s.readJuelsPerLamport()
s.log.Warnw("no transactions found, falling back to cached value - history may have been pruned (cached_value=0 indicates pruned txs encountered on startup)", "state_account", s.feedConfig.StateAccountBase58, "cached_value", val)
return new(big.Int).SetUint64(val), nil
}
for _, txSig := range txSigs {
if txSig.Err != nil {
Expand Down Expand Up @@ -197,11 +218,11 @@ func (s *envelopeSource) getJuelsPerLamport(ctx context.Context) (*big.Int, erro
if !isNewTransmission {
continue
}
// don't block on zero value - handling zero values should happen on the consumer
if newTransmission.JuelsPerLamport == 0 {
s.log.Infow("zero value for juels/lamport feed is not supported")
continue
s.log.Warnw("zero value for juels/lamport feed is not supported")
}
return new(big.Int).SetUint64(newTransmission.JuelsPerLamport), nil
return new(big.Int).SetUint64(newTransmission.JuelsPerLamport), s.setJuelsPerLamport(newTransmission.JuelsPerLamport)
}
}
return nil, fmt.Errorf("no correct NewTransmission event found in the last %d transactions on contract state '%s'", txSigsPageSize, s.feedConfig.StateAccountBase58)
Expand Down
25 changes: 23 additions & 2 deletions pkg/monitoring/source_envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import (
bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/libocr/offchainreporting2/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

commonMonitoring "github.com/smartcontractkit/chainlink-common/pkg/monitoring"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-solana/pkg/monitoring/mocks"
"github.com/smartcontractkit/chainlink-solana/pkg/monitoring/testutils"
Expand Down Expand Up @@ -144,7 +147,7 @@ func TestEnvelopeSource(t *testing.T) {
mock.Anything, // ctx
feedConfig.StateAccount,
mock.Anything, // // because it's hard to mock pointer values!
).Return(fakeTxSignatures, nil)
).Return(fakeTxSignatures, nil).Once()
chainReader.On("GetTransaction",
mock.Anything, // ctx
fakeTxSignatures[0].Signature,
Expand All @@ -155,7 +158,8 @@ func TestEnvelopeSource(t *testing.T) {
).Return(fakeTxResult, nil)

// Call Fetch
factory := NewEnvelopeSourceFactory(chainReader, testutils.NewNullLogger())
lgr, logs := logger.TestLoggerObserved(t, zapcore.DebugLevel)
factory := NewEnvelopeSourceFactory(chainReader, lgr)
source, err := factory.NewSource(chainConfig, feedConfig)
require.NoError(t, err)
rawEnvelope, err := source.Fetch(context.Background())
Expand Down Expand Up @@ -221,6 +225,23 @@ func TestEnvelopeSource(t *testing.T) {
AggregatorRoundID: 0x13841a,
}
require.Equal(t, expectedEnvelope, envelope)

t.Run("empty tx list should return cached JuelsPerLamport", func(t *testing.T) {
chainReader.On("GetSignaturesForAddressWithOpts",
mock.Anything,
feedConfig.StateAccount,
mock.Anything,
).Return([]*rpc.TransactionSignature{}, nil).Once() // return empty

envSource, ok := source.(*envelopeSource)
require.True(t, ok)
cacheValue := envSource.readJuelsPerLamport()

v, err := envSource.getJuelsPerLamport(tests.Context(t))
require.NoError(t, err)
require.Equal(t, cacheValue, v.Uint64())
tests.AssertLogEventually(t, logs.FilterLevelExact(zapcore.WarnLevel), "no transactions found, falling back to cached value - history may have been pruned (cached_value=0 indicates pruned txs encountered on startup)")
})
}

func TestGetLinkAvailableForPayment(t *testing.T) {
Expand Down
Loading