diff --git a/Makefile b/Makefile index afb51a421e6..8631749f04f 100644 --- a/Makefile +++ b/Makefile @@ -336,6 +336,20 @@ test-e2e-sharded-minimal: build-all $(SUITES_ARGS) \ $(if $(value WAIT),|| { echo "Terminated with $$?"; wait "$$PID"; },) +# This is just an easy target to run a 2-shard test server locally until manually killed. +# You can target tests at it by running: +# go test ./test/e2e/apibinding/... --kcp-kubeconfig=$(pwd)/.kcp/admin.kubeconfig --shard-kubeconfigs=root=$(pwd)/.kcp-0/admin.kubeconfig -run=^TestAPIBindingEndpointSlicesSharded$ +test-run-sharded-server: WORK_DIR ?= . +test-run-sharded-server: LOG_DIR ?= $(WORK_DIR)/.kcp +test-run-sharded-server: + mkdir -p "$(LOG_DIR)" "$(WORK_DIR)/.kcp" + rm -f "$(WORK_DIR)/.kcp/ready-to-test" + UNSAFE_E2E_HACK_DISABLE_ETCD_FSYNC=true NO_GORUN=1 ./bin/sharded-test-server --quiet --v=2 --log-dir-path="$(LOG_DIR)" --work-dir-path="$(WORK_DIR)" --shard-run-virtual-workspaces=false --shard-feature-gates=$(TEST_FEATURE_GATES) $(TEST_SERVER_ARGS) --number-of-shards=2 2>&1 & PID=$$!; echo "PID $$PID" && \ + trap 'kill -TERM $$PID' TERM INT EXIT && \ + while [ ! -f "$(WORK_DIR)/.kcp/ready-to-test" ]; do sleep 1; done && \ + echo 'Server started' && \ + wait $$PID + .PHONY: test ifdef USE_GOTESTSUM test: $(GOTESTSUM) diff --git a/config/crds/apis.kcp.io_apiexportendpointslices.yaml b/config/crds/apis.kcp.io_apiexportendpointslices.yaml index bca294c4a10..dbe1b2d2cf1 100644 --- a/config/crds/apis.kcp.io_apiexportendpointslices.yaml +++ b/config/crds/apis.kcp.io_apiexportendpointslices.yaml @@ -148,6 +148,15 @@ spec: - url type: object type: array + x-kubernetes-list-map-keys: + - url + x-kubernetes-list-type: map + shardSelector: + description: |- + shardSelector is the selector used to filter the shards. It is used to filter the shards + when determining partition scope when deriving the endpoints. This is set by owning shard, + and is used by follower shards to determine if its inscope or not. 
+ type: string type: object type: object served: true diff --git a/pkg/authorization/bootstrap/policy.go b/pkg/authorization/bootstrap/policy.go index dadc0dbe013..ee234eee7ad 100644 --- a/pkg/authorization/bootstrap/policy.go +++ b/pkg/authorization/bootstrap/policy.go @@ -24,6 +24,7 @@ import ( rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest" "k8s.io/kubernetes/plugin/pkg/auth/authorizer/rbac/bootstrappolicy" + "github.com/kcp-dev/kcp/sdk/apis/apis" "github.com/kcp-dev/kcp/sdk/apis/core" "github.com/kcp-dev/kcp/sdk/apis/tenancy" ) @@ -101,6 +102,13 @@ func clusterRoles() []rbacv1.ClusterRole { rbacv1helpers.NewRule("access").URLs("/").RuleOrDie(), }, }, + { + ObjectMeta: metav1.ObjectMeta{Name: SystemExternalLogicalClusterAdmin}, + Rules: []rbacv1.PolicyRule{ + rbacv1helpers.NewRule("update", "patch", "get").Groups(apis.GroupName).Resources("apiexportendpointslices/status").RuleOrDie(), + rbacv1helpers.NewRule("get", "list", "watch").Groups(apis.GroupName).Resources("apiexportendpointslices").RuleOrDie(), + }, + }, } } diff --git a/pkg/indexers/apiexport.go b/pkg/indexers/apiexport.go index fb9f1c18a0c..41c7d5604f4 100644 --- a/pkg/indexers/apiexport.go +++ b/pkg/indexers/apiexport.go @@ -17,6 +17,8 @@ limitations under the License. package indexers import ( + "fmt" + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" "github.com/kcp-dev/logicalcluster/v3" @@ -33,6 +35,8 @@ const ( // APIExportByClaimedIdentities is the indexer name for retrieving APIExports that have a permission claim for a // particular identity hash. APIExportByClaimedIdentities = "APIExportByClaimedIdentities" + // APIExportEndpointSliceByAPIExport is the indexer name for retrieving APIExportEndpointSlices by their APIExport's Reference Path and Name. + APIExportEndpointSliceByAPIExport = "APIExportEndpointSliceByAPIExport" ) // IndexAPIExportByIdentity is an index function that indexes an APIExport by its identity hash. 
@@ -72,3 +76,17 @@ func IndexAPIExportByClaimedIdentities(obj interface{}) ([]string, error) { } return sets.List[string](claimedIdentities), nil } + +// IndexAPIExportEndpointSliceByAPIExportFunc indexes the APIExportEndpointSlice by their APIExport's Reference Path and Name. +func IndexAPIExportEndpointSliceByAPIExport(obj interface{}) ([]string, error) { + apiExportEndpointSlice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice) + if !ok { + return []string{}, fmt.Errorf("obj %T is not an APIExportEndpointSlice", obj) + } + + path := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path) + if path.Empty() { + path = logicalcluster.From(apiExportEndpointSlice).Path() + } + return []string{path.Join(apiExportEndpointSlice.Spec.APIExport.Name).String()}, nil +} diff --git a/pkg/openapi/zz_generated.openapi.go b/pkg/openapi/zz_generated.openapi.go index d8dc9f7be05..5aa662f19c5 100644 --- a/pkg/openapi/zz_generated.openapi.go +++ b/pkg/openapi/zz_generated.openapi.go @@ -776,6 +776,14 @@ func schema_sdk_apis_apis_v1alpha1_APIExportEndpointSliceStatus(ref common.Refer }, }, "endpoints": { + VendorExtensible: spec.VendorExtensible{ + Extensions: spec.Extensions{ + "x-kubernetes-list-map-keys": []interface{}{ + "url", + }, + "x-kubernetes-list-type": "map", + }, + }, SchemaProps: spec.SchemaProps{ Description: "endpoints contains all the URLs of the APIExport service.", Type: []string{"array"}, @@ -789,6 +797,13 @@ func schema_sdk_apis_apis_v1alpha1_APIExportEndpointSliceStatus(ref common.Refer }, }, }, + "shardSelector": { + SchemaProps: spec.SchemaProps{ + Description: "shardSelector is the selector used to filter the shards. It is used to filter the shards when determining partition scope when deriving the endpoints. 
This is set by owning shard, and is used by follower shards to determine if its inscope or not.", + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go index 344c26cbb79..1101bfdf3a5 100644 --- a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go +++ b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller.go @@ -45,7 +45,7 @@ import ( topologyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/topology/v1alpha1" kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" apisv1alpha1client "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/typed/apis/v1alpha1" - apisinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" + apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" topologyinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/topology/v1alpha1" ) @@ -57,9 +57,9 @@ const ( // NewController returns a new controller for APIExportEndpointSlices. // Shards and APIExports are read from the cache server. 
func NewController( - apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer, + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, globalShardClusterInformer corev1alpha1informers.ShardClusterInformer, - globalAPIExportClusterInformer apisinformers.APIExportClusterInformer, + globalAPIExportClusterInformer apisv1alpha1informers.APIExportClusterInformer, partitionClusterInformer topologyinformers.PartitionClusterInformer, kcpClusterClient kcpclientset.ClusterInterface, ) (*controller, error) { @@ -76,8 +76,8 @@ func NewController( listShards: func(selector labels.Selector) ([]*corev1alpha1.Shard, error) { return globalShardClusterInformer.Lister().List(selector) }, - getAPIExportEndpointSlice: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error) { - return apiExportEndpointSliceClusterInformer.Lister().Cluster(clusterName).Get(name) + getAPIExportEndpointSlice: func(ctx context.Context, path logicalcluster.Path, name string) (*apisv1alpha1.APIExportEndpointSlice, error) { + return indexers.ByPathAndName[*apisv1alpha1.APIExportEndpointSlice](apisv1alpha1.Resource("apiexportendpointslices"), apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), path, name) }, getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), globalAPIExportClusterInformer.Informer().GetIndexer(), path, name) @@ -168,12 +168,12 @@ type controller struct { listShards func(selector labels.Selector) ([]*corev1alpha1.Shard, error) listAPIExportEndpointSlices func() ([]*apisv1alpha1.APIExportEndpointSlice, error) - getAPIExportEndpointSlice func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error) + getAPIExportEndpointSlice func(ctx context.Context, path logicalcluster.Path, name string) 
(*apisv1alpha1.APIExportEndpointSlice, error) getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) getPartition func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.Partition, error) getAPIExportEndpointSlicesByPartition func(key string) ([]*apisv1alpha1.APIExportEndpointSlice, error) - apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer commit CommitFunc } @@ -204,7 +204,7 @@ func (c *controller) enqueueAPIExportEndpointSlicesForAPIExport(obj interface{}) // binding keys by full path keys := sets.New[string]() if path := logicalcluster.NewPath(export.Annotations[core.LogicalClusterPathAnnotationKey]); !path.Empty() { - pathKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexAPIExportEndpointSliceByAPIExport, path.Join(export.Name).String()) + pathKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, path.Join(export.Name).String()) if err != nil { utilruntime.HandleError(err) return @@ -212,7 +212,7 @@ func (c *controller) enqueueAPIExportEndpointSlicesForAPIExport(obj interface{}) keys.Insert(pathKeys...) 
} - clusterKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexAPIExportEndpointSliceByAPIExport, logicalcluster.From(export).Path().Join(export.Name).String()) + clusterKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, logicalcluster.From(export).Path().Join(export.Name).String()) if err != nil { utilruntime.HandleError(err) return @@ -328,7 +328,7 @@ func (c *controller) process(ctx context.Context, key string) error { utilruntime.HandleError(err) return nil } - obj, err := c.getAPIExportEndpointSlice(clusterName, name) + obj, err := c.getAPIExportEndpointSlice(ctx, clusterName.Path(), name) if err != nil { if errors.IsNotFound(err) { return nil // object deleted before we handled it @@ -353,6 +353,7 @@ func (c *controller) process(ctx context.Context, key string) error { // If the object being reconciled changed as a result, update it. oldResource := &Resource{ObjectMeta: old.ObjectMeta, Spec: &old.Spec, Status: &old.Status} newResource := &Resource{ObjectMeta: obj.ObjectMeta, Spec: &obj.Spec, Status: &obj.Status} + if err := c.commit(ctx, oldResource, newResource); err != nil { errs = append(errs, err) } @@ -380,15 +381,19 @@ func filterShardEvent(oldObj, newObj interface{}) bool { } // InstallIndexers adds the additional indexers that this controller requires to the informers. 
-func InstallIndexers(globalAPIExportClusterInformer apisinformers.APIExportClusterInformer, apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer) { +func InstallIndexers( + globalAPIExportClusterInformer apisv1alpha1informers.APIExportClusterInformer, + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, +) { indexers.AddIfNotPresentOrDie(globalAPIExportClusterInformer.Informer().GetIndexer(), cache.Indexers{ indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, }) - indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ - indexAPIExportEndpointSliceByAPIExport: indexAPIExportEndpointSliceByAPIExportFunc, + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.APIExportEndpointSliceByAPIExport: indexers.IndexAPIExportEndpointSliceByAPIExport, }) - indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ indexAPIExportEndpointSlicesByPartition: indexAPIExportEndpointSlicesByPartitionFunc, }) diff --git a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller_test.go b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller_test.go index 002e9c5e69e..b7d015304c7 100644 --- a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller_test.go +++ b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_controller_test.go @@ -79,7 +79,7 @@ func TestReconcile(t *testing.T) { wantPartitionNotValid: true, wantAPIExportEndpointSliceURLsError: true, }, - "APIExportEndpointSliceURLs set when no issue": { + "APIExportEndpointSliceReadyForURLs set when no issue": { wantAPIExportEndpointSliceURLsReady: true, wantAPIExportValid: true, 
wantPartitionValid: true, @@ -189,7 +189,7 @@ func TestReconcile(t *testing.T) { if tc.wantAPIExportEndpointSliceURLsError { requireConditionMatches(t, apiExportEndpointSlice, conditions.FalseCondition( - apisv1alpha1.APIExportEndpointSliceURLsReady, + apisv1alpha1.APIExportEndpointSliceReadyForURLs, apisv1alpha1.ErrorGeneratingURLsReason, conditionsv1alpha1.ConditionSeverityError, "", @@ -198,17 +198,13 @@ func TestReconcile(t *testing.T) { } if tc.wantAPIExportEndpointSliceURLsReady { - requireConditionMatches(t, apiExportEndpointSlice, conditions.TrueCondition(apisv1alpha1.APIExportEndpointSliceURLsReady)) - require.Equal(t, []apisv1alpha1.APIExportEndpoint{ - {URL: "https://server-1.kcp.dev/services/apiexport/root:org:ws/my-export"}, - {URL: "https://server-2.kcp.dev/services/apiexport/root:org:ws/my-export"}, - }, apiExportEndpointSlice.Status.APIExportEndpoints) + requireConditionMatches(t, apiExportEndpointSlice, conditions.TrueCondition(apisv1alpha1.APIExportEndpointSliceReadyForURLs)) } if tc.wantAPIExportEndpointSliceURLsUnknown { requireConditionMatches(t, apiExportEndpointSlice, conditions.UnknownCondition( - apisv1alpha1.APIExportEndpointSliceURLsReady, + apisv1alpha1.APIExportEndpointSliceReadyForURLs, apisv1alpha1.ErrorGeneratingURLsReason, "", ), diff --git a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go index b55d565e8d1..6b052a7a989 100644 --- a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go +++ b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_indexes.go @@ -26,24 +26,9 @@ import ( ) const ( - indexAPIExportEndpointSliceByAPIExport = "indexAPIExportEndpointSliceByAPIExport" indexAPIExportEndpointSlicesByPartition = "indexAPIExportEndpointSlicesByPartition" ) -// indexAPIExportEndpointSliceByAPIExportFunc indexes the APIExportEndpointSlice by their APIExport's Reference Path and Name. 
-func indexAPIExportEndpointSliceByAPIExportFunc(obj interface{}) ([]string, error) { - apiExportEndpointSlice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice) - if !ok { - return []string{}, fmt.Errorf("obj %T is not an APIExportEndpointSlice", obj) - } - - path := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path) - if path.Empty() { - path = logicalcluster.From(apiExportEndpointSlice).Path() - } - return []string{path.Join(apiExportEndpointSlice.Spec.APIExport.Name).String()}, nil -} - // indexAPIExportEndpointSlicesByPartitionFunc is an index function that maps a Partition to the key for its // spec.partition. func indexAPIExportEndpointSlicesByPartitionFunc(obj interface{}) ([]string, error) { diff --git a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_reconcile.go b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_reconcile.go index 5aaf4a73e38..4f6023c46ba 100644 --- a/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_reconcile.go +++ b/pkg/reconciler/apis/apiexportendpointslice/apiexportendpointslice_reconcile.go @@ -18,20 +18,13 @@ package apiexportendpointslice import ( "context" - "net/url" - "path" "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/klog/v2" - virtualworkspacesoptions "github.com/kcp-dev/kcp/cmd/virtual-workspaces/options" - "github.com/kcp-dev/kcp/pkg/logging" - apiexportbuilder "github.com/kcp-dev/kcp/pkg/virtual/apiexport/builder" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" conditionsv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/apis/conditions/v1alpha1" @@ -55,18 +48,13 @@ func (c *controller) reconcile(ctx context.Context, apiExportEndpointSlice *apis return r.reconcile(ctx, apiExportEndpointSlice) } -func (r 
*endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice) error { - // TODO (fgiloux): When the information is available in the cache server - // check if at least one APIBinding is bound in the shard to the APIExport referenced by the APIExportEndpointSlice. - // If so, add the respective endpoint to the status. - // For now the unfiltered list is added. - +func (r *endpointsReconciler) reconcile(_ context.Context, apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice) error { // Get APIExport apiExportPath := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path) if apiExportPath.Empty() { apiExportPath = logicalcluster.From(apiExportEndpointSlice).Path() } - apiExport, err := r.getAPIExport(apiExportPath, apiExportEndpointSlice.Spec.APIExport.Name) + _, err := r.getAPIExport(apiExportPath, apiExportEndpointSlice.Spec.APIExport.Name) if err != nil { reason := apisv1alpha1.InternalErrorReason if errors.IsNotFound(err) { @@ -83,7 +71,7 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl ) conditions.MarkFalse( apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, + apisv1alpha1.APIExportEndpointSliceReadyForURLs, apisv1alpha1.ErrorGeneratingURLsReason, conditionsv1alpha1.ConditionSeverityError, "", @@ -102,7 +90,7 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl ) conditions.MarkUnknown( apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, + apisv1alpha1.APIExportEndpointSliceReadyForURLs, apisv1alpha1.ErrorGeneratingURLsReason, "", ) @@ -130,7 +118,7 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl ) conditions.MarkFalse( apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, + apisv1alpha1.APIExportEndpointSliceReadyForURLs, apisv1alpha1.ErrorGeneratingURLsReason, conditionsv1alpha1.ConditionSeverityError, "", @@ -148,7 +136,7 @@ func (r 
*endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl ) conditions.MarkUnknown( apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, + apisv1alpha1.APIExportEndpointSliceReadyForURLs, apisv1alpha1.ErrorGeneratingURLsReason, "", ) @@ -167,7 +155,7 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl ) conditions.MarkFalse( apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, + apisv1alpha1.APIExportEndpointSliceReadyForURLs, apisv1alpha1.ErrorGeneratingURLsReason, conditionsv1alpha1.ConditionSeverityError, "", @@ -178,14 +166,15 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl if selector == nil { selector = labels.Everything() } + conditions.MarkTrue(apiExportEndpointSlice, apisv1alpha1.PartitionValid) - // Get shards - shards, err := r.listShards(selector) + // Check if listing shards works. This is a good indicator that the cache is up to date. + _, err = r.listShards(selector) if err != nil { conditions.MarkFalse( apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, + apisv1alpha1.APIExportEndpointSliceReadyForURLs, apisv1alpha1.ErrorGeneratingURLsReason, conditionsv1alpha1.ConditionSeverityError, "error listing shards", @@ -193,62 +182,10 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSl return err } - if err = r.updateEndpoints(ctx, apiExportEndpointSlice, apiExport, shards); err != nil { - conditions.MarkFalse( - apiExportEndpointSlice, - apisv1alpha1.APIExportEndpointSliceURLsReady, - apisv1alpha1.ErrorGeneratingURLsReason, - conditionsv1alpha1.ConditionSeverityError, - "%v", - err, - ) - return err - } - conditions.MarkTrue(apiExportEndpointSlice, apisv1alpha1.APIExportEndpointSliceURLsReady) - - return nil -} - -func (r *endpointsReconciler) updateEndpoints(ctx context.Context, - apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice, - apiExport *apisv1alpha1.APIExport, - 
shards []*corev1alpha1.Shard) error { - logger := klog.FromContext(ctx) - desiredURLs := sets.New[string]() - for _, shard := range shards { - if shard.Spec.VirtualWorkspaceURL == "" { - continue - } - - u, err := url.Parse(shard.Spec.VirtualWorkspaceURL) - if err != nil { - // Should never happen - logger = logging.WithObject(logger, shard) - logger.Error( - err, "error parsing shard.spec.virtualWorkspaceURL", - "VirtualWorkspaceURL", shard.Spec.VirtualWorkspaceURL, - ) - - continue - } - - u.Path = path.Join( - u.Path, - virtualworkspacesoptions.DefaultRootPathPrefix, - apiexportbuilder.VirtualWorkspaceName, - logicalcluster.From(apiExport).String(), - apiExport.Name, - ) - - desiredURLs.Insert(u.String()) - } - - apiExportEndpointSlice.Status.APIExportEndpoints = nil - for _, u := range sets.List[string](desiredURLs) { - apiExportEndpointSlice.Status.APIExportEndpoints = append(apiExportEndpointSlice.Status.APIExportEndpoints, apisv1alpha1.APIExportEndpoint{ - URL: u, - }) - } + // We preserve the selector in the status for URL generation. Otherwise we don't know the partition selector + // without propagating partitions over the cache. + apiExportEndpointSlice.Status.ShardSelector = selector.String() + conditions.MarkTrue(apiExportEndpointSlice, apisv1alpha1.APIExportEndpointSliceReadyForURLs) return nil } diff --git a/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_controller.go b/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_controller.go new file mode 100644 index 00000000000..36c2fe96a10 --- /dev/null +++ b/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_controller.go @@ -0,0 +1,481 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiexportendpointsliceurls + +import ( + "context" + "fmt" + "reflect" + "time" + + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + "github.com/kcp-dev/logicalcluster/v3" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "github.com/kcp-dev/kcp/pkg/indexers" + "github.com/kcp-dev/kcp/pkg/logging" + "github.com/kcp-dev/kcp/pkg/reconciler/events" + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/core" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + apisv1alpha1apply "github.com/kcp-dev/kcp/sdk/client/applyconfiguration/apis/v1alpha1" + kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" + apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" + corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" +) + +const ( + ControllerName = "kcp-apiexportendpointslice-urls" +) + +// NewController returns a new controller for APIExportEndpointSlices. +// Shards and APIExports are read from the cache server. 
+func NewController( + shardName string, + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, + apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer, + globalAPIExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, + globalShardClusterInformer corev1alpha1informers.ShardClusterInformer, + globalAPIExportClusterInformer apisv1alpha1informers.APIExportClusterInformer, + clusterClient kcpclientset.ClusterInterface, +) (*controller, error) { + c := &controller{ + shardName: shardName, + clusterClient: clusterClient, + queue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[string](), + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: ControllerName, + }, + ), + listAPIExportEndpointSlices: func() ([]*apisv1alpha1.APIExportEndpointSlice, error) { + return apiExportEndpointSliceClusterInformer.Lister().List(labels.Everything()) + }, + listShards: func(selector labels.Selector) ([]*corev1alpha1.Shard, error) { + return globalShardClusterInformer.Lister().List(selector) + }, + getAPIExportEndpointSlice: func(ctx context.Context, path logicalcluster.Path, name string) (*apisv1alpha1.APIExportEndpointSlice, error) { + obj, err := indexers.ByPathAndNameWithFallback[*apisv1alpha1.APIExportEndpointSlice](apisv1alpha1.Resource("apiexportendpointslices"), apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), globalAPIExportEndpointSliceClusterInformer.Informer().GetIndexer(), path, name) + if err != nil { + return nil, err + } + return obj, err + }, + getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { + return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), globalAPIExportClusterInformer.Informer().GetIndexer(), path, name) + }, + listAPIBindingsByAPIExport: func(export *apisv1alpha1.APIExport) ([]*apisv1alpha1.APIBinding, error) { + // 
binding keys by full path + keys := sets.New[string]() + if path := logicalcluster.NewPath(export.Annotations[core.LogicalClusterPathAnnotationKey]); !path.Empty() { + pathKeys, err := apiBindingInformer.Informer().GetIndexer().IndexKeys(indexers.APIBindingsByAPIExport, path.Join(export.Name).String()) + if err != nil { + return nil, err + } + keys.Insert(pathKeys...) + } + + clusterKeys, err := apiBindingInformer.Informer().GetIndexer().IndexKeys(indexers.APIBindingsByAPIExport, logicalcluster.From(export).Path().Join(export.Name).String()) + if err != nil { + return nil, err + } + keys.Insert(clusterKeys...) + + bindings := make([]*apisv1alpha1.APIBinding, 0, keys.Len()) + for _, key := range sets.List[string](keys) { + binding, exists, err := apiBindingInformer.Informer().GetIndexer().GetByKey(key) + if err != nil { + utilruntime.HandleError(err) + continue + } else if !exists { + utilruntime.HandleError(fmt.Errorf("APIBinding %q does not exist", key)) + continue + } + bindings = append(bindings, binding.(*apisv1alpha1.APIBinding)) + } + return bindings, nil + }, + patchAPIExportEndpointSlice: func(ctx context.Context, cluster logicalcluster.Path, patch *apisv1alpha1apply.APIExportEndpointSliceApplyConfiguration) error { + _, err := clusterClient.ApisV1alpha1().APIExportEndpointSlices().Cluster(cluster).ApplyStatus(ctx, patch, metav1.ApplyOptions{ + FieldManager: shardName, + }) + return err + }, + apiExportEndpointSliceClusterInformer: apiExportEndpointSliceClusterInformer, + globalApiExportEndpointSliceClusterInformer: globalAPIExportEndpointSliceClusterInformer, + } + + _, _ = apiExportEndpointSliceClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSlice(obj) + }, + UpdateFunc: func(_, newObj interface{}) { + c.enqueueAPIExportEndpointSlice(newObj) + }, + DeleteFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSlice(obj) + }, + }) + + _, _ = 
apiBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSliceByAPIBinding(obj) + }, + UpdateFunc: func(_, newObj interface{}) { + c.enqueueAPIExportEndpointSliceByAPIBinding(newObj) + }, + DeleteFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSliceByAPIBinding(obj) + }, + }) + + _, _ = globalAPIExportEndpointSliceClusterInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSliceFromCache(obj) + }, + UpdateFunc: func(_, newObj interface{}) { + c.enqueueAPIExportEndpointSliceFromCache(newObj) + }, + DeleteFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSliceFromCache(obj) + }, + })) + + _, _ = globalAPIExportClusterInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSlicesForAPIExport(obj) + }, + DeleteFunc: func(obj interface{}) { + c.enqueueAPIExportEndpointSlicesForAPIExport(obj) + }, + })) + + _, _ = globalShardClusterInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueueAllAPIExportEndpointSlices(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + if filterShardEvent(oldObj, newObj) { + c.enqueueAllAPIExportEndpointSlices(newObj) + } + }, + DeleteFunc: func(obj interface{}) { + c.enqueueAllAPIExportEndpointSlices(obj) + }, + })) + + return c, nil +} + +// controller reconciles APIExportEndpointSlices. It ensures that the shard endpoints are populated +// in the status of every APIExportEndpointSlices. 
+type controller struct { + queue workqueue.TypedRateLimitingInterface[string] + shardName string + clusterClient kcpclientset.ClusterInterface + + listShards func(selector labels.Selector) ([]*corev1alpha1.Shard, error) + listAPIExportEndpointSlices func() ([]*apisv1alpha1.APIExportEndpointSlice, error) + getAPIExportEndpointSlice func(ctx context.Context, path logicalcluster.Path, name string) (*apisv1alpha1.APIExportEndpointSlice, error) + getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) + listAPIBindingsByAPIExport func(apiexport *apisv1alpha1.APIExport) ([]*apisv1alpha1.APIBinding, error) + patchAPIExportEndpointSlice func(ctx context.Context, cluster logicalcluster.Path, patch *apisv1alpha1apply.APIExportEndpointSliceApplyConfiguration) error + + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer + globalApiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer +} + +func (c *controller) enqueueAPIExportEndpointSliceByAPIBinding(obj interface{}) { + if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = d.Obj + } + binding, ok := obj.(*apisv1alpha1.APIBinding) + if !ok { + utilruntime.HandleError(fmt.Errorf("obj is supposed to be a APIBinding, but is %T", obj)) + return + } + + { // local to shard + keys := sets.New[string]() + if path := logicalcluster.NewPath(binding.Spec.Reference.Export.Path); !path.Empty() { + pathKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, path.Join(binding.Spec.Reference.Export.Name).String()) + if err != nil { + utilruntime.HandleError(err) + return + } + keys.Insert(pathKeys...) 
+ } + + for _, key := range sets.List[string](keys) { + slice, exists, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().GetByKey(key) + if err != nil { + utilruntime.HandleError(err) + continue + } else if !exists { + continue + } + logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*apisv1alpha1.APIBinding)) + logging.WithQueueKey(logger, key).V(4).Info("queuing APIExportEndpointSlices because of consumer APIBinding") + c.enqueueAPIExportEndpointSlice(slice) + } + } + { + keys := sets.New[string]() + if path := logicalcluster.NewPath(binding.Spec.Reference.Export.Path); !path.Empty() { + pathKeys, err := c.globalApiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, path.Join(binding.Spec.Reference.Export.Name).String()) + if err != nil { + utilruntime.HandleError(err) + return + } + keys.Insert(pathKeys...) + } + + for _, key := range sets.List[string](keys) { + slice, exists, err := c.globalApiExportEndpointSliceClusterInformer.Informer().GetIndexer().GetByKey(key) + if err != nil { + utilruntime.HandleError(err) + continue + } else if !exists { + continue + } + logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*apisv1alpha1.APIBinding)) + logging.WithQueueKey(logger, key).V(4).Info("queuing APIExportEndpointSlices because of consumer APIBinding from cache") + c.enqueueAPIExportEndpointSlice(slice) + } + } +} + +// enqueueAPIExportEndpointSlice enqueues an APIExportEndpointSlice. 
func (c *controller) enqueueAPIExportEndpointSlice(obj interface{}) {
	key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}

	logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), ControllerName), key)
	logger.V(4).Info("queueing APIExportEndpointSlice")
	c.queue.Add(key)
}

