Skip to content

Commit

Permalink
UBERF-9137: Fix Support for suspended installations (#7667)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo authored Jan 15, 2025
1 parent 0c6feb6 commit c8dde57
Show file tree
Hide file tree
Showing 24 changed files with 425 additions and 326 deletions.
4 changes: 2 additions & 2 deletions packages/core/src/__tests__/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import { fillConfiguration, pluginFilterTx } from '../utils'
import { connect } from './connection'
import { genMinModel } from './minmodel'

function filterPlugin (plugin: Plugin): (txes: Tx[]) => Promise<Tx[]> {
return async (txes) => {
function filterPlugin (plugin: Plugin): (txes: Tx[]) => Tx[] {
return (txes) => {
const configs = new Map<Ref<PluginConfiguration>, PluginConfiguration>()
fillConfiguration(txes, configs)

Expand Down
33 changes: 21 additions & 12 deletions packages/core/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ export interface TxPersistenceStore {
store: (model: LoadModelResponse) => Promise<void>
}

export type ModelFilter = (tx: Tx[]) => Promise<Tx[]>
export type ModelFilter = (tx: Tx[]) => Tx[]

/**
* @public
Expand Down Expand Up @@ -256,17 +256,21 @@ export async function createClient (
}
const conn = await ctx.with('connect', {}, () => connect(txHandler))

const { mode, current, addition } = await ctx.with('load-model', {}, (ctx) => loadModel(ctx, conn, txPersistence))
let { mode, current, addition } = await ctx.with('load-model', {}, (ctx) => loadModel(ctx, conn, txPersistence))
switch (mode) {
case 'same':
case 'upgrade':
await ctx.with('build-model', {}, (ctx) => buildModel(ctx, current, modelFilter, hierarchy, model))
ctx.withSync('build-model', {}, (ctx) => {
buildModel(ctx, current, modelFilter, hierarchy, model)
})
break
case 'addition':
await ctx.with('build-model', {}, (ctx) =>
ctx.withSync('build-model', {}, (ctx) => {
buildModel(ctx, current.concat(addition), modelFilter, hierarchy, model)
)
})
}
current = []
addition = []

txBuffer = txBuffer.filter((tx) => tx.space !== core.space.Model)

Expand All @@ -287,7 +291,7 @@ export async function createClient (
return
}
// Find all new transactions and apply
const { mode, current, addition } = await ctx.with('load-model', {}, (ctx) => loadModel(ctx, conn, txPersistence))
let { mode, current, addition } = await ctx.with('load-model', {}, (ctx) => loadModel(ctx, conn, txPersistence))

switch (mode) {
case 'upgrade':
Expand All @@ -296,16 +300,21 @@ export async function createClient (
model = new ModelDb(hierarchy)
;(client as ClientImpl).setModel(hierarchy, model)

await ctx.with('build-model', {}, (ctx) => buildModel(ctx, current, modelFilter, hierarchy, model))
ctx.withSync('build-model', {}, (ctx) => {
buildModel(ctx, current, modelFilter, hierarchy, model)
})
current = []
await oldOnConnect?.(ClientConnectEvent.Upgraded, _lastTx, data)
// No need to fetch more stuff since upgrade was happened.
break
case 'addition':
await ctx.with('build-model', {}, (ctx) =>
ctx.withSync('build-model', {}, (ctx) => {
buildModel(ctx, current.concat(addition), modelFilter, hierarchy, model)
)
})
break
}
current = []
addition = []

if (lastTx === undefined) {
// No need to do anything here since we connected.
Expand Down Expand Up @@ -391,13 +400,13 @@ async function loadModel (
return { mode: 'addition', current: current.transactions, addition: result.transactions }
}

async function buildModel (
function buildModel (
ctx: MeasureContext,
transactions: Tx[],
modelFilter: ModelFilter | undefined,
hierarchy: Hierarchy,
model: ModelDb
): Promise<void> {
): void {
const systemTx: Tx[] = []
const userTx: Tx[] = []

Expand All @@ -416,7 +425,7 @@ async function buildModel (

let txes = systemTx.concat(userTx)
if (modelFilter !== undefined) {
txes = await modelFilter(txes)
txes = modelFilter(txes)
}

ctx.withSync('build hierarchy', {}, () => {
Expand Down
105 changes: 72 additions & 33 deletions plugins/client-resources/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class RequestPromise {
chunks?: { index: number, data: FindResult<any> }[]
}

const globalRPCHandler: RPCHandler = new RPCHandler()

class Connection implements ClientConnection {
private websocket: ClientSocket | null = null
binaryMode = false
Expand Down Expand Up @@ -115,7 +117,7 @@ class Connection implements ClientConnection {

onConnect?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>

rpcHandler = new RPCHandler()
rpcHandler: RPCHandler

lastHash?: string

Expand Down Expand Up @@ -145,6 +147,7 @@ class Connection implements ClientConnection {
} else {
this.sessionId = generateId()
}
this.rpcHandler = opt?.useGlobalRPCHandler === true ? globalRPCHandler : new RPCHandler()

this.onConnect = opt?.onConnect

Expand Down Expand Up @@ -187,6 +190,8 @@ class Connection implements ClientConnection {
this.pingResponse = Date.now()
}
}
}).catch((err) => {
this.ctx.error('failed to send msg', { err })
})
} else {
clearInterval(this.interval)
Expand Down Expand Up @@ -336,7 +341,9 @@ class Connection implements ClientConnection {
helloResp.reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected,
helloResp.lastTx,
this.sessionId
)
)?.catch((err) => {
this.ctx.error('failed to call onConnect', { err })
})
this.schedulePing(socketId)
return
} else {
Expand All @@ -345,7 +352,9 @@ class Connection implements ClientConnection {
return
}
if (resp.result === pingConst) {
void this.sendRequest({ method: pingConst, params: [] })
void this.sendRequest({ method: pingConst, params: [] }).catch((err) => {
this.ctx.error('failed to send ping', { err })
})
return
}
if (resp.id !== undefined) {
Expand Down Expand Up @@ -416,14 +425,21 @@ class Connection implements ClientConnection {
promise.reject(new PlatformError(resp.error))
} else {
if (request?.handleResult !== undefined) {
void request.handleResult(resp.result).then(() => {
promise.resolve(resp.result)
})
void request
.handleResult(resp.result)
.then(() => {
promise.resolve(resp.result)
})
.catch((err) => {
this.ctx.error('failed to handleResult', { err })
})
} else {
promise.resolve(resp.result)
}
}
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
void broadcastEvent(client.event.NetworkRequests, this.requests.size).catch((err) => {
this.ctx.error('failed to broadcast', { err })
})
} else {
const txArr = Array.isArray(resp.result) ? (resp.result as Tx[]) : [resp.result as Tx]

Expand All @@ -437,10 +453,14 @@ class Connection implements ClientConnection {
this.handler(...txArr)

clearTimeout(this.incomingTimer)
void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1)
void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1).catch((err) => {
this.ctx.error('failed to broadcast', { err })
})

this.incomingTimer = setTimeout(() => {
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
void broadcastEvent(client.event.NetworkRequests, this.requests.size).catch((err) => {
this.ctx.error('failed to broadcast', { err })
})
}, 500)
}
}
Expand Down Expand Up @@ -476,7 +496,9 @@ class Connection implements ClientConnection {
this.dialTimer = setTimeout(() => {
this.dialTimer = null
if (!opened && !this.closed) {
void this.opt?.onDialTimeout?.()
void this.opt?.onDialTimeout?.()?.catch((err) => {
this.ctx.error('failed to handle dial timeout', { err })
})
this.scheduleOpen(this.ctx, true)
}
}, dialTimeout)
Expand All @@ -494,7 +516,9 @@ class Connection implements ClientConnection {
return
}
if (event.data === pingConst) {
void this.sendRequest({ method: pingConst, params: [] })
void this.sendRequest({ method: pingConst, params: [] }).catch((err) => {
this.ctx.error('failed to send ping', { err })
})
return
}
if (
Expand All @@ -503,35 +527,42 @@ class Connection implements ClientConnection {
) {
const text = new TextDecoder().decode(event.data)
if (text === pingConst) {
void this.sendRequest({ method: pingConst, params: [] })
void this.sendRequest({ method: pingConst, params: [] }).catch((err) => {
this.ctx.error('failed to send ping', { err })
})
}
if (text === pongConst) {
this.pingResponse = Date.now()
}
return
}
if (event.data instanceof Blob) {
void event.data.arrayBuffer().then((data) => {
if (this.compressionMode && this.helloReceived) {
void event.data
.arrayBuffer()
.then((data) => {
if (this.compressionMode && this.helloReceived) {
try {
data = uncompress(data)
} catch (err: any) {
// Ignore
console.error(err)
}
}
try {
data = uncompress(data)
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
} catch (err: any) {
// Ignore
console.error(err)
if (!this.helloReceived) {
// Just error and ignore for now.
console.error(err)
} else {
throw err
}
}
}
try {
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
} catch (err: any) {
if (!this.helloReceived) {
// Just error and ignore for now.
console.error(err)
} else {
throw err
}
}
})
})
.catch((err) => {
this.ctx.error('failed to decode array buffer', { err })
})
} else {
let data = event.data
if (this.compressionMode && this.helloReceived) {
Expand Down Expand Up @@ -561,7 +592,9 @@ class Connection implements ClientConnection {
return
}
// console.log('client websocket closed', socketId, ev?.reason)
void broadcastEvent(client.event.NetworkRequests, -1)
void broadcastEvent(client.event.NetworkRequests, -1).catch((err) => {
this.ctx.error('failed broadcast', { err })
})
this.scheduleOpen(this.ctx, true)
}
wsocket.onopen = () => {
Expand Down Expand Up @@ -591,7 +624,9 @@ class Connection implements ClientConnection {
if (opened) {
console.error('client websocket error:', socketId, this.url, this.workspace, this.email)
}
void broadcastEvent(client.event.NetworkRequests, -1)
void broadcastEvent(client.event.NetworkRequests, -1).catch((err) => {
this.ctx.error('failed to broadcast', { err })
})
}
}

Expand Down Expand Up @@ -669,7 +704,11 @@ class Connection implements ClientConnection {
ctx.withSync('send-data', {}, () => {
sendData()
})
void ctx.with('broadcast-event', {}, () => broadcastEvent(client.event.NetworkRequests, this.requests.size))
void ctx
.with('broadcast-event', {}, () => broadcastEvent(client.event.NetworkRequests, this.requests.size))
.catch((err) => {
this.ctx.error('failed to broadcast', { err })
})
if (data.method !== pingConst) {
return await promise.promise
}
Expand Down
15 changes: 13 additions & 2 deletions plugins/client-resources/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export default async () => {
return await Promise.resolve(clientConnection)
}

const modelFilter: ModelFilter = async (txes) => {
const modelFilter: ModelFilter = (txes) => {
if (filterModel === 'client') {
return returnClientTxes(txes)
}
Expand Down Expand Up @@ -177,7 +177,18 @@ function returnClientTxes (txes: Tx[]): Tx[] {
'templates:class:TemplateField' as Ref<Class<Doc>>,
'activity:class:DocUpdateMessageViewlet' as Ref<Class<Doc>>,
'core:class:PluginConfiguration' as Ref<Class<Doc>>,
'core:class:DomainIndexConfiguration' as Ref<Class<Doc>>
'core:class:DomainIndexConfiguration' as Ref<Class<Doc>>,
'view:class:ViewletDescriptor' as Ref<Class<Doc>>,
'presentation:class:ComponentPointExtension' as Ref<Class<Doc>>,
'activity:class:ActivityMessagesFilter' as Ref<Class<Doc>>,
'view:class:ActionCategory' as Ref<Class<Doc>>,
'activity:class:ActivityExtension' as Ref<Class<Doc>>,
'chunter:class:ChatMessageViewlet' as Ref<Class<Doc>>,
'activity:class:ActivityMessageControl' as Ref<Class<Doc>>,
'notification:class:ActivityNotificationViewlet' as Ref<Class<Doc>>,
'setting:class:SettingsCategory' as Ref<Class<Doc>>,
'setting:class:WorkspaceSettingCategory' as Ref<Class<Doc>>,
'notification:class:NotificationProvider' as Ref<Class<Doc>>
])

const result = pluginFilterTx(excludedPlugins, configs, txes).filter((tx) => {
Expand Down
2 changes: 2 additions & 0 deletions plugins/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ export interface ClientFactoryOptions {
onConnect?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
ctx?: MeasureContext
onDialTimeout?: () => void | Promise<void>

useGlobalRPCHandler?: boolean
}

/**
Expand Down
4 changes: 2 additions & 2 deletions plugins/view-resources/src/components/RelationsEditor.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
object._class,
{ _id: object._id },
(res) => {
relationsA = res[0].$associations ?? {}
relationsA = res?.[0]?.$associations ?? {}
},
{ associations: associationsA.map((a) => [a._id, 1]) }
)
Expand All @@ -60,7 +60,7 @@
object._class,
{ _id: object._id },
(res) => {
relationsB = res[0].$associations ?? {}
relationsB = res?.[0]?.$associations ?? {}
},
{ associations: associationsB.map((a) => [a._id, -1]) }
)
Expand Down
3 changes: 2 additions & 1 deletion services/github/github-assets/lang/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
"AuthenticationRevokedGithub": "Re-authorization for {login} is required for the proper functioning of the GitHub App",
"UnlinkInstallationTitle": "Uninstall Github App",
"UnlinkInstallation": "Are you sure you want to uninstall the GitHub App? Synchronization will be disabled.",
"RemoveInstallation": "Uninstall"
"RemoveInstallation": "Uninstall",
"Suspended": "Suspended"
}
}
3 changes: 2 additions & 1 deletion services/github/github-assets/lang/pt.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
"AuthenticationRevokedGithub": "É necessária uma nova autorização para {login} para o funcionamento adequado do aplicativo GitHub",
"UnlinkInstallationTitle": "Desinstalar aplicativo Github",
"UnlinkInstallation": "Tem certeza de que deseja desinstalar o aplicativo GitHub? A sincronização será desabilitada.",
"RemoveInstallation": "Desinstalar"
"RemoveInstallation": "Desinstalar",
"Suspended": "Suspended"
}
}
Loading

0 comments on commit c8dde57

Please sign in to comment.