diff --git a/README.md b/README.md index d6db0f552..07e40c2fa 100644 --- a/README.md +++ b/README.md @@ -266,6 +266,111 @@ Worker::runAll(); +#### Use HTTP proxy + +```php +onWorkerStart = function($worker){ + echo '开始链接' . PHP_EOL; + $url = 'ws://stream.binance.com:9443/ws'; + $con = new AsyncTcpConnection($url); + $con->transport = 'ssl'; +// $con->proxySocks5 = '127.0.0.1:1080'; + $con->proxyHttp = '127.0.0.1:25378'; + + $con->onConnect = function(AsyncTcpConnection $con) { + $ww = [ + 'id' => 1, + 'method' => 'SUBSCRIBE', + 'params' => [ + "btcusdt@aggTrade", + "btcusdt@depth" + ] + ]; + echo '链接成功'; + $con->send(json_encode($ww)); + echo 'ok'; + }; + + $con->onMessage = function(AsyncTcpConnection $con, $data) { + echo $data; + }; + + $con->onClose = function (AsyncTcpConnection $con) { + echo 'onClose' . PHP_EOL; + }; + + $con->onError = function (AsyncTcpConnection $con, $code, $msg) { + echo "error [ $code ] $msg\n"; + }; + + $con->connect(); +}; +\Workerman\Worker::runAll(); +``` + + + +#### Use Socks5 proxy + +```php +onWorkerStart = function($worker){ + echo '开始链接' . PHP_EOL; + $url = 'ws://stream.binance.com:9443/ws'; + $con = new AsyncTcpConnection($url); + $con->transport = 'ssl'; + $con->proxySocks5 = '127.0.0.1:1080'; +// $con->proxyHttp = '127.0.0.1:25378'; + + $con->onConnect = function(AsyncTcpConnection $con) { + $ww = [ + 'id' => 1, + 'method' => 'SUBSCRIBE', + 'params' => [ + "btcusdt@aggTrade", + "btcusdt@depth" + ] + ]; + echo '链接成功'; + $con->send(json_encode($ww)); + echo 'ok'; + }; + + $con->onMessage = function(AsyncTcpConnection $con, $data) { + echo $data; + }; + + $con->onClose = function (AsyncTcpConnection $con) { + echo 'onClose' . PHP_EOL; + }; + + $con->onError = function (AsyncTcpConnection $con, $code, $msg) { + echo "error [ $code ] $msg\n"; + }; + + $con->connect(); +}; +\Workerman\Worker::runAll(); + +``` + + + +proxy supports TLS1.3, no Sniproxy channel + + + ## Available commands ```php start.php start ``` ```php start.php start -d ``` diff --git a/src/Connection/AsyncTcpConnection.php b/src/Connection/AsyncTcpConnection.php index 25c19af30..4f099320d 100644 --- a/src/Connection/AsyncTcpConnection.php +++ b/src/Connection/AsyncTcpConnection.php @@ -38,6 +38,20 @@ class AsyncTcpConnection extends TcpConnection */ public $transport = 'tcp'; + /** + * Socks5 proxy + * + * @var string + */ + public $proxySocks5 = ''; + + /** + * Http proxy + * + * @var string + */ + public $proxyHttp = ''; + /** * Status. * @@ -179,6 +193,7 @@ public function connect() $this->_status !== self::STATUS_CLOSED) { return; } + $this->_status = self::STATUS_CONNECTING; $this->_connectStartTime = \microtime(true); if ($this->transport !== 'unix') { @@ -187,7 +202,24 @@ public function connect() $this->_remoteAddress = $this->_remoteHost . ':' . $this->_remotePort; } // Open socket connection asynchronously. - if ($this->_contextOption) { + if ($this->proxySocks5){ + $this->_contextOption['ssl']['peer_name'] = $this->_remoteHost; + $context = \stream_context_create($this->_contextOption); + $this->_socket = \stream_socket_client("tcp://{$this->proxySocks5}", $errno, $errstr, 0, \STREAM_CLIENT_ASYNC_CONNECT, $context); + fwrite($this->_socket,chr(5) . chr(1) . chr(0)); + fread($this->_socket, 512); + fwrite($this->_socket,chr(5) . chr(1) . chr(0) . chr(3) . chr(strlen($this->_remoteHost)) . $this->_remoteHost . pack("n", $this->_remotePort)); + fread($this->_socket, 512); + }else if($this->proxyHttp){ + $this->_contextOption['ssl']['peer_name'] = $this->_remoteHost; + $context = \stream_context_create($this->_contextOption); + $this->_socket = \stream_socket_client("tcp://{$this->proxyHttp}", $errno, $errstr, 0, \STREAM_CLIENT_ASYNC_CONNECT, $context); + $str = "CONNECT {$this->_remoteHost}:{$this->_remotePort} HTTP/1.1\n"; + $str .= "Host: {$this->_remoteHost}:{$this->_remotePort}\n"; + $str .= "Proxy-Connection: keep-alive\n"; + fwrite($this->_socket,$str); + fread($this->_socket, 512); + } else if ($this->_contextOption) { $context = \stream_context_create($this->_contextOption); $this->_socket = \stream_socket_client("tcp://{$this->_remoteHost}:{$this->_remotePort}", $errno, $errstr, 0, \STREAM_CLIENT_ASYNC_CONNECT, $context); @@ -299,7 +331,6 @@ public function checkConnection() if (\DIRECTORY_SEPARATOR === '\\') { Worker::$globalEvent->offExcept($this->_socket); } - // Remove write listener. Worker::$globalEvent->offWritable($this->_socket); @@ -321,7 +352,6 @@ public function checkConnection() \socket_set_option($raw_socket, \SOL_SOCKET, \SO_KEEPALIVE, 1); \socket_set_option($raw_socket, \SOL_TCP, \TCP_NODELAY, 1); } - // SSL handshake. if ($this->transport === 'ssl') { $this->_sslHandshakeCompleted = $this->doSslHandshake($this->_socket); @@ -334,7 +364,6 @@ public function checkConnection() Worker::$globalEvent->onWritable($this->_socket, [$this, 'baseWrite']); } } - // Register a listener waiting read event. Worker::$globalEvent->onReadable($this->_socket, [$this, 'baseRead']); @@ -367,5 +396,6 @@ public function checkConnection() $this->onConnect = null; } } + } }