merge upstream #4

Merged · 14 commits · Jul 15, 2024
2 changes: 1 addition & 1 deletion applications/rag/variables.tf
@@ -398,7 +398,7 @@ variable "gpu_pools" {
    name = "gpu-pool-l4"
    machine_type = "g2-standard-24"
    autoscaling = true
-    min_count = 1
+    min_count = 0
    max_count = 3
    disk_size_gb = 200
    disk_type = "pd-balanced"
2 changes: 1 addition & 1 deletion applications/ray/variables.tf
@@ -172,7 +172,7 @@ variable "gpu_pools" {
    name = "gpu-pool-l4"
    machine_type = "g2-standard-24"
    autoscaling = true
-    min_count = 1
+    min_count = 0
    max_count = 3
    disk_size_gb = 100
    disk_type = "pd-balanced"
2 changes: 1 addition & 1 deletion benchmarks/README.md
@@ -88,7 +88,7 @@ cd inference-server/text-generation-inference
# Copy the sample variables and update the project number and cluster name in
# the fleet_host variable "https://connectgateway.googleapis.com/v1/projects/<project-number>/locations/global/gkeMemberships/<cluster-name>"
# in the `terraform.tfvars` file.
-cp ./sample-tfvars/gpu-sample.tfvars terraform.tfvars
+cp ./sample-terraform.tfvars terraform.tfvars

# Initialize the Terraform modules.
terraform init
2 changes: 2 additions & 0 deletions benchmarks/benchmark/tools/locust-load-inference/README.md
@@ -58,6 +58,8 @@ You will set the `gcs_path` in your `terraform.tfvars` to this gcs path containi

The Locust workload requires storage.admin access to view the dataset in the given gcs bucket. If you are running with workload identity, it obtains this access via a kubernetes service account that is backed by a gcloud service account. If you followed steps in `../../infra`, then you already have a kubernetes and gcloud service account created that you can use here.

+**Note: If you would like your raw benchmark data as a CSV, add the Locust master serviceAccount as a Storage Admin to the GCS bucket**
+
To give viewer permissions on the gcs bucket to the gcloud service account, run the following:

```
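The note added above ties the new CSV export to bucket permissions: the Locust master's service account needs write access on the results bucket before `dump_to_csv` (added further down) can upload anything. As a rough sketch of granting that binding with the google-cloud-storage IAM API — the bucket name and service account email are placeholders, and a narrower role such as `roles/storage.objectAdmin` may be enough if only object writes are needed:

```python
# Sketch: grant the Locust master's service account access to the results
# bucket so dump_to_csv() can upload the CSV. Names below are placeholders.
from google.cloud import storage

BUCKET_NAME = "my-benchmark-output-bucket"                              # placeholder
LOCUST_MASTER_SA = "locust-runner@my-project.iam.gserviceaccount.com"  # placeholder

client = storage.Client()
bucket = client.bucket(BUCKET_NAME)

# Read-modify-write the bucket IAM policy.
policy = bucket.get_iam_policy(requested_policy_version=3)
policy.bindings.append({
    "role": "roles/storage.admin",  # the role the README note asks for
    "members": {f"serviceAccount:{LOCUST_MASTER_SA}"},
})
bucket.set_iam_policy(policy)

print(f"Granted roles/storage.admin on gs://{BUCKET_NAME} to {LOCUST_MASTER_SA}")
```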
@@ -1,9 +1,14 @@
import datetime
import logging
import json
+import csv
+import os
+from datetime import datetime
+from google.cloud import storage

-class TokenMetricCollector:
+class MetricCollector:
    def __init__(self):
+        self.request_metrics = []
        self.tokens_sent = []
        self.tokens_received = []
        self.test_time = []
@@ -12,6 +17,7 @@ def __init__(self):
        self.time_to_first_token_list = []

    def add_metric(self, sent, received, test_time, request_succesful_bool, ttft):
+        self.request_metrics.append({"success": request_succesful_bool, "input_tokens": sent, "output_tokens": received, "total_request_time": test_time, "time_to_first_token": ttft})
        if request_succesful_bool == 1:
            self.tokens_sent.append(sent)
            self.tokens_received.append(received)
@@ -22,16 +28,17 @@ def add_metric(self, sent, received, test_time, request_succesful_bool, ttft):
        else:
            self.failure_count += 1

-    def add_metrics(self, tokens_sent, tokens_received, test_time, success_count, failure_count, ttfts):
+    def add_metrics(self, tokens_sent, tokens_received, test_time, success_count, failure_count, ttfts, request_metrics):
        self.tokens_sent = self.tokens_sent + tokens_sent
        self.tokens_received = self.tokens_received + tokens_received
        self.test_time = self.test_time + test_time
        self.success_count += success_count
        self.failure_count += failure_count
        self.time_to_first_token_list = self.time_to_first_token_list + ttfts
+        self.request_metrics = self.request_metrics + request_metrics

    def share_stats(self):
-        return self.tokens_sent, self.tokens_received, self.test_time, self.success_count, self.failure_count, self.time_to_first_token_list
+        return self.tokens_sent, self.tokens_received, self.test_time, self.success_count, self.failure_count, self.time_to_first_token_list, self.request_metrics

    def calculate_average_tokens(self):
        if self.tokens_sent and len(self.tokens_sent) > 0:
@@ -60,4 +67,16 @@ def json_dump_report(self):
            "average-time-to-first-token": sum(self.time_to_first_token_list)/max(len(self.time_to_first_token_list),1)
        }
        return json.dumps(stats)
+
+    def dump_to_csv(self):
+        fields = ['success', 'total_request_time', 'time_to_first_token', 'input_tokens', 'output_tokens']
+        now = datetime.now()
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(os.environ['BUCKET'])
+        timestamp = now.strftime('metrics%Y-%m-%d%H:%M:%S.csv')
+        blob = bucket.blob(timestamp)
+        with blob.open('w') as metricsfile:
+            writer = csv.DictWriter(metricsfile, fieldnames=fields)
+            writer.writeheader()
+            writer.writerows(self.request_metrics)
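With the changes above, `MetricCollector.dump_to_csv()` writes one row per request to an object named with the `metrics%Y-%m-%d%H:%M:%S.csv` pattern, in the bucket taken from the `BUCKET` environment variable. A hedged sketch of pulling the latest run back down for analysis — the bucket name is a placeholder, and it assumes at least one CSV already exists and that `success` is recorded as 1/0:

```python
# Sketch: read back the most recent per-request CSV written by dump_to_csv()
# and summarize it. Assumes at least one metrics*.csv object exists and that
# the 'success' column holds 1/0 as written by the collector.
import csv
import io

from google.cloud import storage

BUCKET_NAME = "my-benchmark-output-bucket"  # placeholder

client = storage.Client()
csv_blobs = [
    b for b in client.list_blobs(BUCKET_NAME, prefix="metrics")
    if b.name.endswith(".csv")
]
# The zero-padded timestamp in the object name sorts chronologically.
latest = max(csv_blobs, key=lambda b: b.name)

rows = list(csv.DictReader(io.StringIO(latest.download_as_text())))
ok = [r for r in rows if r["success"] == "1"]
avg_request_time = sum(float(r["total_request_time"]) for r in ok) / max(len(ok), 1)

print(f"{latest.name}: {len(rows)} requests, {len(ok)} succeeded, "
      f"avg total_request_time {avg_request_time:.3f}")
```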
@@ -33,8 +33,8 @@
from grpc_interceptor import ClientInterceptor


-from custom_metric_aggregator import TokenMetricCollector
-local_metric_collector = TokenMetricCollector()
+from custom_metric_aggregator import MetricCollector
+local_metric_collector = MetricCollector()

logging.basicConfig(level=logging.INFO)
grpc_gevent.init_gevent()
@@ -208,6 +208,7 @@ def on_test_stop(environment, **kwargs):
    """on test stop the locust master resets metric collector"""
    if isinstance(environment.runner, MasterRunner):
        logging.info(f'dumping metrics before clear: {local_metric_collector.json_dump_report()}')
+        local_metric_collector.dump_to_csv()
        logging.info(f'init metric_collector')
        local_metric_collector.__init__()

@@ -224,13 +225,14 @@ def on_report_to_master(client_id, data):
    to the dict that is being sent, and then we clear the local stats in the worker, so
    as to avoid sending duplicate data to the master on the next run.
    """
-    tokens_sent, tokens_recieved, test_time, success_count, failure_count, ttft = local_metric_collector.share_stats()
+    tokens_sent, tokens_recieved, test_time, success_count, failure_count, ttft, request_metrics = local_metric_collector.share_stats()
    data["tokens-sent"] = tokens_sent
    data["tokens-received"] = tokens_recieved
    data["test-time"] = test_time
    data["success-count"] = success_count
    data["failure-count"] = failure_count
    data["time_to_first_token"] = ttft
+    data["request-metrics"] = request_metrics
    local_metric_collector.__init__


@@ -242,7 +244,7 @@ def on_worker_report(client_id, data):
    stats dict.
    """
    local_metric_collector.add_metrics(
-        data["tokens-sent"], data["tokens-received"], data["test-time"], data["success-count"], data["failure-count"], data["time_to_first_token"])
+        data["tokens-sent"], data["tokens-received"], data["test-time"], data["success-count"], data["failure-count"], data["time_to_first_token"], data["request-metrics"])


@events.init_command_line_parser.add_listener
@@ -339,13 +341,16 @@ def grpc_infer(self):
        )
        logging.info(f"Prompt: {prompt}")
        #return values format is from the interceptor, which makes the actual call
-        output, ttft, response_time = self.stub.Decode(request)
-        logging.info(f"Response: {output}")
-
-        number_of_input_tokens = len(tokenizer.encode(prompt))
-        number_of_output_tokens = len(tokenizer.encode(output))
-        send_metrics(number_of_input_tokens, number_of_output_tokens, response_time,1, ttft)
+        try:
+            output, ttft, response_time = self.stub.Decode(request)
+            logging.info(f"Response: {output}")
+
+            number_of_input_tokens = len(tokenizer.encode(prompt))
+            number_of_output_tokens = len(tokenizer.encode(output))
+            send_metrics(number_of_input_tokens, number_of_output_tokens, response_time, 1, ttft)
+        except:
+            # Capture that a test was ran, but the request threw an exception
+            send_metrics(-1,-1,-1,0,-1)

class LocustInterceptor(ClientInterceptor):
    def __init__(self, environment, *args, **kwargs):
@@ -371,11 +376,11 @@ def intercept(
        # chunk sent back is used to calculate time to first token(TTFT).
        for response in responses:
            if ttft == 0:
-                ttft = time.perf_counter() - start_perf_counter
+                ttft = (time.perf_counter() - start_perf_counter) * 1000
            output += response.response[0]
            response_length += response.ByteSize()
        response_time_ms = (time.perf_counter() - start_perf_counter) * 1000
-        logging.info(f"response_time {response_time_ms}; ttft:{ttft * 1000}")
+        logging.info(f"response_time {response_time_ms}; ttft:{ttft}")
        self.env.events.request.fire(
            request_type="grpc",
            name=call_details.method,
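The per-request rows travel from workers to the master through the same Locust hooks the existing counters use: `on_report_to_master` attaches the worker's buffered `request_metrics` to the outgoing `data` dict, and `on_worker_report` merges them back into the master's collector, which dumps the CSV at `test_stop`. A stripped-down, standalone illustration of that pattern follows; the `my_rows` key, `DummyUser`, and the target endpoint are toy names chosen here, not part of the benchmark code:

```python
# Standalone illustration of the worker -> master reporting pattern the diff
# extends. 'my_rows' and DummyUser are toy names; point --host anywhere.
from locust import HttpUser, task, between, events
from locust.runners import MasterRunner

local_rows = []  # per-process buffer, analogous to MetricCollector.request_metrics


@events.report_to_master.add_listener
def report_rows(client_id, data, **kwargs):
    # Worker side: attach buffered rows to the periodic report, then clear
    # them so the same rows are not sent twice.
    data["my_rows"] = list(local_rows)
    local_rows.clear()


@events.worker_report.add_listener
def collect_rows(client_id, data, **kwargs):
    # Master side: merge rows received from each worker.
    local_rows.extend(data.get("my_rows", []))


@events.test_stop.add_listener
def summarize(environment, **kwargs):
    # Only the master holds the merged view; this is the point where the
    # benchmark calls dump_to_csv().
    if isinstance(environment.runner, MasterRunner):
        print(f"collected {len(local_rows)} rows from all workers")


class DummyUser(HttpUser):
    wait_time = between(1, 2)

    @task
    def ping(self):
        self.client.get("/")
        local_rows.append({"path": "/", "ok": True})
```

Run it distributed (one `--master` process plus one or more `--worker` processes) to see the merge happen; in single-process mode the two reporting hooks never fire.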
1 change: 1 addition & 0 deletions benchmarks/benchmark/tools/locust-load-inference/main.tf
@@ -50,6 +50,7 @@ locals {
      k8s_hf_secret_list = var.k8s_hf_secret == null ? [] : [var.k8s_hf_secret]
      stop_timeout = var.stop_timeout
      request_type = var.request_type
+      bucket = var.output_bucket
    })) : data]
  ])
}
@@ -31,6 +31,8 @@ spec:
          value: ${stop_timeout}
        - name: REQUEST_TYPE
          value: ${request_type}
+        - name: BUCKET
+          value: ${bucket}
        ports:
        - name: loc-master-web
          containerPort: 8089
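The last two changes wire up the plumbing for the CSV export: `main.tf` passes `var.output_bucket` into the manifest template as `bucket`, and the locust-master manifest surfaces it to the container as the `BUCKET` environment variable that `os.environ['BUCKET']` reads in `dump_to_csv`. A small, hypothetical smoke test that could be run inside the master pod to confirm the variable landed and the bucket is writable (the object name is made up for the check):

```python
# Hypothetical smoke test, e.g. run via `kubectl exec` inside the
# locust-master pod: confirm BUCKET was rendered into the container and the
# service account can write to it. The object name is made up for the check.
import os

from google.cloud import storage

bucket_name = os.environ.get("BUCKET")
if not bucket_name:
    raise SystemExit("BUCKET is not set; check output_bucket in terraform.tfvars")

blob = storage.Client().bucket(bucket_name).blob("metrics-smoke-test.txt")
blob.upload_from_string("ok")  # fails fast if write access is missing
print(f"wrote gs://{bucket_name}/{blob.name}")
```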