Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/2.15.1 mercury #14547

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/shaggy-flowers-call.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": patch
---

#added and internal changes
* Adds support for "tags" to Tasks that can be used generically.
* Adds a descendent task search method
* Added support in Mercury EA telemetry to utilize tags for telemetry extraction
5 changes: 5 additions & 0 deletions .changeset/slow-lizards-shout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Skip telemetry for market-status bridges #internal
203 changes: 156 additions & 47 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,17 +368,21 @@
e.lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, job=%d, id=%s, name=%q, expected string got %T", e.job.ID, trr.Task.DotID(), bridgeName, trr.Result.Value), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
continue
}
eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse))
eaResponse, err := parseEATelemetry([]byte(bridgeRawResponse))
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job=%d, id=%s, name=%q", e.job.ID, trr.Task.DotID(), bridgeName), "err", err, "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
}

assetSymbol := e.getAssetSymbolFromRequestData(bridgeTask.RequestData)
parsedBridgeData := parseBridgeRequestData(bridgeTask.RequestData, d.FeedVersion)
if parsedBridgeData.IsMarketStatus {
// Only collect telemetry for pricing bridges.
continue
}

benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, d.TaskRunResults, d.FeedVersion)
benchmarkPrice, bidPrice, askPrice := e.getPricesFromBridgeTask(trr, d.TaskRunResults, d.FeedVersion)

t := &telem.EnhancedEAMercury{
DataSource: eaTelem.DataSource,
DataSource: eaResponse.DataSource,
DpBenchmarkPrice: benchmarkPrice,
DpBid: bidPrice,
DpAsk: askPrice,
Expand All @@ -390,10 +394,10 @@
MaxFinalizedTimestamp: mfts,
BridgeTaskRunStartedTimestamp: trr.CreatedAt.UnixMilli(),
BridgeTaskRunEndedTimestamp: trr.FinishedAt.Time.UnixMilli(),
ProviderRequestedTimestamp: eaTelem.ProviderRequestedTimestamp,
ProviderReceivedTimestamp: eaTelem.ProviderReceivedTimestamp,
ProviderDataStreamEstablished: eaTelem.ProviderDataStreamEstablished,
ProviderIndicatedTime: eaTelem.ProviderIndicatedTime,
ProviderRequestedTimestamp: eaResponse.ProviderRequestedTimestamp,
ProviderReceivedTimestamp: eaResponse.ProviderReceivedTimestamp,
ProviderDataStreamEstablished: eaResponse.ProviderDataStreamEstablished,
ProviderIndicatedTime: eaResponse.ProviderIndicatedTime,
Feed: e.job.OCR2OracleSpec.FeedID.Hex(),
ObservationBenchmarkPrice: bp.Int64(),
ObservationBid: bid.Int64(),
Expand All @@ -408,10 +412,11 @@
ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(),
Round: int64(d.RepTimestamp.Round),
Epoch: int64(d.RepTimestamp.Epoch),
AssetSymbol: assetSymbol,
BridgeRequestData: bridgeTask.RequestData,

Check failure on line 415 in core/services/ocrcommon/telemetry.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

unknown field BridgeRequestData in struct literal of type telem.EnhancedEAMercury

Check failure on line 415 in core/services/ocrcommon/telemetry.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

unknown field BridgeRequestData in struct literal of type telem.EnhancedEAMercury

Check failure on line 415 in core/services/ocrcommon/telemetry.go

View workflow job for this annotation

GitHub Actions / lint

unknown field BridgeRequestData in struct literal of type telem.EnhancedEAMercury

Check failure on line 415 in core/services/ocrcommon/telemetry.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

unknown field BridgeRequestData in struct literal of type telem.EnhancedEAMercury

Check failure on line 415 in core/services/ocrcommon/telemetry.go

View workflow job for this annotation

GitHub Actions / Flakey Test Detection

unknown field BridgeRequestData in struct literal of type telem.EnhancedEAMercury
AssetSymbol: parsedBridgeData.AssetSymbol,
Version: uint32(d.FeedVersion),
}

