Skip to content

Commit

Permalink
Porting minor changes from aux branch
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Cvetković <[email protected]>
  • Loading branch information
cvetkovic committed Nov 15, 2024
1 parent cdb68d7 commit cd1cfba
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 42 deletions.
2 changes: 1 addition & 1 deletion cmd/config_dirigent_dandelion_rps.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
"AsyncResponseURL": "10.0.1.3:8082",
"AsyncResponseURL": "10.0.1.253:8082",
"AsyncWaitToCollectMin": 1,

"RpsTarget": 1,
Expand Down
2 changes: 1 addition & 1 deletion cmd/config_dirigent_dandelion_trace.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
"AsyncResponseURL": "10.0.1.3:8082",
"AsyncResponseURL": "10.0.1.253:8082",
"AsyncWaitToCollectMin": 1,

"TracePath": "data/traces/example",
Expand Down
4 changes: 2 additions & 2 deletions cmd/config_dirigent_rps.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
"InvokeProtocol" : "http2",
"EndpointPort": 80,

"DirigentControlPlaneIP": "localhost:9092",
"DirigentControlPlaneIP": "10.0.1.253:9091",
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
"AsyncResponseURL": "10.0.1.3:8082",
"AsyncResponseURL": "10.0.1.253:8082",
"AsyncWaitToCollectMin": 1,

"RpsTarget": 1,
Expand Down
2 changes: 1 addition & 1 deletion cmd/config_dirigent_trace.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"InvokeProtocol" : "http2",
"EndpointPort": 80,

"DirigentControlPlaneIP": "10.0.1.253:9092",
"DirigentControlPlaneIP": "10.0.1.253:9091",
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
Expand Down
1 change: 0 additions & 1 deletion pkg/common/specification_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package common

// IATMatrix - columns are minutes, rows are IATs
type IATArray []float64

// ProbabilisticDuration used for testing the exponential distribution
Expand Down
18 changes: 16 additions & 2 deletions pkg/driver/deployment/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"math"
"os/exec"
"regexp"
"runtime"
"strconv"
"sync"
)

