include results from integration test of archive db based cooperation #147

Merged
merged 29 commits into from
Nov 19, 2023
Changes from all commits (29 commits)
a8870e2
correct airflow dask cluster address setting
cherusk Nov 17, 2023
a46dcc9
api - repair response msg templating
cherusk Nov 17, 2023
53badf4
api - create pass full request config to renderer
cherusk Nov 17, 2023
b2d5555
api - create correct variable scope
cherusk Nov 17, 2023
840ebba
breeder - decentralize dependency imports
cherusk Nov 17, 2023
d2583af
breeder - repair missing task steps
cherusk Nov 17, 2023
c613d33
breeder - drop debug config print task
cherusk Nov 17, 2023
179f5cb
prepare dask airflow workers
cherusk Nov 17, 2023
fe639f4
bump airflow and use airflow for dask base
cherusk Nov 17, 2023
2ef0c9b
downgrade urllib
cherusk Nov 17, 2023
b1292d2
drop preinstalled components from requirements
cherusk Nov 17, 2023
2b4d08b
breeders - use hashlib sha256 as deterministic id
cherusk Nov 18, 2023
8c21dad
godon - adapt airflow dask workers invocation
cherusk Nov 18, 2023
a2f0b15
airflow ssh operator update to 2.7 interface
cherusk Nov 18, 2023
1d26e4b
breeder - correct objectives deref from config
cherusk Nov 18, 2023
84cf954
godon airflow share logs
cherusk Nov 18, 2023
78511c6
correct dask scheduler endpoint for optuna dask
cherusk Nov 18, 2023
27b00a6
breeder repair env variables passing
cherusk Nov 18, 2023
ba776f4
correct root breeder dependencies
cherusk Nov 18, 2023
d4c1d88
breeder optimization drop best trial dump
cherusk Nov 18, 2023
ccf0f23
bump optuna version
cherusk Nov 18, 2023
398e9c1
breeder avoid collision of objective name
cherusk Nov 18, 2023
d8dcdfd
breeders - centralize dag task dependencies again
cherusk Nov 19, 2023
108a614
breeder optimization segregate out objective
cherusk Nov 19, 2023
edddc94
breeder - add archive db os env including
cherusk Nov 19, 2023
6385a72
airflow engine share image with dask
cherusk Nov 19, 2023
e8f07a0
breeder - correct archive db query templating
cherusk Nov 19, 2023
d12622f
objective repair nats invocation
cherusk Nov 19, 2023
37df2a4
breeders - correct pals db credentials
cherusk Nov 19, 2023
2 changes: 1 addition & 1 deletion Dockerfile-dask
@@ -1,3 +1,3 @@
FROM daskdev/dask:2022.12.1-py3.9
FROM apache/airflow:2.7.3-python3.9
COPY requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt
17 changes: 9 additions & 8 deletions api/controller.py
@@ -22,6 +22,7 @@
import os
import time
import datetime
import hashlib
from pprint import pprint
from dateutil.parser import parse as dateutil_parser

@@ -125,7 +126,7 @@ def breeders_name_delete(breeder_name): # noqa: E501
breeder_name=breeder_name)
archive.archive_db.execute(db_info=db_config, query=__query)

return Response(json.dumps(dict(message="Purged Breeder named {breeder_name}")),
return Response(json.dumps(dict(message=f"Purged Breeder named {breeder_name}")),
status=200,
mimetype='application/json')

@@ -186,18 +187,18 @@ def breeders_post(content): # noqa: E501

"""

api_response = dict(connection=None, breeder=None)

breeder_config_full = content
breeder_config = dict(content.get('breeder'))
breeder_id = breeder_config.get('name')

def create_breeder(api_client, content):
api_instance = dag_run_api.DAGRunApi(api_client)
breeder_config = dict(content.get('breeder'))
breeder_id = breeder_config.get('name')

# templating related
environment = Environment(loader=FileSystemLoader(DAG_TEMPLATES_DIR))
template = environment.get_template("root_dag.py")
filename = f"{DAG_DIR}/root_dag.py"
rendered_dag = template.render(breeder_config)
rendered_dag = template.render(breeder_config_full)

with open(filename, mode="w", encoding="utf-8") as dag_file:
dag_file.write(rendered_dag)
@@ -221,7 +222,7 @@ def create_breeder(api_client, content):
archive.archive_db.execute(db_info=db_config, query=__query)

for target in targets:
identifier = str(abs(hash(target.get('address'))))[0:6]
identifier = hashlib.sha256(str.encode(target.get('address'))).hexdigest()[0:6]
for run_id in range(0, parallel_runs):
dag_id = f'{dag_name}_{run_id}_{identifier}'

@@ -258,7 +259,7 @@ def create_breeder(api_client, content):
#api_response['breeder'] = create_breeder(api_client, content).to_dict()
create_breeder(api_client, content)

return Response(json.dumps(dict(message="Created Breeder named {breeder_id}")),
return Response(json.dumps(dict(message=f"Created Breeder named {breeder_id}")),
status=200,
mimetype='application/json')

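For reference, a minimal sketch (not part of the diff) of the deterministic-identifier pattern controller.py switches to: Python's built-in hash() is salted per interpreter run, so hashing the target address with hashlib.sha256 keeps the derived DAG identifier stable across restarts. The address below is only illustrative.

import hashlib

def deterministic_id(value: str, length: int = 6) -> str:
    # sha256 of the UTF-8 encoded value is identical in every process,
    # unlike hash(), which varies with PYTHONHASHSEED between runs
    return hashlib.sha256(value.encode()).hexdigest()[:length]

print(deterministic_id("192.0.2.10"))  # stable 6-character suffix for a DAG id
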
39 changes: 17 additions & 22 deletions breeder/linux_network_stack/effectuation.py
@@ -11,15 +11,9 @@ def create_target_interaction_dag(dag_id, config, target, identifier):

with dag as interaction_dag:

dump_config = BashOperator(
task_id='print_config',
bash_command='echo ${config}',
env={"config": str(config)},
dag=interaction_dag,
)

@dag.task(task_id="pull_optimization_step")
def run_pull_optimization():

task_logger.debug("Entering")

msg = asyncio.run(receive_msg_via_nats(subject=f'effectuation_{identifier}'))
@@ -36,14 +30,17 @@ def run_pull_optimization():
def run_aquire_lock():
task_logger.debug("Entering")

dlm_lock = LOCKER.lock(target)

locker = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION)

dlm_lock = locker.lock(target)

if not dlm_lock.acquire(acquire_timeout=600):
task_logger.debug("Could not aquire lock for {target}")

return dlm_lock

aquire_lock_step = run_aquire_lock
aquire_lock_step = run_aquire_lock()


@dag.task(task_id="release_lock_step")
@@ -56,33 +53,30 @@ def run_release_lock():

return dlm_lock

release_lock_step = run_release_lock
release_lock_step = run_release_lock()


@dag.task(task_id="push_optimization_step")
def run_push_optimization(ti=None):

archive_db_engine = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}')
task_logger.debug("Entering")

metric_value = ti.xcom_pull(task_ids="recon_step")
settings_full = ti.xcom_pull(task_ids="pull_optimization_step")

setting_id = str(abs(hash(settings_full)))
setting_id = hashlib.sha256(str.encode(settings_full)).hexdigest()[0:6]

task_logger.debug(f"Metric : f{metric_value}")

metric_data = dict(metric=metric_value)
msg = asyncio.run(send_msg_via_nats(subject=f'recon_{identifier}', data_dict=metric_data))

breeder_table_name = config.get("name")

breeder_table_name = f"from_dag_name" # TBD local dag id based name
query = f"INSERT INTO {breeder_table_name} VALUES ({setting_id}, {setting_full}, {setting_result});"

query = text("INSERT INTO :table_name VALUES (:setting_id, :setting_full, :setting_result )")
query = query.bindparams(bindparam("table_name", breeder_table_name, type_=String),
bindparam("setting_id", setting_id, type_=String),
bindparam("setting_full", settings_full, type_=String),
bindparam("setting_result", metric_data, type_=String))

ARCHIVE_DB_ENGINE.execute(query)
archive_db_engine.execute(query)

task_logger.debug("Done")

@@ -92,6 +86,7 @@ def run_push_optimization(ti=None):

@dag.task(task_id="recon_step")
def run_reconnaissance():

task_logger.debug("Entering")
prom_conn = PrometheusConnect(url=PROMETHEUS_URL,
retry=urllib3.util.retry.Retry(total=3, raise_on_status=True, backoff_factor=0.5),
@@ -126,15 +121,15 @@ def run_reconnaissance():
remote_host=target.get('address'),
username=target.get('user'),
key_file=target.get('key_file'),
timeout=30,
conn_timeout=30,
keepalive_interval=10
)

{% raw %}
effectuation_step = SSHOperator(
ssh_hook=_ssh_hook,
task_id='effectuation',
timeout=30,
conn_timeout=30,
command="""
{{ ti.xcom_pull(task_ids='pull_optimization_step') }}
""",
@@ -176,7 +171,7 @@ def is_stop_criteria_reached(iteration):

stop_step = EmptyOperator(task_id="stop_task", dag=interaction_dag)

dump_config >> pull_step >> aquire_lock_step >> effectuation_step >> recon_step >> release_lock_step >> push_step >> run_iter_count_step >> stopping_conditional_step >> [continue_step, stop_step]
pull_step >> aquire_lock_step >> effectuation_step >> recon_step >> release_lock_step >> push_step >> run_iter_count_step >> stopping_conditional_step >> [continue_step, stop_step]

return dag
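
A condensed sketch (assuming pals is installed and DLM_DB_CONNECTION is a PostgreSQL URL, as the DAG template implies) of the per-task locking pattern this diff moves into run_aquire_lock, so each Airflow worker builds its own Locker instead of relying on a module-level LOCKER object:

import pals

DLM_DB_CONNECTION = "postgresql://lock_user:lock_pass@dlm-db:5432/locks"  # placeholder URL

def acquire_target_lock(target, acquire_timeout=600):
    # one Locker per task invocation keeps lock-database connections worker-local
    locker = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION)
    lock = locker.lock(target)
    if not lock.acquire(acquire_timeout=acquire_timeout):
        raise TimeoutError(f"could not acquire lock for {target}")
    return lock  # the release_lock_step task later calls lock.release()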

3 changes: 3 additions & 0 deletions breeder/linux_network_stack/nats_coroutines.py
@@ -24,6 +24,9 @@ async def send_msg_via_nats(subject=None, data_dict=None):
{% endraw %}

async def receive_msg_via_nats(subject=None, timeout=300):
import nats
import time
import sys
# Connect to NATS Server.
nc = await nats.connect(NATS_SERVER_URL)
sub = await nc.subscribe(f'{subject}')
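
The nats_coroutines.py change only adds local imports, since the coroutine body is rendered into tasks that execute on workers without the module-level imports. A rough sketch of the receive path, assuming the nats-py client and a placeholder server URL:

import asyncio

async def receive_msg_via_nats(subject, timeout=300, server_url="nats://nats:4222"):
    import nats  # imported inside the coroutine so the rendered task is self-contained
    nc = await nats.connect(server_url)
    sub = await nc.subscribe(subject)
    try:
        msg = await sub.next_msg(timeout=timeout)
        return msg.data.decode()
    finally:
        await nc.close()

# asyncio.run(receive_msg_via_nats("effectuation_abc123"))
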
62 changes: 62 additions & 0 deletions breeder/linux_network_stack/objective.py
@@ -0,0 +1,62 @@


def objective(trial, identifier, archive_db_url, breeder_name):

###--- definition coroutines ---###
### We have to keep to coroutines in the objective function,
### otherwise the workers do not know about those until we preinstall those.
{% macro local_coroutines_include() %}{% include 'nats_coroutines.py' %}{% endmacro %}
{{ local_coroutines_include()|indent }} # default is indent of 4 spaces!
###--- end coroutines ---###


logger = logging.getLogger('objective')
logger.setLevel(logging.DEBUG)


archive_db_engine = create_engine(archive_db_url)

logger.warning('entering')

# Compiling settings for effectuation
settings = []
for setting_name, setting_config in config.get('settings').get('sysctl').items():
constraints = setting_config.get('constraints')
step_width = setting_config.get('step')
suggested_value = trial.suggest_int(setting_name, constraints.get('lower') , constraints.get('upper'), step_width)
if setting_name in ['net.ipv4.tcp_rmem', 'net.ipv4.tcp_wmem']:
settings.append(f"sudo sysctl -w {setting_name}='4096 131072 {suggested_value}';")
else:
settings.append(f"sudo sysctl -w {setting_name}='{suggested_value}';")
settings = '\n'.join(settings)

is_setting_explored = False
setting_id = hashlib.sha256(str.encode(settings)).hexdigest()[0:6]

breeder_table_name = f"{breeder_name}"
query = f"SELECT * FROM {breeder_table_name} WHERE {breeder_table_name}.setting_id = '{setting_id}';"

archive_db_data = archive_db_engine.execute(query).fetchall()

if archive_db_data:
is_setting_explored = True
rtt = archive_db_data[0].get('setting_result').get('rtt')
delivery_rate = archive_db_data[0].get('setting_result').get('delivery_rate')

if not is_setting_explored:
logger.warning('doing effectuation')
settings_data = dict(settings=settings)
asyncio.run(send_msg_via_nats(subject=f'effectuation_{identifier}', data_dict=settings_data))

logger.warning('gathering recon')
metric = json.loads(asyncio.run(receive_msg_via_nats(subject=f'recon_{identifier}')))
metric_value = metric.get('metric')
rtt = float(metric_value['tcp_rtt'])
delivery_rate = float(metric_value['tcp_delivery_rate_bytes'])
logger.warning(f'metric received {metric_value}')

logger.warning('Done')

return rtt, delivery_rate


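The new objective consults the per-breeder archive table before spending a trial on effectuation, which is the "archive db based cooperation" named in the PR title. An illustrative lookup sketch, assuming a table with setting_id and a JSON setting_result column as the queries above imply:

import hashlib
from sqlalchemy import create_engine, text

def lookup_prior_result(archive_db_url: str, breeder_table: str, settings: str):
    setting_id = hashlib.sha256(settings.encode()).hexdigest()[:6]
    engine = create_engine(archive_db_url)
    with engine.connect() as conn:
        row = conn.execute(
            text(f"SELECT setting_result FROM {breeder_table} WHERE setting_id = :sid"),
            {"sid": setting_id},
        ).fetchone()
    # None means the setting is unexplored and the trial must effectuate it
    return row[0] if row else None
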
85 changes: 16 additions & 69 deletions breeder/linux_network_stack/optimization.py
@@ -1,64 +1,4 @@

def objective(trial, identifier):

###--- definition coroutines ---###
### We have to keep to coroutines in the objective function,
### otherwise the workers do not know about those until we preinstall those.
{% macro local_coroutines_include() %}{% include 'nats_coroutines.py' %}{% endmacro %}
{{ local_coroutines_include()|indent }} # default is indent of 4 spaces!
###--- end coroutines ---###

import logging
logger = logging.getLogger('objective')
logger.setLevel(logging.DEBUG)

logger.warning('entering')

# Compiling settings for effectuation
settings = []
for setting_name, setting_config in config.get('settings').get('sysctl').items():
constraints = setting_config.get('constraints')
step_width = setting_config.get('step')
suggested_value = trial.suggest_int(setting_name, constraints.get('lower') , constraints.get('upper'), step_width)
if setting_name in ['net.ipv4.tcp_rmem', 'net.ipv4.tcp_wmem']:
settings.append(f"sudo sysctl -w {setting_name}='4096 131072 {suggested_value}';")
else:
settings.append(f"sudo sysctl -w {setting_name}='{suggested_value}';")
settings = '\n'.join(settings)

is_setting_explored = False
setting_id = str(abs(hash(settings)))

breeder_table_name = f"from_breeder_name" # TBD global knowledge db table nam
query = text("SELECT * FROM :table_name WHERE :table_name.setting_id == :setting_id")

query = query.bindparams(bindparam("table_name", breeder_table_name, type_=String),
bindparam("setting_id", setting_id, type_=String))

archive_db_data = ARCHIVE_DB_ENGINE.execute(query).fetchall()

if archive_db_data:
is_setting_explored = True
rtt = archive_db_data[0].get('setting_result').get('rtt')
delivery_rate = archive_db_data[0].get('setting_result').get('delivery_rate')

if not is_setting_explored:
logger.warning('doing effectuation')
settings_data = dict(settings=settings)
asyncio.run(send_msg_via_nats(subject=f'effectuation_{identifier}', data_dict=settings_data))

logger.warning('gathering recon')
metric = json.loads(asyncio.run(receive_msg_via_nat(subject=f'recon_{identifier}')))
metric_value = metric.get('metric')
rtt = float(metric_value['tcp_rtt'])
delivery_rate = float(metric_value['tcp_delivery_rate_bytes'])
logger.warning(f'metric received {metric_value}')

logger.warning('Done')

return rtt, delivery_rate



def create_optimization_dag(dag_id, config, identifier):

@@ -69,36 +9,43 @@ def create_optimization_dag(dag_id, config, identifier):

with dag as optimization_dag:

dump_config = BashOperator(
task_id='print_config',
bash_command='echo ${config}',
env={"config": str(config)},
noop = BashOperator(
task_id='noop',
bash_command='echo "noop"',
dag=optimization_dag,
)

## perform optimiziation run
@dag.task(task_id="optimization_step")
def run_optimization():

###--- definition objective ---###
### We have to keep the objective in the optuna invoking code scope,
### otherwise the dask distributed pickling will fail.
{% macro local_objective_includ() %}{% include 'objective.py' %}{% endmacro %}
{{ local_objective_includ()|indent(12) }} # default is indent of 4 spaces!
###--- end coroutines ---###

archive_db_url = f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}'
__directions = list()

for objective in config.get('objectvices'):
direction = objective.get('direction')
for __objective in config.get('objectives'):
direction = __objective.get('direction')
__directions.append(direction)

with Client(address=DASK_SERVER_ENDPOINT) as client:
# Create a study using Dask-compatible storage
storage = DaskStorage(InMemoryStorage())
study = optuna.create_study(directions=__directions, storage=storage)
objective_wrapped = lambda trial: objective(trial, identifier)
objective_wrapped = lambda trial: objective(trial,identifier, archive_db_url, config.get('name'))
# Optimize in parallel on your Dask cluster
futures = [
client.submit(study.optimize, objective_wrapped, n_trials=10, pure=False)
]
wait(futures, timeout=7200)
print(f"Best params: {study.best_params}")

optimization_step = run_optimization()

dump_config >> optimization_step
noop >> optimization_step

return dag
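
A stripped-down sketch of the distributed optimization pattern the task now uses: an Optuna study backed by DaskStorage so trials submitted via client.submit share one study across the Dask workers. The scheduler endpoint and trial count are placeholders; as the comment in the diff notes, the objective itself must stay in the invoking scope so Dask can pickle it.

import optuna
from optuna.integration import DaskStorage
from optuna.storages import InMemoryStorage
from distributed import Client, wait

def run_distributed_study(objective, directions, endpoint="tcp://dask-scheduler:8786"):
    with Client(address=endpoint) as client:
        storage = DaskStorage(InMemoryStorage())   # study state held by the scheduler
        study = optuna.create_study(directions=directions, storage=storage)
        futures = [client.submit(study.optimize, objective, n_trials=10, pure=False)]
        wait(futures, timeout=7200)
    return study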