Skip to content

Commit

Permalink
Merge pull request #38 from kenriortega/development
Browse files Browse the repository at this point in the history
Development Otel
  • Loading branch information
kenriortega authored Nov 14, 2021
2 parents 77b7860 + ba17915 commit 6efa59b
Show file tree
Hide file tree
Showing 18 changed files with 654 additions and 218 deletions.
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,9 @@ certbot renew

---


> Yaml file fields
BenchMarking
------------

```bash
ab -c 1000 -n 10000 http://localhost:<proxyPort>/health
```
```
1 change: 1 addition & 0 deletions cmd/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ var (
flagPrevKey = "prevkey"
flagCfgFile = "cfgfile"
flagCfgPath = "cfgpath"
flagMetric = "metric"
)
58 changes: 55 additions & 3 deletions cmd/cli/grpcxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (

"github.com/kenriortega/ngonx/pkg/errors"
"github.com/kenriortega/ngonx/pkg/logger"
"github.com/kenriortega/ngonx/pkg/otelify"
"github.com/spf13/cobra"
"github.com/talos-systems/grpc-proxy/proxy"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand All @@ -25,6 +28,25 @@ var grpcCmd = &cobra.Command{
Use: "grpc",
Short: "Run ngonx as a grpc proxy",
Run: func(cmd *cobra.Command, args []string) {

enableMetric, err := cmd.Flags().GetBool(flagMetric)
if err != nil {
logger.LogError(errors.Errorf("proxy: %v", err).Error())
}
if enableMetric {
// TODO: pass from compile vars
flush := otelify.InitProvider(
"ngonx",
"v0.4.5",
"dev",
// TODO: pass from yml file object
"0.0.0.0:55680",
)
defer flush()
// Exporter Metrics
go otelify.ExposeMetricServer(configFromYaml.ProxyGateway.PortExporterProxy)
}

var opts []grpc.ServerOption

lis, err := net.Listen("tcp", configFromYaml.GrpcProxy.Listener)
Expand All @@ -34,14 +56,19 @@ var grpcCmd = &cobra.Command{

logger.LogInfo(fmt.Sprintf("Proxy running at %q\n", configFromYaml.GrpcProxy.Listener))
simpleBackendGen := func(hostname string) proxy.Backend {
ctx, span := otel.Tracer("grpcxy.simpleBackendGen").Start(context.Background(), "simpleBackendGen")
defer span.End()
traceID := trace.SpanContextFromContext(ctx).TraceID().String()
return &proxy.SingleBackend{
GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
md, _ := metadata.FromIncomingContext(ctx)

outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
if configFromYaml.GrpcSSL.Enable {
creds, sslErr := credentials.NewClientTLSFromFile(
configFromYaml.GrpcClientCert, "")
configFromYaml.GrpcClientCert,
"",
)
if sslErr != nil {
logger.LogError(errors.Errorf("grpc: failed to parse credentials: %v", sslErr).Error())
}
Expand All @@ -51,6 +78,10 @@ var grpcCmd = &cobra.Command{
grpc.WithTransportCredentials(creds),
grpc.WithCodec(proxy.Codec()),
) //nolint: staticcheck
if err != nil {
otelify.InstrumentedError(span, "grpcxy.grpc.DialContext", traceID, err)
}
otelify.InstrumentedInfo(span, "grpcxy.grpc.DialContext", traceID)
return outCtx, conn, err
}
conn, err := grpc.DialContext(
Expand All @@ -59,27 +90,48 @@ var grpcCmd = &cobra.Command{
grpc.WithInsecure(),
grpc.WithCodec(proxy.Codec()),
) //nolint: staticcheck

if err != nil {
otelify.InstrumentedError(span, "grpcxy.grpc.DialContext", traceID, err)
}
otelify.InstrumentedInfo(span, "grpcxy.grpc.DialContext", traceID)
return outCtx, conn, err
},
}
}

director = func(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
ctx, span := otel.Tracer("grpcxy.director").Start(context.Background(), "director")
defer span.End()
traceID := trace.SpanContextFromContext(ctx).TraceID().String()
for _, bkd := range configFromYaml.GrpcEndpoints {
// Make sure we never forward internal services.
if !strings.HasPrefix(fullMethodName, bkd.Name) {
otelify.InstrumentedError(
span,
"grpcxy.not.strings.HasPrefix",
traceID,
errors.NewError("Unknown method"),
)

return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if _, exists := md[":authority"]; exists {
otelify.InstrumentedInfo(span, "director.proxy.One2One", traceID)

return proxy.One2One, []proxy.Backend{
simpleBackendGen(bkd.HostURI),
}, nil
}
}
}
otelify.InstrumentedError(
span,
"grpcxy.One2One",
traceID,
errors.NewError("Unknown method"),
)
return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
opts = append(opts,
Expand Down Expand Up @@ -110,7 +162,7 @@ var grpcCmd = &cobra.Command{
}

func init() {

grpcCmd.Flags().Bool(flagMetric, false, "Action for enable metrics OTEL")
rootCmd.AddCommand(grpcCmd)
}

Expand Down
29 changes: 24 additions & 5 deletions cmd/cli/proxy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cli

import (
"context"

domain "github.com/kenriortega/ngonx/internal/proxy/domain"
handlers "github.com/kenriortega/ngonx/internal/proxy/handlers"
services "github.com/kenriortega/ngonx/internal/proxy/services"
Expand All @@ -9,14 +11,32 @@ import (
"github.com/kenriortega/ngonx/pkg/genkey"
"github.com/kenriortega/ngonx/pkg/httpsrv"
"github.com/kenriortega/ngonx/pkg/logger"
"github.com/kenriortega/ngonx/pkg/metric"
"github.com/kenriortega/ngonx/pkg/otelify"
"github.com/spf13/cobra"
)

var proxyCmd = &cobra.Command{
Use: "proxy",
Short: "Run ngonx as a reverse proxy",
Run: func(cmd *cobra.Command, args []string) {
enableMetric, err := cmd.Flags().GetBool(flagMetric)
if err != nil {
logger.LogError(errors.Errorf("proxy: %v", err).Error())
}
if enableMetric {
// TODO: pass from compile vars
flush := otelify.InitProvider(
"ngonx",
"v0.4.5",
"dev",
// TODO: pass from yml file object
"0.0.0.0:55680",
)
defer flush()
// Exporter Metrics
go otelify.ExposeMetricServer(configFromYaml.ProxyGateway.PortExporterProxy)
}

port, err := cmd.Flags().GetInt(flagPort)
if err != nil {
logger.LogError(errors.Errorf("proxy: %v", err).Error())
Expand All @@ -29,15 +49,14 @@ var proxyCmd = &cobra.Command{
if err != nil {
logger.LogError(errors.Errorf("proxy: %v", err).Error())
}
// Exporter Metrics
go metric.ExposeMetricServer(configFromYaml.ProxyGateway.PortExporterProxy)

// proxy logic
engine := configFromYaml.ProxyCache.Engine
securityType := configFromYaml.ProxySecurity.Type
key := configFromYaml.ProxyCache.Key + "_" + securityType

var proxyRepository domain.ProxyRepository
clientBadger := badgerdb.GetBadgerDB(false)
clientBadger := badgerdb.GetBadgerDB(context.Background(), false)
proxyRepository = domain.NewProxyRepository(clientBadger)
h := handlers.ProxyHandler{
Service: services.NewProxyService(proxyRepository),
Expand All @@ -61,7 +80,6 @@ var proxyCmd = &cobra.Command{
}

for _, endpoints := range configFromYaml.ProxyGateway.EnpointsProxy {

h.ProxyGateway(endpoints, engine, key, securityType)
}

Expand Down Expand Up @@ -91,6 +109,7 @@ var proxyCmd = &cobra.Command{
func init() {
proxyCmd.Flags().Int(flagPort, 5000, "Port to serve to run proxy")
proxyCmd.Flags().Bool(flagGenApiKey, false, "Action for generate hash for protected routes")
proxyCmd.Flags().Bool(flagMetric, false, "Action for enable metrics OTEL")
proxyCmd.Flags().String(flagPrevKey, "", "Action for save a previous hash for protected routes to validate JWT")
rootCmd.AddCommand(proxyCmd)

Expand Down
146 changes: 146 additions & 0 deletions examples/otelp/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package main

import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/kenriortega/ngonx/pkg/logger"
"github.com/kenriortega/ngonx/pkg/otelify"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

// global vars...gasp!
var addr = "127.0.0.1:8000"
var tracer trace.Tracer
var httpClient http.Client

func main() {
flush := otelify.InitProvider(
"example",
"v0.4.5",
"test",
"0.0.0.0:55680",
)
defer flush()

// initiate globals
tracer = otel.Tracer("example-app")
httpClient = http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
// create and start server
server := instrumentedServer(handler)

fmt.Println("listening...")
log.Fatal(server.ListenAndServe())
}

func handler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

longRunningProcess(ctx)

// check cache
if shouldExecute(40) {
url := "http://" + addr + "/"

resp, err := instrumentedGet(ctx, url)
defer resp.Body.Close()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

// query database
if shouldExecute(40) {
url := "http://" + addr + "/"

resp, err := instrumentedGet(ctx, url)
defer resp.Body.Close()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}

// #nosec
func shouldExecute(percent int) bool {
return rand.Int()%100 < percent
}

func longRunningProcess(ctx context.Context) {
ctx, sp := tracer.Start(ctx, "Long Running Process")
defer sp.End()

time.Sleep(time.Millisecond * 50)
sp.AddEvent("halfway done!")
time.Sleep(time.Millisecond * 50)
}

/***
Server
***/
func instrumentedServer(handler http.HandlerFunc) *http.Server {
// OpenMetrics handler : metrics and exemplars
omHandleFunc := func(w http.ResponseWriter, r *http.Request) {
start := time.Now()

handler.ServeHTTP(w, r)

ctx := r.Context()
traceID := trace.SpanContextFromContext(ctx).TraceID().String()

otelify.MetricRequestLatencyProxy.(prometheus.ExemplarObserver).ObserveWithExemplar(
time.Since(start).Seconds(), prometheus.Labels{"traceID": traceID},
)

// log the trace id with other fields so we can discover traces through logs
logger.LogInfo(
"http request",
zap.String("traceID", traceID),
zap.String("path", r.URL.Path),
zap.Duration("latency", time.Since(start)),
)
}

// OTel handler : traces
otelHandler := otelhttp.NewHandler(http.HandlerFunc(omHandleFunc), "http")

r := mux.NewRouter()
r.Handle("/", otelHandler)
r.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{
EnableOpenMetrics: true,
}))

return &http.Server{
Handler: r,
Addr: "0.0.0.0:8000",
}
}

/***
Client
***/
func instrumentedGet(ctx context.Context, url string) (*http.Response, error) {
// create http request
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
panic(err)
}

return httpClient.Do(req)
}

func handleErr(err error, message string) {
if err != nil {
panic(fmt.Sprintf("%s: %s", err, message))
}
}
Loading

0 comments on commit 6efa59b

Please sign in to comment.