Skip to content

Commit

Permalink
fixed some bugs
Browse files Browse the repository at this point in the history
formatted code
  • Loading branch information
ppanphper committed Nov 4, 2019
1 parent 2aacdd4 commit be6cadc
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 219 deletions.
6 changes: 4 additions & 2 deletions agent/Config/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@
'task_max_load_size' => 8192,
// 最大进程数,"任务进程内存"表最大行数
'process_max_size' => 1024,
// 同时运行任务最大数量, "任务内存"表最大行数
'task_max_concurrent_size' => 1024,
// 一分钟内运行任务最大数量, "任务内存"表最大行数
'task_max_concurrent_size' => 8192,
// 日志临时存储最大条数
'log_temp_store_max_size' => 16384,

// 10分钟重新加载一次任务到内存表
'task_reload_interval' => 600,
Expand Down
24 changes: 11 additions & 13 deletions agent/Libs/DbLog.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ class DbLog
* @var LogChannel
*/
protected static $_logChannel = null;
// 日志聚合,把所有日志聚合成一条,最后一条标识完结则存储到库中
protected static $logs = [];

/**
* 写入db日志最大重试次数
Expand All @@ -28,7 +26,7 @@ class DbLog
*/
protected static $_retryMaxNum = 3;

public static $table;
protected static $_table;

/**
* TYPE_INT 1(Mysql TINYINT): 2 ^ 8 = -128 ~ 127
Expand All @@ -37,7 +35,7 @@ class DbLog
* TYPE_INT 8(Mysql BIGINT): 2 ^ (8 * 8) = -9223372036854775808 ~ 9223372036854775807
* @var array
*/
private static $column = [
private static $_column = [
'msg' => [SwooleTable::TYPE_STRING, 65535],
];

Expand All @@ -52,11 +50,11 @@ public static function init()
self::$_retryMaxNum = configItem('flush_db_log_max_retry_num', 3);

/** 用来存储日志内容 */
self::$table = new SwooleTable(TASK_MAX_LOAD_SIZE);
foreach (self::$column as $key => $v) {
self::$table->column($key, $v[0], $v[1]);
self::$_table = new SwooleTable(LOG_TEMP_STORE_MAX_SIZE);
foreach (self::$_column as $key => $v) {
self::$_table->column($key, $v[0], $v[1]);
}
self::$table->create();
self::$_table->create();
/** End */

// 请求完结的时候把日志Flush进文件
Expand Down Expand Up @@ -149,7 +147,7 @@ private static function writeLog($logPrefix)
if ($originLog !== false) {
// key = 任务Id + 运行Id + 第几次重试
$key = self::getMemoryTableKey($originLog['taskId'], $originLog['runId'], $originLog['retries']);
$msg = self::$table->get($key, 'msg');
$msg = self::$_table->get($key, 'msg');
if ($msg === false) {
$msg = '';
}
Expand All @@ -160,15 +158,15 @@ private static function writeLog($logPrefix)
$msg .= $originLog['msg'] . PHP_EOL;
}
// 暂存日志内容
self::$table->set($key, ['msg' => $msg]);
self::$_table->set($key, ['msg' => $msg]);
}
// 如果不是最后一条日志,就跳过
if (!isset($originLog['end']) || !$originLog['end']) {
continue;
}
// 删除并发日志
if ($originLog['code'] == Constants::CUSTOM_CODE_CONCURRENCY_LIMIT) {
self::$table->del($key);
self::$_table->del($key);
continue;
}
// 存储日志
Expand All @@ -190,12 +188,12 @@ private static function writeLog($logPrefix)
self::$_logChannel->push($originLog);
} else {
// 入库失败重试次数达到阀值就删除掉
self::$table->del($key);
self::$_table->del($key);
logWarning($logPrefix . json_encode($originLog, JSON_UNESCAPED_UNICODE));
}
} else {
// 入库后就删除掉
self::$table->del($key);
self::$_table->del($key);
}
}
}
Expand Down
45 changes: 23 additions & 22 deletions agent/Libs/Donkeyid.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
class Donkeyid
{
static $donkeyid;
public static $donkeyid;

private $node_id;
private $epoch;
private $table;
private $lock;
private $_node_id;
private $_epoch;
private $_table;
private $_lock;

const snowflake = 0;
const TIMESTAMP_BITS = 42;
Expand All @@ -44,8 +44,8 @@ public function __construct($node_id = false, $epoch = false)
if ($epoch === false) {
$epoch = ini_get("donkeyid.epoch");
}
$this->node_id = ($node_id == false || $node_id < 0) ? 0 : $node_id;
$this->epoch = ($epoch == false || $epoch < 0) ? 0 : $epoch;
$this->_node_id = ($node_id == false || $node_id < 0) ? 0 : $node_id;
$this->_epoch = ($epoch == false || $epoch < 0) ? 0 : $epoch;
$this->create_table();
}

