Skip to content

Commit

Permalink
Merge pull request #113 from depot/report-state
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobwgillespie authored Sep 19, 2024
2 parents 5cdc856 + 75b590c commit e2df0c5
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 337 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
syntax = "proto3";

package depot.cloud.v4;
package depot.cloud.v5;

service CloudService {
rpc GetDesiredState(GetDesiredStateRequest) returns (GetDesiredStateResponse) {}
rpc ReportCurrentState(ReportCurrentStateRequest) returns (ReportCurrentStateResponse) {}
rpc ReportErrors(ReportErrorsRequest) returns (ReportErrorsResponse);
rpc GetActiveAgentVersion(GetActiveAgentVersionRequest) returns (GetActiveAgentVersionResponse);
rpc ReconcileVolumes(ReconcileVolumesRequest) returns (stream ReconcileVolumesResponse) {}
Expand All @@ -16,6 +15,7 @@ service CloudService {

message GetDesiredStateRequest {
string client_id = 1;
CloudState current_state = 2;
}

message GetDesiredStateResponse {
Expand Down Expand Up @@ -52,12 +52,12 @@ message GetDesiredStateResponse {
}

message MachineChange {
string id = 1;
string resource_id = 1;
MachineState desired_state = 2;
}

message VolumeChange {
string id = 1;
string resource_id = 1;
VolumeState desired_state = 2;
optional string attached_to = 3;
optional string device = 4;
Expand Down Expand Up @@ -118,13 +118,6 @@ message ReplaceVolumeRequest {

message ReplaceVolumeResponse {}

message ReportCurrentStateRequest {
CloudState state = 1;
string client_id = 2;
}

message ReportCurrentStateResponse {}

message ReportErrorsRequest {
repeated string errors = 1;
}
Expand Down
101 changes: 30 additions & 71 deletions src/handlers/state.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import {PlainMessage} from '@bufbuild/protobuf'
import {Code, ConnectError} from '@connectrpc/connect'
import {compare} from 'fast-json-patch'
import {GetDesiredStateResponse, ReportCurrentStateRequest} from '../proto/depot/cloud/v4/cloud_pb'
import {GetDesiredStateResponse} from '../proto/depot/cloud/v5/cloud_pb'
import {CurrentState as AwsCurrentState} from '../types'
import {getCurrentState as getCurrentAwsState, reconcile as reconcileAws} from '../utils/aws'
import {clientID} from '../utils/clientID'
Expand All @@ -14,32 +12,53 @@ import {
} from '../utils/fly/reconcile'
import {client} from '../utils/grpc'

interface CloudProvider<T> {
interface CloudProvider<T extends AwsCurrentState | FlyCurrentState> {
getCurrentState(): Promise<T>
reportCurrentState(currentState: T): Promise<void>
reconcile(response: GetDesiredStateResponse, state: T): Promise<void>
}

export const AwsProvider: CloudProvider<AwsCurrentState> = {
getCurrentState: () => getCurrentAwsState(),
reportCurrentState: reportAwsState(),
reconcile: (response, currentState) => reconcileAws(response, currentState),
}

export const FlyProvider: CloudProvider<FlyCurrentState> = {
getCurrentState: () => getCurrentFlyState(),
reportCurrentState: reportFlyState(),
reconcile: (response, currentState) => reconcileFly(response, currentState),
}

export async function startStateStream<T>(signal: AbortSignal, provider: CloudProvider<T>) {
export async function startStateStream<T extends AwsCurrentState | FlyCurrentState>(
signal: AbortSignal,
provider: CloudProvider<T>,
) {
while (!signal.aborted) {
try {
const currentState = await provider.getCurrentState()

await provider.reportCurrentState(currentState)

const response = await client.getDesiredState({clientId: clientID}, {signal})
const response = await client.getDesiredState(
{
clientId: clientID,
currentState: {
state:
currentState.cloud === 'aws'
? {
case: 'aws',
value: {
availabilityZone: currentState.availabilityZone,
state: JSON.stringify(currentState),
},
}
: {
case: 'fly',
value: {
region: currentState.region,
state: JSON.stringify(currentState),
},
},
},
},
{signal},
)
if (isEmptyResponse(response)) continue

await provider.reconcile(response, currentState)
Expand All @@ -57,66 +76,6 @@ export async function startStateStream<T>(signal: AbortSignal, provider: CloudPr
}
}

interface StateCache<T> {
state: T
}

function reportAwsState(): (state: AwsCurrentState) => Promise<void> {
let stateCache: StateCache<AwsCurrentState> | null = null
return async function reportCurrentState(currentState: AwsCurrentState) {
const request: PlainMessage<ReportCurrentStateRequest> = {
clientId: clientID,
state: {
state: {
case: 'aws',
value: {
availabilityZone: currentState.availabilityZone,
state: JSON.stringify(currentState),
},
},
},
}

if (stateCache) {
const diff = compare(stateCache.state, currentState)

// If there is no difference, don't send a request
if (diff.length === 0) return
}

await client.reportCurrentState(request)
stateCache = {state: currentState}
}
}

function reportFlyState(): (state: FlyCurrentState) => Promise<void> {
let stateCache: StateCache<FlyCurrentState> | null = null
return async function reportCurrentState(currentState: FlyCurrentState) {
const request: PlainMessage<ReportCurrentStateRequest> = {
clientId: clientID,
state: {
state: {
case: currentState.cloud,
value: {
region: currentState.region,
state: JSON.stringify(currentState),
},
},
},
}

if (stateCache) {
const diff = compare(stateCache.state, currentState)

// If there is no difference, don't send a request
if (diff.length === 0) return
}

await client.reportCurrentState(request)
stateCache = {state: currentState}
}
}

function isEmptyResponse(response: GetDesiredStateResponse): boolean {
return (
response.newMachines.length === 0 &&
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/volumes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
ReportVolumeUpdatesRequest,
ResizeVolumeAction,
TrimVolumeAction,
} from '../proto/depot/cloud/v4/cloud_pb'
} from '../proto/depot/cloud/v5/cloud_pb'
import {
authCaps,
authGetKey,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// @generated by protoc-gen-connect-es v1.4.0 with parameter "target=ts,import_extension=none"
// @generated from file depot/cloud/v4/cloud.proto (package depot.cloud.v4, syntax proto3)
// @generated from file depot/cloud/v5/cloud.proto (package depot.cloud.v5, syntax proto3)
/* eslint-disable */
// @ts-nocheck

Expand All @@ -13,22 +13,20 @@ import {
ReconcileVolumesResponse,
ReplaceVolumeRequest,
ReplaceVolumeResponse,
ReportCurrentStateRequest,
ReportCurrentStateResponse,
ReportErrorsRequest,
ReportErrorsResponse,
ReportVolumeUpdatesRequest,
ReportVolumeUpdatesResponse,
} from './cloud_pb'

/**
* @generated from service depot.cloud.v4.CloudService
* @generated from service depot.cloud.v5.CloudService
*/
export const CloudService = {
typeName: 'depot.cloud.v4.CloudService',
typeName: 'depot.cloud.v5.CloudService',
methods: {
/**
* @generated from rpc depot.cloud.v4.CloudService.GetDesiredState
* @generated from rpc depot.cloud.v5.CloudService.GetDesiredState
*/
getDesiredState: {
name: 'GetDesiredState',
Expand All @@ -37,16 +35,7 @@ export const CloudService = {
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v4.CloudService.ReportCurrentState
*/
reportCurrentState: {
name: 'ReportCurrentState',
I: ReportCurrentStateRequest,
O: ReportCurrentStateResponse,
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v4.CloudService.ReportErrors
* @generated from rpc depot.cloud.v5.CloudService.ReportErrors
*/
reportErrors: {
name: 'ReportErrors',
Expand All @@ -55,7 +44,7 @@ export const CloudService = {
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v4.CloudService.GetActiveAgentVersion
* @generated from rpc depot.cloud.v5.CloudService.GetActiveAgentVersion
*/
getActiveAgentVersion: {
name: 'GetActiveAgentVersion',
Expand All @@ -64,7 +53,7 @@ export const CloudService = {
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v4.CloudService.ReconcileVolumes
* @generated from rpc depot.cloud.v5.CloudService.ReconcileVolumes
*/
reconcileVolumes: {
name: 'ReconcileVolumes',
Expand All @@ -73,7 +62,7 @@ export const CloudService = {
kind: MethodKind.ServerStreaming,
},
/**
* @generated from rpc depot.cloud.v4.CloudService.ReportVolumeUpdates
* @generated from rpc depot.cloud.v5.CloudService.ReportVolumeUpdates
*/
reportVolumeUpdates: {
name: 'ReportVolumeUpdates',
Expand All @@ -82,7 +71,7 @@ export const CloudService = {
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v4.CloudService.ReplaceVolume
* @generated from rpc depot.cloud.v5.CloudService.ReplaceVolume
*/
replaceVolume: {
name: 'ReplaceVolume',
Expand Down
Loading

0 comments on commit e2df0c5

Please sign in to comment.