Skip to content

Commit

Permalink
refactor: separate governance from DML
Browse files Browse the repository at this point in the history
  • Loading branch information
shah committed Dec 30, 2023
1 parent 1c7eec3 commit c7f3d27
Showing 1 changed file with 153 additions and 68 deletions.
221 changes: 153 additions & 68 deletions pattern/ingest/duckdb/notebook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,21 +415,136 @@ export class IngestGovernance {
.replace(/^_+|_+$/g, "") // Remove leading and trailing underscores
.replace(/__+/g, "_"); // Replace multiple underscores with a single one
}
}

/**
 * Keys under which SQL text suppliers are grouped in a SQL catalog
 * (`Record<IngestSqlRegistrationExecution, ...>`).
 *
 * NOTE(review): the names suggest each key is a point relative to the ingest
 * "init" and "finalize" steps at which the grouped SQL is meant to run —
 * confirm against the code that consumes the catalog.
 */
export type IngestSqlRegistrationExecution =
| "before-init"
| "after-init"
| "before-finalize"
| "after-finalize";

/**
 * Holds the SQL accumulated for a single ingestion session: a catalog of SQL
 * text suppliers grouped by `IngestSqlRegistrationExecution`, the lazily
 * created session insert DML, and the queued state-transition insert DML.
 *
 * All identifiers, timestamps and table helpers are obtained from the
 * `IngestGovernance` instance passed to the constructor.
 */
export class IngestSession {
  /**
   * Cached result of `ingestSessionSqlDML()`; undefined until that method has
   * been awaited at least once.
   */
  protected sessionDmlSingleton?: Awaited<
    ReturnType<IngestSession["ingestSessionSqlDML"]>
  >;

  /**
   * State-transition insert DML in the order it was registered through
   * `registerState()` / `registerEntryState()`.
   */
  readonly stateChangesDML: ReturnType<
    IngestGovernance["ingestSessionStateCRF"]["insertDML"]
  >[] = [];

  /**
   * @param govn governance object supplying the emit context, table helpers
   *   and the deterministic primary key setting
   * @param sqlCatalog SQL text suppliers grouped by execution key; defaults to
   *   an empty list for every key
   */
  constructor(
    readonly govn: IngestGovernance,
    readonly sqlCatalog: Record<
      IngestSqlRegistrationExecution,
      SQLa.SqlTextSupplier<IngestEmitContext>[]
    > = {
      "before-init": [],
      "after-init": [],
      "before-finalize": [],
      "after-finalize": [],
    },
  ) {
  }

  /**
   * Render the SQL text of every supplier cataloged under `exec`, using the
   * governance emit context.
   *
   * @param exec the catalog key to render
   * @returns one rendered SQL result per supplier; for "before-finalize" the
   *   queued state changes are rendered after the cataloged suppliers
   */
  sqlCatalogSqlText(exec: IngestSqlRegistrationExecution) {
    switch (exec) {
      // before-finalize should include all state changes SQL as well
      case "before-finalize":
        return [...this.sqlCatalog[exec], ...this.stateChangesDML].map((dml) =>
          dml.SQL(this.govn.emitCtx)
        );

      default:
        return this.sqlCatalog[exec].map((dml) => dml.SQL(this.govn.emitCtx));
    }
  }

  /**
   * @param exec the catalog key to look up
   * @returns the (live, unrendered) supplier list cataloged under `exec`;
   *   unlike `sqlCatalogSqlText()` this never includes the state changes
   */
  sqlCatalogSqlSuppliers(exec: IngestSqlRegistrationExecution) {
    return this.sqlCatalog[exec];
  }

  /**
   * Get the session insert DML together with the session ID it was built for.
   * The first call creates the ID via the emit context's `newUUID()` and
   * caches the result; later calls return the cached object.
   *
   * @returns the cached `{ sessionID, ...insert DML }` object
   */
  async ingestSessionSqlDML(): Promise<
    & { readonly sessionID: string }
    & SQLa.SqlTextSupplier<IngestEmitContext>
  > {
    if (!this.sessionDmlSingleton) {
      const { emitCtx: ctx, ingestSessionCRF, deterministicPKs } = this.govn;
      const sessionID = await ctx.newUUID(deterministicPKs);
      this.sessionDmlSingleton = {
        sessionID,
        ...ingestSessionCRF.insertDML({ ingest_session_id: sessionID }),
      };
    }
    return this.sessionDmlSingleton;
  }

