[Bigpicture] data replication services #365

Merged · 34 commits · Nov 17, 2023

Commits
6b9dc56
Copy `backup` from sda-pipeline as `sync`
jbygdell Oct 17, 2023
227f890
[sync] make it work with the merged code base
jbygdell Oct 18, 2023
3549dff
[sync] remove the `CopyHeader` config option
jbygdell Oct 19, 2023
10229e3
[config] Rename `backupPubKey` to `syncPubKeyPath`
jbygdell Oct 19, 2023
37c599c
[test][config] validate sync config
jbygdell Oct 19, 2023
34dee63
[test][config] validate `GetC4GHpublicKey`
jbygdell Oct 19, 2023
1ec61ca
[test][config] validate `GetC4GHkey`
jbygdell Oct 19, 2023
0bbf617
[internal][database] make `GetHeaderForStableID` return byte array
jbygdell Oct 20, 2023
8a761c8
[sync] cleanup error messages
jbygdell Oct 20, 2023
07e6e41
[sync] No need to send a message after completing the work
jbygdell Oct 20, 2023
605dadd
[Integration test] Add case for sync
jbygdell Oct 20, 2023
ac6cb63
[sync] cleanup Readme.md
jbygdell Nov 3, 2023
68ad267
Copy sync-api related files from old sda-pipeline repo
jbygdell Oct 17, 2023
e4af272
[sync-api] make it work with the merged code base
jbygdell Oct 23, 2023
9d68a9b
[sync-api][test] convert to run as a suite
jbygdell Oct 25, 2023
a09a80a
[sync] switch to read messages from the `mapping_stream`
jbygdell Nov 7, 2023
56129a7
[sync] send POST after files have been synced
jbygdell Nov 7, 2023
3d5c27f
[sync] Add dataset ID validation
jbygdell Nov 7, 2023
4356d49
[sync-api] remove everything database related
jbygdell Nov 7, 2023
5fe6fab
[config] remove remote end from sync api
jbygdell Nov 7, 2023
4b39d4b
[sync-api] cleanup error messages
jbygdell Nov 9, 2023
fa15d0e
[config] fix sync and sync-api
jbygdell Nov 9, 2023
41ebaf7
[schemas] encrypted checksums are not mandatory for ingestion-trigger
jbygdell Nov 9, 2023
788a151
[schemas] set correct url in json files
jbygdell Nov 9, 2023
8187bdc
[finalize] nack message if `GetFilestatus` fails
jbygdell Nov 9, 2023
1c371f2
[Go.mod] add missing library
jbygdell Nov 9, 2023
cbad793
[integration test] update test for data sync
jbygdell Nov 9, 2023
546f740
[integration test] add case for sync-api
jbygdell Nov 9, 2023
40fd512
Cleanup after rebase
jbygdell Nov 9, 2023
8d346ea
[Integration test] fix after rebase
jbygdell Nov 9, 2023
3c292b8
[sync] [sync-api] Update readme
jbygdell Nov 9, 2023
7fd60da
[json] for isolated setups we don't care about MD5 sums
jbygdell Nov 15, 2023
d8a41cc
[sync-api] create correct correlation ids
jbygdell Nov 15, 2023
a1db42b
[sync-api] fix comment
jbygdell Nov 15, 2023

Files changed
6 changes: 6 additions & 0 deletions .github/integration/scripts/make_sda_credentials.sh
@@ -17,6 +17,7 @@ pip install aiohttp Authlib joserfc requests > /dev/null
for n in download finalize inbox ingest mapper sync verify; do
echo "creating credentials for: $n"
psql -U postgres -h postgres -d sda -c "ALTER ROLE $n LOGIN PASSWORD '$n';"
psql -U postgres -h postgres -d sda -c "GRANT base TO $n;"

## password and permissions for MQ
body_data=$(jq -n -c --arg password "$n" --arg tags none '$ARGS.named')
@@ -60,6 +61,11 @@ if [ ! -f "/shared/c4gh.sec.pem" ]; then
curl -s -L https://github.com/neicnordic/crypt4gh/releases/download/v1.7.4/crypt4gh_linux_x86_64.tar.gz | tar -xz -C /shared/ && chmod +x /shared/crypt4gh
/shared/crypt4gh generate -n /shared/c4gh -p c4ghpass
fi
if [ ! -f "/shared/sync.sec.pem" ]; then
echo "creating crypth4gh key"
curl -s -L https://github.com/neicnordic/crypt4gh/releases/download/v1.7.4/crypt4gh_linux_x86_64.tar.gz | tar -xz -C /shared/ && chmod +x /shared/crypt4gh
/shared/crypt4gh generate -n /shared/sync -p syncPass
fi

if [ ! -f "/shared/keys/ssh" ]; then
ssh-keygen -o -a 256 -t ed25519 -f /shared/keys/ssh -N ""
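
The new block generates a second key pair, sync.pub.pem / sync.sec.pem, for the sync destination, alongside the existing archive c4gh pair. A quick sanity check along the lines below could catch a truncated download or a failed generate run; the CRYPT4GH marker in the PEM header line is an assumption about the key file format, not something this PR asserts:

# sketch: verify the generated sync key pair looks like crypt4gh PEM files
for key in /shared/sync.pub.pem /shared/sync.sec.pem; do
    if ! head -n1 "$key" | grep -q "CRYPT4GH"; then
        echo "::error::$key does not look like a crypt4gh key"
        exit 1
    fi
done
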
47 changes: 47 additions & 0 deletions .github/integration/sda-s3-integration.yml
@@ -208,6 +208,49 @@ services:
      - ./sda/config.yaml:/config.yaml
      - shared:/shared

  sync:
    image: ghcr.io/neicnordic/sensitive-data-archive:PR${PR_NUMBER}
    command: [ sda-sync ]
    container_name: sync
    depends_on:
      credentials:
        condition: service_completed_successfully
      minio:
        condition: service_healthy
      postgres:
        condition: service_healthy
      rabbitmq:
        condition: service_healthy
    environment:
      - BROKER_PASSWORD=sync
      - BROKER_USER=sync
      - BROKER_QUEUE=mapping_stream
      - DB_PASSWORD=sync
      - DB_USER=sync
    restart: always
    volumes:
      - ./sda/config.yaml:/config.yaml
      - shared:/shared

  sync-api:
    image: ghcr.io/neicnordic/sensitive-data-archive:PR${PR_NUMBER}
    command: [ sda-syncapi ]
    container_name: sync-api
    depends_on:
      credentials:
        condition: service_completed_successfully
      rabbitmq:
        condition: service_healthy
    environment:
      - BROKER_PASSWORD=sync
      - BROKER_USER=sync
      - BROKER_EXCHANGE=sda.dead
    ports:
      - "18080:8080"
    restart: always
    volumes:
      - ./sda/config.yaml:/config.yaml

  oidc:
    container_name: oidc
    command:
@@ -250,6 +293,10 @@ services:
        condition: service_started
      s3inbox:
        condition: service_started
      sync:
        condition: service_started
      sync-api:
        condition: service_started
      verify:
        condition: service_started
    environment:
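
Both new services reuse the sync broker credentials created in make_sda_credentials.sh; sda-sync consumes from mapping_stream, while sda-syncapi only talks to the broker and publishes to the sda.dead exchange. Since the API is published on host port 18080, a smoke test from the host could look like the sketch below; the /dataset path and the user/pass Basic Auth pair come from this PR's config.yaml and are assumptions about the final API surface:

# sketch: any HTTP status code back from sync-api proves the container is up
code=$(curl -s -o /dev/null -w '%{http_code}' -u user:pass \
    -H 'Content-Type: application/json' \
    -X POST http://localhost:18080/dataset -d '{}')
echo "sync-api responded with HTTP $code"
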
21 changes: 21 additions & 0 deletions .github/integration/sda/config.yaml
@@ -50,11 +50,32 @@ db:
c4gh:
  filePath: /shared/c4gh.sec.pem
  passphrase: "c4ghpass"
  syncPubKeyPath: /shared/sync.pub.pem

server:
  cert: ""
  key: ""
  jwtpubkeypath: "/shared/keys/pub/"
  jwtpubkeyurl: "http://oidc:8080/jwk"

sync:
  api:
    password: "pass"
    user: "user"
  centerPrefix: "SYNC"
  destination:
    type: "s3"
    url: "http://s3"
    port: 9000
    readypath: "/minio/health/ready"
    accessKey: "access"
    secretKey: "secretKey"
    bucket: "sync"
    region: "us-east-1"
  remote:
    host: "http://sync-api"
    port: "8080"
    password: "pass"
    user: "user"

schema.type: "isolated"
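
The compose file above overrides parts of this file through environment variables (BROKER_PASSWORD, DB_USER, and so on), which matches the usual viper convention of upper-casing nested keys and joining them with underscores. Assuming that convention also holds for the new sync section, a deployment could repoint the remote end without editing the YAML:

# sketch, assuming viper-style env overrides: sync.remote.host -> SYNC_REMOTE_HOST
export SYNC_REMOTE_HOST="http://sync-api"
export SYNC_REMOTE_PORT="8080"
sda-sync
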
24 changes: 12 additions & 12 deletions .github/integration/tests/sda/10_upload_test.sh
@@ -34,7 +34,7 @@ done
psql -U postgres -h postgres -d sda -At -c "TRUNCATE TABLE sda.files CASCADE;"

if [ "$STORAGETYPE" = "posix" ]; then
-for file in NA12878.bam NA12878_20k_b37.bam; do
+for file in NA12878.bam NA12878_20k_b37.bam NA12878.bai NA12878_20k_b37.bai; do
echo "downloading $file"
curl -s -L -o /shared/$file "https://github.com/ga4gh/htsget-refserver/raw/main/data/gcp/gatk-test-data/wgs_bam/$file"
if [ ! -f "$file.c4gh" ]; then
@@ -70,7 +70,7 @@ fi
if [ "$STORAGETYPE" = "s3" ]; then
pip -q install s3cmd

-for file in NA12878.bam NA12878_20k_b37.bam; do
+for file in NA12878.bam NA12878_20k_b37.bam NA12878.bai NA12878_20k_b37.bai; do
curl -s -L -o /shared/$file "https://github.com/ga4gh/htsget-refserver/raw/main/data/gcp/gatk-test-data/wgs_bam/$file"
if [ ! -f "$file.c4gh" ]; then
yes | /shared/crypt4gh encrypt -p c4gh.pub.pem -f "$file"
@@ -87,7 +87,7 @@ fi

echo "waiting for upload to complete"
RETRY_TIMES=0
until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 4 ]; do
until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 6 ]; do
echo "waiting for upload to complete"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
@@ -99,14 +99,14 @@ done

if [ "$STORAGETYPE" = "s3" ]; then
num_rows=$(psql -U postgres -h postgres -d sda -At -c "SELECT COUNT(*) from sda.files;")
if [ "$num_rows" -ne 3 ]; then
echo "database queries for register_files failed, expected 3 got $num_rows"
if [ "$num_rows" -ne 5 ]; then
echo "database queries for register_files failed, expected 5 got $num_rows"
exit 1
fi

num_log_rows=$(psql -U postgres -h postgres -d sda -At -c "SELECT COUNT(*) from sda.file_event_log;")
if [ "$num_log_rows" -ne 8 ]; then
echo "database queries for file_event_logs failed, expected 8 got $num_log_rows"
if [ "$num_log_rows" -ne 12 ]; then
echo "database queries for file_event_logs failed, expected 12 got $num_log_rows"
exit 1
fi

@@ -120,7 +120,7 @@ if [ "$STORAGETYPE" = "s3" ]; then
## verify that messages exists in MQ
echo "waiting for upload to complete"
RETRY_TIMES=0
until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 5 ]; do
until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 7 ]; do
echo "waiting for upload to complete"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
@@ -131,14 +131,14 @@
done

num_rows=$(psql -U postgres -h postgres -d sda -At -c "SELECT COUNT(*) from sda.files;")
if [ "$num_rows" -ne 4 ]; then
echo "database queries for register_files failed, expected 4 got $num_rows"
if [ "$num_rows" -ne 6 ]; then
echo "database queries for register_files failed, expected 6 got $num_rows"
exit 1
fi

num_log_rows=$(psql -U postgres -h postgres -d sda -At -c "SELECT COUNT(*) from sda.file_event_log;")
if [ "$num_log_rows" -ne 10 ]; then
echo "database queries for file_event_logs failed, expected 10 got $num_log_rows"
if [ "$num_log_rows" -ne 14 ]; then
echo "database queries for file_event_logs failed, expected 14 got $num_log_rows"
exit 1
fi
fi
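
The same poll-the-management-API loop now appears several times in this file with different queues and expected counts. A small helper could capture the pattern; this is a refactoring sketch built from the loops above, not part of the PR:

# wait_for_queue <queue> <expected_ready>: poll until the queue holds the
# expected number of ready messages, failing after 30 tries (~60 s)
wait_for_queue() {
    local queue=$1 expected=$2 retries=0
    until [ "$(curl -s -k -u guest:guest "$URI/api/queues/sda/$queue" | jq -r '.messages_ready')" -eq "$expected" ]; do
        retries=$((retries + 1))
        if [ "$retries" -eq 30 ]; then
            echo "::error::Time out while waiting for $queue to reach $expected messages"
            exit 1
        fi
        sleep 2
    done
}

wait_for_queue inbox 6
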
4 changes: 2 additions & 2 deletions .github/integration/tests/sda/20_ingest-verify_test.sh
@@ -3,7 +3,7 @@ set -e

cd shared || true

-for file in NA12878.bam NA12878_20k_b37.bam; do
+for file in NA12878.bam NA12878_20k_b37.bam NA12878.bai NA12878_20k_b37.bai; do
ENC_SHA=$(sha256sum "$file.c4gh" | cut -d' ' -f 1)
ENC_MD5=$(md5sum "$file.c4gh" | cut -d' ' -f 1)

@@ -59,7 +59,7 @@ done

echo "waiting for verify to complete"
RETRY_TIMES=0
until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/verified/ | jq -r '.messages_ready')" -eq 2 ]; do
until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/verified/ | jq -r '.messages_ready')" -eq 4 ]; do
echo "waiting for verify to complete"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
6 changes: 3 additions & 3 deletions .github/integration/tests/sda/21_cancel_test.sh
@@ -86,11 +86,11 @@ curl -k -u guest:guest "http://rabbitmq:15672/api/exchanges/sda/sda/publish" \
-d "$ingest_body"

RETRY_TIMES=0
until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/verified/ | jq -r '.messages_ready')" -eq 3 ]; do
echo "waiting for verify to complete"
until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/verified/ | jq -r '.messages_ready')" -eq 5 ]; do
echo "waiting for verify to complete after re-ingestion"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
echo "::error::Time out while waiting for verify to complete"
echo "::error::Time out while waiting for verify to complete after re-ingestion"
exit 1
fi
sleep 2
13 changes: 9 additions & 4 deletions .github/integration/tests/sda/30_backup-finalize_test.sh
@@ -4,7 +4,7 @@ set -e
cd shared || true

i=1
-while [ $i -le 2 ]; do
+while [ $i -le 4 ]; do
## get correlation id from upload message
MSG=$(
curl -s -X POST \
@@ -28,12 +28,17 @@ while [ $i -le 2 ]; do
'$ARGS.named'
)

accession_id=EGAF7490000000$i
if [[ "$filepath" == *.bai.c4gh ]]; then
accession_id="SYNC-123-0000$i"
fi

accession_payload=$(
jq -r -c -n \
--arg type accession \
--arg user "$user" \
--arg filepath "$filepath" \
--arg accession_id "EGAF7490000000$i" \
--arg accession_id "$accession_id" \
--argjson decrypted_checksums "$decrypted_checksums" \
'$ARGS.named|@base64'
)
@@ -58,7 +63,7 @@ done

echo "waiting for finalize to complete"

until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/completed/ | jq -r '.messages_ready')" -eq 2 ]; do
until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/completed/ | jq -r '.messages_ready')" -eq 4 ]; do
echo "waiting for finalize to complete"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
@@ -88,7 +93,7 @@ socket_timeout = 30
EOD

# check DB for archive file names
-for file in NA12878.bam.c4gh NA12878_20k_b37.bam.c4gh; do
+for file in NA12878.bam.c4gh NA12878.bai.c4gh NA12878_20k_b37.bam.c4gh NA12878_20k_b37.bai.c4gh; do
archiveName=$(psql -U postgres -h postgres -d sda -At -c "SELECT archive_file_path from sda.files where submission_file_path = 'test_dummy.org/$file';")
size=$(s3cmd -c direct ls s3://backup/"$archiveName" | tr -s ' ' | cut -d ' ' -f 3)
if [ "$size" -eq 0 ]; then
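
The .bai files are deliberately given SYNC-123-… accession ids so the sync leg has something to pick out, while the .bam files keep their EGAF… ids. After finalize has run, the split can be spot-checked in the database; this sketch assumes stable_id holds the accession id, as the queries in 40_mapper_test.sh suggest:

# sketch: list the files that were accessioned for sync
psql -U postgres -h postgres -d sda -At -c \
    "SELECT submission_file_path, stable_id FROM sda.files WHERE stable_id LIKE 'SYNC-%';"
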
52 changes: 47 additions & 5 deletions .github/integration/tests/sda/40_mapper_test.sh
@@ -44,7 +44,7 @@ curl -s -u guest:guest "http://rabbitmq:15672/api/exchanges/sda/sda/publish" \

# check DB for dataset contents
RETRY_TIMES=0
until [ "$(psql -U postgres -h postgres -d sda -At -c "select count(id) from sda.file_dataset where dataset_id = (select id from sda.datasets where stable_id = 'EGAD74900000101')")" -eq 2 ]; do
until [ "$(psql -U postgres -h postgres -d sda -At -c "select count(id) from sda.file_dataset where dataset_id = (select id from sda.datasets where stable_id = 'EGAD74900000101');")" -eq 2 ]; do
echo "waiting for mapper to complete"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
@@ -63,7 +63,7 @@ for file in NA12878.bam.c4gh NA12878_20k_b37.bam.c4gh; do
fi
done

until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.file_event_log where file_id = (select id from sda.files where stable_id = 'EGAF74900000002') order by started_at DESC LIMIT 1")" = "ready" ]; do
until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.file_event_log where file_id = (select id from sda.files where stable_id = 'EGAF74900000002') order by started_at DESC LIMIT 1;")" = "ready" ]; do
echo "waiting for files be ready"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
@@ -73,7 +73,7 @@ until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.fil
sleep 2
done

until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.dataset_event_log where dataset_id = 'EGAD74900000101' order by event_date DESC LIMIT 1")" = "registered" ]; do
until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.dataset_event_log where dataset_id = 'EGAD74900000101' order by event_date DESC LIMIT 1;")" = "registered" ]; do
echo "waiting for dataset be registered"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
@@ -108,7 +108,7 @@ curl -s -u guest:guest "http://rabbitmq:15672/api/exchanges/sda/sda/publish" \
-H 'Content-Type: application/json;charset=UTF-8' \
-d "$release_body"

until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.dataset_event_log where dataset_id = 'EGAD74900000101' order by event_date DESC LIMIT 1")" = "released" ]; do
until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.dataset_event_log where dataset_id = 'EGAD74900000101' order by event_date DESC LIMIT 1;")" = "released" ]; do
echo "waiting for dataset be released"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
@@ -153,4 +153,46 @@ until [ "$(psql -U postgres -h postgres -d sda -At -c "select event from sda.dat
sleep 2
done

echo "dataset deprecated successfully"
echo "dataset deprecated successfully"

mappings=$(
jq -c -n \
'$ARGS.positional' \
--args "SYNC-123-00003" \
--args "SYNC-123-00004"
)

mapping_payload=$(
jq -r -c -n \
--arg type mapping \
--arg dataset_id SYNC-001-12345 \
--argjson accession_ids "$mappings" \
'$ARGS.named|@base64'
)

mapping_body=$(
jq -c -n \
--arg vhost test \
--arg name sda \
--argjson properties "$properties" \
--arg routing_key "mappings" \
--arg payload_encoding base64 \
--arg payload "$mapping_payload" \
'$ARGS.named'
)

curl -s -u guest:guest "http://rabbitmq:15672/api/exchanges/sda/sda/publish" \
-H 'Content-Type: application/json;charset=UTF-8' \
-d "$mapping_body"

# check DB for dataset contents
RETRY_TIMES=0
until [ "$(psql -U postgres -h postgres -d sda -At -c "select count(id) from sda.file_dataset where dataset_id = (select id from sda.datasets where stable_id = 'SYNC-001-12345')")" -eq 2 ]; do
echo "waiting for mapper to complete"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
echo "::error::Time out while waiting for dataset to be mapped"
exit 1
fi
sleep 2
done
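
Because the payload travels through jq's @base64 filter, it can be decoded back on the command line to verify exactly what the mapper (and, downstream, the sync service reading mapping_stream) will receive:

# sketch: inspect the message body before publishing
echo "$mapping_payload" | base64 -d | jq .
# expected output:
# {
#   "type": "mapping",
#   "dataset_id": "SYNC-001-12345",
#   "accession_ids": ["SYNC-123-00003", "SYNC-123-00004"]
# }
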
35 changes: 35 additions & 0 deletions .github/integration/tests/sda/45_sync_test.sh
@@ -0,0 +1,35 @@
#!/bin/bash
set -e

cd shared || true

if [ "$STORAGETYPE" = "posix" ]; then
exit 0
fi

# check bucket for synced files
for file in NA12878.bai NA12878_20k_b37.bai; do
RETRY_TIMES=0
until [ "$(s3cmd -c direct ls s3://sync/test_dummy.org/"$file")" != "" ]; do
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
echo "::error::Time out while waiting for files to be synced"
exit 1
fi
sleep 2
done
done

echo "files synced successfully"

echo "waiting for sync-api to send messages"
RETRY_TIMES=0
until [ "$(curl -su guest:guest http://rabbitmq:15672/api/queues/sda/catch_all.dead/ | jq -r '.messages_ready')" -eq 5 ]; do
echo "waiting for sync-api to send messages"
RETRY_TIMES=$((RETRY_TIMES + 1))
if [ "$RETRY_TIMES" -eq 30 ]; then
echo "::error::Time out while waiting for sync-api to send messages"
exit 1
fi
sleep 2
done
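
The test only counts the messages in catch_all.dead; when it fails, the message bodies themselves are usually the fastest clue. The RabbitMQ management API's get endpoint can peek at them without consuming them, since ack_requeue_true puts the messages back on the queue:

# sketch: peek at the dead-lettered messages sync-api produced
curl -s -u guest:guest \
    -H 'Content-Type: application/json' \
    -X POST http://rabbitmq:15672/api/queues/sda/catch_all.dead/get \
    -d '{"count":5,"ackmode":"ack_requeue_true","encoding":"auto"}' | jq -r '.[].payload'
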