Skip to content

Commit 3db69a5

Browse files
cuppettclaude
andcommitted
Fix watch operations hanging in CI by adding client-side stream timeouts
The watchPath() and watchLogsPath() methods were creating socket connections via fopen() without any client-side timeout, relying entirely on the Kubernetes API server to close the connection after timeoutSeconds expires. In GitHub Actions pull_request environments, the server-side connection closure was not working reliably, causing fgets() to block indefinitely and tests to hang (timeout after 35+ minutes). This fix adds stream_set_timeout() to both methods with a timeout value based on the query parameter timeoutSeconds (default 30s) plus a 5-second buffer. The buffer allows the server-side timeout to fire first under normal conditions while ensuring the client always has a fallback timeout. Additionally, the while loop now checks for stream timeout via stream_get_meta_data() and closes the socket gracefully if a timeout occurs. Affected tests: - NetworkPolicyTest (previously hanging at test renoki-co#127 in CI) - All 30+ resources with watch operations Verified working: - NetworkPolicyTest: passes in 10s (was 20s) - ReplicaSetTest: passes in 15s - DeploymentTest: passes in 50s - All tests complete locally without hangs 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]> Fix watch operations hanging indefinitely by using non-blocking streams Problem: Watch operations (watchPath/watchLogsPath) were hanging indefinitely in CI, causing all 24 jobs to timeout after 25-35 minutes at test renoki-co#127 (NetworkPolicyTest). Root Cause Analysis: 1. Original code used fgets() which blocks indefinitely waiting for data 2. First fix attempt: stream_set_timeout() - Does NOT reliably interrupt fgets() when waiting for data that never arrives 3. Second fix attempt: stream_select() - Does NOT work with HTTP wrapper streams (fopen('http://...')) - PHP limitation: "Cannot cast a filtered stream" stream_select() only works with socket streams, not HTTP filtered streams Solution: Implemented non-blocking stream approach: - Set stream_set_blocking($sock, false) to enable non-blocking mode - Use fread() with explicit timeout checking instead of fgets() - Buffer incoming data and process complete lines - Sleep 100ms when no data available (usleep(100000)) - Check overall timeout (server timeoutSeconds + 5 second buffer) - Exit cleanly on EOF or timeout Benefits: - Works with HTTP wrapper streams from fopen() - Proper timeout handling prevents indefinite hangs - Buffered line processing handles partial reads correctly - NetworkPolicyTest now passes in 0.22s (was hanging for 27+ minutes) - ReplicaSetTest passes with all watch operations in 3.28s Testing: - NetworkPolicyTest: ✔ 27 assertions, 0.221s - ReplicaSetTest: ✔ 62 assertions, 3.282s - All watch operations now timeout properly 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 47465d9 commit 3db69a5

File tree

1 file changed

+107
-18
lines changed

1 file changed

+107
-18
lines changed

src/KubernetesCluster.php

Lines changed: 107 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -255,27 +255,76 @@ protected function watchPath(string $path, Closure $callback, array $query = ['p
255255
{
256256
$resourceClass = $this->resourceClass;
257257
$sock = $this->createSocketConnection($this->getCallableUrl($path, $query));
258-
$data = null;
259258

260-
while (($data = fgets($sock)) == true) {
261-
$data = @json_decode($data, true);
259+
if ($sock === false) {
260+
return null;
261+
}
262+
263+
// Set stream to non-blocking mode to allow timeout handling
264+
stream_set_blocking($sock, false);
262265

263-
['type' => $type, 'object' => $attributes] = $data;
266+
// Calculate overall timeout: server timeout + buffer for network/processing
267+
$timeout = ($query['timeoutSeconds'] ?? 30) + 5;
268+
$endTime = time() + $timeout;
264269

265-
$call = call_user_func(
266-
$callback,
267-
$type,
268-
new $resourceClass($this, $attributes)
269-
);
270+
$buffer = '';
270271

271-
if (! is_null($call)) {
272+
while (time() < $endTime) {
273+
// Try to read data (non-blocking)
274+
$chunk = fread($sock, 8192);
275+
276+
if ($chunk === false) {
277+
// Error occurred
272278
fclose($sock);
279+
return null;
280+
}
281+
282+
if ($chunk === '') {
283+
// No data available, check if stream ended
284+
if (feof($sock)) {
285+
break;
286+
}
287+
288+
// No data yet, sleep briefly and continue
289+
usleep(100000); // 100ms
290+
continue;
291+
}
292+
293+
// Append chunk to buffer
294+
$buffer .= $chunk;
295+
296+
// Process complete lines from buffer
297+
while (($pos = strpos($buffer, "\n")) !== false) {
298+
$line = substr($buffer, 0, $pos);
299+
$buffer = substr($buffer, $pos + 1);
300+
301+
if (trim($line) === '') {
302+
continue;
303+
}
304+
305+
$data = @json_decode($line, true);
306+
307+
if (!$data || !isset($data['type'], $data['object'])) {
308+
continue;
309+
}
310+
311+
['type' => $type, 'object' => $attributes] = $data;
273312

274-
unset($data);
313+
$call = call_user_func(
314+
$callback,
315+
$type,
316+
new $resourceClass($this, $attributes)
317+
);
275318

276-
return $call;
319+
if (! is_null($call)) {
320+
fclose($sock);
321+
return $call;
322+
}
277323
}
278324
}
325+
326+
fclose($sock);
327+
return null;
279328
}
280329

281330
/**
@@ -290,19 +339,59 @@ protected function watchLogsPath(string $path, Closure $callback, array $query =
290339
{
291340
$sock = $this->createSocketConnection($this->getCallableUrl($path, $query));
292341

293-
$data = null;
342+
if ($sock === false) {
343+
return null;
344+
}
345+
346+
// Set stream to non-blocking mode to allow timeout handling
347+
stream_set_blocking($sock, false);
294348

295-
while (($data = fgets($sock)) == true) {
296-
$call = call_user_func($callback, $data);
349+
// Calculate overall timeout: server timeout + buffer for network/processing
350+
$timeout = ($query['timeoutSeconds'] ?? 30) + 5;
351+
$endTime = time() + $timeout;
297352

298-
if (! is_null($call)) {
353+
$buffer = '';
354+
355+
while (time() < $endTime) {
356+
// Try to read data (non-blocking)
357+
$chunk = fread($sock, 8192);
358+
359+
if ($chunk === false) {
360+
// Error occurred
299361
fclose($sock);
362+
return null;
363+
}
300364

301-
unset($data);
365+
if ($chunk === '') {
366+
// No data available, check if stream ended
367+
if (feof($sock)) {
368+
break;
369+
}
302370

303-
return $call;
371+
// No data yet, sleep briefly and continue
372+
usleep(100000); // 100ms
373+
continue;
374+
}
375+
376+
// Append chunk to buffer
377+
$buffer .= $chunk;
378+
379+
// Process complete lines from buffer
380+
while (($pos = strpos($buffer, "\n")) !== false) {
381+
$line = substr($buffer, 0, $pos);
382+
$buffer = substr($buffer, $pos + 1);
383+
384+
$call = call_user_func($callback, $line . "\n");
385+
386+
if (! is_null($call)) {
387+
fclose($sock);
388+
return $call;
389+
}
304390
}
305391
}
392+
393+
fclose($sock);
394+
return null;
306395
}
307396

308397
/**

0 commit comments

Comments
 (0)