@@ -98,6 +98,58 @@ class CctpFinalizerService extends RepeatableTask {
9898 for ( const burnEvent of burnEvents ) {
9999 await this . publishBurnEvent ( burnEvent ) ;
100100 }
101+
102+ // Process SponsoredDepositForBurn events in parallel
103+ // Query SponsoredDepositForBurn events that haven't been published yet
104+ // We check by looking for events that don't have a matching CctpFinalizerJob
105+ // with the same sponsoredDepositForBurnId
106+ const sponsoredQb = this . postgres
107+ . createQueryBuilder ( entities . SponsoredDepositForBurn , "sponsored" )
108+ . leftJoin (
109+ CctpFinalizerJob ,
110+ "job" ,
111+ "job.sponsoredDepositForBurnId = sponsored.id" ,
112+ )
113+ . where ( "job.id IS NULL" )
114+ . andWhere ( "sponsored.deletedAt IS NULL" ) ;
115+ const sponsoredEvents = await sponsoredQb . getMany ( ) ;
116+
117+ for ( const sponsored of sponsoredEvents ) {
118+ // Find the matching DepositForBurn event with the highest logIndex that's still lower
119+ // SponsoredDepositForBurn events come after DepositForBurn events in the same transaction.
120+ const matchingDepositQb = this . postgres
121+ . createQueryBuilder ( DepositForBurn , "burn" )
122+ . where ( "burn.transactionHash = :transactionHash" , {
123+ transactionHash : sponsored . transactionHash ,
124+ } )
125+ . andWhere ( "burn.chainId = :chainId" , {
126+ chainId : sponsored . chainId ,
127+ } )
128+ . andWhere ( "burn.logIndex < :logIndex" , {
129+ logIndex : sponsored . logIndex ,
130+ } )
131+ . andWhere ( "burn.deletedAt IS NULL" )
132+ . orderBy ( "burn.logIndex" , "DESC" )
133+ . limit ( 1 ) ;
134+ const matchingDeposit = await matchingDepositQb . getOne ( ) ;
135+
136+ if ( ! matchingDeposit ) {
137+ this . logger . debug ( {
138+ at : "CctpFinalizerService#taskLogic" ,
139+ message :
140+ "Skipping sponsored deposit for burn event - no matching DepositForBurn found" ,
141+ sponsoredId : sponsored . id ,
142+ transactionHash : sponsored . transactionHash ,
143+ } ) ;
144+ continue ;
145+ }
146+
147+ await this . publishBurnEvent (
148+ matchingDeposit ,
149+ sponsored . signature ,
150+ sponsored . id ,
151+ ) ;
152+ }
101153 } catch ( error ) {
102154 this . logger . error ( {
103155 at : "CctpFinalizerService#taskLogic" ,
@@ -114,7 +166,11 @@ class CctpFinalizerService extends RepeatableTask {
114166 return Promise . resolve ( ) ;
115167 }
116168
117- private async publishBurnEvent ( burnEvent : DepositForBurn ) {
169+ private async publishBurnEvent (
170+ burnEvent : DepositForBurn ,
171+ signature : string | null = null ,
172+ sponsoredDepositForBurnId ?: number ,
173+ ) {
118174 try {
119175 const { chainId, transactionHash, minFinalityThreshold, blockTimestamp } =
120176 burnEvent ;
@@ -197,16 +253,25 @@ class CctpFinalizerService extends RepeatableTask {
197253 message ,
198254 attestation ,
199255 destinationChainId ,
256+ signature || "0x" ,
200257 ) ;
201258
259+ const jobValues : {
260+ attestation : string ;
261+ message : string ;
262+ burnEventId : number ;
263+ sponsoredDepositForBurnId ?: number ;
264+ } = {
265+ attestation,
266+ message,
267+ burnEventId : burnEvent . id ,
268+ ...( sponsoredDepositForBurnId && { sponsoredDepositForBurnId } ) ,
269+ } ;
270+
202271 await this . postgres
203272 . createQueryBuilder ( CctpFinalizerJob , "j" )
204273 . insert ( )
205- . values ( {
206- attestation,
207- message,
208- burnEventId : burnEvent . id ,
209- } )
274+ . values ( jobValues )
210275 . orUpdate ( [ "attestation" ] , [ "burnEventId" ] )
211276 . execute ( ) ;
212277 } catch ( error ) {
0 commit comments