diff --git a/.env.example b/.env.example index 339f12b..d804351 100644 --- a/.env.example +++ b/.env.example @@ -19,12 +19,18 @@ FRAC_GREEDY=1.0 # number of input requests to generate (virtual users will sample from these) SAMPLE_SIZE=1 +# Default requests directory +REQUESTS_DIR=/requests + # requests file REQUESTS_FILENAME=sample_requests.json # results file RESULTS_FILENAME=results.json +# filename for combined results +RESULTS_ALL_FILENAME=results_all.json + # number of virtual users NUM_USERS=1 diff --git a/.github/workflows/unittests.yml b/.github/workflows/unittests.yml new file mode 100644 index 0000000..d897137 --- /dev/null +++ b/.github/workflows/unittests.yml @@ -0,0 +1,27 @@ +name: Pytest + +on: [push, pull_request] + +jobs: + test: + + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Set up Python 3.11 + uses: actions/setup-python@v2 + with: + python-version: 3.11 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e . + pip install -r requirements.txt + + - name: Run tests + run: | + pytest fmperf/tests/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index dbfe4d7..e429721 100644 --- a/Dockerfile +++ b/Dockerfile @@ -82,5 +82,6 @@ WORKDIR /home/fmperf # Set permissions for openshift RUN chmod -R g+rwx /home/fmperf +ENV REQUESTS_DIR=/requests # Sanity check: We can import the installed wheel RUN python -c "import ${SOURCE_DIR}" diff --git a/README.md b/README.md index e6988f1..82a2f09 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ Firstly, one can generate a set of requests assuming simple uniform distribution ```bash docker run --env-file .env -it --rm -v $(pwd)/requests:/requests fmperf python -m fmperf.loadgen.generate-input ``` -Alternatively, one can generate a set of requests using models that have been trained on requests sent to the internal production deployment of BAM: +Alternatively, one can generate a set of requests using models that have been trained on requests sent to a production deployment: ```bash docker run --env-file .env -it --rm -v $(pwd)/requests:/requests fmperf python -m fmperf.loadgen.generate-input --from-model ``` diff --git a/fmperf/loadgen/generate-input.py b/fmperf/loadgen/generate-input.py index cb0fa8c..a813561 100644 --- a/fmperf/loadgen/generate-input.py +++ b/fmperf/loadgen/generate-input.py @@ -6,13 +6,13 @@ import grpc import pickle from google.protobuf import json_format -from text_generation_tests.pb import generation_pb2_grpc as gpb2, generation_pb2 as pb2 import requests from typing import Iterable, List from importlib import resources as impresources import fmperf.data import traceback from transformers import AutoTokenizer +from fmperf.utils.constants import REQUESTS_DIR # read in seed text seed_text_file = impresources.files(fmperf.data) / "ai.txt" @@ -31,15 +31,34 @@ def get_streaming_response(response: requests.Response): finished = False + prev_completion_tokens = 0 for chunk in response.iter_lines( chunk_size=8192, decode_unicode=False, delimiter=b"\n" ): if chunk and not finished: data = chunk.decode("utf-8").strip().split("data: ")[1] - out = json.loads(data)["choices"][0] + data_parsed = json.loads(data) + out = data_parsed["choices"][0] finished = out["finish_reason"] is not None - if not (out["text"] == ""): # filter empty tokens - yield out + + if ("usage" in data_parsed) and (data_parsed["usage"] is not None): + usage = data_parsed["usage"] + token_count = usage["completion_tokens"] - prev_completion_tokens + prev_completion_tokens = usage["completion_tokens"] + for i in range(token_count): + yield { + "index": out["index"], + "text": "" if (i < token_count - 1) else out["text"], + "logprobs": None, + "finish_reason": ( + None if (i < token_count - 1) else out["finish_reason"] + ), + "stop_reason": ( + None if (i < token_count - 1) else out["stop_reason"] + ), + } + else: + raise RuntimeError("No usage data in server response") def get_text(): @@ -71,7 +90,9 @@ def generate_vllm_request(config, url): "prompt": prompt_ids, "ignore_eos": True, "max_tokens": config["out_tokens"], + "seed": 42, "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, } if not args.from_model: @@ -113,6 +134,11 @@ def generate_tgis_request(config, url): Generate (streaming) gRPC request and expected response """ + from text_generation_tests.pb import ( + generation_pb2_grpc as gpb2, + generation_pb2 as pb2, + ) + channel = grpc.insecure_channel(url) stub = gpb2.GenerationServiceStub(channel) @@ -181,7 +207,7 @@ def generate_tgis_request(config, url): # overwrite overwrite = os.getenv("OVERWRITE", "false").lower() != "false" -if os.path.isfile("/requests/%s" % (filename)) and not overwrite: +if os.path.isfile(os.path.join(REQUESTS_DIR, filename)) and not overwrite: print("File %s already exists; skipping workload generation" % (filename)) sys.exit() @@ -262,5 +288,5 @@ def find_class(self, module, name): if len(cases) > 0: print(">> Writing %d requests to %s" % (len(cases), filename)) - with open("/requests/%s" % (filename), "w") as f: + with open(os.path.join(REQUESTS_DIR, filename), "w") as f: json.dump(cases, f) diff --git a/fmperf/loadgen/generate-text-from-quac.py b/fmperf/loadgen/generate-text-from-quac.py index 2b4eb76..3679b61 100644 --- a/fmperf/loadgen/generate-text-from-quac.py +++ b/fmperf/loadgen/generate-text-from-quac.py @@ -4,6 +4,7 @@ import random import requests from typing import List, Tuple +from fmperf.utils.constants import REQUESTS_DIR class QuACScenario: @@ -167,7 +168,7 @@ def main(): quac = QuACScenario() prompts = quac.get_prompts() - filename = "/requests/sample_texts.json" + filename = os.path.join(REQUESTS_DIR, "sample_texts.json") print(">> Writing to %s" % (filename)) with open(filename, "w") as f: json.dump(prompts, f, ensure_ascii=False) diff --git a/fmperf/loadgen/run.py b/fmperf/loadgen/run.py index 3db3268..426c2d7 100644 --- a/fmperf/loadgen/run.py +++ b/fmperf/loadgen/run.py @@ -6,13 +6,13 @@ import os from durations import Duration import numpy as np -from text_generation_tests.approx import approx +from fmperf.utils.approx import approx import grpc from google.protobuf import json_format -from text_generation_tests.pb import generation_pb2_grpc as gpb2, generation_pb2 as pb2 from fmperf.utils import parse_results from datetime import datetime from .collect_energy import collect_metrics, summarize_energy +from fmperf.utils.constants import REQUESTS_DIR, REQUESTS_FILENAME, RESULTS_ALL_FILENAME def run(): @@ -41,6 +41,7 @@ def get_streaming_response_vllm(response): ) stop = False + prev_completion_tokens = 0 while not stop: try: chunk = next(response_iter) @@ -49,8 +50,21 @@ def get_streaming_response_vllm(response): data = chunk.decode("utf-8").strip().split("data: ")[1] out = json.loads(data)["choices"][0] stop = out["finish_reason"] is not None - if not (out["text"] == ""): # filter empty tokens - yield out, 1, timestamp, True, None + usage = json.loads(data)["usage"] + token_count = usage["completion_tokens"] - prev_completion_tokens + prev_completion_tokens = usage["completion_tokens"] + for i in range(token_count): + yield { + "index": out["index"], + "text": "" if (i < token_count - 1) else out["text"], + "logprobs": None, + "finish_reason": ( + None if (i < token_count - 1) else out["finish_reason"] + ), + "stop_reason": ( + None if (i < token_count - 1) else out["stop_reason"] + ), + }, 1, timestamp, True, None except Exception as e: timestamp = time.time_ns() yield None, 0, timestamp, False, e @@ -58,8 +72,8 @@ def get_streaming_response_vllm(response): # we have stopped yield None, 0, time.time_ns(), False, StopIteration() - infile = "/requests/%s" % (os.environ["REQUESTS_FILENAME"]) - outfile = "/requests/%s" % (os.environ["RESULTS_FILENAME"]) + infile = os.path.join(REQUESTS_DIR, REQUESTS_FILENAME) + outfile = os.path.join(REQUESTS_DIR, RESULTS_FILENAME) target = os.environ["TARGET"] api_url = os.environ["URL"] num_users = int(os.environ["NUM_USERS"]) @@ -74,6 +88,8 @@ def worker(wid, channel): rs = np.random.RandomState(seed=wid) if target == "tgis": + from text_generation_tests.pb import generation_pb2_grpc as gpb2 + stub = gpb2.GenerationServiceStub(channel) t_start = time.time_ns() @@ -97,6 +113,8 @@ def worker(wid, channel): stream=True, ) elif target == "tgis": + from text_generation_tests.pb import generation_pb2 as pb2 + message = json_format.ParseDict( sample_request, pb2.SingleGenerationRequest() ) diff --git a/fmperf/loadgen/sweep.py b/fmperf/loadgen/sweep.py index 938001f..6e82e26 100644 --- a/fmperf/loadgen/sweep.py +++ b/fmperf/loadgen/sweep.py @@ -2,6 +2,7 @@ import json from .run import run from fmperf.utils import parse_results +from fmperf.utils.constants import REQUESTS_DIR, RESULTS_ALL_FILENAME users = [int(u) for u in os.environ["SWEEP_USERS"].split(",")] @@ -13,7 +14,7 @@ run() - filename = "/requests/result_sweep_u%d.json" % (u) + filename = os.path.join(REQUESTS_DIR, "requests/result_sweep_u%d.json" % (u)) with open(filename, "rb") as f: tmp = json.load(f) @@ -21,3 +22,8 @@ results.extend(tmp["results"]) parse_results(results, print_df=True) + +outfile = os.path.point(REQUESTS_DIR, RESULTS_ALL_FILENAME) +print(f">> writing all results to file: {outfile}") +with open(outfile, "w") as f: + json.dump(results, f) diff --git a/fmperf/tests/test_import.py b/fmperf/tests/test_import.py new file mode 100644 index 0000000..516b2ff --- /dev/null +++ b/fmperf/tests/test_import.py @@ -0,0 +1,39 @@ +import unittest +import logging + + +# Configure logging +logging.basicConfig( + filename="test_logs.log", + level=logging.DEBUG, + format="%(asctime)s %(levelname)s:%(message)s", + filemode="w", +) +logging.debug("Logging configured successfully") + + +# Test class to check if the imports are working for the files in the examples folder +class TestImports(unittest.TestCase): + def setUp(self): + # Setup code goes here + logging.info("Running a test case.") + + def tearDown(self): + # Teardown code can go here, if we needed to clean up after tests + pass + + def test_fmperf_import(self): + """Test if fmperf import works correctly.""" + try: + import fmperf + + self.assertIsNotNone(fmperf) + logging.info("test_fmperf_import passed.") + except Exception as e: + logging.error(f"test_fmperf_import failed: {e}") + raise + + +if __name__ == "__main__": + unittest.main() + logging.getLogger().handlers[0].flush() diff --git a/fmperf/utils/Parsing.py b/fmperf/utils/Parsing.py index f808ff0..692cddf 100644 --- a/fmperf/utils/Parsing.py +++ b/fmperf/utils/Parsing.py @@ -60,15 +60,15 @@ def parse_results(results, print_df=False, print_csv=False): df_out["latency_prefill_ms"] = df_prefill.groupby(["exp_num_users"])[ "duration_ms" - ].median() + ].mean() df_out["latency_nexttoken_ms"] = df_nexttoken.groupby(["exp_num_users"])[ "duration_ms" - ].median() + ].mean() df_out["latency_e2e_ms"] = ( df.groupby(["exp_num_users", "worker_idx", "request_idx"])["duration_ms"] .sum() .groupby("exp_num_users") - .median() + .mean() ) with pd.option_context( diff --git a/fmperf/utils/approx.py b/fmperf/utils/approx.py new file mode 100644 index 0000000..7494c95 --- /dev/null +++ b/fmperf/utils/approx.py @@ -0,0 +1,84 @@ +# Copied from https://github.com/IBM/text-generation-inference/blob/main/integration_tests/text_generation_tests/approx.py +from typing import List + +import pytest +from collections.abc import Mapping, Sized +from _pytest.python_api import ApproxMapping, ApproxSequenceLike + +# This is a hacky extension of pytest's approx function, to get it to work with +# arbitrarily nested dict/list type objects. Any contained floats are compared with a tolerance. + + +def approx(expected, rel=5e-4, abs=1e-9, nan_ok=False): + if isinstance(expected, Mapping): + return ApproxNestedMapping(expected, rel, abs, nan_ok) + if is_seq_like(expected): + return ApproxNestedSequenceLike(expected, rel, abs, nan_ok) + return pytest.approx(expected, rel, abs, nan_ok) + + +class ApproxNestedMapping(ApproxMapping): + def _check_type(self): + return + + def __repr__(self) -> str: + return "approx({!r})".format({k: approx(v) for k, v in self.expected.items()}) + + def _yield_comparisons(self, actual): + if set(self.expected.keys()) != set(actual.keys()): + return [(self.expected, actual)] + return _yield_comparisons(self, super(), actual) + + def _repr_compare(self, other_side) -> List[str]: + # TODO impl more useful explanation here + try: + return super()._repr_compare(other_side) + except: + return ["Mismatch"] + + +class ApproxNestedSequenceLike(ApproxSequenceLike): + def _check_type(self): + return + + def __repr__(self) -> str: + seq_type = type(self.expected) + if seq_type not in (tuple, list): + seq_type = list + return "approx({!r})".format(seq_type(approx(x) for x in self.expected)) + + def _yield_comparisons(self, actual): + if len(self.expected) != len(actual): + return [(self.expected, actual)] + return _yield_comparisons(self, super(), actual) + + def _repr_compare(self, other_side) -> List[str]: + # TODO impl more useful explanation here + try: + return super()._repr_compare(other_side) + except: + return ["Mismatch"] + + +def _yield_nested(actual, expected, **kwargs): + if isinstance(expected, Mapping): + return ApproxNestedMapping(expected, **kwargs)._yield_comparisons(actual) + if is_seq_like(expected): + return ApproxNestedSequenceLike(expected, **kwargs)._yield_comparisons(actual) + return [(actual, expected)] + + +def _yield_comparisons(self, supr, actual): + for actual, expected in supr._yield_comparisons(actual): + for a, e in _yield_nested( + actual, expected, rel=self.rel, abs=self.abs, nan_ok=self.nan_ok + ): + yield a, e + + +def is_seq_like(obj): + return ( + hasattr(obj, "__getitem__") + and isinstance(obj, Sized) + and not isinstance(obj, (bytes, str)) + ) diff --git a/fmperf/utils/constants.py b/fmperf/utils/constants.py new file mode 100644 index 0000000..dbc4ed2 --- /dev/null +++ b/fmperf/utils/constants.py @@ -0,0 +1,5 @@ +import os + +REQUESTS_DIR = os.environ.get("REQUESTS_DIR", ".") +REQUESTS_FILENAME = os.environ["REQUESTS_FILENAME"] +RESULTS_ALL_FILENAME = os.environ["RESULTS_ALL_FILENAME"]