diff --git a/src/Kinds/K8sReplicaSet.php b/src/Kinds/K8sReplicaSet.php new file mode 100644 index 0000000..6209707 --- /dev/null +++ b/src/Kinds/K8sReplicaSet.php @@ -0,0 +1,111 @@ +customPodsSelector()) { + return $podsSelector; + } + + return [ + 'replicaset-name' => $this->getName(), + ]; + } + + /** + * Get the available replicas. + * + * @return int + */ + public function getAvailableReplicasCount(): int + { + return $this->getStatus('availableReplicas', 0); + } + + /** + * Get the ready replicas. + * + * @return int + */ + public function getReadyReplicasCount(): int + { + return $this->getStatus('readyReplicas', 0); + } + + /** + * Get the fully labeled replicas. + * + * @return int + */ + public function getFullyLabeledReplicasCount(): int + { + return $this->getStatus('fullyLabeledReplicas', 0); + } + + /** + * Get the total desired replicas. + * + * @return int + */ + public function getDesiredReplicasCount(): int + { + return $this->getStatus('replicas', 0); + } +} diff --git a/src/Kinds/K8sScale.php b/src/Kinds/K8sScale.php index f5fee74..68bfed7 100644 --- a/src/Kinds/K8sScale.php +++ b/src/Kinds/K8sScale.php @@ -87,4 +87,55 @@ public function refreshOriginal(array $query = ['pretty' => 1]) return parent::refreshOriginal($query); } + + /** + * Create the scale resource. + * Scale subresources should use replace (PUT) operations, not create (POST). + * Scale subresources don't support POST, so we use PUT to the scale subresource path. + * + * @param array $query + * @return \RenokiCo\PhpK8s\Kinds\K8sResource + * + * @throws \RenokiCo\PhpK8s\Exceptions\KubernetesAPIException + */ + public function create(array $query = ['pretty' => 1]) + { + return $this->cluster + ->setResourceClass(get_class($this)) + ->runOperation( + \RenokiCo\PhpK8s\KubernetesCluster::REPLACE_OP, + $this->resourcePath(), + $this->toJsonPayload(), + $query + ); + } + + /** + * Update the scale resource. + * This is the correct operation for scale subresources. + * Scale is updated via PUT to the scale subresource path. + * + * @param array $query + * @return bool + * + * @throws \RenokiCo\PhpK8s\Exceptions\KubernetesAPIException + */ + public function update(array $query = ['pretty' => 1]): bool + { + $this->refreshOriginal(); + $this->refreshResourceVersion(); + + $instance = $this->cluster + ->setResourceClass(get_class($this)) + ->runOperation( + \RenokiCo\PhpK8s\KubernetesCluster::REPLACE_OP, + $this->resourcePath(), + $this->toJsonPayload(), + $query + ); + + $this->syncWith($instance->toArray()); + + return true; + } } diff --git a/src/KubernetesCluster.php b/src/KubernetesCluster.php index 43a6979..18eeb78 100644 --- a/src/KubernetesCluster.php +++ b/src/KubernetesCluster.php @@ -58,6 +58,10 @@ * @method \RenokiCo\PhpK8s\Kinds\K8sDeployment getDeploymentByName(string $name, string $namespace = 'default', array $query = ['pretty' => 1]) * @method \RenokiCo\PhpK8s\ResourcesList getAllDeploymentsFromAllNamespaces(array $query = ['pretty' => 1]) * @method \RenokiCo\PhpK8s\ResourcesList getAllDeployments(string $namespace = 'default', array $query = ['pretty' => 1]) + * @method \RenokiCo\PhpK8s\Kinds\K8sReplicaSet replicaSet(array $attributes = []) + * @method \RenokiCo\PhpK8s\Kinds\K8sReplicaSet getReplicaSetByName(string $name, string $namespace = 'default', array $query = ['pretty' => 1]) + * @method \RenokiCo\PhpK8s\ResourcesList getAllReplicaSetsFromAllNamespaces(array $query = ['pretty' => 1]) + * @method \RenokiCo\PhpK8s\ResourcesList getAllReplicaSets(string $namespace = 'default', array $query = ['pretty' => 1]) * @method \RenokiCo\PhpK8s\Kinds\K8sJob job(array $attributes = []) * @method \RenokiCo\PhpK8s\Kinds\K8sJob getJobByName(string $name, string $namespace = 'default', array $query = ['pretty' => 1]) * @method \RenokiCo\PhpK8s\ResourcesList getAllJobsFromAllNamespaces(array $query = ['pretty' => 1]) @@ -251,27 +255,76 @@ protected function watchPath(string $path, Closure $callback, array $query = ['p { $resourceClass = $this->resourceClass; $sock = $this->createSocketConnection($this->getCallableUrl($path, $query)); - $data = null; - while (($data = fgets($sock)) == true) { - $data = @json_decode($data, true); + if ($sock === false) { + return null; + } + + // Set stream to non-blocking mode to allow timeout handling + stream_set_blocking($sock, false); - ['type' => $type, 'object' => $attributes] = $data; + // Calculate overall timeout: server timeout + buffer for network/processing + $timeout = ($query['timeoutSeconds'] ?? 30) + 5; + $endTime = time() + $timeout; - $call = call_user_func( - $callback, - $type, - new $resourceClass($this, $attributes) - ); + $buffer = ''; - if (! is_null($call)) { + while (time() < $endTime) { + // Try to read data (non-blocking) + $chunk = fread($sock, 8192); + + if ($chunk === false) { + // Error occurred fclose($sock); + return null; + } + + if ($chunk === '') { + // No data available, check if stream ended + if (feof($sock)) { + break; + } + + // No data yet, sleep briefly and continue + usleep(100000); // 100ms + continue; + } + + // Append chunk to buffer + $buffer .= $chunk; + + // Process complete lines from buffer + while (($pos = strpos($buffer, "\n")) !== false) { + $line = substr($buffer, 0, $pos); + $buffer = substr($buffer, $pos + 1); + + if (trim($line) === '') { + continue; + } + + $data = @json_decode($line, true); + + if (!$data || !isset($data['type'], $data['object'])) { + continue; + } + + ['type' => $type, 'object' => $attributes] = $data; - unset($data); + $call = call_user_func( + $callback, + $type, + new $resourceClass($this, $attributes) + ); - return $call; + if (! is_null($call)) { + fclose($sock); + return $call; + } } } + + fclose($sock); + return null; } /** @@ -286,19 +339,59 @@ protected function watchLogsPath(string $path, Closure $callback, array $query = { $sock = $this->createSocketConnection($this->getCallableUrl($path, $query)); - $data = null; + if ($sock === false) { + return null; + } + + // Set stream to non-blocking mode to allow timeout handling + stream_set_blocking($sock, false); - while (($data = fgets($sock)) == true) { - $call = call_user_func($callback, $data); + // Calculate overall timeout: server timeout + buffer for network/processing + $timeout = ($query['timeoutSeconds'] ?? 30) + 5; + $endTime = time() + $timeout; - if (! is_null($call)) { + $buffer = ''; + + while (time() < $endTime) { + // Try to read data (non-blocking) + $chunk = fread($sock, 8192); + + if ($chunk === false) { + // Error occurred fclose($sock); + return null; + } - unset($data); + if ($chunk === '') { + // No data available, check if stream ended + if (feof($sock)) { + break; + } - return $call; + // No data yet, sleep briefly and continue + usleep(100000); // 100ms + continue; + } + + // Append chunk to buffer + $buffer .= $chunk; + + // Process complete lines from buffer + while (($pos = strpos($buffer, "\n")) !== false) { + $line = substr($buffer, 0, $pos); + $buffer = substr($buffer, $pos + 1); + + $call = call_user_func($callback, $line . "\n"); + + if (! is_null($call)) { + fclose($sock); + return $call; + } } } + + fclose($sock); + return null; } /** diff --git a/src/Traits/InitializesResources.php b/src/Traits/InitializesResources.php index 274be9c..dce8325 100644 --- a/src/Traits/InitializesResources.php +++ b/src/Traits/InitializesResources.php @@ -23,6 +23,7 @@ use RenokiCo\PhpK8s\Kinds\K8sPod; use RenokiCo\PhpK8s\Kinds\K8sPodDisruptionBudget; use RenokiCo\PhpK8s\Kinds\K8sPriorityClass; +use RenokiCo\PhpK8s\Kinds\K8sReplicaSet; use RenokiCo\PhpK8s\Kinds\K8sResourceQuota; use RenokiCo\PhpK8s\Kinds\K8sRole; use RenokiCo\PhpK8s\Kinds\K8sRoleBinding; @@ -192,6 +193,18 @@ public static function deployment($cluster = null, array $attributes = []) return new K8sDeployment($cluster, $attributes); } + /** + * Create a new ReplicaSet kind. + * + * @param \RenokiCo\PhpK8s\KubernetesCluster|null $cluster + * @param array $attributes + * @return \RenokiCo\PhpK8s\Kinds\K8sReplicaSet + */ + public static function replicaSet($cluster = null, array $attributes = []) + { + return new K8sReplicaSet($cluster, $attributes); + } + /** * Create a new Job kind. * diff --git a/tests/DeploymentTest.php b/tests/DeploymentTest.php index 62984c3..bcae669 100644 --- a/tests/DeploymentTest.php +++ b/tests/DeploymentTest.php @@ -10,6 +10,42 @@ class DeploymentTest extends TestCase { + protected function tearDown(): void + { + // Clean up deployment and HPA if they exist + try { + $dep = $this->cluster->getDeploymentByName('mariadb'); + if ($dep->exists()) { + $dep->delete(); + + $timeout = 30; + $start = time(); + while ($dep->exists() && (time() - $start < $timeout)) { + sleep(1); + } + } + } catch (\Exception $e) { + // Deployment doesn't exist, that's fine + } + + try { + $hpa = $this->cluster->getHorizontalPodAutoscalerByName('deploy-mariadb'); + if ($hpa->exists()) { + $hpa->delete(); + + $timeout = 30; + $start = time(); + while ($hpa->exists() && (time() - $start < $timeout)) { + sleep(1); + } + } + } catch (\Exception $e) { + // HPA doesn't exist, that's fine + } + + parent::tearDown(); + } + public function test_deployment_build() { $mariadb = $this->createMariadbContainer(); @@ -233,15 +269,32 @@ public function runDeletionTests() $this->assertTrue($dep->delete()); $this->assertTrue($hpa->delete()); + $timeout = 60; // 60 second timeout + $start = time(); + while ($hpa->exists()) { + if (time() - $start > $timeout) { + $this->fail('Timeout waiting for HPA to be deleted'); + } sleep(1); } + $start = time(); while ($dep->exists()) { + if (time() - $start > $timeout) { + $this->fail('Timeout waiting for Deployment to be deleted'); + } sleep(1); } + $start = time(); while ($dep->getPods()->count() > 0) { + if (time() - $start > $timeout) { + $this->fail(sprintf( + 'Timeout waiting for Deployment pods to be deleted. Remaining: %d', + $dep->getPods()->count() + )); + } sleep(1); } @@ -277,7 +330,17 @@ public function runScalingTests() $scaler = $dep->scale(2); + $timeout = 60; // 60 second timeout + $start = time(); + while ($dep->getReadyReplicasCount() < 2 || $scaler->getReplicas() < 2) { + if (time() - $start > $timeout) { + $this->fail(sprintf( + 'Timeout waiting for deployment to scale to 2. Current state: ready=%d, scaler=%d', + $dep->getReadyReplicasCount(), + $scaler->getReplicas() + )); + } $scaler->refresh(); $dep->refresh(); sleep(1); diff --git a/tests/ReplicaSetTest.php b/tests/ReplicaSetTest.php new file mode 100644 index 0000000..4a9447b --- /dev/null +++ b/tests/ReplicaSetTest.php @@ -0,0 +1,269 @@ +createMariadbContainer(); + + $pod = $this->cluster->pod() + ->setName('mariadb') + ->setContainers([$mariadb]); + + $rs = $this->cluster->replicaSet() + ->setName('mariadb-rs') + ->setLabels(['tier' => 'backend-rs']) + ->setAnnotations(['mariadb/annotation' => 'yes']) + ->setReplicas(3) + ->setTemplate($pod); + + $this->assertEquals('apps/v1', $rs->getApiVersion()); + $this->assertEquals('mariadb-rs', $rs->getName()); + $this->assertEquals(['tier' => 'backend-rs'], $rs->getLabels()); + $this->assertEquals(['mariadb/annotation' => 'yes'], $rs->getAnnotations()); + $this->assertEquals(3, $rs->getReplicas()); + $this->assertEquals($pod->getName(), $rs->getTemplate()->getName()); + + $this->assertInstanceOf(K8sPod::class, $rs->getTemplate()); + } + + public function test_replica_set_from_yaml() + { + $mariadb = $this->createMariadbContainer(); + + $pod = $this->cluster->pod() + ->setName('mariadb') + ->setContainers([$mariadb]); + + $rs = $this->cluster->fromYamlFile(__DIR__.'/yaml/replicaset.yaml'); + + $this->assertEquals('apps/v1', $rs->getApiVersion()); + $this->assertEquals('mariadb-rs', $rs->getName()); + $this->assertEquals(['tier' => 'backend-rs'], $rs->getLabels()); + $this->assertEquals(['mariadb/annotation' => 'yes'], $rs->getAnnotations()); + $this->assertEquals(3, $rs->getReplicas()); + $this->assertEquals($pod->getName(), $rs->getTemplate()->getName()); + + $this->assertInstanceOf(K8sPod::class, $rs->getTemplate()); + } + + public function test_replica_set_api_interaction() + { + $this->runCreationTests(); + $this->runGetAllTests(); + $this->runGetTests(); + $this->runScalingTests(); + $this->runUpdateTests(); + $this->runWatchAllTests(); + $this->runWatchTests(); + $this->runDeletionTests(); + } + + public function runCreationTests() + { + $mariadb = $this->createMariadbContainer([ + 'includeEnv' => true, + 'additionalPort' => 3307 + ]); + + $pod = $this->createMariadbPod([ + 'labels' => ['app' => 'mariadb-rs', 'replicaset-name' => 'mariadb-rs'], + 'container' => [ + 'includeEnv' => true, + 'additionalPort' => 3307 + ] + ]) + ->setAnnotations(['mariadb/annotation' => 'yes']); + + $rs = $this->cluster->replicaSet() + ->setName('mariadb-rs') + ->setLabels(['tier' => 'backend-rs']) + ->setAnnotations(['mariadb/annotation' => 'yes']) + ->setSelectors(['matchLabels' => ['app' => 'mariadb-rs']]) + ->setReplicas(1) + ->setTemplate($pod); + + $this->assertFalse($rs->isSynced()); + $this->assertFalse($rs->exists()); + + $rs = $rs->createOrUpdate(); + + $this->assertTrue($rs->isSynced()); + $this->assertTrue($rs->exists()); + + $this->assertInstanceOf(K8sReplicaSet::class, $rs); + + $this->assertEquals('apps/v1', $rs->getApiVersion()); + $this->assertEquals('mariadb-rs', $rs->getName()); + $this->assertEquals(['tier' => 'backend-rs'], $rs->getLabels()); + $this->assertEquals(['mariadb/annotation' => 'yes'], $rs->getAnnotations()); + $this->assertEquals(1, $rs->getReplicas()); + $this->assertEquals($pod->getName(), $rs->getTemplate()->getName()); + + $this->assertInstanceOf(K8sPod::class, $rs->getTemplate()); + + while (! $rs->allPodsAreRunning()) { + sleep(1); + } + + K8sReplicaSet::selectPods(function ($rs) { + $this->assertInstanceOf(K8sReplicaSet::class, $rs); + + return ['app' => 'mariadb-rs']; + }); + + $pods = $rs->getPods(); + $this->assertTrue($pods->count() > 0); + + K8sReplicaSet::resetPodsSelector(); + + $pods = $rs->getPods(); + $this->assertTrue($pods->count() > 0); + + foreach ($pods as $pod) { + $this->assertInstanceOf(K8sPod::class, $pod); + } + + $rs->refresh(); + + while ($rs->getReadyReplicasCount() === 0) { + sleep(1); + $rs->refresh(); + } + + $this->assertEquals(1, $rs->getAvailableReplicasCount()); + $this->assertEquals(1, $rs->getReadyReplicasCount()); + $this->assertEquals(1, $rs->getDesiredReplicasCount()); + $this->assertEquals(1, $rs->getFullyLabeledReplicasCount()); + + $this->assertTrue(is_array($rs->getConditions())); + } + + public function runGetAllTests() + { + $replicaSets = $this->cluster->getAllReplicaSets(); + + $this->assertInstanceOf(ResourcesList::class, $replicaSets); + + foreach ($replicaSets as $rs) { + $this->assertInstanceOf(K8sReplicaSet::class, $rs); + + $this->assertNotNull($rs->getName()); + } + } + + public function runGetTests() + { + $rs = $this->cluster->getReplicaSetByName('mariadb-rs'); + + $this->assertInstanceOf(K8sReplicaSet::class, $rs); + + $this->assertTrue($rs->isSynced()); + + $this->assertEquals('apps/v1', $rs->getApiVersion()); + $this->assertEquals('mariadb-rs', $rs->getName()); + $this->assertEquals(['tier' => 'backend-rs'], $rs->getLabels()); + $this->assertEquals(['mariadb/annotation' => 'yes'], $rs->getAnnotations()); + $this->assertEquals(1, $rs->getReplicas()); + + $this->assertInstanceOf(K8sPod::class, $rs->getTemplate()); + } + + public function runScalingTests() + { + $rs = $this->cluster->getReplicaSetByName('mariadb-rs'); + + $this->assertTrue($rs->isSynced()); + + $scaler = $rs->scale(2); + + $this->assertTrue($rs->isSynced()); + + $timeout = 60; // 60 second timeout + $start = time(); + + while ($rs->getReadyReplicasCount() < 2 || $scaler->getReplicas() < 2) { + if (time() - $start > $timeout) { + $this->fail(sprintf( + 'Timeout waiting for replicas to scale to 2. Current state: ready=%d, scaler=%d', + $rs->getReadyReplicasCount(), + $scaler->getReplicas() + )); + } + + $scaler->refresh(); + $rs->refresh(); + + sleep(1); + } + + $this->assertEquals(2, $rs->getReadyReplicasCount()); + $this->assertEquals(2, $scaler->getReplicas()); + } + + public function runUpdateTests() + { + $rs = $this->cluster->getReplicaSetByName('mariadb-rs'); + + $this->assertTrue($rs->isSynced()); + + $rs->setAnnotations([]); + + $rs->createOrUpdate(); + + $this->assertTrue($rs->isSynced()); + + $this->assertEquals('apps/v1', $rs->getApiVersion()); + $this->assertEquals('mariadb-rs', $rs->getName()); + $this->assertEquals(['tier' => 'backend-rs'], $rs->getLabels()); + $this->assertEquals([], $rs->getAnnotations()); + $this->assertEquals(2, $rs->getReplicas()); + + $this->assertInstanceOf(K8sPod::class, $rs->getTemplate()); + } + + public function runWatchAllTests() + { + $watch = $this->cluster->replicaSet()->watchAll(function ($type, $rs) { + if ($rs->getName() === 'mariadb-rs') { + return true; + } + }, ['timeoutSeconds' => 10]); + + $this->assertTrue($watch); + } + + public function runWatchTests() + { + $watch = $this->cluster->replicaSet() + ->setName('mariadb-rs') + ->watch(function ($type, $rs) { + return $rs->getName() === 'mariadb-rs'; + }, ['timeoutSeconds' => 10]); + + $this->assertTrue($watch); + } + + public function runDeletionTests() + { + $rs = $this->cluster->getReplicaSetByName('mariadb-rs'); + + $this->assertTrue($rs->delete()); + + while ($rs->exists()) { + sleep(1); + } + + $this->expectException(KubernetesAPIException::class); + + $this->cluster->getReplicaSetByName('mariadb-rs'); + } +} diff --git a/tests/yaml/replicaset.yaml b/tests/yaml/replicaset.yaml new file mode 100644 index 0000000..cd91c39 --- /dev/null +++ b/tests/yaml/replicaset.yaml @@ -0,0 +1,26 @@ +apiVersion: apps/v1 +kind: ReplicaSet +metadata: + name: mariadb-rs + labels: + tier: backend-rs + annotations: + mariadb/annotation: "yes" +spec: + selector: + matchLabels: + app: mariadb-rs + replicas: 3 + template: + metadata: + name: mariadb + labels: + app: mariadb-rs + spec: + containers: + - name: mariadb + image: public.ecr.aws/docker/library/mariadb:11.8 + ports: + - name: mariadb + protocol: TCP + containerPort: 3306