
Merge pull request #431 from filecoin-project/scale-test-and-viz
Fixes found by scale testing, and viz framework
lukemarsden authored Aug 2, 2022
2 parents b797491 + c3fd5f0 commit 69bcbd2
Showing 23 changed files with 681 additions and 48 deletions.
1 change: 0 additions & 1 deletion .circleci/config.yml
@@ -187,7 +187,6 @@ jobs:
command: |
export PATH="$HOME/.local/bin:${PATH}"
sudo apt install python3.10 python3-pip -y
- sudo python3 -m pip install --upgrade pip
pip3 install gsutil
export SHA="<< pipeline.git.revision >>"
export DATETIME="$(date -u +"%FT%H%MZ")"
1 change: 1 addition & 0 deletions .gitignore
@@ -36,3 +36,4 @@ ops/terraform/*.out
benchmark_hack/results
*_unit.json
.trunk/logs
+ benchmark/results
5 changes: 3 additions & 2 deletions .golangci.yml
@@ -9,7 +9,7 @@ linters-settings:
threshold: 100
funlen:
lines: 100
- statements: 50
+ statements: 100
goconst:
min-len: 2
min-occurrences: 3
@@ -105,7 +105,7 @@
# Options for analysis running.
run:
# The default concurrency value is the number of available CPU.
- concurrency: 4
+ # concurrency: 16
# Timeout for analysis, e.g. 30s, 5m.
# Default: 1m
timeout: 5m
@@ -127,6 +127,7 @@ run:
skip-dirs:
- experimental
- vendor
+ - viz
# Enables skipping of directories:
# - vendor$, third_party$, testdata$, examples$, Godeps$, builtin$
# Default: true
12 changes: 12 additions & 0 deletions Makefile
@@ -167,10 +167,22 @@ test-pythonwasm:
devstack:
go run . devstack

+ .PHONY: devstack-100
+ devstack-100:
+ go run . devstack --nodes 100

+ .PHONY: devstack-20
+ devstack-20:
+ go run . devstack --nodes 20

+ .PHONY: devstack-noop
+ devstack-noop:
+ go run . devstack --noop

+ .PHONY: devstack-noop-100
+ devstack-noop-100:
+ go run . devstack --noop --nodes 100

.PHONY: devstack-race
devstack-race:
go run -race . devstack
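The new targets are thin wrappers over the devstack subcommand, so the scale-test configurations can be launched through make or directly; for example:

    # 100-node devstack with the noop executor, as used for scale testing
    make devstack-noop-100
    # equivalent direct invocation
    go run . devstack --noop --nodes 100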
6 changes: 3 additions & 3 deletions benchmark/submit.sh
@@ -5,14 +5,14 @@ set -xeuo pipefail
# /bin/true

ID=$(${BACALHAU_BIN} --api-port="${API_PORT}" --api-host=localhost docker run --concurrency=3 busybox -- /bin/true)
- COUNTER=0
+ COUNTER=1
while true; do
sleep 0.1
# trunk-ignore(shellcheck/SC2312)
- if [[ $(${BACALHAU_BIN} --api-port="${API_PORT}" --api-host=localhost describe "${ID}" 2>&1|grep -c 'state:') -ne 3 ]]; then
+ if [[ $(${BACALHAU_BIN} --api-port="${API_PORT}" --api-host=localhost describe "${ID}" 2>&1|grep "State: Complete"|wc -l) -ne 3 ]]; then
echo "JOB ${ID} FAILED"
(( COUNTER++ ))
- if (( COUNTER > 20 )); then
+ if (( COUNTER > 300 )); then
echo "JOB ${ID} checked ${COUNTER} times. Assuming failure."
exit 1
fi
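Two scale-test fixes land here. Starting COUNTER at 1 sidesteps a set -e hazard: (( COUNTER++ )) returns a failing exit status when the counter's old value is 0, which would have aborted the script on the first retry. The success check also tightens from counting any three state: keys to requiring three "State: Complete" lines, one per node at --concurrency=3, matching the reworked describe output (see cmd/bacalhau/describe.go below), and the retry budget rises from 20 to 300 iterations (roughly 30 seconds at the 0.1 s sleep). A generalized sketch of the same polling pattern (the function name and argument layout are illustrative, not from the repo):

    wait_for_complete() {
      local id=$1 want=$2 max_tries=$3 tries=1
      while true; do
        sleep 0.1
        # grep -c counts matching lines, equivalent to grep | wc -l above
        if [[ $(${BACALHAU_BIN} --api-port="${API_PORT}" --api-host=localhost describe "${id}" 2>&1 | grep -c "State: Complete") -eq ${want} ]]; then
          return 0
        fi
        (( tries++ ))
        if (( tries > max_tries )); then
          echo "JOB ${id} checked ${tries} times. Assuming failure." >&2
          return 1
        fi
      done
    }
    # usage: wait_for_complete "${ID}" 3 300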
107 changes: 86 additions & 21 deletions cmd/bacalhau/describe.go
@@ -14,24 +14,46 @@ import (
func init() { // nolint:gochecknoinits // Using init with Cobra Command is idiomatic
}

+ type eventDescription struct {
+ Event string `yaml:"Event"`
+ Time string `yaml:"Time"`
+ Concurrency int `yaml:"Concurrency"`
+ SourceNode string `yaml:"SourceNode"`
+ TargetNode string `yaml:"TargetNode"`
+ Status string `yaml:"Status"`
+ }
+
+ type localEventDescription struct {
+ Event string `yaml:"Event"`
+ TargetNode string `yaml:"TargetNode"`
+ }
+
+ type stateDescription struct {
+ State string `yaml:"State"`
+ Status string `yaml:"Status"`
+ ResultsID string `yaml:"Result CID"`
+ }

type jobDescription struct {
- ID string `yaml:"Id"`
- ClientID string `yaml:"ClientID"`
- RequesterNodeID string `yaml:"RequesterNodeId"`
- Spec jobSpecDescription `yaml:"Spec"`
- Deal executor.JobDeal `yaml:"Deal"`
- State map[string]executor.JobState `yaml:"State"`
- CreatedAt time.Time `yaml:"Start Time"`
+ ID string `yaml:"ID"`
+ ClientID string `yaml:"ClientID"`
+ RequesterNodeID string `yaml:"RequesterNodeID"`
+ Spec jobSpecDescription `yaml:"Spec"`
+ Deal executor.JobDeal `yaml:"Deal"`
+ State map[string]stateDescription `yaml:"State"`
+ Events []eventDescription `yaml:"Events"`
+ LocalEvents []localEventDescription `yaml:"Local Events"`
+ CreatedAt time.Time `yaml:"Start Time"`
}

type jobSpecDescription struct {
- Engine string `yaml:"Engine"`
- Verifier string `yaml:"Verifier"`
- VM jobSpecVMDescription `yaml:"VM"`
- Deployment jobDealDescription `yaml:"Deployment"`
+ Engine string `yaml:"Engine"`
+ Verifier string `yaml:"Verifier"`
+ Docker jobSpecDockerDescription `yaml:"Docker"`
+ Deployment jobDealDescription `yaml:"Deployment"`
}

- type jobSpecVMDescription struct {
+ type jobSpecDockerDescription struct {
Image string `yaml:"Image"`
Entrypoint []string `yaml:"Entrypoint Command"`
Env []string `yaml:"Submitted Env Variables"`
@@ -73,13 +95,25 @@ var describeCmd = &cobra.Command{
return err
}

- jobVMDesc := jobSpecVMDescription{}
- jobVMDesc.Image = job.Spec.Docker.Image
- jobVMDesc.Entrypoint = job.Spec.Docker.Entrypoint
- jobVMDesc.Env = job.Spec.Docker.Env
+ events, err := getAPIClient().GetEvents(context.Background(), jobID)
+ if err != nil {
+ log.Error().Msgf("Failure retrieving job events '%s': %s", jobID, err)
+ return err
+ }
+
+ localEvents, err := getAPIClient().GetLocalEvents(context.Background(), jobID)
+ if err != nil {
+ log.Error().Msgf("Failure retrieving job local events '%s': %s", jobID, err)
+ return err
+ }
+
+ jobDockerDesc := jobSpecDockerDescription{}
+ jobDockerDesc.Image = job.Spec.Docker.Image
+ jobDockerDesc.Entrypoint = job.Spec.Docker.Entrypoint
+ jobDockerDesc.Env = job.Spec.Docker.Env

- jobVMDesc.CPU = job.Spec.Resources.CPU
- jobVMDesc.Memory = job.Spec.Resources.Memory
+ jobDockerDesc.CPU = job.Spec.Resources.CPU
+ jobDockerDesc.Memory = job.Spec.Resources.Memory

jobSpecDesc := jobSpecDescription{}
jobSpecDesc.Engine = job.Spec.Engine.String()
@@ -88,18 +122,49 @@
jobDealDesc.Concurrency = job.Deal.Concurrency

jobSpecDesc.Verifier = job.Spec.Verifier.String()
- jobSpecDesc.VM = jobVMDesc
+ jobSpecDesc.Docker = jobDockerDesc

jobDesc := jobDescription{}
jobDesc.ID = job.ID
jobDesc.ClientID = job.ClientID
jobDesc.RequesterNodeID = job.RequesterNodeID
jobDesc.Spec = jobSpecDesc
jobDesc.Deal = job.Deal
- jobDesc.State = states
+ jobDesc.State = map[string]stateDescription{}
+ for id, state := range states {
+ jobDesc.State[id] = stateDescription{
+ State: state.State.String(),
+ Status: state.Status,
+ ResultsID: state.ResultsID,
+ }
+ }
jobDesc.CreatedAt = job.CreatedAt
+ jobDesc.Events = []eventDescription{}
+ for _, event := range events {
+ jobDesc.Events = append(jobDesc.Events, eventDescription{
+ Event: event.EventName.String(),
+ Status: event.Status,
+ Time: event.EventTime.String(),
+ Concurrency: event.JobDeal.Concurrency,
+ SourceNode: event.SourceNodeID,
+ TargetNode: event.TargetNodeID,
+ })
+ }
+
+ jobDesc.LocalEvents = []localEventDescription{}
+ for _, event := range localEvents {
+ jobDesc.LocalEvents = append(jobDesc.LocalEvents, localEventDescription{
+ Event: event.EventName.String(),
+ TargetNode: event.TargetNodeID,
+ })
+ }
+
+ bytes, err := yaml.Marshal(jobDesc)
+ if err != nil {
+ log.Error().Msgf("Failure marshaling job description '%s': %s", jobID, err)
+ return err
+ }

- bytes, _ := yaml.Marshal(jobDesc)
cmd.Print(string(bytes))

return nil
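With the new state, event, and local-event descriptions wired in, describe output now follows this overall YAML shape (trimmed); the keys come from the struct tags above, and every value is an illustrative placeholder rather than real output:

    ID: <job-id>
    ClientID: <client-id>
    RequesterNodeID: <node-id>
    Spec:
      Engine: <engine name>
      Verifier: <verifier name>
      Docker:
        Image: <image>
        Entrypoint Command: [<cmd>]
    Deal: <executor.JobDeal fields>
    State:
      <node-id>:
        State: Complete
        Status: <status message>
        Result CID: <results-cid>
    Events:
      - Event: <event name>
        Time: <timestamp>
        Concurrency: 3
        SourceNode: <node-id>
        TargetNode: <node-id>
        Status: <status message>
    Local Events:
      - Event: <event name>
        TargetNode: <node-id>
    Start Time: <timestamp>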
4 changes: 2 additions & 2 deletions cmd/bacalhau/describe_test.go
@@ -105,7 +105,7 @@ func (suite *DescribeSuite) TestDescribeJob() {
require.Equal(suite.T(), submittedJob.ID, returnedJobDescription.ID, "IDs do not match.")
require.Equal(suite.T(),
submittedJob.Spec.Docker.Entrypoint[0],
- returnedJobDescription.Spec.VM.Entrypoint[0],
+ returnedJobDescription.Spec.Docker.Entrypoint[0],
fmt.Sprintf("Submitted job entrypoints not the same as the description. %d - %d - %s - %d", tc.numberOfAcceptNodes, tc.numberOfRejectNodes, tc.jobState, n.numOfJobs))

// Job Id in the middle
@@ -121,7 +121,7 @@
require.Equal(suite.T(), submittedJob.ID, returnedJobDescription.ID, "IDs do not match.")
require.Equal(suite.T(),
submittedJob.Spec.Docker.Entrypoint[0],
- returnedJobDescription.Spec.VM.Entrypoint[0],
+ returnedJobDescription.Spec.Docker.Entrypoint[0],
fmt.Sprintf("Submitted job entrypoints not the same as the description. %d - %d - %s - %d", tc.numberOfAcceptNodes, tc.numberOfRejectNodes, tc.jobState, n.numOfJobs))

}()
6 changes: 6 additions & 0 deletions cmd/bacalhau/devstack.go
@@ -23,6 +23,7 @@ import (
var devStackNodes int
var devStackBadActors int
var devStackNoop bool
+ var devStackPeer string

func init() { // nolint:gochecknoinits // Using init in cobra command is idiomatic
devstackCmd.PersistentFlags().IntVar(
@@ -37,6 +38,10 @@ func init() { // nolint:gochecknoinits // Using init in cobra command is idiomatic
&devStackNoop, "noop", false,
`Use the noop executor and verifier for all jobs`,
)
+ devstackCmd.PersistentFlags().StringVar(
+ &devStackPeer, "peer", "",
+ `Connect node 0 to another network node`,
+ )
}

var devstackCmd = &cobra.Command{
@@ -87,6 +92,7 @@ var devstackCmd = &cobra.Command{
getExecutors,
getVerifiers,
computenode.NewDefaultComputeNodeConfig(),
+ devStackPeer,
)
if err != nil {
return err
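The --peer flag points node 0 of a fresh devstack at an existing network node, so a local cluster can be attached to a running network instead of always bootstrapping in isolation. A hypothetical invocation (the multiaddr is illustrative; the diff does not show the expected address format):

    go run . devstack --nodes 20 --peer /ip4/192.0.2.10/tcp/1235/p2p/<peer-id>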
7 changes: 6 additions & 1 deletion pkg/capacitymanager/capacitymanager.go
@@ -199,11 +199,16 @@ func (manager *CapacityManager) FilterRequirements(requirements ResourceUsageData
return isOk, requirements
}

- func (manager *CapacityManager) AddToBacklog(id string, requirements ResourceUsageData) {
+ func (manager *CapacityManager) AddToBacklog(id string, requirements ResourceUsageData) error {
+ existingItem := manager.backlog.Get(id)
+ if existingItem != nil {
+ return fmt.Errorf("job %s already in backlog", id)
+ }
manager.backlog.Add(CapacityManagerItem{
ID: id,
Requirements: requirements,
})
+ return nil
}

func (manager *CapacityManager) MoveToActive(id string) error {
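Because AddToBacklog now refuses duplicates, callers must check its error and treat "already in backlog" as a signal to skip the job; a minimal sketch of the expected call pattern, mirroring the compute-node change below (names from the diff):

    if err := manager.AddToBacklog(jobID, requirements); err != nil {
        // the job is already queued; don't bid on it a second time
        log.Info().Msgf("not re-queueing job %s: %v", jobID, err)
        return
    }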
32 changes: 31 additions & 1 deletion pkg/computenode/computenode.go
@@ -44,6 +44,7 @@ type ComputeNode struct {
verifiers map[verifier.VerifierType]verifier.Verifier
capacityManager *capacitymanager.CapacityManager
componentMu sync.Mutex
+ bidMu sync.Mutex
}

func NewDefaultComputeNodeConfig() ComputeNodeConfig {
@@ -138,8 +139,32 @@ func (node *ComputeNode) controlLoopSetup(cm *system.CleanupManager) {
// * add each bid on job to the "projected resources"
// * repeat until projected resources >= total resources or no more jobs in queue
func (node *ComputeNode) controlLoopBidOnJobs() {
+ node.bidMu.Lock()
+ defer node.bidMu.Unlock()
bidJobIds := node.capacityManager.GetNextItems()
for _, id := range bidJobIds {
+ // CHECK WE DON'T HAVE A BID EVENT LOCALLY
+ jobLocalEvents, err := node.controller.GetJobLocalEvents(context.Background(), id)
+ if err != nil {
+ node.capacityManager.Remove(id)
+ continue
+ }
+
+ hasAlreadyBid := false
+
+ for _, localEvent := range jobLocalEvents {
+ if localEvent.EventName == executor.JobLocalEventBid {
+ hasAlreadyBid = true
+ break
+ }
+ }
+
+ if hasAlreadyBid {
+ log.Info().Msgf("node %s has already bid on job %s", node.id, id)
+ node.capacityManager.Remove(id)
+ continue
+ }
+
job, err := node.controller.GetJob(context.Background(), id)
if err != nil {
node.capacityManager.Remove(id)
@@ -175,6 +200,7 @@ func (node *ComputeNode) subscriptionSetup() {
}
switch jobEvent.EventName {
case executor.JobEventCreated:
log.Debug().Msgf("[%s] APPLES ARE SICK job created: %s", node.id, job.ID)
node.subscriptionEventCreated(ctx, jobEvent, job)
// we have been given the goahead to run the job
case executor.JobEventBidAccepted:
@@ -211,7 +237,11 @@ func (node *ComputeNode) subscriptionEventCreated(ctx context.Context, jobEvent
}

if selected {
- node.capacityManager.AddToBacklog(job.ID, processedRequirements)
+ err = node.capacityManager.AddToBacklog(job.ID, processedRequirements)
+ if err != nil {
+ log.Info().Msgf("Error adding job to backlog on host %s: %v", node.id, err)
+ return
+ }
node.controlLoopBidOnJobs()
}
}
11 changes: 10 additions & 1 deletion pkg/controller/controller.go
@@ -249,6 +249,13 @@ func (ctrl *Controller) RejectResults(ctx context.Context, jobID, nodeID string)
// done by compute nodes when they hear about the job
func (ctrl *Controller) BidJob(ctx context.Context, jobID string) error {
jobCtx := ctrl.getJobNodeContext(ctx, jobID)
+ err := ctrl.db.AddLocalEvent(jobCtx, jobID, executor.JobLocalEvent{
+ EventName: executor.JobLocalEventBid,
+ JobID: jobID,
+ })
+ if err != nil {
+ return err
+ }
ctrl.addJobLifecycleEvent(jobCtx, jobID, "write_BidJob")
ev := ctrl.constructEvent(jobID, executor.JobEventBid)
// the target node is "us" because it is "us" who is bidding
@@ -412,8 +419,10 @@ func (ctrl *Controller) constructEvent(jobID string, eventName executor.JobEvent
}

func constructJob(ev executor.JobEvent) executor.Job {
log.Debug().Msgf("Constructing job from event: %+v", ev)
return executor.Job{
- ID: ev.JobID,
+ ID:              ev.JobID,
+ // TODO: add logging here
RequesterNodeID: ev.SourceNodeID,
ClientID: ev.ClientID,
Spec: ev.JobSpec,
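Taken together, the controller and compute-node changes close a double-bid race surfaced by scale testing: BidJob durably records a JobLocalEventBid before the bid event is published to the network, and controlLoopBidOnJobs (now serialized by bidMu) consults those local events and drops any job this node has already bid on. A condensed sketch of the handshake, with lines lifted from the diffs above and surrounding control flow elided:

    // controller: remember the bid locally before announcing it
    err := ctrl.db.AddLocalEvent(jobCtx, jobID, executor.JobLocalEvent{
        EventName: executor.JobLocalEventBid,
        JobID:     jobID,
    })

    // compute node control loop: skip jobs this node has already bid on
    for _, localEvent := range jobLocalEvents {
        if localEvent.EventName == executor.JobLocalEventBid {
            hasAlreadyBid = true
            break
        }
    }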
… 12 more changed files not shown.
