Skip to content

Commit

Permalink
[Data Liberation] Re-entrant WP_Stream_Importer (#2004)
Browse files Browse the repository at this point in the history
Adds re-entrancy semantics to the importer API to enable pausing and
resuming data imports:

```php
$wxr_path = __DIR__ . '/tests/fixtures/wxr-simple.xml';
$importer = WP_Stream_Importer::create_for_wxr_file( $wxr_path );
// Do some work
for($i = 0;$i<10;$i++) {
	$importer->next_step();
}
// Save our progress
$cursor = $importer->get_reentrancy_cursor();

// Continue where we left off later on
$new_importer = WP_Stream_Importer::create_for_wxr_file( $wxr_path, [], $cursor );
$new_importer->next_step();
```

## Motivation

Most WordPress importers fail because they assume a happy path: we have
enough memory, we have enough time, all the assets will be available,
and so on.

In Data Liberation, I want to assume the worst possible path through
thorny quicksand in full sun with venomous wasps stinging us. We'll run
out of memory after the first post, all the assets will be 40GB large,
and half of them won't be possible to download.

Pausing, resuming, and recovering from errors should be a basic
primitive of the system. The first step to supporting that is the
ability to suspend the import operation and restart it from the same
spot later on. And that's exactly what this PR adds.

## Re-entrancy interface

This PR doesn't store any information in the database yet. It merely
adds the plumbing for pausing and resuming the `WP_Stream_Importer`
instance.

### WP_Byte_Stream re-entrancy

The `WP_Byte_Stream` interface directly exposes a `tell(): int` and
`seek($offset)` methods. There's no need for anything fancier than that
– we're only interested in an offset in the stream. It seems to work
well for simple byte streams.

My only worry is we may need to revisit this interface later on to
support fetching fixed-size chunks from large files using byte ranges.

### WP_XML_Processor re-entrancy

`WP_XML_Processor` supports exporting state via:

* A `get_reentrancy_cursor()` method
* Resuming via a static `create($xml, $options, $cursor=null)`.
* Seeking the input stream to the correct location via
`get_token_byte_offset_in_the_input_stream()`

No method in the XML processor API will ever accept the cursor or the
byte offset as a way of moving to another location in the document. You
can only create a new XML processor at `$cursor`.

This is a measure to:

* Discourage using the byte offsets for manual string operations on the
XML document. It's a footgun and most API consumers who would try that
would just introduce bugs into their codebase.
* Make it impossible to misuse the re-entrancy API for `seek()`-ing. We
already have named bookmarks for that.

Usage:

```php
$xml = WP_XML_Processor::create_from_string( $xml_bytes );
for($i = 0;$i<10;$i++) {
	$xml->next_step();
}

$cursor = $xml->get_reentrancy_cursor();

$unparsed_xml = substr(
	$xml_bytes,
	$xml->get_token_byte_offset_in_the_input_stream()
);
$xml2 = WP_XML_Processor::create_from_string( $unparsed_xml, $cursor );
$xml2->next_step();
```

### WP_WXR_Reader re-entrancy

The `WP_WXR_Reader` class uses the same `get_reentrancy_cursor()`
interface as `WP_XML_Processor`.


### WP_Stream_Importer re-entrancy

The `WP_Stream_Importer` class uses the same `get_reentrancy_cursor()`
interface as `WP_XML_Processor`. See the example at the top of this
description.


## Testing instructions

TBD. We don't yet have a good way of running PHPUnit in the WordPress
context yet. @zaerl is working on running import in CLI, we may need to
wait for that before adding tests to this PR and shipping it.
  • Loading branch information
adamziel authored Nov 22, 2024
1 parent 7e9a1ac commit e4ffd2d
Show file tree
Hide file tree
Showing 14 changed files with 695 additions and 370 deletions.
106 changes: 56 additions & 50 deletions packages/playground/data-liberation/plugin.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

/**
* Don't run KSES on the attribute values during the import.
*
*
* Without this filter, WP_HTML_Tag_Processor::set_attribute() will
* assume the value is a URL and run KSES on it, which will incorrectly
* prefix relative paths with http://.
*
*
* For example:
*
*
* > $html = new WP_HTML_Tag_Processor( '<img>' );
* > $html->next_tag();
* > $html->set_attribute( 'src', './_assets/log-errors.png' );
Expand All @@ -25,6 +25,41 @@
return [];
});

/**
* Development debug code to run the import manually.
* @TODO: Remove this in favor of a CLI command.
*/
add_action('init', function() {
return;
$wxr_path = __DIR__ . '/tests/fixtures/wxr-simple.xml';
$importer = WP_Stream_Importer::create_for_wxr_file(
$wxr_path
);
while($importer->next_step()) {
// ...
}
return;
$importer->next_step();
$paused_importer_state = $importer->get_reentrancy_cursor();

echo "\n\n";
echo "moving to importer2\n";
echo "\n\n";

$importer2 = WP_Stream_Importer::create_for_wxr_file(
$wxr_path,
array(),
$paused_importer_state
);
$importer2->next_step();
$importer2->next_step();
$importer2->next_step();
// $importer2->next_step();
// var_dump($importer2);

die("YAY");
});

// Register admin menu
add_action('admin_menu', function() {
add_menu_page(
Expand Down Expand Up @@ -86,7 +121,7 @@ function data_liberation_admin_page() {
data_liberation_process_import();
echo '</pre>';
}

?>
<h2>Active import</h2>
<?php
Expand Down Expand Up @@ -148,9 +183,9 @@ function data_liberation_admin_page() {
>
<?php wp_nonce_field('data_liberation_import'); ?>
<input type="hidden" name="action" value="data_liberation_import">

<h2>Import Content</h2>

<table class="form-table">
<tr>
<th scope="row">Import Type</th>
Expand All @@ -175,7 +210,7 @@ function data_liberation_admin_page() {
</label>
</td>
</tr>

<tr data-wp-context='{ "importType": "wxr_file" }'
data-wp-class--hidden="!state.isImportTypeSelected">
<th scope="row">WXR File</th>
Expand All @@ -184,7 +219,7 @@ function data_liberation_admin_page() {
<p class="description">Upload a WordPress eXtended RSS (WXR) file</p>
</td>
</tr>

<tr data-wp-context='{ "importType": "wxr_url" }'
data-wp-class--hidden="!state.isImportTypeSelected">
<th scope="row">WXR URL</th>
Expand All @@ -193,7 +228,7 @@ function data_liberation_admin_page() {
<p class="description">Enter the URL of a WXR file</p>
</td>
</tr>

<tr data-wp-context='{ "importType": "markdown_zip" }'
data-wp-class--hidden="!state.isImportTypeSelected">
<th scope="row">Markdown ZIP</th>
Expand All @@ -210,7 +245,7 @@ function data_liberation_admin_page() {
<h2>Previous Imports</h2>

<p>TODO: Show a table of previous imports.</p>

<table class="form-table">
<tr>
<th scope="row">Date</th>
Expand Down Expand Up @@ -329,7 +364,7 @@ function data_liberation_admin_page() {
*/
// if(is_wp_error(wp_schedule_event(time(), 'data_liberation_minute', 'data_liberation_process_import'))) {
// wp_delete_attachment($attachment_id, true);
// // @TODO: More user friendly error message – maybe redirect back to the import screen and
// // @TODO: More user friendly error message – maybe redirect back to the import screen and
// // show the error there.
// wp_die('Failed to schedule import – the "data_liberation_minute" schedule may not be registered.');
// }
Expand All @@ -353,20 +388,9 @@ function data_liberation_process_import() {

function data_liberation_import_step($import) {
$importer = data_liberation_create_importer($import);
// @TODO: Save the last importer state so we can resume it later if interrupted.
update_option('data_liberation_import_progress', [
'status' => 'Downloading static assets...',
'current' => 0,
'total' => 0
]);
$importer->frontload_assets();
// @TODO: Keep track of multiple progress dimensions – posts, assets, categories, etc.
update_option('data_liberation_import_progress', [
'status' => 'Importing posts...',
'current' => 0,
'total' => 0
]);
$importer->import_entities();
while($importer->next_step()) {
// ...Twiddle our thumbs...
}
delete_option('data_liberation_active_import');
// @TODO: Do not echo things. Append to an import log where we can retrace the steps.
// Also, store specific import events in the database so the user can react and
Expand All @@ -382,25 +406,13 @@ function data_liberation_create_importer($import) {
// @TODO: Save the error, report it to the user.
return;
}
$entity_iterator_factory = function() use ($wxr_path) {
$wxr = new WP_WXR_Reader();
$wxr->connect_upstream(new WP_File_Reader($wxr_path));

return $wxr;
};
return WP_Stream_Importer::create(
$entity_iterator_factory
return WP_Stream_Importer::create_for_wxr_file(
$wxr_path
);

case 'wxr_url':
$wxr_url = $import['wxr_url'];
$entity_iterator_factory = function() use ($wxr_url) {
$wxr = new WP_WXR_Reader();
$wxr->connect_upstream(new WP_Remote_File_Reader($wxr_url));
return $wxr;
};
return WP_Stream_Importer::create(
$entity_iterator_factory
return WP_Stream_Importer::create_for_wxr_url(
$import['wxr_url']
);

case 'markdown_zip':
Expand All @@ -419,18 +431,12 @@ function data_liberation_create_importer($import) {
}
}
$markdown_root = $temp_dir;
$entity_iterator_factory = function() use ($markdown_root) {
return new WP_Markdown_Directory_Tree_Reader(
$markdown_root,
1000
);
};
return WP_Markdown_Importer::create(
$entity_iterator_factory, [
return WP_Markdown_Importer::create_for_markdown_directory(
$markdown_root, [
'source_site_url' => 'file://' . $markdown_root,
'local_markdown_assets_root' => $markdown_root,
'local_markdown_assets_url_prefix' => '@site/',
]
);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<?php

interface WP_Byte_Reader {
public function pause(): array|bool;
public function resume( $paused_state ): bool;
public function tell(): int;
public function seek( int $offset ): bool;
public function is_finished(): bool;
public function next_bytes(): bool;
public function get_bytes(): string|null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ class WP_File_Reader implements WP_Byte_Reader {
protected $chunk_size;
protected $file_pointer;
protected $offset_in_file;
protected $output_bytes;
protected $output_bytes = '';
protected $last_chunk_size = 0;
protected $last_error;
protected $state = self::STATE_STREAMING;

Expand All @@ -18,22 +19,24 @@ public function __construct( $file_path, $chunk_size = 8096 ) {
$this->chunk_size = $chunk_size;
}

/**
* Really these are just `tell()` and `seek()` operations, only the state is more
* involved than a simple offset. Hmm.
*/
public function pause(): array|bool {
return array(
'offset_in_file' => $this->offset_in_file,
);
public function tell(): int {
// Save the previous offset, not the current one.
// This way, after resuming, the next read will yield the same $output_bytes
// as we have now.
return $this->offset_in_file - $this->last_chunk_size;
}

public function resume( $paused_state ): bool {
public function seek( $offset_in_file ): bool {
if ( ! is_int( $offset_in_file ) ) {
_doing_it_wrong( __METHOD__, 'Cannot set a file reader cursor to a non-integer offset.', '1.0.0' );
return false;
}
if ( $this->file_pointer ) {
_doing_it_wrong( __METHOD__, 'Cannot resume a file reader that is already initialized.', '1.0.0' );
_doing_it_wrong( __METHOD__, 'Cannot set a file reader cursor on a file reader that is already initialized.', '1.0.0' );
return false;
}
$this->offset_in_file = $paused_state['offset_in_file'];
$this->offset_in_file = $offset_in_file;
$this->last_chunk_size = 0;
return true;
}

Expand All @@ -50,7 +53,8 @@ public function get_last_error(): string|null {
}

public function next_bytes(): bool {
$this->output_bytes = '';
$this->output_bytes = '';
$this->last_chunk_size = 0;
if ( $this->last_error || $this->is_finished() ) {
return false;
}
Expand All @@ -66,7 +70,8 @@ public function next_bytes(): bool {
$this->state = static::STATE_FINISHED;
return false;
}
$this->offset_in_file += strlen( $bytes );
$this->last_chunk_size = strlen( $bytes );
$this->offset_in_file += $this->last_chunk_size;
$this->output_bytes .= $bytes;
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ public function __construct( $url ) {
$this->url = $url;
}

public function tell(): int {
return $this->bytes_already_read + $this->skip_bytes;
}

public function seek( $offset_in_file ): bool {
if ( $this->request ) {
_doing_it_wrong( __METHOD__, 'Cannot set a remote file reader cursor on a remote file reader that is already initialized.', '1.0.0' );
return false;
}
$this->skip_bytes = $offset_in_file;
return true;
}

public function next_bytes(): bool {
if ( null === $this->request ) {
$this->request = new WordPress\AsyncHttp\Request(
Expand Down Expand Up @@ -90,21 +103,6 @@ public function get_bytes(): string|null {
return $this->current_chunk;
}

public function pause(): array|bool {
return array(
'offset_in_file' => $this->bytes_already_read + $this->skip_bytes,
);
}

public function resume( $paused_state ): bool {
if ( $this->request ) {
_doing_it_wrong( __METHOD__, 'Cannot resume a remote file reader that is already initialized.', '1.0.0' );
return false;
}
$this->skip_bytes = $paused_state['offset_in_file'];
return true;
}

public function is_finished(): bool {
return $this->is_finished;
}
Expand Down
Loading

0 comments on commit e4ffd2d

Please sign in to comment.