Expand All @@ -69,11 +69,11 @@ static function getInstance()
*/
private function create_table()
{
$this->table = new SwooleTable(3);
$this->table->column("last_timestamp", \swoole_table::TYPE_INT, 8);
$this->table->column("sequence", \swoole_table::TYPE_INT, 4);
$this->table->create();
$this->lock = new SwooleLock(SWOOLE_SPINLOCK);
$this->_table = new SwooleTable(3);
$this->_table->column("last_timestamp", \swoole_table::TYPE_INT, 8);
$this->_table->column("sequence", \swoole_table::TYPE_INT, 4);
$this->_table->create();
$this->_lock = new SwooleLock(SWOOLE_SPINLOCK);
}

/**
Expand Down Expand Up @@ -105,9 +105,9 @@ public function dk_get_next_id()
$isMacOS = isMacOS();
// MacOS Mojave使用锁会异常退出
if (!$isMacOS) {
$this->lock->lock();
$this->_lock->lock();
}
$col = $this->table->get(self::snowflake);
$col = $this->_table->get(self::snowflake);
if ($col == false || $col["last_timestamp"] > $now) {
$last_timestamp = $now;
$sequence = mt_rand(0, 10) % 2;
Expand All @@ -121,13 +121,13 @@ public function dk_get_next_id()
$now = $this->wait_next_ms();
}
}
$this->table->set(self::snowflake, array("last_timestamp" => $now, "sequence" => $sequence));
$this->_table->set(self::snowflake, array("last_timestamp" => $now, "sequence" => $sequence));
// MacOS Mojave使用锁会异常退出
if (!$isMacOS) {
$this->lock->unlock();
$this->_lock->unlock();
}
$id = (($now - ($this->epoch * 1000) & (-1 ^ (-1 << self::TIMESTAMP_BITS))) << self::TIMESTAMP_LEFT_SHIFT)
| (($this->node_id & (-1 ^ (-1 << self::NODE_ID_BITS))) << self::NODE_ID_LEFT_SHIFT)
$id = (($now - ($this->_epoch * 1000) & (-1 ^ (-1 << self::TIMESTAMP_BITS))) << self::TIMESTAMP_LEFT_SHIFT)
| (($this->_node_id & (-1 ^ (-1 << self::NODE_ID_BITS))) << self::NODE_ID_LEFT_SHIFT)
| ($sequence);
return $id;
}
Expand All @@ -141,9 +141,10 @@ public function dk_get_next_id()
*/
public function dk_parse_id($id)
{
$ret["time"] = ($id >> self::TIMESTAMP_LEFT_SHIFT) + ($this->epoch * 1000);
$ret["node_id"] = ($id >> self::NODE_ID_LEFT_SHIFT) & (-1 ^ (-1 << self::NODE_ID_BITS));
$ret["sequence"] = $id & (-1 ^ (-1 << self::SEQUENCE_BITS));
return $ret;
return [
'time' => ($id >> self::TIMESTAMP_LEFT_SHIFT) + ($this->_epoch * 1000),
'node_id' => ($id >> self::NODE_ID_LEFT_SHIFT) & (-1 ^ (-1 << self::NODE_ID_BITS)),
'sequence' => $id & (-1 ^ (-1 << self::SEQUENCE_BITS))
];
}
}
1 change: 0 additions & 1 deletion agent/Libs/LoadTasks.php
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ public static function load($loadSpecifiedIds = [])
break;
}

// 如果内存不够,会出现 Swoole_table::set(): unable to allocate memory
self::$_table->set($task['id'], [
'name' => $task['name'],
'rule' => $task['rule'],
Expand Down
10 changes: 5 additions & 5 deletions agent/Libs/Loader.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Loader
/**
* 命名空间的路径
*/
protected static $namespaces;
protected static $_namespaces;

/**
* 自动载入类
Expand All @@ -27,10 +27,10 @@ class Loader
public static function autoload($class)
{
$root = explode('\\', trim($class, '\\'), 2);
if (count($root) > 1 and isset(self::$namespaces[$root[0]])) {
$filePath = self::$namespaces[$root[0]] . DIRECTORY_SEPARATOR . str_replace('\\', DIRECTORY_SEPARATOR, $root[1]) . '.php';
if (count($root) > 1 and isset(self::$_namespaces[$root[0]])) {
$filePath = self::$_namespaces[$root[0]] . DIRECTORY_SEPARATOR . str_replace('\\', DIRECTORY_SEPARATOR, $root[1]) . '.php';
if (is_file($filePath)) {
require $filePath;
require_once $filePath;
}
}
}
Expand All @@ -53,6 +53,6 @@ public static function register($prepend = false)
*/
public static function addNameSpace($root, $path)
{
self::$namespaces[$root] = $path;
self::$_namespaces[$root] = $path;
}
}
Loading

0 comments on commit be6cadc

Please sign in to comment.