Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test CI #751

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,32 @@ jobs:
VERSION=${{ github.ref_name }}
APP_IMAGE=${{ matrix.image.name }}
tags: ghcr.io/${{ github.repository_owner }}/${{ github.event.repository.name }}:${{ github.ref_name }}${{ matrix.image.suffix }}

build-playground:
runs-on: ubuntu-latest
permissions:
packages: write
contents: read
steps:
- uses: actions/checkout@v4

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2

- name: Log in to registry
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Build and push
id: docker-build
uses: docker/build-push-action@v4
with:
push: true
file: ./build/package/Dockerfile_playground
platforms: linux/amd64,linux/arm64
build-args: |
VERSION=${{ github.ref_name }}
tags: ghcr.io/${{ github.repository_owner }}/${{ github.event.repository.name }}-playground:${{ github.ref_name }}
32 changes: 32 additions & 0 deletions build/package/Dockerfile_playground
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
ARG APP_IMAGE=alpine:latest

# Build
FROM --platform=$BUILDPLATFORM golang:1.21-alpine AS build

ARG VERSION
ARG TARGETARCH

WORKDIR /file-d-playground

COPY go.mod go.sum ./

RUN go mod download

COPY . .

ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOARCH=${TARGETARCH:-amd64}

RUN go build -trimpath \
-ldflags "-X github.com/ozontech/file.d/buildinfo.Version=${VERSION}" \
-o file-d-playground ./cmd/playground

# Deploy
FROM $APP_IMAGE

WORKDIR /file-d-playground

COPY --from=build /file-d-playground/file-d-playground .

CMD [ "./file-d-playground" ]
19 changes: 12 additions & 7 deletions cmd/file.d/file.d_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ func TestThatPluginsAreImported(t *testing.T) {
"throttle",
}
for _, pluginName := range action {
pluginInfo := fd.DefaultPluginRegistry.Get(pipeline.PluginKindAction, pluginName)
pluginInfo, err := fd.DefaultPluginRegistry.Get(pipeline.PluginKindAction, pluginName)
require.NoError(t, err)
require.NotNil(t, pluginInfo)
}

Expand All @@ -192,7 +193,8 @@ func TestThatPluginsAreImported(t *testing.T) {
"kafka",
}
for _, pluginName := range input {
pluginInfo := fd.DefaultPluginRegistry.Get(pipeline.PluginKindInput, pluginName)
pluginInfo, err := fd.DefaultPluginRegistry.Get(pipeline.PluginKindInput, pluginName)
require.NoError(t, err)
require.NotNil(t, pluginInfo)
}

Expand All @@ -207,7 +209,8 @@ func TestThatPluginsAreImported(t *testing.T) {
"stdout",
}
for _, pluginName := range output {
pluginInfo := fd.DefaultPluginRegistry.Get(pipeline.PluginKindOutput, pluginName)
pluginInfo, err := fd.DefaultPluginRegistry.Get(pipeline.PluginKindOutput, pluginName)
require.NoError(t, err)
require.NotNil(t, pluginInfo)
}
}
Expand Down Expand Up @@ -250,8 +253,9 @@ func TestConfigParseValid(t *testing.T) {
tl := tl
t.Run(tl.name, func(t *testing.T) {
t.Parallel()
pluginInfo := fd.DefaultPluginRegistry.Get(tl.kind, tl.name)
_, err := pipeline.GetConfig(pluginInfo, []byte(tl.configJSON), map[string]int{"gomaxprocs": 1, "capacity": 64})
pluginInfo, err := fd.DefaultPluginRegistry.Get(tl.kind, tl.name)
require.NoError(t, err)
_, err = pipeline.GetConfig(pluginInfo, []byte(tl.configJSON), map[string]int{"gomaxprocs": 1, "capacity": 64})
assert.NoError(t, err, "shouldn't be an error")
})
}
Expand Down Expand Up @@ -289,8 +293,9 @@ func TestConfigParseInvalid(t *testing.T) {
tl := tl
t.Run(tl.name, func(t *testing.T) {
t.Parallel()
pluginInfo := fd.DefaultPluginRegistry.Get(tl.kind, tl.name)
_, err := pipeline.GetConfig(pluginInfo, []byte(tl.configJSON), map[string]int{"gomaxprocs": 1, "capacity": 64})
pluginInfo, err := fd.DefaultPluginRegistry.Get(tl.kind, tl.name)
require.NoError(t, err)
_, err = pipeline.GetConfig(pluginInfo, []byte(tl.configJSON), map[string]int{"gomaxprocs": 1, "capacity": 64})
assert.Error(t, err, "should be an error")
})
}
Expand Down
99 changes: 99 additions & 0 deletions cmd/playground/playground.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"context"
"errors"
"math"
"net/http"
"net/http/pprof"
"os/signal"
"syscall"
"time"