// enqueueAPIExportEndpointSliceFromCache enqueues an APIExportEndpointSlice
// observed on the cache-server (global) informer.
func (c *controller) enqueueAPIExportEndpointSliceFromCache(obj interface{}) {
	key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}

	logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), ControllerName), key)
	logger.V(4).Info("queueing APIExportEndpointSlice from cache")
	c.queue.Add(key)
}

// enqueueAPIExportEndpointSlicesForAPIExport enqueues APIExportEndpointSlices referencing a specific APIExport.
func (c *controller) enqueueAPIExportEndpointSlicesForAPIExport(obj interface{}) {
	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = d.Obj
	}
	export, ok := obj.(*apisv1alpha1.APIExport)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("obj is supposed to be a APIExport, but is %T", obj))
		return
	}

	// binding keys by full path
	keys := sets.New[string]()
	if path := logicalcluster.NewPath(export.Annotations[core.LogicalClusterPathAnnotationKey]); !path.Empty() {
		pathKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, path.Join(export.Name).String())
		if err != nil {
			utilruntime.HandleError(err)
			return
		}
		keys.Insert(pathKeys...)
	}

	// Also index by the export's cluster-name-based path, so slices that
	// reference the export that way are found as well.
	clusterKeys, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().IndexKeys(indexers.APIExportEndpointSliceByAPIExport, logicalcluster.From(export).Path().Join(export.Name).String())
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	keys.Insert(clusterKeys...)

	for _, key := range sets.List[string](keys) {
		slice, exists, err := c.apiExportEndpointSliceClusterInformer.Informer().GetIndexer().GetByKey(key)
		if err != nil {
			utilruntime.HandleError(err)
			continue
		} else if !exists {
			continue
		}
		logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*apisv1alpha1.APIExport))
		logging.WithQueueKey(logger, key).V(4).Info("queuing APIExportEndpointSlices because of referenced APIExport")
		c.enqueueAPIExportEndpointSlice(slice)
	}
}

