Skip to content

Commit

Permalink
overhaul resumables again
Browse files Browse the repository at this point in the history
- better serialization of settled items
- simplified .all and .allSettled implementations
- simplified FailedResume design
- better handling of errors when serializing
- fix FailedResume linking
- cleanup unused code
  • Loading branch information
Matchlighter committed Jan 8, 2024
1 parent 8f7be90 commit 4bf1a19
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 391 deletions.
41 changes: 40 additions & 1 deletion src/common/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,45 @@ export function deep_pojso(value: any) {
return true;
}

export function serializable(value: any, serializer_keys: (string | symbol | ((v: any) => boolean))[]) {
function has_key(obj: any) {
for (let k of serializer_keys) {
if (typeof k == "function") {
if (k(obj)) return true;
} else {
if (value[k]) return true;
}
}
return false;
}

if (Array.isArray(value)) {
for (let av of value) {
if (!serializable(av, serializer_keys)) return false;
}
return true;
}

if (typeof value == "object") {
const proto = Object.getPrototypeOf(value);
if (proto) {
return has_key(value);
} else {
for (let [k, v] of Object.entries(value)) {
if (!serializable(v, serializer_keys)) return false;
}
return true;
}
}

if (typeof value == "function") {
return has_key(value);
}

// Primitive
return true;
}

export class PromiseTimedout extends Error { }

