From e4655ff1912bef203dca452d16a08ce168ccc893 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 18 Sep 2024 04:58:00 -0600 Subject: [PATCH] view function EA telem support in mercury 2.15 release (#14465) * * Adds support for "tags" to Tasks that can be used generically. * Adds a descendent task search method * Added support in Mercury EA telemetry to utilize tags for telemetry extraction * fixing build issue * set version --- .changeset/shaggy-flowers-call.md | 8 + core/services/ocrcommon/telemetry.go | 171 +++++++++--- core/services/ocrcommon/telemetry_test.go | 258 +++++++++++++++++- core/services/pipeline/common.go | 12 + core/services/pipeline/common_test.go | 74 +++++ core/services/pipeline/runner_test.go | 44 +++ core/services/pipeline/task.base.go | 31 +++ core/services/pipeline/task.bridge_test.go | 12 + .../pipeline/task.eth_abi_decode_test.go | 14 + .../relay/evm/mercury/mocks/pipeline.go | 4 + .../telem/telem_enhanced_ea_mercury.proto | 1 + .../synchronization/telem/telem_wsrpc.pb.go | 2 +- package.json | 4 +- tools/bin/goreleaser_utils | 2 +- 14 files changed, 583 insertions(+), 54 deletions(-) create mode 100644 .changeset/shaggy-flowers-call.md diff --git a/.changeset/shaggy-flowers-call.md b/.changeset/shaggy-flowers-call.md new file mode 100644 index 00000000000..c88bdfa3043 --- /dev/null +++ b/.changeset/shaggy-flowers-call.md @@ -0,0 +1,8 @@ +--- +"chainlink": patch +--- + +#added and internal changes +* Adds support for "tags" to Tasks that can be used generically. +* Adds a descendent task search method +* Added support in Mercury EA telemetry to utilize tags for telemetry extraction diff --git a/core/services/ocrcommon/telemetry.go b/core/services/ocrcommon/telemetry.go index 2ef76800a42..f9d419ce427 100644 --- a/core/services/ocrcommon/telemetry.go +++ b/core/services/ocrcommon/telemetry.go @@ -368,17 +368,17 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced e.lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, job=%d, id=%s, name=%q, expected string got %T", e.job.ID, trr.Task.DotID(), bridgeName, trr.Result.Value), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName) continue } - eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse)) + eaResponse, err := parseEATelemetry([]byte(bridgeRawResponse)) if err != nil { e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job=%d, id=%s, name=%q", e.job.ID, trr.Task.DotID(), bridgeName), "err", err, "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName) } assetSymbol := e.getAssetSymbolFromRequestData(bridgeTask.RequestData) - benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, d.TaskRunResults, d.FeedVersion) + benchmarkPrice, bidPrice, askPrice := e.getPricesFromBridgeTask(trr, d.TaskRunResults, d.FeedVersion) t := &telem.EnhancedEAMercury{ - DataSource: eaTelem.DataSource, + DataSource: eaResponse.DataSource, DpBenchmarkPrice: benchmarkPrice, DpBid: bidPrice, DpAsk: askPrice, @@ -390,10 +390,10 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced MaxFinalizedTimestamp: mfts, BridgeTaskRunStartedTimestamp: trr.CreatedAt.UnixMilli(), BridgeTaskRunEndedTimestamp: trr.FinishedAt.Time.UnixMilli(), - ProviderRequestedTimestamp: eaTelem.ProviderRequestedTimestamp, - ProviderReceivedTimestamp: eaTelem.ProviderReceivedTimestamp, - ProviderDataStreamEstablished: eaTelem.ProviderDataStreamEstablished, - ProviderIndicatedTime: eaTelem.ProviderIndicatedTime, + ProviderRequestedTimestamp: eaResponse.ProviderRequestedTimestamp, + ProviderReceivedTimestamp: eaResponse.ProviderReceivedTimestamp, + ProviderDataStreamEstablished: eaResponse.ProviderDataStreamEstablished, + ProviderIndicatedTime: eaResponse.ProviderIndicatedTime, Feed: e.job.OCR2OracleSpec.FeedID.Hex(), ObservationBenchmarkPrice: bp.Int64(), ObservationBid: bid.Int64(), @@ -408,10 +408,11 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(), Round: int64(d.RepTimestamp.Round), Epoch: int64(d.RepTimestamp.Epoch), + BridgeRequestData: bridgeTask.RequestData, AssetSymbol: assetSymbol, Version: uint32(d.FeedVersion), } - + e.lggr.Debugw(fmt.Sprintf("EA Telemetry = %+v", t), "feedID", e.job.OCR2OracleSpec.FeedID.Hex(), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName) bytes, err := proto.Marshal(t) if err != nil { e.lggr.Warnf("protobuf marshal failed %v", err.Error()) @@ -422,11 +423,25 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced } } +type telemetryAttributes struct { + PriceType *string `json:"priceType"` +} + +func (e *EnhancedTelemetryService[T]) parseTelemetryAttributes(a string) (telemetryAttributes, error) { + attrs := &telemetryAttributes{} + err := json.Unmarshal([]byte(a), attrs) + if err != nil { + return telemetryAttributes{}, err + } + return *attrs, nil +} + // getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData string) string { type reqDataPayload struct { - To string `json:"to"` - From string `json:"from"` + To *string `json:"to"` + From *string `json:"from"` + Address *string `json:"address"` // used for view function ea only } type reqData struct { Data reqDataPayload `json:"data"` @@ -438,7 +453,15 @@ func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData return "" } - return rd.Data.From + "/" + rd.Data.To + if rd.Data.From != nil && rd.Data.To != nil { + return *rd.Data.From + "/" + *rd.Data.To + } + + if rd.Data.Address != nil { + return *rd.Data.Address + } + + return "" } // ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent @@ -449,11 +472,100 @@ func ShouldCollectEnhancedTelemetryMercury(jb job.Job) bool { return false } -// getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice, +const ( + bid = "bid" + ask = "ask" + benchmark = "benchmark" + exchangeRate = "exchangeRate" +) + +func (e *EnhancedTelemetryService[T]) getPricesFromBridgeTask(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) { + var benchmarkPrice, bidPrice, askPrice float64 + + // This will assume that all fields we care about are tagged with the correct priceType + benchmarkPrice, bidPrice, askPrice = e.getPricesFromBridgeTaskByTelemetryField(bridgeTask, allTasks) + + // If prices weren't parsed by telemetry fields - attempt to get prices using the legacy method + // This is for backwards compatibility with job specs that don't have the telemetry attributes set + if benchmarkPrice == 0 && bidPrice == 0 && askPrice == 0 { + benchmarkP, bidP, askP := e.getPricesFromResultsByOrder(bridgeTask, allTasks, mercuryVersion) + bidPrice = bidP + askPrice = askP + benchmarkPrice = benchmarkP + } + + return benchmarkPrice, bidPrice, askPrice +} + +// CollectTaskRunResultsWithTags collects TaskRunResults for descendent tasks with non-empty TaskTags. +func (e *EnhancedTelemetryService[T]) collectTaskRunResultsWithTags(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) []pipeline.TaskRunResult { + startTask := bridgeTask.Task + descendants := startTask.GetDescendantTasks() + var taskRunResultsWithTags []pipeline.TaskRunResult + for _, task := range descendants { + trr := allTasks.GetTaskRunResultOf(task) + if trr != nil { + if trr.Task.TaskTags() != "" { + taskRunResultsWithTags = append(taskRunResultsWithTags, *trr) + } + } + } + return taskRunResultsWithTags +} + +// getPricesFromBridgeTaskByTelemetryField attempts to parse prices from via telemetry fields in the TaskTags +func (e *EnhancedTelemetryService[T]) getPricesFromBridgeTaskByTelemetryField(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) (float64, float64, float64) { + var benchmarkPrice, bidPrice, askPrice float64 + + // Outputs are the mapped tasks from this task. + var tasksWithTags = e.collectTaskRunResultsWithTags(bridgeTask, allTasks) + + for _, trr := range tasksWithTags { + + attributes, err := e.parseTelemetryAttributes(trr.Task.TaskTags()) + if err != nil { + e.lggr.Warnw(fmt.Sprintf("cannot parse telemetry attributes, feed=%s, taskTags=%s", e.job.OCR2OracleSpec.FeedID.Hex(), trr.Task.TaskTags()), "err", err) + continue + } + + if attributes.PriceType != nil { + switch *attributes.PriceType { + case bid: + bidPrice = e.parsePriceFromTask(trr) + case ask: + askPrice = e.parsePriceFromTask(trr) + case benchmark: + benchmarkPrice = e.parsePriceFromTask(trr) + case exchangeRate: + price := e.parsePriceFromTask(trr) + benchmarkPrice, bidPrice, askPrice = price, price, price + case "": + e.lggr.Warnw(fmt.Sprintf("no priceType found in attributes, parsedAttributes=%+v, job %d, id %s", attributes, e.job.ID, trr.Task.DotID())) + } + } + } + + return benchmarkPrice, bidPrice, askPrice +} + +func (e *EnhancedTelemetryService[T]) parsePriceFromTask(trr pipeline.TaskRunResult) float64 { + var val float64 + if trr.Result.Error != nil { + e.lggr.Warnw(fmt.Sprintf("got error on EA telemetry price task, job %d, id %s: %s", e.job.ID, trr.Task.DotID(), trr.Result.Error), "err", trr.Result.Error) + return 0 + } + val, err := getResultFloat64(&trr) + if err != nil { + e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry price to float64, DOT id %s", trr.Task.DotID()), "job", e.job.ID, "task_type", trr.Task.Type(), "task_tags", trr.Task.TaskTags(), "err", err) + } + return val +} + +// getPricesFromResultsByOrder parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice, // bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered -func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) { +func (e *EnhancedTelemetryService[T]) getPricesFromResultsByOrder(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) { var benchmarkPrice, askPrice, bidPrice float64 - var err error + // We rely on task results to be sorted in the correct order benchmarkPriceTask := allTasks.GetNextTaskOf(startTask) if benchmarkPriceTask == nil { @@ -461,14 +573,7 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta return 0, 0, 0 } if benchmarkPriceTask.Task.Type() == pipeline.TaskTypeJSONParse { - if benchmarkPriceTask.Result.Error != nil { - e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry benchmark price, job %d, id %s: %s", e.job.ID, benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Error), "err", benchmarkPriceTask.Result.Error) - } else { - benchmarkPrice, err = getResultFloat64(benchmarkPriceTask) - if err != nil { - e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry benchmark price, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID()), "err", err) - } - } + benchmarkPrice = e.parsePriceFromTask(*benchmarkPriceTask) } // mercury version 2 only supports benchmarkPrice @@ -482,15 +587,8 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta return benchmarkPrice, 0, 0 } - if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse { - if bidTask.Result.Error != nil { - e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, job %d, id %s: %s", e.job.ID, bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error) - } else { - bidPrice, err = getResultFloat64(bidTask) - if err != nil { - e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry bid price, job %d, id %s", e.job.ID, bidTask.Task.DotID()), "err", err) - } - } + if bidTask.Task.Type() == pipeline.TaskTypeJSONParse { + bidPrice = e.parsePriceFromTask(*bidTask) } askTask := allTasks.GetNextTaskOf(*bidTask) @@ -498,15 +596,8 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta e.lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID()) return benchmarkPrice, bidPrice, 0 } - if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse { - if bidTask.Result.Error != nil { - e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, job %d, id %s: %s", e.job.ID, askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error) - } else { - askPrice, err = getResultFloat64(askTask) - if err != nil { - e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry ask price, job %d, id %s", e.job.ID, askTask.Task.DotID()), "err", err) - } - } + if askTask.Task.Type() == pipeline.TaskTypeJSONParse { + askPrice = e.parsePriceFromTask(*askTask) } return benchmarkPrice, bidPrice, askPrice diff --git a/core/services/ocrcommon/telemetry_test.go b/core/services/ocrcommon/telemetry_test.go index f764e7380f8..13b5ef34d43 100644 --- a/core/services/ocrcommon/telemetry_test.go +++ b/core/services/ocrcommon/telemetry_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -445,7 +446,83 @@ var trrsMercuryV2 = pipeline.TaskRunResults{ }, } -func TestGetPricesFromResults(t *testing.T) { +func TestGetPricesFromBridgeByTelemetryField(t *testing.T) { + lggr, _ := logger.TestLoggerObserved(t, zap.WarnLevel) + e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{ + lggr: lggr, + job: &job.Job{ + ID: 0, + }, + } + + // These are intentionally out of order from the "legacy" method which expects order of `benchmark, bid, ask` + jsonParseTaskBid := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "json_parse_2", nil, nil, 2), + } + jsonParseTaskBid.BaseTask.Tags = `{"priceType": "bid"}` + jsonParseTaskAsk := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(2, "json_parse_3", nil, nil, 3), + } + jsonParseTaskAsk.BaseTask.Tags = `{"priceType": "ask"}` + jsonParseTaskBenchmark := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(3, "json_parse_1", nil, nil, 1), + } + jsonParseTaskBenchmark.BaseTask.Tags = `{"priceType": "benchmark"}` + + bridgeOutputs := []pipeline.Task{&jsonParseTaskAsk, &jsonParseTaskBid, &jsonParseTaskBenchmark} + + bridgeTask := pipeline.BridgeTask{ + Name: "bridge-task", + BaseTask: pipeline.NewBaseTask(0, "bridge", nil, bridgeOutputs, 0), + } + + // Create task run results + taskRunResults := pipeline.TaskRunResults{ + pipeline.TaskRunResult{ + Task: &bridgeTask, + Result: pipeline.Result{ + Value: bridgeResponse, + }, + }, + pipeline.TaskRunResult{ + Task: &jsonParseTaskBenchmark, + Result: pipeline.Result{ + Value: "123456.123456", + }, + }, + pipeline.TaskRunResult{ + Task: &jsonParseTaskBid, + Result: pipeline.Result{ + Value: "1234567.1234567", + }, + }, + pipeline.TaskRunResult{ + Task: &jsonParseTaskAsk, + Result: pipeline.Result{ + Value: "321123", + }, + }, + } + + benchmarkPrice, bidPrice, askPrice := e.getPricesFromBridgeTask(taskRunResults[0], taskRunResults, 1) + + require.Equal(t, 123456.123456, benchmarkPrice) + require.Equal(t, 1234567.1234567, bidPrice) + require.Equal(t, 321123.0, askPrice) + + // now removing the TaskTags will throw off the parsed order - and we'll be parsing the "incorrect" prices + // according to the legacy ordering approach + jsonParseTaskAsk.BaseTask.Tags = "" + jsonParseTaskBid.BaseTask.Tags = "" + jsonParseTaskBenchmark.BaseTask.Tags = "" + + wrongBenchmarkPrice, wrongBidPrice, wrongAskPrice := e.getPricesFromBridgeTask(taskRunResults[0], taskRunResults, 1) + require.Equal(t, 1234567.1234567, wrongBenchmarkPrice) + require.Equal(t, 321123.0, wrongBidPrice) + require.Equal(t, 123456.123456, wrongAskPrice) +} + +func TestGetPricesFromBridgeTaskByOrder(t *testing.T) { lggr, logs := logger.TestLoggerObserved(t, zap.WarnLevel) e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{ lggr: lggr, @@ -454,12 +531,12 @@ func TestGetPricesFromResults(t *testing.T) { }, } - benchmarkPrice, bid, ask := e.getPricesFromResults(trrsMercuryV1[0], trrsMercuryV1, 1) + benchmarkPrice, bid, ask := e.getPricesFromBridgeTask(trrsMercuryV1[0], trrsMercuryV1, 1) require.Equal(t, 123456.123456, benchmarkPrice) require.Equal(t, 1234567.1234567, bid) require.Equal(t, float64(321123), ask) - benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercuryV1[0], pipeline.TaskRunResults{}, 1) + benchmarkPrice, bid, ask = e.getPricesFromBridgeTask(trrsMercuryV1[0], pipeline.TaskRunResults{}, 1) require.Equal(t, float64(0), benchmarkPrice) require.Equal(t, float64(0), bid) require.Equal(t, float64(0), ask) @@ -467,12 +544,12 @@ func TestGetPricesFromResults(t *testing.T) { require.Contains(t, logs.All()[0].Message, "cannot parse enhanced EA telemetry") tt := trrsMercuryV1[:2] - e.getPricesFromResults(trrsMercuryV1[0], tt, 1) + e.getPricesFromBridgeTask(trrsMercuryV1[0], tt, 1) require.Equal(t, 2, logs.Len()) require.Contains(t, logs.All()[1].Message, "cannot parse enhanced EA telemetry bid price, task is nil") tt = trrsMercuryV1[:3] - e.getPricesFromResults(trrsMercuryV1[0], tt, 1) + e.getPricesFromBridgeTask(trrsMercuryV1[0], tt, 1) require.Equal(t, 3, logs.Len()) require.Contains(t, logs.All()[2].Message, "cannot parse enhanced EA telemetry ask price, task is nil") @@ -510,16 +587,16 @@ func TestGetPricesFromResults(t *testing.T) { Value: nil, }, }} - benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercuryV1[0], trrs2, 3) + benchmarkPrice, bid, ask = e.getPricesFromBridgeTask(trrsMercuryV1[0], trrs2, 3) require.Equal(t, benchmarkPrice, float64(0)) require.Equal(t, bid, float64(0)) require.Equal(t, ask, float64(0)) require.Equal(t, logs.Len(), 6) - require.Contains(t, logs.All()[3].Message, "cannot parse enhanced EA telemetry benchmark price") - require.Contains(t, logs.All()[4].Message, "cannot parse enhanced EA telemetry bid price") - require.Contains(t, logs.All()[5].Message, "cannot parse enhanced EA telemetry ask price") + require.Contains(t, logs.All()[3].Message, "cannot parse EA telemetry price to float64, DOT id ds1_benchmark") + require.Contains(t, logs.All()[4].Message, "cannot parse EA telemetry price to float64, DOT id ds2_bid") + require.Contains(t, logs.All()[5].Message, "cannot parse EA telemetry price to float64, DOT id ds3_ask") - benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercuryV1[0], trrsMercuryV2, 2) + benchmarkPrice, bid, ask = e.getPricesFromBridgeTask(trrsMercuryV1[0], trrsMercuryV2, 2) require.Equal(t, 123456.123456, benchmarkPrice) require.Equal(t, float64(0), bid) require.Equal(t, float64(0), ask) @@ -546,6 +623,165 @@ func TestGetAssetSymbolFromRequestData(t *testing.T) { require.Equal(t, e.getAssetSymbolFromRequestData(""), "") reqData := `{"data":{"to":"LINK","from":"USD"}}` require.Equal(t, e.getAssetSymbolFromRequestData(reqData), "USD/LINK") + viewFunctionReqData := `{"data":{"address":"0x12345678", "signature": "function stEthPerToken() view returns (int256)"}}` + require.Equal(t, "0x12345678", e.getAssetSymbolFromRequestData(viewFunctionReqData)) +} + +func getViewFunctionTaskRunResults() pipeline.TaskRunResults { + var taskViewFunctionParseValue = func() pipeline.MultiplyTask { + task := pipeline.MultiplyTask{ + BaseTask: pipeline.NewBaseTask(3, "ds1_parse", nil, nil, 3), + Times: "1", + } + task.BaseTask.Tags = `{"priceType": "exchangeRate"}` + return task + }() + + var taskViewFunctionDecode = pipeline.ETHABIDecodeTask{ + ABI: "uint256 data", + BaseTask: pipeline.NewBaseTask(2, "ds1_decode", nil, []pipeline.Task{&taskViewFunctionParseValue}, 2), + } + + var taskViewFunctionJSONParse = pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "ds1_parse", nil, []pipeline.Task{&taskViewFunctionDecode}, 1), + } + + const viewFunctionBridgeResponse = `{ + "data": { + "result": "0x000000000000000000000000000000000000000000000000105ba6a589b23a81" + }, + "statusCode": 200, + "result": "0x000000000000000000000000000000000000000000000000105ba6a589b23a81", + "timestamps": { + "providerDataRequestedUnixMs": 1726243598046, + "providerDataReceivedUnixMs": 1726243598341 + }, + "meta": { + "adapterName": "VIEW_FUNCTION" + } + }` + + var taskViewFunctionBridgeRequest = pipeline.BridgeTask{ + Name: "bridge-view-function", + BaseTask: pipeline.NewBaseTask(0, "ds1", nil, []pipeline.Task{&taskViewFunctionJSONParse}, 0), + RequestData: `{"data":{"address":"0x1234","signature":"function stEthPerToken() external view returns (uint256)"}}`, + } + + return pipeline.TaskRunResults{ + pipeline.TaskRunResult{ + Task: &taskViewFunctionBridgeRequest, + Result: pipeline.Result{ + Value: viewFunctionBridgeResponse, + }, + }, + pipeline.TaskRunResult{ + Task: &taskViewFunctionJSONParse, + Result: pipeline.Result{ + Value: `0x000000000000000000000000000000000000000000000000105ba6a589b23a81`, + }, + }, + pipeline.TaskRunResult{ + Task: &taskViewFunctionDecode, + Result: pipeline.Result{ + Value: map[string]interface{}{ + "data": big.NewInt(1178718957397490305), + }, + }, + }, + pipeline.TaskRunResult{ + Task: &taskViewFunctionParseValue, + Result: pipeline.Result{ + Value: decimal.NewFromInt(1178718957397490305), + }, + }, + } +} + +func TestCollectMercuryEnhancedTelemetryV1ViewFunction(t *testing.T) { + wg := sync.WaitGroup{} + ingressClient := mocks.NewTelemetryService(t) + ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient) + monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.EnhancedEAMercury) + + var sentMessage []byte + ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + sentMessage = args[1].([]byte) + wg.Done() + }) + + lggr, _ := logger.TestLoggerObserved(t, zap.WarnLevel) + chTelem := make(chan EnhancedTelemetryMercuryData, 100) + chDone := make(chan struct{}) + feedID := common.HexToHash("0x111") + e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{ + chDone: chDone, + chTelem: chTelem, + job: &job.Job{ + Type: job.Type(pipeline.OffchainReporting2JobType), + OCR2OracleSpec: &job.OCR2OracleSpec{ + CaptureEATelemetry: true, + FeedID: &feedID, + }, + }, + lggr: lggr, + monitoringEndpoint: monitoringEndpoint, + } + servicetest.Run(t, &e) + + wg.Add(1) + + taskRunResults := getViewFunctionTaskRunResults() + + chTelem <- EnhancedTelemetryMercuryData{ + TaskRunResults: taskRunResults, + V1Observation: &mercuryv1.Observation{ + BenchmarkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(111111)}, + Bid: mercury.ObsResult[*big.Int]{Val: big.NewInt(222222)}, + Ask: mercury.ObsResult[*big.Int]{Val: big.NewInt(333333)}, + CurrentBlockNum: mercury.ObsResult[int64]{Val: 123456789}, + CurrentBlockHash: mercury.ObsResult[[]byte]{Val: common.HexToHash("0x123321").Bytes()}, + CurrentBlockTimestamp: mercury.ObsResult[uint64]{Val: 987654321}, + }, + RepTimestamp: types.ReportTimestamp{ + ConfigDigest: types.ConfigDigest{2}, + Epoch: 11, + Round: 22, + }, + } + + expectedTelemetry := telem.EnhancedEAMercury{ + DataSource: "VIEW_FUNCTION", + DpBenchmarkPrice: 1178718957397490400, + DpBid: 1178718957397490400, + DpAsk: 1178718957397490400, + CurrentBlockNumber: 123456789, + CurrentBlockHash: common.HexToHash("0x123321").String(), + CurrentBlockTimestamp: 987654321, + BridgeTaskRunStartedTimestamp: taskRunResults[0].CreatedAt.UnixMilli(), + BridgeTaskRunEndedTimestamp: taskRunResults[0].FinishedAt.Time.UnixMilli(), + ProviderRequestedTimestamp: 1726243598046, + ProviderReceivedTimestamp: 1726243598341, + ProviderDataStreamEstablished: 0, + ProviderIndicatedTime: 0, + Feed: common.HexToHash("0x111").String(), + ObservationBenchmarkPrice: 111111, + ObservationBid: 222222, + ObservationAsk: 333333, + ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000", + Round: 22, + Epoch: 11, + BridgeRequestData: `{"data":{"address":"0x1234","signature":"function stEthPerToken() external view returns (uint256)"}}`, + AssetSymbol: "0x1234", + ObservationBenchmarkPriceString: "111111", + ObservationBidString: "222222", + ObservationAskString: "333333", + } + + expectedMessage, _ := proto.Marshal(&expectedTelemetry) + wg.Wait() + require.Equal(t, expectedMessage, sentMessage) + + chDone <- struct{}{} } func TestCollectMercuryEnhancedTelemetryV1(t *testing.T) { @@ -619,6 +855,7 @@ func TestCollectMercuryEnhancedTelemetryV1(t *testing.T) { ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000", Round: 22, Epoch: 11, + BridgeRequestData: `{"data":{"to":"LINK","from":"USD"}}`, AssetSymbol: "USD/LINK", ObservationBenchmarkPriceString: "111111", ObservationBidString: "222222", @@ -732,6 +969,7 @@ func TestCollectMercuryEnhancedTelemetryV2(t *testing.T) { ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000", Round: 22, Epoch: 11, + BridgeRequestData: `{"data":{"to":"LINK","from":"USD"}}`, AssetSymbol: "USD/LINK", ObservationBenchmarkPriceString: "111111", MaxFinalizedTimestamp: 321, diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index 763e50546fd..a8483ed68e5 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -58,6 +58,8 @@ type ( TaskRetries() uint32 TaskMinBackoff() time.Duration TaskMaxBackoff() time.Duration + TaskTags() string + GetDescendantTasks() []Task } Config interface { @@ -261,6 +263,16 @@ func (trrs TaskRunResults) Terminals() (terminals []TaskRunResult) { return } +// GetNextTaskOf returns the task with the next id or nil if it does not exist +func (trrs *TaskRunResults) GetTaskRunResultOf(task Task) *TaskRunResult { + for _, trr := range *trrs { + if trr.Task.Base().id == task.Base().id { + return &trr + } + } + return nil +} + // GetNextTaskOf returns the task with the next id or nil if it does not exist func (trrs *TaskRunResults) GetNextTaskOf(task TaskRunResult) *TaskRunResult { nextID := task.Task.Base().id + 1 diff --git a/core/services/pipeline/common_test.go b/core/services/pipeline/common_test.go index ce545ec14a0..ed7998b79e3 100644 --- a/core/services/pipeline/common_test.go +++ b/core/services/pipeline/common_test.go @@ -17,6 +17,14 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) +func TestAtrributesAttribute(t *testing.T) { + a := `ds1 [type=http method=GET tags=<{"attribute1":"value1", "attribute2":42}>];` + p, err := pipeline.Parse(a) + require.NoError(t, err) + task := p.Tasks[0] + assert.Equal(t, "{\"attribute1\":\"value1\", \"attribute2\":42}", task.TaskTags()) +} + func TestTimeoutAttribute(t *testing.T) { t.Parallel() @@ -320,3 +328,69 @@ func TestGetNextTaskOf(t *testing.T) { nextTask = trrs.GetNextTaskOf(*nextTask) assert.Empty(t, nextTask) } + +func TestGetDescendantTasks(t *testing.T) { + t.Parallel() + + t.Run("GetDescendantTasks with multiple levels of tasks", func(t *testing.T) { + l3T2 := pipeline.AnyTask{ + BaseTask: pipeline.NewBaseTask(6, "l3T2", nil, nil, 1), + } + l3T1 := pipeline.MedianTask{ + BaseTask: pipeline.NewBaseTask(5, "l3T1", nil, nil, 1), + } + l2T1 := pipeline.MultiplyTask{ + BaseTask: pipeline.NewBaseTask(4, "l2T1", nil, []pipeline.Task{&l3T1, &l3T2}, 1), + } + l1T1 := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(3, "l1T1", nil, []pipeline.Task{&l2T1}, 2), + } + l1T2 := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(2, "l1T2", nil, nil, 3), + } + l1T3 := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "l1T3", nil, nil, 4), + } + + baseTask := pipeline.BridgeTask{ + Name: "bridge-task", + BaseTask: pipeline.NewBaseTask(0, "baseTask", nil, []pipeline.Task{&l1T1, &l1T2, &l1T3}, 0), + } + + descendents := baseTask.GetDescendantTasks() + assert.Len(t, descendents, 6) + }) + + t.Run("GetDescendantTasks with duplicate tasks defined", func(t *testing.T) { + l2T1 := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(2, "l1T2", nil, nil, 3), + } + l1T1 := pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "l1T2", nil, []pipeline.Task{&l2T1, &l2T1, &l2T1}, 3), + } + taskWithRepeats := pipeline.BridgeTask{ + Name: "bridge-task", + BaseTask: pipeline.NewBaseTask(0, "taskWithRepeats", nil, []pipeline.Task{&l1T1, &l1T1, &l1T1}, 0), + } + descendents := taskWithRepeats.GetDescendantTasks() + assert.Len(t, descendents, 2) + }) + + t.Run("GetDescendantTasks with nil output tasks", func(t *testing.T) { + taskWithRepeats := pipeline.BridgeTask{ + Name: "bridge-task", + BaseTask: pipeline.NewBaseTask(0, "taskWithRepeats", nil, nil, 0), + } + descendents := taskWithRepeats.GetDescendantTasks() + assert.Len(t, descendents, 0) + }) + + t.Run("GetDescendantTasks with empty list of output tasks", func(t *testing.T) { + taskWithRepeats := pipeline.BridgeTask{ + Name: "bridge-task", + BaseTask: pipeline.NewBaseTask(0, "taskWithRepeats", nil, []pipeline.Task{}, 0), + } + descendents := taskWithRepeats.GetDescendantTasks() + assert.Len(t, descendents, 0) + }) +} diff --git a/core/services/pipeline/runner_test.go b/core/services/pipeline/runner_test.go index 022a77c9471..ea30b3ff086 100644 --- a/core/services/pipeline/runner_test.go +++ b/core/services/pipeline/runner_test.go @@ -131,6 +131,50 @@ ds5 [type=http method="GET" url="%s" index=2] require.Len(t, errorResults, 3) } +func Test_PipelineRunner_ExecuteEthAbiDecode(t *testing.T) { + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewTestGeneralConfig(t) + + mockResult := `{"data":{"result":"0x000000000000000000000000000000000000000000000000105ba6a589b23a81"}}` + s1 := httptest.NewServer(NewMockHandler(mockResult)) + defer s1.Close() + + bridgeFeedURL, err := url.ParseRequestURI(s1.URL) + require.NoError(t, err) + + _, bt := cltest.MustCreateBridge(t, db, cltest.BridgeOpts{URL: bridgeFeedURL.String()}) + + btORM := bridgesMocks.NewORM(t) + btORM.On("FindBridge", mock.Anything, bt.Name).Return(*bt, nil).Once() + + r, _ := newRunner(t, db, btORM, cfg) + + s := fmt.Sprintf(` + ds1 [type=bridge name="%s" timeout=0 requestData=<{"data": {"address": "0x1234"}}>] + ds1_parse [type=jsonparse path="data,result"] + ds1_decode [type=ethabidecode abi="int256 data" data="$(ds1_parse)"]; + ds1_value [type="multiply" input="$(ds1_decode.data)" times=1] + + ds1->ds1_parse->ds1_decode->ds1_value + +`, bt.Name.String()) + d, err := pipeline.Parse(s) + require.NoError(t, err) + + spec := pipeline.Spec{DotDagSource: s} + vars := pipeline.NewVarsFrom(nil) + + _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars) + require.NoError(t, err) + require.Len(t, trrs, len(d.Tasks)) + + finalResults := trrs.FinalResult() + + val := finalResults.Values[0].(decimal.Decimal) + + assert.Equal(t, decimal.NewFromInt(1178718957397490305), val) +} + type taskRunWithVars struct { bridgeName string ds2URL, ds4URL string diff --git a/core/services/pipeline/task.base.go b/core/services/pipeline/task.base.go index 7a62f4e7ff8..3e1db5fcdb5 100644 --- a/core/services/pipeline/task.base.go +++ b/core/services/pipeline/task.base.go @@ -22,6 +22,8 @@ type BaseTask struct { MinBackoff time.Duration `mapstructure:"minBackoff"` MaxBackoff time.Duration `mapstructure:"maxBackoff"` + Tags string `mapstructure:"tags" json:"-"` + uuid uuid.UUID } @@ -77,3 +79,32 @@ func (t BaseTask) TaskMaxBackoff() time.Duration { } return time.Minute } + +func (t BaseTask) TaskTags() string { + return t.Tags +} + +// GetDescendantTasks retrieves all descendant tasks of a given task +func (t BaseTask) GetDescendantTasks() []Task { + if len(t.outputs) == 0 { + return []Task{} + } + var descendants []Task + queue := append([]Task{}, t.outputs...) + visited := make(map[int]bool) + + for len(queue) > 0 { + currentTask := queue[0] + queue = queue[1:] + + taskID := currentTask.ID() + if visited[taskID] { + continue + } + visited[taskID] = true + descendants = append(descendants, currentTask) + queue = append(queue, currentTask.Outputs()...) + } + + return descendants +} diff --git a/core/services/pipeline/task.bridge_test.go b/core/services/pipeline/task.bridge_test.go index d7519232eb5..cd81f8656fd 100644 --- a/core/services/pipeline/task.bridge_test.go +++ b/core/services/pipeline/task.bridge_test.go @@ -117,6 +117,18 @@ func mustReadFile(t testing.TB, file string) string { return string(content) } +// NewMockHandler returns an http.HandlerFunc that responds with the given payload for any request +func NewMockHandler(payload string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(payload)) + if err != nil { + http.Error(w, "Failed to write response", http.StatusInternalServerError) + } + } +} + func fakePriceResponder(t *testing.T, requestData map[string]interface{}, result decimal.Decimal, inputKey string, expectedInput interface{}) http.Handler { t.Helper() diff --git a/core/services/pipeline/task.eth_abi_decode_test.go b/core/services/pipeline/task.eth_abi_decode_test.go index 3c7f5b4776b..565e8b485dd 100644 --- a/core/services/pipeline/task.eth_abi_decode_test.go +++ b/core/services/pipeline/task.eth_abi_decode_test.go @@ -25,6 +25,20 @@ var testsABIDecode = []struct { expectedErrorCause error expectedErrorContains string }{ + { + "uint256", + "uint256 data", + "$(data)", + NewVarsFrom(map[string]interface{}{ + "data": "0x000000000000000000000000000000000000000000000000105ba6a589b23a81", + }), + nil, + map[string]interface{}{ + "data": big.NewInt(1178718957397490305), + }, + nil, + "", + }, { "uint256, bool, int256, string", "uint256 u, bool b, int256 i, string s", diff --git a/core/services/relay/evm/mercury/mocks/pipeline.go b/core/services/relay/evm/mercury/mocks/pipeline.go index 44be1377aeb..a7183c9a037 100644 --- a/core/services/relay/evm/mercury/mocks/pipeline.go +++ b/core/services/relay/evm/mercury/mocks/pipeline.go @@ -23,6 +23,10 @@ type MockTask struct { result pipeline.Result } +func (m *MockTask) GetDescendantTasks() []pipeline.Task { return nil } + +func (m *MockTask) TaskTags() string { return "{\"anything\": \"here\"}" } + func (m *MockTask) Type() pipeline.TaskType { return "MockTask" } func (m *MockTask) ID() int { return 0 } func (m *MockTask) DotID() string { return "" } diff --git a/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto b/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto index 8488eb1d509..dd2638d7bc4 100644 --- a/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto +++ b/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto @@ -12,6 +12,7 @@ message EnhancedEAMercury { double dp_bid=3; double dp_ask=4; bool dp_invariant_violation_detected=33; + string bridge_request_data = 35; // v1 fields (block range) int64 current_block_number=5; diff --git a/core/services/synchronization/telem/telem_wsrpc.pb.go b/core/services/synchronization/telem/telem_wsrpc.pb.go index e4028b4de49..e7df2090e4f 100644 --- a/core/services/synchronization/telem/telem_wsrpc.pb.go +++ b/core/services/synchronization/telem/telem_wsrpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-wsrpc. DO NOT EDIT. // versions: // - protoc-gen-go-wsrpc v0.0.1 -// - protoc v4.25.1 +// - protoc v5.28.0 package telem diff --git a/package.json b/package.json index ea2687a6a5f..e8bda4e61ae 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "chainlink", - "version": "2.15.1-mercury", + "version": "2.15.0", "description": "node of the decentralized oracle network, bridging on and off-chain computation", "main": "index.js", "scripts": { @@ -26,4 +26,4 @@ "@changesets/cli": "~2.26.2", "semver": "^7.6.1" } -} +} \ No newline at end of file diff --git a/tools/bin/goreleaser_utils b/tools/bin/goreleaser_utils index 4e1b3ffc4db..d05ed064e68 100755 --- a/tools/bin/goreleaser_utils +++ b/tools/bin/goreleaser_utils @@ -25,7 +25,7 @@ _get_arch() { _get_wasmvm_lib_path() { local -r platform="$1" local -r arch="$2" - wasmvm_dir=$(go list -json -m all | jq -r '. | select(.Path == "github.com/CosmWasm/wasmvm") | .Dir') + wasmvm_dir=$(go list -json -m github.com/CosmWasm/wasmvm | jq -r '.Dir') shared_lib_dir="$wasmvm_dir/internal/api" lib_name="libwasmvm" if [ "$platform" == "darwin" ]; then