Skip to content

Commit

Permalink
monitoring setup configure mtls and assign unique identifiers to otlp…
Browse files Browse the repository at this point in the history
… service names for trace display (#289)

* updated initProvider function of the Virtual Kubelet to handle mTLS authentication; updated the name of the OTLP Service

* updated initProvider function of InterLink.

* updated initProvider of InterLink to wait the grpc connection to be ready to continue

---------

Co-authored-by: Diego Ciangottini <[email protected]>
  • Loading branch information
Bianco95 and dciangot authored Sep 16, 2024
1 parent c146e07 commit b9526ab
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 8 deletions.
71 changes: 67 additions & 4 deletions cmd/interlink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package main
import (
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
Expand All @@ -13,13 +15,16 @@ import (
"syscall"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"github.com/virtual-kubelet/virtual-kubelet/trace/opentelemetry"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

types "github.com/intertwin-eu/interlink/pkg/interlink"
"github.com/intertwin-eu/interlink/pkg/interlink/api"
Expand All @@ -34,10 +39,23 @@ import (
)

func initProvider(ctx context.Context) (func(context.Context) error, error) {
log.G(ctx).Info("Tracing is enabled, setting up the TracerProvider")

// Get the TELEMETRY_UNIQUE_ID from the environment, if it is not set, use the hostname
uniqueID := os.Getenv("TELEMETRY_UNIQUE_ID")
if uniqueID == "" {
log.G(ctx).Info("No TELEMETRY_UNIQUE_ID set, generating a new one")
newUUID := uuid.New()
uniqueID = newUUID.String()
log.G(ctx).Info("Generated unique ID: ", uniqueID, " use InterLink-Plugin-"+uniqueID+" as service name from Grafana")
}

serviceName := "InterLink-Plugin-" + uniqueID

res, err := resource.New(ctx,
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceName("InterLink-API"),
semconv.ServiceName(serviceName),
),
)
if err != nil {
Expand All @@ -53,11 +71,56 @@ func initProvider(ctx context.Context) (func(context.Context) error, error) {
otlpEndpoint = "localhost:4317"
}

fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint)
log.G(ctx).Info("TELEMETRY_ENDPOINT: ", otlpEndpoint)

caCrtFilePath := os.Getenv("TELEMETRY_CA_CRT_FILEPATH")

conn := &grpc.ClientConn{}
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())
if caCrtFilePath != "" {

// if the CA certificate is provided, set up mutual TLS

log.G(ctx).Info("CA certificate provided, setting up mutual TLS")

caCert, err := ioutil.ReadFile(caCrtFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load CA certificate: %w", err)
}

clientKeyFilePath := os.Getenv("TELEMETRY_CLIENT_KEY_FILEPATH")
if clientKeyFilePath == "" {
return nil, fmt.Errorf("client key file path not provided. Since a CA certificate is provided, a client key is required for mutual TLS")
}

clientCrtFilePath := os.Getenv("TELEMETRY_CLIENT_CRT_FILEPATH")
if clientCrtFilePath == "" {
return nil, fmt.Errorf("client certificate file path not provided. Since a CA certificate is provided, a client certificate is required for mutual TLS")
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to append CA certificate")
}

cert, err := tls.LoadX509KeyPair(clientCrtFilePath, clientKeyFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: true,
}
creds := credentials.NewTLS(tlsConfig)
conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())

} else {
conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn.WaitForStateChange(ctx, connectivity.Ready)

if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
Expand Down
80 changes: 76 additions & 4 deletions cmd/virtual-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package main
import (
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"net"
"os"
"path"
Expand All @@ -29,7 +31,9 @@ import (

// "k8s.io/client-go/rest"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -65,6 +69,8 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/google/uuid"
)

func PodInformerFilter(node string) informers.SharedInformerOption {
Expand Down Expand Up @@ -95,10 +101,29 @@ type Opts struct {
}

func initProvider(ctx context.Context) (func(context.Context) error, error) {

log.G(ctx).Info("Tracing is enabled, setting up the TracerProvider")

// Get the TELEMETRY_UNIQUE_ID from the environment, if it is not set, use the hostname
uniqueID := os.Getenv("TELEMETRY_UNIQUE_ID")
if uniqueID == "" {
log.G(ctx).Info("No TELEMETRY_UNIQUE_ID set, generating a new one")
newUUID := uuid.New()
uniqueID = newUUID.String()
log.G(ctx).Info("Generated unique ID: ", uniqueID, " use VK-InterLink-"+uniqueID+" as service name from Grafana")
}

// Create a new resource with the service name set to the TELEMETRY_UNIQUE_ID
// The nomenclature VK-InterLink-<TELEMETRY_UNIQUE_ID> is used to identify the service in Grafana.
// VK-InterLink-<TELEMETRY_UNIQUE_ID> means that the traces are coming from Virtual Kubelet
// and are related to the call that are made for the InterLink API service

serviceName := "VK-InterLink-" + uniqueID

res, err := resource.New(ctx,
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceName("InterLink-VK"),
semconv.ServiceName(serviceName),
),
)
if err != nil {
Expand All @@ -114,11 +139,58 @@ func initProvider(ctx context.Context) (func(context.Context) error, error) {
otlpEndpoint = "localhost:4317"
}

fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint)
log.G(ctx).Info("TELEMETRY_ENDPOINT: ", otlpEndpoint)

caCrtFilePath := os.Getenv("TELEMETRY_CA_CRT_FILEPATH")

conn := &grpc.ClientConn{}
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())
if caCrtFilePath != "" {

// if the CA certificate is provided, set up mutual TLS

log.G(ctx).Info("CA certificate provided, setting up mutual TLS")

caCert, err := ioutil.ReadFile(caCrtFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load CA certificate: %w", err)
}

clientKeyFilePath := os.Getenv("TELEMETRY_CLIENT_KEY_FILEPATH")
if clientKeyFilePath == "" {
return nil, fmt.Errorf("client key file path not provided. Since a CA certificate is provided, a client key is required for mutual TLS")
}

clientCrtFilePath := os.Getenv("TELEMETRY_CLIENT_CRT_FILEPATH")
if clientCrtFilePath == "" {
return nil, fmt.Errorf("client certificate file path not provided. Since a CA certificate is provided, a client certificate is required for mutual TLS")
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to append CA certificate")
}

cert, err := tls.LoadX509KeyPair(clientCrtFilePath, clientKeyFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: true,
}
creds := credentials.NewTLS(tlsConfig)
conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())

} else {
// if the CA certificate is not provided, use an insecure connection
// this means that the telemetry collector is not using a certificate, i.e. is inside the k8s cluster
conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn.WaitForStateChange(ctx, connectivity.Ready)

if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
Expand Down

0 comments on commit b9526ab

Please sign in to comment.