Skip to content

Commit

Permalink
Merge branch '5.x-dev' into spice-psr
Browse files Browse the repository at this point in the history
  • Loading branch information
AltamashShaikh authored Oct 21, 2024
2 parents 654c711 + 9ddbab2 commit 3ff85bd
Show file tree
Hide file tree
Showing 23 changed files with 723 additions and 94 deletions.
146 changes: 126 additions & 20 deletions Commands/Monitor.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ class Monitor extends ConsoleCommand
protected function configure()
{
$this->setName('queuedtracking:monitor');
$this->setDescription('Shows and updates the current state of the queue every 2 seconds.');
$this->setDescription("Shows and updates the current state of the queue every 2 seconds.\n Key ,=first page, .=last page, 0-9=move to page section, arrow LEFT=prev page, RIGHT=next page, UP=next 10 pages, DOWN=prev 10 pages, q=quit");
$this->addRequiredValueOption('iterations', null, 'If set, will limit the number of monitoring iterations done.');
$this->addRequiredValueOption('perpage', 'p', 'Number of queue worker displayed per page.', 16);
}

/**
Expand All @@ -36,6 +37,9 @@ protected function doExecute(): int
$systemCheck->checkRedisIsInstalled();
}

$output->write(str_repeat("\r\n", 100));
$output->write("\e[".(100)."A");

$iterations = $this->getIterationsFromArg();
if ($iterations !== null) {
$output->writeln("<info>Only running " . $iterations . " iterations.</info>");
Expand All @@ -58,34 +62,118 @@ protected function doExecute(): int
$output->writeln('The command <comment>./console queuedtracking:process</comment> has to be executed to process request sets within queue');
}

$output->writeln(sprintf('Up to %d workers will be used', $manager->getNumberOfAvailableQueues()));
$output->writeln(sprintf('Processor will start once there are at least %s request sets in the queue',
$output->writeln(sprintf('Up to <info>%d</> workers will be used', $manager->getNumberOfAvailableQueues()));
$output->writeln(sprintf('Processor will start once there are at least <info>%s</> request sets in the queue',
$manager->getNumberOfRequestsToProcessAtSameTime()));
$iterationCount = 0;

$qCurrentPage = 1;
$qCount = count($queues);
$qPerPAge = min(max($this->getPerPageFromArg(), 1), $qCount);
$qPageCount = ceil($qCount / $qPerPAge);

readline_callback_handler_install('', function() {});
stream_set_blocking (STDIN, false);

$output->writeln(str_repeat("-", 30));
$output->writeln("<fg=black;bg=white;options=bold>".str_pad(" Q INDEX", 10).str_pad(" | REQUEST SETS", 20)."</>");
$output->writeln(str_repeat("-", 30));

$lastStatsTimer = microtime(true) - 2;
$lastSumInQueue = false;
$diffSumInQueue = 0;
$keyPressed = "";

$output->write(str_repeat("\r\n", $qPerPAge + 5));

while (1) {
$memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend
if (microtime(true) - $lastStatsTimer >= 2 || $keyPressed != "")
{
$output->write("\e[".($qPerPAge + 5)."A");

$qCurrentPage = min(max($qCurrentPage, 1), $qPageCount);
$memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend

$sumInQueue = 0;
foreach ($queues as $sumQ) {
$sumInQueue += $sumQ->getNumberOfRequestSetsInQueue();
}

if ($lastSumInQueue !== false) {
$diffSumInQueue = $lastSumInQueue - $sumInQueue;
$diffRps = round($diffSumInQueue / (microtime(true) - $lastStatsTimer), 2);
$diffSumInQueue = $diffSumInQueue < 0 ? "<fg=red;options=bold>".abs($diffRps)."</>" : "<fg=green;options=bold>{$diffRps}</>";
}

$numInQueue = 0;
for ($idxPage = 0; $idxPage < $qPerPAge; $idxPage++) {
$idx = ($qCurrentPage - 1) * $qPerPAge + $idxPage;
if (isset($queues[$idx])) {
$q = $queues[$idx]->getNumberOfRequestSetsInQueue();
$numInQueue += (int)$q;
$output->writeln(str_pad($idx, 10, " ", STR_PAD_LEFT)." | ".str_pad(number_format($q), 16, " ", STR_PAD_LEFT));
} else {
$output->writeln(str_pad("", 10)." | ".str_pad("", 16));
}
}

$output->writeln(str_repeat("-", 30));
$output->writeln("<fg=black;bg=white;options=bold>".str_pad(" ".($qCount)." Q", 10)." | ".str_pad(number_format($sumInQueue)." R", 16)."</>");
$output->writeln(str_repeat("-", 30));
$output->writeln(sprintf(
"Q [%s-%s] | <info>page %s/%s</> | <comment>press (0-9.,q) or arrow(L,R,U,D)</> | diff/sec %s \n".
"%s used memory (%s peak). <info>%d</> workers active.".str_repeat(" ", 15),
($idx - $qPerPAge + 1),
$idx, $qCurrentPage, $qPageCount, $diffSumInQueue,
$memory['used_memory_human'] ?? 'Unknown',
$memory['used_memory_peak_human'] ?? 'Unknown',
$lock->getNumberOfAcquiredLocks()
));

if (!is_null($iterations)) {
$iterationCount += 1;
if ($iterationCount >= $iterations) {
break;
}
}

$numInQueue = array();
foreach ($queues as $queue) {
$numInQueue[] = $queue->getNumberOfRequestSetsInQueue();
$lastSumInQueue = $sumInQueue;
$lastStatsTimer = microtime(true);
}

$message = sprintf('%s (%s) request sets left in queue. %s used memory (%s peak). %d workers active. ',
array_sum($numInQueue),
implode('+', $numInQueue),
$memory['used_memory_human'] ?? 'Unknown',
$memory['used_memory_peak_human'] ?? 'unknown',
$lock->getNumberOfAcquiredLocks());
$output->write("\x0D");
$output->write($message);
if (!is_null($iterations)) {
$iterationCount += 1;
if ($iterationCount >= $iterations) {
break;
$keyStroke = stream_get_contents(STDIN, 3);
$keyPressed = strlen($keyStroke) == 3 ? $keyStroke[2] : (strlen($keyStroke) > 0 ? $keyStroke[0] : "");
if ($keyPressed != "" and in_array($keyPressed, array(".", ",", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "q"))) {
switch ($keyPressed) {
case "0": case "1": case "2": case "3": case "4":
case "5": case "6": case "7": case "8": case "9":
$keyPressed = $keyPressed != "0" ? $keyPressed : "10";
$qCurrentPage = floor(($qCurrentPage - 0.1) / 10) * 10 + (int)$keyPressed; break;
case "C":
$qCurrentPage++;
break;
case "D":
$qCurrentPage--;
break;
case "A":
$qCurrentPage += 10;
break;
case "B":
$qCurrentPage -= 10;
break;
case ",":
$qCurrentPage = 1;
break;
case ".":
$qCurrentPage = $qPageCount;
break;
case "q":
$output->writeln('');
die;
}
}
sleep(2);

usleep(5000);
}

return self::SUCCESS;
Expand All @@ -112,4 +200,22 @@ private function getIterationsFromArg()
return $iterations;
}

/**
* Loads the `perpage` argument from the commands arguments.
*
* @return int|null
*/
private function getPerPageFromArg()
{
$perPage = $this->getInput()->getOption('perpage');
if (!is_numeric($perPage)) {
throw new \Exception('perpage needs to be numeric');
} else {
$perPage = (int)$perPage;
if ($perPage <= 0) {
throw new \Exception('perpage needs to be a non-zero positive number');
}
}
return $perPage;
}
}
86 changes: 72 additions & 14 deletions Commands/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ protected function configure()
$this->setName('queuedtracking:process');
$this->addRequiredValueOption('queue-id', null, 'If set, will only work on that specific queue. For example "0" or "1" (if there are multiple queues). Not recommended when only one worker is in use. If for example 4 workers are in use, you may want to use 0, 1, 2, or 3.');

