@@ -26,6 +26,7 @@ import (
26
26
connectapi "github.com/grafana/pyroscope/pkg/api/connect"
27
27
"github.com/grafana/pyroscope/pkg/cfg"
28
28
"github.com/grafana/pyroscope/pkg/phlare"
29
+ "github.com/grafana/pyroscope/pkg/tenant"
29
30
)
30
31
31
32
func getFreeTCPPorts (address string , count int ) ([]int , error ) {
@@ -58,34 +59,64 @@ func newComponent(target string) *Component {
58
59
}
59
60
}
60
61
61
- func NewMicroServiceCluster () * Cluster {
62
- // use custom http client to resolve dynamically to healthy components
63
-
64
- c := & Cluster {}
62
+ type testTransport struct {
63
+ defaultDialContext func (ctx context.Context , network , addr string ) (net.Conn , error )
64
+ next http.RoundTripper
65
+ c * Cluster
66
+ }
65
67
68
+ // use custom http transport to resolve dynamically to healthy components
69
+ func newTestTransport (c * Cluster ) http.RoundTripper {
66
70
defaultTransport := http .DefaultTransport .(* http.Transport )
67
- transport := & http.Transport {
68
- DialContext : func (ctx context.Context , network , addr string ) (net.Conn , error ) {
69
- var err error
70
- switch addr {
71
- case "push:80" :
72
- addr , err = c .pickHealthyComponent ("distributor" )
73
- if err != nil {
74
- return nil , err
75
- }
76
- case "querier:80" :
77
- addr , err = c .pickHealthyComponent ("query-frontend" , "querier" )
78
- if err != nil {
79
- return nil , err
80
- }
81
- default :
82
- return nil , fmt .Errorf ("unknown addr %s" , addr )
83
- }
71
+ t := & testTransport {
72
+ defaultDialContext : defaultTransport .DialContext ,
73
+ c : c ,
74
+ }
75
+ t .next = & http.Transport {
76
+ Proxy : defaultTransport .Proxy ,
77
+ TLSClientConfig : defaultTransport .TLSClientConfig ,
78
+ TLSHandshakeTimeout : defaultTransport .TLSHandshakeTimeout ,
79
+ ExpectContinueTimeout : defaultTransport .ExpectContinueTimeout ,
80
+ MaxIdleConns : defaultTransport .MaxIdleConns ,
81
+ IdleConnTimeout : defaultTransport .IdleConnTimeout ,
82
+ ForceAttemptHTTP2 : defaultTransport .ForceAttemptHTTP2 ,
83
+ DialContext : t .DialContext ,
84
+ }
85
+ return t
86
+ }
84
87
85
- return defaultTransport .DialContext (ctx , network , addr )
86
- },
88
+ func (t * testTransport ) RoundTrip (req * http.Request ) (* http.Response , error ) {
89
+ tenantID , err := tenant .ExtractTenantIDFromContext (req .Context ())
90
+ if err == nil {
91
+ req .Header .Set ("X-Scope-OrgID" , tenantID )
87
92
}
88
- c .httpClient = & http.Client {Transport : transport }
93
+ return t .next .RoundTrip (req )
94
+ }
95
+
96
+ func (t * testTransport ) DialContext (ctx context.Context , network , addr string ) (net.Conn , error ) {
97
+ var err error
98
+ switch addr {
99
+ case "push:80" :
100
+ addr , err = t .c .pickHealthyComponent ("distributor" )
101
+ if err != nil {
102
+ return nil , err
103
+ }
104
+ case "querier:80" :
105
+ addr , err = t .c .pickHealthyComponent ("query-frontend" , "querier" )
106
+ if err != nil {
107
+ return nil , err
108
+ }
109
+ default :
110
+ return nil , fmt .Errorf ("unknown addr %s" , addr )
111
+ }
112
+
113
+ return t .defaultDialContext (ctx , network , addr )
114
+ }
115
+
116
+ func NewMicroServiceCluster () * Cluster {
117
+ c := & Cluster {}
118
+
119
+ c .httpClient = & http.Client {Transport : newTestTransport (c )}
89
120
c .Components = []* Component {
90
121
newComponent ("distributor" ),
91
122
newComponent ("distributor" ),
@@ -205,6 +236,7 @@ func (c *Cluster) Prepare() (err error) {
205
236
listenAddrFlags ("127.0.0.1" )... )
206
237
comp .flags = append (comp .flags ,
207
238
[]string {
239
+ "-auth.multitenancy-enabled=true" ,
208
240
"-tracing.enabled=false" , // data race
209
241
"-distributor.replication-factor=3" ,
210
242
"-store-gateway.sharding-ring.replication-factor=3" ,
0 commit comments