Skip to content

Commit

Permalink
Merge pull request #7 from ppanphper/refactor/optimize_code
Browse files Browse the repository at this point in the history
Refactor/optimize code
  • Loading branch information
ppanphper authored Nov 4, 2019
2 parents b22badf + be6cadc commit 670961e
Show file tree
Hide file tree
Showing 25 changed files with 713 additions and 373 deletions.
1 change: 1 addition & 0 deletions admin/config/const.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Constants {
/**
* 数据库中的agent数据如果有变更,把这个md5值也一起改变
* agent发现有值,就重新加载DB数据,并删除对应的Key
* key prefix:IP_PORT
*/
const REDIS_KEY_AGENT_CHANGE_MD5 = 'agent_change_md5:';

Expand Down
6 changes: 3 additions & 3 deletions admin/helpers/RequestAgentHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ class RequestAgentHelper
/**
* 控制请求
*
* @param string $name
* @param string $cmdName
* @param array $params
* @param $ip
* @param $port
*
* @return array
*/
public static function controlCommand($name, array $params, $ip, $port) {
public static function controlCommand($cmdName, array $params, $ip, $port) {
$data = [
'cmd' => $name,
'cmd' => $cmdName,
'param' => $params,
'type' => Constants::SW_CONTROL_CMD,
];
Expand Down
4 changes: 4 additions & 0 deletions admin/helpers/StringHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,8 @@ public static function is_serialized($data)
}
return false;
}

public static function getIpPortFormat($ip, $port, $separator = ':') {
return $ip . $separator . $port;
}
}
10 changes: 6 additions & 4 deletions admin/models/Agents.php
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,17 @@ public function afterSave($insert, $changedAttributes)
parent::afterSave($insert, $changedAttributes);
$oldAttributes = $this->getOldAttributes();
$checkChangeFields = [
'ip',
'port',
// 'ip',
// 'port',
'status'
];
foreach($checkChangeFields as $field) {
// 如果状态有变更,通知Agent重新加载节点信息
if(isset($changedAttributes[$field]) && $changedAttributes[$field] != $oldAttributes[$field]) {
$ip = isset($changedAttributes['ip']) ? $changedAttributes['ip'] : $oldAttributes['ip'];
$port = isset($changedAttributes['port']) ? $changedAttributes['port'] : $oldAttributes['port'];
// $ip = isset($changedAttributes['ip']) ? $changedAttributes['ip'] : $oldAttributes['ip'];
// $port = isset($changedAttributes['port']) ? $changedAttributes['port'] : $oldAttributes['port'];
$ip = $oldAttributes['ip'];
$port = $oldAttributes['port'];
$key = Constants::REDIS_KEY_AGENT_CHANGE_MD5 . $ip .'_'.$port;
Yii::$app->redis->set($key, microtime(true), 86400);
break;
Expand Down
26 changes: 18 additions & 8 deletions admin/models/searchs/Crontab.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ public function search($params)
break;
}
}
$query->where(['<>', 'a.status', -1]);

if ($this->status === '' || (is_string($this->status) && trim($this->status) === '')) {
$query->where(['<>', 'a.status', -1]);
}

// grid filtering conditions
$query->andFilterWhere([
Expand All @@ -104,17 +107,22 @@ public function search($params)
]);
$joinWith = [];
if($this->agentId) {
/** @see Crontab::getAgents() */
$joinWith[] = 'agents';
$query->andWhere('b.bid = :bid OR b.bid IS NULL',[
':bid'=>$this->agentId
]);

// 查询出不在这个节点运行的任务,过滤掉
$ids = ViaTable::find()->select('aid')->where([
// $ids = ViaTable::find()->select('aid')->where([
// 'bid' => $this->agentId,
// 'type' => ViaTable::TYPE_CRONTAB_NOT_IN_AGENTS,
// ])->asArray();
$subQuery = ViaTable::find()->select('aid')->where([
'bid' => $this->agentId,
'type' => ViaTable::TYPE_CRONTAB_NOT_IN_AGENTS,
])->asArray();
// crontab Id 不在这些Id中
$query->andWhere(['NOT IN', 'a.id', $ids]);
]);
$query->andWhere(['NOT IN', 'a.id', $subQuery]);

$this->agentInfo = [];
$rows = Agents::getData();
Expand All @@ -130,17 +138,17 @@ public function search($params)
}
}
}

