diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0659d3a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM golang:1.16 AS build +WORKDIR /app +COPY go.mod ./ +COPY go.sum ./ +COPY *.go ./ +COPY sls_store/*.go ./sls_store/ +RUN go mod download && CGO_ENABLED=0 GOARCH=amd64 GOOS=linux go build -o /jaeger-sls-plugin + +FROM jaegertracing/all-in-one:1.25.0 +ENV ACCESS_KEY_SECRET="" \ + ACCESS_KEY_ID="" \ + PROJECT="" \ + ENDPOINT="" \ + INSTANCE="" \ + GRPC_STORAGE_PLUGIN_BINARY="/jaeger-sls-plugin" \ + SPAN_STORAGE_TYPE=grpc-plugin \ + JAEGER_DISABLED=true \ + GRPC_STORAGE_PLUGIN_LOG_LEVEL=DEBUG +COPY --from=build /jaeger-sls-plugin /jaeger-sls-plugin + diff --git a/LICENSE b/LICENSE index aa8dfae..d532d69 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2021 千乘(Qian Sheng) +Copyright (c) 2021 Alibaba Cloud Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index d7bd636..e3f74d5 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +# SLS Storage Plugin + This is the repository that contains object storage (Alibaba Could log service) plugin for Jaeger. ## About @@ -12,7 +14,21 @@ The [Alibaba Could log service](https://www.alibabacloud.com/product/log-service resilience, and freedom from operation and maintenance, allowing users to cope with surge traffic or inaccurate size assessment quickly, and the SLS service itself provides 99.9% availability and 11 out of 9 data reliability. -Jaeger :heart: The Alibab Cloud log service +The Alibab Cloud log service :heart: Jaeger + +## Quick Start + +1. Login [Alibaba Cloud log service Console](https://sls.console.aliyun.com/lognext/profile) +2. Create Project + [CrateProject](./images/create_project.gif) +3. Create Trace Instance + [CreateTraceInstance](./images/create_trace_instance.gif) +4. Modify Docker compose file + [ModifyConfigure](./images/modify_configure.gif) +5. Start Demo + [StartingDemo](./images/start_demos.gif) +6. Having fun with Jaeger and SLS. :grinning: + [HavingFunWithJaegerAndSLS](./images/havingfun.gif) ## Build/Compile @@ -23,13 +39,17 @@ cd /path/to/jaeger-sls go build ``` -## Parameter Flag - -(TODO) - ## Start -(TODO) +Executing the following command to start jaeger with Tenon Plugin +```shell +export ACCESS_KEY_SECRET: "" +export ACCESS_KEY_ID: "" +export PROJECT: "" +export ENDPOINT: "" +export INSTANCE: "" +GRPC_STORAGE_PLUGIN_BINARY="./jaeger-sls" GRPC_STORAGE_PLUGIN_CONFIGURATION_FILE=./config.yaml SPAN_STORAGE_TYPE=grpc-plugin JAEGER_DISABLED=true GRPC_STORAGE_PLUGIN_LOG_LEVEL=DEBUG ./all-in-one +``` ## License diff --git a/config.yaml.template b/config.yaml.template new file mode 100644 index 0000000..d3caabc --- /dev/null +++ b/config.yaml.template @@ -0,0 +1,5 @@ +ACCESS_KEY_SECRET: "" +ACCESS_KEY_ID: "" +PROJECT: "" +ENDPOINT: "" +INSTANCE: "" diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..b05e08a --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,128 @@ +version: '3.3' + +services: + mysql: + image: mysql:5.6 + ports: + - "3306:3306" + expose: + - 3306 + networks: + - backend + restart: always + environment: + MYSQL_ROOT_PASSWORD: "mysqlpwd" + TZ: "Asia/Shanghai" + volumes: + - "./examples/database.sql:/docker-entrypoint-initdb.d/database.sql" + + jaeger: + build: . + networks: + - backend + ports: + - "6831:6831" + - "6832:6832" + - "5778:5778" + - "16686:16686" + - "14268:14268" + - "14250:14250" + environment: + TZ: "Asia/Shanghai" + ACCESS_KEY_SECRET: "" + ACCESS_KEY_ID: "" + PROJECT: "" + ENDPOINT: "" + INSTANCE: "" + + tracing-poc: + image: iqfarhad/medium-poc_tracing:latest + ports: + - "8080:8080" + networks: + - backend + restart: always + environment: + PORT: ":8080" + DEBUG: "true" + TRACING_OPTION: "otel-collector" + OTEL_EXPORTER_OTLP_ENDPOINT: "otel-agent:4317" + MYSQL_URL: "root:mysqlpwd@tcp(mysql:3306)/sampleDB" + JAEGER_AGENT_NAME: "jaeger" + JAEGER_AGENT_PORT: "5775" + JAEGER_COLLECTOR_URL: "http://jaeger:14268/api/traces" + QUERYYER_URL: "http://tracing-queryyer:8081/getPerson/" + FORMATTER_URL: "http://tracing-formatter:8082/formatGreeting?" + TZ: "Asia/Shanghai" + entrypoint: "/go/bin/tracing-poc" + depends_on: + - otel-collector + - mysql + - tracing-queryyer + + tracing-queryyer: + image: iqfarhad/medium-poc_tracing:latest + networks: + - backend + restart: always + environment: + PORT: ":8081" + DEBUG: "true" + TRACING_OPTION: "otel-collector" + OTEL_EXPORTER_OTLP_ENDPOINT: "otel-agent:4317" + JAEGER_AGENT_NAME: "jaeger" + JAEGER_AGENT_PORT: "5775" + MYSQL_URL: "root:mysqlpwd@tcp(mysql:3306)/sampleDB" + JAEGER_COLLECTOR_URL: "http://jaeger:14268/api/traces" + TZ: "Asia/Shanghai" + entrypoint: "/go/bin/queryyer" + depends_on: + - otel-collector + + tracing-formatter: + image: iqfarhad/medium-poc_tracing:latest + networks: + - backend + restart: always + environment: + PORT: ":8082" + DEBUG: "true" + TRACING_OPTION: "otel-collector" + OTEL_EXPORTER_OTLP_ENDPOINT: "otel-agent:4317" + JAEGER_AGENT_NAME: "jaeger" + JAEGER_AGENT_PORT: "5775" + MYSQL_URL: "root:mysqlpwd@tcp(mysql:3306)/sampleDB" + JAEGER_COLLECTOR_URL: "http://jaeger:14268/api/traces" + TZ: "Asia/Shanghai" + entrypoint: "/go/bin/formatter" + depends_on: + - otel-collector + + otel-agent: + image: otel/opentelemetry-collector:latest + command: [ "--config=/etc/otel-agent-config.yaml", "--log-level=DEBUG" ] + volumes: + - ./examples/config-agent.yaml:/etc/otel-agent-config.yaml + ports: + - "1777:1777" + - "55679:55679" + - "13133" + - "4317:4317" + - "55681:55681" + networks: + - backend + environment: + TZ: "Asia/Shanghai" + + otel-collector: + image: otel/opentelemetry-collector:latest + command: [ "--config=/etc/otel-collector-config.yaml", "--log-level=DEBUG" ] + volumes: + - ./examples/config-collector.yaml:/etc/otel-collector-config.yaml + networks: + - backend + environment: + TZ: "Asia/Shanghai" + +networks: + backend: diff --git a/examples/config-agent.yaml b/examples/config-agent.yaml new file mode 100644 index 0000000..c16c87a --- /dev/null +++ b/examples/config-agent.yaml @@ -0,0 +1,29 @@ +extensions: + health_check: + +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:55681 + +processors: + batch/traces: + timeout: 1s + send_batch_size: 50 + +exporters: + otlp: + endpoint: otel-collector:4317 + insecure: true + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch/traces] + exporters: [otlp] + + extensions: [health_check] \ No newline at end of file diff --git a/examples/config-collector.yaml b/examples/config-collector.yaml new file mode 100644 index 0000000..540ab1d --- /dev/null +++ b/examples/config-collector.yaml @@ -0,0 +1,21 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + +exporters: + jaeger: + endpoint: "jaeger:14250" + insecure: true + +processors: + batch: + + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [jaeger] diff --git a/examples/database.sql b/examples/database.sql new file mode 100644 index 0000000..52cacc3 --- /dev/null +++ b/examples/database.sql @@ -0,0 +1,18 @@ +CREATE DATABASE IF NOT EXISTS sampleDB; + +USE sampleDB; + +CREATE TABLE IF NOT EXISTS sampleDB.people ( + name VARCHAR(100), + title VARCHAR(10), + description VARCHAR(100), + PRIMARY KEY (name) + ); + +DELETE FROM sampleDB.people; + +INSERT INTO sampleDB.people VALUES ('EQ', 'Tech', 'Where are the cakes?'); +INSERT INTO sampleDB.people VALUES ('Farhad', 'Dr.', 'Why ... why are you so nice?'); +INSERT INTO sampleDB.people VALUES ('Sonos', 'Mr.', 'you are so loud!'); +INSERT INTO sampleDB.people VALUES ('Margo', 'Ms.', 'Privet!'); +INSERT INTO sampleDB.people VALUES ('Trace', 'Mr.', 'This is so cool!'); \ No newline at end of file diff --git a/go.mod b/go.mod index eb700ef..09d7963 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,14 @@ -module github.com/qiansheng91/jaeger-sls +module github.com/aliyun/aliyun-log-jaeger go 1.16 require ( github.com/aliyun/aliyun-log-go-sdk v0.1.21 + github.com/gogo/protobuf v1.3.2 + github.com/hashicorp/go-hclog v0.16.2 github.com/jaegertracing/jaeger v1.24.0 + github.com/spf13/cast v1.3.1 + github.com/spf13/viper v1.8.1 ) -replace . => github.com/qiansheng91/jaeger-sls v0.0.0-20210803014446-26eb89a251e1 +replace github.com/aliyun/aliyun-log-jaeger => ./ diff --git a/go.sum b/go.sum index 4c12a9c..4740b4c 100644 --- a/go.sum +++ b/go.sum @@ -408,8 +408,9 @@ github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyN github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v0.14.1/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= -github.com/hashicorp/go-hclog v0.16.1 h1:IVQwpTGNRRIHafnTs2dQLIk4ENtneRIEEJWOVDqz99o= github.com/hashicorp/go-hclog v0.16.1/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= +github.com/hashicorp/go-hclog v0.16.2 h1:K4ev2ib4LdQETX5cSZBG0DVLk1jwGqSPXBjdah3veNs= +github.com/hashicorp/go-hclog v0.16.2/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= diff --git a/images/create_project.gif b/images/create_project.gif new file mode 100644 index 0000000..460b84d Binary files /dev/null and b/images/create_project.gif differ diff --git a/images/create_trace_instance.gif b/images/create_trace_instance.gif new file mode 100644 index 0000000..3de44f9 Binary files /dev/null and b/images/create_trace_instance.gif differ diff --git a/images/havingfun.gif b/images/havingfun.gif new file mode 100644 index 0000000..313d569 Binary files /dev/null and b/images/havingfun.gif differ diff --git a/images/modify_configure.gif b/images/modify_configure.gif new file mode 100644 index 0000000..62d4b38 Binary files /dev/null and b/images/modify_configure.gif differ diff --git a/images/start_demos.gif b/images/start_demos.gif new file mode 100644 index 0000000..802936a Binary files /dev/null and b/images/start_demos.gif differ diff --git a/main.go b/main.go index e9481ae..d490572 100644 --- a/main.go +++ b/main.go @@ -1,70 +1,125 @@ package main import ( + "errors" "flag" + "github.com/aliyun/aliyun-log-jaeger/sls_store" + "github.com/hashicorp/go-hclog" "github.com/jaegertracing/jaeger/plugin/storage/grpc" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" - "github.com/qiansheng91/jaeger-sls/sls_store" + "github.com/spf13/viper" "time" ) const ( - DefaultLookBack = 7 + DefaultLookBack = 6 ) -var ( - endpoint string - accessKeyID string - accessSecret string - project string - instance string - lookBack int64 -) +var configPath string + +type Configuration struct { + Endpoint string `yaml:"endpoint"` + AccessKeyID string `yaml:"accessKeyId"` + AccessSecret string `yaml:"accessSecret"` + Project string `yaml:"project"` + Instance string `yaml:"instance"` + MaxLookBack time.Duration `yaml:maxLookBack` +} + +var logger = hclog.New(&hclog.LoggerOptions{ + Level: hclog.Info, + Name: "aliyun-log-jaeger-plugin", + JSONFormat: true, +}) func main() { - flag.StringVar(&endpoint, "sls-endpoint", "", "The endpoint of Log Service. The format is ${project}.${region-endpoint}") - flag.StringVar(&accessKeyID, "sls-access_key-id", "", "The AccessKey ID of your Alibaba Cloud account.") - flag.StringVar(&accessSecret, "sls-access-secret", "", "The AccessKey secret of your Alibaba Cloud account.") - flag.StringVar(&project, "sls-project", "", "The name of the Log Service project.") - flag.StringVar(&instance, "sls-instance", "", "The name of the trace instance.") - flag.Int64Var(&lookBack, "sls-max-look-back", DefaultLookBack, "Maximum time frame for searching data. (Unit: Day)") + flag.StringVar(&configPath, "config", "", "Path to the alibaba log jaeger plugin's configuration file") flag.Parse() - checkParameter(endpoint, accessKeyID, accessSecret, project, instance) + configuration, err := initialParameters(configPath, logger) + if err != nil { + logger.Error("Fatal error config file: %w\n", err) + return + } var plugin = sls_store.NewSLSStorageForJaegerPlugin( - endpoint, - accessKeyID, - accessSecret, - project, - instance, - time.Duration(lookBack)*24*time.Hour, + configuration.Endpoint, + configuration.AccessKeyID, + configuration.AccessSecret, + configuration.Project, + configuration.Instance, + configuration.MaxLookBack, + logger, ) grpc.Serve(&shared.PluginServices{ - Store: plugin, - ArchiveStore: plugin, + Store: plugin, }) + + logger.Info("SLS jaeger plugin initialized Successfully") } -func checkParameter(endpoint string, accessKeyID string, accessSecret string, project string, instance string) { - if endpoint == "" { - panic("The Endpoint can't be empty") +func initialParameters(configPath string, logger hclog.Logger) (*Configuration, error) { + v := viper.New() + v.AutomaticEnv() + + if configPath != "" { + v.SetConfigFile(configPath) + if err := v.ReadInConfig(); err != nil { + logger.Error("Failed to read file.", "Exception", err) + return nil, err + } } - if accessKeyID == "" { - panic("The access key id can't be empty") + configuration := &Configuration{} + if err := configuration.InitFromViper(v); err != nil { + return nil, err + } else { + return configuration, nil } +} - if accessSecret == "" { - panic("The access secret can't be empty") +func (c *Configuration) InitFromViper(v *viper.Viper) error { + c.AccessSecret = v.GetString("ACCESS_KEY_SECRET") + if c.AccessSecret == "" { + logger.Error("The AccessSecret can't be empty") + return errors.New("The AccessSecret can't be empty") } - if project == "" { - panic("The access secret can't be empty") + c.AccessKeyID = v.GetString("ACCESS_KEY_ID") + if c.AccessKeyID == "" { + logger.Error("The access key id can't be empty") + return errors.New("The access key id can't be empty") } - if instance == "" { - panic("The access secret can't be empty") + c.Project = v.GetString("PROJECT") + if c.Project == "" { + logger.Error("The project name can't be empty") + return errors.New("The project name can't be empty") } + + c.Endpoint = v.GetString("ENDPOINT") + if c.Endpoint == "" { + logger.Error("The endpoint can't be empty") + return errors.New("The endpoint can't be empty") + } + + c.Instance = v.GetString("INSTANCE") + if c.Instance == "" { + logger.Error("The instance can't be empty") + return errors.New("The instance can't be empty") + } + + lookBack := v.GetInt32("MAX_LOOK_BACK") + if lookBack == 0 { + lookBack = DefaultLookBack + } + + if lookBack > 3*24 { + logger.Warn("Setting a larger value for MAX_LOOK_BACK will affect the query efficiency.", "MAX_LOOK_BACK", lookBack) + } + + c.MaxLookBack = time.Duration(lookBack) * time.Hour + logger.Info("Parameters", "AccessSecret", c.AccessSecret, "AccessKeyID", c.AccessKeyID, "Project", c.Project, "Instance", c.Instance, "Endpoint", c.Endpoint, "MaxLookBack", c.MaxLookBack) + return nil } diff --git a/sls_store/constant.go b/sls_store/constant.go new file mode 100644 index 0000000..78da22e --- /dev/null +++ b/sls_store/constant.go @@ -0,0 +1,75 @@ +package sls_store + +import "time" + +// Span Attribute name List +const ( + // ParentService The log item key of parent service name + ParentService = "parent_service" + // ChildService the field name of child service + ChildService = "child_service" + // ServiceName the field name of service + ServiceName = "service" + // OperationName the field name of operation name + OperationName = "name" + // SpanKind the field name of span kind + SpanKind = "kind" + // TraceID the field name of trace id + TraceID = "traceID" + // TraceIDField + TraceIDField = "traceid" + // SpanID the field name of span id + SpanID = "spanID" + // SpanIDField + SpanIDField = "spanID" + // ParentSpanID the field name of parent span id + ParentSpanID = "parentSpanID" + // StartTime the field name of start time + StartTime = "start" + // Duration the field name of duration + Duration = "duration" + // Attribute the field name of span tags + Attribute = "attribute" + // Resource the field name of span process tag + Resource = "resource" + // Logs the field name of span log + Logs = "logs" + // Links the field name of span reference + Links = "links" + // StatusMessage the field name of warning message of span + StatusMessage = "statusMessage" + //StatusMessageField + StatusMessageField = "statusmessage" + // Flags the field name of flags + Flags = "flags" + // EndTime the field name of end time + EndTime = "end" + // StatusCode the field name of status code + StatusCode = "statusCode" + // StatusCodeField + StatusCodeField = "statuscode" +) + +// Query template List +const ( + // DependenciesQueryString The query string which calculates the dependency relationship between each service. + DependenciesQueryString = "* and version: service_name | SELECT parent_service, child_service , sum(n_status_fail + n_status_succ) as count from log group by parent_service, child_service" + // GetTraceQueryTemplate The template query string which selects trace by trace id + GetTraceQueryTemplate = "traceID: %s" + // GetServiceQueryString the query string which queries all service name + GetServiceQueryString = "* | select DISTINCT service" +) + +// query operation values +const ( + // DefaultFetchNumber the max fetching number of span + DefaultFetchNumber = 1000 + // DefaultOffset default offset + DefaultOffset = 0 + // DefaultTopicName default topic name + DefaultTopicName = "" + // DefaultRetryTimeOut the default value of retry timeout + DefaultRetryTimeOut = 2 * time.Minute + // DefaultRequestTimeOut the default value of request timeout + DefaultRequestTimeOut = 2 * time.Minute +) diff --git a/sls_store/query_sql_builder.go b/sls_store/query_sql_builder.go new file mode 100644 index 0000000..fc3b27a --- /dev/null +++ b/sls_store/query_sql_builder.go @@ -0,0 +1,105 @@ +package sls_store + +import ( + "fmt" + "time" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +func toGetTraceQuery(id model.TraceID) string { + return fmt.Sprintf(GetTraceQueryTemplate, id.String()) +} + +func toGetServicesQuery() string { + return GetServiceQueryString +} + +var queryBuilder = QueryBuilder{ + query: "*", + analyze: " select * from log where 1=1 ", +} + +func toOperationsQuery(parameters spanstore.OperationQueryParameters) string { + return queryBuilder.withSpanKind(parameters.SpanKind).withServiceName(parameters.ServiceName).toString() +} + +func toFindTraceIdsQuery(parameters *spanstore.TraceQueryParameters) string { + return QueryBuilder{ + query: "*", + analyze: "select traceid from log where 1=1 ", + }.withTags(parameters.Tags). + withDuration(parameters.DurationMin, parameters.DurationMax). + withServiceName(parameters.ServiceName). + withOperationName(parameters.OperationName). + withGroupByTraceID(). + withLimit(parameters.NumTraces). + toString() +} + +type QueryBuilder struct { + query string + analyze string +} + +func (o QueryBuilder) withLimit(p int) *QueryBuilder { + o.analyze += fmt.Sprintf(" limit %d", p) + return &o +} +func (o QueryBuilder) withGroupByTraceID() *QueryBuilder { + o.analyze += " group by traceid" + return &o +} + +func (o QueryBuilder) withSpanKind(p string) *QueryBuilder { + if p != "" { + o.query += fmt.Sprintf(" and kind: %s", p) + } + + return &o +} + +func (o QueryBuilder) withServiceName(p string) *QueryBuilder { + if p != "" { + o.query += fmt.Sprintf(" and service: %s", p) + } + + return &o +} + +func (o QueryBuilder) withOperationName(p string) *QueryBuilder { + if p != "" { + o.analyze += fmt.Sprintf(" and name like '%s'", p) + } + + return &o +} + +func (o QueryBuilder) withTags(p map[string]string) QueryBuilder { + if len(p) == 0 { + return o + } + + for key, value := range p { + o.query += fmt.Sprintf(" and attribute.%s: %s", key, value) + } + + return o +} + +func (o QueryBuilder) withDuration(min, max time.Duration) QueryBuilder { + if min > 0 { + o.analyze += fmt.Sprintf(" and duration >= %d", min.Nanoseconds()/1000) + } + + if max > 0 { + o.analyze += fmt.Sprintf(" and duration <= %d", max.Nanoseconds()/1000) + } + + return o +} + +func (o QueryBuilder) toString() string { + return fmt.Sprintf("%s | %s", o.query, o.analyze) +} diff --git a/sls_store/sls_dependency_reader.go b/sls_store/sls_dependency_reader.go index 76eb91a..9292d83 100644 --- a/sls_store/sls_dependency_reader.go +++ b/sls_store/sls_dependency_reader.go @@ -2,65 +2,48 @@ package sls_store import ( "context" - slsSdk "github.com/aliyun/aliyun-log-go-sdk" - "github.com/jaegertracing/jaeger/model" "strconv" "time" -) - -const ( - // DependenciesQueryString The query string which calculates the dependency relationship between each service. - DependenciesQueryString = "* and version: service_name" - // ParentServiceFieldName The log item key of parent service name - ParentServiceFieldName = "parent_service" - // FailureCallingTimesFieldName The key of failure calling times - FailureCallingTimesFieldName = "n_status_fail" - // SuccessfulCallingTimesFieldName The key of successful calling times - SuccessfulCallingTimesFieldName = "n_status_succ" - // ParentService the key of parent service - ParentService = "parent_service" - // ChildService the key of child service - ChildService = "child_service" + slsSdk "github.com/aliyun/aliyun-log-go-sdk" + "github.com/hashicorp/go-hclog" + "github.com/jaegertracing/jaeger/model" ) type slsDependencyReader struct { client *slsSdk.Client instance slsTraceInstance + logger hclog.Logger } func (s slsDependencyReader) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - response, error := s.client.GetLogs(s.instance.project(), s.instance.serviceDependencyLogStore(), "", - endTs.Unix()-int64(lookback), endTs.Unix(), DependenciesQueryString, 1000, 0, false) + defer func() { + if err := recover(); err != nil { + s.logger.Error("Failed to get DependencyLink", "Exception", err) + } + }() + + response, error := s.client.GetLogs(s.instance.project(), s.instance.serviceDependencyLogStore(), DefaultTopicName, + endTs.Add(-1*lookback).Unix(), endTs.Unix(), DependenciesQueryString, DefaultFetchNumber, DefaultOffset, false) if error != nil { return nil, error } - result := make([]model.DependencyLink, response.Count) - for i, log := range response.Logs { - if log[ParentServiceFieldName] == "None" { + s.logger.Info("GetDependencies", "Query", DependenciesQueryString, "Logstore", s.instance.serviceDependencyLogStore(), "DependencyLinks", response.Count) + var result []model.DependencyLink + for _, log := range response.Logs { + count, _ := strconv.ParseFloat(log["count"], 0) + if log[ParentService] == "None" { continue } - result[i] = model.DependencyLink{ + result = append(result, model.DependencyLink{ Parent: log[ParentService], Child: log[ChildService], - CallCount: getCallingTime(log), - } + CallCount: uint64(count), + }) } return result, nil } - -func getCallingTime(log map[string]string) uint64 { - return getCallingTimes(log, SuccessfulCallingTimesFieldName) + getCallingTimes(log, FailureCallingTimesFieldName) -} - -func getCallingTimes(log map[string]string, key string) uint64 { - successCount, e2 := strconv.Atoi(log[key]) - if e2 != nil { - successCount = 0 - } - return uint64(successCount) -} diff --git a/sls_store/sls_span_reader.go b/sls_store/sls_span_reader.go index 0b0f144..383dc85 100644 --- a/sls_store/sls_span_reader.go +++ b/sls_store/sls_span_reader.go @@ -2,38 +2,182 @@ package sls_store import ( "context" + "time" + slsSdk "github.com/aliyun/aliyun-log-go-sdk" + "github.com/hashicorp/go-hclog" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/spanstore" - "time" ) type slsSpanReader struct { client *slsSdk.Client instance slsTraceInstance maxLookBack time.Duration -} - -func (s slsSpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - // traceID: xxx - return nil, nil + logger hclog.Logger } func (s slsSpanReader) GetServices(ctx context.Context) ([]string, error) { - // * | select DISTINCT service as service from log - return nil, nil + defer func() { + if err := recover(); err != nil { + s.logger.Error("Failed to GetServices", "Exception", err) + } + }() + from, to := buildSearchingData(s.maxLookBack) + + response, e := s.client.GetLogs(s.instance.project(), s.instance.traceLogStore(), DefaultTopicName, from, to, + toGetServicesQuery(), DefaultFetchNumber, DefaultOffset, false) + + s.logger.Info("GetServicesList", "Query", toGetServicesQuery(), "StartTime", time.Unix(from, 0), "EndTime", time.Unix(to, 0), "Logstore", s.instance.traceLogStore()) + + if e != nil { + return nil, e + } + + services := make([]string, response.Count) + for i, data := range response.Logs { + services[i] = data[ServiceName] + } + + return services, nil + } func (s slsSpanReader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) { - // * and kind: client and service: front-end - return nil, nil + defer func() { + if err := recover(); err != nil { + s.logger.Error("Failed to get operations", "Exception", err) + } + }() + + from, to := buildSearchingData(s.maxLookBack) + + response, e := s.client.GetLogs(s.instance.project(), s.instance.traceLogStore(), DefaultTopicName, from, to, + toOperationsQuery(query), DefaultFetchNumber, DefaultOffset, false) + + s.logger.Info("GetOperations", "Query", toOperationsQuery(query), "StartTime", time.Unix(from, 0), "EndTime", time.Unix(to, 0), "Logstore", s.instance.traceLogStore()) + if e != nil { + return nil, e + } + + operations := make([]spanstore.Operation, response.Count) + for i, data := range response.Logs { + operations[i] = spanstore.Operation{ + Name: data[OperationName], + SpanKind: data[SpanKind], + } + } + + return operations, nil } func (s slsSpanReader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { - // - return nil, nil + defer func() { + if err := recover(); err != nil { + s.logger.Error("Failed to find traces", "Exceptions", err) + } + }() + + traceIDs, err := GetTraceIDsWithQuery(s.client, s.instance.project(), s.instance.traceLogStore(), query) + if err != nil { + return nil, err + } + + var result []*model.Trace + for _, tid := range traceIDs { + if t, e := GetTraceWithTime(s.client, tid, query.StartTimeMin.Unix(), query.StartTimeMax.Unix(), s.instance.project(), + s.instance.traceLogStore()); e == nil { + result = append(result, t) + } else { + logger.Warn("Failed to get trace data.", "TID", tid, "Exception", e) + } + } + + return result, nil } func (s slsSpanReader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) { - return nil, nil + defer func() { + if err := recover(); err != nil { + s.logger.Error("Failed to FindTraceIDs", "Exception", err) + } + }() + + return GetTraceIDsWithQuery(s.client, s.instance.project(), s.instance.traceLogStore(), query) +} + +func (s slsSpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { + from, to := buildSearchingData(s.maxLookBack) + return GetTraceWithTime(s.client, traceID, from, to, s.instance.project(), s.instance.traceLogStore()) +} + +var logger = hclog.New(&hclog.LoggerOptions{ + Level: hclog.Info, + Name: "aliyun-log-jaeger-plugin", + JSONFormat: true, +}) + +func GetTraceIDsWithQuery(client *slsSdk.Client, project, logstore string, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) { + from, to := query.StartTimeMin.Unix(), query.StartTimeMax.Unix() + queryString := toFindTraceIdsQuery(query) + + response, e := client.GetLogs(project, logstore, DefaultTopicName, from, to, + queryString, DefaultFetchNumber, DefaultOffset, false) + + if e != nil { + return nil, e + } + + traceIDS := make(map[string]bool) + for _, log := range response.Logs { + traceIDS[log[TraceIDField]] = true + } + + var result []model.TraceID + + for key, _ := range traceIDS { + traceId, e1 := model.TraceIDFromString(key) + if e1 != nil { + logger.Warn("Failed to convert trace ID", "tid", key) + continue + } + + result = append(result, traceId) + } + + return result, nil +} + +func GetTraceWithTime(client *slsSdk.Client, traceID model.TraceID, from, to int64, project, logstore string) (*model.Trace, error) { + if response, e := client.GetLogs(project, logstore, DefaultTopicName, from, to, toGetTraceQuery(traceID), + DefaultFetchNumber, DefaultOffset, false); e != nil { + return nil, e + } else { + return mappingTraceData(response.Logs) + } +} + +// mappingTraceData the method used to converting sls span data to jaeger span data. +func mappingTraceData(logs []map[string]string) (*model.Trace, error) { + var processMapping []model.Trace_ProcessMapping + spans := make([]*model.Span, 0) + for _, data := range logs { + if spanData, err := dataConvert.ToJaegerSpan(data); err != nil { + continue + } else { + spans = append(spans, spanData) + } + } + + for _, span := range spans { + processMapping = append(processMapping, model.Trace_ProcessMapping{ + ProcessID: span.ProcessID, + Process: *span.Process, + }) + } + + return &model.Trace{ + Spans: spans, + ProcessMap: processMapping, + }, nil } diff --git a/sls_store/sls_span_writer.go b/sls_store/sls_span_writer.go index 3bc57c5..7387753 100644 --- a/sls_store/sls_span_writer.go +++ b/sls_store/sls_span_writer.go @@ -2,17 +2,55 @@ package sls_store import ( "context" + "time" + slsSdk "github.com/aliyun/aliyun-log-go-sdk" + "github.com/gogo/protobuf/proto" + "github.com/hashicorp/go-hclog" "github.com/jaegertracing/jaeger/model" - "time" ) type slsSpanWriter struct { client *slsSdk.Client instance slsTraceInstance maxLookBack time.Duration + logger hclog.Logger } func (s slsSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error { - return nil + if contents, err := convertToSpanLog(span, "", "0.0.0.0"); err != nil { + s.logger.Error("Failed to convert span", "spanID", span.SpanID) + return nil + } else { + e := s.client.PutLogs(s.instance.project(), s.instance.traceLogStore(), contents) + if e != nil { + s.logger.Error("Failed to send log.", "exception", e) + } + return e + } +} + +func convertToSpanLog(span *model.Span, topic, source string) (*slsSdk.LogGroup, error) { + if logs, err := spanToLog(span); err == nil { + return &slsSdk.LogGroup{ + Topic: proto.String(topic), + Source: proto.String(source), + Logs: logs, + }, nil + } else { + return nil, err + } +} + +func spanToLog(span *model.Span) ([]*slsSdk.Log, error) { + contents, err := dataConvert.ToSLSSpan(span) + if err != nil { + return nil, err + } + return []*slsSdk.Log{ + { + Time: proto.Uint32(uint32(span.StartTime.Unix())), + Contents: contents, + }, + }, nil } diff --git a/sls_store/sls_storage_plugin.go b/sls_store/sls_storage_plugin.go index a65a010..3fe9dab 100644 --- a/sls_store/sls_storage_plugin.go +++ b/sls_store/sls_storage_plugin.go @@ -1,15 +1,12 @@ package sls_store import ( + "time" + slsSdk "github.com/aliyun/aliyun-log-go-sdk" + "github.com/hashicorp/go-hclog" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" - "time" -) - -const ( - DefaultRetryTimeOut = 30 - DefaultRequestTimeOut = 30 ) type SlsJaegerStoragePlugin struct { @@ -19,10 +16,11 @@ type SlsJaegerStoragePlugin struct { project string instance slsTraceInstance maxLookBack time.Duration + logger hclog.Logger } func NewSLSStorageForJaegerPlugin(endpoint string, accessKeyID string, accessSecret string, - project string, instance string, maxLookBack time.Duration) *SlsJaegerStoragePlugin { + project string, instance string, maxLookBack time.Duration, logger hclog.Logger) *SlsJaegerStoragePlugin { return &SlsJaegerStoragePlugin{ endpoint: endpoint, accessKeyID: accessKeyID, @@ -30,6 +28,7 @@ func NewSLSStorageForJaegerPlugin(endpoint string, accessKeyID string, accessSec project: project, instance: newSlsTraceInstance(project, instance), maxLookBack: maxLookBack, + logger: logger, } } @@ -38,6 +37,7 @@ func (s SlsJaegerStoragePlugin) ArchiveSpanReader() spanstore.Reader { client: buildSLSSdkClient(s), instance: s.instance, maxLookBack: s.maxLookBack, + logger: s.logger, } } @@ -46,20 +46,25 @@ func (s SlsJaegerStoragePlugin) ArchiveSpanWriter() spanstore.Writer { client: buildSLSSdkClient(s), instance: s.instance, maxLookBack: s.maxLookBack, + logger: s.logger, } } func (s SlsJaegerStoragePlugin) SpanReader() spanstore.Reader { return &slsSpanReader{ - client: buildSLSSdkClient(s), - instance: s.instance, + client: buildSLSSdkClient(s), + instance: s.instance, + maxLookBack: s.maxLookBack, + logger: s.logger, } } func (s SlsJaegerStoragePlugin) SpanWriter() spanstore.Writer { return &slsSpanWriter{ - client: buildSLSSdkClient(s), - instance: s.instance, + client: buildSLSSdkClient(s), + instance: s.instance, + maxLookBack: s.maxLookBack, + logger: s.logger, } } @@ -67,6 +72,7 @@ func (s SlsJaegerStoragePlugin) DependencyReader() dependencystore.Reader { return &slsDependencyReader{ client: buildSLSSdkClient(s), instance: s.instance, + logger: s.logger, } } @@ -75,7 +81,6 @@ func buildSLSSdkClient(s SlsJaegerStoragePlugin) *slsSdk.Client { Endpoint: s.endpoint, AccessKeyID: s.accessKeyID, AccessKeySecret: s.accessSecret, - SecurityToken: s.accessSecret, RequestTimeOut: DefaultRequestTimeOut, RetryTimeOut: DefaultRetryTimeOut, } diff --git a/sls_store/span_data_converter.go b/sls_store/span_data_converter.go new file mode 100644 index 0000000..f76966b --- /dev/null +++ b/sls_store/span_data_converter.go @@ -0,0 +1,331 @@ +package sls_store + +import ( + "encoding/json" + "fmt" + "time" + + slsSdk "github.com/aliyun/aliyun-log-go-sdk" + "github.com/gogo/protobuf/proto" + "github.com/jaegertracing/jaeger/model" + "github.com/spf13/cast" +) + +type DataConverter interface { + ToJaegerSpan(data map[string]string) (*model.Span, error) + + ToSLSSpan(span *model.Span) ([]*slsSdk.LogContent, error) +} + +var dataConvert = &dataConverterImpl{} + +type dataConverterImpl struct { +} + +func (dataConverterImpl) ToJaegerSpan(log map[string]string) (*model.Span, error) { + span := model.Span{} + process := model.Process{ + Tags: make([]model.KeyValue, 0), + } + + for k, v := range log { + switch k { + case TraceID: + traceID, err := model.TraceIDFromString(v) + if err != nil { + logger.Warn("Failed to convert traceId", "key", k, "value", v) + return nil, err + } + span.TraceID = traceID + break + case SpanID: + spanID, err := model.SpanIDFromString(v) + if err != nil { + logger.Warn("Failed to convert spanID", "key", k, "value", v) + return nil, err + } + span.SpanID = spanID + break + case OperationName: + span.OperationName = v + break + case Flags: + span.Flags = model.Flags(cast.ToUint64(v)) + break + case StartTime: + span.StartTime = model.EpochMicrosecondsAsTime(cast.ToUint64(v)) + break + case Duration: + span.Duration = model.MicrosecondsAsDuration(cast.ToUint64(v)) + break + case ServiceName: + process.ServiceName = v + break + case Links: + refs, err := unmarshalReferences(v) + if err != nil { + logger.Warn("Failed to convert links", "key", k, "value", v, "exception", err) + return nil, err + } + span.References = refs + break + case Logs: + logs, err := unmarshalLogs(v) + if err != nil { + logger.Warn("Failed to convert logs", "key", k, "value", v, "exception", err) + return nil, err + } + span.Logs = logs + break + case StatusMessageField: + if v != "" { + span.Warnings = append(span.Warnings, v) + } + break + case Attribute: + span.Tags = unmarshalTags(v) + break + case Resource: + process.Tags, span.ProcessID = unmarshalResource(v) + break + case StatusCodeField: + if v == "ERROR" { + span.Warnings = append(span.Warnings, v) + } + } + } + + span.Process = &process + return &span, nil +} + +func (dataConverterImpl) ToSLSSpan(span *model.Span) ([]*slsSdk.LogContent, error) { + contents := make([]*slsSdk.LogContent, 0) + contents = appendAttributeToLogContent(contents, TraceID, TraceIDToString(&span.TraceID)) + contents = appendAttributeToLogContent(contents, SpanID, span.SpanID.String()) + contents = appendAttributeToLogContent(contents, ParentSpanID, span.ParentSpanID().String()) + contents = appendAttributeToLogContent(contents, OperationName, span.OperationName) + contents = appendAttributeToLogContent(contents, Flags, fmt.Sprintf("%d", span.Flags)) + contents = appendAttributeToLogContent(contents, StartTime, cast.ToString(span.StartTime.UnixNano()/1000)) + contents = appendAttributeToLogContent(contents, Duration, cast.ToString(span.Duration.Nanoseconds()/1000)) + contents = appendAttributeToLogContent(contents, EndTime, cast.ToString((span.StartTime.UnixNano()+span.Duration.Nanoseconds())/1000)) + contents = appendAttributeToLogContent(contents, ServiceName, span.Process.ServiceName) + contents = appendAttributeToLogContent(contents, StatusCode, "UNSET") + contents = appendAttributeToLogContent(contents, Attribute, marshalTags(span.Tags)) + contents = appendAttributeToLogContent(contents, Resource, marshalResource(span.Process.Tags, span.ProcessID)) + + if refStr, err := marshalReferences(span.References); err != nil { + logger.Warn("Failed to convert references", "spanID", span.SpanID, "reference", span.References, "exception", err) + return nil, err + } else { + contents = appendAttributeToLogContent(contents, Links, refStr) + } + + if logsStr, err := marshalLogs(span.Logs); err != nil { + logger.Warn("Failed to convert logs", "spanID", span.SpanID, "logs", span.Logs, "exception", err) + return nil, err + } else { + contents = appendAttributeToLogContent(contents, Logs, logsStr) + } + + contents, err := appendWarnings(contents, span.Warnings) + if err != nil { + logger.Warn("Failed to convert warnings", "spanID", span.SpanID, "warnings", span.Warnings, "exception", err) + return nil, err + } + + return contents, nil +} + +func appendWarnings(contents []*slsSdk.LogContent, warnings []string) ([]*slsSdk.LogContent, error) { + if len(warnings) < 1 { + return contents, nil + } + + r, err := json.Marshal(warnings) + if err != nil { + return nil, err + } + + return appendAttributeToLogContent(contents, StatusMessage, string(r)), nil +} + +func marshalResource(v []model.KeyValue, processID string) string { + dataMap := keyValueToMap(v) + dataMap["ProcessID"] = processID + + data, err := json.Marshal(dataMap) + if err != nil { + return fmt.Sprintf("%v", string(data)) + } + + return string(data) +} + +func unmarshalResource(v string) (kvs []model.KeyValue, processID string) { + data := make(map[string]string) + + err := json.Unmarshal([]byte(v), &data) + if err != nil { + kvs = append(kvs, model.String("tags", v)) + return kvs, "" + } + + return mapToKeyValue(data), data["ProcessID"] + +} + +func marshalTags(v []model.KeyValue) string { + dataMap := keyValueToMap(v) + + data, err := json.Marshal(dataMap) + if err != nil { + return fmt.Sprintf("%v", string(data)) + } + + return string(data) +} + +func unmarshalTags(v string) (kvs []model.KeyValue) { + data := make(map[string]string) + + err := json.Unmarshal([]byte(v), &data) + if err != nil { + kvs = append(kvs, model.String("tags", v)) + return + } + + return mapToKeyValue(data) +} + +type SpanLog struct { + Attribute map[string]string `json:"attribute"` + Time int64 `json:"time"` +} + +func marshalLogs(logs []model.Log) (string, error) { + if len(logs) <= 0 { + return "[]", nil + } + + slsLogs := make([]SpanLog, len(logs)) + for i, l := range logs { + slsLogs[i] = SpanLog{ + Time: l.Timestamp.UnixNano(), + Attribute: keyValueToMap(l.Fields), + } + } + + r, err := json.Marshal(slsLogs) + if err != nil { + return "", err + } + + return string(r), nil +} + +func unmarshalLogs(s string) ([]model.Log, error) { + if s == "[]" { + return nil, nil + } + + logs := make([]SpanLog, 0) + if err := json.Unmarshal([]byte(s), &logs); err != nil { + return nil, err + } + + result := make([]model.Log, len(logs)) + for i, log := range logs { + result[i] = model.Log{ + Timestamp: time.Unix(log.Time/1e9, log.Time%1e9), + Fields: mapToKeyValue(log.Attribute), + } + } + return result, nil +} + +func marshalReferences(refs []model.SpanRef) (string, error) { + if len(refs) <= 0 { + return "[]", nil + } + + rs := make([]map[string]string, 0) + + for _, ref := range refs { + r := make(map[string]string) + r["TraceID"] = ref.TraceID.String() + r["SpanID"] = ref.SpanID.String() + r["RefType"] = ref.RefType.String() + rs = append(rs, r) + } + + r, err := json.Marshal(rs) + if err != nil { + return "", err + } + + return string(r), nil +} + +func unmarshalReferences(s string) (refs []model.SpanRef, err error) { + if s == "[]" { + return nil, nil + } + + rs := make([]map[string]string, 0) + + err = json.Unmarshal([]byte(s), &rs) + if err != nil { + return nil, err + } + + for _, r := range rs { + tid, e1 := model.TraceIDFromString(r["TraceID"]) + if e1 != nil { + return nil, e1 + } + + spanID, e2 := model.SpanIDFromString(r["SpanID"]) + if e2 != nil { + return nil, e2 + } + + spanType := model.SpanRefType_value[r["RefType"]] + refs = append(refs, model.SpanRef{ + TraceID: tid, + SpanID: spanID, + RefType: model.SpanRefType(spanType), + }) + } + + return refs, nil +} + +func mapToKeyValue(data map[string]string) []model.KeyValue { + result := make([]model.KeyValue, 0) + for key, value := range data { + result = append(result, model.String(key, value)) + } + + return result +} + +func keyValueToMap(fields []model.KeyValue) map[string]string { + m := make(map[string]string) + for _, keyVal := range fields { + m[keyVal.Key] = keyVal.AsString() + } + return m +} + +func TraceIDToString(t *model.TraceID) string { + return t.String() +} + +func appendAttributeToLogContent(contents []*slsSdk.LogContent, k, v string) []*slsSdk.LogContent { + content := slsSdk.LogContent{ + Key: proto.String(k), + Value: proto.String(v), + } + return append(contents, &content) +} diff --git a/sls_store/utils.go b/sls_store/utils.go new file mode 100644 index 0000000..5d1fecd --- /dev/null +++ b/sls_store/utils.go @@ -0,0 +1,12 @@ +package sls_store + +import ( + "time" +) + +func buildSearchingData(lookback time.Duration) (int64, int64) { + currentTime := time.Now() + to := currentTime.Unix() + from := currentTime.Add(-1 * lookback).Unix() + return from, to +}