Skip to content

Commit

Permalink
feat: improve state registration code
Browse files Browse the repository at this point in the history
  • Loading branch information
shah committed Jan 23, 2024
1 parent 211d289 commit 43e9e84
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 24 deletions.
4 changes: 2 additions & 2 deletions pattern/orchestration/duckdb/notebook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class DuckDbOrchEmitContext implements o.OrchEmitContext {
* Compute the current timestamp and prepare DuckDB SQL
* @returns SQL supplier of the Javascript runtime's current time
*/
get newCurrentTimestamp(): SQLa.SqlTextSupplier<SQLa.SqlEmitContext> {
get jsRuntimeNow(): SQLa.SqlTextSupplier<SQLa.SqlEmitContext> {
return {
SQL: () => {
const now = new Date();
Expand All @@ -54,7 +54,7 @@ export class DuckDbOrchEmitContext implements o.OrchEmitContext {
return { SQL: () => `ON CONFLICT DO NOTHING` };
}

get sqlEngineNow(): SQLa.SqlTextSupplier<DuckDbOrchEmitContext> {
get sqlEngineNow(): SQLa.SqlTextSupplier<SQLa.SqlEmitContext> {
return { SQL: () => `CURRENT_TIMESTAMP` };
}
}
Expand Down
9 changes: 8 additions & 1 deletion pattern/orchestration/governance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ export interface OrchEmitContext extends SQLa.SqlEmitContext {
* which returns the current time from the client (TypeScript).
* @returns SQL supplier of the Javascript runtime's current time
*/
readonly newCurrentTimestamp: SQLa.SqlTextSupplier<SQLa.SqlEmitContext>;
readonly jsRuntimeNow: SQLa.SqlTextSupplier<SQLa.SqlEmitContext>;

/**
* Compute the current timestamp and prepare db-specific dialect SQL function
* which returns the current time from the database (SQL) engine.
* @returns SQL supplier of the Database's current time
*/
readonly sqlEngineNow: SQLa.SqlTextSupplier<SQLa.SqlEmitContext>;

/**
* Property to pass into insert or other DML when we want to ignore conflicts.
Expand Down
25 changes: 18 additions & 7 deletions pattern/orchestration/ingest.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { fs } from "./deps.ts";
import * as SQLa from "../../render/mod.ts";
import * as g from "./governance.ts";
import * as nb from "./notebook.ts";

// deno-lint-ignore no-explicit-any
type Any = any;
Expand Down Expand Up @@ -29,11 +30,15 @@ export interface IngestSourceStructAssuranceContext<
}

export interface IngestableResource<
Governance extends g.OrchGovernance<EmitContext>,
EmitContext extends g.OrchEmitContext,
> {
readonly uri: string;
readonly nature: string;
readonly workflow: (sessionID: string, sessionEntryID: string) => {
readonly workflow: (
session: nb.OrchSession<Governance, EmitContext>,
sessionEntryID: string,
) => Promise<{
readonly ingestSQL: (
issac: IngestSourceStructAssuranceContext<EmitContext>,
) =>
Expand All @@ -45,30 +50,33 @@ export interface IngestableResource<
readonly exportResourceSQL: (targetSchema: string) =>
| Promise<SQLa.SqlTextSupplier<EmitContext>>
| SQLa.SqlTextSupplier<EmitContext>;
};
}>;
}

export interface InvalidIngestSource<
Governance extends g.OrchGovernance<EmitContext>,
EmitContext extends g.OrchEmitContext,
> extends IngestableResource<EmitContext> {
> extends IngestableResource<Governance, EmitContext> {
readonly nature: "ERROR";
readonly error: Error;
readonly tableName: string;
}

export interface CsvFileIngestSource<
TableName extends string,
Governance extends g.OrchGovernance<EmitContext>,
EmitContext extends g.OrchEmitContext,
> extends IngestableResource<EmitContext> {
> extends IngestableResource<Governance, EmitContext> {
readonly nature: "CSV";
readonly tableName: TableName;
}

export interface ExcelSheetIngestSource<
SheetName extends string,
TableName extends string,
Governance extends g.OrchGovernance<EmitContext>,
EmitContext extends g.OrchEmitContext,
> extends IngestableResource<EmitContext> {
> extends IngestableResource<Governance, EmitContext> {
readonly nature: "Excel Workbook Sheet";
readonly sheetName: SheetName;
readonly tableName: TableName;
Expand All @@ -94,7 +102,7 @@ export interface IngestFsPatternSourcesSupplier<PotentialIngestSource>
export class ErrorIngestSource<
Governance extends g.OrchGovernance<EmitContext>,
EmitContext extends g.OrchEmitContext,
> implements InvalidIngestSource<EmitContext> {
> implements InvalidIngestSource<Governance, EmitContext> {
readonly nature = "ERROR";
readonly tableName = "ERROR";
constructor(
Expand All @@ -105,7 +113,10 @@ export class ErrorIngestSource<
) {
}

workflow(): ReturnType<InvalidIngestSource<EmitContext>["workflow"]> {
// deno-lint-ignore require-await
async workflow(): ReturnType<
InvalidIngestSource<Governance, EmitContext>["workflow"]
> {
return {
ingestSQL: async (issac) =>
// deno-fmt-ignore
Expand Down
54 changes: 40 additions & 14 deletions pattern/orchestration/notebook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export class OrchSession<
return this.deviceDmlSingleton;
}

async orchSessionSqlDML(): Promise<
async orchSessionSqlDML(now = this.govn.emitCtx.jsRuntimeNow): Promise<
& { readonly sessionID: string }
& SQLa.SqlTextSupplier<EmitContext>
> {
Expand All @@ -97,7 +97,7 @@ export class OrchSession<
...orchSessionCRF.insertDML({
orch_session_id: sessionID,
device_id: device.deviceID,
orch_started_at: this.govn.emitCtx.newCurrentTimestamp,
orch_started_at: now,
// orch_started_at and diagnostics_arg, diagnostics_json, diagnostics_md should be
// supplied after session is completed
diagnostics_md:
Expand All @@ -112,6 +112,7 @@ export class OrchSession<
fromState: string,
toState: string,
reason: string,
at = this.govn.emitCtx.jsRuntimeNow,
elaboration?: string,
) {
const sessionDML = await this.orchSessionSqlDML();
Expand All @@ -123,32 +124,51 @@ export class OrchSession<
from_state: fromState,
to_state: toState,
transition_reason: reason,
transitioned_at: ctx.newCurrentTimestamp,
transitioned_at: at,
elaboration,
}),
);
}

async registerEntryState(
async entryStateDML(
sessionEntryID: string,
fromState: string,
toState: string,
reason: string,
at = this.govn.emitCtx.jsRuntimeNow,
elaboration?: string,
) {
const sessionDML = await this.orchSessionSqlDML();
const { emitCtx: ctx, orchSessionStateCRF, deterministicPKs } = this.govn;
return orchSessionStateCRF.insertDML({
orch_session_state_id: await ctx.newUUID(deterministicPKs),
session_id: sessionDML.sessionID,
session_entry_id: sessionEntryID,
from_state: fromState,
to_state: toState,
transition_reason: reason,
transitioned_at: at,
elaboration,
});
}

async registerEntryState(
sessionEntryID: string,
fromState: string,
toState: string,
reason: string,
at = this.govn.emitCtx.jsRuntimeNow,
elaboration?: string,
) {
this.stateChangesDML.push(
orchSessionStateCRF.insertDML({
orch_session_state_id: await ctx.newUUID(deterministicPKs),
session_id: sessionDML.sessionID,
session_entry_id: sessionEntryID,
from_state: fromState,
to_state: toState,
transition_reason: reason,
transitioned_at: ctx.newCurrentTimestamp,
await this.entryStateDML(
sessionEntryID,
fromState,
toState,
reason,
at,
elaboration,
}),
),
);
}

Expand Down Expand Up @@ -316,7 +336,13 @@ export async function orchestrate<
if (active.fromState == fromState && active.toState == toState) return;
}
// if we get here it means we're actually changing the state
await session.registerState(fromState, toState, reason, elaboration);
await session.registerState(
fromState,
toState,
reason,
govn.emitCtx.jsRuntimeNow,
elaboration,
);
cellStates.push({ fromState, toState });
};

Expand Down

0 comments on commit 43e9e84

Please sign in to comment.