Skip to content

Commit

Permalink
227 standby clusters (#289)
Browse files Browse the repository at this point in the history
  • Loading branch information
eberlep authored Dec 3, 2021
1 parent 293bfa9 commit fa2e82b
Show file tree
Hide file tree
Showing 12 changed files with 685 additions and 97 deletions.
116 changes: 116 additions & 0 deletions api/v1/postgres_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ const (
BackupConfigKey = "config"
// SharedBufferParameterKey defines the key under which the shared buffer size is stored in the parameters map. Defined by the postgres-operator/patroni
SharedBufferParameterKey = "shared_buffers"
// StandbyKey defines the key under which the standby configuration is stored in the CR. Defined by the postgres-operator/patroni
StandbyKey = "standby"
StandbyMethod = "streaming_host"

teamIDPrefix = "pg"
)
Expand Down Expand Up @@ -152,6 +155,9 @@ type PostgresSpec struct {
// BackupSecretRef reference to the secret where the backup credentials are stored
BackupSecretRef string `json:"backupSecretRef,omitempty"`

// PostgresConnection Connection info of a streaming host, independant of the current role (leader or standby)
PostgresConnection *PostgresConnection `json:"connection,omitempty"`

// AuditLogs enable or disable default audit logs
AuditLogs *bool `json:"auditLogs,omitempty"`

Expand Down Expand Up @@ -207,6 +213,22 @@ type PostgresList struct {
Items []Postgres `json:"items"`
}

// PostgresConnection A remote postgres instance this one is linked to, e.g. for standby purpouses.
type PostgresConnection struct {
// ConnectedPostgresID internal ID of the connected Postgres instance
ConnectedPostgresID string `json:"postgresID,omitempty"`
// ConnectionSecretName name of the internal secret used to connect to the remote postgres
ConnectionSecretName string `json:"secretName,omitempty"`
// ConnectionIP IP of the remote postgres
ConnectionIP string `json:"ip,omitempty"`
// ConnectionPort port of the remote postgres
ConnectionPort uint16 `json:"port,omitempty"`
// SynchronousReplication determines if async or sync replication is used for the standby postgres
SynchronousReplication bool `json:"synchronous,omitempty"`
// ReplicationPrimary determines if THIS side of the connection is the primary or the standby side
ReplicationPrimary bool `json:"localSideIsPrimary,omitempty"`
}

var SvcLoadBalancerLabel = map[string]string{
ManagedByLabelName: ManagedByLabelValue,
}
Expand Down Expand Up @@ -519,6 +541,22 @@ func (p *Postgres) ToUnstructuredZalandoPostgresql(z *zalando.Postgresql, c *cor
z.Spec.AllowedSourceRanges = p.Spec.AccessList.SourceRanges
}

// Enable replication (using unstructured json)
if p.IsReplicationPrimary() {
// delete field
z.Spec.StandbyCluster = nil
} else {
// overwrite connection info
z.Spec.StandbyCluster = &zalando.StandbyDescription{
StandbyMethod: StandbyMethod,
StandbyHost: p.Spec.PostgresConnection.ConnectionIP,
StandbyPort: strconv.FormatInt(int64(p.Spec.PostgresConnection.ConnectionPort), 10),
StandbySecretName: "standby." + p.ToPeripheralResourceName() + ".credentials",
S3WalPath: "",
StandbyApplicationName: p.ObjectMeta.Name,
}
}

jsonZ, err := runtime.DefaultUnstructuredConverter.ToUnstructured(z)
if err != nil {
return nil, fmt.Errorf("failed to convert to unstructured zalando postgresql: %w", err)
Expand Down Expand Up @@ -656,6 +694,14 @@ func setSharedBufferSize(parameters map[string]string, shmSize string) {
}
}

func (p *Postgres) IsReplicationPrimary() bool {
if p.Spec.PostgresConnection == nil || p.Spec.PostgresConnection.ReplicationPrimary {
// nothing is configured, or we are the leader. nothing to do.
return true
}
return false
}

// enableAuditLogs configures this postgres instances audit logging
func enableAuditLogs(parameters map[string]string) {
// default values: bg_mon,pg_stat_statements,pgextwlist,pg_auth_mon,set_user,timescaledb,pg_cron,pg_stat_kcache
Expand All @@ -676,3 +722,73 @@ func setPostgresParams(parameters map[string]string, providedParams map[string]s
parameters[k] = v
}
}

func (p *Postgres) ToStandbyClusterIngresCWNPName() string {
return p.ToPeripheralResourceName() + "-standby-ingress"
}
func (p *Postgres) ToStandbyClusterEgresCWNPName() string {
return p.ToPeripheralResourceName() + "-standby-egress"
}

func (p *Postgres) ToStandbyClusterIngressCWNP(sourceCIDRs []string) (*firewall.ClusterwideNetworkPolicy, error) {

standbyIngressCWNPName := p.ToStandbyClusterIngresCWNPName()
standbyIngressCWNP := &firewall.ClusterwideNetworkPolicy{ObjectMeta: metav1.ObjectMeta{Name: standbyIngressCWNPName, Namespace: firewall.ClusterwideNetworkPolicyNamespace}}

//
// Create ingress rule from pre-configured CIDR to this DBs port
//
standbyClusterIngressIPBlocks := []networkingv1.IPBlock{}
for _, cidr := range sourceCIDRs {
remoteServiceClusterCIDR, err := netaddr.ParseIPPrefix(cidr)
if err != nil {
return nil, fmt.Errorf("unable to parse standby host ip %s: %w", p.Spec.PostgresConnection.ConnectionIP, err)
}
standbyClusterIPs := networkingv1.IPBlock{
CIDR: remoteServiceClusterCIDR.String(),
}
standbyClusterIngressIPBlocks = append(standbyClusterIngressIPBlocks, standbyClusterIPs)
}

// Add Port to CWNP (if known)
tcp := corev1.ProtocolTCP
ingressTargetPorts := []networkingv1.NetworkPolicyPort{}
if p.Status.Socket.Port != 0 {
portObj := intstr.FromInt(int(p.Status.Socket.Port))
ingressTargetPorts = append(ingressTargetPorts, networkingv1.NetworkPolicyPort{Port: &portObj, Protocol: &tcp})
}
standbyIngressCWNP.Spec.Ingress = []firewall.IngressRule{
{Ports: ingressTargetPorts, From: standbyClusterIngressIPBlocks},
}

return standbyIngressCWNP, nil
}

func (p *Postgres) ToStandbyClusterEgressCWNP() (*firewall.ClusterwideNetworkPolicy, error) {
standbyClusterEgressIPBlocks := []networkingv1.IPBlock{}
if p.Spec.PostgresConnection.ConnectionIP != "" {
remoteServiceClusterCIDR, err := netaddr.ParseIPPrefix(p.Spec.PostgresConnection.ConnectionIP + "/32")
if err != nil {
return nil, fmt.Errorf("unable to parse standby host ip %s: %w", p.Spec.PostgresConnection.ConnectionIP, err)
}
standbyClusterIPs := networkingv1.IPBlock{
CIDR: remoteServiceClusterCIDR.String(),
}
standbyClusterEgressIPBlocks = append(standbyClusterEgressIPBlocks, standbyClusterIPs)
}

tcp := corev1.ProtocolTCP
standbyEgressCWNPName := p.ToStandbyClusterEgresCWNPName()
standbyEgressCWNP := &firewall.ClusterwideNetworkPolicy{ObjectMeta: metav1.ObjectMeta{Name: standbyEgressCWNPName, Namespace: firewall.ClusterwideNetworkPolicyNamespace}}
// Add Port to CWNP
egressTargetPorts := []networkingv1.NetworkPolicyPort{}
if p.Spec.PostgresConnection.ConnectionPort != 0 {
portObj := intstr.FromInt(int(p.Spec.PostgresConnection.ConnectionPort))
egressTargetPorts = append(egressTargetPorts, networkingv1.NetworkPolicyPort{Port: &portObj, Protocol: &tcp})
}
standbyEgressCWNP.Spec.Egress = []firewall.EgressRule{
{Ports: egressTargetPorts, To: standbyClusterEgressIPBlocks},
}

return standbyEgressCWNP, nil
}
20 changes: 20 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions config/crd/bases/database.fits.cloud_postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,33 @@ spec:
description: BackupSecretRef reference to the secret where the backup
credentials are stored
type: string
connection:
description: PostgresConnection Connection info of a streaming host,
independant of the current role (leader or standby)
properties:
ip:
description: ConnectionIP IP of the remote postgres
type: string
localSideIsPrimary:
description: ReplicationPrimary determines if THIS side of the
connection is the primary or the standby side
type: boolean
port:
description: ConnectionPort port of the remote postgres
type: integer
postgresID:
description: ConnectedPostgresID internal ID of the connected
Postgres instance
type: string
secretName:
description: ConnectionSecretName name of the internal secret
used to connect to the remote postgres
type: string
synchronous:
description: SynchronousReplication determines if async or sync
replication is used for the standby postgres
type: boolean
type: object
description:
description: Description
type: string
Expand Down
28 changes: 28 additions & 0 deletions config/samples/standby-a.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
apiVersion: database.fits.cloud/v1
kind: Postgres
metadata:
namespace: metal-extension-cloud
name: standby-a
spec:
accessList:
sourceRanges:
- 1.2.3.4/24
- 1.2.4.4/24
maintenance:
- "Sun:21:00-22:00"
numberOfInstances: 1
partitionID: sample-partition
projectID: sample-project
size:
cpu: 500m
memory: 512Mi
storageSize: 1Gi
tenant: sample-tenant
version: "12"
# connection:
# localSideIsPrimary: true
# synchronous: false
# postgresID: standby-b
# secretName: "standby-b-passwords"
# ip: "10.96.147.227" # manually add the current ip of the standby-b postgres cluster for local dev
# port: 5432
28 changes: 28 additions & 0 deletions config/samples/standby-b.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
apiVersion: database.fits.cloud/v1
kind: Postgres
metadata:
namespace: metal-extension-cloud
name: standby-b
spec:
accessList:
sourceRanges:
- 1.2.3.4/24
maintenance:
- "Sun:21:00-22:00"
numberOfInstances: 1
partitionID: sample-partition
projectID: sample-project
size:
cpu: 500m
memory: 512Mi
storageSize: 1Gi
tenant: sample-tenant
version: "12"
connection:
localSideIsPrimary: false
synchronous: false
postgresID: standby-a
secretName: "standby-a-passwords"
ip: "10.96.48.18" # manually add the current ip of the standby-a postgres cluster for local dev
port: 5432

Loading

0 comments on commit fa2e82b

Please sign in to comment.