From ee77b98d3a0998ae427960d11540ccab4afc9951 Mon Sep 17 00:00:00 2001 From: Khanh Tran <32532742+khanhtc1202@users.noreply.github.com> Date: Mon, 9 Dec 2024 09:06:06 +0700 Subject: [PATCH] PoC pipedv1 starts plugins (#5394) Signed-off-by: khanhtc1202 --- pkg/app/pipedv1/cmd/piped/piped.go | 93 ++++++++++++++++++-- pkg/app/pipedv1/controller/controller.go | 37 ++++++-- pkg/app/pipedv1/controller/planner.go | 16 +--- pkg/app/pipedv1/controller/pluginregistry.go | 78 ---------------- pkg/app/pipedv1/controller/scheduler.go | 3 +- pkg/configv1/piped.go | 20 +++++ 6 files changed, 140 insertions(+), 107 deletions(-) delete mode 100644 pkg/app/pipedv1/controller/pluginregistry.go diff --git a/pkg/app/pipedv1/cmd/piped/piped.go b/pkg/app/pipedv1/cmd/piped/piped.go index dbd2007a0e..b2e2bcfdd6 100644 --- a/pkg/app/pipedv1/cmd/piped/piped.go +++ b/pkg/app/pipedv1/cmd/piped/piped.go @@ -19,14 +19,18 @@ import ( "context" "crypto/tls" "encoding/base64" + "encoding/json" "fmt" + "net" "net/http" "net/http/pprof" "os" "os/exec" "path" "path/filepath" + "strconv" "strings" + "sync" "time" secretmanager "cloud.google.com/go/secretmanager/apiv1" @@ -64,7 +68,9 @@ import ( config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/crypto" "github.com/pipe-cd/pipecd/pkg/git" + "github.com/pipe-cd/pipecd/pkg/lifecycle" "github.com/pipe-cd/pipecd/pkg/model" + pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1" "github.com/pipe-cd/pipecd/pkg/rpc" "github.com/pipe-cd/pipecd/pkg/rpc/rpcauth" "github.com/pipe-cd/pipecd/pkg/rpc/rpcclient" @@ -86,6 +92,7 @@ type piped struct { adminPort int pluginServicePort int toolsDir string + pluginsDir string enableDefaultKubernetesCloudProvider bool gracePeriod time.Duration addLoginUserToPasswd bool @@ -102,6 +109,7 @@ func NewCommand() *cobra.Command { adminPort: 9085, pluginServicePort: 9087, toolsDir: path.Join(home, ".piped", "tools"), + pluginsDir: path.Join(home, ".piped", "plugins"), gracePeriod: 30 * time.Second, maxRecvMsgSize: 1024 * 1024 * 10, // 10MB } @@ -168,19 +176,20 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { return err } - // Make gRPC client and connect to the API. + // Make gRPC client and connect to the Control Plane API. apiClient, err := p.createAPIClient(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey, input.Logger) if err != nil { input.Logger.Error("failed to create gRPC client to control plane", zap.Error(err)) return err } + // Setup the tracer provider. + // We don't set the global tracer provider because 3rd-party library may use the global one. tracerProvider, err := p.createTracerProvider(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey) if err != nil { input.Logger.Error("failed to create tracer provider", zap.Error(err)) return err } - // we don't set the global tracer provider because 3rd-party library may use the global one. // Send the newest piped meta to the control-plane. if err := p.sendPipedMeta(ctx, apiClient, cfg, input.Logger); err != nil { @@ -289,11 +298,6 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { eventLister = store.Lister() } - // Start running application live state reporter. - { - // TODO: Implement the live state reporter controller. - } - // Start running plugin service server. { var ( @@ -317,6 +321,51 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { }) } + // Start plugins that registered in the configuration. + { + // Start all plugins and keep their commands to stop them later. + plugins, err := p.runPlugins(ctx, cfg.Plugins, input.Logger) + if err != nil { + input.Logger.Error("failed to run plugins", zap.Error(err)) + return err + } + + group.Go(func() error { + <-ctx.Done() + wg := &sync.WaitGroup{} + for _, plg := range plugins { + wg.Add(1) + go func() { + defer wg.Done() + if err := plg.GracefulStop(p.gracePeriod); err != nil { + input.Logger.Error("failed to stop plugin", zap.Error(err)) + } + }() + } + wg.Wait() + return nil + }) + } + + // Make grpc clients to connect to plugins. + pluginClis := make([]pluginapi.PluginClient, 0, len(cfg.Plugins)) + options := []rpcclient.DialOption{ + rpcclient.WithBlock(), + rpcclient.WithInsecure(), + } + for _, plg := range cfg.Plugins { + cli, err := pluginapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(plg.Port)), options...) + if err != nil { + input.Logger.Error("failed to create client to connect plugin", zap.String("plugin", plg.Name), zap.Error(err)) + } + pluginClis = append(pluginClis, cli) + } + + // Start running application live state reporter. + { + // TODO: Implement the live state reporter controller. + } + // Start running application application drift detector. { // TODO: Implement the drift detector controller. @@ -327,6 +376,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { c := controller.NewController( apiClient, gitClient, + pluginClis, deploymentLister, commandLister, notifier, @@ -588,6 +638,35 @@ func (p *piped) loadConfig(ctx context.Context) (*config.PipedSpec, error) { return nil, fmt.Errorf("one of config-file, config-gcp-secret or config-aws-secret must be set") } +func (p *piped) runPlugins(ctx context.Context, pluginsCfg []config.PipedPlugin, logger *zap.Logger) ([]*lifecycle.Command, error) { + plugins := make([]*lifecycle.Command, 0, len(pluginsCfg)) + for _, pCfg := range pluginsCfg { + // Download plugin binary to piped's pluginsDir. + pPath, err := lifecycle.DownloadBinary(pCfg.URL, p.pluginsDir, pCfg.Name, logger) + if err != nil { + return nil, fmt.Errorf("failed to download plugin %s: %w", pCfg.Name, err) + } + + // Build plugin's args. + args := make([]string, 0, 0) + args = append(args, "--piped-plugin-service", net.JoinHostPort("localhost", strconv.Itoa(p.pluginServicePort))) + b, err := json.Marshal(pCfg) + if err != nil { + return nil, fmt.Errorf("failed to prepare plugin %s config: %w", pCfg.Name, err) + } + args = append(args, "--config", string(b)) + + // Run the plugin binary. + cmd, err := lifecycle.RunBinary(ctx, pPath, args) + if err != nil { + return nil, fmt.Errorf("failed to run plugin %s: %w", pCfg.Name, err) + } + + plugins = append(plugins, cmd) + } + return plugins, nil +} + // TODO: Remove this once the decryption task by plugin call to the plugin service is implemented. func (p *piped) initializeSecretDecrypter(cfg *config.PipedSpec) (crypto.Decrypter, error) { sm := cfg.SecretManagement diff --git a/pkg/app/pipedv1/controller/controller.go b/pkg/app/pipedv1/controller/controller.go index 7dd4cf7219..a2827a5f2d 100644 --- a/pkg/app/pipedv1/controller/controller.go +++ b/pkg/app/pipedv1/controller/controller.go @@ -37,6 +37,8 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" "github.com/pipe-cd/pipecd/pkg/git" "github.com/pipe-cd/pipecd/pkg/model" + pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1" + "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" ) type apiClient interface { @@ -84,12 +86,15 @@ var ( type controller struct { apiClient apiClient - pluginRegistry PluginRegistry gitClient gitClient deploymentLister deploymentLister commandLister commandLister notifier notifier + // gRPC clients to communicate with plugins. + pluginClients []pluginapi.PluginClient + // Map from stage name to the plugin client. + stageBasedPluginsMap map[string]pluginapi.PluginClient // Map from application ID to the planner // of a pending deployment of that application. planners map[string]*planner @@ -121,6 +126,7 @@ type controller struct { func NewController( apiClient apiClient, gitClient gitClient, + pluginClients []pluginapi.PluginClient, deploymentLister deploymentLister, commandLister commandLister, notifier notifier, @@ -131,8 +137,8 @@ func NewController( return &controller{ apiClient: apiClient, - pluginRegistry: DefaultPluginRegistry(), gitClient: gitClient, + pluginClients: pluginClients, deploymentLister: deploymentLister, commandLister: commandLister, notifier: notifier, @@ -166,6 +172,23 @@ func (c *controller) Run(ctx context.Context) error { c.workspaceDir = dir c.logger.Info(fmt.Sprintf("workspace directory was configured to %s", c.workspaceDir)) + // Build the list of stages that can be handled by piped's plugins. + stagesBasedPluginsMap := make(map[string]pluginapi.PluginClient) + for _, plugin := range c.pluginClients { + resp, err := plugin.FetchDefinedStages(ctx, &deployment.FetchDefinedStagesRequest{}) + if err != nil { + return err + } + for _, stage := range resp.GetStages() { + if _, ok := stagesBasedPluginsMap[stage]; ok { + c.logger.Error("duplicated stage name", zap.String("stage", stage)) + return fmt.Errorf("duplicated stage name %s", stage) + } + stagesBasedPluginsMap[stage] = plugin + } + } + c.stageBasedPluginsMap = stagesBasedPluginsMap + ticker := time.NewTicker(c.syncInternal) defer ticker.Stop() c.logger.Info("start syncing planners and schedulers") @@ -410,18 +433,13 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) ( } } - pluginClient, ok := c.pluginRegistry.Plugin(d.Kind) - if !ok { - logger.Error("no plugin client for the application kind", zap.String("kind", d.Kind.String())) - return nil, fmt.Errorf("no plugin client for the application kind %s", d.Kind.String()) - } - planner := newPlanner( d, commitHash, configFilename, workingDir, - pluginClient, + c.pluginClients, // FIXME: Find a way to ensure the plugins only related to deployment. + c.stageBasedPluginsMap, c.apiClient, c.gitClient, c.notifier, @@ -561,6 +579,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment) workingDir, c.apiClient, c.gitClient, + c.stageBasedPluginsMap, c.notifier, c.logger, c.tracerProvider, diff --git a/pkg/app/pipedv1/controller/planner.go b/pkg/app/pipedv1/controller/planner.go index 4521b81ba1..2984aef823 100644 --- a/pkg/app/pipedv1/controller/planner.go +++ b/pkg/app/pipedv1/controller/planner.go @@ -90,7 +90,8 @@ func newPlanner( lastSuccessfulCommitHash string, lastSuccessfulConfigFilename string, workingDir string, - pluginClient pluginapi.PluginClient, + pluginClients []pluginapi.PluginClient, + stageBasedPluginsMap map[string]pluginapi.PluginClient, apiClient apiClient, gitClient gitClient, notifier notifier, @@ -106,22 +107,13 @@ func newPlanner( zap.String("working-dir", workingDir), ) - // TODO: Fix this. Passed by args - tmp := make(map[string]pluginapi.PluginClient) - tmp["K8S_SYNC"] = pluginClient - - plugins := make([]pluginapi.PluginClient, 0, len(tmp)) - for _, v := range tmp { - plugins = append(plugins, v) - } - p := &planner{ deployment: d, lastSuccessfulCommitHash: lastSuccessfulCommitHash, lastSuccessfulConfigFilename: lastSuccessfulConfigFilename, workingDir: workingDir, - stageBasedPluginsMap: tmp, - plugins: plugins, + stageBasedPluginsMap: stageBasedPluginsMap, + plugins: pluginClients, apiClient: apiClient, gitClient: gitClient, metadataStore: metadatastore.NewMetadataStore(apiClient, d), diff --git a/pkg/app/pipedv1/controller/pluginregistry.go b/pkg/app/pipedv1/controller/pluginregistry.go deleted file mode 100644 index 436bfda150..0000000000 --- a/pkg/app/pipedv1/controller/pluginregistry.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2024 The PipeCD Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package controller provides a piped component -// that handles all of the not completed deployments by managing a pool of planners and schedulers. -// Whenever a new PENDING deployment is detected, controller spawns a new planner for deciding -// the deployment pipeline and update the deployment status to PLANNED. -// Whenever a new PLANNED deployment is detected, controller spawns a new scheduler -// for scheduling and running its pipeline executors. -package controller - -import ( - "sync" - - "github.com/pipe-cd/pipecd/pkg/model" - pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1" -) - -type PluginRegistry interface { - Plugin(k model.ApplicationKind) (pluginapi.PluginClient, bool) -} - -type pluginRegistry struct { - plugins map[model.ApplicationKind]pluginapi.PluginClient - mu sync.RWMutex -} - -func (r *pluginRegistry) Plugin(k model.ApplicationKind) (pluginapi.PluginClient, bool) { - r.mu.RLock() - defer r.mu.RUnlock() - - e, ok := r.plugins[k] - if !ok { - return nil, false - } - - return e, true -} - -var defaultPluginRegistry = &pluginRegistry{ - plugins: make(map[model.ApplicationKind]pluginapi.PluginClient), -} - -func DefaultPluginRegistry() PluginRegistry { - return defaultPluginRegistry -} - -func init() { - // TODO: Register all available built-in plugins. - - // NOTE: If you want to directry test the plugin, you can use the following code. - - // defaultPluginRegistry.mu.Lock() - // defer defaultPluginRegistry.mu.Unlock() - - // options := []rpcclient.DialOption{ - // rpcclient.WithBlock(), - // rpcclient.WithInsecure(), - // } - - // cli, err := platform.NewClient(context.Background(), "localhost:10000", options...) - // if err != nil { - // panic(err) - // } - - // defaultPluginRegistry.plugins[model.ApplicationKind_KUBERNETES] = cli -} diff --git a/pkg/app/pipedv1/controller/scheduler.go b/pkg/app/pipedv1/controller/scheduler.go index ae04c5de13..b3f8671423 100644 --- a/pkg/app/pipedv1/controller/scheduler.go +++ b/pkg/app/pipedv1/controller/scheduler.go @@ -75,6 +75,7 @@ func newScheduler( workingDir string, apiClient apiClient, gitClient gitClient, + stageBasedPluginsMap map[string]pluginapi.PluginClient, notifier notifier, logger *zap.Logger, tracerProvider trace.TracerProvider, @@ -90,7 +91,7 @@ func newScheduler( s := &scheduler{ deployment: d, workingDir: workingDir, - stageBasedPluginsMap: make(map[string]pluginapi.PluginClient), // TODO: prepare this + stageBasedPluginsMap: stageBasedPluginsMap, apiClient: apiClient, gitClient: gitClient, metadataStore: metadatastore.NewMetadataStore(apiClient, d), diff --git a/pkg/configv1/piped.go b/pkg/configv1/piped.go index dce2fe68fa..b38a2e6dd4 100644 --- a/pkg/configv1/piped.go +++ b/pkg/configv1/piped.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "net/url" "os" "strings" @@ -1295,6 +1296,8 @@ type PipedEventWatcherGitRepo struct { type PipedPlugin struct { // The name of the plugin. Name string `json:"name"` + // Source to download the plugin binary. + URL string `json:"url"` // The port which the plugin listens to. Port int `json:"port"` // The deploy target names. @@ -1311,6 +1314,23 @@ type PipedDeployTarget struct { Config json.RawMessage `json:"config"` } +func (p *PipedPlugin) Validate() error { + if p.Name == "" { + return errors.New("name must be set") + } + if p.URL == "" { + return errors.New("url must be set") + } + u, err := url.Parse(p.URL) + if err != nil { + return fmt.Errorf("invalid plugin url: %w", err) + } + if u.Scheme != "file" && u.Scheme != "https" { + return errors.New("only file and https schemes are supported") + } + return nil +} + // FindDeployTarget finds the deploy target by the given name. func (p *PipedPlugin) FindDeployTarget(name string) *PipedDeployTarget { for _, dt := range p.DeployTargets {