if($this->ownerId) {
/** @see Crontab::getOwners() */
$joinWith[] = 'owners';
}

if($joinWith) {
$this->isSearched = true;
$query->select([
'DISTINCT(a.id)',
'a.*',
])->joinWith($joinWith);

$dataProvider->totalCount = $query->count('DISTINCT a.id');
}

if ($this->max_process_time) {
Expand All @@ -152,6 +160,8 @@ public function search($params)
->andFilterWhere(['like', 'a.command', $this->command])
->andFilterWhere(['like', 'a.run_user', $this->run_user]);

$dataProvider->totalCount = $query->count();

return $dataProvider;
}
}
6 changes: 4 additions & 2 deletions agent/Config/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@
'task_max_load_size' => 8192,
// 最大进程数,"任务进程内存"表最大行数
'process_max_size' => 1024,
// 同时运行任务最大数量, "任务内存"表最大行数
'task_max_concurrent_size' => 1024,
// 一分钟内运行任务最大数量, "任务内存"表最大行数
'task_max_concurrent_size' => 8192,
// 日志临时存储最大条数
'log_temp_store_max_size' => 16384,

// 10分钟重新加载一次任务到内存表
'task_reload_interval' => 600,
Expand Down
6 changes: 3 additions & 3 deletions agent/Libs/Common.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ function getArg(&$arg, $level = 0)
{
// 最大层数
if ($level >= TRACE_LEVEL) {
return json_encode($arg);
return json_encode($arg, JSON_UNESCAPED_UNICODE);
}
if (is_object($arg)) {
$arr = (array)$arg;
Expand All @@ -115,7 +115,7 @@ function getArg(&$arg, $level = 0)

$arg = get_class($arg) . ' Object (' . implode(',', $args) . ')';
}
return $arg;
return !is_string($arg) ? json_encode($arg, JSON_UNESCAPED_UNICODE) : $arg;
}

/**
Expand Down Expand Up @@ -164,7 +164,7 @@ function convenienceDebug($traces, $traces_to_ignore = 0, $split = PHP_EOL, $exc
}
// 调用的方法与参数
if (DEBUG_TRACE_DETAIL) {
$args = empty($trace['args']) ? '' : json_encode($trace['args']);
$args = empty($trace['args']) ? '' : json_encode($trace['args'], JSON_UNESCAPED_UNICODE);
$args = mb_strlen($args, 'UTF-8') > DEBUG_TRACE_DETAIL_PARAM_LENGTH ? mb_substr($args, 0, DEBUG_TRACE_DETAIL_PARAM_LENGTH, 'UTF-8') . '...' : $args;
$msg .= '(' . $args . ')';
} else {
Expand Down
1 change: 1 addition & 0 deletions agent/Libs/Constants.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Constants
/**
* 数据库中的agent数据如果有变更,把这个md5值也一起改变
* agent发现有值,就重新加载DB数据,并删除对应的Key
* key prefix:IP_PORT
*/
const REDIS_KEY_AGENT_CHANGE_MD5 = 'agent_change_md5:';

Expand Down
45 changes: 22 additions & 23 deletions agent/Libs/DbLog.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

namespace Libs;

use Models\Logs;
use \Swoole\Table as SwooleTable;

class DbLog
Expand All @@ -17,8 +18,6 @@ class DbLog
* @var LogChannel
*/
protected static $_logChannel = null;
// 日志聚合,把所有日志聚合成一条,最后一条标识完结则存储到库中
protected static $logs = [];

/**
* 写入db日志最大重试次数
Expand All @@ -27,7 +26,7 @@ class DbLog
*/
protected static $_retryMaxNum = 3;

