Skip to content

Commit

Permalink
PoC pipedv1 starts plugins (#5394)
Browse files Browse the repository at this point in the history
Signed-off-by: khanhtc1202 <[email protected]>
  • Loading branch information
khanhtc1202 authored Dec 9, 2024
1 parent 25c55a2 commit ee77b98
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 107 deletions.
93 changes: 86 additions & 7 deletions pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ import (
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/pprof"
"os"
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

secretmanager "cloud.google.com/go/secretmanager/apiv1"
Expand Down Expand Up @@ -64,7 +68,9 @@ import (
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/crypto"
"github.com/pipe-cd/pipecd/pkg/git"
"github.com/pipe-cd/pipecd/pkg/lifecycle"
"github.com/pipe-cd/pipecd/pkg/model"
pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1"
"github.com/pipe-cd/pipecd/pkg/rpc"
"github.com/pipe-cd/pipecd/pkg/rpc/rpcauth"
"github.com/pipe-cd/pipecd/pkg/rpc/rpcclient"
Expand All @@ -86,6 +92,7 @@ type piped struct {
adminPort int
pluginServicePort int
toolsDir string
pluginsDir string
enableDefaultKubernetesCloudProvider bool
gracePeriod time.Duration
addLoginUserToPasswd bool
Expand All @@ -102,6 +109,7 @@ func NewCommand() *cobra.Command {
adminPort: 9085,
pluginServicePort: 9087,
toolsDir: path.Join(home, ".piped", "tools"),
pluginsDir: path.Join(home, ".piped", "plugins"),
gracePeriod: 30 * time.Second,
maxRecvMsgSize: 1024 * 1024 * 10, // 10MB
}
Expand Down Expand Up @@ -168,19 +176,20 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
return err
}

// Make gRPC client and connect to the API.
// Make gRPC client and connect to the Control Plane API.
apiClient, err := p.createAPIClient(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey, input.Logger)
if err != nil {
input.Logger.Error("failed to create gRPC client to control plane", zap.Error(err))
return err
}

// Setup the tracer provider.
// We don't set the global tracer provider because 3rd-party library may use the global one.
tracerProvider, err := p.createTracerProvider(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey)
if err != nil {
input.Logger.Error("failed to create tracer provider", zap.Error(err))
return err
}
// we don't set the global tracer provider because 3rd-party library may use the global one.

// Send the newest piped meta to the control-plane.
if err := p.sendPipedMeta(ctx, apiClient, cfg, input.Logger); err != nil {
Expand Down Expand Up @@ -289,11 +298,6 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
eventLister = store.Lister()
}

// Start running application live state reporter.
{
// TODO: Implement the live state reporter controller.
}

// Start running plugin service server.
{
var (
Expand All @@ -317,6 +321,51 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
})
}

// Start plugins that registered in the configuration.
{
// Start all plugins and keep their commands to stop them later.
plugins, err := p.runPlugins(ctx, cfg.Plugins, input.Logger)
if err != nil {
input.Logger.Error("failed to run plugins", zap.Error(err))
return err
}

group.Go(func() error {
<-ctx.Done()
wg := &sync.WaitGroup{}
for _, plg := range plugins {
wg.Add(1)
go func() {
defer wg.Done()
if err := plg.GracefulStop(p.gracePeriod); err != nil {
input.Logger.Error("failed to stop plugin", zap.Error(err))
}
}()
}
wg.Wait()
return nil
})
}

// Make grpc clients to connect to plugins.
pluginClis := make([]pluginapi.PluginClient, 0, len(cfg.Plugins))
options := []rpcclient.DialOption{
rpcclient.WithBlock(),
rpcclient.WithInsecure(),
}
for _, plg := range cfg.Plugins {
cli, err := pluginapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(plg.Port)), options...)
if err != nil {
input.Logger.Error("failed to create client to connect plugin", zap.String("plugin", plg.Name), zap.Error(err))
}
pluginClis = append(pluginClis, cli)
}

// Start running application live state reporter.
{
// TODO: Implement the live state reporter controller.
}

