diff --git a/agent/Config/config.php b/agent/Config/config.php index 6d14dc4..9b6061d 100644 --- a/agent/Config/config.php +++ b/agent/Config/config.php @@ -50,8 +50,10 @@ 'task_max_load_size' => 8192, // 最大进程数,"任务进程内存"表最大行数 'process_max_size' => 1024, - // 同时运行任务最大数量, "任务内存"表最大行数 - 'task_max_concurrent_size' => 1024, + // 一分钟内运行任务最大数量, "任务内存"表最大行数 + 'task_max_concurrent_size' => 8192, + // 日志临时存储最大条数 + 'log_temp_store_max_size' => 16384, // 10分钟重新加载一次任务到内存表 'task_reload_interval' => 600, diff --git a/agent/Libs/DbLog.php b/agent/Libs/DbLog.php index 590e456..fcf193f 100644 --- a/agent/Libs/DbLog.php +++ b/agent/Libs/DbLog.php @@ -18,8 +18,6 @@ class DbLog * @var LogChannel */ protected static $_logChannel = null; - // 日志聚合,把所有日志聚合成一条,最后一条标识完结则存储到库中 - protected static $logs = []; /** * 写入db日志最大重试次数 @@ -28,7 +26,7 @@ class DbLog */ protected static $_retryMaxNum = 3; - public static $table; + protected static $_table; /** * TYPE_INT 1(Mysql TINYINT): 2 ^ 8 = -128 ~ 127 @@ -37,7 +35,7 @@ class DbLog * TYPE_INT 8(Mysql BIGINT): 2 ^ (8 * 8) = -9223372036854775808 ~ 9223372036854775807 * @var array */ - private static $column = [ + private static $_column = [ 'msg' => [SwooleTable::TYPE_STRING, 65535], ]; @@ -52,11 +50,11 @@ public static function init() self::$_retryMaxNum = configItem('flush_db_log_max_retry_num', 3); /** 用来存储日志内容 */ - self::$table = new SwooleTable(TASK_MAX_LOAD_SIZE); - foreach (self::$column as $key => $v) { - self::$table->column($key, $v[0], $v[1]); + self::$_table = new SwooleTable(LOG_TEMP_STORE_MAX_SIZE); + foreach (self::$_column as $key => $v) { + self::$_table->column($key, $v[0], $v[1]); } - self::$table->create(); + self::$_table->create(); /** End */ // 请求完结的时候把日志Flush进文件 @@ -149,7 +147,7 @@ private static function writeLog($logPrefix) if ($originLog !== false) { // key = 任务Id + 运行Id + 第几次重试 $key = self::getMemoryTableKey($originLog['taskId'], $originLog['runId'], $originLog['retries']); - $msg = self::$table->get($key, 'msg'); + $msg = self::$_table->get($key, 'msg'); if ($msg === false) { $msg = ''; } @@ -160,7 +158,7 @@ private static function writeLog($logPrefix) $msg .= $originLog['msg'] . PHP_EOL; } // 暂存日志内容 - self::$table->set($key, ['msg' => $msg]); + self::$_table->set($key, ['msg' => $msg]); } // 如果不是最后一条日志,就跳过 if (!isset($originLog['end']) || !$originLog['end']) { @@ -168,7 +166,7 @@ private static function writeLog($logPrefix) } // 删除并发日志 if ($originLog['code'] == Constants::CUSTOM_CODE_CONCURRENCY_LIMIT) { - self::$table->del($key); + self::$_table->del($key); continue; } // 存储日志 @@ -190,12 +188,12 @@ private static function writeLog($logPrefix) self::$_logChannel->push($originLog); } else { // 入库失败重试次数达到阀值就删除掉 - self::$table->del($key); + self::$_table->del($key); logWarning($logPrefix . json_encode($originLog, JSON_UNESCAPED_UNICODE)); } } else { // 入库后就删除掉 - self::$table->del($key); + self::$_table->del($key); } } } diff --git a/agent/Libs/Donkeyid.php b/agent/Libs/Donkeyid.php index fde699b..d7ba892 100755 --- a/agent/Libs/Donkeyid.php +++ b/agent/Libs/Donkeyid.php @@ -15,12 +15,12 @@ */ class Donkeyid { - static $donkeyid; + public static $donkeyid; - private $node_id; - private $epoch; - private $table; - private $lock; + private $_node_id; + private $_epoch; + private $_table; + private $_lock; const snowflake = 0; const TIMESTAMP_BITS = 42; @@ -44,8 +44,8 @@ public function __construct($node_id = false, $epoch = false) if ($epoch === false) { $epoch = ini_get("donkeyid.epoch"); } - $this->node_id = ($node_id == false || $node_id < 0) ? 0 : $node_id; - $this->epoch = ($epoch == false || $epoch < 0) ? 0 : $epoch; + $this->_node_id = ($node_id == false || $node_id < 0) ? 0 : $node_id; + $this->_epoch = ($epoch == false || $epoch < 0) ? 0 : $epoch; $this->create_table(); } @@ -69,11 +69,11 @@ static function getInstance() */ private function create_table() { - $this->table = new SwooleTable(3); - $this->table->column("last_timestamp", \swoole_table::TYPE_INT, 8); - $this->table->column("sequence", \swoole_table::TYPE_INT, 4); - $this->table->create(); - $this->lock = new SwooleLock(SWOOLE_SPINLOCK); + $this->_table = new SwooleTable(3); + $this->_table->column("last_timestamp", \swoole_table::TYPE_INT, 8); + $this->_table->column("sequence", \swoole_table::TYPE_INT, 4); + $this->_table->create(); + $this->_lock = new SwooleLock(SWOOLE_SPINLOCK); } /** @@ -105,9 +105,9 @@ public function dk_get_next_id() $isMacOS = isMacOS(); // MacOS Mojave使用锁会异常退出 if (!$isMacOS) { - $this->lock->lock(); + $this->_lock->lock(); } - $col = $this->table->get(self::snowflake); + $col = $this->_table->get(self::snowflake); if ($col == false || $col["last_timestamp"] > $now) { $last_timestamp = $now; $sequence = mt_rand(0, 10) % 2; @@ -121,13 +121,13 @@ public function dk_get_next_id() $now = $this->wait_next_ms(); } } - $this->table->set(self::snowflake, array("last_timestamp" => $now, "sequence" => $sequence)); + $this->_table->set(self::snowflake, array("last_timestamp" => $now, "sequence" => $sequence)); // MacOS Mojave使用锁会异常退出 if (!$isMacOS) { - $this->lock->unlock(); + $this->_lock->unlock(); } - $id = (($now - ($this->epoch * 1000) & (-1 ^ (-1 << self::TIMESTAMP_BITS))) << self::TIMESTAMP_LEFT_SHIFT) - | (($this->node_id & (-1 ^ (-1 << self::NODE_ID_BITS))) << self::NODE_ID_LEFT_SHIFT) + $id = (($now - ($this->_epoch * 1000) & (-1 ^ (-1 << self::TIMESTAMP_BITS))) << self::TIMESTAMP_LEFT_SHIFT) + | (($this->_node_id & (-1 ^ (-1 << self::NODE_ID_BITS))) << self::NODE_ID_LEFT_SHIFT) | ($sequence); return $id; } @@ -141,9 +141,10 @@ public function dk_get_next_id() */ public function dk_parse_id($id) { - $ret["time"] = ($id >> self::TIMESTAMP_LEFT_SHIFT) + ($this->epoch * 1000); - $ret["node_id"] = ($id >> self::NODE_ID_LEFT_SHIFT) & (-1 ^ (-1 << self::NODE_ID_BITS)); - $ret["sequence"] = $id & (-1 ^ (-1 << self::SEQUENCE_BITS)); - return $ret; + return [ + 'time' => ($id >> self::TIMESTAMP_LEFT_SHIFT) + ($this->_epoch * 1000), + 'node_id' => ($id >> self::NODE_ID_LEFT_SHIFT) & (-1 ^ (-1 << self::NODE_ID_BITS)), + 'sequence' => $id & (-1 ^ (-1 << self::SEQUENCE_BITS)) + ]; } } \ No newline at end of file diff --git a/agent/Libs/LoadTasks.php b/agent/Libs/LoadTasks.php index 19671fc..3a6f5de 100755 --- a/agent/Libs/LoadTasks.php +++ b/agent/Libs/LoadTasks.php @@ -127,7 +127,6 @@ public static function load($loadSpecifiedIds = []) break; } - // 如果内存不够,会出现 Swoole_table::set(): unable to allocate memory self::$_table->set($task['id'], [ 'name' => $task['name'], 'rule' => $task['rule'], diff --git a/agent/Libs/Loader.php b/agent/Libs/Loader.php index 5789b3b..6d1cda9 100755 --- a/agent/Libs/Loader.php +++ b/agent/Libs/Loader.php @@ -17,7 +17,7 @@ class Loader /** * 命名空间的路径 */ - protected static $namespaces; + protected static $_namespaces; /** * 自动载入类 @@ -27,10 +27,10 @@ class Loader public static function autoload($class) { $root = explode('\\', trim($class, '\\'), 2); - if (count($root) > 1 and isset(self::$namespaces[$root[0]])) { - $filePath = self::$namespaces[$root[0]] . DIRECTORY_SEPARATOR . str_replace('\\', DIRECTORY_SEPARATOR, $root[1]) . '.php'; + if (count($root) > 1 and isset(self::$_namespaces[$root[0]])) { + $filePath = self::$_namespaces[$root[0]] . DIRECTORY_SEPARATOR . str_replace('\\', DIRECTORY_SEPARATOR, $root[1]) . '.php'; if (is_file($filePath)) { - require $filePath; + require_once $filePath; } } } @@ -53,6 +53,6 @@ public static function register($prepend = false) */ public static function addNameSpace($root, $path) { - self::$namespaces[$root] = $path; + self::$_namespaces[$root] = $path; } } diff --git a/agent/Libs/Log.php b/agent/Libs/Log.php index 32989f2..41744af 100755 --- a/agent/Libs/Log.php +++ b/agent/Libs/Log.php @@ -5,18 +5,18 @@ class Log { - protected static $_log_path; + protected static $_logPath; /** * 日志文件名前缀 * @var */ protected static $_logNamePrefix = ''; - protected static $_date_fmt = 'Y-m-d H:i:s.u'; + protected static $_dateFmt = 'Y-m-d H:i:s.u'; /** * 日期格式化函数 * @var string */ - protected static $_date_format_callable = 'date'; + protected static $_dateFormatCallable = 'date'; protected static $_enabled = TRUE; /** * 当前变量内日志记录的数量 @@ -56,14 +56,14 @@ class Log * 是否记录所有级别日志 * @var bool */ - protected static $_record_all_levels = false; + protected static $_recordAllLevels = false; /** * @var LogChannel */ protected static $_logChannel = null; - protected static $fp = null; + protected static $_fp = null; protected static $_mode = 0664; @@ -77,8 +77,8 @@ public static function init() $defaultLogPath = ROOT_PATH . 'Logs' . DIRECTORY_SEPARATOR; // 默认日志都写在此目录 - self::$_log_path = !empty($config['path']) ? $config['path'] : $defaultLogPath; - self::$_log_path = rtrim(self::$_log_path, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; + self::$_logPath = !empty($config['path']) ? $config['path'] : $defaultLogPath; + self::$_logPath = rtrim(self::$_logPath, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; if (!empty($config['prefix']) && is_string($config['prefix'])) { self::$_logNamePrefix = $config['prefix']; @@ -88,11 +88,11 @@ public static function init() self::$_mode = $config['mode']; } - if (!is_dir(self::$_log_path)) { - createDir(self::$_log_path); + if (!is_dir(self::$_logPath)) { + createDir(self::$_logPath); } - if (!is_writable(self::$_log_path)) { + if (!is_writable(self::$_logPath)) { self::$_enabled = FALSE; } @@ -115,9 +115,9 @@ public static function init() } if ($config['date_format'] != '') { - self::$_date_fmt = $config['date_format']; - if (preg_match('#(?= self::$_autoFlush || ($logCount > 0 && self::$_timerFFlush && (microtime(true) - $startTime) > self::$_timerFFlush)) { - $boolean = fflush(self::$fp); + $boolean = fflush(self::$_fp); // 写入成功才把记录数清零 $boolean && $logCount = 0; $startTime = microtime(true); @@ -220,20 +220,20 @@ public static function flush() // 判断日志是否隔天了 if (self::getFileName($log['fn']) !== $fileName) { // 关闭内存映射,底层会自动执行fflush将数据同步到磁盘文件 - fclose(self::$fp); + fclose(self::$_fp); // 设置新的日志文件名称,用来判断 $fileName = self::getFileName($log['fn']); // 获取新的日志文件路径 $filePath = self::getLogFilePath($log['fn']); - self::$fp = fopen($filePath, 'ab'); - if (empty(self::$fp)) { + self::$_fp = fopen($filePath, 'ab'); + if (empty(self::$_fp)) { echo self::formatLogMessage(__METHOD__ . ' 打开日志文件失败', 'Core Error'); return FALSE; } @chmod($filePath, self::$_mode); } - fwrite(self::$fp, $message); + fwrite(self::$_fp, $message); $logCount++; continue; } @@ -249,18 +249,18 @@ public static function shutdown() return FALSE; } - if (!is_resource(self::$fp)) { + if (!is_resource(self::$_fp)) { $filePath = self::getLogFilePath(); - self::$fp = fopen($filePath, 'ab'); - if (empty(self::$fp)) { + self::$_fp = fopen($filePath, 'ab'); + if (empty(self::$_fp)) { echo self::formatLogMessage(__METHOD__ . ' 打开日志文件失败', 'Core Error'); return FALSE; } @chmod($filePath, self::$_mode); } - if (is_resource(self::$fp)) { - fflush(self::$fp); + if (is_resource(self::$_fp)) { + fflush(self::$_fp); } $fileName = self::getFileName(); @@ -275,13 +275,13 @@ public static function shutdown() // 判断日志是否隔天了 if (self::getFileName($log['fn']) !== $fileName) { // 关闭内存映射,底层会自动执行fflush将数据同步到磁盘文件 - fclose(self::$fp); + fclose(self::$_fp); // 设置新的日志文件名称,用来判断 $fileName = self::getFileName($log['fn']); // 获取新的日志文件路径 $filePath = self::getLogFilePath($log['fn']); - self::$fp = fopen($filePath, 'ab'); - if (empty(self::$fp)) { + self::$_fp = fopen($filePath, 'ab'); + if (empty(self::$_fp)) { echo self::formatLogMessage(__METHOD__ . ' 打开日志文件失败', 'Core Error'); return FALSE; } @@ -289,17 +289,17 @@ public static function shutdown() unset($filePath); } - fwrite(self::$fp, $message); + fwrite(self::$_fp, $message); $logCount++; } if ($logCount >= self::$_autoFlush) { - fflush(self::$fp); + fflush(self::$_fp); $logCount = 0; } } // 关闭内存映射,底层会自动执行fflush将数据同步到磁盘文件 - fclose(self::$fp); + fclose(self::$_fp); } } @@ -323,8 +323,8 @@ public static function getChannel() public static function formatLogMessage($message, $level, $time = null) { $time ?: $time = microtime(true); - $callback = self::$_date_format_callable; - $date = $callback(...array(self::$_date_fmt, $time)); + $callback = self::$_dateFormatCallable; + $date = $callback(...array(self::$_dateFmt, $time)); return '[' . $date . "] [$level] $message" . PHP_EOL; } @@ -338,7 +338,7 @@ public static function formatLogMessage($message, $level, $time = null) */ public static function getLogFilePath($suffix = '', $subdirectory = '') { - $logPath = self::$_log_path; + $logPath = self::$_logPath; if ($subdirectory) { $logPath .= trim($subdirectory, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; } @@ -427,12 +427,12 @@ public static function uDate($format = 'Y-m-d H:i:s.u', $uTimeStamp = null) } /** - * @param string $subdirectory + * @param string $subDirectory * @param int $mode */ - public static function createLogDir($subdirectory, $mode = 0744) + public static function createLogDir($subDirectory, $mode = 0744) { - $logDir = self::$_log_path . trim($subdirectory, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; + $logDir = self::$_logPath . trim($subDirectory, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; if (!is_dir($logDir)) { createDir($logDir, $mode); } diff --git a/agent/Libs/Process.php b/agent/Libs/Process.php index 338cc4c..e251137 100755 --- a/agent/Libs/Process.php +++ b/agent/Libs/Process.php @@ -14,8 +14,8 @@ class Process { - private static $table; - static private $column = [ + private static $_table; + private static $_column = [ 'taskId' => [SwooleTable::TYPE_INT, 8], 'runId' => [SwooleTable::TYPE_INT, 8], 'sec' => [SwooleTable::TYPE_INT, 8], @@ -26,20 +26,21 @@ class Process const PROCESS_START = 0;//程序开始运行 const PROCESS_STOP = 1;//程序结束运行 - public $task; - static public $process_list = []; - private static $process_stdout = []; - private static $max_stdout = 60000; + private static $_processList = []; + private static $_processStdOut = []; + private static $_maxStdOut = 60000; // 输出写入按运行Id生成的日志文件 - private static $process_logWriteFile = []; + private static $_processLogWriteFile = []; + + public $task; public static function init() { - self::$table = new SwooleTable(PROCESS_MAX_SIZE); - foreach (self::$column as $key => $v) { - self::$table->column($key, $v[0], $v[1]); + self::$_table = new SwooleTable(PROCESS_MAX_SIZE); + foreach (self::$_column as $key => $v) { + self::$_table->column($key, $v[0], $v[1]); } - self::$table->create(); + self::$_table->create(); } /** @@ -55,9 +56,9 @@ public static function signal($server) //必须为false,非阻塞模式 while ($ret = SwooleProcess::wait(false)) { $pid = $ret['pid']; - if ($processTask = self::$table->get($pid)) { + if ($processTask = self::$_table->get($pid)) { $processTask['end'] = microtime(true); - $processTask['stdout'] = isset(self::$process_stdout[$pid]) ? self::$process_stdout[$pid] : ""; + $processTask['stdout'] = isset(self::$_processStdOut[$pid]) ? self::$_processStdOut[$pid] : ""; $metric = Constants::MONITOR_KEY_EXEC_SUCCESS; $code = Constants::CUSTOM_CODE_END_RUN; @@ -83,7 +84,7 @@ public static function signal($server) } // 关闭管道监听 swoole_event_del($processTask['pipe']); - self::$table->del($pid); + self::$_table->del($pid); $consumeTime = $processTask['end'] - $processTask['start']; @@ -159,8 +160,8 @@ public static function signal($server) Report::monitor($metric . '.' . $processTask['taskId']); } // 关闭创建的好的管道 - self::$process_list[$pid]->close(); - unset(self::$process_list[$pid], self::$process_stdout[$pid], self::$process_logWriteFile[$pid]); + self::$_processList[$pid]->close(); + unset(self::$_processList[$pid], self::$_processStdOut[$pid], self::$_processLogWriteFile[$pid]); } }); @@ -204,19 +205,19 @@ public static function create_process($task) } swoole_event_add($process->pipe, function ($pipe) use ($pid) { - !isset(self::$process_stdout[$pid]) && self::$process_stdout[$pid] = ""; + !isset(self::$_processStdOut[$pid]) && self::$_processStdOut[$pid] = ""; // 默认每次读取8192字节 - $tmp = self::$process_list[$pid]->read(); + $tmp = self::$_processList[$pid]->read(); if ($tmp) { - $len = mb_strlen(self::$process_stdout[$pid]); + $len = mb_strlen(self::$_processStdOut[$pid]); // 如果一次性读取超过了最大长度,就截取 - if (($length = (self::$max_stdout - $len)) > 0 && $tmp) { - self::$process_stdout[$pid] .= mb_substr($tmp, 0, $length); + if (($length = (self::$_maxStdOut - $len)) > 0) { + self::$_processStdOut[$pid] .= mb_substr($tmp, 0, $length); } // 是否按运行Id生成日志文件并记录输出 - if (isset(self::$process_logWriteFile[$pid])) { - $task = self::$process_logWriteFile[$pid]; + if (isset(self::$_processLogWriteFile[$pid])) { + $task = self::$_processLogWriteFile[$pid]; $logPath = date('Y-m-d', $task['sec']) . DIRECTORY_SEPARATOR . $task['taskId']; Log::createLogDir($logPath); $logFilePath = Log::getLogFilePath($task['runId'], $logPath); @@ -233,7 +234,7 @@ public static function create_process($task) } }); - self::$table->set($pid, [ + self::$_table->set($pid, [ 'taskId' => $task['taskId'], 'runId' => $task['runId'], 'sec' => $task['sec'], @@ -243,14 +244,14 @@ public static function create_process($task) // 是否记录输出到日志文件 if ($task['logOpt'] == Constants::LOG_OPT_WRITE_FILE) { - self::$process_logWriteFile[$pid] = [ + self::$_processLogWriteFile[$pid] = [ 'taskId' => $task['taskId'], 'runId' => $task['runId'], 'sec' => $task['sec'], ]; } - self::$process_list[$pid] = $process; + self::$_processList[$pid] = $process; // 上报监控系统创建进程成功 Report::monitor(Constants::MONITOR_KEY_CREATE_PROCESS_SUCCESS . '.' . $task['taskId']); @@ -281,12 +282,12 @@ public static function create_process($task) */ public function exec(SwooleProcess $process) { - if (self::$process_list) { - foreach (self::$process_list as $p) { + if (self::$_processList) { + foreach (self::$_processList as $p) { $p->close(); } } - self::$process_list = []; + self::$_processList = []; $command = $this->task['command']; if ($this->task['runUser'] && !self::changeUser($this->task['runUser'])) { $msg = 'RunUser: ' . $this->task['runUser']; @@ -321,7 +322,7 @@ public function exec(SwooleProcess $process) // 上报监控系统创建进程失败 Report::monitor(Constants::MONITOR_KEY_CHILD_PROCESS_STARTS_RUN . '.' . $this->task['taskId']); - DbLog::log($this->task['runId'], $this->task['taskId'], Constants::CUSTOM_CODE_CHILD_PROCESS_STARTS_RUN, '任务开始执行'); + DbLog::log($this->task['runId'], $this->task['taskId'], Constants::CUSTOM_CODE_CHILD_PROCESS_STARTS_RUN, '任务开始执行', $this->task['command']); logInfo('任务开始执行' . (!empty($this->task['retries']) ? '(第' . $this->task['retries'] . '次重试)' : '') . ': Id = ' . $this->task['taskId'] . '; runId = ' . $this->task['runId'] . '; command = ' . $this->task['command']); if (isWindowsOS()) { @@ -363,6 +364,6 @@ public static function changeUser($user) */ public static function getTable() { - return self::$table; + return self::$_table; } } \ No newline at end of file diff --git a/agent/Libs/RedisClient.php b/agent/Libs/RedisClient.php index 97e59a3..22fc843 100755 --- a/agent/Libs/RedisClient.php +++ b/agent/Libs/RedisClient.php @@ -31,7 +31,7 @@ class RedisClient * @static * @var array */ - protected static $_default_config = array( + protected static $_defaultConfig = array( 'socket_type' => 'tcp', 'host' => '127.0.0.1', 'password' => NULL, @@ -149,7 +149,7 @@ class RedisClient * 是否使用的是PHPRedis扩展 * @var bool */ - protected $_is_phpRedis = true; + protected $_isPhpRedis = true; // ------------------------------------------------------------------------ @@ -164,7 +164,7 @@ public function __construct($config) { // 加载配置 if (!empty($config)) { - self::$_config = array_merge(self::$_default_config, $config); + self::$_config = array_merge(self::$_defaultConfig, $config); } // 没有配置就禁用Redis @@ -179,8 +179,8 @@ public function __construct($config) } // 如果是PHPRedis 或者 PHPRedis集群 - $this->_is_phpRedis = in_array(self::$_config['client_type'], [self::CLIENT_TYPE_PHP_REDIS, self::CLIENT_TYPE_PHP_REDIS_CLUSTER], true); - if ($this->_is_phpRedis) { + $this->_isPhpRedis = in_array(self::$_config['client_type'], [self::CLIENT_TYPE_PHP_REDIS, self::CLIENT_TYPE_PHP_REDIS_CLUSTER], true); + if ($this->_isPhpRedis) { // 是否支持Redis if (!$this->is_supported()) { throw new Exception("Redis class is not exists,Please make sure is installed!"); @@ -377,7 +377,7 @@ public function setExpireIncr($key, $expireTime) public function evalScript($scriptKey, $keys, $args = []) { // 如果不是用PHPRedis扩展 - if (!$this->_is_phpRedis) { + if (!$this->_isPhpRedis) { // 如果是incr,就用另外的方式实现 if ($scriptKey == 'incr') { return $this->setExpireIncr($keys, $args); diff --git a/agent/Libs/Server.php b/agent/Libs/Server.php index fe75c62..bccf1f7 100755 --- a/agent/Libs/Server.php +++ b/agent/Libs/Server.php @@ -2,7 +2,6 @@ namespace Libs; -use \Models\DB; use \Swoole\Server as SwooleServer; use \Swoole\Process as SwooleProcess; @@ -33,16 +32,35 @@ class Server extends ServerBase */ public $_initTaskMaps = []; - protected $pid_file; // 状态进程Pid文件 public static $statsPidFile; // 接收数据处理方法映射数组 - protected $receiveModeProcessMaps = array( + protected $_receiveModeProcessMaps = array( Constants::SW_CONTROL_CMD => 'controlCommand', Constants::SW_API_CMD => 'apiCommand', ); + protected static $_defaultOptions = [ + 'd|daemon' => '启用守护进程模式', + 'h|host?' => '指定监听地址', + 'p|port?' => '指定监听端口', + 'help' => '显示帮助界面', +// 'b|base' => '使用BASE模式启动', +// 'w|worker?' => '设置Worker进程的数量', + 'r|thread?' => '设置Reactor线程的数量', +// 't|tasker?' => '设置Task进程的数量', + ]; + + /** + * SwooleServer constructor. + * + * @param string $host + * @param int $port + * @param bool $ssl 是否开启安全加密 + * + * @throws \Exception + */ public function __construct($host, $port = 0, $ssl = false) { parent::__construct($host, $port, $ssl); @@ -117,6 +135,9 @@ public static function init() exit("Server is not running\n"); } $pid = file_get_contents(self::$statsPidFile); + /** + * @see Server::initServer() + */ posix_kill($pid, SIGUSR1); exit(0); } @@ -138,28 +159,24 @@ public function run(array $setting = []) if (self::$pidFile) { $this->_config['pid_file'] = self::$pidFile; } - if (!empty(self::$options['daemon'])) { + if (!empty(self::$_options['daemon'])) { $this->_config['daemonize'] = true; } - if (!empty(self::$options['thread'])) { - $this->_config['reator_num'] = intval(self::$options['thread']); - } - - $this->sw->on('Start', [$this, 'onMasterStart']); - $this->sw->on('ManagerStart', [$this, 'onManagerStart']); - $this->sw->on('Shutdown', [$this, 'onShutdown']); - $this->sw->on('ManagerStop', [$this, 'onManagerStop']); - $this->sw->on('WorkerStart', [$this, 'onWorkerStart']); - $this->sw->on('Connect', [$this, 'onConnect']); - $this->sw->on('Receive', [$this, 'onReceive']); - $this->sw->on('PipeMessage', [$this, 'onPipeMessage']); - $this->sw->on('Close', [$this, 'onClose']); - $this->sw->on('WorkerStop', [$this, 'onWorkerStop']); + $this->_sw->on('Start', [$this, 'onMasterStart']); + $this->_sw->on('ManagerStart', [$this, 'onManagerStart']); + $this->_sw->on('Shutdown', [$this, 'onShutdown']); + $this->_sw->on('ManagerStop', [$this, 'onManagerStop']); + $this->_sw->on('WorkerStart', [$this, 'onWorkerStart']); + $this->_sw->on('Connect', [$this, 'onConnect']); + $this->_sw->on('Receive', [$this, 'onReceive']); + $this->_sw->on('PipeMessage', [$this, 'onPipeMessage']); + $this->_sw->on('Close', [$this, 'onClose']); + $this->_sw->on('WorkerStop', [$this, 'onWorkerStop']); if (is_callable([$this, 'onTask'])) { - $this->sw->on('Task', [$this, 'onTask']); - $this->sw->on('Finish', [$this, 'onFinish']); + $this->_sw->on('Task', [$this, 'onTask']); + $this->_sw->on('Finish', [$this, 'onFinish']); } parent::run(); @@ -172,39 +189,36 @@ public function run(array $setting = []) */ protected function initServer() { - DB::init();// 数据库实例初始化 - LoadTasks::init();// 载入crontab表符合条件的记录 - Donkeyid::init();//初始化donkeyid对象 - Process::init();//载入任务进程处理表,目前有哪些进程在执行任务 - Tasks::init();// 载入任务表,当前这一分钟要执行的任务 + LoadTasks::init(); // 载入crontab表符合条件的记录 + Donkeyid::init(); // 初始化donkeyid对象 + Process::init(); //载入任务进程处理表,目前有哪些进程在执行任务 + Tasks::init(); // 载入任务表,当前这一分钟要执行的任务 - Report::init(); + // 新增监控告警进程 + Report::init(); // 告警模块 $monitorAlarmProcessNum = configItem('monitor_alarm_process_num', 1); for ($i = 0; $i < $monitorAlarmProcessNum; $i++) { - $this->sw->addProcess(new SwooleProcess(function ($process) use ($i) { + $this->_sw->addProcess(new SwooleProcess(function ($process) use ($i) { $this->setProcessName('monitorAlarm|' . $i, null, $process); -// $process->name($this->_serverName . '|monitorAlarm|' . $i); Report::monitorAlarm(); })); } - /** End */ - /** - * DB日志进程,异步刷日志到数据库 - */ + // DB日志进程,异步刷日志到数据库 DbLog::init(); - $this->sw->addProcess(new SwooleProcess(function ($process) { + $this->_sw->addProcess(new SwooleProcess(function ($process) { $this->setProcessName('logFlushToDB', null, $process); -// $process->name($this->_serverName . '|logFlushToDB'); DbLog::flush(); })); - /** End */ + // 如果配置了主进程PID文件路径,就把主进程PID写入文件,以供调用 if (self::$statsPidFile) { - $this->sw->addProcess(new SwooleProcess(function ($process) { + $this->_sw->addProcess(new SwooleProcess(function ($process) { $this->setProcessName('listenPipeProcess', null, $process); -// $process->name($this->_serverName . '|listenPipeProcess'); + // 把pid写入文件,给其它地方使用 file_put_contents(self::$statsPidFile, $process->pid); + + // 捕获用户自定义信号,用来打印当前状态信息 SwooleProcess::signal(SIGUSR1, function ($sig) { $tasks = LoadTasks::getTable(); $content = 'The node has no tasks' . PHP_EOL; @@ -217,8 +231,9 @@ protected function initServer() // 当前待执行的任务表 $tasks = Tasks::$table; - if (count($tasks) > 0) { - $content .= 'Task to be performed:' . PHP_EOL; + $taskCount = count($tasks); + if ($taskCount > 0) { + $content .= 'Task to be performed:' . $taskCount . PHP_EOL; foreach ($tasks as $task) { $content .= 'taskId = ' . $task['taskId'] . '; exec time = ' . date('Y-m-d H:i:s', $task['sec']) . '; retries = ' . $task['retries'] . PHP_EOL; } @@ -231,7 +246,7 @@ protected function initServer() parent::initServer(); // 连接中心服注册服务 - $this->sw->addProcess(new SwooleProcess([$this, 'register'])); + $this->_sw->addProcess(new SwooleProcess([$this, 'register'])); } @@ -242,13 +257,13 @@ protected function initServer() */ public function onMasterStart(SwooleServer $server) { - $this->setProcessName(': master -host=' . $this->host . ' -port=' . $this->port, ''); + $this->setProcessName(': master -host=' . $this->_host . ' -port=' . $this->_port, ''); $this->formatOutput("Master PID = {$server->master_pid}"); $this->formatOutput("Manager PID = {$server->manager_pid}"); $this->formatOutput("Swoole Version = [" . SWOOLE_VERSION . "]"); $this->formatOutput("Server IP = " . SERVER_INTERNAL_IP); - $this->formatOutput("Listen IP = {$this->host}"); - $this->formatOutput("Listen Port = {$this->port}"); + $this->formatOutput("Listen IP = {$this->_host}"); + $this->formatOutput("Listen Port = {$this->_port}"); $this->formatOutput("Worker Number = {$server->setting['worker_num']}"); $this->formatOutput("Task Number = {$server->setting['task_worker_num']}"); } @@ -322,7 +337,7 @@ public function onPipeMessage(SwooleServer $server, $src_worker_id, $data) $msg = '第' . $item['currentRetries'] . '次重试' . PHP_EOL . $msg; } - //正在运行标示 + // 正在运行标示 if (Tasks::$table->exist($runId)) { Tasks::$table->set($runId, [ 'runStatus' => LoadTasks::RUN_STATUS_START, @@ -397,10 +412,10 @@ public function onReceive(SwooleServer $server, $fd, $from_id, $data) ]; // 是否有消息类型处理方法 - if (isset($this->receiveModeProcessMaps[$data['type']])) { + if (isset($this->_receiveModeProcessMaps[$data['type']])) { call_user_func_array([ $this, - $this->receiveModeProcessMaps[$data['type']] + $this->_receiveModeProcessMaps[$data['type']] ], [$server, $task, $data]); } else { $pack = Packet::packFormat("unknown task type", Constants::STATUS_CODE_UNKNOW_TASK_TYPE); @@ -521,12 +536,12 @@ public function register(SwooleProcess $process) public function send($client_id, $data) { $data = Packet::packEncode($data); - return $this->sw->send($client_id, $data); + return $this->_sw->send($client_id, $data); } public function __call($func, $params) { - return call_user_func_array([$this->sw, $func], $params); + return call_user_func_array([$this->_sw, $func], $params); } public function onShutdown(SwooleServer $server) diff --git a/agent/Libs/ServerBase.php b/agent/Libs/ServerBase.php index e9c97d5..0cb99f5 100755 --- a/agent/Libs/ServerBase.php +++ b/agent/Libs/ServerBase.php @@ -20,12 +20,12 @@ */ abstract class ServerBase { - protected static $options = array(); + protected static $_options = array(); /** * SwooleServer对象 * @var null|SwooleServer */ - protected $sw = null; + protected $_sw = null; /** * 默认日期格式 @@ -34,29 +34,29 @@ abstract class ServerBase */ public $dateFormat; - protected $host = null; + protected $_host = null; - protected $port = null; + protected $_port = null; - protected $ssl = false; + protected $_ssl = false; - protected static $beforeStopCallback; - protected static $beforeReloadCallback; + protected static $_beforeStopCallback; + protected static $_beforeReloadCallback; public static $swooleMode; public static $optionKit; public static $pidFile; - public static $defaultOptions = array( + protected static $_defaultOptions = [ 'd|daemon' => '启用守护进程模式', 'h|host?' => '指定监听地址', 'p|port?' => '指定监听端口', 'help' => '显示帮助界面', 'b|base' => '使用BASE模式启动', -// 'w|worker?' => '设置Worker进程的数量', - 'r|thread?' => '设置Reactor线程的数量', -// 't|tasker?' => '设置Task进程的数量', - ); + 'w|worker_num?' => '设置Worker进程的数量', + 'r|reactor_num?' => '设置Reactor线程的数量', + 't|task_worker_num?' => '设置Task进程的数量', + ]; /** * 连接池名称 @@ -106,22 +106,42 @@ public function __construct($host = "0.0.0.0", $port = 0, $ssl = false) } $flag = $ssl ? (SWOOLE_SOCK_TCP | SWOOLE_SSL) : SWOOLE_SOCK_TCP; - if (!empty(self::$options['base'])) { + if (!empty(self::$_options['base'])) { self::$swooleMode = SWOOLE_BASE; } elseif (extension_loaded('swoole')) { self::$swooleMode = SWOOLE_PROCESS; } - $this->sw = new SwooleServer($host, $port, self::$swooleMode, $flag); + /** 脚本启动参数配置 */ + // 自定义监听地址, 针对多网卡支持 + if (!empty(self::$_options['host'])) { + $host = intval(self::$_options['host']); + } + + $opt = [ + 'port' => $port, + 'worker_num' => 10, + 'reactor_num' => 0, + 'task_worker_num' => 4, + ]; + foreach($opt as $key => $val) { + // 自定义监听启动端口 + if (!empty(self::$_options[$key])) { + $opt[$key] = intval(self::$_options[$key]); + } + } + /** END */ + + $this->_sw = new SwooleServer($host, $opt['port'], self::$swooleMode, $flag); //store current ip port - $this->host = $host; - $this->port = $this->sw->port; - $this->ssl = $ssl; + $this->_host = $this->_sw->host; + $this->_port = $this->_sw->port; + $this->_ssl = $ssl; // 定义内部IP和端口常量 !defined('SERVER_INTERNAL_IP') && define('SERVER_INTERNAL_IP', getServerInternalIp()); - !defined('SERVER_PORT') && define('SERVER_PORT', $this->port); + !defined('SERVER_PORT') && define('SERVER_PORT', $this->_port); $this->dateFormat = configItem('default_date_format', 'Y-m-d H:i:s'); @@ -148,8 +168,8 @@ public function __construct($host = "0.0.0.0", $port = 0, $ssl = false) 'heartbeat_idle_time' => 180, // 3分钟客户端没有发送请求,关闭连接 'open_cpu_affinity' => 1, - 'worker_num' => 10, - 'task_worker_num' => 4, + 'worker_num' => $opt['worker_num'], + 'task_worker_num' => $opt['task_worker_num'], 'max_request' => 0, //必须设置为0否则并发任务容易丢,don't change this number 'task_max_request' => 0, // 不退出 @@ -160,8 +180,18 @@ public function __construct($host = "0.0.0.0", $port = 0, $ssl = false) 'backlog' => 20000, 'log_file' => LOGS_PATH . 'sw_server.log', 'daemonize' => 0, + + /** + * 异步安全重启特性 + * @see https://wiki.swoole.com/wiki/page/775.html + */ + 'reload_async' => true, ]; + if ($opt['reactor_num'] > 0) { + $this->_config['reactor_num'] = $opt['reactor_num']; + } + set_error_handler([$this, '_error_handler']); } @@ -180,8 +210,8 @@ public static function init() 'restart' => function ($serverPID, $opt) { //已存在ServerPID,并且进程存在 if (!empty($serverPID) and posix_kill($serverPID, 0)) { - if (self::$beforeStopCallback) { - call_user_func(self::$beforeStopCallback, $opt); + if (self::$_beforeStopCallback) { + call_user_func(self::$_beforeStopCallback, $opt); } posix_kill($serverPID, SIGTERM); self::formatOutput('Stopped'); @@ -191,8 +221,8 @@ public static function init() if (empty($serverPID)) { exit("Server is not running"); } - if (self::$beforeReloadCallback) { - call_user_func(self::$beforeReloadCallback, $opt); + if (self::$_beforeReloadCallback) { + call_user_func(self::$_beforeReloadCallback, $opt); } posix_kill($serverPID, SIGUSR1); exit(0); @@ -201,8 +231,8 @@ public static function init() if (empty($serverPID)) { exit("Server is not running\n"); } - if (self::$beforeStopCallback) { - call_user_func(self::$beforeStopCallback, $opt); + if (self::$_beforeStopCallback) { + call_user_func(self::$_beforeStopCallback, $opt); } posix_kill($serverPID, SIGTERM); exit(0); @@ -235,7 +265,7 @@ public static function start(callable $startFunction) } $kit = self::$optionKit; - foreach (self::$defaultOptions as $k => $v) { + foreach (self::$_defaultOptions as $k => $v) { //解决Windows平台乱码问题 if (PHP_OS == 'WINNT') { $v = iconv('utf-8', 'gbk', $v); @@ -246,20 +276,21 @@ public static function start(callable $startFunction) $opt = $kit->parse($argv); if (empty($argv[1]) or isset($opt['help']) || !isset(self::$_startMethodMaps[$argv[1]])) { usage: - $kit->specs->printOptions("php {$argv[0]} start|restart|stop|reload"); + $kit->specs->printOptions("php {$argv[0]} " . implode('|', array_keys(self::$_startMethodMaps))); exit(0); } call_user_func_array(self::$_startMethodMaps[$argv[1]], [$serverPID, $opt]); - self::$options = $opt; + self::$_options = $opt; self::formatOutput('Starting'); + // 回调闭包启动函数 $startFunction($opt); } public function run() { - $this->sw->set($this->_config); + $this->_sw->set($this->_config); $this->initServer(); - $this->sw->start(); + $this->_sw->start(); } public function setServerName($name) @@ -310,8 +341,8 @@ public function onMasterStart(SwooleServer $server) $this->formatOutput("Master PID = {$server->master_pid}"); $this->formatOutput("Manager PID = {$server->manager_pid}"); $this->formatOutput("Swoole Version = [" . SWOOLE_VERSION . "]"); - $this->formatOutput("Listen IP = {$this->host}"); - $this->formatOutput("Listen Port = {$this->port}"); + $this->formatOutput("Listen IP = {$this->_host}"); + $this->formatOutput("Listen Port = {$this->_port}"); $this->formatOutput("Reactor Number = {$server->setting['reactor_num']}"); $this->formatOutput("Worker Number = {$server->setting['worker_num']}"); $this->formatOutput("Task Number = {$server->setting['task_worker_num']}"); @@ -416,7 +447,7 @@ protected function initServer() * 日志进程,异步刷日志到文件 */ Log::init(); - $this->sw->addProcess(new SwooleProcess(function ($process) { + $this->_sw->addProcess(new SwooleProcess(function ($process) { if (!isMacOS()) { $process->name($this->_serverName . '|LogFlushToDisk'); } @@ -490,7 +521,7 @@ public static function addOption($specString, $description) Loader::addNameSpace('GetOptionKit', LIBS_PATH . "GetOptionKit/src/GetOptionKit"); self::$optionKit = new \GetOptionKit\GetOptionKit; } - foreach (self::$defaultOptions as $k => $v) { + foreach (self::$_defaultOptions as $k => $v) { if ($k[0] == $specString[0]) { throw new ServerOptionException("不能添加系统保留的选项名称"); } @@ -507,7 +538,7 @@ public static function addOption($specString, $description) */ public static function beforeStop(callable $function) { - self::$beforeStopCallback = $function; + self::$_beforeStopCallback = $function; } /** @@ -515,7 +546,7 @@ public static function beforeStop(callable $function) */ public static function beforeReload(callable $function) { - self::$beforeReloadCallback = $function; + self::$_beforeReloadCallback = $function; } public function daemonize() @@ -525,17 +556,17 @@ public function daemonize() public function connection_info($fd) { - return $this->sw->connection_info($fd); + return $this->_sw->connection_info($fd); } public function close($client_id) { - return $this->sw->close($client_id); + return $this->_sw->close($client_id); } public function send($client_id, $data) { - return $this->sw->send($client_id, $data); + return $this->_sw->send($client_id, $data); } /** diff --git a/agent/Libs/Tasks.php b/agent/Libs/Tasks.php index b2ae302..982e08b 100755 --- a/agent/Libs/Tasks.php +++ b/agent/Libs/Tasks.php @@ -23,7 +23,7 @@ class Tasks * TYPE_INT 8(Mysql BIGINT): 2 ^ (8 * 8) = -9223372036854775808 ~ 9223372036854775807 * @var array */ - private static $column = [ + private static $_column = [ 'minute' => [SwooleTable::TYPE_INT, 8], // 分钟 'sec' => [SwooleTable::TYPE_INT, 8], // 哪一秒执行 'taskId' => [SwooleTable::TYPE_INT, 8], // crontab Id @@ -39,7 +39,7 @@ class Tasks public static function init() { self::$table = new SwooleTable(TASKS_MAX_CONCURRENT_SIZE); - foreach (self::$column as $key => $v) { + foreach (self::$_column as $key => $v) { self::$table->column($key, $v[0], $v[1]); } self::$table->create(); diff --git a/agent/Models/DB.php b/agent/Models/DB.php index a3d9a7b..2794a4a 100644 --- a/agent/Models/DB.php +++ b/agent/Models/DB.php @@ -14,10 +14,6 @@ class DB { private static $_dbs = []; - public static function init() { - self::getInstance(); - } - /** * 获取DB实例 * 注意,此方法调用在worker/process/task进程中,所以每个进程都会需要调用一次 @@ -30,7 +26,7 @@ public static function init() { public static function getInstance($name='') { // 默认数据库 - $name = strtolower($name) ?: configItem('default_select_db');; + $name = strtolower($name) ?: configItem('default_select_db'); if (isset(self::$_dbs[$name])) { return self::$_dbs[$name]; } diff --git a/agent/agent.php b/agent/agent.php index df2bce0..c169fef 100644 --- a/agent/agent.php +++ b/agent/agent.php @@ -33,12 +33,14 @@ //重定向PHP错误日志到logs目录 ini_set('error_log', LOGS_PATH . 'php_errors.log'); -//最多载入任务数量 +// 最多载入任务数量 define('TASK_MAX_LOAD_SIZE', configItem('task_max_load_size', 8192)); // 最大进程数 define('PROCESS_MAX_SIZE', configItem('process_max_size', 1024)); -// 同时运行任务最大数量 -define('TASKS_MAX_CONCURRENT_SIZE', configItem('task_max_concurrent_size', 1024)); +// 一分钟内运行任务最大数量 +define('TASKS_MAX_CONCURRENT_SIZE', configItem('task_max_concurrent_size', 8192)); +// 日志临时存储最大条数 +define('LOG_TEMP_STORE_MAX_SIZE', configItem('log_temp_store_max_size', 16384)); if (!class_exists('Agent\\Libs\\Loader')) { $autoloadPath = LIBS_PATH . 'Loader.php';