Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions src/Kinds/K8sReplicaSet.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
<?php

namespace RenokiCo\PhpK8s\Kinds;

use RenokiCo\PhpK8s\Contracts\InteractsWithK8sCluster;
use RenokiCo\PhpK8s\Contracts\Podable;
use RenokiCo\PhpK8s\Contracts\Scalable;
use RenokiCo\PhpK8s\Contracts\Watchable;
use RenokiCo\PhpK8s\Traits\Resource\CanScale;
use RenokiCo\PhpK8s\Traits\Resource\HasPods;
use RenokiCo\PhpK8s\Traits\Resource\HasReplicas;
use RenokiCo\PhpK8s\Traits\Resource\HasSelector;
use RenokiCo\PhpK8s\Traits\Resource\HasSpec;
use RenokiCo\PhpK8s\Traits\Resource\HasStatus;
use RenokiCo\PhpK8s\Traits\Resource\HasStatusConditions;
use RenokiCo\PhpK8s\Traits\Resource\HasTemplate;

class K8sReplicaSet extends K8sResource implements
InteractsWithK8sCluster,
Podable,
Scalable,
Watchable
{
use CanScale;
use HasPods {
podsSelector as protected customPodsSelector;
}
use HasReplicas;
use HasSelector;
use HasSpec;
use HasStatus;
use HasStatusConditions;
use HasTemplate;

/**
* The resource Kind parameter.
*
* @var null|string
*/
protected static $kind = 'ReplicaSet';

/**
* The default version for the resource.
*
* @var string
*/
protected static $defaultVersion = 'apps/v1';

/**
* Whether the resource has a namespace.
*
* @var bool
*/
protected static $namespaceable = true;

/**
* Get the selector for the pods that are owned by this resource.
*
* @return array
*/
public function podsSelector(): array
{
if ($podsSelector = $this->customPodsSelector()) {
return $podsSelector;
}

return [
'replicaset-name' => $this->getName(),
];
}

/**
* Get the available replicas.
*
* @return int
*/
public function getAvailableReplicasCount(): int
{
return $this->getStatus('availableReplicas', 0);
}

/**
* Get the ready replicas.
*
* @return int
*/
public function getReadyReplicasCount(): int
{
return $this->getStatus('readyReplicas', 0);
}

/**
* Get the fully labeled replicas.
*
* @return int
*/
public function getFullyLabeledReplicasCount(): int
{
return $this->getStatus('fullyLabeledReplicas', 0);
}

/**
* Get the total desired replicas.
*
* @return int
*/
public function getDesiredReplicasCount(): int
{
return $this->getStatus('replicas', 0);
}
}
51 changes: 51 additions & 0 deletions src/Kinds/K8sScale.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,55 @@ public function refreshOriginal(array $query = ['pretty' => 1])

return parent::refreshOriginal($query);
}

/**
* Create the scale resource.
* Scale subresources should use replace (PUT) operations, not create (POST).
* Scale subresources don't support POST, so we use PUT to the scale subresource path.
*
* @param array $query
* @return \RenokiCo\PhpK8s\Kinds\K8sResource
*
* @throws \RenokiCo\PhpK8s\Exceptions\KubernetesAPIException
*/
public function create(array $query = ['pretty' => 1])
{
return $this->cluster
->setResourceClass(get_class($this))
->runOperation(
\RenokiCo\PhpK8s\KubernetesCluster::REPLACE_OP,
$this->resourcePath(),
$this->toJsonPayload(),
$query
);
}