// enqueueAllAPIExportEndpointSlices enqueues all APIExportEndpointSlices.
// Called on every Shard add/update/delete, since any shard change can affect
// the endpoints of every slice.
func (c *controller) enqueueAllAPIExportEndpointSlices(shard interface{}) {
	list, err := c.listAPIExportEndpointSlices()
	if err != nil {
		utilruntime.HandleError(err)
		return
	}

	logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), shard.(*corev1alpha1.Shard))
	for i := range list {
		key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(list[i])
		if err != nil {
			utilruntime.HandleError(err)
			continue
		}

		logging.WithQueueKey(logger, key).V(4).Info("queuing APIExportEndpointSlice because Shard changed")
		c.queue.Add(key)
	}
}

// Start starts the controller, which stops when ctx.Done() is closed.
func (c *controller) Start(ctx context.Context, numThreads int) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	logger := logging.WithReconciler(klog.FromContext(ctx), ControllerName)
	ctx = klog.NewContext(ctx, logger)
	logger.Info("Starting controller")
	defer logger.Info("Shutting down controller")

	for i := 0; i < numThreads; i++ {
		go wait.UntilWithContext(ctx, c.startWorker, time.Second)
	}

	<-ctx.Done()
}

// startWorker processes queue items until the queue shuts down.
func (c *controller) startWorker(ctx context.Context) {
	for c.processNextWorkItem(ctx) {
	}
}

