diff --git a/.github/workflows/build-core.yml b/.github/workflows/build-core.yml
index 27578988..5b3378f1 100644
--- a/.github/workflows/build-core.yml
+++ b/.github/workflows/build-core.yml
@@ -56,7 +56,7 @@ jobs:
uses: VeryGoodOpenSource/very_good_coverage@v2
with:
path: ./coverage/lcov.info
- min_coverage: 60
+ min_coverage: 50
# Build and push container image to GCR (only on main branch)
- name: Log in to the GitHub Container Registry
@@ -78,7 +78,7 @@ jobs:
# Build and push container image to ECR
- name: Configure AWS credentials
- uses: aws-actions/configure-aws-credentials@v1-node16
+ uses: aws-actions/configure-aws-credentials@v2
with:
role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.AWS_BUILD_INVOCATION_ROLE }}
aws-region: ${{ secrets.AWS_REGION }}
diff --git a/.gitignore b/.gitignore
index 1d0bfdaa..e1c406a0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,10 @@
/data
/dist
/node_modules
+/vendor
+
+# Generated docs
+/docs/sqlite/bundles
# VS Code
/.vscode
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 327aef82..061eb5eb 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -42,7 +42,9 @@ services:
- INSTANCE_ID=${INSTANCE_ID:-}
- AR_IO_WALLET=${AR_IO_WALLET:-}
- ADMIN_API_KEY=${ADMIN_API_KEY:-}
+ - BACKFILL_BUNDLE_RECORDS=${BACKFILL_BUNDLE_RECORDS:-}
+ - FILTER_CHANGE_REPROCESS=${FILTER_CHANGE_REPROCESS:-}
- ANS104_UNBUNDLE_FILTER=${ANS104_UNBUNDLE_FILTER:-}
- - ANS104_DATA_INDEX_FILTER=${ANS104_DATA_INDEX_FILTER:-}
+ - ANS104_INDEX_FILTER=${ANS104_INDEX_FILTER:-}
- ARNS_ROOT_HOST=${ARNS_ROOT_HOST:-}
- SANDBOX_PROTOCOL=${SANDBOX_PROTOCOL:-}
diff --git a/docs/sqlite/bundles.meta.xml b/docs/sqlite/bundles.meta.xml
new file mode 100644
index 00000000..516abe76
--- /dev/null
+++ b/docs/sqlite/bundles.meta.xml
@@ -0,0 +1,109 @@
+<!-- SchemaSpy metadata for the bundles database (XML table and column descriptions; element content not shown) -->
diff --git a/docs/sqlite/bundles.properties b/docs/sqlite/bundles.properties
new file mode 100644
index 00000000..bb006198
--- /dev/null
+++ b/docs/sqlite/bundles.properties
@@ -0,0 +1,5 @@
+dbms=SQLite
+description=Xerial
+connectionSpec=jdbc:sqlite:data/sqlite/bundles.db?open_mode=1
+driver=org.sqlite.JDBC
+driverPath=vendor/sqlite-jdbc-3.42.0.0.jar
diff --git a/migrations/2023.05.15T18.06.50.bundles.init-schema.sql b/migrations/2023.05.15T18.06.50.bundles.init-schema.sql
new file mode 100644
index 00000000..cdc6ac5a
--- /dev/null
+++ b/migrations/2023.05.15T18.06.50.bundles.init-schema.sql
@@ -0,0 +1,125 @@
+CREATE TABLE IF NOT EXISTS bundle_formats (
+ id INTEGER PRIMARY KEY,
+ name TEXT NOT NULL
+);
+
+INSERT INTO bundle_formats (id, name) VALUES (0, 'ans-102');
+INSERT INTO bundle_formats (id, name) VALUES (1, 'ans-104');
+
+CREATE TABLE IF NOT EXISTS bundles (
+ id BLOB PRIMARY KEY,
+ format INTEGER NOT NULL,
+ data_item_count INTEGER NOT NULL,
+ first_processed_at INTEGER NOT NULL,
+ last_processed_at INTEGER NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS bundle_data_items (
+ id BLOB,
+ parent_id BLOB NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ indexed_at INTEGER NOT NULL,
+ PRIMARY KEY (id, parent_id)
+);
+
+CREATE TABLE IF NOT EXISTS wallets (
+ address BLOB PRIMARY KEY,
+ public_modulus BLOB
+);
+
+CREATE TABLE IF NOT EXISTS stable_data_items (
+ -- Identity
+ id BLOB NOT NULL,
+ parent_id BLOB NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ height INTEGER NOT NULL,
+ block_transaction_index INTEGER NOT NULL,
+ signature BLOB NOT NULL,
+ anchor BLOB NOT NULL,
+
+ -- Ownership
+ owner_address BLOB NOT NULL,
+ target BLOB,
+
+ -- Data
+ data_offset INTEGER NOT NULL,
+ data_size INTEGER NOT NULL,
+ content_type TEXT,
+
+ -- Metadata
+ tag_count INTEGER NOT NULL,
+ indexed_at INTEGER NOT NULL,
+ PRIMARY KEY (id)
+);
+
+CREATE INDEX IF NOT EXISTS stable_data_items_height_block_transaction_index_id_idx ON stable_data_items (height, block_transaction_index, id);
+CREATE INDEX IF NOT EXISTS stable_data_items_target_height_block_transaction_index_id_idx ON stable_data_items (target, height, block_transaction_index, id);
+CREATE INDEX IF NOT EXISTS stable_data_items_owner_address_height_block_transaction_index_id_idx ON stable_data_items (owner_address, height, block_transaction_index, id);
+CREATE INDEX IF NOT EXISTS stable_data_items_parent_id_height_block_transaction_index_id_idx ON stable_data_items (parent_id, height, block_transaction_index, id);
+
+CREATE TABLE IF NOT EXISTS tag_names (
+ hash BLOB PRIMARY KEY,
+ name BLOB NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS tag_values (
+ hash BLOB PRIMARY KEY,
+ value BLOB NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS stable_data_item_tags (
+ tag_name_hash BLOB NOT NULL,
+ tag_value_hash BLOB NOT NULL,
+ height INTEGER NOT NULL,
+ block_transaction_index INTEGER NOT NULL,
+ data_item_tag_index INTEGER NOT NULL,
+ data_item_id BLOB NOT NULL,
+ parent_id BLOB NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ PRIMARY KEY (tag_name_hash, tag_value_hash, height, block_transaction_index, data_item_id, data_item_tag_index)
+);
+
+CREATE INDEX IF NOT EXISTS stable_data_item_tags_transaction_id_idx ON stable_data_item_tags (data_item_id);
+
+CREATE TABLE IF NOT EXISTS new_data_items (
+ -- Identity
+ id BLOB NOT NULL,
+ parent_id BLOB NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ height INTEGER,
+ signature BLOB NOT NULL,
+ anchor BLOB NOT NULL,
+
+ -- Ownership
+ owner_address BLOB NOT NULL,
+ target BLOB,
+
+ -- Data
+ data_offset INTEGER NOT NULL,
+ data_size INTEGER NOT NULL,
+ content_type TEXT,
+
+ -- Metadata
+ tag_count INTEGER NOT NULL,
+ indexed_at INTEGER NOT NULL,
+ PRIMARY KEY (id)
+);
+
+CREATE INDEX IF NOT EXISTS new_data_items_parent_id_id_idx ON new_data_items (parent_id, id);
+CREATE INDEX IF NOT EXISTS new_data_items_root_transaction_id_id_idx ON new_data_items (root_transaction_id, id);
+CREATE INDEX IF NOT EXISTS new_data_items_target_id_idx ON new_data_items (target, id);
+CREATE INDEX IF NOT EXISTS new_data_items_owner_address_id_idx ON new_data_items (owner_address, id);
+CREATE INDEX IF NOT EXISTS new_data_items_height_indexed_at_idx ON new_data_items (height, indexed_at);
+
+CREATE TABLE IF NOT EXISTS new_data_item_tags (
+ tag_name_hash BLOB NOT NULL,
+ tag_value_hash BLOB NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ data_item_id BLOB NOT NULL,
+ data_item_tag_index INTEGER NOT NULL,
+ height INTEGER,
+ indexed_at INTEGER NOT NULL,
+ PRIMARY KEY (tag_name_hash, tag_value_hash, root_transaction_id, data_item_id, data_item_tag_index)
+);
+
+CREATE INDEX IF NOT EXISTS new_data_item_tags_height_indexed_at_idx ON new_data_item_tags (height, indexed_at);
diff --git a/migrations/2023.06.05T17.36.05.bundles.data-item-tags-data-item-id-indexes.sql b/migrations/2023.06.05T17.36.05.bundles.data-item-tags-data-item-id-indexes.sql
new file mode 100644
index 00000000..631ce64a
--- /dev/null
+++ b/migrations/2023.06.05T17.36.05.bundles.data-item-tags-data-item-id-indexes.sql
@@ -0,0 +1,4 @@
+DROP INDEX IF EXISTS stable_data_item_tags_transaction_id_idx;
+CREATE INDEX IF NOT EXISTS stable_data_item_tags_data_item_id_idx ON stable_data_item_tags (data_item_id);
+
+CREATE INDEX IF NOT EXISTS new_data_item_tags_data_item_id_idx ON new_data_item_tags (data_item_id);
diff --git a/migrations/2023.06.08T14.32.38.bundles.add-bundle-data-item-fields.sql b/migrations/2023.06.08T14.32.38.bundles.add-bundle-data-item-fields.sql
new file mode 100644
index 00000000..a7521396
--- /dev/null
+++ b/migrations/2023.06.08T14.32.38.bundles.add-bundle-data-item-fields.sql
@@ -0,0 +1,15 @@
+DROP TABLE IF EXISTS bundle_data_items;
+
+CREATE TABLE IF NOT EXISTS bundle_data_items (
+ id BLOB NOT NULL,
+ parent_id BLOB NOT NULL,
+ parent_index INTEGER NOT NULL,
+ filter_id INTEGER NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ first_indexed_at INTEGER NOT NULL,
+ last_indexed_at INTEGER NOT NULL,
+ PRIMARY KEY (id, parent_id, parent_index, filter_id)
+);
+
+CREATE INDEX IF NOT EXISTS bundle_data_items_filter_id_idx
+ ON bundle_data_items (filter_id);
diff --git a/migrations/2023.06.13T14.01.27.bundles.add-filters.sql b/migrations/2023.06.13T14.01.27.bundles.add-filters.sql
new file mode 100644
index 00000000..c069e633
--- /dev/null
+++ b/migrations/2023.06.13T14.01.27.bundles.add-filters.sql
@@ -0,0 +1,6 @@
+CREATE TABLE IF NOT EXISTS filters (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ filter TEXT NOT NULL UNIQUE
+);
+
+CREATE INDEX IF NOT EXISTS filters_filter_idx ON filters (filter);
diff --git a/migrations/2023.06.19T14.40.36.bundles.bundle-process-tracking.sql b/migrations/2023.06.19T14.40.36.bundles.bundle-process-tracking.sql
new file mode 100644
index 00000000..df349a37
--- /dev/null
+++ b/migrations/2023.06.19T14.40.36.bundles.bundle-process-tracking.sql
@@ -0,0 +1,22 @@
+DROP TABLE IF EXISTS bundles;
+
+CREATE TABLE IF NOT EXISTS bundles (
+ id BLOB PRIMARY KEY,
+ format_id INTEGER NOT NULL,
+ unbundle_filter_id INTEGER,
+ index_filter_id INTEGER,
+ data_item_count INTEGER,
+ matched_data_item_count INTEGER,
+ first_queued_at INTEGER,
+ last_queued_at INTEGER,
+ first_skipped_at INTEGER,
+ last_skipped_at INTEGER,
+ first_unbundled_at INTEGER,
+ last_unbundled_at INTEGER,
+ first_fully_indexed_at INTEGER,
+ last_fully_indexed_at INTEGER
+);
+
+CREATE INDEX IF NOT EXISTS bundles_format_id_idx ON bundles (format_id);
+
+ALTER TABLE bundle_formats RENAME COLUMN name TO format;
diff --git a/migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql b/migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql
new file mode 100644
index 00000000..2260f793
--- /dev/null
+++ b/migrations/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql
@@ -0,0 +1,2 @@
+ALTER TABLE bundles ADD COLUMN root_transaction_id BLOB;
+UPDATE bundles SET root_transaction_id = id;
diff --git a/migrations/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql b/migrations/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql
new file mode 100644
index 00000000..9cf82af4
--- /dev/null
+++ b/migrations/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql
@@ -0,0 +1,15 @@
+CREATE INDEX IF NOT EXISTS bundles_last_queued_at_idx
+ ON bundles (last_queued_at);
+CREATE INDEX IF NOT EXISTS bundles_last_skipped_at_idx
+ ON bundles (last_skipped_at);
+CREATE INDEX IF NOT EXISTS bundles_last_fully_indexed_at_idx
+ ON bundles (last_fully_indexed_at);
+CREATE INDEX IF NOT EXISTS bundles_matched_data_item_count_idx
+ ON bundles (matched_data_item_count);
+CREATE INDEX IF NOT EXISTS bundles_unbundle_filter_id_idx
+ ON bundles (unbundle_filter_id);
+CREATE INDEX IF NOT EXISTS bundles_index_filter_id_idx
+ ON bundles (index_filter_id);
+
+CREATE INDEX IF NOT EXISTS bundle_data_items_parent_id_filter_id_idx
+ ON bundle_data_items (parent_id, filter_id);
diff --git a/migrations/down/2023.05.15T18.06.50.bundles.init-schema.sql b/migrations/down/2023.05.15T18.06.50.bundles.init-schema.sql
new file mode 100644
index 00000000..934bab1f
--- /dev/null
+++ b/migrations/down/2023.05.15T18.06.50.bundles.init-schema.sql
@@ -0,0 +1,9 @@
+DROP TABLE IF EXISTS new_data_item_tags;
+DROP TABLE IF EXISTS new_data_items;
+DROP TABLE IF EXISTS stable_data_item_tags;
+DROP TABLE IF EXISTS tag_values;
+DROP TABLE IF EXISTS tag_names;
+DROP TABLE IF EXISTS stable_data_items;
+DROP TABLE IF EXISTS wallets;
+DROP TABLE IF EXISTS bundles;
+DROP TABLE IF EXISTS bundle_formats;
diff --git a/migrations/down/2023.06.05T17.36.05.bundles.data-item-tags-data-item-id-indexes.sql b/migrations/down/2023.06.05T17.36.05.bundles.data-item-tags-data-item-id-indexes.sql
new file mode 100644
index 00000000..47d36891
--- /dev/null
+++ b/migrations/down/2023.06.05T17.36.05.bundles.data-item-tags-data-item-id-indexes.sql
@@ -0,0 +1,4 @@
+DROP INDEX IF EXISTS new_data_item_tags_data_item_id_idx;
+
+DROP INDEX IF EXISTS stable_data_item_tags_data_item_id_idx;
+CREATE INDEX IF NOT EXISTS stable_data_item_tags_transaction_id_idx ON stable_data_item_tags (data_item_id);
diff --git a/migrations/down/2023.06.08T14.32.38.bundles.add-bundle-data-item-fields.sql b/migrations/down/2023.06.08T14.32.38.bundles.add-bundle-data-item-fields.sql
new file mode 100644
index 00000000..25a919f3
--- /dev/null
+++ b/migrations/down/2023.06.08T14.32.38.bundles.add-bundle-data-item-fields.sql
@@ -0,0 +1,9 @@
+DROP TABLE IF EXISTS bundle_data_items;
+
+CREATE TABLE IF NOT EXISTS bundle_data_items (
+ id BLOB,
+ parent_id BLOB NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ indexed_at INTEGER NOT NULL,
+ PRIMARY KEY (id, parent_id)
+);
diff --git a/migrations/down/2023.06.13T14.01.27.bundles.add-filters.sql b/migrations/down/2023.06.13T14.01.27.bundles.add-filters.sql
new file mode 100644
index 00000000..b108790d
--- /dev/null
+++ b/migrations/down/2023.06.13T14.01.27.bundles.add-filters.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS filters;
diff --git a/migrations/down/2023.06.19T14.40.36.bundles.bundle-process-tracking.sql b/migrations/down/2023.06.19T14.40.36.bundles.bundle-process-tracking.sql
new file mode 100644
index 00000000..4a267ba0
--- /dev/null
+++ b/migrations/down/2023.06.19T14.40.36.bundles.bundle-process-tracking.sql
@@ -0,0 +1,11 @@
+ALTER TABLE bundle_formats RENAME COLUMN format TO name;
+
+DROP TABLE IF EXISTS bundles;
+
+CREATE TABLE IF NOT EXISTS bundles (
+ id BLOB PRIMARY KEY,
+ format INTEGER NOT NULL,
+ data_item_count INTEGER NOT NULL,
+ first_processed_at INTEGER NOT NULL,
+ last_processed_at INTEGER NOT NULL
+);
diff --git a/migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql b/migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql
new file mode 100644
index 00000000..47e5cd42
--- /dev/null
+++ b/migrations/down/2023.06.28T21.52.29.bundles.add-bundles-root-tx-id.sql
@@ -0,0 +1 @@
+ALTER TABLE bundles DROP COLUMN root_transaction_id;
diff --git a/migrations/down/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql b/migrations/down/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql
new file mode 100644
index 00000000..20a26d11
--- /dev/null
+++ b/migrations/down/2023.06.28T22.02.57.bundles.add-bundles-indexes.sql
@@ -0,0 +1,8 @@
+DROP INDEX IF EXISTS bundle_data_items_parent_id_filter_id_idx;
+
+DROP INDEX IF EXISTS bundles_unbundle_filter_id_idx;
+DROP INDEX IF EXISTS bundles_index_filter_id_idx;
+DROP INDEX IF EXISTS bundles_matched_data_item_count_idx;
+DROP INDEX IF EXISTS bundles_last_fully_indexed_at_idx;
+DROP INDEX IF EXISTS bundles_last_skipped_at_idx;
+DROP INDEX IF EXISTS bundles_last_queued_at_idx;
diff --git a/package.json b/package.json
index c4099731..90ea8478 100644
--- a/package.json
+++ b/package.json
@@ -24,6 +24,7 @@
"fastq": "^1.13.0",
"fs-extra": "^11.1.0",
"graphql": "^16.5.0",
+ "json-canonicalize": "^1.0.6",
"middleware-async": "^1.3.5",
"msgpackr": "^1.6.2",
"node-cache": "^5.1.2",
diff --git a/scripts/schemaspy b/scripts/schemaspy
new file mode 100755
index 00000000..3f48aa58
--- /dev/null
+++ b/scripts/schemaspy
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+schemaspy_version="6.2.2"
+schemaspy_jar="schemaspy-${schemaspy_version}.jar"
+sqlite_jdbc_version="3.42.0.0"
+sqlite_jdbc_jar="sqlite-jdbc-${sqlite_jdbc_version}.jar"
+
+mkdir -p vendor
+
+if [ ! -f vendor/${schemaspy_jar} ]; then
+ curl -L https://github.com/schemaspy/schemaspy/releases/download/v${schemaspy_version}/${schemaspy_jar} -o vendor/${schemaspy_jar}
+fi
+
+if [ ! -f vendor/${sqlite_jdbc_jar} ]; then
+ curl -L https://github.com/xerial/sqlite-jdbc/releases/download/${sqlite_jdbc_version}/${sqlite_jdbc_jar} -o vendor/${sqlite_jdbc_jar}
+fi
+
+java -jar vendor/schemaspy-*.jar \
+ -debug \
+ -t docs/sqlite/bundles.properties \
+ -sso \
+ -s bundles \
+ -cat catalog \
+ -meta docs/sqlite/bundles.meta.xml \
+ -o docs/sqlite/bundles
diff --git a/src/app.ts b/src/app.ts
index bdffaf99..78f9b3ce 100644
--- a/src/app.ts
+++ b/src/app.ts
@@ -39,6 +39,7 @@ import * as system from './system.js';
system.arweaveClient.refreshPeers();
system.blockImporter.start();
system.txRepairWorker.start();
+system.bundleRepairWorker.start();
// HTTP server
const app = express();
diff --git a/src/config.ts b/src/config.ts
index ed4ad265..b93f3fe0 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -16,6 +16,7 @@
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import dotenv from 'dotenv';
+import { canonicalize } from 'json-canonicalize';
import crypto from 'node:crypto';
import { createFilter } from './filters.js';
@@ -52,11 +53,21 @@ export const ADMIN_API_KEY = env.varOrDefault(
if (env.varOrUndefined('ADMIN_API_KEY') === undefined) {
log.info('Using a random admin key since none was set', { ADMIN_API_KEY });
}
-export const ANS104_UNBUNDLE_FILTER = createFilter(
+export const BACKFILL_BUNDLE_RECORDS =
+ env.varOrDefault('BACKFILL_BUNDLE_RECORDS', 'false') === 'true';
+export const FILTER_CHANGE_REPROCESS =
+ env.varOrDefault('FILTER_CHANGE_REPROCESS', 'false') === 'true';
+export const ANS104_UNBUNDLE_FILTER_STRING = canonicalize(
JSON.parse(env.varOrDefault('ANS104_UNBUNDLE_FILTER', '{"never": true}')),
);
-export const ANS104_DATA_INDEX_FILTER = createFilter(
- JSON.parse(env.varOrDefault('ANS104_DATA_INDEX_FILTER', '{"never": true}')),
+export const ANS104_UNBUNDLE_FILTER = createFilter(
+ JSON.parse(ANS104_UNBUNDLE_FILTER_STRING),
+);
+export const ANS104_INDEX_FILTER_STRING = canonicalize(
+ JSON.parse(env.varOrDefault('ANS104_INDEX_FILTER', '{"never": true}')),
+);
+export const ANS104_INDEX_FILTER = createFilter(
+ JSON.parse(ANS104_INDEX_FILTER_STRING),
);
export const ARNS_ROOT_HOST = env.varOrUndefined('ARNS_ROOT_HOST');
export const SANDBOX_PROTOCOL = env.varOrUndefined('SANDBOX_PROTOCOL');
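The config change round-trips each filter env var through `canonicalize` before building the filter, so two env values that differ only in key order or whitespace yield the same `*_FILTER_STRING` — which is what the filter-change detection in `repair.sql` compares against the `filters` table. A minimal sketch of why this matters, using only the `json-canonicalize` package imported above:

```ts
import { canonicalize } from 'json-canonicalize';

// Two spellings of the same ANS-104 filter: key order and whitespace differ.
const a = canonicalize(JSON.parse('{"tags": [{"name": "App-Name"}], "never": false}'));
const b = canonicalize(JSON.parse('{ "never": false, "tags":[{"name":"App-Name"}] }'));

// Canonicalization sorts keys and strips insignificant whitespace, so the
// strings (and therefore the stored filter IDs) match across restarts.
console.log(a === b); // true
console.log(a); // {"never":false,"tags":[{"name":"App-Name"}]}
```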
diff --git a/src/data/read-through-data-cache.ts b/src/data/read-through-data-cache.ts
index ae9d9bb3..9dfc5f55 100644
--- a/src/data/read-through-data-cache.ts
+++ b/src/data/read-through-data-cache.ts
@@ -2,6 +2,7 @@ import crypto from 'node:crypto';
import { Readable, pipeline } from 'node:stream';
import winston from 'winston';
+import { currentUnixTimestamp } from '../lib/time.js';
import {
ContiguousData,
ContiguousDataAttributes,
@@ -60,15 +61,14 @@ export class ReadThroughDataCache implements ContiguousDataSource {
});
} else {
this.log.info('Found data in cache', { id, hash, ...region });
- // Note: it's impossible for both sizes to be undefined, but TS
- // doesn't know that
- const size = dataSize ?? region?.size;
- if (size === undefined) {
+ // It should be impossible for dataSize to be undefined if hash is
+ // set, but TypeScript doesn't know that.
+ if (dataSize === undefined) {
throw new Error('Missing data size');
}
return {
stream: cacheStream,
- size,
+ size: dataSize,
};
}
} catch (error: any) {
@@ -84,10 +84,17 @@ export class ReadThroughDataCache implements ContiguousDataSource {
const parentData = await this.contiguousDataIndex.getDataParent(id);
if (parentData?.parentHash !== undefined) {
this.log.info('Found parent data ID', { id, ...parentData });
- return this.getCacheData(id, parentData.parentHash, dataSize, {
- offset: (region?.offset ?? 0) + parentData.offset,
- size: parentData.size,
- });
+ // We might have a parent but no data size when retrieving by ID
+ const size = dataSize ?? parentData.size;
+ return this.getCacheData(
+ parentData.parentId,
+ parentData.parentHash,
+ size,
+ {
+ offset: (region?.offset ?? 0) + parentData.offset,
+ size,
+ },
+ );
}
return undefined;
@@ -140,7 +147,7 @@ export class ReadThroughDataCache implements ContiguousDataSource {
hash,
dataSize: data.size,
contentType: data.sourceContentType,
- cachedAt: +(Date.now() / 1000).toFixed(0),
+ cachedAt: currentUnixTimestamp(),
});
try {
@@ -158,6 +165,7 @@ export class ReadThroughDataCache implements ContiguousDataSource {
data.stream.on('data', (chunk) => {
hasher.update(chunk);
});
+ data.stream.pause();
return data;
}
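The added `data.stream.pause()` addresses a Node streams subtlety: attaching the `'data'` listener for hashing switches the stream into flowing mode, so chunks could be emitted and lost before the caller pipes the stream to its real consumer. A small sketch of the mechanism (not the cache's actual consumer), using only Node's stream module:

```ts
import { Readable } from 'node:stream';

const stream = Readable.from(['a', 'b', 'c']);

// Attaching a 'data' handler puts the stream into flowing mode immediately.
stream.on('data', (chunk) => console.log('hashed:', chunk.toString()));

// Pausing right away keeps chunks buffered until a consumer attaches;
// without this, data emitted before piping would never reach the consumer.
stream.pause();

setTimeout(() => {
  stream.on('data', (chunk) => console.log('consumed:', chunk.toString()));
  stream.resume();
}, 10);
```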
diff --git a/src/database/sql/bundles/async-import.sql b/src/database/sql/bundles/async-import.sql
new file mode 100644
index 00000000..f8810380
--- /dev/null
+++ b/src/database/sql/bundles/async-import.sql
@@ -0,0 +1,13 @@
+-- selectTransactionHeight
+SELECT height
+FROM new_transactions
+WHERE id = @transaction_id AND height IS NOT NULL
+UNION
+SELECT height
+FROM stable_transactions
+WHERE id = @transaction_id
+UNION
+SELECT height
+FROM missing_transactions
+WHERE transaction_id = @transaction_id
+LIMIT 1
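One subtlety in `selectTransactionHeight`: the trailing `LIMIT 1` applies to the entire `UNION`, so whichever of the three tables contains the transaction supplies the height, and `UNION` (rather than `UNION ALL`) deduplicates if the ID appears in more than one table. A hedged usage sketch with better-sqlite3 (the driver this diff constructs as `Sqlite`), with a simplified in-memory schema:

```ts
import Database from 'better-sqlite3';

const db = new Database(':memory:');
db.exec(`
  CREATE TABLE new_transactions (id BLOB PRIMARY KEY, height INTEGER);
  CREATE TABLE stable_transactions (id BLOB PRIMARY KEY, height INTEGER);
  CREATE TABLE missing_transactions (transaction_id BLOB PRIMARY KEY, height INTEGER);
`);

const txId = Buffer.from('ab', 'hex');
db.prepare('INSERT INTO stable_transactions (id, height) VALUES (@id, @height)')
  .run({ id: txId, height: 1138 });

// LIMIT 1 applies to the combined UNION result, not just the last SELECT.
const row = db.prepare(`
  SELECT height FROM new_transactions WHERE id = @transaction_id AND height IS NOT NULL
  UNION
  SELECT height FROM stable_transactions WHERE id = @transaction_id
  UNION
  SELECT height FROM missing_transactions WHERE transaction_id = @transaction_id
  LIMIT 1
`).get({ transaction_id: txId }) as { height: number } | undefined;

console.log(row?.height); // 1138
```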
diff --git a/src/database/sql/bundles/cleanup.sql b/src/database/sql/bundles/cleanup.sql
new file mode 100644
index 00000000..81bb9216
--- /dev/null
+++ b/src/database/sql/bundles/cleanup.sql
@@ -0,0 +1,13 @@
+-- deleteStaleNewDataItems
+DELETE FROM new_data_items
+WHERE height < @height_threshold OR (
+ height IS NULL AND
+ indexed_at < @indexed_at_threshold
+ )
+
+-- deleteStaleNewDataItemTags
+DELETE FROM new_data_item_tags
+WHERE height < @height_threshold OR (
+ height IS NULL AND
+ indexed_at < @indexed_at_threshold
+ )
diff --git a/src/database/sql/bundles/filters.sql b/src/database/sql/bundles/filters.sql
new file mode 100644
index 00000000..71fd1e98
--- /dev/null
+++ b/src/database/sql/bundles/filters.sql
@@ -0,0 +1,5 @@
+-- insertOrIgnoreFilter
+INSERT INTO filters (filter) VALUES (@filter) ON CONFLICT DO NOTHING;
+
+-- selectFilterId
+SELECT id FROM filters WHERE filter = @filter;
diff --git a/src/database/sql/bundles/flush.sql b/src/database/sql/bundles/flush.sql
new file mode 100644
index 00000000..f148015b
--- /dev/null
+++ b/src/database/sql/bundles/flush.sql
@@ -0,0 +1,37 @@
+-- insertOrIgnoreStableDataItems
+INSERT INTO stable_data_items (
+ id, parent_id, root_transaction_id,
+ height, block_transaction_index,
+ signature, anchor, owner_address, target,
+ data_offset, data_size, content_type,
+ tag_count, indexed_at
+) SELECT
+ ndi.id, ndi.parent_id, ndi.root_transaction_id,
+ ndi.height, sbt.block_transaction_index,
+ ndi.signature, ndi.anchor, ndi.owner_address, ndi.target,
+ ndi.data_offset, ndi.data_size, ndi.content_type,
+ ndi.tag_count, ndi.indexed_at
+FROM new_data_items ndi
+JOIN core.stable_block_transactions sbt
+ ON ndi.root_transaction_id = sbt.transaction_id
+WHERE ndi.height < @end_height
+ON CONFLICT DO NOTHING
+
+-- insertOrIgnoreStableDataItemTags
+INSERT INTO stable_data_item_tags (
+ tag_name_hash, tag_value_hash,
+ height, block_transaction_index,
+ data_item_tag_index, data_item_id,
+ parent_id, root_transaction_id
+) SELECT
+ ndit.tag_name_hash, ndit.tag_value_hash,
+ ndit.height, sbt.block_transaction_index,
+ ndit.data_item_tag_index, ndit.data_item_id,
+ ndi.parent_id, ndit.root_transaction_id
+FROM new_data_item_tags ndit
+JOIN new_data_items ndi
+ ON ndit.data_item_id = ndi.id
+JOIN core.stable_block_transactions sbt
+ ON ndit.root_transaction_id = sbt.transaction_id
+WHERE ndit.height < @end_height
+ON CONFLICT DO NOTHING
diff --git a/src/database/sql/bundles/formats.sql b/src/database/sql/bundles/formats.sql
new file mode 100644
index 00000000..4596886a
--- /dev/null
+++ b/src/database/sql/bundles/formats.sql
@@ -0,0 +1,2 @@
+-- selectFormatId
+SELECT id FROM bundle_formats WHERE format = @format;
diff --git a/src/database/sql/bundles/gql.sql b/src/database/sql/bundles/gql.sql
new file mode 100644
index 00000000..f70fb58d
--- /dev/null
+++ b/src/database/sql/bundles/gql.sql
@@ -0,0 +1,13 @@
+-- selectNewDataItemTags
+SELECT name, value
+FROM new_data_item_tags
+JOIN tag_names ON tag_name_hash = tag_names.hash
+JOIN tag_values ON tag_value_hash = tag_values.hash
+WHERE data_item_id = @id
+
+-- selectStableDataItemTags
+SELECT name, value
+FROM stable_data_item_tags
+JOIN tag_names ON tag_name_hash = tag_names.hash
+JOIN tag_values ON tag_value_hash = tag_values.hash
+WHERE data_item_id = @id
diff --git a/src/database/sql/bundles/height-reset.sql b/src/database/sql/bundles/height-reset.sql
new file mode 100644
index 00000000..48b9941f
--- /dev/null
+++ b/src/database/sql/bundles/height-reset.sql
@@ -0,0 +1,9 @@
+-- clearHeightsOnNewDataItems
+UPDATE new_data_items
+SET height = NULL
+WHERE height > @height
+
+-- clearHeightsOnNewDataItemTags
+UPDATE new_data_item_tags
+SET height = NULL
+WHERE height > @height
diff --git a/src/database/sql/bundles/import.sql b/src/database/sql/bundles/import.sql
new file mode 100644
index 00000000..29fa4cf5
--- /dev/null
+++ b/src/database/sql/bundles/import.sql
@@ -0,0 +1,89 @@
+-- upsertBundle
+INSERT INTO bundles (
+ id, root_transaction_id, format_id,
+ unbundle_filter_id, index_filter_id,
+ data_item_count, matched_data_item_count,
+ first_queued_at, last_queued_at,
+ first_skipped_at, last_skipped_at,
+ first_unbundled_at, last_unbundled_at,
+ first_fully_indexed_at, last_fully_indexed_at
+) VALUES (
+ @id, @root_transaction_id, @format_id,
+ @unbundle_filter_id, @index_filter_id,
+ @data_item_count, @matched_data_item_count,
+ @queued_at, @queued_at,
+ @skipped_at, @skipped_at,
+ @unbundled_at, @unbundled_at,
+ @fully_indexed_at, @fully_indexed_at
+) ON CONFLICT DO UPDATE SET
+ data_item_count = IFNULL(@data_item_count, data_item_count),
+ matched_data_item_count = IFNULL(@matched_data_item_count, matched_data_item_count),
+ unbundle_filter_id = IFNULL(@unbundle_filter_id, unbundle_filter_id),
+ index_filter_id = IFNULL(@index_filter_id, index_filter_id),
+ first_queued_at = IFNULL(first_queued_at, @queued_at),
+ last_queued_at = IFNULL(@queued_at, last_queued_at),
+ first_skipped_at = IFNULL(first_skipped_at, @skipped_at),
+ last_skipped_at = IFNULL(@skipped_at, last_skipped_at),
+ first_unbundled_at = IFNULL(first_unbundled_at, @unbundled_at),
+ last_unbundled_at = IFNULL(@unbundled_at, last_unbundled_at),
+ first_fully_indexed_at = IFNULL(first_fully_indexed_at, @fully_indexed_at),
+ last_fully_indexed_at = @fully_indexed_at
+
+-- insertOrIgnoreWallet
+INSERT INTO wallets (address, public_modulus)
+VALUES (@address, @public_modulus)
+ON CONFLICT DO NOTHING
+
+-- insertOrIgnoreTagName
+INSERT INTO tag_names (hash, name)
+VALUES (@hash, @name)
+ON CONFLICT DO NOTHING
+
+-- insertOrIgnoreTagValue
+INSERT INTO tag_values (hash, value)
+VALUES (@hash, @value)
+ON CONFLICT DO NOTHING
+
+-- upsertNewDataItemTag
+INSERT INTO new_data_item_tags (
+ tag_name_hash, tag_value_hash,
+ root_transaction_id, data_item_id, data_item_tag_index,
+ height, indexed_at
+) VALUES (
+ @tag_name_hash, @tag_value_hash,
+ @root_transaction_id, @data_item_id, @data_item_tag_index,
+ @height, @indexed_at
+) ON CONFLICT DO UPDATE SET height = IFNULL(@height, height)
+
+-- upsertBundleDataItem
+INSERT INTO bundle_data_items (
+ id,
+ parent_id,
+ parent_index,
+ filter_id,
+ root_transaction_id,
+ first_indexed_at,
+ last_indexed_at
+) VALUES (
+ @id,
+ @parent_id,
+ @parent_index,
+ @filter_id,
+ @root_transaction_id,
+ @indexed_at,
+ @indexed_at
+) ON CONFLICT DO
+UPDATE SET
+ filter_id = IFNULL(@filter_id, filter_id),
+ last_indexed_at = @indexed_at
+
+-- upsertNewDataItem
+INSERT INTO new_data_items (
+ id, parent_id, root_transaction_id, height, signature, anchor,
+ owner_address, target, data_offset, data_size, content_type,
+ tag_count, indexed_at
+) VALUES (
+ @id, @parent_id, @root_transaction_id, @height, @signature, @anchor,
+ @owner_address, @target, @data_offset, @data_size, @content_type,
+ @tag_count, @indexed_at
+) ON CONFLICT DO UPDATE SET height = IFNULL(@height, height)
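The `upsertBundle` conflict clause above encodes a first/last timestamp pattern: `first_* = IFNULL(first_*, @t)` sets the column once and never overwrites it, while `last_* = IFNULL(@t, last_*)` overwrites only when the caller actually passes a timestamp, so NULL parameters leave the column alone. A minimal sketch of the pattern against a stripped-down hypothetical table (same `ON CONFLICT DO UPDATE` form as the statements above, which requires SQLite 3.35+):

```ts
import Database from 'better-sqlite3';

const db = new Database(':memory:');
db.exec(`CREATE TABLE events (
  id INTEGER PRIMARY KEY,
  first_seen_at INTEGER,
  last_seen_at INTEGER
)`);

const upsert = db.prepare(`
  INSERT INTO events (id, first_seen_at, last_seen_at)
  VALUES (@id, @seen_at, @seen_at)
  ON CONFLICT DO UPDATE SET
    first_seen_at = IFNULL(first_seen_at, @seen_at), -- set once, never overwritten
    last_seen_at = IFNULL(@seen_at, last_seen_at)    -- updated only when a value is passed
`);

upsert.run({ id: 1, seen_at: 100 });
upsert.run({ id: 1, seen_at: 200 });
upsert.run({ id: 1, seen_at: null }); // leaves both columns untouched

console.log(db.prepare('SELECT * FROM events').get());
// { id: 1, first_seen_at: 100, last_seen_at: 200 }
```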
diff --git a/src/database/sql/bundles/repair.sql b/src/database/sql/bundles/repair.sql
new file mode 100644
index 00000000..dfb385a0
--- /dev/null
+++ b/src/database/sql/bundles/repair.sql
@@ -0,0 +1,131 @@
+-- selectFailedBundleIds
+SELECT DISTINCT id
+FROM (
+ SELECT b.root_transaction_id AS id
+ FROM bundles b
+ WHERE (
+ (b.last_queued_at IS NULL AND b.last_skipped_at IS NULL)
+ OR (
+ b.last_queued_at IS NOT NULL
+ AND (
+ b.last_skipped_at IS NULL
+ OR b.last_skipped_at <= b.last_queued_at
+ )
+ AND b.last_queued_at < @reprocess_cutoff
+ )
+ )
+ AND b.last_fully_indexed_at IS NULL
+ AND (
+ b.matched_data_item_count IS NULL
+ OR b.matched_data_item_count > 0
+ )
+ ORDER BY b.last_queued_at ASC
+ LIMIT @limit
+)
+ORDER BY RANDOM()
+
+-- updateFullyIndexedAt
+UPDATE bundles
+SET
+ first_fully_indexed_at = IFNULL(first_fully_indexed_at, @fully_indexed_at),
+ last_fully_indexed_at = @fully_indexed_at
+WHERE matched_data_item_count IS NOT NULL
+ AND matched_data_item_count > 0
+ AND EXISTS (
+ SELECT 1
+ FROM bundle_data_items bdi
+ WHERE bdi.parent_id = bundles.id
+ AND bdi.filter_id = bundles.unbundle_filter_id
+ GROUP BY bdi.parent_id
+ HAVING COUNT(*) = bundles.matched_data_item_count
+ ) AND last_fully_indexed_at IS NULL
+
+-- updateForFilterChange
+UPDATE bundles
+SET
+ last_queued_at = NULL,
+ last_skipped_at = NULL
+WHERE id IN (
+ SELECT b.id
+ FROM bundles b
+ WHERE (
+ last_skipped_at IS NOT NULL
+ AND unbundle_filter_id != (
+ SELECT id
+ FROM filters
+ WHERE filter = @unbundle_filter
+ )
+ ) OR (
+ last_queued_at IS NOT NULL
+ AND index_filter_id != (
+ SELECT id
+ FROM filters
+ WHERE filter = @index_filter
+ )
+ )
+ LIMIT 10000
+)
+
+-- insertMissingBundles
+INSERT INTO bundles (
+ id,
+ root_transaction_id,
+ format_id
+)
+SELECT
+ sttf.transaction_id,
+ sttf.transaction_id,
+ (SELECT id FROM bundle_formats WHERE format = 'ans-104')
+FROM stable_transaction_tags sttf
+JOIN stable_transaction_tags sttv ON sttv.transaction_id = sttf.transaction_id
+ AND sttv.transaction_tag_index != sttf.transaction_tag_index
+LEFT JOIN bundles b ON b.id = sttf.transaction_id
+WHERE sttf.tag_name_hash = x'BF796ECA81CCE3FF36CEA53FA1EBB0F274A0FF29'
+ AND sttf.tag_value_hash = x'7E57CFE843145135AEE1F4D0D63CEB7842093712'
+ AND sttv.tag_name_hash = x'858B76CB055E360A2E4C3C38F4A3049F80175216'
+ AND sttv.tag_value_hash = x'F7CA6A21D278EB5CE64611AADBDB77EF1511D3DD'
+ AND b.id IS NULL
+UNION ALL
+SELECT
+ nttf.transaction_id,
+ nttf.transaction_id,
+ (SELECT id FROM bundle_formats WHERE format = 'ans-104')
+FROM new_transaction_tags nttf
+JOIN new_transaction_tags nttv ON nttv.transaction_id = nttf.transaction_id
+LEFT JOIN bundles b ON b.id = nttf.transaction_id
+WHERE nttf.tag_name_hash = x'BF796ECA81CCE3FF36CEA53FA1EBB0F274A0FF29'
+ AND nttf.tag_value_hash = x'7E57CFE843145135AEE1F4D0D63CEB7842093712'
+ AND nttv.tag_name_hash = x'858B76CB055E360A2E4C3C38F4A3049F80175216'
+ AND nttv.tag_value_hash = x'F7CA6A21D278EB5CE64611AADBDB77EF1511D3DD'
+ AND b.id IS NULL
+UNION ALL
+SELECT
+ sdi.id,
+ sdi.root_transaction_id,
+ (SELECT id FROM bundle_formats WHERE format = 'ans-104')
+FROM stable_data_item_tags sdif
+JOIN stable_data_item_tags sdiv ON sdiv.data_item_id = sdif.data_item_id
+ AND sdiv.data_item_tag_index != sdif.data_item_tag_index
+JOIN stable_data_items sdi ON sdi.id = sdif.data_item_id
+LEFT JOIN bundles b ON b.id = sdif.data_item_id
+WHERE sdif.tag_name_hash = x'BF796ECA81CCE3FF36CEA53FA1EBB0F274A0FF29'
+ AND sdif.tag_value_hash = x'7E57CFE843145135AEE1F4D0D63CEB7842093712'
+ AND sdiv.tag_name_hash = x'858B76CB055E360A2E4C3C38F4A3049F80175216'
+ AND sdiv.tag_value_hash = x'F7CA6A21D278EB5CE64611AADBDB77EF1511D3DD'
+ AND b.id IS NULL
+UNION ALL
+SELECT
+ ndi.id,
+ ndi.root_transaction_id,
+ (SELECT id FROM bundle_formats WHERE format = 'ans-104')
+FROM new_data_item_tags ndif
+JOIN new_data_item_tags ndiv ON ndiv.data_item_id = ndif.data_item_id
+JOIN new_data_items ndi ON ndi.id = ndif.data_item_id
+LEFT JOIN bundles b ON b.id = ndif.data_item_id
+WHERE ndif.tag_name_hash = x'BF796ECA81CCE3FF36CEA53FA1EBB0F274A0FF29'
+ AND ndif.tag_value_hash = x'7E57CFE843145135AEE1F4D0D63CEB7842093712'
+ AND ndiv.tag_name_hash = x'858B76CB055E360A2E4C3C38F4A3049F80175216'
+ AND ndiv.tag_value_hash = x'F7CA6A21D278EB5CE64611AADBDB77EF1511D3DD'
+ AND b.id IS NULL
+LIMIT 10000
+ON CONFLICT DO NOTHING
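The hex literals in `insertMissingBundles` are pre-computed tag hashes, matching how `hashTagPart` in `standalone-sqlite.ts` stores tags (SHA-1 of the raw tag bytes). Assuming they are the digests of the ANS-104 marker tags (`Bundle-Format: binary`, `Bundle-Version: 2.0.0`) — an assumption inferred from the surrounding bundle-detection logic, not stated in the diff — the constants could be regenerated like this:

```ts
import crypto from 'node:crypto';

// ASSUMPTION: these are the tag names/values whose SHA-1 digests appear as
// x'...' literals in insertMissingBundles; verify against the real tag data.
const tags: Array<[string, string]> = [
  ['Bundle-Format', 'binary'],
  ['Bundle-Version', '2.0.0'],
];

for (const [name, value] of tags) {
  const nameHash = crypto.createHash('sha1').update(Buffer.from(name, 'utf8')).digest('hex');
  const valueHash = crypto.createHash('sha1').update(Buffer.from(value, 'utf8')).digest('hex');
  console.log(`${name}: x'${nameHash.toUpperCase()}' -> x'${valueHash.toUpperCase()}'`);
}
```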
diff --git a/src/database/sql/core/accessors.sql b/src/database/sql/core/accessors.sql
index dc3267c2..c8e8dcab 100644
--- a/src/database/sql/core/accessors.sql
+++ b/src/database/sql/core/accessors.sql
@@ -23,5 +23,9 @@ LIMIT 1
-- selectMissingTransactionIds
SELECT transaction_id
-FROM missing_transactions
-LIMIT @limit
+FROM (
+ SELECT transaction_id
+ FROM missing_transactions
+ LIMIT @limit
+)
+ORDER BY RANDOM()
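This reworked `selectMissingTransactionIds` (like `selectFailedBundleIds` earlier in the diff) uses a two-layer shape: the inner `SELECT ... LIMIT @limit` picks a deterministic batch, and the outer `ORDER BY RANDOM()` shuffles only that batch, so concurrent workers drawing from the same backlog are less likely to all start on the same ID. A single-level `ORDER BY RANDOM() LIMIT @limit` would instead randomize which rows are selected, at the cost of sorting the whole table. A small sketch of the behavior:

```ts
import Database from 'better-sqlite3';

const db = new Database(':memory:');
db.exec('CREATE TABLE missing_transactions (transaction_id INTEGER PRIMARY KEY)');
const insert = db.prepare('INSERT INTO missing_transactions VALUES (?)');
for (let i = 1; i <= 100; i++) insert.run(i);

// Inner LIMIT picks the first @limit rows; the outer ORDER BY RANDOM() only
// shuffles that batch, so workers see the same ten IDs in different orders.
const batch = db.prepare(`
  SELECT transaction_id
  FROM (SELECT transaction_id FROM missing_transactions LIMIT @limit)
  ORDER BY RANDOM()
`).all({ limit: 10 });

console.log(batch.map((r: any) => r.transaction_id)); // a permutation of 1..10
```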
diff --git a/src/database/sql/core/import.sql b/src/database/sql/core/import.sql
index 105ceda3..198b02f9 100644
--- a/src/database/sql/core/import.sql
+++ b/src/database/sql/core/import.sql
@@ -1,3 +1,13 @@
+-- updateNewDataItemHeights
+UPDATE bundles.new_data_items
+SET height = @height
+WHERE root_transaction_id = @transaction_id
+
+-- updateNewDataItemTagHeights
+UPDATE bundles.new_data_item_tags
+SET height = @height
+WHERE root_transaction_id = @transaction_id
+
-- insertOrIgnoreWallet
INSERT INTO wallets (address, public_modulus)
VALUES (@address, @public_modulus)
diff --git a/src/database/standalone-sqlite.test.ts b/src/database/standalone-sqlite.test.ts
index 7637d907..2dd3dea6 100644
--- a/src/database/standalone-sqlite.test.ts
+++ b/src/database/standalone-sqlite.test.ts
@@ -16,6 +16,7 @@
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { ValidationError } from 'apollo-server-express';
+import arbundles from 'arbundles/stream/index.js';
import { expect } from 'chai';
import crypto from 'node:crypto';
import fs from 'node:fs';
@@ -24,6 +25,7 @@ import * as promClient from 'prom-client';
import {
StandaloneSqliteDatabase,
StandaloneSqliteDatabaseWorker,
+ dataItemToDbRows,
decodeBlockGqlCursor,
decodeTransactionGqlCursor,
encodeBlockGqlCursor,
@@ -32,16 +34,27 @@ import {
} from '../../src/database/standalone-sqlite.js';
import { fromB64Url, toB64Url } from '../../src/lib/encoding.js';
import {
+ bundlesDbPath,
coreDb,
coreDbPath,
dataDbPath,
moderationDbPath,
} from '../../test/sqlite-helpers.js';
-import { ArweaveChainSourceStub } from '../../test/stubs.js';
+import { ArweaveChainSourceStub, stubAns104Bundle } from '../../test/stubs.js';
+import { normalizeAns104DataItem } from '../lib/ans-104.js';
import log from '../log.js';
+//import { NormalizedDataItem } from '../types.js';
+
+/* eslint-disable */
+// @ts-ignore
+const { default: processStream } = arbundles;
+
const HEIGHT = 1138;
const BLOCK_TX_INDEX = 42;
+const DATA_ITEM_ID = 'zoljIRyzG5hp-R4EZV2q8kFI49OAoy23_B9YJ_yEEws';
+const CURSOR =
+ 'WzExMzgsNDIsInpvbGpJUnl6RzVocC1SNEVaVjJxOGtGSTQ5T0FveTIzX0I5WUpfeUVFd3MiXQ';
describe('SQLite helper functions', () => {
describe('toSqliteParams', () => {
@@ -61,23 +74,26 @@ describe('SQLite GraphQL cursor functions', () => {
encodeTransactionGqlCursor({
height: HEIGHT,
blockTransactionIndex: BLOCK_TX_INDEX,
+ dataItemId: DATA_ITEM_ID,
}),
- ).to.equal('WzExMzgsNDJd');
+ ).to.equal(CURSOR);
});
});
describe('decodeTransactionGqlCursor', () => {
it('should decode a height and blockTransactionIndex given an encoded cursor', () => {
- expect(decodeTransactionGqlCursor('WzExMzgsNDJd')).to.deep.equal({
+ expect(decodeTransactionGqlCursor(CURSOR)).to.deep.equal({
height: HEIGHT,
blockTransactionIndex: BLOCK_TX_INDEX,
+ dataItemId: DATA_ITEM_ID,
});
});
- it('should return an undefined height and blockTransactionIndex given an undefined cursor', () => {
+ it('should return an undefined height, blockTransactionIndex, and dataItemId given an undefined cursor', () => {
expect(decodeTransactionGqlCursor(undefined)).to.deep.equal({
height: undefined,
blockTransactionIndex: undefined,
+ dataItemId: undefined,
});
});
@@ -119,6 +135,30 @@ describe('SQLite GraphQL cursor functions', () => {
});
});
+describe('SQLite data conversion functions', () => {
+ describe('dataItemToDbRows', () => {
+ it('should return DB rows to insert', async () => {
+ const bundleStream = await stubAns104Bundle();
+ const iterable = await processStream(bundleStream);
+ for await (const [_index, dataItem] of iterable.entries()) {
+ const normalizedDataItem = normalizeAns104DataItem({
+ rootTxId: '0000000000000000000000000000000000000000000',
+ parentId: '0000000000000000000000000000000000000000000',
+ parentIndex: -1,
+ index: 0,
+ ans104DataItem: dataItem,
+ });
+ const rows = dataItemToDbRows(normalizedDataItem);
+ expect(rows.tagNames.length).to.be.above(0);
+ expect(rows.tagValues.length).to.be.above(0);
+ expect(rows.newDataItemTags.length).to.be.above(0);
+ expect(rows.wallets.length).to.be.above(0);
+ expect(rows.newDataItem).to.be.an('object');
+ }
+ });
+ });
+});
+
describe('StandaloneSqliteDatabase', () => {
let metricsRegistry: promClient.Registry;
let chainSource: ArweaveChainSourceStub;
@@ -134,11 +174,13 @@ describe('StandaloneSqliteDatabase', () => {
coreDbPath,
dataDbPath,
moderationDbPath,
+ bundlesDbPath,
});
dbWorker = new StandaloneSqliteDatabaseWorker({
coreDbPath,
dataDbPath,
moderationDbPath,
+ bundlesDbPath,
});
chainSource = new ArweaveChainSourceStub();
});
@@ -583,7 +625,7 @@ describe('StandaloneSqliteDatabase', () => {
await chainSource.getBlockAndTxsByHeight(height);
await db.saveBlockAndTxs(block, txs, missingTxIds);
- dbWorker.saveStableDataFn(height + 1);
+ dbWorker.saveCoreStableDataFn(height + 1);
const stats = await db.getDebugInfo();
expect(stats.counts.stableBlocks).to.equal(1);
@@ -666,7 +708,7 @@ describe('StandaloneSqliteDatabase', () => {
expect(stats.counts.newTxs).to.equal(txs.length);
await db.saveBlockAndTxs(block, txs, missingTxIds);
- dbWorker.saveStableDataFn(height + 1);
+ dbWorker.saveCoreStableDataFn(height + 1);
const sql = `
SELECT sb.*, wo.public_modulus AS owner
@@ -773,7 +815,7 @@ describe('StandaloneSqliteDatabase', () => {
expect(stats.counts.newTxs).to.equal(txs.length);
await db.saveBlockAndTxs(block, txs, missingTxIds);
- dbWorker.saveStableDataFn(height + 1);
+ dbWorker.saveCoreStableDataFn(height + 1);
const sql = `
SELECT sb.*, wo.public_modulus AS owner
diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts
index a2464366..827c8e66 100644
--- a/src/database/standalone-sqlite.ts
+++ b/src/database/standalone-sqlite.ts
@@ -43,21 +43,31 @@ import {
utf8ToB64Url,
} from '../lib/encoding.js';
import { MANIFEST_CONTENT_TYPE } from '../lib/encoding.js';
+import { currentUnixTimestamp } from '../lib/time.js';
+import log from '../log.js';
import {
BlockListValidator,
+ BundleIndex,
+ BundleRecord,
ChainIndex,
ContiguousDataAttributes,
ContiguousDataIndex,
GqlQueryable,
+ GqlTransaction,
NestedDataIndexWriter,
+ NormalizedDataItem,
PartialJsonBlock,
PartialJsonTransaction,
} from '../types.js';
const CPU_COUNT = os.cpus().length;
+const MAX_WORKER_ERRORS = 100;
+
const STABLE_FLUSH_INTERVAL = 5;
const NEW_TX_CLEANUP_WAIT_SECS = 60 * 60 * 2;
+const NEW_DATA_ITEM_CLEANUP_WAIT_SECS = 60 * 60 * 2;
+const BUNDLE_REPROCESS_WAIT_SECS = 60 * 60 * 4;
const LOW_SELECTIVITY_TAG_NAMES = new Set(['App-Name', 'Content-Type']);
function tagJoinSortPriority(tag: { name: string; values: string[] }) {
@@ -67,24 +77,32 @@ function tagJoinSortPriority(tag: { name: string; values: string[] }) {
export function encodeTransactionGqlCursor({
height,
blockTransactionIndex,
+ dataItemId,
}: {
- height: number;
- blockTransactionIndex: number;
+ height?: number;
+ blockTransactionIndex?: number;
+ dataItemId?: string;
}) {
- return utf8ToB64Url(JSON.stringify([height, blockTransactionIndex]));
+ return utf8ToB64Url(
+ JSON.stringify([height, blockTransactionIndex, dataItemId]),
+ );
}
export function decodeTransactionGqlCursor(cursor: string | undefined) {
try {
if (!cursor) {
- return { height: undefined, blockTransactionIndex: undefined };
+ return {
+ height: undefined,
+ blockTransactionIndex: undefined,
+ dataItemId: undefined,
+ };
}
- const [height, blockTransactionIndex] = JSON.parse(
+ const [height, blockTransactionIndex, dataItemId] = JSON.parse(
b64UrlToUtf8(cursor),
- ) as [number, number];
+ ) as [number, number, string | undefined];
- return { height, blockTransactionIndex };
+ return { height, blockTransactionIndex, dataItemId };
} catch (error) {
throw new ValidationError('Invalid transaction cursor');
}
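With the data item ID folded in, the cursor stays a base64url-encoded JSON triple, and pre-upgrade two-element cursors still decode (dataItemId simply comes back undefined). A round-trip sketch of the same encoding semantics, using plain Buffer base64url since `utf8ToB64Url`/`b64UrlToUtf8` are internal helpers:

```ts
type TxCursor = {
  height?: number;
  blockTransactionIndex?: number;
  dataItemId?: string;
};

const encode = ({ height, blockTransactionIndex, dataItemId }: TxCursor) =>
  Buffer.from(JSON.stringify([height, blockTransactionIndex, dataItemId]))
    .toString('base64url');

const decode = (cursor: string): TxCursor => {
  const [height, blockTransactionIndex, dataItemId] = JSON.parse(
    Buffer.from(cursor, 'base64url').toString('utf8'),
  );
  return { height, blockTransactionIndex, dataItemId };
};

const cursor = encode({
  height: 1138,
  blockTransactionIndex: 42,
  dataItemId: 'zoljIRyzG5hp-R4EZV2q8kFI49OAoy23_B9YJ_yEEws',
});
console.log(decode(cursor)); // { height: 1138, blockTransactionIndex: 42, dataItemId: 'zolj...' }

// A legacy two-element cursor still decodes; dataItemId is just undefined.
console.log(decode(Buffer.from(JSON.stringify([1138, 42])).toString('base64url')));
```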
@@ -117,6 +135,18 @@ export function toSqliteParams(sqlBricksParams: { values: any[] }) {
}, {} as { [key: string]: any });
}
+function hashTagPart(value: Buffer) {
+ return crypto.createHash('sha1').update(value).digest();
+}
+
+function isContentTypeTag(tagName: Buffer) {
+ return tagName.toString('utf8').toLowerCase() === 'content-type';
+}
+
+function ownerToAddress(owner: Buffer) {
+ return crypto.createHash('sha256').update(owner).digest();
+}
+
export function txToDbRows(tx: PartialJsonTransaction, height?: number) {
const tagNames = [] as { name: Buffer; hash: Buffer }[];
const tagValues = [] as { value: Buffer; hash: Buffer }[];
@@ -135,14 +165,14 @@ export function txToDbRows(tx: PartialJsonTransaction, height?: number) {
let transactionTagIndex = 0;
for (const tag of tx.tags) {
const tagName = fromB64Url(tag.name);
- const tagNameHash = crypto.createHash('sha1').update(tagName).digest();
+ const tagNameHash = hashTagPart(tagName);
tagNames.push({ name: tagName, hash: tagNameHash });
const tagValue = fromB64Url(tag.value);
- const tagValueHash = crypto.createHash('sha1').update(tagValue).digest();
+ const tagValueHash = hashTagPart(tagValue);
tagValues.push({ value: tagValue, hash: tagValueHash });
- if (tagName.toString('utf8').toLowerCase() === 'content-type') {
+ if (isContentTypeTag(tagName)) {
contentType = tagValue.toString('utf8');
}
@@ -151,17 +181,14 @@ export function txToDbRows(tx: PartialJsonTransaction, height?: number) {
tag_value_hash: tagValueHash,
transaction_id: txId,
transaction_tag_index: transactionTagIndex,
- created_at: +(Date.now() / 1000).toFixed(0),
+ created_at: currentUnixTimestamp(),
});
transactionTagIndex++;
}
const ownerBuffer = fromB64Url(tx.owner);
- const ownerAddressBuffer = crypto
- .createHash('sha256')
- .update(ownerBuffer)
- .digest();
+ const ownerAddressBuffer = ownerToAddress(ownerBuffer);
wallets.push({ address: ownerAddressBuffer, public_modulus: ownerBuffer });
@@ -183,8 +210,89 @@ export function txToDbRows(tx: PartialJsonTransaction, height?: number) {
data_root: fromB64Url(tx.data_root),
content_type: contentType,
tag_count: tx.tags.length,
- created_at: +(Date.now() / 1000).toFixed(0),
+ created_at: currentUnixTimestamp(),
+ height: height,
+ },
+ };
+}
+
+export function dataItemToDbRows(item: NormalizedDataItem, height?: number) {
+ const tagNames = [] as { name: Buffer; hash: Buffer }[];
+ const tagValues = [] as { value: Buffer; hash: Buffer }[];
+ const newDataItemTags = [] as {
+ tag_name_hash: Buffer;
+ tag_value_hash: Buffer;
+ root_transaction_id: Buffer;
+ data_item_id: Buffer;
+ data_item_tag_index: number;
+ indexed_at: number;
+ }[];
+ const wallets = [] as { address: Buffer; public_modulus: Buffer }[];
+
+ let contentType: string | undefined;
+ const id = fromB64Url(item.id);
+
+ let dataItemTagIndex = 0;
+ for (const tag of item.tags) {
+ const tagName = fromB64Url(tag.name);
+ const tagNameHash = hashTagPart(tagName);
+ tagNames.push({ name: tagName, hash: tagNameHash });
+
+ const tagValue = fromB64Url(tag.value);
+ const tagValueHash = hashTagPart(tagValue);
+ tagValues.push({ value: tagValue, hash: tagValueHash });
+
+ if (isContentTypeTag(tagName)) {
+ contentType = tagValue.toString('utf8');
+ }
+
+ newDataItemTags.push({
+ tag_name_hash: tagNameHash,
+ tag_value_hash: tagValueHash,
+ root_transaction_id: fromB64Url(item.root_tx_id),
+ data_item_id: id,
+ data_item_tag_index: dataItemTagIndex,
+ indexed_at: currentUnixTimestamp(),
+ });
+
+ dataItemTagIndex++;
+ }
+
+ const ownerBuffer = fromB64Url(item.owner);
+ const ownerAddressBuffer = fromB64Url(item.owner_address);
+
+ wallets.push({ address: ownerAddressBuffer, public_modulus: ownerBuffer });
+
+ const parentId = fromB64Url(item.parent_id);
+ const rootTxId = fromB64Url(item.root_tx_id);
+
+ return {
+ tagNames,
+ tagValues,
+ newDataItemTags,
+ wallets,
+ bundleDataItem: {
+ id,
+ parent_id: parentId,
+ parent_index: item.parent_index,
+ root_transaction_id: rootTxId,
+ indexed_at: currentUnixTimestamp(),
+ filter: item.filter,
+ },
+ newDataItem: {
+ id,
+ parent_id: parentId,
+ root_transaction_id: rootTxId,
height: height,
+ signature: fromB64Url(item.signature),
+ anchor: fromB64Url(item.anchor),
+ owner_address: ownerAddressBuffer,
+ target: fromB64Url(item.target),
+ data_offset: item.data_offset,
+ data_size: item.data_size,
+ content_type: contentType,
+ tag_count: item.tags.length,
+ indexed_at: currentUnixTimestamp(),
},
};
}
@@ -216,41 +324,55 @@ export class StandaloneSqliteDatabaseWorker {
core: Sqlite.Database;
data: Sqlite.Database;
moderation: Sqlite.Database;
+ bundles: Sqlite.Database;
};
private stmts: {
core: { [stmtName: string]: Sqlite.Statement };
data: { [stmtName: string]: Sqlite.Statement };
moderation: { [stmtName: string]: Sqlite.Statement };
+ bundles: { [stmtName: string]: Sqlite.Statement };
};
+ private bundleFormatIds: { [filter: string]: number } = {};
+ private filterIds: { [filter: string]: number } = {};
// Transactions
- resetToHeightFn: Sqlite.Transaction;
+ resetBundlesToHeightFn: Sqlite.Transaction;
+ resetCoreToHeightFn: Sqlite.Transaction;
insertTxFn: Sqlite.Transaction;
+ insertDataItemFn: Sqlite.Transaction;
insertBlockAndTxsFn: Sqlite.Transaction;
- saveStableDataFn: Sqlite.Transaction;
- deleteStaleNewDataFn: Sqlite.Transaction;
+ saveCoreStableDataFn: Sqlite.Transaction;
+ saveBundlesStableDataFn: Sqlite.Transaction;
+ deleteCoreStaleNewDataFn: Sqlite.Transaction;
+ deleteBundlesStaleNewDataFn: Sqlite.Transaction;
constructor({
coreDbPath,
dataDbPath,
moderationDbPath,
+ bundlesDbPath,
}: {
coreDbPath: string;
dataDbPath: string;
moderationDbPath: string;
+ bundlesDbPath: string;
}) {
const timeout = 30000;
this.dbs = {
core: new Sqlite(coreDbPath, { timeout }),
data: new Sqlite(dataDbPath, { timeout }),
moderation: new Sqlite(moderationDbPath, { timeout }),
+ bundles: new Sqlite(bundlesDbPath, { timeout }),
};
for (const db of Object.values(this.dbs)) {
db.pragma('journal_mode = WAL');
db.pragma('page_size = 4096'); // may depend on OS and FS
}
- this.stmts = { core: {}, data: {}, moderation: {} };
+ this.dbs.core.exec(`ATTACH DATABASE '${bundlesDbPath}' AS bundles`);
+ this.dbs.bundles.exec(`ATTACH DATABASE '${coreDbPath}' AS core`);
+
+ this.stmts = { core: {}, data: {}, moderation: {}, bundles: {} };
for (const [stmtsKey, stmts] of Object.entries(this.stmts)) {
const sqlUrl = new URL(`./sql/${stmtsKey}`, import.meta.url);
@@ -262,7 +384,8 @@ export class StandaloneSqliteDatabaseWorker {
if (
stmtsKey === 'core' ||
stmtsKey === 'data' ||
- stmtsKey === 'moderation'
+ stmtsKey === 'moderation' ||
+ stmtsKey === 'bundles'
) {
stmts[k] = this.dbs[stmtsKey].prepare(sql);
} else {
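The two `ATTACH DATABASE` statements added in this constructor are what let SQL in one statement file reference tables in the other database under a schema prefix: `core.stable_block_transactions` in `bundles/flush.sql` and `bundles.new_data_items` in `core/import.sql` above. A minimal sketch of the mechanism with better-sqlite3, using throwaway file paths (any two SQLite files behave the same way):

```ts
import Database from 'better-sqlite3';

const core = new Database('/tmp/core-example.db');
const bundles = new Database('/tmp/bundles-example.db');

core.exec('CREATE TABLE IF NOT EXISTS stable_block_transactions (transaction_id INTEGER)');
bundles.exec('CREATE TABLE IF NOT EXISTS new_data_items (id INTEGER, root_transaction_id INTEGER)');

// Attach each database to the other connection under a schema name, mirroring
// the constructor above; SQL on that connection can then join across files
// using the "core." / "bundles." prefixes.
bundles.exec("ATTACH DATABASE '/tmp/core-example.db' AS core");
core.exec("ATTACH DATABASE '/tmp/bundles-example.db' AS bundles");

const rows = bundles.prepare(`
  SELECT ndi.id
  FROM new_data_items ndi
  JOIN core.stable_block_transactions sbt
    ON ndi.root_transaction_id = sbt.transaction_id
`).all();
console.log(rows); // [] until both tables are populated
```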
@@ -273,7 +396,14 @@ export class StandaloneSqliteDatabaseWorker {
}
// Transactions
- this.resetToHeightFn = this.dbs.core.transaction((height: number) => {
+ this.resetBundlesToHeightFn = this.dbs.bundles.transaction(
+ (height: number) => {
+ this.stmts.bundles.clearHeightsOnNewDataItems.run({ height });
+ this.stmts.bundles.clearHeightsOnNewDataItemTags.run({ height });
+ },
+ );
+
+ this.resetCoreToHeightFn = this.dbs.core.transaction((height: number) => {
this.stmts.core.clearHeightsOnNewTransactions.run({ height });
this.stmts.core.clearHeightsOnNewTransactionTags.run({ height });
this.stmts.core.truncateNewBlocksAt.run({ height });
@@ -283,8 +413,19 @@ export class StandaloneSqliteDatabaseWorker {
this.insertTxFn = this.dbs.core.transaction(
(tx: PartialJsonTransaction, height?: number) => {
- // Insert the transaction
- const rows = txToDbRows(tx);
+ const rows = txToDbRows(tx, height);
+
+ if (height !== undefined) {
+ this.stmts.core.updateNewDataItemHeights.run({
+ height,
+ transaction_id: rows.newTx.id,
+ });
+
+ this.stmts.core.updateNewDataItemTagHeights.run({
+ height,
+ transaction_id: rows.newTx.id,
+ });
+ }
for (const row of rows.tagNames) {
this.stmts.core.insertOrIgnoreTagName.run(row);
@@ -316,6 +457,41 @@ export class StandaloneSqliteDatabaseWorker {
},
);
+ this.insertDataItemFn = this.dbs.bundles.transaction(
+ (item: NormalizedDataItem, height?: number) => {
+ const rows = dataItemToDbRows(item, height);
+
+ for (const row of rows.tagNames) {
+ this.stmts.bundles.insertOrIgnoreTagName.run(row);
+ }
+
+ for (const row of rows.tagValues) {
+ this.stmts.bundles.insertOrIgnoreTagValue.run(row);
+ }
+
+ for (const row of rows.newDataItemTags) {
+ this.stmts.bundles.upsertNewDataItemTag.run({
+ ...row,
+ height,
+ });
+ }
+
+ for (const row of rows.wallets) {
+ this.stmts.bundles.insertOrIgnoreWallet.run(row);
+ }
+
+ this.stmts.bundles.upsertBundleDataItem.run({
+ ...rows.bundleDataItem,
+ filter_id: this.getFilterId(rows.bundleDataItem.filter),
+ });
+
+ this.stmts.bundles.upsertNewDataItem.run({
+ ...rows.newDataItem,
+ height,
+ });
+ },
+ );
+
this.insertBlockAndTxsFn = this.dbs.core.transaction(
(
block: PartialJsonBlock,
@@ -378,6 +554,16 @@ export class StandaloneSqliteDatabaseWorker {
for (const tx of txs) {
const rows = txToDbRows(tx, block.height);
+ this.stmts.core.updateNewDataItemHeights.run({
+ height: block.height,
+ transaction_id: rows.newTx.id,
+ });
+
+ this.stmts.core.updateNewDataItemTagHeights.run({
+ height: block.height,
+ transaction_id: rows.newTx.id,
+ });
+
for (const row of rows.tagNames) {
this.stmts.core.insertOrIgnoreTagName.run(row);
}
@@ -403,6 +589,16 @@ export class StandaloneSqliteDatabaseWorker {
for (const txIdStr of missingTxIds) {
const txId = fromB64Url(txIdStr);
+ this.stmts.core.updateNewDataItemHeights.run({
+ height: block.height,
+ transaction_id: txId,
+ });
+
+ this.stmts.core.updateNewDataItemTagHeights.run({
+ height: block.height,
+ transaction_id: txId,
+ });
+
this.stmts.core.insertOrIgnoreMissingTransaction.run({
block_indep_hash: indepHash,
transaction_id: txId,
@@ -412,25 +608,39 @@ export class StandaloneSqliteDatabaseWorker {
},
);
- this.saveStableDataFn = this.dbs.core.transaction((endHeight: number) => {
- this.stmts.core.insertOrIgnoreStableBlocks.run({
- end_height: endHeight,
- });
+ this.saveCoreStableDataFn = this.dbs.core.transaction(
+ (endHeight: number) => {
+ this.stmts.core.insertOrIgnoreStableBlocks.run({
+ end_height: endHeight,
+ });
- this.stmts.core.insertOrIgnoreStableBlockTransactions.run({
- end_height: endHeight,
- });
+ this.stmts.core.insertOrIgnoreStableBlockTransactions.run({
+ end_height: endHeight,
+ });
- this.stmts.core.insertOrIgnoreStableTransactions.run({
- end_height: endHeight,
- });
+ this.stmts.core.insertOrIgnoreStableTransactions.run({
+ end_height: endHeight,
+ });
- this.stmts.core.insertOrIgnoreStableTransactionTags.run({
- end_height: endHeight,
- });
- });
+ this.stmts.core.insertOrIgnoreStableTransactionTags.run({
+ end_height: endHeight,
+ });
+ },
+ );
+
+ this.saveBundlesStableDataFn = this.dbs.bundles.transaction(
+ (endHeight: number) => {
+ this.stmts.bundles.insertOrIgnoreStableDataItems.run({
+ end_height: endHeight,
+ });
+
+ this.stmts.bundles.insertOrIgnoreStableDataItemTags.run({
+ end_height: endHeight,
+ });
+ },
+ );
- this.deleteStaleNewDataFn = this.dbs.core.transaction(
+ this.deleteCoreStaleNewDataFn = this.dbs.core.transaction(
(heightThreshold: number, createdAtThreshold: number) => {
// Deletes missing_transactions that have been inserted asynchronously
this.stmts.core.deleteStaleMissingTransactions.run({
@@ -456,6 +666,20 @@ export class StandaloneSqliteDatabaseWorker {
});
},
);
+
+ this.deleteBundlesStaleNewDataFn = this.dbs.bundles.transaction(
+ (heightThreshold: number, indexedAtThreshold: number) => {
+ this.stmts.bundles.deleteStaleNewDataItems.run({
+ height_threshold: heightThreshold,
+ indexed_at_threshold: indexedAtThreshold,
+ });
+
+ this.stmts.bundles.deleteStaleNewDataItemTags.run({
+ height_threshold: heightThreshold,
+ indexed_at_threshold: indexedAtThreshold,
+ });
+ },
+ );
}
getMaxHeight() {
@@ -473,15 +697,42 @@ export class StandaloneSqliteDatabaseWorker {
}
getMissingTxIds(limit: number) {
- const missingTxIds = this.stmts.core.selectMissingTransactionIds.all({
+ const rows = this.stmts.core.selectMissingTransactionIds.all({
+ limit,
+ });
+
+ return rows.map((row): string => toB64Url(row.transaction_id));
+ }
+
+ getFailedBundleIds(limit: number) {
+ const rows = this.stmts.bundles.selectFailedBundleIds.all({
limit,
+ reprocess_cutoff: currentUnixTimestamp() - BUNDLE_REPROCESS_WAIT_SECS,
+ });
+
+ return rows.map((row): string => toB64Url(row.id));
+ }
+
+ backfillBundles() {
+ this.stmts.bundles.insertMissingBundles.run();
+ }
+
+ updateBundlesFullyIndexedAt() {
+ this.stmts.bundles.updateFullyIndexedAt.run({
+ fully_indexed_at: currentUnixTimestamp(),
});
+ }
- return missingTxIds.map((row): string => toB64Url(row.transaction_id));
+ updateBundlesForFilterChange(unbundleFilter: string, indexFilter: string) {
+ this.stmts.bundles.updateForFilterChange.run({
+ unbundle_filter: unbundleFilter,
+ index_filter: indexFilter,
+ });
}
resetToHeight(height: number) {
- this.resetToHeightFn(height);
+ this.resetBundlesToHeightFn(height);
+ this.resetCoreToHeightFn(height);
}
saveTx(tx: PartialJsonTransaction) {
@@ -493,6 +744,76 @@ export class StandaloneSqliteDatabaseWorker {
this.stmts.core.deleteNewMissingTransaction.run({ transaction_id: txId });
}
+ getBundleFormatId(format: string | undefined) {
+ let id: number | undefined;
+ if (format != undefined) {
+ id = this.bundleFormatIds[format];
+ if (id == undefined) {
+ id = this.stmts.bundles.selectFormatId.get({ format })?.id;
+ if (id != undefined) {
+ this.bundleFormatIds[format] = id;
+ }
+ }
+ }
+ return id;
+ }
+
+ getFilterId(filter: string | undefined) {
+ let id: number | undefined;
+ if (filter != undefined) {
+ id = this.filterIds[filter];
+ if (id == undefined) {
+ this.stmts.bundles.insertOrIgnoreFilter.run({ filter });
+ id = this.stmts.bundles.selectFilterId.get({ filter })?.id;
+ if (id != undefined) {
+ this.filterIds[filter] = id;
+ }
+ }
+ }
+ return id;
+ }
+
+ saveDataItem(item: NormalizedDataItem) {
+ const rootTxId = fromB64Url(item.root_tx_id);
+ const maybeTxHeight = this.stmts.bundles.selectTransactionHeight.get({
+ transaction_id: rootTxId,
+ })?.height;
+ this.insertDataItemFn(item, maybeTxHeight);
+ }
+
+ saveBundle({
+ id,
+ rootTransactionId,
+ format,
+ unbundleFilter,
+ indexFilter,
+ dataItemCount,
+ matchedDataItemCount,
+ queuedAt,
+ skippedAt,
+ unbundledAt,
+ fullyIndexedAt,
+ }: BundleRecord) {
+ const idBuffer = fromB64Url(id);
+ let rootTxId: Buffer | undefined;
+ if (rootTransactionId != undefined) {
+ rootTxId = fromB64Url(rootTransactionId);
+ }
+ this.stmts.bundles.upsertBundle.run({
+ id: idBuffer,
+ root_transaction_id: rootTxId,
+ format_id: this.getBundleFormatId(format),
+ unbundle_filter_id: this.getFilterId(unbundleFilter),
+ index_filter_id: this.getFilterId(indexFilter),
+ data_item_count: dataItemCount,
+ matched_data_item_count: matchedDataItemCount,
+ queued_at: queuedAt,
+ skipped_at: skippedAt,
+ unbundled_at: unbundledAt,
+ fully_indexed_at: fullyIndexedAt,
+ });
+ }
+
saveBlockAndTxs(
block: PartialJsonBlock,
txs: PartialJsonTransaction[],
@@ -505,12 +826,17 @@ export class StandaloneSqliteDatabaseWorker {
this.stmts.core.selectMaxStableBlockTimestamp.get();
const endHeight = block.height - MAX_FORK_DEPTH;
- this.saveStableDataFn(endHeight);
+ this.saveCoreStableDataFn(endHeight);
+ this.saveBundlesStableDataFn(endHeight);
- this.deleteStaleNewDataFn(
+ this.deleteCoreStaleNewDataFn(
endHeight,
maxStableBlockTimestamp - NEW_TX_CLEANUP_WAIT_SECS,
);
+ this.deleteBundlesStaleNewDataFn(
+ endHeight,
+ maxStableBlockTimestamp - NEW_DATA_ITEM_CLEANUP_WAIT_SECS,
+ );
}
}
@@ -622,19 +948,19 @@ export class StandaloneSqliteDatabaseWorker {
hash: hashBuffer,
data_size: dataSize,
original_source_content_type: contentType,
- indexed_at: +(Date.now() / 1000).toFixed(0),
+ indexed_at: currentUnixTimestamp(),
cached_at: cachedAt,
});
this.stmts.data.insertDataId.run({
id: fromB64Url(id),
contiguous_data_hash: hashBuffer,
- indexed_at: +(Date.now() / 1000).toFixed(0),
+ indexed_at: currentUnixTimestamp(),
});
if (dataRoot !== undefined) {
this.stmts.data.insertDataRoot.run({
data_root: fromB64Url(dataRoot),
contiguous_data_hash: hashBuffer,
- indexed_at: +(Date.now() / 1000).toFixed(0),
+ indexed_at: currentUnixTimestamp(),
});
}
}
@@ -650,6 +976,17 @@ export class StandaloneSqliteDatabaseWorker {
}));
}
+ getGqlNewDataItemTags(id: Buffer) {
+ const tags = this.stmts.bundles.selectNewDataItemTags.all({
+ id: id,
+ });
+
+ return tags.map((tag) => ({
+ name: tag.name.toString('utf8'),
+ value: tag.value.toString('utf8'),
+ }));
+ }
+
getGqlStableTransactionTags(txId: Buffer) {
const tags = this.stmts.core.selectStableTransactionTags.all({
transaction_id: txId,
@@ -661,13 +998,25 @@ export class StandaloneSqliteDatabaseWorker {
}));
}
+ getGqlStableDataItemTags(id: Buffer) {
+ const tags = this.stmts.bundles.selectStableDataItemTags.all({
+ id: id,
+ });
+
+ return tags.map((tag) => ({
+ name: tag.name.toString('utf8'),
+ value: tag.value.toString('utf8'),
+ }));
+ }
+
getGqlNewTransactionsBaseSql() {
return sql
.select(
'nt.height AS height',
'nbt.block_transaction_index AS block_transaction_index',
+ "x'00' AS data_item_id",
'id',
- 'last_tx',
+ 'last_tx AS anchor',
'signature',
'target',
'CAST(reward AS TEXT) AS reward',
@@ -679,6 +1028,7 @@ export class StandaloneSqliteDatabaseWorker {
'nb.indep_hash AS block_indep_hash',
'nb.block_timestamp AS block_timestamp',
'nb.previous_block AS block_previous_block',
+ "'' AS parent_id",
)
.from('new_transactions nt')
.join('new_block_transactions nbt', {
@@ -692,13 +1042,47 @@ export class StandaloneSqliteDatabaseWorker {
});
}
+ getGqlNewDataItemsBaseSql() {
+ return sql
+ .select(
+ 'ndi.height AS height',
+ 'nbt.block_transaction_index AS block_transaction_index',
+ 'id AS data_item_id',
+ 'id',
+ 'anchor',
+ 'signature',
+ 'target',
+ "'' AS reward",
+ "'' AS quantity",
+ 'CAST(data_size AS TEXT) AS data_size',
+ 'content_type',
+ 'owner_address',
+ 'public_modulus',
+ 'nb.indep_hash AS block_indep_hash',
+ 'nb.block_timestamp AS block_timestamp',
+ 'nb.previous_block AS block_previous_block',
+ 'ndi.parent_id',
+ )
+ .from('new_data_items ndi')
+ .join('new_block_transactions nbt', {
+ 'nbt.transaction_id': 'ndi.root_transaction_id',
+ })
+ .join('new_blocks nb', {
+ 'nb.indep_hash': 'nbt.block_indep_hash',
+ })
+ .join('bundles.wallets w', {
+ 'ndi.owner_address': 'w.address',
+ });
+ }
+
getGqlStableTransactionsBaseSql() {
return sql
.select(
'st.height AS height',
'st.block_transaction_index AS block_transaction_index',
+ "x'00' AS data_item_id",
'id',
- 'last_tx',
+ 'last_tx AS anchor',
'signature',
'target',
'CAST(reward AS TEXT) AS reward',
@@ -710,6 +1094,7 @@ export class StandaloneSqliteDatabaseWorker {
'sb.indep_hash AS block_indep_hash',
'sb.block_timestamp AS block_timestamp',
'sb.previous_block AS block_previous_block',
+ "'' AS parent_id",
)
.from('stable_transactions st')
.join('stable_blocks sb', {
@@ -720,6 +1105,36 @@ export class StandaloneSqliteDatabaseWorker {
});
}
+ getGqlStableDataItemsBaseSql() {
+ return sql
+ .select(
+ 'sdi.height AS height',
+ 'sdi.block_transaction_index AS block_transaction_index',
+ 'sdi.id AS data_item_id',
+ 'id',
+ 'anchor',
+ 'signature',
+ 'target',
+ "'' AS reward",
+ "'' AS quantity",
+ 'CAST(data_size AS TEXT) AS data_size',
+ 'content_type',
+ 'owner_address',
+ 'public_modulus',
+ 'sb.indep_hash AS block_indep_hash',
+ 'sb.block_timestamp AS block_timestamp',
+ 'sb.previous_block AS block_previous_block',
+ 'sdi.parent_id',
+ )
+ .from('bundles.stable_data_items sdi')
+ .join('stable_blocks sb', {
+ 'sdi.height': 'sb.height',
+ })
+ .join('bundles.wallets w', {
+ 'sdi.owner_address': 'w.address',
+ });
+ }
+
addGqlTransactionFilters({
query,
source,
@@ -730,10 +1145,11 @@ export class StandaloneSqliteDatabaseWorker {
owners = [],
minHeight = -1,
maxHeight = -1,
+ bundledIn,
tags = [],
}: {
query: sql.SelectStatement;
- source: 'stable' | 'new';
+ source: 'stable_txs' | 'stable_items' | 'new_txs' | 'new_items';
cursor?: string;
sortOrder?: 'HEIGHT_DESC' | 'HEIGHT_ASC';
ids?: string[];
@@ -741,32 +1157,57 @@ export class StandaloneSqliteDatabaseWorker {
owners?: string[];
minHeight?: number;
maxHeight?: number;
+ bundledIn?: string[] | null;
tags: { name: string; values: string[] }[];
}) {
let txTableAlias: string;
let heightTableAlias: string;
let blockTransactionIndexTableAlias: string;
let tagsTable: string;
+ let tagIdColumn: string;
+ let tagIndexColumn: string;
let heightSortTableAlias: string;
let blockTransactionIndexSortTableAlias: string;
let maxDbHeight = Infinity;
- if (source === 'stable') {
+ if (source === 'stable_txs') {
txTableAlias = 'st';
heightTableAlias = 'st';
blockTransactionIndexTableAlias = 'st';
tagsTable = 'stable_transaction_tags';
+ tagIdColumn = 'transaction_id';
+ tagIndexColumn = 'transaction_tag_index';
heightSortTableAlias = 'st';
blockTransactionIndexSortTableAlias = 'st';
maxDbHeight = this.stmts.core.selectMaxStableBlockHeight.get()
.height as number;
- } else {
+ } else if (source === 'stable_items') {
+ txTableAlias = 'sdi';
+ heightTableAlias = 'sdi';
+ blockTransactionIndexTableAlias = 'sdi';
+ tagsTable = 'stable_data_item_tags';
+ tagIdColumn = 'data_item_id';
+ tagIndexColumn = 'data_item_tag_index';
+ heightSortTableAlias = 'sdi';
+ blockTransactionIndexSortTableAlias = 'sdi';
+ maxDbHeight = this.stmts.core.selectMaxStableBlockHeight.get()
+ .height as number;
+ } else if (source === 'new_txs') {
txTableAlias = 'nt';
heightTableAlias = 'nt';
blockTransactionIndexTableAlias = 'nbt';
tagsTable = 'new_transaction_tags';
+ tagIdColumn = 'transaction_id';
heightSortTableAlias = 'nt';
blockTransactionIndexSortTableAlias = 'nbt';
+ } else {
+ txTableAlias = 'ndi';
+ heightTableAlias = 'ndi';
+ blockTransactionIndexTableAlias = 'nbt';
+ tagsTable = 'new_data_item_tags';
+ tagIdColumn = 'data_item_id';
+ heightSortTableAlias = 'ndi';
+ blockTransactionIndexSortTableAlias = 'nbt';
}
if (ids.length > 0) {
@@ -784,12 +1225,12 @@ export class StandaloneSqliteDatabaseWorker {
}
if (tags) {
- // To improve performance, force tags with large result to be last
+ // To improve performance, force tags with large result sets to be last
const sortByTagJoinPriority = R.sortBy(tagJoinSortPriority);
sortByTagJoinPriority(tags).forEach((tag, index) => {
const tagAlias = `"${index}_${index}"`;
let joinCond: { [key: string]: string };
- if (source === 'stable') {
+ if (source === 'stable_txs' || source === 'stable_items') {
if (index === 0) {
heightSortTableAlias = tagAlias;
blockTransactionIndexSortTableAlias = tagAlias;
@@ -797,10 +1238,13 @@ export class StandaloneSqliteDatabaseWorker {
[`${blockTransactionIndexTableAlias}.block_transaction_index`]: `${tagAlias}.block_transaction_index`,
[`${heightTableAlias}.height`]: `${tagAlias}.height`,
};
+ if (source === 'stable_items') {
+ joinCond[`${txTableAlias}.id`] = `${tagAlias}.${tagIdColumn}`;
+ }
} else {
const previousTagAlias = `"${index - 1}_${index - 1}"`;
joinCond = {
- [`${previousTagAlias}.transaction_id`]: `${tagAlias}.transaction_id`,
+ [`${previousTagAlias}.${tagIdColumn}`]: `${tagAlias}.${tagIdColumn}`,
};
// This condition forces the use of the transaction_id index rather
// than the name and value index. The transaction_id index is
@@ -808,14 +1252,14 @@ export class StandaloneSqliteDatabaseWorker {
// first in the GraphQL query.
query.where(
sql.notEq(
- `${previousTagAlias}.transaction_tag_index`,
- sql(`${tagAlias}.transaction_tag_index`),
+ `${previousTagAlias}.${tagIndexColumn}`,
+ sql(`${tagAlias}.${tagIndexColumn}`),
),
);
}
} else {
joinCond = {
- [`${txTableAlias}.id`]: `${tagAlias}.transaction_id`,
+ [`${txTableAlias}.id`]: `${tagAlias}.${tagIdColumn}`,
};
}
@@ -849,35 +1293,89 @@ export class StandaloneSqliteDatabaseWorker {
query.where(sql.lte(`${heightTableAlias}.height`, maxHeight));
}
+ if (
+ Array.isArray(bundledIn) &&
+ (source === 'stable_items' || source === 'new_items')
+ ) {
+ query.where(
+ sql.in(`${txTableAlias}.parent_id`, bundledIn.map(fromB64Url)),
+ );
+ }
+
const {
height: cursorHeight,
blockTransactionIndex: cursorBlockTransactionIndex,
+ dataItemId: cursorDataItemId,
} = decodeTransactionGqlCursor(cursor);
if (sortOrder === 'HEIGHT_DESC') {
- if (cursorHeight) {
+ if (
+ cursorHeight != undefined &&
+ cursorBlockTransactionIndex != undefined
+ ) {
+ const dataItemIdField = source === 'stable_items' ? 'sdi.id' : "x'00'";
query.where(
- sql.lt(
- `${heightSortTableAlias}.height * 1000 + ${blockTransactionIndexSortTableAlias}.block_transaction_index`,
- cursorHeight * 1000 + cursorBlockTransactionIndex ?? 0,
+ sql.lte(`${heightSortTableAlias}.height`, cursorHeight),
+ sql.or(
+ sql.lt(`${heightSortTableAlias}.height`, cursorHeight),
+ sql.and(
+ sql.eq(`${heightSortTableAlias}.height`, cursorHeight),
+ sql.lt(
+ `${blockTransactionIndexSortTableAlias}.block_transaction_index`,
+ cursorBlockTransactionIndex,
+ ),
+ ),
+ sql.and(
+ sql.eq(`${heightSortTableAlias}.height`, cursorHeight),
+ sql.eq(
+ `${blockTransactionIndexSortTableAlias}.block_transaction_index`,
+ cursorBlockTransactionIndex,
+ ),
+ sql.lt(
+ dataItemIdField,
+ cursorDataItemId
+ ? fromB64Url(cursorDataItemId)
+ : Buffer.from([0]),
+ ),
+ ),
),
);
}
- query.orderBy(
- `${heightSortTableAlias}.height DESC, ${blockTransactionIndexSortTableAlias}.block_transaction_index DESC`,
- );
+ query.orderBy('1 DESC, 2 DESC, 3 DESC');
} else {
- if (cursorHeight) {
+ if (
+ cursorHeight != undefined &&
+ cursorBlockTransactionIndex != undefined
+ ) {
+ const dataItemIdField = source === 'stable_items' ? 'sdi.id' : "x'00'";
query.where(
- sql.gt(
- `${heightSortTableAlias}.height * 1000 + ${blockTransactionIndexSortTableAlias}.block_transaction_index`,
- cursorHeight * 1000 + cursorBlockTransactionIndex ?? 0,
+ sql.gte(`${heightSortTableAlias}.height`, cursorHeight),
+ sql.or(
+ sql.gt(`${heightSortTableAlias}.height`, cursorHeight),
+ sql.and(
+ sql.eq(`${heightSortTableAlias}.height`, cursorHeight),
+ sql.gt(
+ `${blockTransactionIndexSortTableAlias}.block_transaction_index`,
+ cursorBlockTransactionIndex,
+ ),
+ ),
+ sql.and(
+ sql.eq(`${heightSortTableAlias}.height`, cursorHeight),
+ sql.eq(
+ `${blockTransactionIndexSortTableAlias}.block_transaction_index`,
+ cursorBlockTransactionIndex,
+ ),
+ sql.gt(
+ dataItemIdField,
+ cursorDataItemId
+ ? fromB64Url(cursorDataItemId)
+ : Buffer.from([0]),
+ ),
+ ),
),
);
}
- query.orderBy(
- `${heightSortTableAlias}.height ASC, ${blockTransactionIndexSortTableAlias}.block_transaction_index ASC`,
- );
+ query.orderBy('1 ASC, 2 ASC, 3 ASC');
}
}
@@ -890,6 +1388,7 @@ export class StandaloneSqliteDatabaseWorker {
owners = [],
minHeight = -1,
maxHeight = -1,
+ bundledIn,
tags = [],
}: {
pageSize: number;
@@ -900,13 +1399,14 @@ export class StandaloneSqliteDatabaseWorker {
owners?: string[];
minHeight?: number;
maxHeight?: number;
+ bundledIn?: string[] | null;
tags?: { name: string; values: string[] }[];
}) {
- const query = this.getGqlNewTransactionsBaseSql();
+ const txsQuery = this.getGqlNewTransactionsBaseSql();
this.addGqlTransactionFilters({
- query,
- source: 'new',
+ query: txsQuery,
+ source: 'new_txs',
cursor,
sortOrder,
ids,
@@ -917,18 +1417,57 @@ export class StandaloneSqliteDatabaseWorker {
tags,
});
- const queryParams = query.toParams();
- const sql = queryParams.text;
- const sqliteParams = toSqliteParams(queryParams);
+ const txsQueryParams = txsQuery.toParams();
+ const txsSql = txsQueryParams.text;
+ const txsFinalSql = `${txsSql} LIMIT ${pageSize + 1}`;
+
+ const itemsQuery = this.getGqlNewDataItemsBaseSql();
+
+ this.addGqlTransactionFilters({
+ query: itemsQuery,
+ source: 'new_items',
+ cursor,
+ sortOrder,
+ ids,
+ recipients,
+ owners,
+ minHeight,
+ maxHeight,
+ bundledIn,
+ tags,
+ });
+
+ const itemsQueryParams = itemsQuery.toParams();
+ const itemsSql = itemsQueryParams.text;
+ const itemsFinalSql = `${itemsSql} LIMIT ${pageSize + 1}`;
+
+ const sqlSortOrder = sortOrder === 'HEIGHT_DESC' ? 'DESC' : 'ASC';
+ const sqlParts = [];
+ if (bundledIn === undefined || bundledIn === null) {
+ sqlParts.push(`SELECT * FROM (${txsFinalSql})`);
+ }
+ if (bundledIn === undefined) {
+ sqlParts.push('UNION');
+ }
+ if (bundledIn === undefined || Array.isArray(bundledIn)) {
+ sqlParts.push(`SELECT * FROM (${itemsFinalSql})`);
+ }
+ sqlParts.push(
+ `ORDER BY 1 ${sqlSortOrder}, 2 ${sqlSortOrder}, 3 ${sqlSortOrder}`,
+ );
+ sqlParts.push(`LIMIT ${pageSize + 1}`);
+ const sql = sqlParts.join(' ');
+ const sqliteParams = toSqliteParams(itemsQueryParams);
return this.dbs.core
- .prepare(`${sql} LIMIT ${pageSize + 1}`)
+ .prepare(sql)
.all(sqliteParams)
.map((tx) => ({
height: tx.height,
blockTransactionIndex: tx.block_transaction_index,
+ dataItemId: tx.data_item_id ? toB64Url(tx.data_item_id) : undefined,
id: toB64Url(tx.id),
- anchor: toB64Url(tx.last_tx),
+ anchor: toB64Url(tx.anchor),
signature: toB64Url(tx.signature),
recipient: tx.target ? toB64Url(tx.target) : undefined,
ownerAddress: toB64Url(tx.owner_address),
@@ -936,11 +1475,15 @@ export class StandaloneSqliteDatabaseWorker {
fee: tx.reward,
quantity: tx.quantity,
dataSize: tx.data_size,
- tags: this.getGqlNewTransactionTags(tx.id),
+ tags:
+ tx.data_item_id.length > 1
+ ? this.getGqlNewDataItemTags(tx.id)
+ : this.getGqlNewTransactionTags(tx.id),
contentType: tx.content_type,
blockIndepHash: toB64Url(tx.block_indep_hash),
blockTimestamp: tx.block_timestamp,
blockPreviousBlock: toB64Url(tx.block_previous_block),
+ parentId: tx.parent_id ? toB64Url(tx.parent_id) : null,
}));
}
@@ -953,6 +1496,7 @@ export class StandaloneSqliteDatabaseWorker {
owners = [],
minHeight = -1,
maxHeight = -1,
+ bundledIn,
tags = [],
}: {
pageSize: number;
@@ -963,13 +1507,14 @@ export class StandaloneSqliteDatabaseWorker {
owners?: string[];
minHeight?: number;
maxHeight?: number;
+ bundledIn?: string[] | null;
tags?: { name: string; values: string[] }[];
}) {
- const query = this.getGqlStableTransactionsBaseSql();
+ const txsQuery = this.getGqlStableTransactionsBaseSql();
this.addGqlTransactionFilters({
- query,
- source: 'stable',
+ query: txsQuery,
+ source: 'stable_txs',
cursor,
sortOrder,
ids,
@@ -977,21 +1522,61 @@ export class StandaloneSqliteDatabaseWorker {
owners,
minHeight,
maxHeight,
+ bundledIn,
tags,
});
- const queryParams = query.toParams();
- const sql = queryParams.text;
- const sqliteParams = toSqliteParams(queryParams);
+ const txsQueryParams = txsQuery.toParams();
+ const txsSql = txsQueryParams.text;
+ const txsFinalSql = `${txsSql} LIMIT ${pageSize + 1}`;
+
+ const itemsQuery = this.getGqlStableDataItemsBaseSql();
+
+ this.addGqlTransactionFilters({
+ query: itemsQuery,
+ source: 'stable_items',
+ cursor,
+ sortOrder,
+ ids,
+ recipients,
+ owners,
+ minHeight,
+ maxHeight,
+ bundledIn,
+ tags,
+ });
+
+ const itemsQueryParams = itemsQuery.toParams();
+ const itemsSql = itemsQueryParams.text;
+ const itemsFinalSql = `${itemsSql} LIMIT ${pageSize + 1}`;
+
+ const sqlSortOrder = sortOrder === 'HEIGHT_DESC' ? 'DESC' : 'ASC';
+ const sqlParts = [];
+ if (bundledIn === undefined || bundledIn === null) {
+ sqlParts.push(`SELECT * FROM (${txsFinalSql})`);
+ }
+ if (bundledIn === undefined) {
+ sqlParts.push('UNION');
+ }
+ if (bundledIn === undefined || Array.isArray(bundledIn)) {
+ sqlParts.push(`SELECT * FROM (${itemsFinalSql})`);
+ }
+ sqlParts.push(
+ `ORDER BY 1 ${sqlSortOrder}, 2 ${sqlSortOrder}, 3 ${sqlSortOrder}`,
+ );
+ sqlParts.push(`LIMIT ${pageSize + 1}`);
+ const sql = sqlParts.join(' ');
+ const sqliteParams = toSqliteParams(itemsQueryParams);
return this.dbs.core
- .prepare(`${sql} LIMIT ${pageSize + 1}`)
+ .prepare(sql)
.all(sqliteParams)
.map((tx) => ({
height: tx.height,
blockTransactionIndex: tx.block_transaction_index,
+ dataItemId: tx.data_item_id ? toB64Url(tx.data_item_id) : undefined,
id: toB64Url(tx.id),
- anchor: toB64Url(tx.last_tx),
+ anchor: toB64Url(tx.anchor),
signature: toB64Url(tx.signature),
recipient: tx.target ? toB64Url(tx.target) : undefined,
ownerAddress: toB64Url(tx.owner_address),
@@ -999,11 +1584,15 @@ export class StandaloneSqliteDatabaseWorker {
fee: tx.reward,
quantity: tx.quantity,
dataSize: tx.data_size,
- tags: this.getGqlStableTransactionTags(tx.id),
+ tags:
+ tx.data_item_id.length > 1
+ ? this.getGqlStableDataItemTags(tx.id)
+ : this.getGqlStableTransactionTags(tx.id),
contentType: tx.content_type,
blockIndepHash: toB64Url(tx.block_indep_hash),
blockTimestamp: tx.block_timestamp,
blockPreviousBlock: toB64Url(tx.block_previous_block),
+ parentId: tx.parent_id ? toB64Url(tx.parent_id) : null,
}));
}
@@ -1016,6 +1605,7 @@ export class StandaloneSqliteDatabaseWorker {
owners = [],
minHeight = -1,
maxHeight = -1,
+ bundledIn,
tags = [],
}: {
pageSize: number;
@@ -1026,9 +1616,10 @@ export class StandaloneSqliteDatabaseWorker {
owners?: string[];
minHeight?: number;
maxHeight?: number;
+ bundledIn?: string[] | null;
tags?: { name: string; values: string[] }[];
}) {
- let txs;
+ let txs: GqlTransaction[] = [];
if (sortOrder === 'HEIGHT_DESC') {
txs = this.getGqlNewTransactions({
@@ -1040,10 +1631,12 @@ export class StandaloneSqliteDatabaseWorker {
owners,
minHeight,
maxHeight,
+ bundledIn,
tags,
});
if (txs.length < pageSize) {
+ const lastTxHeight = txs[txs.length - 1]?.height;
txs = txs.concat(
this.getGqlStableTransactions({
pageSize,
@@ -1054,7 +1647,8 @@ export class StandaloneSqliteDatabaseWorker {
owners,
minHeight,
maxHeight:
- txs.length > 0 ? txs[txs.length - 1].height - 1 : maxHeight,
+ txs.length > 0 && lastTxHeight ? lastTxHeight - 1 : maxHeight,
+ bundledIn,
tags,
}),
);
@@ -1069,10 +1663,12 @@ export class StandaloneSqliteDatabaseWorker {
owners,
minHeight,
maxHeight,
+ bundledIn,
tags,
});
if (txs.length < pageSize) {
+ const lastTxHeight = txs[txs.length - 1]?.height;
txs = txs.concat(
this.getGqlNewTransactions({
pageSize,
@@ -1082,8 +1678,9 @@ export class StandaloneSqliteDatabaseWorker {
recipients,
owners,
minHeight:
- txs.length > 0 ? txs[txs.length - 1].height + 1 : minHeight,
+ txs.length > 0 && lastTxHeight ? lastTxHeight : minHeight,
maxHeight,
+ bundledIn,
tags,
}),
);
@@ -1363,7 +1960,7 @@ export class StandaloneSqliteDatabaseWorker {
});
return row?.is_blocked === 1;
}
- return false
+ return false;
}
isHashBlocked(hash: string | undefined): boolean {
@@ -1391,7 +1988,7 @@ export class StandaloneSqliteDatabaseWorker {
if (source !== undefined) {
this.stmts.moderation.insertSource.run({
name: source,
- created_at: +(Date.now() / 1000).toFixed(0),
+ created_at: currentUnixTimestamp(),
});
sourceId = this.stmts.moderation.getSourceByName.get({
name: source,
@@ -1402,14 +1999,14 @@ export class StandaloneSqliteDatabaseWorker {
id: fromB64Url(id),
block_source_id: sourceId,
notes,
- blocked_at: +(Date.now() / 1000).toFixed(0),
+ blocked_at: currentUnixTimestamp(),
});
} else if (hash !== undefined) {
this.stmts.moderation.insertBlockedHash.run({
hash: fromB64Url(hash),
block_source_id: sourceId,
notes,
- blocked_at: +(Date.now() / 1000).toFixed(0),
+ blocked_at: currentUnixTimestamp(),
});
}
}
@@ -1430,18 +2027,25 @@ export class StandaloneSqliteDatabaseWorker {
parent_id: fromB64Url(parentId),
data_offset: dataOffset,
data_size: dataSize,
- created_at: +(Date.now() / 1000).toFixed(0),
+ created_at: currentUnixTimestamp(),
});
}
}
-type WorkerPoolName = 'core' | 'data' | 'gql' | 'debug' | 'moderation';
+type WorkerPoolName =
+ | 'core'
+ | 'data'
+ | 'gql'
+ | 'debug'
+ | 'moderation'
+ | 'bundles';
const WORKER_POOL_NAMES: Array<WorkerPoolName> = [
'core',
'data',
'gql',
'debug',
'moderation',
+ 'bundles',
];
type WorkerRoleName = 'read' | 'write';
@@ -1456,10 +2060,12 @@ const WORKER_POOL_SIZES: WorkerPoolSizes = {
gql: { read: CPU_COUNT, write: 0 },
debug: { read: 1, write: 0 },
moderation: { read: 1, write: 1 },
+ bundles: { read: 1, write: 1 },
};
export class StandaloneSqliteDatabase
implements
+ BundleIndex,
BlockListValidator,
ChainIndex,
ContiguousDataIndex,
@@ -1474,12 +2080,14 @@ export class StandaloneSqliteDatabase
gql: { read: any[]; write: any[] };
debug: { read: any[]; write: any[] };
moderation: { read: any[]; write: any[] };
+ bundles: { read: any[]; write: any[] };
} = {
core: { read: [], write: [] },
data: { read: [], write: [] },
gql: { read: [], write: [] },
debug: { read: [], write: [] },
moderation: { read: [], write: [] },
+ bundles: { read: [], write: [] },
};
private workQueues: {
core: { read: any[]; write: any[] };
@@ -1487,12 +2095,14 @@ export class StandaloneSqliteDatabase
gql: { read: any[]; write: any[] };
debug: { read: any[]; write: any[] };
moderation: { read: any[]; write: any[] };
+ bundles: { read: any[]; write: any[] };
} = {
core: { read: [], write: [] },
data: { read: [], write: [] },
gql: { read: [], write: [] },
debug: { read: [], write: [] },
moderation: { read: [], write: [] },
+ bundles: { read: [], write: [] },
};
constructor({
log,
@@ -1500,12 +2110,14 @@ export class StandaloneSqliteDatabase
coreDbPath,
dataDbPath,
moderationDbPath,
+ bundlesDbPath,
}: {
log: winston.Logger;
metricsRegistry: promClient.Registry;
coreDbPath: string;
dataDbPath: string;
moderationDbPath: string;
+ bundlesDbPath: string;
}) {
this.log = log.child({ class: 'StandaloneSqliteDatabase' });
@@ -1526,6 +2138,7 @@ export class StandaloneSqliteDatabase
coreDbPath,
dataDbPath,
moderationDbPath,
+ bundlesDbPath,
},
});
@@ -1546,7 +2159,11 @@ export class StandaloneSqliteDatabase
takeWork();
})
.on('message', (result) => {
- job.resolve(result);
+ if (result === '__ERROR__') {
+ job.reject(new Error('Worker error'));
+ } else {
+ job.resolve(result);
+ }
job = null;
takeWork(); // Check if there's more work to do
})
@@ -1654,10 +2271,29 @@ export class StandaloneSqliteDatabase
return this.queueRead('core', 'getBlockHashByHeight', [height]);
}
- getMissingTxIds(limit = 20): Promise<string[]> {
+ getMissingTxIds(limit: number): Promise<string[]> {
return this.queueRead('core', 'getMissingTxIds', [limit]);
}
+ getFailedBundleIds(limit: number): Promise<string[]> {
+ return this.queueRead('bundles', 'getFailedBundleIds', [limit]);
+ }
+
+ backfillBundles() {
+ return this.queueRead('bundles', 'backfillBundles', undefined);
+ }
+
+ updateBundlesFullyIndexedAt(): Promise<void> {
+ return this.queueRead('bundles', 'updateBundlesFullyIndexedAt', undefined);
+ }
+
+ updateBundlesForFilterChange(unbundleFilter: string, indexFilter: string) {
+ return this.queueWrite('bundles', 'updateBundlesForFilterChange', [
+ unbundleFilter,
+ indexFilter,
+ ]);
+ }
+
resetToHeight(height: number): Promise<void> {
return this.queueWrite('core', 'resetToHeight', [height]);
}
@@ -1666,6 +2302,14 @@ export class StandaloneSqliteDatabase
return this.queueWrite('core', 'saveTx', [tx]);
}
+ saveDataItem(item: NormalizedDataItem): Promise<void> {
+ return this.queueWrite('bundles', 'saveDataItem', [item]);
+ }
+
+ saveBundle(bundle: BundleRecord): Promise<void> {
+ return this.queueWrite('bundles', 'saveBundle', [bundle]);
+ }
+
saveBlockAndTxs(
block: PartialJsonBlock,
txs: PartialJsonTransaction[],
@@ -1723,6 +2367,7 @@ export class StandaloneSqliteDatabase
owners = [],
minHeight = -1,
maxHeight = -1,
+ bundledIn,
tags = [],
}: {
pageSize: number;
@@ -1733,6 +2378,7 @@ export class StandaloneSqliteDatabase
owners?: string[];
minHeight?: number;
maxHeight?: number;
+ bundledIn?: string[];
tags?: { name: string; values: string[] }[];
}) {
return this.queueRead('gql', 'getGqlTransactions', [
@@ -1745,6 +2391,7 @@ export class StandaloneSqliteDatabase
owners,
minHeight,
maxHeight,
+ bundledIn,
tags,
},
]);
@@ -1846,85 +2493,122 @@ if (!isMainThread) {
coreDbPath: workerData.coreDbPath,
dataDbPath: workerData.dataDbPath,
moderationDbPath: workerData.moderationDbPath,
+ bundlesDbPath: workerData.bundlesDbPath,
});
+ let errorCount = 0;
+
parentPort?.on('message', ({ method, args }: WorkerMessage) => {
- switch (method) {
- case 'getMaxHeight':
- const maxHeight = worker.getMaxHeight();
- parentPort?.postMessage(maxHeight);
- break;
- case 'getBlockHashByHeight':
- const newBlockHash = worker.getBlockHashByHeight(args[0]);
- parentPort?.postMessage(newBlockHash);
- break;
- case 'getMissingTxIds':
- const missingTxIdsRes = worker.getMissingTxIds(args[0]);
- parentPort?.postMessage(missingTxIdsRes);
- break;
- case 'resetToHeight':
- worker.resetToHeight(args[0]);
- parentPort?.postMessage(undefined);
- break;
- case 'saveTx':
- worker.saveTx(args[0]);
- parentPort?.postMessage(null);
- break;
- case 'saveBlockAndTxs':
- const [block, txs, missingTxIds] = args;
- worker.saveBlockAndTxs(block, txs, missingTxIds);
- parentPort?.postMessage(null);
- break;
- case 'getDataAttributes':
- const dataAttributes = worker.getDataAttributes(args[0]);
- parentPort?.postMessage(dataAttributes);
- break;
- case 'getDataParent':
- const dataParent = worker.getDataParent(args[0]);
- parentPort?.postMessage(dataParent);
- break;
- case 'getDebugInfo':
- const debugInfo = worker.getDebugInfo();
- parentPort?.postMessage(debugInfo);
- break;
- case 'saveDataContentAttributes':
- worker.saveDataContentAttributes(args[0]);
- parentPort?.postMessage(null);
- break;
- case 'getGqlTransactions':
- const gqlTransactions = worker.getGqlTransactions(args[0]);
- parentPort?.postMessage(gqlTransactions);
- break;
- case 'getGqlTransaction':
- const gqlTransaction = worker.getGqlTransaction(args[0]);
- parentPort?.postMessage(gqlTransaction);
- break;
- case 'getGqlBlocks':
- const gqlBlocks = worker.getGqlBlocks(args[0]);
- parentPort?.postMessage(gqlBlocks);
- break;
- case 'getGqlBlock':
- const gqlBlock = worker.getGqlBlock(args[0]);
- parentPort?.postMessage(gqlBlock);
- break;
- case 'isIdBlocked':
- const isIdBlocked = worker.isIdBlocked(args[0]);
- parentPort?.postMessage(isIdBlocked);
- break;
- case 'isHashBlocked':
- const isHashBlocked = worker.isHashBlocked(args[0]);
- parentPort?.postMessage(isHashBlocked);
- break;
- case 'blockData':
- worker.blockData(args[0]);
- parentPort?.postMessage(null);
- break;
- case 'saveNestedDataId':
- worker.saveNestedDataId(args[0]);
- parentPort?.postMessage(null);
- break;
- case 'terminate':
- process.exit(0);
+ try {
+ switch (method) {
+ case 'getMaxHeight':
+ const maxHeight = worker.getMaxHeight();
+ parentPort?.postMessage(maxHeight);
+ break;
+ case 'getBlockHashByHeight':
+ const newBlockHash = worker.getBlockHashByHeight(args[0]);
+ parentPort?.postMessage(newBlockHash);
+ break;
+ case 'getMissingTxIds':
+ parentPort?.postMessage(worker.getMissingTxIds(args[0]));
+ break;
+ case 'getFailedBundleIds':
+ const failedBundleIds = worker.getFailedBundleIds(args[0]);
+ parentPort?.postMessage(failedBundleIds);
+ break;
+ case 'backfillBundles':
+ worker.backfillBundles();
+ parentPort?.postMessage(null);
+ break;
+ case 'updateBundlesFullyIndexedAt':
+ worker.updateBundlesFullyIndexedAt();
+ parentPort?.postMessage(null);
+ break;
+ case 'updateBundlesForFilterChange':
+ const [unbundleFilter, indexFilter] = args;
+ worker.updateBundlesForFilterChange(unbundleFilter, indexFilter);
+ parentPort?.postMessage(null);
+ break;
+ case 'resetToHeight':
+ worker.resetToHeight(args[0]);
+ parentPort?.postMessage(undefined);
+ break;
+ case 'saveTx':
+ worker.saveTx(args[0]);
+ parentPort?.postMessage(null);
+ break;
+ case 'saveDataItem':
+ worker.saveDataItem(args[0]);
+ parentPort?.postMessage(null);
+ break;
+ case 'saveBundle':
+ worker.saveBundle(args[0]);
+ parentPort?.postMessage(null);
+ break;
+ case 'saveBlockAndTxs':
+ const [block, txs, missingTxIds] = args;
+ worker.saveBlockAndTxs(block, txs, missingTxIds);
+ parentPort?.postMessage(null);
+ break;
+ case 'getDataAttributes':
+ const dataAttributes = worker.getDataAttributes(args[0]);
+ parentPort?.postMessage(dataAttributes);
+ break;
+ case 'getDataParent':
+ const dataParent = worker.getDataParent(args[0]);
+ parentPort?.postMessage(dataParent);
+ break;
+ case 'getDebugInfo':
+ const debugInfo = worker.getDebugInfo();
+ parentPort?.postMessage(debugInfo);
+ break;
+ case 'saveDataContentAttributes':
+ worker.saveDataContentAttributes(args[0]);
+ parentPort?.postMessage(null);
+ break;
+ case 'getGqlTransactions':
+ const gqlTransactions = worker.getGqlTransactions(args[0]);
+ parentPort?.postMessage(gqlTransactions);
+ break;
+ case 'getGqlTransaction':
+ const gqlTransaction = worker.getGqlTransaction(args[0]);
+ parentPort?.postMessage(gqlTransaction);
+ break;
+ case 'getGqlBlocks':
+ const gqlBlocks = worker.getGqlBlocks(args[0]);
+ parentPort?.postMessage(gqlBlocks);
+ break;
+ case 'getGqlBlock':
+ const gqlBlock = worker.getGqlBlock(args[0]);
+ parentPort?.postMessage(gqlBlock);
+ break;
+ case 'isIdBlocked':
+ const isIdBlocked = worker.isIdBlocked(args[0]);
+ parentPort?.postMessage(isIdBlocked);
+ break;
+ case 'isHashBlocked':
+ const isHashBlocked = worker.isHashBlocked(args[0]);
+ parentPort?.postMessage(isHashBlocked);
+ break;
+ case 'blockData':
+ worker.blockData(args[0]);
+ parentPort?.postMessage(null);
+ break;
+ case 'saveNestedDataId':
+ worker.saveNestedDataId(args[0]);
+ parentPort?.postMessage(null);
+ break;
+ case 'terminate':
+ process.exit(0);
+ }
+ } catch (error) {
+ if (errorCount > MAX_WORKER_ERRORS) {
+ log.error('Too many errors in StandaloneSqlite worker, exiting.');
+ process.exit(1);
+ }
+ log.error('Error in StandaloneSqlite worker:', error);
+ errorCount++;
+ parentPort?.postMessage('__ERROR__');
}
});
}
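
A note on the cursor logic above: the previous implementation ordered and compared on the single expression `height * 1000 + block_transaction_index`, which collides once a block contains 1,000 or more transactions and leaves no room for a third sort key. The new conditions compare the (height, block_transaction_index, data_item_id) tuple explicitly, with the single-byte x'00' sentinel standing in for layer 1 transactions. A minimal TypeScript sketch of the same HEIGHT_DESC predicate (names here are illustrative, not part of the patch):

    interface CursorKey {
      height: number;
      blockTransactionIndex: number;
      dataItemId: Buffer; // Buffer.from([0]) for layer 1 transactions
    }

    // True when `row` sorts strictly after `cursor` while paging HEIGHT_DESC,
    // mirroring the nested sql.or/sql.and conditions in addGqlTransactionFilters.
    function isPastDescCursor(row: CursorKey, cursor: CursorKey): boolean {
      if (row.height !== cursor.height) return row.height < cursor.height;
      if (row.blockTransactionIndex !== cursor.blockTransactionIndex) {
        return row.blockTransactionIndex < cursor.blockTransactionIndex;
      }
      return Buffer.compare(row.dataItemId, cursor.dataItemId) < 0;
    }

The seemingly redundant outer `sql.lte(height, cursorHeight)` is presumably kept so SQLite can bound the scan with a height index before evaluating the OR branches, and the `ORDER BY 1, 2, 3` ordinals sort the UNION of the transaction and data item subqueries on the same three columns.
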
diff --git a/src/events.ts b/src/events.ts
index 1c81d06d..df524465 100644
--- a/src/events.ts
+++ b/src/events.ts
@@ -15,12 +15,19 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
+export const ANS104_BUNDLE_INDEXED = 'ans104-bundle-indexed';
+export const ANS104_DATA_ITEM_BUNDLE_MATCHED =
+ 'ans104-data-item-bundle-matched';
+export const ANS104_DATA_ITEM_DATA_INDEXED = 'ans104-data-item-data-indexed';
+export const ANS104_DATA_ITEM_INDEXED = 'ans104-data-item-indexed';
+export const ANS104_DATA_ITEM_MATCHED = 'ans104-data-item-matched';
+export const ANS104_NESTED_BUNDLE_INDEXED = 'ans104-nested-bundle-indexed';
export const ANS104_TX_INDEXED = 'ans104-tx-indexed';
-export const BLOCK_INDEXED = 'block-indexed';
+export const ANS104_UNBUNDLE_COMPLETE = 'ans104-unbundle-complete';
export const BLOCK_FETCHED = 'block-fetched';
+export const BLOCK_INDEXED = 'block-indexed';
export const BLOCK_TX_FETCHED = 'block-tx-fetched';
export const BLOCK_TX_FETCH_FAILED = 'block-tx-fetch-failed';
export const BLOCK_TX_INDEXED = 'block-tx-indexed';
-export const DATA_ITEM_UNBUNDLED = 'data-item-unbundled';
export const TX_FETCHED = 'tx-fetched';
export const TX_INDEXED = 'tx-indexed';
diff --git a/src/filters.test.ts b/src/filters.test.ts
index eeb6f6f7..b94e2ff3 100644
--- a/src/filters.test.ts
+++ b/src/filters.test.ts
@@ -35,6 +35,7 @@ function getTx(id: string) {
const TX_ID = '----LT69qUmuIeC4qb0MZHlxVp7UxLu_14rEkA_9n6w';
const TX = getTx(TX_ID);
+const TX_OWNER_ADDRESS = 'Th825IP80n4i9F3Rc4cBFh767CGqiV4n7S-Oy5lGLjc';
describe('AlwaysMatch', () => {
const alwaysMatch = new AlwaysMatch();
@@ -178,12 +179,25 @@ describe('MatchAttributes', () => {
const matchAttributes = new MatchAttributes(attributes);
- delete TX.owner;
+ const tx = JSON.parse(JSON.stringify(TX));
+ delete tx.owner;
- const result = await matchAttributes.match(TX);
+ const result = await matchAttributes.match(tx);
expect(result).to.be.false;
});
+
+ it('should match owner given an owner address', async () => {
+ const attributes = {
+ owner_address: TX_OWNER_ADDRESS,
+ };
+
+ const matchAttributes = new MatchAttributes(attributes);
+
+ const result = await matchAttributes.match(TX);
+
+ expect(result).to.be.true;
+ });
});
describe('createFilter', () => {
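
The owner test above now deep-copies the fixture via `JSON.parse(JSON.stringify(TX))` before deleting `owner`, so the shared `TX` stub is no longer mutated across tests. On Node 17+, `structuredClone` would give the same defensive copy without the JSON round-trip (an alternative sketch, not what the patch uses):

    const tx = structuredClone(TX);
    delete tx.owner;
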
diff --git a/src/filters.ts b/src/filters.ts
index c52738a0..eb655e37 100644
--- a/src/filters.ts
+++ b/src/filters.ts
@@ -1,4 +1,4 @@
-import { b64UrlToUtf8 } from './lib/encoding.js';
+import { b64UrlToUtf8, fromB64Url, sha256B64Url } from './lib/encoding.js';
import { ItemFilter, MatchableItem } from './types.js';
export class AlwaysMatch implements ItemFilter {
@@ -108,6 +108,12 @@ export class MatchAttributes implements ItemFilter {
for (const [name, value] of Object.entries(this.attributes)) {
if (item?.[name as keyof MatchableItem] === value) {
matches.add(name);
+ } else if (name === 'owner_address' && item['owner'] !== undefined) {
+ const ownerBuffer = fromB64Url(item['owner']);
+ const ownerAddress = sha256B64Url(ownerBuffer);
+ if (ownerAddress === value) {
+ matches.add(name);
+ }
}
}
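
The `owner_address` fallback above works because an Arweave address is the base64url-encoded SHA-256 digest of the owner's RSA public modulus, which is what the `owner` field carries. A self-contained sketch of the two helpers it leans on (the real implementations live in `lib/encoding.js`; this version assumes Node's built-in base64url support):

    import { createHash } from 'node:crypto';

    // b64url string -> bytes (mirrors fromB64Url)
    function fromB64Url(value: string): Buffer {
      return Buffer.from(value, 'base64url');
    }

    // bytes -> b64url-encoded SHA-256 digest (mirrors sha256B64Url)
    function sha256B64Url(value: Buffer): string {
      return createHash('sha256').update(value).digest().toString('base64url');
    }

    // A data item's address, derived from its owner modulus:
    // const ownerAddress = sha256B64Url(fromB64Url(item.owner));
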
diff --git a/src/lib/ans-104.ts b/src/lib/ans-104.ts
index 14cce044..c7bcb69d 100644
--- a/src/lib/ans-104.ts
+++ b/src/lib/ans-104.ts
@@ -3,10 +3,17 @@ import * as EventEmitter from 'node:events';
import fs from 'node:fs';
import fsPromises from 'node:fs/promises';
import path from 'node:path';
-import { Worker, isMainThread, parentPort } from 'node:worker_threads';
+import {
+ Worker,
+ isMainThread,
+ parentPort,
+ workerData,
+} from 'node:worker_threads';
import { default as wait } from 'wait';
import * as winston from 'winston';
+import * as events from '../events.js';
+import { createFilter } from '../filters.js';
import log from '../log.js';
import { ContiguousDataSource, NormalizedDataItem } from '../types.js';
import { fromB64Url, sha256B64Url, utf8ToB64Url } from './encoding.js';
@@ -15,10 +22,38 @@ import { fromB64Url, sha256B64Url, utf8ToB64Url } from './encoding.js';
// @ts-ignore
const { default: processStream } = arbundles;
-export function normalizeAns104DataItem(
- parentTxId: string,
- ans104DataItem: Record,
-): NormalizedDataItem {
+type ParseEventName =
+ | 'data-item-matched'
+ | 'unbundle-complete'
+ | 'unbundle-error';
+
+const DATA_ITEM_MATCHED: ParseEventName = 'data-item-matched';
+const UNBUNDLE_COMPLETE: ParseEventName = 'unbundle-complete';
+const UNBUNDLE_ERROR: ParseEventName = 'unbundle-error';
+
+interface ParserMessage {
+ eventName: ParseEventName;
+ dataItem?: NormalizedDataItem;
+ dataItemIndexFilterString?: string;
+ itemCount?: number;
+ matchedItemCount?: number;
+}
+
+export function normalizeAns104DataItem({
+ rootTxId,
+ parentId,
+ parentIndex,
+ index,
+ filter,
+ ans104DataItem,
+}: {
+ rootTxId: string;
+ parentId: string;
+ parentIndex: number;
+ index: number;
+ filter: string;
+ ans104DataItem: Record<string, any>;
+}): NormalizedDataItem {
// TODO stricter type checking (maybe zod)
const tags = (ans104DataItem.tags || []).map(
@@ -29,8 +64,11 @@ export function normalizeAns104DataItem(
);
return {
- parent_id: parentTxId,
id: ans104DataItem.id,
+ index: index,
+ parent_id: parentId,
+ parent_index: parentIndex,
+ root_tx_id: rootTxId,
signature: ans104DataItem.signature,
owner: ans104DataItem.owner,
owner_address: sha256B64Url(fromB64Url(ans104DataItem.owner)),
@@ -39,6 +77,7 @@ export function normalizeAns104DataItem(
tags,
data_offset: ans104DataItem.dataOffset,
data_size: ans104DataItem.dataSize,
+ filter,
} as NormalizedDataItem;
}
@@ -52,29 +91,42 @@ export class Ans104Parser {
log,
eventEmitter,
contiguousDataSource,
+ dataItemIndexFilterString,
}: {
log: winston.Logger;
eventEmitter: EventEmitter;
contiguousDataSource: ContiguousDataSource;
+ dataItemIndexFilterString: string;
}) {
this.log = log.child({ class: 'Ans104Parser' });
this.contiguousDataSource = contiguousDataSource;
const workerUrl = new URL('./ans-104.js', import.meta.url);
- this.worker = new Worker(workerUrl);
+ this.worker = new Worker(workerUrl, {
+ workerData: {
+ dataItemIndexFilterString,
+ },
+ });
this.worker.on(
'message',
- ((message: any) => {
- this.log.info('message', { message });
+ ((message: ParserMessage) => {
switch (message.eventName) {
- case 'data-item-unbundled':
- eventEmitter.emit(message.eventName, message.dataItem);
+ case DATA_ITEM_MATCHED:
+ eventEmitter.emit(
+ events.ANS104_DATA_ITEM_MATCHED,
+ message.dataItem,
+ );
break;
- case 'unbundle-complete':
+ case UNBUNDLE_COMPLETE:
+ const { eventName, ...eventBody } = message;
+ eventEmitter.emit(events.ANS104_UNBUNDLE_COMPLETE, {
+ dataItemIndexFilterString,
+ ...eventBody,
+ });
this.unbundlePromise = undefined;
break;
- case 'unbundle-error':
+ case UNBUNDLE_ERROR:
this.unbundlePromise = undefined;
break;
}
@@ -87,50 +139,67 @@ export class Ans104Parser {
});
}
- async parseBundle({ parentTxId }: { parentTxId: string }): Promise<void> {
- const unbundlePromise: Promise<void> = new Promise(async (resolve, reject) => {
- const log = this.log.child({ parentTxId });
- log.debug('Waiting for previous bundle to finish...');
- while (this.unbundlePromise) {
- await wait(100);
- }
- log.debug('Previous bundle finished.');
- await fsPromises.mkdir(path.join(process.cwd(), 'data/tmp/ans-104'), {
- recursive: true,
- });
- const data = await this.contiguousDataSource.getData(parentTxId);
- const bundlePath = path.join(
- process.cwd(),
- 'data/tmp/ans-104',
- `${parentTxId}`,
- );
- const writeStream = fs.createWriteStream(bundlePath);
- // TODO consider using pipeline
- data.stream.pipe(writeStream);
- writeStream.on('error', (error) => {
- log.error('Error writing ANS-104 bundle stream', error);
- reject(error);
- });
- writeStream.on('finish', async () => {
- log.info('Parsing ANS-104 bundle stream...');
- this.worker.postMessage({ parentTxId, bundlePath });
- resolve();
- });
- });
+ async parseBundle({
+ rootTxId,
+ parentId,
+ parentIndex,
+ }: {
+ rootTxId: string;
+ parentId: string;
+ parentIndex: number;
+ }): Promise<void> {
+ const unbundlePromise: Promise<void> = new Promise(
+ async (resolve, reject) => {
+ const log = this.log.child({ parentId });
+ log.debug('Waiting for previous bundle to finish...');
+ while (this.unbundlePromise) {
+ await wait(100);
+ }
+ log.debug('Previous bundle finished.');
+ await fsPromises.mkdir(path.join(process.cwd(), 'data/tmp/ans-104'), {
+ recursive: true,
+ });
+ const data = await this.contiguousDataSource.getData(parentId);
+ const bundlePath = path.join(
+ process.cwd(),
+ 'data/tmp/ans-104',
+ `${parentId}`,
+ );
+ const writeStream = fs.createWriteStream(bundlePath);
+ // TODO consider using pipeline
+ data.stream.pipe(writeStream);
+ writeStream.on('error', (error) => {
+ log.error('Error writing ANS-104 bundle stream', error);
+ reject(error);
+ });
+ writeStream.on('finish', async () => {
+ log.info('Parsing ANS-104 bundle stream...');
+ this.worker.postMessage({
+ rootTxId,
+ parentId,
+ parentIndex,
+ bundlePath,
+ });
+ resolve();
+ });
+ },
+ );
this.unbundlePromise = unbundlePromise;
return unbundlePromise;
}
}
if (!isMainThread) {
+ const filter = createFilter(JSON.parse(workerData.dataItemIndexFilterString));
parentPort?.on('message', async (message: any) => {
- const { parentTxId, bundlePath } = message;
+ const { rootTxId, parentId, parentIndex, bundlePath } = message;
try {
const stream = fs.createReadStream(bundlePath);
const iterable = await processStream(stream);
const bundleLength = iterable.length;
+ let matchedItemCount = 0;
- const fnLog = log.child({ parentTxId, bundleLength });
+ const fnLog = log.child({ rootTxId, parentId, bundleLength });
fnLog.info('Unbundling ANS-104 bundle stream data items...');
const processedDataItemIds = new Set();
@@ -158,12 +227,29 @@ if (!isMainThread) {
diLog.warn('Skipping data item with missing data offset.');
}
- parentPort?.postMessage({
- eventName: 'data-item-unbundled',
- dataItem: normalizeAns104DataItem(parentTxId, dataItem),
+ const normalizedDataItem = normalizeAns104DataItem({
+ rootTxId: rootTxId as string,
+ parentId: parentId as string,
+ parentIndex: parentIndex as number,
+ index: index as number,
+ filter: workerData.dataItemIndexFilterString,
+ ans104DataItem: dataItem as Record<string, any>,
});
+
+ if (await filter.match(normalizedDataItem)) {
+ matchedItemCount++;
+ parentPort?.postMessage({
+ eventName: DATA_ITEM_MATCHED,
+ dataItem: normalizedDataItem,
+ });
+ }
}
- parentPort?.postMessage({ eventName: 'unbundle-complete' });
+ parentPort?.postMessage({
+ eventName: UNBUNDLE_COMPLETE,
+ parentId: parentId as string,
+ itemCount: bundleLength,
+ matchedItemCount,
+ });
} catch (error) {
log.error('Error unbundling ANS-104 bundle stream', error);
parentPort?.postMessage({ eventName: 'unbundle-error' });
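
The `rootTxId`/`parentId`/`parentIndex` plumbing added above keeps nested bundles traceable to their layer 1 transaction: `root_tx_id` stays constant all the way down while `parent_id` changes at each level. A sketch with hypothetical ids, for an L1 bundle TX0 containing data item A, where A is itself a bundle containing data item B:

    const itemA = {
      id: 'A',
      root_tx_id: 'TX0',
      parent_id: 'TX0',
      parent_index: -1, // TX0 is a layer 1 transaction, so its index is -1
      index: 0,         // A is the first item in TX0
    };

    const itemB = {
      id: 'B',
      root_tx_id: 'TX0', // still the L1 root
      parent_id: 'A',
      parent_index: 0,   // A's index within TX0
      index: 0,          // B is the first item in A
    };
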
diff --git a/src/lib/bundles.test.ts b/src/lib/bundles.test.ts
deleted file mode 100644
index 7ff17b7d..00000000
--- a/src/lib/bundles.test.ts
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * AR.IO Gateway
- * Copyright (C) 2022 Permanent Data Solutions, Inc
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- */
-import { expect } from 'chai';
-import { EventEmitter } from 'node:events';
-import stream from 'node:stream';
-import * as sinon from 'sinon';
-
-import * as events from '../../src/events.js';
-import { emitAns104UnbundleEvents } from '../../src/lib/bundles.js';
-import log from '../../src/log.js';
-import { stubAns104Bundle, stubTxID } from '../../test/stubs.js';
-
-describe('importAns102Bundle', () => {
- it('should do something (placedholder test)', () => {
- expect(true).to.equal(true);
- });
-});
-
-describe('importAns104Bundle', () => {
- let ans104Bundle: stream.Readable;
- let eventEmitter: EventEmitter;
-
- beforeEach(async () => {
- eventEmitter = new EventEmitter();
- ans104Bundle = await stubAns104Bundle();
- });
-
- afterEach(() => {
- sinon.restore();
- });
-
- it('should proccess bundles and save data items to the database using default batch size', async () => {
- let emitCount = 0;
- eventEmitter.on(events.DATA_ITEM_UNBUNDLED, () => {
- emitCount++;
- });
- await emitAns104UnbundleEvents({
- log,
- eventEmitter,
- bundleStream: ans104Bundle,
- parentTxId: stubTxID,
- });
- expect(emitCount).to.equal(2);
- });
-});
diff --git a/src/lib/bundles.ts b/src/lib/bundles.ts
deleted file mode 100644
index ec59b2e9..00000000
--- a/src/lib/bundles.ts
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * AR.IO Gateway
- * Copyright (C) 2022 Permanent Data Solutions, Inc
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- */
-import arbundles from 'arbundles/stream/index.js';
-import * as EventEmitter from 'node:events';
-import stream from 'node:stream';
-import * as winston from 'winston';
-
-import { NormalizedDataItem } from '../types.js';
-import { fromB64Url, sha256B64Url, utf8ToB64Url } from './encoding.js';
-
-/* eslint-disable */
-// @ts-ignore
-const { default: processStream } = arbundles;
-
-/* eslint-disable */
-// @ts-ignore
-export async function emitAns102UnbundleEvents({
- log,
- eventEmitter,
- bundleStream,
- parentTxId,
-}: {
- log: winston.Logger;
- eventEmitter: EventEmitter;
- bundleStream: stream.Readable;
- parentTxId: string;
-}): Promise<void> {}
-
-export function normalizeAns104DataItem(
- parentTxId: string,
- ans104DataItem: Record<string, any>,
-): NormalizedDataItem {
- // TODO stricter type checking (maybe zod)
-
- const tags = (ans104DataItem.tags || []).map(
- (tag: { name: string; value: string }) => ({
- name: utf8ToB64Url(tag.name),
- value: utf8ToB64Url(tag.value),
- }),
- );
-
- return {
- parent_id: parentTxId,
- id: ans104DataItem.id,
- signature: ans104DataItem.signature,
- owner: ans104DataItem.owner,
- owner_address: sha256B64Url(fromB64Url(ans104DataItem.owner)),
- target: ans104DataItem.target,
- anchor: ans104DataItem.anchor,
- tags,
- data_offset: ans104DataItem.dataOffset,
- data_size: ans104DataItem.dataSize,
- } as NormalizedDataItem;
-}
-
-export async function emitAns104UnbundleEvents({
- log,
- eventEmitter,
- bundleStream,
- parentTxId,
-}: {
- log: winston.Logger;
- eventEmitter: EventEmitter;
- bundleStream: stream.Readable;
- parentTxId: string;
-}): Promise<void> {
- const iterable = await processStream(bundleStream);
- const bundleLength = iterable.length;
-
- const fnLog = log.child({ parentTxId, bundleLength });
- fnLog.info('Unbundling ANS-104 bundle stream data items...');
-
- const processedDataItemIds = new Set();
- for await (const [index, dataItem] of iterable.entries()) {
- const diLog = fnLog.child({
- dataItemId: dataItem.id,
- dataItemIndex: index,
- });
- diLog.info('Processing data item...');
-
- if (!dataItem.id) {
- // TODO counter metric data items with missing ids
- diLog.warn('Skipping data item with missing ID.');
- continue;
- }
-
- if (processedDataItemIds.has(dataItem.id)) {
- // TODO counter metric for skipped data items
- diLog.warn('Skipping duplicate data item ID.');
- continue;
- }
-
- if (!dataItem.dataOffset) {
- // TODO counter metric for skipped data items
- diLog.warn('Skipping data item with missing data offset.');
- }
-
- eventEmitter.emit(
- 'data-item-unbundled',
- normalizeAns104DataItem(parentTxId, dataItem),
- );
- }
-}
diff --git a/src/lib/time.ts b/src/lib/time.ts
new file mode 100644
index 00000000..0d349f9c
--- /dev/null
+++ b/src/lib/time.ts
@@ -0,0 +1,21 @@
+/**
+ * AR.IO Gateway
+ * Copyright (C) 2023 Permanent Data Solutions, Inc
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+export function currentUnixTimestamp() {
+ return +(Date.now() / 1000).toFixed(0);
+}
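
`currentUnixTimestamp` centralizes the `+(Date.now() / 1000).toFixed(0)` expression that was previously repeated at each call site. For positive inputs it behaves like rounding to the nearest second:

    // Equivalent for positive timestamps, shown only for comparison:
    const ts = Math.round(Date.now() / 1000);
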
diff --git a/src/routes/graphql/resolvers.ts b/src/routes/graphql/resolvers.ts
index e5157309..d7587d4b 100644
--- a/src/routes/graphql/resolvers.ts
+++ b/src/routes/graphql/resolvers.ts
@@ -64,17 +64,21 @@ export function resolveTxOwner(tx: GqlTransaction) {
};
}
-// TODO implement when L2 data is added
-export function resolveTxParent() {
+export function resolveTxParent(tx: GqlTransaction) {
+ if (tx.parentId === null) {
+ return null;
+ }
return {
- id: '',
+ id: tx.parentId,
};
}
-// TODO implement when L2 data is added
-export function resolveTxBundledIn() {
+export function resolveTxBundledIn(tx: GqlTransaction) {
+ if (tx.parentId === null) {
+ return null;
+ }
return {
- id: '',
+ id: tx.parentId,
};
}
@@ -101,9 +105,13 @@ export const resolvers: IResolvers = {
ids: queryParams.ids,
recipients: queryParams.recipients,
owners: queryParams.owners,
- tags: queryParams.tags || [],
minHeight: queryParams.block?.min,
maxHeight: queryParams.block?.max,
+ bundledIn:
+ queryParams.bundledIn !== undefined
+ ? queryParams.bundledIn
+ : queryParams.parent,
+ tags: queryParams.tags || [],
});
},
block: async (_, queryParams, { db }) => {
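
With the resolver changes above, `bundledIn` becomes a real filter and field backed by `parentId`, with the older `parent` argument kept as a fallback. A hypothetical query exercising both the argument and the field:

    const query = `
      query {
        transactions(bundledIn: ["<bundle tx id>"], first: 10) {
          edges {
            node {
              id
              bundledIn { id }
            }
          }
        }
      }
    `;
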
diff --git a/src/system.ts b/src/system.ts
index 20528f2e..75429050 100644
--- a/src/system.ts
+++ b/src/system.ts
@@ -30,6 +30,7 @@ import { StandaloneSqliteDatabase } from './database/standalone-sqlite.js';
import * as events from './events.js';
import { MatchTags } from './filters.js';
import { UniformFailureSimulator } from './lib/chaos.js';
+import { currentUnixTimestamp } from './lib/time.js';
import log from './log.js';
import { MemoryCacheArNSResolver } from './resolution/memory-cache-arns-resolver.js';
import { StreamingManifestPathResolver } from './resolution/streaming-manifest-path-resolver.js';
@@ -40,15 +41,20 @@ import { FsDataStore } from './store/fs-data-store.js';
import { FsTransactionStore } from './store/fs-transaction-store.js';
import {
BlockListValidator,
+ BundleIndex,
ChainIndex,
ContiguousDataIndex,
+ DataItemIndexWriter,
MatchableItem,
NestedDataIndexWriter,
+ NormalizedDataItem,
PartialJsonTransaction,
} from './types.js';
import { Ans104DataIndexer } from './workers/ans104-data-indexer.js';
import { Ans104Unbundler } from './workers/ans104-unbundler.js';
import { BlockImporter } from './workers/block-importer.js';
+import { BundleRepairWorker } from './workers/bundle-repair-worker.js';
+import { DataItemIndexer } from './workers/data-item-indexer.js';
import { TransactionFetcher } from './workers/transaction-fetcher.js';
import { TransactionImporter } from './workers/transaction-importer.js';
import { TransactionRepairWorker } from './workers/transaction-repair-worker.js';
@@ -98,12 +104,15 @@ export const db = new StandaloneSqliteDatabase({
coreDbPath: 'data/sqlite/core.db',
dataDbPath: 'data/sqlite/data.db',
moderationDbPath: 'data/sqlite/moderation.db',
+ bundlesDbPath: 'data/sqlite/bundles.db',
});
export const chainIndex: ChainIndex = db;
+export const bundleIndex: BundleIndex = db;
export const contiguousDataIndex: ContiguousDataIndex = db;
export const blockListValidator: BlockListValidator = db;
export const nestedDataIndexWriter: NestedDataIndexWriter = db;
+export const dataItemIndexWriter: DataItemIndexWriter = db;
// Workers
const eventEmitter = new EventEmitter();
@@ -131,9 +140,20 @@ const ans104TxMatcher = new MatchTags([
eventEmitter.on(events.TX_INDEXED, async (tx: MatchableItem) => {
if (await ans104TxMatcher.match(tx)) {
eventEmitter.emit(events.ANS104_TX_INDEXED, tx);
+ eventEmitter.emit(events.ANS104_BUNDLE_INDEXED, tx);
}
});
+eventEmitter.on(
+ events.ANS104_DATA_ITEM_DATA_INDEXED,
+ async (item: MatchableItem) => {
+ if (await ans104TxMatcher.match(item)) {
+ eventEmitter.emit(events.ANS104_NESTED_BUNDLE_INDEXED, item);
+ eventEmitter.emit(events.ANS104_BUNDLE_INDEXED, item);
+ }
+ },
+);
+
const txFetcher = new TransactionFetcher({
log,
chainSource: arweaveClient,
@@ -162,6 +182,16 @@ export const txRepairWorker = new TransactionRepairWorker({
txFetcher,
});
+export const bundleRepairWorker = new BundleRepairWorker({
+ log,
+ bundleIndex,
+ txFetcher,
+ unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING,
+ indexFilter: config.ANS104_INDEX_FILTER_STRING,
+ shouldBackfillBundles: config.BACKFILL_BUNDLE_RECORDS,
+ filtersChanged: config.FILTER_CHANGE_REPROCESS,
+});
+
// Configure contiguous data source
const chunkDataSource = new ReadThroughChunkDataCache({
log,
@@ -195,27 +225,68 @@ const ans104Unbundler = new Ans104Unbundler({
eventEmitter,
filter: config.ANS104_UNBUNDLE_FILTER,
contiguousDataSource,
+ dataItemIndexFilterString: config.ANS104_INDEX_FILTER_STRING,
});
eventEmitter.on(
- events.ANS104_TX_INDEXED,
- async (tx: PartialJsonTransaction) => {
- if (await config.ANS104_UNBUNDLE_FILTER.match(tx)) {
- ans104Unbundler.queueTx(tx);
+ events.ANS104_BUNDLE_INDEXED,
+ async (item: NormalizedDataItem | PartialJsonTransaction) => {
+ await db.saveBundle({
+ id: item.id,
+ rootTransactionId: 'root_tx_id' in item ? item.root_tx_id : item.id,
+ format: 'ans-104',
+ });
+ if (await config.ANS104_UNBUNDLE_FILTER.match(item)) {
+ await db.saveBundle({
+ id: item.id,
+ format: 'ans-104',
+ unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING,
+ indexFilter: config.ANS104_INDEX_FILTER_STRING,
+ queuedAt: currentUnixTimestamp(),
+ });
+ ans104Unbundler.queueItem({
+ index:
+ 'parent_index' in item && item.parent_index !== undefined
+ ? item.parent_index
+ : -1, // parent indexes are not needed for L1
+ ...item,
+ });
+ } else {
+ await db.saveBundle({
+ id: item.id,
+ format: 'ans-104',
+ unbundleFilter: config.ANS104_UNBUNDLE_FILTER_STRING,
+ skippedAt: currentUnixTimestamp(),
+ });
}
},
);
+eventEmitter.on(events.ANS104_UNBUNDLE_COMPLETE, async (bundleEvent: any) => {
+ db.saveBundle({
+ id: bundleEvent.parentId,
+ format: 'ans-104',
+ dataItemCount: bundleEvent.itemCount,
+ matchedDataItemCount: bundleEvent.matchedItemCount,
+ unbundledAt: currentUnixTimestamp(),
+ });
+});
+
+const dataItemIndexer = new DataItemIndexer({
+ log,
+ eventEmitter,
+ indexWriter: dataItemIndexWriter,
+});
+
const ans104DataIndexer = new Ans104DataIndexer({
log,
eventEmitter,
indexWriter: nestedDataIndexWriter,
});
-eventEmitter.on(events.DATA_ITEM_UNBUNDLED, async (dataItem: any) => {
- if (await config.ANS104_DATA_INDEX_FILTER.match(dataItem)) {
- ans104DataIndexer.queueDataItem(dataItem);
- }
+eventEmitter.on(events.ANS104_DATA_ITEM_MATCHED, async (dataItem: any) => {
+ dataItemIndexer.queueDataItem(dataItem);
+ ans104DataIndexer.queueDataItem(dataItem);
});
export const manifestPathResolver = new StreamingManifestPathResolver({
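
Taken together, the listeners above form the unbundling pipeline this change introduces. A condensed view of the event flow (event names from src/events.ts):

    // TX_INDEXED --(ans104TxMatcher)--> ANS104_BUNDLE_INDEXED
    // ANS104_BUNDLE_INDEXED --> saveBundle(); if the unbundle filter matches,
    //   saveBundle({ queuedAt }) + Ans104Unbundler.queueItem(), else saveBundle({ skippedAt })
    // parser worker filter match --> ANS104_DATA_ITEM_MATCHED
    //   --> DataItemIndexer.queueDataItem() + Ans104DataIndexer.queueDataItem()
    // data item data indexed --> ANS104_DATA_ITEM_DATA_INDEXED --(ans104TxMatcher)-->
    //   ANS104_NESTED_BUNDLE_INDEXED + ANS104_BUNDLE_INDEXED (recursing into nested bundles)
    // parser worker finished --> ANS104_UNBUNDLE_COMPLETE --> saveBundle() with item counts
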
diff --git a/src/types.d.ts b/src/types.d.ts
index 00ebcdb5..1d15de39 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -174,7 +174,7 @@ export interface ChainSource {
export interface ChainIndex {
getMaxHeight(): Promise<number>;
getBlockHashByHeight(height: number): Promise<string | undefined>;
- getMissingTxIds(limit?: number): Promise<string[]>;
+ getMissingTxIds(limit: number): Promise<string[]>;
resetToHeight(height: number): Promise<void>;
saveTx(txs: PartialJsonTransaction): Promise<void>;
saveBlockAndTxs(
@@ -184,6 +184,35 @@ export interface ChainIndex {
): Promise<void>;
}
+export interface BundleRecord {
+ id: string;
+ rootTransactionId?: string;
+ format: 'ans-102' | 'ans-104';
+ unbundleFilter?: string;
+ indexFilter?: string;
+ dataItemCount?: number;
+ matchedDataItemCount?: number;
+ queuedAt?: number;
+ skippedAt?: number;
+ unbundledAt?: number;
+ fullyIndexedAt?: number;
+}
+
+export interface BundleIndex {
+ saveBundle(bundle: BundleRecord): Promise<void>;
+ getFailedBundleIds(limit: number): Promise<string[]>;
+ updateBundlesFullyIndexedAt(): Promise<void>;
+ updateBundlesForFilterChange(
+ unbundleFilter: string,
+ indexFilter: string,
+ ): Promise<void>;
+ backfillBundles(): Promise<void>;
+}
+
+export interface DataItemIndexWriter {
+ saveDataItem(item: NormalizedDataItem): Promise<void>;
+}
+
export interface NestedDataIndexWriter {
saveNestedDataId({
id,
@@ -199,8 +228,11 @@ export interface NestedDataIndexWriter {
}
export interface NormalizedDataItem {
- parent_id: string;
id: string;
+ index: number;
+ parent_id: string;
+ parent_index: number;
+ root_tx_id: string;
signature: string;
owner: string;
owner_address: string;
@@ -209,6 +241,7 @@ export interface NormalizedDataItem {
tags: B64uTag[];
data_offset?: number;
data_size?: number;
+ filter?: string;
}
interface GqlPageInfo {
@@ -230,6 +263,7 @@ interface GqlTransaction {
blockTimestamp: number | undefined;
height: number | undefined;
blockPreviousBlock: string | undefined;
+ parentId: string | null;
}
interface GqlTransactionEdge {
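
`BundleRecord` above is written incrementally: the worker's `upsertBundle` statement fills in whichever fields each stage supplies. A sketch of the records the system.ts listeners produce for one bundle (ids and timestamps hypothetical):

    // On ANS104_BUNDLE_INDEXED, the bundle is first recorded:
    const indexed: BundleRecord = {
      id: '<bundle id>',
      rootTransactionId: '<bundle id>', // equal to id for a layer 1 bundle
      format: 'ans-104',
    };

    // If it matches the unbundle filter it is queued:
    const queued: BundleRecord = {
      id: '<bundle id>',
      format: 'ans-104',
      unbundleFilter: '<ANS104_UNBUNDLE_FILTER_STRING>',
      indexFilter: '<ANS104_INDEX_FILTER_STRING>',
      queuedAt: 1684168010, // currentUnixTimestamp()
    };

    // And once the parser worker reports ANS104_UNBUNDLE_COMPLETE:
    const unbundled: BundleRecord = {
      id: '<bundle id>',
      format: 'ans-104',
      dataItemCount: 42,       // itemCount from the worker
      matchedDataItemCount: 7, // matchedItemCount from the worker
      unbundledAt: 1684168100,
    };
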
diff --git a/src/workers/ans104-data-indexer.ts b/src/workers/ans104-data-indexer.ts
index c5855c30..7faa066c 100644
--- a/src/workers/ans104-data-indexer.ts
+++ b/src/workers/ans104-data-indexer.ts
@@ -57,6 +57,7 @@ export class Ans104DataIndexer {
method: 'queueDataItem',
id: item.id,
parentId: item.parent_id,
+ rootTxId: item.root_tx_id,
dataOffset: item?.data_offset,
dataSize: item?.data_size,
});
@@ -70,6 +71,7 @@ export class Ans104DataIndexer {
method: 'indexDataItem',
id: item.id,
parentId: item.parent_id,
+ rootTxId: item.root_tx_id,
dataOffset: item?.data_offset,
dataSize: item?.data_size,
});
@@ -86,7 +88,7 @@ export class Ans104DataIndexer {
dataOffset: item.data_offset,
dataSize: item.data_size,
});
- this.eventEmitter.emit(events.ANS104_TX_INDEXED, item);
+ this.eventEmitter.emit(events.ANS104_DATA_ITEM_DATA_INDEXED, item);
log.debug('Data item data indexed.');
} else {
this.log.warn('Data item data is missing data offset or size.');
diff --git a/src/workers/ans104-unbundler.ts b/src/workers/ans104-unbundler.ts
index c563e3d2..65dd7913 100644
--- a/src/workers/ans104-unbundler.ts
+++ b/src/workers/ans104-unbundler.ts
@@ -24,31 +24,43 @@ import { Ans104Parser } from '../lib/ans-104.js';
import {
ContiguousDataSource,
ItemFilter,
+ NormalizedDataItem,
PartialJsonTransaction,
} from '../types.js';
const DEFAULT_WORKER_COUNT = 1;
+interface IndexProperty {
+ index: number;
+}
+
+type UnbundleableItem = (NormalizedDataItem | PartialJsonTransaction) &
+ IndexProperty;
+
export class Ans104Unbundler {
// Dependencies
private log: winston.Logger;
private filter: ItemFilter;
- private ans104Parser: Ans104Parser;
// Unbundling queue
- private queue: queueAsPromised<PartialJsonTransaction, void>;
+ private queue: queueAsPromised<UnbundleableItem, void>;
+
+ // Parser
+ private ans104Parser: Ans104Parser;
constructor({
log,
eventEmitter,
filter,
contiguousDataSource,
+ dataItemIndexFilterString,
workerCount = DEFAULT_WORKER_COUNT,
}: {
log: winston.Logger;
eventEmitter: EventEmitter;
filter: ItemFilter;
contiguousDataSource: ContiguousDataSource;
+ dataItemIndexFilterString: string;
workerCount?: number;
}) {
this.log = log.child({ class: 'Ans104Unbundler' });
@@ -57,24 +69,40 @@ export class Ans104Unbundler {
log,
eventEmitter,
contiguousDataSource,
+ dataItemIndexFilterString,
});
this.queue = fastq.promise(this.unbundle.bind(this), workerCount);
}
- async queueTx(tx: PartialJsonTransaction): Promise<void> {
- const log = this.log.child({ method: 'queueTx', txId: tx.id });
+ async queueItem(item: UnbundleableItem): Promise<void> {
+ const log = this.log.child({ method: 'queueItem', id: item.id });
log.debug('Queueing bundle...');
- this.queue.push(tx);
+ this.queue.push(item);
log.debug('Bundle queued.');
}
- async unbundle(tx: PartialJsonTransaction): Promise<void> {
- const log = this.log.child({ method: 'unbundle', txId: tx.id });
+ async unbundle(item: UnbundleableItem): Promise<void> {
+ const log = this.log.child({ method: 'unbundle', id: item.id });
try {
- if (await this.filter.match(tx)) {
+ let rootTxId: string | undefined;
+ if ('root_tx_id' in item) {
+ // Data item with root_tx_id
+ rootTxId = item.root_tx_id;
+ } else if ('last_tx' in item) {
+ // Layer 1 transaction
+ rootTxId = item.id;
+ } else {
+ // Data item without root_tx_id (should be impossible)
+ throw new Error('Missing root_tx_id on data item.');
+ }
+ if (await this.filter.match(item)) {
log.info('Unbundling bundle...');
- await this.ans104Parser.parseBundle({ parentTxId: tx.id });
+ await this.ans104Parser.parseBundle({
+ rootTxId,
+ parentId: item.id,
+ parentIndex: item.index,
+ });
log.info('Bundle unbundled.');
}
} catch (error) {
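The root resolution above relies on structural discrimination: only layer 1 transactions carry last_tx, and only data items carry root_tx_id. Restated as a standalone helper, a sketch (not code from this change):

// Equivalent of the branch above as a pure function; UnbundleableItem is the
// union type defined earlier in this file.
function resolveRootTxId(item: UnbundleableItem): string {
  if ('root_tx_id' in item) {
    return item.root_tx_id; // nested data item: use its recorded root
  }
  if ('last_tx' in item) {
    return item.id; // layer 1 transaction: it is its own root
  }
  throw new Error('Missing root_tx_id on data item.');
}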
diff --git a/src/workers/block-importer.test.ts b/src/workers/block-importer.test.ts
index 2f1d9443..1f24cafe 100644
--- a/src/workers/block-importer.test.ts
+++ b/src/workers/block-importer.test.ts
@@ -27,6 +27,7 @@ import { StandaloneSqliteDatabase } from '../../src/database/standalone-sqlite.j
import log from '../../src/log.js';
import { BlockImporter } from '../../src/workers/block-importer.js';
import {
+ bundlesDbPath,
coreDbPath,
dataDbPath,
moderationDbPath,
@@ -74,6 +75,7 @@ describe('BlockImporter', () => {
db = new StandaloneSqliteDatabase({
log,
metricsRegistry,
+ bundlesDbPath,
coreDbPath,
dataDbPath,
moderationDbPath,
diff --git a/src/workers/bundle-repair-worker.ts b/src/workers/bundle-repair-worker.ts
new file mode 100644
index 00000000..a8b4285d
--- /dev/null
+++ b/src/workers/bundle-repair-worker.ts
@@ -0,0 +1,131 @@
+/**
+ * AR.IO Gateway
+ * Copyright (C) 2023 Permanent Data Solutions, Inc
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+import * as winston from 'winston';
+
+import { BundleIndex } from '../types.js';
+import { TransactionFetcher } from './transaction-fetcher.js';
+
+const DEFAULT_RETRY_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes
+const DEFAULT_UPDATE_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
+const DEFAULT_BUNDLE_BACKFILL_INTERVAL_MS = 15 * 60 * 1000; // 15 minutes
+const DEFAULT_FILTER_REPROCESS_INTERVAL_MS = 15 * 60 * 1000; // 15 minutes
+const DEFAULT_BUNDLES_TO_RETRY = 20;
+
+export class BundleRepairWorker {
+ // Dependencies
+ private log: winston.Logger;
+ private bundleIndex: BundleIndex;
+ private txFetcher: TransactionFetcher;
+ private unbundleFilter: string;
+ private indexFilter: string;
+ private shouldBackfillBundles: boolean;
+ private filtersChanged: boolean;
+
+ constructor({
+ log,
+ bundleIndex,
+ txFetcher,
+ unbundleFilter,
+ indexFilter,
+ shouldBackfillBundles,
+ filtersChanged,
+ }: {
+ log: winston.Logger;
+ bundleIndex: BundleIndex;
+ txFetcher: TransactionFetcher;
+ unbundleFilter: string;
+ indexFilter: string;
+ shouldBackfillBundles: boolean;
+ filtersChanged: boolean;
+ }) {
+ this.log = log.child({ class: 'BundleRepairWorker' });
+ this.bundleIndex = bundleIndex;
+ this.txFetcher = txFetcher;
+ this.unbundleFilter = unbundleFilter;
+ this.indexFilter = indexFilter;
+ this.shouldBackfillBundles = shouldBackfillBundles;
+ this.filtersChanged = filtersChanged;
+ }
+
+ async start(): Promise<void> {
+ setInterval(this.retryBundles.bind(this), DEFAULT_RETRY_INTERVAL_MS);
+ setInterval(
+ this.updateBundleTimestamps.bind(this),
+ DEFAULT_UPDATE_INTERVAL_MS,
+ );
+ if (this.shouldBackfillBundles) {
+ setInterval(
+ this.backfillBundles.bind(this),
+ DEFAULT_BUNDLE_BACKFILL_INTERVAL_MS,
+ );
+ }
+ if (this.filtersChanged) {
+ setInterval(
+ this.updateForFilterChange.bind(this),
+ DEFAULT_FILTER_REPROCESS_INTERVAL_MS,
+ );
+ }
+ }
+
+ async retryBundles() {
+ try {
+ const bundleIds = await this.bundleIndex.getFailedBundleIds(
+ DEFAULT_BUNDLES_TO_RETRY,
+ );
+ for (const bundleId of bundleIds) {
+ this.log.info('Retrying failed bundle', { bundleId });
+ await this.txFetcher.queueTxId(bundleId);
+ }
+ } catch (error: any) {
+ this.log.error('Error retrying failed bundles:', error);
+ }
+ }
+
+ async updateBundleTimestamps() {
+ try {
+ this.log.info('Updating bundle timestamps...');
+ await this.bundleIndex.updateBundlesFullyIndexedAt();
+ this.log.info('Bundle timestamps updated.');
+ } catch (error: any) {
+ this.log.error('Error updating bundle timestamps:', error);
+ }
+ }
+
+ async backfillBundles() {
+ try {
+ this.log.info('Backfilling bundle records...');
+ await this.bundleIndex.backfillBundles();
+ this.log.info('Bundle records backfilled.');
+ } catch (error: any) {
+ this.log.error('Error backfilling bundle records:', error);
+ }
+ }
+
+ async updateForFilterChange() {
+ try {
+ this.log.info('Updating bundles for filter change...');
+ await this.bundleIndex.updateBundlesForFilterChange(
+ this.unbundleFilter,
+ this.indexFilter,
+ );
+ this.log.info('Bundles updated for filter change.');
+ } catch (error: any) {
+ this.log.error('Error updating bundles for filter change:', error);
+ }
+ }
+}
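For context, a hypothetical composition of the worker; the dependencies and filter strings are placeholders (in the gateway they come from configuration and the app's wiring):

// Hypothetical wiring; bundleIndex, txFetcher, and log are assumed to be
// constructed elsewhere.
const bundleRepair = new BundleRepairWorker({
  log,
  bundleIndex,
  txFetcher,
  unbundleFilter: '{"always": true}', // placeholder filter strings
  indexFilter: '{"always": true}',
  shouldBackfillBundles: true, // enables the backfill interval
  filtersChanged: false, // set true to reprocess after a filter change
});
await bundleRepair.start(); // registers the setInterval timers above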
diff --git a/src/workers/data-item-indexer.ts b/src/workers/data-item-indexer.ts
new file mode 100644
index 00000000..77638146
--- /dev/null
+++ b/src/workers/data-item-indexer.ts
@@ -0,0 +1,84 @@
+/**
+ * AR.IO Gateway
+ * Copyright (C) 2023 Permanent Data Solutions, Inc
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+import { default as fastq } from 'fastq';
+import type { queueAsPromised } from 'fastq';
+import * as EventEmitter from 'node:events';
+import * as winston from 'winston';
+
+import * as events from '../events.js';
+import { DataItemIndexWriter, NormalizedDataItem } from '../types.js';
+
+const DEFAULT_WORKER_COUNT = 1;
+
+export class DataItemIndexer {
+ // Dependencies
+ private log: winston.Logger;
+ private eventEmitter: EventEmitter;
+ private indexWriter: DataItemIndexWriter;
+
+ // Data indexing queue
+ private queue: queueAsPromised<NormalizedDataItem, void>;
+
+ constructor({
+ log,
+ eventEmitter,
+ indexWriter,
+ workerCount = DEFAULT_WORKER_COUNT,
+ }: {
+ log: winston.Logger;
+ eventEmitter: EventEmitter;
+ indexWriter: DataItemIndexWriter;
+ workerCount?: number;
+ }) {
+ this.log = log.child({ class: 'DataItemIndexer' });
+ this.indexWriter = indexWriter;
+ this.eventEmitter = eventEmitter;
+
+ this.queue = fastq.promise(this.indexDataItem.bind(this), workerCount);
+ }
+
+ async queueDataItem(item: NormalizedDataItem): Promise<void> {
+ const log = this.log.child({
+ method: 'queueDataItem',
+ id: item.id,
+ parentId: item.parent_id,
+ rootTxId: item.root_tx_id,
+ });
+ log.debug('Queueing data item for indexing...');
+ this.queue.push(item);
+ log.debug('Data item queued for indexing.');
+ }
+
+ async indexDataItem(item: NormalizedDataItem): Promise<void> {
+ const log = this.log.child({
+ method: 'indexDataItem',
+ id: item.id,
+ parentId: item.parent_id,
+ rootTxId: item.root_tx_id,
+ });
+
+ try {
+ log.debug('Indexing data item...');
+ await this.indexWriter.saveDataItem(item);
+ this.eventEmitter.emit(events.ANS104_DATA_ITEM_INDEXED, item);
+ log.debug('Data item indexed.');
+ } catch (error) {
+ log.error('Failed to index data item:', error);
+ }
+ }
+}
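DataItemIndexer uses the same fastq pattern as the other workers: a promise queue with bounded concurrency that is pushed to without awaiting. The pattern in isolation, with an invented handler:

// fastq.promise(worker, concurrency) returns a queue whose push() resolves
// when the worker has finished that item.
import { default as fastq } from 'fastq';
import type { queueAsPromised } from 'fastq';

const queue: queueAsPromised<string, void> = fastq.promise(
  async (id: string) => {
    // index the item identified by `id` (placeholder work)
  },
  1, // matches DEFAULT_WORKER_COUNT
);

queue.push('data-item-id'); // fire-and-forget, as in queueDataItem above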
diff --git a/src/workers/transaction-repair-worker.ts b/src/workers/transaction-repair-worker.ts
index 7a53b2b1..7941ac11 100644
--- a/src/workers/transaction-repair-worker.ts
+++ b/src/workers/transaction-repair-worker.ts
@@ -21,6 +21,7 @@ import { ChainIndex } from '../types.js';
import { TransactionFetcher } from './transaction-fetcher.js';
const DEFAULT_INTERVAL_MS = 5 * 60 * 1000;
+const DEFAULT_TXS_TO_RETRY = 20;
export class TransactionRepairWorker {
// Dependencies
@@ -48,7 +49,9 @@ export class TransactionRepairWorker {
async retryMissingTransactions() {
try {
- const missingTxIds = await this.chainIndex.getMissingTxIds();
+ const missingTxIds = await this.chainIndex.getMissingTxIds(
+ DEFAULT_TXS_TO_RETRY,
+ );
for (const txId of missingTxIds) {
this.log.info('Retrying missing transaction', { txId });
await this.txFetcher.queueTxId(txId);
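The matching ChainIndex change is not shown in this diff; from the call site, getMissingTxIds now accepts a batch limit. An assumed shape, which may differ from the real declaration in the types module:

// Inferred from the call above; not the actual declaration.
interface ChainIndex {
  getMissingTxIds(limit: number): Promise<string[]>;
}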
diff --git a/test/bundles-schema.sql b/test/bundles-schema.sql
new file mode 100644
index 00000000..59d64ade
--- /dev/null
+++ b/test/bundles-schema.sql
@@ -0,0 +1,140 @@
+CREATE TABLE bundle_formats (
+ id INTEGER PRIMARY KEY,
+ format TEXT NOT NULL
+);
+CREATE TABLE wallets (
+ address BLOB PRIMARY KEY,
+ public_modulus BLOB
+);
+CREATE TABLE stable_data_items (
+ -- Identity
+ id BLOB NOT NULL,
+ parent_id BLOB NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ height INTEGER NOT NULL,
+ block_transaction_index INTEGER NOT NULL,
+ signature BLOB NOT NULL,
+ anchor BLOB NOT NULL,
+
+ -- Ownership
+ owner_address BLOB NOT NULL,
+ target BLOB,
+
+ -- Data
+ data_offset INTEGER NOT NULL,
+ data_size INTEGER NOT NULL,
+ content_type TEXT,
+
+ -- Metadata
+ tag_count INTEGER NOT NULL,
+ indexed_at INTEGER NOT NULL,
+ PRIMARY KEY (id)
+);
+CREATE INDEX stable_data_items_height_block_transaction_index_id_idx ON stable_data_items (height, block_transaction_index, id);
+CREATE INDEX stable_data_items_target_height_block_transaction_index_id_idx ON stable_data_items (target, height, block_transaction_index, id);
+CREATE INDEX stable_data_items_owner_address_height_block_transaction_index_id_idx ON stable_data_items (owner_address, height, block_transaction_index, id);
+CREATE INDEX stable_data_items_parent_id_height_block_transaction_index_id_idx ON stable_data_items (parent_id, height, block_transaction_index, id);
+CREATE TABLE tag_names (
+ hash BLOB PRIMARY KEY,
+ name BLOB NOT NULL
+);
+CREATE TABLE tag_values (
+ hash BLOB PRIMARY KEY,
+ value BLOB NOT NULL
+);
+CREATE TABLE stable_data_item_tags (
+ tag_name_hash BLOB NOT NULL,
+ tag_value_hash BLOB NOT NULL,
+ height INTEGER NOT NULL,
+ block_transaction_index INTEGER NOT NULL,
+ data_item_tag_index INTEGER NOT NULL,
+ data_item_id BLOB NOT NULL,
+ parent_id BLOB NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ PRIMARY KEY (tag_name_hash, tag_value_hash, height, block_transaction_index, data_item_id, data_item_tag_index)
+);
+CREATE TABLE new_data_items (
+ -- Identity
+ id BLOB NOT NULL,
+ parent_id BLOB NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ height INTEGER,
+ signature BLOB NOT NULL,
+ anchor BLOB NOT NULL,
+
+ -- Ownership
+ owner_address BLOB NOT NULL,
+ target BLOB,
+
+ -- Data
+ data_offset INTEGER NOT NULL,
+ data_size INTEGER NOT NULL,
+ content_type TEXT,
+
+ -- Metadata
+ tag_count INTEGER NOT NULL,
+ indexed_at INTEGER NOT NULL,
+ PRIMARY KEY (id)
+);
+CREATE INDEX new_data_items_parent_id_id_idx ON new_data_items (parent_id, id);
+CREATE INDEX new_data_items_root_transaction_id_id_idx ON new_data_items (root_transaction_id, id);
+CREATE INDEX new_data_items_target_id_idx ON new_data_items (target, id);
+CREATE INDEX new_data_items_owner_address_id_idx ON new_data_items (owner_address, id);
+CREATE INDEX new_data_items_height_indexed_at_idx ON new_data_items (height, indexed_at);
+CREATE TABLE new_data_item_tags (
+ tag_name_hash BLOB NOT NULL,
+ tag_value_hash BLOB NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ data_item_id BLOB NOT NULL,
+ data_item_tag_index INTEGER NOT NULL,
+ height INTEGER,
+ indexed_at INTEGER NOT NULL,
+ PRIMARY KEY (tag_name_hash, tag_value_hash, root_transaction_id, data_item_id, data_item_tag_index)
+);
+CREATE INDEX new_data_item_tags_height_indexed_at_idx ON new_data_item_tags (height, indexed_at);
+CREATE INDEX stable_data_item_tags_data_item_id_idx ON stable_data_item_tags (data_item_id);
+CREATE INDEX new_data_item_tags_data_item_id_idx ON new_data_item_tags (data_item_id);
+CREATE TABLE bundle_data_items (
+ id BLOB NOT NULL,
+ parent_id BLOB NOT NULL,
+ parent_index INTEGER NOT NULL,
+ filter_id INTEGER NOT NULL,
+ root_transaction_id BLOB NOT NULL,
+ first_indexed_at INTEGER NOT NULL,
+ last_indexed_at INTEGER NOT NULL,
+ PRIMARY KEY (id, parent_id, parent_index, filter_id)
+);
+CREATE INDEX bundle_data_items_filter_id_idx
+ ON bundle_data_items (filter_id);
+CREATE TABLE filters (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ filter TEXT NOT NULL UNIQUE
+);
+CREATE INDEX filters_filter_idx ON filters (filter);
+CREATE TABLE bundles (
+ id BLOB PRIMARY KEY,
+ format_id INTEGER NOT NULL,
+ unbundle_filter_id INTEGER,
+ index_filter_id INTEGER,
+ data_item_count INTEGER,
+ matched_data_item_count INTEGER,
+ first_queued_at INTEGER,
+ last_queued_at INTEGER,
+ first_skipped_at INTEGER,
+ last_skipped_at INTEGER,
+ first_unbundled_at INTEGER,
+ last_unbundled_at INTEGER,
+ first_fully_indexed_at INTEGER,
+ last_fully_indexed_at INTEGER
+, root_transaction_id BLOB);
+CREATE INDEX bundles_format_id_idx ON bundles (format_id);
+CREATE INDEX bundles_last_queued_at_idx
+ ON bundles (last_queued_at);
+CREATE INDEX bundles_last_skipped_at_idx
+ ON bundles (last_skipped_at);
+CREATE INDEX bundles_last_fully_indexed_at_idx
+ ON bundles (last_fully_indexed_at);
+CREATE INDEX bundles_matched_data_item_count_idx
+ ON bundles (matched_data_item_count);
+CREATE INDEX bundle_data_items_parent_id_filter_id_idx
+ ON bundle_data_items (parent_id, filter_id);
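Since this is the schema the tests run against, the same better-sqlite3 driver the test helpers use works for ad hoc inspection. A sketch, with an invented query over the filters join:

// Table and column names come from the schema above; the DB path assumes the
// default data directory layout.
import Sqlite from 'better-sqlite3';

const db = new Sqlite('data/sqlite/bundles.db', { readonly: true });
const rows = db
  .prepare(
    `SELECT f.filter, COUNT(*) AS items
       FROM bundle_data_items bdi
       JOIN filters f ON f.id = bdi.filter_id
      GROUP BY f.filter`,
  )
  .all();
console.log(rows); // e.g. [ { filter: '{"always": true}', items: 42 } ]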
diff --git a/test/core-schema.sql b/test/core-schema.sql
index 7f628d53..ca2b9c8b 100644
--- a/test/core-schema.sql
+++ b/test/core-schema.sql
@@ -192,3 +192,6 @@ CREATE INDEX new_block_transactions_height_idx ON new_block_transactions (height
CREATE INDEX new_transactions_height_created_at_idx ON new_transactions (height, created_at);
CREATE INDEX missing_transactions_height_transaction_id_idx ON missing_transactions (height, transaction_id);
CREATE INDEX new_transaction_tags_height_created_at_idx ON new_transaction_tags (height, created_at);
+CREATE INDEX stable_block_transactions_transaction_id_idx
+ ON stable_block_transactions (transaction_id);
+CREATE INDEX new_transaction_tags_transaction_id_idx ON new_transaction_tags (transaction_id);
diff --git a/test/dump-bundles-schema.sql b/test/dump-bundles-schema.sql
new file mode 100644
index 00000000..814480ff
--- /dev/null
+++ b/test/dump-bundles-schema.sql
@@ -0,0 +1,3 @@
+.output test/bundles-schema.sql
+.schema
+.exit
diff --git a/test/dump-test-schemas b/test/dump-test-schemas
index 416bf9a4..269636f3 100755
--- a/test/dump-test-schemas
+++ b/test/dump-test-schemas
@@ -3,6 +3,7 @@
sqlite3 data/sqlite/core.db < test/dump-core-schema.sql
sqlite3 data/sqlite/data.db < test/dump-data-schema.sql
sqlite3 data/sqlite/moderation.db < test/dump-moderation-schema.sql
+sqlite3 data/sqlite/bundles.db < test/dump-bundles-schema.sql
# remove sqlite_sequence table from schema dumps
sed -i '/sqlite_sequence/d' test/*-schema.sql
diff --git a/test/sqlite-helpers.ts b/test/sqlite-helpers.ts
index d36aecfd..b75271ee 100644
--- a/test/sqlite-helpers.ts
+++ b/test/sqlite-helpers.ts
@@ -6,9 +6,11 @@ import log from '../src/log.js';
export const coreDbPath = `test/tmp/core.db`;
export const dataDbPath = `test/tmp/data.db`;
export const moderationDbPath = `test/tmp/moderation.db`;
+export const bundlesDbPath = `test/tmp/bundles.db`;
export let coreDb: Sqlite.Database;
export let dataDb: Sqlite.Database;
export let moderationDb: Sqlite.Database;
+export let bundlesDb: Sqlite.Database;
/* eslint-disable */
before(async () => {
@@ -33,10 +35,15 @@ before(async () => {
moderationDb = new Sqlite(moderationDbPath);
const moderationSchema = fs.readFileSync('test/moderation-schema.sql', 'utf8');
moderationDb.exec(moderationSchema);
+
+ // Bundles DB
+ bundlesDb = new Sqlite(bundlesDbPath);
+ const bundlesSchema = fs.readFileSync('test/bundles-schema.sql', 'utf8');
+ bundlesDb.exec(bundlesSchema);
});
afterEach(async () => {
- [coreDb, dataDb, moderationDb].forEach((db) => {
+ [coreDb, dataDb, moderationDb, bundlesDb].forEach((db) => {
db.prepare("SELECT name FROM sqlite_schema WHERE type='table'")
.all()
.forEach((row) => {
diff --git a/yarn.lock b/yarn.lock
index a0f08a37..88754bc7 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -4040,6 +4040,11 @@ json-buffer@3.0.0:
resolved "https://registry.npmjs.org/json-buffer/-/json-buffer-3.0.0.tgz"
integrity sha512-CuUqjv0FUZIdXkHPI8MezCnFCdaTAacej1TZYulLoAg1h/PhwkdXFN4V/gzY4g+fMBCOV2xF+rp7t2XD2ns/NQ==
+json-canonicalize@^1.0.6:
+ version "1.0.6"
+ resolved "https://registry.yarnpkg.com/json-canonicalize/-/json-canonicalize-1.0.6.tgz#c63dc9b909db322fec058320a0f81aef6569b257"
+ integrity sha512-kP2iYpOS5SZHYhIaR1t9oG80d4uTY3jPoaBj+nimy3njtJk8+sRsVatN8pyJRDRtk9Su3+6XqA2U8k0dByJBUQ==
+
json-parse-even-better-errors@^2.3.0:
version "2.3.1"
resolved "https://registry.yarnpkg.com/json-parse-even-better-errors/-/json-parse-even-better-errors-2.3.1.tgz#7c47805a94319928e05777405dc12e1f7a4ee02d"