Skip to content

Commit

Permalink
Merge branch 'walkor:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
1923998238 authored Jan 11, 2023
2 parents 46644d7 + 956a5d8 commit 7bde2d6
Show file tree
Hide file tree
Showing 22 changed files with 929 additions and 414 deletions.
3 changes: 3 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/.gitattributes export-ignore
/.github export-ignore
/.gitignore export-ignore
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,13 @@ proxy supports TLS1.3, no Sniproxy channel
```php start.php start ```
```php start.php start -d ```
```php start.php status ```
```php start.php status -d ```
```php start.php connections```
```php start.php stop ```
```php start.php stop -g ```
```php start.php restart ```
```php start.php reload ```
```php start.php reload -g ```

## Documentation

Expand Down
6 changes: 6 additions & 0 deletions SECURITY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Security Policy


## Reporting a Vulnerability

Please contact by email [email protected]
10 changes: 5 additions & 5 deletions src/Connection/AsyncTcpConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ class AsyncTcpConnection extends TcpConnection
/**
* PHP built-in protocols.
*
* @var array
* @var array<string,string>
*/
protected static $_builtinTransports = [
const BUILD_IN_TRANSPORTS = [
'tcp' => 'tcp',
'udp' => 'udp',
'unix' => 'unix',
Expand Down Expand Up @@ -150,7 +150,7 @@ public function __construct($remote_address, array $context_option = [])
$this->_remoteHost = $address_info['host'];
$this->_remotePort = $address_info['port'];
$this->_remoteURI = "{$address_info['path']}{$address_info['query']}";
$scheme = isset($address_info['scheme']) ? $address_info['scheme'] : 'tcp';
$scheme = $address_info['scheme'] ?? 'tcp';
$this->_remoteAddress = 'unix' === strtolower($scheme)
? substr($remote_address, strpos($remote_address, '/') + 2)
: $this->_remoteHost . ':' . $this->_remotePort;
Expand All @@ -161,7 +161,7 @@ public function __construct($remote_address, array $context_option = [])
self::$_idRecorder = 0;
}
// Check application layer protocol class.
if (!isset(self::$_builtinTransports[$scheme])) {
if (!isset(self::BUILD_IN_TRANSPORTS[$scheme])) {
$scheme = \ucfirst($scheme);
$this->protocol = '\\Protocols\\' . $scheme;
if (!\class_exists($this->protocol)) {
Expand All @@ -171,7 +171,7 @@ public function __construct($remote_address, array $context_option = [])
}
}
} else {
$this->transport = self::$_builtinTransports[$scheme];
$this->transport = self::BUILD_IN_TRANSPORTS[$scheme];
}

// For statistics.
Expand Down
2 changes: 1 addition & 1 deletion src/Connection/AsyncUdpConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public function connect()
\stream_set_blocking($this->_socket, false);

if ($this->onMessage) {
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, [$this, 'baseRead']);
Worker::$globalEvent->onWritable($this->_socket, [$this, 'baseRead']);
}
$this->connected = true;
// Try to emit onConnect callback.
Expand Down
1 change: 1 addition & 0 deletions src/Connection/ConnectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
/**
* ConnectionInterface.
*/
#[\AllowDynamicProperties]
abstract class ConnectionInterface
{
/**
Expand Down
38 changes: 36 additions & 2 deletions src/Connection/TcpConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* TcpConnection.
*/
class TcpConnection extends ConnectionInterface
class TcpConnection extends ConnectionInterface implements \JsonSerializable
{
/**
* Read buffer size.
Expand Down Expand Up @@ -157,6 +157,13 @@ class TcpConnection extends ConnectionInterface
* @var int
*/
public $maxSendBufferSize = 1048576;

/**
* Context.
*
* @var object|null
*/
public $context = null;

/**
* Default send buffer size.
Expand Down Expand Up @@ -293,6 +300,7 @@ public function __construct($socket, $remote_address = '')
$this->maxPackageSize = self::$defaultMaxPackageSize;
$this->_remoteAddress = $remote_address;
static::$connections[$this->id] = $this;
$this->context = new \stdClass;
}

/**
Expand Down Expand Up @@ -587,7 +595,7 @@ public function baseRead($socket, $check_eof = true)
} else {
$this->bytesRead += \strlen($buffer);
if ($this->_recvBuffer === '') {
if (static::$_enableCache && !isset($requests[512]) && isset($requests[$buffer])) {
if (static::$_enableCache && !isset($buffer[512]) && isset($requests[$buffer])) {
++self::$statistics['total_request'];
$request = $requests[$buffer];
if ($request instanceof Request) {
Expand Down Expand Up @@ -715,6 +723,9 @@ public function baseWrite()
}
}
if ($this->_status === self::STATUS_CLOSING) {
if ($this->context->streamSending) {
return true;
}
$this->destroy();
}
return true;
Expand Down Expand Up @@ -969,6 +980,29 @@ public static function enableCache($value)
{
static::$_enableCache = (bool)$value;
}

/**
* Get the json_encode information.
*
* @return mixed
*/
#[\ReturnTypeWillChange]
public function jsonSerialize()
{
return [
'id' => $this->id,
'status' => $this->getStatus(),
'transport' => $this->transport,
'getRemoteIp' => $this->getRemoteIp(),
'remotePort' => $this->getRemotePort(),
'getRemoteAddress' => $this->getRemoteAddress(),
'getLocalIp' => $this->getLocalIp(),
'getLocalPort' => $this->getLocalPort(),
'getLocalAddress' => $this->getLocalAddress(),
'isIpV4' => $this->isIpV4(),
'isIpV6' => $this->isIpV6(),
];
}

/**
* Destruct.
Expand Down
22 changes: 21 additions & 1 deletion src/Connection/UdpConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
/**
* UdpConnection.
*/
class UdpConnection extends ConnectionInterface
class UdpConnection extends ConnectionInterface implements \JsonSerializable
{
/**
* Application layer protocol.
Expand Down Expand Up @@ -206,4 +206,24 @@ public function getSocket()
{
return $this->_socket;
}

/**
* Get the json_encode informattion.
*
* @return array
*/
public function jsonSerialize()
{
return [
'transport' => $this->transport,
'getRemoteIp' => $this->getRemoteIp(),
'remotePort' => $this->getRemotePort(),
'getRemoteAddress' => $this->getRemoteAddress(),
'getLocalIp' => $this->getLocalIp(),
'getLocalPort' => $this->getLocalPort(),
'getLocalAddress' => $this->getLocalAddress(),
'isIpV4' => $this->isIpV4(),
'isIpV6' => $this->isIpV6(),
];
}
}
19 changes: 11 additions & 8 deletions src/Events/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Event implements EventInterface
* Timer id.
* @var int
*/
protected $_timerId = 1;
protected $_timerId = 0;

/**
* Event class name.
Expand Down Expand Up @@ -90,8 +90,10 @@ public function __construct()
public function delay(float $delay, $func, $args)
{
$class_name = $this->_eventClassName;
$event = new $class_name($this->_eventBase, -1, $class_name::TIMEOUT, function () use ($func, $args) {
$timer_id = $this->_timerId++;
$event = new $class_name($this->_eventBase, -1, $class_name::TIMEOUT, function () use ($func, $args, $timer_id) {
try {
$this->deleteTimer($timer_id);
$func(...$args);
} catch (\Throwable $e) {
Worker::stopAll(250, $e);
Expand All @@ -100,8 +102,8 @@ public function delay(float $delay, $func, $args)
if (!$event || !$event->addTimer($delay)) {
return false;
}
$this->_eventTimer[$this->_timerId] = $event;
return $this->_timerId++;
$this->_eventTimer[$timer_id] = $event;
return $timer_id;
}

/**
Expand All @@ -123,7 +125,8 @@ public function deleteTimer($timer_id)
public function repeat(float $interval, $func, $args)
{
$class_name = $this->_eventClassName;
$event = new $this->_eventClassName($this->_eventBase, -1, $class_name::TIMEOUT | $class_name::PERSIST, function () use ($func, $args) {
$timer_id = $this->_timerId++;
$event = new $class_name($this->_eventBase, -1, $class_name::TIMEOUT | $class_name::PERSIST, function () use ($func, $args) {
try {
$func(...$args);
} catch (\Throwable $e) {
Expand All @@ -133,8 +136,8 @@ public function repeat(float $interval, $func, $args)
if (!$event || !$event->addTimer($interval)) {
return false;
}
$this->_eventTimer[$this->_timerId] = $event;
return $this->_timerId++;
$this->_eventTimer[$timer_id] = $event;
return $timer_id;
}

/**
Expand Down Expand Up @@ -175,7 +178,7 @@ public function onWritable($stream, $func)
if (!$event || !$event->add()) {
return false;
}
$this->_readEvents[$fd_key] = $event;
$this->_writeEvents[$fd_key] = $event;
return true;
}

Expand Down
30 changes: 24 additions & 6 deletions src/Events/Select.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

namespace Workerman\Events;

use Throwable;
use Workerman\Worker;

/**
* select eventloop
*/
Expand Down Expand Up @@ -133,6 +136,7 @@ public function repeat(float $delay, $func, $args)
$this->_scheduler->insert($timer_id, -$run_time);
$this->_eventTimer[$timer_id] = [$func, (array)$args, $delay];
$select_timeout = ($run_time - \microtime(true)) * 1000000;
$select_timeout = $select_timeout <= 0 ? 1 : (int)$select_timeout;
if ($this->_selectTimeout > $select_timeout) {
$this->_selectTimeout = $select_timeout;
}
Expand Down Expand Up @@ -258,12 +262,13 @@ public function signalHandler($signal)
*/
protected function tick()
{
$tasks_to_insert = [];
while (!$this->_scheduler->isEmpty()) {
$scheduler_data = $this->_scheduler->top();
$timer_id = $scheduler_data['data'];
$next_run_time = -$scheduler_data['priority'];
$time_now = \microtime(true);
$this->_selectTimeout = (int)($next_run_time - $time_now) * 1000000;
$this->_selectTimeout = (int)(($next_run_time - $time_now) * 1000000);
if ($this->_selectTimeout <= 0) {
$this->_scheduler->extract();

Expand All @@ -275,13 +280,27 @@ protected function tick()
$task_data = $this->_eventTimer[$timer_id];
if (isset($task_data[2])) {
$next_run_time = $time_now + $task_data[2];
$this->_scheduler->insert($timer_id, -$next_run_time);
$tasks_to_insert[] = [$timer_id, -$next_run_time];
} else {
unset($this->_eventTimer[$timer_id]);
}
$task_data[0]($task_data[1]);
continue;
try {
$task_data[0]($task_data[1]);
} catch (Throwable $e) {
Worker::stopAll(250, $e);
}
} else {
break;
}
}
foreach ($tasks_to_insert as $item) {
$this->_scheduler->insert($item[0], $item[1]);
}
if (!$this->_scheduler->isEmpty()) {
$scheduler_data = $this->_scheduler->top();
$next_run_time = -$scheduler_data['priority'];
$time_now = \microtime(true);
$this->_selectTimeout = \max((int)(($next_run_time - $time_now) * 1000000), 0);
return;
}
$this->_selectTimeout = 100000000;
Expand Down Expand Up @@ -316,12 +335,11 @@ public function run()
// Waiting read/write/signal/timeout events.
try {
@stream_select($read, $write, $except, 0, $this->_selectTimeout);
} catch (\Throwable $e) {
} catch (Throwable $e) {
}

} else {
$this->_selectTimeout >= 1 && usleep($this->_selectTimeout);
$ret = false;
}

if (!$this->_scheduler->isEmpty()) {
Expand Down
Loading

0 comments on commit 7bde2d6

Please sign in to comment.