diff --git a/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_foxml_files.yml b/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_foxml_files.yml index 179783ab..3b6935b4 100644 --- a/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_foxml_files.yml +++ b/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_foxml_files.yml @@ -11,7 +11,9 @@ source: # validation of the URI in the destination may fail (presumably, naive URI # validation inside of Drupal, expecting HTTP-like URLs)... or may just # require a third slash to imply an empty "authority" component? - cache_counts: true + # XXX: For big sets of things, counting could take a substantial amount of + # time, so let's skip it. + skip_count: true destination: plugin: entity:file validate: true diff --git a/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml b/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml index f428100c..bf283240 100644 --- a/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml +++ b/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml @@ -41,9 +41,13 @@ source: # to the vocab in which to do the things. - '@_vid' - plugin: flatten - - plugin: migration_lookup + - plugin: dgi_migrate.process.locking_migration_lookup migration: dgis_stub_terms_generic stub_id: dgis_stub_terms_generic + lock_context_keys: + dgis_stub_terms_generic: + - { offset: [3] } + - { offset: [1], hash: '##'} extract: &generic_term_extract plugin: dgi_migrate.process.single_extract index: [actual] @@ -71,6 +75,7 @@ process: method: models - plugin: skip_on_empty method: row + message: 'Skipping; no model defined.' 
title: - plugin: dgi_migrate.subproperty source: '@_node_foxml_parsed' @@ -2703,18 +2708,23 @@ process: - '@_unspecified_rights_statement' - plugin: null_coalesce nid: - - plugin: migration_lookup + - plugin: dgi_migrate.process.locking_migration_lookup source: '@field_pid' migration: dgis_stub_nodes + lock_context_keys: &dgis_stub_nodes_lock_context_keys + dgis_stub_nodes: + - { offset: [ 0 ], hash: '#/##' } field_member_of: - plugin: flatten source: - '@_members' - '@_constituents' - plugin: multiple_values - - plugin: migration_lookup + - plugin: dgi_migrate.process.locking_migration_lookup migration: dgis_stub_nodes stub_id: dgis_stub_nodes + lock_context_keys: + << : *dgis_stub_nodes_lock_context_keys migration_dependencies: required: - dgis_foxml_files diff --git a/scripts/env.sample b/scripts/env.sample index a0f9b787..1292b265 100644 --- a/scripts/env.sample +++ b/scripts/env.sample @@ -58,3 +58,41 @@ # PROCESSES: The number of processes to use to run the migration import. # --- #PROCESSES=1 + +# === +# SKIP_STATUS: Suppress dumping of migration status before/after operations. +# --- +# Default: +#SKIP_STATUS=false +# To skip, uncomment (or equivalently set): +#SKIP_STATUS=true + +# === +# MULTIPROCESS_SKIP_MIGRATIONS: Skip processing the specified migrations. +# +# May be of some use in resuming larger migrations, when we do not wish to +# undertake no-op cycling through other migrations. +# --- +#MULTIPROCESS_SKIP_MIGRATIONS=() + +# === +# MULTIPROCESS_PRE_ENQUEUE_PAUSE: Pause execution before enqueuing these. +# +# Expected to be a Bash array; in this instance, just a set of strings +# (representing migration IDs) between parentheses. +# +# NOTE: The prompt for this presently only shows in the *-import.log; _not_ in +# the main "run" process. +# --- +#MULTIPROCESS_PRE_ENQUEUE_PAUSE=() + +# === +# MULTIPROCESS_POST_PROCESS_PAUSE: Pause execution after finishing these. 
+# +# Expected to be a Bash array; in this instance, just a set of strings +# (representing migration IDs) between parentheses. +# +# NOTE: The prompt for this presently only shows in the *-import.log; _not_ in +# the main "run" process. +# --- +#MULTIPROCESS_POST_PROCESS_PAUSE=() diff --git a/scripts/util.in b/scripts/util.in index 214ddc2b..ccac2ca6 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -65,6 +65,10 @@ function init_vars () { declare -g TIME=${TIME:-/usr/bin/time} declare -g LOG_DIR=${LOG_DIR:-$CONFIG_DIR} declare -g PROCESSES=${PROCESSES:-1} + declare -g SKIP_STATUS=${SKIP_STATUS:-false} + declare -g -a MULTIPROCESS_SKIP_MIGRATIONS=(${MULTIPROCESS_SKIP_MIGRATIONS[@]}) + declare -g -a MULTIPROCESS_PRE_ENQUEUE_PAUSE=(${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]}) + declare -g -a MULTIPROCESS_POST_PROCESS_PAUSE=(${MULTIPROCESS_POST_PROCESS_PAUSE[@]}) # Initialize the log directory. if ! [ -d "$LOG_DIR" ]; then @@ -96,6 +100,21 @@ function do_migration_single_process() { timedwwwdrush dgi-migrate:import "--root=$DRUPAL_ROOT" "--uri=$URI" "--user=$DRUPAL_USER" "--group=$MIGRATION_GROUP" "${@:2}" } +# Helper; facilitate pausing for various reasons (likely snapshotting). +# +# Positional args: +# - 1: A descriptive string of _when_ we are pausing; e.g. "pre-enqueue", +# "post-enqueue", etc. +# - 2: The ID of the specific migration during which we are pausing. +function do_pause() { + local WHEN=$1 + local MIGRATION_ID=$2 + local DISCARD + + read -ep "Pausing $WHEN of $MIGRATION_ID as requested. Hit enter to continue." DISCARD + echo "DISCARD is $DISCARD" +} + # Kick off a migration in multiple processes. 
# # Positional args: @@ -106,23 +125,77 @@ function do_migration_multi_process() { local NUM=${1} local PROCESS_LOG_DIR="$LOG_DIR/$NUM-multiprocess-logs" + local STOP_LOCK_FILE="$LOG_DIR/$NUM-stop.lock" + local PAUSE_LOCK_FILE="$LOG_DIR/$NUM-pause.lock" wwwdo mkdir -p $PROCESS_LOG_DIR + wwwdo touch $STOP_LOCK_FILE + wwwdo touch $PAUSE_LOCK_FILE echo "Listing migrations..." - timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ - | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do + local -a migrations=($(wwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --field=id --sort)) + for MIGRATION_ID in ${migrations[@]}; do + if [ ! -f $PAUSE_LOCK_FILE ] ; then + echo "Pause lock file removed." + do_pause "pre-enqueue" "$MIGRATION_ID" + elif [ ! -f $STOP_LOCK_FILE ] ; then + echo "Stop lock file removed; exiting before touching $MIGRATION_ID." + return + elif [[ " ${MULTIPROCESS_SKIP_MIGRATIONS[@]} " =~ " $MIGRATION_ID " ]]; then + echo "Skipping $MIGRATION_ID as requested." + continue + elif [[ " ${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]} " =~ " $MIGRATION_ID " ]]; then + do_pause "pre-enqueue" "$MIGRATION_ID" + fi + echo "Enqueuing items for $MIGRATION_ID" - timedwwwdrush dgi-migrate:enqueue --user=$DRUPAL_USER $MIGRATION_ID "${@:2}" + timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" & + local ENQUEUEING_JOB=$! + echo "Starting $PROCESSES processes to process $MIGRATION_ID." + local -a PROCESS_JOBS=() for i in $(seq 1 $PROCESSES); do echo "Starting $i/$PROCESSES to process $MIGRATION_ID." - timedwwwdrush dgi-migrate:enqueued-process --user=$DRUPAL_USER $MIGRATION_ID "${@:2}" &> $PROCESS_LOG_DIR/$MIGRATION_ID.$i.log & + timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" & + PROCESS_JOBS+=($!) + done + + wait $ENQUEUEING_JOB + echo "Work enqueueing finished; enqueueing terminal messages." 
+ for i in $(seq 1 $PROCESSES); do + wwwdrush dgi-migrate:enqueue-terminal "$MIGRATION_ID" "$NUM" done - echo "Waiting for processes to exit..." - wait - timedwwwdrush dgi-migrate:finish-enqueued-process --user=1 $MIGRATION_ID "${@:2}" + + echo "Terminal messages enqueued; waiting for workers to finish..." + wait ${PROCESS_JOBS[@]} + echo "Workers exited." + + if [ ! -f $PAUSE_LOCK_FILE ] ; then + do_pause "post-process, pre-finalize" "$MIGRATION_ID" + elif [ ! -f $STOP_LOCK_FILE ] ; then + echo "Lock file removed; exiting without finalizing batch." + return + fi + + echo "Finalizing $MIGRATION_ID." + timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" + echo "Finished $MIGRATION_ID." + if [[ " ${MULTIPROCESS_POST_PROCESS_PAUSE[@]} " =~ " $MIGRATION_ID " ]] ; then + do_pause "post-process" "$MIGRATION_ID" + fi + done + wwwdo rm $STOP_LOCK_FILE $PAUSE_LOCK_FILE +} + +# Dump status for the given migration group. +# +# Can be skipped if SKIP_STATUS=true; in which case the call to this should be +# no-op. +function dump_status() { + if [ $SKIP_STATUS != 'true' ]; then + wwwdrush migrate:status --group=$MIGRATION_GROUP + fi } # Handle kicking off a migration. @@ -146,7 +219,7 @@ function do_migration () { # (lookin' at you, dgi_migrate_foxml_standard_mods_xslt dealio) wwwdrush cache:rebuild # Dump status before run. - wwwdrush migrate:status --group=$MIGRATION_GROUP + dump_status { # Do the import, one way or another. if [ $PROCESSES -eq 1 ]; then @@ -156,10 +229,10 @@ function do_migration () { fi } |& wwwdo tee $IMPORT_LOG > /dev/null # Dump status after run. - wwwdrush migrate:status --group=$MIGRATION_GROUP + dump_status # Dump messages after run, so they're not lost with a subsequent run. 
wwwdo mkdir -p $MESSAGES_DIR - wwwdrush migrate:status --group=$MIGRATION_GROUP --field=id --format=string | \ + wwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --field=id | \ while read NAME ; do wwwdrush migrate:messages --format=json $NAME | wwwdo tee "$MESSAGES_DIR/$NAME.json" > /dev/null done @@ -202,11 +275,11 @@ function do_rollback () { # (lookin' at you, things involving Fedora dealio) wwwdrush cache:rebuild # Dump status before rollback. - wwwdrush migrate:status --group=$MIGRATION_GROUP + dump_status # The base rollback. - timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null + timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "--run=$NUM" "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null # Dump status after rollback. - wwwdrush migrate:status --group=$MIGRATION_GROUP + dump_status set +x } |& wwwdo tee $RUN_LOG diff --git a/src/Drush/Commands/MigrateCommands.php b/src/Drush/Commands/MigrateCommands.php index b798c7fa..64cf9c7b 100644 --- a/src/Drush/Commands/MigrateCommands.php +++ b/src/Drush/Commands/MigrateCommands.php @@ -6,6 +6,7 @@ use Drupal\Component\Graph\Graph; use Drupal\Core\StringTranslation\StringTranslationTrait; use Drupal\dgi_migrate\MigrateBatchExecutable; +use Drupal\dgi_migrate\StompQueue; use Drupal\migrate\Plugin\MigrationInterface; use Drupal\migrate_tools\Drush\Commands\MigrateToolsCommands; use Drupal\migrate_tools\Drush9LogMigrateMessage; @@ -51,6 +52,7 @@ class MigrateCommands extends MigrateToolsCommands { * @option skip-progress-bar Skip displaying a progress bar. * @option sync Sync source and destination. Delete destination records that * do not exist in the source. + * @option run The ID of the run, if relevant. 
* * @default $options [] * @usage migrate:batch-import --all @@ -90,6 +92,7 @@ public function batchImport($migration_names = '', array $options = [ 'execute-dependencies' => FALSE, 'skip-progress-bar' => FALSE, 'sync' => FALSE, + 'run' => NULL, ]) : void { parent::import($migration_names, $options); } @@ -207,6 +210,7 @@ protected function executeMigration(MigrationInterface $migration, $migration_id * An optional set of row statuses, comma-separated, to which to constrain * the rollback. Valid states are: "imported", "needs_update", "ignored", * and "failed". + * @option run The ID of the run, if relevant. * * @default $options [] * @@ -239,6 +243,7 @@ public function rollback($migration_names = '', array $options = [ 'skip-progress-bar' => FALSE, 'continue-on-failure' => FALSE, 'statuses' => self::REQ, + 'run' => NULL, ]) : void { $group_names = $options['group']; $tag_names = $options['tag']; @@ -272,7 +277,8 @@ public function rollback($migration_names = '', array $options = [ $executable = new MigrateBatchExecutable( $migration, $this->getMigrateMessage(), - $options + $options, + $options['run'] ); // drush_op() provides --simulate support. $result = drush_op([$executable, 'rollback']); @@ -284,7 +290,7 @@ public function rollback($migration_names = '', array $options = [ // If any rollbacks failed, throw an exception to generate exit status. if ($has_failure) { - $error_message = dt('!name migration failed.', ['!name' => $migration_id]); + $error_message = dt(strtr('!name migration failed.', ['!name' => $migration_id])); if ($options['continue-on-failure']) { $this->logger()->error($error_message); } @@ -300,21 +306,23 @@ public function rollback($migration_names = '', array $options = [ * * @command dgi-migrate:list-migrations * - * @option all Process all migrations. - * @option group A comma-separated list of migration groups to import. - * @option tag Name of the migration tag to import. 
- * - * @field-labels * id: Migration IDs * weight: Weight of the migration * @default-fields id,weight + * + * @option all Process all migrations. + * @option group A comma-separated list of migration groups to import. + * @option tag Name of the migration tag to import. + * @option sort Sort according to weight. */ public function listMigrations(array $options = [ 'all' => FALSE, 'group' => self::REQ, 'tag' => self::REQ, 'format' => 'csv', - ]) { + 'sort' => FALSE, + ]) : RowsOfFields { $generate_order = function () use ($options) { $migration_groups = $this->migrationsList('', $options); @@ -342,7 +350,15 @@ public function listMigrations(array $options = [ } }; - return new RowsOfFields(iterator_to_array($generate_order())); + $generated = iterator_to_array($generate_order()); + + if ($options['sort']) { + usort($generated, function ($a, $b) { + return $a['weight'] <=> $b['weight']; + }); + } + + return new RowsOfFields($generated); } /** @@ -401,12 +417,14 @@ protected static function getMigrateToolsLogger() : LoggerInterface { * source, update previously-imported items with the current data * @option sync Sync source and destination. Delete destination records that * do not exist in the source. + * @option run The ID of the run, if relevant. * * @islandora-drush-utils-user-wrap */ public function enqueueMigration(string $migration_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, + 'run' => NULL, ]) : void { $executable = $this->getExecutable($migration_id, $options); // drush_op() provides --simulate support. @@ -422,12 +440,14 @@ public function enqueueMigration(string $migration_id, array $options = [ * source, update previously-imported items with the current data * @option sync Sync source and destination. Delete destination records that * do not exist in the source. + * @option run The ID of the run, if relevant. 
* * @islandora-drush-utils-user-wrap */ public function processEnqueuedMigration(string $migration_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, + 'run' => NULL, ]) : void { $executable = $this->getExecutable($migration_id, $options); // drush_op() provides --simulate support. @@ -458,15 +478,36 @@ public function processEnqueuedMigration(string $migration_id, array $options = * source, update previously-imported items with the current data * @option sync Sync source and destination. Delete destination records that * do not exist in the source. + * @option run The ID of the run, if relevant. * * @islandora-drush-utils-user-wrap */ public function finishEnqueuedMigration(string $migration_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, + 'run' => NULL, ]) { $executable = $this->getExecutable($migration_id, $options); drush_op([$executable, 'teardownMigration']); } + /** + * Enqueue STOMP "terminal" messages. + * + * @option priority The priority of the message. Higher should lead to earlier + * processing, allowing workers to be shutdown prior to completion of the + * queue. Defined by STOMP/JMS, an integer ranging from 0 to 9. Defaults to + * 4. + * + * @command dgi-migrate:enqueue-terminal + */ + public function enqueueTerminal(string $migration_id, string $run_id, array $options = [ + 'priority' => 4, + ]) { + $stomp_queue = StompQueue::create($migration_id, $run_id); + $stomp_queue->sendTerminal([ + 'priority' => $options['priority'] ?? 4, + ]); + } + } diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index eb23c5b5..aaba27fa 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -49,6 +49,13 @@ class MigrateBatchExecutable extends MigrateExecutable { */ protected $idMapStatuses; + /** + * The options passed. 
+ * + * @var array + */ + protected array $options; + /** * {@inheritdoc} */ @@ -58,6 +65,7 @@ public function __construct(MigrationInterface $migration, MigrateMessageInterfa []; parent::__construct($migration, $message, $options); + $this->options = $options; $this->getQueue(); if (static::isCli()) { @@ -89,7 +97,9 @@ public function getQueueName() : string { */ protected function getQueue() : QueueInterface { if (!isset($this->queue)) { - $this->queue = \Drupal::queue($this->getQueueName(), TRUE); + $this->queue = ($this->options['run'] ?? FALSE) ? + StompQueue::create($this->migration->id(), $this->options['run']) : + \Drupal::queue($this->getQueueName(), TRUE); } return $this->queue; @@ -228,12 +238,14 @@ protected function enqueue() { // XXX: Nuke it, just in case. $queue = $this->getQueue(); $queue->deleteQueue(); + foreach ($source as $row) { $queue->createItem([ 'row' => $row, 'attempts' => 0, ]); } + return MigrationInterface::RESULT_COMPLETED; } @@ -358,26 +370,11 @@ protected function processRowFromQueue(Row $row) { * Batch context. */ public function processBatch(&$context) { - $sandbox =& $context['sandbox']; - - if (!isset($sandbox['total'])) { - $sandbox['total'] = $this->queue->numberOfItems(); - if ($sandbox['total'] === 0) { - $context['message'] = $this->t('Queue empty.'); - $context['finished'] = 1; - return; - } - } + $context['finished'] = 0; $queue = $this->getQueue(); - $get_current = function (bool $pre_delete = FALSE) use (&$sandbox, $queue) { - return $sandbox['total'] - $queue->numberOfItems() + ($pre_delete ? 
1 : 0); - }; - $update_finished = function (bool $pre_delete = FALSE) use (&$context, &$sandbox, $get_current) { - $context['finished'] = $get_current($pre_delete) / $sandbox['total']; - }; + try { - $update_finished(); while ($context['finished'] < 1) { $item = $queue->claimItem(); if (!$item) { @@ -400,11 +397,10 @@ public function processBatch(&$context) { try { $status = $this->processRowFromQueue($row); - $context['message'] = $this->t('Migration "@migration": @current/@total; processed row with IDs: (@ids)', [ + $context['message'] = $this->t('Migration "@migration": @current; processed row with IDs: (@ids)', [ '@migration' => $this->migration->id(), - '@current' => $get_current(TRUE), + '@current' => $item->item_id ?? 'unknown', '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@total' => $sandbox['total'], ]); if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { // XXX: Exceptions for flow control... maybe not the best, but works @@ -412,8 +408,7 @@ public function processBatch(&$context) { // phpcs:ignore DrupalPractice.General.ExceptionT.ExceptionT - throw new MigrateBatchException($this->t('Stopping "@migration" after @current of @total', [ + throw new MigrateBatchException($this->t('Stopping "@migration" after @current', [ '@migration' => $this->migration->id(), - '@current' => $get_current(TRUE), - '@total' => $sandbox['total'], + '@current' => $item->item_id ?? 'unknown', ]), 1); } elseif ($status === MigrationInterface::RESULT_INCOMPLETE) { @@ -431,22 +426,20 @@ public function processBatch(&$context) { if ($item->data['attempts'] < 3) { // XXX: Not really making any progress, requeueing things, so don't // increment 'current'. - $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); re-enqueueing. Exception info:@n@ex', [ + $context['message'] = $this->t('Migration "@migration": @current; encountered exception processing row with IDs: (@ids); re-enqueueing. 
Exception info:@n@ex', [ '@migration' => $this->migration->id(), - '@current' => $get_current(TRUE), + '@current' => $item->item_id ?? 'unknown', '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@total' => $sandbox['total'], '@ex' => $e, '@n' => "\n", ]); $this->queue->createItem($item->data); } else { - $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [ + $context['message'] = $this->t('Migration "@migration": @current; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [ '@migration' => $this->migration->id(), - '@current' => $get_current(TRUE), + '@current' => $item->item_id ?? 'unknown', '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@total' => $sandbox['total'], '@ex' => $e, '@n' => "\n", ]); @@ -456,8 +449,6 @@ public function processBatch(&$context) { finally { $queue->deleteItem($item); } - - $update_finished(); } } catch (MigrateBatchException $e) { @@ -468,9 +459,6 @@ public function processBatch(&$context) { if ($e->getFinished() !== NULL) { $context['finished'] = $e->getFinished(); } - else { - $update_finished(); - } } } diff --git a/src/Plugin/migrate/process/LockingMigrationLookup.php b/src/Plugin/migrate/process/LockingMigrationLookup.php index 9b1f9f14..0a4c6e09 100644 --- a/src/Plugin/migrate/process/LockingMigrationLookup.php +++ b/src/Plugin/migrate/process/LockingMigrationLookup.php @@ -2,11 +2,16 @@ namespace Drupal\dgi_migrate\Plugin\migrate\process; +use Drupal\Component\Plugin\Exception\PluginNotFoundException; +use Drupal\Component\Utility\NestedArray; use Drupal\Core\Database\Connection; -use Drupal\Core\Lock\LockBackendInterface; +use Drupal\Core\File\FileSystemInterface; use Drupal\Core\Plugin\ContainerFactoryPluginInterface; use Drupal\migrate\MigrateException; use Drupal\migrate\MigrateExecutableInterface; +use 
Drupal\migrate\MigrateLookupInterface; +use Drupal\migrate\MigrateSkipProcessException; +use Drupal\migrate\MigrateStubInterface; use Drupal\migrate\Plugin\MigrateProcessInterface; use Drupal\migrate\Plugin\MigrationInterface; use Drupal\migrate\ProcessPluginBase; @@ -15,6 +20,23 @@ /** * Override upstream `migration_lookup` plugin, with some additional locking. + * + * @MigrateProcessPlugin( + * id = "dgi_migrate.process.locking_migration_lookup" + * ) + * + * Accepts all the same as the core "migration_lookup" plugin, in addition to: + * - "no_lock": Flag to explicitly skip locking, which should only be used when + * it is known that there's a one-to-one mapping between each set of parameters + * and each resultant value. + * - "lock_context_keys": A mapping of migration IDs to arrays of maps, + * mapping: + * - "offset": An array of offsets indexing into the `$value` passed to the + * `::transform()` call, to allow the lock(s) acquired to be more + * specific. + * - "hash": An optional string representing a pattern. If provided, every + * '#' found will be replaced with a hexit resulting from hashing the value at + * "offset". */ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcessInterface, ContainerFactoryPluginInterface { @@ -28,13 +50,6 @@ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcess */ protected MigrateProcessInterface $parent; - /** - * Lock service. - * - * @var \Drupal\Core\Lock\LockBackendInterface - */ - protected LockBackendInterface $lock; - /** * Memoized array of migrations referenced. * @@ -84,13 +99,62 @@ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcess */ protected MigrationInterface $migration; + /** + * An array of SplFileObjects, to facilitate locking. + * + * @var \SplFileObject[] + */ + protected array $lockFiles = []; + + /** + * The migration stub service. 
+ * + * @var \Drupal\migrate\MigrateStubInterface + */ + protected MigrateStubInterface $migrateStub; + + /** + * The migration lookup service. + * + * @var \Drupal\migrate\MigrateLookupInterface + */ + protected MigrateLookupInterface $migrateLookup; + + /** + * The value from which to build the lock context. + * + * @var array + */ + protected array $lockContext; + + /** + * Array of lock context. + * + * @var array|mixed + */ + protected $lockContextKeys; + + /** + * The file system service. + * + * @var \Drupal\Core\File\FileSystemInterface + */ + protected FileSystemInterface $fileSystem; + /** * Constructor. */ public function __construct(array $configuration, $plugin_id, $plugin_definition) { parent::__construct($configuration, $plugin_id, $plugin_definition); - $this->doLocking = getenv('DGI_MIGRATE__DO_MIGRATION_LOOKUP_LOCKING') === 'TRUE'; + // We do not need to do the locking with `no_stub`, as we would not be + // creating any entities, so there would be no potential for creating + // duplicates. + $this->doLocking = empty($this->configuration['no_stub']) && + (getenv('DGI_MIGRATE__DO_MIGRATION_LOOKUP_LOCKING') === 'TRUE') && + !($this->configuration['no_lock'] ?? FALSE); + + $this->lockContextKeys = $this->configuration['lock_context_keys'] ?? []; } /** @@ -117,21 +181,55 @@ protected function getMigrations() : array { /** * List migrations mapped to lock names. * - * @return array - * An associative array mapping migration names to lock names. + * @return \Traversable + * Generated mapping of migration names to lock names. 
*/ - protected function getLockMap() : array { + protected function getLockMap() : \Traversable { if (!isset($this->lockMap)) { $this->lockMap = array_combine( $this->getMigrations(), array_map([$this, 'getLockName'], $this->getMigrations()) ); } + if (!$this->lockMap) { throw new MigrateException('Failed to map migration IDs to lock names.'); } - return $this->lockMap; + if ($this->lockContextKeys) { + $apply_context_keys = function ($migration, $name) { + $parts = ["{$name}-extra_context"]; + + if (isset($this->lockContextKeys[$migration])) { + foreach ($this->lockContextKeys[$migration] as $info) { + $value = NestedArray::getValue($this->lockContext, $info['offset']); + + if (($prefix = ($info['hash'] ?? FALSE))) { + $hash = md5($value, FALSE); + $prefix_offset = 0; + $hash_offset = 0; + while (($prefix_offset = strpos($prefix, '#', $prefix_offset)) !== FALSE) { + $prefix[$prefix_offset++] = $hash[$hash_offset++]; + } + $parts[] = $prefix; + } + else { + $parts[] = $value; + } + } + } + + return implode('/', $parts); + }; + foreach ($this->lockMap as $migration => $original_name) { + yield $migration => $apply_context_keys($migration, $original_name); + } + } + else { + foreach ($this->lockMap as $migration => $name) { + yield $migration => $name; + } + } } /** @@ -144,36 +242,40 @@ protected function getLockMap() : array { * The lock name to use for the given migration. */ protected function getLockName(string $migration_name) : string { - // XXX: May have to get creative with hashing... thinking max lock name is - // something like 255 chars? - // XXX: 255 character limit may be a non-issue?: https://api.drupal.org/api/drupal/core%21lib%21Drupal%21Core%21Lock%21DatabaseLockBackend.php/function/DatabaseLockBackend%3A%3AnormalizeName/10 - return "dgi_migrate_locking_migration_lookup__migration__$migration_name"; + return "dgi_migrate/locking_migration_lookup/migration/$migration_name"; } /** * Acquire all migration locks. 
* - * @param float $timeout - * The time for which to acquire the locks. - * * @throws \Drupal\migrate\MigrateException */ - protected function acquireMigrationLocks(float $timeout = 30.0) : void { + protected function acquireMigrationLocks(int $mode = LOCK_EX, bool &$would_block = FALSE) : bool { try { - if (!$this->getControlLock()) { - throw new MigrateException('Failed to acquire control lock.'); + $lock_map = iterator_to_array($this->getLockMap()); + if (count($lock_map) > 1) { + // More than one, we need to acquire the "control" lock before + // proceeding, to avoid potential deadlocks. + if (!$this->getControlLock()) { + throw new MigrateException('Failed to acquire control lock.'); + } } + if (!$this->hasMigrationLocks) { + // Don't have 'em yet; initial acquisition. $this->hasMigrationLocks = TRUE; - foreach ($this->getLockMap() as $migration => $lock_name) { - while (!$this->lock->acquire($lock_name, $timeout)) { - while ($this->lock->wait($lock_name)); - } - if (!$this->lock->acquire($lock_name, $timeout)) { - throw new MigrateException("Failed to acquire lock for '$migration'."); - } + } + else { + // Attempting to "promote" the locks, no need to set that we have 'em. + } + + foreach ($lock_map as $lock_name) { + if (!$this->acquireLock($lock_name, $mode, $would_block)) { + return FALSE; } } + + return TRUE; } finally { $this->releaseControlLock(); @@ -188,7 +290,7 @@ protected function acquireMigrationLocks(float $timeout = 30.0) : void { protected function releaseMigrationLocks() : void { if ($this->hasMigrationLocks) { foreach ($this->getLockMap() as $lock_name) { - $this->lock->release($lock_name); + $this->releaseLock($lock_name); } $this->hasMigrationLocks = FALSE; } @@ -201,18 +303,77 @@ protected function releaseMigrationLocks() : void { * TRUE if we acquired it; otherwise, FALSE. 
*/ protected function getControlLock() : bool { - while (!($this->hasControl = $this->lock->acquire(static::CONTROL_LOCK, 600))) { - while ($this->lock->wait(static::CONTROL_LOCK)); + if (!$this->hasControl) { + $this->hasControl = $this->acquireLock(static::CONTROL_LOCK); } return $this->hasControl; } + /** + * Get an \SplFileObject instance to act as the lock. + * + * @param string $name + * The name of the lock to acquire. Should result in a file being created + * under the temporary:// scheme of the same name, against which `flock` + * commands will be issued. + * + * @return \SplFileObject + * The \SplFileObject instance against which to lock. + */ + protected function getLockFile(string $name) : \SplFileObject { + if (!isset($this->lockFiles[$name])) { + $file_name = "temporary://{$name}"; + $directory = $this->fileSystem->dirname($file_name); + $basename = $this->fileSystem->basename($file_name); + $this->fileSystem->prepareDirectory($directory, FileSystemInterface::CREATE_DIRECTORY); + $file_name = "{$directory}/{$basename}"; + + touch($file_name); + $this->lockFiles[$name] = $file = new \SplFileObject($file_name, 'a+'); + } + + return $this->lockFiles[$name]; + } + + /** + * Helper; acquire the lock. + * + * @param string $name + * The name of the lock to acquire. + * @param int $mode + * The mode with which to acquire the lock. + * @param bool $would_block + * A reference to a boolean, to be updated if called with LOCK_NB and the + * call _would_ have blocked. + * + * @return bool + * TRUE on success. Should not be able to return FALSE, as we perform this + * in a blocking manner. + */ + protected function acquireLock(string $name, int $mode = LOCK_EX, bool &$would_block = FALSE) : bool { + return $this->getLockFile($name)->flock($mode, $would_block); + } + + /** + * Helper; Release the given lock. + * + * @param string $name + * The name of the lock to release. + * + * @return bool + * TRUE on success. 
Should not be able to return FALSE, unless we maybe did + * not hold the lock? + */ + protected function releaseLock(string $name) : bool { + return $this->getLockFile($name)->flock(LOCK_UN); + } + /** * Helper; release control lock. */ protected function releaseControlLock() { if ($this->hasControl) { - $this->lock->release(static::CONTROL_LOCK); + $this->releaseLock(static::CONTROL_LOCK); $this->hasControl = FALSE; } } @@ -232,35 +393,188 @@ public function transform($value, MigrateExecutableInterface $migrate_executable return $this->parent->transform($value, $migrate_executable, $row, $destination_property); } - $transaction = $this->database->startTransaction(); try { - // Acquire locks for all referenced migrations. - $log('Locking migrations.'); - $this->acquireMigrationLocks(); - $log('Locked migrations, running parent.'); - - // Perform the lookup as per the wrapped transform. - $result = $this->parent->transform($value, $migrate_executable, $row, $destination_property); - $log("Parent run, commit transaction and releasing migration locks."); - unset($transaction); - // Optimistically drop migration locks. + $this->setLockContext((array) $value); + return $this->doTransform($value, $migrate_executable, $row, $destination_property); + } + finally { + // Drop migration locks, if we still have them. $this->releaseMigrationLocks(); - $log("Releasing migration locks, returning."); - return $result; + $this->setLockContext(NULL); } - catch (\Exception $e) { - if (isset($transaction)) { - $transaction->rollBack(); + + } + + /** + * Set values for more-specific locking. + * + * @param mixed $values + * The values to set; or: NULL to reset. + */ + protected function setLockContext($values = []) : void { + if ($values === NULL) { + // Clear the context. + unset($this->lockContext); + } + $values = (array) $values; + $this->lockContext = $values; + } + + /** + * Locking transformation. + * + * Adapted from the core `migration_lookup`, with locks sprinkled in. 
+   *
+   * @throws \Drupal\migrate\MigrateSkipProcessException
+   * @throws \Drupal\migrate\MigrateException
+   *
+   * @see https://git.drupalcode.org/project/drupal/-/blob/9.5.x/core/modules/migrate/src/Plugin/migrate/process/MigrationLookup.php#L194-276
+   */
+  protected function doTransform($value, MigrateExecutableInterface $migrate_executable, Row $row, $destination_property) {
+    $context = [
+      'self' => FALSE,
+      'source_id_values' => [],
+      'lookup_migration_ids' => (array) $this->configuration['migration'],
+    ];
+    $lookup_migration_ids =& $context['lookup_migration_ids'];
+
+    try {
+      // Acquire shared lock to do the lookup.
+      $this->acquireMigrationLocks(LOCK_SH);
+      $destination_ids = $this->doLookup($value, $migrate_executable, $row, $destination_property, $context);
+
+      if (!$destination_ids && !empty($this->configuration['no_stub'])) {
+        return NULL;
+      }
+
+      if (!$destination_ids && ($context['self'] || isset($this->configuration['stub_id']) || count($lookup_migration_ids) == 1)) {
+        // Non-blockingly attempt to promote lock from shared to exclusive. Drop
+        // shared lock and reacquire as exclusive if we would block, to avoid
+        // potential deadlock.
+        $would_block = FALSE;
+        if (!$this->acquireMigrationLocks(LOCK_EX | LOCK_NB, $would_block) && $would_block) {
+          $this->releaseMigrationLocks();
+          $this->acquireMigrationLocks(LOCK_EX);
+
+          // Attempt lookup again, as something might have populated it while we
+          // were blocked attempting to acquire the exclusive lock.
+          $destination_ids = $this->doLookup($value, $migrate_executable, $row, $destination_property, $context);
+        }
+        if (!$destination_ids) {
+          $destination_ids = $this->doStub($context);
+        }
+      }
+
+      if ($destination_ids) {
+        if (count($destination_ids) == 1) {
+          return reset($destination_ids);
+        }
+        else {
+          return $destination_ids;
+        }
       }
-      throw $e;
     }
     finally {
-      // Drop migration locks, if we still have them.
       $this->releaseMigrationLocks();
     }
   }

+  /**
+   * Perform the lookup proper.
+   *
+   * @return array|null
+   *   The destination IDs of the first migration with a match; NULL if none.
+   *
+   * @throws \Drupal\migrate\MigrateException
+   * @throws \Drupal\migrate\MigrateSkipProcessException
+   *
+   * @see https://git.drupalcode.org/project/drupal/-/blob/9.5.x/core/modules/migrate/src/Plugin/migrate/process/MigrationLookup.php#L194-229
+   */
+  protected function doLookup($value, MigrateExecutableInterface $migrate_executable, Row $row, $destination_property, array &$context) : ?array {
+    $source_id_values =& $context['source_id_values'];
+    $lookup_migration_ids =& $context['lookup_migration_ids'];
+
+    $destination_ids = NULL;
+    foreach ($lookup_migration_ids as $lookup_migration_id) {
+      $lookup_value = $value;
+      if ($lookup_migration_id == $this->migration->id()) {
+        $context['self'] = TRUE;
+      }
+      if (isset($this->configuration['source_ids'][$lookup_migration_id])) {
+        $lookup_value = array_values($row->getMultiple($this->configuration['source_ids'][$lookup_migration_id]));
+      }
+      $lookup_value = (array) $lookup_value;
+      $this->skipInvalid($lookup_value);
+      $source_id_values[$lookup_migration_id] = $lookup_value;
+
+      // Treat an unknown migration as having no match; wrap anything that is
+      // not already a MigrateException, so the executable can shut down.
+      try {
+        $destination_id_array = $this->migrateLookup->lookup($lookup_migration_id, $lookup_value);
+      }
+      catch (PluginNotFoundException $e) {
+        $destination_id_array = [];
+      }
+      catch (MigrateException $e) {
+        throw $e;
+      }
+      catch (\Exception $e) {
+        throw new MigrateException(sprintf('A %s was thrown while processing this migration lookup', get_class($e)), $e->getCode(), $e);
+      }
+
+      if ($destination_id_array) {
+        $destination_ids = array_values(reset($destination_id_array));
+        break;
+      }
+    }
+
+    return $destination_ids;
+  }
+
+  /**
+   * Perform stub creation.
+   *
+   * @throws \Drupal\migrate\MigrateSkipProcessException
+   * @throws \Drupal\migrate\MigrateException
+   *
+   * @see https://git.drupalcode.org/project/drupal/-/blob/9.5.x/core/modules/migrate/src/Plugin/migrate/process/MigrationLookup.php#L236-267
+   */
+  protected function doStub(&$context) {
+    $self =& $context['self'];
+    $source_id_values =& $context['source_id_values'];
+    $lookup_migration_ids =& $context['lookup_migration_ids'];
+
+    // If the lookup didn't succeed, figure out which migration will do the
+    // stubbing.
+    if ($self) {
+      $stub_migration = $this->migration->id();
+    }
+    elseif (isset($this->configuration['stub_id'])) {
+      $stub_migration = $this->configuration['stub_id'];
+    }
+    else {
+      $stub_migration = reset($lookup_migration_ids);
+    }
+    // Rethrow any exception as a MigrateException so the executable can shut
+    // down the migration.
+    try {
+      return $this->migrateStub->createStub($stub_migration, $source_id_values[$stub_migration], [], FALSE);
+    }
+    catch (\LogicException | PluginNotFoundException $e) {
+      // For BC reasons, we must allow attempting to stub:
+      // - a derived migration; and,
+      // - a non-existent migration.
+    }
+    catch (MigrateException | MigrateSkipProcessException $e) {
+      throw $e;
+    }
+    catch (\Exception $e) {
+      throw new MigrateException(sprintf('%s was thrown while attempting to stub: %s', get_class($e), $e->getMessage()), $e->getCode(), $e);
+    }
+
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -270,11 +584,48 @@ public static function create(ContainerInterface $container, array $configuratio
     /** @var \Drupal\Component\Plugin\PluginManagerInterface $process_plugin_manager */
     $process_plugin_manager = $container->get('plugin.manager.migrate.process');
     $instance->parent = $process_plugin_manager->createInstance('dgi_migrate_original_migration_lookup', $configuration, $migration);
-    $instance->lock = $container->get('lock');
     $instance->database = $container->get('database');
     $instance->migration = $migration;
+    $instance->migrateStub = $container->get('migrate.stub');
+    $instance->migrateLookup = $container->get('migrate.lookup');
+    $instance->fileSystem = $container->get('file_system');

     return $instance;
   }

+  /**
+   * Skips the migration process entirely if the value is invalid.
+   *
+   * Copypasta from upstream.
+   *
+   * @param array $value
+   *   The incoming value to check.
+   *
+   * @throws \Drupal\migrate\MigrateSkipProcessException
+   *
+   * @see https://git.drupalcode.org/project/drupal/-/blob/9.5.x/core/modules/migrate/src/Plugin/migrate/process/MigrationLookup.php#L279-291
+   */
+  protected function skipInvalid(array $value) {
+    if (!array_filter($value, [$this, 'isValid'])) {
+      throw new MigrateSkipProcessException();
+    }
+  }
+
+  /**
+   * Determines if the value is valid for lookup.
+   *
+   * The only values considered invalid are: NULL, FALSE, [] and "".
+   *
+   * @param mixed $value
+   *   The value to test.
+   *
+   * @return bool
+   *   Return true if the value is valid.
+   *
+   * @see https://git.drupalcode.org/project/drupal/-/blob/9.5.x/core/modules/migrate/src/Plugin/migrate/process/MigrationLookup.php#L293-306
+   */
+  protected function isValid($value) {
+    return !in_array($value, [NULL, FALSE, [], ""], TRUE);
+  }
+
 }
diff --git a/src/StompQueue.php b/src/StompQueue.php
new file mode 100644
index 00000000..bd9ef9ee
--- /dev/null
+++ b/src/StompQueue.php
@@ -0,0 +1,278 @@
+stomp = $stomp;
+    $this->logger = $logger;
+    $this->name = $name;
+    $this->group = $group;
+  }
+
+  /**
+   * Static factory method.
+   *
+   * @param string $name
+   *   The name of the migration for which to manage the queue.
+   * @param string $group
+   *   The run number of the migration, for which to manage the queue.
+   *
+   * @return static
+   *   A queue for the given migration and run, using the "islandora.stomp"
+   *   service and the "dgi_migrate.stomp_queue" logger channel.
+   */
+  public static function create(string $name, string $group) {
+    return new static(
+      \Drupal::service('islandora.stomp'),
+      \Drupal::logger('dgi_migrate.stomp_queue'),
+      $name,
+      $group
+    );
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public function createItem($data) {
+    // The running serial doubles as the item ID and as the count reported by
+    // ::numberOfItems().
+    $id = $this->serial++;
+
+    $body = new \stdClass();
+    $body->data = $data;
+    $body->item_id = $id;
+    $body->created = time();
+
+    // Tag the message with its migration and run; the "to_process" type is
+    // what distinguishes it from the "terminal" messages in ::claimItem().
+    $message = new Message(
+      serialize($body),
+      [
+        'type' => 'to_process',
+        'dgi_migrate_migration' => $this->name,
+        'dgi_migrate_run_id' => $this->group,
+        'persistent' => 'true',
+      ]
+    );
+
+    $this->stomp->send(
+      $this->getQueueName(),
+      $message
+    );
+
+    return $id;
+  }
+
+  /**
+   * Send a "terminal" message.
+   *
+   * Should be one for each worker we intend to start.
+   *
+   * @param array $extra_headers
+   *   Extra headers to send on the message. NOTE: As the left operand of the
+   *   array union, these take precedence over the default headers (including
+   *   "type") on key collisions.
+   */
+  public function sendTerminal(array $extra_headers = []) {
+    $message = new Message(
+      '',
+      $extra_headers + [
+        'type' => 'terminal',
+        'dgi_migrate_migration' => $this->name,
+        'dgi_migrate_run_id' => $this->group,
+        'persistent' => 'true',
+      ]
+    );
+
+    $this->stomp->send(
+      $this->getQueueName(),
+      $message
+    );
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public function numberOfItems() {
+    // XXX: If called near the end, should be approximately the number of items
+    // in the queue.
+    return $this->serial;
+  }
+
+  /**
+   * Helper; get queue name.
+   *
+   * @return string
+   *   The name of the queue with which to communicate.
+   */
+  protected function getQueueName() {
+    return "/queue/dgi_migrate_{$this->name}_{$this->group}";
+  }
+
+  /**
+   * Helper; subscribe to the queue if we are not yet subscribed.
+   */
+  protected function subscribe() {
+    if (!$this->subscribed) {
+      $this->stomp->subscribe(
+        $this->getQueueName(),
+        NULL,
+        'client'
+      );
+
+      $this->subscribed = TRUE;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public function claimItem($lease_time = 3600) {
+    $this->subscribe();
+
+    // XXX: The STOMP client has an associated timeout, after which it will
+    // return that it failed to read anything. We expect a "terminal" message to
+    // be added to the queue for each worker, telling them to quit.
+    while (($frame = $this->stomp->read()) === FALSE) {
+      $this->logger->debug('Not signalled; polling again.');
+    }
+    $headers = $frame->getHeaders();
+    if (array_key_exists('type', $headers) && $headers['type'] === 'terminal') {
+      // Got a terminal message; acknowledge it and flag the queue's empty.
+      $this->stomp->ack($frame);
+      return FALSE;
+    }
+
+    $to_return = unserialize($frame->getBody(), [
+      'allowed_classes' => [
+        Row::class,
+        \stdClass::class,
+      ],
+    ]);
+    if (!is_object($to_return)) {
+      // unserialize() returns FALSE for a malformed body; fail with a clear
+      // message rather than by assigning a property on a non-object. The
+      // frame is left un-acknowledged, exactly as before.
+      $this->logger->error('Unable to unserialize the body of a message from @queue.', ['@queue' => $this->getQueueName()]);
+      throw new \UnexpectedValueException("Unable to unserialize the body of a message from {$this->getQueueName()}.");
+    }
+    $to_return->frame = $frame;
+    return $to_return;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public function deleteItem($item) {
+    $this->stomp->ack($item->frame);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public function releaseItem($item) {
+    $this->stomp->nack($item->frame);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public function createQueue() {
+    // No-op.
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public function deleteQueue() {
+    // No-op... can't delete via STOMP.
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public function __sleep() {
+    $vars = $this->dstSleep();
+
+    $to_suppress = [
+      // XXX: Avoid serializing some things that we don't need.
+      'subscribed',
+      'signalled',
+    ];
+    foreach ($to_suppress as $value) {
+      $key = array_search($value, $vars, TRUE);
+      if ($key !== FALSE) {
+        unset($vars[$key]);
+      }
+    }
+
+    return $vars;
+  }
+
+}