export function timeoutPromise<T>(timeout: number, promise: Promise<T>, timeoutAction?: () => void): Promise<T> {
Expand Down Expand Up @@ -241,5 +280,5 @@ export function read_lines(file: string, options: LineReaderOptions, iteratee: (

export function split_once(str: string, splitter: string) {
const i = str.indexOf(splitter);
return [str.slice(0,i), str.slice(i+1)];
return [str.slice(0, i), str.slice(i + 1)];
}
28 changes: 16 additions & 12 deletions src/hypervisor/cross_call.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import path = require("path");
import { randomUUID } from "crypto";

import { deep_pojso } from "../common/util";
import { serializable } from "../common/util";
import { ResumablePromise } from "../runtime/resumable";
import { SerializeContext } from "../runtime/resumable/resumable_promise";
import { ApplicationInstance } from "./application_instance";
Expand Down Expand Up @@ -103,7 +103,7 @@ export class CrossCallStore {
call = { ...call }
if (status == "accept") {
call.result = "accept";
if (!deep_pojso(value)) {
if (!serializable(value, [])) {
logMessage("error", `CrossAppCall to ${call.target_app}.${call.method}() return non-JSON-serialiable`, value);
call.result = "reject_error";
value = "Cross-App Call returns must be JSON serializable";
Expand Down Expand Up @@ -211,9 +211,9 @@ class CrossCallWrapper extends ResumablePromise<any> {
super();

if (await_for instanceof ResumablePromise) {
await_for.then(this._resolve, this._reject, this);
await_for.then(this.resolve.bind(this), this.reject.bind(this), this);
} else {
await_for.then(this._resolve, this._reject);
await_for.then(this.resolve.bind(this), this.reject.bind(this));
}

current.hypervisor.crossCallStore._markCallTouched(current.application, call_uuid);
Expand All @@ -239,9 +239,9 @@ class CrossCallWrapper extends ResumablePromise<any> {
}

serialize(ctx: SerializeContext) {
ctx.set_type('cross-call-awaiter');
ctx.side_effects(true);
return {
type: 'cross-call-awaiter',
sideeffect_free: false,
call_uuid: this.call_uuid,
await_for: ctx.ref(this.await_for),
}
Expand All @@ -260,9 +260,6 @@ class CrossCallPromise extends ResumablePromise<any> {
this.do_unsuspend();
}

resolve = this._resolve;
reject = this._reject;

static {
ResumablePromise.defineClass<CrossCallPromise>({
type: 'cross-call-awaitee',
Expand All @@ -272,6 +269,13 @@ class CrossCallPromise extends ResumablePromise<any> {
})
}

resolve(arg: any): void {
return super.resolve(arg)
}
reject(arg: any): void {
return super.resolve(arg)
}

protected do_unsuspend() {
current.hypervisor.crossCallStore._setClientPromise(this.call_uuid, this);
}
Expand All @@ -280,10 +284,10 @@ class CrossCallPromise extends ResumablePromise<any> {
current.hypervisor.crossCallStore._setClientPromise(this.call_uuid, null);
}

serialize() {
serialize(ctx: SerializeContext) {
ctx.set_type('cross-call-awaitee');
ctx.side_effects(false);
return {
type: 'cross-call-awaitee',
sideeffect_free: false,
call_uuid: this.call_uuid,
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/plugins/home_assistant/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import { sync_to_observable } from '@matchlighter/common_library/sync_observable
import { mqtt } from '..';
import { DeepReadonly } from '../../common/util';
import { HyperWrapper } from '../../hypervisor/managed_apps';
import { ResumablePromise, SerializedResumable } from "../../runtime/resumable";
import { ResumablePromise } from "../../runtime/resumable";
import { SerializeContext } from '../../runtime/resumable/resumable_promise';
import { Plugin, get_plugin, handle_client_error } from '../base';
import { MqttPlugin } from '../mqtt';
import { HomeAssistantApi, homeAssistantApi } from './api';
Expand Down Expand Up @@ -292,10 +293,10 @@ class EventAwaiter extends ResumablePromise<any>{
this.ha_untrack = this.hap['trackEventAwaiter'](this);
}

serialize(): SerializedResumable {
serialize(ctx: SerializeContext) {
ctx.set_type('ha_event_waiter');
ctx.side_effects(false);
return {
type: 'ha_event_waiter',
sideeffect_free: true,
plugin: this.hap[HyperWrapper].id,
schema: this.schema,
}
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/resumable/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

export { ResumablePromise, SerializedResumable } from './resumable_promise'
export { resumable } from './resumable_method'
export { ResumablePromise } from './resumable_promise'
export { ResumableStore } from './resumable_store'
15 changes: 8 additions & 7 deletions src/runtime/resumable/resumable_method.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@

import { InvertedWeakMap } from "@matchlighter/common_library/data/inverted_weakmap"
import { InvertedWeakMap } from "@matchlighter/common_library/data/inverted_weakmap";

import { ResumablePromise, SerializeContext, SerializedResumable, Suspend } from "./resumable_promise";
import { deep_pojso } from "../../common/util";
import { AsyncLocalStorage } from "async_hooks";
import { deep_pojso } from "../../common/util";
import { ResumablePromise, SerializeContext } from "./resumable_promise";

const Op = Object.prototype;
const hasOwn = Op.hasOwnProperty;
Expand Down Expand Up @@ -129,8 +129,9 @@ class Executor<T> extends ResumablePromise<T> {
readonly tryEntries: any[];

serialize(context: SerializeContext) {
context.set_type("@resumable");
context.side_effects(true);
return {
type: "@resumable",
pending_promise: context.ref(this.pending_promise),
scope: {
owner: this.scope.owner[RESUMABLE_CONTEXT_ID],
Expand Down Expand Up @@ -222,14 +223,14 @@ class Executor<T> extends ResumablePromise<T> {
// the .value of the Promise<{value,done}> result for the
// current iteration.
result.value = unwrapped;
this._resolve(result.value);
this.resolve(result.value);
}, (error) => {
// If a rejected Promise was yielded, throw the rejection back
// into the async generator function so it can be handled there.
return this.invoke("throw", error);
});
} catch (ex) {
this._reject(ex);
this.reject(ex);
}
}

Expand Down Expand Up @@ -549,8 +550,8 @@ export class ResumableCallbackPromise extends ResumablePromise<any> {
}

serialize(ctx: SerializeContext) {
ctx.set_type('call_by_name');
return {
type: 'call_by_name',
method: this.method_name,
lookup_context: this.lookup_context,
await_for: ctx.ref(this.await_for),
Expand Down
Loading

0 comments on commit 4bf1a19

Please sign in to comment.