Skip to content

Commit

Permalink
Implement preparing the deploy source in pipedv1 planner / scheduler (#…
Browse files Browse the repository at this point in the history
…5410)

* Implement prepare deploy source in planner

Signed-off-by: Shinnosuke Sawada-Dazai <[email protected]>

* Create tools directory if it does not exist in tool registry

Signed-off-by: Shinnosuke Sawada-Dazai <[email protected]>

* Implement deploy source preparation in scheduler

Signed-off-by: Shinnosuke Sawada-Dazai <[email protected]>

* Start log persister in plugin run method

Signed-off-by: Shinnosuke Sawada-Dazai <[email protected]>

* Refactor deployment source handling in scheduler to use Provider interface

Signed-off-by: Shinnosuke Sawada-Dazai <[email protected]>

* Add fakeDeploySourceProvider

Signed-off-by: Yoshiki Fujikane <[email protected]>

* Import configv1 in the sourceprocessor

Signed-off-by: Yoshiki Fujikane <[email protected]>

* Fix to return nil byte when pipeline is not set

Signed-off-by: Yoshiki Fujikane <[email protected]>

* Nit

Signed-off-by: Yoshiki Fujikane <[email protected]>

* Fix test

Signed-off-by: Yoshiki Fujikane <[email protected]>

---------

Signed-off-by: Shinnosuke Sawada-Dazai <[email protected]>
Signed-off-by: Yoshiki Fujikane <[email protected]>
Co-authored-by: Yoshiki Fujikane <[email protected]>
Co-authored-by: Yoshiki Fujikane <[email protected]>
  • Loading branch information
3 people authored Dec 13, 2024
1 parent 702b984 commit d5ba3d2
Show file tree
Hide file tree
Showing 14 changed files with 222 additions and 64 deletions.
4 changes: 4 additions & 0 deletions pkg/app/pipedv1/cmd/piped/grpcapi/tool_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type templateValues struct {
}

func newToolRegistry(toolsDir string) (*toolRegistry, error) {
if err := os.MkdirAll(toolsDir, 0o755); err != nil {
return nil, fmt.Errorf("failed to create the tools directory: %w", err)
}

tmpDir, err := os.MkdirTemp("", "tool-registry")
if err != nil {
return nil, fmt.Errorf("failed to create a temporary directory: %w", err)
Expand Down
52 changes: 33 additions & 19 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"path/filepath"
"sort"
"time"

Expand All @@ -27,6 +29,7 @@ import (
"go.uber.org/zap"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
config "github.com/pipe-cd/pipecd/pkg/configv1"
Expand Down Expand Up @@ -181,27 +184,38 @@ func (p *planner) Run(ctx context.Context) error {
controllermetrics.UpdateDeploymentStatus(p.deployment, p.doneDeploymentStatus)
}()

// TODO: Prepare running deploy source and target deploy source.
// Prepare running deploy source and target deploy source.
var runningDS, targetDS *deployment.DeploymentSource

// repoCfg := config.PipedRepository{
// RepoID: p.deployment.GitPath.Repo.Id,
// Remote: p.deployment.GitPath.Repo.Remote,
// Branch: p.deployment.GitPath.Repo.Branch,
// }

// Prepare target deploy source.
// targetDSP := deploysource.NewProvider(
// filepath.Join(p.workingDir, "deploysource"),
// deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "target", p.deployment.Trigger.Commit.Hash),
// *p.deployment.GitPath,
// nil, // TODO: Revise this secret decryter, is this need?
// )

// targetDS, err := targetDSP.Get(ctx, io.Discard)
// if err != nil {
// return fmt.Errorf("error while preparing deploy source data (%v)", err)
// }
repoCfg := config.PipedRepository{
RepoID: p.deployment.GitPath.Repo.Id,
Remote: p.deployment.GitPath.Repo.Remote,
Branch: p.deployment.GitPath.Repo.Branch,
}

runningDSP := deploysource.NewProvider(
filepath.Join(p.workingDir, "running-deploysource"),
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "running", p.lastSuccessfulCommitHash),
p.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
)
rds, err := runningDSP.Get(ctx, io.Discard) // TODO: pass not io.Discard
if err != nil {
// TODO: log error
return fmt.Errorf("error while preparing deploy source data (%v)", err)
}
runningDS = rds.ToPluginDeploySource()

targetDSP := deploysource.NewProvider(
filepath.Join(p.workingDir, "target-deploysource"),
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "target", p.deployment.Trigger.Commit.Hash),
p.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
)
tds, err := targetDSP.Get(ctx, io.Discard) // TODO: pass not io.Discard
if err != nil {
// TODO: log error
return fmt.Errorf("error while preparing deploy source data (%v)", err)
}
targetDS = tds.ToPluginDeploySource()

// TODO: Pass running DS as well if need?
out, err := p.buildPlan(ctx, runningDS, targetDS)
Expand Down
53 changes: 45 additions & 8 deletions pkg/app/pipedv1/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"path/filepath"
"time"

"go.opentelemetry.io/otel/attribute"
Expand All @@ -27,6 +29,7 @@ import (
"go.uber.org/zap"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
config "github.com/pipe-cd/pipecd/pkg/configv1"
Expand All @@ -47,8 +50,8 @@ type scheduler struct {
metadataStore metadatastore.MetadataStore
notifier notifier

targetDS *deployment.DeploymentSource
runningDS *deployment.DeploymentSource
targetDSP deploysource.Provider
runningDSP deploysource.Provider

// Current status of each stages.
// We stores their current statuses into this field
Expand Down Expand Up @@ -214,9 +217,32 @@ func (s *scheduler) Run(ctx context.Context) error {
)
deploymentStatus = model.DeploymentStatus_DEPLOYMENT_SUCCESS

/// TODO: prepare the targetDS and runningDS
var targetDS *deployment.DeploymentSource
cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig())
repoCfg := config.PipedRepository{
RepoID: s.deployment.GitPath.Repo.Id,
Remote: s.deployment.GitPath.Repo.Remote,
Branch: s.deployment.GitPath.Repo.Branch,
}

s.runningDSP = deploysource.NewProvider(
filepath.Join(s.workingDir, "running-deploysource"),
deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "running", s.deployment.RunningCommitHash),
s.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
)

s.targetDSP = deploysource.NewProvider(
filepath.Join(s.workingDir, "target-deploysource"),
deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "target", s.deployment.Trigger.Commit.Hash),
s.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
)