Check warning on line 27 in Commands/Process.php

View workflow job for this annotation

GitHub Actions / PHPCS

Process.php: Line exceeds 250 characters; contains 281 characters
$this->addRequiredValueOption('force-num-requests-process-at-once', null, 'If defined, it overwrites the setting of how many requests will be picked out of the queue and processed at once. Must be a number which is >= 1. By default, the configured value from the settings will be used. This can be useful for example if you want to process every single request within the queue. If otherwise a batch size of say 100 is configured, then there may be otherwise 99 requests left in the queue. It can be also useful for testing purposes.');

Check warning on line 28 in Commands/Process.php

View workflow job for this annotation

GitHub Actions / PHPCS

Process.php: Line exceeds 250 characters; contains 544 characters
$this->addRequiredValueOption('cycle', 'c', 'The proccess will automatically loop for "n" cycle time(s), set "0" to infinite.', 1);
$this->addRequiredValueOption('sleep', 's', 'Take a nap for "n" second(s) before recycle, minimum is 1 second.', 1);
$this->addRequiredValueOption('delay', 'd', 'Delay before finished', 0);
$this->setDescription('Processes all queued tracking requests in case there are enough requests in the queue and in case they are not already in process by another script. To keep track of the queue use the <comment>--verbose</comment> option or execute the <comment>queuedtracking:monitor</comment> command.');

