From 267ea0ea043e54a7c2f96ebc28b86e671e59d75e Mon Sep 17 00:00:00 2001 From: David Whittington Date: Wed, 10 Jul 2024 12:44:22 -0500 Subject: [PATCH] feat(graphql clickhouse): add ClickHouse DB adapter PE-6488 This change adds support for a new ClickHouse-based DB backend. It can be used in combination with the SQLite DB backend to enable batch loading of historical data from Parquet. It also opens up the possibility of higher DB performance and scalability. In its current state it should be considered a technology preview. It won't be useful to most users until we either provide Parquet files to load into it or automate flushing of the SQLite DB to it (both are planned for a future release). It is also not intended to be a standalone solution. It supports bulk loading and efficient GraphQL querying of transactions and data items, but it relies on SQLite (or potentially another OLTP database in the future) to index recent data. These limitations allow for greatly simplified schema and query construction. Querying the new ClickHouse DB for transactions and data items via GraphQL is enabled by setting the 'CLICKHOUSE_URL' environment variable. 
--- package.json | 1 + src/app.ts | 2 +- src/config.ts | 3 + src/database/composite-clickhouse.ts | 445 +++++++++++++++++++++++++++ src/database/standalone-sqlite.ts | 2 + src/lib/encoding.ts | 8 + src/routes/graphql/resolvers.ts | 5 +- src/system.ts | 15 +- src/types.d.ts | 5 +- yarn.lock | 12 + 10 files changed, 492 insertions(+), 6 deletions(-) create mode 100644 src/database/composite-clickhouse.ts diff --git a/package.json b/package.json index e6f784d0..e0f34961 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "@aws-lite/client": "^0.21.7", "@aws-lite/s3": "^0.1.21", "@permaweb/aoconnect": "^0.0.56", + "@clickhouse/client": "^1.3.0", "apollo-server-express": "^3.13.0", "arbundles": "^0.11.1", "arweave": "^1.14.4", diff --git a/src/app.ts b/src/app.ts index 169a9edc..4a4bd9a7 100644 --- a/src/app.ts +++ b/src/app.ts @@ -61,7 +61,7 @@ app.use(dataRouter); app.use(arweaveRouter); // GraphQL -const apolloServerInstanceGql = apolloServer(system.db, { +const apolloServerInstanceGql = apolloServer(system.gqlQueryable, { introspection: true, persistedQueries: false, }); diff --git a/src/config.ts b/src/config.ts index cd29b137..a6ef7896 100644 --- a/src/config.ts +++ b/src/config.ts @@ -164,6 +164,9 @@ export const MAX_DATA_ITEM_QUEUE_SIZE = +env.varOrDefault( '100000', ); +// ClickHouse URL +export const CLICKHOUSE_URL = env.varOrUndefined('CLICKHOUSE_URL'); + // // ArNS and sandboxing // diff --git a/src/database/composite-clickhouse.ts b/src/database/composite-clickhouse.ts new file mode 100644 index 00000000..26bddfb9 --- /dev/null +++ b/src/database/composite-clickhouse.ts @@ -0,0 +1,445 @@ +/** + * AR.IO Gateway + * Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +import * as winston from 'winston'; +import sql from 'sql-bricks'; +import { ClickHouseClient, createClient } from '@clickhouse/client'; +import { ValidationError } from 'apollo-server-express'; + +import { + b64UrlToHex, + b64UrlToUtf8, + hexToB64Url, + utf8ToB64Url, +} from '../lib/encoding.js'; +import { GqlTransactionsResult, GqlQueryable } from '../types.js'; + +export function encodeTransactionGqlCursor({ + height, + blockTransactionIndex, + isDataItem, + id, + indexedAt, +}: { + height: number | null; + blockTransactionIndex: number | null; + isDataItem: boolean | null; + id: string | null; + indexedAt: number | null; +}) { + return utf8ToB64Url( + JSON.stringify([height, blockTransactionIndex, isDataItem, id, indexedAt]), + ); +} + +export function decodeTransactionGqlCursor(cursor: string | undefined) { + try { + if (cursor === undefined) { + return { + height: null, + blockTransactionIndex: null, + isDataItem: null, + id: null, + indexedAt: null, + }; + } + + const [height, blockTransactionIndex, isDataItem, id, indexedAt] = + JSON.parse(b64UrlToUtf8(cursor)) as [ + number | null, + number | null, + boolean | null, + string | null, + number | null, + ]; + + return { height, blockTransactionIndex, isDataItem, id, indexedAt }; + } catch (error) { + throw new ValidationError('Invalid transaction cursor'); + } +} + 
+export function encodeBlockGqlCursor({ height }: { height: number }) { + return utf8ToB64Url(JSON.stringify([height])); +} + +export function decodeBlockGqlCursor(cursor: string | undefined) { + try { + if (cursor === undefined) { + return { height: null }; + } + + const [height] = JSON.parse(b64UrlToUtf8(cursor)) as [number]; + + return { height }; + } catch (error) { + throw new ValidationError('Invalid block cursor'); + } +} + +// We do not have a good way to pass raw binary data to ClickHouse so we +// workaround that by converting to/from hex. +function inB64UrlStrings(xs: string[]) { + return sql(xs.map((x) => `unhex('${b64UrlToHex(x)}')`).join(', ')); +} + +export class CompositeClickHouseDatabase implements GqlQueryable { + private log: winston.Logger; + private clickhouseClient: ClickHouseClient; + private gqlQueryable: GqlQueryable; + + constructor({ + log, + gqlQueryable, + url, + }: { + log: winston.Logger; + gqlQueryable: GqlQueryable; + url: string; + }) { + this.log = log; + + this.clickhouseClient = createClient({ + url, + }); + + this.gqlQueryable = gqlQueryable; + } + + getGqlTransactionsBaseSql() { + return sql + .select() + .distinct( + 'height AS height', + 'block_transaction_index AS block_transaction_index', + 'is_data_item', + 'hex(id) AS id', + 'hex(anchor)', + 'hex(target) AS target', + 'toString(reward) AS reward', + 'toString(quantity) AS quantity', + 'toString(data_size) AS data_size', + 'content_type', + 'hex(owner_address) AS owner_address', + 'hex(parent_id) AS parent_id', + 'tags_count', + 'tags', + ) + .from('transactions t'); + } + + addGqlTransactionFilters({ + query, + cursor, + sortOrder = 'HEIGHT_DESC', + ids = [], + recipients = [], + owners = [], + minHeight = -1, + maxHeight = -1, + bundledIn, + tags = [], + }: { + query: sql.SelectStatement; + cursor?: string; + sortOrder?: 'HEIGHT_DESC' | 'HEIGHT_ASC'; + ids?: string[]; + recipients?: string[]; + owners?: string[]; + minHeight?: number; + maxHeight?: number; + 
bundledIn?: string[] | null; + tags: { name: string; values: string[] }[]; + }) { + const maxDbHeight = Infinity; + + if (ids?.length > 0) { + query.where(sql.in('t.id', inB64UrlStrings(ids))); + } + + if (recipients?.length > 0) { + query.where(sql.in('t.target', inB64UrlStrings(recipients))); + } + + if (owners?.length > 0) { + query.where(sql.in('t.owner_address', inB64UrlStrings(owners))); + } + + if (tags.length > 0) { + tags.forEach((tag) => { + const hexName = Buffer.from(tag.name).toString('hex'); + const hexValues = tag.values.map((value) => + Buffer.from(value).toString('hex'), + ); + const wheres = hexValues.map((hexValue) => + sql(`has(t.tags, (unhex('${hexName}'), unhex('${hexValue}')))`), + ); + query.where(sql.or.apply(null, wheres)); + }); + } + + if (minHeight != null && minHeight > 0) { + query.where(sql.gte('t.height', minHeight)); + } + + if (maxHeight != null && maxHeight >= 0 && maxHeight < maxDbHeight) { + query.where(sql.lte('t.height', maxHeight)); + } + + if (Array.isArray(bundledIn)) { + query.where(sql.in('t.parent_id', inB64UrlStrings(bundledIn))); + } + + const { + height: cursorHeight, + blockTransactionIndex: cursorBlockTransactionIndex, + isDataItem: cursorIsDataItem, + id: cursorId, + } = decodeTransactionGqlCursor(cursor); + + let orderBy = ''; + if (sortOrder === 'HEIGHT_DESC') { + if (cursorHeight != null) { + query.where( + sql.lte('t.height', cursorHeight), + sql.or( + sql.lt('t.height', cursorHeight), + sql.and( + sql.eq('t.height', cursorHeight), + sql.lt('t.block_transaction_index', cursorBlockTransactionIndex), + ), + sql.and( + sql.eq('t.height', cursorHeight), + sql.eq('t.block_transaction_index', cursorBlockTransactionIndex), + sql.lt('t.is_data_item', cursorIsDataItem), + ), + sql.and( + sql.eq('t.height', cursorHeight), + sql.eq('t.block_transaction_index', cursorBlockTransactionIndex), + sql.eq('t.is_data_item', cursorIsDataItem), + sql.lt( + 't.id', + sql(`unhex('${sql(b64UrlToHex(cursorId ?? 
''))}')`), + ), + ), + ), + ); + } + + orderBy = 't.height DESC, '; + orderBy += 't.block_transaction_index DESC, '; + orderBy += 't.is_data_item DESC, '; + orderBy += 't.id DESC'; + } else { + if (cursorHeight != null) { + query.where( + sql.gte('t.height', cursorHeight), + sql.or( + sql.gt('t.height', cursorHeight), + sql.and( + sql.eq('t.height', cursorHeight), + sql.gt('t.block_transaction_index', cursorBlockTransactionIndex), + ), + sql.and( + sql.eq('t.height', cursorHeight), + sql.eq('t.block_transaction_index', cursorBlockTransactionIndex), + sql.gt('t.is_data_item', cursorIsDataItem), + ), + sql.and( + sql.eq('t.height', cursorHeight), + sql.eq('t.block_transaction_index', cursorBlockTransactionIndex), + sql.eq('t.is_data_item', cursorIsDataItem), + sql.gt( + 't.id', + sql(`unhex('${sql(b64UrlToHex(cursorId ?? ''))}')`), + ), + ), + ), + ); + } + + orderBy = 't.height ASC, '; + orderBy += 't.block_transaction_index ASC, '; + orderBy += 't.is_data_item ASC, '; + orderBy += 't.id ASC'; + } + query.orderBy(orderBy); + } + + async getGqlTransactions({ + pageSize, + cursor, + sortOrder = 'HEIGHT_DESC', + ids = [], + recipients = [], + owners = [], + minHeight = -1, + maxHeight = -1, + bundledIn, + tags = [], + }: { + pageSize: number; + cursor?: string; + sortOrder?: 'HEIGHT_DESC' | 'HEIGHT_ASC'; + ids?: string[]; + recipients?: string[]; + owners?: string[]; + minHeight?: number; + maxHeight?: number; + bundledIn?: string[] | null; + tags?: { name: string; values: string[] }[]; + }): Promise { + const txsQuery = this.getGqlTransactionsBaseSql(); + + this.addGqlTransactionFilters({ + query: txsQuery, + cursor, + sortOrder, + ids, + recipients, + owners, + minHeight, + maxHeight, + bundledIn, + tags, + }); + + const txsSql = txsQuery.toString(); + const sql = `${txsSql} LIMIT ${pageSize + 1}`; + + this.log.debug('Querying ClickHouse transactions...', { sql }); + + const row = await this.clickhouseClient.query({ query: sql }); + const jsonRow = await row.json(); 
+ const txs = jsonRow.data.map((tx: any) => ({ + height: tx.height as number, + blockTransactionIndex: tx.block_transaction_index as number, + isDataItem: tx.is_data_item as boolean, + id: hexToB64Url(tx.id), + dataItemId: tx.is_data_item ? hexToB64Url(tx.id) : null, + indexedAt: tx.indexed_at as number, + anchor: tx.anchor ? hexToB64Url(tx.anchor) : null, + signature: null, + recipient: tx.target ? hexToB64Url(tx.target) : null, + ownerAddress: hexToB64Url(tx.owner_address), + ownerKey: null, + fee: tx.reward as string, + quantity: tx.quantity as string, + dataSize: tx.data_size as string, + tags: + tx.tags_count > 0 + ? tx.tags.map((tag: any) => ({ + name: tag[0] as string, + value: tag[1] as string, + })) + : [], + contentType: tx.content_type as string, + blockIndepHash: tx.block_indep_hash + ? hexToB64Url(tx.block_indep_hash) + : null, + blockTimestamp: tx.block_timestamp as number, + blockPreviousBlock: tx.block_previous_block + ? hexToB64Url(tx.block_previous_block) + : null, + parentId: tx.parent_id ? hexToB64Url(tx.parent_id) : null, + })); + + const gqlQueryableResults = await this.gqlQueryable.getGqlTransactions({ + pageSize, + cursor, + sortOrder, + ids, + recipients, + owners, + minHeight, + maxHeight, + tags, + }); + + // Filter out edges that already exist in the ClickHouse results + const gqlQueryableEdges = gqlQueryableResults.edges.filter( + (edge) => !txs.some((tx) => tx.id === edge.node.id), + ); + + // Combine the ClickHouse results with the gqlQueryable results + const edges = [ + ...txs.map((tx) => ({ + cursor: encodeTransactionGqlCursor(tx), + node: tx, + })), + ...gqlQueryableEdges, + ]; + + // Sort the combined results by height, blockTransactionIndex, isDataItem, and id + edges.sort((a, b) => { + const sortOrderModifier = sortOrder === 'HEIGHT_DESC' ? -1 : 1; + + const txA = a.node; + const txB = b.node; + + const heightA = txA.height ?? Number.MAX_SAFE_INTEGER; + const heightB = txB.height ?? 
Number.MAX_SAFE_INTEGER; + if (heightA !== heightB) { + return (heightA - heightB) * sortOrderModifier; + } + + if (txA.blockTransactionIndex !== txB.blockTransactionIndex) { + return ( + (txA.blockTransactionIndex - txB.blockTransactionIndex) * + sortOrderModifier + ); + } + + if (txA.isDataItem !== txB.isDataItem) { + return (txA.isDataItem ? 1 : -1) * sortOrderModifier; + } + + return txA.id.localeCompare(txB.id) * sortOrderModifier; + }); + + return { + pageInfo: { + hasNextPage: edges.length > pageSize, + }, + edges: edges.slice(0, pageSize), + }; + } + + async getGqlTransaction({ id }: { id: string }) { + return ( + await this.getGqlTransactions({ + pageSize: 1, + ids: [id], + }) + ).edges[0].node; + } + + getGqlBlock(args: { id: string }) { + return this.gqlQueryable.getGqlBlock(args); + } + + getGqlBlocks(args: { + pageSize: number; + cursor?: string; + sortOrder?: 'HEIGHT_DESC' | 'HEIGHT_ASC'; + ids?: string[]; + minHeight?: number; + maxHeight?: number; + }) { + return this.gqlQueryable.getGqlBlocks(args); + } +} diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts index fc318dd8..f1443476 100644 --- a/src/database/standalone-sqlite.ts +++ b/src/database/standalone-sqlite.ts @@ -1758,6 +1758,7 @@ export class StandaloneSqliteDatabaseWorker { ? toB64Url(tx.block_previous_block) : null, parentId: tx.parent_id ? toB64Url(tx.parent_id) : null, + isDataItem: tx.data_item_id.length > 1, })); } @@ -1870,6 +1871,7 @@ export class StandaloneSqliteDatabaseWorker { blockTimestamp: tx.block_timestamp, blockPreviousBlock: toB64Url(tx.block_previous_block), parentId: tx.parent_id ? 
toB64Url(tx.parent_id) : null, + isDataItem: tx.data_item_id.length > 1, })); } diff --git a/src/lib/encoding.ts b/src/lib/encoding.ts index bf5097f9..c2775177 100644 --- a/src/lib/encoding.ts +++ b/src/lib/encoding.ts @@ -49,6 +49,14 @@ export function utf8ToB64Url(input: string): string { return toB64Url(Buffer.from(input, 'utf8')); } +export function b64UrlToHex(input: string): string { + return fromB64Url(input).toString('hex'); +} + +export function hexToB64Url(input: string): string { + return toB64Url(Buffer.from(input, 'hex')); +} + export function sha256B64Url(input: Buffer): string { return toB64Url(createHash('sha256').update(input).digest()); } diff --git a/src/routes/graphql/resolvers.ts b/src/routes/graphql/resolvers.ts index 962a440f..04ae6146 100644 --- a/src/routes/graphql/resolvers.ts +++ b/src/routes/graphql/resolvers.ts @@ -107,7 +107,7 @@ export const resolvers: IResolvers = { id: queryParams.id, }); }, - transactions: (_, queryParams, { db }) => { + transactions: async (_, queryParams, { db }) => { log.info('GraphQL transactions query', { resolver: 'transactions', queryParams, @@ -148,7 +148,8 @@ export const resolvers: IResolvers = { }, Transaction: { block: (parent: GqlTransaction) => { - return parent.blockIndepHash !== null + // TODO remove ClickHouse height !== null hack once blocks are in ClickHouse + return parent.height !== null || parent.blockIndepHash !== null ? 
{ id: parent.blockIndepHash, timestamp: parent.blockTimestamp, diff --git a/src/system.ts b/src/system.ts index 61a3063e..34fa5667 100644 --- a/src/system.ts +++ b/src/system.ts @@ -31,6 +31,7 @@ import { ReadThroughDataCache } from './data/read-through-data-cache.js'; import { SequentialDataSource } from './data/sequential-data-source.js'; import { TxChunksDataSource } from './data/tx-chunks-data-source.js'; import { BundleDataImporter } from './workers/bundle-data-importer.js'; +import { CompositeClickHouseDatabase } from './database/composite-clickhouse.js'; import { StandaloneSqliteDatabase } from './database/standalone-sqlite.js'; import * as events from './events.js'; import { MatchTags } from './filters.js'; @@ -48,9 +49,10 @@ import { BundleIndex, ChainIndex, ChainOffsetIndex, - ContiguousDataSource, ContiguousDataIndex, + ContiguousDataSource, DataItemIndexWriter, + GqlQueryable, MatchableItem, NestedDataIndexWriter, NormalizedDataItem, @@ -146,6 +148,17 @@ export const contiguousDataIndex: ContiguousDataIndex = db; export const blockListValidator: BlockListValidator = db; export const nestedDataIndexWriter: NestedDataIndexWriter = db; export const dataItemIndexWriter: DataItemIndexWriter = db; +export const gqlQueryable: GqlQueryable = (() => { + if (config.CLICKHOUSE_URL !== undefined) { + return new CompositeClickHouseDatabase({ + log, + gqlQueryable: db, + url: config.CLICKHOUSE_URL, + }); + } + + return db; +})(); // Workers export const eventEmitter = new EventEmitter(); diff --git a/src/types.d.ts b/src/types.d.ts index c5700180..60d0ce2e 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -313,11 +313,11 @@ interface GqlPageInfo { interface GqlTransaction { id: string; - anchor: string; + anchor: string | null; signature: string | null; recipient: string | null; ownerAddress: string; - ownerKey: string; + ownerKey: string | null; fee: string; quantity: string; dataSize: string; @@ -331,6 +331,7 @@ interface GqlTransaction { dataItemId: string | 
null; tags: { name: any; value: any }[]; indexedAt: number; + isDataItem: boolean; } interface GqlTransactionEdge { diff --git a/yarn.lock b/yarn.lock index 6db8be2d..04c5b930 100644 --- a/yarn.lock +++ b/yarn.lock @@ -877,6 +877,18 @@ resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39" integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw== +"@clickhouse/client-common@1.3.0": + version "1.3.0" + resolved "https://registry.yarnpkg.com/@clickhouse/client-common/-/client-common-1.3.0.tgz#9fe4b88eedb233770832cb1c0794f4ad9fbec75c" + integrity sha512-fApbhu52WSQlBU0gMZQxd2akuWbI1c9BcWjIbDDgtNZX2OggrIB7a4oI845ZGXpeZCZDI9ZqtkXzbYQYS0Yqew== + +"@clickhouse/client@^1.3.0": + version "1.3.0" + resolved "https://registry.yarnpkg.com/@clickhouse/client/-/client-1.3.0.tgz#298529b970a9ba2ae5e017258aaf6cf522cb2b51" + integrity sha512-baBiuwVpXg/DHCe9Y1pNf+tcE2ZalCAQqZsR9OhP7+01C3UqTjHeY4eYixNlpfZCb8c8R4GygdWJFbXF0aGklw== + dependencies: + "@clickhouse/client-common" "1.3.0" + "@colors/colors@1.5.0": version "1.5.0" resolved "https://registry.yarnpkg.com/@colors/colors/-/colors-1.5.0.tgz#bb504579c1cae923e6576a4f5da43d25f97bdbd9"