/**
* Update the scale resource.
* This is the correct operation for scale subresources.
* Scale is updated via PUT to the scale subresource path.
*
* @param array $query
* @return bool
*
* @throws \RenokiCo\PhpK8s\Exceptions\KubernetesAPIException
*/
public function update(array $query = ['pretty' => 1]): bool
{
$this->refreshOriginal();
$this->refreshResourceVersion();

$instance = $this->cluster
->setResourceClass(get_class($this))
->runOperation(
\RenokiCo\PhpK8s\KubernetesCluster::REPLACE_OP,
$this->resourcePath(),
$this->toJsonPayload(),
$query
);

$this->syncWith($instance->toArray());

return true;
}
}
129 changes: 111 additions & 18 deletions src/KubernetesCluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
* @method \RenokiCo\PhpK8s\Kinds\K8sDeployment getDeploymentByName(string $name, string $namespace = 'default', array $query = ['pretty' => 1])
* @method \RenokiCo\PhpK8s\ResourcesList getAllDeploymentsFromAllNamespaces(array $query = ['pretty' => 1])
* @method \RenokiCo\PhpK8s\ResourcesList getAllDeployments(string $namespace = 'default', array $query = ['pretty' => 1])
* @method \RenokiCo\PhpK8s\Kinds\K8sReplicaSet replicaSet(array $attributes = [])
* @method \RenokiCo\PhpK8s\Kinds\K8sReplicaSet getReplicaSetByName(string $name, string $namespace = 'default', array $query = ['pretty' => 1])
* @method \RenokiCo\PhpK8s\ResourcesList getAllReplicaSetsFromAllNamespaces(array $query = ['pretty' => 1])
* @method \RenokiCo\PhpK8s\ResourcesList getAllReplicaSets(string $namespace = 'default', array $query = ['pretty' => 1])
* @method \RenokiCo\PhpK8s\Kinds\K8sJob job(array $attributes = [])
* @method \RenokiCo\PhpK8s\Kinds\K8sJob getJobByName(string $name, string $namespace = 'default', array $query = ['pretty' => 1])
* @method \RenokiCo\PhpK8s\ResourcesList getAllJobsFromAllNamespaces(array $query = ['pretty' => 1])
Expand Down Expand Up @@ -251,27 +255,76 @@ protected function watchPath(string $path, Closure $callback, array $query = ['p
{
$resourceClass = $this->resourceClass;
$sock = $this->createSocketConnection($this->getCallableUrl($path, $query));
$data = null;

while (($data = fgets($sock)) == true) {
$data = @json_decode($data, true);
if ($sock === false) {
return null;
}

// Set stream to non-blocking mode to allow timeout handling
stream_set_blocking($sock, false);

['type' => $type, 'object' => $attributes] = $data;
// Calculate overall timeout: server timeout + buffer for network/processing
$timeout = ($query['timeoutSeconds'] ?? 30) + 5;
$endTime = time() + $timeout;

$call = call_user_func(
$callback,
$type,
new $resourceClass($this, $attributes)
);
$buffer = '';

if (! is_null($call)) {
while (time() < $endTime) {
// Try to read data (non-blocking)
$chunk = fread($sock, 8192);

if ($chunk === false) {
// Error occurred
fclose($sock);
return null;
}

if ($chunk === '') {
// No data available, check if stream ended
if (feof($sock)) {
break;
}

// No data yet, sleep briefly and continue
usleep(100000); // 100ms
continue;
}

// Append chunk to buffer
$buffer .= $chunk;

// Process complete lines from buffer
while (($pos = strpos($buffer, "\n")) !== false) {
$line = substr($buffer, 0, $pos);
$buffer = substr($buffer, $pos + 1);

if (trim($line) === '') {
continue;
}

$data = @json_decode($line, true);

if (!$data || !isset($data['type'], $data['object'])) {
continue;
}

['type' => $type, 'object' => $attributes] = $data;

unset($data);
$call = call_user_func(
$callback,
$type,
new $resourceClass($this, $attributes)
);

return $call;
if (! is_null($call)) {
fclose($sock);
return $call;
}
}
}

fclose($sock);
return null;
}

/**
Expand All @@ -286,19 +339,59 @@ protected function watchLogsPath(string $path, Closure $callback, array $query =
{
$sock = $this->createSocketConnection($this->getCallableUrl($path, $query));

$data = null;
if ($sock === false) {
return null;
}

// Set stream to non-blocking mode to allow timeout handling
stream_set_blocking($sock, false);

while (($data = fgets($sock)) == true) {
$call = call_user_func($callback, $data);
// Calculate overall timeout: server timeout + buffer for network/processing
$timeout = ($query['timeoutSeconds'] ?? 30) + 5;
$endTime = time() + $timeout;

if (! is_null($call)) {
$buffer = '';

while (time() < $endTime) {
// Try to read data (non-blocking)
$chunk = fread($sock, 8192);

if ($chunk === false) {
// Error occurred
fclose($sock);
return null;
}

unset($data);
if ($chunk === '') {
// No data available, check if stream ended
if (feof($sock)) {
break;
}

return $call;
// No data yet, sleep briefly and continue
usleep(100000); // 100ms
continue;
}

// Append chunk to buffer
$buffer .= $chunk;

// Process complete lines from buffer
while (($pos = strpos($buffer, "\n")) !== false) {
$line = substr($buffer, 0, $pos);
$buffer = substr($buffer, $pos + 1);

$call = call_user_func($callback, $line . "\n");

if (! is_null($call)) {
fclose($sock);
return $call;
}
}
}

fclose($sock);
return null;
}

/**
Expand Down
13 changes: 13 additions & 0 deletions src/Traits/InitializesResources.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use RenokiCo\PhpK8s\Kinds\K8sPod;
use RenokiCo\PhpK8s\Kinds\K8sPodDisruptionBudget;
use RenokiCo\PhpK8s\Kinds\K8sPriorityClass;
use RenokiCo\PhpK8s\Kinds\K8sReplicaSet;
use RenokiCo\PhpK8s\Kinds\K8sResourceQuota;
use RenokiCo\PhpK8s\Kinds\K8sRole;
use RenokiCo\PhpK8s\Kinds\K8sRoleBinding;
Expand Down Expand Up @@ -192,6 +193,18 @@ public static function deployment($cluster = null, array $attributes = [])
return new K8sDeployment($cluster, $attributes);
}

/**
* Create a new ReplicaSet kind.
*
* @param \RenokiCo\PhpK8s\KubernetesCluster|null $cluster
* @param array $attributes
* @return \RenokiCo\PhpK8s\Kinds\K8sReplicaSet
*/
public static function replicaSet($cluster = null, array $attributes = [])
{
return new K8sReplicaSet($cluster, $attributes);
}

/**
* Create a new Job kind.
*
Expand Down
Loading