Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure dummy pipeline to emit warnings/errors #984

Merged
merged 8 commits into from
Feb 12, 2024
37 changes: 25 additions & 12 deletions deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import subprocess
import time
from pathlib import Path
import json
import os
import yaml

Expand All @@ -28,6 +27,7 @@

parser = argparse.ArgumentParser(description='Manage k3d cluster and helm installations.')
subparsers = parser.add_subparsers(dest='subcommand', required=True, help='Subcommands')
parser.add_argument('--dry-run', action='store_true', help='Print commands instead of executing them')

cluster_parser = subparsers.add_parser('cluster', help='Start the k3d cluster')
cluster_parser.add_argument('--dev', action='store_true',
Expand All @@ -52,6 +52,16 @@

args = parser.parse_args()

def run_command(command: list[str], **kwargs):
if args.dry_run:
if isinstance(command, str):
print(command)
else:
print(" ".join(map(str,command)))
return subprocess.CompletedProcess(args=command, returncode=0, stdout="", stderr="")
else:
return subprocess.run(command, **kwargs)


def main():
if args.subcommand == 'cluster':
Expand All @@ -70,13 +80,13 @@ def handle_cluster():
remove_port(BACKEND_PORT_MAPPING)
if args.delete:
print(f"Deleting cluster '{CLUSTER_NAME}'.")
subprocess.run(['k3d', 'cluster', 'delete', CLUSTER_NAME])
run_command(['k3d', 'cluster', 'delete', CLUSTER_NAME])
return

if cluster_exists(CLUSTER_NAME):
print(f"Cluster '{CLUSTER_NAME}' already exists.")
else:
subprocess.run(f"k3d cluster create {CLUSTER_NAME} {' '.join(PORTS)} --agents 2",
run_command(f"k3d cluster create {CLUSTER_NAME} {' '.join(PORTS)} --agents 2",
shell=True, check=True)

while not is_traefik_running():
Expand All @@ -87,13 +97,13 @@ def handle_cluster():

def is_traefik_running(namespace='kube-system', label='app.kubernetes.io/name=traefik'):
try:
result = subprocess.run(['kubectl', 'get', 'pods', '-n', namespace, '-l', label],
result = run_command(['kubectl', 'get', 'pods', '-n', namespace, '-l', label],
capture_output=True, text=True)
if result.returncode != 0:
print(f"Error executing kubectl: {result.stderr}")
return False

if 'Running' in result.stdout:
if 'Running' in result.stdout or args.dry_run:
return True
except subprocess.SubprocessError as e:
print(f"Error checking Traefik status: {e}")
Expand All @@ -106,13 +116,13 @@ def remove_port(port_mapping):


def cluster_exists(cluster_name):
result = subprocess.run(['k3d', 'cluster', 'list'], capture_output=True, text=True)
result = run_command(['k3d', 'cluster', 'list'], capture_output=True, text=True)
return cluster_name in result.stdout


def handle_helm():
if args.uninstall:
subprocess.run(['helm', 'uninstall', HELM_RELEASE_NAME], check=True)
run_command(['helm', 'uninstall', HELM_RELEASE_NAME], check=True)

return

Expand Down Expand Up @@ -141,22 +151,22 @@ def handle_helm():
if get_codespace_name():
parameters += ['--set', "codespaceName="+get_codespace_name()]

subprocess.run(parameters, check=True)
run_command(parameters, check=True)


def get_docker_config_json():
if args.dockerconfigjson:
return args.dockerconfigjson
else:
command = subprocess.run('base64 ~/.docker/config.json', capture_output=True, text=True, shell=True)
command = run_command('base64 ~/.docker/config.json', capture_output=True, text=True, shell=True)
return command.stdout.replace('\n', '')


def handle_helm_upgrade():
parameters = [
'helm', 'upgrade', HELM_RELEASE_NAME, HELM_CHART_DIR,
]
subprocess.run(parameters, check=True)
run_command(parameters, check=True)

def get_codespace_name():
return os.environ.get('CODESPACE_NAME', None)
Expand All @@ -178,7 +188,7 @@ def generate_configs():
runtime_config_path = TEMP_DIR / 'runtime_config.json'
generate_config(helm_chart, 'templates/loculus-website-config.yaml', runtime_config_path, codespace_name)

subprocess.run(['python', 'kubernetes/config-processor/config-processor.py', TEMP_DIR, output_dir], check=True)
run_command(['python', 'kubernetes/config-processor/config-processor.py', TEMP_DIR, output_dir], check=True)

def generate_config(helm_chart, template, output_path, codespace_name=None):
helm_template_cmd = ['helm', 'template', 'name-does-not-matter', helm_chart, '--show-only', template]
Expand All @@ -189,7 +199,10 @@ def generate_config(helm_chart, template, output_path, codespace_name=None):
helm_template_cmd.extend(['--set', 'environment=local'])
helm_template_cmd.extend(['--set', 'disableWebsite=true'])
helm_template_cmd.extend(['--set', 'disableBackend=true'])
helm_output = subprocess.run(helm_template_cmd, capture_output=True, text=True, check=True).stdout
helm_output = run_command(helm_template_cmd, capture_output=True, text=True, check=True).stdout
if args.dry_run:
return

parsed_yaml = yaml.safe_load(helm_output)
config_data = parsed_yaml['data'][output_path.name]

Expand Down
18 changes: 11 additions & 7 deletions kubernetes/loculus/templates/loculus-preprocessing-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
"http://host.k3d.internal:8079"
"http://loculus-backend-service:8079"
}}
{{- $keycloakHost := printf "authentication-%s" .Values.host }}

{{- $keycloakHost := printf "https://authentication-%s" $.Values.host }}
{{- if not .Values.disablePreprocessing }}
{{- range $key, $value := .Values.instances }}
---
Expand Down Expand Up @@ -35,11 +34,16 @@ spec:
- "{{ $arg }}"
{{- end }}
- "--backend-host={{ $backendHost }}/{{ $key }}"
{{- if eq $.Values.environment "server" }}
- "--keycloak-host=https://{{ $keycloakHost}}"
{{- else }}
- "--keycloak-host=http://loculus-keycloak-service:8083"
{{- end -}}
- "--keycloak-host={{ $keycloakHost }}"
{{- if $value.preprocessing.warnings }}
- "--withWarnings"
{{- end }}
{{- if $value.preprocessing.errors }}
- "--withErrors"
{{- end }}
{{- if $value.preprocessing.randomWarnError }}
- "--randomWarnError"
{{- end }}
{{- if $value.preprocessing.configFile }}
- "--config=/etc/config/preprocessing-config.yaml"
volumeMounts:
Expand Down
3 changes: 3 additions & 0 deletions kubernetes/loculus/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ instances:
image: ghcr.io/loculus-project/preprocessing-dummy
args:
- "--watch"
warnings: true
errors: true
randomWarnError: true
referenceGenomes:
nucleotideSequences:
- name: "main"
Expand Down
8 changes: 6 additions & 2 deletions preprocessing/dummy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
parser.add_argument("--watch", action="store_true", help="Watch and keep running. Fetches new data every 10 seconds.")
parser.add_argument("--withErrors", action="store_true", help="Add errors to processed data.")
parser.add_argument("--withWarnings", action="store_true", help="Add warnings to processed data.")
parser.add_argument("--randomWarnError", action="store_true", help="Make errors and warnings occur stochastically")
parser.add_argument("--maxSequences", type=int, help="Max number of sequence entry versions to process.")
parser.add_argument("--keycloak-host", type=str, default="http://172.0.0.1:8083", help="Host address of Keycloak")
parser.add_argument("--keycloak-user", type=str, default="dummy_preprocessing_pipeline",
Expand All @@ -27,6 +28,7 @@
watch_mode = args.watch
addErrors = args.withErrors
addWarnings = args.withWarnings
randomWarnError = args.randomWarnError
keycloakHost = args.keycloak_host
keycloakUser = args.keycloak_user
keycloakPassword = args.keycloak_password
Expand Down Expand Up @@ -90,7 +92,8 @@ def process(unprocessed: List[Sequence]) -> List[Sequence]:
{"metadata": metadata, **mock_sequences},
)

if addErrors:
disable_randomly = randomWarnError and random.choice([True, True, False])
if addErrors and not disable_randomly:
updated_sequence.errors = [
ProcessingAnnotation(
[AnnotationSource(list(metadata.keys())[0], "Metadata")],
Expand All @@ -107,7 +110,8 @@ def process(unprocessed: List[Sequence]) -> List[Sequence]:
),
]

if addWarnings:
disable_randomly = randomWarnError and random.choice([True, False])
if addWarnings and not disable_randomly:
updated_sequence.warnings = [
ProcessingAnnotation(
[AnnotationSource(list(metadata.keys())[0], "Metadata")],
Expand Down
Loading