Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added profiling to the relay and client #1020

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions tools/invoker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"flag"
"fmt"
"os"
"strings"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -53,6 +54,8 @@ const TimeseriesDBAddr = "10.96.0.84:90"
var (
completed int64
latSlice LatencySlice
profSlice LatencySlice
funcDurEnableFlag *bool
portFlag *int
grpcTimeout time.Duration
withTracing *bool
Expand All @@ -64,6 +67,8 @@ func main() {
rps := flag.Float64("rps", 1.0, "Target requests per second")
runDuration := flag.Int("time", 5, "Run the experiment for X seconds")
latencyOutputFile := flag.String("latf", "lat.csv", "CSV file for the latency measurements in microseconds")
funcDurationOutputFile := flag.String("durf", "dur.csv", "CSV file for the function duration measurements in microseconds")
funcDurEnableFlag = flag.Bool("profile", false, "Enable function duration profiling")
portFlag = flag.Int("port", 80, "The port that functions listen to")
withTracing = flag.Bool("trace", false, "Enable tracing in the client")
zipkin := flag.String("zipkin", "http://localhost:9411/api/v2/spans", "zipkin url")
Expand Down Expand Up @@ -107,6 +112,9 @@ func main() {
realRPS := runExperiment(endpoints, *runDuration, *rps)

writeLatencies(realRPS, *latencyOutputFile)
if *funcDurEnableFlag {
writeFunctionDurations(*funcDurationOutputFile)
}
}

func readEndpoints(path string) (endpoints []*endpoint.Endpoint, _ error) {
Expand Down Expand Up @@ -176,7 +184,7 @@ func SayHello(address, workflowID string) {
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
defer cancel()

_, err = c.SayHello(ctx, &pb.HelloRequest{
response, err := c.SayHello(ctx, &pb.HelloRequest{
Name: "Invoke relay",
VHiveMetadata: vhivemetadata.MakeVHiveMetadata(
workflowID,
Expand All @@ -187,6 +195,17 @@ func SayHello(address, workflowID string) {
if err != nil {
log.Warnf("Failed to invoke %v, err=%v", address, err)
} else {
if *funcDurEnableFlag {
log.Debugf("Inside if\n")
words := strings.Fields(response.Message)
lastWord := words[len(words)-1]
duration, err := strconv.ParseInt(lastWord, 10, 64)
if err == nil {
profSlice.Lock()
profSlice.slice = append(profSlice.slice, duration)
profSlice.Unlock()
}
}
atomic.AddInt64(&completed, 1)
}
}
Expand Down Expand Up @@ -249,10 +268,36 @@ func writeLatencies(rps float64, latencyOutputFile string) {
for _, lat := range latSlice.slice {
_, err := datawriter.WriteString(strconv.FormatInt(lat, 10) + "\n")
if err != nil {
log.Fatal("Failed to write the URLs to a file ", err)
log.Fatal("Failed to write the latencies to a file ", err)
}
}

datawriter.Flush()
file.Close()
}

func writeFunctionDurations(funcDurationOutputFile string) {
profSlice.Lock()
defer profSlice.Unlock()

fileName := funcDurationOutputFile
log.Info("The measured function durations are saved in ", fileName)

file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)

if err != nil {
log.Fatal("Failed creating file: ", err)
}

datawriter := bufio.NewWriter(file)

for _, dur := range profSlice.slice {
_, err := datawriter.WriteString(strconv.FormatInt(dur, 10) + "\n")
if err != nil {
log.Fatal("Failed to write the function durations to a file ", err)
}
}

datawriter.Flush()
file.Close()
}
21 changes: 19 additions & 2 deletions tools/relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"fmt"
"net"
"os"
"strconv"
"time"

pb "github.com/vhive-serverless/vSwarm-proto/proto/helloworld"

Expand Down Expand Up @@ -61,6 +63,7 @@ var (
value = flag.String("value", "helloWorld", "String input to pass to benchmark")
functionMethod = flag.String("function-method", "default", "Which method of benchmark to invoke")
verbose = flag.Bool("verbose", false, "Enable verbose log printing")
profileFunction = flag.Bool("profile-function", false, "Enable function profiling")

// Client
grpcClient grpcClients.GrpcClient
Expand Down Expand Up @@ -155,8 +158,22 @@ func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloRe
// Create new packet
pkt := inputGenerator.Next()
log.Debugf("Send to func: %s\n", pkt)

startTime := time.Now()
reply, err := grpcClient.Request(ctx, pkt)
log.Debugf("Recv from func: %s\n", reply)
endTime := time.Now()
elapsedTime := int64(endTime.Sub(startTime).Microseconds())

var finalReply string

if *profileFunction {
log.Debugf("Invoked in %d usec\n. Recv from func: %s\n", elapsedTime, reply)
elapsedTimeStr := strconv.FormatInt(elapsedTime, 10)
finalReply = reply + "|" + elapsedTimeStr
} else {
log.Debugf("Recv from func: %s\n", reply)
finalReply = reply
}

return &pb.HelloReply{Message: reply}, err
return &pb.HelloReply{Message: finalReply}, err
}
Loading