CTDA9-390 / DDST-551: Feature/stomp queue #105

Merged on Sep 23, 2024 (45 commits)
Commits
d2e0e01 Status skipping and pausability. (adam-vessey, Jun 2, 2023)
4884cdb Allow migrations to be skipped. (adam-vessey, Jun 2, 2023)
4a38db0 Fix defaulting for SKIP_STATUS. (adam-vessey, Jun 2, 2023)
f0e8705 Theoretical STOMP queueing. (adam-vessey, Jun 4, 2023)
5a815bd Right, needs the run number as well. (adam-vessey, Jun 4, 2023)
7e03160 And yeah, expects the enqueueing to be async, also. (adam-vessey, Jun 4, 2023)
fe5dd4b Quote the various variables. (adam-vessey, Jun 5, 2023)
9c882b5 Some fixes. (adam-vessey, Jun 5, 2023)
e9595ca Misc fixes. (adam-vessey, Jun 5, 2023)
3ddc6da Remove the patch. (adam-vessey, Jun 5, 2023)
059635f Skip the count on the foxml files business. (adam-vessey, Jun 5, 2023)
395db2e Explicitly segment queue instead of basing on selectors. (adam-vessey, Jun 5, 2023)
69f938f Fix casing. (adam-vessey, Jun 5, 2023)
fe18945 Rework messaging. (adam-vessey, Jun 6, 2023)
4c6deb5 Fix up some spacing. (adam-vessey, Jun 6, 2023)
5ab56ee Ensure the signal to shut down the queues gets sent. (adam-vessey, Jun 6, 2023)
26c1633 Use the appropriate method. (adam-vessey, Jun 6, 2023)
0fc8237 Update comment, as we're no longer dealing with signals. (adam-vessey, Jun 6, 2023)
e50e1b3 Get rid of a stray semi-colon. (adam-vessey, Jun 6, 2023)
0fb046c Merge branch 'feature/stomp-queue' into feature/pausability (adam-vessey, Jun 7, 2023)
ed004af Enhance pausability. (adam-vessey, Jun 8, 2023)
8acbf66 Pull sending the "terminal" messages out of the core. (adam-vessey, Jun 8, 2023)
e65cf52 Fix up pausing business. (adam-vessey, Jun 8, 2023)
0a6ad81 Get rid of unused variable. (adam-vessey, Jun 8, 2023)
44d2026 `flock()`-based locking, instead of Drupal's. (adam-vessey, Jun 9, 2023)
4cda5d0 Fix little coding standards thing. (adam-vessey, Jun 9, 2023)
a98697e Merge pull request #104 from discoverygarden/feature/pausability (jordandukart, Jun 9, 2023)
9b6367b Merge remote-tracking branch 'origin/feature/stomp-queue' into featur… (adam-vessey, Jun 9, 2023)
29ef39e Merge pull request #106 from discoverygarden/feature/flock-locking (Alexander-Cairns, Jun 9, 2023)
da7d079 Things together. (adam-vessey, Jun 12, 2023)
7287d96 Merge remote-tracking branch 'origin/feature/stomp-queue' into featur… (adam-vessey, Jun 12, 2023)
dd76f48 Coding standards. (adam-vessey, Jun 12, 2023)
26ca2c4 Merge pull request #107 from discoverygarden/feature/more-specific-locks (nchiasson-dgi, Jun 14, 2023)
a565f72 Log the skip, if there's no model defined. (adam-vessey, Jun 15, 2023)
d524bd9 Merge pull request #108 from discoverygarden/fix/log-skipping-due-to-… (nchiasson-dgi, Jun 15, 2023)
2d747e6 Merge remote-tracking branch 'origin/main' into feature/stomp-queue (adam-vessey, Nov 29, 2023)
4f296d8 Merge branch 'main' of github.com:discoverygarden/dgi_migrate into fe… (nchiasson-dgi, Feb 9, 2024)
3e7c555 Merge pull request #125 from nchiasson-dgi/update-stomp-queue (nchiasson-dgi, Feb 9, 2024)
84d1fec Merge branch 'main' of github.com:discoverygarden/dgi_migrate into fe… (nchiasson-dgi, Jun 11, 2024)
62ca009 Fixing coder complaint about use ordering (nchiasson-dgi, Jun 11, 2024)
3ba3b05 Merge pull request #131 from discoverygarden/feature/stomp-queue-update (nchiasson-dgi, Jun 11, 2024)
a5cc4e7 Merge branch 'main' of github.com:discoverygarden/dgi_migrate into fe… (nchiasson-dgi, Jul 10, 2024)
47f0c51 Merge pull request #133 from nchiasson-dgi/feature/stomp-queue (nchiasson-dgi, Jul 10, 2024)
4e55297 Merge branch 'main' of github.com:discoverygarden/dgi_migrate into fe… (nchiasson-dgi, Sep 23, 2024)
eac67a1 Merge pull request #144 from nchiasson-dgi/feature/stomp-queue (nchiasson-dgi, Sep 23, 2024)
@@ -11,7 +11,9 @@ source:
# validation of the URI in the destination may fail (presumably, naive URI
# validation inside of Drupal, expecting HTTP-like URLs)... or may just
# require a third slash to imply an empty "authority" component?
cache_counts: true
# XXX: For big sets of things, counting could take a substantial amount of
# time, so let's skip it.
skip_count: true
destination:
plugin: entity:file
validate: true
16 changes: 13 additions & 3 deletions modules/dgi_migrate_foxml_standard_mods/migrations/dgis_nodes.yml
@@ -41,9 +41,13 @@ source:
# to the vocab in which to do the things.
- '@_vid'
- plugin: flatten
- plugin: migration_lookup
- plugin: dgi_migrate.process.locking_migration_lookup
migration: dgis_stub_terms_generic
stub_id: dgis_stub_terms_generic
lock_context_keys:
dgis_stub_terms_generic:
- { offset: [3] }
- { offset: [1], hash: '##'}
extract: &generic_term_extract
plugin: dgi_migrate.process.single_extract
index: [actual]
@@ -71,6 +75,7 @@ process:
method: models
- plugin: skip_on_empty
method: row
message: 'Skipping; no model defined.'
title:
- plugin: dgi_migrate.subproperty
source: '@_node_foxml_parsed'
@@ -2703,18 +2708,23 @@ process:
- '@_unspecified_rights_statement'
- plugin: null_coalesce
nid:
- plugin: migration_lookup
- plugin: dgi_migrate.process.locking_migration_lookup
source: '@field_pid'
migration: dgis_stub_nodes
lock_context_keys: &dgis_stub_nodes_lock_context_keys
dgis_stub_nodes:
- { offset: [ 0 ], hash: '#/##' }
field_member_of:
- plugin: flatten
source:
- '@_members'
- '@_constituents'
- plugin: multiple_values
- plugin: migration_lookup
- plugin: dgi_migrate.process.locking_migration_lookup
migration: dgis_stub_nodes
stub_id: dgis_stub_nodes
lock_context_keys:
<< : *dgis_stub_nodes_lock_context_keys
migration_dependencies:
required:
- dgis_foxml_files
38 changes: 38 additions & 0 deletions scripts/env.sample
@@ -58,3 +58,41 @@
# PROCESSES: The number of processes to use to run the migration import.
# ---
#PROCESSES=1

# ===
# SKIP_STATUS: Suppress dumping of migration status before/after operations.
# ---
# Default:
#SKIP_STATUS=false
# To skip, uncomment (or equivalently set):
#SKIP_STATUS=true

# ===
# MULTIPROCESS_SKIP_MIGRATIONS: Skip processing the specified migrations.
#
# May be of some use in resuming larger migrations, when we do not wish to
# undertake no-op cycling through other migrations.
# ---
#MULTIPROCESS_SKIP_MIGRATIONS=()

# ===
# MULTIPROCESS_PRE_ENQUEUE_PAUSE: Pause execution before enqueuing these.
#
# Expected to be a Bash array; in this instance, just a set of strings
# (representing migration IDs) between parentheses.
#
# NOTE: The prompt for this presently only shows in the *-import.log; _not_ in
# the main "run" process.
# ---
#MULTIPROCESS_PRE_ENQUEUE_PAUSE=()

# ===
# MULTIPROCESS_POST_PROCESS_PAUSE: Pause execution after finishing these.
#
# Expected to be a Bash array; in this instance, just a set of strings
# (representing migration IDs) between parentheses.
#
# NOTE: The prompt for this presently only shows in the *-import.log; _not_ in
# the main "run" process.
# ---
#MULTIPROCESS_POST_PROCESS_PAUSE=()
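As a rough illustration of how these three settings might be filled in and consulted, here is a minimal sketch. The migration IDs are borrowed from elsewhere in this diff, and `in_list` is a hypothetical helper (not part of `util.in`) that wraps the same space-padded `=~` test the script uses inline.

```shell
#!/usr/bin/env bash
# Hypothetical values, as they might appear in a copy of env.sample.
MULTIPROCESS_SKIP_MIGRATIONS=(dgis_foxml_files)
MULTIPROCESS_PRE_ENQUEUE_PAUSE=(dgis_nodes)
MULTIPROCESS_POST_PROCESS_PAUSE=()

# Hypothetical helper: succeed if "$1" is one of the remaining arguments.
# Padding both sides with spaces keeps "dgis_nodes" from matching inside a
# longer ID such as "dgis_nodes_extra"; the quoted right-hand side of =~ is
# matched literally rather than as a regular expression.
in_list() {
  local needle=$1
  shift
  [[ " $* " =~ " $needle " ]]
}

in_list dgis_foxml_files "${MULTIPROCESS_SKIP_MIGRATIONS[@]}" && echo "skip dgis_foxml_files"
in_list dgis_nodes "${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]}" && echo "pause before dgis_nodes"
in_list dgis_nodes "${MULTIPROCESS_POST_PROCESS_PAUSE[@]}" || echo "no pause after dgis_nodes"
```

Because the test is a plain string match on a space-joined list, it assumes migration IDs never contain spaces, which holds for the IDs shown in this PR.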
99 changes: 86 additions & 13 deletions scripts/util.in
@@ -65,6 +65,10 @@ function init_vars () {
declare -g TIME=${TIME:-/usr/bin/time}
declare -g LOG_DIR=${LOG_DIR:-$CONFIG_DIR}
declare -g PROCESSES=${PROCESSES:-1}
declare -g SKIP_STATUS=${SKIP_STATUS:-false}
declare -g -a MULTIPROCESS_SKIP_MIGRATIONS=(${MULTIPROCESS_SKIP_MIGRATIONS[@]})
declare -g -a MULTIPROCESS_PRE_ENQUEUE_PAUSE=(${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]})
declare -g -a MULTIPROCESS_POST_PROCESS_PAUSE=(${MULTIPROCESS_POST_PROCESS_PAUSE[@]})

# Initialize the log directory.
if ! [ -d "$LOG_DIR" ]; then
@@ -96,6 +100,21 @@ function do_migration_single_process() {
timedwwwdrush dgi-migrate:import "--root=$DRUPAL_ROOT" "--uri=$URI" "--user=$DRUPAL_USER" "--group=$MIGRATION_GROUP" "${@:2}"
}

# Helper; facilitate pausing for various reasons (likely snapshotting).
#
# Positional args:
# - 1: A descriptive string of _when_ we are pausing; e.g. "pre-enqueue",
# "post-enqueue", etc.
# - 2: The ID of the specific migration during which we are pausing.
function do_pause() {
local WHEN=$1
local MIGRATION_ID=$2
local DISCARD

read -ep "Pausing $WHEN of $MIGRATION_ID as requested. Hit enter to continue." DISCARD
echo "DISCARD is $DISCARD"
}
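The `env.sample` notes say the pause prompt presently shows only in the import log. One contributing detail is that bash displays the `read -p` prompt only when input comes from a terminal. The following is a minimal sketch of a variant that prints its own message instead; `pause_for` is a hypothetical name, and this is not the helper the PR ships.

```shell
#!/usr/bin/env bash
# Hypothetical variant of a pause helper; names are illustrative only.
# It prints its own message rather than relying on `read -p`, whose prompt
# bash only shows when standard input is a terminal.
pause_for() {
  local when=$1 migration_id=$2 reply
  printf 'Pausing %s of %s as requested. Hit enter to continue.\n' \
    "$when" "$migration_id" >&2
  # -r keeps backslashes literal; the reply itself is discarded.
  IFS= read -r reply
}

# Non-interactive usage: an empty line on stdin stands in for the Enter key.
pause_for "pre-enqueue" "dgis_nodes" <<< ""
echo "resumed"
```

Writing the message to standard error keeps it out of any pipeline that captures the function's standard output.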

# Kick off a migration in multiple processes.
#
# Positional args:
@@ -106,23 +125,77 @@ function do_migration_multi_process() {
local NUM=${1}

local PROCESS_LOG_DIR="$LOG_DIR/$NUM-multiprocess-logs"
local STOP_LOCK_FILE="$LOG_DIR/$NUM-stop.lock"
local PAUSE_LOCK_FILE="$LOG_DIR/$NUM-pause.lock"

wwwdo mkdir -p $PROCESS_LOG_DIR
wwwdo touch $STOP_LOCK_FILE
wwwdo touch $PAUSE_LOCK_FILE

echo "Listing migrations..."
timedwwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --format=string \
| sort -n --key=2 | cut -f1 | while read MIGRATION_ID; do
local -a migrations=($(wwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --field=id --sort))
for MIGRATION_ID in ${migrations[@]}; do
if [ ! -f $PAUSE_LOCK_FILE ] ; then
echo "Pause lock file removed."
do_pause "pre-enqueue" "$MIGRATION_ID"
elif [ ! -f $STOP_LOCK_FILE ] ; then
echo "Stop lock file removed; exiting before touching $MIGRATION_ID."
return
elif [[ " ${MULTIPROCESS_SKIP_MIGRATIONS[@]} " =~ " $MIGRATION_ID " ]]; then
echo "Skipping $MIGRATION_ID as requested."
continue
elif [[ " ${MULTIPROCESS_PRE_ENQUEUE_PAUSE[@]} " =~ " $MIGRATION_ID " ]]; then
do_pause "pre-enqueue" "$MIGRATION_ID"
fi

echo "Enqueuing items for $MIGRATION_ID"
timedwwwdrush dgi-migrate:enqueue --user=$DRUPAL_USER $MIGRATION_ID "${@:2}"
timedwwwdrush dgi-migrate:enqueue "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" &
local ENQUEUEING_JOB=$!

echo "Starting $PROCESSES processes to process $MIGRATION_ID."
local -a PROCESS_JOBS=()
for i in $(seq 1 $PROCESSES); do
echo "Starting $i/$PROCESSES to process $MIGRATION_ID."
timedwwwdrush dgi-migrate:enqueued-process --user=$DRUPAL_USER $MIGRATION_ID "${@:2}" &> $PROCESS_LOG_DIR/$MIGRATION_ID.$i.log &
timedwwwdrush dgi-migrate:enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}" &> $PROCESS_LOG_DIR/"$MIGRATION_ID.$i.log" &
PROCESS_JOBS+=($!)
done

wait $ENQUEUEING_JOB
echo "Work enqueueing finished; enqueueing terminal messages."
for i in $(seq 1 $PROCESSES); do
wwwdrush dgi-migrate:enqueue-terminal "$MIGRATION_ID" "$NUM"
done
echo "Waiting for processes to exit..."
wait
timedwwwdrush dgi-migrate:finish-enqueued-process --user=1 $MIGRATION_ID "${@:2}"

echo "Terminal messages enqueued; waiting for workers to finish..."
wait ${PROCESS_JOBS[@]}
echo "Workers exited."

if [ ! -f $PAUSE_LOCK_FILE ] ; then
do_pause "post-process, pre-finalize" "$MIGRATION_ID"
elif [ ! -f $STOP_LOCK_FILE ] ; then
echo "Lock file removed; exiting without finalizing batch."
return
fi

echo "Finalizing $MIGRATION_ID."
timedwwwdrush dgi-migrate:finish-enqueued-process "--user=$DRUPAL_USER" "--run=$NUM" "$MIGRATION_ID" "${@:2}"
echo "Finished $MIGRATION_ID."
if [[ " ${MULTIPROCESS_POST_PROCESS_PAUSE[@]} " =~ " $MIGRATION_ID " ]] ; then
do_pause "post-process" "$MIGRATION_ID"
fi

done
wwwdo rm $STOP_LOCK_FILE $PAUSE_LOCK_FILE
}
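The job-control pattern in the new `do_migration_multi_process` (background the enqueue, start the workers immediately, wait for the enqueue alone, then wait for the worker PIDs) can be tried in isolation. The sketch below is a simplified stand-in: `fake_enqueue` and `fake_worker` are hypothetical placeholders for the drush commands, and no real queue is involved.

```shell
#!/usr/bin/env bash
# Stand-ins for the drush commands; both function names are hypothetical.
WORK_DIR=$(mktemp -d)

fake_enqueue() {
  sleep 0.2
  echo "enqueued" > "$WORK_DIR/enqueue.done"
}

fake_worker() {
  sleep 0.1
  echo "worker $1 finished" > "$WORK_DIR/worker.$1.done"
}

PROCESSES=3

# Start the producer in the background and remember its PID.
fake_enqueue &
ENQUEUEING_JOB=$!

# Start the workers right away, collecting their PIDs.
PROCESS_JOBS=()
for i in $(seq 1 "$PROCESSES"); do
  fake_worker "$i" &
  PROCESS_JOBS+=($!)
done

# Wait for the producer alone. Only once it has exited can end-of-queue
# markers be appended without further work landing behind them.
wait "$ENQUEUEING_JOB"
PRODUCER_STATUS=$?
echo "producer done; end-of-queue markers would be sent here"

# Then wait for exactly the worker PIDs, rather than every background job.
wait "${PROCESS_JOBS[@]}"
DONE_FILES=("$WORK_DIR"/worker.*.done)
FINISHED=${#DONE_FILES[@]}
echo "workers finished: $FINISHED"

rm -rf "$WORK_DIR"
```

Waiting on explicit PIDs, as the PR does with `wait ${PROCESS_JOBS[@]}`, leaves any unrelated background job in the same shell unaffected, which a bare `wait` would not.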

# Dump status for the given migration group.
#
# Skipped if SKIP_STATUS=true, in which case calling this is a no-op.
function dump_status() {
if [ $SKIP_STATUS != 'true' ]; then
wwwdrush migrate:status --group=$MIGRATION_GROUP
fi
}
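`dump_status` relies on `init_vars` having defaulted `SKIP_STATUS` to `false`; with the variable unset or empty, the unquoted `[ $SKIP_STATUS != 'true' ]` would leave `[` with a missing operand. A minimal sketch of a more defensive guard follows, assuming nothing beyond what the diff shows; `show_status` and `maybe_dump_status` are hypothetical names, with `show_status` standing in for the real status command.

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for the real status command.
show_status() {
  echo "status for group: $1"
}

maybe_dump_status() {
  # Quoting plus a default means an unset or empty SKIP_STATUS is treated as
  # "false", instead of leaving `[` with a missing operand.
  if [ "${SKIP_STATUS:-false}" != 'true' ]; then
    show_status "$1"
  fi
}

SKIP_STATUS=true
maybe_dump_status example_group   # prints nothing

SKIP_STATUS=false
maybe_dump_status example_group   # prints the status line

unset SKIP_STATUS
maybe_dump_status example_group   # prints the status line
```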

# Handle kicking off a migration.
@@ -146,7 +219,7 @@ function do_migration () {
# (lookin' at you, dgi_migrate_foxml_standard_mods_xslt dealio)
wwwdrush cache:rebuild
# Dump status before run.
wwwdrush migrate:status --group=$MIGRATION_GROUP
dump_status
{
# Do the import, one way or another.
if [ $PROCESSES -eq 1 ]; then
@@ -156,10 +229,10 @@ function do_migration () {
fi
} |& wwwdo tee $IMPORT_LOG > /dev/null
# Dump status after run.
wwwdrush migrate:status --group=$MIGRATION_GROUP
dump_status
# Dump messages after run, so they're not lost with a subsequent run.
wwwdo mkdir -p $MESSAGES_DIR
wwwdrush migrate:status --group=$MIGRATION_GROUP --field=id --format=string | \
wwwdrush dgi-migrate:list-migrations "--group=$MIGRATION_GROUP" --field=id | \
while read NAME ; do
wwwdrush migrate:messages --format=json $NAME | wwwdo tee "$MESSAGES_DIR/$NAME.json" > /dev/null
done
@@ -202,11 +275,11 @@ function do_rollback () {
# (lookin' at you, things involving Fedora dealio)
wwwdrush cache:rebuild
# Dump status before rollback.
wwwdrush migrate:status --group=$MIGRATION_GROUP
dump_status
# The base rollback.
timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null
timedwwwdrush dgi-migrate:rollback --user=$DRUPAL_USER --group=$MIGRATION_GROUP "--run=$NUM" "${@:2}" |& wwwdo tee $ROLLBACK_LOG > /dev/null
# Dump status after rollback.
wwwdrush migrate:status --group=$MIGRATION_GROUP
dump_status
set +x
} |& wwwdo tee $RUN_LOG
