From b8805be09df0fe969416e7f0c789654642498022 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Mon, 18 Nov 2024 23:08:54 +0100 Subject: [PATCH 01/12] [Data Liberation] Make WP_Stream_Importer re-entrant Exploratory PR to keep track of the import state so that, upon crash, the next run may seamlessly resume where the previous one left off. --- .../playground/data-liberation/bootstrap.php | 1 - .../playground/data-liberation/plugin.php | 17 +- .../src/import/WP_Attachment_Downloader.php | 130 ------ .../src/import/WP_Stream_Importer.php | 370 ++++++++++++++---- 4 files changed, 288 insertions(+), 230 deletions(-) delete mode 100644 packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php diff --git a/packages/playground/data-liberation/bootstrap.php b/packages/playground/data-liberation/bootstrap.php index b99b02fd31..e65d26aa0a 100644 --- a/packages/playground/data-liberation/bootstrap.php +++ b/packages/playground/data-liberation/bootstrap.php @@ -51,7 +51,6 @@ require_once __DIR__ . '/src/import/WP_File_Visitor.php'; require_once __DIR__ . '/src/import/WP_File_Visitor_Event.php'; require_once __DIR__ . '/src/import/WP_Imported_Entity.php'; -require_once __DIR__ . '/src/import/WP_Attachment_Downloader.php'; require_once __DIR__ . '/src/import/WP_Stream_Importer.php'; require_once __DIR__ . '/src/import/WP_Markdown_Importer.php'; diff --git a/packages/playground/data-liberation/plugin.php b/packages/playground/data-liberation/plugin.php index ecf88569fe..fc5acb9ea3 100644 --- a/packages/playground/data-liberation/plugin.php +++ b/packages/playground/data-liberation/plugin.php @@ -353,20 +353,9 @@ function data_liberation_process_import() { function data_liberation_import_step($import) { $importer = data_liberation_create_importer($import); - // @TODO: Save the last importer state so we can resume it later if interrupted. 
- update_option('data_liberation_import_progress', [ - 'status' => 'Downloading static assets...', - 'current' => 0, - 'total' => 0 - ]); - $importer->frontload_assets(); - // @TODO: Keep track of multiple progress dimensions – posts, assets, categories, etc. - update_option('data_liberation_import_progress', [ - 'status' => 'Importing posts...', - 'current' => 0, - 'total' => 0 - ]); - $importer->import_entities(); + while($importer->next_step()) { + // ...Twiddle our thumbs... + } delete_option('data_liberation_active_import'); // @TODO: Do not echo things. Append to an import log where we can retrace the steps. // Also, store specific import events in the database so the user can react and diff --git a/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php deleted file mode 100644 index a54ce96eda..0000000000 --- a/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php +++ /dev/null @@ -1,130 +0,0 @@ -client = new Client(); - $this->output_root = $output_root; - } - - public function enqueue_if_not_exists( $url, $output_path = null ) { - if ( null === $output_path ) { - // Use the path from the URL. - $parsed_url = parse_url( $url ); - if ( false === $parsed_url ) { - return false; - } - $output_path = $parsed_url['path']; - } - $output_path = $this->output_root . '/' . ltrim( $output_path, '/' ); - if ( file_exists( $output_path ) ) { - // @TODO: Reconsider the return value. The enqueuing operation failed, - // but overall already having a file seems like a success. - return true; - } - - $output_dir = dirname( $output_path ); - if ( ! file_exists( $output_dir ) ) { - // @TODO: think through the chmod of the created directory. 
- mkdir( $output_dir, 0777, true ); - } - - $protocol = parse_url( $url, PHP_URL_SCHEME ); - if ( null === $protocol ) { - return false; - } - - switch ( $protocol ) { - case 'file': - $local_path = parse_url( $url, PHP_URL_PATH ); - if ( false === $local_path ) { - return false; - } - // Just copy the file over. - // @TODO: think through the chmod of the created file. - return copy( $local_path, $output_path ); - case 'http': - case 'https': - $request = new Request( $url ); - $this->output_paths[ $request->id ] = $output_path; - $this->client->enqueue( $request ); - return true; - } - return false; - } - - public function queue_full() { - return count( $this->client->get_active_requests() ) >= 10; - } - - public function poll() { - if ( ! $this->client->await_next_event() ) { - return false; - } - $event = $this->client->get_event(); - $request = $this->client->get_request(); - // The request object we get from the client may be a redirect. - // Let's keep referring to the original request. - $original_request_id = $request->original_request()->id; - - switch ( $event ) { - case Client::EVENT_GOT_HEADERS: - if ( ! $request->is_redirected() ) { - $this->partial_files[ $original_request_id ] = $this->output_paths[ $original_request_id ] . '.partial'; - if ( file_exists( $this->partial_files[ $original_request_id ] ) ) { - unlink( $this->partial_files[ $original_request_id ] ); - } - $this->fps[ $original_request_id ] = fopen( $this->output_paths[ $original_request_id ] . '.partial', 'wb' ); - if ( false === $this->fps[ $original_request_id ] ) { - // @TODO: Log an error. - } - } - break; - case Client::EVENT_BODY_CHUNK_AVAILABLE: - $chunk = $this->client->get_response_body_chunk(); - if ( false === fwrite( $this->fps[ $original_request_id ], $chunk ) ) { - // @TODO: Log an error. 
- } - break; - case Client::EVENT_FAILED: - if ( isset( $this->fps[ $original_request_id ] ) ) { - fclose( $this->fps[ $original_request_id ] ); - } - if ( isset( $this->partial_files[ $original_request_id ] ) ) { - $partial_file = $this->output_root . '/' . $this->partial_files[ $original_request_id ] . '.partial'; - if ( file_exists( $partial_file ) ) { - unlink( $partial_file ); - } - } - unset( $this->output_paths[ $original_request_id ] ); - break; - case Client::EVENT_FINISHED: - if ( ! $request->is_redirected() ) { - // Only clean up if this was the last request in the chain. - if ( isset( $this->fps[ $original_request_id ] ) ) { - fclose( $this->fps[ $original_request_id ] ); - } - if ( isset( $this->output_paths[ $original_request_id ] ) && isset( $this->partial_files[ $original_request_id ] ) ) { - if ( false === rename( - $this->partial_files[ $original_request_id ], - $this->output_paths[ $original_request_id ] - ) ) { - // @TODO: Log an error. - } - } - unset( $this->partial_files[ $original_request_id ] ); - } - break; - } - - return true; - } -} diff --git a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php index 5258fb9cae..d55ec10193 100644 --- a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php @@ -1,4 +1,8 @@ state ) { + case self::STATE_INITIAL: + $this->state = self::STATE_TOPOLOGICAL_SORT; + return true; + case self::STATE_TOPOLOGICAL_SORT: + // @TODO: Topologically sort the entities. + $this->state = self::STATE_FRONTLOAD_ASSETS; + return true; + case self::STATE_FRONTLOAD_ASSETS: + $this->frontload_assets_step(); + return true; + case self::STATE_IMPORT_ENTITIES: + $this->import_entities_step(); + return true; + case self::STATE_FINISHED: + return false; + } + } + /** * Downloads all the assets referenced in the imported entities. 
* @@ -87,34 +134,23 @@ protected function __construct( * before import_entities() so that every inserted post already has * all its attachments downloaded. */ - public function frontload_assets() { - $factory = $this->entity_iterator_factory; - $entities = $factory(); - $this->downloader = new WP_Attachment_Downloader( $this->options['uploads_path'] ); - foreach ( $entities as $entity ) { - if ( $this->downloader->queue_full() ) { - $this->downloader->poll(); - continue; - } + public function frontload_assets_step() { + if ( null === $this->entities_iterator ) { + $factory = $this->entity_iterator_factory; + $this->entities_iterator = $factory(); + // @TODO: Seek to the last processed entity if we have a cursor. + $this->client = new \WordPress\AsyncHttp\Client(); + } - $data = $entity->get_data(); - if ( 'site_option' === $entity->get_type() && $data['option_name'] === 'home' ) { - $this->source_site_url = $data['option_value']; - } elseif ( 'post' === $entity->get_type() ) { - if ( isset( $data['post_type'] ) && $data['post_type'] === 'attachment' ) { - // Download media attachment entities. - $this->enqueue_attachment_download( - $data['attachment_url'] - ); - } elseif ( isset( $data['post_content'] ) ) { - $this->enqueue_attachments_referenced_in_post( - $data - ); - } - } + if ( ! $this->entities_iterator->valid() && ! $this->has_pending_requests() ) { + $this->state = self::STATE_IMPORT_ENTITIES; + $this->client = null; + $this->entities_iterator = null; + return; } - while ( $this->downloader->poll() ) { + $only_downloader_pending = ! $this->entities_iterator->valid() && $this->has_pending_requests(); + if ( $this->download_queue_full() || $only_downloader_pending ) { // Twiddle our thumbs as the downloader processes the requests... /** * @TODO: @@ -126,7 +162,45 @@ public function frontload_assets() { * might want to provide an "image not found" placeholder OR ignore the * failure. 
*/ + $this->poll_attachments(); + /** + * @TODO: Update the download progress: + * * After every downloaded file. + * * For large files, every time a full megabyte is downloaded above 10MB. + */ + /** + * @TODO: Advance the cursor to the oldest successful download. For example: + * + * * We've started downloading files A, B, C, and D in this order. + * * D is the first to finish. We don't do anything yet. + * * A finishes next. We advance the cursor to A. + * * C finishes next. We don't do anything. + * * Then we pause. + * + * When we resume, we'll start where we left off, which is after A. The + * downloader will enqueue B for download and will skip C and D since + * the relevant files already exist in the filesystem. + */ + return; } + + $entity = $this->entities_iterator->current(); + $data = $entity->get_data(); + if ( 'site_option' === $entity->get_type() && $data['option_name'] === 'home' ) { + $this->source_site_url = $data['option_value']; + } elseif ( 'post' === $entity->get_type() ) { + if ( isset( $data['post_type'] ) && $data['post_type'] === 'attachment' ) { + // Download media attachment entities. + $this->enqueue_attachment_download( + $data['attachment_url'] + ); + } elseif ( isset( $data['post_content'] ) ) { + $this->enqueue_attachments_referenced_in_post( + $data + ); + } + } + $this->entities_iterator->next(); } /** @@ -136,55 +210,77 @@ public function frontload_assets() { * large datasets, but maybe it could be a choice for * the API consumer? */ - public function import_entities() { - $importer = new WP_Entity_Importer(); - $factory = $this->entity_iterator_factory; - $entities = $factory(); - foreach ( $entities as $entity ) { - $attachments = array(); - // Rewrite the URLs in the post. - switch ( $entity->get_type() ) { - case 'post': - $data = $entity->get_data(); - foreach ( array( 'guid', 'post_content', 'post_excerpt' ) as $key ) { - if ( ! 
isset( $data[ $key ] ) ) { - continue; - } - $p = new WP_Block_Markup_Url_Processor( $data[ $key ], $this->source_site_url ); - while ( $p->next_url() ) { - if ( $this->url_processor_matched_asset_url( $p ) ) { - $filename = $this->new_asset_filename( $p->get_raw_url() ); - $new_asset_url = $this->options['uploads_url'] . '/' . $filename; - $p->replace_base_url( WP_URL::parse( $new_asset_url ) ); - $attachments[] = $new_asset_url; - /** - * @TODO: How would we know a specific image block refers to a specific - * attachment? We need to cross-correlate that to rewrite the URL. - * The image block could have query parameters, too, but presumably the - * path would be the same at least? What if the same file is referred - * to by two different URLs? e.g. assets.site.com and site.com/assets/ ? - * A few ideas: GUID, block attributes, fuzzy matching. Maybe a configurable - * strategy? And the API consumer would make the decision? - */ - } elseif ( $this->source_site_url && - $p->get_parsed_url() && - url_matches( $p->get_parsed_url(), $this->source_site_url ) - ) { - $p->replace_base_url( WP_URL::parse( $this->options['new_site_url'] ) ); - } else { - // Ignore other URLs. - } + public function import_entities_step() { + if ( null === $this->entities_iterator ) { + $factory = $this->entity_iterator_factory; + $this->entities_iterator = $factory(); + $this->importer = new WP_Entity_Importer(); + // @TODO: Seek to the last processed entity if we have a cursor. + } + + if ( ! $this->entities_iterator->valid() ) { + // We're done. + $this->state = self::STATE_FINISHED; + $this->entities_iterator = null; + $this->importer = null; + return; + } + + $entity = $this->entities_iterator->current(); + + $attachments = array(); + // Rewrite the URLs in the post. + switch ( $entity->get_type() ) { + case 'post': + $data = $entity->get_data(); + foreach ( array( 'guid', 'post_content', 'post_excerpt' ) as $key ) { + if ( ! 
isset( $data[ $key ] ) ) { + continue; + } + $p = new WP_Block_Markup_Url_Processor( $data[ $key ], $this->source_site_url ); + while ( $p->next_url() ) { + if ( $this->url_processor_matched_asset_url( $p ) ) { + $filename = $this->new_asset_filename( $p->get_raw_url() ); + $new_asset_url = $this->options['uploads_url'] . '/' . $filename; + $p->replace_base_url( WP_URL::parse( $new_asset_url ) ); + $attachments[] = $new_asset_url; + /** + * @TODO: How would we know a specific image block refers to a specific + * attachment? We need to cross-correlate that to rewrite the URL. + * The image block could have query parameters, too, but presumably the + * path would be the same at least? What if the same file is referred + * to by two different URLs? e.g. assets.site.com and site.com/assets/ ? + * A few ideas: GUID, block attributes, fuzzy matching. Maybe a configurable + * strategy? And the API consumer would make the decision? + */ + } elseif ( $this->source_site_url && + $p->get_parsed_url() && + url_matches( $p->get_parsed_url(), $this->source_site_url ) + ) { + $p->replace_base_url( WP_URL::parse( $this->options['new_site_url'] ) ); + } else { + // Ignore other URLs. } - $data[ $key ] = $p->get_updated_html(); } - $entity->set_data( $data ); - break; - } - $post_id = $importer->import_entity( $entity ); - foreach ( $attachments as $filepath ) { - $importer->import_attachment( $filepath, $post_id ); - } + $data[ $key ] = $p->get_updated_html(); + } + $entity->set_data( $data ); + break; } + + // @TODO: Monitor failures. + $post_id = $this->importer->import_entity( $entity ); + foreach ( $attachments as $filepath ) { + // @TODO: Monitor failures. + $this->importer->import_attachment( $filepath, $post_id ); + } + + /** + * @TODO: Advance the cursor to the next entity. + * @TODO: Update the progress information. 
+ */ + + $this->entities_iterator->next(); } /** @@ -264,20 +360,49 @@ protected function enqueue_attachments_referenced_in_post( $post ) { } protected function enqueue_attachment_download( string $raw_url, $context_path = null ) { - $new_filename = $this->new_asset_filename( $raw_url ); - $downloadable_url = $this->rewrite_attachment_url( $raw_url, $context_path ); - $success = $this->downloader->enqueue_if_not_exists( - $downloadable_url, - $new_filename - ); - if ( false === $success ) { - // @TODO: Save the failure info somewhere so the user can review it later - // and either retry or provide their own asset. - // Meanwhile, we may either halt the content import, or provide a placeholder - // asset. - _doing_it_wrong( __METHOD__, "Failed to fetch attachment '$raw_url' from '$downloadable_url'", '__WP_VERSION__' ); + $url = $this->rewrite_attachment_url( $raw_url, $context_path ); + $asset_filename = $this->new_asset_filename( $raw_url ); + $output_path = $this->options['uploads_path'] . '/' . ltrim( $asset_filename, '/' ); + + if ( file_exists( $output_path ) ) { + // @TODO: Reconsider the return value. The enqueuing operation failed, + // but overall already having a file seems like a success. + return true; + } + + $output_dir = dirname( $output_path ); + if ( ! file_exists( $output_dir ) ) { + // @TODO: think through the chmod of the created directory. + mkdir( $output_dir, 0777, true ); } - return $success; + + $protocol = parse_url( $url, PHP_URL_SCHEME ); + if ( null === $protocol ) { + return false; + } + + switch ( $protocol ) { + case 'file': + $local_path = parse_url( $url, PHP_URL_PATH ); + if ( false === $local_path ) { + return false; + } + // Just copy the file over. + // @TODO: think through the chmod of the created file. 
+ return copy( $local_path, $output_path ); + case 'http': + case 'https': + $request = new Request( $url ); + $this->output_paths[ $request->id ] = $output_path; + $this->client->enqueue( $request ); + return true; + } + + // @TODO: Save the failure info somewhere so the user can review it later + // and either retry or provide their own asset. + // Meanwhile, we may either halt the content import, or provide a placeholder + // asset. + _doing_it_wrong( __METHOD__, "Failed to fetch attachment '$raw_url' from '$url'", '__WP_VERSION__' ); } protected function rewrite_attachment_url( string $raw_url, $context_path = null ) { @@ -310,4 +435,79 @@ protected function url_processor_matched_asset_url( WP_Block_Markup_Url_Processo ( ! $this->source_site_url || url_matches( $p->get_parsed_url(), $this->source_site_url ) ) ); } + + private $client; + private $fps = array(); + private $partial_files = array(); + private $output_paths = array(); + + private function has_pending_requests() { + return count( $this->client->get_active_requests() ) > 0; + } + + public function download_queue_full() { + return count( $this->client->get_active_requests() ) >= 10; + } + + public function poll_attachments() { + if ( ! $this->client->await_next_event() ) { + return false; + } + $event = $this->client->get_event(); + $request = $this->client->get_request(); + // The request object we get from the client may be a redirect. + // Let's keep referring to the original request. + $original_request_id = $request->original_request()->id; + + switch ( $event ) { + case Client::EVENT_GOT_HEADERS: + if ( ! $request->is_redirected() ) { + if ( file_exists( $this->output_paths[ $original_request_id ] . '.partial' ) ) { + unlink( $this->output_paths[ $original_request_id ] . '.partial' ); + } + $this->fps[ $original_request_id ] = fopen( $this->output_paths[ $original_request_id ] . '.partial', 'wb' ); + if ( false === $this->fps[ $original_request_id ] ) { + // @TODO: Log an error. 
+ } + } + break; + case Client::EVENT_BODY_CHUNK_AVAILABLE: + $chunk = $this->client->get_response_body_chunk(); + if ( false === fwrite( $this->fps[ $original_request_id ], $chunk ) ) { + // @TODO: Log an error. + } + break; + case Client::EVENT_FAILED: + if ( isset( $this->fps[ $original_request_id ] ) ) { + fclose( $this->fps[ $original_request_id ] ); + } + if ( isset( $this->partial_files[ $original_request_id ] ) ) { + $partial_file = $this->options['uploads_path'] . '/' . $this->output_paths[ $original_request_id ] . '.partial'; + if ( file_exists( $partial_file ) ) { + unlink( $partial_file ); + } + } + unset( $this->output_paths[ $original_request_id ] ); + break; + case Client::EVENT_FINISHED: + if ( ! $request->is_redirected() ) { + // Only clean up if this was the last request in the chain. + if ( isset( $this->fps[ $original_request_id ] ) ) { + fclose( $this->fps[ $original_request_id ] ); + } + if ( isset( $this->output_paths[ $original_request_id ] ) ) { + if ( false === rename( + $this->output_paths[ $original_request_id ] . '.partial', + $this->output_paths[ $original_request_id ] + ) ) { + // @TODO: Log an error. 
+ } + } + unset( $this->partial_files[ $original_request_id ] ); + } + break; + } + + return true; + } } From 8c162b77f2fdbd34f7378bba540efec0d51ebd0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Mon, 18 Nov 2024 23:34:55 +0100 Subject: [PATCH 02/12] Remove $this->enqueue_attachments_referenced_in_post( --- .../src/import/WP_Stream_Importer.php | 58 ++++++------------- .../WP_Markdown_Directory_Tree_Reader.php | 5 +- .../data-liberation/src/wxr/WP_WXR_Reader.php | 11 +++- 3 files changed, 31 insertions(+), 43 deletions(-) diff --git a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php index d55ec10193..37d18f86c3 100644 --- a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php @@ -162,14 +162,16 @@ public function frontload_assets_step() { * might want to provide an "image not found" placeholder OR ignore the * failure. */ - $this->poll_attachments(); + if(false === $this->poll_attachments()) { + return; + } /** * @TODO: Update the download progress: * * After every downloaded file. * * For large files, every time a full megabyte is downloaded above 10MB. */ /** - * @TODO: Advance the cursor to the oldest successful download. For example: + * @TODO: Advance the cursor to the oldest finished download. For example: * * * We've started downloading files A, B, C, and D in this order. * * D is the first to finish. We don't do anything yet. @@ -190,14 +192,19 @@ public function frontload_assets_step() { $this->source_site_url = $data['option_value']; } elseif ( 'post' === $entity->get_type() ) { if ( isset( $data['post_type'] ) && $data['post_type'] === 'attachment' ) { - // Download media attachment entities. 
- $this->enqueue_attachment_download( - $data['attachment_url'] - ); + $this->enqueue_attachment_download($data['attachment_url'], null); } elseif ( isset( $data['post_content'] ) ) { - $this->enqueue_attachments_referenced_in_post( - $data - ); + $post = $data; + $p = new WP_Block_Markup_Url_Processor( $post['post_content'], $this->source_site_url ); + while ( $p->next_url() ) { + if ( ! $this->url_processor_matched_asset_url( $p ) ) { + continue; + } + $this->enqueue_attachment_download( + $p->get_raw_url(), + $post['source_path'] ?? $post['slug'] ?? null + ); + } } } $this->entities_iterator->next(); @@ -330,35 +337,6 @@ private function new_asset_filename( string $raw_asset_url ) { return $filename; } - /** - * Infers and enqueues the attachments URLs from the post content. - * - * Why not just emit the attachment URLs from WP_Markdown_Directory_Tree_Reader - * as other entities? - * - * Whether it's Markdown, static HTML, or another static file format, - * we'll need to recover the attachment URLs from the We can either - * have a separate pipeline step for that, or burden every format - * reader with reimplementing the same logic. So let's just keep it - * separated. - */ - protected function enqueue_attachments_referenced_in_post( $post ) { - $p = new WP_Block_Markup_Url_Processor( $post['post_content'], $this->source_site_url ); - while ( $p->next_url() ) { - if ( ! $this->url_processor_matched_asset_url( $p ) ) { - continue; - } - - $enqueued = $this->enqueue_attachment_download( - $p->get_raw_url(), - $post['source_path'] ?? $post['slug'] ?? 
null - ); - if ( false === $enqueued ) { - continue; - } - } - } - protected function enqueue_attachment_download( string $raw_url, $context_path = null ) { $url = $this->rewrite_attachment_url( $raw_url, $context_path ); $asset_filename = $this->new_asset_filename( $raw_url ); @@ -403,6 +381,7 @@ protected function enqueue_attachment_download( string $raw_url, $context_path = // Meanwhile, we may either halt the content import, or provide a placeholder // asset. _doing_it_wrong( __METHOD__, "Failed to fetch attachment '$raw_url' from '$url'", '__WP_VERSION__' ); + return false; } protected function rewrite_attachment_url( string $raw_url, $context_path = null ) { @@ -450,9 +429,6 @@ public function download_queue_full() { } public function poll_attachments() { - if ( ! $this->client->await_next_event() ) { - return false; - } $event = $this->client->get_event(); $request = $this->client->get_request(); // The request object we get from the client may be a redirect. diff --git a/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php b/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php index e69571b3af..733584425e 100644 --- a/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php +++ b/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php @@ -15,6 +15,7 @@ class WP_Markdown_Directory_Tree_Reader implements Iterator { private $parent_ids = array(); private $next_post_id; private $is_finished = false; + private $entities_read_so_far = 0; public function __construct( $root_dir, $first_post_id ) { $this->file_visitor = new WP_File_Visitor( realpath( $root_dir ) ); @@ -38,6 +39,7 @@ public function next_entity() { } $post_id = $this->next_post_id; ++$this->next_post_id; + ++$this->entities_read_so_far; $this->entity = $this->markdown_to_post_entity( array( 'markdown' => $markdown, @@ -66,6 +68,7 @@ public function 
next_entity() { ) ); ++$this->next_post_id; + ++$this->entities_read_so_far; return true; } @@ -264,7 +267,7 @@ public function next(): void { } public function key(): int { - return 0; + return $this->entities_read_so_far - 1; } public function valid(): bool { diff --git a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php index c88ce0107d..f09ad00b6c 100644 --- a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php +++ b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php @@ -175,6 +175,14 @@ class WP_WXR_Reader implements Iterator { */ private $entity_finished = false; + /** + * The number of entities read so far. + * + * @since WP_VERSION + * @var int + */ + private $entities_read_so_far = 0; + /** * The attributes from the last opening tag. * @@ -834,6 +842,7 @@ private function emit_entity() { $this->entity_data['taxonomy'] = 'category'; } $this->entity_finished = true; + ++$this->entities_read_so_far; } /** @@ -880,7 +889,7 @@ public function next(): void { } public function key(): int { - return 0; + return $this->entities_read_so_far - 1; } public function valid(): bool { From 23f24d9505c45b712b3705ecd97ec9af7f4f076a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Tue, 19 Nov 2024 00:45:47 +0100 Subject: [PATCH 03/12] Restore WP_Attachment_Downloader, advance the entity cursor to the oldest entity whose downloads were finalized --- .../playground/data-liberation/bootstrap.php | 1 + .../src/import/WP_Stream_Importer.php | 290 ++++++++---------- .../WP_Markdown_Directory_Tree_Reader.php | 2 +- .../tests/import/blueprint-import.json | 1 + 4 files changed, 131 insertions(+), 163 deletions(-) diff --git a/packages/playground/data-liberation/bootstrap.php b/packages/playground/data-liberation/bootstrap.php index e65d26aa0a..b99b02fd31 100644 --- a/packages/playground/data-liberation/bootstrap.php +++ 
b/packages/playground/data-liberation/bootstrap.php @@ -51,6 +51,7 @@ require_once __DIR__ . '/src/import/WP_File_Visitor.php'; require_once __DIR__ . '/src/import/WP_File_Visitor_Event.php'; require_once __DIR__ . '/src/import/WP_Imported_Entity.php'; +require_once __DIR__ . '/src/import/WP_Attachment_Downloader.php'; require_once __DIR__ . '/src/import/WP_Stream_Importer.php'; require_once __DIR__ . '/src/import/WP_Markdown_Importer.php'; diff --git a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php index 37d18f86c3..1905ef2946 100644 --- a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php @@ -98,6 +98,7 @@ protected function __construct( * Iterator that streams entities to import. */ private $entities_iterator; + private $entities_iterator_cursor; /** * The WordPress entity importer instance. @@ -127,6 +128,11 @@ public function next_step() { } } + private $downloader; + private $entity_cursors = array(); + private $entity_to_urls = array(); + private $urls_to_entities = array(); + /** * Downloads all the assets referenced in the imported entities. * @@ -139,75 +145,119 @@ public function frontload_assets_step() { $factory = $this->entity_iterator_factory; $this->entities_iterator = $factory(); // @TODO: Seek to the last processed entity if we have a cursor. - $this->client = new \WordPress\AsyncHttp\Client(); + $this->downloader = new WP_Attachment_Downloader( $this->options ); + } + + while ( $this->downloader->next_event() ) { + $event = $this->downloader->get_event(); + switch ( $event->type ) { + case WP_Attachment_Downloader_Event::SUCCESS: + case WP_Attachment_Downloader_Event::FAILURE: + $this->pop_downloaded_url( $event->url ); + break; + } } - if ( ! $this->entities_iterator->valid() && ! 
$this->has_pending_requests() ) { + /** + * Advance the cursor to the oldest finished download. For example: + * + * * We've started downloading files A, B, C, and D in this order. + * * D is the first to finish. We don't do anything yet. + * * A finishes next. We advance the cursor to A. + * * C finishes next. We don't do anything. + * * Then we pause. + * + * When we resume, we'll start where we left off, which is after A. The + * downloader will enqueue B for download and will skip C and D since + * the relevant files already exist in the filesystem. + */ + while ( count( $this->entity_to_urls ) > 0 ) { + $oldest_entity_key = key( $this->entity_to_urls ); + if ( null === $oldest_entity_key ) { + break; + } + if ( ! empty( $this->entity_to_urls[ $oldest_entity_key ] ) ) { + break; + } + $this->entities_iterator_cursor = $this->entity_cursors[ $oldest_entity_key ]; + unset( $this->entity_cursors[ $oldest_entity_key ] ); + unset( $this->entity_to_urls[ $oldest_entity_key ] ); + } + + // We're done if all the entities are processed and all the downloads are finished. + if ( ! $this->entities_iterator->valid() && ! $this->downloader->has_pending_requests() ) { $this->state = self::STATE_IMPORT_ENTITIES; - $this->client = null; + $this->downloader = null; $this->entities_iterator = null; - return; + return false; } - $only_downloader_pending = ! $this->entities_iterator->valid() && $this->has_pending_requests(); - if ( $this->download_queue_full() || $only_downloader_pending ) { - // Twiddle our thumbs as the downloader processes the requests... + // Poll the data periodically. + $only_downloader_pending = ! $this->entities_iterator->valid() && $this->downloader->has_pending_requests(); + if ( $this->downloader->queue_full() || $only_downloader_pending ) { /** * @TODO: - * * Consider inlining the downloader code into this class. * * Process and store failures. * E.g. what if the attachment is not found? Error out? Ignore? 
In a UI-based * importer scenario, this is the time to log a failure to let the user * fix it later on. In a CLI-based Blueprint step importer scenario, we * might want to provide an "image not found" placeholder OR ignore the * failure. - */ - if(false === $this->poll_attachments()) { - return; - } - /** + * * @TODO: Update the download progress: * * After every downloaded file. * * For large files, every time a full megabyte is downloaded above 10MB. */ - /** - * @TODO: Advance the cursor to the oldest finished download. For example: - * - * * We've started downloading files A, B, C, and D in this order. - * * D is the first to finish. We don't do anything yet. - * * A finishes next. We advance the cursor to A. - * * C finishes next. We don't do anything. - * * Then we pause. - * - * When we resume, we'll start where we left off, which is after A. The - * downloader will enqueue B for download and will skip C and D since - * the relevant files already exist in the filesystem. - */ - return; + return $this->downloader->poll(); } + /** + * Identify the static assets referenced in the current entity + * and enqueue them for download. + */ + $entity_key = $this->entities_iterator->key(); + + /** + * Store the cursor for the next entity. We'll advance later on + * when all its downloads are done. + * + * @TODO: Cleanup or skip over stale cursors. When processing slow downloads, + * we could easily end up with millions of entries in $this->entity_cursors. 
+ */ + $this->entity_to_urls[ $entity_key ] = array(); + $this->entity_cursors[ $entity_key ] = $this->entities_iterator->pause(); + $entity = $this->entities_iterator->current(); $data = $entity->get_data(); - if ( 'site_option' === $entity->get_type() && $data['option_name'] === 'home' ) { - $this->source_site_url = $data['option_value']; - } elseif ( 'post' === $entity->get_type() ) { - if ( isset( $data['post_type'] ) && $data['post_type'] === 'attachment' ) { - $this->enqueue_attachment_download($data['attachment_url'], null); - } elseif ( isset( $data['post_content'] ) ) { - $post = $data; - $p = new WP_Block_Markup_Url_Processor( $post['post_content'], $this->source_site_url ); - while ( $p->next_url() ) { - if ( ! $this->url_processor_matched_asset_url( $p ) ) { - continue; + switch ( $entity->get_type() ) { + case 'site_option': + if ( $data['option_name'] === 'home' ) { + $this->source_site_url = $data['option_value']; + } + break; + case 'post': + if ( isset( $data['post_type'] ) && $data['post_type'] === 'attachment' ) { + $this->enqueue_attachment_download( $data['attachment_url'], null ); + } elseif ( isset( $data['post_content'] ) ) { + $post = $data; + $p = new WP_Block_Markup_Url_Processor( $post['post_content'], $this->source_site_url ); + while ( $p->next_url() ) { + if ( ! $this->url_processor_matched_asset_url( $p ) ) { + continue; + } + $this->enqueue_attachment_download( + $p->get_raw_url(), + $post['source_path'] ?? $post['slug'] ?? null + ); } - $this->enqueue_attachment_download( - $p->get_raw_url(), - $post['source_path'] ?? $post['slug'] ?? null - ); } - } + break; } + + // Move on to the next entity. 
$this->entities_iterator->next(); + + return true; } /** @@ -290,6 +340,41 @@ public function import_entities_step() { $this->entities_iterator->next(); } + protected function enqueue_attachment_download( string $raw_url, $context_path = null ) { + $url = $this->rewrite_attachment_url( $raw_url, $context_path ); + $asset_filename = $this->new_asset_filename( $raw_url ); + $output_path = $this->options['uploads_path'] . '/' . ltrim( $asset_filename, '/' ); + + $this->push_downloaded_url( $url, $this->entities_iterator->key() ); + return $this->downloader->enqueue_if_not_exists( $url, $output_path ); + } + + /** + * For cursor advancement purposes. + * Marks an URL as being downloaded in relation to the current entity. + */ + protected function push_downloaded_url( string $url, $entity_key ) { + if ( ! isset( $this->entity_to_urls[ $entity_key ] ) ) { + $this->entity_to_urls[ $entity_key ] = array(); + } + if ( ! isset( $this->urls_to_entities[ $url ] ) ) { + $this->urls_to_entities[ $url ] = array(); + } + $this->entity_to_urls[ $entity_key ][ $url ] = true; + $this->urls_to_entities[ $url ][ $entity_key ] = true; + } + + protected function pop_downloaded_url( string $url ) { + $entity_keys = array_keys( $this->urls_to_entities[ $url ] ); + foreach ( $entity_keys as $entity_key ) { + unset( $this->urls_to_entities[ $url ][ $entity_key ] ); + unset( $this->entity_to_urls[ $entity_key ][ $url ] ); + } + if ( empty( $this->urls_to_entities[ $url ] ) ) { + unset( $this->urls_to_entities[ $url ] ); + } + } + /** * The downloaded file name is based on the URL hash. * @@ -337,53 +422,6 @@ private function new_asset_filename( string $raw_asset_url ) { return $filename; } - protected function enqueue_attachment_download( string $raw_url, $context_path = null ) { - $url = $this->rewrite_attachment_url( $raw_url, $context_path ); - $asset_filename = $this->new_asset_filename( $raw_url ); - $output_path = $this->options['uploads_path'] . '/' . 
ltrim( $asset_filename, '/' ); - - if ( file_exists( $output_path ) ) { - // @TODO: Reconsider the return value. The enqueuing operation failed, - // but overall already having a file seems like a success. - return true; - } - - $output_dir = dirname( $output_path ); - if ( ! file_exists( $output_dir ) ) { - // @TODO: think through the chmod of the created directory. - mkdir( $output_dir, 0777, true ); - } - - $protocol = parse_url( $url, PHP_URL_SCHEME ); - if ( null === $protocol ) { - return false; - } - - switch ( $protocol ) { - case 'file': - $local_path = parse_url( $url, PHP_URL_PATH ); - if ( false === $local_path ) { - return false; - } - // Just copy the file over. - // @TODO: think through the chmod of the created file. - return copy( $local_path, $output_path ); - case 'http': - case 'https': - $request = new Request( $url ); - $this->output_paths[ $request->id ] = $output_path; - $this->client->enqueue( $request ); - return true; - } - - // @TODO: Save the failure info somewhere so the user can review it later - // and either retry or provide their own asset. - // Meanwhile, we may either halt the content import, or provide a placeholder - // asset. - _doing_it_wrong( __METHOD__, "Failed to fetch attachment '$raw_url' from '$url'", '__WP_VERSION__' ); - return false; - } - protected function rewrite_attachment_url( string $raw_url, $context_path = null ) { if ( WP_URL::can_parse( $raw_url ) ) { // Absolute URL, nothing to do. @@ -414,76 +452,4 @@ protected function url_processor_matched_asset_url( WP_Block_Markup_Url_Processo ( ! 
$this->source_site_url || url_matches( $p->get_parsed_url(), $this->source_site_url ) ) ); } - - private $client; - private $fps = array(); - private $partial_files = array(); - private $output_paths = array(); - - private function has_pending_requests() { - return count( $this->client->get_active_requests() ) > 0; - } - - public function download_queue_full() { - return count( $this->client->get_active_requests() ) >= 10; - } - - public function poll_attachments() { - $event = $this->client->get_event(); - $request = $this->client->get_request(); - // The request object we get from the client may be a redirect. - // Let's keep referring to the original request. - $original_request_id = $request->original_request()->id; - - switch ( $event ) { - case Client::EVENT_GOT_HEADERS: - if ( ! $request->is_redirected() ) { - if ( file_exists( $this->output_paths[ $original_request_id ] . '.partial' ) ) { - unlink( $this->output_paths[ $original_request_id ] . '.partial' ); - } - $this->fps[ $original_request_id ] = fopen( $this->output_paths[ $original_request_id ] . '.partial', 'wb' ); - if ( false === $this->fps[ $original_request_id ] ) { - // @TODO: Log an error. - } - } - break; - case Client::EVENT_BODY_CHUNK_AVAILABLE: - $chunk = $this->client->get_response_body_chunk(); - if ( false === fwrite( $this->fps[ $original_request_id ], $chunk ) ) { - // @TODO: Log an error. - } - break; - case Client::EVENT_FAILED: - if ( isset( $this->fps[ $original_request_id ] ) ) { - fclose( $this->fps[ $original_request_id ] ); - } - if ( isset( $this->partial_files[ $original_request_id ] ) ) { - $partial_file = $this->options['uploads_path'] . '/' . $this->output_paths[ $original_request_id ] . '.partial'; - if ( file_exists( $partial_file ) ) { - unlink( $partial_file ); - } - } - unset( $this->output_paths[ $original_request_id ] ); - break; - case Client::EVENT_FINISHED: - if ( ! $request->is_redirected() ) { - // Only clean up if this was the last request in the chain. 
- if ( isset( $this->fps[ $original_request_id ] ) ) { - fclose( $this->fps[ $original_request_id ] ); - } - if ( isset( $this->output_paths[ $original_request_id ] ) ) { - if ( false === rename( - $this->output_paths[ $original_request_id ] . '.partial', - $this->output_paths[ $original_request_id ] - ) ) { - // @TODO: Log an error. - } - } - unset( $this->partial_files[ $original_request_id ] ); - } - break; - } - - return true; - } } diff --git a/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php b/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php index 733584425e..b8695096df 100644 --- a/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php +++ b/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php @@ -14,7 +14,7 @@ class WP_Markdown_Directory_Tree_Reader implements Iterator { private $pending_files = array(); private $parent_ids = array(); private $next_post_id; - private $is_finished = false; + private $is_finished = false; private $entities_read_so_far = 0; public function __construct( $root_dir, $first_post_id ) { diff --git a/packages/playground/data-liberation/tests/import/blueprint-import.json b/packages/playground/data-liberation/tests/import/blueprint-import.json index c0ec10766a..d34478b3aa 100644 --- a/packages/playground/data-liberation/tests/import/blueprint-import.json +++ b/packages/playground/data-liberation/tests/import/blueprint-import.json @@ -2,6 +2,7 @@ "$schema": "../../../blueprints/public/blueprint-schema.json", "constants": { "WP_DEBUG": true, + "WP_DEBUG_DISPLAY": true, "WP_DEBUG_LOG": true }, "login": true, From 090a972243ead3bad1adb349616049aff012ac52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Tue, 19 Nov 2024 01:04:58 +0100 Subject: [PATCH 04/12] Keep doodling --- .../src/import/WP_Attachment_Downloader.php | 186 ++++++++++++++++++ 
.../src/import/WP_Stream_Importer.php | 92 +++------ 2 files changed, 209 insertions(+), 69 deletions(-) create mode 100644 packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php diff --git a/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php new file mode 100644 index 0000000000..40488b821c --- /dev/null +++ b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php @@ -0,0 +1,186 @@ +client = new Client(); + $this->output_root = $output_root; + } + + public function has_pending_requests() { + return count( $this->client->get_active_requests() ) > 0; + } + + public function enqueue_if_not_exists( $url, $output_path ) { + $output_path = $this->output_root . '/' . ltrim( $output_path, '/' ); + if ( file_exists( $output_path ) ) { + // @TODO: Reconsider the return value. The enqueuing operation failed, + // but overall already having a file seems like a success. + return true; + } + + $output_dir = dirname( $output_path ); + if ( ! file_exists( $output_dir ) ) { + // @TODO: think through the chmod of the created directory. + mkdir( $output_dir, 0777, true ); + } + + $protocol = parse_url( $url, PHP_URL_SCHEME ); + if ( null === $protocol ) { + return false; + } + + switch ( $protocol ) { + case 'file': + $local_path = parse_url( $url, PHP_URL_PATH ); + if ( false === $local_path ) { + return false; + } + // Just copy the file over. + // @TODO: think through the chmod of the created file. + $this->pending_events[] = new WP_Attachment_Downloader_Event( + WP_Attachment_Downloader_Event::STARTED, + $url, + $output_path + ); + $success = copy( $local_path, $output_path ); + $this->pending_events[] = new WP_Attachment_Downloader_Event( + $success ? 
WP_Attachment_Downloader_Event::SUCCESS : WP_Attachment_Downloader_Event::FAILURE, + $url, + $output_path + ); + return true; + case 'http': + case 'https': + $request = new Request( $url ); + $this->output_paths[ $request->id ] = $output_path; + $this->pending_events[] = new WP_Attachment_Downloader_Event( + WP_Attachment_Downloader_Event::STARTED, + $url, + $output_path + ); + $this->client->enqueue( $request ); + return true; + } + return false; + } + + public function queue_full() { + return count( $this->client->get_active_requests() ) >= 10; + } + + public function get_event() { + return $this->current_event; + } + + public function next_event() { + $this->current_event = null; + if ( count( $this->pending_events ) === 0 ) { + return false; + } + + $this->current_event = array_shift( $this->pending_events ); + return true; + } + + public function poll() { + if ( ! $this->client->await_next_event() ) { + return false; + } + $event = $this->client->get_event(); + $request = $this->client->get_request(); + // The request object we get from the client may be a redirect. + // Let's keep referring to the original request. + $original_request_id = $request->original_request()->id; + + while ( true ) { + switch ( $event ) { + case Client::EVENT_GOT_HEADERS: + if ( ! $request->is_redirected() ) { + if ( file_exists( $this->output_paths[ $original_request_id ] . '.partial' ) ) { + unlink( $this->output_paths[ $original_request_id ] . '.partial' ); + } + $this->fps[ $original_request_id ] = fopen( $this->output_paths[ $original_request_id ] . '.partial', 'wb' ); + if ( false === $this->fps[ $original_request_id ] ) { + // @TODO: Log an error. + } + } + break; + case Client::EVENT_BODY_CHUNK_AVAILABLE: + $chunk = $this->client->get_response_body_chunk(); + if ( false === fwrite( $this->fps[ $original_request_id ], $chunk ) ) { + // @TODO: Log an error. 
+ } + break; + case Client::EVENT_FAILED: + if ( isset( $this->fps[ $original_request_id ] ) ) { + fclose( $this->fps[ $original_request_id ] ); + } + if ( isset( $this->output_paths[ $original_request_id ] ) ) { + $partial_file = $this->output_root . '/' . $this->output_paths[ $original_request_id ] . '.partial'; + if ( file_exists( $partial_file ) ) { + unlink( $partial_file ); + } + } + $this->pending_events[] = new WP_Attachment_Downloader_Event( + WP_Attachment_Downloader_Event::FAILURE, + $request->original_request()->url, + $this->output_paths[ $original_request_id ] + ); + unset( $this->output_paths[ $original_request_id ] ); + break; + case Client::EVENT_FINISHED: + if ( ! $request->is_redirected() ) { + // Only clean up if this was the last request in the chain. + if ( isset( $this->fps[ $original_request_id ] ) ) { + fclose( $this->fps[ $original_request_id ] ); + } + if ( isset( $this->output_paths[ $original_request_id ] ) ) { + if ( false === rename( + $this->output_paths[ $original_request_id ] . '.partial', + $this->output_paths[ $original_request_id ] + ) ) { + // @TODO: Log an error. 
+ } + } + $this->pending_events[] = new WP_Attachment_Downloader_Event( + WP_Attachment_Downloader_Event::SUCCESS, + $request->original_request()->url, + $this->output_paths[ $original_request_id ] + ); + unset( $this->output_paths[ $original_request_id ] ); + } + break; + } + } + + return true; + } +} + +class WP_Attachment_Downloader_Event { + + const STARTED = '#started'; + const SUCCESS = '#success'; + const FAILURE = '#failure'; + + public $type; + public $url; + public $target_path; + + public function __construct( $type, $url, $target_path = null ) { + $this->type = $type; + $this->url = $url; + $this->target_path = $target_path; + } +} diff --git a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php index 1905ef2946..f1ba1937d4 100644 --- a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php @@ -129,9 +129,7 @@ public function next_step() { } private $downloader; - private $entity_cursors = array(); - private $entity_to_urls = array(); - private $urls_to_entities = array(); + private $active_downloads = array(); /** * Downloads all the assets referenced in the imported entities. @@ -148,46 +146,36 @@ public function frontload_assets_step() { $this->downloader = new WP_Attachment_Downloader( $this->options ); } + /** + * Keep track of active downloads for pausing and resuming purposes. + */ while ( $this->downloader->next_event() ) { $event = $this->downloader->get_event(); switch ( $event->type ) { + case WP_Attachment_Downloader_Event::STARTED: + if( ! 
isset( $this->active_downloads[ $event->url ] ) ) { + $this->active_downloads[ $event->url ] = array(); + } + $this->active_downloads[ $event->url ][ $event->target_path ] = true; + break; case WP_Attachment_Downloader_Event::SUCCESS: case WP_Attachment_Downloader_Event::FAILURE: - $this->pop_downloaded_url( $event->url ); + if( ! isset( $this->active_downloads[ $event->url ] ) ) { + continue; + } + unset( $this->active_downloads[ $event->url ][ $event->target_path ] ); + if ( empty( $this->active_downloads[ $event->url ] ) ) { + unset( $this->active_downloads[ $event->url ] ); + } break; } } - /** - * Advance the cursor to the oldest finished download. For example: - * - * * We've started downloading files A, B, C, and D in this order. - * * D is the first to finish. We don't do anything yet. - * * A finishes next. We advance the cursor to A. - * * C finishes next. We don't do anything. - * * Then we pause. - * - * When we resume, we'll start where we left off, which is after A. The - * downloader will enqueue B for download and will skip C and D since - * the relevant files already exist in the filesystem. - */ - while ( count( $this->entity_to_urls ) > 0 ) { - $oldest_entity_key = key( $this->entity_to_urls ); - if ( null === $oldest_entity_key ) { - break; - } - if ( ! empty( $this->entity_to_urls[ $oldest_entity_key ] ) ) { - break; - } - $this->entities_iterator_cursor = $this->entity_cursors[ $oldest_entity_key ]; - unset( $this->entity_cursors[ $oldest_entity_key ] ); - unset( $this->entity_to_urls[ $oldest_entity_key ] ); - } - // We're done if all the entities are processed and all the downloads are finished. if ( ! $this->entities_iterator->valid() && ! 
$this->downloader->has_pending_requests() ) { $this->state = self::STATE_IMPORT_ENTITIES; $this->downloader = null; + $this->active_downloads = array(); $this->entities_iterator = null; return false; } @@ -215,18 +203,6 @@ public function frontload_assets_step() { * Identify the static assets referenced in the current entity * and enqueue them for download. */ - $entity_key = $this->entities_iterator->key(); - - /** - * Store the cursor for the next entity. We'll advance later on - * when all its downloads are done. - * - * @TODO: Cleanup or skip over stale cursors. When processing slow downloads, - * we could easily end up with millions of entries in $this->entity_cursors. - */ - $this->entity_to_urls[ $entity_key ] = array(); - $this->entity_cursors[ $entity_key ] = $this->entities_iterator->pause(); - $entity = $this->entities_iterator->current(); $data = $entity->get_data(); switch ( $entity->get_type() ) { @@ -254,6 +230,11 @@ public function frontload_assets_step() { break; } + /** + * @TODO: Update the progress information. + * @TODO: Save the freshly requested URLs to the cursor. + */ + // Move on to the next entity. $this->entities_iterator->next(); @@ -345,36 +326,9 @@ protected function enqueue_attachment_download( string $raw_url, $context_path = $asset_filename = $this->new_asset_filename( $raw_url ); $output_path = $this->options['uploads_path'] . '/' . ltrim( $asset_filename, '/' ); - $this->push_downloaded_url( $url, $this->entities_iterator->key() ); return $this->downloader->enqueue_if_not_exists( $url, $output_path ); } - /** - * For cursor advancement purposes. - * Marks an URL as being downloaded in relation to the current entity. - */ - protected function push_downloaded_url( string $url, $entity_key ) { - if ( ! isset( $this->entity_to_urls[ $entity_key ] ) ) { - $this->entity_to_urls[ $entity_key ] = array(); - } - if ( ! 
isset( $this->urls_to_entities[ $url ] ) ) { - $this->urls_to_entities[ $url ] = array(); - } - $this->entity_to_urls[ $entity_key ][ $url ] = true; - $this->urls_to_entities[ $url ][ $entity_key ] = true; - } - - protected function pop_downloaded_url( string $url ) { - $entity_keys = array_keys( $this->urls_to_entities[ $url ] ); - foreach ( $entity_keys as $entity_key ) { - unset( $this->urls_to_entities[ $url ][ $entity_key ] ); - unset( $this->entity_to_urls[ $entity_key ][ $url ] ); - } - if ( empty( $this->urls_to_entities[ $url ] ) ) { - unset( $this->urls_to_entities[ $url ] ); - } - } - /** * The downloaded file name is based on the URL hash. * From 9f5ffcd97acbcbd9986e153f4b787fc3c83a06b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Tue, 19 Nov 2024 17:45:03 +0100 Subject: [PATCH 05/12] Re-entrant frontloading in WP_Stream_Importer --- .../playground/data-liberation/plugin.php | 38 +++++ .../src/byte-readers/WP_File_Reader.php | 13 +- .../src/import/WP_Attachment_Downloader.php | 32 +++- .../src/import/WP_Stream_Importer.php | 151 ++++++++++++------ .../data-liberation/src/wxr/WP_WXR_Reader.php | 20 ++- .../src/xml-api/WP_XML_Processor.php | 26 +-- 6 files changed, 207 insertions(+), 73 deletions(-) diff --git a/packages/playground/data-liberation/plugin.php b/packages/playground/data-liberation/plugin.php index fc5acb9ea3..5337e318d8 100644 --- a/packages/playground/data-liberation/plugin.php +++ b/packages/playground/data-liberation/plugin.php @@ -25,6 +25,44 @@ return []; }); +/** + * Development debug code to run the import manually. + * @TODO: Remove this in favor of a CLI command. + */ +add_action('init', function() { + echo ''; + $wxr_path = __DIR__ . 
'/tests/fixtures/wxr-simple.xml'; + $entity_iterator_factory = function() use ($wxr_path) { + $wxr = new WP_WXR_Reader(); + $wxr->connect_upstream(new WP_File_Reader($wxr_path)); + + return $wxr; + }; + $importer = WP_Stream_Importer::create( + $entity_iterator_factory + ); + $importer->next_step(); + $importer->next_step(); + $importer->next_step(); + $importer->next_step(); + $importer->next_step(); + $importer->next_step(); + $paused_importer_state = $importer->pause(); + + $importer2 = WP_Stream_Importer::create( + $entity_iterator_factory + ); + $importer2->resume($paused_importer_state); + $importer2->next_step(); + $importer2->next_step(); + $importer2->next_step(); + // $importer2->next_step(); + + // var_dump($importer2); + + die("YAY"); +}); + // Register admin menu add_action('admin_menu', function() { add_menu_page( diff --git a/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php b/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php index 82e817b6e6..71256fda91 100644 --- a/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php +++ b/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php @@ -9,7 +9,8 @@ class WP_File_Reader implements WP_Byte_Reader { protected $chunk_size; protected $file_pointer; protected $offset_in_file; - protected $output_bytes; + protected $output_bytes = ''; + protected $last_chunk_size = 0; protected $last_error; protected $state = self::STATE_STREAMING; @@ -24,7 +25,10 @@ public function __construct( $file_path, $chunk_size = 8096 ) { */ public function pause(): array|bool { return array( - 'offset_in_file' => $this->offset_in_file, + // Save the previous offset, not the current one. + // This way, after resuming, the next read will yield the same $output_bytes + // as we have now. 
+ 'offset_in_file' => $this->offset_in_file - $this->last_chunk_size, ); } @@ -34,6 +38,7 @@ public function resume( $paused_state ): bool { return false; } $this->offset_in_file = $paused_state['offset_in_file']; + $this->last_chunk_size = 0; return true; } @@ -51,6 +56,7 @@ public function get_last_error(): string|null { public function next_bytes(): bool { $this->output_bytes = ''; + $this->last_chunk_size = 0; if ( $this->last_error || $this->is_finished() ) { return false; } @@ -66,7 +72,8 @@ public function next_bytes(): bool { $this->state = static::STATE_FINISHED; return false; } - $this->offset_in_file += strlen( $bytes ); + $this->last_chunk_size = strlen( $bytes ); + $this->offset_in_file += $this->last_chunk_size; $this->output_bytes .= $bytes; return true; } diff --git a/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php index 40488b821c..64e3f2cdc4 100644 --- a/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php +++ b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php @@ -11,6 +11,8 @@ class WP_Attachment_Downloader { private $current_event; private $pending_events = array(); + private $downloads_so_far = 0; + private $last_enqueued_resource_id; public function __construct( $output_root ) { $this->client = new Client(); @@ -22,6 +24,8 @@ public function has_pending_requests() { } public function enqueue_if_not_exists( $url, $output_path ) { + $this->last_enqueued_resource_id = null; + $output_path = $this->output_root . '/' . ltrim( $output_path, '/' ); if ( file_exists( $output_path ) ) { // @TODO: Reconsider the return value. 
The enqueuing operation failed, @@ -40,34 +44,42 @@ public function enqueue_if_not_exists( $url, $output_path ) { return false; } + ++$this->downloads_so_far; switch ( $protocol ) { case 'file': $local_path = parse_url( $url, PHP_URL_PATH ); if ( false === $local_path ) { return false; } + // Just copy the file over. // @TODO: think through the chmod of the created file. + + $this->last_enqueued_resource_id = 'file:' . $this->downloads_so_far; $this->pending_events[] = new WP_Attachment_Downloader_Event( WP_Attachment_Downloader_Event::STARTED, $url, - $output_path + $output_path, + $this->last_enqueued_resource_id ); $success = copy( $local_path, $output_path ); $this->pending_events[] = new WP_Attachment_Downloader_Event( $success ? WP_Attachment_Downloader_Event::SUCCESS : WP_Attachment_Downloader_Event::FAILURE, $url, - $output_path + $output_path, + $this->last_enqueued_resource_id ); return true; case 'http': case 'https': $request = new Request( $url ); + $this->last_enqueued_resource_id = 'http:' . $request->id; $this->output_paths[ $request->id ] = $output_path; $this->pending_events[] = new WP_Attachment_Downloader_Event( WP_Attachment_Downloader_Event::STARTED, $url, - $output_path + $output_path, + $this->last_enqueued_resource_id ); $this->client->enqueue( $request ); return true; @@ -75,6 +87,10 @@ public function enqueue_if_not_exists( $url, $output_path ) { return false; } + public function get_last_enqueued_resource_id() { + return $this->last_enqueued_resource_id; + } + public function queue_full() { return count( $this->client->get_active_requests() ) >= 10; } @@ -135,7 +151,8 @@ public function poll() { $this->pending_events[] = new WP_Attachment_Downloader_Event( WP_Attachment_Downloader_Event::FAILURE, $request->original_request()->url, - $this->output_paths[ $original_request_id ] + $this->output_paths[ $original_request_id ], + 'http:' . 
$original_request_id ); unset( $this->output_paths[ $original_request_id ] ); break; @@ -156,7 +173,8 @@ public function poll() { $this->pending_events[] = new WP_Attachment_Downloader_Event( WP_Attachment_Downloader_Event::SUCCESS, $request->original_request()->url, - $this->output_paths[ $original_request_id ] + $this->output_paths[ $original_request_id ], + 'http:' . $original_request_id ); unset( $this->output_paths[ $original_request_id ] ); } @@ -177,10 +195,12 @@ class WP_Attachment_Downloader_Event { public $type; public $url; public $target_path; + public $resource_id; - public function __construct( $type, $url, $target_path = null ) { + public function __construct( $type, $url, $target_path, $resource_id ) { $this->type = $type; $this->url = $url; $this->target_path = $target_path; + $this->resource_id = $resource_id; } } diff --git a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php index f1ba1937d4..beed4375ba 100644 --- a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php @@ -44,6 +44,25 @@ class WP_Stream_Importer { */ protected $options; + const STATE_INITIAL = '#initial'; + const STATE_TOPOLOGICAL_SORT = '#topological_sort'; + const STATE_FRONTLOAD_ASSETS = '#frontload_assets'; + const STATE_IMPORT_ENTITIES = '#import_entities'; + const STATE_FINISHED = '#finished'; + + /** + * The current state of the import process. + */ + private $state = self::STATE_INITIAL; + + /** + * Iterator that streams entities to import. 
+ */ + private $entities_iterator; + private $frontloading_state = array(); + public $entities_cursor; + private $downloader; + public static function create( $entity_iterator_factory, $options = array() @@ -52,6 +71,20 @@ public static function create( return new WP_Stream_Importer( $entity_iterator_factory, $options ); } + public function pause() { + return array( + 'state' => $this->state, + 'entities_cursor' => $this->entities_cursor, + ); + } + + public function resume( $paused_state ) { + $this->state = $paused_state['state']; + $this->entities_cursor = $paused_state['entities_cursor']; + // @TODO: Should resume() call next_step() or just prepare the state? + $this->next_step(); + } + protected static function parse_options( $options ) { if ( ! isset( $options['new_site_url'] ) ) { $options['new_site_url'] = get_site_url(); @@ -83,23 +116,6 @@ protected function __construct( } } - const STATE_INITIAL = '#initial'; - const STATE_TOPOLOGICAL_SORT = '#topological_sort'; - const STATE_FRONTLOAD_ASSETS = '#frontload_assets'; - const STATE_IMPORT_ENTITIES = '#import_entities'; - const STATE_FINISHED = '#finished'; - - /** - * The current state of the import process. - */ - private $state = self::STATE_INITIAL; - - /** - * Iterator that streams entities to import. - */ - private $entities_iterator; - private $entities_iterator_cursor; - /** * The WordPress entity importer instance. * @TODO: Consider inlining the importer code into this class. @@ -128,8 +144,43 @@ public function next_step() { } } - private $downloader; - private $active_downloads = array(); + public function advance_frontloading_cursor() { + /** + * Advance the cursor to the oldest finished download. For example: + * + * * We've started downloading files A, B, C, and D in this order. + * * D is the first to finish. We don't do anything yet. + * * A finishes next. We advance the cursor to A. + * * C finishes next. We don't do anything. + * * Then we pause. 
+ * + * When we resume, we'll start where we left off, which is after A. The + * downloader will enqueue B for download and will skip C and D since + * the relevant files already exist in the filesystem. + */ + while ( $this->downloader->next_event() ) { + $event = $this->downloader->get_event(); + switch ( $event->type ) { + case WP_Attachment_Downloader_Event::SUCCESS: + case WP_Attachment_Downloader_Event::FAILURE: + foreach( $this->frontloading_state as $state ) { + $resource_id = $event->resource_id; + unset( $state['active_downloads'][ $resource_id ] ); + } + break; + } + } + + while(count($this->frontloading_state) > 0) { + $oldest_download_key = key( $this->frontloading_state ); + if( ! empty( $this->frontloading_state[ $oldest_download_key ]['active_downloads'] ) ) { + break; + } + // Advance the cursor to the next entity. + $this->entities_cursor = $this->frontloading_state[ $oldest_download_key ]['cursor']; + unset( $this->frontloading_state[ $oldest_download_key ] ); + } + } /** * Downloads all the assets referenced in the imported entities. @@ -142,45 +193,28 @@ public function frontload_assets_step() { if ( null === $this->entities_iterator ) { $factory = $this->entity_iterator_factory; $this->entities_iterator = $factory(); - // @TODO: Seek to the last processed entity if we have a cursor. + if( $this->entities_cursor ) { + $this->entities_iterator->resume( $this->entities_cursor ); + } $this->downloader = new WP_Attachment_Downloader( $this->options ); } - /** - * Keep track of active downloads for pausing and resuming purposes. - */ - while ( $this->downloader->next_event() ) { - $event = $this->downloader->get_event(); - switch ( $event->type ) { - case WP_Attachment_Downloader_Event::STARTED: - if( ! 
isset( $this->active_downloads[ $event->url ] ) ) { - $this->active_downloads[ $event->url ] = array(); - } - $this->active_downloads[ $event->url ][ $event->target_path ] = true; - break; - case WP_Attachment_Downloader_Event::SUCCESS: - case WP_Attachment_Downloader_Event::FAILURE: - if( ! isset( $this->active_downloads[ $event->url ] ) ) { - continue; - } - unset( $this->active_downloads[ $event->url ][ $event->target_path ] ); - if ( empty( $this->active_downloads[ $event->url ] ) ) { - unset( $this->active_downloads[ $event->url ] ); - } - break; - } - } + $this->advance_frontloading_cursor(); // We're done if all the entities are processed and all the downloads are finished. if ( ! $this->entities_iterator->valid() && ! $this->downloader->has_pending_requests() ) { - $this->state = self::STATE_IMPORT_ENTITIES; - $this->downloader = null; - $this->active_downloads = array(); - $this->entities_iterator = null; + // This is an assertion to make double sure we're emptying the state queue. + if ( ! empty( $this->frontloading_state ) ) { + _doing_it_wrong( __METHOD__, 'Frontloading queue is not empty.', '1.0' ); + } + $this->state = self::STATE_IMPORT_ENTITIES; + $this->downloader = null; + $this->frontloading_state = array(); + $this->entities_iterator = null; return false; } - // Poll the data periodically. + // Poll the bytes between scheduling new downloads. $only_downloader_pending = ! $this->entities_iterator->valid() && $this->downloader->has_pending_requests(); if ( $this->downloader->queue_full() || $only_downloader_pending ) { /** @@ -204,6 +238,15 @@ public function frontload_assets_step() { * and enqueue them for download. */ $entity = $this->entities_iterator->current(); + $entity_key = $this->entities_iterator->key(); + $this->frontloading_state[ $entity_key ] = array( + 'key' => $entity_key, + // @TODO: Consider making key() and pause() the same thing. + // Then seek() and resume() would be the same, too! 
+ 'cursor' => $this->entities_iterator->pause(), + 'active_downloads' => array() + ); + $data = $entity->get_data(); switch ( $entity->get_type() ) { case 'site_option': @@ -238,6 +281,7 @@ public function frontload_assets_step() { // Move on to the next entity. $this->entities_iterator->next(); + $this->advance_frontloading_cursor(); return true; } @@ -265,7 +309,6 @@ public function import_entities_step() { } $entity = $this->entities_iterator->current(); - $attachments = array(); // Rewrite the URLs in the post. switch ( $entity->get_type() ) { @@ -326,7 +369,13 @@ protected function enqueue_attachment_download( string $raw_url, $context_path = $asset_filename = $this->new_asset_filename( $raw_url ); $output_path = $this->options['uploads_path'] . '/' . ltrim( $asset_filename, '/' ); - return $this->downloader->enqueue_if_not_exists( $url, $output_path ); + $enqueued = $this->downloader->enqueue_if_not_exists( $url, $output_path ); + if ( $enqueued ) { + $resource_id = $this->downloader->get_last_enqueued_resource_id(); + $entity_key = $this->entities_iterator->key(); + $this->frontloading_state[ $entity_key ]['active_downloads'][ $resource_id ] = true; + } + return $enqueued; } /** diff --git a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php index f09ad00b6c..6a944eca93 100644 --- a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php +++ b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php @@ -349,8 +349,16 @@ public function pause() { // this to, e.g., remote HTTP byte sources. $upstream_state['offset_in_file'] = $this->entity_byte_offset; } + // @TODO: I don't like messing with internals of $paused_state. How else + // can we do this? We need more than just an offset to also restore + // XML breadcrumbs and the parsing context. Perhaps the cursor could + // be a class with a method such as "override_byte_offset(int $offset)"? 
+ // Or resume() could have an optional argument? + $xml_state = $this->xml->pause(); + $xml_state['token_starts_at_in_current_chunk'] = 0; + $xml_state['upstream_bytes_forgotten'] = $this->entity_byte_offset; return array( - 'xml' => $this->xml->pause(), + 'xml' => $xml_state, 'upstream' => $upstream_state, 'last_post_id' => $this->last_post_id, 'last_comment_id' => $this->last_comment_id, @@ -369,6 +377,7 @@ public function resume( $paused_state ) { $this->xml->resume( $paused_state['xml'] ); $this->last_post_id = $paused_state['last_post_id']; $this->last_comment_id = $paused_state['last_comment_id']; + // @TODO: Should resume() call next_step() or just prepare the state? $this->next_entity(); } @@ -817,7 +826,10 @@ private function pull_upstream_bytes() { */ private function get_current_byte_offset() { $paused_xml_state = $this->xml->pause(); - return $paused_xml_state['token_byte_offset_in_the_input_stream']; + return ( + $paused_xml_state['token_starts_at_in_current_chunk'] + + $paused_xml_state['upstream_bytes_forgotten'] + ); } /** @@ -888,8 +900,8 @@ public function next(): void { $this->last_next_result = $this->next_entity(); } - public function key(): int { - return $this->entities_read_so_far - 1; + public function key(): string { + return sha1(json_encode($this->pause())); } public function valid(): bool { diff --git a/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php b/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php index 6055854192..5386b0ffd0 100644 --- a/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php +++ b/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php @@ -387,7 +387,7 @@ class WP_XML_Processor { protected $expecting_more_input = true; /** - * How many bytes from the original XML document have been read and parsed. + * How many bytes from the current XML chunk have been read and parsed. 
* * This value points to the latest byte offset in the input document which * has been already parsed. It is the internal cursor for the Tag Processor @@ -405,7 +405,7 @@ class WP_XML_Processor { * @since WP_VERSION * @var int */ - public $bytes_already_forgotten = 0; + public $upstream_bytes_forgotten = 0; /** * Byte offset in input document where current token starts. @@ -676,8 +676,8 @@ public static function create_for_streaming( $xml = '', $known_definite_encoding */ public function pause() { return array( - 'token_byte_offset_in_the_input_stream' => $this->bytes_already_forgotten + $this->token_starts_at, - 'bytes_already_forgotten' => $this->bytes_already_forgotten, + 'token_starts_at_in_current_chunk' => $this->token_starts_at, + 'upstream_bytes_forgotten' => $this->upstream_bytes_forgotten, 'parser_context' => $this->parser_context, 'stack_of_open_elements' => $this->stack_of_open_elements, 'expecting_more_input' => $this->expecting_more_input, @@ -689,8 +689,8 @@ public function pause() { * – Validate the paused state, return false if it's invalid. 
*/ public function resume( $paused_state ) { - $this->bytes_already_parsed = 0; - $this->bytes_already_forgotten = $paused_state['bytes_already_forgotten']; + $this->bytes_already_parsed = $paused_state['token_starts_at_in_current_chunk']; + $this->upstream_bytes_forgotten = $paused_state['upstream_bytes_forgotten']; $this->stack_of_open_elements = $paused_state['stack_of_open_elements']; $this->parser_context = $paused_state['parser_context']; $this->expecting_more_input = $paused_state['expecting_more_input']; @@ -796,7 +796,7 @@ public function flush_processed_xml() { if ( null !== $this->text_starts_at ) { $this->text_starts_at -= $unreferenced_bytes; } - $this->bytes_already_forgotten += $unreferenced_bytes; + $this->upstream_bytes_forgotten += $unreferenced_bytes; return $flushed_bytes; } @@ -1560,7 +1560,7 @@ private function parse_next_tag() { */ if ( 0 === $at && - 0 === $this->bytes_already_forgotten && + 0 === $this->upstream_bytes_forgotten && ! $this->is_closing_tag && '?' === $xml[ $at + 1 ] && 'x' === $xml[ $at + 2 ] && @@ -3069,7 +3069,15 @@ private function step_in_element( $node_to_process = self::PROCESS_NEXT_NODE ) { return true; default: $this->last_error = self::ERROR_SYNTAX; - _doing_it_wrong( __METHOD__, 'Unexpected token type in element stage.', 'WP_VERSION' ); + _doing_it_wrong( + __METHOD__, + sprintf( + // translators: %1$s is the unexpected token type. 
+ __( 'Unexpected token type "%1$s" in element stage.', 'data-liberation' ), + $this->get_token_type() + ), + 'WP_VERSION' + ); return false; } } From 86aef7586fa119e0b4d4758ab557b0d78a066427 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= <adam@adamziel.com> Date: Tue, 19 Nov 2024 17:45:50 +0100 Subject: [PATCH 06/12] Reentrant entity import --- .../data-liberation/src/import/WP_Stream_Importer.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php index beed4375ba..c0b4f17050 100644 --- a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php @@ -357,10 +357,9 @@ public function import_entities_step() { } /** - * @TODO: Advance the cursor to the next entity. * @TODO: Update the progress information. */ - + $this->entities_cursor = $this->entities_iterator->pause(); $this->entities_iterator->next(); } From 5dd32378fa4d999d3620264f73a5a80760a31b5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= <adam@adamziel.com> Date: Tue, 19 Nov 2024 17:46:16 +0100 Subject: [PATCH 07/12] Seek to the cursor in import_entities_step --- .../data-liberation/src/import/WP_Stream_Importer.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php index c0b4f17050..81e30e7f87 100644 --- a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php @@ -297,7 +297,9 @@ public function import_entities_step() { $factory = $this->entity_iterator_factory; $this->entities_iterator = $factory(); $this->importer = new WP_Entity_Importer(); - // @TODO: Seek to the last processed 
entity if we have a cursor. + if( $this->entities_cursor ) { + $this->entities_iterator->resume( $this->entities_cursor ); + } } if ( ! $this->entities_iterator->valid() ) { From 6ca3184c516e071cad5edda479eeb9ace0548ef5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= <adam@adamziel.com> Date: Wed, 20 Nov 2024 13:21:04 +0100 Subject: [PATCH 08/12] Lint --- .../src/byte-readers/WP_File_Reader.php | 6 +- .../src/import/WP_Attachment_Downloader.php | 65 ++++--------------- .../import/WP_Attachment_Downloader_Event.php | 15 +++++ .../src/import/WP_Stream_Importer.php | 26 ++++---- .../data-liberation/src/wxr/WP_WXR_Reader.php | 6 +- .../src/xml-api/WP_XML_Processor.php | 8 +-- 6 files changed, 52 insertions(+), 74 deletions(-) create mode 100644 packages/playground/data-liberation/src/import/WP_Attachment_Downloader_Event.php diff --git a/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php b/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php index 71256fda91..179372f3b3 100644 --- a/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php +++ b/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php @@ -9,7 +9,7 @@ class WP_File_Reader implements WP_Byte_Reader { protected $chunk_size; protected $file_pointer; protected $offset_in_file; - protected $output_bytes = ''; + protected $output_bytes = ''; protected $last_chunk_size = 0; protected $last_error; protected $state = self::STATE_STREAMING; @@ -37,7 +37,7 @@ public function resume( $paused_state ): bool { _doing_it_wrong( __METHOD__, 'Cannot resume a file reader that is already initialized.', '1.0.0' ); return false; } - $this->offset_in_file = $paused_state['offset_in_file']; + $this->offset_in_file = $paused_state['offset_in_file']; $this->last_chunk_size = 0; return true; } @@ -55,7 +55,7 @@ public function get_last_error(): string|null { } public function next_bytes(): bool { - $this->output_bytes = ''; + $this->output_bytes 
= ''; $this->last_chunk_size = 0; if ( $this->last_error || $this->is_finished() ) { return false; diff --git a/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php index 64e3f2cdc4..951a45e632 100644 --- a/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php +++ b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php @@ -7,12 +7,12 @@ class WP_Attachment_Downloader { private $client; private $fps = array(); private $output_root; - private $output_paths = array(); + private $output_paths = array(); private $current_event; - private $pending_events = array(); + private $pending_events = array(); private $downloads_so_far = 0; - private $last_enqueued_resource_id; + private $enqueued_resource_id; public function __construct( $output_root ) { $this->client = new Client(); @@ -24,7 +24,7 @@ public function has_pending_requests() { } public function enqueue_if_not_exists( $url, $output_path ) { - $this->last_enqueued_resource_id = null; + $this->enqueued_resource_id = null; $output_path = $this->output_root . '/' . ltrim( $output_path, '/' ); if ( file_exists( $output_path ) ) { @@ -51,44 +51,30 @@ public function enqueue_if_not_exists( $url, $output_path ) { if ( false === $local_path ) { return false; } - + // Just copy the file over. // @TODO: think through the chmod of the created file. - $this->last_enqueued_resource_id = 'file:' . $this->downloads_so_far; - $this->pending_events[] = new WP_Attachment_Downloader_Event( - WP_Attachment_Downloader_Event::STARTED, - $url, - $output_path, - $this->last_enqueued_resource_id - ); - $success = copy( $local_path, $output_path ); - $this->pending_events[] = new WP_Attachment_Downloader_Event( + $this->enqueued_resource_id = 'file:' . 
$this->downloads_so_far; + $success = copy( $local_path, $output_path ); + $this->pending_events[] = new WP_Attachment_Downloader_Event( + $this->enqueued_resource_id, $success ? WP_Attachment_Downloader_Event::SUCCESS : WP_Attachment_Downloader_Event::FAILURE, - $url, - $output_path, - $this->last_enqueued_resource_id ); return true; case 'http': case 'https': $request = new Request( $url ); - $this->last_enqueued_resource_id = 'http:' . $request->id; + $this->enqueued_resource_id = 'http:' . $request->id; $this->output_paths[ $request->id ] = $output_path; - $this->pending_events[] = new WP_Attachment_Downloader_Event( - WP_Attachment_Downloader_Event::STARTED, - $url, - $output_path, - $this->last_enqueued_resource_id - ); $this->client->enqueue( $request ); return true; } return false; } - public function get_last_enqueued_resource_id() { - return $this->last_enqueued_resource_id; + public function get_enqueued_resource_id() { + return $this->enqueued_resource_id; } public function queue_full() { @@ -149,10 +135,8 @@ public function poll() { } } $this->pending_events[] = new WP_Attachment_Downloader_Event( + 'http:' . $original_request_id, WP_Attachment_Downloader_Event::FAILURE, - $request->original_request()->url, - $this->output_paths[ $original_request_id ], - 'http:' . $original_request_id ); unset( $this->output_paths[ $original_request_id ] ); break; @@ -171,10 +155,8 @@ public function poll() { } } $this->pending_events[] = new WP_Attachment_Downloader_Event( + 'http:' . $original_request_id, WP_Attachment_Downloader_Event::SUCCESS, - $request->original_request()->url, - $this->output_paths[ $original_request_id ], - 'http:' . 
$original_request_id ); unset( $this->output_paths[ $original_request_id ] ); } @@ -185,22 +167,3 @@ public function poll() { return true; } } - -class WP_Attachment_Downloader_Event { - - const STARTED = '#started'; - const SUCCESS = '#success'; - const FAILURE = '#failure'; - - public $type; - public $url; - public $target_path; - public $resource_id; - - public function __construct( $type, $url, $target_path, $resource_id ) { - $this->type = $type; - $this->url = $url; - $this->target_path = $target_path; - $this->resource_id = $resource_id; - } -} diff --git a/packages/playground/data-liberation/src/import/WP_Attachment_Downloader_Event.php b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader_Event.php new file mode 100644 index 0000000000..b759ebb8fb --- /dev/null +++ b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader_Event.php @@ -0,0 +1,15 @@ +<?php + +class WP_Attachment_Downloader_Event { + + const SUCCESS = '#success'; + const FAILURE = '#failure'; + + public $type; + public $resource_id; + + public function __construct( $resource_id, $type ) { + $this->resource_id = $resource_id; + $this->type = $type; + } +} diff --git a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php index 81e30e7f87..b4c8fd7915 100644 --- a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php @@ -79,7 +79,7 @@ public function pause() { } public function resume( $paused_state ) { - $this->state = $paused_state['state']; + $this->state = $paused_state['state']; $this->entities_cursor = $paused_state['entities_cursor']; // @TODO: Should resume() call next_step() or just prepare the state? 
$this->next_step(); @@ -163,7 +163,7 @@ public function advance_frontloading_cursor() { switch ( $event->type ) { case WP_Attachment_Downloader_Event::SUCCESS: case WP_Attachment_Downloader_Event::FAILURE: - foreach( $this->frontloading_state as $state ) { + foreach ( $this->frontloading_state as $state ) { $resource_id = $event->resource_id; unset( $state['active_downloads'][ $resource_id ] ); } @@ -171,9 +171,9 @@ public function advance_frontloading_cursor() { } } - while(count($this->frontloading_state) > 0) { + while ( count( $this->frontloading_state ) > 0 ) { $oldest_download_key = key( $this->frontloading_state ); - if( ! empty( $this->frontloading_state[ $oldest_download_key ]['active_downloads'] ) ) { + if ( ! empty( $this->frontloading_state[ $oldest_download_key ]['active_downloads'] ) ) { break; } // Advance the cursor to the next entity. @@ -193,7 +193,7 @@ public function frontload_assets_step() { if ( null === $this->entities_iterator ) { $factory = $this->entity_iterator_factory; $this->entities_iterator = $factory(); - if( $this->entities_cursor ) { + if ( $this->entities_cursor ) { $this->entities_iterator->resume( $this->entities_cursor ); } $this->downloader = new WP_Attachment_Downloader( $this->options ); @@ -225,7 +225,7 @@ public function frontload_assets_step() { * fix it later on. In a CLI-based Blueprint step importer scenario, we * might want to provide an "image not found" placeholder OR ignore the * failure. - * + * * @TODO: Update the download progress: * * After every downloaded file. * * For large files, every time a full megabyte is downloaded above 10MB. @@ -237,17 +237,17 @@ public function frontload_assets_step() { * Identify the static assets referenced in the current entity * and enqueue them for download. 
*/ - $entity = $this->entities_iterator->current(); - $entity_key = $this->entities_iterator->key(); + $entity = $this->entities_iterator->current(); + $entity_key = $this->entities_iterator->key(); $this->frontloading_state[ $entity_key ] = array( 'key' => $entity_key, // @TODO: Consider making key() and pause() the same thing. // Then seek() and resume() would be the same, too! 'cursor' => $this->entities_iterator->pause(), - 'active_downloads' => array() + 'active_downloads' => array(), ); - $data = $entity->get_data(); + $data = $entity->get_data(); switch ( $entity->get_type() ) { case 'site_option': if ( $data['option_name'] === 'home' ) { @@ -297,7 +297,7 @@ public function import_entities_step() { $factory = $this->entity_iterator_factory; $this->entities_iterator = $factory(); $this->importer = new WP_Entity_Importer(); - if( $this->entities_cursor ) { + if ( $this->entities_cursor ) { $this->entities_iterator->resume( $this->entities_cursor ); } } @@ -310,7 +310,7 @@ public function import_entities_step() { return; } - $entity = $this->entities_iterator->current(); + $entity = $this->entities_iterator->current(); $attachments = array(); // Rewrite the URLs in the post. 
switch ( $entity->get_type() ) { @@ -373,7 +373,7 @@ protected function enqueue_attachment_download( string $raw_url, $context_path = $enqueued = $this->downloader->enqueue_if_not_exists( $url, $output_path ); if ( $enqueued ) { $resource_id = $this->downloader->get_last_enqueued_resource_id(); - $entity_key = $this->entities_iterator->key(); + $entity_key = $this->entities_iterator->key(); $this->frontloading_state[ $entity_key ]['active_downloads'][ $resource_id ] = true; } return $enqueued; diff --git a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php index 6a944eca93..dc4a4f39f9 100644 --- a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php +++ b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php @@ -354,9 +354,9 @@ public function pause() { // XML breadcrumbs and the parsing context. Perhaps the cursor could // be a class with a method such as "override_byte_offset(int $offset)"? // Or resume() could have an optional argument? 
- $xml_state = $this->xml->pause(); + $xml_state = $this->xml->pause(); $xml_state['token_starts_at_in_current_chunk'] = 0; - $xml_state['upstream_bytes_forgotten'] = $this->entity_byte_offset; + $xml_state['upstream_bytes_forgotten'] = $this->entity_byte_offset; return array( 'xml' => $xml_state, 'upstream' => $upstream_state, @@ -901,7 +901,7 @@ public function next(): void { } public function key(): string { - return sha1(json_encode($this->pause())); + return sha1( json_encode( $this->pause() ) ); } public function valid(): bool { diff --git a/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php b/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php index 5386b0ffd0..c35e41c4ce 100644 --- a/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php +++ b/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php @@ -689,11 +689,11 @@ public function pause() { * – Validate the paused state, return false if it's invalid. */ public function resume( $paused_state ) { - $this->bytes_already_parsed = $paused_state['token_starts_at_in_current_chunk']; + $this->bytes_already_parsed = $paused_state['token_starts_at_in_current_chunk']; $this->upstream_bytes_forgotten = $paused_state['upstream_bytes_forgotten']; - $this->stack_of_open_elements = $paused_state['stack_of_open_elements']; - $this->parser_context = $paused_state['parser_context']; - $this->expecting_more_input = $paused_state['expecting_more_input']; + $this->stack_of_open_elements = $paused_state['stack_of_open_elements']; + $this->parser_context = $paused_state['parser_context']; + $this->expecting_more_input = $paused_state['expecting_more_input']; $this->next_token(); } From 5466cb9a5c569a0e35032fe2c34299072b64b6f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= <adam@adamziel.com> Date: Wed, 20 Nov 2024 16:37:49 +0100 Subject: [PATCH 09/12] Reentrancy API v1: get_reentrancy_cursor()/create() methods, and tell()/seek() methods --- 
.../playground/data-liberation/plugin.php | 81 ++++---- .../src/byte-readers/WP_Byte_Reader.php | 4 +- .../src/byte-readers/WP_File_Reader.php | 26 ++- .../byte-readers/WP_Remote_File_Reader.php | 28 ++- .../src/import/WP_Markdown_Importer.php | 8 - .../src/import/WP_Stream_Importer.php | 196 +++++++++--------- .../WP_Markdown_Directory_Tree_Reader.php | 3 + .../data-liberation/src/wxr/WP_WXR_Reader.php | 119 ++++++----- .../src/xml-api/WP_XML_Processor.php | 103 ++++++--- 9 files changed, 311 insertions(+), 257 deletions(-) diff --git a/packages/playground/data-liberation/plugin.php b/packages/playground/data-liberation/plugin.php index 5337e318d8..6251d2c8e8 100644 --- a/packages/playground/data-liberation/plugin.php +++ b/packages/playground/data-liberation/plugin.php @@ -8,13 +8,13 @@ /** * Don't run KSES on the attribute values during the import. - * + * * Without this filter, WP_HTML_Tag_Processor::set_attribute() will * assume the value is a URL and run KSES on it, which will incorrectly * prefix relative paths with http://. - * + * * For example: - * + * * > $html = new WP_HTML_Tag_Processor( '<img>' ); * > $html->next_tag(); * > $html->set_attribute( 'src', './_assets/log-errors.png' ); @@ -30,34 +30,37 @@ * @TODO: Remove this in favor of a CLI command. */ add_action('init', function() { - echo '<plaintext>'; + return; $wxr_path = __DIR__ . '/tests/fixtures/wxr-simple.xml'; - $entity_iterator_factory = function() use ($wxr_path) { - $wxr = new WP_WXR_Reader(); - $wxr->connect_upstream(new WP_File_Reader($wxr_path)); - - return $wxr; + $entity_iterator_factory = function($cursor=null) use ($wxr_path) { + return WP_WXR_Reader::create( + new WP_File_Reader($wxr_path), + $cursor + ); }; $importer = WP_Stream_Importer::create( $entity_iterator_factory ); + while($importer->next_step()) { + // ... 
+ } + return; $importer->next_step(); - $importer->next_step(); - $importer->next_step(); - $importer->next_step(); - $importer->next_step(); - $importer->next_step(); - $paused_importer_state = $importer->pause(); + $paused_importer_state = $importer->get_reentrancy_cursor(); + + echo "\n\n"; + echo "moving to importer2\n"; + echo "\n\n"; $importer2 = WP_Stream_Importer::create( - $entity_iterator_factory + $entity_iterator_factory, + array(), + $paused_importer_state ); - $importer2->resume($paused_importer_state); $importer2->next_step(); $importer2->next_step(); $importer2->next_step(); // $importer2->next_step(); - // var_dump($importer2); die("YAY"); @@ -124,7 +127,7 @@ function data_liberation_admin_page() { data_liberation_process_import(); echo '</pre>'; } - + ?> <h2>Active import</h2> <?php @@ -186,9 +189,9 @@ function data_liberation_admin_page() { > <?php wp_nonce_field('data_liberation_import'); ?> <input type="hidden" name="action" value="data_liberation_import"> - + <h2>Import Content</h2> - + <table class="form-table"> <tr> <th scope="row">Import Type</th> @@ -213,7 +216,7 @@ function data_liberation_admin_page() { </label> </td> </tr> - + <tr data-wp-context='{ "importType": "wxr_file" }' data-wp-class--hidden="!state.isImportTypeSelected"> <th scope="row">WXR File</th> @@ -222,7 +225,7 @@ function data_liberation_admin_page() { <p class="description">Upload a WordPress eXtended RSS (WXR) file</p> </td> </tr> - + <tr data-wp-context='{ "importType": "wxr_url" }' data-wp-class--hidden="!state.isImportTypeSelected"> <th scope="row">WXR URL</th> @@ -231,7 +234,7 @@ function data_liberation_admin_page() { <p class="description">Enter the URL of a WXR file</p> </td> </tr> - + <tr data-wp-context='{ "importType": "markdown_zip" }' data-wp-class--hidden="!state.isImportTypeSelected"> <th scope="row">Markdown ZIP</th> @@ -248,7 +251,7 @@ function data_liberation_admin_page() { <h2>Previous Imports</h2> <p>TODO: Show a table of previous imports.</p> - + 
<table class="form-table"> <tr> <th scope="row">Date</th> @@ -367,7 +370,7 @@ function data_liberation_admin_page() { */ // if(is_wp_error(wp_schedule_event(time(), 'data_liberation_minute', 'data_liberation_process_import'))) { // wp_delete_attachment($attachment_id, true); - // // @TODO: More user friendly error message – maybe redirect back to the import screen and + // // @TODO: More user friendly error message – maybe redirect back to the import screen and // // show the error there. // wp_die('Failed to schedule import – the "data_liberation_minute" schedule may not be registered.'); // } @@ -409,11 +412,11 @@ function data_liberation_create_importer($import) { // @TODO: Save the error, report it to the user. return; } - $entity_iterator_factory = function() use ($wxr_path) { - $wxr = new WP_WXR_Reader(); - $wxr->connect_upstream(new WP_File_Reader($wxr_path)); - - return $wxr; + $entity_iterator_factory = function($cursor=null) use ($wxr_path) { + return WP_WXR_Reader::create( + new WP_File_Reader($wxr_path), + $cursor + ); }; return WP_Stream_Importer::create( $entity_iterator_factory @@ -421,10 +424,11 @@ function data_liberation_create_importer($import) { case 'wxr_url': $wxr_url = $import['wxr_url']; - $entity_iterator_factory = function() use ($wxr_url) { - $wxr = new WP_WXR_Reader(); - $wxr->connect_upstream(new WP_Remote_File_Reader($wxr_url)); - return $wxr; + $entity_iterator_factory = function($cursor=null) use ($wxr_url) { + return WP_WXR_Reader::create( + new WP_Remote_File_Reader($wxr_url), + $cursor + ); }; return WP_Stream_Importer::create( $entity_iterator_factory @@ -446,7 +450,10 @@ function data_liberation_create_importer($import) { } } $markdown_root = $temp_dir; - $entity_iterator_factory = function() use ($markdown_root) { + $entity_iterator_factory = function($cursor=null) use ($markdown_root) { + if(null !== $cursor) { + throw new \Exception('Resuming Markdown imports is not supported yet.'); + } return new 
WP_Markdown_Directory_Tree_Reader( $markdown_root, 1000 @@ -460,4 +467,4 @@ function data_liberation_create_importer($import) { ] ); } -} \ No newline at end of file +} diff --git a/packages/playground/data-liberation/src/byte-readers/WP_Byte_Reader.php b/packages/playground/data-liberation/src/byte-readers/WP_Byte_Reader.php index efc80bffd9..6d8bd247db 100644 --- a/packages/playground/data-liberation/src/byte-readers/WP_Byte_Reader.php +++ b/packages/playground/data-liberation/src/byte-readers/WP_Byte_Reader.php @@ -1,8 +1,8 @@ <?php interface WP_Byte_Reader { - public function pause(): array|bool; - public function resume( $paused_state ): bool; + public function tell(): int; + public function seek( int $offset ): bool; public function is_finished(): bool; public function next_bytes(): bool; public function get_bytes(): string|null; diff --git a/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php b/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php index 179372f3b3..f432c3ba7a 100644 --- a/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php +++ b/packages/playground/data-liberation/src/byte-readers/WP_File_Reader.php @@ -19,25 +19,23 @@ public function __construct( $file_path, $chunk_size = 8096 ) { $this->chunk_size = $chunk_size; } - /** - * Really these are just `tell()` and `seek()` operations, only the state is more - * involved than a simple offset. Hmm. - */ - public function pause(): array|bool { - return array( - // Save the previous offset, not the current one. - // This way, after resuming, the next read will yield the same $output_bytes - // as we have now. - 'offset_in_file' => $this->offset_in_file - $this->last_chunk_size, - ); + public function tell(): int { + // Save the previous offset, not the current one. + // This way, after resuming, the next read will yield the same $output_bytes + // as we have now. 
+ return $this->offset_in_file - $this->last_chunk_size; } - public function resume( $paused_state ): bool { + public function seek( $offset_in_file ): bool { + if ( ! is_int( $offset_in_file ) ) { + _doing_it_wrong( __METHOD__, 'Cannot set a file reader cursor to a non-integer offset.', '1.0.0' ); + return false; + } if ( $this->file_pointer ) { - _doing_it_wrong( __METHOD__, 'Cannot resume a file reader that is already initialized.', '1.0.0' ); + _doing_it_wrong( __METHOD__, 'Cannot set a file reader cursor on a file reader that is already initialized.', '1.0.0' ); return false; } - $this->offset_in_file = $paused_state['offset_in_file']; + $this->offset_in_file = $offset_in_file; $this->last_chunk_size = 0; return true; } diff --git a/packages/playground/data-liberation/src/byte-readers/WP_Remote_File_Reader.php b/packages/playground/data-liberation/src/byte-readers/WP_Remote_File_Reader.php index e9e19ba0b1..819480d8fe 100644 --- a/packages/playground/data-liberation/src/byte-readers/WP_Remote_File_Reader.php +++ b/packages/playground/data-liberation/src/byte-readers/WP_Remote_File_Reader.php @@ -22,6 +22,19 @@ public function __construct( $url ) { $this->url = $url; } + public function tell(): int { + return $this->bytes_already_read + $this->skip_bytes; + } + + public function seek( $offset_in_file ): bool { + if ( $this->request ) { + _doing_it_wrong( __METHOD__, 'Cannot set a remote file reader cursor on a remote file reader that is already initialized.', '1.0.0' ); + return false; + } + $this->skip_bytes = $offset_in_file; + return true; + } + public function next_bytes(): bool { if ( null === $this->request ) { $this->request = new WordPress\AsyncHttp\Request( @@ -90,21 +103,6 @@ public function get_bytes(): string|null { return $this->current_chunk; } - public function pause(): array|bool { - return array( - 'offset_in_file' => $this->bytes_already_read + $this->skip_bytes, - ); - } - - public function resume( $paused_state ): bool { - if ( 
$this->request ) { - _doing_it_wrong( __METHOD__, 'Cannot resume a remote file reader that is already initialized.', '1.0.0' ); - return false; - } - $this->skip_bytes = $paused_state['offset_in_file']; - return true; - } - public function is_finished(): bool { return $this->is_finished; } diff --git a/packages/playground/data-liberation/src/import/WP_Markdown_Importer.php b/packages/playground/data-liberation/src/import/WP_Markdown_Importer.php index f4551a35a0..d1aee0fb0d 100644 --- a/packages/playground/data-liberation/src/import/WP_Markdown_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Markdown_Importer.php @@ -2,14 +2,6 @@ class WP_Markdown_Importer extends WP_Stream_Importer { - public static function create( - $entity_iterator_factory, - $options = array() - ) { - $options = static::parse_options( $options ); - return new WP_Markdown_Importer( $entity_iterator_factory, $options ); - } - protected static function parse_options( $options ) { if ( ! isset( $options['source_site_url'] ) ) { _doing_it_wrong( __METHOD__, 'The source_site_url option is required.', '__WP_VERSION__' ); diff --git a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php index b4c8fd7915..ffaaa240ff 100644 --- a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php @@ -13,8 +13,9 @@ * too slow for large datasets. * * @TODO: - * * Re-entrant import via storing state on error, pausing, and resuming. - * * Idempotent import. + * ✅ Re-entrant import + * * Log errors for the user to decide what to do. + * * Log progress information. * * Error out if `source_site_url` is not set by the time we're processing * the first encountered URL. * * Disable anything remotely related to KSES during the import. 
KSES @@ -32,7 +33,7 @@ class WP_Stream_Importer { /** * Populated from the WXR file's <wp:base_blog_url> tag. */ - protected $source_site_url; + private $source_site_url; private $entity_iterator_factory; /** * @param array|string|null $query { @@ -42,50 +43,70 @@ class WP_Stream_Importer { * after the import. E.g. http://127.0.0.1:9400/wp-content/uploads/ * } */ - protected $options; + private $options; - const STATE_INITIAL = '#initial'; - const STATE_TOPOLOGICAL_SORT = '#topological_sort'; - const STATE_FRONTLOAD_ASSETS = '#frontload_assets'; - const STATE_IMPORT_ENTITIES = '#import_entities'; - const STATE_FINISHED = '#finished'; + const STAGE_INITIAL = '#initial'; + const STAGE_TOPOLOGICAL_SORT = '#topological_sort'; + const STAGE_FRONTLOAD_ASSETS = '#frontload_assets'; + const STAGE_IMPORT_ENTITIES = '#import_entities'; + const STAGE_FINISHED = '#finished'; /** * The current state of the import process. + * @var string */ - private $state = self::STATE_INITIAL; + private $stage = self::STAGE_INITIAL; /** * Iterator that streams entities to import. 
*/ private $entities_iterator; - private $frontloading_state = array(); - public $entities_cursor; + private $resume_at_entity; + /** + * A map of currently downloaded resources for each entity in + * the following format: + * + * [$entity_cursor => [$resource_id => true]] + * + * @var array<string,array<string,bool>> + */ + private $active_downloads = array(); private $downloader; public static function create( $entity_iterator_factory, - $options = array() + $options = array(), + $cursor = null ) { - $options = static::parse_options( $options ); - return new WP_Stream_Importer( $entity_iterator_factory, $options ); + $options = static::parse_options( $options ); + $importer = new WP_Stream_Importer( $entity_iterator_factory, $options ); + if ( null !== $cursor && true !== $importer->initialize_from_cursor( $cursor ) ) { + return false; + } + return $importer; } - public function pause() { - return array( - 'state' => $this->state, - 'entities_cursor' => $this->entities_cursor, + public function get_reentrancy_cursor() { + return json_encode( + array( + 'state' => $this->stage, + 'resume_at_entity' => $this->resume_at_entity, + ) ); } - public function resume( $paused_state ) { - $this->state = $paused_state['state']; - $this->entities_cursor = $paused_state['entities_cursor']; - // @TODO: Should resume() call next_step() or just prepare the state? - $this->next_step(); + private function initialize_from_cursor( $cursor ) { + $cursor = json_decode( $cursor, true ); + if ( ! is_array( $cursor ) ) { + _doing_it_wrong( __METHOD__, 'Cannot resume an importer with a non-array cursor.', '1.0.0' ); + return false; + } + $this->stage = $cursor['state']; + $this->resume_at_entity = $cursor['resume_at_entity']; + return true; } - protected static function parse_options( $options ) { + private static function parse_options( $options ) { if ( ! 
isset( $options['new_site_url'] ) ) { $options['new_site_url'] = get_site_url(); } @@ -105,7 +126,7 @@ protected static function parse_options( $options ) { return $options; } - protected function __construct( + private function __construct( $entity_iterator_factory, $options = array() ) { @@ -125,60 +146,60 @@ protected function __construct( private $importer; public function next_step() { - switch ( $this->state ) { - case self::STATE_INITIAL: - $this->state = self::STATE_TOPOLOGICAL_SORT; + switch ( $this->stage ) { + case self::STAGE_INITIAL: + $this->stage = self::STAGE_TOPOLOGICAL_SORT; return true; - case self::STATE_TOPOLOGICAL_SORT: + case self::STAGE_TOPOLOGICAL_SORT: // @TODO: Topologically sort the entities. - $this->state = self::STATE_FRONTLOAD_ASSETS; + $this->stage = self::STAGE_FRONTLOAD_ASSETS; return true; - case self::STATE_FRONTLOAD_ASSETS: - $this->frontload_assets_step(); + case self::STAGE_FRONTLOAD_ASSETS: + $this->next_frontloading_step(); return true; - case self::STATE_IMPORT_ENTITIES: - $this->import_entities_step(); + case self::STAGE_IMPORT_ENTITIES: + $this->import_next_entity(); return true; - case self::STATE_FINISHED: + case self::STAGE_FINISHED: return false; } } - public function advance_frontloading_cursor() { - /** - * Advance the cursor to the oldest finished download. For example: - * - * * We've started downloading files A, B, C, and D in this order. - * * D is the first to finish. We don't do anything yet. - * * A finishes next. We advance the cursor to A. - * * C finishes next. We don't do anything. - * * Then we pause. - * - * When we resume, we'll start where we left off, which is after A. The - * downloader will enqueue B for download and will skip C and D since - * the relevant files already exist in the filesystem. - */ + /** + * Advance the cursor to the oldest finished download. For example: + * + * * We've started downloading files A, B, C, and D in this order. + * * D is the first to finish. 
We don't do anything yet. + * * A finishes next. We advance the cursor to A. + * * C finishes next. We don't do anything. + * * Then we pause. + * + * When we resume, we'll start where we left off, which is after A. The + * downloader will enqueue B for download and will skip C and D since + * the relevant files already exist in the filesystem. + */ + private function frontloading_advance_reentrancy_cursor() { while ( $this->downloader->next_event() ) { $event = $this->downloader->get_event(); switch ( $event->type ) { case WP_Attachment_Downloader_Event::SUCCESS: case WP_Attachment_Downloader_Event::FAILURE: - foreach ( $this->frontloading_state as $state ) { - $resource_id = $event->resource_id; - unset( $state['active_downloads'][ $resource_id ] ); + foreach ( array_keys( $this->active_downloads ) as $entity_cursor ) { + unset( $this->active_downloads[ $entity_cursor ][ $event->resource_id ] ); } break; } } - while ( count( $this->frontloading_state ) > 0 ) { - $oldest_download_key = key( $this->frontloading_state ); - if ( ! empty( $this->frontloading_state[ $oldest_download_key ]['active_downloads'] ) ) { + while ( count( $this->active_downloads ) > 0 ) { + $oldest_download_cursor = key( $this->active_downloads ); + $downloads_completed = empty( $this->active_downloads[ $oldest_download_cursor ] ); + if ( ! $downloads_completed ) { break; } // Advance the cursor to the next entity. - $this->entities_cursor = $this->frontloading_state[ $oldest_download_key ]['cursor']; - unset( $this->frontloading_state[ $oldest_download_key ] ); + $this->resume_at_entity = $oldest_download_cursor; + unset( $this->active_downloads[ $oldest_download_cursor ] ); } } @@ -189,28 +210,26 @@ public function advance_frontloading_cursor() { * before import_entities() so that every inserted post already has * all its attachments downloaded. 
*/ - public function frontload_assets_step() { + private function next_frontloading_step() { if ( null === $this->entities_iterator ) { $factory = $this->entity_iterator_factory; - $this->entities_iterator = $factory(); - if ( $this->entities_cursor ) { - $this->entities_iterator->resume( $this->entities_cursor ); - } - $this->downloader = new WP_Attachment_Downloader( $this->options ); + $this->entities_iterator = $factory( $this->resume_at_entity ); + $this->downloader = new WP_Attachment_Downloader( $this->options ); } - $this->advance_frontloading_cursor(); + $this->frontloading_advance_reentrancy_cursor(); // We're done if all the entities are processed and all the downloads are finished. if ( ! $this->entities_iterator->valid() && ! $this->downloader->has_pending_requests() ) { // This is an assertion to make double sure we're emptying the state queue. - if ( ! empty( $this->frontloading_state ) ) { + if ( ! empty( $this->active_downloads ) ) { _doing_it_wrong( __METHOD__, 'Frontloading queue is not empty.', '1.0' ); } - $this->state = self::STATE_IMPORT_ENTITIES; - $this->downloader = null; - $this->frontloading_state = array(); - $this->entities_iterator = null; + $this->stage = self::STAGE_IMPORT_ENTITIES; + $this->downloader = null; + $this->active_downloads = array(); + $this->entities_iterator = null; + $this->resume_at_entity = null; return false; } @@ -237,15 +256,9 @@ public function frontload_assets_step() { * Identify the static assets referenced in the current entity * and enqueue them for download. */ - $entity = $this->entities_iterator->current(); - $entity_key = $this->entities_iterator->key(); - $this->frontloading_state[ $entity_key ] = array( - 'key' => $entity_key, - // @TODO: Consider making key() and pause() the same thing. - // Then seek() and resume() would be the same, too! 
- 'cursor' => $this->entities_iterator->pause(), - 'active_downloads' => array(), - ); + $entity = $this->entities_iterator->current(); + $cursor = $this->entities_iterator->get_reentrancy_cursor(); + $this->active_downloads[ $cursor ] = array(); $data = $entity->get_data(); switch ( $entity->get_type() ) { @@ -281,7 +294,7 @@ public function frontload_assets_step() { // Move on to the next entity. $this->entities_iterator->next(); - $this->advance_frontloading_cursor(); + $this->frontloading_advance_reentrancy_cursor(); return true; } @@ -292,19 +305,16 @@ public function frontload_assets_step() { * large datasets, but maybe it could be a choice for * the API consumer? */ - public function import_entities_step() { + private function import_next_entity() { if ( null === $this->entities_iterator ) { $factory = $this->entity_iterator_factory; - $this->entities_iterator = $factory(); + $this->entities_iterator = $factory( $this->resume_at_entity ); $this->importer = new WP_Entity_Importer(); - if ( $this->entities_cursor ) { - $this->entities_iterator->resume( $this->entities_cursor ); - } } if ( ! $this->entities_iterator->valid() ) { // We're done. - $this->state = self::STATE_FINISHED; + $this->stage = self::STAGE_FINISHED; $this->entities_iterator = null; $this->importer = null; return; @@ -361,20 +371,20 @@ public function import_entities_step() { /** * @TODO: Update the progress information. */ - $this->entities_cursor = $this->entities_iterator->pause(); + $this->resume_at_entity = $this->entities_iterator->get_reentrancy_cursor(); $this->entities_iterator->next(); } - protected function enqueue_attachment_download( string $raw_url, $context_path = null ) { + private function enqueue_attachment_download( string $raw_url, $context_path = null ) { $url = $this->rewrite_attachment_url( $raw_url, $context_path ); $asset_filename = $this->new_asset_filename( $raw_url ); $output_path = $this->options['uploads_path'] . '/' . 
ltrim( $asset_filename, '/' ); $enqueued = $this->downloader->enqueue_if_not_exists( $url, $output_path ); if ( $enqueued ) { - $resource_id = $this->downloader->get_last_enqueued_resource_id(); - $entity_key = $this->entities_iterator->key(); - $this->frontloading_state[ $entity_key ]['active_downloads'][ $resource_id ] = true; + $resource_id = $this->downloader->get_last_enqueued_resource_id(); + $entity_cursor = $this->entities_iterator->get_reentrancy_cursor(); + $this->active_downloads[ $entity_cursor ][ $resource_id ] = true; } return $enqueued; } @@ -426,7 +436,7 @@ private function new_asset_filename( string $raw_asset_url ) { return $filename; } - protected function rewrite_attachment_url( string $raw_url, $context_path = null ) { + private function rewrite_attachment_url( string $raw_url, $context_path = null ) { if ( WP_URL::can_parse( $raw_url ) ) { // Absolute URL, nothing to do. return $raw_url; @@ -449,7 +459,7 @@ protected function rewrite_attachment_url( string $raw_url, $context_path = null * @TODO: How can we process the videos? * @TODO: What other asset types are there? */ - protected function url_processor_matched_asset_url( WP_Block_Markup_Url_Processor $p ) { + private function url_processor_matched_asset_url( WP_Block_Markup_Url_Processor $p ) { return ( $p->get_tag() === 'IMG' && $p->get_inspected_attribute_name() === 'src' && diff --git a/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php b/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php index b8695096df..da7514eb89 100644 --- a/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php +++ b/packages/playground/data-liberation/src/markdown-api/WP_Markdown_Directory_Tree_Reader.php @@ -4,6 +4,9 @@ * * This exploration accompanies the WXR reader to inform a generic * data importing pipeline that's not specific to a single input format. 
+ * + * @TODO: Support multiple data sources – filesystem directory tree, zip file, ... + * @TODO: Expose a cursor to allow resuming from where we left off. */ class WP_Markdown_Directory_Tree_Reader implements Iterator { diff --git a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php index dc4a4f39f9..25e716256c 100644 --- a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php +++ b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php @@ -342,43 +342,42 @@ class WP_WXR_Reader implements Iterator { ), ); - public function pause() { - $upstream_state = $this->upstream ? $this->upstream->pause() : null; - if ( $upstream_state ) { - // @TODO: Don't assume this specific key name. Find a way to generalize - // this to, e.g., remote HTTP byte sources. - $upstream_state['offset_in_file'] = $this->entity_byte_offset; + public static function create( WP_Byte_Reader $upstream = null, $cursor = null ) { + $xml_cursor = null; + if ( null !== $cursor ) { + $cursor = json_decode( $cursor, true ); + if ( false === $cursor ) { + _doing_it_wrong( + __METHOD__, + 'Invalid cursor provided for WP_WXR_Reader::create().', + null + ); + return false; + } + $xml_cursor = $cursor['xml']; } - // @TODO: I don't like messing with internals of $paused_state. How else - // can we do this? We need more than just an offset to also restore - // XML breadcrumbs and the parsing context. Perhaps the cursor could - // be a class with a method such as "override_byte_offset(int $offset)"? - // Or resume() could have an optional argument? 
- $xml_state = $this->xml->pause(); - $xml_state['token_starts_at_in_current_chunk'] = 0; - $xml_state['upstream_bytes_forgotten'] = $this->entity_byte_offset; - return array( - 'xml' => $xml_state, - 'upstream' => $upstream_state, - 'last_post_id' => $this->last_post_id, - 'last_comment_id' => $this->last_comment_id, - ); - } - public function resume( $paused_state ) { - // @TODO: Validate the paused state. - if ( $paused_state['upstream'] ) { - if ( ! $this->upstream ) { - // @TODO: _doing_it_wrong() - return false; + $xml = WP_XML_Processor::create_for_streaming( '', $xml_cursor ); + $reader = new WP_WXR_Reader( $xml ); + if ( null !== $cursor ) { + $reader->last_post_id = $cursor['last_post_id']; + $reader->last_comment_id = $cursor['last_comment_id']; + } + if ( null !== $upstream ) { + $reader->connect_upstream( $upstream ); + if ( null !== $cursor ) { + if ( ! isset( $cursor['upstream'] ) ) { + _doing_it_wrong( + __METHOD__, + 'Invalid cursor provided for WP_WXR_Reader::create(). The upstream offset was missing.', + null + ); + return false; + } + $upstream->seek( $cursor['upstream'] ); } - $this->upstream->resume( $paused_state['upstream'] ); } - $this->xml->resume( $paused_state['xml'] ); - $this->last_post_id = $paused_state['last_post_id']; - $this->last_comment_id = $paused_state['last_comment_id']; - // @TODO: Should resume() call next_step() or just prepare the state? - $this->next_entity(); + return $reader; } /** @@ -388,8 +387,31 @@ public function resume( $paused_state ) { * * @param WP_XML_Processor $xml The XML processor to use. */ - public function __construct() { - $this->xml = WP_XML_Processor::create_for_streaming(); + protected function __construct( WP_XML_Processor $xml ) { + $this->xml = $xml; + } + + public function get_reentrancy_cursor() { + /** + * @TODO: Instead of adjusting the XML cursor internals, adjust the get_reentrancy_cursor() + * call to support $bookmark_name, e.g. 
$this->xml->get_reentrancy_cursor( 'last_entity' ); + * If the cursor internal data was a part of every bookmark, this would have worked + * even after evicting the actual bytes where $last_entity is stored. + */ + $xml_cursor = $this->xml->get_reentrancy_cursor(); + $xml_cursor = json_decode( base64_decode( $xml_cursor ), true ); + $xml_cursor['token_starts_at_in_current_chunk'] = 0; + $xml_cursor['upstream_bytes_forgotten'] = $this->entity_byte_offset; + $xml_cursor = base64_encode( json_encode( $xml_cursor ) ); + return json_encode( + array( + 'xml' => $xml_cursor, + // WP_Byte_Reader cursors are always integer byte offsets in the stream. + 'upstream' => $this->entity_byte_offset, + 'last_post_id' => $this->last_post_id, + 'last_comment_id' => $this->last_comment_id, + ) + ); } /** @@ -620,7 +642,7 @@ private function read_next_entity() { if ( $this->xml->is_tag_opener() ) { $this->set_entity_tag( $tag ); if ( array_key_exists( $this->xml->get_tag(), static::KNOWN_ENITIES ) ) { - $this->entity_byte_offset = $this->get_current_byte_offset(); + $this->entity_byte_offset = $this->xml->get_token_byte_offset_in_the_input_stream(); } } continue; @@ -676,7 +698,7 @@ private function read_next_entity() { array_key_exists( $this->xml->get_tag(), static::KNOWN_SITE_OPTIONS ) ); if ( $is_site_option_opener ) { - $this->entity_byte_offset = $this->get_current_byte_offset(); + $this->entity_byte_offset = $this->xml->get_token_byte_offset_in_the_input_stream(); } continue; } @@ -817,21 +839,6 @@ private function pull_upstream_bytes() { return true; } - /** - * Returns current's XML token offset in the input stream. - * - * @since WP_VERSION - * - * @return int The current byte offset. - */ - private function get_current_byte_offset() { - $paused_xml_state = $this->xml->pause(); - return ( - $paused_xml_state['token_starts_at_in_current_chunk'] + - $paused_xml_state['upstream_bytes_forgotten'] - ); - } - /** * Marks the current entity as emitted and updates tracking variables. 
* @@ -901,7 +908,7 @@ public function next(): void { } public function key(): string { - return sha1( json_encode( $this->pause() ) ); + return $this->get_reentrancy_cursor(); } public function valid(): bool { @@ -909,6 +916,10 @@ public function valid(): bool { } public function rewind(): void { - // noop + _doing_it_wrong( + __METHOD__, + 'WP_WXR_Reader does not support rewinding.', + null + ); } } diff --git a/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php b/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php index c35e41c4ce..de89856c25 100644 --- a/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php +++ b/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php @@ -22,8 +22,7 @@ * starting with 1.0, however, because most that's what most WXR * files declare. * - * @TODO: Scrutinize pause() and resume() methods. Can we avoid exposing string - * indices and other internal state? + * @TODO: Include the cursor string in internal bookmarks and use it for seeking. * * @TODO: Track specific error states, expose informative messages, line * numbers, indexes, and other debugging info. 
@@ -647,54 +646,90 @@ class WP_XML_Processor { /** * */ - public static function create_from_string( $xml, $known_definite_encoding = 'UTF-8' ) { - if ( 'UTF-8' !== $known_definite_encoding ) { - return null; + public static function create_from_string( $xml, $cursor = null, $known_definite_encoding = 'UTF-8' ) { + $processor = static::create_for_streaming( $xml, $cursor, $known_definite_encoding ); + if ( null === $processor ) { + return false; } - - $processor = new WP_XML_Processor( $xml, self::CONSTRUCTOR_UNLOCK_CODE ); $processor->input_finished(); return $processor; } - public static function create_for_streaming( $xml = '', $known_definite_encoding = 'UTF-8' ) { + public static function create_for_streaming( $xml = '', $cursor = null, $known_definite_encoding = 'UTF-8' ) { if ( 'UTF-8' !== $known_definite_encoding ) { - return null; + return false; + } + $processor = new WP_XML_Processor( $xml, self::CONSTRUCTOR_UNLOCK_CODE ); + if ( null !== $cursor && true !== $processor->initialize_from_cursor( $cursor ) ) { + return false; } - return new WP_XML_Processor( $xml, self::CONSTRUCTOR_UNLOCK_CODE ); + return $processor; } /** - * Pauses the processor and returns an array of the information needed to resume. + * Returns a re-entrancy cursor – it's a string that can instruct a new XML + * Processor instance to continue parsing from the current location in the + * document. * - * @TODO: - * – What to do with bookmarks when pausing? - * – Consider including all the below information in internal bookmarks. Consider using a logic - * similar to resume() in seek(). - * – Consider a WP_XML_Processor_Paused_State or a WP_XML_Processor_Bookmark class. - * – Should we flush the enqueued lexical updates first? 
- */ - public function pause() { - return array( - 'token_starts_at_in_current_chunk' => $this->token_starts_at, - 'upstream_bytes_forgotten' => $this->upstream_bytes_forgotten, - 'parser_context' => $this->parser_context, - 'stack_of_open_elements' => $this->stack_of_open_elements, - 'expecting_more_input' => $this->expecting_more_input, + * The only stable part of this API is the return type of string. The consumer + * of this method MUST NOT assume any specific structure of the returned + * string. It will change without a warning between WordPress releases. + * + * This is not a tell() API. No XML Processor method will accept the cursor + * to move to another location. The only way to use this cursor is to create + * a new XML Processor instance. If you need to move around the document, use + * `set_bookmark()` and `seek()`. + */ + public function get_reentrancy_cursor() { + return base64_encode( + json_encode( + array( + 'token_starts_at_in_current_chunk' => $this->token_starts_at, + 'upstream_bytes_forgotten' => $this->upstream_bytes_forgotten, + 'parser_context' => $this->parser_context, + 'stack_of_open_elements' => $this->stack_of_open_elements, + 'expecting_more_input' => $this->expecting_more_input, + ) + ) ); } /** - * @TODO: - * – Validate the paused state, return false if it's invalid. + * Returns the byte offset in the input stream where the current token starts. + * + * You should probably not use this method. + * + * It only exists to allow resuming the input stream at the same offset where + * the XML parsing was finished. It will never expose any attribute's byte + * offset and no method in the XML processor API will ever accept the byte offset + * to move to another location. If you need to move around the document, use + * `set_bookmark()` and `seek()` instead.
*/ - public function resume( $paused_state ) { - $this->bytes_already_parsed = $paused_state['token_starts_at_in_current_chunk']; - $this->upstream_bytes_forgotten = $paused_state['upstream_bytes_forgotten']; - $this->stack_of_open_elements = $paused_state['stack_of_open_elements']; - $this->parser_context = $paused_state['parser_context']; - $this->expecting_more_input = $paused_state['expecting_more_input']; - $this->next_token(); + public function get_token_byte_offset_in_the_input_stream() { + return $this->token_starts_at + $this->upstream_bytes_forgotten; + } + + protected function initialize_from_cursor( $cursor ) { + if ( ! is_string( $cursor ) ) { + _doing_it_wrong( __METHOD__, 'Cursor must be a JSON-encoded string.', '1.0.0' ); + return false; + } + $cursor = base64_decode( $cursor ); + if ( false === $cursor ) { + _doing_it_wrong( __METHOD__, 'Invalid cursor provided to initialize_from_cursor().', '1.0.0' ); + return false; + } + $cursor = json_decode( $cursor, true ); + if ( false === $cursor || ! 
isset( $cursor['token_starts_at_in_current_chunk'], $cursor['upstream_bytes_forgotten'], $cursor['stack_of_open_elements'], $cursor['parser_context'], $cursor['expecting_more_input'] ) ) { + _doing_it_wrong( __METHOD__, 'Invalid cursor provided to initialize_from_cursor().', '1.0.0' ); + return false; + } + $this->bytes_already_parsed = $cursor['token_starts_at_in_current_chunk']; + $this->upstream_bytes_forgotten = $cursor['upstream_bytes_forgotten']; + $this->stack_of_open_elements = $cursor['stack_of_open_elements']; + $this->parser_context = $cursor['parser_context']; + $this->expecting_more_input = $cursor['expecting_more_input']; + return true; } /** From ccc04721aa61320c850db000b7dabada57cc7d9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= <adam@adamziel.com> Date: Wed, 20 Nov 2024 16:46:49 +0100 Subject: [PATCH 10/12] Brush up the interface --- .../playground/data-liberation/plugin.php | 48 +++--------- .../src/import/WP_Markdown_Importer.php | 10 +++ .../src/import/WP_Stream_Importer.php | 77 ++++++++++++------- .../data-liberation/src/wxr/WP_WXR_Reader.php | 4 +- 4 files changed, 71 insertions(+), 68 deletions(-) diff --git a/packages/playground/data-liberation/plugin.php b/packages/playground/data-liberation/plugin.php index 6251d2c8e8..3a0f15f7a4 100644 --- a/packages/playground/data-liberation/plugin.php +++ b/packages/playground/data-liberation/plugin.php @@ -32,14 +32,8 @@ add_action('init', function() { return; $wxr_path = __DIR__ . '/tests/fixtures/wxr-simple.xml'; - $entity_iterator_factory = function($cursor=null) use ($wxr_path) { - return WP_WXR_Reader::create( - new WP_File_Reader($wxr_path), - $cursor - ); - }; - $importer = WP_Stream_Importer::create( - $entity_iterator_factory + $importer = WP_Stream_Importer::create_for_wxr_file( + $wxr_path ); while($importer->next_step()) { // ... 
@@ -52,8 +46,8 @@ echo "moving to importer2\n"; echo "\n\n"; - $importer2 = WP_Stream_Importer::create( - $entity_iterator_factory, + $importer2 = WP_Stream_Importer::create_for_wxr_file( + $wxr_path, array(), $paused_importer_state ); @@ -412,26 +406,13 @@ function data_liberation_create_importer($import) { // @TODO: Save the error, report it to the user. return; } - $entity_iterator_factory = function($cursor=null) use ($wxr_path) { - return WP_WXR_Reader::create( - new WP_File_Reader($wxr_path), - $cursor - ); - }; - return WP_Stream_Importer::create( - $entity_iterator_factory + return WP_Stream_Importer::create_for_wxr_file( + $wxr_path ); case 'wxr_url': - $wxr_url = $import['wxr_url']; - $entity_iterator_factory = function($cursor=null) use ($wxr_url) { - return WP_WXR_Reader::create( - new WP_Remote_File_Reader($wxr_url), - $cursor - ); - }; - return WP_Stream_Importer::create( - $entity_iterator_factory + return WP_Stream_Importer::create_for_wxr_url( + $import['wxr_url'] ); case 'markdown_zip': @@ -450,17 +431,8 @@ function data_liberation_create_importer($import) { } } $markdown_root = $temp_dir; - $entity_iterator_factory = function($cursor=null) use ($markdown_root) { - if(null !== $cursor) { - throw new \Exception('Resuming Markdown imports is not supported yet.'); - } - return new WP_Markdown_Directory_Tree_Reader( - $markdown_root, - 1000 - ); - }; - return WP_Markdown_Importer::create( - $entity_iterator_factory, [ + return WP_Markdown_Importer::create_for_markdown_directory( + $markdown_root, [ 'source_site_url' => 'file://' . 
$markdown_root, 'local_markdown_assets_root' => $markdown_root, 'local_markdown_assets_url_prefix' => '@site/', diff --git a/packages/playground/data-liberation/src/import/WP_Markdown_Importer.php b/packages/playground/data-liberation/src/import/WP_Markdown_Importer.php index d1aee0fb0d..e76e6d643f 100644 --- a/packages/playground/data-liberation/src/import/WP_Markdown_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Markdown_Importer.php @@ -2,6 +2,16 @@ class WP_Markdown_Importer extends WP_Stream_Importer { + public static function create_for_markdown_directory( $markdown_directory, $options = array(), $cursor = null ) { + return static::create( + function ( $cursor = null ) use ( $markdown_directory ) { + return WP_Markdown_Importer::create( $markdown_directory, $cursor ); + }, + $options, + $cursor + ); + } + protected static function parse_options( $options ) { if ( ! isset( $options['source_site_url'] ) ) { _doing_it_wrong( __METHOD__, 'The source_site_url option is required.', '__WP_VERSION__' ); diff --git a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php index ffaaa240ff..cc4ea76c30 100644 --- a/packages/playground/data-liberation/src/import/WP_Stream_Importer.php +++ b/packages/playground/data-liberation/src/import/WP_Stream_Importer.php @@ -60,7 +60,7 @@ class WP_Stream_Importer { /** * Iterator that streams entities to import. 
*/ - private $entities_iterator; + private $entity_iterator; private $resume_at_entity; /** * A map of currently downloaded resources for each entity in @@ -73,6 +73,26 @@ class WP_Stream_Importer { private $active_downloads = array(); private $downloader; + public static function create_for_wxr_file( $wxr_path, $options = array(), $cursor = null ) { + return static::create( + function ( $cursor = null ) use ( $wxr_path ) { + return WP_WXR_Reader::create( new WP_File_Reader( $wxr_path ), $cursor ); + }, + $options, + $cursor + ); + } + + public static function create_for_wxr_url( $wxr_url, $options = array(), $cursor = null ) { + return static::create( + function ( $cursor = null ) use ( $wxr_url ) { + return WP_WXR_Reader::create( new WP_Remote_File_Reader( $wxr_url ), $cursor ); + }, + $options, + $cursor + ); + } + public static function create( $entity_iterator_factory, $options = array(), @@ -211,30 +231,29 @@ private function frontloading_advance_reentrancy_cursor() { * all its attachments downloaded. */ private function next_frontloading_step() { - if ( null === $this->entities_iterator ) { - $factory = $this->entity_iterator_factory; - $this->entities_iterator = $factory( $this->resume_at_entity ); - $this->downloader = new WP_Attachment_Downloader( $this->options ); + if ( null === $this->entity_iterator ) { + $this->entity_iterator = $this->create_entity_iterator(); + $this->downloader = new WP_Attachment_Downloader( $this->options ); } $this->frontloading_advance_reentrancy_cursor(); // We're done if all the entities are processed and all the downloads are finished. - if ( ! $this->entities_iterator->valid() && ! $this->downloader->has_pending_requests() ) { + if ( ! $this->entity_iterator->valid() && ! $this->downloader->has_pending_requests() ) { // This is an assertion to make double sure we're emptying the state queue. if ( ! 
empty( $this->active_downloads ) ) { _doing_it_wrong( __METHOD__, 'Frontloading queue is not empty.', '1.0' ); } - $this->stage = self::STAGE_IMPORT_ENTITIES; - $this->downloader = null; - $this->active_downloads = array(); - $this->entities_iterator = null; - $this->resume_at_entity = null; + $this->stage = self::STAGE_IMPORT_ENTITIES; + $this->downloader = null; + $this->active_downloads = array(); + $this->entity_iterator = null; + $this->resume_at_entity = null; return false; } // Poll the bytes between scheduling new downloads. - $only_downloader_pending = ! $this->entities_iterator->valid() && $this->downloader->has_pending_requests(); + $only_downloader_pending = ! $this->entity_iterator->valid() && $this->downloader->has_pending_requests(); if ( $this->downloader->queue_full() || $only_downloader_pending ) { /** * @TODO: @@ -256,8 +275,8 @@ private function next_frontloading_step() { * Identify the static assets referenced in the current entity * and enqueue them for download. */ - $entity = $this->entities_iterator->current(); - $cursor = $this->entities_iterator->get_reentrancy_cursor(); + $entity = $this->entity_iterator->current(); + $cursor = $this->entity_iterator->get_reentrancy_cursor(); $this->active_downloads[ $cursor ] = array(); $data = $entity->get_data(); @@ -292,7 +311,7 @@ private function next_frontloading_step() { */ // Move on to the next entity. - $this->entities_iterator->next(); + $this->entity_iterator->next(); $this->frontloading_advance_reentrancy_cursor(); return true; @@ -306,21 +325,20 @@ private function next_frontloading_step() { * the API consumer? 
*/ private function import_next_entity() { - if ( null === $this->entities_iterator ) { - $factory = $this->entity_iterator_factory; - $this->entities_iterator = $factory( $this->resume_at_entity ); - $this->importer = new WP_Entity_Importer(); + if ( null === $this->entity_iterator ) { + $this->entity_iterator = $this->create_entity_iterator(); + $this->importer = new WP_Entity_Importer(); } - if ( ! $this->entities_iterator->valid() ) { + if ( ! $this->entity_iterator->valid() ) { // We're done. - $this->stage = self::STAGE_FINISHED; - $this->entities_iterator = null; - $this->importer = null; + $this->stage = self::STAGE_FINISHED; + $this->entity_iterator = null; + $this->importer = null; return; } - $entity = $this->entities_iterator->current(); + $entity = $this->entity_iterator->current(); $attachments = array(); // Rewrite the URLs in the post. switch ( $entity->get_type() ) { @@ -371,8 +389,8 @@ private function import_next_entity() { /** * @TODO: Update the progress information. */ - $this->resume_at_entity = $this->entities_iterator->get_reentrancy_cursor(); - $this->entities_iterator->next(); + $this->resume_at_entity = $this->entity_iterator->get_reentrancy_cursor(); + $this->entity_iterator->next(); } private function enqueue_attachment_download( string $raw_url, $context_path = null ) { @@ -383,7 +401,7 @@ private function enqueue_attachment_download( string $raw_url, $context_path = n $enqueued = $this->downloader->enqueue_if_not_exists( $url, $output_path ); if ( $enqueued ) { $resource_id = $this->downloader->get_last_enqueued_resource_id(); - $entity_cursor = $this->entities_iterator->get_reentrancy_cursor(); + $entity_cursor = $this->entity_iterator->get_reentrancy_cursor(); $this->active_downloads[ $entity_cursor ][ $resource_id ] = true; } return $enqueued; @@ -466,4 +484,9 @@ private function url_processor_matched_asset_url( WP_Block_Markup_Url_Processor ( ! 
$this->source_site_url || url_matches( $p->get_parsed_url(), $this->source_site_url ) ) ); } + + private function create_entity_iterator() { + $factory = $this->entity_iterator_factory; + return $factory( $this->resume_at_entity ); + } } diff --git a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php index 25e716256c..76497fec89 100644 --- a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php +++ b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php @@ -118,9 +118,7 @@ * * @TODO: * - * - Save parser state after each entity or every `n` entities to speed it up. Then also save the `n` - * for a quick rewind after resuming. - * - Resume parsing from saved state. + * - Revisit the need to implement the Iterator interface. * * @since WP_VERSION */ From ee422732d1ab140d58a266677fce1fd4881fdfdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= <adam@adamziel.com> Date: Wed, 20 Nov 2024 17:09:08 +0100 Subject: [PATCH 11/12] Remove trailing commas --- .../data-liberation/src/import/WP_Attachment_Downloader.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php index 951a45e632..a6be3e74f0 100644 --- a/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php +++ b/packages/playground/data-liberation/src/import/WP_Attachment_Downloader.php @@ -59,7 +59,7 @@ public function enqueue_if_not_exists( $url, $output_path ) { $success = copy( $local_path, $output_path ); $this->pending_events[] = new WP_Attachment_Downloader_Event( $this->enqueued_resource_id, - $success ? WP_Attachment_Downloader_Event::SUCCESS : WP_Attachment_Downloader_Event::FAILURE, + $success ? 
WP_Attachment_Downloader_Event::SUCCESS : WP_Attachment_Downloader_Event::FAILURE ); return true; case 'http': @@ -136,7 +136,7 @@ public function poll() { } $this->pending_events[] = new WP_Attachment_Downloader_Event( 'http:' . $original_request_id, - WP_Attachment_Downloader_Event::FAILURE, + WP_Attachment_Downloader_Event::FAILURE ); unset( $this->output_paths[ $original_request_id ] ); break; @@ -156,7 +156,7 @@ public function poll() { } $this->pending_events[] = new WP_Attachment_Downloader_Event( 'http:' . $original_request_id, - WP_Attachment_Downloader_Event::SUCCESS, + WP_Attachment_Downloader_Event::SUCCESS ); unset( $this->output_paths[ $original_request_id ] ); } From 86074d44e24815b7c406f08ca67d9c1009ab2cc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= <adam@adamziel.com> Date: Thu, 21 Nov 2024 11:08:49 +0100 Subject: [PATCH 12/12] Fix unit tests --- .../data-liberation/src/wxr/WP_WXR_Reader.php | 9 ++++----- .../src/xml-api/WP_XML_Processor.php | 6 +++--- .../tests/WPWXRReaderTests.php | 20 +++++++++---------- .../tests/WPXMLProcessorTests.php | 10 ++++++---- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php index 76497fec89..c37d952714 100644 --- a/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php +++ b/packages/playground/data-liberation/src/wxr/WP_WXR_Reader.php @@ -396,11 +396,10 @@ public function get_reentrancy_cursor() { * If the cursor internal data was a part of every bookmark, this would have worked * even after evicting the actual bytes where $last_entity is stored. 
*/ - $xml_cursor = $this->xml->get_reentrancy_cursor(); - $xml_cursor = json_decode( base64_decode( $xml_cursor ), true ); - $xml_cursor['token_starts_at_in_current_chunk'] = 0; - $xml_cursor['upstream_bytes_forgotten'] = $this->entity_byte_offset; - $xml_cursor = base64_encode( json_encode( $xml_cursor ) ); + $xml_cursor = $this->xml->get_reentrancy_cursor(); + $xml_cursor = json_decode( base64_decode( $xml_cursor ), true ); + $xml_cursor['upstream_bytes_forgotten'] = $this->entity_byte_offset; + $xml_cursor = base64_encode( json_encode( $xml_cursor ) ); return json_encode( array( 'xml' => $xml_cursor, diff --git a/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php b/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php index de89856c25..9dce95e3fc 100644 --- a/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php +++ b/packages/playground/data-liberation/src/xml-api/WP_XML_Processor.php @@ -684,7 +684,6 @@ public function get_reentrancy_cursor() { return base64_encode( json_encode( array( - 'token_starts_at_in_current_chunk' => $this->token_starts_at, 'upstream_bytes_forgotten' => $this->upstream_bytes_forgotten, 'parser_context' => $this->parser_context, 'stack_of_open_elements' => $this->stack_of_open_elements, @@ -720,11 +719,12 @@ protected function initialize_from_cursor( $cursor ) { return false; } $cursor = json_decode( $cursor, true ); - if ( false === $cursor || ! isset( $cursor['token_starts_at_in_current_chunk'], $cursor['upstream_bytes_forgotten'], $cursor['stack_of_open_elements'], $cursor['parser_context'], $cursor['expecting_more_input'] ) ) { + if ( false === $cursor ) { _doing_it_wrong( __METHOD__, 'Invalid cursor provided to initialize_from_cursor().', '1.0.0' ); return false; } - $this->bytes_already_parsed = $cursor['token_starts_at_in_current_chunk']; + // Assume the input stream will start from the last known byte offset. 
+ $this->bytes_already_parsed = 0; $this->upstream_bytes_forgotten = $cursor['upstream_bytes_forgotten']; $this->stack_of_open_elements = $cursor['stack_of_open_elements']; $this->parser_context = $cursor['parser_context']; diff --git a/packages/playground/data-liberation/tests/WPWXRReaderTests.php b/packages/playground/data-liberation/tests/WPWXRReaderTests.php index 2b08768080..881749cf81 100644 --- a/packages/playground/data-liberation/tests/WPWXRReaderTests.php +++ b/packages/playground/data-liberation/tests/WPWXRReaderTests.php @@ -8,7 +8,7 @@ class WPWXRReaderTests extends TestCase { * @dataProvider preexisting_wxr_files_provider */ public function test_does_not_crash_when_parsing_preexisting_wxr_files_as_string($path, $expected_entitys) { - $wxr = new WP_WXR_Reader(); + $wxr = WP_WXR_Reader::create(); $wxr->append_bytes(file_get_contents($path)); $wxr->input_finished(); @@ -25,7 +25,7 @@ public function test_does_not_crash_when_parsing_preexisting_wxr_files_as_string */ public function test_does_not_crash_when_parsing_preexisting_wxr_files_as_stream($path, $expected_entitys) { $stream = fopen($path, 'r'); - $wxr = new WP_WXR_Reader(); + $wxr = WP_WXR_Reader::create(); $found_entities = 0; while(true) { $chunk = fread($stream, 100); @@ -64,7 +64,7 @@ public function preexisting_wxr_files_provider() { public function test_simple_wxr() { - $importer = new WP_WXR_Reader(); + $importer = WP_WXR_Reader::create(); $importer->append_bytes(file_get_contents(__DIR__ . 
'/fixtures/wxr-simple.xml')); $importer->input_finished(); $this->assertTrue( $importer->next_entity() ); @@ -182,7 +182,7 @@ public function test_simple_wxr() { } public function test_attachments() { - $importer = new WP_WXR_Reader(); + $importer = WP_WXR_Reader::create(); $importer->append_bytes(<<<XML <?xml version="1.0" encoding="UTF-8"?> <rss> @@ -265,7 +265,7 @@ public function test_attachments() { } public function test_terms() { - $importer = new WP_WXR_Reader(); + $importer = WP_WXR_Reader::create(); $importer->append_bytes(<<<XML <?xml version="1.0" encoding="UTF-8"?> <rss> @@ -300,7 +300,7 @@ public function test_terms() { } public function test_category() { - $importer = new WP_WXR_Reader(); + $importer = WP_WXR_Reader::create(); $importer->append_bytes(<<<XML <?xml version="1.0" encoding="UTF-8"?> <rss> @@ -331,7 +331,7 @@ public function test_category() { } public function test_tag_string() { - $importer = new WP_WXR_Reader(); + $importer = WP_WXR_Reader::create(); $importer->append_bytes(<<<XML <?xml version="1.0" encoding="UTF-8"?> <rss> @@ -379,7 +379,7 @@ public function test_tag_streaming() { XML; $chunks = str_split($wxr, 10); - $wxr = new WP_WXR_Reader(); + $wxr = WP_WXR_Reader::create(); while(true) { if(true === $wxr->next_entity()) { break; @@ -411,7 +411,7 @@ public function test_tag_streaming() { } public function test_parse_comment() { - $wxr = new WP_WXR_Reader(); + $wxr = WP_WXR_Reader::create(); $wxr->append_bytes(<<<XML <?xml version="1.0" encoding="UTF-8"?> <rss> @@ -494,7 +494,7 @@ public function test_parse_comment() { } public function test_retains_last_ids() { - $wxr = new WP_WXR_Reader(); + $wxr = WP_WXR_Reader::create(); $wxr->append_bytes(<<<XML <?xml version="1.0" encoding="UTF-8"?> <rss> diff --git a/packages/playground/data-liberation/tests/WPXMLProcessorTests.php b/packages/playground/data-liberation/tests/WPXMLProcessorTests.php index faee0c31a9..9a48413631 100644 --- 
a/packages/playground/data-liberation/tests/WPXMLProcessorTests.php +++ b/packages/playground/data-liberation/tests/WPXMLProcessorTests.php @@ -1735,13 +1735,15 @@ public function test_pause_and_resume() { $processor->next_tag(); $processor->next_tag(); $this->assertEquals( 'first_child', $processor->get_tag(), 'Did not find a tag.' ); - $paused_state = $processor->pause(); - $this->assertEquals( 10, $paused_state['token_byte_offset_in_the_input_stream'], 'Wrong position in the input stream exported.' ); + + $entity_offset = $processor->get_token_byte_offset_in_the_input_stream(); + $cursor = $processor->get_reentrancy_cursor(); $resumed = WP_XML_Processor::create_for_streaming( - substr( $xml, $paused_state['token_byte_offset_in_the_input_stream'] ) + substr( $xml, $entity_offset ), + $cursor ); - $resumed->resume( $paused_state ); + $resumed->next_tag(); $this->assertEquals( 'first_child', $resumed->get_tag(), 'Did not find a tag.' ); $resumed->next_token(); $this->assertEquals( 'Hello there', $resumed->get_modifiable_text(), 'Did not find the expected text.' );