From a8870e218cc8e6bfeecf44e170a14cb65e3c2b45 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 14:08:47 +0100 Subject: [PATCH 01/29] correct airflow dask cluster address setting --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 94102244..2f0f1dc0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,7 +30,7 @@ services: - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=False - AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth - AIRFLOW__API__AUTH_BACKEND=airflow.api.auth.backend.basic_auth - - AIRFLOW__DASK_CLUSTER_ADDRESS=tcp://dask-scheduler-airflow:8786 + - AIRFLOW__DASK__CLUSTER_ADDRESS=tcp://dask-scheduler-airflow:8786 - 'AIRFLOW_CONN_LINUX_NETWORK_STACK_BREEDER_SSH={ "conn_type": "ssh", "login": "godon_robot", "host": "10.0.5.53", "port": 22, "extra": { "key_file": "/opt/airflow/credentials/id_rsa" } }' - ARCHIVE_DB_USER=yugabyte - ARCHIVE_DB_PASSWORD=yugabyte From a46dcc965a80269ac44f0ea222b247f006a1d99c Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 14:46:07 +0100 Subject: [PATCH 02/29] api - repair response msg templating --- api/controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/controller.py b/api/controller.py index de1529c1..fa0b56d4 100644 --- a/api/controller.py +++ b/api/controller.py @@ -125,7 +125,7 @@ def breeders_name_delete(breeder_name): # noqa: E501 breeder_name=breeder_name) archive.archive_db.execute(db_info=db_config, query=__query) - return Response(json.dumps(dict(message="Purged Breeder named {breeder_name}")), + return Response(json.dumps(dict(message=f"Purged Breeder named {breeder_name}")), status=200, mimetype='application/json') @@ -258,7 +258,7 @@ def create_breeder(api_client, content): #api_response['breeder'] = create_breeder(api_client, content).to_dict() create_breeder(api_client, content) - return Response(json.dumps(dict(message="Created Breeder named {breeder_id}")), + return Response(json.dumps(dict(message=f"Created Breeder named {breeder_id}")), status=200, mimetype='application/json') From 53badf456df593121968168ace47a54d23cf0661 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 14:50:02 +0100 Subject: [PATCH 03/29] api - create pass full request config to renderer --- api/controller.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/controller.py b/api/controller.py index fa0b56d4..66ca3ed3 100644 --- a/api/controller.py +++ b/api/controller.py @@ -190,6 +190,7 @@ def breeders_post(content): # noqa: E501 def create_breeder(api_client, content): api_instance = dag_run_api.DAGRunApi(api_client) + breeder_config_full = content breeder_config = dict(content.get('breeder')) breeder_id = breeder_config.get('name') @@ -197,7 +198,7 @@ def create_breeder(api_client, content): environment = Environment(loader=FileSystemLoader(DAG_TEMPLATES_DIR)) template = environment.get_template("root_dag.py") filename = f"{DAG_DIR}/root_dag.py" - rendered_dag = template.render(breeder_config) + rendered_dag = template.render(breeder_config_full) with open(filename, mode="w", encoding="utf-8") as dag_file: dag_file.write(rendered_dag) From b2d5555b71834fffa49ec747b315743e382aff44 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 15:00:15 +0100 Subject: [PATCH 04/29] api - create correct variable scope --- api/controller.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git 
a/api/controller.py b/api/controller.py index 66ca3ed3..892e17b4 100644 --- a/api/controller.py +++ b/api/controller.py @@ -186,13 +186,12 @@ def breeders_post(content): # noqa: E501 """ - api_response = dict(connection=None, breeder=None) + + breeder_config_full = content + breeder_config = dict(content.get('breeder')) + breeder_id = breeder_config.get('name') def create_breeder(api_client, content): - api_instance = dag_run_api.DAGRunApi(api_client) - breeder_config_full = content - breeder_config = dict(content.get('breeder')) - breeder_id = breeder_config.get('name') # templating related environment = Environment(loader=FileSystemLoader(DAG_TEMPLATES_DIR)) From 840ebbadbff3c268a387c6aef4626851ffc9318e Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 15:40:48 +0100 Subject: [PATCH 05/29] breeder - decentralize dependency imports Dependencies are not to be installed on the airflow container anymore, hence we have to resolve dependencies in the tasks that are run on the dask workers. --- breeder/linux_network_stack/effectuation.py | 21 +++++++++++++++++-- .../linux_network_stack/nats_coroutines.py | 3 +++ breeder/linux_network_stack/optimization.py | 13 +++++++++++- breeder/linux_network_stack/root_dag.py | 20 ------------------ 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/breeder/linux_network_stack/effectuation.py b/breeder/linux_network_stack/effectuation.py index e69e6eb6..1852e408 100644 --- a/breeder/linux_network_stack/effectuation.py +++ b/breeder/linux_network_stack/effectuation.py @@ -20,6 +20,8 @@ def create_target_interaction_dag(dag_id, config, target, identifier): @dag.task(task_id="pull_optimization_step") def run_pull_optimization(): + import asyncio + task_logger.debug("Entering") msg = asyncio.run(receive_msg_via_nats(subject=f'effectuation_{identifier}')) @@ -36,7 +38,11 @@ def run_pull_optimization(): def run_aquire_lock(): task_logger.debug("Entering") - dlm_lock = LOCKER.lock(target) + import pals + + locker = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION) + + dlm_lock = locker.lock(target) if not dlm_lock.acquire(acquire_timeout=600): task_logger.debug("Could not aquire lock for {target}") @@ -61,6 +67,12 @@ def run_release_lock(): @dag.task(task_id="push_optimization_step") def run_push_optimization(ti=None): + + import asyncio + from sqlalchemy import create_engine + from sqlalchemy import text + + archive_db_engine = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}') task_logger.debug("Entering") metric_value = ti.xcom_pull(task_ids="recon_step") @@ -82,7 +94,7 @@ def run_push_optimization(ti=None): bindparam("setting_full", settings_full, type_=String), bindparam("setting_result", metric_data, type_=String)) - ARCHIVE_DB_ENGINE.execute(query) + archive_db_engine.execute(query) task_logger.debug("Done") @@ -92,6 +104,11 @@ def run_push_optimization(ti=None): @dag.task(task_id="recon_step") def run_reconnaissance(): + + from prometheus_api_client import PrometheusConnect, MetricsList, Metric + from prometheus_api_client.utils import parse_datetime + import urllib3 + task_logger.debug("Entering") prom_conn = PrometheusConnect(url=PROMETHEUS_URL, retry=urllib3.util.retry.Retry(total=3, raise_on_status=True, backoff_factor=0.5), diff --git a/breeder/linux_network_stack/nats_coroutines.py b/breeder/linux_network_stack/nats_coroutines.py index dc96a943..6070a8db 100644 --- a/breeder/linux_network_stack/nats_coroutines.py +++ 
b/breeder/linux_network_stack/nats_coroutines.py @@ -24,6 +24,9 @@ async def send_msg_via_nats(subject=None, data_dict=None): {% endraw %} async def receive_msg_via_nats(subject=None, timeout=300): + import nats + import time + import sys # Connect to NATS Server. nc = await nats.connect(NATS_SERVER_URL) sub = await nc.subscribe(f'{subject}') diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py index cc8d0821..d9b35607 100644 --- a/breeder/linux_network_stack/optimization.py +++ b/breeder/linux_network_stack/optimization.py @@ -9,9 +9,15 @@ def objective(trial, identifier): ###--- end coroutines ---### import logging + from sqlalchemy import create_engine + from sqlalchemy import text + logger = logging.getLogger('objective') logger.setLevel(logging.DEBUG) + + archive_db_engine = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}') + logger.warning('entering') # Compiling settings for effectuation @@ -35,7 +41,7 @@ def objective(trial, identifier): query = query.bindparams(bindparam("table_name", breeder_table_name, type_=String), bindparam("setting_id", setting_id, type_=String)) - archive_db_data = ARCHIVE_DB_ENGINE.execute(query).fetchall() + archive_db_data = archive_db_engine.execute(query).fetchall() if archive_db_data: is_setting_explored = True @@ -79,6 +85,11 @@ def create_optimization_dag(dag_id, config, identifier): ## perform optimiziation run @dag.task(task_id="optimization_step") def run_optimization(): + import optuna + from optuna.storages import InMemoryStorage + from optuna.integration import DaskStorage + from distributed import Client, wait + __directions = list() for objective in config.get('objectvices'): diff --git a/breeder/linux_network_stack/root_dag.py b/breeder/linux_network_stack/root_dag.py index 3c2709e8..2d492056 100644 --- a/breeder/linux_network_stack/root_dag.py +++ b/breeder/linux_network_stack/root_dag.py @@ -27,25 +27,7 @@ from airflow.utils.dates import days_ago from airflow.decorators import task -import optuna -from optuna.storages import InMemoryStorage -from optuna.integration import DaskStorage -from distributed import Client, wait - -from sqlalchemy import create_engine -from sqlalchemy import text - -from prometheus_api_client import PrometheusConnect, MetricsList, Metric -from prometheus_api_client.utils import parse_datetime from datetime import timedelta -import asyncio - -import pals -import urllib3 - -import nats -import time -import sys import random import logging @@ -86,10 +68,8 @@ DASK_SERVER_ENDPOINT = "{DASK_ENDPOINT}" -ARCHIVE_DB_ENGINE = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}') DLM_DB_CONNECTION = 'postgresql://{DLM_DB_USER}:{DLM_DB_PASSWORD}@{DLM_DB_HOST}/{DLM_DB_DATABASE}' -LOCKER = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION) ### From d2583afb31d6af22750423c6a0e636f657b724bb Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 15:51:55 +0100 Subject: [PATCH 06/29] breeder - repair missing task steps --- breeder/linux_network_stack/effectuation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/breeder/linux_network_stack/effectuation.py b/breeder/linux_network_stack/effectuation.py index 1852e408..7956417a 100644 --- a/breeder/linux_network_stack/effectuation.py +++ b/breeder/linux_network_stack/effectuation.py @@ -49,7 +49,7 @@ def run_aquire_lock(): return 
dlm_lock - aquire_lock_step = run_aquire_lock + aquire_lock_step = run_aquire_lock() @dag.task(task_id="release_lock_step") @@ -62,7 +62,7 @@ def run_release_lock(): return dlm_lock - release_lock_step = run_release_lock + release_lock_step = run_release_lock() @dag.task(task_id="push_optimization_step") From c613d330f96bb856297fbb8ea53a22497bab1ad9 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 16:07:08 +0100 Subject: [PATCH 07/29] breeder - drop debug config print task --- breeder/linux_network_stack/effectuation.py | 9 +-------- breeder/linux_network_stack/optimization.py | 9 ++++----- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/breeder/linux_network_stack/effectuation.py b/breeder/linux_network_stack/effectuation.py index 7956417a..a1014117 100644 --- a/breeder/linux_network_stack/effectuation.py +++ b/breeder/linux_network_stack/effectuation.py @@ -11,13 +11,6 @@ def create_target_interaction_dag(dag_id, config, target, identifier): with dag as interaction_dag: - dump_config = BashOperator( - task_id='print_config', - bash_command='echo ${config}', - env={"config": str(config)}, - dag=interaction_dag, - ) - @dag.task(task_id="pull_optimization_step") def run_pull_optimization(): import asyncio @@ -193,7 +186,7 @@ def is_stop_criteria_reached(iteration): stop_step = EmptyOperator(task_id="stop_task", dag=interaction_dag) - dump_config >> pull_step >> aquire_lock_step >> effectuation_step >> recon_step >> release_lock_step >> push_step >> run_iter_count_step >> stopping_conditional_step >> [continue_step, stop_step] + pull_step >> aquire_lock_step >> effectuation_step >> recon_step >> release_lock_step >> push_step >> run_iter_count_step >> stopping_conditional_step >> [continue_step, stop_step] return dag diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py index d9b35607..2a207b86 100644 --- a/breeder/linux_network_stack/optimization.py +++ b/breeder/linux_network_stack/optimization.py @@ -75,10 +75,9 @@ def create_optimization_dag(dag_id, config, identifier): with dag as optimization_dag: - dump_config = BashOperator( - task_id='print_config', - bash_command='echo ${config}', - env={"config": str(config)}, + noop = BashOperator( + task_id='noop', + bash_command='echo "noop"', dag=optimization_dag, ) @@ -110,6 +109,6 @@ def run_optimization(): optimization_step = run_optimization() - dump_config >> optimization_step + noop >> optimization_step return dag From 179f5cb1dba4791a37b42e0a6363f355bdb15293 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 20:12:17 +0100 Subject: [PATCH 08/29] prepare dask airflow workers Those need access to the dags and have to have airflow preinstalled. 
--- docker-compose.yml | 2 ++ requirements.txt | 1 + 2 files changed, 3 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 2f0f1dc0..de254685 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -144,6 +144,8 @@ services: command: ["dask-worker", "tcp://dask-scheduler-airflow:8786"] deploy: replicas: 2 + volumes: + - ./breeder/dags:/opt/airflow/dags/ # for optuna parallel metaheuristics execution on dask dask_scheduler: build: diff --git a/requirements.txt b/requirements.txt index 1532a4af..a066f027 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +apache-airflow == 2.4.3 optuna == 3.1.0b0 joblib == 1.2.0 dask == 2022.12.1 From fe639f4630c30de2b0cb4007241fd55287a7facc Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 21:31:10 +0100 Subject: [PATCH 09/29] bump airflow and use airflow for dask base --- Dockerfile-dask | 2 +- docker-compose.yml | 2 +- requirements.txt | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Dockerfile-dask b/Dockerfile-dask index 049afd5d..7070b6b9 100644 --- a/Dockerfile-dask +++ b/Dockerfile-dask @@ -1,3 +1,3 @@ -FROM daskdev/dask:2022.12.1-py3.9 +FROM apache/airflow:2.7.3-python3.9 COPY requirements.txt / RUN pip install --no-cache-dir -r /requirements.txt diff --git a/docker-compose.yml b/docker-compose.yml index de254685..a116a411 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,7 @@ version: '3.4' services: control_loop: - image: apache/airflow:2.4.3-python3.9 + image: apache/airflow:2.7.3-python3.9 restart: always environment: - AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true diff --git a/requirements.txt b/requirements.txt index a066f027..1532a4af 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -apache-airflow == 2.4.3 optuna == 3.1.0b0 joblib == 1.2.0 dask == 2022.12.1 From 2ef0c9ba6454c9125d7ced80a0364bc7010c3c10 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 21:36:17 +0100 Subject: [PATCH 10/29] downgrade urllib To be compatible with preinstalled components. --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 1532a4af..1be83440 100755 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,4 @@ nats-py == 2.2.0 SQLAlchemy == 1.4.50 psycopg2-binary == 2.9.7 PALs == 0.3.5 -urllib3 == 2.0.4 +urllib3 == 1.25.4 From b1292d268607ccea927c660a32b8b8988f25e5ca Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Fri, 17 Nov 2023 21:40:24 +0100 Subject: [PATCH 11/29] drop preinstalled components from requirements Otherwise leads to dependency version conflict. --- requirements.txt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index 1be83440..5e36052f 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,6 @@ optuna == 3.1.0b0 joblib == 1.2.0 -dask == 2022.12.1 -distributed == 2022.12.1 prometheus-api-client == 0.5.2 nats-py == 2.2.0 SQLAlchemy == 1.4.50 -psycopg2-binary == 2.9.7 PALs == 0.3.5 From 2b4d08b8fdcad7a27d307b7a52fcbfafe6b18388 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 11:13:29 +0100 Subject: [PATCH 12/29] breeders - use hashlib sha256 as deterministic id The builtin hash of Python is not deterministic across processes because of the default random hash seed. 
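As background to this change: Python salts str hashing per interpreter process (PYTHONHASHSEED), so an id derived from the builtin hash() can differ between the API process and the dask workers, while a sha256 digest of the same address is identical everywhere. A minimal standalone sketch, with a placeholder address that is not taken from the patches:

import hashlib

target_address = "192.0.2.10"  # placeholder target address, for illustration only

# Builtin hash() of str is salted per interpreter process (PYTHONHASHSEED),
# so the derived id can change between processes and hosts.
unstable_id = str(abs(hash(target_address)))[0:6]

# A sha256 digest of the encoded address is deterministic everywhere,
# which keeps DAG ids and archive keys consistent.
stable_id = hashlib.sha256(str.encode(target_address)).hexdigest()[0:6]
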
--- api/controller.py | 3 ++- breeder/linux_network_stack/effectuation.py | 2 +- breeder/linux_network_stack/optimization.py | 2 +- breeder/linux_network_stack/root_dag.py | 5 +++-- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/api/controller.py b/api/controller.py index 892e17b4..3912595b 100644 --- a/api/controller.py +++ b/api/controller.py @@ -22,6 +22,7 @@ import os import time import datetime +import hashlib from pprint import pprint from dateutil.parser import parse as dateutil_parser @@ -221,7 +222,7 @@ def create_breeder(api_client, content): archive.archive_db.execute(db_info=db_config, query=__query) for target in targets: - identifier = str(abs(hash(target.get('address'))))[0:6] + identifier = hashlib.sha256(str.encode(target.get('address'))).hexdigest()[0:6] for run_id in range(0, parallel_runs): dag_id = f'{dag_name}_{run_id}_{identifier}' diff --git a/breeder/linux_network_stack/effectuation.py b/breeder/linux_network_stack/effectuation.py index a1014117..a8612b48 100644 --- a/breeder/linux_network_stack/effectuation.py +++ b/breeder/linux_network_stack/effectuation.py @@ -71,7 +71,7 @@ def run_push_optimization(ti=None): metric_value = ti.xcom_pull(task_ids="recon_step") settings_full = ti.xcom_pull(task_ids="pull_optimization_step") - setting_id = str(abs(hash(settings_full))) + setting_id = hashlib.sha256(str.encode(settings_full)).hexdigest()[0:6] task_logger.debug(f"Metric : f{metric_value}") diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py index 2a207b86..b90b8d3f 100644 --- a/breeder/linux_network_stack/optimization.py +++ b/breeder/linux_network_stack/optimization.py @@ -33,7 +33,7 @@ def objective(trial, identifier): settings = '\n'.join(settings) is_setting_explored = False - setting_id = str(abs(hash(settings))) + setting_id = hashlib.sha256(str.encode(settings_full)).hexdigest()[0:6] breeder_table_name = f"from_breeder_name" # TBD global knowledge db table nam query = text("SELECT * FROM :table_name WHERE :table_name.setting_id == :setting_id") diff --git a/breeder/linux_network_stack/root_dag.py b/breeder/linux_network_stack/root_dag.py index 2d492056..4156a84e 100644 --- a/breeder/linux_network_stack/root_dag.py +++ b/breeder/linux_network_stack/root_dag.py @@ -33,7 +33,7 @@ import logging import json import copy - +import hashlib task_logger = logging.getLogger("airflow.task") task_logger.setLevel(logging.DEBUG) @@ -111,7 +111,8 @@ def determine_config_shard(run_id=None, for target in targets: - identifier = str(abs(hash(target.get('address'))))[0:6] + identifier = hashlib.sha256(str.encode(target.get('address'))).hexdigest()[0:6] + for run_id in range(0, parallel_runs): dag_id = f'{dag_name}_{run_id}' if not is_cooperative: From 8c21dadbfae2c8e24e01799fb9c3a49a697908d4 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 11:23:14 +0100 Subject: [PATCH 13/29] godon - adapt airflow dask workers invocation Respecting that at the basis of the images are airflow images with dask-framework atop. Was necessary to get the airflow scheduler to dask worker interaction functioning. 
--- docker-compose.yml | 62 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index a116a411..5b7071bc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -136,12 +136,39 @@ services: ports: - 127.0.0.10:8786:8786 - 127.0.0.10:8787:8787 - command: ["dask-scheduler"] + entrypoint: bash -c "dask-scheduler" dask_worker_airflow: + environment: + - AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true + - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=True + - AIRFLOW__CORE__EXECUTOR=DaskExecutor + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@db/airflow + - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@db/airflow + - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=False + - AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth + - AIRFLOW__API__AUTH_BACKEND=airflow.api.auth.backend.basic_auth + - AIRFLOW__DASK__CLUSTER_ADDRESS=tcp://dask-scheduler-airflow:8786 + - 'AIRFLOW_CONN_LINUX_NETWORK_STACK_BREEDER_SSH={ "conn_type": "ssh", "login": "godon_robot", "host": "10.0.5.53", "port": 22, "extra": { "key_file": "/opt/airflow/credentials/id_rsa" } }' + - ARCHIVE_DB_USER=yugabyte + - ARCHIVE_DB_PASSWORD=yugabyte + - ARCHIVE_DB_HOST=archive-db + - ARCHIVE_DB_PORT=5433 + - ARCHIVE_DB_DATABASE=archive_db + - META_DB_USER=meta_data + - META_DB_PASSWORD=meta_data + - META_DB_HOSTNAME=meta-data-db + - META_DB_PORT=5432 + - DLM_DB_USER= + - DLM_DB_PASSWORD= + - DLM_DB_HOST=locks_db + - DLM_DB_DATABASE=distributed_locking + - DASK_ENDPOINT=dask_scheduler:8786 + - NATS_SERVER_URL="nats://godon_nats_1:4222" + - PROMETHEUS_URL="http://prometheus:9090" build: context: ./ dockerfile: ./Dockerfile-dask - command: ["dask-worker", "tcp://dask-scheduler-airflow:8786"] + entrypoint: bash -c "dask-worker tcp://dask-scheduler-airflow:8786" deploy: replicas: 2 volumes: @@ -155,12 +182,39 @@ services: ports: - 127.0.0.11:8786:8786 - 127.0.0.11:8787:8787 - command: ["dask-scheduler"] + entrypoint: bash -c "dask-scheduler" dask_worker: + environment: + - AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true + - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=True + - AIRFLOW__CORE__EXECUTOR=DaskExecutor + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@db/airflow + - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@db/airflow + - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=False + - AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth + - AIRFLOW__API__AUTH_BACKEND=airflow.api.auth.backend.basic_auth + - AIRFLOW__DASK__CLUSTER_ADDRESS=tcp://dask-scheduler-airflow:8786 + - 'AIRFLOW_CONN_LINUX_NETWORK_STACK_BREEDER_SSH={ "conn_type": "ssh", "login": "godon_robot", "host": "10.0.5.53", "port": 22, "extra": { "key_file": "/opt/airflow/credentials/id_rsa" } }' + - ARCHIVE_DB_USER=yugabyte + - ARCHIVE_DB_PASSWORD=yugabyte + - ARCHIVE_DB_HOST=archive-db + - ARCHIVE_DB_PORT=5433 + - ARCHIVE_DB_DATABASE=archive_db + - META_DB_USER=meta_data + - META_DB_PASSWORD=meta_data + - META_DB_HOSTNAME=meta-data-db + - META_DB_PORT=5432 + - DLM_DB_USER= + - DLM_DB_PASSWORD= + - DLM_DB_HOST=locks_db + - DLM_DB_DATABASE=distributed_locking + - DASK_ENDPOINT=dask_scheduler:8786 + - NATS_SERVER_URL="nats://godon_nats_1:4222" + - PROMETHEUS_URL="http://prometheus:9090" build: context: ./ dockerfile: ./Dockerfile-dask - command: ["dask-worker", "tcp://dask_scheduler:8786"] + entrypoint: bash -c "dask-worker tcp://dask_scheduler:8786" deploy: replicas: 2 prometheus: 
From a2f0b1510239d7e2e6aed2a8263c8519cc9edee9 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 12:50:52 +0100 Subject: [PATCH 14/29] airflow ssh operator update to 2.7 interface Parameters changed slightly. --- breeder/linux_network_stack/effectuation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/breeder/linux_network_stack/effectuation.py b/breeder/linux_network_stack/effectuation.py index a8612b48..5b61c136 100644 --- a/breeder/linux_network_stack/effectuation.py +++ b/breeder/linux_network_stack/effectuation.py @@ -136,7 +136,7 @@ def run_reconnaissance(): remote_host=target.get('address'), username=target.get('user'), key_file=target.get('key_file'), - timeout=30, + conn_timeout=30, keepalive_interval=10 ) @@ -144,7 +144,7 @@ def run_reconnaissance(): effectuation_step = SSHOperator( ssh_hook=_ssh_hook, task_id='effectuation', - timeout=30, + conn_timeout=30, command=""" {{ ti.xcom_pull(task_ids='pull_optimization_step') }} """, From 1d26e4bd8a6c6fd46fe344d0ba223bc1ce77cd7d Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 12:52:17 +0100 Subject: [PATCH 15/29] breeder - correct objectives deref from config --- breeder/linux_network_stack/optimization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py index b90b8d3f..24fbfece 100644 --- a/breeder/linux_network_stack/optimization.py +++ b/breeder/linux_network_stack/optimization.py @@ -91,7 +91,7 @@ def run_optimization(): __directions = list() - for objective in config.get('objectvices'): + for objective in config.get('objectives'): direction = objective.get('direction') __directions.append(direction) From 84cf954a9616a5b0d936fb257934685473a0aefe Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 12:54:30 +0100 Subject: [PATCH 16/29] godon airflow share logs Let scheduler/controller and workers share logs, otherwise logs cannot be looked up via logging ui. 
--- docker-compose.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 5b7071bc..567a8dcd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,6 +54,7 @@ services: volumes: - ./testing/infra/credentials/ssh/:/opt/airflow/credentials/ - ./breeder/dags:/opt/airflow/dags/ + - airflow-logs-volume:/opt/airflow/logs/ ports: - 127.0.0.1:8080:8080 meta_data_db: @@ -173,6 +174,7 @@ services: replicas: 2 volumes: - ./breeder/dags:/opt/airflow/dags/ + - airflow-logs-volume:/opt/airflow/logs/ # for optuna parallel metaheuristics execution on dask dask_scheduler: build: @@ -244,3 +246,4 @@ volumes: postgres-db-volume: postgres-locking-db-volume: postgres-meta-data-db-volume: + airflow-logs-volume: From 78511c6285c27fa2f79fee3dc0e35a7694cacf05 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 13:16:24 +0100 Subject: [PATCH 17/29] correct dask scheduler endpoint for optuna dask --- docker-compose.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 567a8dcd..c07802ff 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,7 +45,7 @@ services: - DLM_DB_PASSWORD= - DLM_DB_HOST=locks_db - DLM_DB_DATABASE=distributed_locking - - DASK_ENDPOINT=dask_scheduler:8786 + - DASK_ENDPOINT="dask-scheduler:8786" - NATS_SERVER_URL="nats://godon_nats_1:4222" - PROMETHEUS_URL="http://prometheus:9090" # minimum of setup steps @@ -163,7 +163,7 @@ services: - DLM_DB_PASSWORD= - DLM_DB_HOST=locks_db - DLM_DB_DATABASE=distributed_locking - - DASK_ENDPOINT=dask_scheduler:8786 + - DASK_ENDPOINT="dask-scheduler:8786" - NATS_SERVER_URL="nats://godon_nats_1:4222" - PROMETHEUS_URL="http://prometheus:9090" build: context: ./ dockerfile: ./Dockerfile-dask @@ -210,7 +210,7 @@ services: - DLM_DB_PASSWORD= - DLM_DB_HOST=locks_db - DLM_DB_DATABASE=distributed_locking - - DASK_ENDPOINT=dask_scheduler:8786 + - DASK_ENDPOINT="dask-scheduler:8786" - NATS_SERVER_URL="nats://godon_nats_1:4222" - PROMETHEUS_URL="http://prometheus:9090" build: context: ./ dockerfile: ./Dockerfile-dask From 27b00a6e380e9fdc5f995a82dafe85131458377b Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 13:32:52 +0100 Subject: [PATCH 18/29] breeder repair env variables passing --- breeder/linux_network_stack/root_dag.py | 13 ++++++++----- docker-compose.yml | 18 +++++++++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/breeder/linux_network_stack/root_dag.py b/breeder/linux_network_stack/root_dag.py index 4156a84e..47895852 100644 --- a/breeder/linux_network_stack/root_dag.py +++ b/breeder/linux_network_stack/root_dag.py @@ -62,14 +62,17 @@ # 'sla_miss_callback': yet_another_function, } -NATS_SERVER_URL = "{NATS_SERVER_URL}" +NATS_SERVER_URL = os.environ.get("NATS_SERVER_URL") -PROMETHEUS_URL = "{PROMETHEUS_URL}" +PROMETHEUS_URL = os.environ.get("PROMETHEUS_URL") -DASK_SERVER_ENDPOINT = "{DASK_ENDPOINT}" +DASK_SERVER_ENDPOINT = os.environ.get("DASK_ENDPOINT") -DLM_DB_CONNECTION = 'postgresql://{DLM_DB_USER}:{DLM_DB_PASSWORD}@{DLM_DB_HOST}/{DLM_DB_DATABASE}' +DLM_DB_USER = os.environ.get("DLM_DB_USER") +DLM_DB_PASSWORD = os.environ.get("DLM_DB_PASSWORD") +DLM_DB_HOST = os.environ.get("DLM_DB_HOST") +DLM_DB_DATABASE = os.environ.get("DLM_DB_DATABASE") +DLM_DB_CONNECTION = f"postgresql://{DLM_DB_USER}:{DLM_DB_PASSWORD}@{DLM_DB_HOST}/{DLM_DB_DATABASE}" ### diff --git a/docker-compose.yml b/docker-compose.yml index c07802ff..caaa7601 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,9 +45,9 @@ services: - DLM_DB_PASSWORD= - 
DLM_DB_HOST=locks_db - DLM_DB_DATABASE=distributed_locking - - DASK_ENDPOINT="dask-scheduler:8786" - - NATS_SERVER_URL="nats://godon_nats_1:4222" - - PROMETHEUS_URL="http://prometheus:9090" + - DASK_ENDPOINT=dask-scheduler:8786 + - NATS_SERVER_URL=nats://godon_nats_1:4222 + - PROMETHEUS_URL=http://prometheus:9090 # minimum of setup steps # on sequencial executor and local sqlite entrypoint: bash -c "airflow db init; airflow users create -u airflow -p airflow -r Admin -f airflow -l airflow -e airflow; (airflow scheduler &); airflow webserver" @@ -163,9 +163,9 @@ services: - DLM_DB_PASSWORD= - DLM_DB_HOST=locks_db - DLM_DB_DATABASE=distributed_locking - - DASK_ENDPOINT="dask-scheduler:8786" - - NATS_SERVER_URL="nats://godon_nats_1:4222" - - PROMETHEUS_URL="http://prometheus:9090" + - DASK_ENDPOINT=dask-scheduler:8786 + - NATS_SERVER_URL=nats://godon_nats_1:4222 + - PROMETHEUS_URL=http://prometheus:9090 build: context: ./ dockerfile: ./Dockerfile-dask @@ -210,9 +210,9 @@ services: - DLM_DB_PASSWORD= - DLM_DB_HOST=locks_db - DLM_DB_DATABASE=distributed_locking - - DASK_ENDPOINT="dask-scheduler:8786" - - NATS_SERVER_URL="nats://godon_nats_1:4222" - - PROMETHEUS_URL="http://prometheus:9090" + - DASK_ENDPOINT=dask-scheduler:8786 + - NATS_SERVER_URL=nats://godon_nats_1:4222 + - PROMETHEUS_URL=http://prometheus:9090 build: context: ./ dockerfile: ./Dockerfile-dask From ba776f4391bf0270d28de36f60ce171221440bb4 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 13:42:38 +0100 Subject: [PATCH 19/29] correct root breeder dependencies --- breeder/linux_network_stack/root_dag.py | 1 + 1 file changed, 1 insertion(+) diff --git a/breeder/linux_network_stack/root_dag.py b/breeder/linux_network_stack/root_dag.py index 47895852..582546cd 100644 --- a/breeder/linux_network_stack/root_dag.py +++ b/breeder/linux_network_stack/root_dag.py @@ -34,6 +34,7 @@ import json import copy import hashlib +import os task_logger = logging.getLogger("airflow.task") task_logger.setLevel(logging.DEBUG) From d4c1d887089cf4ae16fcba4803774ec5ce949bf1 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 17:44:13 +0100 Subject: [PATCH 20/29] breeder optimization drop best trial dump Irrelevant but glitching. 
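A plausible reason for the glitch, offered as a hedged aside with made-up objective values (the patch itself does not state the cause): the breeder study is created with multiple optimization directions, and for multi-objective studies Optuna raises when study.best_params is accessed; the supported accessor is study.best_trials, which returns the Pareto-optimal trials.

import optuna

def toy_objective(trial):
    x = trial.suggest_float("x", 0.0, 10.0)
    return x, 10.0 - x  # two objectives, analogous to the breeder study

study = optuna.create_study(directions=["minimize", "maximize"])
study.optimize(toy_objective, n_trials=5)

# study.best_params / study.best_trial raise for multi-objective studies;
# the Pareto front is exposed via best_trials instead.
for trial in study.best_trials:
    print(trial.number, trial.values, trial.params)
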
--- breeder/linux_network_stack/optimization.py | 1 - 1 file changed, 1 deletion(-) diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py index 24fbfece..eccdf96c 100644 --- a/breeder/linux_network_stack/optimization.py +++ b/breeder/linux_network_stack/optimization.py @@ -105,7 +105,6 @@ def run_optimization(): client.submit(study.optimize, objective_wrapped, n_trials=10, pure=False) ] wait(futures, timeout=7200) - print(f"Best params: {study.best_params}") optimization_step = run_optimization() From ccf0f23c2db5551575f63ec62c0c5f118385f359 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 19:22:43 +0100 Subject: [PATCH 21/29] bump optuna version --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 5e36052f..0285f135 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -optuna == 3.1.0b0 +optuna == 3.4.0 joblib == 1.2.0 prometheus-api-client == 0.5.2 nats-py == 2.2.0 From 398e9c1c12b49a1dd5dbfd4d1b82156f53a98315 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sat, 18 Nov 2023 23:34:15 +0100 Subject: [PATCH 22/29] breeder avoid collision of objective name --- breeder/linux_network_stack/optimization.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py index eccdf96c..61b8813a 100644 --- a/breeder/linux_network_stack/optimization.py +++ b/breeder/linux_network_stack/optimization.py @@ -91,8 +91,8 @@ def run_optimization(): __directions = list() - for objective in config.get('objectives'): - direction = objective.get('direction') + for __objective in config.get('objectives'): + direction = __objective.get('direction') __directions.append(direction) with Client(address=DASK_SERVER_ENDPOINT) as client: From d8dcdfdba8f291f5ce2397af5fe653d03886a328 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sun, 19 Nov 2023 11:17:05 +0100 Subject: [PATCH 23/29] breeders - centralize dag task dependencies again --- breeder/linux_network_stack/effectuation.py | 10 ---------- breeder/linux_network_stack/optimization.py | 7 ------- breeder/linux_network_stack/root_dag.py | 19 +++++++++++++++++++ 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/breeder/linux_network_stack/effectuation.py b/breeder/linux_network_stack/effectuation.py index 5b61c136..c1ee489d 100644 --- a/breeder/linux_network_stack/effectuation.py +++ b/breeder/linux_network_stack/effectuation.py @@ -13,7 +13,6 @@ def create_target_interaction_dag(dag_id, config, target, identifier): @dag.task(task_id="pull_optimization_step") def run_pull_optimization(): - import asyncio task_logger.debug("Entering") @@ -31,7 +30,6 @@ def run_pull_optimization(): def run_aquire_lock(): task_logger.debug("Entering") - import pals locker = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION) @@ -61,10 +59,6 @@ def run_release_lock(): @dag.task(task_id="push_optimization_step") def run_push_optimization(ti=None): - import asyncio - from sqlalchemy import create_engine - from sqlalchemy import text - archive_db_engine = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}') task_logger.debug("Entering") @@ -98,10 +92,6 @@ def run_push_optimization(ti=None): @dag.task(task_id="recon_step") def run_reconnaissance(): - from prometheus_api_client import PrometheusConnect, MetricsList, 
Metric - from prometheus_api_client.utils import parse_datetime - import urllib3 - task_logger.debug("Entering") prom_conn = PrometheusConnect(url=PROMETHEUS_URL, retry=urllib3.util.retry.Retry(total=3, raise_on_status=True, backoff_factor=0.5), diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py index 61b8813a..e3770215 100644 --- a/breeder/linux_network_stack/optimization.py +++ b/breeder/linux_network_stack/optimization.py @@ -8,9 +8,6 @@ def objective(trial, identifier): {{ local_coroutines_include()|indent }} # default is indent of 4 spaces! ###--- end coroutines ---### - import logging - from sqlalchemy import create_engine - from sqlalchemy import text logger = logging.getLogger('objective') logger.setLevel(logging.DEBUG) @@ -84,10 +81,6 @@ def create_optimization_dag(dag_id, config, identifier): ## perform optimiziation run @dag.task(task_id="optimization_step") def run_optimization(): - import optuna - from optuna.storages import InMemoryStorage - from optuna.integration import DaskStorage - from distributed import Client, wait __directions = list() diff --git a/breeder/linux_network_stack/root_dag.py b/breeder/linux_network_stack/root_dag.py index 582546cd..b69bb4e0 100644 --- a/breeder/linux_network_stack/root_dag.py +++ b/breeder/linux_network_stack/root_dag.py @@ -36,6 +36,25 @@ import hashlib import os +from sqlalchemy import create_engine +from sqlalchemy import text + +import optuna +from optuna.storages import InMemoryStorage +from optuna.integration import DaskStorage +from distributed import Client, wait + +import asyncio +import pals + +import asyncio +from sqlalchemy import create_engine +from sqlalchemy import text + +from prometheus_api_client import PrometheusConnect, MetricsList, Metric +from prometheus_api_client.utils import parse_datetime +import urllib3 + task_logger = logging.getLogger("airflow.task") task_logger.setLevel(logging.DEBUG) From 108a61467078462e95623f872b493ae65bf041cb Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sun, 19 Nov 2023 12:01:10 +0100 Subject: [PATCH 24/29] breeder optimization segregate out objective Keep the objective separate and include it in the optuna submitting code paths. Keeping it in the submitting code path is important, otherwise the function pickling of recent dask distributed will fail because it cannot find the root dag module at the unpickling side, the dask worker. --- breeder/linux_network_stack/objective.py | 65 ++++++++++++++++++ breeder/linux_network_stack/optimization.py | 73 +++------------------ 2 files changed, 74 insertions(+), 64 deletions(-) create mode 100644 breeder/linux_network_stack/objective.py diff --git a/breeder/linux_network_stack/objective.py b/breeder/linux_network_stack/objective.py new file mode 100644 index 00000000..23dab924 --- /dev/null +++ b/breeder/linux_network_stack/objective.py @@ -0,0 +1,65 @@ + + +def objective(trial, identifier, archive_db_url): + +###--- definition coroutines ---### +### We have to keep to coroutines in the objective function, +### otherwise the workers do not know about those until we preinstall those. + {% macro local_coroutines_include() %}{% include 'nats_coroutines.py' %}{% endmacro %} + {{ local_coroutines_include()|indent }} # default is indent of 4 spaces! 
+###--- end coroutines ---### + + + logger = logging.getLogger('objective') + logger.setLevel(logging.DEBUG) + + + archive_db_engine = create_engine(archive_db_url) + + logger.warning('entering') + + # Compiling settings for effectuation + settings = [] + for setting_name, setting_config in config.get('settings').get('sysctl').items(): + constraints = setting_config.get('constraints') + step_width = setting_config.get('step') + suggested_value = trial.suggest_int(setting_name, constraints.get('lower') , constraints.get('upper'), step_width) + if setting_name in ['net.ipv4.tcp_rmem', 'net.ipv4.tcp_wmem']: + settings.append(f"sudo sysctl -w {setting_name}='4096 131072 {suggested_value}';") + else: + settings.append(f"sudo sysctl -w {setting_name}='{suggested_value}';") + settings = '\n'.join(settings) + + is_setting_explored = False + setting_id = hashlib.sha256(str.encode(settings_full)).hexdigest()[0:6] + + breeder_table_name = f"from_breeder_name" # TBD global knowledge db table nam + query = text("SELECT * FROM :table_name WHERE :table_name.setting_id == :setting_id") + + query = query.bindparams(bindparam("table_name", breeder_table_name, type_=String), + bindparam("setting_id", setting_id, type_=String)) + + archive_db_data = archive_db_engine.execute(query).fetchall() + + if archive_db_data: + is_setting_explored = True + rtt = archive_db_data[0].get('setting_result').get('rtt') + delivery_rate = archive_db_data[0].get('setting_result').get('delivery_rate') + + if not is_setting_explored: + logger.warning('doing effectuation') + settings_data = dict(settings=settings) + asyncio.run(send_msg_via_nats(subject=f'effectuation_{identifier}', data_dict=settings_data)) + + logger.warning('gathering recon') + metric = json.loads(asyncio.run(receive_msg_via_nat(subject=f'recon_{identifier}'))) + metric_value = metric.get('metric') + rtt = float(metric_value['tcp_rtt']) + delivery_rate = float(metric_value['tcp_delivery_rate_bytes']) + logger.warning(f'metric received {metric_value}') + + logger.warning('Done') + + return rtt, delivery_rate + + diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py index e3770215..44b949e0 100644 --- a/breeder/linux_network_stack/optimization.py +++ b/breeder/linux_network_stack/optimization.py @@ -1,67 +1,4 @@ -def objective(trial, identifier): - -###--- definition coroutines ---### -### We have to keep to coroutines in the objective function, -### otherwise the workers do not know about those until we preinstall those. - {% macro local_coroutines_include() %}{% include 'nats_coroutines.py' %}{% endmacro %} - {{ local_coroutines_include()|indent }} # default is indent of 4 spaces! 
-###--- end coroutines ---### - - - logger = logging.getLogger('objective') - logger.setLevel(logging.DEBUG) - - - archive_db_engine = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}') - - logger.warning('entering') - - # Compiling settings for effectuation - settings = [] - for setting_name, setting_config in config.get('settings').get('sysctl').items(): - constraints = setting_config.get('constraints') - step_width = setting_config.get('step') - suggested_value = trial.suggest_int(setting_name, constraints.get('lower') , constraints.get('upper'), step_width) - if setting_name in ['net.ipv4.tcp_rmem', 'net.ipv4.tcp_wmem']: - settings.append(f"sudo sysctl -w {setting_name}='4096 131072 {suggested_value}';") - else: - settings.append(f"sudo sysctl -w {setting_name}='{suggested_value}';") - settings = '\n'.join(settings) - - is_setting_explored = False - setting_id = hashlib.sha256(str.encode(settings_full)).hexdigest()[0:6] - - breeder_table_name = f"from_breeder_name" # TBD global knowledge db table nam - query = text("SELECT * FROM :table_name WHERE :table_name.setting_id == :setting_id") - - query = query.bindparams(bindparam("table_name", breeder_table_name, type_=String), - bindparam("setting_id", setting_id, type_=String)) - - archive_db_data = archive_db_engine.execute(query).fetchall() - - if archive_db_data: - is_setting_explored = True - rtt = archive_db_data[0].get('setting_result').get('rtt') - delivery_rate = archive_db_data[0].get('setting_result').get('delivery_rate') - - if not is_setting_explored: - logger.warning('doing effectuation') - settings_data = dict(settings=settings) - asyncio.run(send_msg_via_nats(subject=f'effectuation_{identifier}', data_dict=settings_data)) - - logger.warning('gathering recon') - metric = json.loads(asyncio.run(receive_msg_via_nat(subject=f'recon_{identifier}'))) - metric_value = metric.get('metric') - rtt = float(metric_value['tcp_rtt']) - delivery_rate = float(metric_value['tcp_delivery_rate_bytes']) - logger.warning(f'metric received {metric_value}') - - logger.warning('Done') - - return rtt, delivery_rate - - def create_optimization_dag(dag_id, config, identifier): @@ -82,6 +19,14 @@ def create_optimization_dag(dag_id, config, identifier): @dag.task(task_id="optimization_step") def run_optimization(): +###--- definition objective ---### +### We have to keep the objective in the optuna invoking code scope, +### otherwise the dask distributed pickling will fail. + {% macro local_objective_includ() %}{% include 'objective.py' %}{% endmacro %} + {{ local_objective_includ()|indent(12) }} # default is indent of 4 spaces! 
+###--- end coroutines ---### + + archive_db_url = f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}' __directions = list() for __objective in config.get('objectives'): @@ -92,7 +37,7 @@ def run_optimization(): # Create a study using Dask-compatible storage storage = DaskStorage(InMemoryStorage()) study = optuna.create_study(directions=__directions, storage=storage) - objective_wrapped = lambda trial: objective(trial, identifier) + objective_wrapped = lambda trial: objective(trial,identifier, archive_db_url) # Optimize in parallel on your Dask cluster futures = [ client.submit(study.optimize, objective_wrapped, n_trials=10, pure=False) From edddc94d907e9fa776a7b199b45b217d25ebf4d9 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sun, 19 Nov 2023 14:08:48 +0100 Subject: [PATCH 25/29] breeder - add archive db os env variables --- breeder/linux_network_stack/root_dag.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/breeder/linux_network_stack/root_dag.py b/breeder/linux_network_stack/root_dag.py index b69bb4e0..65b7033e 100644 --- a/breeder/linux_network_stack/root_dag.py +++ b/breeder/linux_network_stack/root_dag.py @@ -94,6 +94,13 @@ DLM_DB_DATABASE = os.environ.get("DLM_DB_DATABASE") DLM_DB_CONNECTION = f"postgresql://{DLM_DB_USER}:{DLM_DB_PASSWORD}@{DLM_DB_HOST}/{DLM_DB_DATABASE}" + +ARCHIVE_DB_USER = os.environ.get("ARCHIVE_DB_USER") +ARCHIVE_DB_PASSWORD = os.environ.get("ARCHIVE_DB_PASSWORD") +ARCHIVE_DB_HOST = os.environ.get("ARCHIVE_DB_HOST") +ARCHIVE_DB_PORT = os.environ.get("ARCHIVE_DB_PORT") +ARCHIVE_DB_DATABASE = os.environ.get("ARCHIVE_DB_DATABASE") + ### {% include 'effectuation.py' %} From 6385a72a3722acabed90ff261667789d7b08822a Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sun, 19 Nov 2023 14:09:54 +0100 Subject: [PATCH 26/29] airflow engine share image with dask --- docker-compose.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index caaa7601..29edce7f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,9 @@ version: '3.4' services: control_loop: - image: apache/airflow:2.7.3-python3.9 + build: + context: ./ + dockerfile: ./Dockerfile-dask restart: always environment: - AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true From e8f07a03e182ec69615773377c5c6467fd917c7f Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sun, 19 Nov 2023 16:50:48 +0100 Subject: [PATCH 27/29] breeder - correct archive db query templating Let's not use sqlalchemy to generate the sql queries that interact with the archive db, because that has led to complications. bindparams was not working as expected for the String/Text type. 
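Background for this change, as a hedged aside (connection string and table name below are made up): SQLAlchemy bind parameters substitute values, not identifiers, so binding a table name, as the removed bindparam-based queries did, renders it as a quoted string literal and the statement fails. One possible middle ground is to format only the trusted identifier into the SQL and keep binding the values:

from sqlalchemy import create_engine, text

# illustrative values only
archive_db_url = "postgresql://user:password@archive-db:5433/archive_db"
breeder_table_name = "linux_network_stack_breeder"  # identifier: cannot be a bind parameter
setting_id = "a1b2c3"                               # value: can be a bind parameter

engine = create_engine(archive_db_url)

# The identifier is formatted in; the value stays a bound parameter.
query = text(f"SELECT * FROM {breeder_table_name} WHERE setting_id = :setting_id")

with engine.connect() as conn:
    rows = conn.execute(query, {"setting_id": setting_id}).fetchall()
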
--- breeder/linux_network_stack/effectuation.py | 9 ++------- breeder/linux_network_stack/objective.py | 11 ++++------- breeder/linux_network_stack/optimization.py | 2 +- 3 files changed, 7 insertions(+), 15 deletions(-) diff --git a/breeder/linux_network_stack/effectuation.py b/breeder/linux_network_stack/effectuation.py index c1ee489d..80811502 100644 --- a/breeder/linux_network_stack/effectuation.py +++ b/breeder/linux_network_stack/effectuation.py @@ -72,14 +72,9 @@ def run_push_optimization(ti=None): metric_data = dict(metric=metric_value) msg = asyncio.run(send_msg_via_nats(subject=f'recon_{identifier}', data_dict=metric_data)) + breeder_table_name = config.get("name") - breeder_table_name = f"from_dag_name" # TBD local dag id based name - - query = text("INSERT INTO :table_name VALUES (:setting_id, :setting_full, :setting_result )") - query = query.bindparams(bindparam("table_name", breeder_table_name, type_=String), - bindparam("setting_id", setting_id, type_=String), - bindparam("setting_full", settings_full, type_=String), - bindparam("setting_result", metric_data, type_=String)) + query = f"INSERT INTO {breeder_table_name} VALUES ({setting_id}, {setting_full}, {setting_result});" archive_db_engine.execute(query) diff --git a/breeder/linux_network_stack/objective.py b/breeder/linux_network_stack/objective.py index 23dab924..eed62cc2 100644 --- a/breeder/linux_network_stack/objective.py +++ b/breeder/linux_network_stack/objective.py @@ -1,6 +1,6 @@ -def objective(trial, identifier, archive_db_url): +def objective(trial, identifier, archive_db_url, breeder_name): ###--- definition coroutines ---### ### We have to keep to coroutines in the objective function, @@ -31,13 +31,10 @@ def objective(trial, identifier, archive_db_url): settings = '\n'.join(settings) is_setting_explored = False - setting_id = hashlib.sha256(str.encode(settings_full)).hexdigest()[0:6] + setting_id = hashlib.sha256(str.encode(settings)).hexdigest()[0:6] - breeder_table_name = f"from_breeder_name" # TBD global knowledge db table nam - query = text("SELECT * FROM :table_name WHERE :table_name.setting_id == :setting_id") - - query = query.bindparams(bindparam("table_name", breeder_table_name, type_=String), - bindparam("setting_id", setting_id, type_=String)) + breeder_table_name = f"{breeder_name}" + query = f"SELECT * FROM {breeder_table_name} WHERE {breeder_table_name}.setting_id = '{setting_id}';" archive_db_data = archive_db_engine.execute(query).fetchall() diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py index 44b949e0..d877e0c0 100644 --- a/breeder/linux_network_stack/optimization.py +++ b/breeder/linux_network_stack/optimization.py @@ -37,7 +37,7 @@ def run_optimization(): # Create a study using Dask-compatible storage storage = DaskStorage(InMemoryStorage()) study = optuna.create_study(directions=__directions, storage=storage) - objective_wrapped = lambda trial: objective(trial,identifier, archive_db_url) + objective_wrapped = lambda trial: objective(trial,identifier, archive_db_url, config.get('name')) # Optimize in parallel on your Dask cluster futures = [ client.submit(study.optimize, objective_wrapped, n_trials=10, pure=False) From d12622f1e96b543492b4e96a4157a9c80ad00622 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sun, 19 Nov 2023 17:07:13 +0100 Subject: [PATCH 28/29] objective repair nats invocation --- breeder/linux_network_stack/objective.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/breeder/linux_network_stack/objective.py b/breeder/linux_network_stack/objective.py index eed62cc2..36e8123f 100644 --- a/breeder/linux_network_stack/objective.py +++ b/breeder/linux_network_stack/objective.py @@ -49,7 +49,7 @@ def objective(trial, identifier, archive_db_url, breeder_name): asyncio.run(send_msg_via_nats(subject=f'effectuation_{identifier}', data_dict=settings_data)) logger.warning('gathering recon') - metric = json.loads(asyncio.run(receive_msg_via_nat(subject=f'recon_{identifier}'))) + metric = json.loads(asyncio.run(receive_msg_via_nats(subject=f'recon_{identifier}'))) metric_value = metric.get('metric') rtt = float(metric_value['tcp_rtt']) delivery_rate = float(metric_value['tcp_delivery_rate_bytes']) From 37df2a475f2cfcd1146f7d0ad2edf37483b98fa4 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Sun, 19 Nov 2023 17:13:25 +0100 Subject: [PATCH 29/29] breeders - correct pals db credentials --- docker-compose.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 29edce7f..b2fbd234 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,8 +43,8 @@ services: - META_DB_PASSWORD=meta_data - META_DB_HOSTNAME=meta-data-db - META_DB_PORT=5432 - - DLM_DB_USER= - - DLM_DB_PASSWORD= + - DLM_DB_USER=locking + - DLM_DB_PASSWORD=locking - DLM_DB_HOST=locks_db - DLM_DB_DATABASE=distributed_locking - DASK_ENDPOINT=dask-scheduler:8786 @@ -161,8 +161,8 @@ services: - META_DB_PASSWORD=meta_data - META_DB_HOSTNAME=meta-data-db - META_DB_PORT=5432 - - DLM_DB_USER= - - DLM_DB_PASSWORD= + - DLM_DB_USER=locking + - DLM_DB_PASSWORD=locking - DLM_DB_HOST=locks_db - DLM_DB_DATABASE=distributed_locking - DASK_ENDPOINT=dask-scheduler:8786 @@ -208,8 +208,8 @@ services: - META_DB_PASSWORD=meta_data - META_DB_HOSTNAME=meta-data-db - META_DB_PORT=5432 - - DLM_DB_USER= - - DLM_DB_PASSWORD= + - DLM_DB_USER=locking + - DLM_DB_PASSWORD=locking - DLM_DB_HOST=locks_db - DLM_DB_DATABASE=distributed_locking - DASK_ENDPOINT=dask-scheduler:8786