diff --git a/.ci/tests/examples/run.sh b/.ci/tests/examples/run.sh index 83ae3cdc7..2063cbbc9 100755 --- a/.ci/tests/examples/run.sh +++ b/.ci/tests/examples/run.sh @@ -12,7 +12,7 @@ helper="$2" >&2 echo "Start FEDn" pushd "examples/$example" -docker-compose \ +docker compose \ -f ../../docker-compose.yaml \ -f docker-compose.override.yaml \ up -d --build @@ -43,7 +43,7 @@ docker-compose \ ".$example/bin/python" ../../.ci/tests/examples/api_test.py get_client_config --output ../../client.yaml # Redeploy clients with config -docker-compose \ +docker compose \ -f ../../docker-compose.yaml \ -f docker-compose.override.yaml \ -f ../../.ci/tests/examples/compose-client-settings.override.yaml \ @@ -56,4 +56,4 @@ docker-compose \ ".$example/bin/python" ../../.ci/tests/examples/api_test.py test_api_get_methods popd ->&2 echo "Test completed successfully" \ No newline at end of file +>&2 echo "Test completed successfully" diff --git a/.github/workflows/code-checks.yaml b/.github/workflows/code-checks.yaml index 3b0f615f6..a39b2ab26 100644 --- a/.github/workflows/code-checks.yaml +++ b/.github/workflows/code-checks.yaml @@ -43,6 +43,7 @@ jobs: --exclude-dir='.mnist-pytorch' --exclude-dir='.mnist-keras' --exclude-dir='docs' + --exclude-dir='flower-client' --exclude='tests.py' '^[ \t]+(import|from) ' -I . 
diff --git a/.vscode/settings.json b/.vscode/settings.json index d4c2ea8ad..4f993daa0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,4 +5,5 @@ }, "python.linting.enabled": true, "python.linting.flake8Enabled": true, + "esbonio.sphinx.confDir": "", } \ No newline at end of file diff --git a/examples/flower-client/.dockerignore b/examples/flower-client/.dockerignore new file mode 100644 index 000000000..8ba9024ad --- /dev/null +++ b/examples/flower-client/.dockerignore @@ -0,0 +1,4 @@ +data +seed.npz +*.tgz +*.tar.gz \ No newline at end of file diff --git a/examples/flower-client/.gitignore b/examples/flower-client/.gitignore new file mode 100644 index 000000000..032fceb3f --- /dev/null +++ b/examples/flower-client/.gitignore @@ -0,0 +1,6 @@ +data +*.npz +*.tgz +*.tar.gz +.flower-client +client.yaml \ No newline at end of file diff --git a/examples/flower-client/Dockerfile b/examples/flower-client/Dockerfile new file mode 100644 index 000000000..a8a191403 --- /dev/null +++ b/examples/flower-client/Dockerfile @@ -0,0 +1,10 @@ +FROM ghcr.io/scaleoutsystems/fedn/fedn:0.8.0 + +COPY requirements.txt /app/config/requirements.txt + +# Install requirements +RUN python -m venv /venv \ + && /venv/bin/pip install --upgrade pip \ + && /venv/bin/pip install --no-cache-dir -r /app/config/requirements.txt \ + # Clean up + && rm -r /app/config/requirements.txt \ No newline at end of file diff --git a/examples/flower-client/README.rst b/examples/flower-client/README.rst new file mode 100644 index 000000000..7b9e30754 --- /dev/null +++ b/examples/flower-client/README.rst @@ -0,0 +1,108 @@ +Using Flower ClientApps in FEDn +=============================== + +This example demonstrates how to run a Flower 'ClientApp' on FEDn. + +The FEDn compute package 'client/entrypoint' +uses a built-in Flower compatibility adapter for convenient wrapping of the Flower client. 
+See `flwr_client.py` and `flwr_task.py` for the Flower client code (which is adapted from +https://github.com/adap/flower/tree/main/examples/app-pytorch). + + +Running the example +------------------- + +See `https://fedn.readthedocs.io/en/stable/quickstart.html` for a general introduction to FEDn. +This example follows the same structure as the pytorch quickstart example. + +Build a virtual environment (note that you might need to install the 'venv' package): + +.. code-block:: + + bin/init_venv.sh + +Activate the virtual environment: + +.. code-block:: + + source .flower-client/bin/activate + +Make the compute package (to be uploaded to FEDn): + +.. code-block:: + + tar -czvf package.tgz client + +Create the seed model (to be uploaded to FEDn): +.. code-block:: + + python client/entrypoint init_seed + +Next, you will upload the compute package and seed model to +a FEDn network. Here you have two main options: using FEDn Studio +(recommended for new users), or a pseudo-local deployment +on your own machine. + +If you are using FEDn Studio (recommended): +----------------------------------------------------- + +Follow instructions here to register for Studio and start a project: https://fedn.readthedocs.io/en/stable/studio.html. + +In your Studio project: + +- From the "Sessions" menu, upload the compute package and seed model. +- Register a client and obtain the corresponding 'client.yaml'. + +On your local machine / client (in the same virtual environment), start the FEDn client: + +.. code-block:: + + CLIENT_NUMBER=0 FEDN_AUTH_SCHEME=Bearer fedn run client -in client.yaml --force-ssl --secure=True + + +Or, if you prefer to use Docker, build an image (this might take a long time): + +.. code-block:: + + docker build -t flower-client . + +Then start the client using Docker: + +.. 
code-block:: + + docker run \ + -v $PWD/client.yaml:/app/client.yaml \ + -e CLIENT_NUMBER=0 \ + -e FEDN_AUTH_SCHEME=Bearer \ + flower-client run client -in client.yaml --secure=True --force-ssl + + +If you are running FEDn in pseudo-local mode: +------------------------------------------------------------------ + +Deploy a FEDn network on local host (see `https://fedn.readthedocs.io/en/stable/quickstart.html`). + +Use the FEDn API Client to initialize FEDn with the compute package and seed model: + +.. code-block:: + + python init_fedn.py + +Create a file 'client.yaml' with the following content: + +.. code-block:: + + network_id: fedn-network + discover_host: api-server + discover_port: 8092 + name: myclient + +Then start the client (using Docker): + +.. code-block:: + + docker run \ + -v $PWD/client.yaml:/app/client.yaml \ + --network=fedn_default \ + -e CLIENT_NUMBER=0 \ + flower-client run client -in client.yaml diff --git a/examples/flower-client/bin/init_venv.sh b/examples/flower-client/bin/init_venv.sh new file mode 100755 index 000000000..5a0c16d15 --- /dev/null +++ b/examples/flower-client/bin/init_venv.sh @@ -0,0 +1,10 @@ +#!/bin/bash +set -e + +# Init venv +python3 -m venv .flower-client + +# Pip deps +.flower-client/bin/pip install --upgrade pip +.flower-client/bin/pip install -e ../../fedn +.flower-client/bin/pip install -r requirements.txt diff --git a/examples/flower-client/client/entrypoint b/examples/flower-client/client/entrypoint new file mode 100755 index 000000000..79dad9eb4 --- /dev/null +++ b/examples/flower-client/client/entrypoint @@ -0,0 +1,108 @@ +#!/usr/bin/env python +import os + +import fire +from flwr_client import app + +from fedn.utils.flowercompat.client_app_adapter import FlwrClientAppAdapter +from fedn.utils.helpers.helpers import get_helper, save_metadata, save_metrics + +HELPER_MODULE = "numpyhelper" +helper = get_helper(HELPER_MODULE) + +flwr_adapter = FlwrClientAppAdapter(app) + + +def _get_node_id(): + """Get client number 
def save_parameters(out_path, parameters_np):
    """Write model parameters to file through the module-level FEDn helper.

    :param out_path: The path to save to.
    :type out_path: str
    :param parameters_np: The model parameters to serialize.
    """
    helper.save(parameters_np, out_path)


def init_seed(out_path="seed.npz"):
    """Create the seed model and store it on disk.

    The initial parameters are requested from the Flower client through the
    adapter, so the Flower client must implement ``get_parameters``.

    :param out_path: The path to save the seed model to.
    :type out_path: str
    """
    node_id = _get_node_id()
    seed_parameters = flwr_adapter.init_parameters(partition_id=node_id)
    save_parameters(out_path, seed_parameters)


def train(in_model_path, out_model_path):
    """Run one model update.

    Reads the model parameters from ``in_model_path`` (managed by the FEDn
    client), lets the Flower client perform the update, and writes both the
    metadata and the updated parameters for ``out_model_path`` (picked up by
    the FEDn client).

    :param in_model_path: The path to the input model.
    :type in_model_path: str
    :param out_model_path: The path to save the output model to.
    :type out_model_path: str
    """
    incoming_parameters = helper.load(in_model_path)

    # Delegate the actual training to the Flower client
    updated_parameters, num_examples = flwr_adapter.train(
        parameters=incoming_parameters, partition_id=_get_node_id()
    )

    # JSON metadata file (mandatory); num_examples is required for
    # server-side aggregation
    save_metadata({"num_examples": num_examples}, out_model_path)

    # Model update (mandatory)
    save_parameters(out_model_path, updated_parameters)


def validate(in_model_path, out_json_path, data_path=None):
    """Evaluate the model through the Flower client and store the metrics.

    :param in_model_path: The path to the input model.
    :type in_model_path: str
    :param out_json_path: The path to save the output JSON to.
    :type out_json_path: str
    :param data_path: The path to the data file (accepted but not used here).
    :type data_path: str
    """
    model_parameters = helper.load(in_model_path)
    node_id = _get_node_id()

    loss, accuracy = flwr_adapter.evaluate(model_parameters, partition_id=node_id)

    # JSON schema of the validation report
    report = {
        "test_loss": loss,
        "test_accuracy": accuracy,
    }
    print(f"Loss: {loss}, accuracy: {accuracy}")
    save_metrics(report, out_json_path)


if __name__ == "__main__":
    # Expose the three entry points as sub-commands of the CLI
    commands = {
        "init_seed": init_seed,
        "train": train,
        "validate": validate,
    }
    fire.Fire(commands)
# Define FlowerClient and client_fn
class FlowerClient(NumPyClient):
    """Flower ``NumPyClient`` that trains and evaluates ``Net`` on one data partition."""

    def __init__(self, cid) -> None:
        """Build the model and the data loaders for partition ``cid``.

        :param cid: Client id; converted with ``int()`` and used as the
            partition id passed to ``load_data``.
        """
        super().__init__()
        print(f"STARTED CLIENT WITH CID {cid}")
        self.net = Net().to(DEVICE)
        self.trainloader, self.testloader = load_data(
            partition_id=int(cid), num_clients=10
        )

    def get_parameters(self, config):
        """Return the current model weights as a list of NumPy arrays."""
        # Reuse the shared helper instead of duplicating the state_dict walk,
        # so parameter extraction is identical here and in fit().
        return get_weights(self.net)

    def fit(self, parameters, config):
        """Load ``parameters``, train for three epochs and return the update.

        :return: Tuple ``(weights, number of training examples, metrics dict)``;
            the metrics dict is empty.
        """
        set_weights(self.net, parameters)
        train(self.net, self.trainloader, epochs=3)
        return get_weights(self.net), len(self.trainloader.dataset), {}

    def evaluate(self, parameters, config):
        """Load ``parameters`` and evaluate them on the local test loader.

        :return: Tuple ``(loss, number of test examples, {"accuracy": accuracy})``.
        """
        set_weights(self.net, parameters)
        loss, accuracy = test(self.net, self.testloader)
        return loss, len(self.testloader.dataset), {"accuracy": accuracy}


def client_fn(cid: str):
    """Create and return an instance of Flower `Client`."""
    return FlowerClient(cid).to_client()


# Flower ClientApp
app = ClientApp(
    client_fn=client_fn,
)
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


class Net(nn.Module):
    """Model (simple CNN adapted from 'PyTorch: A 60 Minute Blitz').

    Takes 3-channel images and produces 10 output logits. The flatten to
    ``16 * 5 * 5`` features in ``forward`` matches 32x32 inputs.
    """

    def __init__(self) -> None:
        super().__init__()
        # NOTE: the layer creation order defines the state_dict order that
        # get_weights/set_weights rely on; do not reorder.
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the class logits for a batch of images."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


def load_data(partition_id, num_clients, seed=42):
    """Load one partition of CIFAR10 and wrap it in train/test data loaders.

    :param partition_id: Index of the partition to load.
    :param num_clients: Number of partitions the "train" split is divided into.
    :param seed: Seed for the local 80/20 train/test split. A fixed seed keeps
        the split identical across calls; without it the split is re-drawn on
        every call, so samples used for validation in one call could be used
        for training in the next.
    :return: Tuple ``(trainloader, testloader)`` with batch size 32; only the
        train loader is shuffled.
    """
    fds = FederatedDataset(dataset="cifar10", partitioners={"train": num_clients})
    partition = fds.load_partition(partition_id)
    # Divide data on each node: 80% train, 20% test (reproducible via seed)
    partition_train_test = partition.train_test_split(test_size=0.2, seed=seed)
    pytorch_transforms = Compose(
        [ToTensor(), Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    def apply_transforms(batch):
        """Apply transforms to the partition from FederatedDataset."""
        batch["img"] = [pytorch_transforms(img) for img in batch["img"]]
        return batch

    partition_train_test = partition_train_test.with_transform(apply_transforms)
    trainloader = DataLoader(partition_train_test["train"], batch_size=32, shuffle=True)
    testloader = DataLoader(partition_train_test["test"], batch_size=32)
    return trainloader, testloader
def test(net, testloader):
    """Validate the model on the test set.

    :param net: The model to evaluate.
    :param testloader: Loader yielding batches with "img" and "label" entries.
    :return: Tuple ``(loss, accuracy)`` where ``loss`` is the mean
        cross-entropy per sample and ``accuracy`` the fraction of correctly
        classified samples.
    :raises ZeroDivisionError: If the test dataset is empty.
    """
    criterion = torch.nn.CrossEntropyLoss()
    correct, loss_sum = 0, 0.0
    with torch.no_grad():
        for batch in tqdm(testloader, "Testing"):
            images = batch["img"].to(DEVICE)
            labels = batch["label"].to(DEVICE)
            outputs = net(images)
            # The criterion returns the batch mean; weight it by the batch size
            # so the final value is a true per-sample mean, not a sum that
            # grows with the number of batches.
            loss_sum += criterion(outputs, labels).item() * labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
    num_samples = len(testloader.dataset)
    accuracy = correct / num_samples
    loss = loss_sum / num_samples
    return loss, accuracy


def get_weights(net):
    """Return the entries of ``net.state_dict()`` as a list of NumPy arrays."""
    return [tensor.cpu().numpy() for tensor in net.state_dict().values()]


def set_weights(net, parameters):
    """Load ``parameters`` into ``net``.

    The arrays are paired with the keys of ``net.state_dict()`` in order, so
    ``parameters`` must follow the ordering produced by ``get_weights``.
    """
    keys = net.state_dict().keys()
    state_dict = OrderedDict(
        (key, torch.tensor(array)) for key, array in zip(keys, parameters)
    )
    net.load_state_dict(state_dict, strict=True)
a/examples/mnist-pytorch/API_Example.ipynb +++ b/examples/mnist-pytorch/API_Example.ipynb @@ -14,7 +14,7 @@ }, { "cell_type": "code", - "execution_count": 50, + "execution_count": 1, "id": "743dfe47", "metadata": {}, "outputs": [], @@ -38,7 +38,7 @@ }, { "cell_type": "code", - "execution_count": 51, + "execution_count": 2, "id": "1061722d", "metadata": {}, "outputs": [], @@ -58,7 +58,7 @@ }, { "cell_type": "code", - "execution_count": 52, + "execution_count": 3, "id": "5107f6f9", "metadata": {}, "outputs": [ @@ -66,7 +66,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "{'committed_at': 'Wed, 27 Mar 2024 22:00:56 GMT', 'id': '66049718d5d8d370bf266899', 'key': 'models', 'model': '8762bb36-ffcc-4591-ad62-2758c6284a5d', 'parent_model': None, 'session_id': None}\n" + "{'committed_at': 'Tue, 02 Apr 2024 07:03:32 GMT', 'id': '660badc47b37095194dbd57a', 'key': 'models', 'model': '26020dc3-d83c-4048-b3ef-4715935dcaa0', 'parent_model': None, 'session_id': None}\n" ] } ], @@ -87,7 +87,7 @@ }, { "cell_type": "code", - "execution_count": 54, + "execution_count": 4, "id": "f0380d35", "metadata": {}, "outputs": [], @@ -119,10 +119,21 @@ }, { "cell_type": "code", - "execution_count": 67, + "execution_count": 5, "id": "4e8044b7", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/andreash/miniforge3/lib/python3.10/site-packages/numpy/core/fromnumeric.py:3474: RuntimeWarning: Mean of empty slice.\n", + " return _methods._mean(a, axis=axis, dtype=dtype,\n", + "/Users/andreash/miniforge3/lib/python3.10/site-packages/numpy/core/_methods.py:189: RuntimeWarning: invalid value encountered in double_scalars\n", + " ret = ret.dtype.type(ret / rcount)\n" + ] + } + ], "source": [ "session_id = \"experiment1\"\n", "models = client.get_model_trail()\n", @@ -145,23 +156,23 @@ }, { "cell_type": "code", - "execution_count": 68, + "execution_count": 6, "id": "42425c43", "metadata": {}, "outputs": [ { "data": { 
"text/plain": [ - "" + "" ] }, - "execution_count": 68, + "execution_count": 6, "metadata": {}, "output_type": "execute_result" }, { "data": { - "image/png": "\n", + "image/png": "\n", "text/plain": [ "
class FlwrClientAppAdapter:
    """Flwr ClientApp wrapper.

    Builds Flower messages, calls the wrapped ``ClientApp`` directly with
    them and converts the replies to plain NumPy arrays and numbers.
    """

    def __init__(self, app: ClientApp) -> None:
        """Store the Flower ``ClientApp`` that will handle the messages."""
        self.app = app

    def init_parameters(self, partition_id: int) -> NDArrays:
        """Ask the client app for its current parameters.

        :param partition_id: Partition id placed in the message metadata.
        :return: The client's parameters as NumPy arrays.
        :raises ValueError: If the client returned an empty parameter list.
        """
        # Construct a get_parameters message for the ClientApp
        message, context = self._construct_message(
            MessageTypeLegacy.GET_PARAMETERS, [], partition_id
        )
        # Call client app with get_parameters message
        client_return_message = self.app(message, context)
        # return NDArrays of clients parameters
        parameters = self._parse_get_parameters_message(client_return_message)
        if len(parameters) == 0:
            # Adjacent literals instead of a backslash continuation inside the
            # string, which embedded the source indentation in the message.
            raise ValueError(
                "The 'parameters' list is empty. Ensure your flower "
                "client has implemented a get_parameters() function."
            )
        return parameters

    def train(self, parameters: NDArrays, partition_id: int) -> Tuple[NDArrays, int]:
        """Run one training call on the client app.

        :param parameters: Model parameters to start from.
        :param partition_id: Partition id placed in the message metadata.
        :return: Tuple ``(updated parameters, number of examples)``.
        """
        # Construct a train message for the ClientApp with given parameters
        message, context = self._construct_message(
            MessageType.TRAIN, parameters, partition_id
        )
        # Call client app with train message
        client_return_message = self.app(message, context)
        # Parse return message
        params, num_examples = self._parse_train_message(client_return_message)
        return params, num_examples

    def evaluate(self, parameters: NDArrays, partition_id: int) -> Tuple[float, float]:
        """Run one evaluation call on the client app.

        :param parameters: Model parameters to evaluate.
        :param partition_id: Partition id placed in the message metadata.
        :return: Tuple ``(loss, accuracy)``; accuracy is ``-1`` when the
            client did not report an "accuracy" metric.
        """
        # Construct an evaluate message for the ClientApp with given parameters
        message, context = self._construct_message(
            MessageType.EVALUATE, parameters, partition_id
        )
        # Call client app with evaluate message
        client_return_message = self.app(message, context)
        # Parse return message
        loss, accuracy = self._parse_evaluate_message(client_return_message)
        return loss, accuracy

    def _parse_get_parameters_message(self, message: Message) -> NDArrays:
        """Extract the parameters of a get_parameters reply as NumPy arrays."""
        get_parameters_res = recordset_to_getparametersres(message.content, keep_input=False)
        return parameters_to_ndarrays(get_parameters_res.parameters)

    def _parse_train_message(self, message: Message) -> Tuple[NDArrays, int]:
        """Extract ``(parameters, num_examples)`` from a train reply."""
        fitres = recordset_to_fitres(message.content, keep_input=False)
        params, num_examples = (
            parameters_to_ndarrays(fitres.parameters),
            fitres.num_examples,
        )
        return params, num_examples

    def _parse_evaluate_message(self, message: Message) -> Tuple[float, float]:
        """Extract ``(loss, accuracy)`` from an evaluate reply."""
        evaluateres = recordset_to_evaluateres(message.content)
        # -1 signals that the client did not report an "accuracy" metric
        return evaluateres.loss, evaluateres.metrics.get("accuracy", -1)

    def _construct_message(
        self,
        message_type: MessageType,
        parameters: NDArrays,
        partition_id: int,
    ) -> Tuple[Message, Context]:
        """Build the message and context handed to the client app.

        :param message_type: ``MessageType.TRAIN``, ``MessageType.EVALUATE``
            or ``MessageTypeLegacy.GET_PARAMETERS``.
        :param parameters: Model parameters to embed (ignored for
            get_parameters messages).
        :param partition_id: Partition id stored in the message metadata.
        :return: Tuple ``(message, context)``.
        :raises ValueError: If ``message_type`` is not one of the supported
            types (previously this surfaced as an UnboundLocalError).
        """
        if message_type == MessageType.TRAIN:
            fit_ins: FitIns = FitIns(
                parameters=ndarrays_to_parameters(parameters), config={}
            )
            recordset = fitins_to_recordset(fitins=fit_ins, keep_input=False)
        elif message_type == MessageType.EVALUATE:
            ev_ins: EvaluateIns = EvaluateIns(
                parameters=ndarrays_to_parameters(parameters), config={}
            )
            recordset = evaluateins_to_recordset(evaluateins=ev_ins, keep_input=False)
        elif message_type == MessageTypeLegacy.GET_PARAMETERS:
            get_parameters_ins: GetParametersIns = GetParametersIns({})
            recordset = getparametersins_to_recordset(getparameters_ins=get_parameters_ins)
        else:
            raise ValueError(f"Unsupported message type: {message_type!r}")

        metadata = Metadata(
            run_id=0,
            message_id="",
            src_node_id=0,
            dst_node_id=0,
            reply_to_message="",
            group_id="",
            ttl=0.0,
            message_type=message_type,
            partition_id=partition_id,
        )
        # NOTE(review): the message's RecordSet is also used as the Context
        # state - confirm that sharing the same object is intended.
        context = Context(recordset)
        message = Message(metadata=metadata, content=recordset)
        return message, context