e.lggr.Debugw(fmt.Sprintf("EA Telemetry = %+v", t), "feedID", e.job.OCR2OracleSpec.FeedID.Hex(), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
bytes, err := proto.Marshal(t)
if err != nil {
e.lggr.Warnf("protobuf marshal failed %v", err.Error())
Expand All @@ -422,11 +427,32 @@
}
}

// getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair
func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData string) string {
type telemetryAttributes struct {
PriceType *string `json:"priceType"`
}

func (e *EnhancedTelemetryService[T]) parseTelemetryAttributes(a string) (telemetryAttributes, error) {
attrs := &telemetryAttributes{}
err := json.Unmarshal([]byte(a), attrs)
if err != nil {
return telemetryAttributes{}, err
}
return *attrs, nil
}

type bridgeRequestData struct {
AssetSymbol string
IsMarketStatus bool
}

// parseRequestData parses the requestData of the bridge.
func parseBridgeRequestData(requestData string, mercuryVersion mercuryutils.FeedVersion) bridgeRequestData {
type reqDataPayload struct {
To string `json:"to"`
From string `json:"from"`
Endpoint *string `json:"endpoint"`
To *string `json:"to"`
From *string `json:"from"`
Address *string `json:"address"` // used for view function ea only
Market *string `json:"market"` // used for market status ea only
}
type reqData struct {
Data reqDataPayload `json:"data"`
Expand All @@ -435,10 +461,25 @@
rd := &reqData{}
err := json.Unmarshal([]byte(requestData), rd)
if err != nil {
return ""
return bridgeRequestData{}
}

if mercuryVersion == 4 && ((rd.Data.Endpoint != nil && *rd.Data.Endpoint == "market-status") || (rd.Data.Market != nil && *rd.Data.Market != "")) {
return bridgeRequestData{
AssetSymbol: *rd.Data.Market,
IsMarketStatus: true,
}
}

return rd.Data.From + "/" + rd.Data.To
if rd.Data.From != nil && rd.Data.To != nil {
return bridgeRequestData{AssetSymbol: *rd.Data.From + "/" + *rd.Data.To}
}

if rd.Data.Address != nil {
return bridgeRequestData{AssetSymbol: *rd.Data.Address}
}

return bridgeRequestData{}
}

// ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent
Expand All @@ -449,30 +490,112 @@
return false
}

// getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
const (
bid = "bid"
ask = "ask"
benchmark = "benchmark"
exchangeRate = "exchangeRate"
)

func (e *EnhancedTelemetryService[T]) getPricesFromBridgeTask(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, bidPrice, askPrice float64

// This will assume that all fields we care about are tagged with the correct priceType
benchmarkPrice, bidPrice, askPrice = e.getPricesFromBridgeTaskByTelemetryField(bridgeTask, allTasks)

// If prices weren't parsed by telemetry fields - attempt to get prices using the legacy method
// This is for backwards compatibility with job specs that don't have the telemetry attributes set
if benchmarkPrice == 0 && bidPrice == 0 && askPrice == 0 {
benchmarkP, bidP, askP := e.getPricesFromResultsByOrder(bridgeTask, allTasks, mercuryVersion)
bidPrice = bidP
askPrice = askP
benchmarkPrice = benchmarkP
}

return benchmarkPrice, bidPrice, askPrice
}

// CollectTaskRunResultsWithTags collects TaskRunResults for descendent tasks with non-empty TaskTags.
func (e *EnhancedTelemetryService[T]) collectTaskRunResultsWithTags(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) []pipeline.TaskRunResult {
startTask := bridgeTask.Task
descendants := startTask.GetDescendantTasks()
var taskRunResultsWithTags []pipeline.TaskRunResult
for _, task := range descendants {
trr := allTasks.GetTaskRunResultOf(task)
if trr != nil {
if trr.Task.TaskTags() != "" {
taskRunResultsWithTags = append(taskRunResultsWithTags, *trr)
}
}
}
return taskRunResultsWithTags
}

// getPricesFromBridgeTaskByTelemetryField attempts to parse prices from via telemetry fields in the TaskTags
func (e *EnhancedTelemetryService[T]) getPricesFromBridgeTaskByTelemetryField(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) (float64, float64, float64) {
var benchmarkPrice, bidPrice, askPrice float64

// Outputs are the mapped tasks from this task.
var tasksWithTags = e.collectTaskRunResultsWithTags(bridgeTask, allTasks)

for _, trr := range tasksWithTags {

attributes, err := e.parseTelemetryAttributes(trr.Task.TaskTags())
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse telemetry attributes, feed=%s, taskTags=%s", e.job.OCR2OracleSpec.FeedID.Hex(), trr.Task.TaskTags()), "err", err)
continue
}

if attributes.PriceType != nil {
switch *attributes.PriceType {
case bid:
bidPrice = e.parsePriceFromTask(trr)
case ask:
askPrice = e.parsePriceFromTask(trr)
case benchmark:
benchmarkPrice = e.parsePriceFromTask(trr)
case exchangeRate:
price := e.parsePriceFromTask(trr)
benchmarkPrice, bidPrice, askPrice = price, price, price
case "":
e.lggr.Warnw(fmt.Sprintf("no priceType found in attributes, parsedAttributes=%+v, job %d, id %s", attributes, e.job.ID, trr.Task.DotID()))
}
}
}

return benchmarkPrice, bidPrice, askPrice
}

