diff --git a/tools/invoker/client.go b/tools/invoker/client.go index 2a95b4f3..48dcb35a 100644 --- a/tools/invoker/client.go +++ b/tools/invoker/client.go @@ -53,6 +53,8 @@ const TimeseriesDBAddr = "10.96.0.84:90" var ( completed int64 latSlice LatencySlice + profSlice LatencySlice + funcDurEnableFlag *bool portFlag *int grpcTimeout time.Duration withTracing *bool @@ -64,6 +66,8 @@ func main() { rps := flag.Float64("rps", 1.0, "Target requests per second") runDuration := flag.Int("time", 5, "Run the experiment for X seconds") latencyOutputFile := flag.String("latf", "lat.csv", "CSV file for the latency measurements in microseconds") + funcDurationOutputFile := flag.String("durf", "dur.csv", "CSV file for the function duration measurements in microseconds") + funcDurEnableFlag := flag.Bool("profile", false, "Enable function duration profiling") portFlag = flag.Int("port", 80, "The port that functions listen to") withTracing = flag.Bool("trace", false, "Enable tracing in the client") zipkin := flag.String("zipkin", "http://localhost:9411/api/v2/spans", "zipkin url") @@ -107,6 +111,9 @@ func main() { realRPS := runExperiment(endpoints, *runDuration, *rps) writeLatencies(realRPS, *latencyOutputFile) + if *funcDurEnableFlag { + writeFunctionDurations(*funcDurationOutputFile) + } } func readEndpoints(path string) (endpoints []*endpoint.Endpoint, _ error) { @@ -187,6 +194,16 @@ func SayHello(address, workflowID string) { if err != nil { log.Warnf("Failed to invoke %v, err=%v", address, err) } else { + //if *funcDurEnableFlag { + // words := strings.Fields(response.Message) + // lastWord := words[len(words)-1] + // duration, err := strconv.ParseInt(lastWord, 10, 64) + // if err == nil { + // profSlice.Lock() + // profSlice.slice = append(profSlice.slice, duration) + // profSlice.Unlock() + // } + //} atomic.AddInt64(&completed, 1) } } @@ -256,3 +273,29 @@ func writeLatencies(rps float64, latencyOutputFile string) { datawriter.Flush() file.Close() } + +func writeFunctionDurations(funcDurationOutputFile string) { + profSlice.Lock() + defer profSlice.Unlock() + + fileName := funcDurationOutputFile + log.Info("The measured function durations are saved in ", fileName) + + file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + + if err != nil { + log.Fatal("Failed creating file: ", err) + } + + datawriter := bufio.NewWriter(file) + + for _, dur := range profSlice.slice { + _, err := datawriter.WriteString(strconv.FormatInt(dur, 10) + "\n") + if err != nil { + log.Fatal("Failed to write the URLs to a file ", err) + } + } + + datawriter.Flush() + file.Close() +} \ No newline at end of file diff --git a/tools/relay/server.go b/tools/relay/server.go index 9e7f331a..05404d55 100644 --- a/tools/relay/server.go +++ b/tools/relay/server.go @@ -28,6 +28,8 @@ import ( "fmt" "net" "os" + "strconv" + "time" pb "github.com/vhive-serverless/vSwarm-proto/proto/helloworld" @@ -61,6 +63,7 @@ var ( value = flag.String("value", "helloWorld", "String input to pass to benchmark") functionMethod = flag.String("function-method", "default", "Which method of benchmark to invoke") verbose = flag.Bool("verbose", false, "Enable verbose log printing") + profileFunction = flag.Bool("profile-function", false, "Enable function profiling") // Client grpcClient grpcClients.GrpcClient @@ -155,8 +158,22 @@ func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloRe // Create new packet pkt := inputGenerator.Next() log.Debugf("Send to func: %s\n", pkt) + + startTime := time.Now() reply, err := grpcClient.Request(ctx, pkt) - log.Debugf("Recv from func: %s\n", reply) + endTime := time.Now() + elapsedTime := int64(endTime.Sub(startTime).Microseconds()) + + var finalReply string + + if *profileFunction { + log.Debugf("Invoked in %d usec\n. Recv from func: %s\n", elapsedTime, reply) + elapsedTimeStr := strconv.FormatInt(elapsedTime, 10) + finalReply = reply + "|" + elapsedTimeStr + } else { + log.Debugf("Recv from func: %s\n", reply) + finalReply = reply + } - return &pb.HelloReply{Message: reply}, err + return &pb.HelloReply{Message: finalReply}, err }