diff --git a/BENCHMARKS.txt b/BENCHMARKS.txt index 5284006..0178de6 100644 --- a/BENCHMARKS.txt +++ b/BENCHMARKS.txt @@ -1,18 +1,21 @@ Test 1: -20 clients, 10 request / client, HungryIris (1 sec burning) +20 clients, 10 requests / client, HungryIris (1 sec CPU burning) ML_POOL: 2 workers - 116 sec ML_POOL: 4 workers - 74 sec ML_POOL: 6 workers - 70 sec ML_POOL: 11 workers - 50 sec + SYNC: 222 sec + MLPool overhead + ------------------------------------------------------------------------------ How much latency does MLPool introduce? (examples/estimate_latency_mlpool_adds.py) -Scoring CPU intensive model 100 times synchronously (1 job at a time) +Scoring CPU intensive model (1 sec scoring time) 100 times synchronously (1 job at a time) Direct scoring: 110 sec Scoring on the pool (1 worker): 103 sec @@ -20,3 +23,14 @@ Scoring on the pool asyncio (1 worker): 109 sec # Just making sure it matches The MLPool logic doesn't seem to introduce any overhead; it even seems to speed things up slightly (interesting) + + +------------------------------------------------------------------------------ +Scoring NOT CPU intensive model (ms scoring) 1000 times synchronously (1 job at a time) + +Direct scoring: 0.33 sec +Scoring on the pool (1 worker): 12 sec +Scoring on the pool asyncio (1 worker): 6 sec (interesting, why?) + +If a model takes ms to score, DO NOT use MLPool. All associated overhead will +only slow things down. diff --git a/README.md b/README.md index d28e767..4194f7b 100644 --- a/README.md +++ b/README.md @@ -29,95 +29,186 @@ it reaches a certain threshold, Cloud Run will spin up another instance to handl ### How to use / Examples +- Instantiating MLPool + In order to use MLPool, for every ML model a user wants to run on the pool, they need to provide a callable, which loads a model into memory and prepares it. Under the hood, MLPool will ensure -every worker in the pool running in a dedicated process loads the model(s) in memory. +every worker in the pool (each running in a dedicated process) loads the model(s) into memory when it starts (done only once). -Consider these functions that load the models. +Consider this function that loads a text classification model as an example: ```python -def load_iris(model_path: str) -> HungryIris: - # TODO: Loading and preparing ML model goes here - return HungryIris(model_path) - - -def load_diabetes_classifier(model_path: str) -> HungryDiabetesClassifier: - # TODO: Loading and preparing ML model goes here - return HungryDiabetesClassifier(model_path) +def load_text_classifier(weights_path: str) -> TextClassifier: + return TextClassifier(weights_path) ``` -When instantiating MLPool, a user needs to provide a dictionary (models_to_load) where the keys are -model names MLPool will serve, and the values are the callables loading the models (functions above). -In addition, the user can specify the number of workers they want, scoring results TTL etc. +When instantiating MLPool, a user needs to pass the `models_to_load` parameter, a dictionary where the keys are +model names MLPool will serve, and the values are the callables loading the models (such as the function above). + +In addition, the user can specify the number of workers they want, scoring results TTL, the number of jobs +that can be queued on the pool, etc.
```python if __name__ == "__main__": with MLPool( models_to_load={ - "iris": partial(load_iris, "../models/iris_xgb.json"), - "diabetes_classifier": partial( - load_diabetes_classifier, "../models/diabetes_xgb.json" - ) + "text_classifier": partial(load_text_classifier, "text_classification.pt"), + "any_other_model": load_any_other_model_callable }, nb_workers=5, ) as pool: ... ``` +! IMPORTANT: when instantiating MLPool, do it under `if __name__ == "__main__":` +- Scoring on MLPool -# STOPPED HERE - SCORING +MLPool gives the user full control over what gets executed on the pool, so in order to score a model +on the pool, the user needs to provide a callable such as: +```python +def score_text_classifier(model: TextClassifier, text): + return model.classify_text(text) +``` +where the first parameter is the model, followed by anything else the user wants to pass. -When it comes to scoring, MLPool provides the ability to: +To schedule execution of the function above on the pool, the user can call the `.create_job()` or asyncio-friendly +`.create_job_async()` method, passing the following: -- Create a scoring job on the pool -- Get results of the scoring job -- Cancel a scoring job +- `score_model_function` - a function to run on the pool, which accepts a model as the first argument +- `model_name` - the model to pass to the function as the first argument; must be one of the model names provided when instantiating MLPool +- `args`, `kwargs` - any other arguments to pass to the function +```python +job_id = pool.create_job( + score_model_function=score_text_classifier, + model_name="text_classifier", + kwargs={ + "text": "MEMPHIS, Tenn. – Four days ago, Jon Rahm was \ + enduring the season’s worst weather conditions on Sunday at The \ + Open on his way to a closing 75 at Royal Portrush, which \ + considering the wind and the rain was a respectable showing" + } +) +print(pool.get_result(job_id)) +``` + +To get execution results back, the user can call `get_result` or the asyncio-friendly `get_result_async`, passing +the id of the job. +- Cancelling +Once a job has been created, it is possible to cancel model scoring, as long as it hasn't run yet, using `.cancel_job()`. This is convenient if, +say, your WebSocket client disconnects. +--- -- `create_job` or asyncio friendly `create_job_async` methods to execute model on the pool. The methods -expect a callable which accepts as the first parameter a ML model (passed by MLPool) and *args, **kwargs. +### End to end example: -Consider an example +Say we have two models we want to serve.
```python +from functools import partial + +from fastapi import FastAPI +import pydantic +import uvicorn + +from ml_pool import MLPool +from ml_pool.logger import get_logger +from examples.models import HungryIris, HungryDiabetesClassifier + + +logger = get_logger("api") + +app = FastAPI() + + +# --------------------- functions a user needs to provide -------------------- +def load_iris(model_path: str) -> HungryIris: + # TODO: Loading and preparing ML model goes here + + return HungryIris(model_path) + + def score_iris(model: HungryIris, features): # TODO: Feature engineering etc goes here return model.predict(features) -``` -```python + +def load_diabetes_classifier(model_path: str) -> HungryDiabetesClassifier: + # TODO: Loading and preparing ML model goes here + + return HungryDiabetesClassifier(model_path) + + +def score_diabetes_classifier(model: HungryDiabetesClassifier, features): + # TODO: Feature engineering etc goes here + + return model.predict(features) + + +# ------------------------------- schemas ------------------------------------ +class Request(pydantic.BaseModel): + features: list[float] + + +class Response(pydantic.BaseModel): + prediction: int + + +# ------------------------------- endpoints ---------------------------------- @app.post("/iris") async def serve_iris(request: Request) -> Response: features = request.features - logger.info(f"/iris request, features: {features}") - job_id = await pool.create_job_async( score_model_function=score_iris, model_name="iris", kwargs={"features": features}, ) result = await pool.get_result_async(job_id) + return Response(prediction=result) + +@app.post("/diabetes") +async def serve_diabetes_classifier(request: Request) -> Response: + features = request.features + job_id = await pool.create_job_async( + score_model_function=score_diabetes_classifier, + model_name="diabetes_classifier", + args=(features,), + ) + result = await pool.get_result_async(job_id) return Response(prediction=result) + + +if __name__ == "__main__": + with MLPool( + models_to_load={ + "iris": partial(load_iris, "../models/iris_xgb.json"), + "diabetes_classifier": partial( + load_diabetes_classifier, "../models/diabetes_xgb.json" + ), + }, + nb_workers=5, + ) as pool: + uvicorn.run(app, workers=1) + ``` + + --- ### Gotchas: -- IPC in Python is done via pickling / unpickling objects - transferring large objects such as images -could be expensive. - -- If your model is light and takes very little time to score (tens of milliseconds), then introducing MLPool will -only slow things down. In this case it makes sense to score the model directly in the API process. -If there is feature engineering work associated with pulling/preprocessing features, then -it might make sense, depends. +- MLPool comes with significant overhead, which has to do with IPC in Python (pickling/unpickling, etc), +so if your model is light and takes very little time to run (~milliseconds), then introducing MLPool will +only slow things down. In this case it might make sense to score the model directly in the API process. +If there is heavy feature engineering work associated with scoring the model, then MLPool could +still make sense - it depends. - ! It is common to spin up multiple instances of the API app inside a container using tools such as gunicorn etc.
Be careful when using MLPool in such a configuration, as you could overload the CPU, constantly triggering @@ -128,6 +219,8 @@ within the same container won't help as the bottleneck is CPU hungry model scori --- + + ### Known issues: - If a worker dies, but it was processing something, then the caller will infinitely wait for the result! diff --git a/examples/estimate_latency_mlpool_adds.py b/examples/estimate_latency_mlpool_adds.py index 56c5ab6..cf39995 100644 --- a/examples/estimate_latency_mlpool_adds.py +++ b/examples/estimate_latency_mlpool_adds.py @@ -25,7 +25,7 @@ """ -NB_SCORES = 100 +NB_SCORES = 1000 def load_iris(model_path: str) -> HungryIris: @@ -77,7 +77,7 @@ async def score_on_pool_async(): models_to_load={"iris": partial(load_iris, "./models/iris_xgb.json")}, nb_workers=1, ) as pool: - score_on_pool() asyncio.run(score_on_pool_async()) + score_on_pool() score_directly() diff --git a/examples/models/models.py b/examples/models/models.py index 35ca334..29fafc3 100644 --- a/examples/models/models.py +++ b/examples/models/models.py @@ -1,19 +1,63 @@ -import time +# flake8: noqa +import functools import xgboost import numpy as np import pandas as pd +import torch +from torchtext.datasets import AG_NEWS +from torchtext.data.utils import get_tokenizer +from torchtext.vocab import build_vocab_from_iterator +from examples.models.train_text_classification import TextClassificationModel +from ml_pool.utils import timer +from ml_pool import MLPool -__all__ = ["HungryIris", "HungryDiabetesClassifier"] + +__all__ = ["TextClassifier", "HungryIris", "HungryDiabetesClassifier"] + + +class TextClassifier: + def __init__(self, weights_path: str) -> None: + self._model = TextClassificationModel( + vocab_size=95811, embed_dim=64, num_class=4 + ) + self._model.load_state_dict(torch.load(weights_path)) + self._model.eval() + self._model.to("cpu") # Testing on Mac + + self._classes = {1: "World", 2: "Sports", 3: "Business", 4: "Sci/Tec"} + self._tokenizer = get_tokenizer("basic_english") + + train_iter = AG_NEWS(split="train") + + def yield_tokens(data_iter): + for _, text in data_iter: + yield self._tokenizer(text) + + self._vocab = build_vocab_from_iterator( + yield_tokens(train_iter), specials=["<unk>"] + ) + self._vocab.set_default_index(self._vocab["<unk>"]) + + print("TextClassifier initialised") + + @timer + def classify_text(self, text: str) -> str: + with torch.no_grad(): + text = torch.tensor(self._preprocess_text(text)) + output = self._model(text, torch.tensor([0])) + return self._classes[output.argmax(1).item() + 1] + + def _preprocess_text(self, text: str): + return self._vocab(self._tokenizer(text)) class HungryModel: @staticmethod def simulate_cpu_load() -> None: - # Adds ~ 1 sec on my Mac summed = 0 - for i in range(20_000_000): + for i in range(5_000_000): summed += 1 @@ -40,13 +84,64 @@ def predict(self, features: list[float]): return self._model.predict(pd.DataFrame([features]))[0] # Batch size 1 +def load_text_classifier(weights_path: str) -> TextClassifier: + return TextClassifier(weights_path) + + +def score_text_classifier(model: TextClassifier, text): + return model.classify_text(text) + + if __name__ == "__main__": # iris = HungryIris("iris_xgb.json") # start = time.perf_counter() # print(iris.predict([6.2, 2.2, 4.5, 1.5])) # print("Took:", time.perf_counter() - start) - diabetes = 
HungryDiabetesClassifier("diabetes_xgb.json") + # start = time.perf_counter() + # print(diabetes.predict([3, 187, 70, 22, 200, 36.4, 0.408, 36])) + # print("Took:", time.perf_counter() - start) + + # text_classifier = TextClassifier("text_classification.pt") + # for i in range(10): + # print(text_classifier.classify_text( + # text="MEMPHIS, Tenn. – Four days ago, Jon Rahm was \ + # enduring the season’s worst weather conditions on Sunday at The \ + # Open on his way to a closing 75 at Royal Portrush, which \ + # considering the wind and the rain was a respectable showing. \ + # Thursday’s first round at the WGC-FedEx St. Jude Invitational \ + # was another story. With temperatures in the mid-80s and hardly any \ + # wind, the Spaniard was 13 strokes better in a flawless round. \ + # Thanks to his best putting performance on the PGA Tour, Rahm \ + # finished with an 8-under 62 for a three-stroke lead, which \ + # was even more impressive considering he’d never played the \ + # front nine at TPC Southwind." + # )) + + with MLPool( + models_to_load={ + "text_classifier": functools.partial( + load_text_classifier, "text_classification.pt" + ) + }, + nb_workers=3, + ) as pool: + job_id = pool.create_job( + score_model_function=score_text_classifier, + model_name="text_classifier", + kwargs={ + "text": "MEMPHIS, Tenn. – Four days ago, Jon Rahm was \ + enduring the season’s worst weather conditions on Sunday at The \ + Open on his way to a closing 75 at Royal Portrush, which \ + considering the wind and the rain was a respectable showing. \ + Thursday’s first round at the WGC-FedEx St. Jude Invitational \ + was another story. With temperatures in the mid-80s and hardly any \ + wind, the Spaniard was 13 strokes better in a flawless round. \ + Thanks to his best putting performance on the PGA Tour, Rahm \ + finished with an 8-under 62 for a three-stroke lead, which \ + was even more impressive considering he’d never played the \ + front nine at TPC Southwind." 
+ }, + ) + print(pool.get_result(job_id)) diff --git a/examples/models/train_text_classification.py b/examples/models/train_text_classification.py new file mode 100644 index 0000000..470bde5 --- /dev/null +++ b/examples/models/train_text_classification.py @@ -0,0 +1,211 @@ +# flake8: noqa +import time + +import torch +import torchtext +from torchtext.datasets import AG_NEWS +from torchtext.data.utils import get_tokenizer +from torchtext.vocab import build_vocab_from_iterator +from torch.utils.data import DataLoader +from torch import nn +from torch.utils.data.dataset import random_split +from torchtext.data.functional import to_map_style_dataset + + +""" +https://pytorch.org/tutorials/beginner/text_sentiment_ngrams_tutorial.html +""" + + +class TextClassificationModel(nn.Module): + def __init__(self, vocab_size, embed_dim, num_class): + super(TextClassificationModel, self).__init__() + self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True) + self.fc = nn.Linear(embed_dim, num_class) + self.init_weights() + + def init_weights(self): + initrange = 0.5 + self.embedding.weight.data.uniform_(-initrange, initrange) + self.fc.weight.data.uniform_(-initrange, initrange) + self.fc.bias.data.zero_() + + def forward(self, text, offsets): + embedded = self.embedding(text, offsets) + return self.fc(embedded) + + +def train(dataloader): + model.train() + total_acc, total_count = 0, 0 + log_interval = 500 + start_time = time.time() + + for idx, (label, text, offsets) in enumerate(dataloader): + optimizer.zero_grad() + predicted_label = model(text, offsets) + loss = criterion(predicted_label, label) + loss.backward() + torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1) + optimizer.step() + total_acc += (predicted_label.argmax(1) == label).sum().item() + total_count += label.size(0) + if idx % log_interval == 0 and idx > 0: + elapsed = time.time() - start_time + print( + "| epoch {:3d} | {:5d}/{:5d} batches " + "| accuracy {:8.3f}".format( + epoch, idx, len(dataloader), total_acc / total_count + ) + ) + total_acc, total_count = 0, 0 + start_time = time.time() + + +def evaluate(dataloader): + model.eval() + total_acc, total_count = 0, 0 + + with torch.no_grad(): + for idx, (label, text, offsets) in enumerate(dataloader): + predicted_label = model(text, offsets) + loss = criterion(predicted_label, label) + total_acc += (predicted_label.argmax(1) == label).sum().item() + total_count += label.size(0) + return total_acc / total_count + + +if __name__ == "__main__": + tokenizer = get_tokenizer("basic_english") + train_iter = AG_NEWS(split="train") + + def yield_tokens(data_iter): + for _, text in data_iter: + yield tokenizer(text) + + vocab = build_vocab_from_iterator( + yield_tokens(train_iter), specials=["<unk>"] + ) + vocab.set_default_index(vocab["<unk>"]) + + text_pipeline = lambda x: vocab(tokenizer(x)) + label_pipeline = lambda x: int(x) - 1 + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + def collate_batch(batch): + label_list, text_list, offsets = [], [], [0] + for (_label, _text) in batch: + label_list.append(label_pipeline(_label)) + processed_text = torch.tensor( + text_pipeline(_text), dtype=torch.int64 + ) + text_list.append(processed_text) + offsets.append(processed_text.size(0)) + label_list = torch.tensor(label_list, dtype=torch.int64) + offsets = torch.tensor(offsets[:-1]).cumsum(dim=0) + text_list = torch.cat(text_list) + return label_list.to(device), text_list.to(device), offsets.to(device) + + dataloader = DataLoader( + train_iter, batch_size=8,
shuffle=False, collate_fn=collate_batch + ) + + num_class = len(set([label for (label, text) in train_iter])) + vocab_size = len(vocab) + emsize = 64 + model = TextClassificationModel(vocab_size, emsize, num_class).to(device) + + print("\n\nNum classes:", num_class) + print("Vocab size:", vocab_size) + import sys + + sys.exit(1) + + # Hyperparameters + EPOCHS = 10 # epoch + LR = 5 # learning rate + BATCH_SIZE = 64 # batch size for training + + criterion = torch.nn.CrossEntropyLoss() + optimizer = torch.optim.SGD(model.parameters(), lr=LR) + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1) + total_accu = None + train_iter, test_iter = AG_NEWS() + train_dataset = to_map_style_dataset(train_iter) + test_dataset = to_map_style_dataset(test_iter) + num_train = int(len(train_dataset) * 0.95) + split_train_, split_valid_ = random_split( + train_dataset, [num_train, len(train_dataset) - num_train] + ) + + train_dataloader = DataLoader( + split_train_, + batch_size=BATCH_SIZE, + shuffle=True, + collate_fn=collate_batch, + ) + valid_dataloader = DataLoader( + split_valid_, + batch_size=BATCH_SIZE, + shuffle=True, + collate_fn=collate_batch, + ) + test_dataloader = DataLoader( + test_dataset, + batch_size=BATCH_SIZE, + shuffle=True, + collate_fn=collate_batch, + ) + + for epoch in range(1, EPOCHS + 1): + epoch_start_time = time.time() + train(train_dataloader) + accu_val = evaluate(valid_dataloader) + if total_accu is not None and total_accu > accu_val: + scheduler.step() + else: + total_accu = accu_val + print("-" * 59) + print( + "| end of epoch {:3d} | time: {:5.2f}s | " + "valid accuracy {:8.3f} ".format( + epoch, time.time() - epoch_start_time, accu_val + ) + ) + print("-" * 59) + + print("Checking the results of test dataset.") + accu_test = evaluate(test_dataloader) + print("test accuracy {:8.3f}".format(accu_test)) + + ag_news_label = {1: "World", 2: "Sports", 3: "Business", 4: "Sci/Tec"} + + def predict(text, text_pipeline): + with torch.no_grad(): + text = torch.tensor(text_pipeline(text)) + output = model(text, torch.tensor([0])) + return output.argmax(1).item() + 1 + + ex_text_str = "MEMPHIS, Tenn. – Four days ago, Jon Rahm was \ + enduring the season’s worst weather conditions on Sunday at The \ + Open on his way to a closing 75 at Royal Portrush, which \ + considering the wind and the rain was a respectable showing. \ + Thursday’s first round at the WGC-FedEx St. Jude Invitational \ + was another story. With temperatures in the mid-80s and hardly any \ + wind, the Spaniard was 13 strokes better in a flawless round. \ + Thanks to his best putting performance on the PGA Tour, Rahm \ + finished with an 8-under 62 for a three-stroke lead, which \ + was even more impressive considering he’d never played the \ + front nine at TPC Southwind." + + model = model.to("cpu") + + print( + "This is a %s news" + % ag_news_label[predict(ex_text_str, text_pipeline)] + ) + + print("Saving the model...") + torch.save(model.state_dict(), "text_classification.pt") + print("Model saved") diff --git a/ml_pool/pool.py b/ml_pool/pool.py index 89278b5..7b00fec 100644 --- a/ml_pool/pool.py +++ b/ml_pool/pool.py @@ -67,7 +67,7 @@ def __init__( self._workers_exit_code = None self._pool_running = True - time.sleep(1.0) # Time to spin up workers, load the models etc + time.sleep(5.0) # Time to spin up workers, load the models etc logger.info(f"MLPool initialised. {nb_workers} workers spun up") # ---------------------------- Public methods ----------------------------