// Start running application application drift detector.
{
// TODO: Implement the drift detector controller.
Expand All @@ -327,6 +376,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
c := controller.NewController(
apiClient,
gitClient,
pluginClis,
deploymentLister,
commandLister,
notifier,
Expand Down Expand Up @@ -588,6 +638,35 @@ func (p *piped) loadConfig(ctx context.Context) (*config.PipedSpec, error) {
return nil, fmt.Errorf("one of config-file, config-gcp-secret or config-aws-secret must be set")
}

func (p *piped) runPlugins(ctx context.Context, pluginsCfg []config.PipedPlugin, logger *zap.Logger) ([]*lifecycle.Command, error) {
plugins := make([]*lifecycle.Command, 0, len(pluginsCfg))
for _, pCfg := range pluginsCfg {
// Download plugin binary to piped's pluginsDir.
pPath, err := lifecycle.DownloadBinary(pCfg.URL, p.pluginsDir, pCfg.Name, logger)
if err != nil {
return nil, fmt.Errorf("failed to download plugin %s: %w", pCfg.Name, err)
}

// Build plugin's args.
args := make([]string, 0, 0)
args = append(args, "--piped-plugin-service", net.JoinHostPort("localhost", strconv.Itoa(p.pluginServicePort)))
b, err := json.Marshal(pCfg)
if err != nil {
return nil, fmt.Errorf("failed to prepare plugin %s config: %w", pCfg.Name, err)
}
args = append(args, "--config", string(b))

// Run the plugin binary.
cmd, err := lifecycle.RunBinary(ctx, pPath, args)
if err != nil {
return nil, fmt.Errorf("failed to run plugin %s: %w", pCfg.Name, err)
}

plugins = append(plugins, cmd)
}
return plugins, nil
}

// TODO: Remove this once the decryption task by plugin call to the plugin service is implemented.
func (p *piped) initializeSecretDecrypter(cfg *config.PipedSpec) (crypto.Decrypter, error) {
sm := cfg.SecretManagement
Expand Down
37 changes: 28 additions & 9 deletions pkg/app/pipedv1/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/git"
"github.com/pipe-cd/pipecd/pkg/model"
pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
)

type apiClient interface {
Expand Down Expand Up @@ -84,12 +86,15 @@ var (

type controller struct {
apiClient apiClient
pluginRegistry PluginRegistry
gitClient gitClient
deploymentLister deploymentLister
commandLister commandLister
notifier notifier

// gRPC clients to communicate with plugins.
pluginClients []pluginapi.PluginClient
// Map from stage name to the plugin client.
stageBasedPluginsMap map[string]pluginapi.PluginClient
// Map from application ID to the planner
// of a pending deployment of that application.
planners map[string]*planner
Expand Down Expand Up @@ -121,6 +126,7 @@ type controller struct {
func NewController(
apiClient apiClient,
gitClient gitClient,
pluginClients []pluginapi.PluginClient,
deploymentLister deploymentLister,
commandLister commandLister,
notifier notifier,
Expand All @@ -131,8 +137,8 @@ func NewController(

return &controller{
apiClient: apiClient,
pluginRegistry: DefaultPluginRegistry(),
gitClient: gitClient,
pluginClients: pluginClients,
deploymentLister: deploymentLister,
commandLister: commandLister,
notifier: notifier,
Expand Down Expand Up @@ -166,6 +172,23 @@ func (c *controller) Run(ctx context.Context) error {
c.workspaceDir = dir
c.logger.Info(fmt.Sprintf("workspace directory was configured to %s", c.workspaceDir))

// Build the list of stages that can be handled by piped's plugins.
stagesBasedPluginsMap := make(map[string]pluginapi.PluginClient)
for _, plugin := range c.pluginClients {
resp, err := plugin.FetchDefinedStages(ctx, &deployment.FetchDefinedStagesRequest{})
if err != nil {
return err
}
for _, stage := range resp.GetStages() {
if _, ok := stagesBasedPluginsMap[stage]; ok {
c.logger.Error("duplicated stage name", zap.String("stage", stage))
return fmt.Errorf("duplicated stage name %s", stage)
}
stagesBasedPluginsMap[stage] = plugin
}
}
c.stageBasedPluginsMap = stagesBasedPluginsMap

ticker := time.NewTicker(c.syncInternal)
defer ticker.Stop()
c.logger.Info("start syncing planners and schedulers")
Expand Down Expand Up @@ -410,18 +433,13 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) (
}
}

pluginClient, ok := c.pluginRegistry.Plugin(d.Kind)
if !ok {
logger.Error("no plugin client for the application kind", zap.String("kind", d.Kind.String()))
return nil, fmt.Errorf("no plugin client for the application kind %s", d.Kind.String())
}

planner := newPlanner(
d,
commitHash,
configFilename,
workingDir,
pluginClient,
c.pluginClients, // FIXME: Find a way to ensure the plugins only related to deployment.
c.stageBasedPluginsMap,
c.apiClient,
c.gitClient,
c.notifier,
Expand Down Expand Up @@ -561,6 +579,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
workingDir,
c.apiClient,
c.gitClient,
c.stageBasedPluginsMap,
c.notifier,
c.logger,
c.tracerProvider,
Expand Down
16 changes: 4 additions & 12 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func newPlanner(
lastSuccessfulCommitHash string,
lastSuccessfulConfigFilename string,
workingDir string,
pluginClient pluginapi.PluginClient,
pluginClients []pluginapi.PluginClient,
stageBasedPluginsMap map[string]pluginapi.PluginClient,
apiClient apiClient,
gitClient gitClient,
notifier notifier,
Expand All @@ -106,22 +107,13 @@ func newPlanner(
zap.String("working-dir", workingDir),
)

// TODO: Fix this. Passed by args
tmp := make(map[string]pluginapi.PluginClient)
tmp["K8S_SYNC"] = pluginClient

plugins := make([]pluginapi.PluginClient, 0, len(tmp))
for _, v := range tmp {
plugins = append(plugins, v)
}

p := &planner{
deployment: d,
lastSuccessfulCommitHash: lastSuccessfulCommitHash,
lastSuccessfulConfigFilename: lastSuccessfulConfigFilename,
workingDir: workingDir,
stageBasedPluginsMap: tmp,
plugins: plugins,
stageBasedPluginsMap: stageBasedPluginsMap,
plugins: pluginClients,
apiClient: apiClient,
gitClient: gitClient,
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
Expand Down
78 changes: 0 additions & 78 deletions pkg/app/pipedv1/controller/pluginregistry.go

This file was deleted.

Loading

0 comments on commit ee77b98

Please sign in to comment.