@@ -26,7 +26,18 @@ class IngestionUtility extends ReplicationUtility {
2626 Key : objName ,
2727 VersionId : versionId ,
2828 } ) )
29- . then ( data => cb ( null , data ) )
29+ . then ( async ( data ) => {
30+ // AWS SDK v3 returns a readable stream in data.Body
31+ // We need to collect the stream data into a buffer
32+ if ( data . Body ) {
33+ const chunks = [ ] ;
34+ for await ( const chunk of data . Body ) {
35+ chunks . push ( chunk ) ;
36+ }
37+ data . Body = Buffer . concat ( chunks ) ;
38+ }
39+ cb ( null , data ) ;
40+ } )
3041 . catch ( err => cb ( err ) ) ;
3142 }
3243
@@ -36,37 +47,60 @@ class IngestionUtility extends ReplicationUtility {
3647 Key : objName ,
3748 VersionId : versionId ,
3849 } ) )
39- . then ( data => cb ( null , data ) )
50+ . then ( async ( data ) => {
51+ // AWS SDK v3 returns a readable stream in data.Body
52+ // We need to collect the stream data into a buffer
53+ if ( data . Body ) {
54+ const chunks = [ ] ;
55+ for await ( const chunk of data . Body ) {
56+ chunks . push ( chunk ) ;
57+ }
58+ data . Body = Buffer . concat ( chunks ) ;
59+ }
60+ cb ( null , data ) ;
61+ } )
4062 . catch ( err => cb ( err ) ) ;
4163 }
4264
4365 createIngestionBucket ( bucketName , locationName , cb ) {
66+ console . log ( `[INGESTION] Creating ingestion bucket: ${ bucketName } with location: ${ locationName } ` ) ;
4467 const locationNameWithSuffix = `${ locationName } :ingest` ;
45- return this . s3 . send ( new CreateBucketCommand ( {
68+ this . s3 . send ( new CreateBucketCommand ( {
4669 Bucket : bucketName ,
4770 CreateBucketConfiguration : {
4871 LocationConstraint : locationNameWithSuffix ,
4972 } ,
5073 } ) )
5174 . then ( ( ) => {
75+ console . log ( `[INGESTION] Successfully created bucket: ${ bucketName } ` ) ;
76+ console . log ( `[INGESTION] Waiting 10 seconds for backbeat ingestion processes to be up-to-date` ) ;
5277 // When resuming an ingestion-enabled location,
5378 // backbeat gets the list of buckets with ingestion-enabled
5479 // to check if the location is valid.
5580 // Backbeat sets the list of buckets with ingestion-enabled periodically,
5681 // so the list might be outdated for few seconds leading to a 404 API error response.
5782 // Also backbeat "ingestion producer" process applies update every 5 seconds.
5883 // For this reason, we are waiting 10 seconds to make sure ingestion processes are up-to-date.
59- return setTimeout ( ( ) => backbeatAPIUtils . resumeIngestion ( locationName , false , null , ( err , body ) => {
60- if ( err ) {
61- return cb ( err ) ;
62- }
63- if ( body . code ) {
64- return cb ( `error resuming ingestion: ${ JSON . stringify ( body ) } ` ) ;
65- }
66- return cb ( ) ;
67- } ) , 10000 ) ;
84+ setTimeout ( ( ) => {
85+ console . log ( `[INGESTION] Resuming ingestion for location: ${ locationName } ` ) ;
86+ backbeatAPIUtils . resumeIngestion ( locationName , false , null , ( err , body ) => {
87+ if ( err ) {
88+ console . error ( `[INGESTION] Error resuming ingestion:` , err ) ;
89+ return cb ( err ) ;
90+ }
91+ if ( body . code ) {
92+ console . error ( `[INGESTION] Error resuming ingestion - bad response:` , body ) ;
93+ return cb ( `error resuming ingestion: ${ JSON . stringify ( body ) } ` ) ;
94+ }
95+ console . log ( `[INGESTION] Successfully resumed ingestion for location: ${ locationName } ` ) ;
96+ return cb ( ) ;
97+ } ) ;
98+ } , 10000 ) ;
6899 } )
69- . catch ( err => cb ( err ) ) ;
100+ . catch ( err => {
101+ console . error ( `[INGESTION] Error creating bucket: ${ bucketName } ` , err ) ;
102+ cb ( err ) ;
103+ } ) ;
70104 }
71105
72106 putObjectWithProperties ( bucketName , objectName , content , cb ) {
@@ -84,27 +118,49 @@ class IngestionUtility extends ReplicationUtility {
84118 }
85119
86120 waitUntilIngested ( bucketName , key , versionId , cb ) {
121+ console . log ( `[INGESTION] Waiting for object to be ingested: ${ bucketName } /${ key } (version: ${ versionId } )` ) ;
87122 let status ;
123+ let attemptCount = 0 ;
124+ const maxAttempts = 90 ; // 3 minutes max (90 * 2 seconds = 180 seconds)
88125 const expectedCode = 'NotFound' ;
89126 return async . doWhilst (
90- callback => this . s3 . send ( new HeadObjectCommand ( {
91- Bucket : bucketName ,
92- Key : key ,
93- VersionId : versionId ,
94- } ) )
95- . then ( ( ) => {
96- status = true ;
97- return callback ( ) ;
98- } )
99- . catch ( err => {
100- if ( err . name !== expectedCode ) {
101- return callback ( err ) ;
102- }
103- status = false ;
104- return setTimeout ( callback , 2000 ) ;
105- } ) ,
127+ callback => {
128+ attemptCount ++ ;
129+ if ( attemptCount > maxAttempts ) {
130+ console . error ( `[INGESTION] Timeout: Object not ingested after ${ maxAttempts } attempts (${ maxAttempts * 2 } seconds)` ) ;
131+ return callback ( new Error ( `Ingestion timeout: Object ${ bucketName } /${ key } not ingested after ${ maxAttempts * 2 } seconds` ) ) ;
132+ }
133+ console . log ( `[INGESTION] Attempt ${ attemptCount } /${ maxAttempts } : Checking if object exists in destination bucket` ) ;
134+ this . s3 . send ( new HeadObjectCommand ( {
135+ Bucket : bucketName ,
136+ Key : key ,
137+ VersionId : versionId ,
138+ } ) )
139+ . then ( ( ) => {
140+ console . log ( `[INGESTION] Object found in destination bucket after ${ attemptCount } attempts` ) ;
141+ status = true ;
142+ return callback ( ) ;
143+ } )
144+ . catch ( err => {
145+ console . log ( `[INGESTION] Err (${ err } ` ) ;
146+ if ( err . name !== expectedCode ) {
147+ console . error ( `[INGESTION] Unexpected error checking object existence:` , err ) ;
148+ return callback ( err ) ;
149+ }
150+ console . log ( `[INGESTION] Object not yet ingested, waiting 2 seconds before retry (${ attemptCount } /${ maxAttempts } )...` ) ;
151+ status = false ;
152+ return setTimeout ( callback , 2000 ) ;
153+ } ) ;
154+ } ,
106155 ( ) => ! status ,
107- cb ,
156+ err => {
157+ if ( err ) {
158+ console . error ( `[INGESTION] Failed waiting for ingestion after ${ attemptCount } attempts:` , err ) ;
159+ } else {
160+ console . log ( `[INGESTION] Successfully waited for ingestion after ${ attemptCount } attempts` ) ;
161+ }
162+ cb ( err ) ;
163+ } ,
108164 ) ;
109165 }
110166
@@ -131,59 +187,98 @@ class IngestionUtility extends ReplicationUtility {
131187 return async . doWhilst (
132188 callback => this . s3 . send ( new ListObjectVersionsCommand ( { Bucket : bucketName } ) )
133189 . then ( data => {
134- const versionLength = data . Versions . length ;
135- const deleteLength = data . DeleteMarkers . length ;
190+ const versionLength = ( data . Versions || [ ] ) . length ;
191+ const deleteLength = ( data . DeleteMarkers || [ ] ) . length ;
136192 objectsEmpty = ( versionLength + deleteLength ) === 0 ;
137193 if ( objectsEmpty ) {
138194 return callback ( ) ;
139195 }
140196 return setTimeout ( callback , 2000 ) ;
141197 } )
142- . catch ( err => cb ( err ) ) ,
198+ . catch ( err => callback ( err ) ) ,
143199 ( ) => ! objectsEmpty ,
144200 cb ,
145201 ) ;
146202 }
147203
148204 compareObjectsRINGS3C ( srcBucket , destBucket , key , versionId , optionalFields , cb ) {
205+ console . log ( `[INGESTION] Starting object comparison: ${ srcBucket } /${ key } -> ${ destBucket } /${ key } (version: ${ versionId } )` ) ;
149206 return async . series ( [
150- next => this . waitUntilIngested (
151- destBucket ,
152- key ,
153- versionId ,
154- next ,
155- ) ,
156- next => this . getSourceObject ( srcBucket , key , versionId , next ) ,
157- next => this . getDestObject ( destBucket , key , versionId , next ) ,
207+ next => {
208+ console . log ( `[INGESTION] Step 1: Waiting for object to be ingested` ) ;
209+ this . waitUntilIngested (
210+ destBucket ,
211+ key ,
212+ versionId ,
213+ err => {
214+ if ( err ) {
215+ console . error ( `[INGESTION] Step 1 failed:` , err ) ;
216+ } else {
217+ console . log ( `[INGESTION] Step 1 completed: Object is ingested` ) ;
218+ }
219+ next ( err ) ;
220+ } ,
221+ ) ;
222+ } ,
223+ next => {
224+ console . log ( `[INGESTION] Step 2: Getting source object` ) ;
225+ this . getSourceObject ( srcBucket , key , versionId , ( err , data ) => {
226+ if ( err ) {
227+ console . error ( `[INGESTION] Step 2 failed:` , err ) ;
228+ } else {
229+ console . log ( `[INGESTION] Step 2 completed: Got source object` ) ;
230+ }
231+ next ( err , data ) ;
232+ } ) ;
233+ } ,
234+ next => {
235+ console . log ( `[INGESTION] Step 3: Getting destination object` ) ;
236+ this . getDestObject ( destBucket , key , versionId , ( err , data ) => {
237+ if ( err ) {
238+ console . error ( `[INGESTION] Step 3 failed:` , err ) ;
239+ } else {
240+ console . log ( `[INGESTION] Step 3 completed: Got destination object` ) ;
241+ }
242+ next ( err , data ) ;
243+ } ) ;
244+ } ,
158245 ] , ( err , data ) => {
159246 if ( err ) {
247+ console . error ( `[INGESTION] Object comparison failed:` , err ) ;
160248 return cb ( err ) ;
161249 }
250+ console . log ( `[INGESTION] Starting object data comparison` ) ;
162251 const srcData = data [ 1 ] ;
163252 const destData = data [ 2 ] ;
164- assert . strictEqual (
165- srcData . ContentLength ,
166- destData . ContentLength ,
167- ) ;
168- this . _compareObjectBody ( srcData . Body , destData . Body ) ;
169- assert . deepStrictEqual ( srcData . Metadata , destData . Metadata ) ;
170- assert . strictEqual ( srcData . ETag , destData . ETag ) ;
171- assert . strictEqual ( srcData . ContentType , destData . ContentType ) ;
172- assert . strictEqual ( srcData . VersionId , destData . VersionId ) ;
173- assert . strictEqual (
174- srcData . LastModified . toString ( ) ,
175- destData . LastModified . toString ( ) ,
176- ) ;
177- if ( optionalFields ) {
178- optionalFields . forEach ( field => {
179- if ( field === 'Metadata' ) {
180- assert . strictEqual ( srcData . customKey , destData . customKey ) ;
181- } else {
182- assert . strictEqual ( srcData [ field ] , destData [ field ] ) ;
183- }
184- } ) ;
253+ try {
254+ assert . strictEqual (
255+ srcData . ContentLength ,
256+ destData . ContentLength ,
257+ ) ;
258+ this . _compareObjectBody ( srcData . Body , destData . Body ) ;
259+ assert . deepStrictEqual ( srcData . Metadata , destData . Metadata ) ;
260+ assert . strictEqual ( srcData . ETag , destData . ETag ) ;
261+ assert . strictEqual ( srcData . ContentType , destData . ContentType ) ;
262+ assert . strictEqual ( srcData . VersionId , destData . VersionId ) ;
263+ assert . strictEqual (
264+ srcData . LastModified . toString ( ) ,
265+ destData . LastModified . toString ( ) ,
266+ ) ;
267+ if ( optionalFields ) {
268+ optionalFields . forEach ( field => {
269+ if ( field === 'Metadata' ) {
270+ assert . strictEqual ( srcData . customKey , destData . customKey ) ;
271+ } else {
272+ assert . strictEqual ( srcData [ field ] , destData [ field ] ) ;
273+ }
274+ } ) ;
275+ }
276+ console . log ( `[INGESTION] Object comparison completed successfully` ) ;
277+ return cb ( ) ;
278+ } catch ( comparisonError ) {
279+ console . error ( `[INGESTION] Object comparison assertion failed:` , comparisonError ) ;
280+ return cb ( comparisonError ) ;
185281 }
186- return cb ( ) ;
187282 } ) ;
188283 }
189284
0 commit comments