Skip to content

Commit

Permalink
Merge pull request #32 from gyselroth/dev
Browse files Browse the repository at this point in the history
v3.2.0
  • Loading branch information
raffis authored Dec 3, 2019
2 parents bd87c39 + 71f227d commit 12d6243
Show file tree
Hide file tree
Showing 17 changed files with 459 additions and 64 deletions.
2 changes: 2 additions & 0 deletions .githooks/pre-commit
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/sh
./vendor/bin/php-cs-fixer fix --config=.php_cs.dist -v
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
language: php
php:
- '7.2'
- '7.3'
- '7.4'
before_install:
- echo "extension = mongodb.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini
sudo: false
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## 3.2.0
**Maintainer**: Raffael Sahli <[email protected]>\
**Date**: Mon Mar 25 16:14:33 CET 2018

* [FEATURE] Add event bindings in the Process handler #26
* [FEATURE] Add event callback bindings to wait(), waitFor() #28
* [FEATURE] Progress support #29


## 3.1.0
**Maintainer**: Raffael Sahli <[email protected]>\
**Date**: Mon Mar 25 16:14:33 CET 2018
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ PHP_UNITTEST_FILES = $(call macro_find_phpfiles,$(TESTS_DIR))
.PHONY: all
all: build

init:
git config core.hooksPath .githooks

.PHONY: clean
clean: mostlyclean
@-test ! -d $(VENDOR_DIR) || rm -rfv $(VENDOR_DIR)/*
Expand Down
106 changes: 105 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ This brings a real world implementation for parallel process management to PHP.
* Retry and intervals
* Schedule tasks at specific times
* Signal management
* Intercept events
* Progress support

## v3

Expand Down Expand Up @@ -52,8 +54,10 @@ The documentation for v2 is available [here](https://github.com/gyselroth/mongod
* [Cancel job](#cancel-job)
* [Modify jobs](#modify-jobs)
* [Handling of failed jobs](#handling-of-failed-jobs)
* [Job progess](#job-progress)
* [Asynchronous programming](#asynchronous-programming)
* [Listen for Events](#listen-for-events)
* [Bind events](#bind-events)
* [Advanced job options](#advanced-job-options)
* [Add job if not exists](#add-job-if-not-exists)
* [Advanced worker manager options](#advanced-worker-manager-options)
Expand Down Expand Up @@ -81,7 +85,7 @@ A job may be rescheduled if it failed. There are lots of more features available
* PHP mongodb extension
* PHP sysvmsg extension

>**Note**: This library will only work on \*nix system. There is no windows support and there will most likely never be.
>**Note**: This library will only work on \*nix systems. There is no windows support and there will most likely never be.

## Download
Expand Down Expand Up @@ -360,6 +364,49 @@ $scheduler->addJob(MailJob::class, $mail->toString(), [

This will queue our mail to be executed in one hour from now and it will re-schedule the job up to three times if it fails with an interval of one minute.

### Job progress

TaskScheduler has built-in support to update the progress of a job from your job implementation.
By default a job starts at `0` (%) and ends with progress `100` (%). Note that the progress is floating number.
You may increase the progress made within your job.

Let us have a look how this works with a job which copies a file from a to b.

```php
class CopyFileJob extends TaskScheduler\AbstractJob
{
/**
* {@inheritdoc}
*/
public function start(): bool
{
$source = $this->data['source'];
$dest = $this->data['destination'];

$size = filesize($source);
$f = fopen($source, 'r');
$t = fopen($dest, 'w');
$read = 0;

while($chunk = fread($f, 4096)) {
$read += fwrite($t, $chunk);
$this->updateProgress($read/$size*100);
}
}
}
```

The current progress may be available using the process interface:

```php
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$p = $scheduler->getJob(MongoDB\BSON\ObjectId('5b3cbc39e26cf26c6d0ede69'));
$p->getProgress();
```

>**Note** There is a rate limit for the progress updates which is by default 500ms. You may change the rate limit by configuring the `TaskScheduler::OPTION_PROGRESS_RATE_LIMIT` to something else and to 0
if you do not want a rate limit at all.

### Asynchronous programming

Have a look at this example:
Expand Down Expand Up @@ -440,6 +487,63 @@ $jobs = $scheduler->listen(function(TaskScheduler\Process $process) {

>**Note**: listen() is a blocking call, you may exit the listener and continue with main() if you return a boolean `true` in the listener callback.
### Bind events
Besides the simple listener method for the Scheduler you may bind event listeneres to your `TaskScheduler\Queue` and/or `TaskScheduler\Scheduler`.

For example:
```php
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$stack = [];
$stack[] = $scheduler->addJob(MyTask::class, 'foobar');
$stack[] = $scheduler->addJob(MyTask::class, 'barfoo');
$stack[] = $scheduler->addJob(OtherTask::class, 'barefoot');

