Skip to content

Commit

Permalink
Configure dummy pipeline to emit warnings/errors (#984)
Browse files Browse the repository at this point in the history
* feat: Add --dry-run to deploy.py

* Use --with-warnings/errors in server deployment

* Make warnings/errors random in dummy processing

* Add flag to enable random errors in dummy pipeline

* Refactor deployment template to remove server env check
  • Loading branch information
corneliusroemer authored Feb 12, 2024
1 parent 87591b1 commit e53ead0
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 21 deletions.
37 changes: 25 additions & 12 deletions deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import subprocess
import time
from pathlib import Path
import json
import os
import yaml

Expand All @@ -28,6 +27,7 @@

parser = argparse.ArgumentParser(description='Manage k3d cluster and helm installations.')
subparsers = parser.add_subparsers(dest='subcommand', required=True, help='Subcommands')
parser.add_argument('--dry-run', action='store_true', help='Print commands instead of executing them')

cluster_parser = subparsers.add_parser('cluster', help='Start the k3d cluster')
cluster_parser.add_argument('--dev', action='store_true',
Expand All @@ -52,6 +52,16 @@

args = parser.parse_args()

def run_command(command: list[str], **kwargs):
if args.dry_run:
if isinstance(command, str):
print(command)
else:
print(" ".join(map(str,command)))
return subprocess.CompletedProcess(args=command, returncode=0, stdout="", stderr="")
else:
return subprocess.run(command, **kwargs)


def main():
if args.subcommand == 'cluster':
Expand All @@ -70,13 +80,13 @@ def handle_cluster():
remove_port(BACKEND_PORT_MAPPING)
if args.delete:
print(f"Deleting cluster '{CLUSTER_NAME}'.")
subprocess.run(['k3d', 'cluster', 'delete', CLUSTER_NAME])
run_command(['k3d', 'cluster', 'delete', CLUSTER_NAME])
return

if cluster_exists(CLUSTER_NAME):
print(f"Cluster '{CLUSTER_NAME}' already exists.")
else:
subprocess.run(f"k3d cluster create {CLUSTER_NAME} {' '.join(PORTS)} --agents 2",
run_command(f"k3d cluster create {CLUSTER_NAME} {' '.join(PORTS)} --agents 2",
shell=True, check=True)

while not is_traefik_running():
Expand All @@ -87,13 +97,13 @@ def handle_cluster():

def is_traefik_running(namespace='kube-system', label='app.kubernetes.io/name=traefik'):
try:
result = subprocess.run(['kubectl', 'get', 'pods', '-n', namespace, '-l', label],
result = run_command(['kubectl', 'get', 'pods', '-n', namespace, '-l', label],
capture_output=True, text=True)
if result.returncode != 0:
print(f"Error executing kubectl: {result.stderr}")
return False

if 'Running' in result.stdout:
if 'Running' in result.stdout or args.dry_run:
return True
except subprocess.SubprocessError as e:
print(f"Error checking Traefik status: {e}")
Expand All @@ -106,13 +116,13 @@ def remove_port(port_mapping):


def cluster_exists(cluster_name):
result = subprocess.run(['k3d', 'cluster', 'list'], capture_output=True, text=True)
result = run_command(['k3d', 'cluster', 'list'], capture_output=True, text=True)
return cluster_name in result.stdout


def handle_helm():
if args.uninstall:
subprocess.run(['helm', 'uninstall', HELM_RELEASE_NAME], check=True)
run_command(['helm', 'uninstall', HELM_RELEASE_NAME], check=True)

return

Expand Down Expand Up @@ -141,22 +151,22 @@ def handle_helm():
if get_codespace_name():
parameters += ['--set', "codespaceName="+get_codespace_name()]

subprocess.run(parameters, check=True)
run_command(parameters, check=True)


def get_docker_config_json():
if args.dockerconfigjson:
return args.dockerconfigjson
else:
command = subprocess.run('base64 ~/.docker/config.json', capture_output=True, text=True, shell=True)
command = run_command('base64 ~/.docker/config.json', capture_output=True, text=True, shell=True)
return command.stdout.replace('\n', '')


def handle_helm_upgrade():
parameters = [
'helm', 'upgrade', HELM_RELEASE_NAME, HELM_CHART_DIR,
]
subprocess.run(parameters, check=True)
run_command(parameters, check=True)

def get_codespace_name():
return os.environ.get('CODESPACE_NAME', None)
Expand All @@ -178,7 +188,7 @@ def generate_configs():
runtime_config_path = TEMP_DIR / 'runtime_config.json'
generate_config(helm_chart, 'templates/loculus-website-config.yaml', runtime_config_path, codespace_name)

subprocess.run(['python', 'kubernetes/config-processor/config-processor.py', TEMP_DIR, output_dir], check=True)
run_command(['python', 'kubernetes/config-processor/config-processor.py', TEMP_DIR, output_dir], check=True)

def generate_config(helm_chart, template, output_path, codespace_name=None):
helm_template_cmd = ['helm', 'template', 'name-does-not-matter', helm_chart, '--show-only', template]
Expand All @@ -189,7 +199,10 @@ def generate_config(helm_chart, template, output_path, codespace_name=None):
helm_template_cmd.extend(['--set', 'environment=local'])
helm_template_cmd.extend(['--set', 'disableWebsite=true'])
helm_template_cmd.extend(['--set', 'disableBackend=true'])
helm_output = subprocess.run(helm_template_cmd, capture_output=True, text=True, check=True).stdout
helm_output = run_command(helm_template_cmd, capture_output=True, text=True, check=True).stdout
if args.dry_run:
return

parsed_yaml = yaml.safe_load(helm_output)
config_data = parsed_yaml['data'][output_path.name]

Expand Down
18 changes: 11 additions & 7 deletions kubernetes/loculus/templates/loculus-preprocessing-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
"http://host.k3d.internal:8079"
"http://loculus-backend-service:8079"
}}
{{- $keycloakHost := printf "authentication-%s" .Values.host }}

{{- $keycloakHost := printf "https://authentication-%s" $.Values.host }}
{{- if not .Values.disablePreprocessing }}
{{- range $key, $value := .Values.instances }}
---
Expand Down Expand Up @@ -35,11 +34,16 @@ spec:
- "{{ $arg }}"
{{- end }}
- "--backend-host={{ $backendHost }}/{{ $key }}"
{{- if eq $.Values.environment "server" }}
- "--keycloak-host=https://{{ $keycloakHost}}"
{{- else }}
- "--keycloak-host=http://loculus-keycloak-service:8083"
{{- end -}}
- "--keycloak-host={{ $keycloakHost }}"
{{- if $value.preprocessing.warnings }}
- "--withWarnings"
{{- end }}
{{- if $value.preprocessing.errors }}
- "--withErrors"
{{- end }}
{{- if $value.preprocessing.randomWarnError }}
- "--randomWarnError"
{{- end }}
{{- if $value.preprocessing.configFile }}
- "--config=/etc/config/preprocessing-config.yaml"
volumeMounts:
Expand Down
3 changes: 3 additions & 0 deletions kubernetes/loculus/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ instances:
image: ghcr.io/loculus-project/preprocessing-dummy
args:
- "--watch"
warnings: true
errors: true
randomWarnError: true
referenceGenomes:
nucleotideSequences:
- name: "main"
Expand Down
8 changes: 6 additions & 2 deletions preprocessing/dummy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
parser.add_argument("--watch", action="store_true", help="Watch and keep running. Fetches new data every 10 seconds.")
parser.add_argument("--withErrors", action="store_true", help="Add errors to processed data.")
parser.add_argument("--withWarnings", action="store_true", help="Add warnings to processed data.")
parser.add_argument("--randomWarnError", action="store_true", help="Make errors and warnings occur stochastically")
parser.add_argument("--maxSequences", type=int, help="Max number of sequence entry versions to process.")
parser.add_argument("--keycloak-host", type=str, default="http://172.0.0.1:8083", help="Host address of Keycloak")
parser.add_argument("--keycloak-user", type=str, default="dummy_preprocessing_pipeline",
Expand All @@ -27,6 +28,7 @@
watch_mode = args.watch
addErrors = args.withErrors
addWarnings = args.withWarnings
randomWarnError = args.randomWarnError
keycloakHost = args.keycloak_host
keycloakUser = args.keycloak_user
keycloakPassword = args.keycloak_password
Expand Down Expand Up @@ -90,7 +92,8 @@ def process(unprocessed: List[Sequence]) -> List[Sequence]:
{"metadata": metadata, **mock_sequences},
)

if addErrors:
disable_randomly = randomWarnError and random.choice([True, True, False])
if addErrors and not disable_randomly:
updated_sequence.errors = [
ProcessingAnnotation(
[AnnotationSource(list(metadata.keys())[0], "Metadata")],
Expand All @@ -107,7 +110,8 @@ def process(unprocessed: List[Sequence]) -> List[Sequence]:
),
]

if addWarnings:
disable_randomly = randomWarnError and random.choice([True, False])
if addWarnings and not disable_randomly:
updated_sequence.warnings = [
ProcessingAnnotation(
[AnnotationSource(list(metadata.keys())[0], "Metadata")],
Expand Down

0 comments on commit e53ead0

Please sign in to comment.