Commit: Reduce logging verbosity

Teo Mrnjavac authored and teo committed Feb 2, 2022
1 parent 1a9334e commit 261d891
Showing 11 changed files with 335 additions and 327 deletions.
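
Most of the changes below demote per-command log lines from Debug to Trace and drop one per-constraint Trace line, alongside gofmt-style formatting fixes. As a rough illustration of why the demotion reduces output, here is a minimal sketch using plain logrus (the library behind the logrus.Fields calls in these files; the project's own logger wrapper and prefixes are not reproduced here): at Debug level, Trace entries are filtered out and only reappear if the level is explicitly raised to Trace.

package main

import (
	"github.com/sirupsen/logrus"
)

func main() {
	log := logrus.New()

	// Typical running level: Debug and above are emitted, Trace is not.
	log.SetLevel(logrus.DebugLevel)

	log.WithField("queueSize", 1024).Debug("visible at DebugLevel")
	log.WithField("agentId", "agent-1").Trace("suppressed at DebugLevel")

	// Only an explicitly raised level brings the demoted messages back.
	log.SetLevel(logrus.TraceLevel)
	log.WithField("agentId", "agent-1").Trace("visible again at TraceLevel")
}

The commit therefore keeps these messages available for deep debugging while silencing them in a typical Debug-level run.
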
52 changes: 26 additions & 26 deletions core/controlcommands/commandqueue.go
@@ -50,8 +50,8 @@ type empty struct{}
type CommandQueue struct {
	sync.Mutex

	q chan queueEntry
	servent *Servent
}

func NewCommandQueue(s *Servent) *CommandQueue {
@@ -70,13 +70,13 @@ func (m *CommandQueue) Enqueue(cmd MesosCommand, callback chan<- MesosCommandRes
	default: // Buffer full!
		err := errors.New("the queue for MESSAGE commands is full")
		log.WithField("error", err.Error()).
			WithField("queueSize", QUEUE_SIZE).
			Error("cannot enqueue control command")
		return err
	}
}

-func (m* CommandQueue) Start() {
+func (m *CommandQueue) Start() {
	m.Lock()
	m.q = make(chan queueEntry, QUEUE_SIZE)
	m.Unlock()
@@ -86,7 +86,7 @@ func (m *CommandQueue) Start() {
		select {
		case entry, more := <-m.q:
			m.Lock()
			if !more { // if the channel is closed, we bail
				return
			}
			response, err := m.commit(entry.cmd)
@@ -116,7 +116,7 @@ func (m *CommandQueue) commit(command MesosCommand) (response MesosCommandRespon
	}
	defer utils.TimeTrack(time.Now(), fmt.Sprintf("cmdq.commit %s to %d targets", command.GetName(), len(command.targets())), log.WithPrefix("cmdq"))

-	type responseSemaphore struct{
+	type responseSemaphore struct {
		receiver MesosCommandTarget
		response MesosCommandResponse
		err error
@@ -129,19 +129,19 @@ func (m *CommandQueue) commit(command MesosCommand) (response MesosCommandRespon
	responses := make(map[MesosCommandTarget]MesosCommandResponse)

	log.WithFields(logrus.Fields{
		"name": command.GetName(),
		"id": command.GetId(),
	}).
		Debug("ready to commit MesosCommand")

	for _, rec := range command.targets() {
		go func(receiver MesosCommandTarget) {
			log.WithFields(logrus.Fields{
				"agentId": receiver.AgentId,
				"executorId": receiver.ExecutorId,
				"name": command.GetName(),
			}).
-				Debug("sending MesosCommand to target")
+				Trace("sending MesosCommand to target")
			singleCommand := command.MakeSingleTarget(receiver)
			res, err := m.servent.RunCommand(singleCommand, receiver)
			if err != nil {
@@ -150,36 +150,36 @@ func (m *CommandQueue) commit(command MesosCommand) (response MesosCommandRespon
				semaphore <- responseSemaphore{
					receiver: receiver,
					response: res,
					err: err,
				}
				return
			}

			if res.Err() != nil {
				log.WithFields(logrus.Fields{
					"commandName": res.GetCommandName(),
					"error": res.Err().Error(),
				}).
					Trace("received MesosCommandResponse")
			} else {
				log.WithFields(logrus.Fields{
					"commandName": res.GetCommandName(),
				}).
					Trace("received MesosCommandResponse")
			}

			semaphore <- responseSemaphore{
				receiver: receiver,
				response: res,
			}
		}(rec)
	}
	// Wait for goroutines to finish
	for i := 0; i < len(command.targets()); i++ {
-		respSemaphore := <- semaphore
+		respSemaphore := <-semaphore
		responses[respSemaphore.receiver] = respSemaphore.response
		if respSemaphore.err != nil {
			sendErrorList = append(sendErrorList, respSemaphore.err)
		}
	}
	close(semaphore)
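
For context on the surrounding code, commit() in commandqueue.go follows a common Go fan-out/fan-in shape: one goroutine per Mesos target, each pushing its result onto a shared channel, and the caller collecting exactly one result per target before closing the channel. A stripped-down sketch of that pattern, with hypothetical names rather than the actual MesosCommand types:

package main

import "fmt"

type result struct {
	target string
	err    error
}

func runAll(targets []string, run func(string) error) map[string]error {
	results := make(chan result)

	// Fan out: one goroutine per target.
	for _, t := range targets {
		go func(target string) {
			results <- result{target: target, err: run(target)}
		}(t)
	}

	// Fan in: collect exactly one result per target, then close the channel.
	out := make(map[string]error, len(targets))
	for range targets {
		r := <-results
		out[r.target] = r.err
	}
	close(results)
	return out
}

func main() {
	errs := runAll([]string{"agent-1", "agent-2"}, func(t string) error {
		fmt.Println("running on", t)
		return nil
	})
	fmt.Println(errs)
}
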
60 changes: 30 additions & 30 deletions core/controlcommands/mesoscommandservent.go
@@ -46,29 +46,29 @@ type Call struct

func NewCall(cmd MesosCommand) *Call {
	return &Call{
		Request: cmd,
		Response: nil,
		Done: make(chan empty),
		Error: nil,
	}
}

type CallId struct {
	Id xid.ID
	Target MesosCommandTarget
}

type Servent struct {
	mu sync.Mutex
	pending map[CallId]*Call

	SendFunc SendCommandFunc
}

func NewServent(commandFunc SendCommandFunc) *Servent {
	return &Servent{
		SendFunc: commandFunc,
		pending: make(map[CallId]*Call),
	}
}

@@ -82,43 +82,43 @@ func (s *Servent) RunCommand(cmd MesosCommand, receiver MesosCommandTarget) (Mes
	call := NewCall(cmd)

	callId := CallId{
		Id: cmdId,
		Target: receiver,
	}

-	log.Debug("servent mutex locking")
+	log.Trace("servent mutex locking")
	s.mu.Lock()
-	log.Debug("servent mutex locked")
+	log.Trace("servent mutex locked")

	// We append the new call to the pending map, and send the request
	s.pending[callId] = call

	s.mu.Unlock()
-	log.Debug("servent mutex unlocked")
+	log.Trace("servent mutex unlocked")

	log.WithFields(logrus.Fields{
		"name": cmd.GetName(),
		"id": cmd.GetId(),
		"agentId": receiver.AgentId,
		"executorId": receiver.ExecutorId,
	}).
-		Debug("calling scheduler SendFunc")
+		Trace("calling scheduler SendFunc")

	err := s.SendFunc(cmd, receiver)
	if err != nil {
-		log.Debug("servent mutex locking")
+		log.Trace("servent mutex locking")
		s.mu.Lock()
-		log.Debug("servent mutex locked")
+		log.Trace("servent mutex locked")

		delete(s.pending, callId)

		s.mu.Unlock()
-		log.WithError(err).Debug("servent mutex unlocked")
+		log.WithError(err).Trace("servent mutex unlocked")

		return nil, err
	}

-	log.WithField("timeout", cmd.GetResponseTimeout()).Debug("blocking until response or timeout")
+	log.WithField("timeout", cmd.GetResponseTimeout()).Trace("blocking until response or timeout")
	// Neat, now we block until done||timeout
	select {
	case <-call.Done:
@@ -127,14 +127,14 @@ func (s *Servent) RunCommand(cmd MesosCommand, receiver MesosCommandTarget) (Mes
	case <-time.After(cmd.GetResponseTimeout()):
		call.Error = fmt.Errorf("MesosCommand %s timed out for task %s", cmd.GetName(), receiver.TaskId.Value)

-		log.Debug("servent mutex locking")
+		log.Trace("servent mutex locking")
		s.mu.Lock()
-		log.Debug("servent mutex locked")
+		log.Trace("servent mutex locked")

		delete(s.pending, callId)

		s.mu.Unlock()
-		log.Debug("servent mutex unlocked")
+		log.Trace("servent mutex unlocked")
	}

	if call.Error != nil {
@@ -145,7 +145,7 @@ func (s *Servent) RunCommand(cmd MesosCommand, receiver MesosCommandTarget) (Mes

func (s *Servent) ProcessResponse(res MesosCommandResponse, sender MesosCommandTarget) {
	callId := CallId{
		Id: res.GetCommandId(),
		Target: sender,
	}

@@ -157,14 +157,14 @@ func (s *Servent) ProcessResponse(res MesosCommandResponse, sender MesosCommandT
	if call == nil {
		log.WithFields(logrus.Fields{
			"commandName": res.GetCommandName(),
			"commandId": res.GetCommandId(),
			"agentId": sender.AgentId,
			"executorId": sender.ExecutorId,
		}).
			Warning("no pending request found")
		return
	}

	call.Response = res
	call.Done <- empty{}
}
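
The Servent edits above sit on a classic pending-call table: RunCommand registers a call keyed by command id and target under a mutex, then blocks on the call's Done channel or a timeout, while ProcessResponse looks the pending call up and signals it. A minimal sketch of that correlation pattern, with hypothetical types rather than the actual servent API:

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type call struct {
	done chan string // delivers the response
}

type servent struct {
	mu      sync.Mutex
	pending map[string]*call
}

func newServent() *servent {
	return &servent{pending: make(map[string]*call)}
}

// run registers a pending call, then waits for a response or a timeout.
func (s *servent) run(id string, timeout time.Duration) (string, error) {
	c := &call{done: make(chan string, 1)}
	s.mu.Lock()
	s.pending[id] = c
	s.mu.Unlock()

	select {
	case resp := <-c.done:
		return resp, nil
	case <-time.After(timeout):
		s.mu.Lock()
		delete(s.pending, id)
		s.mu.Unlock()
		return "", errors.New("call timed out")
	}
}

// deliver routes an incoming response to the matching pending call, if any.
func (s *servent) deliver(id, resp string) {
	s.mu.Lock()
	c := s.pending[id]
	delete(s.pending, id)
	s.mu.Unlock()
	if c != nil {
		c.done <- resp
	}
}

func main() {
	s := newServent()
	go func() {
		time.Sleep(10 * time.Millisecond)
		s.deliver("cmd-1", "ok")
	}()
	fmt.Println(s.run("cmd-1", time.Second))
}

Using a buffered done channel means a late response does not block the deliverer if the waiter has already timed out and removed the entry.
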
22 changes: 11 additions & 11 deletions core/environment/transition_deploy.go
@@ -41,18 +41,18 @@ import (
func NewDeployTransition(taskman *task.Manager, addRoles []string, removeRoles []string) Transition {
	return &DeployTransition{
		baseTransition: baseTransition{
			name: "DEPLOY",
			taskman: taskman,
		},
		addRoles: addRoles,
		removeRoles: removeRoles,
	}
}

type DeployTransition struct {
	baseTransition
	addRoles []string
	removeRoles []string
}

func (t DeployTransition) do(env *Environment) (err error) {
@@ -159,7 +159,7 @@ func (t DeployTransition) do(env *Environment) (err error) {
	// We set all callRoles to ACTIVE right now, because there's no task activation for them.
	// This is the callRole equivalent of AcquireTasks, which only pushes updates to taskRoles.
	allHooks := wf.GetAllHooks()
	callHooks := allHooks.FilterCalls() // get the calls
	if len(callHooks) > 0 {
		for _, h := range callHooks {
			pr, ok := h.GetParentRole().(workflow.PublicUpdatable)
@@ -173,19 +173,19 @@ func (t DeployTransition) do(env *Environment) (err error) {
	deploymentTimeout := 90 * time.Second
	wfStatus := wf.GetStatus()
	if wfStatus != task.ACTIVE {
	WORKFLOW_ACTIVE_LOOP:
		for {
			log.Debug("waiting for workflow to become active")
			select {
			case wfStatus = <-notifyStatus:
				log.WithField("status", wfStatus.String()).
					Debug("workflow status change")
				if wfStatus == task.ACTIVE {
					break WORKFLOW_ACTIVE_LOOP
				}
				continue
			case <-time.After(deploymentTimeout):
-				err = errors.New(fmt.Sprintf("workflow deployment timed out. timeout: %s",deploymentTimeout.String()))
+				err = errors.New(fmt.Sprintf("workflow deployment timed out. timeout: %s", deploymentTimeout.String()))
				break WORKFLOW_ACTIVE_LOOP
			// This is needed for when the workflow fails during the STAGING state(mesos status),mesos responds with the `REASON_COMMAND_EXECUTOR_FAILED`,
			// By listening to workflow state ERROR we can break the loop before reaching the timeout (1m30s), we can trigger the cleanup faster
@@ -195,7 +195,7 @@ func (t DeployTransition) do(env *Environment) (err error) {
				if wfState == task.ERROR {
					failedRoles := make([]string, 0)
					workflow.LeafWalk(wf, func(role workflow.Role) {
						if st := role.GetState(); st == task.ERROR {
							log.WithField("state", st).
								WithField("role", role.GetPath()).
								WithField("environment", role.GetEnvironmentId().String()).
@@ -204,7 +204,7 @@ func (t DeployTransition) do(env *Environment) (err error) {
						}
					})
					log.WithField("state", wfState.String()).
						Debug("workflow state change")
					err = fmt.Errorf("workflow deployment failed, aborting and cleaning up [offending roles: %s]", strings.Join(failedRoles, ", "))
					break WORKFLOW_ACTIVE_LOOP
				}
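
The deployment wait in transition_deploy.go relies on a Go labeled break: a for/select loop reacts to status updates or gives up after a timeout, and break WORKFLOW_ACTIVE_LOOP exits the outer loop from inside the select (a bare break would only leave the select). A compact sketch of the idiom, with a hypothetical updates channel and state strings:

package main

import (
	"errors"
	"fmt"
	"time"
)

func waitActive(updates <-chan string, timeout time.Duration) error {
WAIT_LOOP:
	for {
		select {
		case st := <-updates:
			if st == "ACTIVE" {
				break WAIT_LOOP // leaves the labeled for loop, not just the select
			}
			if st == "ERROR" {
				return errors.New("deployment failed")
			}
		case <-time.After(timeout):
			return fmt.Errorf("deployment timed out after %s", timeout)
		}
	}
	return nil
}

func main() {
	updates := make(chan string, 2)
	updates <- "STANDBY"
	updates <- "ACTIVE"
	fmt.Println(waitActive(updates, time.Second))
}
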
5 changes: 2 additions & 3 deletions core/task/constraint/attributes.go
@@ -29,7 +29,7 @@ import (
	"strings"

	"github.com/AliceO2Group/Control/common/utils"
-	"github.com/mesos/mesos-go/api/v1/lib"
+	mesos "github.com/mesos/mesos-go/api/v1/lib"
)

type Attributes []mesos.Attribute
@@ -68,13 +68,12 @@ func (attrs Attributes) Satisfy(cts Constraints) (ok bool) {
	}

	for _, constraint := range cts {
-		log.WithField("constraint", constraint.String()).Trace("processing constraint")
		switch constraint.Operator {
		case Equals:
			var value string
			if value, ok = attrs.Get(constraint.Attribute); ok {
				if strings.Contains(value, ",") {
-					values := strings.Split(value,",")
+					values := strings.Split(value, ",")
					if utils.StringSliceContains(values, constraint.Value) {
						ok = true
						continue
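
The attributes.go hunk also shows how an Equals constraint tolerates comma-separated Mesos attribute values: the value is split on commas and the constraint is satisfied if any element matches. A tiny illustrative sketch, with a hypothetical helper rather than the project's utils package:

package main

import (
	"fmt"
	"strings"
)

// matchesEquals reports whether an attribute value satisfies an Equals
// constraint, treating comma-separated values as a set of alternatives.
func matchesEquals(attrValue, want string) bool {
	if !strings.Contains(attrValue, ",") {
		return attrValue == want
	}
	for _, v := range strings.Split(attrValue, ",") {
		if v == want {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(matchesEquals("flp1,flp2,flp3", "flp2")) // true
	fmt.Println(matchesEquals("flp1", "flp2"))           // false
}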