Skip to content

Commit

Permalink
Merge pull request #391: Optimize garbage collecting
Browse files Browse the repository at this point in the history
  • Loading branch information
roxblnfk authored Jan 18, 2024
2 parents 7e646c6 + f698ec6 commit ba2d5c3
Show file tree
Hide file tree
Showing 106 changed files with 513 additions and 392 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/run-test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ jobs:
- name: Run tests with Temporal test server
if: inputs.run-temporal-test-server == true
run: ${{ format(
'vendor/bin/phpunit --testsuite={0} --testdox --verbose --exclude {1}',
'vendor/bin/phpunit --testsuite={0} --testdox --colors=always --exclude-group {1}',
inputs.test-suite,
contains(matrix.extensions-suffix, 'protobuf') && 'skip-on-test-server,skip-ext-protobuf' || 'skip-on-test-server'
) }}
Expand All @@ -125,4 +125,4 @@ jobs:

- name: Run tests without Temporal test server
if: inputs.run-temporal-test-server == false
run: vendor/bin/phpunit --testsuite=${{ inputs.test-suite }} --testdox --verbose
run: vendor/bin/phpunit --testsuite=${{ inputs.test-suite }} --testdox --colors=always
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"illuminate/support": "^9.0",
"jetbrains/phpstorm-attributes": "dev-master@dev",
"laminas/laminas-code": "^4.0",
"phpunit/phpunit": "^9.5.21",
"phpunit/phpunit": "^10.5",
"symfony/var-dumper": "^6.0 || ^7.0",
"vimeo/psalm": "^4.30 || ^5.4"
},
Expand Down
47 changes: 21 additions & 26 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,31 @@
xsi:noNamespaceSchemaLocation="vendor/phpunit/phpunit/phpunit.xsd"
bootstrap="tests/bootstrap.php"
backupGlobals="false"
backupStaticAttributes="false"
colors="true"
verbose="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
stopOnError="false"
stderr="true"
>
<coverage>
<include>
<directory>src</directory>
</include>
</coverage>
<testsuites>
<testsuite name="Unit">
<directory suffix="TestCase.php">tests/Unit</directory>
</testsuite>
<testsuite name="Feature">
<directory suffix="TestCase.php">tests/Feature</directory>
</testsuite>
<testsuite name="Functional">
<directory suffix="TestCase.php">tests/Functional</directory>
</testsuite>
</testsuites>
<php>
<ini name="error_reporting" value="-1"/>
<ini name="memory_limit" value="-1"/>
<env name="TEMPORAL_ADDRESS" value="127.0.0.1:7233" />
</php>
<testsuites>
<testsuite name="Unit">
<directory suffix="TestCase.php">tests/Unit</directory>
</testsuite>
<testsuite name="Feature">
<directory suffix="TestCase.php">tests/Feature</directory>
</testsuite>
<testsuite name="Functional">
<directory suffix="TestCase.php">tests/Functional</directory>
</testsuite>
</testsuites>
<php>
<ini name="error_reporting" value="-1"/>
<ini name="memory_limit" value="-1"/>
<env name="TEMPORAL_ADDRESS" value="127.0.0.1:7233"/>
</php>
<source>
<include>
<directory>src</directory>
</include>
</source>
</phpunit>
10 changes: 1 addition & 9 deletions psalm-baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@
<MissingTemplateParam>
<code>\IteratorAggregate</code>
</MissingTemplateParam>
</file>
</file>c
<file src="src/Internal/Repository/ArrayRepository.php">
<InvalidReturnStatement>
<code><![CDATA[$this->entries[$id] ?? null]]></code>
Expand Down Expand Up @@ -1159,9 +1159,6 @@
</DocblockTypeContradiction>
</file>
<file src="src/Internal/Transport/Router/DestroyWorkflow.php">
<DocblockTypeContradiction>
<code>$process === null</code>
</DocblockTypeContradiction>
<UndefinedInterfaceMethod>
<code>pull</code>
</UndefinedInterfaceMethod>
Expand Down Expand Up @@ -1333,11 +1330,6 @@
<code>$e</code>
</UnusedClosureParam>
</file>
<file src="src/Internal/Workflow/ProcessCollection.php">
<DocblockTypeContradiction>
<code>$process === null</code>
</DocblockTypeContradiction>
</file>
<file src="src/Internal/Workflow/Proxy.php">
<MissingReturnType>
<code>__call</code>
Expand Down
2 changes: 1 addition & 1 deletion src/DataConverter/EncodedCollection.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*
* @implements IteratorAggregate<TKey, TValue>
*/
class EncodedCollection implements IteratorAggregate
class EncodedCollection implements IteratorAggregate, Countable
{
/**
* @var DataConverterInterface|null
Expand Down
21 changes: 21 additions & 0 deletions src/Internal/Declaration/Destroyable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Internal\Declaration;

/**
* Means that instance can be destroyed. It's preferred to destroy instance to guarantee
* that all resources related to it will be released.
*/
interface Destroyable
{
public function destroy(): void;
}
10 changes: 9 additions & 1 deletion src/Internal/Declaration/WorkflowInstance.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* @psalm-type QueryHandler = \Closure(QueryInput): mixed
* @psalm-type QueryExecutor = \Closure(QueryInput, callable(ValuesInterface): mixed): mixed
*/
final class WorkflowInstance extends Instance implements WorkflowInstanceInterface
final class WorkflowInstance extends Instance implements WorkflowInstanceInterface, Destroyable
{
/**
* @var array<non-empty-string, QueryHandler>
Expand Down Expand Up @@ -165,6 +165,14 @@ public function clearSignalQueue(): void
$this->signalQueue->clear();
}

public function destroy(): void
{
$this->signalQueue->clear();
$this->signalHandlers = [];
$this->queryHandlers = [];
unset($this->queryExecutor);
}

/**
* Make a Closure from a callable.
*
Expand Down
65 changes: 65 additions & 0 deletions src/Internal/Support/GarbageCollector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Internal\Support;

/**
* Garbage collector that might be called after some number of ticks or after certain timeout.
* @internal
*/
final class GarbageCollector
{
/** @var positive-int Last time when GC was called. */
private int $lastTime;

/** @var int<0, max> Number of ticks since last GC. */
private int $counter = 0;

/**
* @param positive-int $threshold Number of ticks before GC will be called.
* @param int<0, max> $timeout Timeout in seconds.
* @param positive-int|null $lastTime Start point for timeout.
*/
public function __construct(
private readonly int $threshold,
private readonly int $timeout,
?int $lastTime = null,
) {
$this->lastTime = $lastTime ?? \time();
}

/**
* Check if GC should be called.
*/
public function check(): bool
{
if (++$this->counter >= $this->threshold) {
return true;
}

if (\time() - $this->lastTime > $this->timeout) {
return true;
}

return false;
}

/**
* Call GC.
*/
public function collect(): void
{
\gc_collect_cycles();

$this->lastTime = \time();
$this->counter = 0;
}
}
41 changes: 30 additions & 11 deletions src/Internal/Transport/Router/DestroyWorkflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,28 @@
use React\Promise\Deferred;
use Temporal\DataConverter\EncodedValues;
use Temporal\Exception\DestructMemorizedInstanceException;
use Temporal\Internal\Support\GarbageCollector;
use Temporal\Internal\Workflow\Process\Process;
use Temporal\Internal\Workflow\ProcessCollection;
use Temporal\Worker\LoopInterface;
use Temporal\Worker\Transport\Command\ServerRequestInterface;

class DestroyWorkflow extends WorkflowProcessAwareRoute
{
/**
* @var string
*/
private const ERROR_PROCESS_NOT_DEFINED = 'Unable to kill workflow because workflow process #%s was not found';
/** Maximum number of ticks before GC call. */
private const GC_THRESHOLD = 1000;
/** Interval between GC calls in seconds. */
private const GC_TIMEOUT_SECONDS = 30;

private GarbageCollector $gc;

public function __construct(
ProcessCollection $running,
protected LoopInterface $loop
) {
$this->gc = new GarbageCollector(self::GC_THRESHOLD, self::GC_TIMEOUT_SECONDS);
parent::__construct($running);
}

/**
* {@inheritDoc}
Expand All @@ -32,8 +45,6 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred
$this->kill($request->getID());

$resolver->resolve(EncodedValues::fromValues([null]));

\gc_collect_cycles();
}

/**
Expand All @@ -43,13 +54,21 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred
public function kill(string $runId): array
{
/** @var Process $process */
$process = $this->running->find($runId);
if ($process === null) {
throw new \InvalidArgumentException(\sprintf(self::ERROR_PROCESS_NOT_DEFINED, $runId));
}
$process = $this->running
->pull($runId, "Unable to kill workflow because workflow process #$runId was not found");

$this->running->pull($runId);
$process->cancel(new DestructMemorizedInstanceException());
$this->loop->once(
LoopInterface::ON_FINALLY,
function () use ($process) {
$process->destroy();

// Collect garbage if needed
if ($this->gc->check()) {
$this->gc->collect();
}
},
);

return [];
}
Expand Down
11 changes: 3 additions & 8 deletions src/Internal/Transport/Router/WorkflowProcessAwareRoute.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,12 @@ abstract class WorkflowProcessAwareRoute extends Route
*/
private const ERROR_PROCESS_NOT_FOUND = 'Workflow with the specified run identifier "%s" not found';

/**
* @var RepositoryInterface
*/
protected RepositoryInterface $running;

/**
* @param RepositoryInterface $running
*/
public function __construct(RepositoryInterface $running)
{
$this->running = $running;
public function __construct(
protected RepositoryInterface $running
) {
}

/**
Expand Down
12 changes: 10 additions & 2 deletions src/Internal/Workflow/Process/Scope.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Temporal\Exception\Failure\CanceledFailure;
use Temporal\Exception\Failure\TemporalFailure;
use Temporal\Exception\InvalidArgumentException;
use Temporal\Internal\Declaration\Destroyable;
use Temporal\Internal\ServiceContainer;
use Temporal\Internal\Transport\Request\Cancel;
use Temporal\Internal\Workflow\ScopeContext;
Expand All @@ -36,7 +37,7 @@
* @psalm-internal Temporal\Internal
* @implements CancellationScopeInterface<mixed>
*/
class Scope implements CancellationScopeInterface
class Scope implements CancellationScopeInterface, Destroyable
{
/**
* @var ServiceContainer
Expand Down Expand Up @@ -245,8 +246,8 @@ public function cancel(\Throwable $reason = null): void

foreach ($this->onCancel as $i => $handler) {
$this->makeCurrent();
$handler($reason);
unset($this->onCancel[$i]);
$handler($reason);
}
}

Expand Down Expand Up @@ -596,4 +597,11 @@ private function defer(\Closure $tick)

return $listener;
}

public function destroy(): void
{
$this->scopeContext->destroy();
$this->context->destroy();
unset($this->coroutine);
}
}
17 changes: 6 additions & 11 deletions src/Internal/Workflow/ProcessCollection.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
*/
class ProcessCollection extends ArrayRepository
{
/**
* @var string
*/
private const ERROR_PROCESS_NOT_DEFINED = 'Unable to kill workflow because workflow process #%s was not found';
private const ERROR_PROCESS_NOT_FOUND = 'Process #%s not found.';

public function __construct()
{
Expand All @@ -32,16 +29,14 @@ public function __construct()

/**
* @param string $runId
* @param non-empty-string|null $error Error message if the process was not found.
* @return Process
*/
public function pull(string $runId): Process
public function pull(string $runId, ?string $error = null): Process
{
/** @var Process $process */
$process = $this->find($runId);

if ($process === null) {
throw new \InvalidArgumentException(\sprintf(self::ERROR_PROCESS_NOT_DEFINED, $runId));
}
$process = $this->find($runId) ?? throw new \InvalidArgumentException(
$error ?? \sprintf(self::ERROR_PROCESS_NOT_FOUND, $runId),
);

$this->remove($runId);

Expand Down
Loading

0 comments on commit ba2d5c3

Please sign in to comment.