Skip to content

Commit

Permalink
支持在config定义defaultJobClass和defaultJobMethod,用于兼容fanout消息体 (#31)
Browse files Browse the repository at this point in the history
* 支持在config定义defaultJobClass和defaultJobMethod,用于兼容fanout消息体

* php-cs-fixer
  • Loading branch information
omgbbq authored and kcloze committed Jul 21, 2019
1 parent 347c9ec commit 693ea99
Show file tree
Hide file tree
Showing 6 changed files with 362 additions and 0 deletions.
2 changes: 2 additions & 0 deletions config.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
['name'=>'MyJob', 'workerMinNum'=>3, 'workerMaxNum'=>30, 'queueMaxNum'=>10000],
['name'=> 'MyJob2', 'workerMinNum'=>3, 'workerMaxNum'=>20],
['name'=> 'MyJob3', 'workerMinNum'=>1, 'workerMaxNum'=>1],
['name'=> 'DefaultClassMethod.test1', 'workerMinNum'=>1, 'workerMaxNum'=>2, 'defaultJobClass'=>'DefaultClassMethod', 'defaultJobMethod'=>'test1'],
['name'=> 'DefaultClassMethod.test2', 'workerMinNum'=>1, 'workerMaxNum'=>2, 'defaultJobClass'=>'DefaultClassMethod', 'defaultJobMethod'=>'test2'],
//不需要swoole-jobs消费的队列,只往队列里面写数据
//['name'=> 'TojavaConsumer'],
],
Expand Down
58 changes: 58 additions & 0 deletions src/Jobs.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public function run($topic='')
$beginTime=microtime(true);
// 根据自己的业务需求改写此方法
$jobObject = $this->loadObject($data);
if ($jobObject instanceof JobObject) {
$jobObject = $this->formatJobObjectByTopicConfig($jobObject, $topic, $data);
}
$baseAction = $this->loadFrameworkAction();
$baseAction->start($jobObject);
$endTime =microtime(true);
Expand Down Expand Up @@ -117,6 +120,61 @@ public function run($topic='')
}
}

/**
* 获取topic配置对象,格式化JobObject.
*
* @param JobObject $jobObject
* @param string $topic
* @param mixed $data
*
* @return JobObject
*/
public function formatJobObjectByTopicConfig(JobObject $jobObject, $topic, $data)
{
$topicConfigObject = new TopicConfigObject();
if ('' == $topic) {
return $jobObject;
}
if ('' === $jobObject->topic) {
$jobObject->topic = $topic;
}
//如果消息体对象的callback class或method为空,则尝试读取配置的默认class和method
if ('' == $jobObject->jobClass || '' == $jobObject->jobMethod) {
$topicConfig = $this->getConfigByTopic($topic);
if ($topicConfig != []) {
$topicConfigObject->initAttributes($topicConfig);
if ('' == $jobObject->jobClass) {
$jobObject->jobClass = $topicConfigObject->getDefaultJobClass();
}
if ('' == $jobObject->jobMethod) {
$jobObject->jobMethod = $topicConfigObject->getDefaultJobMethod();
}
if ($jobObject->jobParams == []) {
$jobObject->jobParams = $data;
}
}
}

return $jobObject;
}

/**
* 获取对应topic的配置数组.
*
* @param string $topic
*
* @return array
*/
public function getConfigByTopic($topic)
{
$topicsConfig = $this->config['job']['topics'] ?? [];
$topicConfig = array_filter($topicsConfig, function ($config) use ($topic) {
return $config['name'] == $topic;
});

return $topicConfig != [] ? reset($topicConfig) : [];
}

