Merge pull request #442 from filecoin-project/127-sharding

127 sharding

binocarlos authored Aug 5, 2022
2 parents f2aa15c + a59ba54 commit be6565a
Showing 70 changed files with 2,769 additions and 1,260 deletions.
30 changes: 14 additions & 16 deletions .circleci/config.yml
@@ -60,7 +60,7 @@ jobs:
# For some reason, .circlerc (I don't know where this file is generated) reports `go env GOPATH` as '/home/circleci/.go_workspace:/usr/local/go_workspace' (with the colon)
# This breaks normal pathing. So just installing in ./bin/
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | BINDIR=${HOME}/bin sh -s ${GOLANGCILINT}
golangci-lint --version
golangci-lint version
- run:
name: Run linter
@@ -177,21 +177,9 @@ jobs:
steps:
- checkout
- run:
name: Run Perf
command: |
cd benchmark
sudo apt install build-essential -y
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
source $HOME/.cargo/env
cargo install hyperfine
./start_and_run.sh
- run:
name: Upload perf results
name: Run Perf and Upload Results
command: |
export PATH="$HOME/.local/bin:${PATH}"
sudo apt install python3.10 python3-pip -y
pip3 install gsutil
export SHA="<< pipeline.git.revision >>"
export DATETIME="$(date -u +"%FT%H%MZ")"
export BRANCH="<<pipeline.git.branch>>"
@@ -200,12 +188,22 @@
else
export TAG="NOTAG"
fi
sudo apt install python3.10 python3-pip build-essential -y
pip3 install gsutil
wget https://github.com/sharkdp/hyperfine/releases/download/v1.13.0/hyperfine_1.13.0_amd64.deb
sudo dpkg -i hyperfine_1.13.0_amd64.deb
echo "$GOOGLE_CLOUD_STORAGE_BACALHAU_CICD_RW" | base64 --decode > ~/.boto
gsutil -m cp benchmark/results/* "${GCS_PERF_RESULTS_BUCKET}/${DATETIME}-<<pipeline.git.branch>>-${TAG}-${SHA}"
(
cd benchmark
bash start_and_run.sh
)
gsutil -m cp benchmark/results/* "${GCS_PERF_RESULTS_BUCKET}/${DATETIME}-${BRANCH}-${TAG}-${SHA}"
- heroku/install
- run:
command: >
heroku run build --app bacalhau-dashboards # Updates dashboard data with latest from GCS
# Updates dashboard data with latest from GCS
heroku run build --app bacalhau-dashboards
release:
executor: linux
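With the exports above filled in, a perf run's results land at a GCS path like `${GCS_PERF_RESULTS_BUCKET}/2022-08-05T1200Z-main-NOTAG-be6565a` — illustrative values only: the timestamp is the `date -u +"%FT%H%MZ"` output, `main` the branch, `NOTAG` the fallback when the commit carries no tag, and the SHA is this commit's.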
1 change: 0 additions & 1 deletion .gitignore
@@ -33,7 +33,6 @@ ops/terraform/.terraform/*
.terraform.lock.hcl
production.tfvars
ops/terraform/*.out
benchmark_hack/results
*_unit.json
.trunk/logs
benchmark/results
2 changes: 1 addition & 1 deletion .golangci.yml
@@ -27,6 +27,7 @@ linters-settings:
- whyNoLint
- wrapperFunc
- hugeParam
- importShadow
gocyclo:
min-complexity: 18
goimports:
@@ -72,7 +73,6 @@ linters:
- deadcode
- depguard
- dogsled
- dupl
- errcheck
- exportloopref
- funlen
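For context on the linter change: `importShadow` is a gocritic check that warns when a local identifier shadows an imported package name, and it is added here to gocritic's disabled checks — presumably to accommodate identifiers such as the `job` variable introduced in `describe.go` below without renaming. A minimal hypothetical example (not from this PR) of the pattern the check would flag:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	nums := []int{3, 1, 2}
	sort.Ints(nums)     // ordinary use of the imported "sort" package
	sort := "ascending" // local variable shadows the package name; importShadow flags this
	fmt.Println(nums, sort)
}
```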
1 change: 1 addition & 0 deletions benchmark/submit.sh
@@ -9,6 +9,7 @@ COUNTER=1
while true; do
sleep 0.1
# trunk-ignore(shellcheck/SC2312)
# TODO: get the shard state to not be a number (which is brittle to test against)
if [[ $(${BACALHAU_BIN} --api-port="${API_PORT}" --api-host=localhost describe "${ID}" 2>&1|grep "State: Complete"|wc -l) -ne 3 ]]; then
echo "JOB ${ID} FAILED"
(( COUNTER++ ))
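A note on the check above: the loop keeps retrying until `describe` reports `State: Complete` exactly three times, presumably once per concurrent shard execution. As the new TODO says, matching a rendered state string against a hard-coded count is brittle; the structured per-shard output added to `describe` in this PR is a step toward asserting on real state values instead.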
119 changes: 76 additions & 43 deletions cmd/bacalhau/describe.go
@@ -3,9 +3,11 @@ package bacalhau
import (
"context"
"fmt"
"sort"
"time"

"github.com/filecoin-project/bacalhau/pkg/executor"
jobutils "github.com/filecoin-project/bacalhau/pkg/job"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
@@ -28,22 +30,28 @@ type localEventDescription struct {
TargetNode string `yaml:"TargetNode"`
}

type stateDescription struct {
State string `yaml:"State"`
Status string `yaml:"Status"`
ResultsID string `yaml:"Result CID"`
type shardNodeStateDescription struct {
Node string `yaml:"Node"`
State string `yaml:"State"`
Status string `yaml:"Status"`
ResultID string `yaml:"ResultID"`
}

type shardStateDescription struct {
ShardIndex int `yaml:"ShardIndex"`
Nodes []shardNodeStateDescription `yaml:"Nodes"`
}

type jobDescription struct {
ID string `yaml:"ID"`
ClientID string `yaml:"ClientID"`
RequesterNodeID string `yaml:"RequesterNodeID"`
Spec jobSpecDescription `yaml:"Spec"`
Deal executor.JobDeal `yaml:"Deal"`
State map[string]stateDescription `yaml:"State"`
Events []eventDescription `yaml:"Events"`
LocalEvents []localEventDescription `yaml:"Local Events"`
CreatedAt time.Time `yaml:"Start Time"`
ID string `yaml:"Id"`
ClientID string `yaml:"ClientID"`
RequesterNodeID string `yaml:"RequesterNodeId"`
Spec jobSpecDescription `yaml:"Spec"`
Deal executor.JobDeal `yaml:"Deal"`
Shards []shardStateDescription `yaml:"Shards"`
CreatedAt time.Time `yaml:"Start Time"`
Events []eventDescription `yaml:"Events"`
LocalEvents []localEventDescription `yaml:"LocalEvents"`
}

type jobSpecDescription struct {
@@ -77,7 +85,7 @@ var describeCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, cmdArgs []string) error { // nolintunparam // incorrectly suggesting unused
inputJobID := cmdArgs[0]

j, ok, err := getAPIClient().Get(context.Background(), cmdArgs[0])
job, ok, err := getAPIClient().Get(context.Background(), cmdArgs[0])

if err != nil {
log.Error().Msgf("Failure retrieving job ID '%s': %s", inputJobID, err)
@@ -90,59 +98,84 @@
return err
}

jobID := j.ID

states, err := getAPIClient().GetExecutionStates(context.Background(), jobID)
state, err := getAPIClient().GetJobState(context.Background(), job.ID)
if err != nil {
log.Error().Msgf("Failure retrieving job states '%s': %s", jobID, err)
log.Error().Msgf("Failure retrieving job states '%s': %s", job.ID, err)
return err
}

events, err := getAPIClient().GetEvents(context.Background(), jobID)
events, err := getAPIClient().GetEvents(context.Background(), job.ID)
if err != nil {
log.Error().Msgf("Failure retrieving job events '%s': %s", jobID, err)
log.Error().Msgf("Failure retrieving job events '%s': %s", job.ID, err)
return err
}

localEvents, err := getAPIClient().GetLocalEvents(context.Background(), jobID)
localEvents, err := getAPIClient().GetLocalEvents(context.Background(), job.ID)
if err != nil {
log.Error().Msgf("Failure retrieving job events '%s': %s", jobID, err)
log.Error().Msgf("Failure retrieving job events '%s': %s", job.ID, err)
return err
}

jobDockerDesc := jobSpecDockerDescription{}
jobDockerDesc.Image = j.Spec.Docker.Image
jobDockerDesc.Entrypoint = j.Spec.Docker.Entrypoint
jobDockerDesc.Env = j.Spec.Docker.Env
jobDockerDesc.Image = job.Spec.Docker.Image
jobDockerDesc.Entrypoint = job.Spec.Docker.Entrypoint
jobDockerDesc.Env = job.Spec.Docker.Env

jobDockerDesc.CPU = j.Spec.Resources.CPU
jobDockerDesc.Memory = j.Spec.Resources.Memory
jobDockerDesc.CPU = job.Spec.Resources.CPU
jobDockerDesc.Memory = job.Spec.Resources.Memory

jobSpecDesc := jobSpecDescription{}
jobSpecDesc.Engine = j.Spec.Engine.String()
jobSpecDesc.Engine = job.Spec.Engine.String()

jobDealDesc := jobDealDescription{}
jobDealDesc.Concurrency = j.Deal.Concurrency
jobDealDesc.Concurrency = job.Deal.Concurrency

jobSpecDesc.Verifier = j.Spec.Verifier.String()
jobSpecDesc.Verifier = job.Spec.Verifier.String()
jobSpecDesc.Docker = jobDockerDesc

jobDesc := jobDescription{}
jobDesc.ID = j.ID
jobDesc.ClientID = j.ClientID
jobDesc.RequesterNodeID = j.RequesterNodeID
jobDesc.ID = job.ID
jobDesc.ClientID = job.ClientID
jobDesc.RequesterNodeID = job.RequesterNodeID
jobDesc.Spec = jobSpecDesc
jobDesc.Deal = j.Deal
jobDesc.State = map[string]stateDescription{}
for id, state := range states {
jobDesc.State[id] = stateDescription{
State: state.State.String(),
Status: state.Status,
ResultsID: state.ResultsID,
jobDesc.Deal = job.Deal
jobDesc.CreatedAt = job.CreatedAt
jobDesc.Events = []eventDescription{}

shardDescriptions := map[int]shardStateDescription{}

for _, shard := range jobutils.FlattenShardStates(state) {
shardDescription, ok := shardDescriptions[shard.ShardIndex]
if !ok {
shardDescription = shardStateDescription{
ShardIndex: shard.ShardIndex,
Nodes: []shardNodeStateDescription{},
}
}
shardDescription.Nodes = append(shardDescription.Nodes, shardNodeStateDescription{
Node: shard.NodeID,
State: shard.State.String(),
Status: shard.Status,
ResultID: shard.ResultsID,
})
shardDescriptions[shard.ShardIndex] = shardDescription
}
jobDesc.CreatedAt = j.CreatedAt
jobDesc.Events = []eventDescription{}

shardIndexes := []int{}
for shardIndex := range shardDescriptions {
shardIndexes = append(shardIndexes, shardIndex)
}

sort.Ints(shardIndexes)

finalDescriptions := []shardStateDescription{}

for _, shardIndex := range shardIndexes {
finalDescriptions = append(finalDescriptions, shardDescriptions[shardIndex])
}

jobDesc.Shards = finalDescriptions

for _, event := range events {
jobDesc.Events = append(jobDesc.Events, eventDescription{
Event: event.EventName.String(),
@@ -164,7 +197,7 @@ var describeCmd = &cobra.Command{

bytes, err := yaml.Marshal(jobDesc)
if err != nil {
log.Error().Msgf("Failure marshaling job description '%s': %s", jobID, err)
log.Error().Msgf("Failure marshaling job description '%s': %s", job.ID, err)
return err
}

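To make the reworked output shape concrete, here is a minimal sketch that marshals the two per-shard types from the diff above (the struct definitions are copied verbatim; the node ID, status, and CID values are made up). Collecting the map keys and sorting them, as the command now does, matters because Go randomizes map iteration order — without the sort, `describe` output would be nondeterministic.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

type shardNodeStateDescription struct {
	Node     string `yaml:"Node"`
	State    string `yaml:"State"`
	Status   string `yaml:"Status"`
	ResultID string `yaml:"ResultID"`
}

type shardStateDescription struct {
	ShardIndex int                         `yaml:"ShardIndex"`
	Nodes      []shardNodeStateDescription `yaml:"Nodes"`
}

func main() {
	shards := []shardStateDescription{{
		ShardIndex: 0,
		Nodes: []shardNodeStateDescription{{
			Node:     "QmExampleNodeID", // hypothetical libp2p peer ID
			State:    "Complete",
			Status:   "Got results",        // hypothetical status text
			ResultID: "QmExampleResultCID", // hypothetical IPFS CID
		}},
	}}
	out, err := yaml.Marshal(shards)
	if err != nil {
		panic(err)
	}
	// Prints the YAML list that appears under the Shards: key in describe output.
	fmt.Print(string(out))
}
```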
43 changes: 39 additions & 4 deletions cmd/bacalhau/devstack.go
@@ -6,12 +6,15 @@ import (
"os"
"strconv"

"github.com/filecoin-project/bacalhau/pkg/capacitymanager"
"github.com/filecoin-project/bacalhau/pkg/computenode"
"github.com/filecoin-project/bacalhau/pkg/config"
"github.com/filecoin-project/bacalhau/pkg/controller"
"github.com/filecoin-project/bacalhau/pkg/devstack"
"github.com/filecoin-project/bacalhau/pkg/executor"
noop_executor "github.com/filecoin-project/bacalhau/pkg/executor/noop"
executor_util "github.com/filecoin-project/bacalhau/pkg/executor/util"
"github.com/filecoin-project/bacalhau/pkg/storage"
"github.com/filecoin-project/bacalhau/pkg/system"
"github.com/filecoin-project/bacalhau/pkg/verifier"
verifier_util "github.com/filecoin-project/bacalhau/pkg/verifier/util"
@@ -42,6 +45,9 @@ func init() { //nolint:gochecknoinits // Using init in cobra command is idomatic
&devStackPeer, "peer", "",
`Connect node 0 to another network node`,
)

setupJobSelectionCLIFlags(devstackCmd)
setupCapacityManagerCLIFlags(devstackCmd)
}

var devstackCmd = &cobra.Command{
@@ -64,7 +70,17 @@ var devstackCmd = &cobra.Command{
ctx, cancel := system.WithSignalShutdown(context.Background())
defer cancel()

getExecutors := func(ipfsMultiAddress string, nodeIndex int) (
getStorageProviders := func(ipfsMultiAddress string, nodeIndex int) (
map[storage.StorageSourceType]storage.StorageProvider, error) {

if devStackNoop {
return executor_util.NewNoopStorageProviders(cm)
}

return executor_util.NewStandardStorageProviders(cm, ipfsMultiAddress)
}

getExecutors := func(ipfsMultiAddress string, nodeIndex int, ctrl *controller.Controller) (
map[executor.EngineType]executor.Executor, error) {

if devStackNoop {
@@ -75,23 +91,42 @@
ipfsMultiAddress, fmt.Sprintf("devstacknode%d", nodeIndex))
}

getVerifiers := func(ipfsMultiAddress string, nodeIndex int) ( //nolint:unparam // nodeIndex will be used in the future
// nodeIndex will be used in the future
getVerifiers := func(ipfsMultiAddress string, nodeIndex int, ctrl *controller.Controller) ( //nolint:unparam,lll
map[verifier.VerifierType]verifier.Verifier, error) {

if devStackNoop {
return verifier_util.NewNoopVerifiers(cm)
}

return verifier_util.NewIPFSVerifiers(cm, ipfsMultiAddress)
jobLoader := func(ctx context.Context, id string) (executor.Job, error) {
return ctrl.GetJob(ctx, id)
}
stateLoader := func(ctx context.Context, id string) (executor.JobState, error) {
return ctrl.GetJobState(ctx, id)
}
return verifier_util.NewIPFSVerifiers(cm, ipfsMultiAddress, jobLoader, stateLoader)
}

jobSelectionPolicy := getJobSelectionConfig()
totalResourceLimit, jobResourceLimit := getCapacityManagerConfig()

computeNodeConfig := computenode.ComputeNodeConfig{
JobSelectionPolicy: jobSelectionPolicy,
CapacityManagerConfig: capacitymanager.Config{
ResourceLimitTotal: totalResourceLimit,
ResourceLimitJob: jobResourceLimit,
},
}

stack, err := devstack.NewDevStack(
cm,
devStackNodes,
devStackBadActors,
getStorageProviders,
getExecutors,
getVerifiers,
computenode.NewDefaultComputeNodeConfig(),
computeNodeConfig,
devStackPeer,
)
if err != nil {
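One design note on the `getVerifiers` change above: `NewIPFSVerifiers` now receives `jobLoader` and `stateLoader` closures rather than reaching into the controller directly, so verifiers depend only on two narrow function types. A compressed sketch of the pattern with illustrative stand-in types (the real code uses `executor.Job`, `executor.JobState`, and `controller.Controller`):

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins for executor.Job and executor.JobState.
type Job struct{ ID string }
type JobState struct{ Phase string }

// The loader function types the verifier consumes.
type JobLoader func(ctx context.Context, id string) (Job, error)
type StateLoader func(ctx context.Context, id string) (JobState, error)

// Verifier sees only the loaders, never the controller itself.
type Verifier struct {
	jobLoader   JobLoader
	stateLoader StateLoader
}

func NewVerifier(jl JobLoader, sl StateLoader) *Verifier {
	return &Verifier{jobLoader: jl, stateLoader: sl}
}

func (v *Verifier) Check(ctx context.Context, id string) error {
	job, err := v.jobLoader(ctx, id)
	if err != nil {
		return err
	}
	state, err := v.stateLoader(ctx, id)
	if err != nil {
		return err
	}
	fmt.Printf("job %s is %s\n", job.ID, state.Phase)
	return nil
}

func main() {
	// In devstack.go these closures delegate to ctrl.GetJob / ctrl.GetJobState.
	v := NewVerifier(
		func(ctx context.Context, id string) (Job, error) { return Job{ID: id}, nil },
		func(ctx context.Context, id string) (JobState, error) { return JobState{Phase: "Complete"}, nil },
	)
	_ = v.Check(context.Background(), "example-job")
}
```

Because the loaders are plain functions, tests can substitute fakes without constructing a controller, and the import graph stays acyclic.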