From d2e0e01e131b2882f5214cf9851739fd815d5519 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Fri, 2 Jun 2023 13:05:06 -0300 Subject: [PATCH 01/29] Status skipping and pausability. --- scripts/env.sample | 32 ++++++++++++++++++++++++++ scripts/util.in | 56 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 80 insertions(+), 8 deletions(-) diff --git a/scripts/env.sample b/scripts/env.sample index a0f9b787..4f9e5442 100644 --- a/scripts/env.sample +++ b/scripts/env.sample @@ -58,3 +58,35 @@ # PROCESSES: The number of processes to use to run the migration import. # --- #PROCESSES=1 + +# === +# SKIP_STATUS: Suppress dumping of migration status before/after operations. +# --- +# Default: +#SKIP_STATUS=false +# To skip, uncomment (or equivalently set): +#SKIP_STATUS=true + +# === +# MULTIPROCESS_PRE_ENQUEUE_PAUSE: Pause execution before enqueuing these. +# +# Expected to be a Bash array; in this instance, just a set of strings +# (representing migration IDs) between parentheses. +# --- +#MULTIPROCESS_PRE_ENQUEUE_PAUSE=() + +# === +# MULTIPROCESS_POST_ENQUEUE_PAUSE: Pause execution after enqueuing these. +# +# Expected to be a Bash array; in this instance, just a set of strings +# (representing migration IDs) between parentheses. +# --- +#MULTIPROCESS_POST_ENQUEUE_PAUSE=() + +# === +# MULTIPROCESS_POST_PROCESS_PAUSE: Pause execution after finishing these. +# +# Expected to be a Bash array; in this instance, just a set of strings +# (representing migration IDs) between parentheses. +# --- +#MULTIPROCESS_POST_PROCESS_PAUSE=() diff --git a/scripts/util.in b/scripts/util.in index 6f19bc76..e66afcc0 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -65,6 +65,10 @@ function init_vars () { declare -g TIME=${TIME:-/usr/bin/time} declare -g LOG_DIR=${LOG_DIR:-$CONFIG_DIR} declare -g PROCESSES=${PROCESSES:-1} + declare -g SKIP_STATUS=false + declare -g -a MULTIPROCESS_PRE_ENQUEUE_PAUSE=(${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]}) + declare -g -a MULTIPROCESS_POST_ENQUEUE_PAUSE=(${MULTIPROCESS_POST_ENQUEUE_PAUSE[@]}) + declare -g -a MULTIPROCESS_POST_PROCESS_PAUSE=(${MULTIPROCESS_POST_PROCESS_PAUSE[@]}) # Initialize the log directory. if ! [ -d "$LOG_DIR" ]; then @@ -96,6 +100,19 @@ function do_migration_single_process() { timedwwwdrush dgi-migrate:import "--root=$DRUPAL_ROOT" "--uri=$URI" "--user=$DRUPAL_USER" "--group=$MIGRATION_GROUP" "${@:2}" } +# Helper; facilitate pausing for various reasons (likely snapshotting). +# +# Positional args: +# - 1: A descripitive string of _when_ we are pausing; e.g. "pre-enqueue", +# "post-enqueue", etc. +# - 2: The ID of the specific migration during which we are pausing. +function do_pause() { + local WHEN=$1 + local MIGRATION_ID=$2 + + read -p "Pausing $WHEN of $MIGRATION_ID as requested. Hit enter to continue." +} + # Kick off a migration in multiple processes. # # Positional args: @@ -112,19 +129,42 @@ function do_migration_multi_process() { echo "Listing migrations..." timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do + if [ " ${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]} " =~ " $MIGRATION_ID " ]; then + do_pause "pre-enqueue" "$MIGRATION_ID" + fi + echo "Enqueuing items for $MIGRATION_ID" - timedwwwdrush dgi-migrate:enqueue --user=$DRUPAL_USER $MIGRATION_ID "${@:2}" + timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "$MIGRATION_ID" "${@:2}" + + if [ " ${MULTIPROCESS_POST_ENQUEUE_PAUSE[@]} " =~ " $MIGRATION_ID " ]; then + do_pause "post-enqueue" "$MIGRATION_ID" + fi + echo "Starting $PROCESSES processes to process $MIGRATION_ID." for i in $(seq 1 $PROCESSES); do echo "Starting $i/$PROCESSES to process $MIGRATION_ID." - timedwwwdrush dgi-migrate:enqueued-process --user=$DRUPAL_USER $MIGRATION_ID "${@:2}" &> $PROCESS_LOG_DIR/$MIGRATION_ID.$i.log & + timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" & done echo "Waiting for processes to exit..." wait - timedwwwdrush dgi-migrate:finish-enqueued-process --user=1 $MIGRATION_ID "${@:2}" + timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "${@:2}" + + if [ " ${MULTIPROCESS_POST_PROCESS_PAUSE[@]} " =~ " $MIGRATION_ID " ]; then + do_pause "post-process" "$MIGRATION_ID" + fi done } +# Dump status for the given migration group. +# +# Can be skipped if SKIP_STATUS=true; in which case the call to this should be +# no-op. +function dump_status() { + if [ $SKIP_STATUS != 'true']; then + wwwdrush migrate:status --group=$MIGRATION_GROUP + fi +} + # Handle kicking off a migration. # # Positional args: @@ -146,7 +186,7 @@ function do_migration () { # (lookin' at you, dgi_migrate_foxml_standard_mods_xslt dealio) wwwdrush cache:rebuild # Dump status before run. - wwwdrush migrate:status --group=$MIGRATION_GROUP + dump_status { # Do the import, one way or another. if [ $PROCESSES -eq 1 ]; then @@ -156,10 +196,10 @@ function do_migration () { fi } |& wwwdo tee $IMPORT_LOG > /dev/null # Dump status after run. - wwwdrush migrate:status --group=$MIGRATION_GROUP + dump_status # Dump messages after run, so they're not lost with a subsequent run. wwwdo mkdir -p $MESSAGES_DIR - wwwdrush migrate:status --group=$MIGRATION_GROUP --field=id --format=string | \ + wwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string | \ while read NAME ; do wwwdrush migrate:messages --format=json $NAME | wwwdo tee "$MESSAGES_DIR/$NAME.json" > /dev/null done @@ -202,11 +242,11 @@ function do_rollback () { # (lookin' at you, things involving Fedora dealio) wwwdrush cache:rebuild # Dump status before rollback. - wwwdrush migrate:status --group=$MIGRATION_GROUP + dump_status # The base rollback. timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null # Dump status after rollback. - wwwdrush migrate:status --group=$MIGRATION_GROUP + dump_status set +x } |& wwwdo tee $RUN_LOG From 4884cdb9cb69f8620b7f67d928e126a2093152c8 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Fri, 2 Jun 2023 13:13:37 -0300 Subject: [PATCH 02/29] Allow migrations to be skipped. More intended to be used to resume migrations that didn't quite complete. --- scripts/env.sample | 8 ++++++++ scripts/util.in | 6 +++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/scripts/env.sample b/scripts/env.sample index 4f9e5442..64a080d4 100644 --- a/scripts/env.sample +++ b/scripts/env.sample @@ -67,6 +67,14 @@ # To skip, uncomment (or equivalently set): #SKIP_STATUS=true +# === +# MULTIPROCESS_SKIP_MIGRATIONS: Skip processing the specified migrations. +# +# May be of some use in resuming larger migrations, when we do not wish to +# undertake no-op cycling through other migrations. +# --- +#MULTIPROCESS_SKIP_MIGRATIONS=() + # === # MULTIPROCESS_PRE_ENQUEUE_PAUSE: Pause execution before enqueuing these. # diff --git a/scripts/util.in b/scripts/util.in index e66afcc0..efbee9b3 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -66,6 +66,7 @@ function init_vars () { declare -g LOG_DIR=${LOG_DIR:-$CONFIG_DIR} declare -g PROCESSES=${PROCESSES:-1} declare -g SKIP_STATUS=false + declare -g -a MULTIPROCESS_SKIP_MIGRATIONS=(${MULTIPROCESS_SKIP_MIGRATIONS[@]}) declare -g -a MULTIPROCESS_PRE_ENQUEUE_PAUSE=(${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]}) declare -g -a MULTIPROCESS_POST_ENQUEUE_PAUSE=(${MULTIPROCESS_POST_ENQUEUE_PAUSE[@]}) declare -g -a MULTIPROCESS_POST_PROCESS_PAUSE=(${MULTIPROCESS_POST_PROCESS_PAUSE[@]}) @@ -129,7 +130,10 @@ function do_migration_multi_process() { echo "Listing migrations..." timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do - if [ " ${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]} " =~ " $MIGRATION_ID " ]; then + if [ " ${MULTIPROCESS_SKIP_MIGRATIONS[@]} " =~ " $MIGRATION_ID " ]; then + echo "Skipping $MIGRATION_ID as requested." + continue + elif [ " ${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]} " =~ " $MIGRATION_ID " ]; then do_pause "pre-enqueue" "$MIGRATION_ID" fi From 4a38db0222fb1dc67241bf4f92e3eccd35161c0e Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Fri, 2 Jun 2023 15:10:59 -0300 Subject: [PATCH 03/29] Fix defaulting for SKIP_STATUS. --- scripts/util.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/util.in b/scripts/util.in index efbee9b3..00f100d9 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -65,7 +65,7 @@ function init_vars () { declare -g TIME=${TIME:-/usr/bin/time} declare -g LOG_DIR=${LOG_DIR:-$CONFIG_DIR} declare -g PROCESSES=${PROCESSES:-1} - declare -g SKIP_STATUS=false + declare -g SKIP_STATUS=${SKIP_STATUS:-false} declare -g -a MULTIPROCESS_SKIP_MIGRATIONS=(${MULTIPROCESS_SKIP_MIGRATIONS[@]}) declare -g -a MULTIPROCESS_PRE_ENQUEUE_PAUSE=(${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]}) declare -g -a MULTIPROCESS_POST_ENQUEUE_PAUSE=(${MULTIPROCESS_POST_ENQUEUE_PAUSE[@]}) From f0e8705eea645665d87d545f50021e2c521a235d Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Sun, 4 Jun 2023 18:46:54 -0300 Subject: [PATCH 04/29] Theoretical STOMP queueing. --- scripts/util.in | 10 +- src/Commands/MigrateCommands.php | 12 +- src/MigrateBatchExecutable.php | 49 +++----- src/StompQueue.php | 191 +++++++++++++++++++++++++++++++ 4 files changed, 219 insertions(+), 43 deletions(-) create mode 100644 src/StompQueue.php diff --git a/scripts/util.in b/scripts/util.in index 214ddc2b..f2c337a3 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -114,13 +114,19 @@ function do_migration_multi_process() { | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do echo "Enqueuing items for $MIGRATION_ID" timedwwwdrush dgi-migrate:enqueue --user=$DRUPAL_USER $MIGRATION_ID "${@:2}" + local ENQUEUEING_JOB=$! echo "Starting $PROCESSES processes to process $MIGRATION_ID." + local -a PROCESS_JOBS=() for i in $(seq 1 $PROCESSES); do echo "Starting $i/$PROCESSES to process $MIGRATION_ID." timedwwwdrush dgi-migrate:enqueued-process --user=$DRUPAL_USER $MIGRATION_ID "${@:2}" &> $PROCESS_LOG_DIR/$MIGRATION_ID.$i.log & + PROCESS_JOBS+=($!) done - echo "Waiting for processes to exit..." - wait + wait $ENQUEUEING_JOB + echo "Enqueing finished; signalling workers." + kill -SIGUSR1 ${PROCESS_JOBS[@]} + echo "Workers signalled; Waiting for them to finish..." + wait ${PROCESS_JOBS[@]} timedwwwdrush dgi-migrate:finish-enqueued-process --user=1 $MIGRATION_ID "${@:2}" done } diff --git a/src/Commands/MigrateCommands.php b/src/Commands/MigrateCommands.php index 58c77c05..f4462e99 100644 --- a/src/Commands/MigrateCommands.php +++ b/src/Commands/MigrateCommands.php @@ -356,10 +356,10 @@ public function listMigrations(array $options = [ * * @throws \Drupal\Component\Plugin\Exception\PluginException */ - protected function getExecutable(string $migration_id, array $options = []) : MigrateBatchExecutable { + protected function getExecutable(string $migration_id, string $run_id, array $options = []) : MigrateBatchExecutable { /** @var \Drupal\migrate\Plugin\MigrationInterface $migration */ $migration = $this->migrationPluginManager->createInstance($migration_id); - return new MigrateBatchExecutable($migration, $this->getMigrateMessage(), $options); + return new MigrateBatchExecutable($migration, $this->getMigrateMessage(), $options, $run_id); } /** @@ -374,11 +374,11 @@ protected function getExecutable(string $migration_id, array $options = []) : Mi * * @islandora-drush-utils-user-wrap */ - public function enqueueMigration(string $migration_id, array $options = [ + public function enqueueMigration(string $migration_id, string $run_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, ]) : void { - $executable = $this->getExecutable($migration_id, $options); + $executable = $this->getExecutable($migration_id, $run_id, $options); // drush_op() provides --simulate support. drush_op([$executable, 'prepareBatch']); } @@ -395,11 +395,11 @@ public function enqueueMigration(string $migration_id, array $options = [ * * @islandora-drush-utils-user-wrap */ - public function processEnqueuedMigration(string $migration_id, array $options = [ + public function processEnqueuedMigration(string $migration_id, string $run_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, ]) : void { - $executable = $this->getExecutable($migration_id, $options); + $executable = $this->getExecutable($migration_id, $run_id, $options); // drush_op() provides --simulate support. $batch = [ 'title' => $this->t('Running migration: @migration', [ diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index 66293c4b..0f28f590 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -49,10 +49,13 @@ class MigrateBatchExecutable extends MigrateExecutable { */ protected $idMapStatuses; + protected string $runId; + /** * {@inheritdoc} */ - public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = []) { + public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = [], string $run_id = NULL) { + $this->runId = $run_id; $this->idMapStatuses = isset($options['statuses']) ? StatusFilter::mapStatuses($options['statuses']) : []; @@ -78,7 +81,7 @@ public function __construct(MigrationInterface $migration, MigrateMessageInterfa * The name of the queue. */ public function getQueueName() : string { - return "dgi_migrate__batch_queue__{$this->migration->id()}"; + return "dgi_migrate__{$this->migration->id()}"; } /** @@ -89,7 +92,7 @@ public function getQueueName() : string { */ protected function getQueue() : QueueInterface { if (!isset($this->queue)) { - $this->queue = \Drupal::queue($this->getQueueName(), TRUE); + $this->queue = StompQueue::create($this->migration->id(), $this->runId); } return $this->queue; @@ -331,24 +334,9 @@ protected function processRowFromQueue(Row $row) { public function processBatch(&$context) { $sandbox =& $context['sandbox']; - if (!isset($sandbox['total'])) { - $sandbox['total'] = $this->queue->numberOfItems(); - if ($sandbox['total'] === 0) { - $context['message'] = $this->t('Queue empty.'); - $context['finished'] = 1; - return; - } - } - $queue = $this->getQueue(); - $get_current = function (bool $pre_delete = FALSE) use (&$sandbox, $queue) { - return $sandbox['total'] - $queue->numberOfItems() + ($pre_delete ? 1 : 0); - }; - $update_finished = function (bool $pre_delete = FALSE) use (&$context, &$sandbox, $get_current) { - $context['finished'] = $get_current($pre_delete) / $sandbox['total']; - }; + try { - $update_finished(); while ($context['finished'] < 1) { $item = $queue->claimItem(); if (!$item) { @@ -371,11 +359,10 @@ public function processBatch(&$context) { try { $status = $this->processRowFromQueue($row); - $context['message'] = $this->t('Migration "@migration": @current/@total; processed row with IDs: (@ids)', [ + $context['message'] = $this->t('Migration "@migration": @current; processed row with IDs: (@ids)', [ '@migration' => $this->migration->id(), - '@current' => $get_current(TRUE), + '@current' => $item->item_id ?? 'unknown', '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@total' => $sandbox['total'], ]); if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { // XXX: Exceptions for flow control... maybe not the best, but works @@ -383,8 +370,7 @@ public function processBatch(&$context) { // phpcs:ignore DrupalPractice.General.ExceptionT.ExceptionT throw new MigrateBatchException($this->t('Stopping "@migration" after @current of @total', [ '@migration' => $this->migration->id(), - '@current' => $get_current(TRUE), - '@total' => $sandbox['total'], + '@current' => $item->item_id ?? 'unknown', ]), 1); } elseif ($status === MigrationInterface::RESULT_INCOMPLETE) { @@ -402,22 +388,20 @@ public function processBatch(&$context) { if ($item->data['attempts'] < 3) { // XXX: Not really making any progress, requeueing things, so don't // increment 'current'. - $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); re-enqueueing. Exception info:@n@ex', [ + $context['message'] = $this->t('Migration "@migration": @current; encountered exception processing row with IDs: (@ids); re-enqueueing. Exception info:@n@ex', [ '@migration' => $this->migration->id(), - '@current' => $get_current(TRUE), + '@current' => $item->item_id ?? 'unknown', '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@total' => $sandbox['total'], '@ex' => $e, '@n' => "\n", ]); $this->queue->createItem($item->data); } else { - $context['message'] = $this->t('Migration "@migration": @current/@total; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [ + $context['message'] = $this->t('Migration "@migration": @current; encountered exception processing row with IDs: (@ids); attempts exhausted, failing. Exception info:@n@ex', [ '@migration' => $this->migration->id(), - '@current' => $get_current(TRUE), + '@current' => $item->item_id ?? 'unknown', '@ids' => var_export($row->getSourceIdValues(), TRUE), - '@total' => $sandbox['total'], '@ex' => $e, '@n' => "\n", ]); @@ -427,8 +411,6 @@ public function processBatch(&$context) { finally { $queue->deleteItem($item); } - - $update_finished(); } } catch (MigrateBatchException $e) { @@ -439,9 +421,6 @@ public function processBatch(&$context) { if ($e->getFinished() !== NULL) { $context['finished'] = $e->getFinished(); } - else { - $update_finished(); - } } } diff --git a/src/StompQueue.php b/src/StompQueue.php new file mode 100644 index 00000000..e8737ba7 --- /dev/null +++ b/src/StompQueue.php @@ -0,0 +1,191 @@ +stomp = $stomp; + $this->name = $name; + $this->group = $group; + } + + public static function create(string $name, string $group) { + return new static( + \Drupal::service('islandora.stomp'), + $name, + $group + ); + } + + /** + * @inheritDoc + */ + public function createItem($data) { + $id = $this->serial++; + + $body = new \stdClass(); + $body->data = $data; + $body->item_id = $id; + $body->created = time(); + + $message = new Message( + serialize($body), + [ + 'dgi_migrate_migration' => $this->name, + 'dgi_migrate_run_id' => $this->group, + 'persistent' => 'true', + ] + ); + + $this->stomp->send( + '/queue/dgi_migrate', + $message + ); + + return $id; + } + + /** + * @inheritDoc + */ + public function numberOfItems() { + return $this->id; + } + + protected function subscribe() { + if (!$this->subscribed) { + $this->stomp->subscribe("/queue/dgi_migrate", "dgi_migrate_migration = '{$this->name}' AND dgi_migrate_run_id = '{$this->group}'"); + $connection = $this->stomp->getClient()->getConnection(); + $connection->setReadTimeout(10); + + if (extension_loaded('pcntl')) { + pcntl_signal(SIGUSR1, [$this, 'pcntlSignalHandler']); + pcntl_signal(SIGINT, [$this, 'pcntlSignalHandler']); + $connection->setWaitCallback([$this, 'pcntlWaitCallback']); + } + $this->subscribed = TRUE; + } + } + + /** + * Signal flag. + * + * @var bool + * + * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L20-L21 + */ + protected bool $signalled = FALSE; + + /** + * Signal handler. + * + * @return void + * + * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L26-L29 + */ + public function pcntlSignalHandler() { + $this->signalled = TRUE; + } + + /** + * Wait callback. + * + * @return false|void + * + * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L38-L53 + */ + public function pcntlWaitCallback() { + pcntl_signal_dispatch(); + if ($this->signalled) { + return FALSE; + } + } + + /** + * @inheritDoc + */ + public function claimItem($lease_time = 3600) { + $this->subscribe(); + + $frame = $this->stomp->read(); + + if ($frame === FALSE) { + return FALSE; + } + + $to_return = unserialize($frame->getBody()); + $to_return->frame = $frame; + return $to_return; + } + + /** + * @inheritDoc + */ + public function deleteItem($item) { + $this->stomp->ack($item->frame); + } + + /** + * @inheritDoc + */ + public function releaseItem($item) { + $this->stomp->nack($item->frame); + } + + /** + * @inheritDoc + */ + public function createQueue() { + // No-op. + } + + /** + * @inheritDoc + */ + public function deleteQueue() { + // No-op... can't delete via STOMP. + } + + public function __sleep() { + $vars = $this->dstSleep(); + + $to_suppress = [ + // XXX: Avoid serializing some things that we don't need. + 'subscribed', + 'signalled', + ]; + foreach ($to_suppress as $value) { + $key = array_search($value, $vars); + if ($key !== FALSE) { + unset($vars[$key]); + } + } + + return $vars; + } + +} From 5a815bdc61b21074d8169ebf18339a17cd7522d4 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Sun, 4 Jun 2023 18:49:14 -0300 Subject: [PATCH 05/29] Right, needs the run number as well. --- scripts/util.in | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/util.in b/scripts/util.in index f2c337a3..d15d135b 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -113,13 +113,13 @@ function do_migration_multi_process() { timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do echo "Enqueuing items for $MIGRATION_ID" - timedwwwdrush dgi-migrate:enqueue --user=$DRUPAL_USER $MIGRATION_ID "${@:2}" + timedwwwdrush dgi-migrate:enqueue --user=$DRUPAL_USER $MIGRATION_ID $NUM "${@:2}" local ENQUEUEING_JOB=$! echo "Starting $PROCESSES processes to process $MIGRATION_ID." local -a PROCESS_JOBS=() for i in $(seq 1 $PROCESSES); do echo "Starting $i/$PROCESSES to process $MIGRATION_ID." - timedwwwdrush dgi-migrate:enqueued-process --user=$DRUPAL_USER $MIGRATION_ID "${@:2}" &> $PROCESS_LOG_DIR/$MIGRATION_ID.$i.log & + timedwwwdrush dgi-migrate:enqueued-process --user=$DRUPAL_USER $MIGRATION_ID $NUM "${@:2}" &> $PROCESS_LOG_DIR/$MIGRATION_ID.$i.log & PROCESS_JOBS+=($!) done wait $ENQUEUEING_JOB From 7e0316094c52e8f11ba6a4433a11a20008d980c6 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Sun, 4 Jun 2023 18:50:09 -0300 Subject: [PATCH 06/29] And yeah, expects the enqueueing to be async, also. --- scripts/util.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/util.in b/scripts/util.in index d15d135b..51f5fcf3 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -113,7 +113,7 @@ function do_migration_multi_process() { timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do echo "Enqueuing items for $MIGRATION_ID" - timedwwwdrush dgi-migrate:enqueue --user=$DRUPAL_USER $MIGRATION_ID $NUM "${@:2}" + timedwwwdrush dgi-migrate:enqueue --user=$DRUPAL_USER $MIGRATION_ID $NUM "${@:2}" & local ENQUEUEING_JOB=$! echo "Starting $PROCESSES processes to process $MIGRATION_ID." local -a PROCESS_JOBS=() From fe5dd4b2d7a8b836e40d229d7df7a8a44542481d Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Mon, 5 Jun 2023 09:47:46 -0300 Subject: [PATCH 07/29] Quote the various variables. Should avoid all the random files being created. --- scripts/util.in | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/scripts/util.in b/scripts/util.in index 51f5fcf3..4aecdb7d 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -113,13 +113,13 @@ function do_migration_multi_process() { timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do echo "Enqueuing items for $MIGRATION_ID" - timedwwwdrush dgi-migrate:enqueue --user=$DRUPAL_USER $MIGRATION_ID $NUM "${@:2}" & + timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" & local ENQUEUEING_JOB=$! echo "Starting $PROCESSES processes to process $MIGRATION_ID." local -a PROCESS_JOBS=() for i in $(seq 1 $PROCESSES); do echo "Starting $i/$PROCESSES to process $MIGRATION_ID." - timedwwwdrush dgi-migrate:enqueued-process --user=$DRUPAL_USER $MIGRATION_ID $NUM "${@:2}" &> $PROCESS_LOG_DIR/$MIGRATION_ID.$i.log & + timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" & PROCESS_JOBS+=($!) done wait $ENQUEUEING_JOB @@ -127,7 +127,9 @@ function do_migration_multi_process() { kill -SIGUSR1 ${PROCESS_JOBS[@]} echo "Workers signalled; Waiting for them to finish..." wait ${PROCESS_JOBS[@]} - timedwwwdrush dgi-migrate:finish-enqueued-process --user=1 $MIGRATION_ID "${@:2}" + echo "Workers exited; Finalizing $MIGRATION_ID" + timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "${@:2}" + echo "Finished $MIGRATION_ID." done } From 9c882b52bef0fc34476899d7ddd2438d42aacccf Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Mon, 5 Jun 2023 11:25:31 -0300 Subject: [PATCH 08/29] Some fixes. --- scripts/util.in | 2 +- src/Commands/MigrateCommands.php | 7 ++++--- src/StompQueue.php | 5 ++++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/scripts/util.in b/scripts/util.in index 4aecdb7d..0a08e022 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -128,7 +128,7 @@ function do_migration_multi_process() { echo "Workers signalled; Waiting for them to finish..." wait ${PROCESS_JOBS[@]} echo "Workers exited; Finalizing $MIGRATION_ID" - timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "${@:2}" + timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" echo "Finished $MIGRATION_ID." done } diff --git a/src/Commands/MigrateCommands.php b/src/Commands/MigrateCommands.php index f4462e99..acf5c421 100644 --- a/src/Commands/MigrateCommands.php +++ b/src/Commands/MigrateCommands.php @@ -227,7 +227,7 @@ protected function executeMigration(MigrationInterface $migration, $migration_id * @throws \Exception * If there are not enough parameters to the command. */ - public function rollback($migration_names = '', array $options = [ + public function rollback($migration_names = '', $run_id = NULL, array $options = [ 'all' => FALSE, 'group' => self::REQ, 'tag' => self::REQ, @@ -270,6 +270,7 @@ public function rollback($migration_names = '', array $options = [ $executable = new MigrateBatchExecutable( $migration, $this->getMigrateMessage(), + $run_id, $options ); // drush_op() provides --simulate support. @@ -431,11 +432,11 @@ public function processEnqueuedMigration(string $migration_id, string $run_id, a * * @islandora-drush-utils-user-wrap */ - public function finishEnqueuedMigration(string $migration_id, array $options = [ + public function finishEnqueuedMigration(string $migration_id, string $run_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, ]) { - $executable = $this->getExecutable($migration_id, $options); + $executable = $this->getExecutable($migration_id, $run_id, $options); drush_op([$executable, 'teardownMigration']); } diff --git a/src/StompQueue.php b/src/StompQueue.php index e8737ba7..e8b6cc66 100644 --- a/src/StompQueue.php +++ b/src/StompQueue.php @@ -78,7 +78,10 @@ public function numberOfItems() { protected function subscribe() { if (!$this->subscribed) { - $this->stomp->subscribe("/queue/dgi_migrate", "dgi_migrate_migration = '{$this->name}' AND dgi_migrate_run_id = '{$this->group}'"); + $this->stomp->subscribe( + "/queue/dgi_migrate", + "dgi_migrate_migration = '{$this->name}' AND dgi_migrate_run_id = '{$this->group}'" + ); $connection = $this->stomp->getClient()->getConnection(); $connection->setReadTimeout(10); From e9595ca6e9063ed6d25f5232942c58068162abfb Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Mon, 5 Jun 2023 12:58:50 -0300 Subject: [PATCH 09/29] Misc fixes. --- patch.diff | 429 +++++++++++++++++++++++++++++++ scripts/util.in | 8 +- src/Commands/MigrateCommands.php | 33 ++- src/MigrateBatchExecutable.php | 20 +- src/StompQueue.php | 83 +++++- 5 files changed, 537 insertions(+), 36 deletions(-) create mode 100644 patch.diff diff --git a/patch.diff b/patch.diff new file mode 100644 index 00000000..cdbd0cea --- /dev/null +++ b/patch.diff @@ -0,0 +1,429 @@ +diff --git a/scripts/util.in b/scripts/util.in +index 0a08e02..81c4737 100644 +--- a/scripts/util.in ++++ b/scripts/util.in +@@ -113,13 +113,13 @@ function do_migration_multi_process() { + timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ + | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do + echo "Enqueuing items for $MIGRATION_ID" +- timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" & ++ timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" & + local ENQUEUEING_JOB=$! + echo "Starting $PROCESSES processes to process $MIGRATION_ID." + local -a PROCESS_JOBS=() + for i in $(seq 1 $PROCESSES); do + echo "Starting $i/$PROCESSES to process $MIGRATION_ID." +- timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" & ++ timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" & + PROCESS_JOBS+=($!) + done + wait $ENQUEUEING_JOB +@@ -128,7 +128,7 @@ function do_migration_multi_process() { + echo "Workers signalled; Waiting for them to finish..." + wait ${PROCESS_JOBS[@]} + echo "Workers exited; Finalizing $MIGRATION_ID" +- timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" ++ timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" + echo "Finished $MIGRATION_ID." + done + } +@@ -212,7 +212,7 @@ function do_rollback () { + # Dump status before rollback. + wwwdrush migrate:status --group=$MIGRATION_GROUP + # The base rollback. +- timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null ++ timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "--run=$NUM" "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null + # Dump status after rollback. + wwwdrush migrate:status --group=$MIGRATION_GROUP + set +x +diff --git a/src/Commands/MigrateCommands.php b/src/Commands/MigrateCommands.php +index acf5c42..6042248 100644 +--- a/src/Commands/MigrateCommands.php ++++ b/src/Commands/MigrateCommands.php +@@ -49,6 +49,7 @@ class MigrateCommands extends MigrateToolsCommands { + * @option skip-progress-bar Skip displaying a progress bar. + * @option sync Sync source and destination. Delete destination records that + * do not exist in the source. ++ * @option run The ID of the run, if relevant. + * + * @default $options [] + * @usage migrate:batch-import --all +@@ -88,6 +89,7 @@ class MigrateCommands extends MigrateToolsCommands { + 'execute-dependencies' => FALSE, + 'skip-progress-bar' => FALSE, + 'sync' => FALSE, ++ 'run' => NULL, + ]) { + return parent::import($migration_names, $options); + } +@@ -205,6 +207,7 @@ class MigrateCommands extends MigrateToolsCommands { + * An optional set of row statuses, comma-separated, to which to constrain + * the rollback. Valid states are: "imported", "needs_update", "ignored", + * and "failed". ++ * @option run The ID of the run, if relevant. + * + * @default $options [] + * +@@ -227,7 +230,7 @@ class MigrateCommands extends MigrateToolsCommands { + * @throws \Exception + * If there are not enough parameters to the command. + */ +- public function rollback($migration_names = '', $run_id = NULL, array $options = [ ++ public function rollback($migration_names = '', array $options = [ + 'all' => FALSE, + 'group' => self::REQ, + 'tag' => self::REQ, +@@ -237,6 +240,7 @@ class MigrateCommands extends MigrateToolsCommands { + 'skip-progress-bar' => FALSE, + 'continue-on-failure' => FALSE, + 'statuses' => self::REQ, ++ 'run' => NULL, + ]) { + $group_names = $options['group']; + $tag_names = $options['tag']; +@@ -270,8 +274,8 @@ class MigrateCommands extends MigrateToolsCommands { + $executable = new MigrateBatchExecutable( + $migration, + $this->getMigrateMessage(), +- $run_id, +- $options ++ $options, ++ $options['run'] + ); + // drush_op() provides --simulate support. + $result = drush_op([$executable, 'rollback']); +@@ -283,7 +287,7 @@ class MigrateCommands extends MigrateToolsCommands { + + // If any rollbacks failed, throw an exception to generate exit status. + if ($has_failure) { +- $error_message = dt('!name migration failed.', ['!name' => $migration_id]); ++ $error_message = dt(strtr('!name migration failed.', ['!name' => $migration_id])); + if ($options['continue-on-failure']) { + $this->logger()->error($error_message); + } +@@ -357,10 +361,10 @@ class MigrateCommands extends MigrateToolsCommands { + * + * @throws \Drupal\Component\Plugin\Exception\PluginException + */ +- protected function getExecutable(string $migration_id, string $run_id, array $options = []) : MigrateBatchExecutable { ++ protected function getExecutable(string $migration_id, array $options = []) : MigrateBatchExecutable { + /** @var \Drupal\migrate\Plugin\MigrationInterface $migration */ + $migration = $this->migrationPluginManager->createInstance($migration_id); +- return new MigrateBatchExecutable($migration, $this->getMigrateMessage(), $options, $run_id); ++ return new MigrateBatchExecutable($migration, $this->getMigrateMessage(), $options); + } + + /** +@@ -372,14 +376,16 @@ class MigrateCommands extends MigrateToolsCommands { + * source, update previously-imported items with the current data + * @option sync Sync source and destination. Delete destination records that + * do not exist in the source. ++ * @option run The ID of the run, if relevant. + * + * @islandora-drush-utils-user-wrap + */ +- public function enqueueMigration(string $migration_id, string $run_id, array $options = [ ++ public function enqueueMigration(string $migration_id, array $options = [ + 'update' => FALSE, + 'sync' => FALSE, ++ 'run' => NULL, + ]) : void { +- $executable = $this->getExecutable($migration_id, $run_id, $options); ++ $executable = $this->getExecutable($migration_id, $options); + // drush_op() provides --simulate support. + drush_op([$executable, 'prepareBatch']); + } +@@ -393,14 +399,16 @@ class MigrateCommands extends MigrateToolsCommands { + * source, update previously-imported items with the current data + * @option sync Sync source and destination. Delete destination records that + * do not exist in the source. ++ * @option run The ID of the run, if relevant. + * + * @islandora-drush-utils-user-wrap + */ +- public function processEnqueuedMigration(string $migration_id, string $run_id, array $options = [ ++ public function processEnqueuedMigration(string $migration_id, array $options = [ + 'update' => FALSE, + 'sync' => FALSE, ++ 'run' => NULL, + ]) : void { +- $executable = $this->getExecutable($migration_id, $run_id, $options); ++ $executable = $this->getExecutable($migration_id, $options); + // drush_op() provides --simulate support. + $batch = [ + 'title' => $this->t('Running migration: @migration', [ +@@ -432,11 +440,12 @@ class MigrateCommands extends MigrateToolsCommands { + * + * @islandora-drush-utils-user-wrap + */ +- public function finishEnqueuedMigration(string $migration_id, string $run_id, array $options = [ ++ public function finishEnqueuedMigration(string $migration_id, array $options = [ + 'update' => FALSE, + 'sync' => FALSE, ++ 'run' => NULL, + ]) { +- $executable = $this->getExecutable($migration_id, $run_id, $options); ++ $executable = $this->getExecutable($migration_id, $options); + drush_op([$executable, 'teardownMigration']); + } + +diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php +index 0f28f59..bb6e079 100644 +--- a/src/MigrateBatchExecutable.php ++++ b/src/MigrateBatchExecutable.php +@@ -49,18 +49,23 @@ class MigrateBatchExecutable extends MigrateExecutable { + */ + protected $idMapStatuses; + +- protected string $runId; ++ /** ++ * The options passed. ++ * ++ * @var array ++ */ ++ protected array $options; + + /** + * {@inheritdoc} + */ +- public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = [], string $run_id = NULL) { +- $this->runId = $run_id; ++ public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = []) { + $this->idMapStatuses = isset($options['statuses']) ? + StatusFilter::mapStatuses($options['statuses']) : + []; + + parent::__construct($migration, $message, $options); ++ $this->options = $options; + $this->getQueue(); + + if (static::isCli()) { +@@ -81,7 +86,7 @@ class MigrateBatchExecutable extends MigrateExecutable { + * The name of the queue. + */ + public function getQueueName() : string { +- return "dgi_migrate__{$this->migration->id()}"; ++ return "dgi_migrate__batch_ingest__{$this->migration->id()}"; + } + + /** +@@ -92,7 +97,10 @@ class MigrateBatchExecutable extends MigrateExecutable { + */ + protected function getQueue() : QueueInterface { + if (!isset($this->queue)) { +- $this->queue = StompQueue::create($this->migration->id(), $this->runId); ++ $this->queue = ($this->options['run'] ?? FALSE) ? ++ StompQueue::create($this->migration->id(), $this->options['run']) : ++ \Drupal::queue($this->getQueueName(), TRUE); ++ ; + } + + return $this->queue; +@@ -332,7 +340,7 @@ class MigrateBatchExecutable extends MigrateExecutable { + * Batch context. + */ + public function processBatch(&$context) { +- $sandbox =& $context['sandbox']; ++ $context['finished'] = 0; + + $queue = $this->getQueue(); + +diff --git a/src/StompQueue.php b/src/StompQueue.php +index e8b6cc6..370909a 100644 +--- a/src/StompQueue.php ++++ b/src/StompQueue.php +@@ -4,25 +4,57 @@ namespace Drupal\dgi_migrate; + + use Drupal\Core\DependencyInjection\DependencySerializationTrait; + use Drupal\Core\Queue\QueueInterface; ++use Drupal\migrate\Row; + use Stomp\States\IStateful; + use Stomp\Transport\Message; + ++/** ++ * STOMP-backed queue. ++ */ + class StompQueue implements QueueInterface { + + use DependencySerializationTrait { + __sleep as dstSleep; + } + ++ /** ++ * The STOMP client. ++ * ++ * @var \Stomp\States\IStateful ++ */ + protected IStateful $stomp; + ++ /** ++ * The name of the migration for which to manage the queue. ++ * ++ * @var string ++ */ + protected string $name; ++ ++ /** ++ * The run number of the migration, for which to manage the queue. ++ * ++ * @var string ++ */ + protected string $group; + ++ /** ++ * Flag, whether we have subscribed to the queue or not. ++ * ++ * @var bool ++ */ + protected bool $subscribed = FALSE; + ++ /** ++ * Serial number allocated when enqueueing. ++ * ++ * @var int ++ */ + protected int $serial = 0; + +- ++ /** ++ * Constructor. ++ */ + public function __construct( + IStateful $stomp, + string $name, +@@ -33,6 +65,16 @@ class StompQueue implements QueueInterface { + $this->group = $group; + } + ++ /** ++ * Static factory method. ++ * ++ * @param string $name ++ * The name of the migration for which to manage the queue. ++ * @param string $group ++ * The run number of the migration, for which to manage the queue. ++ * ++ * @return static ++ */ + public static function create(string $name, string $group) { + return new static( + \Drupal::service('islandora.stomp'), +@@ -42,7 +84,7 @@ class StompQueue implements QueueInterface { + } + + /** +- * @inheritDoc ++ * {@inheritDoc} + */ + public function createItem($data) { + $id = $this->serial++; +@@ -70,20 +112,26 @@ class StompQueue implements QueueInterface { + } + + /** +- * @inheritDoc ++ * {@inheritDoc} + */ + public function numberOfItems() { +- return $this->id; ++ // XXX: If called near the end, should be approximately the number of items ++ // in the queue. ++ return $this->serial; + } + ++ /** ++ * Helper; subscribe to the queue if we are not yet subscribed. ++ */ + protected function subscribe() { + if (!$this->subscribed) { + $this->stomp->subscribe( + "/queue/dgi_migrate", +- "dgi_migrate_migration = '{$this->name}' AND dgi_migrate_run_id = '{$this->group}'" ++ "dgi_migrate_migration = '{$this->name}' AND dgi_migrate_run_id = '{$this->group}'", ++ 'client' + ); + $connection = $this->stomp->getClient()->getConnection(); +- $connection->setReadTimeout(10); ++ $connection->setReadTimeout(60); + + if (extension_loaded('pcntl')) { + pcntl_signal(SIGUSR1, [$this, 'pcntlSignalHandler']); +@@ -106,8 +154,6 @@ class StompQueue implements QueueInterface { + /** + * Signal handler. + * +- * @return void +- * + * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L26-L29 + */ + public function pcntlSignalHandler() { +@@ -118,6 +164,7 @@ class StompQueue implements QueueInterface { + * Wait callback. + * + * @return false|void ++ * FALSE to interrupt; otherwise, continue. + * + * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L38-L53 + */ +@@ -129,7 +176,7 @@ class StompQueue implements QueueInterface { + } + + /** +- * @inheritDoc ++ * {@inheritDoc} + */ + public function claimItem($lease_time = 3600) { + $this->subscribe(); +@@ -140,39 +187,47 @@ class StompQueue implements QueueInterface { + return FALSE; + } + +- $to_return = unserialize($frame->getBody()); ++ $to_return = unserialize($frame->getBody(), [ ++ 'allowed_classes' => [ ++ Row::class, ++ \stdClass::class, ++ ], ++ ]); + $to_return->frame = $frame; + return $to_return; + } + + /** +- * @inheritDoc ++ * {@inheritDoc} + */ + public function deleteItem($item) { + $this->stomp->ack($item->frame); + } + + /** +- * @inheritDoc ++ * {@inheritDoc} + */ + public function releaseItem($item) { + $this->stomp->nack($item->frame); + } + + /** +- * @inheritDoc ++ * {@inheritDoc} + */ + public function createQueue() { + // No-op. + } + + /** +- * @inheritDoc ++ * {@inheritDoc} + */ + public function deleteQueue() { + // No-op... can't delete via STOMP. + } + ++ /** ++ * {@inheritDoc} ++ */ + public function __sleep() { + $vars = $this->dstSleep(); + diff --git a/scripts/util.in b/scripts/util.in index 0a08e022..81c47379 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -113,13 +113,13 @@ function do_migration_multi_process() { timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do echo "Enqueuing items for $MIGRATION_ID" - timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" & + timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" & local ENQUEUEING_JOB=$! echo "Starting $PROCESSES processes to process $MIGRATION_ID." local -a PROCESS_JOBS=() for i in $(seq 1 $PROCESSES); do echo "Starting $i/$PROCESSES to process $MIGRATION_ID." - timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" & + timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" & PROCESS_JOBS+=($!) done wait $ENQUEUEING_JOB @@ -128,7 +128,7 @@ function do_migration_multi_process() { echo "Workers signalled; Waiting for them to finish..." wait ${PROCESS_JOBS[@]} echo "Workers exited; Finalizing $MIGRATION_ID" - timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" + timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" echo "Finished $MIGRATION_ID." done } @@ -212,7 +212,7 @@ function do_rollback () { # Dump status before rollback. wwwdrush migrate:status --group=$MIGRATION_GROUP # The base rollback. - timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null + timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "--run=$NUM" "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null # Dump status after rollback. wwwdrush migrate:status --group=$MIGRATION_GROUP set +x diff --git a/src/Commands/MigrateCommands.php b/src/Commands/MigrateCommands.php index acf5c421..60422483 100644 --- a/src/Commands/MigrateCommands.php +++ b/src/Commands/MigrateCommands.php @@ -49,6 +49,7 @@ class MigrateCommands extends MigrateToolsCommands { * @option skip-progress-bar Skip displaying a progress bar. * @option sync Sync source and destination. Delete destination records that * do not exist in the source. + * @option run The ID of the run, if relevant. * * @default $options [] * @usage migrate:batch-import --all @@ -88,6 +89,7 @@ public function batchImport($migration_names = '', array $options = [ 'execute-dependencies' => FALSE, 'skip-progress-bar' => FALSE, 'sync' => FALSE, + 'run' => NULL, ]) { return parent::import($migration_names, $options); } @@ -205,6 +207,7 @@ protected function executeMigration(MigrationInterface $migration, $migration_id * An optional set of row statuses, comma-separated, to which to constrain * the rollback. Valid states are: "imported", "needs_update", "ignored", * and "failed". + * @option run The ID of the run, if relevant. * * @default $options [] * @@ -227,7 +230,7 @@ protected function executeMigration(MigrationInterface $migration, $migration_id * @throws \Exception * If there are not enough parameters to the command. */ - public function rollback($migration_names = '', $run_id = NULL, array $options = [ + public function rollback($migration_names = '', array $options = [ 'all' => FALSE, 'group' => self::REQ, 'tag' => self::REQ, @@ -237,6 +240,7 @@ public function rollback($migration_names = '', $run_id = NULL, array $options = 'skip-progress-bar' => FALSE, 'continue-on-failure' => FALSE, 'statuses' => self::REQ, + 'run' => NULL, ]) { $group_names = $options['group']; $tag_names = $options['tag']; @@ -270,8 +274,8 @@ public function rollback($migration_names = '', $run_id = NULL, array $options = $executable = new MigrateBatchExecutable( $migration, $this->getMigrateMessage(), - $run_id, - $options + $options, + $options['run'] ); // drush_op() provides --simulate support. $result = drush_op([$executable, 'rollback']); @@ -283,7 +287,7 @@ public function rollback($migration_names = '', $run_id = NULL, array $options = // If any rollbacks failed, throw an exception to generate exit status. if ($has_failure) { - $error_message = dt('!name migration failed.', ['!name' => $migration_id]); + $error_message = dt(strtr('!name migration failed.', ['!name' => $migration_id])); if ($options['continue-on-failure']) { $this->logger()->error($error_message); } @@ -357,10 +361,10 @@ public function listMigrations(array $options = [ * * @throws \Drupal\Component\Plugin\Exception\PluginException */ - protected function getExecutable(string $migration_id, string $run_id, array $options = []) : MigrateBatchExecutable { + protected function getExecutable(string $migration_id, array $options = []) : MigrateBatchExecutable { /** @var \Drupal\migrate\Plugin\MigrationInterface $migration */ $migration = $this->migrationPluginManager->createInstance($migration_id); - return new MigrateBatchExecutable($migration, $this->getMigrateMessage(), $options, $run_id); + return new MigrateBatchExecutable($migration, $this->getMigrateMessage(), $options); } /** @@ -372,14 +376,16 @@ protected function getExecutable(string $migration_id, string $run_id, array $op * source, update previously-imported items with the current data * @option sync Sync source and destination. Delete destination records that * do not exist in the source. + * @option run The ID of the run, if relevant. * * @islandora-drush-utils-user-wrap */ - public function enqueueMigration(string $migration_id, string $run_id, array $options = [ + public function enqueueMigration(string $migration_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, + 'run' => NULL, ]) : void { - $executable = $this->getExecutable($migration_id, $run_id, $options); + $executable = $this->getExecutable($migration_id, $options); // drush_op() provides --simulate support. drush_op([$executable, 'prepareBatch']); } @@ -393,14 +399,16 @@ public function enqueueMigration(string $migration_id, string $run_id, array $op * source, update previously-imported items with the current data * @option sync Sync source and destination. Delete destination records that * do not exist in the source. + * @option run The ID of the run, if relevant. * * @islandora-drush-utils-user-wrap */ - public function processEnqueuedMigration(string $migration_id, string $run_id, array $options = [ + public function processEnqueuedMigration(string $migration_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, + 'run' => NULL, ]) : void { - $executable = $this->getExecutable($migration_id, $run_id, $options); + $executable = $this->getExecutable($migration_id, $options); // drush_op() provides --simulate support. $batch = [ 'title' => $this->t('Running migration: @migration', [ @@ -432,11 +440,12 @@ public function processEnqueuedMigration(string $migration_id, string $run_id, a * * @islandora-drush-utils-user-wrap */ - public function finishEnqueuedMigration(string $migration_id, string $run_id, array $options = [ + public function finishEnqueuedMigration(string $migration_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, + 'run' => NULL, ]) { - $executable = $this->getExecutable($migration_id, $run_id, $options); + $executable = $this->getExecutable($migration_id, $options); drush_op([$executable, 'teardownMigration']); } diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index 0f28f590..bb6e0790 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -49,18 +49,23 @@ class MigrateBatchExecutable extends MigrateExecutable { */ protected $idMapStatuses; - protected string $runId; + /** + * The options passed. + * + * @var array + */ + protected array $options; /** * {@inheritdoc} */ - public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = [], string $run_id = NULL) { - $this->runId = $run_id; + public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = []) { $this->idMapStatuses = isset($options['statuses']) ? StatusFilter::mapStatuses($options['statuses']) : []; parent::__construct($migration, $message, $options); + $this->options = $options; $this->getQueue(); if (static::isCli()) { @@ -81,7 +86,7 @@ public function __construct(MigrationInterface $migration, MigrateMessageInterfa * The name of the queue. */ public function getQueueName() : string { - return "dgi_migrate__{$this->migration->id()}"; + return "dgi_migrate__batch_ingest__{$this->migration->id()}"; } /** @@ -92,7 +97,10 @@ public function getQueueName() : string { */ protected function getQueue() : QueueInterface { if (!isset($this->queue)) { - $this->queue = StompQueue::create($this->migration->id(), $this->runId); + $this->queue = ($this->options['run'] ?? FALSE) ? + StompQueue::create($this->migration->id(), $this->options['run']) : + \Drupal::queue($this->getQueueName(), TRUE); + ; } return $this->queue; @@ -332,7 +340,7 @@ protected function processRowFromQueue(Row $row) { * Batch context. */ public function processBatch(&$context) { - $sandbox =& $context['sandbox']; + $context['finished'] = 0; $queue = $this->getQueue(); diff --git a/src/StompQueue.php b/src/StompQueue.php index e8b6cc66..370909a9 100644 --- a/src/StompQueue.php +++ b/src/StompQueue.php @@ -4,25 +4,57 @@ use Drupal\Core\DependencyInjection\DependencySerializationTrait; use Drupal\Core\Queue\QueueInterface; +use Drupal\migrate\Row; use Stomp\States\IStateful; use Stomp\Transport\Message; +/** + * STOMP-backed queue. + */ class StompQueue implements QueueInterface { use DependencySerializationTrait { __sleep as dstSleep; } + /** + * The STOMP client. + * + * @var \Stomp\States\IStateful + */ protected IStateful $stomp; + /** + * The name of the migration for which to manage the queue. + * + * @var string + */ protected string $name; + + /** + * The run number of the migration, for which to manage the queue. + * + * @var string + */ protected string $group; + /** + * Flag, whether we have subscribed to the queue or not. + * + * @var bool + */ protected bool $subscribed = FALSE; + /** + * Serial number allocated when enqueueing. + * + * @var int + */ protected int $serial = 0; - + /** + * Constructor. + */ public function __construct( IStateful $stomp, string $name, @@ -33,6 +65,16 @@ public function __construct( $this->group = $group; } + /** + * Static factory method. + * + * @param string $name + * The name of the migration for which to manage the queue. + * @param string $group + * The run number of the migration, for which to manage the queue. + * + * @return static + */ public static function create(string $name, string $group) { return new static( \Drupal::service('islandora.stomp'), @@ -42,7 +84,7 @@ public static function create(string $name, string $group) { } /** - * @inheritDoc + * {@inheritDoc} */ public function createItem($data) { $id = $this->serial++; @@ -70,20 +112,26 @@ public function createItem($data) { } /** - * @inheritDoc + * {@inheritDoc} */ public function numberOfItems() { - return $this->id; + // XXX: If called near the end, should be approximately the number of items + // in the queue. + return $this->serial; } + /** + * Helper; subscribe to the queue if we are not yet subscribed. + */ protected function subscribe() { if (!$this->subscribed) { $this->stomp->subscribe( "/queue/dgi_migrate", - "dgi_migrate_migration = '{$this->name}' AND dgi_migrate_run_id = '{$this->group}'" + "dgi_migrate_migration = '{$this->name}' AND dgi_migrate_run_id = '{$this->group}'", + 'client' ); $connection = $this->stomp->getClient()->getConnection(); - $connection->setReadTimeout(10); + $connection->setReadTimeout(60); if (extension_loaded('pcntl')) { pcntl_signal(SIGUSR1, [$this, 'pcntlSignalHandler']); @@ -106,8 +154,6 @@ protected function subscribe() { /** * Signal handler. * - * @return void - * * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L26-L29 */ public function pcntlSignalHandler() { @@ -118,6 +164,7 @@ public function pcntlSignalHandler() { * Wait callback. * * @return false|void + * FALSE to interrupt; otherwise, continue. * * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L38-L53 */ @@ -129,7 +176,7 @@ public function pcntlWaitCallback() { } /** - * @inheritDoc + * {@inheritDoc} */ public function claimItem($lease_time = 3600) { $this->subscribe(); @@ -140,39 +187,47 @@ public function claimItem($lease_time = 3600) { return FALSE; } - $to_return = unserialize($frame->getBody()); + $to_return = unserialize($frame->getBody(), [ + 'allowed_classes' => [ + Row::class, + \stdClass::class, + ], + ]); $to_return->frame = $frame; return $to_return; } /** - * @inheritDoc + * {@inheritDoc} */ public function deleteItem($item) { $this->stomp->ack($item->frame); } /** - * @inheritDoc + * {@inheritDoc} */ public function releaseItem($item) { $this->stomp->nack($item->frame); } /** - * @inheritDoc + * {@inheritDoc} */ public function createQueue() { // No-op. } /** - * @inheritDoc + * {@inheritDoc} */ public function deleteQueue() { // No-op... can't delete via STOMP. } + /** + * {@inheritDoc} + */ public function __sleep() { $vars = $this->dstSleep(); From 3ddc6da78a0e2755a9aa3867623c12ff6678d8ce Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Mon, 5 Jun 2023 13:48:51 -0300 Subject: [PATCH 10/29] Remove the patch. Was accidental addition. --- patch.diff | 429 ----------------------------------------------------- 1 file changed, 429 deletions(-) delete mode 100644 patch.diff diff --git a/patch.diff b/patch.diff deleted file mode 100644 index cdbd0cea..00000000 --- a/patch.diff +++ /dev/null @@ -1,429 +0,0 @@ -diff --git a/scripts/util.in b/scripts/util.in -index 0a08e02..81c4737 100644 ---- a/scripts/util.in -+++ b/scripts/util.in -@@ -113,13 +113,13 @@ function do_migration_multi_process() { - timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ - | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do - echo "Enqueuing items for $MIGRATION_ID" -- timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" & -+ timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" & - local ENQUEUEING_JOB=$! - echo "Starting $PROCESSES processes to process $MIGRATION_ID." - local -a PROCESS_JOBS=() - for i in $(seq 1 $PROCESSES); do - echo "Starting $i/$PROCESSES to process $MIGRATION_ID." -- timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" & -+ timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" & - PROCESS_JOBS+=($!) - done - wait $ENQUEUEING_JOB -@@ -128,7 +128,7 @@ function do_migration_multi_process() { - echo "Workers signalled; Waiting for them to finish..." - wait ${PROCESS_JOBS[@]} - echo "Workers exited; Finalizing $MIGRATION_ID" -- timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "$MIGRATION_ID" "$NUM" "${@:2}" -+ timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" - echo "Finished $MIGRATION_ID." - done - } -@@ -212,7 +212,7 @@ function do_rollback () { - # Dump status before rollback. - wwwdrush migrate:status --group=$MIGRATION_GROUP - # The base rollback. -- timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null -+ timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "--run=$NUM" "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null - # Dump status after rollback. - wwwdrush migrate:status --group=$MIGRATION_GROUP - set +x -diff --git a/src/Commands/MigrateCommands.php b/src/Commands/MigrateCommands.php -index acf5c42..6042248 100644 ---- a/src/Commands/MigrateCommands.php -+++ b/src/Commands/MigrateCommands.php -@@ -49,6 +49,7 @@ class MigrateCommands extends MigrateToolsCommands { - * @option skip-progress-bar Skip displaying a progress bar. - * @option sync Sync source and destination. Delete destination records that - * do not exist in the source. -+ * @option run The ID of the run, if relevant. - * - * @default $options [] - * @usage migrate:batch-import --all -@@ -88,6 +89,7 @@ class MigrateCommands extends MigrateToolsCommands { - 'execute-dependencies' => FALSE, - 'skip-progress-bar' => FALSE, - 'sync' => FALSE, -+ 'run' => NULL, - ]) { - return parent::import($migration_names, $options); - } -@@ -205,6 +207,7 @@ class MigrateCommands extends MigrateToolsCommands { - * An optional set of row statuses, comma-separated, to which to constrain - * the rollback. Valid states are: "imported", "needs_update", "ignored", - * and "failed". -+ * @option run The ID of the run, if relevant. - * - * @default $options [] - * -@@ -227,7 +230,7 @@ class MigrateCommands extends MigrateToolsCommands { - * @throws \Exception - * If there are not enough parameters to the command. - */ -- public function rollback($migration_names = '', $run_id = NULL, array $options = [ -+ public function rollback($migration_names = '', array $options = [ - 'all' => FALSE, - 'group' => self::REQ, - 'tag' => self::REQ, -@@ -237,6 +240,7 @@ class MigrateCommands extends MigrateToolsCommands { - 'skip-progress-bar' => FALSE, - 'continue-on-failure' => FALSE, - 'statuses' => self::REQ, -+ 'run' => NULL, - ]) { - $group_names = $options['group']; - $tag_names = $options['tag']; -@@ -270,8 +274,8 @@ class MigrateCommands extends MigrateToolsCommands { - $executable = new MigrateBatchExecutable( - $migration, - $this->getMigrateMessage(), -- $run_id, -- $options -+ $options, -+ $options['run'] - ); - // drush_op() provides --simulate support. - $result = drush_op([$executable, 'rollback']); -@@ -283,7 +287,7 @@ class MigrateCommands extends MigrateToolsCommands { - - // If any rollbacks failed, throw an exception to generate exit status. - if ($has_failure) { -- $error_message = dt('!name migration failed.', ['!name' => $migration_id]); -+ $error_message = dt(strtr('!name migration failed.', ['!name' => $migration_id])); - if ($options['continue-on-failure']) { - $this->logger()->error($error_message); - } -@@ -357,10 +361,10 @@ class MigrateCommands extends MigrateToolsCommands { - * - * @throws \Drupal\Component\Plugin\Exception\PluginException - */ -- protected function getExecutable(string $migration_id, string $run_id, array $options = []) : MigrateBatchExecutable { -+ protected function getExecutable(string $migration_id, array $options = []) : MigrateBatchExecutable { - /** @var \Drupal\migrate\Plugin\MigrationInterface $migration */ - $migration = $this->migrationPluginManager->createInstance($migration_id); -- return new MigrateBatchExecutable($migration, $this->getMigrateMessage(), $options, $run_id); -+ return new MigrateBatchExecutable($migration, $this->getMigrateMessage(), $options); - } - - /** -@@ -372,14 +376,16 @@ class MigrateCommands extends MigrateToolsCommands { - * source, update previously-imported items with the current data - * @option sync Sync source and destination. Delete destination records that - * do not exist in the source. -+ * @option run The ID of the run, if relevant. - * - * @islandora-drush-utils-user-wrap - */ -- public function enqueueMigration(string $migration_id, string $run_id, array $options = [ -+ public function enqueueMigration(string $migration_id, array $options = [ - 'update' => FALSE, - 'sync' => FALSE, -+ 'run' => NULL, - ]) : void { -- $executable = $this->getExecutable($migration_id, $run_id, $options); -+ $executable = $this->getExecutable($migration_id, $options); - // drush_op() provides --simulate support. - drush_op([$executable, 'prepareBatch']); - } -@@ -393,14 +399,16 @@ class MigrateCommands extends MigrateToolsCommands { - * source, update previously-imported items with the current data - * @option sync Sync source and destination. Delete destination records that - * do not exist in the source. -+ * @option run The ID of the run, if relevant. - * - * @islandora-drush-utils-user-wrap - */ -- public function processEnqueuedMigration(string $migration_id, string $run_id, array $options = [ -+ public function processEnqueuedMigration(string $migration_id, array $options = [ - 'update' => FALSE, - 'sync' => FALSE, -+ 'run' => NULL, - ]) : void { -- $executable = $this->getExecutable($migration_id, $run_id, $options); -+ $executable = $this->getExecutable($migration_id, $options); - // drush_op() provides --simulate support. - $batch = [ - 'title' => $this->t('Running migration: @migration', [ -@@ -432,11 +440,12 @@ class MigrateCommands extends MigrateToolsCommands { - * - * @islandora-drush-utils-user-wrap - */ -- public function finishEnqueuedMigration(string $migration_id, string $run_id, array $options = [ -+ public function finishEnqueuedMigration(string $migration_id, array $options = [ - 'update' => FALSE, - 'sync' => FALSE, -+ 'run' => NULL, - ]) { -- $executable = $this->getExecutable($migration_id, $run_id, $options); -+ $executable = $this->getExecutable($migration_id, $options); - drush_op([$executable, 'teardownMigration']); - } - -diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php -index 0f28f59..bb6e079 100644 ---- a/src/MigrateBatchExecutable.php -+++ b/src/MigrateBatchExecutable.php -@@ -49,18 +49,23 @@ class MigrateBatchExecutable extends MigrateExecutable { - */ - protected $idMapStatuses; - -- protected string $runId; -+ /** -+ * The options passed. -+ * -+ * @var array -+ */ -+ protected array $options; - - /** - * {@inheritdoc} - */ -- public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = [], string $run_id = NULL) { -- $this->runId = $run_id; -+ public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = []) { - $this->idMapStatuses = isset($options['statuses']) ? - StatusFilter::mapStatuses($options['statuses']) : - []; - - parent::__construct($migration, $message, $options); -+ $this->options = $options; - $this->getQueue(); - - if (static::isCli()) { -@@ -81,7 +86,7 @@ class MigrateBatchExecutable extends MigrateExecutable { - * The name of the queue. - */ - public function getQueueName() : string { -- return "dgi_migrate__{$this->migration->id()}"; -+ return "dgi_migrate__batch_ingest__{$this->migration->id()}"; - } - - /** -@@ -92,7 +97,10 @@ class MigrateBatchExecutable extends MigrateExecutable { - */ - protected function getQueue() : QueueInterface { - if (!isset($this->queue)) { -- $this->queue = StompQueue::create($this->migration->id(), $this->runId); -+ $this->queue = ($this->options['run'] ?? FALSE) ? -+ StompQueue::create($this->migration->id(), $this->options['run']) : -+ \Drupal::queue($this->getQueueName(), TRUE); -+ ; - } - - return $this->queue; -@@ -332,7 +340,7 @@ class MigrateBatchExecutable extends MigrateExecutable { - * Batch context. - */ - public function processBatch(&$context) { -- $sandbox =& $context['sandbox']; -+ $context['finished'] = 0; - - $queue = $this->getQueue(); - -diff --git a/src/StompQueue.php b/src/StompQueue.php -index e8b6cc6..370909a 100644 ---- a/src/StompQueue.php -+++ b/src/StompQueue.php -@@ -4,25 +4,57 @@ namespace Drupal\dgi_migrate; - - use Drupal\Core\DependencyInjection\DependencySerializationTrait; - use Drupal\Core\Queue\QueueInterface; -+use Drupal\migrate\Row; - use Stomp\States\IStateful; - use Stomp\Transport\Message; - -+/** -+ * STOMP-backed queue. -+ */ - class StompQueue implements QueueInterface { - - use DependencySerializationTrait { - __sleep as dstSleep; - } - -+ /** -+ * The STOMP client. -+ * -+ * @var \Stomp\States\IStateful -+ */ - protected IStateful $stomp; - -+ /** -+ * The name of the migration for which to manage the queue. -+ * -+ * @var string -+ */ - protected string $name; -+ -+ /** -+ * The run number of the migration, for which to manage the queue. -+ * -+ * @var string -+ */ - protected string $group; - -+ /** -+ * Flag, whether we have subscribed to the queue or not. -+ * -+ * @var bool -+ */ - protected bool $subscribed = FALSE; - -+ /** -+ * Serial number allocated when enqueueing. -+ * -+ * @var int -+ */ - protected int $serial = 0; - -- -+ /** -+ * Constructor. -+ */ - public function __construct( - IStateful $stomp, - string $name, -@@ -33,6 +65,16 @@ class StompQueue implements QueueInterface { - $this->group = $group; - } - -+ /** -+ * Static factory method. -+ * -+ * @param string $name -+ * The name of the migration for which to manage the queue. -+ * @param string $group -+ * The run number of the migration, for which to manage the queue. -+ * -+ * @return static -+ */ - public static function create(string $name, string $group) { - return new static( - \Drupal::service('islandora.stomp'), -@@ -42,7 +84,7 @@ class StompQueue implements QueueInterface { - } - - /** -- * @inheritDoc -+ * {@inheritDoc} - */ - public function createItem($data) { - $id = $this->serial++; -@@ -70,20 +112,26 @@ class StompQueue implements QueueInterface { - } - - /** -- * @inheritDoc -+ * {@inheritDoc} - */ - public function numberOfItems() { -- return $this->id; -+ // XXX: If called near the end, should be approximately the number of items -+ // in the queue. -+ return $this->serial; - } - -+ /** -+ * Helper; subscribe to the queue if we are not yet subscribed. -+ */ - protected function subscribe() { - if (!$this->subscribed) { - $this->stomp->subscribe( - "/queue/dgi_migrate", -- "dgi_migrate_migration = '{$this->name}' AND dgi_migrate_run_id = '{$this->group}'" -+ "dgi_migrate_migration = '{$this->name}' AND dgi_migrate_run_id = '{$this->group}'", -+ 'client' - ); - $connection = $this->stomp->getClient()->getConnection(); -- $connection->setReadTimeout(10); -+ $connection->setReadTimeout(60); - - if (extension_loaded('pcntl')) { - pcntl_signal(SIGUSR1, [$this, 'pcntlSignalHandler']); -@@ -106,8 +154,6 @@ class StompQueue implements QueueInterface { - /** - * Signal handler. - * -- * @return void -- * - * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L26-L29 - */ - public function pcntlSignalHandler() { -@@ -118,6 +164,7 @@ class StompQueue implements QueueInterface { - * Wait callback. - * - * @return false|void -+ * FALSE to interrupt; otherwise, continue. - * - * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L38-L53 - */ -@@ -129,7 +176,7 @@ class StompQueue implements QueueInterface { - } - - /** -- * @inheritDoc -+ * {@inheritDoc} - */ - public function claimItem($lease_time = 3600) { - $this->subscribe(); -@@ -140,39 +187,47 @@ class StompQueue implements QueueInterface { - return FALSE; - } - -- $to_return = unserialize($frame->getBody()); -+ $to_return = unserialize($frame->getBody(), [ -+ 'allowed_classes' => [ -+ Row::class, -+ \stdClass::class, -+ ], -+ ]); - $to_return->frame = $frame; - return $to_return; - } - - /** -- * @inheritDoc -+ * {@inheritDoc} - */ - public function deleteItem($item) { - $this->stomp->ack($item->frame); - } - - /** -- * @inheritDoc -+ * {@inheritDoc} - */ - public function releaseItem($item) { - $this->stomp->nack($item->frame); - } - - /** -- * @inheritDoc -+ * {@inheritDoc} - */ - public function createQueue() { - // No-op. - } - - /** -- * @inheritDoc -+ * {@inheritDoc} - */ - public function deleteQueue() { - // No-op... can't delete via STOMP. - } - -+ /** -+ * {@inheritDoc} -+ */ - public function __sleep() { - $vars = $this->dstSleep(); - From 059635f23902f29e807fc9f96ca7a8ba85c88ab9 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Mon, 5 Jun 2023 13:53:09 -0300 Subject: [PATCH 11/29] Skip the count on the foxml files business. --- .../migrations/dgis_foxml_files.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_foxml_files.yml b/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_foxml_files.yml index 179783ab..3b6935b4 100644 --- a/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_foxml_files.yml +++ b/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_foxml_files.yml @@ -11,7 +11,9 @@ source: # validation of the URI in the destination may fail (presumably, naive URI # validation inside of Drupal, expecting HTTP-like URLs)... or may just # require a third slash to imply an empty "authority" component? - cache_counts: true + # XXX: For big sets of things, counting could take a substantial amount of + # time, so let's skip it. + skip_count: true destination: plugin: entity:file validate: true From 395db2e9173cf20875d41998d813c11d215f6535 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Mon, 5 Jun 2023 14:31:26 -0300 Subject: [PATCH 12/29] Explicitly segment queue instead of basing on selectors. --- src/StompQueue.php | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/StompQueue.php b/src/StompQueue.php index 370909a9..3beecabb 100644 --- a/src/StompQueue.php +++ b/src/StompQueue.php @@ -104,7 +104,7 @@ public function createItem($data) { ); $this->stomp->send( - '/queue/dgi_migrate', + $this->getQueueName(), $message ); @@ -120,14 +120,24 @@ public function numberOfItems() { return $this->serial; } + /** + * Helper; get queue name. + * + * @return string + * The name of the queue with which to communicate. + */ + protected function getQueueName() { + return "/queue/dgi_migrate_{$this->name}_{$this->group}"; + } + /** * Helper; subscribe to the queue if we are not yet subscribed. */ protected function subscribe() { if (!$this->subscribed) { $this->stomp->subscribe( - "/queue/dgi_migrate", - "dgi_migrate_migration = '{$this->name}' AND dgi_migrate_run_id = '{$this->group}'", + $this->getQueueName(), + null, 'client' ); $connection = $this->stomp->getClient()->getConnection(); From 69f938f4af60f0e69746ec1877e80af48d11934f Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Mon, 5 Jun 2023 14:34:08 -0300 Subject: [PATCH 13/29] Fix casing. --- src/StompQueue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/StompQueue.php b/src/StompQueue.php index 3beecabb..78347328 100644 --- a/src/StompQueue.php +++ b/src/StompQueue.php @@ -137,7 +137,7 @@ protected function subscribe() { if (!$this->subscribed) { $this->stomp->subscribe( $this->getQueueName(), - null, + NULL, 'client' ); $connection = $this->stomp->getClient()->getConnection(); From fe189456d21d342c528325606677dc718cb1fb1a Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Tue, 6 Jun 2023 10:21:18 -0300 Subject: [PATCH 14/29] Rework messaging. --- scripts/util.in | 6 +-- src/Commands/MigrateCommands.php | 3 ++ src/MigrateBatchExecutable.php | 8 ++- src/StompQueue.php | 89 +++++++++++++++++--------------- 4 files changed, 58 insertions(+), 48 deletions(-) diff --git a/scripts/util.in b/scripts/util.in index 81c47379..ad015895 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -113,7 +113,7 @@ function do_migration_multi_process() { timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do echo "Enqueuing items for $MIGRATION_ID" - timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" & + timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "--run=$NUM" "--send_terminals=$PROCESSES" "$MIGRATION_ID" "${@:2}" & local ENQUEUEING_JOB=$! echo "Starting $PROCESSES processes to process $MIGRATION_ID." local -a PROCESS_JOBS=() @@ -123,9 +123,7 @@ function do_migration_multi_process() { PROCESS_JOBS+=($!) done wait $ENQUEUEING_JOB - echo "Enqueing finished; signalling workers." - kill -SIGUSR1 ${PROCESS_JOBS[@]} - echo "Workers signalled; Waiting for them to finish..." + echo "Enqueing finished; Waiting for workers to finish..." wait ${PROCESS_JOBS[@]} echo "Workers exited; Finalizing $MIGRATION_ID" timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" diff --git a/src/Commands/MigrateCommands.php b/src/Commands/MigrateCommands.php index 60422483..4ccca606 100644 --- a/src/Commands/MigrateCommands.php +++ b/src/Commands/MigrateCommands.php @@ -377,6 +377,8 @@ protected function getExecutable(string $migration_id, array $options = []) : Mi * @option sync Sync source and destination. Delete destination records that * do not exist in the source. * @option run The ID of the run, if relevant. + * @option send_terminals The number of terminal messages to send after + * enqueueing all the messages. * * @islandora-drush-utils-user-wrap */ @@ -384,6 +386,7 @@ public function enqueueMigration(string $migration_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, 'run' => NULL, + 'send_terminals' => 0, ]) : void { $executable = $this->getExecutable($migration_id, $options); // drush_op() provides --simulate support. diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index bb6e0790..8cc4013c 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -86,7 +86,7 @@ public function __construct(MigrationInterface $migration, MigrateMessageInterfa * The name of the queue. */ public function getQueueName() : string { - return "dgi_migrate__batch_ingest__{$this->migration->id()}"; + return "dgi_migrate__batch_queue__{$this->migration->id()}"; } /** @@ -245,6 +245,12 @@ protected function enqueue() { 'attempts' => 0, ]); } + if ($queue instanceof StompQueue) { + $total = intval($this->options['send_terminals'] ?? 0); + for ($i = 0; $i < $total; $i++) { + $queue->sendTerminal(); + } + } return MigrationInterface::RESULT_COMPLETED; } diff --git a/src/StompQueue.php b/src/StompQueue.php index 78347328..5270d1a4 100644 --- a/src/StompQueue.php +++ b/src/StompQueue.php @@ -5,6 +5,7 @@ use Drupal\Core\DependencyInjection\DependencySerializationTrait; use Drupal\Core\Queue\QueueInterface; use Drupal\migrate\Row; +use Psr\Log\LoggerInterface; use Stomp\States\IStateful; use Stomp\Transport\Message; @@ -52,15 +53,24 @@ class StompQueue implements QueueInterface { */ protected int $serial = 0; + /** + * Logger. + * + * @var \Psr\Log\LoggerInterface + */ + protected LoggerInterface $logger; + /** * Constructor. */ public function __construct( IStateful $stomp, + LoggerInterface $logger, string $name, string $group ) { $this->stomp = $stomp; + $this->logger = $logger; $this->name = $name; $this->group = $group; } @@ -78,6 +88,7 @@ public function __construct( public static function create(string $name, string $group) { return new static( \Drupal::service('islandora.stomp'), + \Drupal::logger('dgi_migrate.stomp_queue'), $name, $group ); @@ -97,6 +108,7 @@ public function createItem($data) { $message = new Message( serialize($body), [ + 'type' => 'to_process', 'dgi_migrate_migration' => $this->name, 'dgi_migrate_run_id' => $this->group, 'persistent' => 'true', @@ -111,6 +123,28 @@ public function createItem($data) { return $id; } + /** + * Send a "terminal" message. + * + * Should be one for each worker we intend to start. + */ + public function sendTerminal() { + $message = new Message( + '', + [ + 'type' => 'terminal', + 'dgi_migrate_migration' => $this->name, + 'dgi_migrate_run_id' => $this->group, + 'persistent' => 'true', + ] + ); + + $this->stomp->send( + $this->getQueueName(), + $message + ); + } + /** * {@inheritDoc} */ @@ -140,63 +174,32 @@ protected function subscribe() { NULL, 'client' ); - $connection = $this->stomp->getClient()->getConnection(); - $connection->setReadTimeout(60); - if (extension_loaded('pcntl')) { - pcntl_signal(SIGUSR1, [$this, 'pcntlSignalHandler']); - pcntl_signal(SIGINT, [$this, 'pcntlSignalHandler']); - $connection->setWaitCallback([$this, 'pcntlWaitCallback']); - } $this->subscribed = TRUE; } } - /** - * Signal flag. - * - * @var bool - * - * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L20-L21 - */ - protected bool $signalled = FALSE; - - /** - * Signal handler. - * - * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L26-L29 - */ - public function pcntlSignalHandler() { - $this->signalled = TRUE; - } - - /** - * Wait callback. - * - * @return false|void - * FALSE to interrupt; otherwise, continue. - * - * @see https://github.com/stomp-php/stomp-php-examples/blob/693d436228c49eabeda853d1c390dab0ce0ace7d/src/pcntl_signal_handling.php#L38-L53 - */ - public function pcntlWaitCallback() { - pcntl_signal_dispatch(); - if ($this->signalled) { - return FALSE; - } - } - /** * {@inheritDoc} */ public function claimItem($lease_time = 3600) { $this->subscribe(); - $frame = $this->stomp->read(); - - if ($frame === FALSE) { + // XXX: The STOMP client has an associated timeout out, after which it will + // return that it failed to read anything. If we haven't been signalled that + // the queue has been completely populated, try again; otherwise, if the + // queue is finished, report its exhaustion. + while(($frame = $this->stomp->read()) === FALSE) { + $this->logger->debug('Not signalled; polling again.'); + } + $headers = $frame->getHeaders(); + if (array_key_exists('type', $headers) && $headers['type'] === 'terminal') { + // Got a terminal message; ack-knowledge it and flag the queue's empty. + $this->deleteItem($frame); return FALSE; } + $to_return = unserialize($frame->getBody(), [ 'allowed_classes' => [ Row::class, From 4c6deb5b05535a365a92a6548a21c518139162f0 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Tue, 6 Jun 2023 10:24:52 -0300 Subject: [PATCH 15/29] Fix up some spacing. --- src/StompQueue.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/StompQueue.php b/src/StompQueue.php index 5270d1a4..e68be014 100644 --- a/src/StompQueue.php +++ b/src/StompQueue.php @@ -189,7 +189,7 @@ public function claimItem($lease_time = 3600) { // return that it failed to read anything. If we haven't been signalled that // the queue has been completely populated, try again; otherwise, if the // queue is finished, report its exhaustion. - while(($frame = $this->stomp->read()) === FALSE) { + while (($frame = $this->stomp->read()) === FALSE) { $this->logger->debug('Not signalled; polling again.'); } $headers = $frame->getHeaders(); @@ -199,7 +199,6 @@ public function claimItem($lease_time = 3600) { return FALSE; } - $to_return = unserialize($frame->getBody(), [ 'allowed_classes' => [ Row::class, From 5ab56ee48d76273bbd63b272d4ee89bef2dd897e Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Tue, 6 Jun 2023 10:44:26 -0300 Subject: [PATCH 16/29] Ensure the signal to shut down the queues gets sent. --- src/MigrateBatchExecutable.php | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index 8cc4013c..afccf7dc 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -239,16 +239,20 @@ protected function enqueue() { // XXX: Nuke it, just in case. $queue = $this->getQueue(); $queue->deleteQueue(); - foreach ($source as $row) { - $queue->createItem([ - 'row' => $row, - 'attempts' => 0, - ]); + try { + foreach ($source as $row) { + $queue->createItem([ + 'row' => $row, + 'attempts' => 0, + ]); + } } - if ($queue instanceof StompQueue) { - $total = intval($this->options['send_terminals'] ?? 0); - for ($i = 0; $i < $total; $i++) { - $queue->sendTerminal(); + finally { + if ($queue instanceof StompQueue) { + $total = intval($this->options['send_terminals'] ?? 0); + for ($i = 0; $i < $total; $i++) { + $queue->sendTerminal(); + } } } return MigrationInterface::RESULT_COMPLETED; From 26c1633ee3dbe4a263e1886484abe44a0b9d68e2 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Tue, 6 Jun 2023 11:03:32 -0300 Subject: [PATCH 17/29] Use the appropriate method. ... isn't building out the full "item", so can't use the `::deleteItem()` thing. --- src/StompQueue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/StompQueue.php b/src/StompQueue.php index e68be014..7044e188 100644 --- a/src/StompQueue.php +++ b/src/StompQueue.php @@ -195,7 +195,7 @@ public function claimItem($lease_time = 3600) { $headers = $frame->getHeaders(); if (array_key_exists('type', $headers) && $headers['type'] === 'terminal') { // Got a terminal message; ack-knowledge it and flag the queue's empty. - $this->deleteItem($frame); + $this->stomp->ack($frame); return FALSE; } From 0fc823718a54e48e20c0f88ac4b19b52f938ff1a Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Tue, 6 Jun 2023 11:38:53 -0300 Subject: [PATCH 18/29] Update comment, as we're no longer dealing with signals. --- src/StompQueue.php | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/StompQueue.php b/src/StompQueue.php index 7044e188..2c929332 100644 --- a/src/StompQueue.php +++ b/src/StompQueue.php @@ -186,9 +186,8 @@ public function claimItem($lease_time = 3600) { $this->subscribe(); // XXX: The STOMP client has an associated timeout out, after which it will - // return that it failed to read anything. If we haven't been signalled that - // the queue has been completely populated, try again; otherwise, if the - // queue is finished, report its exhaustion. + // return that it failed to read anything. We expect a "terminal" message to + // be added to the queue for each worker, telling them to quit. while (($frame = $this->stomp->read()) === FALSE) { $this->logger->debug('Not signalled; polling again.'); } From e50e1b3302241604b06cd437dbfb62a3cc72c69f Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Tue, 6 Jun 2023 12:34:10 -0300 Subject: [PATCH 19/29] Get rid of a stray semi-colon. --- src/MigrateBatchExecutable.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index afccf7dc..b474fba4 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -100,7 +100,6 @@ protected function getQueue() : QueueInterface { $this->queue = ($this->options['run'] ?? FALSE) ? StompQueue::create($this->migration->id(), $this->options['run']) : \Drupal::queue($this->getQueueName(), TRUE); - ; } return $this->queue; From ed004afefed480465d7ecc26707bf94a62c99bd2 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Thu, 8 Jun 2023 14:32:32 -0300 Subject: [PATCH 20/29] Enhance pausability. --- scripts/util.in | 25 ++++++++++++++++++++++--- src/Commands/MigrateCommands.php | 21 +++++++++++++++++++++ src/MigrateBatchExecutable.php | 22 +++++++--------------- src/StompQueue.php | 7 +++++-- 4 files changed, 55 insertions(+), 20 deletions(-) diff --git a/scripts/util.in b/scripts/util.in index 3a582540..9ce77f16 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -124,13 +124,18 @@ function do_migration_multi_process() { local NUM=${1} local PROCESS_LOG_DIR="$LOG_DIR/$NUM-multiprocess-logs" + local LOCK_FILE="$LOG_FILE/$NUM.lock" wwwdo mkdir -p $PROCESS_LOG_DIR + wwwdo touch $LOCK_FILE echo "Listing migrations..." timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do - if [ " ${MULTIPROCESS_SKIP_MIGRATIONS[@]} " =~ " $MIGRATION_ID " ]; then + if [ ! -f $LOCK_FILE ] ; then + echo "Lock file removed; exiting before touching $MIGRATION_ID." + return + elif [ " ${MULTIPROCESS_SKIP_MIGRATIONS[@]} " =~ " $MIGRATION_ID " ]; then echo "Skipping $MIGRATION_ID as requested." continue elif [ " ${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]} " =~ " $MIGRATION_ID " ]; then @@ -148,16 +153,30 @@ function do_migration_multi_process() { timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" & PROCESS_JOBS+=($!) done + wait $ENQUEUEING_JOB - echo "Enqueing finished; Waiting for workers to finish..." + echo "Work enqueueing finished; enqueueing terminal messages." + for i in $(seq 1 $PROCESSES); do + wwwdrush dgi-migrate:enqueue-terminal "$MIGRATION_ID" "$NUM" + done + + echo "Terminal messages enqueued; waiting for workers to finish..." wait ${PROCESS_JOBS[@]} - echo "Workers exited; Finalizing $MIGRATION_ID" + echo "Workers exited." + if [ ! -f $LOCK_FILE ] ; then + echo "Lock file removed; exiting without finalizing batch." + return + fi + + echo "Finalizing $MIGRATION_ID." timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" echo "Finished $MIGRATION_ID." if [ " ${MULTIPROCESS_POST_PROCESS_PAUSE[@]} " =~ " $MIGRATION_ID " ]; then do_pause "post-process" "$MIGRATION_ID" fi + done + wwwdo rm $LOCK_FILE } # Dump status for the given migration group. diff --git a/src/Commands/MigrateCommands.php b/src/Commands/MigrateCommands.php index 4ccca606..96ca9f1e 100644 --- a/src/Commands/MigrateCommands.php +++ b/src/Commands/MigrateCommands.php @@ -5,6 +5,7 @@ use Consolidation\OutputFormatters\StructuredData\RowsOfFields; use Drupal\Component\Graph\Graph; use Drupal\Core\StringTranslation\StringTranslationTrait; +use Drupal\dgi_migrate\StompQueue; use Drupal\migrate_tools\Commands\MigrateToolsCommands; use Drupal\dgi_migrate\MigrateBatchExecutable; use Drupal\migrate\Plugin\MigrationInterface; @@ -440,6 +441,7 @@ public function processEnqueuedMigration(string $migration_id, array $options = * source, update previously-imported items with the current data * @option sync Sync source and destination. Delete destination records that * do not exist in the source. + * @option run The ID of the run, if relevant. * * @islandora-drush-utils-user-wrap */ @@ -452,4 +454,23 @@ public function finishEnqueuedMigration(string $migration_id, array $options = [ drush_op([$executable, 'teardownMigration']); } + /** + * Enqueue STOMP "terminal" messages. + * + * @option priority The priority of the message. Higher should lead to earlier + * processing, allowing workers to be shutdown prior to completion of the + * queue. Defined by STOMP/JMS, an integer ranging from 0 to 9. Defaults to + * 4. + * + * @command dgi-migrate:enqueue-terminal + */ + public function enqueueTerminal(string $migration_id, string $run_id, array $options = [ + 'priority' => 4, + ]) { + $stomp_queue = StompQueue::create($migration_id, $run_id); + $stomp_queue->sendTerminal([ + 'priority' => $options['priority'] ?? 4, + ]); + } + } diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php index b474fba4..48576ef8 100644 --- a/src/MigrateBatchExecutable.php +++ b/src/MigrateBatchExecutable.php @@ -238,22 +238,14 @@ protected function enqueue() { // XXX: Nuke it, just in case. $queue = $this->getQueue(); $queue->deleteQueue(); - try { - foreach ($source as $row) { - $queue->createItem([ - 'row' => $row, - 'attempts' => 0, - ]); - } - } - finally { - if ($queue instanceof StompQueue) { - $total = intval($this->options['send_terminals'] ?? 0); - for ($i = 0; $i < $total; $i++) { - $queue->sendTerminal(); - } - } + + foreach ($source as $row) { + $queue->createItem([ + 'row' => $row, + 'attempts' => 0, + ]); } + return MigrationInterface::RESULT_COMPLETED; } diff --git a/src/StompQueue.php b/src/StompQueue.php index 2c929332..bd9ef9ee 100644 --- a/src/StompQueue.php +++ b/src/StompQueue.php @@ -127,11 +127,14 @@ public function createItem($data) { * Send a "terminal" message. * * Should be one for each worker we intend to start. + * + * @param array $extra_headers + * Extra headers to send on the message. */ - public function sendTerminal() { + public function sendTerminal(array $extra_headers = []) { $message = new Message( '', - [ + $extra_headers + [ 'type' => 'terminal', 'dgi_migrate_migration' => $this->name, 'dgi_migrate_run_id' => $this->group, From 8acbf667b8a68156bacf7724baf9004c634d58ed Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Thu, 8 Jun 2023 15:54:10 -0300 Subject: [PATCH 21/29] Pull sending the "terminal" messages out of the core. ... they're now enqueued separately, with a "priority" flag, if we want to tell things to exit earlier. ... also, misc skipping and pausing things. --- scripts/util.in | 30 +++++++++++++++++++----------- src/Commands/MigrateCommands.php | 25 ++++++++++++++++--------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/scripts/util.in b/scripts/util.in index 9ce77f16..dd6f9eb2 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -124,16 +124,21 @@ function do_migration_multi_process() { local NUM=${1} local PROCESS_LOG_DIR="$LOG_DIR/$NUM-multiprocess-logs" - local LOCK_FILE="$LOG_FILE/$NUM.lock" + local STOP_LOCK_FILE="$LOG_DIR/$NUM-stop.lock" + local PAUSE_LOCK_FILE="$LOG_DIR/$NUM-pause.lock" wwwdo mkdir -p $PROCESS_LOG_DIR - wwwdo touch $LOCK_FILE + wwwdo touch $STOP_LOCK_FILE + wwwdo touch $PAUSE_LOCK_FILE echo "Listing migrations..." - timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \ - | sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do - if [ ! -f $LOCK_FILE ] ; then - echo "Lock file removed; exiting before touching $MIGRATION_ID." + timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --field=id --sort | \ + while read MIGRATION_ID; do + if [ ! -f $PAUSE_LOCK_FILE ] ; then + echo "Pause lock file removed." + do_pause "pre-enqueue" "$MIGRATION_ID" + elif [ ! -f $STOP_LOCK_FILE ] ; then + echo "Stop lock file removed; exiting before touching $MIGRATION_ID." return elif [ " ${MULTIPROCESS_SKIP_MIGRATIONS[@]} " =~ " $MIGRATION_ID " ]; then echo "Skipping $MIGRATION_ID as requested." @@ -143,7 +148,7 @@ function do_migration_multi_process() { fi echo "Enqueuing items for $MIGRATION_ID" - timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "--run=$NUM" "--send_terminals=$PROCESSES" "$MIGRATION_ID" "${@:2}" & + timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" & local ENQUEUEING_JOB=$! echo "Starting $PROCESSES processes to process $MIGRATION_ID." @@ -163,7 +168,10 @@ function do_migration_multi_process() { echo "Terminal messages enqueued; waiting for workers to finish..." wait ${PROCESS_JOBS[@]} echo "Workers exited." - if [ ! -f $LOCK_FILE ] ; then + + if [ ! -f $PAUSE_LOCK_FILE ] ; then + do_pause "post-process, pre-finalize" "$MIGRATION_ID" + elif [ ! -f $STOP_LOCK_FILE ] ; then echo "Lock file removed; exiting without finalizing batch." return fi @@ -176,7 +184,7 @@ function do_migration_multi_process() { fi done - wwwdo rm $LOCK_FILE + wwwdo rm $STOP_LOCK_FILE $PAUSE_LOCK_FILE } # Dump status for the given migration group. @@ -184,7 +192,7 @@ function do_migration_multi_process() { # Can be skipped if SKIP_STATUS=true; in which case the call to this should be # no-op. function dump_status() { - if [ $SKIP_STATUS != 'true']; then + if [ $SKIP_STATUS != 'true' ]; then wwwdrush migrate:status --group=$MIGRATION_GROUP fi } @@ -223,7 +231,7 @@ function do_migration () { dump_status # Dump messages after run, so they're not lost with a subsequent run. wwwdo mkdir -p $MESSAGES_DIR - wwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string | \ + wwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --field=id | \ while read NAME ; do wwwdrush migrate:messages --format=json $NAME | wwwdo tee "$MESSAGES_DIR/$NAME.json" > /dev/null done diff --git a/src/Commands/MigrateCommands.php b/src/Commands/MigrateCommands.php index 96ca9f1e..565377b5 100644 --- a/src/Commands/MigrateCommands.php +++ b/src/Commands/MigrateCommands.php @@ -304,21 +304,23 @@ public function rollback($migration_names = '', array $options = [ * * @command dgi-migrate:list-migrations * - * @option all Process all migrations. - * @option group A comma-separated list of migration groups to import. - * @option tag Name of the migration tag to import. - * * @field-labels * id: Migration IDs * weight: Weight of the migration * @default-fields id,weight + * + * @option all Process all migrations. + * @option group A comma-separated list of migration groups to import. + * @option tag Name of the migration tag to import. + * @option sort Sort according to weight. */ public function listMigrations(array $options = [ 'all' => FALSE, 'group' => self::REQ, 'tag' => self::REQ, 'format' => 'csv', - ]) { + 'sort' => FALSE, + ]) : RowsOfFields { $generate_order = function () use ($options) { $migration_groups = $this->migrationsList('', $options); @@ -346,7 +348,15 @@ public function listMigrations(array $options = [ } }; - return new RowsOfFields(iterator_to_array($generate_order())); + $generated = iterator_to_array($generate_order()); + + if ($options['sort']) { + usort($generated, function ($a, $b) { + return $a['weight'] - $b['weight']; + }); + } + + return new RowsOfFields($generated); } /** @@ -378,8 +388,6 @@ protected function getExecutable(string $migration_id, array $options = []) : Mi * @option sync Sync source and destination. Delete destination records that * do not exist in the source. * @option run The ID of the run, if relevant. - * @option send_terminals The number of terminal messages to send after - * enqueueing all the messages. * * @islandora-drush-utils-user-wrap */ @@ -387,7 +395,6 @@ public function enqueueMigration(string $migration_id, array $options = [ 'update' => FALSE, 'sync' => FALSE, 'run' => NULL, - 'send_terminals' => 0, ]) : void { $executable = $this->getExecutable($migration_id, $options); // drush_op() provides --simulate support. From e65cf5226708e92739b48f5d41c3fcb790b97335 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Thu, 8 Jun 2023 17:11:15 -0300 Subject: [PATCH 22/29] Fix up pausing business. --- scripts/env.sample | 6 ++++++ scripts/util.in | 16 +++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/scripts/env.sample b/scripts/env.sample index a1e4f017..1292b265 100644 --- a/scripts/env.sample +++ b/scripts/env.sample @@ -80,6 +80,9 @@ # # Expected to be a Bash array; in this instance, just a set of strings # (representing migration IDs) between parentheses. +# +# NOTE: The prompt for this presently only shows in the *-import.log; _not_ in +# the main "run" process. # --- #MULTIPROCESS_PRE_ENQUEUE_PAUSE=() @@ -88,5 +91,8 @@ # # Expected to be a Bash array; in this instance, just a set of strings # (representing migration IDs) between parentheses. +# +# NOTE: The prompt for this presently only shows in the *-import.log; _not_ in +# the main "run" process. # --- #MULTIPROCESS_POST_PROCESS_PAUSE=() diff --git a/scripts/util.in b/scripts/util.in index dd6f9eb2..10d117ff 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -104,14 +104,16 @@ function do_migration_single_process() { # Helper; facilitate pausing for various reasons (likely snapshotting). # # Positional args: -# - 1: A descripitive string of _when_ we are pausing; e.g. "pre-enqueue", +# - 1: A descriptive string of _when_ we are pausing; e.g. "pre-enqueue", # "post-enqueue", etc. # - 2: The ID of the specific migration during which we are pausing. function do_pause() { local WHEN=$1 local MIGRATION_ID=$2 + local DISCARD - read -p "Pausing $WHEN of $MIGRATION_ID as requested. Hit enter to continue." + read -ep "Pausing $WHEN of $MIGRATION_ID as requested. Hit enter to continue." DISCARD + echo "DISCARD is $DISCARD" } # Kick off a migration in multiple processes. @@ -132,18 +134,18 @@ function do_migration_multi_process() { wwwdo touch $PAUSE_LOCK_FILE echo "Listing migrations..." - timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --field=id --sort | \ - while read MIGRATION_ID; do + local -a migrations=($(wwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --field=id --sort)) + for MIGRATION_ID in ${migrations[@]}; do if [ ! -f $PAUSE_LOCK_FILE ] ; then echo "Pause lock file removed." do_pause "pre-enqueue" "$MIGRATION_ID" elif [ ! -f $STOP_LOCK_FILE ] ; then echo "Stop lock file removed; exiting before touching $MIGRATION_ID." return - elif [ " ${MULTIPROCESS_SKIP_MIGRATIONS[@]} " =~ " $MIGRATION_ID " ]; then + elif [[ " ${MULTIPROCESS_SKIP_MIGRATIONS[@]} " =~ " $MIGRATION_ID " ]]; then echo "Skipping $MIGRATION_ID as requested." continue - elif [ " ${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]} " =~ " $MIGRATION_ID " ]; then + elif [[ " ${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]} " =~ " $MIGRATION_ID " ]]; then do_pause "pre-enqueue" "$MIGRATION_ID" fi @@ -179,7 +181,7 @@ function do_migration_multi_process() { echo "Finalizing $MIGRATION_ID." timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" echo "Finished $MIGRATION_ID." - if [ " ${MULTIPROCESS_POST_PROCESS_PAUSE[@]} " =~ " $MIGRATION_ID " ]; then + if [[ " ${MULTIPROCESS_POST_PROCESS_PAUSE[@]} " =~ " $MIGRATION_ID " ]] ; then do_pause "post-process" "$MIGRATION_ID" fi From 0a6ad812cb922ced8c0b267f493aec6d0501aa99 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Thu, 8 Jun 2023 17:19:04 -0300 Subject: [PATCH 23/29] Get rid of unused variable. --- scripts/util.in | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/util.in b/scripts/util.in index 10d117ff..ccac2ca6 100644 --- a/scripts/util.in +++ b/scripts/util.in @@ -68,7 +68,6 @@ function init_vars () { declare -g SKIP_STATUS=${SKIP_STATUS:-false} declare -g -a MULTIPROCESS_SKIP_MIGRATIONS=(${MULTIPROCESS_SKIP_MIGRATIONS[@]}) declare -g -a MULTIPROCESS_PRE_ENQUEUE_PAUSE=(${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]}) - declare -g -a MULTIPROCESS_POST_ENQUEUE_PAUSE=(${MULTIPROCESS_POST_ENQUEUE_PAUSE[@]}) declare -g -a MULTIPROCESS_POST_PROCESS_PAUSE=(${MULTIPROCESS_POST_PROCESS_PAUSE[@]}) # Initialize the log directory. From 44d2026b071dfd33ce78cf98683d838b7a77c121 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Fri, 9 Jun 2023 14:27:55 -0300 Subject: [PATCH 24/29] `flock()`-based locking, instead of Drupal's. --- .../process/LockingMigrationLookup.php | 108 ++++++++++++------ 1 file changed, 76 insertions(+), 32 deletions(-) diff --git a/src/Plugin/migrate/process/LockingMigrationLookup.php b/src/Plugin/migrate/process/LockingMigrationLookup.php index 9b1f9f14..0bc1ce31 100644 --- a/src/Plugin/migrate/process/LockingMigrationLookup.php +++ b/src/Plugin/migrate/process/LockingMigrationLookup.php @@ -3,7 +3,6 @@ namespace Drupal\dgi_migrate\Plugin\migrate\process; use Drupal\Core\Database\Connection; -use Drupal\Core\Lock\LockBackendInterface; use Drupal\Core\Plugin\ContainerFactoryPluginInterface; use Drupal\migrate\MigrateException; use Drupal\migrate\MigrateExecutableInterface; @@ -28,13 +27,6 @@ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcess */ protected MigrateProcessInterface $parent; - /** - * Lock service. - * - * @var \Drupal\Core\Lock\LockBackendInterface - */ - protected LockBackendInterface $lock; - /** * Memoized array of migrations referenced. * @@ -84,13 +76,24 @@ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcess */ protected MigrationInterface $migration; + /** + * An array of SplFileObjects, to facilitate locking. + * + * @var SplFileObject[] + */ + protected array $lockFiles = []; + /** * Constructor. */ public function __construct(array $configuration, $plugin_id, $plugin_definition) { parent::__construct($configuration, $plugin_id, $plugin_definition); - $this->doLocking = getenv('DGI_MIGRATE__DO_MIGRATION_LOOKUP_LOCKING') === 'TRUE'; + // We do not need to do the locking with `no_stub`, as we would not be + // creating any entities, so there would be no potential for creating + // duplicates. + $this->doLocking = empty($this->configuration['no_stub']) && + (getenv('DGI_MIGRATE__DO_MIGRATION_LOOKUP_LOCKING') === 'TRUE'); } /** @@ -144,9 +147,6 @@ protected function getLockMap() : array { * The lock name to use for the given migration. */ protected function getLockName(string $migration_name) : string { - // XXX: May have to get creative with hashing... thinking max lock name is - // something like 255 chars? - // XXX: 255 character limit may be a non-issue?: https://api.drupal.org/api/drupal/core%21lib%21Drupal%21Core%21Lock%21DatabaseLockBackend.php/function/DatabaseLockBackend%3A%3AnormalizeName/10 return "dgi_migrate_locking_migration_lookup__migration__$migration_name"; } @@ -160,16 +160,18 @@ protected function getLockName(string $migration_name) : string { */ protected function acquireMigrationLocks(float $timeout = 30.0) : void { try { - if (!$this->getControlLock()) { - throw new MigrateException('Failed to acquire control lock.'); + if (count($this->getLockMap()) > 1) { + // More than one, we need to acquire the "control" lock before + // proceeding, to avoid potential deadlocks. + if (!$this->getControlLock()) { + throw new MigrateException('Failed to acquire control lock.'); + } } + if (!$this->hasMigrationLocks) { $this->hasMigrationLocks = TRUE; foreach ($this->getLockMap() as $migration => $lock_name) { - while (!$this->lock->acquire($lock_name, $timeout)) { - while ($this->lock->wait($lock_name)); - } - if (!$this->lock->acquire($lock_name, $timeout)) { + if (!$this->acquireLock($lock_name)) { throw new MigrateException("Failed to acquire lock for '$migration'."); } } @@ -188,7 +190,7 @@ protected function acquireMigrationLocks(float $timeout = 30.0) : void { protected function releaseMigrationLocks() : void { if ($this->hasMigrationLocks) { foreach ($this->getLockMap() as $lock_name) { - $this->lock->release($lock_name); + $this->releaseLock($lock_name); } $this->hasMigrationLocks = FALSE; } @@ -201,18 +203,68 @@ protected function releaseMigrationLocks() : void { * TRUE if we acquired it; otherwise, FALSE. */ protected function getControlLock() : bool { - while (!($this->hasControl = $this->lock->acquire(static::CONTROL_LOCK, 600))) { - while ($this->lock->wait(static::CONTROL_LOCK)); + if (!$this->hasControl) { + $this->hasControl = $this->acquireLock(static::CONTROL_LOCK); } return $this->hasControl; } + /** + * Get an \SplFileObject instance to act as the lock. + * + * @param string $name + * The name of the lock to acquire. Should result in a file being created + * under the temporary:// scheme of the same name, against which `flock` + * commands will be issued. + * + * @return \SplFileObject + * The \SplFileObject instance against which to lock. + */ + protected function getLockFile(string $name) : \SplFileObject { + if (!isset($this->lockFiles[$name])) { + $file_name = "temporary://{$name}"; + touch($file_name); + $this->lockFiles[$name] = $file = new \SplFileObject($file_name, 'w'); + $file->fwrite("This is a temporary lock file. If there are no migrations running, it should be safe to delete."); + } + + return $this->lockFiles[$name]; + } + + /** + * Helper; acquire the lock. + * + * @param string $name + * The name of the lock to acquire. + * + * @return bool + * TRUE on success. Should not be able to return FALSE, as we perform this + * in a blocking manner. + */ + protected function acquireLock(string $name) : bool { + return $this->getLockFile($name)->flock(LOCK_EX); + } + + /** + * Helper; Release the given lock. + * + * @param string $name + * The name of the lock to release. + * + * @return bool + * TRUE on success. Should not be able to return FALSE, unless we maybe did + * not hold the lock? + */ + protected function releaseLock(string $name) : bool { + return $this->getLockFile($name)->flock(LOCK_UN); + } + /** * Helper; release control lock. */ protected function releaseControlLock() { if ($this->hasControl) { - $this->lock->release(static::CONTROL_LOCK); + $this->releaseLock(static::CONTROL_LOCK); $this->hasControl = FALSE; } } @@ -232,7 +284,6 @@ public function transform($value, MigrateExecutableInterface $migrate_executable return $this->parent->transform($value, $migrate_executable, $row, $destination_property); } - $transaction = $this->database->startTransaction(); try { // Acquire locks for all referenced migrations. $log('Locking migrations.'); @@ -241,19 +292,13 @@ public function transform($value, MigrateExecutableInterface $migrate_executable // Perform the lookup as per the wrapped transform. $result = $this->parent->transform($value, $migrate_executable, $row, $destination_property); - $log("Parent run, commit transaction and releasing migration locks."); - unset($transaction); + $log("Parent run, releasing migration locks."); + // Optimistically drop migration locks. $this->releaseMigrationLocks(); $log("Releasing migration locks, returning."); return $result; } - catch (\Exception $e) { - if (isset($transaction)) { - $transaction->rollBack(); - } - throw $e; - } finally { // Drop migration locks, if we still have them. $this->releaseMigrationLocks(); @@ -270,7 +315,6 @@ public static function create(ContainerInterface $container, array $configuratio /** @var \Drupal\Component\Plugin\PluginManagerInterface $process_plugin_manager */ $process_plugin_manager = $container->get('plugin.manager.migrate.process'); $instance->parent = $process_plugin_manager->createInstance('dgi_migrate_original_migration_lookup', $configuration, $migration); - $instance->lock = $container->get('lock'); $instance->database = $container->get('database'); $instance->migration = $migration; From 4cda5d09de47f6a3f1e637b6a2c78af7bfddb26c Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Fri, 9 Jun 2023 15:11:01 -0300 Subject: [PATCH 25/29] Fix little coding standards thing. --- src/Plugin/migrate/process/LockingMigrationLookup.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Plugin/migrate/process/LockingMigrationLookup.php b/src/Plugin/migrate/process/LockingMigrationLookup.php index 0bc1ce31..754e5564 100644 --- a/src/Plugin/migrate/process/LockingMigrationLookup.php +++ b/src/Plugin/migrate/process/LockingMigrationLookup.php @@ -79,7 +79,7 @@ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcess /** * An array of SplFileObjects, to facilitate locking. * - * @var SplFileObject[] + * @var \SplFileObject[] */ protected array $lockFiles = []; From da7d079a6903d93997a493e9fba3c841381c6ce1 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Mon, 12 Jun 2023 17:40:46 -0300 Subject: [PATCH 26/29] Things together. --- .../migrations/dgis_nodes.yml | 15 +- .../process/LockingMigrationLookup.php | 343 ++++++++++++++++-- 2 files changed, 324 insertions(+), 34 deletions(-) diff --git a/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml b/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml index 148b9115..450334d8 100644 --- a/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml +++ b/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml @@ -41,9 +41,13 @@ source: # to the vocab in which to do the things. - '@_vid' - plugin: flatten - - plugin: migration_lookup + - plugin: dgi_migrate.process.locking_migration_lookup migration: dgis_stub_terms_generic stub_id: dgis_stub_terms_generic + lock_context_keys: + dgis_stub_terms_generic: + - { offset: [3] } + - { offset: [1], hash: '##'} extract: &generic_term_extract plugin: dgi_migrate.process.single_extract index: [actual] @@ -1761,18 +1765,23 @@ process: <<: *generic_term_after - <<: *generic_term_extract nid: - - plugin: migration_lookup + - plugin: dgi_migrate.process.locking_migration_lookup source: '@field_pid' migration: dgis_stub_nodes + lock_context_keys: &dgis_stub_nodes_lock_context_keys + dgis_stub_nodes: + - { offset: [ 0 ], hash: '#/##' } field_member_of: - plugin: flatten source: - '@_members' - '@_constituents' - plugin: multiple_values - - plugin: migration_lookup + - plugin: dgi_migrate.process.locking_migration_lookup migration: dgis_stub_nodes stub_id: dgis_stub_nodes + lock_context_keys: + << : *dgis_stub_nodes_lock_context_keys migration_dependencies: required: - dgis_foxml_files diff --git a/src/Plugin/migrate/process/LockingMigrationLookup.php b/src/Plugin/migrate/process/LockingMigrationLookup.php index 754e5564..b7eb2025 100644 --- a/src/Plugin/migrate/process/LockingMigrationLookup.php +++ b/src/Plugin/migrate/process/LockingMigrationLookup.php @@ -2,10 +2,16 @@ namespace Drupal\dgi_migrate\Plugin\migrate\process; +use Drupal\Component\Plugin\Exception\PluginNotFoundException; +use Drupal\Component\Utility\NestedArray; use Drupal\Core\Database\Connection; +use Drupal\Core\File\FileSystemInterface; use Drupal\Core\Plugin\ContainerFactoryPluginInterface; use Drupal\migrate\MigrateException; use Drupal\migrate\MigrateExecutableInterface; +use Drupal\migrate\MigrateLookupInterface; +use Drupal\migrate\MigrateSkipProcessException; +use Drupal\migrate\MigrateStubInterface; use Drupal\migrate\Plugin\MigrateProcessInterface; use Drupal\migrate\Plugin\MigrationInterface; use Drupal\migrate\ProcessPluginBase; @@ -14,6 +20,23 @@ /** * Override upstream `migration_lookup` plugin, with some additional locking. + * + * @MigrateProcessPlugin( + * id = "dgi_migrate.process.locking_migration_lookup" + * ) + * + * Accepts all the same as the core "migration_lookup" plugin, in addition to: + * - "no_lock": Flag to explicitly skip locking, which should only be used when + * it is known that there's a one-to-one mapping between each set of paramters + * and each resultant value. + * - "lock_context_keys": A mapping of migrations IDs to arrays of maps, + * mapping: + * - "offset": An array of offsets indexing into the `$value` passed to the + * `::transform()` call, to allow the lock(s) acquired to be more + * specific. + * - "hash": An optional string representing a pattern. If provided every + * '#' found will be replaced with hexit resulting from hashing the value + * "offset". */ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcessInterface, ContainerFactoryPluginInterface { @@ -83,6 +106,19 @@ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcess */ protected array $lockFiles = []; + protected MigrateStubInterface $migrateStub; + + protected MigrateLookupInterface $migrateLookup; + + protected array $lockContext; + + /** + * @var array|mixed + */ + protected $lockContextKeys; + + protected FileSystemInterface $fileSystem; + /** * Constructor. */ @@ -93,7 +129,10 @@ public function __construct(array $configuration, $plugin_id, $plugin_definition // creating any entities, so there would be no potential for creating // duplicates. $this->doLocking = empty($this->configuration['no_stub']) && - (getenv('DGI_MIGRATE__DO_MIGRATION_LOOKUP_LOCKING') === 'TRUE'); + (getenv('DGI_MIGRATE__DO_MIGRATION_LOOKUP_LOCKING') === 'TRUE') && + !($this->configuration['no_lock'] ?? FALSE); + + $this->lockContextKeys = $this->configuration['lock_context_keys'] ?? []; } /** @@ -120,21 +159,55 @@ protected function getMigrations() : array { /** * List migrations mapped to lock names. * - * @return array - * An associative array mapping migration names to lock names. + * @return \Traversable + * Generated mapping of migration names to lock names. */ - protected function getLockMap() : array { + protected function getLockMap() : \Traversable { if (!isset($this->lockMap)) { $this->lockMap = array_combine( $this->getMigrations(), array_map([$this, 'getLockName'], $this->getMigrations()) ); } + if (!$this->lockMap) { throw new MigrateException('Failed to map migration IDs to lock names.'); } - return $this->lockMap; + if ($this->lockContextKeys) { + $apply_context_keys = function ($migration, $name) { + $parts = ["{$name}-extra_context"]; + + if (isset($this->lockContextKeys[$migration])) { + foreach ($this->lockContextKeys[$migration] as $info) { + $value = NestedArray::getValue($this->lockContext, $info['offset']); + + if (($prefix = ($info['hash'] ?? FALSE))) { + $hash = md5($value, FALSE); + $prefix_offset = 0; + $hash_offset = 0; + while (($prefix_offset = strpos($prefix, '#', $prefix_offset)) !== FALSE) { + $prefix[$prefix_offset++] = $hash[$hash_offset++]; + } + $parts[] = $prefix; + } + else { + $parts[] = $value; + } + } + } + + return implode('/', $parts); + }; + foreach ($this->lockMap as $migration => $original_name) { + yield $migration => $apply_context_keys($migration, $original_name); + } + } + else { + foreach ($this->lockMap as $migration => $name) { + yield $migration => $name; + } + } } /** @@ -147,20 +220,18 @@ protected function getLockMap() : array { * The lock name to use for the given migration. */ protected function getLockName(string $migration_name) : string { - return "dgi_migrate_locking_migration_lookup__migration__$migration_name"; + return "dgi_migrate/locking_migration_lookup/migration/$migration_name"; } /** * Acquire all migration locks. * - * @param float $timeout - * The time for which to acquire the locks. - * * @throws \Drupal\migrate\MigrateException */ - protected function acquireMigrationLocks(float $timeout = 30.0) : void { + protected function acquireMigrationLocks(int $mode = LOCK_EX, bool &$would_block = FALSE) : bool { try { - if (count($this->getLockMap()) > 1) { + $lock_map = iterator_to_array($this->getLockMap()); + if (count($lock_map) > 1) { // More than one, we need to acquire the "control" lock before // proceeding, to avoid potential deadlocks. if (!$this->getControlLock()) { @@ -169,13 +240,20 @@ protected function acquireMigrationLocks(float $timeout = 30.0) : void { } if (!$this->hasMigrationLocks) { + // Don't have 'em yet; initial acquisition. $this->hasMigrationLocks = TRUE; - foreach ($this->getLockMap() as $migration => $lock_name) { - if (!$this->acquireLock($lock_name)) { - throw new MigrateException("Failed to acquire lock for '$migration'."); - } + } + else { + // Attempting to "promote" the locks, no need to set that we have 'em. + } + + foreach ($lock_map as $lock_name) { + if (!$this->acquireLock($lock_name, $mode, $would_block)) { + return FALSE; } } + + return TRUE; } finally { $this->releaseControlLock(); @@ -223,9 +301,13 @@ protected function getControlLock() : bool { protected function getLockFile(string $name) : \SplFileObject { if (!isset($this->lockFiles[$name])) { $file_name = "temporary://{$name}"; + $directory = $this->fileSystem->dirname($file_name); + $basename = $this->fileSystem->basename($file_name); + $this->fileSystem->prepareDirectory($directory, FileSystemInterface::CREATE_DIRECTORY); + $file_name = "{$directory}/{$basename}"; + touch($file_name); - $this->lockFiles[$name] = $file = new \SplFileObject($file_name, 'w'); - $file->fwrite("This is a temporary lock file. If there are no migrations running, it should be safe to delete."); + $this->lockFiles[$name] = $file = new \SplFileObject($file_name, 'a+'); } return $this->lockFiles[$name]; @@ -241,8 +323,8 @@ protected function getLockFile(string $name) : \SplFileObject { * TRUE on success. Should not be able to return FALSE, as we perform this * in a blocking manner. */ - protected function acquireLock(string $name) : bool { - return $this->getLockFile($name)->flock(LOCK_EX); + protected function acquireLock(string $name, int $mode = LOCK_EX, bool &$would_block = FALSE) : bool { + return $this->getLockFile($name)->flock($mode, $would_block); } /** @@ -285,27 +367,187 @@ public function transform($value, MigrateExecutableInterface $migrate_executable } try { - // Acquire locks for all referenced migrations. - $log('Locking migrations.'); - $this->acquireMigrationLocks(); - $log('Locked migrations, running parent.'); + $this->setLockContext((array) $value); + return $this->doTransform($value, $migrate_executable, $row, $destination_property); + } + finally { + // Drop migration locks, if we still have them. + $this->releaseMigrationLocks(); + $this->setLockContext(NULL); + } - // Perform the lookup as per the wrapped transform. - $result = $this->parent->transform($value, $migrate_executable, $row, $destination_property); - $log("Parent run, releasing migration locks."); + } - // Optimistically drop migration locks. - $this->releaseMigrationLocks(); - $log("Releasing migration locks, returning."); - return $result; + /** + * Set values for more-specific locking. + * + * @param mixed $values + * The values to set; or: NULL to reset. + */ + protected function setLockContext($values = []) : void { + if ($values === NULL) { + // Clear the context. + unset($this->lockContext); + } + $values = (array) $values; + $this->lockContext = $values; + } + + /** + * Locking transformation. + * + * Adapted from the core `migration_lookup`, with locks sprinkled in. + * + * @throws \Drupal\migrate\MigrateSkipRowException + * @throws \Drupal\migrate\MigrateException + * + * @see https://git.drupalcode.org/project/drupal/-/blob/9.5.x/core/modules/migrate/src/Plugin/migrate/process/MigrationLookup.php#L194-276 + */ + protected function doTransform($value, MigrateExecutableInterface $migrate_executable, Row $row, $destination_property) { + $context = [ + 'self' => FALSE, + 'source_id_values' => [], + 'lookup_migration_ids' => (array) $this->configuration['migration'], + ]; + $lookup_migration_ids =& $context['lookup_migration_ids']; + + try { + // Acquire shared lock to do the lookup. + $this->acquireMigrationLocks(LOCK_SH); + $destination_ids = $this->doLookup($value, $migrate_executable, $row, $destination_property, $context); + + if (!$destination_ids && !empty($this->configuration['no_stub'])) { + return NULL; + } + + if (!$destination_ids && ($context['self'] || isset($this->configuration['stub_id']) || count($lookup_migration_ids) == 1)) { + // Non-blockingly attempt to promote lock from shared to exclusive. Drop + // shared lock and reacquire as exclusive if we would block, to avoid + // potential deadlock. + $would_block = FALSE; + if (!$this->acquireMigrationLocks(LOCK_EX | LOCK_NB, $would_block) && $would_block) { + $this->releaseMigrationLocks(); + $this->acquireMigrationLocks(LOCK_EX); + + // Attempt lookup again, as something might have populated it while we + // were blocked attempting to acquire the exclusive lock. + $destination_ids = $this->doLookup($value, $migrate_executable, $row, $destination_property, $context); + } + if (!$destination_ids) { + $destination_ids = $this->doStub($context); + } + } + + if ($destination_ids) { + if (count($destination_ids) == 1) { + return reset($destination_ids); + } + else { + return $destination_ids; + } + } } finally { - // Drop migration locks, if we still have them. $this->releaseMigrationLocks(); } } + /** + * Perform the lookup proper. + * + * @return array|null + * The array of destination ID info. + * + * @throws \Drupal\migrate\MigrateException + * @throws \Drupal\migrate\MigrateSkipProcessException + * + * @see https://git.drupalcode.org/project/drupal/-/blob/9.5.x/core/modules/migrate/src/Plugin/migrate/process/MigrationLookup.php#L194-229 + */ + protected function doLookup($value, MigrateExecutableInterface $migrate_executable, Row $row, $destination_property, array &$context) : ?array { + $source_id_values =& $context['source_id_values']; + $lookup_migration_ids =& $context['lookup_migration_ids']; + + $destination_ids = NULL; + foreach ($lookup_migration_ids as $lookup_migration_id) { + $lookup_value = $value; + if ($lookup_migration_id == $this->migration->id()) { + $context['self'] = TRUE; + } + if (isset($this->configuration['source_ids'][$lookup_migration_id])) { + $lookup_value = array_values($row->getMultiple($this->configuration['source_ids'][$lookup_migration_id])); + } + $lookup_value = (array) $lookup_value; + $this->skipInvalid($lookup_value); + $source_id_values[$lookup_migration_id] = $lookup_value; + + // Re-throw any PluginException as a MigrateException so the executable + // can shut down the migration. + try { + $destination_id_array = $this->migrateLookup->lookup($lookup_migration_id, $lookup_value); + } + catch (PluginNotFoundException $e) { + $destination_id_array = []; + } + catch (MigrateException $e) { + throw $e; + } + catch (\Exception $e) { + throw new MigrateException(sprintf('A %s was thrown while processing this migration lookup', gettype($e)), $e->getCode(), $e); + } + + if ($destination_id_array) { + $destination_ids = array_values(reset($destination_id_array)); + break; + } + } + + return $destination_ids; + } + + /** + * Perform stub creation. + * + * @throws \Drupal\migrate\MigrateSkipProcessException + * @throws \Drupal\migrate\MigrateException + * + * @see https://git.drupalcode.org/project/drupal/-/blob/9.5.x/core/modules/migrate/src/Plugin/migrate/process/MigrationLookup.php#L236-267 + */ + protected function doStub(&$context) { + $self =& $context['self']; + $source_id_values =& $context['source_id_values']; + $lookup_migration_ids =& $context['lookup_migration_ids']; + + // If the lookup didn't succeed, figure out which migration will do the + // stubbing. + if ($self) { + $stub_migration = $this->migration->id(); + } + elseif (isset($this->configuration['stub_id'])) { + $stub_migration = $this->configuration['stub_id']; + } + else { + $stub_migration = reset($lookup_migration_ids); + } + // Rethrow any exception as a MigrateException so the executable can shut + // down the migration. + try { + return $this->migrateStub->createStub($stub_migration, $source_id_values[$stub_migration], [], FALSE); + } + catch (\LogicException | PluginNotFoundException $e) { + // For BC reasons, we must allow attempting to stub: + // - a derived migration; and, + // - a non-existent migration. + } + catch (MigrateException | MigrateSkipProcessException $e) { + throw $e; + } + catch (\Exception $e) { + throw new MigrateException(sprintf('%s was thrown while attempting to stub: %s', get_class($e), $e->getMessage()), $e->getCode(), $e); + } + + } + /** * {@inheritDoc} */ @@ -317,8 +559,47 @@ public static function create(ContainerInterface $container, array $configuratio $instance->parent = $process_plugin_manager->createInstance('dgi_migrate_original_migration_lookup', $configuration, $migration); $instance->database = $container->get('database'); $instance->migration = $migration; + $instance->migrateStub = $container->get('migrate.stub'); + $instance->migrateLookup = $container->get('migrate.lookup'); + $instance->fileSystem = $container->get('file_system'); return $instance; } + /** + * Skips the migration process entirely if the value is invalid. + * + * Copypasta from upstream. + * + * @param array $value + * The incoming value to check. + * + * @throws \Drupal\migrate\MigrateSkipProcessException + * + * @see https://git.drupalcode.org/project/drupal/-/blob/9.5.x/core/modules/migrate/src/Plugin/migrate/process/MigrationLookup.php#L279-291 + */ + protected function skipInvalid(array $value) { + if (!array_filter($value, [$this, 'isValid'])) { + throw new MigrateSkipProcessException(); + } + } + + /** + * Determines if the value is valid for lookup. + * + * The only values considered invalid are: NULL, FALSE, [] and "". + * + * @param string $value + * The value to test. + * + * @return bool + * Return true if the value is valid. + * + * @see https://git.drupalcode.org/project/drupal/-/blob/9.5.x/core/modules/migrate/src/Plugin/migrate/process/MigrationLookup.php#L293-306 + */ + protected function isValid($value) { + return !in_array($value, [NULL, FALSE, [], ""], TRUE); + } + + } From dd76f48869532a439a3bc918b25b5df8ebb33a37 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Mon, 12 Jun 2023 17:46:16 -0300 Subject: [PATCH 27/29] Coding standards. --- .../process/LockingMigrationLookup.php | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/src/Plugin/migrate/process/LockingMigrationLookup.php b/src/Plugin/migrate/process/LockingMigrationLookup.php index b7eb2025..0a4c6e09 100644 --- a/src/Plugin/migrate/process/LockingMigrationLookup.php +++ b/src/Plugin/migrate/process/LockingMigrationLookup.php @@ -106,17 +106,39 @@ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcess */ protected array $lockFiles = []; + /** + * The migration stub service. + * + * @var \Drupal\migrate\MigrateStubInterface + */ protected MigrateStubInterface $migrateStub; + /** + * The migration lookup service. + * + * @var \Drupal\migrate\MigrateLookupInterface + */ protected MigrateLookupInterface $migrateLookup; + /** + * The value from which to build the lock context. + * + * @var array + */ protected array $lockContext; /** + * Array of lock context. + * * @var array|mixed */ protected $lockContextKeys; + /** + * The file system service. + * + * @var \Drupal\Core\File\FileSystemInterface + */ protected FileSystemInterface $fileSystem; /** @@ -318,6 +340,11 @@ protected function getLockFile(string $name) : \SplFileObject { * * @param string $name * The name of the lock to acquire. + * @param int $mode + * The mode with which to acquire the lock. + * @param bool $would_block + * A reference to a boolean, to be updated if called with LOCK_NB and the + * call _would_ have blocked. * * @return bool * TRUE on success. Should not be able to return FALSE, as we perform this @@ -601,5 +628,4 @@ protected function isValid($value) { return !in_array($value, [NULL, FALSE, [], ""], TRUE); } - } From a565f72dd3c571fe4a7bdaba02038a50712d7858 Mon Sep 17 00:00:00 2001 From: Adam Vessey Date: Thu, 15 Jun 2023 11:57:36 -0300 Subject: [PATCH 28/29] Log the skip, if there's no model defined. --- .../dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml b/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml index 450334d8..22bc7a3c 100644 --- a/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml +++ b/modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml @@ -72,6 +72,7 @@ process: method: models - plugin: skip_on_empty method: row + message: 'Skipping; no model defined.' title: - plugin: dgi_migrate.subproperty source: '@_node_foxml_parsed' From 62ca0090a4fdcecff97f355f5663bdc2e8a4ca5f Mon Sep 17 00:00:00 2001 From: Noel Chiasson Date: Tue, 11 Jun 2024 11:24:42 -0300 Subject: [PATCH 29/29] Fixing coder complaint about use ordering --- src/Drush/Commands/MigrateCommands.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Drush/Commands/MigrateCommands.php b/src/Drush/Commands/MigrateCommands.php index 3a2b4aee..64cf9c7b 100644 --- a/src/Drush/Commands/MigrateCommands.php +++ b/src/Drush/Commands/MigrateCommands.php @@ -5,8 +5,8 @@ use Consolidation\OutputFormatters\StructuredData\RowsOfFields; use Drupal\Component\Graph\Graph; use Drupal\Core\StringTranslation\StringTranslationTrait; -use Drupal\dgi_migrate\StompQueue; use Drupal\dgi_migrate\MigrateBatchExecutable; +use Drupal\dgi_migrate\StompQueue; use Drupal\migrate\Plugin\MigrationInterface; use Drupal\migrate_tools\Drush\Commands\MigrateToolsCommands; use Drupal\migrate_tools\Drush9LogMigrateMessage;