-
Notifications
You must be signed in to change notification settings - Fork 242
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #3545 from jingyih/managed_kafka
feat: add ManagedKafkaCluster as a direct resource
- Loading branch information
Showing
23 changed files
with
3,283 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
// Copyright 2024 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package v1alpha1 | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/GoogleCloudPlatform/k8s-config-connector/apis/common" | ||
refsv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
) | ||
|
||
// ClusterIdentity defines the resource reference to ManagedKafkaCluster, which "External" field | ||
// holds the GCP identifier for the KRM object. | ||
type ClusterIdentity struct { | ||
parent *ClusterParent | ||
id string | ||
} | ||
|
||
func (i *ClusterIdentity) String() string { | ||
return i.parent.String() + "/clusters/" + i.id | ||
} | ||
|
||
func (i *ClusterIdentity) ID() string { | ||
return i.id | ||
} | ||
|
||
func (i *ClusterIdentity) Parent() *ClusterParent { | ||
return i.parent | ||
} | ||
|
||
type ClusterParent struct { | ||
ProjectID string | ||
Location string | ||
} | ||
|
||
func (p *ClusterParent) String() string { | ||
return "projects/" + p.ProjectID + "/locations/" + p.Location | ||
} | ||
|
||
// New builds a ClusterIdentity from the Config Connector Cluster object. | ||
func NewClusterIdentity(ctx context.Context, reader client.Reader, obj *ManagedKafkaCluster) (*ClusterIdentity, error) { | ||
// Get Parent | ||
projectRef, err := refsv1beta1.ResolveProject(ctx, reader, obj.GetNamespace(), obj.Spec.ProjectRef) | ||
if err != nil { | ||
return nil, err | ||
} | ||
projectID := projectRef.ProjectID | ||
if projectID == "" { | ||
return nil, fmt.Errorf("cannot resolve project") | ||
} | ||
location := obj.Spec.Location | ||
|
||
// Get desired ID | ||
resourceID := common.ValueOf(obj.Spec.ResourceID) | ||
if resourceID == "" { | ||
resourceID = obj.GetName() | ||
} | ||
if resourceID == "" { | ||
return nil, fmt.Errorf("cannot resolve resource ID") | ||
} | ||
|
||
// Use approved External | ||
externalRef := common.ValueOf(obj.Status.ExternalRef) | ||
if externalRef != "" { | ||
// Validate desired with actual | ||
actualParent, actualResourceID, err := ParseClusterExternal(externalRef) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if actualParent.ProjectID != projectID { | ||
return nil, fmt.Errorf("spec.projectRef changed, expect %s, got %s", actualParent.ProjectID, projectID) | ||
} | ||
if actualParent.Location != location { | ||
return nil, fmt.Errorf("spec.location changed, expect %s, got %s", actualParent.Location, location) | ||
} | ||
if actualResourceID != resourceID { | ||
return nil, fmt.Errorf("cannot reset `metadata.name` or `spec.resourceID` to %s, since it has already assigned to %s", | ||
resourceID, actualResourceID) | ||
} | ||
} | ||
return &ClusterIdentity{ | ||
parent: &ClusterParent{ | ||
ProjectID: projectID, | ||
Location: location, | ||
}, | ||
id: resourceID, | ||
}, nil | ||
} | ||
|
||
func ParseClusterExternal(external string) (parent *ClusterParent, resourceID string, err error) { | ||
tokens := strings.Split(external, "/") | ||
if len(tokens) != 6 || tokens[0] != "projects" || tokens[2] != "locations" || tokens[4] != "clusters" { | ||
return nil, "", fmt.Errorf("format of ManagedKafkaCluster external=%q was not known (use projects/{{projectID}}/locations/{{location}}/clusters/{{clusterID}})", external) | ||
} | ||
parent = &ClusterParent{ | ||
ProjectID: tokens[1], | ||
Location: tokens[3], | ||
} | ||
resourceID = tokens[5] | ||
return parent, resourceID, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
// Copyright 2024 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package v1alpha1 | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
refsv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" | ||
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s" | ||
apierrors "k8s.io/apimachinery/pkg/api/errors" | ||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
"k8s.io/apimachinery/pkg/types" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
) | ||
|
||
var _ refsv1beta1.ExternalNormalizer = &ClusterRef{} | ||
|
||
// ClusterRef defines the resource reference to ManagedKafkaCluster, which "External" field | ||
// holds the GCP identifier for the KRM object. | ||
type ClusterRef struct { | ||
// A reference to an externally managed ManagedKafkaCluster resource. | ||
// Should be in the format "projects/{{projectID}}/locations/{{location}}/clusters/{{clusterID}}". | ||
External string `json:"external,omitempty"` | ||
|
||
// The name of a ManagedKafkaCluster resource. | ||
Name string `json:"name,omitempty"` | ||
|
||
// The namespace of a ManagedKafkaCluster resource. | ||
Namespace string `json:"namespace,omitempty"` | ||
} | ||
|
||
// NormalizedExternal provision the "External" value for other resource that depends on ManagedKafkaCluster. | ||
// If the "External" is given in the other resource's spec.ManagedKafkaClusterRef, the given value will be used. | ||
// Otherwise, the "Name" and "Namespace" will be used to query the actual ManagedKafkaCluster object from the cluster. | ||
func (r *ClusterRef) NormalizedExternal(ctx context.Context, reader client.Reader, otherNamespace string) (string, error) { | ||
if r.External != "" && r.Name != "" { | ||
return "", fmt.Errorf("cannot specify both name and external on %s reference", ManagedKafkaClusterGVK.Kind) | ||
} | ||
// From given External | ||
if r.External != "" { | ||
if _, _, err := ParseClusterExternal(r.External); err != nil { | ||
return "", err | ||
} | ||
return r.External, nil | ||
} | ||
|
||
// From the Config Connector object | ||
if r.Namespace == "" { | ||
r.Namespace = otherNamespace | ||
} | ||
key := types.NamespacedName{Name: r.Name, Namespace: r.Namespace} | ||
u := &unstructured.Unstructured{} | ||
u.SetGroupVersionKind(ManagedKafkaClusterGVK) | ||
if err := reader.Get(ctx, key, u); err != nil { | ||
if apierrors.IsNotFound(err) { | ||
return "", k8s.NewReferenceNotFoundError(u.GroupVersionKind(), key) | ||
} | ||
return "", fmt.Errorf("reading referenced %s %s: %w", ManagedKafkaClusterGVK, key, err) | ||
} | ||
// Get external from status.externalRef. This is the most trustworthy place. | ||
actualExternalRef, _, err := unstructured.NestedString(u.Object, "status", "externalRef") | ||
if err != nil { | ||
return "", fmt.Errorf("reading status.externalRef: %w", err) | ||
} | ||
if actualExternalRef == "" { | ||
return "", k8s.NewReferenceNotReadyError(u.GroupVersionKind(), key) | ||
} | ||
r.External = actualExternalRef | ||
return r.External, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
// Copyright 2024 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package v1alpha1 | ||
|
||
import ( | ||
refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" | ||
commonv1alpha1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/common/v1alpha1" | ||
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1" | ||
|
||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
) | ||
|
||
var ManagedKafkaClusterGVK = GroupVersion.WithKind("ManagedKafkaCluster") | ||
|
||
// +kcc:proto=google.cloud.managedkafka.v1.AccessConfig | ||
type AccessConfig struct { | ||
// Required. Virtual Private Cloud (VPC) networks that must be granted direct | ||
// access to the Kafka cluster. Minimum of 1 network is required. Maximum 10 | ||
// networks can be specified. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.AccessConfig.network_configs | ||
// +required | ||
NetworkConfigs []NetworkConfig `json:"networkConfigs,omitempty"` | ||
} | ||
|
||
// +kcc:proto=google.cloud.managedkafka.v1.CapacityConfig | ||
type CapacityConfig struct { | ||
// Required. The number of vCPUs to provision for the cluster. Minimum: 3. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.CapacityConfig.vcpu_count | ||
// +required | ||
VcpuCount *int64 `json:"vcpuCount,omitempty"` | ||
|
||
// Required. The memory to provision for the cluster in bytes. | ||
// The CPU:memory ratio (vCPU:GiB) must be between 1:1 and 1:8. | ||
// Minimum: 3221225472 (3 GiB). | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.CapacityConfig.memory_bytes | ||
// +required | ||
MemoryBytes *int64 `json:"memoryBytes,omitempty"` | ||
} | ||
|
||
// +kcc:proto=google.cloud.managedkafka.v1.NetworkConfig | ||
type NetworkConfig struct { | ||
// Required. Reference to the VPC subnet in which to create Private Service Connect | ||
// (PSC) endpoints for the Kafka brokers and bootstrap address. | ||
// | ||
// The subnet must be located in the same region as the Kafka cluster. The | ||
// project may differ. Multiple subnets from the same parent network must not | ||
// be specified. | ||
// | ||
// The CIDR range of the subnet must be within the IPv4 address ranges for | ||
// private networks, as specified in RFC 1918. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.NetworkConfig.subnet | ||
// +required | ||
SubnetworkRef *refs.ComputeSubnetworkRef `json:"subnetworkRef"` | ||
} | ||
|
||
// +kcc:proto=google.cloud.managedkafka.v1.GcpConfig | ||
type GcpConfig struct { | ||
// Required. Access configuration for the Kafka cluster. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.GcpConfig.access_config | ||
// +required | ||
AccessConfig *AccessConfig `json:"accessConfig,omitempty"` | ||
|
||
// Optional. Immutable. The Cloud KMS Key name to use for encryption. The key | ||
// must be located in the same region as the cluster and cannot be changed. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.GcpConfig.kms_key | ||
KmsKeyRef *refs.KMSCryptoKeyRef `json:"kmsKeyRef,omitempty"` | ||
} | ||
|
||
// ManagedKafkaClusterSpec defines the desired state of ManagedKafkaCluster | ||
// +kcc:proto=google.cloud.managedkafka.v1.Cluster | ||
type ManagedKafkaClusterSpec struct { | ||
commonv1alpha1.CommonSpec `json:",inline"` | ||
|
||
// +required | ||
Location string `json:"location"` | ||
|
||
// The ManagedKafkaCluster name. If not given, the metadata.name will be used. | ||
ResourceID *string `json:"resourceID,omitempty"` | ||
|
||
// Required. Configuration properties for a Kafka cluster deployed to Google | ||
// Cloud Platform. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.Cluster.gcp_config | ||
// +required | ||
GcpConfig *GcpConfig `json:"gcpConfig,omitempty"` | ||
|
||
// Optional. Labels as key value pairs. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.Cluster.labels | ||
Labels map[string]string `json:"labels,omitempty"` | ||
|
||
// Required. Capacity configuration for the Kafka cluster. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.Cluster.capacity_config | ||
// +required | ||
CapacityConfig *CapacityConfig `json:"capacityConfig,omitempty"` | ||
|
||
// Optional. Rebalance configuration for the Kafka cluster. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.Cluster.rebalance_config | ||
RebalanceConfig *RebalanceConfig `json:"rebalanceConfig,omitempty"` | ||
} | ||
|
||
// ManagedKafkaClusterStatus defines the config connector machine state of ManagedKafkaCluster | ||
type ManagedKafkaClusterStatus struct { | ||
/* Conditions represent the latest available observations of the | ||
object's current state. */ | ||
Conditions []v1alpha1.Condition `json:"conditions,omitempty"` | ||
|
||
// ObservedGeneration is the generation of the resource that was most recently observed by the Config Connector controller. If this is equal to metadata.generation, then that means that the current reported status reflects the most recent desired state of the resource. | ||
ObservedGeneration *int64 `json:"observedGeneration,omitempty"` | ||
|
||
// A unique specifier for the ManagedKafkaCluster resource in GCP. | ||
ExternalRef *string `json:"externalRef,omitempty"` | ||
|
||
// ObservedState is the state of the resource as most recently observed in GCP. | ||
ObservedState *ManagedKafkaClusterObservedState `json:"observedState,omitempty"` | ||
} | ||
|
||
// ManagedKafkaClusterSpec defines the desired state of ManagedKafkaCluster | ||
// +kcc:proto=google.cloud.managedkafka.v1.Cluster | ||
// ManagedKafkaClusterObservedState is the state of the ManagedKafkaCluster resource as most recently observed in GCP. | ||
type ManagedKafkaClusterObservedState struct { | ||
// Identifier. The name of the cluster. Structured like: | ||
// projects/{project_number}/locations/{location}/clusters/{cluster_id} | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.Cluster.name | ||
Name *string `json:"name,omitempty"` | ||
|
||
// Output only. The time when the cluster was created. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.Cluster.create_time | ||
CreateTime *string `json:"createTime,omitempty"` | ||
|
||
// Output only. The time when the cluster was last updated. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.Cluster.update_time | ||
UpdateTime *string `json:"updateTime,omitempty"` | ||
|
||
// Output only. The current state of the cluster. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.Cluster.state | ||
State *string `json:"state,omitempty"` | ||
|
||
// NOTYET | ||
// Output only. Reserved for future use. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.Cluster.satisfies_pzi | ||
// SatisfiesPzi *bool `json:"satisfiesPzi,omitempty"` | ||
|
||
// NOTYET | ||
// Output only. Reserved for future use. | ||
// +kcc:proto:field=google.cloud.managedkafka.v1.Cluster.satisfies_pzs | ||
// SatisfiesPzs *bool `json:"satisfiesPzs,omitempty"` | ||
} | ||
|
||
// +genclient | ||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object | ||
// TODO(user): make sure the pluralizaiton below is correct | ||
// +kubebuilder:resource:categories=gcp,shortName=gcpmanagedkafkacluster;gcpmanagedkafkaclusters | ||
// +kubebuilder:subresource:status | ||
// +kubebuilder:metadata:labels="cnrm.cloud.google.com/managed-by-kcc=true";"cnrm.cloud.google.com/system=true" | ||
// +kubebuilder:printcolumn:name="Age",JSONPath=".metadata.creationTimestamp",type="date" | ||
// +kubebuilder:printcolumn:name="Ready",JSONPath=".status.conditions[?(@.type=='Ready')].status",type="string",description="When 'True', the most recent reconcile of the resource succeeded" | ||
// +kubebuilder:printcolumn:name="Status",JSONPath=".status.conditions[?(@.type=='Ready')].reason",type="string",description="The reason for the value in 'Ready'" | ||
// +kubebuilder:printcolumn:name="Status Age",JSONPath=".status.conditions[?(@.type=='Ready')].lastTransitionTime",type="date",description="The last transition time for the value in 'Status'" | ||
|
||
// ManagedKafkaCluster is the Schema for the ManagedKafkaCluster API | ||
// +k8s:openapi-gen=true | ||
type ManagedKafkaCluster struct { | ||
metav1.TypeMeta `json:",inline"` | ||
metav1.ObjectMeta `json:"metadata,omitempty"` | ||
|
||
// +required | ||
Spec ManagedKafkaClusterSpec `json:"spec,omitempty"` | ||
Status ManagedKafkaClusterStatus `json:"status,omitempty"` | ||
} | ||
|
||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object | ||
// ManagedKafkaClusterList contains a list of ManagedKafkaCluster | ||
type ManagedKafkaClusterList struct { | ||
metav1.TypeMeta `json:",inline"` | ||
metav1.ListMeta `json:"metadata,omitempty"` | ||
Items []ManagedKafkaCluster `json:"items"` | ||
} | ||
|
||
func init() { | ||
SchemeBuilder.Register(&ManagedKafkaCluster{}, &ManagedKafkaClusterList{}) | ||
} |
Oops, something went wrong.