func (e *EnhancedTelemetryService[T]) parsePriceFromTask(trr pipeline.TaskRunResult) float64 {
var val float64
if trr.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error on EA telemetry price task, job %d, id %s: %s", e.job.ID, trr.Task.DotID(), trr.Result.Error), "err", trr.Result.Error)
return 0
}
val, err := getResultFloat64(&trr)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry price to float64, DOT id %s", trr.Task.DotID()), "job", e.job.ID, "task_type", trr.Task.Type(), "task_tags", trr.Task.TaskTags(), "err", err)
}
return val
}

// getPricesFromResultsByOrder parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
// bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered
func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
func (e *EnhancedTelemetryService[T]) getPricesFromResultsByOrder(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, askPrice, bidPrice float64
var err error

// We rely on task results to be sorted in the correct order
benchmarkPriceTask := allTasks.GetNextTaskOf(startTask)
if benchmarkPriceTask == nil {
e.lggr.Warnf("cannot parse enhanced EA telemetry benchmark price, task is nil, job %d", e.job.ID)
return 0, 0, 0
}
if benchmarkPriceTask.Task.Type() == pipeline.TaskTypeJSONParse {
if benchmarkPriceTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry benchmark price, job %d, id %s: %s", e.job.ID, benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Error), "err", benchmarkPriceTask.Result.Error)
} else {
benchmarkPrice, err = getResultFloat64(benchmarkPriceTask)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry benchmark price, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID()), "err", err)
}
}
benchmarkPrice = e.parsePriceFromTask(*benchmarkPriceTask)
}

// mercury version 2 only supports benchmarkPrice
if mercuryVersion == 2 {
// mercury versions 2 and 4 only supports benchmarkPrice
if mercuryVersion == 2 || mercuryVersion == 4 {
return benchmarkPrice, 0, 0
}

Expand All @@ -482,31 +605,17 @@
return benchmarkPrice, 0, 0
}

if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, job %d, id %s: %s", e.job.ID, bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error)
} else {
bidPrice, err = getResultFloat64(bidTask)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry bid price, job %d, id %s", e.job.ID, bidTask.Task.DotID()), "err", err)
}
}
if bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
bidPrice = e.parsePriceFromTask(*bidTask)
}

askTask := allTasks.GetNextTaskOf(*bidTask)
if askTask == nil {
e.lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID())
return benchmarkPrice, bidPrice, 0
}
if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, job %d, id %s: %s", e.job.ID, askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error)
} else {
askPrice, err = getResultFloat64(askTask)
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry ask price, job %d, id %s", e.job.ID, askTask.Task.DotID()), "err", err)
}
}
if askTask.Task.Type() == pipeline.TaskTypeJSONParse {
askPrice = e.parsePriceFromTask(*askTask)
}

return benchmarkPrice, bidPrice, askPrice
Expand Down
Loading
Loading