Skip to content

Commit

Permalink
Add etcd and image configuration (#247)
Browse files Browse the repository at this point in the history
* Introduce viper

* Update error message

* set env prefix to 'pg' and add an env key replacer (e.g. a get of 'my-key' will try to get the 'PG_MY_KEY' env var)

* Temporarily remove viper env prefix for testing

* Temporarily remove viper env prefix for testing

* Bind env vars

* Revert "Bind env vars"

This reverts commit e457ab4.

* Bind env vars... the easy way

* go mod tidy

* Add servicemonitor CRD to test cluster

* Add TODOs

* Add configurable docker_image and etcd_host params

* Add crd validation config option

* Force synchronous mode

* Fix feature flag

* Introduce struc to pass along options to operator manager

* Introduce struc to pass along options to lb manager

* Introduce configuration of operator image

* Switch from reference to value

* Switch from reference to value
  • Loading branch information
eberlep authored Jul 8, 2021
1 parent 88d7d24 commit ee4b0cb
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 45 deletions.
5 changes: 5 additions & 0 deletions api/v1/postgres_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,11 @@ func (p *Postgres) ToUnstructuredZalandoPostgresql(z *zalando.Postgresql, c *cor
z.Spec.Volume.Size = p.Spec.Size.StorageSize
z.Spec.Volume.StorageClass = sc

// TODO make configurable?
z.Spec.Patroni.TTL = 130
z.Spec.Patroni.SynchronousMode = true
z.Spec.Patroni.SynchronousModeStrict = false

// skip if the configmap does not exist
if c != nil {
z.Spec.AdditionalVolumes = p.buildAdditionalVolumes(c)
Expand Down
4 changes: 2 additions & 2 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ var _ = BeforeSuite(func() {
filepath.Join(externalYAMLDir, "svc-postgres-operator.yaml"),
scheme,
cr.Log.WithName("OperatorManager"),
"test-psp")
operatormanager.Options{PspName: "test-psp"})
Expect(err).ToNot(HaveOccurred())

Expect((&PostgresReconciler{
Expand All @@ -126,7 +126,7 @@ var _ = BeforeSuite(func() {
PartitionID: "sample-partition",
Tenant: "sample-tenant",
OperatorManager: opMgr,
LBManager: lbmanager.New(svcClusterMgr.GetClient(), "127.0.0.1", int32(32000), int32(8000)),
LBManager: lbmanager.New(svcClusterMgr.GetClient(), lbmanager.Options{LBIP: "127.0.0.1", PortRangeStart: int32(32000), PortRangeSize: int32(8000)}),
Log: cr.Log.WithName("controllers").WithName("Postgres"),
}).SetupWithManager(ctrlClusterMgr)).Should(Succeed())

Expand Down
36 changes: 32 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
portRangeSizeFlg = "port-range-size"
customPSPNameFlg = "custom-psp-name"
storageClassFlg = "storage-class"
postgresImageFlg = "postgres-image"
etcdHostFlg = "etcd-host"
crdValidationFlg = "enable-crd-validation"
operatorImageFlg = "operator-image"
)

var (
Expand All @@ -64,8 +68,8 @@ func init() {
}

func main() {
var metricsAddrCtrlMgr, metricsAddrSvcMgr, partitionID, tenant, ctrlClusterKubeconfig, pspName, lbIP, storageClass string
var enableLeaderElection bool
var metricsAddrCtrlMgr, metricsAddrSvcMgr, partitionID, tenant, ctrlClusterKubeconfig, pspName, lbIP, storageClass, postgresImage, etcdHost, operatorImage string
var enableLeaderElection, enableCRDValidation bool
var portRangeStart, portRangeSize int

// TODO enable Prefix and update helm chart
Expand Down Expand Up @@ -111,6 +115,14 @@ func main() {

storageClass = viper.GetString(storageClassFlg)

operatorImage = viper.GetString(operatorImageFlg)
postgresImage = viper.GetString(postgresImageFlg)

etcdHost = viper.GetString(etcdHostFlg)

viper.SetDefault(crdValidationFlg, true)
enableCRDValidation = viper.GetBool(crdValidationFlg)

ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

ctrl.Log.Info("flag",
Expand All @@ -125,6 +137,10 @@ func main() {
portRangeSizeFlg, portRangeSize,
customPSPNameFlg, pspName,
storageClassFlg, storageClass,
operatorImageFlg, operatorImage,
postgresImageFlg, postgresImage,
etcdHostFlg, etcdHost,
crdValidationFlg, enableCRDValidation,
)

svcClusterConf := ctrl.GetConfigOrDie()
Expand Down Expand Up @@ -157,12 +173,24 @@ func main() {
os.Exit(1)
}

opMgr, err := operatormanager.New(svcClusterConf, "external/svc-postgres-operator.yaml", scheme, ctrl.Log.WithName("OperatorManager"), pspName)
var opMgrOpts operatormanager.Options = operatormanager.Options{
PspName: pspName,
OperatorImage: operatorImage,
DockerImage: postgresImage,
EtcdHost: etcdHost,
CRDValidation: enableCRDValidation,
}
opMgr, err := operatormanager.New(svcClusterConf, "external/svc-postgres-operator.yaml", scheme, ctrl.Log.WithName("OperatorManager"), opMgrOpts)
if err != nil {
setupLog.Error(err, "unable to create `OperatorManager`")
os.Exit(1)
}

var lbMgrOpts lbmanager.Options = lbmanager.Options{
LBIP: lbIP,
PortRangeStart: int32(portRangeStart),
PortRangeSize: int32(portRangeSize),
}
if err = (&controllers.PostgresReconciler{
CtrlClient: ctrlPlaneClusterMgr.GetClient(),
SvcClient: svcClusterMgr.GetClient(),
Expand All @@ -172,7 +200,7 @@ func main() {
Tenant: tenant,
StorageClass: storageClass,
OperatorManager: opMgr,
LBManager: lbmanager.New(svcClusterMgr.GetClient(), lbIP, int32(portRangeStart), int32(portRangeSize)),
LBManager: lbmanager.New(svcClusterMgr.GetClient(), lbMgrOpts),
}).SetupWithManager(ctrlPlaneClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Postgres")
os.Exit(1)
Expand Down
26 changes: 14 additions & 12 deletions pkg/lbmanager/lbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// LBManager Responsible for the creation and deletion of externally accessible Services to access the Postgresql clusters managed by the Postgreslet.
type LBManager struct {
client.Client
type Options struct {
LBIP string
PortRangeStart int32
PortRangeSize int32
}

// LBManager Responsible for the creation and deletion of externally accessible Services to access the Postgresql clusters managed by the Postgreslet.
type LBManager struct {
client.Client
options Options
}

// New Creates a new LBManager with the given configuration
func New(client client.Client, lbIP string, portRangeStart, portRangeSize int32) *LBManager {
func New(client client.Client, opt Options) *LBManager {
return &LBManager{
Client: client,
LBIP: lbIP,
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
Client: client,
options: opt,
}
}

Expand All @@ -44,9 +46,9 @@ func (m *LBManager) CreateSvcLBIfNone(ctx context.Context, in *api.Postgres) err
return fmt.Errorf("failed to get a free port for creating Service of type LoadBalancer: %w", err)
}
var lbIPToUse string
if m.LBIP != "" {
if m.options.LBIP != "" {
// a specific IP was configured in the config, so use that one
lbIPToUse = m.LBIP
lbIPToUse = m.options.LBIP
} else if existingLBIP != "" {
// no ip was configured, but one is already in use, so use the existing one
lbIPToUse = existingLBIP
Expand Down Expand Up @@ -88,7 +90,7 @@ func (m *LBManager) nextFreeSocket(ctx context.Context) (string, int32, error) {

// If there are none, this will be the first (managed) service we create, so start with PortRangeStart and return
if len(lbs.Items) == 0 {
return anyExistingLBIP, m.PortRangeStart, nil
return anyExistingLBIP, m.options.PortRangeStart, nil
}

// If there are already any managed services, store all the used ports in a slice.
Expand All @@ -109,7 +111,7 @@ func (m *LBManager) nextFreeSocket(ctx context.Context) (string, int32, error) {
// Now try all ports in the configured port range to find a free one.
// While not as effective as other implementations, this allows us to freely change PortRangeStart and PortRangeSize
// retroactively without breaking the implementation.
for port := m.PortRangeStart; port < m.PortRangeStart+m.PortRangeSize; port++ {
for port := m.options.PortRangeStart; port < m.options.PortRangeStart+m.options.PortRangeSize; port++ {
if containsElem(portsInUse, port) {
// Port already in use, try the next one
continue
Expand Down
50 changes: 30 additions & 20 deletions pkg/lbmanager/lbmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,54 +24,64 @@ func TestLBManager_nextFreePort(t *testing.T) {
{
name: "no svc in the cluster",
lbMgr: &LBManager{
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts()).Build(),
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts()).Build(),
options: Options{
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
},
},
portWant: 0,
wantErr: false,
},
{
name: "one svc already in the cluster",
lbMgr: &LBManager{
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0)).Build(),
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0)).Build(),
options: Options{
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
},
},
portWant: 1,
wantErr: false,
},
{
name: "last free port left",
lbMgr: &LBManager{
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 1, 2, 3)).Build(),
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 1, 2, 3)).Build(),
options: Options{
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
},
},
portWant: 4,
wantErr: false,
},
{
name: "no free port",
lbMgr: &LBManager{
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 1, 2, 3, 4)).Build(),
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 1, 2, 3, 4)).Build(),
options: Options{
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
},
},
portWant: 0,
wantErr: true,
},
{
name: "re-use releaased port",
lbMgr: &LBManager{
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 2, 3)).Build(),
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 2, 3)).Build(),
options: Options{
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
},
},
portWant: 1,
wantErr: false,
Expand Down
39 changes: 32 additions & 7 deletions pkg/operatormanager/operatormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ const (
// operatorPodMatchingLabels is for listing operator pods
var operatorPodMatchingLabels = client.MatchingLabels{operatorPodLabelName: operatorPodLabelValue}

// Options
type Options struct {
PspName string
OperatorImage string
DockerImage string
EtcdHost string
CRDValidation bool
}

// OperatorManager manages the operator
type OperatorManager struct {
client.Client
Expand All @@ -67,13 +76,13 @@ type OperatorManager struct {
log logr.Logger
meta.MetadataAccessor
*runtime.Scheme
pspName string
options Options
}

// New creates a new `OperatorManager`
func New(conf *rest.Config, fileName string, scheme *runtime.Scheme, log logr.Logger, pspName string) (*OperatorManager, error) {
func New(confRest *rest.Config, fileName string, scheme *runtime.Scheme, log logr.Logger, opt Options) (*OperatorManager, error) {
// Use no-cache client to avoid waiting for cashing.
client, err := client.New(conf, client.Options{
client, err := client.New(confRest, client.Options{
Scheme: scheme,
})
if err != nil {
Expand All @@ -100,7 +109,7 @@ func New(conf *rest.Config, fileName string, scheme *runtime.Scheme, log logr.Lo
list: list,
Scheme: scheme,
log: log,
pspName: pspName,
options: opt,
}, nil
}

Expand Down Expand Up @@ -284,7 +293,7 @@ func (m *OperatorManager) createNewClientObject(ctx context.Context, obj client.
APIGroups: []string{"extensions"},
Verbs: []string{"use"},
Resources: []string{"podsecuritypolicies"},
ResourceNames: []string{m.pspName},
ResourceNames: []string{m.options.PspName},
}
v.Rules = append(v.Rules, pspPolicyRule)

Expand Down Expand Up @@ -339,7 +348,7 @@ func (m *OperatorManager) createNewClientObject(ctx context.Context, obj client.
return nil
case *v1.ConfigMap:
m.log.Info("handling ConfigMap")
m.editConfigMap(v, namespace)
m.editConfigMap(v, namespace, m.options)
err = m.Get(ctx, key, &v1.ConfigMap{})
case *v1.Service:
m.log.Info("handling Service")
Expand All @@ -353,6 +362,12 @@ func (m *OperatorManager) createNewClientObject(ctx context.Context, obj client.
}
case *appsv1.Deployment:
m.log.Info("handling Deployment")
if len(v.Spec.Template.Spec.Containers) != 1 {
m.log.Info("Unexpected number of containers in deployment, ignoring.")
} else if m.options.OperatorImage != "" {
m.log.Info("Patching operator image", "image", m.options.OperatorImage)
v.Spec.Template.Spec.Containers[0].Image = m.options.OperatorImage
}
err = m.Get(ctx, key, &appsv1.Deployment{})
default:
return errs.New("unknown `client.Object`")
Expand Down Expand Up @@ -381,7 +396,7 @@ func (m *OperatorManager) createNewClientObject(ctx context.Context, obj client.
}

// editConfigMap adds info to cm
func (m *OperatorManager) editConfigMap(cm *v1.ConfigMap, namespace string) {
func (m *OperatorManager) editConfigMap(cm *v1.ConfigMap, namespace string, options Options) {
cm.Data["watched_namespace"] = namespace
// TODO don't use the same serviceaccount for operator and databases, see #88
cm.Data["pod_service_account_name"] = serviceAccountName
Expand All @@ -391,6 +406,16 @@ func (m *OperatorManager) editConfigMap(cm *v1.ConfigMap, namespace string) {
s := []string{pg.TenantLabelName, pg.ProjectIDLabelName, pg.UIDLabelName, pg.NameLabelName}
// TODO maybe use a precompiled string here
cm.Data["inherited_labels"] = strings.Join(s, ",")

if options.DockerImage != "" {
cm.Data["docker_image"] = options.DockerImage
}

if options.EtcdHost != "" {
cm.Data["etcd_host"] = options.EtcdHost
}

cm.Data["enable_crd_validation"] = strconv.FormatBool(options.CRDValidation)
}

// ensureCleanMetadata ensures obj has clean metadata
Expand Down

0 comments on commit ee4b0cb

Please sign in to comment.