Skip to content

Commit

Permalink
Merge pull request observatorium#575 from rubenvp8510/traceql_support
Browse files Browse the repository at this point in the history
Expose tempo trace query API using the gateway
  • Loading branch information
philipgough authored Nov 7, 2023
2 parents 56abe03 + 79930ad commit 1f40fe4
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 47 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ Usage of ./observatorium-api:
File containing the default x509 private key matching --tls.server.cert-file. Leave blank to disable TLS.
-traces.read.endpoint string
The endpoint against which to make HTTP read requests for traces.
-traces.tempo.endpoint string
The endpoint against which to make HTTP read requests for traces using traceQL (tempo API).
-traces.tenant-header string
The name of the HTTP header containing the tenant ID to forward to upstream OpenTelemetry collector. (default "X-Tenant")
-traces.tls.ca-file string
Expand Down
138 changes: 92 additions & 46 deletions api/traces/v1/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type handlerConfiguration struct {
spanRoutePrefix string
readMiddlewares []func(http.Handler) http.Handler
writeMiddlewares []func(http.Handler) http.Handler
tempoMiddlewares []func(http.Handler) http.Handler
}

// HandlerOption modifies the handler's configuration.
Expand Down Expand Up @@ -80,6 +81,13 @@ func WithReadMiddleware(m func(http.Handler) http.Handler) HandlerOption {
}
}

// WithTempoMiddleware adds a middleware for all tempo read operations.
func WithTempoMiddleware(m func(http.Handler) http.Handler) HandlerOption {
return func(h *handlerConfiguration) {
h.tempoMiddlewares = append(h.tempoMiddlewares, m)
}
}

// WithWriteMiddleware adds a middleware for all write operations.
func WithWriteMiddleware(m func(http.Handler) http.Handler) HandlerOption {
return func(h *handlerConfiguration) {
Expand All @@ -101,11 +109,11 @@ func (n nopInstrumentHandler) NewHandler(labels prometheus.Labels, handler http.
// The web UI handler is able to rewrite
// HTML to change the <base> attribute so that it works with the Observatorium-style
// "/api/v1/traces/{tenant}/" URLs.
func NewV2Handler(read *url.URL, readTemplate string, upstreamCA []byte, upstreamCert *stdtls.Certificate, opts ...HandlerOption) http.Handler {
if read == nil && readTemplate == "" {
func NewV2Handler(read *url.URL, readTemplate string, tempo *url.URL, upstreamCA []byte, upstreamCert *stdtls.Certificate, opts ...HandlerOption) http.Handler {

if read == nil && readTemplate == "" && tempo == nil {
panic("missing Jaeger read url")
}

c := &handlerConfiguration{
logger: log.NewNopLogger(),
registry: prometheus.NewRegistry(),
Expand All @@ -118,23 +126,74 @@ func NewV2Handler(read *url.URL, readTemplate string, upstreamCA []byte, upstrea

r := chi.NewRouter()

var proxyRead http.Handler
{
level.Debug(c.logger).Log("msg", "Configuring upstream Jaeger", "queryv2", read)
middlewareMetrics := proxy.MiddlewareMetrics(c.registry, prometheus.Labels{"proxy": "tracesv1-read"})

if read != nil || readTemplate != "" {

var upstreamMiddleware proxy.Middleware
if read != nil {
upstreamMiddleware = proxy.MiddlewareSetUpstream(read)
} else {
upstreamMiddleware = middlewareSetTemplatedUpstream(c.logger, readTemplate)
var proxyRead http.Handler
{
level.Debug(c.logger).Log("msg", "Configuring upstream Jaeger", "queryv2", read)

var upstreamMiddleware proxy.Middleware
if read != nil {
upstreamMiddleware = proxy.MiddlewareSetUpstream(read)
} else {
upstreamMiddleware = middlewareSetTemplatedUpstream(c.logger, readTemplate)
}

middlewares := proxy.Middlewares(
upstreamMiddleware,
proxy.MiddlewareSetPrefixHeader(),
proxy.MiddlewareLogger(c.logger),
middlewareMetrics,
)

t := &http.Transport{
DialContext: (&net.Dialer{
Timeout: dialTimeout,
}).DialContext,
TLSClientConfig: tls.NewClientConfig(upstreamCA, upstreamCert),
}

proxyRead = &httputil.ReverseProxy{
Director: middlewares,
ErrorLog: proxy.Logger(c.logger),
Transport: otelhttp.NewTransport(t),

// This is a key piece, it changes <base href=> tags on text/html content
ModifyResponse: jaegerUIResponseModifier,
}
}

middlewares := proxy.Middlewares(
upstreamMiddleware,
proxy.MiddlewareSetPrefixHeader(),
proxy.MiddlewareLogger(c.logger),
proxy.MiddlewareMetrics(c.registry, prometheus.Labels{"proxy": "tracesv1-read"}),
)
r.Group(func(r chi.Router) {
r.Use(c.readMiddlewares...)
r.Get("/api/traces*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1api", "handler": "traces"},
proxyRead))
r.Get("/api/services*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1api", "handler": "services"},
proxyRead))
r.Get("/api/dependencies*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1api", "handler": "dependencies"},
proxyRead))
r.Get("/api/metrics*", c.instrument.NewHandler(
prometheus.Labels{"group": "metricsv1api", "handler": "metrics"},
proxyRead))
r.Get("/static/*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1static", "handler": "ui"},
proxyRead))
r.Get("/search*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1ui", "handler": "ui"},
proxyRead))
r.Get("/favicon.ico", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1ui", "handler": "ui"},
proxyRead))
})
}

