observability: add support for distributed tracing
Signed-off-by: Alex Aizman <[email protected]>
saiprashanth173 authored and alex-aizman committed Oct 30, 2024
1 parent 949e80c commit 1f19cde
Showing 22 changed files with 490 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -35,7 +35,7 @@ jobs:
AIS_BACKEND_PROVIDERS="aws azure gcp" MODE="debug" make node
# 5) cloud backends, debug, statsd
# (build with StatsD, and note that Prometheus is the default when `statsd` tag is not defined)
TAGS="aws azure gcp statsd debug" make node
TAGS="aws azure gcp statsd debug oteltracing" make node
# 6) statsd, debug, nethttp (note that fasthttp is used by default)
TAGS="nethttp statsd debug" make node
# 7) w/ mem profile (see cmd/aisnodeprofile)
22 changes: 15 additions & 7 deletions 3rdparty/golang/mux/mux.go
@@ -20,6 +20,7 @@ import (
"sync"

"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/tracing"
)

// ServeMux is an HTTP request multiplexer.
@@ -58,10 +59,11 @@ import (
// header, stripping the port number and redirecting any request containing . or
// .. elements or repeated slashes to an equivalent, cleaner URL.
type ServeMux struct {
mu sync.RWMutex
m map[string]muxEntry
es []muxEntry // slice of entries sorted from longest to shortest.
hosts bool // whether any patterns contain hostnames
mu sync.RWMutex
m map[string]muxEntry
es []muxEntry // slice of entries sorted from longest to shortest.
hosts bool // whether any patterns contain hostnames
tracingEnabled bool // enable tracing
}

type muxEntry struct {
@@ -70,7 +72,9 @@ type muxEntry struct {
}

// NewServeMux allocates and returns a new ServeMux.
func NewServeMux() *ServeMux { return new(ServeMux) }
func NewServeMux(enableTracing bool) *ServeMux {
return &ServeMux{tracingEnabled: enableTracing}
}

// DefaultServeMux is the default ServeMux used by Serve.
var DefaultServeMux = &defaultServeMux
@@ -299,8 +303,12 @@ func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
}

// HandleFunc registers the handler function for the given pattern.
func (mux *ServeMux) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
func (mux *ServeMux) HandleFunc(pattern string, handlerFunc http.HandlerFunc) {
mux.mu.Lock()
mux._handle(pattern, http.HandlerFunc(handler))
var handler http.Handler = handlerFunc
if mux.tracingEnabled {
handler = tracing.NewTraceableHandler(handler, pattern)
}
mux._handle(pattern, handler)
mux.mu.Unlock()
}
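
For context, `tracing.NewTraceableHandler` itself is not among the files shown in this commit. A minimal sketch of what such a wrapper could look like, assuming OTEL's `otelhttp` instrumentation package (names and approach are illustrative, not the actual aistore/tracing code):

```go
package tracing

import (
	"net/http"

	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

// NewTraceableHandler (sketch): wrap a registered handler so that every
// matched request runs inside an OTEL server span named after the route
// pattern. The real aistore/tracing implementation may differ.
func NewTraceableHandler(handler http.Handler, pattern string) http.Handler {
	return otelhttp.NewHandler(handler, pattern)
}
```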
1 change: 1 addition & 0 deletions README.md
@@ -176,6 +176,7 @@ Since AIS natively supports [remote backends](/docs/providers.md), you can also
- [Prometheus](/docs/prometheus.md)
- [Reference: all supported metrics](/docs/metrics-reference.md)
- [Observability overview: StatsD and Prometheus, logs, and CLI](/docs/metrics.md)
- [Distributed Tracing](/docs/distributed-tracing.md)
- [CLI: `ais show performance`](/docs/cli/show.md)
- For users and developers
- [Getting started](/docs/getting_started.md)
3 changes: 2 additions & 1 deletion ais/backend/aws.go
@@ -30,6 +30,7 @@ import (
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/memsys"
"github.com/NVIDIA/aistore/stats"
"github.com/NVIDIA/aistore/tracing"
"github.com/aws/aws-sdk-go-v2/aws"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/config"
@@ -832,7 +833,7 @@ func loadConfig(endpoint, profile string) (aws.Config, error) {
// NOTE: The AWS SDK for Go v2 uses lowercase header maps by default.
cfg, err := config.LoadDefaultConfig(
context.Background(),
config.WithHTTPClient(cmn.NewClient(cmn.TransportArgs{})),
config.WithHTTPClient(tracing.NewTraceableClient(cmn.NewClient(cmn.TransportArgs{}))),
config.WithSharedConfigProfile(profile),
)
if err != nil {
3 changes: 2 additions & 1 deletion ais/backend/gcp.go
@@ -24,6 +24,7 @@ import (
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/stats"
"github.com/NVIDIA/aistore/tracing"
jsoniter "github.com/json-iterator/go"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
@@ -113,7 +114,7 @@ func (gsbp *gsbp) createClient(ctx context.Context) (*storage.Client, error) {
}
return nil, cmn.NewErrFailedTo(nil, "gcp-backend: create", "http transport", err)
}
opts = append(opts, option.WithHTTPClient(&http.Client{Transport: transport}))
opts = append(opts, option.WithHTTPClient(tracing.NewTraceableClient(&http.Client{Transport: transport})))
// create HTTP client
client, err := storage.NewClient(ctx, opts...)
if err != nil {
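Both cloud backends (AWS above, GCP here) wrap their HTTP clients with `tracing.NewTraceableClient`, which likewise is not shown in this diff. A hedged sketch, again assuming `otelhttp`: wrapping the transport is enough to emit client spans and inject W3C trace-context headers into the S3/GCS calls.

```go
package tracing

import (
	"net/http"

	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

// NewTraceableClient (sketch): wrap the client's RoundTripper so that
// outbound backend requests produce client spans and propagate trace
// context. Illustrative only; the actual implementation may differ.
func NewTraceableClient(client *http.Client) *http.Client {
	client.Transport = otelhttp.NewTransport(client.Transport)
	return client
}
```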
11 changes: 11 additions & 0 deletions ais/daemon.go
@@ -26,6 +26,7 @@ import (
"github.com/NVIDIA/aistore/hk"
"github.com/NVIDIA/aistore/space"
"github.com/NVIDIA/aistore/sys"
"github.com/NVIDIA/aistore/tracing"
"github.com/NVIDIA/aistore/xact/xreg"
"github.com/NVIDIA/aistore/xact/xs"
)
@@ -242,6 +243,10 @@ func initDaemon(version, buildTime string) cos.Runner {
// aux plumbing
nlog.SetTitle(title)
cmn.InitErrs(p.si.Name(), nil)

// init distributed tracing
tracing.Init(&config.Tracing, p.si, version)

return p
}

@@ -258,6 +263,9 @@ func initDaemon(version, buildTime string) cos.Runner {
nlog.SetTitle(title)
cmn.InitErrs(t.si.Name(), fs.CleanPathErr)

// init distributed tracing
tracing.Init(&config.Tracing, t.si, version)

cmn.InitObjProps2Hdr()

return t
@@ -326,6 +334,9 @@ func Run(version, buildTime string) int {
rmain := initDaemon(version, buildTime)
err := daemon.rg.runAll(rmain)

// shut down the tracer provider, if running
tracing.Shutdown()

if err == nil {
nlog.Infoln("Terminated OK")
return 0
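The `tracing.Init` / `tracing.Shutdown` pair called above lives in the (not shown) `tracing` package. A simplified sketch of what initialization might involve, using the standard OTLP/gRPC exporter and a ratio-based sampler; the signature, the mapping of `skip_verify` to an insecure connection, and all names below are assumptions rather than the actual implementation (the real `Init` takes `*cmn.TracingConf`, the node snode, and the version):

```go
package tracing

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)

var tp *sdktrace.TracerProvider

// Init (simplified sketch): build an OTLP/gRPC exporter and register a
// sampling TracerProvider globally. A disabled config would make this a no-op.
func Init(endpoint, serviceName string, probability float64, skipVerify bool) error {
	opts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(endpoint)}
	if skipVerify {
		// assumption: skip_verify allows a non-TLS exporter connection
		opts = append(opts, otlptracegrpc.WithInsecure())
	}
	exp, err := otlptracegrpc.New(context.Background(), opts...)
	if err != nil {
		return err
	}
	tp = sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exp),
		sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(probability))),
		sdktrace.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String(serviceName),
		)),
	)
	otel.SetTracerProvider(tp)
	return nil
}

// Shutdown flushes pending spans and stops the provider (no-op if Init never ran).
func Shutdown() {
	if tp != nil {
		_ = tp.Shutdown(context.Background())
	}
}
```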
4 changes: 2 additions & 2 deletions ais/htcommon.go
@@ -634,10 +634,10 @@ func (server *netServer) shutdown(config *cmn.Config) {
// interface guard
var _ http.Handler = (*httpMuxers)(nil)

func newMuxers() httpMuxers {
func newMuxers(enableTracing bool) httpMuxers {
m := make(httpMuxers, len(htverbs))
for _, v := range htverbs {
m[v] = mux.NewServeMux()
m[v] = mux.NewServeMux(enableTracing)
}
return m
}
13 changes: 9 additions & 4 deletions ais/htrun.go
@@ -39,6 +39,7 @@ import (
"github.com/NVIDIA/aistore/memsys"
"github.com/NVIDIA/aistore/stats"
"github.com/NVIDIA/aistore/sys"
"github.com/NVIDIA/aistore/tracing"
"github.com/NVIDIA/aistore/xact/xreg"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -291,16 +292,21 @@ func (h *htrun) init(config *cmn.Config) {
tcpbuf = cmn.DefaultSendRecvBufferSize // ditto: targets use AIS default when not configured
}

muxers := newMuxers()
// public network (PubNet): enable tracing when configured
muxers := newMuxers(tracing.IsEnabled())
g.netServ.pub = &netServer{muxers: muxers, sndRcvBufSize: tcpbuf}
g.netServ.control = g.netServ.pub // if not separately configured, intra-control net is public
if config.HostNet.UseIntraControl {
muxers = newMuxers()
// TODO: for now tracing is always disabled for intra-cluster traffic.
// Allow enabling through config.
muxers = newMuxers(false /*enableTracing*/)
g.netServ.control = &netServer{muxers: muxers, sndRcvBufSize: 0}
}
g.netServ.data = g.netServ.control // if not configured, intra-data net is intra-control
if config.HostNet.UseIntraData {
muxers = newMuxers()
// TODO: for now tracing is always disabled for intra-data traffic.
// Allow enabling through config.
muxers = newMuxers(false /*enableTracing*/)
g.netServ.data = &netServer{muxers: muxers, sndRcvBufSize: tcpbuf}
}

@@ -450,7 +456,6 @@ func mustDiffer(ip1 meta.NetInfo, port1 int, use1 bool, ip2 meta.NetInfo, port2
func (h *htrun) loadSmap() (smap *smapX, reliable bool) {
smap = newSmap()
loaded, err := h.owner.smap.load(smap)

if err != nil {
nlog.Errorln(h.String(), "failed to load Smap:", err, "- reinitializing")
return
5 changes: 5 additions & 0 deletions ais/prxclu.go
@@ -1114,6 +1114,11 @@ func (p *proxy) setCluCfgPersistent(w http.ResponseWriter, r *http.Request, toUp
to, _ := jsoniter.Marshal(toUpdate.Auth)
whingeToUpdate("config.auth", string(from), string(to))
}
if toUpdate.Tracing != nil {
from, _ := jsoniter.Marshal(cmn.GCO.Get().Tracing)
to, _ := jsoniter.Marshal(toUpdate.Tracing)
whingeToUpdate("config.tracing", string(from), string(to))
}

// do
if _, err := p.owner.config.modify(ctx); err != nil {
3 changes: 2 additions & 1 deletion ais/prxrev.go
@@ -234,7 +234,8 @@ func (rp *reverseProxy) init() {
}

func (rp *reverseProxy) loadOrStore(uuid string, u *url.URL,
errHdlr func(w http.ResponseWriter, r *http.Request, err error)) *httputil.ReverseProxy {
errHdlr func(w http.ResponseWriter, r *http.Request, err error),
) *httputil.ReverseProxy {
revProxyIf, exists := rp.nodes.Load(uuid)
if exists {
shrp := revProxyIf.(*singleRProxy)
1 change: 0 additions & 1 deletion cmn/client.go
@@ -142,7 +142,6 @@ func NewClientTLS(cargs TransportArgs, sargs TLSArgs, intra bool) *http.Client {
cos.ExitLog(err) // FATAL
}
transport.TLSClientConfig = tlsConfig

return &http.Client{Transport: transport, Timeout: cargs.Timeout}
}

72 changes: 72 additions & 0 deletions cmn/config.go
@@ -96,6 +96,7 @@ type (
Mirror MirrorConf `json:"mirror" allow:"cluster"`
EC ECConf `json:"ec" allow:"cluster"`
Log LogConf `json:"log"`
Tracing TracingConf `json:"tracing"`
Periodic PeriodConf `json:"periodic"`
Timeout TimeoutConf `json:"timeout"`
Client ClientConf `json:"client"`
@@ -138,6 +139,7 @@ type (
EC *ECConfToSet `json:"ec,omitempty"`
Log *LogConfToSet `json:"log,omitempty"`
Periodic *PeriodConfToSet `json:"periodic,omitempty"`
Tracing *TracingConfToSet `json:"tracing,omitempty"`
Timeout *TimeoutConfToSet `json:"timeout,omitempty"`
Client *ClientConfToSet `json:"client,omitempty"`
Space *SpaceConfToSet `json:"space,omitempty"`
@@ -241,6 +243,46 @@ type (
StatsTime *cos.Duration `json:"stats_time,omitempty"`
}

// TracingConf defines the configuration used for the OpenTelemetry (OTEL) trace exporter.
// It includes settings for enabling tracing, sampling ratio, exporter endpoint, and other
// parameters necessary for distributed tracing in AIStore.
TracingConf struct {
ExporterEndpoint string `json:"exporter_endpoint"` // gRPC exporter endpoint
ExporterAuth TraceExporterAuthConf `json:"exporter_auth,omitempty"` // exporter auth config
ServiceNamePrefix string `json:"service_name_prefix"` // service name prefix used by trace exporter
ExtraAttributes map[string]string `json:"attributes,omitempty"` // any extra-attributes to be added to traces

// SamplerProbablityStr is the fraction of traces to sample, expressed as a float64
// but stored as a string to avoid floating-point precision issues during JSON unmarshaling.
// Valid values range from 0.0 to 1.0, where 1.0 means sampling 100% of traces.
SamplerProbablityStr string `json:"sampler_probability,omitempty"`
Enabled bool `json:"enabled"`
SkipVerify bool `json:"skip_verify"` // allow insecure exporter gRPC connection

SamplerProbablity float64 `json:"-"`
}

// NOTE: updating TracingConf requires a daemon restart.
TracingConfToSet struct {
ExporterEndpoint *string `json:"exporter_endpoint,omitempty"` // gRPC exporter endpoint
ExporterAuth *TraceExporterAuthConfToSet `json:"exporter_auth,omitempty"` // exporter auth config
ServiceNamePrefix *string `json:"service_name_prefix,omitempty"` // service name used by trace exporter
ExtraAttributes map[string]string `json:"attributes,omitempty"` // any extra-attributes to be added to traces
SamplerProbablityStr *string `json:"sampler_probability,omitempty"` // percentage of traces to be sampled
Enabled *bool `json:"enabled,omitempty"`
SkipVerify *bool `json:"skip_verify,omitempty"` // allow insecure exporter gRPC connection
}

TraceExporterAuthConf struct {
TokenHeader string `json:"token_header"` // header used to pass exporter auth token
TokenFile string `json:"token_file"` // filepath from where auth token can be obtained
}

TraceExporterAuthConfToSet struct {
TokenHeader *string `json:"token_header,omitempty"` // header used to pass exporter auth token
TokenFile *string `json:"token_file,omitempty"` // filepath from where auth token can be obtained
}

// NOTE: StatsTime is one important timer
PeriodConf struct {
StatsTime cos.Duration `json:"stats_time"` // collect and publish stats; other house-keeping
@@ -691,6 +733,7 @@ var (
_ Validator = (*MemsysConf)(nil)
_ Validator = (*TCBConf)(nil)
_ Validator = (*WritePolicyConf)(nil)
_ Validator = (*TracingConf)(nil)

_ PropsValidator = (*CksumConf)(nil)
_ PropsValidator = (*SpaceConf)(nil)
@@ -1758,6 +1801,35 @@ func (c *ResilverConf) String() string {
return "Disabled"
}

//////////////////
// Tracing Conf //
//////////////////

const defaultSampleProbability = 1.0

func (c *TracingConf) Validate() error {
if !c.Enabled {
return nil
}
if c.ExporterEndpoint == "" {
return errors.New("invalid tracing.exporter_endpoint can't be empty when tracing is enabled")
}
if c.SamplerProbablityStr == "" {
c.SamplerProbablity = defaultSampleProbability
} else {
prob, err := strconv.ParseFloat(c.SamplerProbablityStr, 64)
if err != nil {
return err
}
c.SamplerProbablity = prob
}
return nil
}

func (tac TraceExporterAuthConf) IsEnabled() bool {
return tac.TokenFile != "" && tac.TokenHeader != ""
}

////////////////////
// ConfigToSet //
////////////////////
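Put together, the JSON tags above suggest a cluster-config `tracing` section along these lines (values are purely illustrative; `sampler_probability` is deliberately a string, and 4317 is just the conventional OTLP/gRPC port):

```json
"tracing": {
    "enabled": true,
    "exporter_endpoint": "localhost:4317",
    "exporter_auth": {
        "token_header": "Authorization",
        "token_file": "/var/run/secrets/exporter-token"
    },
    "service_name_prefix": "aisnode",
    "attributes": { "cluster": "dev" },
    "sampler_probability": "1.0",
    "skip_verify": true
}
```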
1 change: 1 addition & 0 deletions deploy/dev/local/aisnode_config.sh
@@ -9,6 +9,7 @@ cat > $AIS_CONF_FILE <<EOL
"burst_buffer": 128,
"enabled": ${AIS_MIRROR_ENABLED:-false}
},
$(make_tracing_conf)
"ec": {
"objsize_limit": ${AIS_OBJ_SIZE_LIMIT:-262144},
"compression": "${AIS_EC_COMPRESSION:-never}",
8 changes: 6 additions & 2 deletions deploy/dev/local/deploy.sh
@@ -171,18 +171,22 @@ rm $TMPF 2>/dev/null
# 4. conditionally linked backends
set_env_backends

# 5. finally, /dev/loop* devices, if any
# 5. /dev/loop* devices, if any
# see also: TEST_LOOPBACK_SIZE
TEST_LOOPBACK_COUNT=0
create_loopbacks_or_skip

# 6. conditionally enable distributed tracing
set_env_tracing_or_skip

### end reading STDIN ============================ 6 steps above =================================


## NOTE: to enable StatsD instead of Prometheus, use build tag `statsd` in the make command, as follows:
## TAGS=statsd make ...
## see docs/metrics.md and docs/prometheus.md for more information.
##
if ! AIS_BACKEND_PROVIDERS=${AIS_BACKEND_PROVIDERS} make --no-print-directory -C ${AISTORE_PATH} node; then
if ! TAGS=${TAGS} AIS_BACKEND_PROVIDERS=${AIS_BACKEND_PROVIDERS} make --no-print-directory -C ${AISTORE_PATH} node; then
exit_error "failed to compile 'aisnode' binary"
fi

(remaining changed files not shown)
