From c08817bbe5c757a6ceb5fbc5366e966056f676dc Mon Sep 17 00:00:00 2001 From: haristku Date: Wed, 9 Oct 2024 20:42:15 +0700 Subject: [PATCH 1/3] enhanced monitor, process commands --- Commands/Monitor.php | 171 +++++++++++++++++++++++++++++++++++++------ Commands/Process.php | 86 ++++++++++++++++++---- docs/faq.md | 23 ++++++ 3 files changed, 245 insertions(+), 35 deletions(-) diff --git a/Commands/Monitor.php b/Commands/Monitor.php index adbd22c..1446378 100644 --- a/Commands/Monitor.php +++ b/Commands/Monitor.php @@ -19,8 +19,9 @@ class Monitor extends ConsoleCommand protected function configure() { $this->setName('queuedtracking:monitor'); - $this->setDescription('Shows and updates the current state of the queue every 2 seconds.'); + $this->setDescription("Shows and updates the current state of the queue every 2 seconds.\n Key ,=first page, .=last page, 0-9=move to page section, arrow LEFT=prev page, RIGHT=next page, UP=next 10 pages, DOWN=prev 10 pages, q=quit"); $this->addRequiredValueOption('iterations', null, 'If set, will limit the number of monitoring iterations done.'); + $this->addRequiredValueOption('perpage', 'p', 'Number of queue worker displayed per page.', 16); } /** @@ -36,6 +37,11 @@ protected function doExecute(): int $systemCheck->checkRedisIsInstalled(); } + if (!$this->isPcntlFunctionAvailable()) { + $output->write(str_repeat("\r\n", 100)); + $output->write("\e[".(100)."A"); + } + $iterations = $this->getIterationsFromArg(); if ($iterations !== null) { $output->writeln("Only running " . $iterations . " iterations."); @@ -58,34 +64,130 @@ protected function doExecute(): int $output->writeln('The command ./console queuedtracking:process has to be executed to process request sets within queue'); } - $output->writeln(sprintf('Up to %d workers will be used', $manager->getNumberOfAvailableQueues())); - $output->writeln(sprintf('Processor will start once there are at least %s request sets in the queue', + $output->writeln(sprintf('Up to %d workers will be used', $manager->getNumberOfAvailableQueues())); + $output->writeln(sprintf('Processor will start once there are at least %s request sets in the queue', $manager->getNumberOfRequestsToProcessAtSameTime())); $iterationCount = 0; - + + $qCurrentPage = 1; + $qCount = count($queues); + $qPerPAge = min(max($this->getPerPageFromArg(), 1), $qCount); + $qPageCount = ceil($qCount / $qPerPAge); + + $signalTrap = function() use ($output) { + $output->writeln("\e[u\e[?25h"); + die; + }; + if ($this->isPcntlFunctionAvailable()) + { + pcntl_signal(SIGINT, $signalTrap); + pcntl_signal(SIGTERM, $signalTrap); + } + + readline_callback_handler_install('', function() {}); + stream_set_blocking (STDIN, false); + + $output->writeln(str_repeat("-", 30)); + $output->writeln("".str_pad(" Q INDEX", 10).str_pad(" | REQUEST SETS", 20).""); + $output->writeln(str_repeat("-", 30)); + $output->write("\e[?25l"); + + $lastStatsTimer = microtime(true) - 2; + $lastSumInQueue = false; + $diffSumInQueue = 0; + $keyPressed = ""; while (1) { - $memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend + if ($this->isPcntlFunctionAvailable()) { + pcntl_signal_dispatch(); + } + + if (microtime(true) - $lastStatsTimer >= 2 || $keyPressed != "") + { + $qCurrentPage = min(max($qCurrentPage, 1), $qPageCount); + $memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend + + $sumInQueue = 0; + foreach ($queues as $sumQ) { + $sumInQueue += $sumQ->getNumberOfRequestSetsInQueue(); + } + + if ($lastSumInQueue !== false) { + $diffSumInQueue = $lastSumInQueue - $sumInQueue; + $diffRps = round($diffSumInQueue / (microtime(true) - $lastStatsTimer), 2); + $diffSumInQueue = $diffSumInQueue < 0 ? "".abs($diffRps)."" : "{$diffRps}"; + } + + $numInQueue = 0; + for ($idxPage = 0; $idxPage < $qPerPAge; $idxPage++) { + $idx = ($qCurrentPage - 1) * $qPerPAge + $idxPage; + if (isset($queues[$idx])) { + $q = $queues[$idx]->getNumberOfRequestSetsInQueue(); + $numInQueue += (int)$q; + $output->writeln(str_pad($idx, 10, " ", STR_PAD_LEFT)." | ".str_pad(number_format($q), 16, " ", STR_PAD_LEFT)); + } else { + $output->writeln(str_pad("", 10)." | ".str_pad("", 16)); + } + } + + $output->writeln(str_repeat("-", 30)); + $output->writeln("".str_pad(" ".($qCount)." Q", 10)." | ".str_pad(number_format($sumInQueue)." R", 16).""); + $output->writeln(str_repeat("-", 30)); + $output->writeln(sprintf( + "Q [%s-%s] | page %s/%s | press (0-9.,q) or arrow(L,R,U,D) | diff/sec %s \n". + "%s used memory (%s peak). %d workers active.".str_repeat(" ", 15), + ($idx - $qPerPAge + 1), + $idx, $qCurrentPage, $qPageCount, $diffSumInQueue, + $memory['used_memory_human'] ?? 'Unknown', + $memory['used_memory_peak_human'] ?? 'Unknown', + $lock->getNumberOfAcquiredLocks() + )); + $output->write("\e[s"); + $output->write("\e[0G"); + $output->write("\e[".($qPerPAge + 5)."A"); + + if (!is_null($iterations)) { + $iterationCount += 1; + if ($iterationCount >= $iterations) { + break; + } + } - $numInQueue = array(); - foreach ($queues as $queue) { - $numInQueue[] = $queue->getNumberOfRequestSetsInQueue(); + $lastSumInQueue = $sumInQueue; + $lastStatsTimer = microtime(true); } - $message = sprintf('%s (%s) request sets left in queue. %s used memory (%s peak). %d workers active. ', - array_sum($numInQueue), - implode('+', $numInQueue), - $memory['used_memory_human'] ?? 'Unknown', - $memory['used_memory_peak_human'] ?? 'unknown', - $lock->getNumberOfAcquiredLocks()); - $output->write("\x0D"); - $output->write($message); - if (!is_null($iterations)) { - $iterationCount += 1; - if ($iterationCount >= $iterations) { - break; + $keyStroke = stream_get_contents(STDIN, 3); + $keyPressed = strlen($keyStroke) == 3 ? $keyStroke[2] : (strlen($keyStroke) > 0 ? $keyStroke[0] : ""); + if ($keyPressed != "" and in_array($keyPressed, array(".", ",", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "q"))) { + switch ($keyPressed) { + case "0": case "1": case "2": case "3": case "4": + case "5": case "6": case "7": case "8": case "9": + $keyPressed = $keyPressed != "0" ? $keyPressed : "10"; + $qCurrentPage = floor(($qCurrentPage - 0.1) / 10) * 10 + (int)$keyPressed; break; + case "C": + $qCurrentPage++; + break; + case "D": + $qCurrentPage--; + break; + case "A": + $qCurrentPage += 10; + break; + case "B": + $qCurrentPage -= 10; + break; + case ",": + $qCurrentPage = 1; + break; + case ".": + $qCurrentPage = $qPageCount; + break; + case "q": + $signalTrap(); } } - sleep(2); + + usleep(5000); } return self::SUCCESS; @@ -112,4 +214,31 @@ private function getIterationsFromArg() return $iterations; } + /** + * Loads the `perpage` argument from the commands arguments. + * + * @return int|null + */ + private function getPerPageFromArg() + { + $perPage = $this->getInput()->getOption('perpage'); + if (!is_numeric($perPage)) { + throw new \Exception('perpage needs to be numeric'); + } else { + $perPage = (int)$perPage; + if ($perPage <= 0) { + throw new \Exception('perpage needs to be a non-zero positive number'); + } + } + return $perPage; + } + + private function isPcntlFunctionAvailable() + { + if (extension_loaded('pcntl') && function_exists('pcntl_signal') && function_exists('pcntl_signal_dispatch')) { + return true; + } + + return false; + } } diff --git a/Commands/Process.php b/Commands/Process.php index 27e5c4d..295e6b0 100644 --- a/Commands/Process.php +++ b/Commands/Process.php @@ -26,6 +26,9 @@ protected function configure() $this->setName('queuedtracking:process'); $this->addRequiredValueOption('queue-id', null, 'If set, will only work on that specific queue. For example "0" or "1" (if there are multiple queues). Not recommended when only one worker is in use. If for example 4 workers are in use, you may want to use 0, 1, 2, or 3.'); $this->addRequiredValueOption('force-num-requests-process-at-once', null, 'If defined, it overwrites the setting of how many requests will be picked out of the queue and processed at once. Must be a number which is >= 1. By default, the configured value from the settings will be used. This can be useful for example if you want to process every single request within the queue. If otherwise a batch size of say 100 is configured, then there may be otherwise 99 requests left in the queue. It can be also useful for testing purposes.'); + $this->addRequiredValueOption('cycle', 'c', 'The proccess will automatically loop for "n" cycle time(s), set "0" to infinite.', 1); + $this->addRequiredValueOption('sleep', 's', 'Take a nap for "n" second(s) before recycle, minimum is 1 second.', 1); + $this->addRequiredValueOption('delay', 'd', 'Delay before finished', 0); $this->setDescription('Processes all queued tracking requests in case there are enough requests in the queue and in case they are not already in process by another script. To keep track of the queue use the --verbose option or execute the queuedtracking:monitor command.'); } @@ -76,29 +79,84 @@ protected function doExecute(): int throw new \Exception('Number of requests to process must be a number and at least 1'); } - $output->writeln("Starting to process request sets, this can take a while"); - register_shutdown_function(function () use ($queueManager) { $queueManager->unlock(); }); - $startTime = microtime(true); - $processor = new Processor($queueManager); - $processor->setNumberOfMaxBatchesToProcess(500); - $tracker = $processor->process(); - $neededTime = (microtime(true) - $startTime); - $numRequestsTracked = $tracker->getCountOfLoggedRequests(); - $requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime); - Piwik::postEvent('Tracker.end'); + $numberOfProcessCycle = $input->getOption('cycle'); + if (!is_numeric($numberOfProcessCycle)) { + throw new \Exception('"cycle" needs to be numeric'); + } + $numberOfProcessCycle = (int)$numberOfProcessCycle; + $infiniteCycle = $numberOfProcessCycle == 0; + + $delayedBeforeFinish = (int)$input->getOption('delay'); + + $napster = max(1, $input->getOption('sleep')); + if (!is_numeric($napster)) { + throw new \Exception('"nap" needs to be numeric'); + } + $napster = (int)$napster; + + $lastTimeGotMoreThanZeroTrackedReq = microtime(true); + $originalNumberOfRequestsToProcessAtSameTime = $queueManager->getNumberOfRequestsToProcessAtSameTime(); + + while ($numberOfProcessCycle > 0 || $infiniteCycle) { + $wipingOutQueue = false; + if (microtime(true) - $lastTimeGotMoreThanZeroTrackedReq > 10) { + $queueManager->setNumberOfRequestsToProcessAtSameTime(1); + $wipingOutQueue = true; + $lastTimeGotMoreThanZeroTrackedReq = microtime(true); + } + + if ($wipingOutQueue) { + $output->writeln(" TRYING TO WIPE OUT THE QUEUE "); + } + $output->writeln("Starting to process request sets, this can take a while"); + + $startTime = microtime(true); + $processor = new Processor($queueManager); + $processor->setNumberOfMaxBatchesToProcess(500); + $tracker = $processor->process(); + + $neededTime = (microtime(true) - $startTime); + $numRequestsTracked = $tracker->getCountOfLoggedRequests(); + $requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime); + + $this->writeSuccessMessage( + array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime)) + ); + Piwik::postEvent('Tracker.end'); + + if ($numRequestsTracked > 0) { + $lastTimeGotMoreThanZeroTrackedReq = microtime(true); + } + + if (!$infiniteCycle) { + $numberOfProcessCycle--; + } + if ($numberOfProcessCycle > 0 || $infiniteCycle) { + $cTogo = $infiniteCycle ? "infinite" : $numberOfProcessCycle; + $output->writeln("==========================================================================="); + $output->writeln("Taking a nap for {$napster} second(s) before re-run the process. ({$cTogo}) cyle(s) to go."); + $output->writeln("==========================================================================="); + sleep($napster); + } + + if ($wipingOutQueue) { + $queueManager->setNumberOfRequestsToProcessAtSameTime($originalNumberOfRequestsToProcessAtSameTime); + } + } + // Piwik::postEvent('Tracker.end'); $trackerEnvironment->destroy(); - $this->writeSuccessMessage( - array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime)) - ); - + if ($delayedBeforeFinish > 0) { + sleep($delayedBeforeFinish); + } + return self::SUCCESS; } diff --git a/docs/faq.md b/docs/faq.md index 2b657cf..b624c55 100644 --- a/docs/faq.md +++ b/docs/faq.md @@ -32,6 +32,7 @@ requests using the [Piwik console](http://developer.piwik.org/guides/piwik-on-th * Disable the setting "Process during tracking request" in the Piwik UI under "Settings => Plugin Settings" * Setup a cronjob that executes the command `./console queuedtracking:process` for instance every minute * That's it +* Or, if you have __"non WINDOWS OS"__ you can use the [Supervisor](http://supervisord.org/) as a cron alternative. The `queuedtracking:process` command will make sure to process all queued tracking requests whenever possible and the command will exit as soon as there are not enough requests queued anymore. That's why you should setup a cronjob to start @@ -43,6 +44,28 @@ Example crontab entry that starts the processor every minute: `* * * * * cd /piwik && ./console queuedtracking:process >/dev/null 2>&1` +Example Supervisor entry that will start 16 processors/workers with 10 loop cycle times and auto restart: + +```ini +[program:matomo] +directory=/path/to/your/matomo +command=/path/to/your/php /path/to/your/matomo/console queuedtracking:process --queue-id=%(process_num)s -c 10 -s 2 -d 5 +process_name=queuedtracking-%(process_num)s + +#change the number according to how many worker(s) you have +numprocs=16 + +numprocs_start=0 +stopsignal=TERM +autostart=true +autorestart=true +stopwaitsecs=120 +#priority=1000 +stdout_logfile=/dev/null +stdout_logfile_maxbytes=0 +redirect_stderr=true +``` + __Can I keep track of the state of the queue?__ Yes, you can. Just execute the command `./console queuedtracking:monitor`. This will show the current state of the queue. To exit this command you can for example press `CTRL + C` key at the same time. From a563ad25e20622ea365889e8722cad8ce3970ef3 Mon Sep 17 00:00:00 2001 From: haristku <48225314+haristku@users.noreply.github.com> Date: Wed, 16 Oct 2024 14:30:50 +0700 Subject: [PATCH 2/3] Update Commands/Process.php Co-authored-by: Altamash Shaikh --- Commands/Process.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Commands/Process.php b/Commands/Process.php index 295e6b0..a913561 100644 --- a/Commands/Process.php +++ b/Commands/Process.php @@ -140,7 +140,7 @@ protected function doExecute(): int if ($numberOfProcessCycle > 0 || $infiniteCycle) { $cTogo = $infiniteCycle ? "infinite" : $numberOfProcessCycle; $output->writeln("==========================================================================="); - $output->writeln("Taking a nap for {$napster} second(s) before re-run the process. ({$cTogo}) cyle(s) to go."); + $output->writeln("Taking a nap for {$napster} second(s), before re-running the process. ({$cTogo}) cyle(s) to go."); $output->writeln("==========================================================================="); sleep($napster); } From bb16ceb54803dc2e53272bf3561a4cc45d877178 Mon Sep 17 00:00:00 2001 From: haristku Date: Wed, 16 Oct 2024 14:44:38 +0700 Subject: [PATCH 3/3] remove `pcntl`. the consequence is blinking cursor on monitor command, coz cursor hide and show mechanism cannot be restored without pcntl sigterm --- Commands/Monitor.php | 43 ++++++++++--------------------------------- 1 file changed, 10 insertions(+), 33 deletions(-) diff --git a/Commands/Monitor.php b/Commands/Monitor.php index 1446378..25b633a 100644 --- a/Commands/Monitor.php +++ b/Commands/Monitor.php @@ -37,10 +37,8 @@ protected function doExecute(): int $systemCheck->checkRedisIsInstalled(); } - if (!$this->isPcntlFunctionAvailable()) { - $output->write(str_repeat("\r\n", 100)); - $output->write("\e[".(100)."A"); - } + $output->write(str_repeat("\r\n", 100)); + $output->write("\e[".(100)."A"); $iterations = $this->getIterationsFromArg(); if ($iterations !== null) { @@ -73,16 +71,6 @@ protected function doExecute(): int $qCount = count($queues); $qPerPAge = min(max($this->getPerPageFromArg(), 1), $qCount); $qPageCount = ceil($qCount / $qPerPAge); - - $signalTrap = function() use ($output) { - $output->writeln("\e[u\e[?25h"); - die; - }; - if ($this->isPcntlFunctionAvailable()) - { - pcntl_signal(SIGINT, $signalTrap); - pcntl_signal(SIGTERM, $signalTrap); - } readline_callback_handler_install('', function() {}); stream_set_blocking (STDIN, false); @@ -90,19 +78,19 @@ protected function doExecute(): int $output->writeln(str_repeat("-", 30)); $output->writeln("".str_pad(" Q INDEX", 10).str_pad(" | REQUEST SETS", 20).""); $output->writeln(str_repeat("-", 30)); - $output->write("\e[?25l"); $lastStatsTimer = microtime(true) - 2; $lastSumInQueue = false; $diffSumInQueue = 0; $keyPressed = ""; - while (1) { - if ($this->isPcntlFunctionAvailable()) { - pcntl_signal_dispatch(); - } + $output->write(str_repeat("\r\n", $qPerPAge + 5)); + + while (1) { if (microtime(true) - $lastStatsTimer >= 2 || $keyPressed != "") { + $output->write("\e[".($qPerPAge + 5)."A"); + $qCurrentPage = min(max($qCurrentPage, 1), $qPageCount); $memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend @@ -141,10 +129,7 @@ protected function doExecute(): int $memory['used_memory_peak_human'] ?? 'Unknown', $lock->getNumberOfAcquiredLocks() )); - $output->write("\e[s"); - $output->write("\e[0G"); - $output->write("\e[".($qPerPAge + 5)."A"); - + if (!is_null($iterations)) { $iterationCount += 1; if ($iterationCount >= $iterations) { @@ -183,7 +168,8 @@ protected function doExecute(): int $qCurrentPage = $qPageCount; break; case "q": - $signalTrap(); + $output->writeln(''); + die; } } @@ -232,13 +218,4 @@ private function getPerPageFromArg() } return $perPage; } - - private function isPcntlFunctionAvailable() - { - if (extension_loaded('pcntl') && function_exists('pcntl_signal') && function_exists('pcntl_signal_dispatch')) { - return true; - } - - return false; - } }