Skip to content

Commit

Permalink
Compatible with apache/php-fpm
Browse files Browse the repository at this point in the history
  • Loading branch information
walkor authored Apr 28, 2018
1 parent 23a2ccb commit 73def8e
Showing 1 changed file with 90 additions and 56 deletions.
146 changes: 90 additions & 56 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

/**
* Channel/Client
* @version 1.0.3
* @version 1.0.4
*/
class Client
class Client
{
/**
* onMessage.
Expand Down Expand Up @@ -50,19 +50,24 @@ class Client
* @var Timer
*/
protected static $_reconnectTimer = null;

/**
* Ping timer.
* @var Timer
*/
protected static $_pingTimer = null;

/**
* All event callback.
* @var array
*/
protected static $_events = array();


/**
* @var bool
*/
protected static $_isWorkermanEnv = true;

/**
* Ping interval.
* @var int
Expand All @@ -79,50 +84,61 @@ public static function connect($ip = '127.0.0.1', $port = 2206)
{
if(!self::$_remoteConnection)
{
self::$_remoteIp = $ip;
self::$_remotePort = $port;
self::$_remoteConnection = new AsyncTcpConnection('frame://'.self::$_remoteIp.':'.self::$_remotePort);
self::$_remoteConnection->onClose = 'Channel\Client::onRemoteClose';
self::$_remoteConnection->onConnect = 'Channel\Client::onRemoteConnect';
self::$_remoteConnection->onMessage = 'Channel\Client::onRemoteMessage';
self::$_remoteConnection->connect();

if(empty(self::$_pingTimer))
{
self::$_pingTimer = Timer::add(self::$pingInterval, 'Channel\Client::ping');
}
}
self::$_remoteIp = $ip;
self::$_remotePort = $port;
if (PHP_SAPI !== 'cli' || !class_exists('Workerman\Worker', false)) {
self::$_isWorkermanEnv = false;
}
// For workerman environment.
if (self::$_isWorkermanEnv) {
self::$_remoteConnection = new AsyncTcpConnection('frame://' . self::$_remoteIp . ':' . self::$_remotePort);
self::$_remoteConnection->onClose = 'Channel\Client::onRemoteClose';
self::$_remoteConnection->onConnect = 'Channel\Client::onRemoteConnect';
self::$_remoteConnection->onMessage = 'Channel\Client::onRemoteMessage';
self::$_remoteConnection->connect();

if (empty(self::$_pingTimer)) {
self::$_pingTimer = Timer::add(self::$pingInterval, 'Channel\Client::ping');
}
// Not workerman environment.
} else {
self::$_remoteConnection = stream_socket_client('tcp://'.self::$_remoteIp.':'.self::$_remotePort, $code, $message, 5);
if (!self::$_remoteConnection) {
throw new \Exception($message);
}
}
}
}

/**
* onRemoteMessage.
* @param TcpConnection $connection
* @param string $data
* @throws \Exception
*/
public static function onRemoteMessage($connection, $data)
{
$data = unserialize($data);
$event = $data['channel'];
$event_data = $data['data'];
if(!empty(self::$_events[$event]))
{
call_user_func(self::$_events[$event], $event_data);
}
elseif(!empty(Client::$onMessage))
{
call_user_func(Client::$onMessage, $event, $event_data);
}
else
{
throw new \Exception("event:$event have not callback");
}
}
/**
* Ping.
* @return void
*/
{
$data = unserialize($data);
$event = $data['channel'];
$event_data = $data['data'];
if(!empty(self::$_events[$event]))
{
call_user_func(self::$_events[$event], $event_data);
}
elseif(!empty(Client::$onMessage))
{
call_user_func(Client::$onMessage, $event, $event_data);
}
else
{
throw new \Exception("event:$event have not callback");
}
}

/**
* Ping.
* @return void
*/
public static function ping()
{
if(self::$_remoteConnection)
Expand Down Expand Up @@ -170,13 +186,16 @@ public static function onRemoteConnect()
*/
public static function clearTimer()
{
if (!self::$_isWorkermanEnv) {
throw new \Exception('Channel\\Client not support clearTimer method when it is not in the workerman environment.');
}
if(self::$_reconnectTimer)
{
Timer::del(self::$_reconnectTimer);
self::$_reconnectTimer = null;
Timer::del(self::$_reconnectTimer);
self::$_reconnectTimer = null;
}
}

/**
* On.
* @param string $event
Expand All @@ -185,6 +204,9 @@ public static function clearTimer()
*/
public static function on($event, $callback)
{
if (!self::$_isWorkermanEnv) {
throw new \Exception('Channel\\Client not support on method when it is not in the workerman environment.');
}
if(!is_callable($callback))
{
throw new \Exception('callback is not callable');
Expand All @@ -200,16 +222,19 @@ public static function on($event, $callback)
*/
public static function subscribe($events)
{
self::connect();
$events = (array)$events;
foreach($events as $event)
{
if(!isset(self::$_events[$event]))
{
self::$_events[$event] = null;
}
}
self::$_remoteConnection->send(serialize(array('type' => 'subscribe', 'channels'=>(array)$events)));
if (!self::$_isWorkermanEnv) {
throw new \Exception('Channel\\Client not support subscribe method when it is not in the workerman environment.');
}
self::connect();
$events = (array)$events;
foreach($events as $event)
{
if(!isset(self::$_events[$event]))
{
self::$_events[$event] = null;
}
}
self::$_remoteConnection->send(serialize(array('type' => 'subscribe', 'channels'=>(array)$events)));
}

/**
Expand All @@ -219,13 +244,16 @@ public static function subscribe($events)
*/
public static function unsubscribe($events)
{
if (!self::$_isWorkermanEnv) {
throw new \Exception('Channel\\Client not support unsubscribe method when it is not in the workerman environment.');
}
self::connect();
$events = (array)$events;
foreach($events as $event)
{
unset(self::$_events[$event]);
}
self::$_remoteConnection->send(serialize(array('type' => 'unsubscribe', 'channels'=>$events)));
self::$_remoteConnection->send(serialize(array('type' => 'unsubscribe', 'channels'=>$events)));
}

/**
Expand All @@ -236,6 +264,12 @@ public static function unsubscribe($events)
public static function publish($events, $data)
{
self::connect();
self::$_remoteConnection->send(serialize(array('type' => 'publish', 'channels'=>(array)$events, 'data' => $data)));
if (self::$_isWorkermanEnv) {
self::$_remoteConnection->send(serialize(array('type' => 'publish', 'channels' => (array)$events, 'data' => $data)));
} else {
$body = serialize(array('type' => 'publish', 'channels'=>(array)$events, 'data' => $data));
$buffer = pack('N', 4+strlen($body)) . $body;
fwrite(self::$_remoteConnection, $buffer);
}
}
}

0 comments on commit 73def8e

Please sign in to comment.