Skip to content

Commit

Permalink
add aggregates first cut
Browse files Browse the repository at this point in the history
  • Loading branch information
TristenHarr committed Oct 29, 2024
1 parent 1ada467 commit 29cacad
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 24 deletions.
17 changes: 0 additions & 17 deletions connector-definition/template/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,6 @@ import { GoogleCalendar, GMail} from "@hasura/ndc-duckduckapi/services";
const connectorConfig: duckduckapi = {
dbSchema: `
DROP TABLE IF EXISTS users;
CREATE TABLE users (
id integer primary key,
name text
);
INSERT INTO users (id, name) VALUES
(1, 'Alice Johnson'),
(2, 'Bob Smith'),
(3, 'Carol Martinez'),
(4, 'David Kim'),
(5, 'Emma Wilson'),
(6, 'Frank Zhang'),
(7, 'Grace Lee'),
(8, 'Henry Garcia'),
(9, 'Isabel Patel'),
(10, 'Jack Thompson');
-- Add your SQL schema here.
-- This SQL will be run on startup every time.
-- CREATE TABLE SAAS_TABLE_NAME (.....);
Expand Down
1 change: 1 addition & 0 deletions ndc-duckduckapi/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const CAPABILITIES_RESPONSE: Capabilities = {
query: {
variables: {},
nested_fields: {},
aggregates: {}
},
mutation: {
transactional: null,
Expand Down
87 changes: 80 additions & 7 deletions ndc-duckduckapi/src/handlers/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,57 @@ function build_query(
let order_by_sql = ``;
let collect_rows = [];
let where_conditions = ["WHERE 1"];

if (query.aggregates) {
run_agg = true;
agg_sql = "... todo";
throw new Forbidden("Aggregates not implemented yet!", {});
let agg_columns: string[] = [];

// Process each aggregate
for (const [agg_name, agg_value] of Object.entries(query.aggregates)) {
if (agg_value.type === "star_count") {
agg_columns.push(`COUNT(*) as ${escape_double(agg_name)}`);
} else if (agg_value.type === "column_count") {
// Handle column count aggregation
const column_expr = agg_value.distinct
? `COUNT(DISTINCT ${escape_double(collection_alias)}.${escape_double(agg_value.column)})`
: `COUNT(${escape_double(collection_alias)}.${escape_double(agg_value.column)})`;
agg_columns.push(`${column_expr} as ${escape_double(agg_name)}`);
} else {
throw new Forbidden(`Aggregate type ${agg_value.type} not implemented yet!`, {});
}
}

let from_sql = `${collection} as ${escape_double(collection_alias)}`;
if (path.length > 1 && relationship_key !== null) {
let relationship = query_request.collection_relationships[relationship_key];
let parent_alias = path.slice(0, -1).join("_");
let relationship_alias = config.duckdbConfig.collection_aliases[relationship.target_collection];
from_sql = `${relationship_alias} as ${escape_double(collection_alias)}`;
where_conditions.push(
...Object.entries(relationship.column_mapping).map(([from, to]) => {
return `${escape_double(parent_alias)}.${escape_double(from)} = ${escape_double(collection_alias)}.${escape_double(to)}`;
}),
);
}

if (query.predicate) {
where_conditions.push(
`(${build_where(query.predicate, query_request.collection_relationships, agg_args, variables, collection_alias, config.duckdbConfig.collection_aliases)})`
);
}

agg_sql = wrap_data(`
SELECT JSON_OBJECT(
${agg_columns.map(col => {
const parts = col.split(' as ');
return `${escape_single(parts[1].replace(/"/g, ''))}, ${parts[0]}`;
}).join(',')}
) as data
FROM ${from_sql}
${where_conditions.join(" AND ")}
`);
}

if (query.fields) {
run_sql = true;
for (let [field_name, field_value] of Object.entries(query.fields)) {
Expand Down Expand Up @@ -493,17 +539,44 @@ export async function perform_query(
state: State,
query_plans: SQLQuery[],
): Promise<QueryResponse> {
// const con = state.client.connect();
const response: RowSet[] = [];
for (let query_plan of query_plans) {
try {
const connection = await state.client.getSyncConnection();
const res = await do_all(connection, query_plan);
const row_set = JSON.parse(res[0]["data"] as string) as RowSet;
let row_set: RowSet = { rows: [] };

// Handle aggregate query if present
if (query_plan.runAgg) {
const aggRes = await do_all(connection, {
runSql: true,
runAgg: false,
sql: query_plan.aggSql,
args: query_plan.aggArgs,
aggSql: "",
aggArgs: []
});
const parsedAggData = JSON.parse(aggRes[0]["data"]);
row_set.aggregates = parsedAggData;
}

// Handle regular query if present
if (query_plan.runSql) {
const res = await do_all(connection, {
runSql: true,
runAgg: false,
sql: query_plan.sql,
args: query_plan.args,
aggSql: "",
aggArgs: []
});
const regular_results = JSON.parse(res[0]["data"]);
row_set.rows = regular_results.rows;
}
response.push(row_set);
} catch (err) {
console.error('RAAAAAAAAAAAAAAAAAAAAAAR.perform_query:: ' + "Error performing query: " + err);
console.error('Error performing query: ' + err);
throw err;
}
}
return response;
}
}

0 comments on commit 29cacad

Please sign in to comment.