Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Make Bookkeeping client thread-safe #523

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 117 additions & 22 deletions core/integration/bookkeeping/plugin.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2021 CERN and copyright holders of ALICE O².
* Copyright 2021-2024 CERN and copyright holders of ALICE O².
* Author: Claire Guyot <[email protected]>
* Teo Mrnjavac <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -72,10 +73,15 @@ type Plugin struct {
missingUpdateRunStarts map[string] /*envId*/ bool
missingUpdateRunStartsMu sync.Mutex
pendingRunStops map[string] /*envId*/ int64
pendingRunStopsMu sync.Mutex
pendingO2Starts map[string] /*envId*/ bool
pendingO2StartsMu sync.Mutex
pendingO2Stops map[string] /*envId*/ bool
pendingO2StopsMu sync.Mutex
pendingTrgStarts map[string] /*envId*/ bool
pendingTrgStartsMu sync.Mutex
pendingTrgStops map[string] /*envId*/ bool
pendingTrgStopsMu sync.Mutex
}

/**********************************************/
Expand Down Expand Up @@ -143,6 +149,9 @@ func (p *Plugin) getMissingUpdateRunStartsForEnvs(envIds []uid.ID) map[uid.ID]bo
}

func (p *Plugin) getPendingRunStopsForEnvs(envIds []uid.ID) map[uid.ID]string {
p.pendingRunStopsMu.Lock()
defer p.pendingRunStopsMu.Unlock()

if p.pendingRunStops == nil {
return nil
}
Expand All @@ -158,6 +167,9 @@ func (p *Plugin) getPendingRunStopsForEnvs(envIds []uid.ID) map[uid.ID]string {
}

func (p *Plugin) getPendingO2StartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
p.pendingO2StartsMu.Lock()
defer p.pendingO2StartsMu.Unlock()

if p.pendingO2Starts == nil {
return nil
}
Expand All @@ -173,6 +185,9 @@ func (p *Plugin) getPendingO2StartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
}

func (p *Plugin) getPendingO2StopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
p.pendingO2StopsMu.Lock()
defer p.pendingO2StopsMu.Unlock()

if p.pendingO2Stops == nil {
return nil
}
Expand All @@ -188,6 +203,9 @@ func (p *Plugin) getPendingO2StopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
}

func (p *Plugin) getPendingTrgStartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
p.pendingTrgStartsMu.Lock()
defer p.pendingTrgStartsMu.Unlock()

if p.pendingTrgStarts == nil {
return nil
}
Expand All @@ -203,6 +221,9 @@ func (p *Plugin) getPendingTrgStartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
}

func (p *Plugin) getPendingTrgStopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
p.pendingTrgStopsMu.Lock()
defer p.pendingTrgStopsMu.Unlock()