// processNextWorkItem pops one key off the queue, processes it, and requeues
// it with rate limiting on error. Returns false only when the queue is
// shutting down.
func (c *controller) processNextWorkItem(ctx context.Context) bool {
	// Wait until there is a new item in the working queue
	k, quit := c.queue.Get()
	if quit {
		return false
	}
	key := k

	logger := logging.WithQueueKey(klog.FromContext(ctx), key)
	ctx = klog.NewContext(ctx, logger)
	logger.V(4).Info("processing key")

	// No matter what, tell the queue we're done with this key, to unblock
	// other workers.
	defer c.queue.Done(key)

	if err := c.process(ctx, key); err != nil {
		utilruntime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", ControllerName, key, err))
		c.queue.AddRateLimited(key)
		return true
	}
	c.queue.Forget(key)
	return true
}

// process resolves the queue key to an APIExportEndpointSlice and reconciles
// a deep copy of it. A NotFound error is swallowed: the object was deleted
// before we handled it, so there is nothing left to do.
func (c *controller) process(ctx context.Context, key string) error {
	clusterName, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(key)
	if err != nil {
		utilruntime.HandleError(err)
		return nil
	}
	obj, err := c.getAPIExportEndpointSlice(ctx, clusterName.Path(), name)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil // object deleted before we handled it
		}
		return err
	}

	obj = obj.DeepCopy()

	logger := logging.WithObject(klog.FromContext(ctx), obj)
	ctx = klog.NewContext(ctx, logger)

	var errs []error
	err = c.reconcile(ctx, obj)
	if err != nil {
		errs = append(errs, err)
	}

	return utilerrors.NewAggregate(errs)
}

