Skip to content

Commit a77f0fe

Browse files
stephanschulersaschanowak
authored andcommitted
Initial commit
0 parents  commit a77f0fe

File tree

13 files changed

+773
-0
lines changed

13 files changed

+773
-0
lines changed

Classes/Cache/CacheFactory.php

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Netlogix\JobQueue\FastRabbit\Cache;
5+
6+
use Neos\Cache\Frontend\FrontendInterface;
7+
use Neos\Flow\Core\ApplicationContext;
8+
use Neos\Flow\Utility\Environment;
9+
10+
class CacheFactory
11+
{
12+
public static function get(array $config): FrontendInterface
13+
{
14+
$temporaryDirectoryBase = $config['temporaryDirectoryBase'];
15+
$applicationIdentifier = $config['applicationIdentifier'];
16+
$contextString = $config['contextString'];
17+
18+
$frontendClassName = $config['cacheConfiguration']['frontend'];
19+
$backendClassName = $config['cacheConfiguration']['backend'];
20+
$backendOptions = $config['cacheConfiguration']['backendOptions'];
21+
22+
$applicationContext = new ApplicationContext($contextString);
23+
$environment = new Environment($applicationContext);
24+
$environment->setTemporaryDirectoryBase($temporaryDirectoryBase);
25+
26+
$factory = new \Neos\Flow\Cache\CacheFactory($applicationContext, $environment, $applicationIdentifier);
27+
28+
return $factory->create(
29+
'FlowPackJobQueueCommon_MessageCache',
30+
$frontendClassName,
31+
$backendClassName,
32+
$backendOptions
33+
);
34+
}
35+
}
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Netlogix\JobQueue\FastRabbit\Command;
5+
6+
use Flowpack\JobQueue\Common\Queue\QueueManager;
7+
use Neos\Flow\Annotations as Flow;
8+
use Neos\Flow\Configuration\ConfigurationManager;
9+
use Neos\Flow\Configuration\Exception\InvalidConfigurationTypeException;
10+
use Neos\Flow\Configuration\Exception\SchemaValidationException;
11+
use Neos\Flow\Core\Booting\Scripts;
12+
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
13+
use Neos\Flow\Reflection\ReflectionService;
14+
use Neos\Utility\Files;
15+
use Netlogix\JobQueue\FastRabbit\Job\ConfigurationFactory;
16+
use Netlogix\JobQueue\FastRabbit\Queues\Locator;
17+
18+
class SupervisorCommandController extends \Neos\Flow\Cli\CommandController
19+
{
20+
/**
21+
* @var ObjectManagerInterface
22+
* @Flow\Inject
23+
*/
24+
protected $objectManager;
25+
26+
/**
27+
* @var QueueManager
28+
* @Flow\Inject
29+
*/
30+
protected $queueManager;
31+
32+
/**
33+
* @var ConfigurationManager
34+
* @Flow\Inject
35+
*/
36+
protected $configurationManager;
37+
38+
/**
39+
* @Flow\CompileStatic
40+
* @param ObjectManagerInterface $objectManager
41+
* @return array
42+
*/
43+
public static function collectLocatorNames(
44+
ObjectManagerInterface $objectManager
45+
): array {
46+
$reflectionService = $objectManager->get(ReflectionService::class);
47+
$locatorNames = $reflectionService->getAllImplementationClassNamesForInterface(Locator::class);
48+
return array_values($locatorNames);
49+
}
50+
51+
public function createCommand(): void
52+
{
53+
$this->createSupervisorGroupConfigCommand();
54+
$queueNames = $this->collectQueueNames();
55+
foreach ($queueNames as $queueName) {
56+
$this->createSupervisorProcessConfigCommand($queueName);
57+
$this->createWorkerConfigCommand($queueName);
58+
}
59+
}
60+
61+
/**
62+
* @return void
63+
*/
64+
public function createSupervisorGroupConfigCommand(): void
65+
{
66+
$pathPrefix = FLOW_PATH_CONFIGURATION . 'Supervisor/';
67+
Files::createDirectoryRecursively($pathPrefix);
68+
foreach (glob($pathPrefix . 'program-*.conf') as $configFile) {
69+
unlink($configFile);
70+
}
71+
72+
$factory = new ConfigurationFactory();
73+
74+
$queueNames = self::collectQueueNames($this->objectManager);
75+
76+
$groupConfiguration = $factory->buildGroupConfigurationForQueues(... $queueNames);
77+
$groupFilePath = $pathPrefix . 'group.conf';
78+
file_put_contents($groupFilePath, $groupConfiguration);
79+
}
80+
81+
/**
82+
* @return void
83+
*/
84+
public function createSupervisorProcessConfigCommand(string $queueName): void
85+
{
86+
$factory = new ConfigurationFactory();
87+
$jobConfiguration = $factory->buildJobConfigurationForQueue($queueName);
88+
89+
$jobFilePath = $factory->getJobSupervisorFile($queueName);
90+
file_put_contents($jobFilePath, $jobConfiguration);
91+
}
92+
93+
/**
94+
* @param string $queueName
95+
* @throws \Flowpack\JobQueue\Common\Exception
96+
* @throws InvalidConfigurationTypeException
97+
* @throws \Neos\Flow\Exception
98+
*/
99+
public function createWorkerConfigCommand(string $queueName): void
100+
{
101+
$command = Scripts::buildPhpCommand(
102+
$this->configurationManager->getConfiguration('Settings', 'Neos.Flow')
103+
);
104+
$command .= sprintf(
105+
' %s %s --queue=%s',
106+
escapeshellarg(FLOW_PATH_FLOW . 'Scripts/flow.php'),
107+
escapeshellarg('flowpack.jobqueue.common:job:execute'),
108+
escapeshellarg($queueName)
109+
);
110+
111+
$jobConfig = [
112+
'command' => $command,
113+
'queueSettings' => $this->queueManager->getQueueSettings($queueName),
114+
'cacheConfiguration' => $this->configurationManager->getConfiguration(
115+
ConfigurationManager::CONFIGURATION_TYPE_CACHES,
116+
'FlowPackJobQueueCommon_MessageCache'
117+
),
118+
'queueName' => $queueName,
119+
'temporaryDirectoryBase' => FLOW_PATH_TEMPORARY_BASE,
120+
'applicationIdentifier' => $this->configurationManager->getConfiguration(
121+
ConfigurationManager::CONFIGURATION_TYPE_SETTINGS,
122+
'Neos.Flow.cache.applicationIdentifier'
123+
),
124+
'contextString' => $this->configurationManager->getConfiguration(
125+
ConfigurationManager::CONFIGURATION_TYPE_SETTINGS,
126+
'Neos.Flow.core.context'
127+
),
128+
'workerPool' => $this->configurationManager->getConfiguration(
129+
ConfigurationManager::CONFIGURATION_TYPE_SETTINGS,
130+
'Netlogix.JobQueue.FastRabbit.supervisor.workerPool'
131+
),
132+
];
133+
134+
$factory = new ConfigurationFactory();
135+
$jobFilePath = $factory->getJobConfigurationFile($queueName);
136+
file_put_contents($jobFilePath, json_encode($jobConfig, JSON_PRETTY_PRINT));
137+
}
138+
139+
/**
140+
* @return string[]
141+
* @throws SchemaValidationException
142+
*/
143+
protected function collectQueueNames(): array
144+
{
145+
$locatorNames = self::collectLocatorNames($this->objectManager);
146+
$queueNames = [];
147+
foreach ($locatorNames as $locatorName) {
148+
$locator = $this->objectManager->get($locatorName);
149+
assert($locator instanceof Locator);
150+
foreach ($locator as $queueName) {
151+
if (in_array($queueName, $queueNames, true)) {
152+
throw new SchemaValidationException(
153+
sprintf('Duplicate supervisor config found for queue "%s".', $queueName),
154+
1594829585
155+
);
156+
}
157+
$queueNames[$queueName] = $queueName;
158+
}
159+
}
160+
return array_values($queueNames);
161+
}
162+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Netlogix\JobQueue\FastRabbit\Job;
5+
6+
use Flowpack\JobQueue\Common\Queue\QueueManager;
7+
use Neos\Flow\Annotations as Flow;
8+
9+
class ConfigurationFactory
10+
{
11+
const __CONFIG_FILE__ = '__CONFIG_FILE__';
12+
const __QUEUE_NAME__ = '__QUEUE_NAME__';
13+
const __JOB_NAME__ = '__JOB_NAME__';
14+
const __CONTEXT__ = '__CONTEXT__';
15+
const __NUMPROCS__ = '__NUMPROCS__';
16+
17+
/**
18+
* @var string
19+
* @Flow\InjectConfiguration(package="Netlogix.JobQueue.FastRabbit", path="supervisor.contextName")
20+
*/
21+
protected $contextName;
22+
23+
/**
24+
* @var string
25+
* @Flow\InjectConfiguration(package="Netlogix.JobQueue.FastRabbit", path="supervisor.programTemplate")
26+
*/
27+
protected $programTemplate;
28+
29+
/**
30+
* @var string
31+
* @Flow\InjectConfiguration(package="Netlogix.JobQueue.FastRabbit", path="supervisor.groupTemplate")
32+
*/
33+
protected $groupTemplate;
34+
35+
/**
36+
* @var QueueManager
37+
* @Flow\Inject
38+
*/
39+
protected $queueManager;
40+
41+
public function getShortNameForQueueName(string $queueName): string
42+
{
43+
return preg_replace('%[^a-z0-9]%iUm', '-', strtolower($queueName));
44+
}
45+
46+
public function getJobNameForQueueName(string $queueName): string
47+
{
48+
return $this->getContextName() . '-' . $this->getShortNameForQueueName($queueName);
49+
}
50+
51+
public function buildJobConfigurationForQueue(string $queueName): string
52+
{
53+
$jobName = $this->getJobNameForQueueName($queueName);
54+
$queueSettings = $this->queueManager->getQueueSettings($queueName);
55+
$fastRabbitSettings = $queueSettings['fastRabbit'] ?? [];
56+
$numProcs = (int)($fastRabbitSettings['numProcs'] ?? 1);
57+
58+
$config = $this->programTemplate;
59+
$config = str_replace(self::__CONFIG_FILE__, $this->getJobConfigurationFile($queueName), $config);
60+
$config = str_replace(self::__QUEUE_NAME__, $queueName, $config);
61+
$config = str_replace(self::__JOB_NAME__, $jobName, $config);
62+
$config = str_replace(self::__CONTEXT__, $this->contextName, $config);
63+
$config = str_replace(self::__NUMPROCS__, $numProcs, $config);
64+
65+
return $config;
66+
}
67+
68+
public function buildGroupConfigurationForQueues(string ...$queueNames): string
69+
{
70+
$jobNames = array_map([$this, 'getJobNameForQueueName'], $queueNames);
71+
72+
$programs = $this->groupTemplate;
73+
$programs = str_replace('__PROGRAMS__', join(',', $jobNames), $programs);
74+
$programs = str_replace('__CONTEXT__', $this->contextName, $programs);
75+
76+
return $programs;
77+
}
78+
79+
public function getJobConfigurationFile(string $queueName): string
80+
{
81+
return $this->getJobFilePath($queueName, 'json');
82+
}
83+
84+
public function getJobSupervisorFile(string $queueName): string
85+
{
86+
return $this->getJobFilePath($queueName, 'conf');
87+
}
88+
89+
protected function getJobFilePath(string $queueName, string $suffix): string
90+
{
91+
$jobName = $this->getJobNameForQueueName($queueName);
92+
$pathPrefix = rtrim(FLOW_PATH_CONFIGURATION, '/') . '/Supervisor/';
93+
return sprintf($pathPrefix . 'program-%s.%s', $jobName, $suffix);
94+
}
95+
96+
protected function getContextName(): string
97+
{
98+
return trim(preg_replace('%[^\\pL]+%iUum', '-', $this->contextName), '-');
99+
}
100+
}

Classes/Lock.php

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Netlogix\JobQueue\FastRabbit;
5+
6+
class Lock
7+
{
8+
private const ONE_PERCENT_OF_A_SECOND = 100000;
9+
10+
/**
11+
* @var string[]
12+
*/
13+
protected $slots = [];
14+
15+
/**
16+
* @var string|null
17+
*/
18+
protected $slotPath;
19+
20+
/**
21+
* @var resource|null
22+
*/
23+
protected $slotPointer;
24+
25+
public function __construct(int $numberOfWorkers, string $lockFileDirectory)
26+
{
27+
@mkdir($lockFileDirectory, 0777, true);
28+
for ($i = 0; $i < $numberOfWorkers; $i++) {
29+
$this->slots[] = $lockFileDirectory . '/' . sha1(__CLASS__ . $i) . '.lock';
30+
}
31+
}
32+
33+
public function run(callable $run)
34+
{
35+
$this->findSlot();
36+
try {
37+
return $run();
38+
} finally {
39+
@unlink($this->slotPath);
40+
flock($this->slotPointer, LOCK_UN);
41+
fclose($this->slotPointer);
42+
}
43+
}
44+
45+
protected function findSlot(): void
46+
{
47+
do {
48+
foreach ($this->slots as $slot) {
49+
$fp = fopen($slot, 'w+');
50+
if (flock($fp, LOCK_EX | LOCK_NB)) {
51+
$this->slotPath = $slot;
52+
$this->slotPointer = $fp;
53+
fputs($fp, (string)getmypid());
54+
return;
55+
}
56+
fclose($fp);
57+
}
58+
usleep(self::ONE_PERCENT_OF_A_SECOND);
59+
} while (true);
60+
}
61+
}

0 commit comments

Comments
 (0)