Skip to content

Commit

Permalink
Init grpc server of wait stage plugin (#5445)
Browse files Browse the repository at this point in the history
* init wait plugin

Signed-off-by: t-kikuc <[email protected]>

* Improve signal handling and logging in wait plugin execution

Signed-off-by: t-kikuc <[email protected]>

* refactor wait.go

Signed-off-by: t-kikuc <[email protected]>

* Handle signals

Signed-off-by: t-kikuc <[email protected]>

* TODO: add metadataStore

Signed-off-by: t-kikuc <[email protected]>

* Concentrate on Init and server funcs

Signed-off-by: t-kikuc <[email protected]>

* fix golangci errors

Signed-off-by: t-kikuc <[email protected]>

---------

Signed-off-by: t-kikuc <[email protected]>
  • Loading branch information
t-kikuc authored Dec 25, 2024
1 parent b0df29e commit ef4f6d3
Show file tree
Hide file tree
Showing 4 changed files with 339 additions and 0 deletions.
108 changes: 108 additions & 0 deletions pkg/app/pipedv1/plugin/wait/execute/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package execute

import (
"context"
"time"

"go.uber.org/zap"
"google.golang.org/grpc"

config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
"github.com/pipe-cd/pipecd/pkg/plugin/logpersister"
"github.com/pipe-cd/pipecd/pkg/plugin/signalhandler"
)

type deploymentServiceServer struct {
deployment.UnimplementedDeploymentServiceServer

pluginConfig *config.PipedPlugin

logger *zap.Logger
logPersister logPersister
}

type logPersister interface {
StageLogPersister(deploymentID, stageID string) logpersister.StageLogPersister
}

// NewDeploymentService creates a new deploymentServiceServer of Wait Stage plugin.
func NewDeploymentService(
config *config.PipedPlugin,
logger *zap.Logger,
logPersister logPersister,
) *deploymentServiceServer {
return &deploymentServiceServer{
pluginConfig: config,
logger: logger.Named("wait-stage-plugin"),
logPersister: logPersister,
}
}

// Register registers all handling of this service into the specified gRPC server.
func (s *deploymentServiceServer) Register(server *grpc.Server) {
deployment.RegisterDeploymentServiceServer(server, s)
}

// ExecuteStage implements deployment.ExecuteStage.
func (s *deploymentServiceServer) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (response *deployment.ExecuteStageResponse, err error) {
slp := s.logPersister.StageLogPersister(request.Input.GetDeployment().GetId(), request.Input.GetStage().GetId())
defer func() {
// When termination signal received and the stage is not completed yet, we should not mark the log persister as completed.
// This can occur when the piped is shutting down while the stage is still running.
if !response.GetStatus().IsCompleted() && signalhandler.Terminated() {
return
}
slp.Complete(time.Minute)
}()

status := s.execute(ctx, request.Input, slp)
return &deployment.ExecuteStageResponse{
Status: status,
}, nil
}

// FetchDefinedStages implements deployment.FetchDefinedStages.
func (s *deploymentServiceServer) FetchDefinedStages(ctx context.Context, request *deployment.FetchDefinedStagesRequest) (*deployment.FetchDefinedStagesResponse, error) {
return &deployment.FetchDefinedStagesResponse{
Stages: []string{string(stageWait)},
}, nil
}

// DetermineVersions implements deployment.DeploymentServiceServer.
func (s *deploymentServiceServer) DetermineVersions(ctx context.Context, request *deployment.DetermineVersionsRequest) (*deployment.DetermineVersionsResponse, error) {
// TODO: Implement this func
return &deployment.DetermineVersionsResponse{}, nil
}

// DetermineStrategy implements deployment.DeploymentServiceServer.
func (s *deploymentServiceServer) DetermineStrategy(ctx context.Context, request *deployment.DetermineStrategyRequest) (*deployment.DetermineStrategyResponse, error) {
// TODO: Implement this func
return &deployment.DetermineStrategyResponse{}, nil
}

// BuildPipelineSyncStages implements deployment.BuildPipelineSyncStages.
func (s *deploymentServiceServer) BuildPipelineSyncStages(ctx context.Context, request *deployment.BuildPipelineSyncStagesRequest) (*deployment.BuildPipelineSyncStagesResponse, error) {
// TODO: Implement this func
return &deployment.BuildPipelineSyncStagesResponse{}, nil
}

// BuildQuickSyncStages implements deployment.BuildQuickSyncStages.
func (s *deploymentServiceServer) BuildQuickSyncStages(ctx context.Context, request *deployment.BuildQuickSyncStagesRequest) (*deployment.BuildQuickSyncStagesResponse, error) {
// TODO: Implement this func
return &deployment.BuildQuickSyncStagesResponse{}, nil
}
35 changes: 35 additions & 0 deletions pkg/app/pipedv1/plugin/wait/execute/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package execute

import (
"context"

"github.com/pipe-cd/pipecd/pkg/app/piped/logpersister"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
)

type Stage string

const (
stageWait Stage = "WAIT"
)

// Execute starts waiting for the specified duration.
func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.ExecutePluginInput, slp logpersister.StageLogPersister) model.StageStatus {
// TOD: implement the logic of waiting
return model.StageStatus_STAGE_FAILURE
}
34 changes: 34 additions & 0 deletions pkg/app/pipedv1/plugin/wait/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"log"