//根据配置装入不同的框架
private function loadFrameworkAction()
{
Expand Down
21 changes: 21 additions & 0 deletions src/Jobs/DefaultClassMethod.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

namespace Kcloze\Jobs\Jobs;

class DefaultClassMethod
{
public function test1(...$args)
{
echo __CLASS__,'->',__FUNCTION__,'(' . var_export($args, true) . ')',PHP_EOL;
}

public function test2(...$args)
{
echo __CLASS__,'->',__FUNCTION__,'(' . var_export($args, true) . ')',PHP_EOL;
}

public static function test3(...$args)
{
echo __CLASS__,'::',__FUNCTION__,'(' . var_export($args, true) . ')',PHP_EOL;
}
}
137 changes: 137 additions & 0 deletions src/TopicConfigObject.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
<?php

namespace Kcloze\Jobs;

class TopicConfigObject
{
private $name = '';
private $workerMinNum;
private $workerMaxNum;
private $queueMaxNum;
private $defaultJobClass = '';
private $defaultJobMethod = '';

/**
* @param array $config
*/
public function __construct(array $config = [])
{
if ($config != []) {
$this->initAttributes($config);
}
}

/**
* @param array $config
*/
public function initAttributes(array $config)
{
$class = new \ReflectionClass($this);
foreach ($class->getProperties() as $property) {
if (isset($config[$property->getName()])) {
$method = 'set' . $property->getName();
if (method_exists($this, $method)) {
$reflectionMethod = new \ReflectionMethod($this, $method);
$reflectionMethod->setAccessible(true);
$reflectionMethod->invoke($this, $config[$property->getName()]);
}
}
}
}

/**
* @return string
*/
public function getName()
{
return $this->name;
}

/**
* @param string $name
*/
public function setName($name)
{
$this->name = $name;
}

/**
* @return int
*/
public function getWorkerMinNum()
{
return $this->workerMinNum;
}

/**
* @param int $workerMinNum
*/
public function setWorkerMinNum($workerMinNum)
{
$this->workerMinNum = $workerMinNum;
}

/**
* @return int
*/
public function getWorkerMaxNum()
{
return $this->workerMaxNum;
}

/**
* @param int $workerMaxNum
*/
public function setWorkerMaxNum($workerMaxNum)
{
$this->workerMaxNum = $workerMaxNum;
}

/**
* @return int
*/
public function getQueueMaxNum()
{
return $this->queueMaxNum;
}

/**
* @param int $queueMaxNum
*/
public function setQueueMaxNum($queueMaxNum)
{
$this->queueMaxNum = $queueMaxNum;
}

/**
* @return string
*/
public function getDefaultJobClass()
{
return $this->defaultJobClass;
}

/**
* @param string $defaultJobClass
*/
public function setDefaultJobClass($defaultJobClass)
{
$this->defaultJobClass = $defaultJobClass;
}

/**
* @return string
*/
public function getDefaultJobMethod()
{
return $this->defaultJobMethod;
}

/**
* @param string $defaultJobMethod
*/
public function setDefaultJobMethod($defaultJobMethod)
{
$this->defaultJobMethod = $defaultJobMethod;
}
}
73 changes: 73 additions & 0 deletions tests/DefaultJobClassMethodConfigTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

define('SWOOLE_JOBS_ROOT_PATH', __DIR__ . '/..');
use PHPUnit\Framework\TestCase;

class DefaultJobClassMethodConfigTest extends TestCase
{
private $queue =null;
private $config=[];

public function __construct()
{
$this->config= require SWOOLE_JOBS_ROOT_PATH . '/config.php';
\Kcloze\Jobs\Config::setConfig($this->config);
$logger = \Kcloze\Jobs\Logs::getLogger($this->config['logPath'] ?? '', $this->config['logSaveFileApp'] ?? '');
$this->queue =\Kcloze\Jobs\Queue\Queue::getQueue($this->config['job']['queue'], $logger);
}

public function testBase()
{
$this->assertSame(get_class($this->queue), 'Kcloze\Jobs\Queue\RedisTopicQueue');
//$this->assertSame(get_class($this->queue), 'Kcloze\Jobs\Queue\RabbitmqTopicQueue');
$topicName = 'DefaultClassMethod.test1';
$this->queue->delete($topicName);
$jobObject = new \Kcloze\Jobs\JobObject($topicName, '', '', ['functionName'=>__FUNCTION__, 'timestamp'=>time()]);
$this->assertNotEmpty($this->queue->push($topicName, $jobObject, 1, 'json'));
$this->assertSame(1, $this->queue->len($topicName));
$messageBody = $this->queue->pop($topicName, 'json');
$this->assertNotEmpty($messageBody);
$this->assertSame($jobObject->topic, $messageBody['topic']);
$this->assertSame($jobObject->jobClass, $messageBody['jobClass']);
$this->assertSame($jobObject->jobMethod, $messageBody['jobMethod']);
$this->assertSame($jobObject->jobParams, $messageBody['jobParams']);
$this->assertSame($jobObject->jobExtras, $messageBody['jobExtras']);
}

public function testDefault()
{
$topicName = 'DefaultClassMethod.test1';
$this->queue->delete($topicName);
$jobParams = ['orderNo'=>'12345678910', 'userId'=>'9527', 'userName'=>'凌凌漆', 'paymentTime'=>time()];
$jobObject = new \Kcloze\Jobs\JobObject(
$topicName,
'',
'',
$jobParams
);
$this->assertNotEmpty($this->queue->push($topicName, $jobObject, 1, 'json'));
$this->assertSame(1, $this->queue->len($topicName));
$messageBody = $this->queue->pop($topicName, 'json');
$this->assertNotEmpty($messageBody);
$this->assertSame($jobObject->topic, $messageBody['topic']);
$this->assertSame($jobObject->jobClass, $messageBody['jobClass']);
$this->assertSame($jobObject->jobMethod, $messageBody['jobMethod']);
$this->assertSame($jobObject->jobParams, $messageBody['jobParams']);
$this->assertSame($jobObject->jobParams, $jobParams);
$this->assertSame($jobObject->jobExtras, $messageBody['jobExtras']);

$job = new \Kcloze\Jobs\Jobs('');
$config = $job->getConfigByTopic($topicName);
$jobObject = $job->formatJobObjectByTopicConfig($jobObject, $topicName);
$this->assertNotEmpty($jobObject->topic);
$this->assertNotEmpty($jobObject->jobClass);
$this->assertNotEmpty($jobObject->jobMethod);
$this->assertSame($jobObject->topic, $topicName);
$this->assertSame($jobObject->jobParams, $jobParams);
$this->assertSame($jobObject->jobParams, $messageBody['jobParams']);
$this->assertSame($jobObject->jobExtras, $messageBody['jobExtras']);
$this->assertSame($jobObject->topic, $config['name']);
$this->assertSame($jobObject->jobClass, $config['defaultJobClass']);
$this->assertSame($jobObject->jobMethod, $config['defaultJobMethod']);
}
}
71 changes: 71 additions & 0 deletions tests/TopicConfigObjectTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

use PHPUnit\Framework\TestCase;

class TopicConfigObjectTest extends TestCase
{
public function testEmpty()
{
$config = [
'params'=> [
'a', 'b', 'c',
],
'attribute'=> null,
];
$object = new \Kcloze\Jobs\TopicConfigObject();
$object->initAttributes($config);
$this->assertSame('', $object->getName());
$this->assertSame('', $object->getDefaultJobClass());
$this->assertSame('', $object->getDefaultJobMethod());
$this->assertNull($object->getWorkerMinNum());
$this->assertNull($object->getWorkerMaxNum());
$this->assertNull($object->getQueueMaxNum());
}

public function testAttributes()
{
$config = [
'name' => 'nameValue',
'defaultJobClass' => 'jobClassValue',
'defaultJobMethod'=> 'jobMethodValue',
'workerMinNum' => 1,
'workerMaxNum' => 3,
'queueMaxNum' => 10,
'params' => [
'a', 'b', 'c',
],
'attribute'=> null,
];
$object = new \Kcloze\Jobs\TopicConfigObject();
$object->initAttributes($config);
$this->assertSame($config['name'], $object->getName());
$this->assertSame($config['defaultJobClass'], $object->getDefaultJobClass());
$this->assertSame($config['defaultJobMethod'], $object->getDefaultJobMethod());
$this->assertSame($config['workerMinNum'], $object->getWorkerMinNum());
$this->assertSame($config['workerMaxNum'], $object->getWorkerMaxNum());
$this->assertSame($config['queueMaxNum'], $object->getQueueMaxNum());
}

public function testConstruct()
{
$config = [
'name' => 'nameValue',
'defaultJobClass' => 'jobClassValue',
'defaultJobMethod'=> 'jobMethodValue',
'workerMinNum' => 1,
'workerMaxNum' => 3,
'queueMaxNum' => 10,
'params' => [
'a', 'b', 'c',
],
'attribute'=> null,
];
$object = new \Kcloze\Jobs\TopicConfigObject($config);
$this->assertSame($config['name'], $object->getName());
$this->assertSame($config['defaultJobClass'], $object->getDefaultJobClass());
$this->assertSame($config['defaultJobMethod'], $object->getDefaultJobMethod());
$this->assertSame($config['workerMinNum'], $object->getWorkerMinNum());
$this->assertSame($config['workerMaxNum'], $object->getWorkerMaxNum());
$this->assertSame($config['queueMaxNum'], $object->getQueueMaxNum());
}
}

0 comments on commit 693ea99

Please sign in to comment.