Skip to content

Commit

Permalink
Merge pull request #105 from depot/connection-lock
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobwgillespie authored Sep 5, 2024
2 parents 8cc0744 + 4717627 commit 33ae1d7
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 272 deletions.
33 changes: 7 additions & 26 deletions proto/depot/cloud/v2/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@ syntax = "proto3";
package depot.cloud.v2;

service CloudService {
rpc GetDesiredState(GetDesiredStateRequest) returns (stream GetDesiredStateResponse) {}
rpc GetDesiredStateUnary(GetDesiredStateUnaryRequest) returns (GetDesiredStateUnaryResponse) {}
rpc ReportCurrentState(ReportCurrentStateRequest) returns (ReportCurrentStateResponse) {}
rpc ReportErrors(ReportErrorsRequest) returns (ReportErrorsResponse);
rpc ReportHealth(stream ReportHealthRequest) returns (ReportHealthResponse);
rpc GetActiveAgentVersion(GetActiveAgentVersionRequest) returns (GetActiveAgentVersionResponse);
rpc ReconcileVolumes(ReconcileVolumesRequest) returns (stream ReconcileVolumesResponse) {}
rpc ReportVolumeUpdates(ReportVolumeUpdatesRequest) returns (ReportVolumeUpdatesResponse) {}

rpc GetDesiredStateUnary(GetDesiredStateUnaryRequest) returns (GetDesiredStateUnaryResponse) {}

rpc ReplaceVolume(ReplaceVolumeRequest) returns (ReplaceVolumeResponse) {}
}