// if tempo upstream is enabled, configure proxy and route
if tempo != nil {
level.Debug(c.logger).Log("msg", "Configuring upstream Tempo", "queryv2", tempo)

t := &http.Transport{
DialContext: (&net.Dialer{
Expand All @@ -143,40 +202,27 @@ func NewV2Handler(read *url.URL, readTemplate string, upstreamCA []byte, upstrea
TLSClientConfig: tls.NewClientConfig(upstreamCA, upstreamCert),
}

proxyRead = &httputil.ReverseProxy{
middlewares := proxy.Middlewares(
proxy.MiddlewareRemoveURLPrefix("tempo"),
proxy.MiddlewareSetUpstream(tempo),
proxy.MiddlewareSetPrefixHeader(),
proxy.MiddlewareLogger(c.logger),
middlewareMetrics,
)

tempoProxyRead := &httputil.ReverseProxy{
Director: middlewares,
ErrorLog: proxy.Logger(c.logger),
Transport: otelhttp.NewTransport(t),

// This is a key piece, it changes <base href=> tags on text/html content
ModifyResponse: jaegerUIResponseModifier,
}
}

r.Group(func(r chi.Router) {
r.Use(c.readMiddlewares...)
r.Get("/api/traces*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1api", "handler": "traces"},
proxyRead))
r.Get("/api/services*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1api", "handler": "services"},
proxyRead))
r.Get("/api/dependencies*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1api", "handler": "dependencies"},
proxyRead))
r.Get("/api/metrics*", c.instrument.NewHandler(
prometheus.Labels{"group": "metricsv1api", "handler": "metrics"},
proxyRead))
r.Get("/static/*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1static", "handler": "ui"},
proxyRead))
r.Get("/search*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1ui", "handler": "ui"},
proxyRead))
r.Get("/favicon.ico", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1ui", "handler": "ui"},
proxyRead))
})
r.Group(func(r chi.Router) {
r.Use(c.tempoMiddlewares...)
r.Get("/tempo/api*", c.instrument.NewHandler(
prometheus.Labels{"group": "tracesv1api", "handler": "traces"},
tempoProxyRead))
})
}

