From 1d569fe8f086eb9fd13fc2c280e2f68a8cb19faf Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 4 Aug 2025 17:06:32 -0700 Subject: [PATCH 1/6] Process batches in order to handle purge messages correctly --- src/parseDelivery.ts | 2 +- src/s3poller.ts | 78 ++++++++++++++++++++++++++++++++------------ 2 files changed, 59 insertions(+), 21 deletions(-) diff --git a/src/parseDelivery.ts b/src/parseDelivery.ts index e6441a5..bafe5e0 100644 --- a/src/parseDelivery.ts +++ b/src/parseDelivery.ts @@ -967,7 +967,7 @@ function resolveAudiusGenre( // maybe try some edit distance magic? // for now just log - console.warn(`failed to resolve genre: subgenre=${subgenre} genre=${genre}`) + console.warn(`no matching genre: subgenre=${subgenre} genre=${genre}`) } export function parseDuration(dur: string) { diff --git a/src/s3poller.ts b/src/s3poller.ts index 30d02c0..c2554f0 100644 --- a/src/s3poller.ts +++ b/src/s3poller.ts @@ -4,6 +4,7 @@ import { S3Client, S3ClientConfig, } from '@aws-sdk/client-s3' +import * as cheerio from 'cheerio' import { mkdir, readFile, stat, writeFile } from 'fs/promises' import { basename, dirname, join, resolve } from 'path' import sharp from 'sharp' @@ -80,14 +81,32 @@ export async function pollS3Source( // Handle case where there are common prefixes (folder structure) if (prefixes && prefixes.length > 0) { - const batchSize = 20 + const batchSize = 100 for (let i = 0; i < prefixes.length; i += batchSize) { const batch = prefixes.slice(i, i + batchSize) + + // Collect all files from all prefixes in this batch + const allFiles: any[] = [] + + // Fetch files from all prefixes in parallel for efficiency await Promise.all( batch.map(async (prefix) => { - await scanS3Prefix(sourceName, client, bucket, prefix) + const prefixResult = await client.send( + new ListObjectsCommand({ + Bucket: bucket, + Prefix: prefix, + }) + ) + if (prefixResult.Contents) { + allFiles.push(...prefixResult.Contents) + } }) ) + + // Process all files in chronological order + if (allFiles.length > 0) { + await processS3Contents(sourceName, client, bucket, allFiles) + } } // save marker for prefixes @@ -126,26 +145,45 @@ async function processS3Contents( bucket: string, contents: any[] ) { - for (const c of contents) { - if (!c.Key) continue + // First, fetch and parse all XML files to get their timestamps + const filesWithTimestamps = await Promise.all( + contents + .filter(c => c.Key?.toLowerCase().endsWith('.xml') && !c.Key.includes('batchcomplete')) + .map(async (c) => { + try { + const { Body } = await client.send( + new GetObjectCommand({ + Bucket: bucket, + Key: c.Key, + }) + ) + const xml = await Body?.transformToString() + if (xml) { + const $ = cheerio.load(xml, { xmlMode: true }) + const messageTimestamp = $('MessageCreatedDateTime').first().text() + return { key: c.Key, xml, messageTimestamp } + } + } catch (error) { + console.error(`Failed to fetch/parse ${c.Key}:`, error) + } + return null + }) + ) - const lowerKey = c.Key.toLowerCase() - if (!lowerKey.endsWith('.xml') || lowerKey.includes('batchcomplete')) { - continue - } + // Sort by messageTimestamp, handling cases where timestamp might be missing + const sortedFiles = filesWithTimestamps + .filter(Boolean) + .sort((a, b) => { + const timestampA = a!.messageTimestamp || '' + const timestampB = b!.messageTimestamp || '' + return timestampA.localeCompare(timestampB) + }) - const xmlUrl = `s3://` + join(bucket, c.Key) - const { Body } = await client.send( - new GetObjectCommand({ - Bucket: bucket, - Key: c.Key, - }) - ) - const xml = 
await Body?.transformToString() - if (xml) { - console.log('parsing', xmlUrl) - const releases = (await parseDdexXml(source, xmlUrl, xml)) || [] - } + // Process in chronological order + for (const file of sortedFiles) { + const xmlUrl = `s3://` + join(bucket, file!.key) + console.log('parsing', xmlUrl, 'timestamp:', file!.messageTimestamp) + const releases = (await parseDdexXml(source, xmlUrl, file!.xml)) || [] } } From 9f0177aff8ac2143fb439dfb8b264284ae25ad34 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Sun, 10 Aug 2025 12:25:02 -0700 Subject: [PATCH 2/6] Add signed stream URL for fast playback --- package-lock.json | 33 +++++++++++++++++++++++++++++++++ package.json | 1 + src/s3poller.ts | 34 ++++++++++++++++++++++++++++++++++ src/server.tsx | 21 ++++++++++++++++++++- 4 files changed, 88 insertions(+), 1 deletion(-) diff --git a/package-lock.json b/package-lock.json index 3b93874..1656c1d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ "@audius/sdk": "^9.0.0", "@aws-sdk/client-s3": "3.658.1", "@aws-sdk/credential-provider-ini": "3.658.1", + "@aws-sdk/s3-request-presigner": "^3.658.1", "@hono/node-server": "^1.13.1", "cheerio": "^1.0.0", "commander": "^12.1.0", @@ -1078,6 +1079,24 @@ "node": ">=16.0.0" } }, + "node_modules/@aws-sdk/s3-request-presigner": { + "version": "3.658.1", + "resolved": "https://registry.npmjs.org/@aws-sdk/s3-request-presigner/-/s3-request-presigner-3.658.1.tgz", + "integrity": "sha512-FQsECwePc34AAZU2mt0GUOppUIwOCLdsBkDQdCDyLDuWMN1+caYVzSAu++pJpkA+1MDdAKp4AiJyNiWbe/uI5g==", + "dependencies": { + "@aws-sdk/signature-v4-multi-region": "3.658.1", + "@aws-sdk/types": "3.654.0", + "@aws-sdk/util-format-url": "3.654.0", + "@smithy/middleware-endpoint": "^3.1.3", + "@smithy/protocol-http": "^4.1.3", + "@smithy/smithy-client": "^3.3.5", + "@smithy/types": "^3.4.2", + "tslib": "^2.6.2" + }, + "engines": { + "node": ">=16.0.0" + } + }, "node_modules/@aws-sdk/signature-v4-multi-region": { "version": "3.658.1", "resolved": "https://registry.npmjs.org/@aws-sdk/signature-v4-multi-region/-/signature-v4-multi-region-3.658.1.tgz", @@ -1149,6 +1168,20 @@ "node": ">=16.0.0" } }, + "node_modules/@aws-sdk/util-format-url": { + "version": "3.654.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/util-format-url/-/util-format-url-3.654.0.tgz", + "integrity": "sha512-2yAlJ/l1uTJhS52iu4+/EvdIyQhDBL+nATY8rEjFI0H+BHGVrJIH2CL4DByhvi2yvYwsqQX0HYah6pF/yoXukA==", + "dependencies": { + "@aws-sdk/types": "3.654.0", + "@smithy/querystring-builder": "^3.0.6", + "@smithy/types": "^3.4.2", + "tslib": "^2.6.2" + }, + "engines": { + "node": ">=16.0.0" + } + }, "node_modules/@aws-sdk/util-locate-window": { "version": "3.568.0", "resolved": "https://registry.npmjs.org/@aws-sdk/util-locate-window/-/util-locate-window-3.568.0.tgz", diff --git a/package.json b/package.json index 5ad0715..c166fc4 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "@audius/sdk": "^9.0.0", "@aws-sdk/client-s3": "3.658.1", "@aws-sdk/credential-provider-ini": "3.658.1", + "@aws-sdk/s3-request-presigner": "^3.658.1", "@hono/node-server": "^1.13.1", "cheerio": "^1.0.0", "commander": "^12.1.0", diff --git a/src/s3poller.ts b/src/s3poller.ts index c2554f0..edffa80 100644 --- a/src/s3poller.ts +++ b/src/s3poller.ts @@ -4,6 +4,7 @@ import { S3Client, S3ClientConfig, } from '@aws-sdk/client-s3' +import { getSignedUrl } from '@aws-sdk/s3-request-presigner' import * as cheerio from 'cheerio' import { mkdir, readFile, stat, writeFile } from 'fs/promises' import { basename, dirname, join, 
resolve } from 'path' @@ -226,11 +227,14 @@ export async function readAssetWithCaching( const destinationPath = join( ...[cacheBaseDir, Bucket, imageSize, Key].filter(Boolean) ) + console.log('destinationPath', destinationPath) // fetch if needed const exists = await fileExists(destinationPath) if (!exists) { + console.log('fetching', xmlUrl) const source = sources.findByXmlUrl(xmlUrl) + console.log('source', source) const s3 = dialS3(source) await mkdir(dirname(destinationPath), { recursive: true }) const { Body } = await s3.send(new GetObjectCommand({ Bucket, Key })) @@ -273,6 +277,36 @@ export async function readAssetWithCaching( return readFileToBuffer(fileUrl) } +// Return a short-lived presigned HTTPS URL for an S3 object referenced by xmlUrl + filePath + fileName +export async function getPresignedAssetUrl({ + xmlUrl, + filePath, + fileName, + expiresInSeconds +}: { + xmlUrl: string, + filePath: string, + fileName: string, + expiresInSeconds: number +}): Promise { + if (!xmlUrl.startsWith('s3:')) { + // local file path; resolve to file URL + const fileUrl = resolve(xmlUrl, '..', filePath, fileName) + return fileUrl + } + + const s3url = new URL(`${filePath}${fileName}`, xmlUrl) + const Bucket = s3url.host + const Key = s3url.pathname.substring(1) + const source = sources.findByXmlUrl(xmlUrl) + const s3 = dialS3(source) + const command = new GetObjectCommand({ Bucket, Key }) + const signed = await getSignedUrl(s3 as any, command as any, { + expiresIn: expiresInSeconds, + }) + return signed +} + // sdk helpers async function readFileToBuffer(filePath: string) { const buffer = await readFile(filePath) diff --git a/src/server.tsx b/src/server.tsx index ac1ced6..a44282f 100644 --- a/src/server.tsx +++ b/src/server.tsx @@ -23,7 +23,12 @@ import { import { DDEXContributor, DDEXRelease, parseDdexXml } from './parseDelivery' import { prepareAlbumMetadata, prepareTrackMetadatas } from './publishRelease' import { generateSalesReport } from './reporting/sales_report' -import { dialS3, parseS3Url, readAssetWithCaching } from './s3poller' +import { + dialS3, + getPresignedAssetUrl, + parseS3Url, + readAssetWithCaching, +} from './s3poller' import { sources } from './sources' import { parseBool } from './util' @@ -756,8 +761,22 @@ app.get('/release/:source/:key/:ref/:size?', async (c) => { const size = c.req.param('size') const asset = await assetRepo.get(source, key, ref) + console.log('asset', asset) if (!asset) return c.json({ error: 'not found' }, 404) + // If no resizing requested (a stream instead of an image), + // redirect to presigned S3 URL to avoid proxying bytes + if (!size) { + const url = await getPresignedAssetUrl({ + xmlUrl: asset.xmlUrl, + filePath: asset.filePath, + fileName: asset.fileName, + expiresInSeconds: 600 + }) + return c.redirect(url, 302) + } + + // Resize requested: keep existing behavior (read, resize via cache helper) for now const ok = await readAssetWithCaching( asset.xmlUrl, asset.filePath, From d65dbbf7dd9170a0d3e043abd621f2f3466d26f1 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Sun, 10 Aug 2025 12:29:31 -0700 Subject: [PATCH 3/6] Add status column --- src/server.tsx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/server.tsx b/src/server.tsx index a44282f..5e18716 100644 --- a/src/server.tsx +++ b/src/server.tsx @@ -341,6 +341,7 @@ app.get('/releases', async (c) => { Genre Release Clear + Status debug @@ -405,6 +406,7 @@ ${row.soundRecordings.length} tracks`} )} + {row.status} {row.publishErrorCount > 0 && ( From 
5b7696738e35556d1805bf9c2c8388423ab17c74 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Wed, 20 Aug 2025 00:14:05 -0700 Subject: [PATCH 4/6] Fix issues with ddex4.0, add better ui status --- src/parseDelivery.ts | 104 ++++++++++++++++++++++++++++++++++++++----- src/server.tsx | 23 +++++++++- 2 files changed, 114 insertions(+), 13 deletions(-) diff --git a/src/parseDelivery.ts b/src/parseDelivery.ts index bafe5e0..b602dc2 100644 --- a/src/parseDelivery.ts +++ b/src/parseDelivery.ts @@ -233,13 +233,26 @@ export async function parseDdexXml( // Only send acknowledgement for SME source === 'sme' ) { - await acknowledgeReleaseSuccess({ - source, - xmlUrl, - messageId, - messageTimestamp, - releases, - }) + const hasErrors = releases.some((r) => + r.problems.includes('InvalidDealDate') + ) + if (hasErrors) { + await acknowledgeReleaseFailure({ + source, + xmlUrl, + messageId, + messageTimestamp, + error: 'InvalidDealDate', + }) + } else { + await acknowledgeReleaseSuccess({ + source, + xmlUrl, + messageId, + messageTimestamp, + releases, + }) + } } return releases @@ -401,7 +414,7 @@ async function parseReleaseXml(source: string, $: cheerio.CheerioAPI, isDdex40: // DDEX 4.0 structure: DealList > ReleaseDeal > Deal > DealTerms $('DealList > ReleaseDeal').each((_, el) => { const $el = $(el) - const ref = $el.find('ReleaseReference').text() + const refs = $el.find('DealReleaseReference').toArray().map(el => $(el).text()); $el.find('Deal > DealTerms').each((_, el) => { const $el = $(el) @@ -434,8 +447,10 @@ async function parseReleaseXml(source: string, $: cheerio.CheerioAPI, isDdex40: // add deal function addDeal(deal: AudiusSupportedDeal) { - releaseDeals[ref] ||= [] - releaseDeals[ref].push(deal) + for (const ref of refs) { + releaseDeals[ref] ||= [] + releaseDeals[ref].push(deal) + } } const common: DealFields = { @@ -770,7 +785,38 @@ async function parseReleaseXml(source: string, $: cheerio.CheerioAPI, isDdex40: const $el = $(el) const ref = $el.find('ReleaseReference').text() - const deals = releaseDeals[ref] || [] + let deals = releaseDeals[ref] || [] + + // For DDEX 4.0, if the album has no direct deals, aggregate from TrackReleases + if (deals.length === 0 && isDdex40) { + // Build TrackRelease -> Resource mapping + const trackRefToResourceRef: Record = {} + $('ReleaseList > TrackRelease').each((_, el) => { + const $tr = $(el) + const trRef = $tr.find('ReleaseReference').text() + const rrRef = $tr.find('ReleaseResourceReference').text() + if (trRef && rrRef) trackRefToResourceRef[trRef] = rrRef + }) + + // Collect resource refs contained in this album + const releaseResourceRefs = new Set() + $el + .find('ResourceGroup ReleaseResourceReference, ResourceGroupContentItem > ReleaseResourceReference') + .each((_, el) => { + releaseResourceRefs.add($(el).text()) + }) + + // Find TrackReleases that are members of this album + const memberTrackRefs = Object.entries(trackRefToResourceRef) + .filter(([, resRef]) => releaseResourceRefs.has(resRef)) + .map(([trRef]) => trRef) + + const aggregated = memberTrackRefs.flatMap((r) => releaseDeals[r] || []) + if (aggregated.length) { + deals = aggregated + } + } + const validityStartDate = deals.length ? deals[0].validityStartDate : undefined @@ -812,7 +858,7 @@ async function parseReleaseXml(source: string, $: cheerio.CheerioAPI, isDdex40: producerCopyrightLine: pline($el), parentalWarningType: toText($el.find('ParentalWarningType')), - isMainRelease: $el.attr('IsMainRelease') == 'true', + isMainRelease: isDdex40 ? 
true : $el.attr('IsMainRelease') == 'true', problems: [], soundRecordings: [], @@ -863,6 +909,40 @@ async function parseReleaseXml(source: string, $: cheerio.CheerioAPI, isDdex40: release.problems.push('NoDeal') } + // Validate FirstPublicationDate/recording releaseDate + { + const hasInvalidDate = release.soundRecordings.some((sr) => { + const dateText = sr.releaseDate || '' + const m = dateText.match(/(\d{4})/) + if (!m) return true + const year = parseInt(m[1]) + if (!year || year <= 1900) return true + return false + }) + if (hasInvalidDate) { + release.problems.push('InvalidFirstPublicationDate') + } + } + + // Validate deal start vs OriginalReleaseDate + { + const originalReleaseDateText = $el.find('OriginalReleaseDate').text() + if (originalReleaseDateText) { + const ord = new Date(originalReleaseDateText) + if (!isNaN(ord.getTime())) { + const invalidDeal = release.deals.some((d) => { + if (!d.validityStartDate) return false + const ds = new Date(d.validityStartDate) + if (isNaN(ds.getTime())) return false + return ds < ord + }) + if (invalidDeal) { + release.problems.push('InvalidDealDate') + } + } + } + } + return release }) diff --git a/src/server.tsx b/src/server.tsx index 5e18716..0a48629 100644 --- a/src/server.tsx +++ b/src/server.tsx @@ -406,7 +406,28 @@ ${row.soundRecordings.length} tracks`} )} - {row.status} + + {row.status} + {row.problems?.length > 0 && ( +
+ {row.problems.map((p) => ( + + {p} + + ))} +
+ )} + {row.publishErrorCount > 0 && (
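Note on the DDEX 4.0 handling in the patch above: each Deal is fanned out to every DealReleaseReference in its ReleaseDeal, and a release is flagged with InvalidDealDate when a deal becomes valid before the release's OriginalReleaseDate; that flag is what routes the delivery to the failure acknowledgement reworked in the next patch. A minimal standalone sketch of the date rule follows, with a hypothetical helper name that is not code from this patch series:

// Hypothetical helper mirroring the InvalidDealDate check above:
// a deal whose validityStartDate precedes the OriginalReleaseDate is flagged.
type DealLike = { validityStartDate?: string }

function hasInvalidDealDate(originalReleaseDate: string, deals: DealLike[]): boolean {
  const ord = new Date(originalReleaseDate)
  if (isNaN(ord.getTime())) return false
  return deals.some((d) => {
    if (!d.validityStartDate) return false
    const ds = new Date(d.validityStartDate)
    return !isNaN(ds.getTime()) && ds < ord
  })
}

// Example: a deal starting 2021-01-01 against a 2023-05-01 original release date is flagged.
hasInvalidDealDate('2023-05-01', [{ validityStartDate: '2021-01-01' }]) // => true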
From 12f58f6afce5a20b423159d1ec90e16968f9c948 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Wed, 20 Aug 2025 01:31:41 -0700 Subject: [PATCH 5/6] Fix failure send --- src/acknowledgement.ts | 54 +++++++++++++++++++++++------------------- src/parseDelivery.ts | 1 + 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/acknowledgement.ts b/src/acknowledgement.ts index fde313f..2ba1898 100644 --- a/src/acknowledgement.ts +++ b/src/acknowledgement.ts @@ -110,12 +110,14 @@ function generateAcknowledgementXml({ releases, isSuccess, error, + recipientPartyId, }: { source: string, messageId: string, releases: DDEXRelease[], isSuccess: boolean, - error?: string + error?: string, + recipientPartyId?: string }): string { const $ = cheerio.load('', { xmlMode: true }) @@ -139,7 +141,8 @@ function generateAcknowledgementXml({ // MessageRecipient (Source) const messageRecipient = $('') - messageRecipient.append($(`${source.toUpperCase()}`)) + const resolvedRecipient = recipientPartyId || (source === 'sme' ? 'PADPIDA2007040502I' : source.toUpperCase()) + messageRecipient.append($(`${resolvedRecipient}`)) const recipientPartyName = $('') recipientPartyName.append($(`${getSourcePartyName(source)}`)) messageRecipient.append(recipientPartyName) @@ -175,31 +178,27 @@ function generateAcknowledgementXml({ // ReleaseStatus - different values for success vs error if (isSuccess) { releaseStatus.append($('SuccessfullyIngestedByReleaseDistributor')) + const acknowledgement = $('') + acknowledgement.append($('NewReleaseMessage')) + acknowledgement.append($(`${messageId}`)) + const messageStatus = $('') + messageStatus.append($('FileOK')) + acknowledgement.append(messageStatus) + releaseStatus.append(acknowledgement) } else { releaseStatus.append($('ProcessingErrorAtReleaseDistributor')) - - // Add ErrorText for failures (at ReleaseStatus level) + const acknowledgement = $('') + acknowledgement.append($('NewReleaseMessage')) + acknowledgement.append($(`${messageId}`)) + const messageStatus = $('') + messageStatus.append($('ResourceCorrupt')) if (error) { - releaseStatus.append($(`${error}`)) + messageStatus.append($(`${error}`)) } + acknowledgement.append(messageStatus) + releaseStatus.append(acknowledgement) } - // Acknowledgement - const acknowledgement = $('') - acknowledgement.append($('NewReleaseMessage')) - acknowledgement.append($(`${messageId}`)) - - const messageStatus = $('') - if (isSuccess) { - messageStatus.append($('FileOK')) - } else { - // Use ResourceCorrupt for errors as per the example - messageStatus.append($('ResourceCorrupt')) - } - - acknowledgement.append(messageStatus) - releaseStatus.append(acknowledgement) - root.append(releaseStatus) } } else { @@ -263,7 +262,8 @@ async function sendAcknowledgement(source: string, xml: string) { }) if (!statusResponse.ok) { - throw new Error(`Status post failed: ${statusResponse.status} ${statusResponse.statusText}`) + const errText = await statusResponse.text().catch(() => '') + throw new Error(`Status post failed: ${statusResponse.status} ${statusResponse.statusText} ${errText ? '- ' + errText : ''}`) } console.log('Acknowledgement XML posted successfully') @@ -315,7 +315,8 @@ export async function acknowledgeReleaseSuccess({ source, messageId, releases, - isSuccess: true + isSuccess: true, + recipientPartyId: source === 'sme' ? 
'PADPIDA2007040502I' : undefined } ) console.log(acknowledgementXml) @@ -337,12 +338,14 @@ export async function acknowledgeReleaseFailure({ messageId, messageTimestamp, error, + releases, }: { source: string xmlUrl: string messageId: string messageTimestamp: string error: string | Error + releases?: DDEXRelease[] }) { const errorMessage = error instanceof Error ? error.message : error @@ -366,9 +369,10 @@ export async function acknowledgeReleaseFailure({ { source, messageId, - releases: [], // Empty releases array for failures + releases: releases || [], // Include releases when available isSuccess: false, - error: errorMessage + error: errorMessage, + recipientPartyId: source === 'sme' ? 'PADPIDA2007040502I' : undefined } ) console.log(acknowledgementXml) diff --git a/src/parseDelivery.ts b/src/parseDelivery.ts index b602dc2..bb90c5d 100644 --- a/src/parseDelivery.ts +++ b/src/parseDelivery.ts @@ -242,6 +242,7 @@ export async function parseDdexXml( xmlUrl, messageId, messageTimestamp, + releases, error: 'InvalidDealDate', }) } else { From 3b723eadc90dbc0099ca963d986cc28ea8cd13ed Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Wed, 20 Aug 2025 01:37:13 -0700 Subject: [PATCH 6/6] Remove logs --- src/s3poller.ts | 3 --- src/server.tsx | 1 - 2 files changed, 4 deletions(-) diff --git a/src/s3poller.ts b/src/s3poller.ts index edffa80..6edf37a 100644 --- a/src/s3poller.ts +++ b/src/s3poller.ts @@ -227,14 +227,11 @@ export async function readAssetWithCaching( const destinationPath = join( ...[cacheBaseDir, Bucket, imageSize, Key].filter(Boolean) ) - console.log('destinationPath', destinationPath) // fetch if needed const exists = await fileExists(destinationPath) if (!exists) { - console.log('fetching', xmlUrl) const source = sources.findByXmlUrl(xmlUrl) - console.log('source', source) const s3 = dialS3(source) await mkdir(dirname(destinationPath), { recursive: true }) const { Body } = await s3.send(new GetObjectCommand({ Bucket, Key })) diff --git a/src/server.tsx b/src/server.tsx index 0a48629..3ed82b4 100644 --- a/src/server.tsx +++ b/src/server.tsx @@ -784,7 +784,6 @@ app.get('/release/:source/:key/:ref/:size?', async (c) => { const size = c.req.param('size') const asset = await assetRepo.get(source, key, ref) - console.log('asset', asset) if (!asset) return c.json({ error: 'not found' }, 404) // If no resizing requested (a stream instead of an image),
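The presigned-URL path introduced in the second patch (and stripped of its debug logging here) is what lets the /release route 302-redirect stream requests straight to S3 instead of proxying bytes through the server. A minimal sketch of that pattern using the same AWS SDK v3 packages; the helper name and the 600-second default expiry are illustrative rather than taken verbatim from the patch:

import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'

// Sketch: turn an s3://bucket/path/key reference into a short-lived HTTPS URL so an
// HTTP handler can redirect to it instead of streaming the object body itself.
async function presignS3Url(s3: S3Client, s3Url: string, expiresIn = 600): Promise<string> {
  const url = new URL(s3Url)        // e.g. s3://my-bucket/releases/audio.flac
  const Bucket = url.host           // bucket name
  const Key = url.pathname.slice(1) // object key without the leading slash
  return getSignedUrl(s3, new GetObjectCommand({ Bucket, Key }), { expiresIn })
}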