Skip to content

Commit

Permalink
fix(crux): legacy agent update (#819)
Browse files Browse the repository at this point in the history
  • Loading branch information
m8vago authored Sep 14, 2023
1 parent 81f26fe commit 72a3185
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 15 deletions.
12 changes: 8 additions & 4 deletions web/crux/src/app/agent/agent.connection-strategy.provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,27 @@ export default class AgentConnectionStrategyProvider {
const token = this.jwtService.decode(connection.jwt) as AgentToken

if (!token.version) {
this.logger.verbose('No version found in the token. Using legacy strategy.')
this.logger.verbose(`${connection.nodeId} - No version found in the token. Using legacy strategy.`)
return this.legacy
}

if (token.type === 'install') {
this.logger.verbose('Install token detected. Using install strategy.')
this.logger.verbose(`${connection.nodeId} - Install token detected. Using install strategy.`)
return this.install
}

if (token.type === 'connection') {
const agent = this.service.getById(token.sub)
if (!agent) {
this.logger.verbose('Connection token detected. No connected agent found. Using default strategy.')
this.logger.verbose(
`${connection.nodeId} - Connection token detected. No connected agent found. Using default strategy.`,
)
return this.defaultStrategy
}

this.logger.verbose('Connection token detected. Connected agent found. Using update strategy.')
this.logger.verbose(
`${connection.nodeId} - Connection token detected. Connected agent found. Using update strategy.`,
)
return this.update
}

Expand Down
16 changes: 13 additions & 3 deletions web/crux/src/app/agent/agent.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ export default class AgentService {
// there should be no awaits between this and the agents.delete() call
// so we can be sure it happens in the same microtask
if (agent === storedAgent) {
this.logger.log(`Left: ${agent.id}`)
this.logger.log(`Left: ${agent.id}, version: ${agent.version}`)
this.agents.delete(agent.id)
AgentMetrics.connectedCount().dec()

Expand All @@ -434,9 +434,18 @@ export default class AgentService {

agent.onDisconnected()
} else if (status === 'connected' || status === 'outdated') {
if (this.agents.has(agent.id)) {
this.logger.warn(
`Agent connection divergence: ${agent.id} was emitting a ${status} status, while there was an agent with the same ID already connected. Sending shutdown.`,
)

agent.close(CloseReason.SHUTDOWN)
return
}

this.agents.set(agent.id, agent)

this.logger.log(`Agent joined with id: ${agent.id}, key: ${!!agent.publicKey}`)
this.logger.log(`Agent joined with id: ${agent.id}, version: ${agent.version} key: ${!!agent.publicKey}`)
AgentMetrics.connectedCount().inc()
this.logServiceInfo()

Expand All @@ -459,6 +468,7 @@ export default class AgentService {
// we just have to return the command channel

// command channel is already completed so no need for onDisconnected() call
this.logger.verbose('Crashing legacy agent intercepted.')
return agent.onConnected(AgentConnectionLegacyStrategy.CONNECTION_STATUS_LISTENER)
}

Expand Down Expand Up @@ -513,7 +523,7 @@ export default class AgentService {
}

private logServiceInfo(): void {
this.logger.debug(`Agents: ${this.agents.size}`)
this.logger.verbose(`Agents: ${this.agents.size}`)
this.agents.forEach(it => it.debugInfo(this.logger))
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Injectable } from '@nestjs/common'
import { firstValueFrom } from 'rxjs'
import { Subject, firstValueFrom } from 'rxjs'
import { Agent } from 'src/domain/agent'
import { generateAgentToken } from 'src/domain/agent-token'
import { CruxConflictException } from 'src/exception/crux-exception'
import { AgentInfo, CloseReason } from 'src/grpc/protobuf/proto/agent'
import GrpcNodeConnection from 'src/shared/grpc-node-connection'
Expand Down Expand Up @@ -55,11 +56,14 @@ export default class AgentConnectionLegacyStrategy extends AgentConnectionStrate
// this legacy token is already replaced or
// we send a shutdown to the incoming agent
info.id = AgentConnectionLegacyStrategy.LEGACY_NONCE
const legacyToken = generateAgentToken(AgentConnectionLegacyStrategy.LEGACY_NONCE, 'install')
const signedLegacyToken = this.jwtService.sign(legacyToken)
connection.onTokenReplaced(legacyToken, signedLegacyToken)

const incomingAgent = await this.createAgent({
const incomingAgent = new Agent({
connection,
eventChannel: new Subject(),
info,
node,
outdated: true,
})

Expand Down
8 changes: 4 additions & 4 deletions web/crux/src/domain/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,10 @@ export class Agent {
}

debugInfo(logger: Logger) {
logger.debug(`Agent id: ${this.id}, open: ${!this.commandChannel.closed}`)
logger.debug(`Deployments: ${this.deployments.size}`)
logger.debug(`Watchers: ${this.statusWatchers.size}`)
logger.debug(`Log streams: ${this.logStreams.size}`)
logger.verbose(`Agent id: ${this.id}, open: ${!this.commandChannel.closed}`)
logger.verbose(`Deployments: ${this.deployments.size}`)
logger.verbose(`Watchers: ${this.statusWatchers.size}`)
logger.verbose(`Log streams: ${this.logStreams.size}`)
this.deployments.forEach(it => it.debugInfo(logger))
}

Expand Down
2 changes: 1 addition & 1 deletion web/crux/src/domain/deployment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,6 @@ export default class Deployment {
}

debugInfo(logger: Logger): void {
logger.debug(`> ${this.id}, open: ${!this.statusChannel.closed}`)
logger.verbose(`> ${this.id}, open: ${!this.statusChannel.closed}`)
}
}

0 comments on commit 72a3185

Please sign in to comment.