Expand All @@ -28,6 +24,7 @@ message GetDesiredStateUnaryResponse {

message GetDesiredStateRequest {
string connection_id = 1;
string client_id = 2;
}

message GetDesiredStateResponse {
Expand Down Expand Up @@ -136,13 +133,12 @@ message ReportCurrentStateRequest {
string connection_id = 1;
oneof state {
CloudState replace = 2;
CloudStatePatch patch = 3;
}
reserved 3;
string client_id = 4;
}

message ReportCurrentStateResponse {
int32 generation = 1;
}
message ReportCurrentStateResponse {}

message ReportErrorsRequest {
string connection_id = 1;
Expand All @@ -151,12 +147,6 @@ message ReportErrorsRequest {

message ReportErrorsResponse {}

message ReportHealthRequest {
string connection_id = 1;
}

message ReportHealthResponse {}

message GetActiveAgentVersionRequest {
string connection_id = 1;
}
Expand Down Expand Up @@ -185,19 +175,10 @@ message CloudState {
}
}

message CloudStatePatch {
int32 generation = 1;
oneof patch {
Aws aws = 2;
}

message Aws {
string patch = 1;
}
message ReconcileVolumesRequest {
string client_id = 1;
}

message ReconcileVolumesRequest {}

message ReconcileVolumesResponse {
oneof action {
CreateVolumeAction create_volume = 1;
Expand Down
22 changes: 0 additions & 22 deletions src/handlers/health.ts

This file was deleted.

24 changes: 17 additions & 7 deletions src/handlers/state.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import {PlainMessage} from '@bufbuild/protobuf'
import {Code, ConnectError} from '@connectrpc/connect'
import {compare} from 'fast-json-patch'
import {GetDesiredStateResponse, ReportCurrentStateRequest} from '../proto/depot/cloud/v2/cloud_pb'
import {CurrentState as AwsCurrentState} from '../types'
import {getCurrentState as getCurrentAwsState, reconcile as reconcileAws} from '../utils/aws'
import {clientID} from '../utils/clientID'
import {sleep} from '../utils/common'
import {CLOUD_AGENT_CONNECTION_ID} from '../utils/env'
import {reportError} from '../utils/errors'
Expand Down Expand Up @@ -39,7 +41,7 @@ export async function startStateStream<T>(signal: AbortSignal, provider: CloudPr
await provider.reportCurrentState(currentState)

const {response} = await client.getDesiredStateUnary(
{request: {connectionId: CLOUD_AGENT_CONNECTION_ID}},
{request: {connectionId: CLOUD_AGENT_CONNECTION_ID, clientId: clientID}},
{signal},
)
if (!response || isEmptyResponse(response)) continue
Expand All @@ -51,15 +53,20 @@ export async function startStateStream<T>(signal: AbortSignal, provider: CloudPr
await reportError(error)
}
} catch (err: any) {
await reportError(err)
if (err instanceof ConnectError && err.code === Code.FailedPrecondition) {
// Connection lock was not acquired, sleep and retry
console.log('Connection lock was not acquired for state stream, sleeping and retrying...')
await sleep(5 * 1000)
} else {
await reportError(err)
}
} finally {
await sleep(1000)
}
}
}

interface StateCache<T> {
generation: number
state: T
}

Expand All @@ -68,6 +75,7 @@ function reportAwsState(): (state: AwsCurrentState) => Promise<void> {
return async function reportCurrentState(currentState: AwsCurrentState) {
const request: PlainMessage<ReportCurrentStateRequest> = {
connectionId: CLOUD_AGENT_CONNECTION_ID,
clientId: clientID,
state: {
case: 'replace',
value: {
Expand All @@ -89,15 +97,17 @@ function reportAwsState(): (state: AwsCurrentState) => Promise<void> {
if (diff.length === 0) return
}

const res = await client.reportCurrentState(request)
stateCache = {state: currentState, generation: res.generation}
await client.reportCurrentState(request)
stateCache = {state: currentState}
}
}

function reportFlyState(): (state: FlyCurrentState) => Promise<void> {
let stateCache: StateCache<FlyCurrentState> | null = null
return async function reportCurrentState(currentState: FlyCurrentState) {
const request: PlainMessage<ReportCurrentStateRequest> = {
connectionId: CLOUD_AGENT_CONNECTION_ID,
clientId: clientID,
state: {
case: 'replace',
value: {
Expand All @@ -119,8 +129,8 @@ function reportFlyState(): (state: FlyCurrentState) => Promise<void> {
if (diff.length === 0) return
}

const res = await client.reportCurrentState(request)
stateCache = {state: currentState, generation: res.generation}
await client.reportCurrentState(request)
stateCache = {state: currentState}
}
}

Expand Down
13 changes: 11 additions & 2 deletions src/handlers/volumes.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {PlainMessage} from '@bufbuild/protobuf'
import {Code, ConnectError} from '@connectrpc/connect'
import {
AuthorizeClientAction,
CopyVolumeAction,
Expand Down Expand Up @@ -36,6 +37,8 @@ import {
snapshotRm,
sparsify,
} from '../utils/ceph'
import {clientID} from '../utils/clientID'
import {sleep} from '../utils/common'
import {reportError} from '../utils/errors'
import {client} from '../utils/grpc'

Expand All @@ -45,7 +48,7 @@ export async function startVolumeStream(signal: AbortSignal) {
const inProgressUpdates: Record<string, boolean> = {}
const completedUpdates: Record<string, boolean> = {}

const stream = client.reconcileVolumes({}, {signal})
const stream = client.reconcileVolumes({clientId: clientID}, {signal})

for await (const response of stream) {
if (signal.aborted) return
Expand All @@ -68,7 +71,13 @@ export async function startVolumeStream(signal: AbortSignal) {
})()
}
} catch (err: any) {
await reportError(err)
if (err instanceof ConnectError && err.code === Code.FailedPrecondition) {
// Connection lock was not acquired, sleep and retry
console.log('Connection lock was not acquired for volume stream, sleeping and retrying...')
await sleep(5 * 1000)
} else {
await reportError(err)
}
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {startHealthStream} from './handlers/health'
import {AwsProvider, FlyProvider, startStateStream} from './handlers/state'
import {startUpdater} from './handlers/updater'
import {startVolumeStream} from './handlers/volumes'
import {writeCephConf} from './utils/ceph'
import {clientID} from './utils/clientID'
import {sleep} from './utils/common'
import {
CLOUD_AGENT_CEPH_CONFIG,
Expand All @@ -17,7 +17,7 @@ import {logger} from './utils/logger'
const controller = new AbortController()

async function main() {
logger.info(`cloud-agent ${CLOUD_AGENT_VERSION} started`)
logger.info(`cloud-agent ${CLOUD_AGENT_VERSION} started: ${clientID}`)
const signal = controller.signal

function trapShutdown(signal: 'SIGINT' | 'SIGTERM') {
Expand Down Expand Up @@ -48,8 +48,6 @@ async function main() {
process.exit(1)
}

startHealthStream(signal)

switch (CLOUD_AGENT_PROVIDER) {
case 'fly':
startStateStream(signal, FlyProvider)
Expand Down
34 changes: 6 additions & 28 deletions src/proto/depot/cloud/v2/cloud_connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import {MethodKind} from '@bufbuild/protobuf'
import {
GetActiveAgentVersionRequest,
GetActiveAgentVersionResponse,
GetDesiredStateRequest,
GetDesiredStateResponse,
GetDesiredStateUnaryRequest,
GetDesiredStateUnaryResponse,
ReconcileVolumesRequest,
Expand All @@ -19,8 +17,6 @@ import {
ReportCurrentStateResponse,
ReportErrorsRequest,
ReportErrorsResponse,
ReportHealthRequest,
ReportHealthResponse,
ReportVolumeUpdatesRequest,
ReportVolumeUpdatesResponse,
} from './cloud_pb'
Expand All @@ -32,13 +28,13 @@ export const CloudService = {
typeName: 'depot.cloud.v2.CloudService',
methods: {
/**
* @generated from rpc depot.cloud.v2.CloudService.GetDesiredState
* @generated from rpc depot.cloud.v2.CloudService.GetDesiredStateUnary
*/
getDesiredState: {
name: 'GetDesiredState',
I: GetDesiredStateRequest,
O: GetDesiredStateResponse,
kind: MethodKind.ServerStreaming,
getDesiredStateUnary: {
name: 'GetDesiredStateUnary',
I: GetDesiredStateUnaryRequest,
O: GetDesiredStateUnaryResponse,
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v2.CloudService.ReportCurrentState
Expand All @@ -58,15 +54,6 @@ export const CloudService = {
O: ReportErrorsResponse,
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v2.CloudService.ReportHealth
*/
reportHealth: {
name: 'ReportHealth',
I: ReportHealthRequest,
O: ReportHealthResponse,
kind: MethodKind.ClientStreaming,
},
/**
* @generated from rpc depot.cloud.v2.CloudService.GetActiveAgentVersion
*/
Expand Down Expand Up @@ -94,15 +81,6 @@ export const CloudService = {
O: ReportVolumeUpdatesResponse,
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v2.CloudService.GetDesiredStateUnary
*/
getDesiredStateUnary: {
name: 'GetDesiredStateUnary',
I: GetDesiredStateUnaryRequest,
O: GetDesiredStateUnaryResponse,
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v2.CloudService.ReplaceVolume
*/
Expand Down
Loading

0 comments on commit 33ae1d7

Please sign in to comment.