"github.com/alecthomas/kingpin"
"github.com/ozontech/file.d/buildinfo"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/playground"
insaneJSON "github.com/ozontech/insane-json"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
)

var (
addr = kingpin.Flag("addr", "HTTP API server addr").Default(":5950").String()
debugAddr = kingpin.Flag("debug-addr", "The server address that serves metrics and profiling, set 'false' value to disable listening").Default(":5951").String()
)

func main() {
kingpin.Version(buildinfo.Version)
kingpin.Parse()

logger.Infof("Hi! I'm file.d playground version=%s", buildinfo.Version)

insaneJSON.DisableBeautifulErrors = true
insaneJSON.MapUseThreshold = math.MaxInt
insaneJSON.StartNodePoolSize = 16

_, _ = maxprocs.Set(maxprocs.Logger(logger.Warnf))

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
run()
// Wait for interrupt.
<-ctx.Done()
}

func run() {
lg := logger.Instance.Desugar().Named("playground")

// Start playground API server.
play := playground.NewHandler(lg)
appAPI := appAPIHandler(play)
appAPIWithMetrics := promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, appAPI)
startServer(*addr, appAPIWithMetrics, lg.Named("api"))

if *debugAddr != "off" {
// Start debug server.
debugMux := http.NewServeMux()
debugMux.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}))
debugMux.HandleFunc("/debug/pprof/", pprof.Index)
debugMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
debugMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
debugMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
debugMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
startServer(*debugAddr, debugMux, lg.Named("debug-api"))
}
}

func startServer(addr string, handler http.Handler, lg *zap.Logger) {
server := &http.Server{
Addr: addr,
Handler: handler,
ReadTimeout: time.Second * 30,
ReadHeaderTimeout: time.Second * 5,
WriteTimeout: time.Second * 30,
IdleTimeout: time.Minute,
MaxHeaderBytes: http.DefaultMaxHeaderBytes,
}

go func() {
lg.Info("starting HTTP server...", zap.String("addr", addr))
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
lg.Fatal("can't serve", zap.Error(err))
}
lg.Info("server stopped")
}()
}

func appAPIHandler(play *playground.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v1/play":
play.ServeHTTP(w, r)
default:
http.Error(w, "", http.StatusNotFound)
}
})
}
48 changes: 33 additions & 15 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
insaneJSON "github.com/ozontech/insane-json"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic"
Expand Down Expand Up @@ -93,13 +94,19 @@ func (f *FileD) addPipeline(name string, config *cfg.PipelineConfig) {

logger.Infof("creating pipeline %q: capacity=%d, stream field=%s, decoder=%s", name, settings.Capacity, settings.StreamField, settings.Decoder)

p := pipeline.New(name, settings, f.registry)
if settings.Pool == pipeline.PoolTypeLowMem {
insaneJSON.StartNodePoolSize = 16
}
p := pipeline.New(name, settings, f.registry, logger.Instance.Named(name).Desugar())
err := f.setupInput(p, config, values)
if err != nil {
logger.Fatalf("can't create pipeline %q: %s", name, err.Error())
}

f.setupActions(p, config, values)
actions := config.Raw.Get("actions")
if err := SetupActions(p, f.plugins, actions, values); err != nil {
logger.Fatalf("can't create pipeline %q: %s", name, err.Error())
}

err = f.setupOutput(p, config, values)
if err != nil {
Expand All @@ -122,7 +129,10 @@ func (f *FileD) setupInput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCon
})

for _, actionType := range inputInfo.AdditionalActions {
actionInfo := f.plugins.GetActionByType(actionType)
actionInfo, err := f.plugins.GetActionByType(actionType)
if err != nil {
return err
}

infoCopy := *actionInfo
infoCopy.Config = inputInfo.Config
Expand All @@ -137,25 +147,29 @@ func (f *FileD) setupInput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCon
return nil
}