ds, err := s.targetDSP.Get(ctx, io.Discard)
if err != nil {
deploymentStatus = model.DeploymentStatus_DEPLOYMENT_FAILURE
statusReason = fmt.Sprintf("Failed to get deploy source at target commit (%v)", err)
s.reportDeploymentCompleted(ctx, deploymentStatus, statusReason, "")
return err
}
cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](ds.ApplicationConfig)
if err != nil {
deploymentStatus = model.DeploymentStatus_DEPLOYMENT_FAILURE
statusReason = fmt.Sprintf("Failed to decode application configuration at target commit (%v)", err)
Expand Down Expand Up @@ -441,6 +467,18 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final
originalStatus = ps.Status
)

rds, err := s.runningDSP.Get(ctx, io.Discard)
if err != nil {
s.logger.Error("failed to get running deployment source", zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}

tds, err := s.targetDSP.Get(ctx, io.Discard)
if err != nil {
s.logger.Error("failed to get target deployment source", zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}

// Check whether to execute the script rollback stage or not.
// If the base stage is executed, the script rollback stage will be executed.
if ps.Rollback {
Expand Down Expand Up @@ -477,7 +515,6 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final
}

// Load the stage configuration.
// TODO: Check this works with pre-defined stages. (stages added to the pipeline without user-defined configuration)
stageConfig, stageConfigFound := s.genericApplicationConfig.GetStageByte(ps.Index)
if !stageConfigFound {
s.logger.Error("Unable to find the stage configuration")
Expand All @@ -493,8 +530,8 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final
Deployment: s.deployment,
Stage: ps,
StageConfig: stageConfig,
RunningDeploymentSource: s.runningDS, // TODO: prepare this
TargetDeploymentSource: s.targetDS, // TODO: prepare this
RunningDeploymentSource: rds.ToPluginDeploySource(),
TargetDeploymentSource: tds.ToPluginDeploySource(),
},
})
if err != nil {
Expand Down
26 changes: 21 additions & 5 deletions pkg/app/pipedv1/controller/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package controller

import (
"context"
"io"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/model"
Expand Down Expand Up @@ -181,7 +183,7 @@ func TestExecuteStage(t *testing.T) {
expected: model.StageStatus_STAGE_FAILURE,
},
{
name: "stage without config, should be set as failed",
name: "stage without config, should be success",
deployment: &model.Deployment{
Stages: []*model.PipelineStage{
{
Expand All @@ -200,7 +202,7 @@ func TestExecuteStage(t *testing.T) {
Stages: []config.PipelineStage{},
},
},
expected: model.StageStatus_STAGE_FAILURE,
expected: model.StageStatus_STAGE_SUCCESS,
},
}

Expand All @@ -210,7 +212,9 @@ func TestExecuteStage(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
s := &scheduler{
apiClient: &fakeApiClient{},
apiClient: &fakeApiClient{},
targetDSP: &fakeDeploySourceProvider{},
runningDSP: &fakeDeploySourceProvider{},
stageBasedPluginsMap: map[string]pluginapi.PluginClient{
"stage-name": &fakeExecutorPluginClient{},
},
Expand All @@ -237,12 +241,22 @@ func TestExecuteStage(t *testing.T) {
}
}

type fakeDeploySourceProvider struct {
deploysource.Provider
}

func (f *fakeDeploySourceProvider) Get(ctx context.Context, logWriter io.Writer) (*deploysource.DeploySource, error) {
return &deploysource.DeploySource{}, nil
}

func TestExecuteStage_SignalTerminated(t *testing.T) {
logger := zaptest.NewLogger(t)
sig, handler := NewStopSignal()

s := &scheduler{
apiClient: &fakeApiClient{},
apiClient: &fakeApiClient{},
targetDSP: &fakeDeploySourceProvider{},
runningDSP: &fakeDeploySourceProvider{},
stageBasedPluginsMap: map[string]pluginapi.PluginClient{
"stage-name": &fakeExecutorPluginClient{},
},
Expand Down Expand Up @@ -278,7 +292,9 @@ func TestExecuteStage_SignalCancelled(t *testing.T) {
sig, handler := NewStopSignal()

s := &scheduler{
apiClient: &fakeApiClient{},
apiClient: &fakeApiClient{},
targetDSP: &fakeDeploySourceProvider{},
runningDSP: &fakeDeploySourceProvider{},
stageBasedPluginsMap: map[string]pluginapi.PluginClient{
"stage-name": &fakeExecutorPluginClient{},
},
Expand Down
60 changes: 36 additions & 24 deletions pkg/app/pipedv1/deploysource/deploysource.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,26 @@ import (
"sync"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/sourceprocesser"
"github.com/pipe-cd/pipecd/pkg/config"
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
)

type DeploySource struct {
RepoDir string
AppDir string
Revision string
ApplicationConfig *config.Config
GenericApplicationConfig *config.GenericApplicationSpec
RepoDir string
AppDir string
Revision string
ApplicationConfig []byte
ApplicationConfigFilename string
}

func (d *DeploySource) ToPluginDeploySource() *deployment.DeploymentSource {
return &deployment.DeploymentSource{
ApplicationDirectory: d.AppDir,
Revision: d.Revision,
ApplicationConfig: d.ApplicationConfig,
ApplicationConfigFilename: d.ApplicationConfigFilename,
}
}

type Provider interface {
Expand All @@ -50,7 +60,7 @@ type provider struct {
cloner SourceCloner
revisionName string
revision string
appGitPath model.ApplicationGitPath
appGitPath *model.ApplicationGitPath
secretDecrypter secretDecrypter

done bool
Expand All @@ -63,7 +73,7 @@ type provider struct {
func NewProvider(
workingDir string,
cloner SourceCloner,
appGitPath model.ApplicationGitPath,
appGitPath *model.ApplicationGitPath,
sd secretDecrypter,
) Provider {

Expand Down Expand Up @@ -134,7 +144,14 @@ func (p *provider) prepare(ctx context.Context, lw io.Writer) (*DeploySource, er
cfgFileRelPath = p.appGitPath.GetApplicationConfigFilePath()
cfgFileAbsPath = filepath.Join(repoDir, cfgFileRelPath)
)
cfg, err := config.LoadFromYAML(cfgFileAbsPath)

cfgFileContent, err := os.ReadFile(cfgFileAbsPath)
if err != nil {
fmt.Fprintf(lw, "Unable to load the application configuration file at %s (%v)\n", cfgFileRelPath, err)
return nil, err
}
cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](cfgFileContent)

if err != nil {
fmt.Fprintf(lw, "Unable to load the application configuration file at %s (%v)\n", cfgFileRelPath, err)

Expand All @@ -144,11 +161,8 @@ func (p *provider) prepare(ctx context.Context, lw io.Writer) (*DeploySource, er
return nil, err
}

gac, ok := cfg.GetGenericApplication()
if !ok {
fmt.Fprintf(lw, "Invalid application kind %s\n", cfg.Kind)
return nil, fmt.Errorf("unsupport application kind %s", cfg.Kind)
}
gac := cfg.Spec

fmt.Fprintln(lw, "Successfully loaded the application configuration file")

var templProcessors []sourceprocesser.SourceTemplateProcessor
Expand All @@ -172,11 +186,10 @@ func (p *provider) prepare(ctx context.Context, lw io.Writer) (*DeploySource, er
}

return &DeploySource{
RepoDir: repoDir,
AppDir: appDir,
Revision: p.revision,
ApplicationConfig: cfg,
GenericApplicationConfig: &gac,
RepoDir: repoDir,
AppDir: appDir,
Revision: p.revision,
ApplicationConfig: cfgFileContent,
}, nil
}

Expand All @@ -201,10 +214,9 @@ func (p *provider) copy(lw io.Writer) (*DeploySource, error) {
}

return &DeploySource{
RepoDir: dest,
AppDir: filepath.Join(dest, p.appGitPath.Path),
Revision: p.revision,
ApplicationConfig: p.source.ApplicationConfig,
GenericApplicationConfig: p.source.GenericApplicationConfig,
RepoDir: dest,
AppDir: filepath.Join(dest, p.appGitPath.Path),
Revision: p.revision,
ApplicationConfig: p.source.ApplicationConfig,
}, nil
}
2 changes: 1 addition & 1 deletion pkg/app/pipedv1/deploysource/sourcecloner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package deploysource
import (
"context"

"github.com/pipe-cd/pipecd/pkg/config"
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/git"
)

Expand Down
Loading

0 comments on commit d5ba3d2

Please sign in to comment.