Skip to content

Commit

Permalink
Merge pull request #129 from rmeisler/master
Browse files Browse the repository at this point in the history
Persistent connections still not working as intended. This fixes them.
  • Loading branch information
sirn-se authored Feb 3, 2021
2 parents e3b1867 + f601df3 commit 2272656
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 61 deletions.
15 changes: 13 additions & 2 deletions lib/Base.php
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,21 @@ public function close(int $status = 1000, string $message = 'ttfn'): void
$this->receive(); // Receiving a close frame will close the socket now.
}

/**
* Disconnect from client/server.
*/
public function disconnect(): void
{
if ($this->isConnected()) {
fclose($this->socket);
$this->socket = null;
}
}

protected function write(string $data): void
{
$length = strlen($data);
$written = fwrite($this->socket, $data);
$written = @fwrite($this->socket, $data);
if ($written === false) {
$this->throwException("Failed to write {$length} bytes.");
}
Expand All @@ -427,7 +438,7 @@ protected function read(string $length): string
{
$data = '';
while (strlen($data) < $length) {
$buffer = fread($this->socket, $length - strlen($data));
$buffer = @fread($this->socket, $length - strlen($data));
if ($buffer === false) {
$read = strlen($data);
$this->throwException("Broken frame, read {$read} of stated {$length} bytes.");
Expand Down
121 changes: 62 additions & 59 deletions lib/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public function __construct(string $uri, array $options = [])

public function __destruct()
{
if ($this->isConnected()) {
if ($this->isConnected() && get_resource_type($this->socket) !== 'persistent stream') {
fclose($this->socket);
}
$this->socket = null;
Expand Down Expand Up @@ -100,8 +100,9 @@ protected function connect(): void
$context = stream_context_create();
}

$persistent = $this->options['persistent'] === true;
$flags = STREAM_CLIENT_CONNECT;
$flags = ($this->options['persistent'] === true) ? $flags | STREAM_CLIENT_PERSISTENT : $flags;
$flags = $persistent ? $flags | STREAM_CLIENT_PERSISTENT : $flags;

$error = $errno = $errstr = null;
set_error_handler(function (int $severity, string $message, string $file, int $line) use (&$error) {
Expand All @@ -127,73 +128,75 @@ protected function connect(): void
throw new ConnectionException($error);
}

// Set timeout on the stream as well.
stream_set_timeout($this->socket, $this->options['timeout']);

// Generate the WebSocket key.
$key = self::generateKey();

// Default headers
$headers = [
'Host' => $host . ":" . $port,
'User-Agent' => 'websocket-client-php',
'Connection' => 'Upgrade',
'Upgrade' => 'websocket',
'Sec-WebSocket-Key' => $key,
'Sec-WebSocket-Version' => '13',
];

// Handle basic authentication.
if ($user || $pass) {
$headers['authorization'] = 'Basic ' . base64_encode($user . ':' . $pass);
}
if (!$persistent || ftell($this->socket) == 0) {
// Set timeout on the stream as well.
stream_set_timeout($this->socket, $this->options['timeout']);

// Generate the WebSocket key.
$key = self::generateKey();

// Default headers
$headers = [
'Host' => $host . ":" . $port,
'User-Agent' => 'websocket-client-php',
'Connection' => 'Upgrade',
'Upgrade' => 'websocket',
'Sec-WebSocket-Key' => $key,
'Sec-WebSocket-Version' => '13',
];

// Handle basic authentication.
if ($user || $pass) {
$headers['authorization'] = 'Basic ' . base64_encode($user . ':' . $pass);
}

// Deprecated way of adding origin (use headers instead).
if (isset($this->options['origin'])) {
$headers['origin'] = $this->options['origin'];
}
// Deprecated way of adding origin (use headers instead).
if (isset($this->options['origin'])) {
$headers['origin'] = $this->options['origin'];
}

// Add and override with headers from options.
if (isset($this->options['headers'])) {
$headers = array_merge($headers, $this->options['headers']);
}
// Add and override with headers from options.
if (isset($this->options['headers'])) {
$headers = array_merge($headers, $this->options['headers']);
}

$header = "GET " . $path_with_query . " HTTP/1.1\r\n" . implode(
"\r\n",
array_map(
function ($key, $value) {
return "$key: $value";
},
array_keys($headers),
$headers
)
) . "\r\n\r\n";
$header = "GET " . $path_with_query . " HTTP/1.1\r\n" . implode(
"\r\n",
array_map(
function ($key, $value) {
return "$key: $value";
},
array_keys($headers),
$headers
)
) . "\r\n\r\n";

// Send headers.
$this->write($header);
// Send headers.
$this->write($header);

// Get server response header (terminated with double CR+LF).
$response = stream_get_line($this->socket, 1024, "\r\n\r\n");
// Get server response header (terminated with double CR+LF).
$response = stream_get_line($this->socket, 1024, "\r\n\r\n");

/// @todo Handle version switching
/// @todo Handle version switching

$address = "{$scheme}://{$host}{$path_with_query}";
$address = "{$scheme}://{$host}{$path_with_query}";

// Validate response.
if (!preg_match('#Sec-WebSocket-Accept:\s(.*)$#mUi', $response, $matches)) {
$error = "Connection to '{$address}' failed: Server sent invalid upgrade response: {$response}";
$this->logger->error($error);
throw new ConnectionException($error);
}
// Validate response.
if (!preg_match('#Sec-WebSocket-Accept:\s(.*)$#mUi', $response, $matches)) {
$error = "Connection to '{$address}' failed: Server sent invalid upgrade response: {$response}";
$this->logger->error($error);
throw new ConnectionException($error);
}

$keyAccept = trim($matches[1]);
$expectedResonse
= base64_encode(pack('H*', sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
$keyAccept = trim($matches[1]);
$expectedResonse
= base64_encode(pack('H*', sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));

if ($keyAccept !== $expectedResonse) {
$error = 'Server sent bad upgrade response.';
$this->logger->error($error);
throw new ConnectionException($error);
if ($keyAccept !== $expectedResonse) {
$error = 'Server sent bad upgrade response.';
$this->logger->error($error);
throw new ConnectionException($error);
}
}

$this->logger->info("Client connected to {$address}");
Expand Down
2 changes: 2 additions & 0 deletions tests/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ public function testPersistentConnection(): void
MockSocket::initialize('client.connect-persistent', $this);
$client = new Client('ws://localhost:8000/my/mock/path', ['persistent' => true]);
$client->send('Connect');
$client->disconnect();
$this->assertFalse($client->isConnected());
$this->assertTrue(MockSocket::isEmpty());
}

Expand Down
5 changes: 5 additions & 0 deletions tests/mock/mock-socket.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ function feof()
$args = func_get_args();
return MockSocket::handle('feof', $args);
}
function ftell()
{
$args = func_get_args();
return MockSocket::handle('ftell', $args);
}
function fclose()
{
$args = func_get_args();
Expand Down
28 changes: 28 additions & 0 deletions tests/scripts/client.connect-persistent.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
],
"return": "persistent stream"
},
{
"function": "ftell",
"params": [
"@mock-stream"
],
"return": 0
},
{
"function": "stream_set_timeout",
"params": [
Expand Down Expand Up @@ -62,6 +69,27 @@
"@mock-stream"
],
"return": 13
},
{
"function": "get_resource_type",
"params": [
"@mock-stream"
],
"return": "persistent stream"
},
{
"function": "get_resource_type",
"params": [
"@mock-stream"
],
"return": "persistent stream"
},
{
"function": "fclose",
"params": [
"@mock-stream"
],
"return":true
}
]

7 changes: 7 additions & 0 deletions tests/scripts/client.destruct.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
[
{
"function": "get_resource_type",
"params": [
"@mock-stream"
],
"return": "stream"
},
{
"function": "get_resource_type",
"params": [
Expand Down

0 comments on commit 2272656

Please sign in to comment.