Skip to content

Commit

Permalink
New gRPC interface
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Cvetković <[email protected]>
  • Loading branch information
cvetkovic committed Sep 3, 2024
1 parent 684e8d1 commit b46e2b2
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 16 deletions.
6 changes: 5 additions & 1 deletion .github/configs/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -763,4 +763,8 @@ RpsMemoryMB
RpsRuntimeMs
RpsTarget
SQRT
RpsImage
RpsImage
BusyLoopOnSandboxStartup
DirigentControlPlaneIP
InvokeProtocol
autoscaler
1 change: 1 addition & 0 deletions pkg/config/test_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"Seed": 42,

"Platform": "Knative",
"InvokeProtocol": "grpc",
"YAMLSelector": "container",
"EndpointPort": 80,

Expand Down
24 changes: 10 additions & 14 deletions pkg/driver/clients/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/vhive-serverless/loader/pkg/config"
"github.com/vhive-serverless/loader/pkg/workload/proto"

log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

Expand All @@ -51,7 +51,7 @@ func newGRPCInvoker(cfg *config.LoaderConfiguration) *grpcInvoker {
}

func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification) (bool, *mc.ExecutionRecord) {
log.Tracef("(Invoke)\t %s: %d[ms], %d[MiB]", function.Name, runtimeSpec.Runtime, runtimeSpec.Memory)
logrus.Tracef("(Invoke)\t %s: %d[ms], %d[MiB]", function.Name, runtimeSpec.Runtime, runtimeSpec.Memory)

record := &mc.ExecutionRecord{
ExecutionRecordBase: mc.ExecutionRecordBase{
Expand All @@ -65,30 +65,25 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
start := time.Now()
record.StartTime = start.UnixMicro()

dialContext, cancelDialing := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCConnectionTimeoutSeconds)*time.Second)
defer cancelDialing()

var dialOptions []grpc.DialOption
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
dialOptions = append(dialOptions, grpc.WithBlock())

grpcStart := time.Now()

conn, err := grpc.DialContext(dialContext, function.Endpoint, dialOptions...)
defer gRPCConnectionClose(conn)
conn, err := grpc.NewClient(function.Endpoint, dialOptions...)
if err != nil {
log.Debugf("Failed to establish a gRPC connection - %v\n", err)
logrus.Debugf("Failed to establish a gRPC connection - %v\n", err)

record.ResponseTime = time.Since(start).Microseconds()
record.ConnectionTimeout = true

return false, record
}
defer gRPCConnectionClose(conn)

record.GRPCConnectionEstablishTime = time.Since(grpcStart).Microseconds()

grpcClient := proto.NewExecutorClient(conn)

executionCxt, cancelExecution := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCFunctionTimeoutSeconds)*time.Second)
defer cancelExecution()

Expand All @@ -99,9 +94,10 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
})

if err != nil {
log.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)

record.ResponseTime = time.Since(start).Microseconds()
record.ConnectionTimeout = true // WithBlock deprecated in new gRPC interface
record.FunctionTimeout = true

return false, record
Expand All @@ -117,9 +113,9 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb)
}

log.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message,
logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message,
float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb))
log.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3)
logrus.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3)

return true, record
}
Expand All @@ -139,6 +135,6 @@ func gRPCConnectionClose(conn *grpc.ClientConn) {
}

if err := conn.Close(); err != nil {
log.Warnf("Error while closing gRPC connection - %s\n", err)
logrus.Warnf("Error while closing gRPC connection - %s\n", err)
}
}
1 change: 0 additions & 1 deletion tools/driver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ is then the duration of the measurement phase + the duration of the warmup phase
"outputDir": The local output directory on the machine running the experiment driver, this is where experiment results
will be saved.
"YAMLSelector": Which yaml specification to use, supported values are
"[wimpy](/workloads/container/wimpy.yaml)",
"[container](/workloads/container/trace_func_go.yaml)" and
"[firecracker](/workloads/firecracker/trace_func_go.yaml)".
"IATDistribution": The IAT distribution that the loader will use when sending invocations to the worker node(s).
Expand Down

0 comments on commit b46e2b2

Please sign in to comment.