Skip to content

Commit

Permalink
add kubernetes sink
Browse files Browse the repository at this point in the history
  • Loading branch information
mycrEEpy committed Aug 18, 2023
1 parent 05fb53c commit 0c0bc70
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 26 deletions.
38 changes: 37 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ require (
github.com/labstack/echo/v4 v4.11.1
github.com/robfig/cron/v3 v3.0.1
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.28.0
k8s.io/apimachinery v0.28.0
k8s.io/client-go v0.28.0
)

require (
Expand All @@ -27,13 +30,46 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.21.4 // indirect
github.com/aws/smithy-go v1.14.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/labstack/gommon v0.4.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/net v0.13.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/term v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
145 changes: 140 additions & 5 deletions go.sum

Large diffs are not rendered by default.

51 changes: 46 additions & 5 deletions internal/pgopher/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@ import (

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

type profileCollector struct {
ctx context.Context
logger slog.Logger
target ProfilingTarget
sink Sink
s3Client *s3.Client
ctx context.Context
logger slog.Logger
target ProfilingTarget
sink Sink
s3Client *s3.Client
kubeClient *kubernetes.Clientset
}

func (p profileCollector) Run() {
Expand Down Expand Up @@ -76,6 +81,42 @@ func (p profileCollector) Run() {
p.logger.Error("failed to write to s3 sink", slog.String("err", err.Error()), slog.String("bucket", p.sink.S3SinkOptions.Bucket))
return
}
case "kubernetes":
name := fmt.Sprintf("pgopher-profile-%s", p.target.Name)

client := p.kubeClient.CoreV1().Secrets(p.sink.KubernetesSinkOptions.Namespace)

secret := core.Secret{
ObjectMeta: meta.ObjectMeta{
Name: name,
Namespace: p.sink.KubernetesSinkOptions.Namespace,
},
StringData: make(map[string]string),
}

secret.StringData["profile"] = buf.String()

_, err := client.Get(p.ctx, name, meta.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
_, err := client.Create(p.ctx, &secret, meta.CreateOptions{})
if err != nil {
slog.Error("failed to create secret", slog.String("err", err.Error()))
return
}

return
} else {
slog.Error("failed to get secret", slog.String("err", err.Error()))
return
}
}

_, err = client.Update(p.ctx, &secret, meta.UpdateOptions{})
if err != nil {
slog.Error("failed to update secret", slog.String("err", err.Error()))
return
}
default:
p.logger.Error("unknown sink", slog.String("sink", p.sink.Type))
return
Expand Down
12 changes: 9 additions & 3 deletions internal/pgopher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ type ProfilingTarget struct {
}

type Sink struct {
Type string `yaml:"type"`
FileSinkOptions FileSinkOptions `yaml:"fileSinkOptions"`
S3SinkOptions S3SinkOptions `yaml:"s3SinkOptions"`
Type string `yaml:"type"`
FileSinkOptions FileSinkOptions `yaml:"fileSinkOptions"`
S3SinkOptions S3SinkOptions `yaml:"s3SinkOptions"`
KubernetesSinkOptions KubernetesSinkOptions `yaml:"kubernetesSinkOptions"`
}

type FileSinkOptions struct {
Expand All @@ -36,6 +37,11 @@ type S3SinkOptions struct {
Bucket string `yaml:"bucket"`
}

type KubernetesSinkOptions struct {
APIServerURL string `yaml:"apiServerURL"`
Namespace string `yaml:"namespace"`
}

func DefaultConfig() Config {
return Config{
ListenAddress: ":8000",
Expand Down
37 changes: 33 additions & 4 deletions internal/pgopher/http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pgopher

import (
"bytes"
"fmt"
"io"
"log/slog"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/labstack/echo/v4"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func readinessProbe(ctx echo.Context) error {
Expand All @@ -25,8 +27,10 @@ func livenessProbe(ctx echo.Context) error {
func (s *Server) handleProfile(ctx echo.Context) error {
profile := ctx.Param("profile")

logger := slog.With(slog.String("profile", profile))

if len(profile) == 0 || strings.Contains(profile, "..") {
slog.Error("invalid profile", slog.String("profile", profile))
logger.Error("invalid profile")
return ctx.NoContent(http.StatusBadRequest)
}

Expand All @@ -39,15 +43,15 @@ func (s *Server) handleProfile(ctx echo.Context) error {
Key: aws.String(fmt.Sprintf("profile=%s/%s.pgo", profile, profile)),
})
if err != nil {
slog.Error("failed to get profile from s3 sink", slog.String("err", err.Error()), slog.String("profile", profile))
logger.Error("failed to get profile from s3 sink", slog.String("err", err.Error()))
return ctx.NoContent(http.StatusInternalServerError)
}

defer resp.Body.Close()

file, err := os.CreateTemp(os.TempDir(), "pgopher-*.pgo")
if err != nil {
slog.Error("failed to create temporary file", slog.String("err", err.Error()), slog.String("profile", profile))
logger.Error("failed to create temporary file", slog.String("err", err.Error()))
return ctx.NoContent(http.StatusInternalServerError)
}

Expand All @@ -56,7 +60,32 @@ func (s *Server) handleProfile(ctx echo.Context) error {

_, err = io.Copy(file, resp.Body)
if err != nil {
slog.Error("failed to write temporary file", slog.String("err", err.Error()), slog.String("profile", profile))
logger.Error("failed to write temporary file", slog.String("err", err.Error()))
return ctx.NoContent(http.StatusInternalServerError)
}

return ctx.File(file.Name())
case "kubernetes":
name := fmt.Sprintf("pgopher-profile-%s", profile)

resp, err := s.kubeClient.CoreV1().Secrets(s.cfg.Sink.KubernetesSinkOptions.Namespace).Get(ctx.Request().Context(), name, meta.GetOptions{})
if err != nil {
logger.Error("failed to get secret", slog.String("err", err.Error()))
return ctx.NoContent(http.StatusInternalServerError)
}

file, err := os.CreateTemp(os.TempDir(), "pgopher-*.pgo")
if err != nil {
logger.Error("failed to create temporary file", slog.String("err", err.Error()))
return ctx.NoContent(http.StatusInternalServerError)
}

defer file.Close()
defer os.Remove(file.Name())

_, err = io.Copy(file, bytes.NewBuffer(resp.Data["profile"]))
if err != nil {
logger.Error("failed to write temporary file", slog.String("err", err.Error()))
return ctx.NoContent(http.StatusInternalServerError)
}

Expand Down
22 changes: 19 additions & 3 deletions internal/pgopher/pgopher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ import (
"fmt"
"log/slog"
"net/http"
"os"
"sync"

awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/labstack/echo/v4"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

type Server struct {
cfg Config
mux *echo.Echo
s3Client *s3.Client
cfg Config
mux *echo.Echo
s3Client *s3.Client
kubeClient *kubernetes.Clientset
}

func NewServer(cfg Config) (*Server, error) {
Expand All @@ -41,6 +45,18 @@ func NewServer(cfg Config) (*Server, error) {
s.s3Client = s3.NewFromConfig(sdkConfig)
}

if cfg.Sink.Type == "kubernetes" {
kubeCfg, err := clientcmd.BuildConfigFromFlags(cfg.Sink.KubernetesSinkOptions.APIServerURL, os.Getenv("KUBECONFIG"))
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes config: %w", err)
}

s.kubeClient, err = kubernetes.NewForConfig(kubeCfg)
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}
}

return s, nil
}

Expand Down
11 changes: 6 additions & 5 deletions internal/pgopher/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ func (s *Server) startScheduler(ctx context.Context, wg *sync.WaitGroup) {
logger := slog.With(slog.String("target", target.Name))

_, err := scheduler.AddJob(target.Schedule, profileCollector{
ctx: ctx,
logger: *logger,
target: target,
sink: s.cfg.Sink,
s3Client: s.s3Client,
ctx: ctx,
logger: *logger,
target: target,
sink: s.cfg.Sink,
s3Client: s.s3Client,
kubeClient: s.kubeClient,
})
if err != nil {
logger.Error("failed to create collector for profiling target", slog.String("err", err.Error()))
Expand Down

0 comments on commit 0c0bc70

Please sign in to comment.