public static $table;
protected static $_table;

/**
* TYPE_INT 1(Mysql TINYINT): 2 ^ 8 = -128 ~ 127
Expand All @@ -36,7 +35,7 @@ class DbLog
* TYPE_INT 8(Mysql BIGINT): 2 ^ (8 * 8) = -9223372036854775808 ~ 9223372036854775807
* @var array
*/
private static $column = [
private static $_column = [
'msg' => [SwooleTable::TYPE_STRING, 65535],
];

Expand All @@ -51,11 +50,11 @@ public static function init()
self::$_retryMaxNum = configItem('flush_db_log_max_retry_num', 3);

/** 用来存储日志内容 */
self::$table = new SwooleTable(TASK_MAX_LOAD_SIZE);
foreach (self::$column as $key => $v) {
self::$table->column($key, $v[0], $v[1]);
self::$_table = new SwooleTable(LOG_TEMP_STORE_MAX_SIZE);
foreach (self::$_column as $key => $v) {
self::$_table->column($key, $v[0], $v[1]);
}
self::$table->create();
self::$_table->create();
/** End */

// 请求完结的时候把日志Flush进文件
Expand Down Expand Up @@ -140,7 +139,6 @@ public static function shutdown()
private static function writeLog($logPrefix)
{
try {
$db = getDBInstance();
$stats = self::$_logChannel->stats();
$dateFormat = configItem('default_date_format');
if ($stats['queue_num'] > 0) {
Expand All @@ -149,7 +147,7 @@ private static function writeLog($logPrefix)
if ($originLog !== false) {
// key = 任务Id + 运行Id + 第几次重试
$key = self::getMemoryTableKey($originLog['taskId'], $originLog['runId'], $originLog['retries']);
$msg = self::$table->get($key, 'msg');
$msg = self::$_table->get($key, 'msg');
if ($msg === false) {
$msg = '';
}
Expand All @@ -160,26 +158,27 @@ private static function writeLog($logPrefix)
$msg .= $originLog['msg'] . PHP_EOL;
}
// 暂存日志内容
self::$table->set($key, ['msg' => $msg]);
self::$_table->set($key, ['msg' => $msg]);
}
// 如果不是最后一条日志,就跳过
if (!isset($originLog['end']) || !$originLog['end']) {
continue;
}
// 删除并发日志
if ($originLog['code'] == Constants::CUSTOM_CODE_CONCURRENCY_LIMIT) {
self::$table->del($key);
self::$_table->del($key);
continue;
}
if (!$db->insertInto('logs', [
'task_id' => $originLog['taskId'],
'run_id' => $originLog['runId'],
'code' => $originLog['code'],
'title' => $originLog['title'],
'msg' => $msg,
'consume_time' => $originLog['consumeTime'],
'created' => $originLog['created'],
])->execute()) {
// 存储日志
if (!Logs::saveLog([
'taskId' => $originLog['taskId'],
'runId' => $originLog['runId'],
'code' => $originLog['code'],
'title' => $originLog['title'],
'msg' => $msg,
'consumeTime' => $originLog['consumeTime'],
'created' => $originLog['created'],
])) {
if (!isset($originLog['retryCount'])) {
$originLog['retryCount'] = 0;
}
Expand All @@ -189,12 +188,12 @@ private static function writeLog($logPrefix)
self::$_logChannel->push($originLog);
} else {
// 入库失败重试次数达到阀值就删除掉
self::$table->del($key);
self::$_table->del($key);
logWarning($logPrefix . json_encode($originLog, JSON_UNESCAPED_UNICODE));
}
} else {
// 入库后就删除掉
self::$table->del($key);
self::$_table->del($key);
}
}
}
Expand Down
45 changes: 23 additions & 22 deletions agent/Libs/Donkeyid.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
class Donkeyid
{
static $donkeyid;
public static $donkeyid;

private $node_id;
private $epoch;
private $table;
private $lock;
private $_node_id;
private $_epoch;
private $_table;
private $_lock;

const snowflake = 0;
const TIMESTAMP_BITS = 42;
Expand All @@ -44,8 +44,8 @@ public function __construct($node_id = false, $epoch = false)
if ($epoch === false) {
$epoch = ini_get("donkeyid.epoch");
}
$this->node_id = ($node_id == false || $node_id < 0) ? 0 : $node_id;
$this->epoch = ($epoch == false || $epoch < 0) ? 0 : $epoch;
$this->_node_id = ($node_id == false || $node_id < 0) ? 0 : $node_id;
$this->_epoch = ($epoch == false || $epoch < 0) ? 0 : $epoch;
$this->create_table();
}

Expand All @@ -69,11 +69,11 @@ static function getInstance()
*/
private function create_table()
{
$this->table = new SwooleTable(3);
$this->table->column("last_timestamp", \swoole_table::TYPE_INT, 8);
$this->table->column("sequence", \swoole_table::TYPE_INT, 4);
$this->table->create();
$this->lock = new SwooleLock(SWOOLE_SPINLOCK);
$this->_table = new SwooleTable(3);
$this->_table->column("last_timestamp", \swoole_table::TYPE_INT, 8);
$this->_table->column("sequence", \swoole_table::TYPE_INT, 4);
$this->_table->create();
$this->_lock = new SwooleLock(SWOOLE_SPINLOCK);
}

/**
Expand Down Expand Up @@ -105,9 +105,9 @@ public function dk_get_next_id()
$isMacOS = isMacOS();
// MacOS Mojave使用锁会异常退出
if (!$isMacOS) {
$this->lock->lock();
$this->_lock->lock();
}
$col = $this->table->get(self::snowflake);
$col = $this->_table->get(self::snowflake);
if ($col == false || $col["last_timestamp"] > $now) {
$last_timestamp = $now;
$sequence = mt_rand(0, 10) % 2;
Expand All @@ -121,13 +121,13 @@ public function dk_get_next_id()
$now = $this->wait_next_ms();
}
}
$this->table->set(self::snowflake, array("last_timestamp" => $now, "sequence" => $sequence));
$this->_table->set(self::snowflake, array("last_timestamp" => $now, "sequence" => $sequence));
// MacOS Mojave使用锁会异常退出
if (!$isMacOS) {
$this->lock->unlock();
$this->_lock->unlock();
}
$id = (($now - ($this->epoch * 1000) & (-1 ^ (-1 << self::TIMESTAMP_BITS))) << self::TIMESTAMP_LEFT_SHIFT)
| (($this->node_id & (-1 ^ (-1 << self::NODE_ID_BITS))) << self::NODE_ID_LEFT_SHIFT)
$id = (($now - ($this->_epoch * 1000) & (-1 ^ (-1 << self::TIMESTAMP_BITS))) << self::TIMESTAMP_LEFT_SHIFT)
| (($this->_node_id & (-1 ^ (-1 << self::NODE_ID_BITS))) << self::NODE_ID_LEFT_SHIFT)
| ($sequence);
return $id;
}
Expand All @@ -141,9 +141,10 @@ public function dk_get_next_id()
*/
public function dk_parse_id($id)
{
$ret["time"] = ($id >> self::TIMESTAMP_LEFT_SHIFT) + ($this->epoch * 1000);
$ret["node_id"] = ($id >> self::NODE_ID_LEFT_SHIFT) & (-1 ^ (-1 << self::NODE_ID_BITS));
$ret["sequence"] = $id & (-1 ^ (-1 << self::SEQUENCE_BITS));
return $ret;
return [
'time' => ($id >> self::TIMESTAMP_LEFT_SHIFT) + ($this->_epoch * 1000),
'node_id' => ($id >> self::NODE_ID_LEFT_SHIFT) & (-1 ^ (-1 << self::NODE_ID_BITS)),
'sequence' => $id & (-1 ^ (-1 << self::SEQUENCE_BITS))
];
}
}
Loading

0 comments on commit 670961e

Please sign in to comment.