
Commit

Updated README; Trained text classifier for testing purposes; Ran more benchmarks
EvgeniiTitov committed Dec 8, 2022
1 parent a8d8a91 commit 8d2fc85
Showing 6 changed files with 462 additions and 49 deletions.
18 changes: 16 additions & 2 deletions BENCHMARKS.txt
@@ -1,22 +1,36 @@
Test 1:

20 clients, 10 requests / client, HungryIris (1 sec burning)
20 clients, 10 requests / client, HungryIris (1 sec CPU burning)

ML_POOL: 2 workers - 116 sec
ML_POOL: 4 workers - 74 sec
ML_POOL: 6 workers - 70 sec
ML_POOL: 11 workers - 50 sec

SYNC: 222 sec


MLPool overhead

------------------------------------------------------------------------------
How much latency does MLPool introduce? (examples/estimate_latency_mlpool_adds.py)

Scoring CPU intensive model 100 times synchronously (1 job at a time)
Scoring CPU intensive model (1 sec scoring time) 100 times synchronously (1 job at a time)

Direct scoring: 110 sec
Scoring on the pool (1 worker): 103 sec
Scoring on the pool asyncio (1 worker): 109 sec # Just making sure it matches

The MLPool logic doesn't seem to introduce any overhead; it even seems to speed
things up slightly (interesting).


------------------------------------------------------------------------------
Scoring NOT CPU intensive model (ms scoring) 1000 times synchronously (1 job at a time)

Direct scoring: 0.33 sec
Scoring on the pool (1 worker): 12 sec
Scoring on the pool asyncio (1 worker): 6 sec (interesting, why?)

If a model takes ms to score, DO NOT use MLPool. All associated overhead will
only slow things down.
165 changes: 129 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
@@ -29,95 +29,186 @@ it reaches a certain threshold, Cloud Run will spin up another instance to handle

### How to use / Examples

- Instantiating MLPool

In order to use MLPool, for every ML model a user wants to run on the pool, they need to provide
a callable, which loads a model into memory and prepares it. Under the hood, MLPool will ensure
every worker in the pool running in a dedicated process loads the model(s) in memory.
every worker in the pool running in a dedicated process loads the model(s) in memory when it starts (done only once).

Consider these functions that load the models.
Consider these functions that load the text classification models as an example:

```python
def load_iris(model_path: str) -> HungryIris:
# TODO: Loading and preparing ML model goes here
return HungryIris(model_path)


def load_diabetes_classifier(model_path: str) -> HungryDiabetesClassifier:
# TODO: Loading and preparing ML model goes here
return HungryDiabetesClassifier(model_path)
def load_text_classifier(weights_path: str) -> TextClassifier:
return TextClassifier(weights_path)
```

When instantiating MLPool, a user needs to provide a dictionary (models_to_load) where the keys are
model names MLPool will serve, and the values are the callables loading the models (functions above).
In addition, the user can specify the number of workers they want, scoring results TTL etc.
When instantiating MLPool, a user needs to pass the parameter models_to_load, which is a dictionary where the keys are
model names MLPool will serve, and the values are the callables loading the models (example function above).

In addition, the user can specify the number of workers they want, scoring results TTL, the number of jobs
that could be queued on the pool etc.

```python
if __name__ == "__main__":
with MLPool(
models_to_load={
"iris": partial(load_iris, "../models/iris_xgb.json"),
"diabetes_classifier": partial(
load_diabetes_classifier, "../models/diabetes_xgb.json"
)
"text_classifier": partial(load_text_classifier, "text_classification.pt"),
"any_other_model": load_any_other_model_callable
},
nb_workers=5,
) as pool:
...
```

! IMPORTANT: when instantiating MLPool, do it under `if __name__ == "__main__":`

- Scoring on MLPool

MLPool gives the user full control over what gets executed on the pool, so in order to score a model
on the pool, the user needs to provide a callable such as:

```python
def score_text_classifier(model: TextClassifier, text):
return model.classify_text(text)
```
where the first parameter is the model, followed by anything else the user wants to pass.

When it comes to scoring, MLPool provides the ability to:
To schedule execution of the function above on the pool, the user can call the `.create_job()` or asyncio-friendly
`.create_job_async()` method, passing:

- Create a scoring job on the pool
- Get results of the scoring job
- Cancel a scoring job
- `score_model_function` - a function to run on the pool, which accepts a model as the first argument
- `model_name` - the name of the model to be passed to the function as the first argument; must be one of the model names provided when instantiating MLPool
- `args`, `kwargs` - any other arguments to pass to the function

```python
job_id = pool.create_job(
score_model_function=score_text_classifier,
model_name="text_classifier",
kwargs={
"text": "MEMPHIS, Tenn. – Four days ago, Jon Rahm was \
enduring the season’s worst weather conditions on Sunday at The \
Open on his way to a closing 75 at Royal Portrush, which \
considering the wind and the rain was a respectable showing"
}
)
print(pool.get_result(job_id))
```

To get execution results back, the user can call `get_result` or the asyncio-friendly `get_result_async`, passing the
id of the job.
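
For example, inside an async handler the two calls can be combined like this (a minimal sketch reusing `pool` and `score_text_classifier` from above):

```python
async def classify(text: str):
    # Schedule the scoring job without blocking the event loop
    job_id = await pool.create_job_async(
        score_model_function=score_text_classifier,
        model_name="text_classifier",
        kwargs={"text": text},
    )
    # Wait for one of the pool workers to produce the result
    return await pool.get_result_async(job_id)
```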


- Cancelling

Once a job has been created, it is possible to cancel model scoring if it hasn't run yet using `.cancel_job()`. This is
convenient if, say, your WebSocket client disconnects.
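
A minimal sketch of that flow (assuming `cancel_job` accepts the job id returned by `create_job` / `create_job_async`):

```python
job_id = await pool.create_job_async(
    score_model_function=score_text_classifier,
    model_name="text_classifier",
    kwargs={"text": "some text"},
)

# The client disconnected before a worker picked the job up -
# cancel it so the pool doesn't burn CPU on a result nobody needs.
pool.cancel_job(job_id)  # assumed signature: cancel_job(job_id)
```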

---

- `create_job` or asyncio friendly `create_job_async` methods to execute model on the pool. The methods
expect a callable which accepts as the first parameter a ML model (passed by MLPool) and *args, **kwargs.
### End to end example:

Consider an example
Say, we have two models we want to serve.

```python
from functools import partial

from fastapi import FastAPI
import pydantic
import uvicorn

from ml_pool import MLPool
from ml_pool.logger import get_logger
from examples.models import HungryIris, HungryDiabetesClassifier


logger = get_logger("api")

app = FastAPI()


# --------------------- functions a user needs to provide --------------------------
def load_iris(model_path: str) -> HungryIris:
# TODO: Loading and preparing ML model goes here

return HungryIris(model_path)


def score_iris(model: HungryIris, features):
# TODO: Feature engineering etc goes here

return model.predict(features)
```

```python

def load_diabetes_classifier(model_path: str) -> HungryDiabetesClassifier:
# TODO: Loading and preparing ML model goes here

return HungryDiabetesClassifier(model_path)


def score_diabetes_classifier(model: HungryDiabetesClassifier, features):
# TODO: Feature engineering etc goes here

return model.predict(features)


# ------------------------------- schemas ------------------------------------
class Request(pydantic.BaseModel):
features: list[float]


class Response(pydantic.BaseModel):
prediction: int


# ------------------------------- endpoints ----------------------------------
@app.post("/iris")
async def serve_iris(request: Request) -> Response:
features = request.features
logger.info(f"/iris request, features: {features}")

job_id = await pool.create_job_async(
score_model_function=score_iris,
model_name="iris",
kwargs={"features": features},
)
result = await pool.get_result_async(job_id)
return Response(prediction=result)


@app.post("/diabetes")
async def serve_diabetes_classifier(request: Request) -> Response:
features = request.features
job_id = await pool.create_job_async(
score_model_function=score_diabetes_classifier,
model_name="diabetes_classifier",
args=(features,),
)
result = await pool.get_result_async(job_id)
return Response(prediction=result)


if __name__ == "__main__":
with MLPool(
models_to_load={
"iris": partial(load_iris, "../models/iris_xgb.json"),
"diabetes_classifier": partial(
load_diabetes_classifier, "../models/diabetes_xgb.json"
),
},
nb_workers=5,
) as pool:
uvicorn.run(app, workers=1)

```
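
With the server above running (uvicorn defaults to `127.0.0.1:8000`), a hypothetical client call could look like this; the feature values and the printed prediction are illustrative only:

```python
import requests

response = requests.post(
    "http://127.0.0.1:8000/iris",
    json={"features": [5.1, 3.5, 1.4, 0.2]},  # matches the Request schema
)
print(response.json())  # e.g. {"prediction": 0}
```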


---

### Gotchas:

- IPC in Python is done via pickling / unpickling objects - transferring large objects such as images
could be expensive.

- If your model is light and takes very little time to score (tens of milliseconds), then introducing MLPool will
only slow things down. In this case it makes sense to score the model directly in the API process.
If there is feature engineering work associated with pulling/preprocessing features, then
it might make sense, depends.
- MLPool comes with significant overhead that has to do with IPC in Python (pickling/unpickling, etc.),
so if your model is light and takes very little time to run (~milliseconds), then introducing MLPool will
only slow things down. In this case it might make sense to score the model directly in the API process.
If there is heavy feature engineering work associated with scoring the model, then the pool could still
make sense; it depends.

- ! It is common to spin up multiple instances of the API app inside a container using tools such as
gunicorn etc. Be careful when using MLPool in such a configuration as you could overload the CPU constantly triggering
@@ -128,6 +219,8 @@ within the same container won't help as the bottleneck is CPU hungry model scoring
---




### Known issues:

If a worker dies while it was processing something, the caller will wait for the result indefinitely!
4 changes: 2 additions & 2 deletions examples/estimate_latency_mlpool_adds.py
@@ -25,7 +25,7 @@
"""


NB_SCORES = 100
NB_SCORES = 1000


def load_iris(model_path: str) -> HungryIris:
@@ -77,7 +77,7 @@ async def score_on_pool_async():
models_to_load={"iris": partial(load_iris, "./models/iris_xgb.json")},
nb_workers=1,
) as pool:
score_on_pool()
asyncio.run(score_on_pool_async())
score_on_pool()

score_directly()
