forked from openzipkin-contrib/zipkin-go-opentracing
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tracer.go
440 lines (398 loc) · 13.8 KB
/
tracer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
package zipkintracer
import (
"errors"
"time"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
otobserver "github.com/opentracing-contrib/go-observer"
"github.com/openzipkin/zipkin-go-opentracing/flag"
)
// ErrInvalidEndpoint will be thrown if hostPort parameter is corrupted or host
// can't be resolved
var ErrInvalidEndpoint = errors.New("Invalid Endpoint. Please check hostPort parameter")
// Tracer extends the opentracing.Tracer interface with methods to
// probe implementation state, for use by zipkintracer consumers.
type Tracer interface {
opentracing.Tracer
// Options gets the Options used in New() or NewWithOptions().
Options() TracerOptions
}
// TracerOptions allows creating a customized Tracer.
type TracerOptions struct {
// shouldSample is a function which is called when creating a new Span and
// determines whether that Span is sampled. The randomized TraceID is supplied
// to allow deterministic sampling decisions to be made across different nodes.
shouldSample func(traceID uint64) bool
// trimUnsampledSpans turns potentially expensive operations on unsampled
// Spans into no-ops. More precisely, tags and log events are silently
// discarded. If NewSpanEventListener is set, the callbacks will still fire.
trimUnsampledSpans bool
// recorder receives Spans which have been finished.
recorder SpanRecorder
// newSpanEventListener can be used to enhance the tracer by effectively
// attaching external code to trace events. See NetTraceIntegrator for a
// practical example, and event.go for the list of possible events.
newSpanEventListener func() func(SpanEvent)
// dropAllLogs turns log events on all Spans into no-ops.
// If NewSpanEventListener is set, the callbacks will still fire.
dropAllLogs bool
// MaxLogsPerSpan limits the number of Logs in a span (if set to a nonzero
// value). If a span has more logs than this value, logs are dropped as
// necessary (and replaced with a log describing how many were dropped).
//
// About half of the MaxLogPerSpan logs kept are the oldest logs, and about
// half are the newest logs.
//
// If NewSpanEventListener is set, the callbacks will still fire for all log
// events. This value is ignored if DropAllLogs is true.
maxLogsPerSpan int
// debugAssertSingleGoroutine internally records the ID of the goroutine
// creating each Span and verifies that no operation is carried out on
// it on a different goroutine.
// Provided strictly for development purposes.
// Passing Spans between goroutine without proper synchronization often
// results in use-after-Finish() errors. For a simple example, consider the
// following pseudocode:
//
// func (s *Server) Handle(req http.Request) error {
// sp := s.StartSpan("server")
// defer sp.Finish()
// wait := s.queueProcessing(opentracing.ContextWithSpan(context.Background(), sp), req)
// select {
// case resp := <-wait:
// return resp.Error
// case <-time.After(10*time.Second):
// sp.LogEvent("timed out waiting for processing")
// return ErrTimedOut
// }
// }
//
// This looks reasonable at first, but a request which spends more than ten
// seconds in the queue is abandoned by the main goroutine and its trace
// finished, leading to use-after-finish when the request is finally
// processed. Note also that even joining on to a finished Span via
// StartSpanWithOptions constitutes an illegal operation.
//
// Code bases which do not require (or decide they do not want) Spans to
// be passed across goroutine boundaries can run with this flag enabled in
// tests to increase their chances of spotting wrong-doers.
debugAssertSingleGoroutine bool
// debugAssertUseAfterFinish is provided strictly for development purposes.
// When set, it attempts to exacerbate issues emanating from use of Spans
// after calling Finish by running additional assertions.
debugAssertUseAfterFinish bool
// enableSpanPool enables the use of a pool, so that the tracer reuses spans
// after Finish has been called on it. Adds a slight performance gain as it
// reduces allocations. However, if you have any use-after-finish race
// conditions the code may panic.
enableSpanPool bool
// logger ...
logger Logger
// clientServerSameSpan allows for Zipkin V1 style span per RPC. This places
// both client end and server end of a RPC call into the same span.
clientServerSameSpan bool
// debugMode activates Zipkin's debug request allowing for all Spans originating
// from this tracer to pass through and bypass sampling. Use with extreme care
// as it might flood your system if you have many traces starting from the
// service you are instrumenting.
debugMode bool
// traceID128Bit enables the generation of 128 bit traceIDs in case the tracer
// needs to create a root span. By default regular 64 bit traceIDs are used.
// Regardless of this setting, the library will propagate and support both
// 64 and 128 bit incoming traces from upstream sources.
traceID128Bit bool
observer otobserver.Observer
}
// TracerOption allows for functional options.
// See: http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
type TracerOption func(opts *TracerOptions) error
// WithSampler allows one to add a Sampler function
func WithSampler(sampler Sampler) TracerOption {
return func(opts *TracerOptions) error {
opts.shouldSample = sampler
return nil
}
}
// TrimUnsampledSpans option
func TrimUnsampledSpans(trim bool) TracerOption {
return func(opts *TracerOptions) error {
opts.trimUnsampledSpans = trim
return nil
}
}
// DropAllLogs option
func DropAllLogs(dropAllLogs bool) TracerOption {
return func(opts *TracerOptions) error {
opts.dropAllLogs = dropAllLogs
return nil
}
}
// WithLogger option
func WithLogger(logger Logger) TracerOption {
return func(opts *TracerOptions) error {
opts.logger = logger
return nil
}
}
// DebugAssertSingleGoroutine option
func DebugAssertSingleGoroutine(val bool) TracerOption {
return func(opts *TracerOptions) error {
opts.debugAssertSingleGoroutine = val
return nil
}
}
// DebugAssertUseAfterFinish option
func DebugAssertUseAfterFinish(val bool) TracerOption {
return func(opts *TracerOptions) error {
opts.debugAssertUseAfterFinish = val
return nil
}
}
// TraceID128Bit option
func TraceID128Bit(val bool) TracerOption {
return func(opts *TracerOptions) error {
opts.traceID128Bit = val
return nil
}
}
// ClientServerSameSpan allows to place client-side and server-side annotations
// for a RPC call in the same span (Zipkin V1 behavior) or different spans
// (more in line with other tracing solutions). By default this Tracer
// uses shared host spans (so client-side and server-side in the same span).
// If using separate spans you might run into trouble with Zipkin V1 as clock
// skew issues can't be remedied at Zipkin server side.
func ClientServerSameSpan(val bool) TracerOption {
return func(opts *TracerOptions) error {
opts.clientServerSameSpan = val
return nil
}
}
// DebugMode allows to set the tracer to Zipkin debug mode
func DebugMode(val bool) TracerOption {
return func(opts *TracerOptions) error {
opts.debugMode = val
return nil
}
}
// EnableSpanPool ...
func EnableSpanPool(val bool) TracerOption {
return func(opts *TracerOptions) error {
opts.enableSpanPool = val
return nil
}
}
// NewSpanEventListener option
func NewSpanEventListener(f func() func(SpanEvent)) TracerOption {
return func(opts *TracerOptions) error {
opts.newSpanEventListener = f
return nil
}
}
// WithMaxLogsPerSpan option
func WithMaxLogsPerSpan(limit int) TracerOption {
return func(opts *TracerOptions) error {
if limit < 5 || limit > 10000 {
return errors.New("invalid MaxLogsPerSpan limit. Should be between 5 and 10000")
}
opts.maxLogsPerSpan = limit
return nil
}
}
// NewTracer creates a new OpenTracing compatible Zipkin Tracer.
func NewTracer(recorder SpanRecorder, options ...TracerOption) (opentracing.Tracer, error) {
opts := &TracerOptions{
recorder: recorder,
shouldSample: alwaysSample,
trimUnsampledSpans: false,
newSpanEventListener: func() func(SpanEvent) { return nil },
logger: &nopLogger{},
debugAssertSingleGoroutine: false,
debugAssertUseAfterFinish: false,
clientServerSameSpan: true,
debugMode: false,
traceID128Bit: false,
maxLogsPerSpan: 10000,
observer: nil,
}
for _, o := range options {
err := o(opts)
if err != nil {
return nil, err
}
}
rval := &tracerImpl{options: *opts}
rval.textPropagator = &textMapPropagator{rval}
rval.binaryPropagator = &binaryPropagator{rval}
rval.accessorPropagator = &accessorPropagator{rval}
return rval, nil
}
// Implements the `Tracer` interface.
type tracerImpl struct {
options TracerOptions
textPropagator *textMapPropagator
binaryPropagator *binaryPropagator
accessorPropagator *accessorPropagator
}
func (t *tracerImpl) StartSpan(
operationName string,
opts ...opentracing.StartSpanOption,
) opentracing.Span {
sso := opentracing.StartSpanOptions{}
for _, o := range opts {
o.Apply(&sso)
}
return t.startSpanWithOptions(operationName, sso)
}
func (t *tracerImpl) getSpan() *spanImpl {
if t.options.enableSpanPool {
sp := spanPool.Get().(*spanImpl)
sp.reset()
return sp
}
return &spanImpl{}
}
func (t *tracerImpl) startSpanWithOptions(
operationName string,
opts opentracing.StartSpanOptions,
) opentracing.Span {
// Start time.
startTime := opts.StartTime
if startTime.IsZero() {
startTime = time.Now()
}
// Tags.
tags := opts.Tags
// Build the new span. This is the only allocation: We'll return this as
// an opentracing.Span.
sp := t.getSpan()
if t.options.observer != nil {
sp.observer, _ = t.options.observer.OnStartSpan(sp, operationName, opts)
}
// Look for a parent in the list of References.
//
// TODO: would be nice if basictracer did something with all
// References, not just the first one.
ReferencesLoop:
for _, ref := range opts.References {
switch ref.Type {
case opentracing.ChildOfRef:
refCtx := ref.ReferencedContext.(SpanContext)
sp.raw.Context.TraceID = refCtx.TraceID
sp.raw.Context.ParentSpanID = &refCtx.SpanID
sp.raw.Context.Sampled = refCtx.Sampled
sp.raw.Context.Flags = refCtx.Flags
sp.raw.Context.Flags &^= flag.IsRoot // unset IsRoot flag if needed
if t.options.clientServerSameSpan &&
tags[string(ext.SpanKind)] == ext.SpanKindRPCServer.Value {
sp.raw.Context.SpanID = refCtx.SpanID
sp.raw.Context.ParentSpanID = refCtx.ParentSpanID
sp.raw.Context.Owner = false
} else {
sp.raw.Context.SpanID = randomID()
sp.raw.Context.ParentSpanID = &refCtx.SpanID
sp.raw.Context.Owner = true
}
if l := len(refCtx.Baggage); l > 0 {
sp.raw.Context.Baggage = make(map[string]string, l)
for k, v := range refCtx.Baggage {
sp.raw.Context.Baggage[k] = v
}
}
break ReferencesLoop
case opentracing.FollowsFromRef:
refCtx := ref.ReferencedContext.(SpanContext)
sp.raw.Context.TraceID = refCtx.TraceID
sp.raw.Context.ParentSpanID = &refCtx.SpanID
sp.raw.Context.Sampled = refCtx.Sampled
sp.raw.Context.Flags = refCtx.Flags
sp.raw.Context.Flags &^= flag.IsRoot // unset IsRoot flag if needed
sp.raw.Context.SpanID = randomID()
sp.raw.Context.ParentSpanID = &refCtx.SpanID
sp.raw.Context.Owner = true
if l := len(refCtx.Baggage); l > 0 {
sp.raw.Context.Baggage = make(map[string]string, l)
for k, v := range refCtx.Baggage {
sp.raw.Context.Baggage[k] = v
}
}
break ReferencesLoop
}
}
if sp.raw.Context.TraceID.Empty() {
// No parent Span found; allocate new trace and span ids and determine
// the Sampled status.
if t.options.traceID128Bit {
sp.raw.Context.TraceID.High = randomID()
}
sp.raw.Context.TraceID.Low, sp.raw.Context.SpanID = randomID2()
sp.raw.Context.Sampled = t.options.shouldSample(sp.raw.Context.TraceID.Low)
sp.raw.Context.Flags = flag.IsRoot
sp.raw.Context.Owner = true
}
if t.options.debugMode {
sp.raw.Context.Flags |= flag.Debug
}
return t.startSpanInternal(
sp,
operationName,
startTime,
tags,
)
}
func (t *tracerImpl) startSpanInternal(
sp *spanImpl,
operationName string,
startTime time.Time,
tags opentracing.Tags,
) opentracing.Span {
sp.tracer = t
if t.options.newSpanEventListener != nil {
sp.event = t.options.newSpanEventListener()
}
sp.raw.Operation = operationName
sp.raw.Start = startTime
sp.raw.Duration = -1
sp.raw.Tags = tags
if t.options.debugAssertSingleGoroutine {
sp.SetTag(debugGoroutineIDTag, curGoroutineID())
}
defer sp.onCreate(operationName)
return sp
}
type delegatorType struct{}
// Delegator is the format to use for DelegatingCarrier.
var Delegator delegatorType
func (t *tracerImpl) Inject(sc opentracing.SpanContext, format interface{}, carrier interface{}) error {
switch format {
case opentracing.TextMap, opentracing.HTTPHeaders:
return t.textPropagator.Inject(sc, carrier)
case opentracing.Binary:
return t.binaryPropagator.Inject(sc, carrier)
}
if _, ok := format.(delegatorType); ok {
return t.accessorPropagator.Inject(sc, carrier)
}
return opentracing.ErrUnsupportedFormat
}
func (t *tracerImpl) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
switch format {
case opentracing.TextMap, opentracing.HTTPHeaders:
return t.textPropagator.Extract(carrier)
case opentracing.Binary:
return t.binaryPropagator.Extract(carrier)
}
if _, ok := format.(delegatorType); ok {
return t.accessorPropagator.Extract(carrier)
}
return nil, opentracing.ErrUnsupportedFormat
}
func (t *tracerImpl) Options() TracerOptions {
return t.options
}
// WithObserver assigns an initialized observer to opts.observer
func WithObserver(observer otobserver.Observer) TracerOption {
return func(opts *TracerOptions) error {
opts.observer = observer
return nil
}
}