diff --git a/.github/workflows/code-checks.yaml b/.github/workflows/code-checks.yaml index 8c48a3015..7098b0732 100644 --- a/.github/workflows/code-checks.yaml +++ b/.github/workflows/code-checks.yaml @@ -26,6 +26,8 @@ jobs: --exclude-dir='flower-client' --exclude='tests.py' --exclude='controller_cmd.py' + --exclude='combiner_cmd.py' + --exclude='run_cmd.py' --exclude='README.rst' '^[ \t]+(import|from) ' -I . diff --git a/LICENSE b/LICENSE index a8b7d2c09..ddf746a6b 100644 --- a/LICENSE +++ b/LICENSE @@ -1,3 +1,5 @@ +Copyright 2021 Scaleout Systems AB. All rights reserved. + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 000000000..b1504311c --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include README.rst +include fedn/common/settings-controller.yaml.template \ No newline at end of file diff --git a/config/settings-client.yaml.local.template b/config/settings-client.yaml.local.template new file mode 100644 index 000000000..e48e779af --- /dev/null +++ b/config/settings-client.yaml.local.template @@ -0,0 +1,3 @@ +network_id: fedn-network +discover_host: localhost +discover_port: 8092 diff --git a/config/settings-combiner.yaml.local.template b/config/settings-combiner.yaml.local.template new file mode 100644 index 000000000..b49917389 --- /dev/null +++ b/config/settings-combiner.yaml.local.template @@ -0,0 +1,31 @@ +network_id: fedn-network + +name: combiner +host: localhost +address: localhost +port: 12080 +max_clients: 30 + +cert_path: tmp/server.crt +key_path: tmp/server.key + +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: localhost + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: localhost + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False + + diff --git 
a/config/settings-combiner.yaml.template b/config/settings-combiner.yaml.template index 8cef6643a..11911cc6f 100644 --- a/config/settings-combiner.yaml.template +++ b/config/settings-combiner.yaml.template @@ -7,4 +7,23 @@ host: combiner port: 12080 max_clients: 30 +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: mongo + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: minio + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False + diff --git a/config/settings-controller.yaml.local.template b/config/settings-controller.yaml.local.template new file mode 100644 index 000000000..a5266a38b --- /dev/null +++ b/config/settings-controller.yaml.local.template @@ -0,0 +1,24 @@ +network_id: fedn-network +controller: + host: localhost + port: 8092 + debug: True + +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: localhost + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: localhost + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False diff --git a/docker-compose.yaml b/docker-compose.yaml index c3620e79d..26291748f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -87,6 +87,8 @@ services: environment: - PYTHONUNBUFFERED=0 - GET_HOSTS_FROM=dns + - STATESTORE_CONFIG=/app/config/settings-combiner.yaml + - MODELSTORAGE_CONFIG=/app/config/settings-combiner.yaml build: context: . 
args: diff --git a/docs/_static/css/elements.css b/docs/_static/css/elements.css index 8c3b1fdd2..b4efe08f5 100644 --- a/docs/_static/css/elements.css +++ b/docs/_static/css/elements.css @@ -19,7 +19,6 @@ article ul { .rst-content .section ul li, .rst-content .toctree-wrapper ul li, -.rst-content section ul li, .wy-plain-list-disc li, article ul li { list-style: none; diff --git a/docs/conf.py b/docs/conf.py index f52a282b3..1c3cdcbb3 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -8,11 +8,10 @@ # Project info project = "FEDn" -copyright = "2021, Scaleout Systems AB" author = "Scaleout Systems AB" # The full version, including alpha/beta/rc tags -release = "0.12.1" +release = "0.14.0" # Add any Sphinx extension module names here, as strings extensions = [ @@ -98,8 +97,7 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, "fedn", "FEDn Documentation", author, "fedn", - "One line description of project.", "Miscellaneous"), + (master_doc, "fedn", "FEDn Documentation", author, "fedn", "One line description of project.", "Miscellaneous"), ] # Bibliographic Dublin Core info. diff --git a/docs/img/studio_model_overview.png b/docs/img/studio_model_overview.png index 62deed68c..7645bfc1f 100644 Binary files a/docs/img/studio_model_overview.png and b/docs/img/studio_model_overview.png differ diff --git a/docs/projects.rst b/docs/projects.rst index 4ff75549f..ff4c6e735 100644 --- a/docs/projects.rst +++ b/docs/projects.rst @@ -385,6 +385,15 @@ Then, standing inside the 'client folder', you can test *train* and *validate* b python train.py ../seed.npz ../model_update.npz --data_path data/clients/1/mnist.pt python validate.py ../model_update.npz ../validation.json --data_path data/clients/1/mnist.pt +You can also test the *train* and *validate* entrypoints using the FEDn CLI: + +.. note:: Before running the fedn run train or fedn run validate commands, make sure to download the training and test data. 
The downloads are usually handled by the "fedn run startup" command in the examples provided by FEDn. + +.. code-block:: bash + + fedn run train --path client --input <path to input model parameters> --output <path to write the updated model parameters> + fedn run validate --path client --input <path to input model> --output <path to write the output JSON containing validation metrics> + Packaging for training on FEDn =============================== diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 47ddeaa2e..c81bf9752 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -61,13 +61,13 @@ code used by the client to execute local training and local validation. The seed For a detailed explaination of the compute package and seed model, see this guide: :ref:`projects-label` To work through this quick start you need a local copy of the ``mnist-pytorch`` example project contained in the main FEDn Git repository. -The following command clones the entire repository but you will only use the example: +Clone the repository using the following command, if you didn't already do it in the previous step: .. code-block:: bash git clone https://github.com/scaleoutsystems/fedn.git -Locate into the ``fedn/examples/mnist-pytorch`` folder. The compute package is located in the folder ``client``. +Navigate to the ``fedn/examples/mnist-pytorch`` folder. The compute package is located in the folder ``client``. Create a compute package: @@ -94,31 +94,39 @@ Next will now upload these files to your Studio project. 3. Initialize the server-side ------------------------------ -The next step is to initialize the server side with the client code and the initial global model. -In the Studio UI, navigate to the project you created in step one and click on the "Sessions" tab. Click on the "New Session" button. Under the "Compute package" tab, select a name and upload the generated package file. Under the "Seed model" tab, upload the generated seed file: +The next step is to initialize the server side with the client code and the initial global model. In the Studio UI, + +#. 
Navigate to the project you created in step 1 and click on the "Sessions" tab. +#. Click on the "New Session" button. +#. Under the "Compute package" tab, select a name and upload the generated package file. +#. Under the "Seed model" tab, upload the generated seed file: .. image:: img/upload_package.png +Continue to step 4 before starting the session. The uploaded package and seed files are saved. + 4. Start clients ----------------- Now we are ready to start FEDn clients on your local machine. There are two steps involved: -1. Register a new client in your Studio project, issuing an access token. -2. Start up a client process on your local host (using the token to connect securely) +#. Register a new client in your Studio project, issuing an access token. +#. Start up a client process on your local host (using the token to connect securely) **Generate an access token for the client (in Studio)** Each local client needs an access token in order to connect securely to the FEDn server. These tokens are issued from your Studio Project. -Go to the Clients' tab and click 'Connect client'. Download a client configuration file and save it to the root of the ``examples/mnist-pytorch folder``. -Rename the file to 'client.yaml'. + +#. Go to the 'Clients' tab and click 'Connect client'. +#. Download a client configuration file and save it to the root of the ``examples/mnist-pytorch`` folder. +#. Rename the file to 'client.yaml'. **Start the client (on your local machine)** The default training and test data for this particular example (mnist-pytorch) is for convenience downloaded and split automatically by the client when it starts up. The number of splits and which split to use by a client can be controlled via the environment variables ``FEDN_NUM_DATA_SPLITS`` and ``FEDN_DATA_PATH``. -Start a client (using a 10-split and the first partition) by running the following commands: +Start a client (using a 10-split and the 1st partition) by running the following commands: .. 
tabs:: @@ -139,7 +147,7 @@ Start a client (using a 10-split and the first partition) by running the followi fedn run client -in client.yaml --secure=True --force-ssl Repeat these two steps (generate an access token and start a local client) for the number of clients you want to use. -A normal laptop should be able to handle several clients for this example. Remember to use different partitions for each client. +A normal laptop should be able to handle several clients for this example. Remember to use different partitions for each client, by changing the number in the ``FEDN_DATA_PATH`` variable. 5. Train the global model ----------------------------- @@ -147,12 +155,17 @@ A normal laptop should be able to handle several clients for this example. Remem With clients connected, we are now ready to train the global model. This can be done using either the Studio dashboard or the Python API. In FEDn, training is organised in Sessions. One training session consists of a configurable number of training rounds (local model updates and aggregation). -In Studio click on the "Sessions" link, then the "New session" button in the upper right corner. Click the "Start session" tab and enter your desirable settings (the default settings are good for this example) and hit the "Start run" button. +In Studio, + +#. Click on the "Sessions" link, then the "New session" button in the upper right corner. +#. Click the "Start session" tab and enter your desirable settings (the default settings are good for this example). +#. Hit the "Start run" button. + In the terminal where your are running your client you should now see some activity. When a round is completed, you can see the results on the "Models" page. **Watch real-time updates of training progress** -Once a training session is started, you can monitor the progress by clicking the drop-down button for the active Sessions and the clicking on the "View session" button. 
The session page will show +Once a training session is started, you can monitor the progress by clicking the drop-down button for the active Sessions and then clicking on the "View session" button. The session page will show metrics related to the training progress (accuracy, loss etc), as well as performance data such as total round times and individual client training times. A list of models in the session is updated as soon as new models are generated. To get more information about a particular model, navigate to the model page by clicking the model name. From the model page you can download the model weights and get validation metrics. @@ -173,19 +186,19 @@ be useful as you progress in your federated learning journey. With you first FEDn federated project set up, we suggest that you take a closer look at how a FEDn project is structured to learn how to develop your own FEDn projects: -- :ref:`projects-label` +:ref:`projects-label` In this tutorial we relied on the UI for running training sessions and retrieving models and results. The Python APIClient provides a flexible alternative, with additional functionality exposed, including the use of different aggregators. 
Learn how to use the APIClient here: -- :ref:`apiclient-label` +:ref:`apiclient-label` Study the architecture overview to learn more about how FEDn is designed and works under the hood: -- :ref:`architecture-label` +:ref:`architecture-label` For developers looking to customize FEDn and develop own aggregators, check out the local development guide to learn how to set up an all-in-one development environment using Docker and docker-compose: -- :ref:`developer-label` +:ref:`developer-label` diff --git a/examples/mnist-pytorch/client/python_env.yaml b/examples/mnist-pytorch/client/python_env.yaml index afdea926f..f43d2353d 100644 --- a/examples/mnist-pytorch/client/python_env.yaml +++ b/examples/mnist-pytorch/client/python_env.yaml @@ -4,6 +4,6 @@ build_dependencies: - setuptools - wheel dependencies: - - torch==2.3.1 - - torchvision==0.18.1 + - torch + - torchvision - fedn diff --git a/fedn/cli/client_cmd.py b/fedn/cli/client_cmd.py index be6b233af..8a68399ed 100644 --- a/fedn/cli/client_cmd.py +++ b/fedn/cli/client_cmd.py @@ -6,8 +6,8 @@ from fedn.common.exceptions import InvalidClientConfig from fedn.network.clients.client import Client -from .main import main -from .shared import CONTROLLER_DEFAULTS, apply_config, get_api_url, get_token, print_response +from fedn.cli.main import main +from fedn.cli.shared import CONTROLLER_DEFAULTS, apply_config, get_api_url, get_token, print_response def validate_client_config(config): diff --git a/fedn/cli/combiner_cmd.py b/fedn/cli/combiner_cmd.py index 02a797448..3e7753e80 100644 --- a/fedn/cli/combiner_cmd.py +++ b/fedn/cli/combiner_cmd.py @@ -3,8 +3,6 @@ import click import requests -from fedn.network.combiner.combiner import Combiner - from .main import main from .shared import CONTROLLER_DEFAULTS, apply_config, get_api_url, get_token, print_response @@ -12,8 +10,7 @@ @main.group("combiner") @click.pass_context def combiner_cmd(ctx): - """:param ctx: - """ + """:param ctx:""" pass @@ -60,6 +57,8 @@ def start_cmd(ctx, 
discoverhost, discoverport, token, name, host, port, fqdn, se click.echo(f"\nCombiner configuration loaded from file: {init}") click.echo("Values set in file override defaults and command line arguments...\n") + from fedn.network.combiner.combiner import Combiner + combiner = Combiner(config) combiner.run() diff --git a/fedn/cli/controller_cmd.py b/fedn/cli/controller_cmd.py index ab8727b27..443628c9a 100644 --- a/fedn/cli/controller_cmd.py +++ b/fedn/cli/controller_cmd.py @@ -1,6 +1,6 @@ import click -from .main import main +from fedn.cli.main import main @main.group("controller") diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index ba6ff8c96..0342bb39d 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -4,16 +4,15 @@ import click import yaml + +from fedn.cli.client_cmd import validate_client_config +from fedn.cli.main import main +from fedn.cli.shared import apply_config from fedn.common.exceptions import InvalidClientConfig from fedn.common.log_config import logger from fedn.network.clients.client import Client -from fedn.network.combiner.combiner import Combiner from fedn.utils.dispatcher import Dispatcher, _read_yaml_file -from .client_cmd import validate_client_config -from .main import main -from .shared import apply_config - def get_statestore_config_from_file(init): """:param init: @@ -37,18 +36,38 @@ def check_helper_config_file(config): return helper +def check_yaml_exists(path): + """Check if fedn.yaml exists in the given path.""" + yaml_file = os.path.join(path, "fedn.yaml") + if not os.path.exists(yaml_file): + logger.error(f"Could not find fedn.yaml in {path}") + click.echo(f"Could not find fedn.yaml in {path}") + exit(-1) + return yaml_file + + +def delete_virtual_environment(dispatcher): + if dispatcher.python_env_path: + logger.info(f"Removing virtualenv {dispatcher.python_env_path}") + shutil.rmtree(dispatcher.python_env_path) + else: + logger.warning("No virtualenv found to remove.") + + @main.group("run") @click.pass_context 
def run_cmd(ctx): - """:param ctx: - """ + """:param ctx:""" pass + + @run_cmd.command("validate") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") -@click.option("-i", "--input", required=True, help="Path to input model" ) -@click.option("-o", "--output", required=True,help="Path to write the output JSON containing validation metrics") +@click.option("-i", "--input", required=True, help="Path to input model") +@click.option("-o", "--output", required=True, help="Path to write the output JSON containing validation metrics") +@click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @click.pass_context -def validate_cmd(ctx, path,input,output): +def validate_cmd(ctx, path, input, output, keep_venv): """Execute 'validate' entrypoint in fedn.yaml. :param ctx: @@ -56,10 +75,7 @@ def validate_cmd(ctx, path,input,output): :type path: str """ path = os.path.abspath(path) - yaml_file = os.path.join(path, "fedn.yaml") - if not os.path.exists(yaml_file): - logger.error(f"Could not find fedn.yaml in {path}") - exit(-1) + yaml_file = check_yaml_exists(path) config = _read_yaml_file(yaml_file) # Check that validate is defined in fedn.yaml under entry_points @@ -70,17 +86,17 @@ def validate_cmd(ctx, path,input,output): dispatcher = Dispatcher(config, path) _ = dispatcher._get_or_create_python_env() dispatcher.run_cmd("validate {} {}".format(input, output)) + if not keep_venv: + delete_virtual_environment(dispatcher) + - # delete the virtualenv - if dispatcher.python_env_path: - logger.info(f"Removing virtualenv {dispatcher.python_env_path}") - shutil.rmtree(dispatcher.python_env_path) @run_cmd.command("train") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") -@click.option("-i", "--input", required=True, help="Path to input model parameters" ) -@click.option("-o", "--output", 
required=True,help="Path to write the updated model parameters ") +@click.option("-i", "--input", required=True, help="Path to input model parameters") +@click.option("-o", "--output", required=True, help="Path to write the updated model parameters ") +@click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @click.pass_context -def train_cmd(ctx, path,input,output): +def train_cmd(ctx, path, input, output, keep_venv): """Execute 'train' entrypoint in fedn.yaml. :param ctx: @@ -88,10 +104,7 @@ def train_cmd(ctx, path,input,output): :type path: str """ path = os.path.abspath(path) - yaml_file = os.path.join(path, "fedn.yaml") - if not os.path.exists(yaml_file): - logger.error(f"Could not find fedn.yaml in {path}") - exit(-1) + yaml_file = check_yaml_exists(path) config = _read_yaml_file(yaml_file) # Check that train is defined in fedn.yaml under entry_points @@ -102,15 +115,15 @@ def train_cmd(ctx, path,input,output): dispatcher = Dispatcher(config, path) _ = dispatcher._get_or_create_python_env() dispatcher.run_cmd("train {} {}".format(input, output)) + if not keep_venv: + delete_virtual_environment(dispatcher) + - # delete the virtualenv - if dispatcher.python_env_path: - logger.info(f"Removing virtualenv {dispatcher.python_env_path}") - shutil.rmtree(dispatcher.python_env_path) @run_cmd.command("startup") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") +@click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @click.pass_context -def startup_cmd(ctx, path): +def startup_cmd(ctx, path, keep_venv): """Execute 'startup' entrypoint in fedn.yaml. 
:param ctx: @@ -118,30 +131,25 @@ def startup_cmd(ctx, path): :type path: str """ path = os.path.abspath(path) - yaml_file = os.path.join(path, "fedn.yaml") - if not os.path.exists(yaml_file): - logger.error(f"Could not find fedn.yaml in {path}") - exit(-1) + yaml_file = check_yaml_exists(path) config = _read_yaml_file(yaml_file) # Check that startup is defined in fedn.yaml under entry_points if "startup" not in config["entry_points"]: logger.error("No startup command defined in fedn.yaml") exit(-1) - dispatcher = Dispatcher(config, path) _ = dispatcher._get_or_create_python_env() dispatcher.run_cmd("startup") + if not keep_venv: + delete_virtual_environment(dispatcher) - # delete the virtualenv - if dispatcher.python_env_path: - logger.info(f"Removing virtualenv {dispatcher.python_env_path}") - shutil.rmtree(dispatcher.python_env_path) @run_cmd.command("build") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") +@click.option("-v", "--keep-venv", is_flag=True, required=False, help="Use flag to keep the python virtual environment (python_env in fedn.yaml)") @click.pass_context -def build_cmd(ctx, path): +def build_cmd(ctx, path, keep_venv): """Execute 'build' entrypoint in fedn.yaml. 
:param ctx: @@ -149,10 +157,7 @@ def build_cmd(ctx, path): :type path: str """ path = os.path.abspath(path) - yaml_file = os.path.join(path, "fedn.yaml") - if not os.path.exists(yaml_file): - logger.error(f"Could not find fedn.yaml in {path}") - exit(-1) + yaml_file = check_yaml_exists(path) config = _read_yaml_file(yaml_file) # Check that build is defined in fedn.yaml under entry_points @@ -163,11 +168,8 @@ def build_cmd(ctx, path): dispatcher = Dispatcher(config, path) _ = dispatcher._get_or_create_python_env() dispatcher.run_cmd("build") - - # delete the virtualenv - if dispatcher.python_env_path: - logger.info(f"Removing virtualenv {dispatcher.python_env_path}") - shutil.rmtree(dispatcher.python_env_path) + if not keep_venv: + delete_virtual_environment(dispatcher) @run_cmd.command("client") @@ -182,7 +184,7 @@ def build_cmd(ctx, path): @click.option("-s", "--secure", required=False, default=False) @click.option("-pc", "--preshared-cert", required=False, default=False) @click.option("-v", "--verify", is_flag=True, help="Verify SSL/TLS for REST service") -@click.option("-c", "--preferred-combiner", required=False,type=str, default="",help="url to the combiner or name of the preferred combiner") +@click.option("-c", "--preferred-combiner", required=False, type=str, default="", help="url to the combiner or name of the preferred combiner") @click.option("-va", "--validator", required=False, default=True) @click.option("-tr", "--trainer", required=False, default=True) @click.option("-in", "--init", required=False, default=None, help="Set to a filename to (re)init client from file state.") @@ -319,5 +321,7 @@ def combiner_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, click.echo(f"\nCombiner configuration loaded from file: {init}") click.echo("Values set in file override defaults and command line arguments...\n") + from fedn.network.combiner.combiner import Combiner + combiner = Combiner(config) combiner.run() diff --git a/fedn/cli/tests/tests.py 
b/fedn/cli/tests/tests.py index dffe90a2c..7cf6878c4 100644 --- a/fedn/cli/tests/tests.py +++ b/fedn/cli/tests/tests.py @@ -1,9 +1,17 @@ import unittest - +import sys +import os +import fedn +from unittest.mock import patch from click.testing import CliRunner from run_cmd import check_helper_config_file +from run_cmd import run_cmd,check_yaml_exists,logger +import click +from main import main +from fedn.network.api.server import start_server_api +from controller_cmd import main, controller_cmd - +MOCK_VERSION = "0.11.1" class TestReducerCLI(unittest.TestCase): def setUp(self): @@ -56,6 +64,221 @@ def test_get_statestore_config_from_file(self): # self.assertEqual(result.output, "--remote was set to False, but no helper was found in --init settings file: settings.yaml\n") # self.assertEqual(result.exit_code, -1) + #testcase for --version in fedn + @patch('main.get_version') + def test_version_output(self, mock_get_version): + # Mock the get_version function to return a predefined version + mock_get_version.return_value = MOCK_VERSION + + runner = CliRunner() + result = runner.invoke(main, ['--version']) + + # Check that the command exits with a status code of 0 + self.assertEqual(result.exit_code, 0) + + # Check that the output contains the mocked version string + expected_output = f"main, version {MOCK_VERSION}\n" + self.assertEqual(result.output, expected_output) + + #train command unit test cases + #To test check yaml function + @patch('run_cmd.os.path.exists') + @patch('run_cmd.click.echo') + def test_yaml_exists(self, mock_click_echo, mock_exists): + path = '/fake/path' + mock_exists.return_value = True + + # Call the function + result = check_yaml_exists(path) + # Assertions + mock_exists.assert_called_once_with(os.path.join(path, 'fedn.yaml')) + self.assertEqual(result, os.path.join(path, 'fedn.yaml')) + mock_click_echo.assert_not_called() + #test missing fedn yaml file + @patch('run_cmd.os.path.exists') + def test_missing_fedn_yaml(self, mock_exists): + 
mock_exists.return_value = False + result = self.runner.invoke(run_cmd, [ + 'train', + '--path', 'fedn/examples/mnist-pytorch/client', + '--input', 'client.npz', + '--output', 'client' + ]) + self.assertEqual(result.exit_code, -1) + self.assertIn("", result.output) + + #train cmd missing in fedn yaml file + @patch('run_cmd._read_yaml_file') + @patch('run_cmd.logger') + @patch('run_cmd.exit') + @patch('run_cmd.check_yaml_exists') + def test_train_not_defined(self, mock_check_yaml_exists, mock_exit, mock_logger, mock_read_yaml_file): + # Setup the mock to simulate fedn.yaml content without "train" entry point + mock_read_yaml_file.return_value = { + "entry_points": { + "vaidate": "some_train_command" + } + } + mock_check_yaml_exists.return_value = '/fake/path/fedn.yaml' + result = self.runner.invoke(run_cmd, [ + 'train', + '--path', '/fake/path', + '--input', 'input', + '--output', 'output', + '--remove-venv', 'True' + ]) + mock_logger.error.assert_called_once_with("No train command defined in fedn.yaml") + #print("hereeeeee",mock_logger.error.call_count) + log_messages = [call[0][0] for call in mock_logger.error.call_args_list] + #print("Captured log messages:", log_messages) + mock_exit.assert_called_once_with(-1) + + #to test with venv flag as false + @patch('run_cmd.os.path.exists') + @patch('run_cmd.logger') + @patch('run_cmd.Dispatcher') + def test_train_cmd_with_venv_false(self, MockDispatcher,mock_exists,mock_logger): + mock_exists.return_value = True + mock_dispatcher = MockDispatcher.return_value + mock_dispatcher.run_cmd.return_value = None + result = self.runner.invoke(run_cmd, [ + 'train', + '--path', '../../.././fedn/examples/mnist-pytorch/client', + '--input', 'client.npz', + '--output', 'client', + '--remove-venv', 'False' + ]) + + self.assertEqual(result.exit_code, 0) + mock_dispatcher.run_cmd.assert_called_once_with("train client.npz client") + #print(mock_dispatcher.run_cmd.call_count) + +#Validate cmd test cases + @patch('run_cmd._read_yaml_file') 
+ @patch('run_cmd.logger') + @patch('run_cmd.exit') + @patch('run_cmd.check_yaml_exists') + def test_validate_not_defined(self, mock_check_yaml_exists, mock_exit, mock_logger, mock_read_yaml_file): + mock_read_yaml_file.return_value = { + "entry_points": { + "train": "some_train_command" + } + } + mock_check_yaml_exists.return_value = '/fake/path/fedn.yaml' + result = self.runner.invoke(run_cmd, [ + 'validate', + '--path', '/fake/path', + '--input', 'input', + '--output', 'output', + '--remove-venv', 'True' + ]) + + + # Verify that the error was logged + mock_logger.error.assert_called_once_with("No validate command defined in fedn.yaml") + #log_messages = [call[0][0] for call in mock_logger.error.call_args_list] + #print("Captured log messages:", log_messages) + mock_exit.assert_called_once_with(-1) + + #test missing fedn yaml file + @patch('run_cmd.os.path.exists') + def test_missing_fedn_yaml(self, mock_exists): + mock_exists.return_value = False + result = self.runner.invoke(run_cmd, [ + 'vaidate', + '--path', 'fedn/examples/mnist-pytorch/client', + '--input', 'client.npz', + '--output', 'client' + ]) + self.assertEqual(result.exit_code, -1) + self.assertIn("", result.output) + + #Test validate cmd with venv false + @patch('run_cmd.os.path.exists') + @patch('run_cmd.logger') + @patch('run_cmd.Dispatcher') + def test_validate_cmd_with_venv_false(self, MockDispatcher,mock_exists,mock_logger): + mock_exists.return_value = True + mock_dispatcher = MockDispatcher.return_value + mock_dispatcher.run_cmd.return_value = None + result = self.runner.invoke(run_cmd, [ + 'validate', + '--path', '../../.././fedn/examples/mnist-pytorch/client', + '--input', 'client.npz', + '--output', 'client', + '--remove-venv', 'False' + ]) + + self.assertEqual(result.exit_code, 0) + mock_dispatcher.run_cmd.assert_called_once_with("validate client.npz client") + #print(mock_dispatcher.run_cmd.call_count) + +#build cmd test cases + @patch('run_cmd._read_yaml_file') + @patch('run_cmd.logger') 
+ @patch('run_cmd.exit') + @patch('run_cmd.check_yaml_exists') + def test_startup_not_defined(self, mock_check_yaml_exists, mock_exit, mock_logger, mock_read_yaml_file): + mock_read_yaml_file.return_value = { + "entry_points": { + "train": "some_train_command" + } + } + mock_check_yaml_exists.return_value = '/fake/path/fedn.yaml' + runner = CliRunner() + result = runner.invoke(run_cmd, [ + 'startup', + '--path', '/fake/path', + '--remove-venv', 'True' + ]) + + + # Verify that the error was logged + mock_logger.error.assert_called_once_with("No startup command defined in fedn.yaml") + log_messages = [call[0][0] for call in mock_logger.error.call_args_list] + #print("Captured log messages:", log_messages) + mock_exit.assert_called_once_with(-1) + + #test missing fedn yaml file + @patch('run_cmd.os.path.exists') + def test_missing_fedn_yaml(self, mock_exists): + mock_exists.return_value = False + result = self.runner.invoke(run_cmd, [ + 'startup', + '--path', 'fedn/examples/mnist-pytorch/client' + ]) + self.assertEqual(result.exit_code, -1) + self.assertIn("", result.output) + + @patch('run_cmd.os.path.exists') + @patch('run_cmd.logger') + @patch('run_cmd.Dispatcher') + def test_startup_cmd_with_venv_false(self, MockDispatcher,mock_exists,mock_logger): + mock_exists.return_value = True + mock_dispatcher = MockDispatcher.return_value + mock_dispatcher.run_cmd.return_value = None + result = self.runner.invoke(run_cmd, [ + 'startup', + '--path', '../../.././fedn/examples/mnist-pytorch/client', + '--remove-venv', 'False' + ]) + + self.assertEqual(result.exit_code, 0) + mock_dispatcher.run_cmd.assert_called_once_with("startup") + #print(mock_dispatcher.run_cmd.call_count) + + #to test controller start + @patch('fedn.network.api.server.start_server_api') + def test_controller_start(self, mock_start_server_api): + runner = CliRunner() + result = runner.invoke(main, ['controller', 'start']) + + # Check that the command exits with a status code of 0 + 
self.assertEqual(result.exit_code, 0) + + # Check that the start_server_api function was called + mock_start_server_api.assert_called_once() + #print("hereeee",mock_start_server_api.call_count) def test_check_helper_config_file(self): self.assertEqual(check_helper_config_file( diff --git a/fedn/common/certificate/certificate.py b/fedn/common/certificate/certificate.py index 3cb09016c..547175a20 100644 --- a/fedn/common/certificate/certificate.py +++ b/fedn/common/certificate/certificate.py @@ -9,24 +9,27 @@ class Certificate: - """Utility to generate unsigned certificates. - - """ + """Utility to generate unsigned certificates.""" CERT_NAME = "cert.pem" KEY_NAME = "key.pem" BITS = 2048 - def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", create_dirs=True): - try: - os.makedirs(cwd) - except OSError: - logger.info("Directory exists, will store all cert and keys here.") + def __init__(self, name=None, key_path="", cert_path="", create_dirs=False): + if create_dirs: + try: + cwd = os.getcwd() + os.makedirs(cwd) + except OSError: + logger.info("Directory exists, will store all cert and keys here.") + else: + logger.info("Successfully created the directory to store cert and keys in {}".format(cwd)) + + self.key_path = os.path.join(cwd, "key.pem") + self.cert_path = os.path.join(cwd, "cert.pem") else: - logger.info("Successfully created the directory to store cert and keys in {}".format(cwd)) - - self.key_path = os.path.join(cwd, key_name) - self.cert_path = os.path.join(cwd, cert_name) + self.key_path = key_path + self.cert_path = cert_path if name: self.name = name @@ -36,9 +39,7 @@ def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", cre def gen_keypair( self, ): - """Generate keypair. 
- - """ + """Generate keypair.""" key = crypto.PKey() key.generate_key(crypto.TYPE_RSA, 2048) cert = crypto.X509() @@ -73,8 +74,7 @@ def set_keypair_raw(self, certificate, privatekey): certfile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, certificate)) def get_keypair_raw(self): - """:return: - """ + """:return:""" with open(self.key_path, "rb") as keyfile: key_buf = keyfile.read() with open(self.cert_path, "rb") as certfile: @@ -82,16 +82,14 @@ def get_keypair_raw(self): return copy.deepcopy(cert_buf), copy.deepcopy(key_buf) def get_key(self): - """:return: - """ + """:return:""" with open(self.key_path, "rb") as keyfile: key_buf = keyfile.read() key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_buf) return key def get_cert(self): - """:return: - """ + """:return:""" with open(self.cert_path, "rb") as certfile: cert_buf = certfile.read() cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_buf) diff --git a/fedn/common/config.py b/fedn/common/config.py index 23d873ff7..517e57d94 100644 --- a/fedn/common/config.py +++ b/fedn/common/config.py @@ -15,6 +15,8 @@ FEDN_AUTH_REFRESH_TOKEN = os.environ.get("FEDN_AUTH_REFRESH_TOKEN", False) FEDN_CUSTOM_URL_PREFIX = os.environ.get("FEDN_CUSTOM_URL_PREFIX", "") + +FEDN_ALLOW_LOCAL_PACKAGE = os.environ.get("FEDN_ALLOW_LOCAL_PACKAGE", False) FEDN_PACKAGE_EXTRACT_DIR = os.environ.get("FEDN_PACKAGE_EXTRACT_DIR", "package") diff --git a/fedn/network/api/gunicorn_app.py b/fedn/network/api/gunicorn_app.py new file mode 100644 index 000000000..8c5cb0c30 --- /dev/null +++ b/fedn/network/api/gunicorn_app.py @@ -0,0 +1,23 @@ +from gunicorn.app.base import BaseApplication +class GunicornApp(BaseApplication): + def __init__(self, app, options=None): + self.options = options or {} + self.application = app + super().__init__() + + def load_config(self): + config = {key: value for key, value in self.options.items() + if key in self.cfg.settings and value is not None} + for key, value in config.items(): + 
self.cfg.set(key.lower(), value) + + def load(self): + return self.application + +def run_gunicorn(app, host,port,workers=4): + bind_address = f"{host}:{port}" + options = { + "bind": bind_address, # Specify the bind address and port here + "workers": workers, + } + GunicornApp(app, options).run() diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 9936e0bc0..3ffaadecf 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -1,5 +1,4 @@ -import base64 -import copy +import os import threading import uuid from io import BytesIO @@ -8,9 +7,9 @@ from werkzeug.security import safe_join from werkzeug.utils import secure_filename -from fedn.common.config import get_controller_config, get_network_config +from fedn.common.config import FEDN_ALLOW_LOCAL_PACKAGE, get_controller_config, get_network_config from fedn.common.log_config import logger -from fedn.network.combiner.interfaces import CombinerInterface, CombinerUnavailableError +from fedn.network.combiner.interfaces import CombinerUnavailableError from fedn.network.state import ReducerState, ReducerStateToString from fedn.utils.checksum import sha @@ -231,7 +230,7 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript file_name = file.filename storage_file_name = secure_filename(f"{str(uuid.uuid4())}.{extension}") - file_path = safe_join("/app/client/package/", storage_file_name) + file_path = safe_join(os.getcwd(), storage_file_name) file.save(file_path) self.control.set_compute_package(storage_file_name, file_path) @@ -247,7 +246,8 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript ), 400, ) - + # Delete the file after it has been saved + os.remove(file_path) return jsonify({"success": True, "message": "Compute package set."}) def _get_compute_package_name(self): @@ -371,19 +371,21 @@ def download_compute_package(self, name): mutex = threading.Lock() mutex.acquire() # TODO: make configurable, perhaps in 
config.py or package.py - return send_from_directory("/app/client/package/", name, as_attachment=True) + return send_from_directory(os.getcwd(), name, as_attachment=True) except Exception: try: data = self.control.get_compute_package(name) # TODO: make configurable, perhaps in config.py or package.py - file_path = safe_join("/app/client/package/", name) + file_path = safe_join(os.getcwd(), name) with open(file_path, "wb") as fh: fh.write(data) # TODO: make configurable, perhaps in config.py or package.py - return send_from_directory("/app/client/package/", name, as_attachment=True) + return send_from_directory(os.getcwd(), name, as_attachment=True) except Exception: raise finally: + # Delete the file after it has been saved + os.remove(file_path) mutex.release() def _create_checksum(self, name=None): @@ -398,7 +400,7 @@ def _create_checksum(self, name=None): name, message = self._get_compute_package_name() if name is None: return False, message, "" - file_path = safe_join("/app/client/package/", name) # TODO: make configurable, perhaps in config.py or package.py + file_path = safe_join(os.getcwd(), name) # TODO: make configurable, perhaps in config.py or package.py try: sum = str(sha(file_path)) except FileNotFoundError: @@ -502,58 +504,15 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por :return: Config of the combiner as a json response. :rtype: :class:`flask.Response` """ - # TODO: Any more required check for config? Formerly based on status: "retry" - if not self.control.idle(): - return jsonify( - { - "success": False, - "status": "retry", - "message": "Conroller is not in idle state, try again later. 
", - } - ) - # Check if combiner already exists - combiner = self.control.network.get_combiner(combiner_id) - if not combiner: - if secure_grpc == "True": - certificate, key = self.certificate_manager.get_or_create(address).get_keypair_raw() - _ = base64.b64encode(certificate) - _ = base64.b64encode(key) - - else: - certificate = None - key = None - - combiner_interface = CombinerInterface( - parent=self._to_dict(), - name=combiner_id, - address=address, - fqdn=fqdn, - port=port, - certificate=copy.deepcopy(certificate), - key=copy.deepcopy(key), - ip=remote_addr, - ) - - self.control.network.add_combiner(combiner_interface) - - # Check combiner now exists - combiner = self.control.network.get_combiner(combiner_id) - if not combiner: - return jsonify({"success": False, "message": "Combiner not added."}) - payload = { - "success": True, - "message": "Combiner added successfully.", - "status": "added", - "storage": self.statestore.get_storage_backend(), - "statestore": self.statestore.get_config(), - "certificate": combiner.get_certificate(), - "key": combiner.get_key(), + "success": False, + "message": "Adding combiner via REST API is obsolete. Include statestore and object store config in combiner config.", + "status": "abort", } return jsonify(payload) - def add_client(self, client_id, preferred_combiner, remote_addr, name): + def add_client(self, client_id, preferred_combiner, remote_addr, name, package): """Add a client to the network. :param client_id: The client id to add. @@ -563,19 +522,37 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name): :return: A json response with combiner assignment config. 
:rtype: :class:`flask.Response` """ - # Check if package has been set - package_object = self.statestore.get_compute_package() - if package_object is None: + local_package = FEDN_ALLOW_LOCAL_PACKAGE + if local_package: + local_package = True + + if package == "remote": + package_object = self.statestore.get_compute_package() + if package_object is None: + return ( + jsonify( + { + "success": False, + "status": "retry", + "message": "No compute package found. Set package in controller.", + } + ), + 203, + ) + helper_type = self.control.statestore.get_helper() + elif package == "local" and local_package is False: + print("Local package not allowed. Set FEDN_ALLOW_LOCAL_PACKAGE=True in controller config.") return ( jsonify( { "success": False, - "status": "retry", - "message": "No compute package found. Set package in controller.", + "message": "Local package not allowed. Set FEDN_ALLOW_LOCAL_PACKAGE=True in controller config.", } ), - 203, + 400, ) + elif package == "local" and local_package is True: + helper_type = "" # Assign client to combiner if preferred_combiner: @@ -609,22 +586,14 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name): # Add client to network self.control.network.add_client(client_config) - # Setup response containing information about the combiner for assinging the client - if combiner.certificate: - cert_b64 = base64.b64encode(combiner.certificate) - cert = str(cert_b64).split("'")[1] - else: - cert = None - payload = { "status": "assigned", "host": combiner.address, "fqdn": combiner.fqdn, - "package": "remote", # TODO: Make this configurable + "package": package, "ip": combiner.ip, "port": combiner.port, - "certificate": cert, - "helper_type": self.control.statestore.get_helper(), + "helper_type": helper_type, } return jsonify(payload) diff --git a/fedn/network/api/network.py b/fedn/network/api/network.py index 5e2f2ef91..542761f49 100644 --- a/fedn/network/api/network.py +++ b/fedn/network/api/network.py @@ -1,4 +1,4 @@ 
-import base64 +import os from fedn.common.log_config import logger from fedn.network.combiner.interfaces import CombinerInterface @@ -47,14 +47,19 @@ def get_combiners(self): data = self.statestore.get_combiners() combiners = [] for c in data["result"]: - if c["certificate"]: - cert = base64.b64decode(c["certificate"]) - key = base64.b64decode(c["key"]) + name = c["name"].upper() + # General certificate handling, same for all combiners. + if os.environ.get("FEDN_GRPC_CERT_PATH"): + with open(os.environ.get("FEDN_GRPC_CERT_PATH"), "rb") as f: + cert = f.read() + # Specific certificate handling for each combiner. + elif os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}"): + cert_path = os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}") + with open(cert_path, "rb") as f: + cert = f.read() else: cert = None - key = None - - combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, key=key, ip=c["ip"])) + combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, ip=c["ip"])) return combiners diff --git a/fedn/network/api/server.py b/fedn/network/api/server.py index d56c3ab0b..185877577 100644 --- a/fedn/network/api/server.py +++ b/fedn/network/api/server.py @@ -7,6 +7,7 @@ from fedn.network.api.interface import API from fedn.network.api.shared import control, statestore from fedn.network.api.v1 import _routes +from fedn.network.api import gunicorn_app custom_url_prefix = os.environ.get("FEDN_CUSTOM_URL_PREFIX", False) # statestore_config,modelstorage_config,network_id,control=set_statestore_config() @@ -591,9 +592,11 @@ def add_client(): remote_addr = request.remote_addr try: response = api.add_client(**json_data, remote_addr=remote_addr) - except TypeError: + except TypeError as e: + print(e) return jsonify({"success": False, "message": "Invalid data provided"}), 400 - except Exception: + except Exception as e: + print(e) return jsonify({"success": False, "message": "An 
unexpected error occurred"}), 500 return response @@ -630,10 +633,12 @@ def list_combiners_data(): def start_server_api(): config = get_controller_config() port = config["port"] - debug = config["debug"] host = "0.0.0.0" - app.run(debug=debug, port=port, host=host) - - + debug = config["debug"] + if debug: + app.run(debug=debug, port=port, host=host) + else: + workers=os.cpu_count() + gunicorn_app.run_gunicorn(app, host, port, workers) if __name__ == "__main__": start_server_api() diff --git a/fedn/network/api/v1/client_routes.py b/fedn/network/api/v1/client_routes.py index 8fa13febe..fb268905b 100644 --- a/fedn/network/api/v1/client_routes.py +++ b/fedn/network/api/v1/client_routes.py @@ -1,13 +1,11 @@ from flask import Blueprint, jsonify, request from fedn.network.api.auth import jwt_auth_required -from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb -from fedn.network.storage.statestore.stores.client_store import ClientStore +from fedn.network.api.v1.shared import api_version, client_store, get_post_data_to_kwargs, get_typed_list_headers from fedn.network.storage.statestore.stores.shared import EntityNotFound bp = Blueprint("client", __name__, url_prefix=f"/api/{api_version}/clients") -client_store = ClientStore(mdb, "network.clients") @bp.route("/", methods=["GET"]) @@ -368,3 +366,47 @@ def get_client(id: str): return jsonify({"message": f"Entity with id: {id} not found"}), 404 except Exception: return jsonify({"message": "An unexpected error occurred"}), 500 + +# delete client +@bp.route("/", methods=["DELETE"]) +@jwt_auth_required(role="admin") +def delete_client(id: str): + """Delete client + Deletes a client based on the provided id. 
+ --- + tags: + - Clients + parameters: + - name: id + in: path + required: true + type: string + description: The id of the client + responses: + 200: + description: The client was deleted + 404: + description: The client was not found + schema: + type: object + properties: + message: + type: string + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + result: bool = client_store.delete(id) + + msg = "Client deleted" if result else "Client not deleted" + + return jsonify({"message": msg}), 200 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/combiner_routes.py b/fedn/network/api/v1/combiner_routes.py index 9210a7e30..02617b7bb 100644 --- a/fedn/network/api/v1/combiner_routes.py +++ b/fedn/network/api/v1/combiner_routes.py @@ -1,7 +1,7 @@ from flask import Blueprint, jsonify, request from fedn.network.api.auth import jwt_auth_required -from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb +from fedn.network.api.v1.shared import api_version, client_store, get_post_data_to_kwargs, get_typed_list_headers, mdb from fedn.network.storage.statestore.stores.combiner_store import CombinerStore from fedn.network.storage.statestore.stores.shared import EntityNotFound @@ -339,3 +339,92 @@ def get_combiner(id: str): return jsonify({"message": f"Entity with id: {id} not found"}), 404 except Exception: return jsonify({"message": "An unexpected error occurred"}), 500 + +@bp.route("/", methods=["DELETE"]) +@jwt_auth_required(role="admin") +def delete_combiner(id: str): + """Delete combiner + Deletes a combiner based on the provided id. 
+ --- + tags: + - Combiners + parameters: + - name: id + in: path + required: true + type: string + description: The id of the combiner + responses: + 200: + description: The combiner was deleted + 404: + description: The combiner was not found + schema: + type: object + properties: + error: + type: string + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + result: bool = combiner_store.delete(id) + msg = "Combiner deleted" if result else "Combiner not deleted" + + return jsonify({"message": msg}), 200 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 + + +@bp.route("/clients/count", methods=["POST"]) +@jwt_auth_required(role="admin") +def number_of_clients_connected(): + """Number of clients connected + Retrieves the number of clients connected to the combiner. + --- + tags: + - Combiners + parameters: + - name: combiners + in: body + required: true + type: object + description: Object containing the ids of the combiners + schema: + type: object + properties: + combiners: + type: string + responses: + 200: + description: A list of objects containing the number of clients connected to each combiner + schema: + type: Array + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + data = request.get_json() + combiners = data.get("combiners", "") + combiners = combiners.split(",") if combiners else [] + response = client_store.connected_client_count(combiners) + + result = { + "result": response + } + + return jsonify(result), 200 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/shared.py b/fedn/network/api/v1/shared.py index a27a6f637..0fda39c45 100644 --- a/fedn/network/api/v1/shared.py +++ b/fedn/network/api/v1/shared.py @@ -3,13 +3,15 @@ 
import pymongo from pymongo.database import Database -from fedn.network.api.shared import statestore_config,network_id +from fedn.network.api.shared import network_id, statestore_config +from fedn.network.storage.statestore.stores.client_store import ClientStore api_version = "v1" mc = pymongo.MongoClient(**statestore_config["mongo_config"]) mc.server_info() mdb: Database = mc[network_id] +client_store = ClientStore(mdb, "network.clients") def is_positive_integer(s): return s is not None and s.isdigit() and int(s) > 0 diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index e594d7b6d..8508291ff 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -1,4 +1,3 @@ -import base64 import io import json import os @@ -11,7 +10,6 @@ import uuid from datetime import datetime from io import BytesIO -from shutil import copytree import grpc import requests @@ -28,7 +26,6 @@ from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString from fedn.network.combiner.modelservice import get_tmp_path, upload_request_generator -from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers.helpers import get_helper CHUNK_SIZE = 1024 * 1024 @@ -198,13 +195,7 @@ def connect(self, combiner_config): port = 443 logger.info(f"Initiating connection to combiner host at: {host}:{port}") - if combiner_config["certificate"]: - logger.info("Utilizing CA certificate for GRPC channel authentication.") - secure = True - cert = base64.b64decode(combiner_config["certificate"]) # .decode('utf-8') - credentials = grpc.ssl_channel_credentials(root_certificates=cert) - channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) - elif os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): + if os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): secure = True logger.info("Using root certificate from environment variable for GRPC channel.") with open(os.environ["FEDN_GRPC_ROOT_CERT_PATH"], "rb") 
as f: @@ -236,8 +227,6 @@ def connect(self, combiner_config): logger.info("Successfully established {} connection to {}:{}".format("secure" if secure else "insecure", host, port)) - logger.info("Using {} compute package.".format(combiner_config["package"])) - self._connected = True def disconnect(self): @@ -259,7 +248,11 @@ def _initialize_helper(self, combiner_config): :return: """ if "helper_type" in combiner_config.keys(): - self.helper = get_helper(combiner_config["helper_type"]) + if not combiner_config["helper_type"]: + # Default to numpyhelper + self.helper = get_helper("numpyhelper") + else: + self.helper = get_helper(combiner_config["helper_type"]) def _subscribe_to_combiner(self, config): """Listen to combiner message stream and start all processing threads. @@ -292,9 +285,8 @@ def _initialize_dispatcher(self, config): :type config: dict :return: """ + pr = PackageRuntime(self.run_path) if config["remote_compute_context"]: - pr = PackageRuntime(self.run_path) - retval = None tries = 10 @@ -333,18 +325,8 @@ def _initialize_dispatcher(self, config): logger.error(f"Caught exception: {type(e).__name__}") else: - # TODO: Deprecate - dispatch_config = { - "entry_points": { - "predict": {"command": "python3 predict.py"}, - "train": {"command": "python3 train.py"}, - "validate": {"command": "python3 validate.py"}, - } - } from_path = os.path.join(os.getcwd(), "client") - - copytree(from_path, self.run_path) - self.dispatcher = Dispatcher(dispatch_config, self.run_path) + self.dispatcher = pr.dispatcher(from_path) # Get or create python environment activate_cmd = self.dispatcher._get_or_create_python_env() if activate_cmd: diff --git a/fedn/network/clients/connect.py b/fedn/network/clients/connect.py index 59aaead35..bd7262936 100644 --- a/fedn/network/clients/connect.py +++ b/fedn/network/clients/connect.py @@ -74,7 +74,7 @@ def assign(self): """ try: retval = None - payload = {"name": self.name, "client_id": self.id, "preferred_combiner": 
self.preferred_combiner} + payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner, "package": self.package} retval = requests.post( self.connect_string + FEDN_CUSTOM_URL_PREFIX + "/add_client", json=payload, @@ -110,17 +110,10 @@ def assign(self): if "message" in retval.json(): reason = retval.json()["message"] else: - reason = "Reducer was not ready. Try again later." + reason = "Controller was not ready. Try again later." return Status.TryAgain, reason - reducer_package = retval.json()["package"] - if reducer_package != self.package: - reason = "Unmatched config of compute package between client and reducer.\n" + "Reducer uses {} package and client uses {}.".format( - reducer_package, self.package - ) - return Status.UnMatchedConfig, reason - return Status.Assigned, retval.json() return Status.Unassigned, None diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 2c59991f9..d336932c5 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -1,24 +1,21 @@ -import base64 import json import queue import re import signal -import sys import threading import time import uuid from datetime import datetime, timedelta from enum import Enum +from typing import TypedDict import fedn.network.grpc.fedn_pb2 as fedn import fedn.network.grpc.fedn_pb2_grpc as rpc +from fedn.common.certificate.certificate import Certificate from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream -from fedn.network.combiner.connect import ConnectorCombiner, Status -from fedn.network.combiner.modelservice import ModelService from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler -from fedn.network.grpc.server import Server -from fedn.network.storage.s3.repository import Repository -from fedn.network.storage.statestore.mongostatestore import MongoStateStore +from fedn.network.combiner.shared import repository, statestore +from 
fedn.network.grpc.server import Server, ServerConfig VALID_NAME_REGEX = "^[a-zA-Z0-9_-]*$" @@ -50,6 +47,28 @@ def role_to_proto_role(role): return fedn.OTHER +class CombinerConfig(TypedDict): + """Configuration for the combiner.""" + + discover_host: str + discover_port: int + token: str + host: str + port: int + ip: str + parent: str + fqdn: str + name: str + secure: bool + verify: bool + cert_path: str + key_path: str + max_clients: int + network_id: str + logfile: str + verbosity: str + + class Combiner(rpc.CombinerServicer, rpc.ReducerServicer, rpc.ConnectorServicer, rpc.ControlServicer): """Combiner gRPC server. @@ -74,52 +93,21 @@ def __init__(self, config): self.role = Role.COMBINER self.max_clients = config["max_clients"] - # Connector to announce combiner to discover service (reducer) - announce_client = ConnectorCombiner( - host=config["discover_host"], - port=config["discover_port"], - myhost=config["host"], - fqdn=config["fqdn"], - myport=config["port"], - token=config["token"], - name=config["name"], - secure=config["secure"], - verify=config["verify"], - ) - - while True: - # Announce combiner to discover service - status, response = announce_client.announce() - if status == Status.TryAgain: - logger.info(response) - time.sleep(5) - elif status == Status.Assigned: - announce_config = response - logger.info("COMBINER {0}: Announced successfully".format(self.id)) - break - elif status == Status.UnAuthorized: - logger.info(response) - logger.info("Status.UnAuthorized") - sys.exit("Exiting: Unauthorized") - elif status == Status.UnMatchedConfig: - logger.info(response) - logger.info("Status.UnMatchedConfig") - sys.exit("Exiting: Missing config") - - cert = announce_config["certificate"] - key = announce_config["key"] - - if announce_config["certificate"]: - cert = base64.b64decode(announce_config["certificate"]) # .decode('utf-8') - key = base64.b64decode(announce_config["key"]) # .decode('utf-8') - - # Set up gRPC server configuration - grpc_config = 
{"port": config["port"], "secure": config["secure"], "certificate": cert, "key": key} - # Set up model repository - self.repository = Repository(announce_config["storage"]["storage_config"]) - - self.statestore = MongoStateStore(announce_config["statestore"]["network_id"], announce_config["statestore"]["mongo_config"]) + self.repository = repository + + self.statestore = statestore + + # Add combiner to statestore + interface_config = { + "port": config["port"], + "fqdn": config["fqdn"], + "name": config["name"], + "address": config["host"], + "parent": "localhost", + "ip": "", + } + self.statestore.set_combiner(interface_config) # Fetch all clients previously connected to the combiner # If a client and a combiner goes down at the same time, @@ -132,13 +120,19 @@ def __init__(self, config): except KeyError: self.statestore.set_client({"name": client["name"], "status": "offline"}) - self.modelservice = ModelService() + # Set up gRPC server configuration + if config["secure"]: + cert = Certificate(key_path=config["key_path"], cert_path=config["cert_path"]) + certificate, key = cert.get_keypair_raw() + grpc_server_config = ServerConfig(port=config["port"], secure=True, key=key, certificate=certificate) + else: + grpc_server_config = ServerConfig(port=config["port"], secure=False) # Create gRPC server - self.server = Server(self, self.modelservice, grpc_config) + self.server = Server(self, grpc_server_config) # Set up round controller - self.round_handler = RoundHandler(self.repository, self, self.modelservice) + self.round_handler = RoundHandler(self) # Start thread for round controller threading.Thread(target=self.round_handler.run, daemon=True).start() diff --git a/fedn/network/combiner/interfaces.py b/fedn/network/combiner/interfaces.py index 935b75442..20da29d23 100644 --- a/fedn/network/combiner/interfaces.py +++ b/fedn/network/combiner/interfaces.py @@ -125,13 +125,6 @@ def to_dict(self): "key": None, "config": self.config, } - - if self.certificate: - cert_b64 
= base64.b64encode(self.certificate) - key_b64 = base64.b64encode(self.key) - data["certificate"] = str(cert_b64).split("'")[1] - data["key"] = str(key_b64).split("'")[1] - return data def to_json(self): diff --git a/fedn/network/combiner/roundhandler.py b/fedn/network/combiner/roundhandler.py index 54cfd189c..1f0025303 100644 --- a/fedn/network/combiner/roundhandler.py +++ b/fedn/network/combiner/roundhandler.py @@ -9,6 +9,7 @@ from fedn.common.log_config import logger from fedn.network.combiner.aggregators.aggregatorbase import get_aggregator from fedn.network.combiner.modelservice import load_model_from_BytesIO, serialize_model_to_BytesIO +from fedn.network.combiner.shared import modelservice, repository from fedn.utils.helpers.helpers import get_helper from fedn.utils.parameters import Parameters @@ -84,10 +85,10 @@ class RoundHandler: :type modelservice: class: `fedn.network.combiner.modelservice.ModelService` """ - def __init__(self, storage, server, modelservice): + def __init__(self, server): """Initialize the RoundHandler.""" self.round_configs = queue.Queue() - self.storage = storage + self.storage = repository self.server = server self.modelservice = modelservice diff --git a/fedn/network/combiner/shared.py b/fedn/network/combiner/shared.py new file mode 100644 index 000000000..5e5ee114c --- /dev/null +++ b/fedn/network/combiner/shared.py @@ -0,0 +1,13 @@ +from fedn.common.config import get_modelstorage_config, get_network_config, get_statestore_config +from fedn.network.combiner.modelservice import ModelService +from fedn.network.storage.s3.repository import Repository +from fedn.network.storage.statestore.mongostatestore import MongoStateStore + +statestore_config = get_statestore_config() +modelstorage_config = get_modelstorage_config() +network_id = get_network_config() + +statestore = MongoStateStore(network_id, statestore_config["mongo_config"]) +repository = Repository(modelstorage_config["storage_config"]) + +modelservice = ModelService() diff 
--git a/fedn/network/grpc/server.py b/fedn/network/grpc/server.py index a23691505..edd2fd6d5 100644 --- a/fedn/network/grpc/server.py +++ b/fedn/network/grpc/server.py @@ -1,17 +1,28 @@ from concurrent import futures +from typing import TypedDict import grpc from grpc_health.v1 import health, health_pb2_grpc import fedn.network.grpc.fedn_pb2_grpc as rpc from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream +from fedn.network.combiner.shared import modelservice from fedn.network.grpc.auth import JWTInterceptor +class ServerConfig(TypedDict): + port: int + secure: bool + key: str + certificate: str + logfile: str + verbosity: str + + class Server: """Class for configuring and launching the gRPC server.""" - def __init__(self, servicer, modelservicer, config): + def __init__(self, servicer, config: ServerConfig): set_log_level_from_string(config.get("verbosity", "INFO")) set_log_stream(config.get("logfile", None)) @@ -25,8 +36,8 @@ def __init__(self, servicer, modelservicer, config): rpc.add_ConnectorServicer_to_server(servicer, self.server) if isinstance(servicer, rpc.ReducerServicer): rpc.add_ReducerServicer_to_server(servicer, self.server) - if isinstance(modelservicer, rpc.ModelServiceServicer): - rpc.add_ModelServiceServicer_to_server(modelservicer, self.server) + if isinstance(modelservice, rpc.ModelServiceServicer): + rpc.add_ModelServiceServicer_to_server(modelservice, self.server) if isinstance(servicer, rpc.CombinerServicer): rpc.add_ControlServicer_to_server(servicer, self.server) diff --git a/fedn/network/storage/statestore/mongostatestore.py b/fedn/network/storage/statestore/mongostatestore.py index 7ef22a795..3ef204b5c 100644 --- a/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/network/storage/statestore/mongostatestore.py @@ -6,7 +6,6 @@ from google.protobuf.json_format import MessageToDict from fedn.common.log_config import logger -from fedn.network.combiner.roundhandler import RoundConfig from 
fedn.network.state import ReducerStateToString, StringToReducerState @@ -878,7 +877,7 @@ def create_round(self, round_data): # TODO: Add check if round_id already exists self.rounds.insert_one(round_data) - def set_session_config(self, id: str, config: RoundConfig) -> None: + def set_session_config(self, id: str, config) -> None: """Set the session configuration. :param id: The session id @@ -889,7 +888,7 @@ def set_session_config(self, id: str, config: RoundConfig) -> None: self.sessions.update_one({"session_id": str(id)}, {"$push": {"session_config": config}}, True) # Added to accomodate new session config structure - def set_session_config_v2(self, id: str, config: RoundConfig) -> None: + def set_session_config_v2(self, id: str, config) -> None: """Set the session configuration. :param id: The session id @@ -916,7 +915,7 @@ def set_round_combiner_data(self, data): """ self.rounds.update_one({"round_id": str(data["round_id"])}, {"$push": {"combiners": data}}, True) - def set_round_config(self, round_id, round_config: RoundConfig): + def set_round_config(self, round_id, round_config): """Set round configuration. 
:param round_id: The round unique identifier diff --git a/fedn/network/storage/statestore/stores/client_store.py b/fedn/network/storage/statestore/stores/client_store.py index 6d7d5865e..c3c2e5225 100644 --- a/fedn/network/storage/statestore/stores/client_store.py +++ b/fedn/network/storage/statestore/stores/client_store.py @@ -2,10 +2,13 @@ from typing import Any, Dict, List, Tuple import pymongo +from bson import ObjectId from pymongo.database import Database from fedn.network.storage.statestore.stores.store import Store +from .shared import EntityNotFound + class Client: def __init__(self, id: str, name: str, combiner: str, combiner_preferred: str, ip: str, status: str, updated_at: str, last_seen: datetime): @@ -53,7 +56,14 @@ def add(self, item: Client)-> Tuple[bool, Any]: raise NotImplementedError("Add not implemented for ClientStore") def delete(self, id: str) -> bool: - raise NotImplementedError("Delete not implemented for ClientStore") + kwargs = { "_id": ObjectId(id) } if ObjectId.is_valid(id) else { "client_id": id } + + document = self.database[self.collection].find_one(kwargs) + + if document is None: + raise EntityNotFound(f"Entity with (id | client_id) {id} not found") + + return super().delete(document["_id"]) def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDING, use_typing: bool = False, **kwargs) -> Dict[int, List[Client]]: """List entities @@ -83,3 +93,36 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI def count(self, **kwargs) -> int: return super().count(**kwargs) + + def connected_client_count(self, combiners): + """Count the number of connected clients for each combiner. + + :param combiners: list of combiners to get data for. + :type combiners: list + :param sort_key: The key to sort by. + :type sort_key: str + :param sort_order: The sort order. + :type sort_order: pymongo.ASCENDING or pymongo.DESCENDING + :return: list of combiner data. 
+ :rtype: list(ObjectId) + """ + try: + pipeline = ( + [ + {"$match": {"combiner": {"$in": combiners}, "status": "online"}}, + {"$group": {"_id": "$combiner", "count": {"$sum": 1}}}, + {"$project": {"id": "$_id", "count": 1, "_id": 0}} + ] + if len(combiners) > 0 + else [ + {"$match": { "status": "online"}}, + {"$group": {"_id": "$combiner", "count": {"$sum": 1}}}, + {"$project": {"id": "$_id", "count": 1, "_id": 0}} + ] + ) + + result = list(self.database[self.collection].aggregate(pipeline)) + except Exception: + result = {} + + return result diff --git a/fedn/network/storage/statestore/stores/combiner_store.py b/fedn/network/storage/statestore/stores/combiner_store.py index 28f54aa6c..5fceea1b7 100644 --- a/fedn/network/storage/statestore/stores/combiner_store.py +++ b/fedn/network/storage/statestore/stores/combiner_store.py @@ -86,7 +86,17 @@ def add(self, item: Combiner)-> Tuple[bool, Any]: raise NotImplementedError("Add not implemented for CombinerStore") def delete(self, id: str) -> bool: - raise NotImplementedError("Delete not implemented for CombinerStore") + if(ObjectId.is_valid(id)): + kwargs = { "_id": ObjectId(id)} + else: + return False + + document = self.database[self.collection].find_one(kwargs) + + if document is None: + raise EntityNotFound(f"Entity with (id) {id} not found") + + return super().delete(document["_id"]) def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDING, use_typing: bool = False, **kwargs) -> Dict[int, List[Combiner]]: """List entities diff --git a/fedn/network/storage/statestore/stores/store.py b/fedn/network/storage/statestore/stores/store.py index f1175c9f7..eb9d8b1bb 100644 --- a/fedn/network/storage/statestore/stores/store.py +++ b/fedn/network/storage/statestore/stores/store.py @@ -51,7 +51,8 @@ def add(self, item: T) -> Tuple[bool, Any]: return False, str(e) def delete(self, id: str) -> bool: - pass + result = self.database[self.collection].delete_one({"_id": ObjectId(id)}) + return 
result.deleted_count == 1 def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDING, use_typing: bool = False, **kwargs) -> Dict[int, List[T]]: """List entities diff --git a/pyproject.toml b/pyproject.toml index 2df87137a..3a0be7aee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "fedn" -version = "0.12.1" +version = "0.14.0" description = "Scaleout Federated Learning" authors = [{ name = "Scaleout Systems AB", email = "contact@scaleoutsystems.com" }] readme = "README.rst" @@ -31,6 +31,7 @@ requires-python = '>=3.8,<3.13' dependencies = [ "requests", "urllib3>=1.26.4", + "gunicorn>=20.0.4", "minio", "grpcio~=1.60.0", "grpcio-tools~=1.60.0",