diff --git a/composer.json b/composer.json index 82d56dd..26ebcbf 100644 --- a/composer.json +++ b/composer.json @@ -17,7 +17,7 @@ "ext-iconv": "*", "ext-openswoole": "*", "amphp/amp": "^3.0", - "darkwood/flow": "^1.2", + "darkwood/flow": "dev-1.x-dev", "doctrine/dbal": "^3", "doctrine/doctrine-bundle": "^2.12", "doctrine/doctrine-migrations-bundle": "^3.3", diff --git a/composer.lock b/composer.lock index 6cd103d..5a6c6d0 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "d18a634d20ce822b42c53c3793af5ee3", + "content-hash": "54b0aa11a7d01327069ef09c89556155", "packages": [ { "name": "amphp/amp", @@ -170,16 +170,16 @@ }, { "name": "darkwood/flow", - "version": "v1.2.2", + "version": "dev-1.x-dev", "source": { "type": "git", "url": "https://github.com/darkwood-com/flow.git", - "reference": "af333ff8011c3d0fdac6282d9d3c62c9b8daa52d" + "reference": "2c0752cb8fb72792111eee858be9108107a45444" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/darkwood-com/flow/zipball/af333ff8011c3d0fdac6282d9d3c62c9b8daa52d", - "reference": "af333ff8011c3d0fdac6282d9d3c62c9b8daa52d", + "url": "https://api.github.com/repos/darkwood-com/flow/zipball/2c0752cb8fb72792111eee858be9108107a45444", + "reference": "2c0752cb8fb72792111eee858be9108107a45444", "shasum": "" }, "require": { @@ -232,7 +232,7 @@ ], "support": { "issues": "https://github.com/darkwood-com/flow/issues", - "source": "https://github.com/darkwood-com/flow/tree/v1.2.2" + "source": "https://github.com/darkwood-com/flow/tree/1.x-dev" }, "funding": [ { @@ -240,7 +240,7 @@ "type": "github" } ], - "time": "2024-08-30T17:02:58+00:00" + "time": "2024-09-03T23:39:27+00:00" }, { "name": "doctrine/cache", @@ -423,16 +423,16 @@ }, { "name": "doctrine/dbal", - "version": "3.9.0", + "version": "3.9.1", "source": { "type": "git", "url": "https://github.com/doctrine/dbal.git", - "reference": "d8f68ea6cc00912e5313237130b8c8decf4d28c6" + "reference": "d7dc08f98cba352b2bab5d32c5e58f7e745c11a7" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/dbal/zipball/d8f68ea6cc00912e5313237130b8c8decf4d28c6", - "reference": "d8f68ea6cc00912e5313237130b8c8decf4d28c6", + "url": "https://api.github.com/repos/doctrine/dbal/zipball/d7dc08f98cba352b2bab5d32c5e58f7e745c11a7", + "reference": "d7dc08f98cba352b2bab5d32c5e58f7e745c11a7", "shasum": "" }, "require": { @@ -448,7 +448,7 @@ "doctrine/coding-standard": "12.0.0", "fig/log-test": "^1", "jetbrains/phpstorm-stubs": "2023.1", - "phpstan/phpstan": "1.11.7", + "phpstan/phpstan": "1.12.0", "phpstan/phpstan-strict-rules": "^1.6", "phpunit/phpunit": "9.6.20", "psalm/plugin-phpunit": "0.18.4", @@ -516,7 +516,7 @@ ], "support": { "issues": "https://github.com/doctrine/dbal/issues", - "source": "https://github.com/doctrine/dbal/tree/3.9.0" + "source": "https://github.com/doctrine/dbal/tree/3.9.1" }, "funding": [ { @@ -532,7 +532,7 @@ "type": "tidelift" } ], - "time": "2024-08-15T07:34:42+00:00" + "time": "2024-09-01T13:49:23+00:00" }, { "name": "doctrine/deprecations", @@ -583,16 +583,16 @@ }, { "name": "doctrine/doctrine-bundle", - "version": "2.12.0", + "version": "2.13.0", "source": { "type": "git", "url": "https://github.com/doctrine/DoctrineBundle.git", - "reference": "5418e811a14724068e95e0ba43353b903ada530f" + "reference": "ca59d84b8e63143ce1aed90cdb333ba329d71563" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/DoctrineBundle/zipball/5418e811a14724068e95e0ba43353b903ada530f", - "reference": "5418e811a14724068e95e0ba43353b903ada530f", + "url": "https://api.github.com/repos/doctrine/DoctrineBundle/zipball/ca59d84b8e63143ce1aed90cdb333ba329d71563", + "reference": "ca59d84b8e63143ce1aed90cdb333ba329d71563", "shasum": "" }, "require": { @@ -683,7 +683,7 @@ ], "support": { "issues": "https://github.com/doctrine/DoctrineBundle/issues", - "source": "https://github.com/doctrine/DoctrineBundle/tree/2.12.0" + "source": "https://github.com/doctrine/DoctrineBundle/tree/2.13.0" }, "funding": [ { @@ -699,7 +699,7 @@ "type": "tidelift" } ], - "time": "2024-03-19T07:20:37+00:00" + "time": "2024-09-01T09:46:40+00:00" }, { "name": "doctrine/doctrine-migrations-bundle", @@ -8943,7 +8943,9 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": [], + "stability-flags": { + "darkwood/flow": 20 + }, "prefer-stable": true, "prefer-lowest": false, "platform": { diff --git a/src/Command/ScrapCommand.php b/src/Command/ScrapCommand.php new file mode 100644 index 0000000..5dbc777 --- /dev/null +++ b/src/Command/ScrapCommand.php @@ -0,0 +1,177 @@ +httpClient = $httpClient; + } + + /** + * Get user data including todos and posts. + * + * @param array $user The user data + * @param HttpClientInterface $httpClient The HTTP client for making requests + * + * @return array The user data with todos and posts added + */ + public function getUserData($user, HttpClientInterface $httpClient): array + { + $userId = $user['id']; + $todosUrl = "https://jsonplaceholder.typicode.com/users/{$userId}/todos"; + $postsUrl = "https://jsonplaceholder.typicode.com/users/{$userId}/posts"; + + $responses = [ + 'todos' => $httpClient->request('GET', $todosUrl), + 'posts' => $httpClient->request('GET', $postsUrl), + ]; + + Fiber::suspend(); + + $todos = $responses['todos']->toArray(); + $posts = $responses['posts']->toArray(); + + $user['todos'] = $todos; + $user['posts'] = $posts; + + return $user; + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $io = new SymfonyStyle($input, $output); + + $driver = new FiberDriver(); + + $flow = Flow::do(function () use ($io) { + yield new ScrapUrlsJob(); + yield static function (array $urlDatas) use ($io) { + $io->writeln(sprintf('ScrapUrlsJob : Finished scrapping %d urls', count($urlDatas))); + + return $urlDatas; + }; + yield [new ScrapUrlJob(), null, new FlattenIpStrategy()]; + yield static function (UrlContent $urlData) use ($io) { + $io->writeln(sprintf('ScrapUrlJob : Finished scrapping %s', $urlData->url)); + }; + + yield static fn () => [null, []]; + yield new YJob(function ($rec) { + return function ($data) use ($rec) { + [$i, $users] = $data; + if ($i === null) { + $response = $this->httpClient->request('GET', 'https://jsonplaceholder.typicode.com/users'); + Fiber::suspend(); + $users = $response->toArray(); + + return $rec([0, $users]); + } + if ($i >= 0 && $i < count($users)) { + $users[$i] = $this->getUserData($users[$i], $this->httpClient); + + return $rec([$i + 1, $users]); + } + + return $users; + }; + }); + yield static function ($users) use ($io) { + $io->writeln(sprintf('ScrapYJob : Finished scrapping %d', count($users))); + }; + + yield static fn () => [null, []]; + yield [new YJob(function ($rec) { + return function ($args) use ($rec) { + [$data, $defer] = $args; + + return $defer(function ($complete, $async) use ($data, $defer, $rec) { + [$i, $users] = $data; + if ($i === null) { + $response = $this->httpClient->request('GET', 'https://jsonplaceholder.typicode.com/users'); + Fiber::suspend(); + $users = $response->toArray(); + + $async($rec([[0, $users], $defer]), static function ($result) use ($complete) { + $complete($result); + }); + } elseif ($i >= 0 && $i < count($users)) { + $users[$i] = $this->getUserData($users[$i], $this->httpClient); + + $async($rec([[$i + 1, $users], $defer]), static function ($result) use ($complete) { + $complete($result); + }); + } else { + $complete([$users, $defer]); + } + }); + }; + }), null, null, null, new DeferAsyncHandler()]; + yield static function ($users) use ($io) { + $io->writeln(sprintf('ScrapYDeferJob : Finished scrapping %d', count($users))); + }; + }, ['driver' => $driver]); + + $flow(new Ip([ + new UrlContent('https://www.google.fr'), + new UrlContent('https://www.apple.com'), + new UrlContent('https://www.microsoft.com'), + new UrlContent('https://www.amazon.com'), + new UrlContent('https://www.facebook.com'), + new UrlContent('https://www.netflix.com'), + new UrlContent('https://www.spotify.com'), + new UrlContent('https://www.wikipedia.org'), + new UrlContent('https://www.x.com'), + new UrlContent('https://www.instagram.com'), + new UrlContent('https://www.linkedin.com'), + new UrlContent('https://www.reddit.com'), + new UrlContent('https://www.ebay.com'), + new UrlContent('https://www.cnn.com'), + new UrlContent('https://www.bbc.co.uk'), + new UrlContent('https://www.yahoo.com'), + new UrlContent('https://www.bing.com'), + new UrlContent('https://www.pinterest.com'), + new UrlContent('https://www.tumblr.com'), + new UrlContent('https://www.paypal.com'), + new UrlContent('https://www.dropbox.com'), + new UrlContent('https://www.adobe.com'), + new UrlContent('https://www.salesforce.com'), + ])); + + $flow->await(); + + $io->success('Scraping is done.'); + + return Command::SUCCESS; + } +} diff --git a/src/IpStrategy/FlattenIpStrategy.php b/src/IpStrategy/FlattenIpStrategy.php new file mode 100644 index 0000000..f9dae7b --- /dev/null +++ b/src/IpStrategy/FlattenIpStrategy.php @@ -0,0 +1,71 @@ + + */ +class FlattenIpStrategy implements IpStrategyInterface +{ + /** + * @var IpPool + */ + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } + + public static function getSubscribedEvents(): array + { + return [ + Event::PUSH => 'push', + Event::PULL => 'pull', + Event::POOL => 'pool', + ]; + } + + /** + * @param PushEvent $event + */ + public function push(PushEvent $event): void + { + $ip = $event->getIp(); + if (!is_iterable($ip->data)) { + throw new LogicException('Ip data must be iterable'); + } + foreach ($ip->data as $data) { + $this->ipPool->addIp(new Ip($data)); + } + } + + /** + * @param PullEvent $event + */ + public function pull(PullEvent $event): void + { + $ip = $this->ipPool->shiftIp(); + if ($ip !== null) { + $event->addIp($ip); + } + } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->ipPool->getIps()); + } +} diff --git a/src/Job/ScrapUrlJob.php b/src/Job/ScrapUrlJob.php new file mode 100644 index 0000000..2eba5f5 --- /dev/null +++ b/src/Job/ScrapUrlJob.php @@ -0,0 +1,55 @@ + + */ +class ScrapUrlJob implements JobInterface +{ + private CurlMultiHandle $mh; + + public function __construct() + { + // Initialize a cURL multi handle + $this->mh = curl_multi_init(); + } + + public function __destruct() + { + curl_multi_close($this->mh); + } + + public function __invoke($urlContent): mixed + { + $ch = curl_init(); + curl_setopt($ch, CURLOPT_URL, $urlContent->url); + curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); + curl_multi_add_handle($this->mh, $ch); + + do { + $status = curl_multi_exec($this->mh, $active); + curl_multi_exec($this->mh, $active); + + Fiber::suspend(); + + $info = curl_multi_info_read($this->mh); + } while ( + $active && $status === CURLM_OK // check curl_multi is active + && !($info !== false && $info['handle'] === $ch && $info['result'] === CURLE_OK) // check $ch is done + ); + + $content = curl_multi_getcontent($ch); + curl_multi_remove_handle($this->mh, $ch); + curl_close($ch); + + return new UrlContent($urlContent->url, $content); + } +} diff --git a/src/Job/ScrapUrlsJob.php b/src/Job/ScrapUrlsJob.php new file mode 100644 index 0000000..2d5d6a6 --- /dev/null +++ b/src/Job/ScrapUrlsJob.php @@ -0,0 +1,56 @@ +, array> + */ +class ScrapUrlsJob implements JobInterface +{ + public function __invoke($urlContents): array + { + // Initialize a cURL multi handle + $mh = curl_multi_init(); + + // Array to hold individual cURL handles + $curl_handles = []; + + // Initialize individual cURL handles and add them to the multi handle + foreach ($urlContents as $urlContent) { + $ch = curl_init(); + curl_setopt($ch, CURLOPT_URL, $urlContent->url); + curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); + curl_multi_add_handle($mh, $ch); + $curl_handles[] = [$ch, $urlContent]; + } + + // Execute the multi handle + $running = null; + do { + curl_multi_exec($mh, $running); + + Fiber::suspend(); + } while ($running > 0); + + // Collect the content from each handle + $urlContents = []; + foreach ($curl_handles as $curl_handle) { + [$ch, $urlContent] = $curl_handle; + $urlContent->content = curl_multi_getcontent($ch); + $urlContents[] = $urlContent; + curl_multi_remove_handle($mh, $ch); + curl_close($ch); + } + + // Close the multi handle + curl_multi_close($mh); + + return $urlContents; + } +} diff --git a/src/Model/UrlContent.php b/src/Model/UrlContent.php new file mode 100644 index 0000000..09bf2bc --- /dev/null +++ b/src/Model/UrlContent.php @@ -0,0 +1,10 @@ +