Skip to content

Commit

Permalink
fix: concurrency issues
Browse files Browse the repository at this point in the history
  • Loading branch information
mcharytoniuk committed Jul 17, 2024
1 parent e95f486 commit 6874008
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 5 deletions.
18 changes: 16 additions & 2 deletions loadbalancer/LlamaCppTarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ func (self *LlamaCppTarget) DecrementRemainingTicks() {
self.RemainingTicksUntilRemoved -= 1
}

func (self *LlamaCppTarget) GetSlotsStatus() (int, int) {
mutexToken := self.RBMutex.RLock()
defer self.RBMutex.RUnlock(mutexToken)

return self.LlamaCppHealthStatus.SlotsIdle, self.LlamaCppHealthStatus.SlotsProcessing
}

func (self *LlamaCppTarget) HasLessSlotsThan(other *LlamaCppTarget) bool {
mutexToken := self.RBMutex.RLock()
defer self.RBMutex.RUnlock(mutexToken)
Expand All @@ -44,6 +51,13 @@ func (self *LlamaCppTarget) HasLessSlotsThan(other *LlamaCppTarget) bool {
return self.LlamaCppHealthStatus.SlotsIdle < other.LlamaCppHealthStatus.SlotsIdle
}

func (self *LlamaCppTarget) HasRemainingTicks() bool {
mutexToken := self.RBMutex.RLock()
defer self.RBMutex.RUnlock(mutexToken)

return self.RemainingTicksUntilRemoved > 0
}

func (self *LlamaCppTarget) SetTickStatus(
lastUpdate time.Time,
llamaCppHealthStatus *llamacpp.LlamaCppHealthStatus,
Expand All @@ -52,8 +66,8 @@ func (self *LlamaCppTarget) SetTickStatus(
self.RBMutex.Lock()
defer self.RBMutex.Unlock()

slotsIdleDiff := self.LlamaCppHealthStatus.SlotsIdle-llamaCppHealthStatus.SlotsIdle
slotsProcessingDiff := self.LlamaCppHealthStatus.SlotsProcessing-llamaCppHealthStatus.SlotsProcessing
slotsIdleDiff := self.LlamaCppHealthStatus.SlotsIdle - llamaCppHealthStatus.SlotsIdle
slotsProcessingDiff := self.LlamaCppHealthStatus.SlotsProcessing - llamaCppHealthStatus.SlotsProcessing

self.LastUpdate = lastUpdate
self.LlamaCppHealthStatus = llamaCppHealthStatus
Expand Down
23 changes: 23 additions & 0 deletions loadbalancer/LoadBalancerTargetCollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ func (self *LoadBalancerTargetCollection) FixTargetOrder(target *LlamaCppTarget)
return
}

self.TargetsRBMutex.Lock()
defer self.TargetsRBMutex.Unlock()

nextElement := element.Next()

for nextElement != nil {
Expand Down Expand Up @@ -62,6 +65,9 @@ func (self *LoadBalancerTargetCollection) GetTargetByConfiguration(
}

func (self *LoadBalancerTargetCollection) GetHeadTarget() *LlamaCppTarget {
mutexToken := self.TargetsRBMutex.RLock()
defer self.TargetsRBMutex.RUnlock(mutexToken)

headElement := self.Targets.Front()

if headElement == nil {
Expand All @@ -76,13 +82,19 @@ func (self *LoadBalancerTargetCollection) GetHeadTarget() *LlamaCppTarget {
}

func (self *LoadBalancerTargetCollection) Len() int {
mutexToken := self.TargetsRBMutex.RLock()
defer self.TargetsRBMutex.RUnlock(mutexToken)

return self.Targets.Len()
}

func (self *LoadBalancerTargetCollection) RegisterTarget(llamaCppTarget *LlamaCppTarget) {
self.setTargetByConfiguration(llamaCppTarget)
self.LlamaCppHealthStatusAggregate.AddSlotsFrom(llamaCppTarget)

self.TargetsRBMutex.Lock()
defer self.TargetsRBMutex.Unlock()

if self.Targets.Len() < 1 {
self.elementByTarget.Store(llamaCppTarget, self.Targets.PushFront(llamaCppTarget))

Expand All @@ -101,6 +113,9 @@ func (self *LoadBalancerTargetCollection) RegisterTarget(llamaCppTarget *LlamaCp
}

func (self *LoadBalancerTargetCollection) RemoveTarget(llamaCppTarget *LlamaCppTarget) {
self.TargetsRBMutex.Lock()
defer self.TargetsRBMutex.Unlock()

self.LlamaCppHealthStatusAggregate.RemoveSlotsFrom(llamaCppTarget)
element := self.getElementByTarget(llamaCppTarget)

Expand All @@ -124,6 +139,14 @@ func (self *LoadBalancerTargetCollection) UpdateTargetWithLlamaCppHealthStatus(

func (self *LoadBalancerTargetCollection) UseSlot(llamaCppTarget *LlamaCppTarget) {
targetElement := self.getElementByTarget(llamaCppTarget)

if targetElement == nil {
return
}

self.TargetsRBMutex.Lock()
defer self.TargetsRBMutex.Unlock()

nextTarget := targetElement.Next()

llamaCppTarget.DecrementIdleSlots()
Expand Down
12 changes: 9 additions & 3 deletions loadbalancer/LoadBalancerTemporalManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func (self *LoadBalancerTemporalManager) ReduceTargetCollectionRemainingTicks()
var aggregatedSlotsIdle int
var aggregatedSlotsProcessing int

targetsMutexToken := self.LoadBalancerTargetCollection.TargetsRBMutex.RLock()

for element := self.LoadBalancerTargetCollection.Targets.Front(); element != nil; element = element.Next() {
if element.Value == nil {
continue
Expand All @@ -31,14 +33,18 @@ func (self *LoadBalancerTemporalManager) ReduceTargetCollectionRemainingTicks()
target := element.Value.(*LlamaCppTarget)
target.DecrementRemainingTicks()

if target.RemainingTicksUntilRemoved < 1 {
if !target.HasRemainingTicks() {
defer self.LoadBalancerTargetCollection.RemoveTarget(target)
}

aggregatedSlotsIdle += target.LlamaCppHealthStatus.SlotsIdle
aggregatedSlotsProcessing += target.LlamaCppHealthStatus.SlotsProcessing
slotsIdle, slotsProcessing := target.GetSlotsStatus()

aggregatedSlotsIdle += slotsIdle
aggregatedSlotsProcessing += slotsProcessing
}

self.LoadBalancerTargetCollection.TargetsRBMutex.RUnlock(targetsMutexToken)

self.LlamaCppHealthStatusAggregate.SetTo(
aggregatedSlotsIdle,
aggregatedSlotsProcessing,
Expand Down
3 changes: 3 additions & 0 deletions loadbalancer/RespondToAggregatedHealth.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ type RespondToAggregatedHealth struct {
}

func (self *RespondToAggregatedHealth) ServeHTTP(response http.ResponseWriter, request *http.Request) {
mutexToken := self.LlamaCppHealthStatusAggregate.RBMutex.RLock()
defer self.LlamaCppHealthStatusAggregate.RBMutex.RUnlock(mutexToken)

jsonLoadBalancerStatus, err := json.Marshal(self.LlamaCppHealthStatusAggregate.AggregatedHealthStatus)

if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions loadbalancer/StatsdReporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ func (self *StatsdReporter) ReportAggregatedHealthStatus(
bufferedRequestsStats *BufferedRequestsStats,
llamaCppHealthStatusAggregate *LlamaCppHealthStatusAggregate,
) error {
mutexToken := llamaCppHealthStatusAggregate.RBMutex.RLock()
defer llamaCppHealthStatusAggregate.RBMutex.RUnlock(mutexToken)

self.StatsdClient.Gauge("requests_buffered", int64(bufferedRequestsStats.RequestsBuffered))
self.StatsdClient.Gauge("slots_idle", int64(llamaCppHealthStatusAggregate.AggregatedHealthStatus.SlotsIdle))
self.StatsdClient.Gauge("slots_processing", int64(llamaCppHealthStatusAggregate.AggregatedHealthStatus.SlotsProcessing))
Expand Down
3 changes: 3 additions & 0 deletions management/RespondToDashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func iterList[T any](loadBalancerCollection *list.List) <-chan T {
}

func (self *RespondToDashboard) ServeHTTP(response http.ResponseWriter, request *http.Request) {
mutexToken := self.LoadBalancer.LoadBalancerTargetCollection.TargetsRBMutex.RLock()
defer self.LoadBalancer.LoadBalancerTargetCollection.TargetsRBMutex.RUnlock(mutexToken)

response.Header().Set("Content-Type", "text/html")
response.WriteHeader(http.StatusOK)

Expand Down

0 comments on commit 6874008

Please sign in to comment.