  /**
   * Queue (append to `stateChangesDML`) an insert for a session-level state
   * transition. Nothing is rendered or executed here.
   *
   * @param fromState state being left
   * @param toState state being entered
   * @param reason stored as `transition_reason`
   * @param elaboration optional extra detail stored with the transition
   */
  async registerState(
    fromState: string,
    toState: string,
    reason: string,
    elaboration?: string,
  ) {
    // resolve the session first so its ID can be referenced by the new row
    const sessionDML = await this.ingestSessionSqlDML();
    const { emitCtx: ctx, ingestSessionStateCRF, deterministicPKs } = this.govn;
    this.stateChangesDML.push(
      ingestSessionStateCRF.insertDML({
        ingest_session_state_id: await ctx.newUUID(deterministicPKs),
        session_id: sessionDML.sessionID,
        from_state: fromState,
        to_state: toState,
        transition_reason: reason,
        transitioned_at: ctx.newCurrentTimestamp,
        elaboration,
      }),
    );
  }

  /**
   * Queue (append to `stateChangesDML`) an insert for a state transition that
   * belongs to a specific session entry. Same as `registerState()` except the
   * row also carries `session_entry_id`.
   *
   * @param sessionEntryID stored as `session_entry_id`
   * @param fromState state being left
   * @param toState state being entered
   * @param reason stored as `transition_reason`
   * @param elaboration optional extra detail stored with the transition
   */
  async registerEntryState(
    sessionEntryID: string,
    fromState: string,
    toState: string,
    reason: string,
    elaboration?: string,
  ) {
    // resolve the session first so its ID can be referenced by the new row
    const sessionDML = await this.ingestSessionSqlDML();
    const { emitCtx: ctx, ingestSessionStateCRF, deterministicPKs } = this.govn;
    this.stateChangesDML.push(
      ingestSessionStateCRF.insertDML({
        ingest_session_state_id: await ctx.newUUID(deterministicPKs),
        session_id: sessionDML.sessionID,
        session_entry_id: sessionEntryID,
        from_state: fromState,
        to_state: toState,
        transition_reason: reason,
        transitioned_at: ctx.newCurrentTimestamp,
        elaboration,
      }),
    );
  }

  /**
   * Prepare a SQL view that combines all the diagnostics into a single
   * denormalized table that can be used to generate an Excel workbook using
   * GDAL (spatial plugin). Notably this means removing duplicate columns
   * and not using timestamptz types (GDAL doesn't support timestamptz).
   *
   * @returns the `ingest_session_diagnostic_text` view definition produced by
   *   the governance object's `viewDefn` tagged template
   */
  diagnosticsView() {
    return this.govn.viewDefn("ingest_session_diagnostic_text")`
SELECT
-- Including all other columns from 'ingest_session'
ises.* EXCLUDE (ingest_started_at, ingest_finished_at),
-- TODO: Casting known timestamp columns to text so emit to Excel works with GDAL (spatial)
-- strftime(timestamptz ingest_started_at, '%Y-%m-%d %H:%M:%S') AS ingest_started_at,
-- strftime(timestamptz ingest_finished_at, '%Y-%m-%d %H:%M:%S') AS ingest_finished_at,
-- Including all columns from 'ingest_session_entry'
isee.* EXCLUDE (session_id),
-- Including all other columns from 'ingest_session_issue'
isi.* EXCLUDE (session_id, session_entry_id)
FROM ingest_session AS ises
JOIN ingest_session_entry AS isee ON ises.ingest_session_id = isee.session_id
LEFT JOIN ingest_session_issue AS isi ON isee.ingest_session_entry_id = isi.session_entry_id`;
  }
}

Expand Down Expand Up @@ -512,42 +627,6 @@ export class IngestTableAssuranceRules<TableName extends string>
}
}

/**
 * Keys under which SQL text suppliers are grouped in a SQL catalog.
 *
 * NOTE(review): an identical `IngestSqlRegistrationExecution` declaration
 * appears earlier in this listing; this copy looks like the pre-refactor
 * position of the type — confirm which one should remain, since two
 * declarations of the same alias will not compile together.
 */
export type IngestSqlRegistrationExecution =
| "before-init"
| "after-init"
| "before-finalize"
| "after-finalize";

/**
 * A registry of SQL text suppliers grouped by execution key.
 *
 * NOTE(review): `IngestArgs` in this listing shows both a `sqlRegister` and a
 * `session` member on adjacent lines; this interface may belong to the
 * pre-refactor side of the change — confirm it is still in use.
 */