return r
}
Expand Down
20 changes: 19 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type tracesConfig struct {

readEndpoint *url.URL
writeEndpoint string
tempoEndpoint *url.URL
upstreamWriteTimeout time.Duration
upstreamCAFile string
upstreamCertFile string
Expand Down Expand Up @@ -733,7 +734,7 @@ func main() {
}

// Traces.
if cfg.traces.enabled && (cfg.traces.readEndpoint != nil || cfg.traces.readTemplateEndpoint != "") {
if cfg.traces.enabled && (cfg.traces.readEndpoint != nil || cfg.traces.readTemplateEndpoint != "" || cfg.traces.tempoEndpoint != nil) {
r.Group(func(r chi.Router) {
r.Use(authentication.WithTenantMiddlewares(pm.Middlewares))
r.Use(authentication.WithTenantHeader(cfg.traces.tenantHeader, tenantIDs))
Expand All @@ -757,6 +758,7 @@ func main() {
tracesv1.NewV2Handler(
cfg.traces.readEndpoint,
cfg.traces.readTemplateEndpoint,
cfg.traces.tempoEndpoint,
tracesUpstreamCACert,
tracesUpstreamClientCert,
tracesv1.Logger(logger),
Expand All @@ -765,6 +767,8 @@ func main() {
tracesv1.WithSpanRoutePrefix("/api/traces/v1/{tenant}"),
tracesv1.WithReadMiddleware(authorization.WithAuthorizers(authorizers, rbac.Read, "traces")),
tracesv1.WithReadMiddleware(logsv1.WithEnforceAuthorizationLabels()),
tracesv1.WithTempoMiddleware(authorization.WithAuthorizers(authorizers, rbac.Read, "traces")),
tracesv1.WithTempoMiddleware(logsv1.WithEnforceAuthorizationLabels()),
tracesv1.WithWriteMiddleware(authorization.WithAuthorizers(authorizers, rbac.Write, "traces")),
),
),
Expand Down Expand Up @@ -990,6 +994,7 @@ func parseFlags() (config, error) {
rawLogsRuleLabelFilters string
rawLogsAuthExtractSelectors string
rawTracesReadEndpoint string
rawTracesTempoEndpoint string
rawTracesWriteEndpoint string
rawTracingEndpointType string
)
Expand Down Expand Up @@ -1077,6 +1082,8 @@ func parseFlags() (config, error) {
"The name of the PromQL label that should hold the tenant ID in metrics upstreams.")
flag.StringVar(&rawTracesReadEndpoint, "traces.read.endpoint", "",
"The endpoint against which to make HTTP read requests for traces.")
flag.StringVar(&rawTracesTempoEndpoint, "traces.tempo.endpoint", "",
"The endpoint against which to make HTTP read requests for traces using traceQL (tempo API).")
flag.StringVar(&cfg.traces.readTemplateEndpoint, "experimental.traces.read.endpoint-template", "",
"A template replacing --read.traces.endpoint, such as http://jaeger-{tenant}-query:16686")
flag.StringVar(&rawTracesWriteEndpoint, "traces.write.endpoint", "",
Expand Down Expand Up @@ -1265,6 +1272,17 @@ func parseFlags() (config, error) {
cfg.traces.readEndpoint = tracesReadEndpoint
}

if rawTracesTempoEndpoint != "" {
cfg.traces.enabled = true

tracesTempoEndpoint, err := url.ParseRequestURI(rawTracesTempoEndpoint)
if err != nil {
return cfg, fmt.Errorf("--traces.tempo.endpoint %q is invalid: %w", rawTracesTempoEndpoint, err)
}

cfg.traces.tempoEndpoint = tracesTempoEndpoint
}

if rawTracesWriteEndpoint != "" {
cfg.traces.enabled = true

Expand Down
10 changes: 10 additions & 0 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package proxy

import (
"context"
"fmt"
stdlog "log"
"net/http"
"net/url"
"path"
"strings"

"github.com/go-chi/chi/middleware"
"github.com/go-kit/log"
Expand All @@ -19,6 +21,8 @@ const (
prefixKey contextKey = "prefix"

PrefixHeader string = "X-Forwarded-Prefix"

TempoOrgIDHeaderName string = "X-Scope-OrgID"
)

type Middleware func(r *http.Request)
Expand All @@ -31,6 +35,12 @@ func Middlewares(middlewares ...Middleware) func(r *http.Request) {
}
}

func MiddlewareRemoveURLPrefix(prefix string) Middleware {
return func(r *http.Request) {
r.URL.Path = fmt.Sprintf("/%s", strings.TrimLeft(strings.Trim(r.URL.Path, "/"), prefix))
}
}

func MiddlewareSetUpstream(upstream *url.URL) Middleware {
return func(r *http.Request) {
r.URL.Scheme = upstream.Scheme
Expand Down
57 changes: 57 additions & 0 deletions test/e2e/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
logs testType = "logs"
traces testType = "traces"
tracesTemplate testType = "tracesTemplate"
tracesTempo testType = "tracesTempo"
tenants testType = "tenants"
interactive testType = "interactive"

Expand All @@ -34,6 +35,7 @@ const (
envAlertmanagerName = "alertmanager-api"
envLogsName = "logs-tail"
envTracesName = "traces-export"
envTracesTempoName = "traces-tempo"
envTracesTemplateName = "traces-template"
envTenantsName = "tenants"
envInteractive = "interactive"
Expand Down Expand Up @@ -431,3 +433,58 @@ func createLokiYAML(

testutil.Ok(t, err)
}

const tempoConfig = `
server:
http_listen_port: 3200
query_frontend:
search:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
trace_by_id:
duration_slo: 5s
distributor:
receivers:
otlp:
protocols:
http:
grpc:
opencensus:
ingester:
max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally
compactor:
compaction:
block_retention: 1h # overall Tempo trace retention. set for demo purposes
storage:
trace:
backend: local # backend configuration to use
wal:
path: /tmp/tempo/wal # where to store the the wal locally
local:
path: /tmp/tempo/blocks
`

// createTempoConfigYAML() creates YAML for Tempo inside the Observatorium API boundary.
func createTempoConfigYAML(
t *testing.T,
e e2e.Environment,
) {
// Warn if a YAML change introduced a tab character
if strings.ContainsRune(tempoConfig, '\t') {
t.Errorf("Tab in the YAML")
}

yamlContent := []byte(tempoConfig)

err := os.WriteFile(
filepath.Join(e.SharedDir(), configSharedDir, "tempo.yaml"),
yamlContent,
os.FileMode(0644),
)
testutil.Ok(t, err)
}
2 changes: 2 additions & 0 deletions test/e2e/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func getContainerName(t *testing.T, tt testType, serviceName string) string {
return envTracesName + "-" + serviceName
case tracesTemplate:
return envTracesTemplateName + "-" + serviceName
case tracesTempo:
return envTracesTempoName + "-" + serviceName
default:
t.Fatal("invalid test type provided")
return ""
Expand Down
Loading

0 comments on commit 1f40fe4

Please sign in to comment.