Fix handling of unfinished responses from PA #30

Merged: 6 commits, Aug 9, 2024
@@ -172,6 +172,24 @@ def _preprocess_response(
    ) -> None:
        """Helper function to preprocess responses of a request."""
        if self._service_kind == "openai":
            # Sometimes streamed chunks are returned in a splintered fashion.
            # This forces a merge with the previous chunk if error detected.
            if len(res_outputs) > 1:
                for i in reversed(range(len(res_outputs))):
                    response = res_outputs[i]["response"]
                    if not response.startswith("data: "):
                        first_data = response.find("data: ")
                        if first_data == -1:
                            res_outputs[i - 1]["response"] = (
                                res_outputs[i - 1]["response"] + response.strip()
                            )
                            res_outputs[i]["response"] = ""
                        else:
                            res_outputs[i - 1]["response"] = (
                                res_outputs[i - 1]["response"]
                                + response[0:first_data].strip()
                            )
                            res_outputs[i]["response"] = response[first_data:].strip()
            # PA sometimes receives multiple SSE responses at once (as a single
            # response). Handle these responses by merging into a single response.
            for i in range(len(res_outputs)):
@@ -193,7 +211,9 @@ def _preprocess_response(
        # Remove responses without any content
        indices_to_remove = []
        for idx, out in enumerate(res_outputs):
-           if self._is_openai_empty_response(out["response"]):
+           if not out["response"] or self._is_openai_empty_response(
+               out["response"]
+           ):
                indices_to_remove.append(idx)
        indices_to_remove.sort(reverse=True)
        for index in indices_to_remove:
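For orientation while reviewing, here is a minimal standalone sketch (not part of this diff) of the splintered-chunk repair added in the hunk above. It assumes the simplified shape the parser works with, a list of {"response": str} dicts, and uses hand-built SSE fragments:

# Sketch only: the same merge loop as the diff above, applied to a toy example.
res_outputs = [
    {"response": 'data: {"choices":[{"delta":{"content":"Hel'},
    {"response": 'lo"}}]}'},  # continuation chunk missing the "data: " prefix
    {"response": "data: [DONE]\n\n"},
]

if len(res_outputs) > 1:
    for i in reversed(range(len(res_outputs))):
        response = res_outputs[i]["response"]
        if not response.startswith("data: "):
            first_data = response.find("data: ")
            if first_data == -1:
                # Pure continuation: glue it back onto the previous chunk.
                res_outputs[i - 1]["response"] += response.strip()
                res_outputs[i]["response"] = ""
            else:
                # Mixed chunk: move the leading fragment back, keep the rest.
                res_outputs[i - 1]["response"] += response[0:first_data].strip()
                res_outputs[i]["response"] = response[first_data:].strip()

print(res_outputs[0]["response"])
# data: {"choices":[{"delta":{"content":"Hello"}}]}

The middle entry is left as an empty string, which the new not out["response"] check in the hunk just above then removes.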
genai-perf/tests/test_llm_profile_data_parser.py: 47 additions, 0 deletions
@@ -77,6 +77,9 @@ def write(self: Any, content: str) -> int:
        elif filename == "empty_profile_export.json":
            tmp_file = StringIO(json.dumps(self.empty_profile_data))
            return tmp_file
        elif filename == "unfinished_responses_profile_export.json":
            tmp_file = StringIO(json.dumps(self.unfinished_responses_profile_data))
            return tmp_file
        elif filename == "profile_export.csv":
            tmp_file = StringIO()
            tmp_file.write = write.__get__(tmp_file)
@@ -512,6 +515,50 @@ def test_empty_response(self, mock_read_write: pytest.MonkeyPatch) -> None:
            tokenizer=tokenizer,
        )

    def test_unfinished_responses(self, mock_read_write: pytest.MonkeyPatch) -> None:
"""Check if it handles unfinished responses."""
res_timestamps = [0, 1, 2]
res_outputs = [
{
"response": 'data: {"id":"8ae835f2ecbb67f3-SJC","object":"chat.completion.chunk","created":1722875835,"choices":[{"index":0,"text"'
},
{
"response": ':" writing","logprobs":null,"finish_reason":null,"seed":null,"delta":{"token_id":4477,"role":"assistant","content":" writing","tool_calls":null}}],"model":"meta-llama/Llama-3-8b-chat-hf","usage":null}'
},
{"response": "data: [DONE]\n\n"},
]
expected_response = 'data: {"id":"8ae835f2ecbb67f3-SJC","object":"chat.completion.chunk","created":1722875835,"choices":[{"index":0,"text":" writing","logprobs":null,"finish_reason":null,"seed":null,"delta":{"token_id":4477,"role":"assistant","content":" writing","tool_calls":null}}],"model":"meta-llama/Llama-3-8b-chat-hf","usage":null}'

tokenizer = get_tokenizer(DEFAULT_TOKENIZER)
pd = LLMProfileDataParser(
filename=Path("openai_profile_export.json"),
tokenizer=tokenizer,
)

pd._preprocess_response(res_timestamps, res_outputs)
assert res_outputs[0]["response"] == expected_response

def test_non_sse_response(self, mock_read_write: pytest.MonkeyPatch) -> None:
"""Check if it handles single responses."""
res_timestamps = [
0,
]
res_outputs = [
{
"response": '{"id":"1","object":"chat.completion","created":2,"model":"gpt2","choices":[{"index":0,"message":{"role":"assistant","content":"A friend of mine, who is also a cook, writes a blog.","tool_calls":[]},"logprobs":null,"finish_reason":"length","stop_reason":null}],"usage":{"prompt_tokens":47,"total_tokens":1024,"completion_tokens":977}}'
},
]
expected_response = '{"id":"1","object":"chat.completion","created":2,"model":"gpt2","choices":[{"index":0,"message":{"role":"assistant","content":"A friend of mine, who is also a cook, writes a blog.","tool_calls":[]},"logprobs":null,"finish_reason":"length","stop_reason":null}],"usage":{"prompt_tokens":47,"total_tokens":1024,"completion_tokens":977}}'

tokenizer = get_tokenizer(DEFAULT_TOKENIZER)
pd = LLMProfileDataParser(
filename=Path("openai_profile_export.json"),
tokenizer=tokenizer,
)

pd._preprocess_response(res_timestamps, res_outputs)
assert res_outputs[0]["response"] == expected_response

    empty_profile_data = {
        "service_kind": "openai",
        "endpoint": "v1/chat/completions",
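If it helps to verify locally, the two new test cases can be run on their own with pytest (the file path is the one added in this PR; the -k expression simply matches the new test names):

pytest genai-perf/tests/test_llm_profile_data_parser.py -k "unfinished_responses or non_sse" -v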