Skip to content

Commit

Permalink
fix(read-through-data-cache): don't cache regions PE-6572
Browse files Browse the repository at this point in the history
Caching regions causes both unnecessary storage overhead, in the form of
cached region data, and — more importantly — the insertion of invalid
ID-to-hash mappings into the database.
  • Loading branch information
djwhitt committed Aug 20, 2024
1 parent b9939dd commit 88e16b8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/data/read-through-data-cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ describe('ReadThroughDataCache', function () {

assert.deepEqual(
(mockContiguousDataStore.createWriteStream as any).mock.callCount(),
1,
0,
);

let receivedData = '';
Expand Down
92 changes: 49 additions & 43 deletions src/data/read-through-data-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,54 +190,60 @@ export class ReadThroughDataCache implements ContiguousDataSource {
requestAttributes,
region,
});
const hasher = crypto.createHash('sha256');
const cacheStream = await this.dataStore.createWriteStream();
pipeline(data.stream, cacheStream, async (error: any) => {
if (error !== undefined) {
this.log.error('Error streaming or caching data:', {
id,
message: error.message,
stack: error.stack,
});
await this.dataStore.cleanup(cacheStream);
} else {
if (cacheStream !== undefined) {
const hash = hasher.digest('base64url');

try {
await this.dataStore.finalize(cacheStream, hash);
} catch (error: any) {
this.log.error('Error finalizing data in cache:', {
id,
message: error.message,
stack: error.stack,
});
}
// Skip caching when serving regions to avoid persisting data fragments
// and (more importantly) writing invalid ID-to-hash relationships in the
// DB.
if (region === undefined) {
const hasher = crypto.createHash('sha256');
const cacheStream = await this.dataStore.createWriteStream();
pipeline(data.stream, cacheStream, async (error: any) => {
if (error !== undefined) {
this.log.error('Error streaming or caching data:', {
id,
message: error.message,
stack: error.stack,
});
await this.dataStore.cleanup(cacheStream);
} else {
if (cacheStream !== undefined) {
const hash = hasher.digest('base64url');

try {
await this.dataStore.finalize(cacheStream, hash);
} catch (error: any) {
this.log.error('Error finalizing data in cache:', {
id,
message: error.message,
stack: error.stack,
});
}

this.log.debug('Successfully cached data', { id, hash });
try {
this.dataContentAttributeImporter.queueDataContentAttributes({
id,
dataRoot: attributes?.dataRoot,
hash,
dataSize: data.size,
contentType: data.sourceContentType,
cachedAt: currentUnixTimestamp(),
});
} catch (error: any) {
this.log.error('Error saving data content attributes:', {
id,
message: error.message,
stack: error.stack,
});
this.log.info('Successfully cached data', { id, hash });
try {
this.dataContentAttributeImporter.queueDataContentAttributes({
id,
dataRoot: attributes?.dataRoot,
hash,
dataSize: data.size,
contentType: data.sourceContentType,
cachedAt: currentUnixTimestamp(),
});
} catch (error: any) {
this.log.error('Error saving data content attributes:', {
id,
message: error.message,
stack: error.stack,
});
}
}
}
}
});
});

data.stream.on('data', (chunk) => {
hasher.update(chunk);
});
data.stream.on('data', (chunk) => {
hasher.update(chunk);
});
}

data.stream.on('error', () => {
metrics.getDataStreamErrorsTotal.inc({
Expand Down

0 comments on commit 88e16b8

Please sign in to comment.