Skip to content

Commit

Permalink
feat(data bundles): add support for background retrieval data sources…
Browse files Browse the repository at this point in the history
… PE-6888

Background retrieval data sources are used when retrieving data to
unbundle. The background retrieval order is configurable using the
BACKGROUND_RETRIEVAL_ORDER environment variable. The default order is
'chunks,s3,trusted-gateway,tx-data'. Chunks are given priority since
they can be checked against the L1 data root.
  • Loading branch information
djwhitt committed Oct 21, 2024
1 parent 5184d8d commit 6197b2f
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 80 deletions.
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ services:
- NODE_JS_MAX_OLD_SPACE_SIZE=${NODE_JS_MAX_OLD_SPACE_SIZE:-}
- ENABLE_FS_HEADER_CACHE_CLEANUP=${ENABLE_FS_HEADER_CACHE_CLEANUP:-true}
- ON_DEMAND_RETRIEVAL_ORDER=${ON_DEMAND_RETRIEVAL_ORDER:-}
- BACKGROUND_RETRIEVAL_ORDER=${BACKGROUND_RETRIEVAL_ORDER:-}
- WEBHOOK_TARGET_SERVERS=${WEBHOOK_TARGET_SERVERS:-}
- WEBHOOK_INDEX_FILTER=${WEBHOOK_INDEX_FILTER:-}
- WEBHOOK_BLOCK_FILTER=${WEBHOOK_BLOCK_FILTER:-}
Expand Down
2 changes: 2 additions & 0 deletions docs/envs.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ This document describes the environment variables that can be used to configure
| ADMIN_API_KEY | String | Generated | API key used for admin API requests (if not set, it's generated and logged into the console) |
| BACKFILL_BUNDLE_RECORDS | Boolean | false | If true, ar.io node will start indexing missing bundles |
| FILTER_CHANGE_REPROCESS | Boolean | false | If true, all indexed bundles will be reprocessed with the new filters (you can use this when you change the filters) |
| ON_DEMAND_RETRIEVAL_ORDER | String | s3,trusted-gateway,chunks,tx-data | Data source retrieval order for on-demand data requests |
| BACKGROUND_RETRIEVAL_ORDER | String | chunks,s3,trusted-gateway,tx-data | Data source retrieval order for background data requests (i.e., unbundling) |
| ANS104_UNBUNDLE_FILTER | String | {"never": true} | Only bundles compliant with this filter will be unbundled |
| ANS104_INDEX_FILTER | String | {"never": true} | Only bundles compliant with this filter will be indexed |
| ANS104_DOWNLOAD_WORKERS | String | 5 | Sets the number of ANS-104 bundles to attempt to download in parallel |
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"better-sqlite3": "^9.4.5",
"cors": "^2.8.5",
"crypto": "^1.0.1",
"dotenv": "^16.3.1",
"duckdb-async": "^1.1.0",
"express": "^4.18.1",
"express-async-handler": "^1.2.0",
Expand Down Expand Up @@ -100,7 +99,7 @@
"build": "yarn clean && npx tsc --project ./tsconfig.prod.json && yarn copy-files",
"copy-files": "copyfiles -u 1 src/**/*.graphql src/**/*.sql dist",
"clean": "npx rimraf [ .nyc_output coverage dist ]",
"start": "cross-env NODE_OPTIONS=\"--no-warnings --import=./register.js\" node src/app.ts",
"start": "cross-env NODE_OPTIONS=\"--import=./register.js\" node --env-file=.env src/app.ts",
"start:prod": "yarn build && node dist/app.js",
"watch": "nodemon",
"db:migrate": "cross-env NODE_OPTIONS=\"--import=./register.js\" node src/migrate.ts",
Expand Down
13 changes: 9 additions & 4 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import dotenv from 'dotenv';
import { canonicalize } from 'json-canonicalize';
import { isMainThread } from 'node:worker_threads';

import { createFilter } from './filters.js';
import * as env from './lib/env.js';
import { release } from './version.js';

dotenv.config();

//
// HTTP server
//
Expand Down Expand Up @@ -81,14 +78,22 @@ export const CHUNK_POST_ABORT_TIMEOUT_MS =
// Data
//

// Data retrieval priority order
// On-demand data retrieval priority order
export const ON_DEMAND_RETRIEVAL_ORDER = env
.varOrDefault(
'ON_DEMAND_RETRIEVAL_ORDER',
's3,trusted-gateway,chunks,tx-data',
)
.split(',');

// Background data retrieval priority order
export const BACKGROUND_RETRIEVAL_ORDER = env
.varOrDefault(
'BACKGROUND_RETRIEVAL_ORDER',
'chunks,s3,trusted-gateway,tx-data',
)
.split(',');

//
// Indexing
//
Expand Down
2 changes: 1 addition & 1 deletion src/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { createLogger, format, transports } from 'winston';

import * as env from './lib/env.js';

const LOG_LEVEL = env.varOrDefault('LOG_LEVEL', 'info');
const LOG_LEVEL = env.varOrDefault('LOG_LEVEL', 'info').toLowerCase();
const LOG_ALL_STACKTRACES =
env.varOrDefault('LOG_ALL_STACKTRACES', 'false') === 'true';
const LOG_FORMAT = env.varOrDefault('LOG_FORMAT', 'simple');
Expand Down
4 changes: 2 additions & 2 deletions src/routes/data/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { createDataHandler, createRawDataHandler } from './handlers.js';
export const dataHandler = createDataHandler({
log,
dataIndex: system.contiguousDataIndex,
dataSource: system.contiguousDataSource,
dataSource: system.onDemandContiguousDataSource,
blockListValidator: system.blockListValidator,
manifestPathResolver: system.manifestPathResolver,
});
Expand All @@ -42,7 +42,7 @@ dataRouter.get(
createRawDataHandler({
log,
dataIndex: system.contiguousDataIndex,
dataSource: system.contiguousDataSource,
dataSource: system.onDemandContiguousDataSource,
blockListValidator: system.blockListValidator,
}),
);
Expand Down
64 changes: 44 additions & 20 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,31 +350,39 @@ const s3DataSource =
})
: undefined;