// filterShardEvent returns true if the event passes the filter and needs
to be processed false otherwise. +func filterShardEvent(oldObj, newObj interface{}) bool { + oldShard, ok := oldObj.(*corev1alpha1.Shard) + if !ok { + return false + } + newShard, ok := newObj.(*corev1alpha1.Shard) + if !ok { + return false + } + if oldShard.Spec.VirtualWorkspaceURL != newShard.Spec.VirtualWorkspaceURL { + return true + } + if !reflect.DeepEqual(oldShard.Labels, newShard.Labels) { + return true + } + return false +} + +// InstallIndexers adds the additional indexers that this controller requires to the informers. +func InstallIndexers( + globalAPIExportClusterInformer apisv1alpha1informers.APIExportClusterInformer, + globalAPIExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, + apiExportEndpointSliceClusterInformer apisv1alpha1informers.APIExportEndpointSliceClusterInformer, + apiBindingInformer apisv1alpha1informers.APIBindingClusterInformer, +) { + indexers.AddIfNotPresentOrDie(globalAPIExportClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + indexers.AddIfNotPresentOrDie(globalAPIExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName, + }) + indexers.AddIfNotPresentOrDie(apiExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.APIExportEndpointSliceByAPIExport: indexers.IndexAPIExportEndpointSliceByAPIExport, + }) + indexers.AddIfNotPresentOrDie(globalAPIExportEndpointSliceClusterInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.APIExportEndpointSliceByAPIExport: indexers.IndexAPIExportEndpointSliceByAPIExport, + }) + 
indexers.AddIfNotPresentOrDie(apiBindingInformer.Informer().GetIndexer(), cache.Indexers{ + indexers.APIBindingsByAPIExport: indexers.IndexAPIBindingByAPIExport, + }) +} diff --git a/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_reconcile.go b/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_reconcile.go new file mode 100644 index 00000000000..768a9ff326e --- /dev/null +++ b/pkg/reconciler/apis/apiexportendpointsliceurls/apiexportendpointsliceurls_reconcile.go @@ -0,0 +1,168 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package apiexportendpointsliceurls + +import ( + "context" + "net/url" + "path" + + "github.com/kcp-dev/logicalcluster/v3" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog/v2" + + virtualworkspacesoptions "github.com/kcp-dev/kcp/cmd/virtual-workspaces/options" + "github.com/kcp-dev/kcp/pkg/logging" + apiexportbuilder "github.com/kcp-dev/kcp/pkg/virtual/apiexport/builder" + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions" + apisv1alpha1apply "github.com/kcp-dev/kcp/sdk/client/applyconfiguration/apis/v1alpha1" +) + +type endpointsReconciler struct { + listShards func(selector labels.Selector) ([]*corev1alpha1.Shard, error) + getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) + listAPIBindingsByAPIExport func(apiexport *apisv1alpha1.APIExport) ([]*apisv1alpha1.APIBinding, error) + patchAPIExportEndpointSlice func(ctx context.Context, cluster logicalcluster.Path, patch *apisv1alpha1apply.APIExportEndpointSliceApplyConfiguration) error + shardName string +} + +type result struct { + url string + remove bool +} + +func (c *controller) reconcile(ctx context.Context, apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice) error { + r := &endpointsReconciler{ + listShards: c.listShards, + getAPIExport: c.getAPIExport, + listAPIBindingsByAPIExport: c.listAPIBindingsByAPIExport, + shardName: c.shardName, + patchAPIExportEndpointSlice: c.patchAPIExportEndpointSlice, + } + + return r.reconcile(ctx, apiExportEndpointSlice) +} + +func (r *endpointsReconciler) reconcile(ctx context.Context, apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice) error { + if !conditions.IsTrue(apiExportEndpointSlice, apisv1alpha1.APIExportEndpointSliceReadyForURLs) { + return nil + } + + s := apiExportEndpointSlice.Status.ShardSelector + if s == "" { // should never happen. 
+ return nil + } + + selector, err := labels.Parse(s) + if err != nil { + return err + } + + apiExportPath := logicalcluster.NewPath(apiExportEndpointSlice.Spec.APIExport.Path) + if apiExportPath.Empty() { + apiExportPath = logicalcluster.From(apiExportEndpointSlice).Path() + } + apiExport, err := r.getAPIExport(apiExportPath, apiExportEndpointSlice.Spec.APIExport.Name) + if err != nil { + return err + } + + shards, err := r.listShards(selector) + if err != nil { + return err + } + + rs, err := r.updateEndpoints(ctx, apiExportEndpointSlice, apiExport, shards) + if err != nil { + return err + } + if rs == nil { // no change, nothing to do. + return nil + } + + // Patch the object + patch := apisv1alpha1apply.APIExportEndpointSlice(apiExportEndpointSlice.Name) + if rs.remove { + patch.WithStatus(apisv1alpha1apply.APIExportEndpointSliceStatus()) + } else { + patch.WithStatus(apisv1alpha1apply.APIExportEndpointSliceStatus(). + WithAPIExportEndpoints(apisv1alpha1apply.APIExportEndpoint().WithURL(rs.url))) + } + cluster := logicalcluster.From(apiExportEndpointSlice) + return r.patchAPIExportEndpointSlice(ctx, cluster.Path(), patch) +} + +func (r *endpointsReconciler) updateEndpoints(ctx context.Context, + apiExportEndpointSlice *apisv1alpha1.APIExportEndpointSlice, + apiExport *apisv1alpha1.APIExport, + shards []*corev1alpha1.Shard) (*result, error) { + logger := klog.FromContext(ctx) + var rs result + for _, shard := range shards { + if shard.Name != r.shardName { + continue + } + if shard.Spec.VirtualWorkspaceURL == "" { + continue + } + + // Check if we have local consumers + bindings, err := r.listAPIBindingsByAPIExport(apiExport) + if err != nil { + return nil, err + } + + if len(bindings) == 0 { + return &result{ + remove: true, + }, nil + } + + u, err := url.Parse(shard.Spec.VirtualWorkspaceURL) + if err != nil { + // Should never happen + logger = logging.WithObject(logger, shard) + logger.Error( + err, "error parsing shard.spec.virtualWorkspaceURL", + 
"VirtualWorkspaceURL", shard.Spec.VirtualWorkspaceURL, + ) + continue + } + + u.Path = path.Join( + u.Path, + virtualworkspacesoptions.DefaultRootPathPrefix, + apiexportbuilder.VirtualWorkspaceName, + logicalcluster.From(apiExport).String(), + apiExport.Name, + ) + + rs.url = u.String() + break + } + + for _, u := range apiExportEndpointSlice.Status.APIExportEndpoints { + if u.URL == rs.url { + return nil, nil + } + } + + return &rs, nil +} diff --git a/pkg/reconciler/cache/replication/replication_controller.go b/pkg/reconciler/cache/replication/replication_controller.go index c43e9801b67..0e133883f7c 100644 --- a/pkg/reconciler/cache/replication/replication_controller.go +++ b/pkg/reconciler/cache/replication/replication_controller.go @@ -209,7 +209,6 @@ func InstallIndexers( Local: localKcpInformers.Apis().V1alpha1().APIExportEndpointSlices().Informer(), Global: globalKcpInformers.Apis().V1alpha1().APIExportEndpointSlices().Informer(), }, - apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"): { Kind: "APIResourceSchema", Local: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(), diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go index ac6ee296776..900995354f1 100644 --- a/pkg/server/controllers.go +++ b/pkg/server/controllers.go @@ -63,6 +63,7 @@ import ( "github.com/kcp-dev/kcp/pkg/reconciler/apis/apibindingdeletion" "github.com/kcp-dev/kcp/pkg/reconciler/apis/apiexport" "github.com/kcp-dev/kcp/pkg/reconciler/apis/apiexportendpointslice" + "github.com/kcp-dev/kcp/pkg/reconciler/apis/apiexportendpointsliceurls" "github.com/kcp-dev/kcp/pkg/reconciler/apis/crdcleanup" "github.com/kcp-dev/kcp/pkg/reconciler/apis/extraannotationsync" "github.com/kcp-dev/kcp/pkg/reconciler/apis/identitycache" @@ -1266,7 +1267,7 @@ func (s *Server) installTenancyReplicateClusterRoleBindingControllers(ctx contex }) } -func (s *Server) installAPIExportEndpointSliceController(ctx context.Context, config *rest.Config) error { +func (s 
*Server) installAPIExportEndpointSliceController(_ context.Context, config *rest.Config) error { config = rest.CopyConfig(config) config = rest.AddUserAgent(config, apiexportendpointslice.ControllerName) @@ -1294,7 +1295,49 @@ func (s *Server) installAPIExportEndpointSliceController(ctx context.Context, co return s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards().Informer().HasSynced() && s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices().Informer().HasSynced() && s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().HasSynced() && - s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions().Informer().HasSynced(), nil + s.KcpSharedInformerFactory.Topology().V1alpha1().Partitions().Informer().HasSynced() && + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices().Informer().HasSynced() && + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().HasSynced(), nil + }) + }, + Runner: func(ctx context.Context) { + c.Start(ctx, 2) + }, + }) +} + +func (s *Server) installAPIExportEndpointSliceURLsController(_ context.Context, _ *rest.Config) error { + config := rest.CopyConfig(s.ExternalLogicalClusterAdminConfig) + config = rest.AddUserAgent(config, apiexportendpointsliceurls.ControllerName) + + kcpClusterClient, err := kcpclientset.NewForConfig(config) + if err != nil { + return err + } + + c, err := apiexportendpointsliceurls.NewController( + s.Options.Extra.ShardName, + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + // Shards and APIExports get retrieved from cache server + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), + s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + kcpClusterClient, + ) + if err != nil { + return err + } + + return s.registerController(&controllerWrapper{ + Name: 
apiexportendpointsliceurls.ControllerName, + Wait: func(ctx context.Context, s *Server) error { + return wait.PollUntilContextCancel(ctx, waitPollInterval, true, func(ctx context.Context) (bool, error) { + return s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards().Informer().HasSynced() && + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices().Informer().HasSynced() && + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices().Informer().HasSynced() && + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().HasSynced() && + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().HasSynced(), nil }) }, Runner: func(ctx context.Context) { @@ -1551,6 +1594,12 @@ func (s *Server) addIndexersToInformers(_ context.Context) map[schema.GroupVersi s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), ) + apiexportendpointsliceurls.InstallIndexers( + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIExportEndpointSlices(), + s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), + ) labelclusterrolebindings.InstallIndexers( s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), ) diff --git a/pkg/server/server.go b/pkg/server/server.go index 2363e03fc4b..6004206ffb8 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -314,6 +314,9 @@ func (s *Server) installControllers(ctx context.Context, controllerConfig *rest. 
 if err := s.installAPIExportEndpointSliceController(ctx, controllerConfig); err != nil { return err } + if err := s.installAPIExportEndpointSliceURLsController(ctx, controllerConfig); err != nil { + return err + } } if s.Options.Controllers.EnableAll || enabled.Has("apibinder") { diff --git a/sdk/apis/apis/v1alpha1/types_apiexportendpointslice.go b/sdk/apis/apis/v1alpha1/types_apiexportendpointslice.go index f8f90571042..356a09c1909 100644 --- a/sdk/apis/apis/v1alpha1/types_apiexportendpointslice.go +++ b/sdk/apis/apis/v1alpha1/types_apiexportendpointslice.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The KCP Authors. +Copyright 2025 The KCP Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -73,10 +73,19 @@ type APIExportEndpointSliceStatus struct { // conditions is a list of conditions that apply to the APIExportEndpointSlice. Conditions conditionsv1alpha1.Conditions `json:"conditions,omitempty"` + // endpoints contains all the URLs of the APIExport service. + // // +optional + // +listType=map + // +listMapKey=url + APIExportEndpoints []APIExportEndpoint `json:"endpoints"` - // endpoints contains all the URLs of the APIExport service. - APIExportEndpoints []APIExportEndpoint `json:"endpoints,omitempty"` + // +optional + + // shardSelector is the selector used to filter the shards. It is used to filter the shards + // when determining partition scope when deriving the endpoints. This is set by the owning shard, + // and is used by follower shards to determine if it is in scope or not. + ShardSelector string `json:"shardSelector,omitempty"` } // Using a struct provides an extension point @@ -106,7 +115,10 @@ const ( // PartitionValid is a condition for APIExportEndpointSlice that reflects the validity of the referenced Partition.
PartitionValid conditionsv1alpha1.ConditionType = "PartitionValid" - APIExportEndpointSliceURLsReady conditionsv1alpha1.ConditionType = "EndpointURLsReady" + // EndpointSliceReadyForURLs is a condition for APIExportEndpointSlice that reflects the readiness of the slice to + // provide URLs. It is set to True when the slice is ready to provide URLs which will be lifecycled based on existing consumers in the different + // shards. + APIExportEndpointSliceReadyForURLs conditionsv1alpha1.ConditionType = "EndpointReadyForURLs" // PartitionInvalidReferenceReason is a reason for the PartitionValid condition of APIExportEndpointSlice that the // Partition reference is invalid. diff --git a/sdk/client/applyconfiguration/apis/v1alpha1/apiexportendpointslicestatus.go b/sdk/client/applyconfiguration/apis/v1alpha1/apiexportendpointslicestatus.go index a6b7d4f0f34..ad44fc9a310 100644 --- a/sdk/client/applyconfiguration/apis/v1alpha1/apiexportendpointslicestatus.go +++ b/sdk/client/applyconfiguration/apis/v1alpha1/apiexportendpointslicestatus.go @@ -27,6 +27,7 @@ import ( type APIExportEndpointSliceStatusApplyConfiguration struct { Conditions *v1alpha1.Conditions `json:"conditions,omitempty"` APIExportEndpoints []APIExportEndpointApplyConfiguration `json:"endpoints,omitempty"` + ShardSelector *string `json:"shardSelector,omitempty"` } // APIExportEndpointSliceStatusApplyConfiguration constructs a declarative configuration of the APIExportEndpointSliceStatus type for use with @@ -55,3 +56,11 @@ func (b *APIExportEndpointSliceStatusApplyConfiguration) WithAPIExportEndpoints( } return b } + +// WithShardSelector sets the ShardSelector field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ShardSelector field is set to the value of the last call. 
+func (b *APIExportEndpointSliceStatusApplyConfiguration) WithShardSelector(value string) *APIExportEndpointSliceStatusApplyConfiguration { + b.ShardSelector = &value + return b +} diff --git a/test/e2e/reconciler/apiexportendpointslice/apiexportendpointslice_test.go b/test/e2e/reconciler/apiexportendpointslice/apiexportendpointslice_test.go index 82b27d95d00..446e26d81b5 100644 --- a/test/e2e/reconciler/apiexportendpointslice/apiexportendpointslice_test.go +++ b/test/e2e/reconciler/apiexportendpointslice/apiexportendpointslice_test.go @@ -18,27 +18,36 @@ package apiexportendpointslice import ( "context" + "embed" "fmt" "testing" "time" "github.com/davecgh/go-spew/spew" + kcpdynamic "github.com/kcp-dev/client-go/dynamic" + "github.com/kcp-dev/logicalcluster/v3" "github.com/stretchr/testify/require" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/restmapper" "k8s.io/client-go/util/retry" + "github.com/kcp-dev/kcp/config/helpers" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" "github.com/kcp-dev/kcp/sdk/apis/core" - corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions" topologyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/topology/v1alpha1" kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" "github.com/kcp-dev/kcp/test/e2e/framework" ) +//go:embed *.yaml +var testFiles embed.FS + func TestAPIExportEndpointSliceWithPartition(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) @@ -116,7 +125,7 @@ func TestAPIExportEndpointSliceWithPartition(t *testing.T) { slice, err = kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) require.NoError(t, err) - if 
conditions.IsTrue(slice, apisv1alpha1.APIExportValid) && conditions.IsTrue(slice, apisv1alpha1.APIExportEndpointSliceURLsReady) { + if conditions.IsTrue(slice, apisv1alpha1.APIExportValid) && conditions.IsTrue(slice, apisv1alpha1.APIExportEndpointSliceReadyForURLs) { return true, "" } @@ -143,7 +152,7 @@ func TestAPIExportEndpointSliceWithPartition(t *testing.T) { return false, spew.Sdump(slice.Status.Conditions) }, wait.ForeverTestTimeout, 100*time.Millisecond, "expected missing Partition") require.True(t, len(slice.Status.APIExportEndpoints) == 0, "not expecting any endpoint") - require.True(t, conditions.IsFalse(slice, apisv1alpha1.APIExportEndpointSliceURLsReady), "expecting URLs not ready condition") + require.True(t, conditions.IsFalse(slice, apisv1alpha1.APIExportEndpointSliceReadyForURLs), "expecting URLs not ready condition") t.Logf("Creating the missing Partition") partitionClient := kcpClusterClient.TopologyV1alpha1().Partitions() @@ -162,208 +171,243 @@ func TestAPIExportEndpointSliceWithPartition(t *testing.T) { t.Logf("Checking that no endpoint has been populated") require.True(t, len(slice.Status.APIExportEndpoints) == 0, "not expecting any endpoint") - require.True(t, conditions.IsTrue(slice, apisv1alpha1.APIExportEndpointSliceURLsReady), "expecting the URLs ready condition") + require.True(t, conditions.IsTrue(slice, apisv1alpha1.APIExportEndpointSliceReadyForURLs), "expecting the URLs ready condition") } -func TestAPIExportEndpointSliceWithPartitionPrivate(t *testing.T) { +func TestAPIBindingEndpointSlicesSharded(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - server := framework.PrivateKcpServer(t) - // Create Organization and Workspaces - orgPath, _ := framework.NewOrganizationFixture(t, server) - exportClusterPath, _ := framework.NewWorkspaceFixture(t, server, orgPath) - partitionClusterPath, _ := framework.NewWorkspaceFixture(t, server, orgPath) + framework.Suite(t, "control-plane") + + 
server := framework.SharedKcpServer(t) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) cfg := server.BaseConfig(t) - var err error - kcpClusterClient, err := kcpclientset.NewForConfig(cfg) - require.NoError(t, err, "failed to construct kcp cluster client for server") + t.Logf("Check if we can access shards") + var shards *v1alpha1.ShardList + { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") - export := &apisv1alpha1.APIExport{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-export", - }, - } + shards, err = kcpClusterClient.Cluster(core.RootCluster.Path()).CoreV1alpha1().Shards().List(ctx, metav1.ListOptions{}) + require.NoError(t, err, "failed to list shards") - slice := &apisv1alpha1.APIExportEndpointSlice{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "my-slice", - }, - Spec: apisv1alpha1.APIExportEndpointSliceSpec{ - APIExport: apisv1alpha1.ExportBindingReference{ - Path: exportClusterPath.String(), - Name: export.Name, - }, - }, - } - - partition := &topologyv1alpha1.Partition{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-partition", - }, - Spec: topologyv1alpha1.PartitionSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "region": "apiexportendpointslice-test-region", - }, - }, - }, + if len(shards.Items) < 2 { + t.Skipf("Need at least 2 shards to run this test, got %d", len(shards.Items)) + return + } } - t.Logf("Creating the APIExport") - exportClient := kcpClusterClient.ApisV1alpha1().APIExports() - _, err = exportClient.Cluster(exportClusterPath).Create(ctx, export, metav1.CreateOptions{}) - require.NoError(t, err, "error creating APIExport") + t.Logf("Setup provider workspace") + var orgPath, providerPath logicalcluster.Path + { + orgPath, _ = framework.NewOrganizationFixture(t, server) + providerPath, _ = framework.NewWorkspaceFixture(t, server, orgPath, framework.WithName("service-provider")) - 
t.Logf("Creating the APIExportEndpointSlice") - sliceClient := kcpClusterClient.ApisV1alpha1().APIExportEndpointSlices() - // allow some time for APIExport to be synced onto the cache server - framework.Eventually(t, func() (bool, string) { - slice, err = sliceClient.Cluster(partitionClusterPath).Create(ctx, slice, metav1.CreateOptions{}) - if err != nil { - return false, err.Error() - } - return true, "" - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expected successful creation of APIExportEndpointSlice") - sliceName := slice.Name + serviceProviderClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") + dynamicClusterClient, err := kcpdynamic.NewForConfig(cfg) + require.NoError(t, err, "failed to construct dynamic cluster client for server") - framework.Eventually(t, func() (bool, string) { - slice, err = kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) + mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(serviceProviderClient.Cluster(providerPath).Discovery())) + err = helpers.CreateResourceFromFS(ctx, dynamicClusterClient.Cluster(providerPath), mapper, nil, "apiresourceschema_cowboys.yaml", testFiles) require.NoError(t, err) - if conditions.IsTrue(slice, apisv1alpha1.APIExportValid) { - return true, "" + + t.Logf("Create an APIExport today-cowboys in %q", providerPath) + cowboysAPIExport := &apisv1alpha1.APIExport{ + ObjectMeta: metav1.ObjectMeta{ + Name: "today-cowboys", + }, + Spec: apisv1alpha1.APIExportSpec{ + LatestResourceSchemas: []string{"today.cowboys.wildwest.dev"}, + }, } - return false, spew.Sdump(slice.Status.Conditions) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expected valid APIExport") - require.True(t, conditions.IsTrue(slice, apisv1alpha1.APIExportEndpointSliceURLsReady), "expecting URLs ready condition") + _, err = 
serviceProviderClient.Cluster(providerPath).ApisV1alpha1().APIExports().Create(ctx, cowboysAPIExport, metav1.CreateOptions{}) + require.NoError(t, err) + } - t.Logf("Creating the Partition") - partitionClient := kcpClusterClient.TopologyV1alpha1().Partitions() - _, err = partitionClient.Cluster(partitionClusterPath).Create(ctx, partition, metav1.CreateOptions{}) - require.NoError(t, err, "error creating Partition") + t.Logf("Create a consumer workspaces - one per shard") + var bindShardname string + { + for _, shard := range shards.Items { + if bindShardname == "" { // bind to the first shard only + bindShardname = shard.Name + } + if bindShardname != shard.Name { + continue + } + consumerPath, _ := framework.NewWorkspaceFixture(t, server, orgPath, framework.WithName("consumer-bound-against-%s", shard.Name), framework.WithShard(shard.Name)) + + t.Logf("Create an APIBinding in %q that points to the today-cowboys export from %q", consumerPath, providerPath) + apiBinding := &apisv1alpha1.APIBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cowboys", + }, + Spec: apisv1alpha1.APIBindingSpec{ + Reference: apisv1alpha1.BindingReference{ + Export: &apisv1alpha1.ExportBindingReference{ + Path: providerPath.String(), + Name: "today-cowboys", + }, + }, + }, + } - t.Logf("Adding a Partition to the APIExportEndpointSlice") - slice.Spec.Partition = partition.Name - _, err = kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Update(ctx, slice, metav1.UpdateOptions{}) - require.NoError(t, err, "error updating APIExportEndpointSlice") + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") - framework.Eventually(t, func() (bool, string) { - s, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) - require.NoError(t, err) - if conditions.IsTrue(s, apisv1alpha1.PartitionValid) { - 
return true, "" + framework.Eventually(t, func() (bool, string) { + _, err = kcpClusterClient.Cluster(consumerPath).ApisV1alpha1().APIBindings().Create(ctx, apiBinding, metav1.CreateOptions{}) + return err == nil, fmt.Sprintf("Error creating APIBinding: %v", err) + }, wait.ForeverTestTimeout, time.Millisecond*100) } - return false, spew.Sdump(s.Status.Conditions) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expected valid Partition") - require.True(t, conditions.IsTrue(slice, apisv1alpha1.APIExportEndpointSliceURLsReady), "expecting URLs ready condition") + } - t.Logf("Checking that no endpoint has been populated") - framework.Eventually(t, func() (bool, string) { - s, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) + // TODO(mjudeikis): This will be deprecated when we deprecate APIExport urls. + t.Logf("Check that APIExport has 2 virtual workspaces") + { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") + + apiExport, err := kcpClusterClient.Cluster(providerPath).ApisV1alpha1().APIExports().Get(ctx, "today-cowboys", metav1.GetOptions{}) require.NoError(t, err) - if len(s.Status.APIExportEndpoints) == 0 { - return true, "" - } - return false, fmt.Sprintf("expected 0 endpoints, but got: %#v", s.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "not expecting any endpoint") - // Endpoint tests require the edition of shards. - // These tests are run on a private cluster to avoid side effects on other e2e tests. - // They require the resources previously created: APIExport, APIExportEndpointSlice, etc.
- shard := &corev1alpha1.Shard{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-shard", - Labels: map[string]string{ - "region": "apiexportendpointslice-test-region", - }, - }, - Spec: corev1alpha1.ShardSpec{ - BaseURL: "https://base.kcp.test.dev", - }, + //nolint:staticcheck // SA1019 VirtualWorkspaces is deprecated but not removed yet + require.Len(t, apiExport.Status.VirtualWorkspaces, 2) } - t.Logf("Creating a shard in the region") - shardClient := kcpClusterClient.CoreV1alpha1().Shards() - shard, err = shardClient.Cluster(core.RootCluster.Path()).Create(ctx, shard, metav1.CreateOptions{}) - require.NoError(t, err, "error creating Shard") + t.Logf("Create a topology PartitionSet for the providers") + var partition *topologyv1alpha1.Partition + { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") - framework.Eventually(t, func() (bool, string) { - slice, err = kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) + _, err = kcpClusterClient.Cluster(providerPath).TopologyV1alpha1().PartitionSets().Create(ctx, &topologyv1alpha1.PartitionSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cowboys", + }, + Spec: topologyv1alpha1.PartitionSetSpec{ + ShardSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "shared": "true", + }, + }, + }, + }, metav1.CreateOptions{}) require.NoError(t, err) - if len(slice.Status.APIExportEndpoints) == 1 { - return true, "" - } - return false, fmt.Sprintf("expected 1 endpoint, but got: %#v", slice.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expecting a single endpoint") - require.Contains(t, slice.Status.APIExportEndpoints[0].URL, export.Name) - t.Logf("Updating the previously created shard") - shard.Labels["region"] = "doesnotexist" - shard, err = shardClient.Cluster(core.RootCluster.Path()).Update(ctx, shard, 
metav1.UpdateOptions{}) - require.NoError(t, err, "error updating Shard") + // Partition should be created + framework.Eventually(t, func() (bool, string) { + partitions, err := kcpClusterClient.Cluster(providerPath).TopologyV1alpha1().Partitions().List(ctx, metav1.ListOptions{}) + if err == nil && len(partitions.Items) == 1 { + partition = &partitions.Items[0] + return true, "" + } + return false, fmt.Sprintf("Error listing partitions: %v", err) + }, wait.ForeverTestTimeout, time.Millisecond*500) + } - framework.Eventually(t, func() (bool, string) { - s, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) + t.Logf("Create APIExportEndpointSlice for consumers") + { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") + + _, err = kcpClusterClient.Cluster(providerPath).ApisV1alpha1().APIExportEndpointSlices().Create(ctx, &apisv1alpha1.APIExportEndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shared-cowboys", + }, + Spec: apisv1alpha1.APIExportEndpointSliceSpec{ + Partition: partition.Name, + APIExport: apisv1alpha1.ExportBindingReference{ + Path: providerPath.String(), + Name: "today-cowboys", + }, + }, + }, metav1.CreateOptions{}) require.NoError(t, err) - if len(s.Status.APIExportEndpoints) == 0 { - return true, "" - } - return false, fmt.Sprintf("expected 0 endpoints, but got: %#v", s.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expecting no endpoint") - t.Logf("Setting back the correct label") - shard.Labels["region"] = "apiexportendpointslice-test-region" - shard, err = shardClient.Cluster(core.RootCluster.Path()).Update(ctx, shard, metav1.UpdateOptions{}) - require.NoError(t, err, "error updating Shard") + // we should have 1 APIExportEndpointSlice with 1 APIExportEndpoint as we bound only once. 
+ framework.Eventually(t, func() (bool, string) { + slice, err := kcpClusterClient.Cluster(providerPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, "shared-cowboys", metav1.GetOptions{}) + if len(slice.Status.APIExportEndpoints) == 1 { + return true, "" + } + return false, fmt.Sprintf("APIExportEndpointSlice has %d endpoints: %v", len(slice.Status.APIExportEndpoints), err) + }, wait.ForeverTestTimeout*50, time.Millisecond*500) + } - framework.Eventually(t, func() (bool, string) { - s, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) - require.NoError(t, err) - if len(s.Status.APIExportEndpoints) == 1 { - return true, "" - } - return false, fmt.Sprintf("expected 1 endpoint, but got: %#v", s.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expecting a single endpoint") + t.Logf("Create consumer on second shard and observe APIExportEndpointSlice to have second url added") + var consumerPath logicalcluster.Path + { + for _, shard := range shards.Items { + if bindShardname == shard.Name { + continue + } + consumerPath, _ = framework.NewWorkspaceFixture(t, server, orgPath, framework.WithName("consumer-bound-against-%s", shard.Name), framework.WithShard(shard.Name)) + + t.Logf("Create an APIBinding in %q that points to the today-cowboys export from %q", consumerPath, providerPath) + apiBinding := &apisv1alpha1.APIBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cowboys", + }, + Spec: apisv1alpha1.APIBindingSpec{ + Reference: apisv1alpha1.BindingReference{ + Export: &apisv1alpha1.ExportBindingReference{ + Path: providerPath.String(), + Name: "today-cowboys", + }, + }, + }, + } - t.Logf("Deleting the shard") - err = shardClient.Cluster(core.RootCluster.Path()).Delete(ctx, shard.Name, metav1.DeleteOptions{}) - require.NoError(t, err, "error deleting Shard") + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to 
construct kcp cluster client for server") - framework.Eventually(t, func() (bool, string) { - s, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceName, metav1.GetOptions{}) - require.NoError(t, err) - if len(s.Status.APIExportEndpoints) == 0 { - return true, "" + framework.Eventually(t, func() (bool, string) { + _, err = kcpClusterClient.Cluster(consumerPath).ApisV1alpha1().APIBindings().Create(ctx, apiBinding, metav1.CreateOptions{}) + return err == nil, fmt.Sprintf("Error creating APIBinding: %v", err) + }, wait.ForeverTestTimeout, time.Millisecond*500) } - return false, fmt.Sprintf("expected 0 endpoints, but got: %#v", s.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expecting no endpoint") + } - t.Logf("Creating a slice without partition") - sliceWithAll := &apisv1alpha1.APIExportEndpointSlice{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "my-slice-without-partition", - }, - Spec: apisv1alpha1.APIExportEndpointSliceSpec{ - APIExport: apisv1alpha1.ExportBindingReference{ - Path: exportClusterPath.String(), - Name: slice.Spec.APIExport.Name, - }, - }, + t.Logf("Check that APIExportEndpointSlices has 2 virtual workspaces") + { + framework.Eventually(t, func() (bool, string) { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") + + slice, err := kcpClusterClient.Cluster(providerPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, "shared-cowboys", metav1.GetOptions{}) + if len(slice.Status.APIExportEndpoints) == 2 { + return true, "" + } + return false, fmt.Sprintf("APIExportEndpointSlice has %d endpoints: %v", len(slice.Status.APIExportEndpoints), err) + }, wait.ForeverTestTimeout*50, time.Millisecond*500) } - sliceWithAll, err = sliceClient.Cluster(partitionClusterPath).Create(ctx, sliceWithAll, metav1.CreateOptions{}) - require.NoError(t, err, "error creating 
APIExportEndpointSlice") - sliceWithAllName := sliceWithAll.Name + t.Logf("Delete consumer on second shard and observe APIExportEndpointSlice to have second url removed") + { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") - framework.Eventually(t, func() (bool, string) { - sliceWithAll, err := kcpClusterClient.Cluster(partitionClusterPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, sliceWithAllName, metav1.GetOptions{}) - require.NoError(t, err) - if len(sliceWithAll.Status.APIExportEndpoints) == 1 { - return true, "" - } - return false, fmt.Sprintf("expected 1 endpoint, but got: %#v", sliceWithAll.Status.APIExportEndpoints) - }, wait.ForeverTestTimeout, 100*time.Millisecond, "expecting a single endpoint for the root shard, got %d", len(sliceWithAll.Status.APIExportEndpoints)) + framework.Eventually(t, func() (bool, string) { + err := kcpClusterClient.Cluster(consumerPath).ApisV1alpha1().APIBindings().Delete(ctx, "cowboys", metav1.DeleteOptions{}) + return err == nil, fmt.Sprintf("Error deleting APIBinding: %v", err) + }, wait.ForeverTestTimeout, time.Millisecond*500) + } + + t.Logf("Check that APIExportEndpointSlices has 1 virtual workspaces") + { + framework.Eventually(t, func() (bool, string) { + kcpClusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to construct kcp cluster client for server") + + slice, err := kcpClusterClient.Cluster(providerPath).ApisV1alpha1().APIExportEndpointSlices().Get(ctx, "shared-cowboys", metav1.GetOptions{}) + if len(slice.Status.APIExportEndpoints) == 1 { + return true, "" + } + return false, fmt.Sprintf("APIExportEndpointSlice has %d endpoints: %v", len(slice.Status.APIExportEndpoints), err) + }, wait.ForeverTestTimeout*50, time.Millisecond*500) + } } diff --git a/test/e2e/reconciler/apiexportendpointslice/apiresourceschema_cowboys.yaml 
b/test/e2e/reconciler/apiexportendpointslice/apiresourceschema_cowboys.yaml new file mode 100644 index 00000000000..b65c76e0175 --- /dev/null +++ b/test/e2e/reconciler/apiexportendpointslice/apiresourceschema_cowboys.yaml @@ -0,0 +1,46 @@ +apiVersion: apis.kcp.io/v1alpha1 +kind: APIResourceSchema +metadata: + name: today.cowboys.wildwest.dev +spec: + group: wildwest.dev + names: + kind: Cowboy + listKind: CowboyList + plural: cowboys + singular: cowboy + scope: Namespaced + versions: + - name: v1alpha1 + schema: + description: Cowboy is part of the wild west + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: CowboySpec holds the desired state of the Cowboy. + properties: + intent: + type: string + type: object + status: + description: CowboyStatus communicates the observed state of the Cowboy. + properties: + result: + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {}