Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8b149b0
implemented jaeger dependencies graph API (`/select/jaeger/api/depend…
Amper Sep 8, 2025
a34c782
Fix formatting in README.md for dependency links
Amper Sep 9, 2025
2692214
fix after rebasing master
Amper Sep 15, 2025
f7c2372
Merge branch 'master' into dependencies-api
jiekun Sep 16, 2025
d305081
feature: [vtgen] improve trace span_id generation logic
jiekun Sep 18, 2025
b9f10d9
feature: [vtselect] limit the lookbehind window for dependency API to…
jiekun Sep 18, 2025
388e680
feature: [dependency] add API doc
jiekun Sep 18, 2025
6005d16
feature: [dependency] add API doc
jiekun Sep 18, 2025
4e148b0
Merge branch 'master' into dependencies-api
Amper Sep 22, 2025
7b63c52
feature: [dependency] temporary result for dependency APIs
jiekun Sep 24, 2025
bcc928e
feature: [dependency] split dependency to differet apps
jiekun Sep 26, 2025
f81aa35
docs: solve conflict
jiekun Sep 26, 2025
60be6a3
feature: [dependency] polish the background job start and stop for se…
jiekun Sep 28, 2025
352dc5a
feature: [dependency] add more comments to the dependency functions
jiekun Sep 28, 2025
c84411c
feature: [dependency] make linter happy
jiekun Sep 28, 2025
827912a
feature: [dependency] add integration test, and fix the call count fi…
jiekun Sep 28, 2025
8d20559
feature: [dependency] add another test case, where child calls parent…
jiekun Sep 28, 2025
793343d
feature: [dependency] add comment for the test case
jiekun Sep 28, 2025
a4d1336
feature: [dependency] update doc for background task flag
jiekun Sep 28, 2025
2eaf892
feature: [dependency] update doc for dependencies API
jiekun Sep 28, 2025
bf8b386
chore: apply review suggestions. increased default lookbehind for ser…
jiekun Oct 5, 2025
757174e
feature: [dependency] update dependencies of logstorage
jiekun Oct 5, 2025
d59963b
Merge branch 'master' into dependencies-api
jiekun Oct 5, 2025
29cce0f
feature: [dependency] update docs
jiekun Oct 5, 2025
d1f55b0
feature: [dependency] fix GetMetric according to https://github.com/V…
jiekun Oct 5, 2025
d3f69e5
feature: [dependency] claim that service graph is experimental
jiekun Oct 5, 2025
2208492
feature: [dependency] apply review suggestions
jiekun Oct 7, 2025
6d82e1f
feature: [dependency] apply review suggestions
jiekun Oct 7, 2025
1b69eba
Update docs/victoriatraces/querying/README.md
jiekun Oct 7, 2025
bac8021
Update docs/victoriatraces/changelog/CHANGELOG.md
jiekun Oct 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions app/victoria-traces/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"

"github.com/VictoriaMetrics/VictoriaTraces/app/victoria-traces/servicegraph"
"github.com/VictoriaMetrics/VictoriaTraces/app/vtinsert"
"github.com/VictoriaMetrics/VictoriaTraces/app/vtinsert/insertutil"
"github.com/VictoriaMetrics/VictoriaTraces/app/vtselect"
Expand Down Expand Up @@ -49,6 +50,9 @@ func main() {
insertutil.SetLogRowsStorage(&vtstorage.Storage{})
vtinsert.Init()

// optional background task(s)
servicegraph.Init()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think this comment adds much value; consider removing it or adding more context.


go httpserver.Serve(listenAddrs, requestHandler, httpserver.ServeOptions{
UseProxyProtocol: useProxyProtocol,
})
Expand All @@ -66,6 +70,7 @@ func main() {
}
logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())

servicegraph.Stop()
vtinsert.Stop()
vtselect.Stop()
vtstorage.Stop()
Expand Down
101 changes: 101 additions & 0 deletions app/victoria-traces/servicegraph/servicegraph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package servicegraph

import (
"context"
"flag"
"time"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"

vtinsert "github.com/VictoriaMetrics/VictoriaTraces/app/vtinsert/opentelemetry"
vtselect "github.com/VictoriaMetrics/VictoriaTraces/app/vtselect/traces/query"
"github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage"
)

var (
enableServiceGraphTask = flag.Bool("servicegraph.enableTask", false, "Whether to enable background task for generating service graph. It should only be enabled on VictoriaTraces single-node or vtstorage.")
serviceGraphTaskInterval = flag.Duration("servicegraph.taskInterval", time.Minute, "The background task interval for generating service graph data. It requires setting -servicegraph.enableTask=true.")
serviceGraphTaskTimeout = flag.Duration("servicegraph.taskTimeout", 30*time.Second, "The background task timeout duration for generating service graph data. It requires setting -servicegraph.enableTask=true.")
serviceGraphTaskLookbehind = flag.Duration("servicegraph.taskLookbehind", time.Minute, "The lookbehind window for each time service graph background task run. It requires setting -servicegraph.enableTask=true.")
serviceGraphTaskLimit = flag.Uint64("servicegraph.taskLimit", 1000, "How many service graph relations each task could fetch for each tenant. It requires setting -servicegraph.enableTask=true.")
)

var (
sgt *serviceGraphTask
)

func Init() {
if *enableServiceGraphTask {
sgt = newServiceGraphTask()
sgt.Start()
}
}

func Stop() {
if *enableServiceGraphTask {
sgt.Stop()
}
}

type serviceGraphTask struct {
stopCh chan struct{}
}

func newServiceGraphTask() *serviceGraphTask {
return &serviceGraphTask{
stopCh: make(chan struct{}),
}
}

func (sgt *serviceGraphTask) Start() {
logger.Infof("starting servicegraph background task, interval: %v, lookbehind: %v", *serviceGraphTaskInterval, *serviceGraphTaskLookbehind)
go func() {
ticker := time.NewTicker(*serviceGraphTaskInterval)
defer ticker.Stop()

for {
select {
case <-sgt.stopCh:
return
case <-ticker.C:
ctx, cancelFunc := context.WithTimeout(context.Background(), *serviceGraphTaskTimeout)
GenerateServiceGraphTimeRange(ctx)
cancelFunc()
}
}
}()
}

func (sgt *serviceGraphTask) Stop() {
close(sgt.stopCh)
}

func GenerateServiceGraphTimeRange(ctx context.Context) {
endTime := time.Now().Truncate(*serviceGraphTaskInterval)
startTime := endTime.Add(-*serviceGraphTaskLookbehind)

tenantIDs, err := vtstorage.GetTenantIDsByTimeRange(ctx, startTime.UnixNano(), endTime.UnixNano())
if err != nil {
logger.Errorf("cannot get tenant ids: %s", err)
return
}

// query and persist operations are executed sequentially, which helps not to consume excessive resources.
for _, tenantID := range tenantIDs {
// query service graph relations
rows, err := vtselect.GetServiceGraphTimeRange(ctx, tenantID, startTime, endTime, *serviceGraphTaskLimit)
if err != nil {
logger.Errorf("cannot get service graph for time range [%d, %d]: %s", startTime.Unix(), endTime.Unix(), err)
return
}
if len(rows) == 0 {
return
}

// persist service graph relations
err = vtinsert.PersistServiceGraph(ctx, tenantID, rows, endTime)
if err != nil {
logger.Errorf("cannot presist service graph for time range [%d, %d]: %s", startTime.Unix(), endTime.Unix(), err)
}
}
}
36 changes: 36 additions & 0 deletions app/vtgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func main() {
// The traceIDMap recorded old traceID->new traceID.
// Spans with same old traceID should be replaced with same new traceID.
traceIDMap := make(map[string]string)
spanIDMap := make(map[string]string)

// The timeOffset is the time offset of span timestamp and current timestamp.
// All spans' timestamp should be increased by this offset.
Expand Down Expand Up @@ -115,6 +116,26 @@ func main() {
}
}

// replace SpanID
if sid, ok := spanIDMap[sp.SpanID]; ok {
sp.SpanID = sid
} else {
spanID := generateSpanID()
oldSpanID := sp.SpanID
sp.SpanID = spanID
spanIDMap[oldSpanID] = spanID
}

// replace parentSpanID
if sid, ok := spanIDMap[sp.ParentSpanID]; ok {
sp.ParentSpanID = sid
} else {
parentSpanID := generateSpanID()
oldParentSpanID := sp.ParentSpanID
sp.ParentSpanID = parentSpanID
spanIDMap[oldParentSpanID] = parentSpanID
}

// adjust the timestamp of the span.
sp.StartTimeUnixNano = sp.StartTimeUnixNano + timeOffset
sp.EndTimeUnixNano = sp.EndTimeUnixNano + timeOffset + uint64(rand.Int63n(100000000))
Expand Down Expand Up @@ -198,12 +219,27 @@ func loadTestData() [][]byte {
return bodyList
}

var traceIDMutex sync.Mutex

func generateTraceID() string {
traceIDMutex.Lock()
defer traceIDMutex.Unlock()

h := md5.New()
h.Write([]byte(strconv.FormatInt(time.Now().UnixNano(), 10)))
return hex.EncodeToString(h.Sum(nil))
}

var spanIDMutex sync.Mutex

func generateSpanID() string {
spanIDMutex.Lock()
defer spanIDMutex.Unlock()
h := md5.New()
h.Write([]byte(strconv.FormatInt(time.Now().UnixNano(), 10)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the function be called concurrently? What would happen if it produced the same span ID?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. This is not strong enough. Identical traceID resulted in incorrect relations.
I think adding a mutex would be enough.

return hex.EncodeToString(h.Sum(nil))[:16]
}

// readWrite Does the following:
// 1. read request body binary files like `1.bin`, `2.bin` and puts them into `BodyList`.
// 2. encode and compress the `BodyList` into `[]byte`.
Expand Down
19 changes: 19 additions & 0 deletions app/vtinsert/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package opentelemetry

import (
"context"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -284,3 +285,21 @@ func appendKeyValuesWithPrefixSuffix(fields []logstorage.Field, kvs []*otelpb.Ke
}
return fields
}

func PersistServiceGraph(ctx context.Context, tenantID logstorage.TenantID, fields [][]logstorage.Field, timestamp time.Time) error {
cp := insertutil.CommonParams{
TenantID: tenantID,
TimeFields: []string{"_time"},
}
lmp := cp.NewLogMessageProcessor("internalinsert_servicegraph", false)

for _, row := range fields {
f := append(row, logstorage.Field{
Name: "_msg",
Value: "-",
})
lmp.AddRow(timestamp.UnixNano(), f, []logstorage.Field{{Name: otelpb.ServiceGraphStreamName, Value: "-"}})
}
lmp.MustClose()
return nil
}
91 changes: 89 additions & 2 deletions app/vtselect/traces/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ func RequestHandler(ctx context.Context, w http.ResponseWriter, r *http.Request)
return true
} else if path == "/select/jaeger/api/dependencies" {
jaegerDependenciesRequests.Inc()
// todo it require additional component to calculate the dependency graph. not implemented yet.
httpserver.Errorf(w, r, "/api/dependencies API is not supported yet.")
processGetDependenciesRequest(ctx, w, r)
jaegerDependenciesDuration.UpdateDuration(startTime)
return true
}
Expand Down Expand Up @@ -401,3 +400,91 @@ func hashProcess(process process) uint64 {
hashpool.Put(d)
return h
}

// processGetDependenciesRequest handle the Jaeger /api/dependencies API request.
func processGetDependenciesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
cp, err := query.GetCommonParams(r)
if err != nil {
httpserver.Errorf(w, r, "incorrect query params: %s", err)
return
}

param, err := parseJaegerDependenciesQueryParam(ctx, r)
if err != nil {
httpserver.Errorf(w, r, "incorrect dependencies query params: %s", err)
return
}

rows, err := query.GetServiceGraphList(ctx, cp, param)
if err != nil {
httpserver.Errorf(w, r, "get dependencies error: %s", err)
return
}

if len(rows) == 0 {
// Write empty results
w.Header().Set("Content-Type", "application/json")
WriteGetDependenciesResponse(w, nil)
return
}

dependencies := make([]*dependencyLink, 0)
for _, row := range rows {
dependency := &dependencyLink{}
for _, f := range row.Fields {
switch f.Name {
case "parent":
dependency.parent = f.Value
case "child":
dependency.child = f.Value
case "callCount":
dependency.callCount, err = strconv.ParseUint(f.Value, 10, 64)
if err != nil {
logger.Errorf("cannot parse callCount [%s]: %s", f.Value, err)
continue
}
}
}
if dependency.parent != "" && dependency.child != "" && dependency.callCount > 0 {
dependencies = append(dependencies, dependency)
}
}

// Write results
w.Header().Set("Content-Type", "application/json")
WriteGetDependenciesResponse(w, dependencies)
}

// parseJaegerDependenciesQueryParam parse Jaeger request to unified ServiceGraphQueryParameters.
func parseJaegerDependenciesQueryParam(_ context.Context, r *http.Request) (*query.ServiceGraphQueryParameters, error) {
var err error

// default params
p := &query.ServiceGraphQueryParameters{
EndTs: time.Now(),
Lookback: time.Hour,
}
q := r.URL.Query()

endTs := q.Get("endTs")
if endTs != "" {
unixMilli, err := strconv.ParseInt(endTs, 10, 64)
if err != nil {
return nil, fmt.Errorf("cannot parse endTs [%s]: %w", endTs, err)
}
p.EndTs = time.UnixMilli(unixMilli)
}

lookback := q.Get("lookback")
if lookback != "" {
if strings.TrimLeft(lookback, "0123456789") == "" {
lookback += "ms"
}
p.Lookback, err = time.ParseDuration(lookback)
if err != nil {
return nil, fmt.Errorf("cannot parse lookback [%s]: %w", lookback, err)
}
}

return p, nil
}
25 changes: 25 additions & 0 deletions app/vtselect/traces/jaeger/jaeger.qtpl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@
}
{% endfunc %}

{% func GetDependenciesResponse(dependencies []*dependencyLink) %}
{
"data":[
{% if len(dependencies) > 0 %}
{%= dependencyJson(dependencies[0]) %}
{% for _, dependency := range dependencies[1:] %}
,{%= dependencyJson(dependency) %}
{% endfor %}
{% endif %}
],
"errors": null,
"limit": 0,
"offset": 0,
"total": {%d= len(dependencies) %}
}
{% endfunc %}

{% func dependencyJson(dependency *dependencyLink) %}
{
"parent": {%q= dependency.parent %},
"child": {%q= dependency.child %},
"callCount": {%dul= dependency.callCount %}
}
{% endfunc %}

{% func traceJson(trace *trace) %}
{
"processes": {
Expand Down
Loading