const (
Expand Down Expand Up @@ -46,15 +48,25 @@ func newKnativeDeployerConfiguration(cfg *config.Configuration) knativeDeploymen
func (*knativeDeployer) Deploy(cfg *config.Configuration) {
knativeConfig := newKnativeDeployerConfiguration(cfg)

queue := make(chan struct{}, runtime.NumCPU()) // message queue as a sync method
deployed := sync.WaitGroup{}
deployed.Add(len(cfg.Functions))

for i := 0; i < len(cfg.Functions); i++ {
queue <- struct{}{}

knativeDeploySingleFunction(
cfg.Functions[i],
knativeConfig.YamlPath,
knativeConfig.IsPartiallyPanic,
knativeConfig.EndpointPort,
knativeConfig.AutoscalingMetric,
&deployed,
queue,
)
}

deployed.Wait()
}

func (*knativeDeployer) Clean() {
Expand All @@ -68,8 +80,10 @@ func (*knativeDeployer) Clean() {
}
}

func knativeDeploySingleFunction(function *common.Function, yamlPath string, isPartiallyPanic bool, endpointPort int,
autoscalingMetric string) bool {
func knativeDeploySingleFunction(function *common.Function, yamlPath string, isPartiallyPanic bool, endpointPort int, autoscalingMetric string, deployed *sync.WaitGroup, queue chan struct{}) bool {
defer deployed.Done()
defer func() { <-queue }()

panicWindow := "\"10.0\""
panicThreshold := "\"200.0\""
if isPartiallyPanic {
Expand Down
53 changes: 31 additions & 22 deletions pkg/driver/trace_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,37 +514,46 @@ func (d *Driver) internalRun(generated bool) {
log.Infof("Number of failed invocations: \t%d\n", atomic.LoadInt64(&failedInvocations))
}

func (d *Driver) generateSpecs() {
log.Info("Generating IAT and runtime specifications for all the functions")

for i, function := range d.Configuration.Functions {
// Equalising all the InvocationStats to the first function
if d.Configuration.LoaderConfiguration.DAGMode {
function.InvocationStats.Invocations = d.Configuration.Functions[0].InvocationStats.Invocations
}
spec := d.SpecificationGenerator.GenerateInvocationData(
function,
d.Configuration.IATDistribution,
d.Configuration.ShiftIAT,
d.Configuration.TraceGranularity,
)

d.Configuration.Functions[i].Specification = spec
}
}

func (d *Driver) outputIATsToFile() {
for i, function := range d.Configuration.Functions {
file, _ := json.MarshalIndent(function.Specification, "", " ")
err := os.WriteFile("iat"+strconv.Itoa(i)+".json", file, 0644)
if err != nil {
log.Fatalf("Writing the loader config file failed: %s", err)
}
}
}

func (d *Driver) RunExperiment(generateSpecs bool, writeIATsToFile bool, readIATsFromFile bool) {
if generateSpecs && readIATsFromFile {
log.Fatal("Invalid loader configuration. Cannot be forced to generate IATs and read the from file in the same experiment.")
}

if generateSpecs {
log.Info("Generating IAT and runtime specifications for all the functions")
for i, function := range d.Configuration.Functions {
// Equalising all the InvocationStats to the first function
if d.Configuration.LoaderConfiguration.DAGMode {
function.InvocationStats.Invocations = d.Configuration.Functions[0].InvocationStats.Invocations
}
spec := d.SpecificationGenerator.GenerateInvocationData(
function,
d.Configuration.IATDistribution,
d.Configuration.ShiftIAT,
d.Configuration.TraceGranularity,
)

d.Configuration.Functions[i].Specification = spec
}
d.generateSpecs()
}

if writeIATsToFile {
for i, function := range d.Configuration.Functions {
file, _ := json.MarshalIndent(function.Specification, "", " ")
err := os.WriteFile("iat"+strconv.Itoa(i)+".json", file, 0644)
if err != nil {
log.Fatalf("Writing the loader config file failed: %s", err)
}
}
d.outputIATsToFile()

log.Info("IATs have been generated. The program has exited.")
os.Exit(0)
Expand Down
8 changes: 2 additions & 6 deletions pkg/trace/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,8 @@ func createDirigentMetadataMap(metadata *[]common.DirigentMetadata) map[string]*
return result
}

func (p *AzureTraceParser) extractFunctions(
invocations *[]common.FunctionInvocationStats,
runtime *[]common.FunctionRuntimeStats,
memory *[]common.FunctionMemoryStats,
dirigentMetadata *[]common.DirigentMetadata,
platform string) []*common.Function {
func (p *AzureTraceParser) extractFunctions(invocations *[]common.FunctionInvocationStats, runtime *[]common.FunctionRuntimeStats,
memory *[]common.FunctionMemoryStats, dirigentMetadata *[]common.DirigentMetadata, platform string) []*common.Function {

var result []*common.Function

Expand Down
73 changes: 67 additions & 6 deletions pkg/workload/openwhisk/workload_openwhisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,74 @@

package main

func Main(obj map[string]interface{}) map[string]interface{} {
msg := "You did not tell me who you are."
name, ok := obj["name"].(string)
if ok {
msg = "Hello, " + name + "!"
import (
"encoding/json"
"strconv"
"time"
)

// static double SQRTSD (double x) {
// double r;
// __asm__ ("sqrtsd %1, %0" : "=x" (r) : "x" (x));
// return r;
// }
import "C"

const ExecUnit int = 1e2

func takeSqrts() C.double {
var tmp C.double // Circumvent compiler optimizations
for i := 0; i < ExecUnit; i++ {
tmp = C.SQRTSD(C.double(10))
}
return tmp
}

func busySpin(multiplier, runtimeMilli uint32) {
totalIterations := int(multiplier * runtimeMilli)

for i := 0; i < totalIterations; i++ {
takeSqrts()
}
}

type FunctionResponse struct {
Status string `json:"Status"`
Function string `json:"Function"`
MachineName string `json:"MachineName"`
ExecutionTime int64 `json:"ExecutionTime"`
}

func Main(obj map[string]interface{}) map[string]interface{} {
requestedCpu, ok := obj["cpu"].(string)
result := make(map[string]interface{})
result["body"] = `<html><body><h3>` + msg + `</h3></body></html>`

if !ok {
result["body"] = obj
return result
}

ts, _ := strconv.Atoi(requestedCpu)

start := time.Now()
timeLeftMilliseconds := uint32(ts)

timeConsumedMilliseconds := uint32(time.Since(start).Milliseconds())
if timeConsumedMilliseconds < timeLeftMilliseconds {
timeLeftMilliseconds -= timeConsumedMilliseconds
if timeLeftMilliseconds > 0 {
busySpin(uint32(155), timeLeftMilliseconds)
}
}

responseBytes, _ := json.Marshal(FunctionResponse{
Status: "OK",
Function: "",
MachineName: "NYI",
ExecutionTime: time.Since(start).Microseconds(),
})

result["body"] = responseBytes

return result
}

0 comments on commit cd1cfba

Please sign in to comment.