Skip to content

Commit

Permalink
more reworking to pass/handle some otherwise custom routines that thi…
Browse files Browse the repository at this point in the history
…rd partyi intergration would need
  • Loading branch information
TheTechsTech committed Oct 11, 2019
1 parent 7e09cd8 commit 38d6b58
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 13 deletions.
5 changes: 4 additions & 1 deletion Coroutine/Coroutine.php
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public function shutdown()
}
}

public function cancelTask(int $tid)
public function cancelTask(int $tid, $customState = null)
{
if (!isset($this->taskMap[$tid])) {
return false;
Expand All @@ -143,6 +143,9 @@ public function cancelTask(int $tid)
if (\is_object($object) && \method_exists($object, 'close'))
$object->close();

if (!empty($customState))
$task->customState($customState);

$task->setState('cancelled');
unset($this->taskQueue[$i]);
break;
Expand Down
6 changes: 4 additions & 2 deletions Coroutine/CoroutineInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ public function schedule(TaskInterface $task);
public function shutdown();

/**
* kill/remove an task using task id
* kill/remove an task using task id,
* optionally pass custom cancel state for third party code integration.
*
* @param int $tid
* @param mixed $customState
* @return bool
*/
public function cancelTask(int $tid);
public function cancelTask(int $tid, $customState = null);

/**
* Process/walk the task queue and execute the tasks.
Expand Down
29 changes: 19 additions & 10 deletions Coroutine/Kernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -144,20 +144,24 @@ function(TaskInterface $task, CoroutineInterface $coroutine) use ($channel, $mes
}

/**
* kill/remove an task using task id
* kill/remove an task using task id,
* optionally pass custom cancel state and error message for third party code integration.
*
* @param int $tid
* @param mixed $customState
* @param string $errorMessage
*
* @throws \InvalidArgumentException
*/
public static function cancelTask($tid)
public static function cancelTask($tid, $customState = null, string $errorMessage = 'Invalid task ID!')
{
return new Kernel(
function(TaskInterface $task, CoroutineInterface $coroutine) use ($tid) {
if ($coroutine->cancelTask($tid)) {
function(TaskInterface $task, CoroutineInterface $coroutine) use ($tid, $customState, $errorMessage) {
if ($coroutine->cancelTask($tid, $customState)) {
$task->sendValue(true);
$coroutine->schedule($task);
} else {
throw new \InvalidArgumentException('Invalid task ID!');
throw new \InvalidArgumentException($errorMessage);
}
}
);
Expand All @@ -180,7 +184,7 @@ function(TaskInterface $task, CoroutineInterface $coroutine) {

/**
* Wait on read stream/socket to be ready read from,
* optionally schedule current task to execute immediately/next.
* optionally schedule current task to execute immediately/next for third party code integration.
*
* @param resource $streamSocket
* @param bool $immediately
Expand All @@ -199,7 +203,7 @@ function(TaskInterface $task, CoroutineInterface $coroutine) use ($streamSocket,

/**
* Wait on write stream/socket to be ready to be written to,
* optionally schedule current task to execute immediately/next.
* optionally schedule current task to execute immediately/next for third party code integration.
*
* @param resource $streamSocket
* @param bool $immediately
Expand Down Expand Up @@ -333,6 +337,7 @@ function(TaskInterface $task, CoroutineInterface $coroutine) use ($taskId) {
$completeList = $coroutine->completedList();
$countComplete = \count($completeList);
$gatherCompleteCount = 0;

if ($countComplete > 0) {
foreach($completeList as $id => $tasks) {
if (isset($taskIdList[$id])) {
Expand All @@ -355,7 +360,8 @@ function(TaskInterface $task, CoroutineInterface $coroutine) use ($taskId) {
} elseif ($gatherCompleteCount == $gatherCount) {
$count = 0;
}
}
}

while ($count > 0) {
foreach($taskIdList as $id) {
if (isset($taskList[$id])) {
Expand All @@ -377,7 +383,8 @@ function(TaskInterface $task, CoroutineInterface $coroutine) use ($taskId) {

if ($tasks->process()) {
$coroutine->execute();
}
}

} elseif ($tasks->pending() || $tasks->rescheduled()) {
if ($tasks->pending() && $tasks->isCustomState(true)) {
$tasks->customState();
Expand All @@ -396,7 +403,8 @@ function(TaskInterface $task, CoroutineInterface $coroutine) use ($taskId) {
$subCount--;
if ($subCount == 0)
break;
}
}

} elseif ($tasks->erred() || $tasks->cancelled()) {
$exception = $tasks->cancelled() ? new CancelledError() : $tasks->exception();
$count--;
Expand Down Expand Up @@ -494,6 +502,7 @@ public static function async(string $labelFunction, callable $asyncFunction)
* @param Generator|callable $asyncLabel
* @param mixed $args - if `generator`, $args can hold `customState`, and `customData`
* - if `customData` is object, and has `setId` method, store the $task id.
* - for third party code integration.
*
* @return int $task id
*/
Expand Down

0 comments on commit 38d6b58

Please sign in to comment.