"github.com/pipe-cd/pipecd/pkg/cli"
)

func main() {
app := cli.NewApp(
"pipecd-plugin-stage-wait",
"Plugin component to execute Wait Stage.",
)
app.AddCommands(
newPluginCommand(),
)
if err := app.Run(); err != nil {
log.Fatal(err)
}
}
162 changes: 162 additions & 0 deletions pkg/app/pipedv1/plugin/wait/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"net/http"
"net/http/pprof"
"time"

"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/pipe-cd/pipecd/pkg/admin"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/wait/execute"
"github.com/pipe-cd/pipecd/pkg/cli"
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/plugin/logpersister"
"github.com/pipe-cd/pipecd/pkg/plugin/pipedapi"
"github.com/pipe-cd/pipecd/pkg/rpc"
"github.com/pipe-cd/pipecd/pkg/version"
)

type plugin struct {
pipedPluginService string
gracePeriod time.Duration
tls bool
certFile string
keyFile string
config string

enableGRPCReflection bool
}

// newPluginCommand creates a new cobra command for executing api server.
func newPluginCommand() *cobra.Command {
s := &plugin{
gracePeriod: 30 * time.Second,
}
cmd := &cobra.Command{
Use: "start",
Short: "Start running the wait-stage-plugin.",
RunE: cli.WithContext(s.run),
}

cmd.Flags().StringVar(&s.pipedPluginService, "piped-plugin-service", s.pipedPluginService, "The port number used to connect to the piped plugin service.") // TODO: we should discuss about the name of this flag, or we should use environment variable instead.
cmd.Flags().StringVar(&s.config, "config", s.config, "The configuration for the plugin.")
cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.")

cmd.Flags().BoolVar(&s.tls, "tls", s.tls, "Whether running the gRPC server with TLS or not.")
cmd.Flags().StringVar(&s.certFile, "cert-file", s.certFile, "The path to the TLS certificate file.")
cmd.Flags().StringVar(&s.keyFile, "key-file", s.keyFile, "The path to the TLS key file.")

// For debugging early in development
cmd.Flags().BoolVar(&s.enableGRPCReflection, "enable-grpc-reflection", s.enableGRPCReflection, "Whether to enable the reflection service or not.")

cmd.MarkFlagRequired("piped-plugin-service")
cmd.MarkFlagRequired("config")

return cmd
}

func (s *plugin) run(ctx context.Context, input cli.Input) (runErr error) {
// Make a cancellable context.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

group, ctx := errgroup.WithContext(ctx)

pipedapiClient, err := pipedapi.NewClient(ctx, s.pipedPluginService)
if err != nil {
input.Logger.Error("failed to create piped plugin service client", zap.Error(err))
return err
}

// Load the configuration.
cfg, err := config.ParsePluginConfig(s.config)
if err != nil {
input.Logger.Error("failed to parse the configuration", zap.Error(err))
return err
}

// Start running admin server.
{
var (
ver = []byte(version.Get().Version) // TODO: get the plugin's version
admin = admin.NewAdmin(0, s.gracePeriod, input.Logger) // TODO: add config for admin port
)

admin.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
w.Write(ver)
})
admin.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
})
admin.HandleFunc("/debug/pprof/", pprof.Index)
admin.HandleFunc("/debug/pprof/profile", pprof.Profile)
admin.HandleFunc("/debug/pprof/trace", pprof.Trace)

group.Go(func() error {
return admin.Run(ctx)
})
}

// Start log persister
persister := logpersister.NewPersister(pipedapiClient, input.Logger)
group.Go(func() error {
return persister.Run(ctx)
})

// Start a gRPC server for handling external API requests.
{
var (
service = execute.NewDeploymentService(
cfg,
input.Logger,
persister,
)
opts = []rpc.Option{
rpc.WithPort(cfg.Port),
rpc.WithGracePeriod(s.gracePeriod),
rpc.WithLogger(input.Logger),
rpc.WithLogUnaryInterceptor(input.Logger),
rpc.WithRequestValidationUnaryInterceptor(),
rpc.WithSignalHandlingUnaryInterceptor(),
}
)
if s.tls {
opts = append(opts, rpc.WithTLS(s.certFile, s.keyFile))
}
if s.enableGRPCReflection {
opts = append(opts, rpc.WithGRPCReflection())
}
if input.Flags.Metrics {
opts = append(opts, rpc.WithPrometheusUnaryInterceptor())
}

server := rpc.NewServer(service, opts...)
group.Go(func() error {
return server.Run(ctx)
})
}

if err := group.Wait(); err != nil {
input.Logger.Error("failed while running", zap.Error(err))
return err
}
return nil
}

0 comments on commit ef4f6d3

Please sign in to comment.