Skip to content
This repository has been archived by the owner on Nov 25, 2022. It is now read-only.

Commit

Permalink
refactor: async pull ceph metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Mar 10, 2022
1 parent d6ca3e5 commit ce0230d
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 23 deletions.
40 changes: 26 additions & 14 deletions ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ import (

akashv1 "github.com/ovrclk/akash/pkg/apis/akash.network/v1"
"github.com/ovrclk/akash/util/runner"
"github.com/pkg/errors"
rookv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/watch"
)

var (
cephInventoryInProgress = errors.New("ceph inventory is being updated")
)

type stats struct {
TotalBytes uint64 `json:"total_bytes"`
TotalAvailBytes uint64 `json:"total_avail_bytes"`
Expand Down Expand Up @@ -122,10 +127,17 @@ func (c *ceph) run() error {
clusters := make(cephClusters)
scs := make(cephStorageClasses)

var pendingReq []req
scrapeData := resp{
res: nil,
err: cephInventoryInProgress,
}

var scrapech <-chan runner.Result

scrapech = runner.Do(func() runner.Result {
return runner.NewResult(c.scrapeMetrics(scs.dup(), clusters.dup()))
})

for {
select {
case <-c.ctx.Done():
Expand Down Expand Up @@ -218,23 +230,23 @@ func (c *ceph) run() error {
}
}
case req := <-c.reqch:
pendingReq = append(pendingReq, req)
if scrapech == nil {
scrapech = runner.Do(func() runner.Result {
return runner.NewResult(c.scrapeMetrics(scs.dup(), clusters.dup()))
})
}
req.resp <- scrapeData
case res := <-scrapech:
scrapech = nil
r := resp{}
if err := res.Error(); err != nil {
r.err = cephInventoryInProgress
log.Error(err, "unable to pull ceph status")
}

for _, r := range pendingReq {
r.resp <- resp{
res: res.Value().([]akashv1.InventoryClusterStorage),
err: res.Error(),
}
if data, valid := res.Value().([]akashv1.InventoryClusterStorage); valid {
r.res = data
}

pendingReq = []req{}
scrapeData = r

scrapech = runner.Do(func() runner.Result {
return runner.NewResult(c.scrapeMetrics(scs.dup(), clusters.dup()))
})
}
}
}
Expand Down
43 changes: 34 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os/signal"
"path"
"strings"
"sync"
"time"

"github.com/cskr/pubsub"
Expand All @@ -28,6 +29,8 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"

"github.com/ovrclk/k8s-inventory-operator/util/runner"

"github.com/boz/go-lifecycle"
akashclientset "github.com/ovrclk/akash/pkg/client/clientset/versioned"
rookclientset "github.com/rook/rook/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -304,17 +307,39 @@ func newRouter(log logr.Logger, apiTimeout, queryTimeout time.Duration) *mux.Rou
},
}

for _, st := range storage {
ctx, cancel := context.WithTimeout(req.Context(), queryTimeout)
res, err := st.Query(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
inv.Status.Messages = append(inv.Status.Messages, err.Error())
log.Error(err, "query failed")
}
ctx, cancel := context.WithTimeout(req.Context(), 100*time.Second)
datach := make(chan runner.Result, 1)
var wg sync.WaitGroup

wg.Add(len(storage))

for idx := range storage {
go func(idx int) {
defer wg.Done()

datach <- runner.NewResult(storage[idx].Query(ctx))
}(idx)
}

cancel()
go func() {
defer cancel()
wg.Wait()
}()

inv.Spec.Storage = append(inv.Spec.Storage, res...)
done:
for {
select {
case <-ctx.Done():
break done
case res := <-datach:
if res.Error() != nil {
inv.Status.Messages = append(inv.Status.Messages, res.Error().Error())
}

if inventory, valid := res.Value().([]akashv1.InventoryClusterStorage); valid {
inv.Spec.Storage = append(inv.Spec.Storage, inventory...)
}
}
}

data, err := json.Marshal(&inv)
Expand Down
40 changes: 40 additions & 0 deletions util/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package runner

// Task is a function type which returns result instance
type Task func() Result

// Do executes task and send output to channel
func Do(task Task) <-chan Result {
ch := make(chan Result, 1)
go func() {
ch <- task()
}()
return ch
}

// Result interface wraps Value and Error methods.
type Result interface {
Value() interface{}
Error() error
}

// NewResult returns result instance with value as input
func NewResult(value interface{}, err error) Result {
return result{
value: value,
err: err,
}
}

type result struct {
value interface{}
err error
}

func (r result) Value() interface{} {
return r.value
}

func (r result) Error() error {
return r.err
}

0 comments on commit ce0230d

Please sign in to comment.