Skip to content

Commit

Permalink
feat: initial worker framework
Browse files Browse the repository at this point in the history
  • Loading branch information
agaffney committed Aug 31, 2023
1 parent f42268d commit 173c122
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 0 deletions.
5 changes: 5 additions & 0 deletions cmd/bluefin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/blinklabs-io/bluefin/internal/logging"
"github.com/blinklabs-io/bluefin/internal/storage"
"github.com/blinklabs-io/bluefin/internal/wallet"
"github.com/blinklabs-io/bluefin/internal/worker"
)

var cmdlineFlags struct {
Expand Down Expand Up @@ -69,6 +70,10 @@ func main() {
logger.Fatalf("failed to start indexer: %s", err)
}

// TODO: remove me
// This should be started by the indexer reaching chain tip
worker.GetManager().Start(worker.WorkerParams{})

// Wait forever
select {}
}
11 changes: 11 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package config
import (
"fmt"
"os"
"runtime"

"github.com/kelseyhightower/envconfig"
"gopkg.in/yaml.v2"
Expand All @@ -26,6 +27,7 @@ type Config struct {
Storage StorageConfig `yaml:"storage"`
Indexer IndexerConfig `yaml:"indexer"`
Wallet WalletConfig `yaml:"wallet"`
Worker WorkerConfig `yaml:"worker"`
Logging LoggingConfig `yaml:"logging"`
Metrics MetricsConfig `yaml:"metrics"`
Genesis GenesisConfig `yaml:"genesis"`
Expand All @@ -50,6 +52,10 @@ type WalletConfig struct {
Mnemonic string `yaml:"mnemonic" envconfig:"MNEMONIC"`
}

type WorkerConfig struct {
Count int `yaml:"count" envconfig:"WORKER_COUNT"`
}

type LoggingConfig struct {
Healthchecks bool `yaml:"healthchecks" envconfig:"LOGGING_HEALTHCHECKS"`
Level string `yaml:"level" envconfig:"LOGGING_LEVEL"`
Expand Down Expand Up @@ -103,6 +109,11 @@ var globalConfig = &Config{
// TODO: pick a better location
Directory: "./.bluefin",
},
// The default worker config is somewhat conservative: worker count is set
// to half of the available logical CPUs
Worker: WorkerConfig{
Count: runtime.NumCPU() / 2,
},
}

func Load(configFile string) (*Config, error) {
Expand Down
85 changes: 85 additions & 0 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package worker

import (
"math/rand"
"sync"
"time"

"github.com/blinklabs-io/bluefin/internal/config"
"github.com/blinklabs-io/bluefin/internal/logging"
)

type Manager struct {
workerWaitGroup sync.WaitGroup
doneChan chan any
resultChan chan any
// TODO
}

type WorkerParams struct {
// TODO
}

var globalManager = &Manager{
// TODO
}

func (m *Manager) Reset() {
m.workerWaitGroup = sync.WaitGroup{}
m.doneChan = make(chan any)
m.resultChan = make(chan any)
}

func (m *Manager) Stop() {
close(m.doneChan)
m.workerWaitGroup.Wait()
logging.GetLogger().Infof("stopped workers")
}

func (m *Manager) Start(params WorkerParams) {
cfg := config.GetConfig()
logger := logging.GetLogger()
// Start workers
m.Reset()
logger.Infof("starting %d workers", cfg.Worker.Count)
for i := 0; i < cfg.Worker.Count; i++ {
go func(workerIdx int) {
defer m.workerWaitGroup.Done()
for {
// Check for worker shutdown
select {
case <-m.doneChan:
return
default:
break
}
// TODO
randVal := rand.Intn(100)
if randVal == 42 {
logger.Infof("worker %d found result", workerIdx)
m.resultChan <- randVal
}
time.Sleep(1 * time.Second)
}
}(i)
m.workerWaitGroup.Add(1)
}
// Wait for result
go func() {
select {
case <-m.doneChan:
return
case result := <-m.resultChan:
// TODO: do something useful with result
// TODO: let the indexer receiving an update to the script's UTxOs restart the workers
logger.Infof("result = %#v", result)
// Restart workers as a simple test
m.Stop()
m.Start(WorkerParams{})
}
}()
}

func GetManager() *Manager {
return globalManager
}

0 comments on commit 173c122

Please sign in to comment.