if p.pendingTrgStops == nil {
return nil
}
Expand Down Expand Up @@ -559,11 +580,21 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
p.missingUpdateRunStartsMu.Lock()
p.missingUpdateRunStarts[envId] = true
p.missingUpdateRunStartsMu.Unlock()
p.pendingRunStopsMu.Lock()
p.pendingRunStops[envId] = runNumber64
p.pendingRunStopsMu.Unlock()
p.pendingO2StartsMu.Lock()
p.pendingO2Starts[envId] = true
p.pendingO2StartsMu.Unlock()
p.pendingO2StopsMu.Lock()
p.pendingO2Stops[envId] = true
p.pendingO2StopsMu.Unlock()
p.pendingTrgStartsMu.Lock()
p.pendingTrgStarts[envId] = true
p.pendingTrgStartsMu.Unlock()
p.pendingTrgStopsMu.Lock()
p.pendingTrgStops[envId] = true
p.pendingTrgStopsMu.Unlock()
log.WithField("run", runNumber64).
WithField("partition", envId).
Debug("Bookkeeping API RunServiceClient: Create call successful")
Expand Down Expand Up @@ -751,10 +782,17 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
// If UpdateRunStart was not called and the Trg start time is missing,
// it is set to the O2 start time.
p.missingUpdateRunStartsMu.Lock()
if p.missingUpdateRunStarts[envId] == true && trgEnabled && timeTrgStartOutput == nil {
if p.pendingO2Starts[envId] == false {
currentMissingUpdateRunStarts := p.missingUpdateRunStarts[envId]
p.missingUpdateRunStartsMu.Unlock()
if currentMissingUpdateRunStarts == true && trgEnabled && timeTrgStartOutput == nil {
p.pendingO2StartsMu.Lock()
currentPendingO2Starts := p.pendingO2Starts[envId]
p.pendingO2StartsMu.Unlock()
if currentPendingO2Starts == false {
timeTrgStartOutput = timeO2StartOutput
p.pendingTrgStartsMu.Lock()
p.pendingTrgStarts[envId] = false
p.pendingTrgStartsMu.Unlock()
log.WithField("run", runNumber64).
WithField("partition", envId).
Debug("Bookkeeping API RunServiceClient: Update call: completing missing Trg start timestamp after missing UpdateRunStart call")
Expand All @@ -764,7 +802,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing O2 start time after missing UpdateRunStart call")
}
}
p.missingUpdateRunStartsMu.Unlock()
timeTrgEndTemp, err = strconv.ParseInt(timeTrgEndInput, 10, 64)
if err != nil {
log.WithField("run", runNumber64).
Expand Down Expand Up @@ -803,7 +840,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
// because it means that even if they weren't missing during the UpdateRunStart call,
// they will be the same and can be overwritten.
p.missingUpdateRunStartsMu.Lock()
if p.missingUpdateRunStarts[envId] == true && timeO2StartOutput == nil {
currentMissingUpdateRunStarts := p.missingUpdateRunStarts[envId]
p.missingUpdateRunStartsMu.Unlock()
if currentMissingUpdateRunStarts == true && timeO2StartOutput == nil {
log.WithField("run", runNumber64).
WithField("partition", envId).
Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing SOR timestamps after missing UpdateRunStart call")
Expand Down Expand Up @@ -840,7 +879,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
ReadoutCfgUri: &readoutUri,
}
}
p.missingUpdateRunStartsMu.Unlock()
}

// Send the run update request
Expand All @@ -864,24 +902,52 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
// If the update is successful and it is an UpdateRunStop call, we check if we are missing
// EOR timestamps, and if that is the case, we set them to Now() and send a new update request
p.missingUpdateRunStartsMu.Lock()
if p.missingUpdateRunStarts[envId] == true {
defer delete(p.pendingO2Starts, envId)
defer delete(p.pendingTrgStarts, envId)
}
currentMissingUpdateRunStarts := p.missingUpdateRunStarts[envId]
p.missingUpdateRunStartsMu.Unlock()
defer delete(p.pendingRunStops, envId)
defer delete(p.pendingO2Stops, envId)
defer delete(p.pendingTrgStops, envId)
if p.pendingO2Stops[envId] == true || (trgEnabled && p.pendingTrgStops[envId] == true) {
if currentMissingUpdateRunStarts == true {
defer func() {
p.pendingO2StartsMu.Lock()
delete(p.pendingO2Starts, envId)
p.pendingO2StartsMu.Unlock()
}()
defer func() {
p.pendingTrgStartsMu.Lock()
delete(p.pendingTrgStarts, envId)
p.pendingTrgStartsMu.Unlock()
}()
}
defer func() {
p.pendingRunStopsMu.Lock()
delete(p.pendingRunStops, envId)
p.pendingRunStopsMu.Unlock()
}()
defer func() {
p.pendingO2StopsMu.Lock()
delete(p.pendingO2Stops, envId)
p.pendingO2StopsMu.Unlock()
}()
defer func() {
p.pendingTrgStopsMu.Lock()
delete(p.pendingTrgStops, envId)
p.pendingTrgStopsMu.Unlock()
}()

p.pendingO2StopsMu.Lock()
currentPendingO2Stops := p.pendingO2Stops[envId]
p.pendingO2StopsMu.Unlock()
p.pendingTrgStopsMu.Lock()
currentPendingTrgStops := p.pendingTrgStops[envId]
p.pendingTrgStopsMu.Unlock()
if currentPendingO2Stops == true || (trgEnabled && currentPendingTrgStops == true) {
updatedRun = "INCOMPLETE STOP"
if p.pendingO2Stops[envId] == true {
if currentPendingO2Stops == true {
timeO2EndTemp = time.Now().UnixMilli()
timeO2EndOutput = &timeO2EndTemp
log.WithField("run", runNumber64).
WithField("partition", envId).
Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing O2 end time")
}
if trgEnabled && p.pendingTrgStops[envId] == true {
if trgEnabled && currentPendingTrgStops == true {
timeTrgEndTemp = time.Now().UnixMilli()
timeTrgEndOutput = &timeO2EndTemp
log.WithField("run", runNumber64).
Expand Down Expand Up @@ -918,18 +984,32 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
} else if function, ok := varStack["__call_func"]; ok && strings.Contains(function, "UpdateRunStart") {
// If the update is successful and it is an UpdateRunStart call, we check if we are missing
// SOR timestamps, and if that is the case, we set them to Now() and send a new update request
defer delete(p.pendingO2Starts, envId)
defer delete(p.pendingTrgStarts, envId)
if p.pendingO2Starts[envId] == true || (trgEnabled && p.pendingTrgStarts[envId] == true) {
defer func() {
p.pendingO2StartsMu.Lock()
delete(p.pendingO2Starts, envId)
p.pendingO2StartsMu.Unlock()
}()
defer func() {
p.pendingTrgStartsMu.Lock()
delete(p.pendingTrgStarts, envId)
p.pendingTrgStartsMu.Unlock()
}()
p.pendingO2StartsMu.Lock()
currentPendingO2Starts := p.pendingO2Starts[envId]
p.pendingO2StartsMu.Unlock()
p.pendingTrgStartsMu.Lock()
currentPendingTrgStarts := p.pendingTrgStarts[envId]
p.pendingTrgStartsMu.Unlock()
if currentPendingO2Starts == true || (trgEnabled && currentPendingTrgStarts == true) {
updatedRun = "INCOMPLETE START"
if p.pendingO2Starts[envId] == true {
if currentPendingO2Starts == true {
timeO2StartTemp = time.Now().UnixMilli()
timeO2StartOutput = &timeO2StartTemp
log.WithField("run", runNumber64).
WithField("partition", envId).
Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing O2 start time")
}
if trgEnabled && p.pendingTrgStarts[envId] == true {
if trgEnabled && currentPendingTrgStarts == true {
timeTrgStartTemp = time.Now().UnixMilli()
timeTrgStartOutput = &timeO2StartTemp
log.WithField("run", runNumber64).
Expand Down Expand Up @@ -1009,11 +1089,15 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

O2StartTime := varStack["run_start_time_ms"]
if O2StartTime != "" {
p.pendingO2StartsMu.Lock()
p.pendingO2Starts[envId] = false
p.pendingO2StartsMu.Unlock()
}
TrgStartTime := varStack["trg_start_time_ms"]
if TrgStartTime != "" {
p.pendingTrgStartsMu.Lock()
p.pendingTrgStarts[envId] = false
p.pendingTrgStartsMu.Unlock()
}

p.missingUpdateRunStartsMu.Lock()
Expand Down Expand Up @@ -1071,23 +1155,34 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

O2StartTime := varStack["run_start_time_ms"]
if O2StartTime != "" {
p.pendingO2StartsMu.Lock()
p.pendingO2Starts[envId] = false
p.pendingO2StartsMu.Unlock()
}
O2EndTime := varStack["run_end_time_ms"]
if O2EndTime != "" {
p.pendingO2StopsMu.Lock()
p.pendingO2Stops[envId] = false
p.pendingO2StopsMu.Unlock()
}

TrgStartTime := varStack["trg_start_time_ms"]
if TrgStartTime != "" {
p.pendingTrgStartsMu.Lock()
p.pendingTrgStarts[envId] = false
p.pendingTrgStartsMu.Unlock()
}
TrgEndTime := varStack["trg_end_time_ms"]
if TrgEndTime != "" {
p.pendingTrgStopsMu.Lock()
p.pendingTrgStops[envId] = false
p.pendingTrgStopsMu.Unlock()
}

if _, ok = p.pendingRunStops[envId]; ok {
p.pendingRunStopsMu.Lock()
_, hasPendingRunStops := p.pendingRunStops[envId]
p.pendingRunStopsMu.Unlock()
if hasPendingRunStops {
return updateRunFunc(runNumber64, "test", O2StartTime, O2EndTime, TrgStartTime, TrgEndTime)
} else {
log.WithField("partition", envId).
Expand Down
Loading