Check warning on line 32 in Commands/Process.php

View workflow job for this annotation

GitHub Actions / PHPCS

Process.php: Line exceeds 250 characters; contains 319 characters
}

Expand Down Expand Up @@ -76,29 +79,84 @@ protected function doExecute(): int
throw new \Exception('Number of requests to process must be a number and at least 1');
}

$output->writeln("<info>Starting to process request sets, this can take a while</info>");

register_shutdown_function(function () use ($queueManager) {
$queueManager->unlock();
});

$startTime = microtime(true);
$processor = new Processor($queueManager);
$processor->setNumberOfMaxBatchesToProcess(500);
$tracker = $processor->process();

$neededTime = (microtime(true) - $startTime);
$numRequestsTracked = $tracker->getCountOfLoggedRequests();
$requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime);

Piwik::postEvent('Tracker.end');
$numberOfProcessCycle = $input->getOption('cycle');
if (!is_numeric($numberOfProcessCycle)) {
throw new \Exception('"cycle" needs to be numeric');
}
$numberOfProcessCycle = (int)$numberOfProcessCycle;
$infiniteCycle = $numberOfProcessCycle == 0;

$delayedBeforeFinish = (int)$input->getOption('delay');

$napster = max(1, $input->getOption('sleep'));
if (!is_numeric($napster)) {
throw new \Exception('"nap" needs to be numeric');
}
$napster = (int)$napster;

$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
$originalNumberOfRequestsToProcessAtSameTime = $queueManager->getNumberOfRequestsToProcessAtSameTime();

while ($numberOfProcessCycle > 0 || $infiniteCycle) {
$wipingOutQueue = false;
if (microtime(true) - $lastTimeGotMoreThanZeroTrackedReq > 10) {
$queueManager->setNumberOfRequestsToProcessAtSameTime(1);
$wipingOutQueue = true;
$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
}

if ($wipingOutQueue) {
$output->writeln("<fg=red;bg=white;options=bold> TRYING TO WIPE OUT THE QUEUE </>");
}
$output->writeln("<info>Starting to process request sets, this can take a while</info>");

$startTime = microtime(true);
$processor = new Processor($queueManager);
$processor->setNumberOfMaxBatchesToProcess(500);
$tracker = $processor->process();

$neededTime = (microtime(true) - $startTime);
$numRequestsTracked = $tracker->getCountOfLoggedRequests();
$requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime);

$this->writeSuccessMessage(
array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime))
);
Piwik::postEvent('Tracker.end');

if ($numRequestsTracked > 0) {
$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
}

if (!$infiniteCycle) {
$numberOfProcessCycle--;
}
if ($numberOfProcessCycle > 0 || $infiniteCycle) {
$cTogo = $infiniteCycle ? "infinite" : $numberOfProcessCycle;
$output->writeln("===========================================================================");
$output->writeln("<comment>Taking a nap for {$napster} second(s), before re-running the process. <info>({$cTogo})</info> cyle(s) to go.</comment>");
$output->writeln("===========================================================================");
sleep($napster);
}

if ($wipingOutQueue) {
$queueManager->setNumberOfRequestsToProcessAtSameTime($originalNumberOfRequestsToProcessAtSameTime);
}
}

// Piwik::postEvent('Tracker.end');
$trackerEnvironment->destroy();

$this->writeSuccessMessage(
array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime))
);

if ($delayedBeforeFinish > 0) {
sleep($delayedBeforeFinish);
}
return self::SUCCESS;
}

Expand Down
11 changes: 8 additions & 3 deletions Commands/Test.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ protected function doExecute(): int
$output->writeln('Timeout: ' . $settings->redisTimeout->getValue());
$output->writeln('Password: ' . $settings->redisPassword->getValue());
$output->writeln('Database: ' . $settings->redisDatabase->getValue());
$output->writeln('UseSentinelBackend: ' . (int) $settings->useSentinelBackend->getValue());
$output->writeln('RedisBackendType: ' . $settings->getRedisType());
$output->writeln('SentinelMasterName: ' . $settings->sentinelMasterName->getValue());

$output->writeln('');
Expand Down Expand Up @@ -222,9 +222,14 @@ protected function doExecute(): int
*/
private function getRedisConfig($redis, $configName)
{
$config = $redis->config('GET', $configName);
$value = strtolower(array_shift($config));
if ($redis instanceof \RedisCluster) {
$config = $redis->config('CONFIG', 'GET', $configName);
unset($config[0]);
} else {
$config = $redis->config('GET', $configName);
}

$value = strtolower(array_shift($config));
return $value;
}

Expand Down
Loading

0 comments on commit 3ff85bd

Please sign in to comment.