const dataSources: ContiguousDataSource[] = [];
for (const sourceName of config.ON_DEMAND_RETRIEVAL_ORDER) {
function getDataSource(sourceName: string): ContiguousDataSource | undefined {
switch (sourceName) {
case 's3':
if (s3DataSource !== undefined) {
dataSources.push(s3DataSource);
}
break;
return s3DataSource;
case 'ario-peer':
dataSources.push(arIODataSource);
break;
return arIODataSource;
case 'trusted-gateway':
dataSources.push(gatewayDataSource);
break;
return gatewayDataSource;
case 'chunks':
dataSources.push(txChunksDataSource);
break;
return txChunksDataSource;
case 'tx-data':
dataSources.push(arweaveClient);
break;
return arweaveClient;
default:
throw new Error(`Unknown data source: ${sourceName}`);
}
}

const onDemandDataSources: ContiguousDataSource[] = [];
for (const sourceName of config.ON_DEMAND_RETRIEVAL_ORDER) {
const dataSource = getDataSource(sourceName);
if (dataSource !== undefined) {
onDemandDataSources.push(dataSource);
}
}

const backgroundDataSources: ContiguousDataSource[] = [];
for (const sourceName of config.BACKGROUND_RETRIEVAL_ORDER) {
const dataSource = getDataSource(sourceName);
if (dataSource !== undefined) {
backgroundDataSources.push(dataSource);
}
}

const dataContentAttributeImporter = new DataContentAttributeImporter({
log,
contiguousDataIndex: contiguousDataIndex,
Expand All @@ -383,13 +391,29 @@ metrics.registerQueueLengthGauge('dataContentAttributeImporter', {
length: () => dataContentAttributeImporter.queueDepth(),
});

export const contiguousDataSource = new ReadThroughDataCache({
const contiguousDataStore = new FsDataStore({
log,
baseDir: 'data/contiguous',
});

export const onDemandContiguousDataSource = new ReadThroughDataCache({
log,
dataSource: new SequentialDataSource({
log,
dataSources: onDemandDataSources,
}),
dataStore: contiguousDataStore,
contiguousDataIndex,
dataContentAttributeImporter,
});

export const backgroundContiguousDataSource = new ReadThroughDataCache({
log,
dataSource: new SequentialDataSource({
log,
dataSources,
dataSources: backgroundDataSources,
}),
dataStore: new FsDataStore({ log, baseDir: 'data/contiguous' }),
dataStore: contiguousDataStore,
contiguousDataIndex,
dataContentAttributeImporter,
});
Expand Down Expand Up @@ -421,7 +445,7 @@ const ans104Unbundler = new Ans104Unbundler({
log,
eventEmitter,
filter: config.ANS104_UNBUNDLE_FILTER,
contiguousDataSource,
contiguousDataSource: backgroundContiguousDataSource,
dataItemIndexFilterString: config.ANS104_INDEX_FILTER_STRING,
workerCount: config.ANS104_UNBUNDLE_WORKERS,
shouldUnbundle: shouldUnbundleDataItems,
Expand All @@ -432,7 +456,7 @@ metrics.registerQueueLengthGauge('ans104Unbundler', {

const bundleDataImporter = new BundleDataImporter({
log,
contiguousDataSource,
contiguousDataSource: backgroundContiguousDataSource,
ans104Unbundler,
workerCount: config.ANS104_DOWNLOAD_WORKERS,
});
Expand Down Expand Up @@ -598,7 +622,7 @@ export const mempoolWatcher = config.ENABLE_MEMPOOL_WATCHER
export const signatureStore = makeSignatureStore({ log });
export const signatureFetcher = new SignatureFetcher({
log,
dataSource: contiguousDataSource,
dataSource: onDemandContiguousDataSource,
dataIndex: contiguousDataIndex,
chainSource: arweaveClient,
signatureStore,
Expand Down
Loading

0 comments on commit 6197b2f

Please sign in to comment.