Commit

Merge branch 'main' into kind-setup-related-updates
tdoublep authored Sep 20, 2024
2 parents 73611a0 + 097ac80 commit b0210b6
Showing 12 changed files with 231 additions and 18 deletions.
6 changes: 6 additions & 0 deletions .env.example
@@ -19,12 +19,18 @@ FRAC_GREEDY=1.0
# number of input requests to generate (virtual users will sample from these)
SAMPLE_SIZE=1

# Default requests directory
REQUESTS_DIR=/requests

# requests file
REQUESTS_FILENAME=sample_requests.json

# results file
RESULTS_FILENAME=results.json

# filename for combined results
RESULTS_ALL_FILENAME=results_all.json

# number of virtual users
NUM_USERS=1
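
These variables are read by the load generator at runtime. As a point of reference, here is a minimal sketch of how `fmperf/utils/constants.py` (imported throughout this commit but not shown in the diff) might resolve them; the names and defaults below are assumptions taken from this `.env.example`, not the module's actual contents:

```python
# Hypothetical sketch of fmperf/utils/constants.py; the real module is not
# part of this diff, so names and defaults are inferred from .env.example.
import os

REQUESTS_DIR = os.environ.get("REQUESTS_DIR", "/requests")
REQUESTS_FILENAME = os.environ.get("REQUESTS_FILENAME", "sample_requests.json")
RESULTS_FILENAME = os.environ.get("RESULTS_FILENAME", "results.json")
RESULTS_ALL_FILENAME = os.environ.get("RESULTS_ALL_FILENAME", "results_all.json")
```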

27 changes: 27 additions & 0 deletions .github/workflows/unittests.yml
@@ -0,0 +1,27 @@
name: Pytest

on: [push, pull_request]

jobs:
  test:

    runs-on: ubuntu-latest

    steps:
    - name: Checkout repository
      uses: actions/checkout@v2

    - name: Set up Python 3.11
      uses: actions/setup-python@v2
      with:
        python-version: 3.11

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -e .
        pip install -r requirements.txt
    - name: Run tests
      run: |
        pytest fmperf/tests/
1 change: 1 addition & 0 deletions Dockerfile
@@ -82,5 +82,6 @@ WORKDIR /home/fmperf
# Set permissions for openshift
RUN chmod -R g+rwx /home/fmperf

ENV REQUESTS_DIR=/requests
# Sanity check: We can import the installed wheel
RUN python -c "import ${SOURCE_DIR}"
2 changes: 1 addition & 1 deletion README.md
@@ -59,7 +59,7 @@ Firstly, one can generate a set of requests assuming simple uniform distribution
```bash
docker run --env-file .env -it --rm -v $(pwd)/requests:/requests fmperf python -m fmperf.loadgen.generate-input
```
Alternatively, one can generate a set of requests using models that have been trained on requests sent to the internal production deployment of BAM:
Alternatively, one can generate a set of requests using models that have been trained on requests sent to a production deployment:
```bash
docker run --env-file .env -it --rm -v $(pwd)/requests:/requests fmperf python -m fmperf.loadgen.generate-input --from-model
```
38 changes: 32 additions & 6 deletions fmperf/loadgen/generate-input.py
@@ -6,13 +6,13 @@
import grpc
import pickle
from google.protobuf import json_format
from text_generation_tests.pb import generation_pb2_grpc as gpb2, generation_pb2 as pb2
import requests
from typing import Iterable, List
from importlib import resources as impresources
import fmperf.data
import traceback
from transformers import AutoTokenizer
from fmperf.utils.constants import REQUESTS_DIR

# read in seed text
seed_text_file = impresources.files(fmperf.data) / "ai.txt"
@@ -31,15 +31,34 @@

def get_streaming_response(response: requests.Response):
    finished = False
    prev_completion_tokens = 0
    for chunk in response.iter_lines(
        chunk_size=8192, decode_unicode=False, delimiter=b"\n"
    ):
        if chunk and not finished:
            data = chunk.decode("utf-8").strip().split("data: ")[1]
            out = json.loads(data)["choices"][0]
            data_parsed = json.loads(data)
            out = data_parsed["choices"][0]
            finished = out["finish_reason"] is not None
            if not (out["text"] == ""):  # filter empty tokens
                yield out

            if ("usage" in data_parsed) and (data_parsed["usage"] is not None):
                usage = data_parsed["usage"]
                token_count = usage["completion_tokens"] - prev_completion_tokens
                prev_completion_tokens = usage["completion_tokens"]
                for i in range(token_count):
                    yield {
                        "index": out["index"],
                        "text": "" if (i < token_count - 1) else out["text"],
                        "logprobs": None,
                        "finish_reason": (
                            None if (i < token_count - 1) else out["finish_reason"]
                        ),
                        "stop_reason": (
                            None if (i < token_count - 1) else out["stop_reason"]
                        ),
                    }
            else:
                raise RuntimeError("No usage data in server response")


def get_text():
@@ -71,7 +90,9 @@ def generate_vllm_request(config, url):
"prompt": prompt_ids,
"ignore_eos": True,
"max_tokens": config["out_tokens"],
"seed": 42,
"stream": True,
"stream_options": {"include_usage": True, "continuous_usage_stats": True},
}

    if not args.from_model:
@@ -113,6 +134,11 @@ def generate_tgis_request(config, url):
    Generate (streaming) gRPC request and expected response
    """

    from text_generation_tests.pb import (
        generation_pb2_grpc as gpb2,
        generation_pb2 as pb2,
    )

    channel = grpc.insecure_channel(url)
    stub = gpb2.GenerationServiceStub(channel)

@@ -181,7 +207,7 @@ def generate_tgis_request(config, url):
# overwrite
overwrite = os.getenv("OVERWRITE", "false").lower() != "false"

if os.path.isfile("/requests/%s" % (filename)) and not overwrite:
if os.path.isfile(os.path.join(REQUESTS_DIR, filename)) and not overwrite:
print("File %s already exists; skipping workload generation" % (filename))
sys.exit()

@@ -262,5 +288,5 @@ def find_class(self, module, name):

if len(cases) > 0:
    print(">> Writing %d requests to %s" % (len(cases), filename))
    with open("/requests/%s" % (filename), "w") as f:
    with open(os.path.join(REQUESTS_DIR, filename), "w") as f:
        json.dump(cases, f)
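
To illustrate the usage-based token accounting added to `get_streaming_response` above, here is a hedged sketch of a parsed SSE chunk as the code expects it when `stream_options` enables `include_usage` and `continuous_usage_stats`. The field values are illustrative and not captured from a real vLLM server:

```python
# Illustrative only: assumed shape of one parsed chunk (json.loads(data)).
data_parsed = {
    "choices": [
        {"index": 0, "text": " world", "logprobs": None,
         "finish_reason": None, "stop_reason": None}
    ],
    "usage": {"prompt_tokens": 12, "completion_tokens": 3, "total_tokens": 15},
}
# If the previous chunk reported completion_tokens == 1, token_count is 2, so
# the generator yields one empty placeholder token followed by one entry
# carrying the text " world".
```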
3 changes: 2 additions & 1 deletion fmperf/loadgen/generate-text-from-quac.py
@@ -4,6 +4,7 @@
import random
import requests
from typing import List, Tuple
from fmperf.utils.constants import REQUESTS_DIR


class QuACScenario:
@@ -167,7 +168,7 @@ def main():
    quac = QuACScenario()
    prompts = quac.get_prompts()

    filename = "/requests/sample_texts.json"
    filename = os.path.join(REQUESTS_DIR, "sample_texts.json")
    print(">> Writing to %s" % (filename))
    with open(filename, "w") as f:
        json.dump(prompts, f, ensure_ascii=False)
30 changes: 24 additions & 6 deletions fmperf/loadgen/run.py
@@ -6,13 +6,13 @@
import os
from durations import Duration
import numpy as np
from text_generation_tests.approx import approx
from fmperf.utils.approx import approx
import grpc
from google.protobuf import json_format
from text_generation_tests.pb import generation_pb2_grpc as gpb2, generation_pb2 as pb2
from fmperf.utils import parse_results
from datetime import datetime
from .collect_energy import collect_metrics, summarize_energy
from fmperf.utils.constants import REQUESTS_DIR, REQUESTS_FILENAME, RESULTS_FILENAME, RESULTS_ALL_FILENAME


def run():
@@ -41,6 +41,7 @@ def get_streaming_response_vllm(response):
    )

    stop = False
    prev_completion_tokens = 0
    while not stop:
        try:
            chunk = next(response_iter)
@@ -49,17 +50,30 @@
            data = chunk.decode("utf-8").strip().split("data: ")[1]
            out = json.loads(data)["choices"][0]
            stop = out["finish_reason"] is not None
            if not (out["text"] == ""):  # filter empty tokens
                yield out, 1, timestamp, True, None
            usage = json.loads(data)["usage"]
            token_count = usage["completion_tokens"] - prev_completion_tokens
            prev_completion_tokens = usage["completion_tokens"]
            for i in range(token_count):
                yield {
                    "index": out["index"],
                    "text": "" if (i < token_count - 1) else out["text"],
                    "logprobs": None,
                    "finish_reason": (
                        None if (i < token_count - 1) else out["finish_reason"]
                    ),
                    "stop_reason": (
                        None if (i < token_count - 1) else out["stop_reason"]
                    ),
                }, 1, timestamp, True, None
        except Exception as e:
            timestamp = time.time_ns()
            yield None, 0, timestamp, False, e

    # we have stopped
    yield None, 0, time.time_ns(), False, StopIteration()

infile = "/requests/%s" % (os.environ["REQUESTS_FILENAME"])
outfile = "/requests/%s" % (os.environ["RESULTS_FILENAME"])
infile = os.path.join(REQUESTS_DIR, REQUESTS_FILENAME)
outfile = os.path.join(REQUESTS_DIR, RESULTS_FILENAME)
target = os.environ["TARGET"]
api_url = os.environ["URL"]
num_users = int(os.environ["NUM_USERS"])
@@ -74,6 +88,8 @@ def worker(wid, channel):
    rs = np.random.RandomState(seed=wid)

    if target == "tgis":
        from text_generation_tests.pb import generation_pb2_grpc as gpb2

        stub = gpb2.GenerationServiceStub(channel)

    t_start = time.time_ns()
@@ -97,6 +113,8 @@
                stream=True,
            )
        elif target == "tgis":
            from text_generation_tests.pb import generation_pb2 as pb2

            message = json_format.ParseDict(
                sample_request, pb2.SingleGenerationRequest()
            )
8 changes: 7 additions & 1 deletion fmperf/loadgen/sweep.py
@@ -2,6 +2,7 @@
import json
from .run import run
from fmperf.utils import parse_results
from fmperf.utils.constants import REQUESTS_DIR, RESULTS_ALL_FILENAME

users = [int(u) for u in os.environ["SWEEP_USERS"].split(",")]

@@ -13,11 +14,16 @@

    run()

    filename = "/requests/result_sweep_u%d.json" % (u)
    filename = os.path.join(REQUESTS_DIR, "result_sweep_u%d.json" % (u))

    with open(filename, "rb") as f:
        tmp = json.load(f)

    results.extend(tmp["results"])

parse_results(results, print_df=True)

outfile = os.path.join(REQUESTS_DIR, RESULTS_ALL_FILENAME)
print(f">> writing all results to file: {outfile}")
with open(outfile, "w") as f:
    json.dump(results, f)
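
For completeness, a small offline sketch (not part of the repository) of how the combined results file written above could be re-parsed later. It assumes the default `REQUESTS_DIR=/requests` and `RESULTS_ALL_FILENAME=results_all.json` from `.env.example`:

```python
# Hedged example: re-parse the combined sweep results offline.
import json

from fmperf.utils import parse_results

with open("/requests/results_all.json") as f:
    results = json.load(f)  # list of per-request result records

parse_results(results, print_df=True)
```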
39 changes: 39 additions & 0 deletions fmperf/tests/test_import.py
@@ -0,0 +1,39 @@
import unittest
import logging


# Configure logging
logging.basicConfig(
    filename="test_logs.log",
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s:%(message)s",
    filemode="w",
)
logging.debug("Logging configured successfully")


# Test class to check if the imports are working for the files in the examples folder
class TestImports(unittest.TestCase):
    def setUp(self):
        # Setup code goes here
        logging.info("Running a test case.")

    def tearDown(self):
        # Teardown code can go here, if we needed to clean up after tests
        pass

    def test_fmperf_import(self):
        """Test if fmperf import works correctly."""
        try:
            import fmperf

            self.assertIsNotNone(fmperf)
            logging.info("test_fmperf_import passed.")
        except Exception as e:
            logging.error(f"test_fmperf_import failed: {e}")
            raise


if __name__ == "__main__":
    unittest.main()
    logging.getLogger().handlers[0].flush()
6 changes: 3 additions & 3 deletions fmperf/utils/Parsing.py
@@ -60,15 +60,15 @@ def parse_results(results, print_df=False, print_csv=False):

df_out["latency_prefill_ms"] = df_prefill.groupby(["exp_num_users"])[
"duration_ms"
].median()
].mean()
df_out["latency_nexttoken_ms"] = df_nexttoken.groupby(["exp_num_users"])[
"duration_ms"
].median()
].mean()
df_out["latency_e2e_ms"] = (
df.groupby(["exp_num_users", "worker_idx", "request_idx"])["duration_ms"]
.sum()
.groupby("exp_num_users")
.median()
.mean()
)

with pd.option_context(