$scheduler->on('waiting', function(League\Event\Event $e, TaskScheduler\Process $p) {
echo 'job '.$p->getId().' is waiting';
})->on('done', function(League\Event\Event $e, TaskScheduler\Process $p) {
echo 'job '.$p->getId().' is finished';
})->on('*', function(League\Event\Event $e, TaskScheduler\Process $p) {
echo 'job '.$p->getId().' is '.$p->getStats();
});

$scheduler->waitFor($stack);
```

>**Note**: You need to to bind your listeneres before calling `Scheduler::waitFor()` since that is a synchronous blocking call.

You may bind listeneres to the same events in your queue nodes:

```php
$queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger);

$queue->on('timeout', function(League\Event\Event $e, TaskScheduler\Process $p) {
echo 'job '.$p->getId().' is timed out';
})->on('*', function(League\Event\Event $e, TaskScheduler\Process $p) {
echo 'job '.$p->getId().' is '.$p->getStats();
});

$queue->process();
```

>**Note**: You need to to bind your listeneres before calling `Queue::process()` since that is a synchronous blocking call.

#### Custom event emitter

Under the hood both `TaskScheduler\Queue` and `TaskScheduler\Scheduler` use `League\Event\Emitter` as event emitter.
You may create both instances with your own Leage Event emitter instance:

```php
$emitter = new League\Event\Emitter();

//Queue
$queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger, $emitter);

//Scheduler
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger, [], $emitter);
```

### Advanced job options
TaskScheduler\Scheduler::addJob()/TaskScheduler\Scheduler::addJobOnce() also accept a third option (options) which let you set more advanced options for the job:

Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
"ext-pcntl": "*",
"ext-posix": "*",
"ext-sysvmsg": "*",
"mongodb/mongodb": "^1.4.0"
"mongodb/mongodb": "^1.4.0",
"league/event": "^2.2"
},
"repositories": [
{
Expand Down
25 changes: 25 additions & 0 deletions src/AbstractJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ abstract class AbstractJob implements JobInterface
**/
protected $data;

/**
* Scheduler
*
* @var Scheduler
*/
protected $scheduler;

/**
* Job ID.
*
Expand Down Expand Up @@ -65,4 +72,22 @@ public function getId(): ObjectId
{
return $this->id;
}

/**
* {@inheritdoc}
*/
public function setScheduler(Scheduler $scheduler): JobInterface
{
$this->scheduler = $scheduler;
return $this;
}

/**
* {@inheritdoc}
*/
public function updateProgress(float $progress): JobInterface
{
$this->scheduler->updateJobProgress($this, $progress);
return $this;
}
}
48 changes: 48 additions & 0 deletions src/EventsTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

declare(strict_types=1);

/**
* TaskScheduler
*
* @author Raffael Sahli <[email protected]>
* @copyright Copryright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
* @license MIT https://opensource.org/licenses/MIT
*/

namespace TaskScheduler;

use Closure;
use League\Event\Emitter;

trait EventsTrait
{
/**
* Emitter
*
* @var Emitter
*/
protected $emitter;

/**
* Bind event listener
*/
public function on(string $event, Closure $handler)
{
if(!in_array($event, Scheduler::VALID_EVENTS) || $event === '*') {
$event = 'taskscheduler.on'.ucfirst($event);
}

$this->emitter->addListener($event, $handler);
return $this;
}

/**
* Emit process event
*/
protected function emit(Process $process): bool
{
$this->emitter->emit(Scheduler::VALID_EVENTS[$process->getStatus()], $process);
return true;
}
}
17 changes: 17 additions & 0 deletions src/Exception/LogicException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

/**
* TaskScheduler
*
* @author Raffael Sahli <[email protected]>
* @copyright Copryright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
* @license MIT https://opensource.org/licenses/MIT
*/

namespace TaskScheduler\Exception;

class LogicException extends \LogicException
{
}
12 changes: 11 additions & 1 deletion src/JobInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ interface JobInterface
];

/**
* Get job data.
* Set job data.
*/
public function setData($data): self;

Expand All @@ -56,6 +56,16 @@ public function setId(ObjectId $id): self;
*/
public function getId(): ObjectId;

/**
* Set scheduler.
*/
public function setScheduler(Scheduler $scheduler): self;

/**
* Update job progress.
*/
public function updateProgress(float $progress): self;

/**
* Start job.
*/
Expand Down
Loading

0 comments on commit 12d6243

Please sign in to comment.