Commit 6663f2e
Merge pull request #3273 from sttts/sttts-mount-fix-index
🐛 index: fix how error codes are stored for unavailable workspaces
kcp-ci-bot authored Jan 26, 2025
2 parents a1b3400 + e3ec087 commit 6663f2e
Showing 4 changed files with 111 additions and 71 deletions.
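
In short: UpsertWorkspace recorded the 503 for an Unavailable workspace under its target cluster (ws.Spec.Cluster), while Lookup only consulted the error-code map after a path segment had already been resolved — keyed by the target cluster's shard, which is not necessarily the shard hosting the Workspace object. Whenever those two shards differed, the stored code was never found. The fix keys error codes by (shard, parent logical cluster, workspace name) — the same key the workspace itself is indexed under — and checks them before each segment is resolved. The shardWorkspaceNameCluster and shardWorkspaceName maps are renamed to shardClusterWorkspaceNameCluster and shardClusterWorkspaceName to match their key structure, and index and e2e test coverage is added for the Unavailable phase.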
83 changes: 41 additions & 42 deletions pkg/index/index.go
@@ -53,11 +53,11 @@ func New(rewriters []PathRewriter) *State {
return &State{
rewriters: rewriters,

clusterShards: map[logicalcluster.Name]string{},
shardWorkspaceNameCluster: map[string]map[logicalcluster.Name]map[string]logicalcluster.Name{},
shardWorkspaceName: map[string]map[logicalcluster.Name]string{},
shardClusterParentCluster: map[string]map[logicalcluster.Name]logicalcluster.Name{},
shardBaseURLs: map[string]string{},
clusterShards: map[logicalcluster.Name]string{},
shardClusterWorkspaceNameCluster: map[string]map[logicalcluster.Name]map[string]logicalcluster.Name{},
shardClusterWorkspaceName: map[string]map[logicalcluster.Name]string{},
shardClusterParentCluster: map[string]map[logicalcluster.Name]logicalcluster.Name{},
shardBaseURLs: map[string]string{},
// Experimental feature: allow mounts to be used with Workspaces
// structure: (shard, logical cluster, workspace name) -> string serialized mount objects
// This should be simplified once we promote this to workspace structure.
@@ -75,12 +75,12 @@ func New(rewriters []PathRewriter) *State {
type State struct {
rewriters []PathRewriter

lock sync.RWMutex
clusterShards map[logicalcluster.Name]string // logical cluster -> shard name
shardWorkspaceNameCluster map[string]map[logicalcluster.Name]map[string]logicalcluster.Name // (shard name, logical cluster, workspace name) -> logical cluster
shardWorkspaceName map[string]map[logicalcluster.Name]string // (shard name, logical cluster) -> workspace name
shardClusterParentCluster map[string]map[logicalcluster.Name]logicalcluster.Name // (shard name, logical cluster) -> parent logical cluster
shardBaseURLs map[string]string // shard name -> base URL
lock sync.RWMutex
clusterShards map[logicalcluster.Name]string // logical cluster -> shard name
shardClusterWorkspaceNameCluster map[string]map[logicalcluster.Name]map[string]logicalcluster.Name // (shard name, logical cluster, workspace name) -> logical cluster
shardClusterWorkspaceName map[string]map[logicalcluster.Name]string // (shard name, logical cluster) -> workspace name
shardClusterParentCluster map[string]map[logicalcluster.Name]logicalcluster.Name // (shard name, logical cluster) -> parent logical cluster
shardBaseURLs map[string]string // shard name -> base URL
// Experimental feature: allow mounts to be used with Workspaces
shardClusterWorkspaceMountAnnotation map[string]map[logicalcluster.Name]map[string]string // (shard name, logical cluster, workspace name) -> mount object string

@@ -103,29 +103,29 @@ func (c *State) UpsertWorkspace(shard string, ws *tenancyv1alpha1.Workspace) {
if c.shardClusterWorkspaceNameErrorCode[shard] == nil {
c.shardClusterWorkspaceNameErrorCode[shard] = map[logicalcluster.Name]map[string]int{}
}
if c.shardClusterWorkspaceNameErrorCode[shard][logicalcluster.Name(ws.Spec.Cluster)] == nil {
c.shardClusterWorkspaceNameErrorCode[shard][logicalcluster.Name(ws.Spec.Cluster)] = map[string]int{}
if c.shardClusterWorkspaceNameErrorCode[shard][clusterName] == nil {
c.shardClusterWorkspaceNameErrorCode[shard][clusterName] = map[string]int{}
}
// Unavailable workspaces should return 503
c.shardClusterWorkspaceNameErrorCode[shard][logicalcluster.Name(ws.Spec.Cluster)][ws.Name] = 503
c.shardClusterWorkspaceNameErrorCode[shard][clusterName][ws.Name] = 503
} else {
delete(c.shardClusterWorkspaceNameErrorCode[shard][logicalcluster.Name(ws.Spec.Cluster)], ws.Name)
if len(c.shardClusterWorkspaceNameErrorCode[shard][logicalcluster.Name(ws.Spec.Cluster)]) == 0 {
delete(c.shardClusterWorkspaceNameErrorCode[shard], logicalcluster.Name(ws.Spec.Cluster))
delete(c.shardClusterWorkspaceNameErrorCode[shard][clusterName], ws.Name)
if len(c.shardClusterWorkspaceNameErrorCode[shard][clusterName]) == 0 {
delete(c.shardClusterWorkspaceNameErrorCode[shard], clusterName)
}
}

if cluster := c.shardWorkspaceNameCluster[shard][clusterName][ws.Name]; cluster.String() != ws.Spec.Cluster {
if c.shardWorkspaceNameCluster[shard] == nil {
c.shardWorkspaceNameCluster[shard] = map[logicalcluster.Name]map[string]logicalcluster.Name{}
c.shardWorkspaceName[shard] = map[logicalcluster.Name]string{}
if cluster := c.shardClusterWorkspaceNameCluster[shard][clusterName][ws.Name]; cluster.String() != ws.Spec.Cluster {
if c.shardClusterWorkspaceNameCluster[shard] == nil {
c.shardClusterWorkspaceNameCluster[shard] = map[logicalcluster.Name]map[string]logicalcluster.Name{}
c.shardClusterWorkspaceName[shard] = map[logicalcluster.Name]string{}
c.shardClusterParentCluster[shard] = map[logicalcluster.Name]logicalcluster.Name{}
}
if c.shardWorkspaceNameCluster[shard][clusterName] == nil {
c.shardWorkspaceNameCluster[shard][clusterName] = map[string]logicalcluster.Name{}
if c.shardClusterWorkspaceNameCluster[shard][clusterName] == nil {
c.shardClusterWorkspaceNameCluster[shard][clusterName] = map[string]logicalcluster.Name{}
}
c.shardWorkspaceNameCluster[shard][clusterName][ws.Name] = logicalcluster.Name(ws.Spec.Cluster)
c.shardWorkspaceName[shard][logicalcluster.Name(ws.Spec.Cluster)] = ws.Name
c.shardClusterWorkspaceNameCluster[shard][clusterName][ws.Name] = logicalcluster.Name(ws.Spec.Cluster)
c.shardClusterWorkspaceName[shard][logicalcluster.Name(ws.Spec.Cluster)] = ws.Name
c.shardClusterParentCluster[shard][logicalcluster.Name(ws.Spec.Cluster)] = clusterName
}

@@ -144,7 +144,7 @@ func (c *State) DeleteWorkspace(shard string, ws *tenancyv1alpha1.Workspace) {
clusterName := logicalcluster.From(ws)

c.lock.RLock()
_, foundCluster := c.shardWorkspaceNameCluster[shard][clusterName][ws.Name]
_, foundCluster := c.shardClusterWorkspaceNameCluster[shard][clusterName][ws.Name]
_, foundMount := c.shardClusterWorkspaceMountAnnotation[shard][clusterName][ws.Name]
c.lock.RUnlock()

@@ -155,18 +155,18 @@ func (c *State) DeleteWorkspace(shard string, ws *tenancyv1alpha1.Workspace) {
c.lock.Lock()
defer c.lock.Unlock()

if _, foundCluster = c.shardWorkspaceNameCluster[shard][clusterName][ws.Name]; foundCluster {
delete(c.shardWorkspaceNameCluster[shard][clusterName], ws.Name)
if len(c.shardWorkspaceNameCluster[shard][clusterName]) == 0 {
delete(c.shardWorkspaceNameCluster[shard], clusterName)
if _, foundCluster = c.shardClusterWorkspaceNameCluster[shard][clusterName][ws.Name]; foundCluster {
delete(c.shardClusterWorkspaceNameCluster[shard][clusterName], ws.Name)
if len(c.shardClusterWorkspaceNameCluster[shard][clusterName]) == 0 {
delete(c.shardClusterWorkspaceNameCluster[shard], clusterName)
}
if len(c.shardWorkspaceNameCluster[shard]) == 0 {
delete(c.shardWorkspaceNameCluster, shard)
if len(c.shardClusterWorkspaceNameCluster[shard]) == 0 {
delete(c.shardClusterWorkspaceNameCluster, shard)
}

delete(c.shardWorkspaceName[shard], logicalcluster.Name(ws.Spec.Cluster))
if len(c.shardWorkspaceName[shard]) == 0 {
delete(c.shardWorkspaceName, shard)
delete(c.shardClusterWorkspaceName[shard], logicalcluster.Name(ws.Spec.Cluster))
if len(c.shardClusterWorkspaceName[shard]) == 0 {
delete(c.shardClusterWorkspaceName, shard)
}

delete(c.shardClusterParentCluster[shard], logicalcluster.Name(ws.Spec.Cluster))
@@ -238,9 +238,9 @@ func (c *State) DeleteShard(shardName string) {
delete(c.clusterShards, lc)
}
}
delete(c.shardWorkspaceNameCluster, shardName)
delete(c.shardClusterWorkspaceNameCluster, shardName)
delete(c.shardBaseURLs, shardName)
delete(c.shardWorkspaceName, shardName)
delete(c.shardClusterWorkspaceName, shardName)
delete(c.shardClusterParentCluster, shardName)
delete(c.shardClusterWorkspaceNameErrorCode, shardName)
}
@@ -285,19 +285,18 @@ func (c *State) Lookup(path logicalcluster.Path) (Result, bool) {
}
}

if ec, found := c.shardClusterWorkspaceNameErrorCode[shard][cluster][s]; found {
errorCode = ec
}
var found bool
cluster, found = c.shardWorkspaceNameCluster[shard][cluster][s]
cluster, found = c.shardClusterWorkspaceNameCluster[shard][cluster][s]
if !found {
return Result{}, false
}
shard, found = c.clusterShards[cluster]
if !found {
return Result{}, false
}
ec, found := c.shardClusterWorkspaceNameErrorCode[shard][cluster][s]
if found {
errorCode = ec
}
}
return Result{Shard: shard, Cluster: cluster, ErrorCode: errorCode}, true
}
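To make the new keying concrete, here is a minimal, runnable sketch of the per-segment walk as it behaves after this commit. It is not the kcp implementation: the struct and field names are hypothetical, and plain strings stand in for logicalcluster.Name. The point is that the error code is consulted while (shard, cluster) still identify the parent — the same key UpsertWorkspace now stores under — and only then is the segment resolved to its own logical cluster and shard.

```go
package main

import "fmt"

// index is a hypothetical, pared-down stand-in for State in pkg/index/index.go.
type index struct {
	clusterShards map[string]string                       // logical cluster -> shard
	nameCluster   map[string]map[string]map[string]string // (shard, cluster, workspace name) -> logical cluster
	nameErrorCode map[string]map[string]map[string]int    // (shard, cluster, workspace name) -> HTTP error code
}

// lookup walks a path such as root:org:rh segment by segment.
func (c *index) lookup(segments []string) (shard, cluster string, errorCode int, ok bool) {
	cluster = segments[0]
	if shard, ok = c.clusterShards[cluster]; !ok {
		return "", "", 0, false
	}
	for _, s := range segments[1:] {
		// Consult the error code while (shard, cluster) still name the parent --
		// the key UpsertWorkspace stores 503 under for Unavailable workspaces.
		if ec, found := c.nameErrorCode[shard][cluster][s]; found {
			errorCode = ec
		}
		var found bool
		if cluster, found = c.nameCluster[shard][cluster][s]; !found {
			return "", "", 0, false
		}
		if shard, found = c.clusterShards[cluster]; !found {
			return "", "", 0, false
		}
	}
	return shard, cluster, errorCode, true
}

func main() {
	// Mirrors the new test scenario: workspace rh lives in cluster one on shard
	// beta, its logical cluster two is scheduled to shard gama, and rh is
	// Unavailable, so a 503 is recorded under (beta, one, rh).
	idx := &index{
		clusterShards: map[string]string{"root": "root", "one": "beta", "two": "gama"},
		nameCluster: map[string]map[string]map[string]string{
			"root": {"root": {"org": "one"}},
			"beta": {"one": {"rh": "two"}},
		},
		nameErrorCode: map[string]map[string]map[string]int{
			"beta": {"one": {"rh": 503}},
		},
	}
	fmt.Println(idx.lookup([]string{"root", "org", "rh"})) // gama two 503 true
}
```

Running it resolves root:org:rh to (gama, two) while still surfacing the 503 recorded on shard beta — the cross-shard case the previous lookup order missed.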
44 changes: 43 additions & 1 deletion pkg/index/index_test.go
@@ -44,6 +44,7 @@ func TestLookup(t *testing.T) {
expectedCluster logicalcluster.Name
expectFound bool
expectedURL string
expectedError int
}{
{
name: "an empty indexer is usable",
@@ -228,7 +229,7 @@ func TestLookup(t *testing.T) {
expectFound: false,
},
{
name: "multiple shards: a logical cluster for one:rh workspace is found",
name: "multiple shards: one:rh workspace is a mount with URL",
initialShardsToUpsert: []shardStub{
{
name: "root",
@@ -260,6 +261,39 @@ func TestLookup(t *testing.T) {
expectedShard: "",
expectedURL: "https://kcp.dev.local/services/custom-url/proxy",
},
{
name: "multiple shards: one:rh workspace is a mount, but phase is Unavailable",
initialShardsToUpsert: []shardStub{
{
name: "root",
url: "https://root.kcp.dev",
},
{
name: "beta",
url: "https://beta.kcp.dev",
},
{
name: "gama",
url: "https://gama.kcp.dev",
},
},
initialWorkspacesToUpsert: map[string][]*tenancyv1alpha1.Workspace{
"root": {newWorkspace("org", "root", "one")},
"beta": {withPhase(newWorkspaceWithAnnotation("rh", "one", "two", map[string]string{
"experimental.tenancy.kcp.io/mount": `{"spec":{"ref":{"kind":"KubeCluster","name":"prod-cluster","apiVersion":"proxy.kcp.dev/v1alpha1"}},"status":{"phase":"Ready","url":"https://kcp.dev.local/services/custom-url/proxy"}}`,
}), corev1alpha1.LogicalClusterPhaseUnavailable)},
},
initialLogicalClustersToUpsert: map[string][]*corev1alpha1.LogicalCluster{
"root": {newLogicalCluster("root")},
"beta": {newLogicalCluster("one")},
"gama": {newLogicalCluster("two")},
},
targetPath: logicalcluster.NewPath("one:rh"),
expectFound: true,
expectedCluster: "",
expectedShard: "",
expectedURL: "https://kcp.dev.local/services/custom-url/proxy",
},
}

for _, scenario := range scenarios {
@@ -298,6 +332,9 @@ func TestLookup(t *testing.T) {
if r.URL != scenario.expectedURL {
t.Errorf("unexpected url = %v, for path = %v, expected = %v", r.URL, scenario.targetPath, scenario.expectedURL)
}
if r.ErrorCode != scenario.expectedError {
t.Errorf("unexpected error code = %v, for path = %v, expected = %v", r.ErrorCode, scenario.targetPath, scenario.expectedError)
}
})
}
}
@@ -503,6 +540,11 @@ func newWorkspaceWithAnnotation(name, cluster, scheduledCluster string, annotations map[string]string) *tenancyv1alpha1.Workspace {
return ws
}

func withPhase(ws *tenancyv1alpha1.Workspace, phase corev1alpha1.LogicalClusterPhaseType) *tenancyv1alpha1.Workspace {
ws.Status.Phase = phase
return ws
}

func newLogicalCluster(cluster string) *corev1alpha1.LogicalCluster {
return &corev1alpha1.LogicalCluster{
ObjectMeta: metav1.ObjectMeta{Name: "cluster", Annotations: map[string]string{"kcp.io/cluster": cluster}},
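The new scenario is the regression test for exactly the cross-shard case: the rh Workspace lives in cluster one on shard beta while the logical cluster two it points to is scheduled to shard gama, and the new withPhase helper marks it Unavailable so the 503 branch of UpsertWorkspace fires. The new expectedError field wires Result.ErrorCode into every scenario's assertions.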
2 changes: 1 addition & 1 deletion test/e2e/authorizer/serviceaccounts_test.go
@@ -183,7 +183,7 @@ func TestServiceAccounts(t *testing.T) {
})

t.Run("Access another workspace in the same org", func(t *testing.T) {
t.Log("Create namespace with the same name ")
t.Log("Create workspace with the same name ")
otherPath, _ := framework.NewWorkspaceFixture(t, server, orgPath)
_, err := kubeClusterClient.Cluster(otherPath).CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
53 changes: 26 additions & 27 deletions test/e2e/mounts/mounts_machinery_test.go
@@ -62,8 +62,8 @@ func TestMountsMachinery(t *testing.T) {

orgPath, _ := framework.NewOrganizationFixture(t, server)

workspaceName := "mounts-machinery"
mountPath, _ := framework.NewWorkspaceFixture(t, server, orgPath, framework.WithName("%s", workspaceName))
mountWorkspaceName := "mounts-machinery"
mountPath, _ := framework.NewWorkspaceFixture(t, server, orgPath, framework.WithName("%s", mountWorkspaceName))

cfg := server.BaseConfig(t)
kcpClusterClient, err := kcpclientset.NewForConfig(cfg)
@@ -90,34 +90,33 @@ func TestMountsMachinery(t *testing.T) {
require.NoError(t, err)
return crdhelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established), yamlMarshal(t, crd)
}, wait.ForeverTestTimeout, 100*time.Millisecond, "waiting for CRD to be established")
require.NoError(t, err)

t.Logf("Install a mount object into workspace %q", orgPath)
framework.Eventually(t, func() (bool, string) {
mapper2 := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(orgProviderKCPClient.Cluster(orgPath).Discovery()))
err = helpers.CreateResourceFromFS(ctx, dynamicClusterClient.Cluster(orgPath), mapper2, nil, "kubecluster_mounts.yaml", testFiles)
mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(orgProviderKCPClient.Cluster(orgPath).Discovery()))
err = helpers.CreateResourceFromFS(ctx, dynamicClusterClient.Cluster(orgPath), mapper, nil, "kubecluster_mounts.yaml", testFiles)
return err == nil, fmt.Sprintf("%v", err)
}, wait.ForeverTestTimeout, 100*time.Millisecond, "waiting for mount object to be installed")

// At this point we have object backing the mount object. So lets add mount annotation to the workspace.
t.Logf("Set a mount annotation into workspace %q", orgPath)
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
current, err := kcpClusterClient.Cluster(orgPath).TenancyV1alpha1().Workspaces().Get(ctx, workspaceName, metav1.GetOptions{})
require.NoError(t, err)

mount := tenancyv1alpha1.Mount{
MountSpec: tenancyv1alpha1.MountSpec{
Reference: &tenancyv1alpha1.ObjectReference{
APIVersion: "contrib.kcp.io/v1alpha1",
Kind: "KubeCluster",
Name: "proxy-cluster", // must match name in kubecluster_mounts.yaml
},
// But order should not matter.
t.Logf("Set a mount annotation onto workspace %q", orgPath)
mount := tenancyv1alpha1.Mount{
MountSpec: tenancyv1alpha1.MountSpec{
Reference: &tenancyv1alpha1.ObjectReference{
APIVersion: "contrib.kcp.io/v1alpha1",
Kind: "KubeCluster",
Name: "proxy-cluster", // must match name in kubecluster_mounts.yaml
},
}
data, err := json.Marshal(mount)
},
}
annValue, err := json.Marshal(mount)
require.NoError(t, err)
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
current, err := kcpClusterClient.Cluster(orgPath).TenancyV1alpha1().Workspaces().Get(ctx, mountWorkspaceName, metav1.GetOptions{})
require.NoError(t, err)

current.Annotations[tenancyv1alpha1.ExperimentalWorkspaceMountAnnotationKey] = string(data)
current.Annotations[tenancyv1alpha1.ExperimentalWorkspaceMountAnnotationKey] = string(annValue)

_, err = kcpClusterClient.Cluster(orgPath).TenancyV1alpha1().Workspaces().Update(ctx, current, metav1.UpdateOptions{})
return err
@@ -147,14 +146,14 @@ func TestMountsMachinery(t *testing.T) {
"phase": "Ready",
}

_, err = dynamicClusterClient.Cluster(orgPath).Resource(mountGVR).Namespace("").UpdateStatus(ctx, currentMount, metav1.UpdateOptions{})
_, err = dynamicClusterClient.Cluster(orgPath).Resource(mountGVR).UpdateStatus(ctx, currentMount, metav1.UpdateOptions{})
return err
})
require.NoError(t, err)

t.Log("Workspace should have WorkspaceMountReady and WorkspaceInitialized conditions, both true")
framework.Eventually(t, func() (bool, string) {
current, err := kcpClusterClient.Cluster(orgPath).TenancyV1alpha1().Workspaces().Get(ctx, workspaceName, metav1.GetOptions{})
current, err := kcpClusterClient.Cluster(orgPath).TenancyV1alpha1().Workspaces().Get(ctx, mountWorkspaceName, metav1.GetOptions{})
if err != nil {
return false, err.Error()
}
@@ -163,11 +162,12 @@ func TestMountsMachinery(t *testing.T) {
ready := conditions.IsTrue(current, tenancyv1alpha1.MountConditionReady)
return initialized && ready, yamlMarshal(t, current)
}, wait.ForeverTestTimeout, 100*time.Millisecond, "waiting for WorkspaceMountReady and WorkspaceInitialized conditions to be true")
require.NoError(t, err)

t.Log("Workspace access should work")
_, err = kcpClusterClient.Cluster(mountPath).ApisV1alpha1().APIExports().List(ctx, metav1.ListOptions{})
require.NoError(t, err)
framework.Eventually(t, func() (bool, string) {
_, err := kcpClusterClient.Cluster(mountPath).ApisV1alpha1().APIExports().List(ctx, metav1.ListOptions{})
return err == nil, fmt.Sprintf("err = %v", err)
}, wait.ForeverTestTimeout, 100*time.Millisecond, "waiting for workspace access to work")

t.Log("Set mount to not ready")
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
@@ -182,11 +182,10 @@ func TestMountsMachinery(t *testing.T) {

t.Log("Workspace phase should become unavailable")
framework.Eventually(t, func() (bool, string) {
current, err := kcpClusterClient.Cluster(orgPath).TenancyV1alpha1().Workspaces().Get(ctx, workspaceName, metav1.GetOptions{})
current, err := kcpClusterClient.Cluster(orgPath).TenancyV1alpha1().Workspaces().Get(ctx, mountWorkspaceName, metav1.GetOptions{})
require.NoError(t, err)
return current.Status.Phase == corev1alpha1.LogicalClusterPhaseUnavailable, yamlMarshal(t, current)
}, wait.ForeverTestTimeout, 100*time.Millisecond, "waiting for workspace to become unavailable")
require.NoError(t, err)

t.Logf("Workspace access should eventually fail")
framework.Eventually(t, func() (bool, string) {
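The e2e cleanup follows from the fix: workspaceName is renamed to mountWorkspaceName; the annotation value is built and marshaled once, outside the RetryOnConflict closure, so only the read-modify-write of the Workspace is retried; the stray Namespace("") call is dropped from UpdateStatus on the cluster-scoped KubeCluster resource; redundant require.NoError(t, err) calls after framework.Eventually blocks are removed; and the first workspace access is wrapped in framework.Eventually, presumably because mount readiness now propagates through the index asynchronously and a one-shot List call could race it.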