export interface IngestSqlRegister {
// read-only view of everything registered so far, one iterable per key
readonly catalog: Record<
IngestSqlRegistrationExecution,
Iterable<SQLa.SqlTextSupplier<IngestEmitContext>>
>;
// adds `sts` to the group identified by `execute`
readonly register: (
sts: SQLa.SqlTextSupplier<IngestEmitContext>,
execute: IngestSqlRegistrationExecution,
) => void;
}

/**
 * Create a new, empty SQL register.
 *
 * @returns an object exposing the per-key supplier lists as `catalog` and a
 *   `register` function that appends a supplier to the list for a given key
 */
export function ingestSqlRegister(): IngestSqlRegister {
  // one independent, initially empty list per execution key
  const suppliersByExec: Record<
    IngestSqlRegistrationExecution,
    SQLa.SqlTextSupplier<IngestEmitContext>[]
  > = {
    "before-init": [],
    "after-init": [],
    "before-finalize": [],
    "after-finalize": [],
  };

  function register(
    supplier: SQLa.SqlTextSupplier<IngestEmitContext>,
    when: IngestSqlRegistrationExecution,
  ): void {
    suppliersByExec[when].push(supplier);
  }

  return { catalog: suppliersByExec, register };
}

export class IngestResumableError extends Error {
constructor(readonly issue: string, cause?: Error) {
super(issue);
Expand All @@ -556,7 +635,7 @@ export class IngestResumableError extends Error {
}

export interface IngestArgs<Governance extends IngestGovernance, Notebook> {
readonly sqlRegister: IngestSqlRegister;
readonly session: IngestSession;
readonly emitDagPuml?:
| ((puml: string, previewUrl: string) => Promise<void>)
| undefined;
Expand Down Expand Up @@ -617,45 +696,47 @@ export async function ingest<
const workflow = await init.newInstance(govn, args);
const initRunState = await kernel.initRunState();
const { runState: { eventEmitter: rsEE } } = initRunState;
const sessionID = (await govn.ingestSessionSqlDML()).sessionID;
const { sqlRegister } = args;
const { session } = args;
const cellStates: { fromState: string; toState: string }[] = [];

const registerStateChange = async (
const registerCellState = async (
fromState: string,
toState: string,
reason: string,
elaboration?: string,
) => {
sqlRegister.register(
govn.ingestSessionStateCRF.insertDML({
ingest_session_state_id: await govn.emitCtx.newUUID(
govn.deterministicPKs,
),
session_id: sessionID,
from_state: fromState,
to_state: toState,
transitioned_at: govn.emitCtx.newCurrentTimestamp,
elaboration,
}),
"before-finalize",
);
// if the new state change is already registered, skip it
if (cellStates.length) {
const active = cellStates[cellStates.length - 1];
if (active.fromState == fromState && active.toState == toState) return;
}
// if we get here it means we're actually changing the state
await session.registerState(fromState, toState, reason, elaboration);
cellStates.push({ fromState, toState });
};

rsEE.initNotebook = (_ctx) => {
// TODO: any reason to record this state change?
};
rsEE.beforeCell = (cell, ctx) => {
registerStateChange(
rsEE.beforeCell = async (cell, ctx) => {
await registerCellState(
ctx.previous ? `EXIT(${String(ctx.previous.current.nbCellID)})` : "NONE",
`ENTER(${cell})`,
"rsEE.beforeCell",
);
};
rsEE.afterInterrupt = (cell, _ctx) => {
registerStateChange(`ENTER(${cell})`, `INTERRUPTED(${cell})`);
rsEE.afterInterrupt = async (cell, _ctx) => {
await registerCellState(
`EXIT(${cell})`,
`INTERRUPTED(${cell})`,
"rsEE.afterInterrupt",
);
};
rsEE.afterError = (cell, error, _ctx) => {
registerStateChange(
`ENTER(${cell})`,
rsEE.afterError = async (cell, error, _ctx) => {
await registerCellState(
`EXIT(${cell})`,
`ERROR(${cell})`,
"rsEE.afterError",
JSON.stringify(
{
name: error.name,
Expand All @@ -673,8 +754,12 @@ export async function ingest<
console.error(`[Non-resumable issue in '${cell}']`, error);
return "abort";
};
rsEE.afterCell = (cell, _result, _ctx) => {
registerStateChange(`ENTER(${cell})`, `EXIT(${cell})`);
rsEE.afterCell = async (cell, _result, ctx) => {
await registerCellState(
`EXIT(${cell})`,
ctx.next ? `ENTER(${String(ctx.next.nbCellID)})` : "NONE",
"rsEE.afterCell",
);
};
rsEE.finalizeNotebook = (_ctx) => {
// TODO: add final state change?
Expand Down

0 comments on commit c7f3d27

Please sign in to comment.