func (f *FileD) setupActions(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineConfig, values map[string]int) {
actions := pipelineConfig.Raw.Get("actions")
func SetupActions(p *pipeline.Pipeline, plugins *PluginRegistry, actions *simplejson.Json, values map[string]int) error {
for index := range actions.MustArray() {
actionJSON := actions.GetIndex(index)
if actionJSON.MustMap() == nil {
logger.Fatalf("empty action #%d for pipeline %q", index, p.Name)
return fmt.Errorf("empty action #%d for pipeline %q", index, p.Name)
}

t := actionJSON.Get("type").MustString()
if t == "" {
logger.Fatalf("action #%d doesn't provide type %q", index, p.Name)
return fmt.Errorf("action #%d doesn't provide type %q", index, p.Name)
}
if err := setupAction(p, plugins, index, t, actionJSON, values); err != nil {
return err
}
f.setupAction(p, index, t, actionJSON, values)
}
return nil
}

func (f *FileD) setupAction(p *pipeline.Pipeline, index int, t string, actionJSON *simplejson.Json, values map[string]int) {
logger.Infof("creating action with type %q for pipeline %q", t, p.Name)
info := f.plugins.GetActionByType(t)
func setupAction(p *pipeline.Pipeline, plugins *PluginRegistry, index int, t string, actionJSON *simplejson.Json, values map[string]int) error {
info, err := plugins.GetActionByType(t)
if err != nil {
return err
}

doIfChecker, err := extractDoIfChecker(actionJSON.Get("do_if"))
if err != nil {
Expand All @@ -164,18 +178,18 @@ func (f *FileD) setupAction(p *pipeline.Pipeline, index int, t string, actionJSO

matchMode := extractMatchMode(actionJSON)
if matchMode == pipeline.MatchModeUnknown {
logger.Fatalf("unknown match_mode value for action %d/%s in pipeline %q", index, t, p.Name)
return fmt.Errorf("unknown match_mode value for action %d/%s", index, t)
}
matchInvert := extractMatchInvert(actionJSON)
conditions, err := extractConditions(actionJSON.Get("match_fields"))
if err != nil {
logger.Fatalf("can't extract conditions for action %d/%s in pipeline %q: %s", index, t, p.Name, err.Error())
return fmt.Errorf("can't extract conditions for action %d/%s: %s", index, t, err.Error())
}
metricName, metricLabels, skipStatus := extractMetrics(actionJSON)
configJSON := makeActionJSON(actionJSON)
config, err := pipeline.GetConfig(info, configJSON, values)
if err != nil {
logger.Fatalf("wrong config for action %d/%s in pipeline %q: %s", index, t, p.Name, err.Error())
return fmt.Errorf("wrong config for action %d/%s in pipeline %q: %s", index, t, p.Name, err.Error())
}

infoCopy := *info
Expand All @@ -192,6 +206,7 @@ func (f *FileD) setupAction(p *pipeline.Pipeline, index int, t string, actionJSO
MatchInvert: matchInvert,
DoIfChecker: doIfChecker,
})
return nil
}

func (f *FileD) setupOutput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineConfig, values map[string]int) error {
Expand Down Expand Up @@ -228,7 +243,10 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip
return nil, fmt.Errorf("%s doesn't have type", pluginKind)
}
logger.Infof("creating %s with type %q", pluginKind, t)
info := f.plugins.Get(pluginKind, t)
info, err := f.plugins.Get(pluginKind, t)
if err != nil {
return nil, err
}
configJson, err := configJSON.Encode()
if err != nil {
logger.Panicf("can't create config json for %s", t)
Expand Down
17 changes: 8 additions & 9 deletions fd/plugin_registry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package fd

import (
"fmt"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/pipeline"
)
Expand All @@ -13,28 +15,25 @@ type PluginRegistry struct {
plugins map[string]*pipeline.PluginStaticInfo
}

func (r *PluginRegistry) Get(kind pipeline.PluginKind, t string) *pipeline.PluginStaticInfo {
func (r *PluginRegistry) Get(kind pipeline.PluginKind, t string) (*pipeline.PluginStaticInfo, error) {
id := r.MakeID(kind, t)

info := r.plugins[id]
if info == nil {
logger.Fatalf("can't find plugin kind=%s type=%s", kind, t)
return nil
return nil, fmt.Errorf("can't find plugin kind=%s type=%s", kind, t)
}

return info
return info, nil
}

func (r *PluginRegistry) GetActionByType(t string) *pipeline.PluginStaticInfo {
func (r *PluginRegistry) GetActionByType(t string) (*pipeline.PluginStaticInfo, error) {
id := r.MakeID(pipeline.PluginKindAction, t)

info := r.plugins[id]
if info == nil {
logger.Fatalf("can't find action plugin with type %q", t)
return nil
return nil, fmt.Errorf("can't find action plugin with type %q", t)
}

return info
return info, nil
}

func (r *PluginRegistry) RegisterInput(info *pipeline.PluginStaticInfo) {
Expand Down
Loading
Loading