From 1198cea2dfa38af527e1b2b6046632d3cae1d3a5 Mon Sep 17 00:00:00 2001 From: Teo Mrnjavac Date: Mon, 4 Mar 2024 17:21:47 +0100 Subject: [PATCH] [core] Make Bookkeeping client thread-safe --- core/integration/bookkeeping/plugin.go | 139 +++++++++++++++++++++---- 1 file changed, 117 insertions(+), 22 deletions(-) diff --git a/core/integration/bookkeeping/plugin.go b/core/integration/bookkeeping/plugin.go index e6d46502..7b58bd7c 100644 --- a/core/integration/bookkeeping/plugin.go +++ b/core/integration/bookkeeping/plugin.go @@ -1,8 +1,9 @@ /* * === This file is part of ALICE O² === * -* Copyright 2021 CERN and copyright holders of ALICE O². +* Copyright 2021-2024 CERN and copyright holders of ALICE O². * Author: Claire Guyot +* Teo Mrnjavac * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -72,10 +73,15 @@ type Plugin struct { missingUpdateRunStarts map[string] /*envId*/ bool missingUpdateRunStartsMu sync.Mutex pendingRunStops map[string] /*envId*/ int64 + pendingRunStopsMu sync.Mutex pendingO2Starts map[string] /*envId*/ bool + pendingO2StartsMu sync.Mutex pendingO2Stops map[string] /*envId*/ bool + pendingO2StopsMu sync.Mutex pendingTrgStarts map[string] /*envId*/ bool + pendingTrgStartsMu sync.Mutex pendingTrgStops map[string] /*envId*/ bool + pendingTrgStopsMu sync.Mutex } /**********************************************/ @@ -143,6 +149,9 @@ func (p *Plugin) getMissingUpdateRunStartsForEnvs(envIds []uid.ID) map[uid.ID]bo } func (p *Plugin) getPendingRunStopsForEnvs(envIds []uid.ID) map[uid.ID]string { + p.pendingRunStopsMu.Lock() + defer p.pendingRunStopsMu.Unlock() + if p.pendingRunStops == nil { return nil } @@ -158,6 +167,9 @@ func (p *Plugin) getPendingRunStopsForEnvs(envIds []uid.ID) map[uid.ID]string { } func (p *Plugin) getPendingO2StartsForEnvs(envIds []uid.ID) map[uid.ID]bool { + p.pendingO2StartsMu.Lock() + defer p.pendingO2StartsMu.Unlock() + if p.pendingO2Starts == nil { return nil } @@ -173,6 +185,9 @@ func (p *Plugin) getPendingO2StartsForEnvs(envIds []uid.ID) map[uid.ID]bool { } func (p *Plugin) getPendingO2StopsForEnvs(envIds []uid.ID) map[uid.ID]bool { + p.pendingO2StopsMu.Lock() + defer p.pendingO2StopsMu.Unlock() + if p.pendingO2Stops == nil { return nil } @@ -188,6 +203,9 @@ func (p *Plugin) getPendingO2StopsForEnvs(envIds []uid.ID) map[uid.ID]bool { } func (p *Plugin) getPendingTrgStartsForEnvs(envIds []uid.ID) map[uid.ID]bool { + p.pendingTrgStartsMu.Lock() + defer p.pendingTrgStartsMu.Unlock() + if p.pendingTrgStarts == nil { return nil } @@ -203,6 +221,9 @@ func (p *Plugin) getPendingTrgStartsForEnvs(envIds []uid.ID) map[uid.ID]bool { } func (p *Plugin) getPendingTrgStopsForEnvs(envIds []uid.ID) map[uid.ID]bool { + p.pendingTrgStopsMu.Lock() + defer p.pendingTrgStopsMu.Unlock() + if p.pendingTrgStops == nil { return nil } @@ -559,11 +580,21 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { p.missingUpdateRunStartsMu.Lock() p.missingUpdateRunStarts[envId] = true p.missingUpdateRunStartsMu.Unlock() + p.pendingRunStopsMu.Lock() p.pendingRunStops[envId] = runNumber64 + p.pendingRunStopsMu.Unlock() + p.pendingO2StartsMu.Lock() p.pendingO2Starts[envId] = true + p.pendingO2StartsMu.Unlock() + p.pendingO2StopsMu.Lock() p.pendingO2Stops[envId] = true + p.pendingO2StopsMu.Unlock() + p.pendingTrgStartsMu.Lock() p.pendingTrgStarts[envId] = true + p.pendingTrgStartsMu.Unlock() + p.pendingTrgStopsMu.Lock() p.pendingTrgStops[envId] = true + p.pendingTrgStopsMu.Unlock() log.WithField("run", runNumber64). WithField("partition", envId). Debug("Bookkeeping API RunServiceClient: Create call successful") @@ -751,10 +782,17 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { // If UpdateRunStart was not called and the Trg start time is missing, // it is set to the O2 start time. p.missingUpdateRunStartsMu.Lock() - if p.missingUpdateRunStarts[envId] == true && trgEnabled && timeTrgStartOutput == nil { - if p.pendingO2Starts[envId] == false { + currentMissingUpdateRunStarts := p.missingUpdateRunStarts[envId] + p.missingUpdateRunStartsMu.Unlock() + if currentMissingUpdateRunStarts == true && trgEnabled && timeTrgStartOutput == nil { + p.pendingO2StartsMu.Lock() + currentPendingO2Starts := p.pendingO2Starts[envId] + p.pendingO2StartsMu.Unlock() + if currentPendingO2Starts == false { timeTrgStartOutput = timeO2StartOutput + p.pendingTrgStartsMu.Lock() p.pendingTrgStarts[envId] = false + p.pendingTrgStartsMu.Unlock() log.WithField("run", runNumber64). WithField("partition", envId). Debug("Bookkeeping API RunServiceClient: Update call: completing missing Trg start timestamp after missing UpdateRunStart call") @@ -764,7 +802,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing O2 start time after missing UpdateRunStart call") } } - p.missingUpdateRunStartsMu.Unlock() timeTrgEndTemp, err = strconv.ParseInt(timeTrgEndInput, 10, 64) if err != nil { log.WithField("run", runNumber64). @@ -803,7 +840,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { // because it means that even if they weren't missing during the UpdateRunStart call, // they will be the same and can be overwritten. p.missingUpdateRunStartsMu.Lock() - if p.missingUpdateRunStarts[envId] == true && timeO2StartOutput == nil { + currentMissingUpdateRunStarts := p.missingUpdateRunStarts[envId] + p.missingUpdateRunStartsMu.Unlock() + if currentMissingUpdateRunStarts == true && timeO2StartOutput == nil { log.WithField("run", runNumber64). WithField("partition", envId). Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing SOR timestamps after missing UpdateRunStart call") @@ -840,7 +879,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { ReadoutCfgUri: &readoutUri, } } - p.missingUpdateRunStartsMu.Unlock() } // Send the run update request @@ -864,24 +902,52 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { // If the update is successful and it is an UpdateRunStop call, we check if we are missing // EOR timestamps, and if that is the case, we set them to Now() and send a new update request p.missingUpdateRunStartsMu.Lock() - if p.missingUpdateRunStarts[envId] == true { - defer delete(p.pendingO2Starts, envId) - defer delete(p.pendingTrgStarts, envId) - } + currentMissingUpdateRunStarts := p.missingUpdateRunStarts[envId] p.missingUpdateRunStartsMu.Unlock() - defer delete(p.pendingRunStops, envId) - defer delete(p.pendingO2Stops, envId) - defer delete(p.pendingTrgStops, envId) - if p.pendingO2Stops[envId] == true || (trgEnabled && p.pendingTrgStops[envId] == true) { + if currentMissingUpdateRunStarts == true { + defer func() { + p.pendingO2StartsMu.Lock() + delete(p.pendingO2Starts, envId) + p.pendingO2StartsMu.Unlock() + }() + defer func() { + p.pendingTrgStartsMu.Lock() + delete(p.pendingTrgStarts, envId) + p.pendingTrgStartsMu.Unlock() + }() + } + defer func() { + p.pendingRunStopsMu.Lock() + delete(p.pendingRunStops, envId) + p.pendingRunStopsMu.Unlock() + }() + defer func() { + p.pendingO2StopsMu.Lock() + delete(p.pendingO2Stops, envId) + p.pendingO2StopsMu.Unlock() + }() + defer func() { + p.pendingTrgStopsMu.Lock() + delete(p.pendingTrgStops, envId) + p.pendingTrgStopsMu.Unlock() + }() + + p.pendingO2StopsMu.Lock() + currentPendingO2Stops := p.pendingO2Stops[envId] + p.pendingO2StopsMu.Unlock() + p.pendingTrgStopsMu.Lock() + currentPendingTrgStops := p.pendingTrgStops[envId] + p.pendingTrgStopsMu.Unlock() + if currentPendingO2Stops == true || (trgEnabled && currentPendingTrgStops == true) { updatedRun = "INCOMPLETE STOP" - if p.pendingO2Stops[envId] == true { + if currentPendingO2Stops == true { timeO2EndTemp = time.Now().UnixMilli() timeO2EndOutput = &timeO2EndTemp log.WithField("run", runNumber64). WithField("partition", envId). Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing O2 end time") } - if trgEnabled && p.pendingTrgStops[envId] == true { + if trgEnabled && currentPendingTrgStops == true { timeTrgEndTemp = time.Now().UnixMilli() timeTrgEndOutput = &timeO2EndTemp log.WithField("run", runNumber64). @@ -918,18 +984,32 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } else if function, ok := varStack["__call_func"]; ok && strings.Contains(function, "UpdateRunStart") { // If the update is successful and it is an UpdateRunStart call, we check if we are missing // SOR timestamps, and if that is the case, we set them to Now() and send a new update request - defer delete(p.pendingO2Starts, envId) - defer delete(p.pendingTrgStarts, envId) - if p.pendingO2Starts[envId] == true || (trgEnabled && p.pendingTrgStarts[envId] == true) { + defer func() { + p.pendingO2StartsMu.Lock() + delete(p.pendingO2Starts, envId) + p.pendingO2StartsMu.Unlock() + }() + defer func() { + p.pendingTrgStartsMu.Lock() + delete(p.pendingTrgStarts, envId) + p.pendingTrgStartsMu.Unlock() + }() + p.pendingO2StartsMu.Lock() + currentPendingO2Starts := p.pendingO2Starts[envId] + p.pendingO2StartsMu.Unlock() + p.pendingTrgStartsMu.Lock() + currentPendingTrgStarts := p.pendingTrgStarts[envId] + p.pendingTrgStartsMu.Unlock() + if currentPendingO2Starts == true || (trgEnabled && currentPendingTrgStarts == true) { updatedRun = "INCOMPLETE START" - if p.pendingO2Starts[envId] == true { + if currentPendingO2Starts == true { timeO2StartTemp = time.Now().UnixMilli() timeO2StartOutput = &timeO2StartTemp log.WithField("run", runNumber64). WithField("partition", envId). Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing O2 start time") } - if trgEnabled && p.pendingTrgStarts[envId] == true { + if trgEnabled && currentPendingTrgStarts == true { timeTrgStartTemp = time.Now().UnixMilli() timeTrgStartOutput = &timeO2StartTemp log.WithField("run", runNumber64). @@ -1009,11 +1089,15 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { O2StartTime := varStack["run_start_time_ms"] if O2StartTime != "" { + p.pendingO2StartsMu.Lock() p.pendingO2Starts[envId] = false + p.pendingO2StartsMu.Unlock() } TrgStartTime := varStack["trg_start_time_ms"] if TrgStartTime != "" { + p.pendingTrgStartsMu.Lock() p.pendingTrgStarts[envId] = false + p.pendingTrgStartsMu.Unlock() } p.missingUpdateRunStartsMu.Lock() @@ -1071,23 +1155,34 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { O2StartTime := varStack["run_start_time_ms"] if O2StartTime != "" { + p.pendingO2StartsMu.Lock() p.pendingO2Starts[envId] = false + p.pendingO2StartsMu.Unlock() } O2EndTime := varStack["run_end_time_ms"] if O2EndTime != "" { + p.pendingO2StopsMu.Lock() p.pendingO2Stops[envId] = false + p.pendingO2StopsMu.Unlock() } TrgStartTime := varStack["trg_start_time_ms"] if TrgStartTime != "" { + p.pendingTrgStartsMu.Lock() p.pendingTrgStarts[envId] = false + p.pendingTrgStartsMu.Unlock() } TrgEndTime := varStack["trg_end_time_ms"] if TrgEndTime != "" { + p.pendingTrgStopsMu.Lock() p.pendingTrgStops[envId] = false + p.pendingTrgStopsMu.Unlock() } - if _, ok = p.pendingRunStops[envId]; ok { + p.pendingRunStopsMu.Lock() + _, hasPendingRunStops := p.pendingRunStops[envId] + p.pendingRunStopsMu.Unlock() + if hasPendingRunStops { return updateRunFunc(runNumber64, "test", O2StartTime, O2EndTime, TrgStartTime, TrgEndTime) } else { log.WithField("partition", envId).