Skip to content

Commit

Permalink
feat: Periodic validation of pending transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
devchenyan committed Jun 30, 2024
1 parent 4493d79 commit ad28a09
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import TransactionWithStatus from '../models/chain/transaction-with-status'
import logger from '../utils/logger'
import { getConnection } from '../database/chain/connection'
import { interval } from 'rxjs'
import TxStatus from '../models/chain/tx-status'

type TransactionDetail = {
hash: string
Expand All @@ -18,14 +19,22 @@ type TransactionDetail = {
const getTransactionStatus = async (hash: string) => {
const network = NetworksService.getInstance().getCurrent()
const rpcService = new RpcService(network.remote, network.type)
const txWithStatus: TransactionWithStatus | undefined = await rpcService.getTransaction(hash)
const txWithStatus: TransactionWithStatus | undefined | { transaction: null; txStatus: TxStatus } =
await rpcService.getTransactionIncludeRejected(hash)
if (!txWithStatus) {
return {
tx: txWithStatus,
status: TransactionStatus.Failed,
blockHash: null,
}
}
if (txWithStatus.txStatus.isRejected()) {
return {
status: TransactionStatus.Failed,
isRejected: true,
blockHash: null,
}
}
if (txWithStatus.txStatus.isCommitted()) {
return {
tx: txWithStatus.transaction,
Expand All @@ -41,23 +50,24 @@ const getTransactionStatus = async (hash: string) => {
}

const trackingStatus = async () => {
const pendingTransactions = await FailedTransaction.pendings()
const pendingOrFailedTransactions = await FailedTransaction.pendingOrFaileds()
await FailedTransaction.processAmendFailedTxs()

if (!pendingTransactions.length) {
if (!pendingOrFailedTransactions.length) {
return
}

const pendingHashes = pendingTransactions.map(tx => tx.hash)
const pendingOrFailedHashes = pendingOrFailedTransactions.map(tx => tx.hash)
const txs = await Promise.all(
pendingHashes.map(async hash => {
pendingOrFailedHashes.map(async hash => {
try {
const txWithStatus = await getTransactionStatus(hash)
return {
hash,
tx: txWithStatus.tx,
status: txWithStatus.status,
blockHash: txWithStatus.blockHash,
isRejected: txWithStatus.isRejected,
}
} catch (error) {
// ignore error, get failed skip current update
Expand All @@ -66,16 +76,27 @@ const trackingStatus = async () => {
)

const failedTxs = txs.filter(
(tx): tx is TransactionDetail & { status: TransactionStatus.Failed } => tx?.status === TransactionStatus.Failed
(tx): tx is TransactionDetail & { status: TransactionStatus.Failed; isRejected: undefined | boolean } =>
tx?.status === TransactionStatus.Failed
)
const successTxs = txs.filter(
(tx): tx is TransactionDetail & { status: TransactionStatus.Success } => tx?.status === TransactionStatus.Success
(tx): tx is TransactionDetail & { status: TransactionStatus.Success; isRejected: undefined | boolean } =>
tx?.status === TransactionStatus.Success
)

const rejectedTxs = txs.filter(
(tx): tx is TransactionDetail & { status: TransactionStatus.Failed; isRejected: undefined | boolean } =>
!!tx?.isRejected
)

if (failedTxs.length) {
await FailedTransaction.updateFailedTxs(failedTxs.map(tx => tx.hash))
}

if (rejectedTxs.length) {
await FailedTransaction.deleteFailedTxs(rejectedTxs.map(tx => tx.hash))
}

if (successTxs.length > 0) {
const network = NetworksService.getInstance().getCurrent()
const rpcService = new RpcService(network.remote, network.type)
Expand Down
5 changes: 5 additions & 0 deletions packages/neuron-wallet/src/models/chain/tx-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export enum TxStatusType {
Pending = 'pending',
Proposed = 'proposed',
Committed = 'committed',
Rejected = 'rejected',
}

export default class TxStatus {
Expand All @@ -28,6 +29,10 @@ export default class TxStatus {
return this.status === TxStatusType.Committed
}

public isRejected(): boolean {
return this.status === TxStatusType.Rejected
}

public toSDK() {
return {
blockHash: this.blockHash,
Expand Down
20 changes: 19 additions & 1 deletion packages/neuron-wallet/src/services/rpc-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import TransactionWithStatus from '../models/chain/transaction-with-status'
import logger from '../utils/logger'
import { generateRPC } from '../utils/ckb-rpc'
import { NetworkType } from '../models/network'
import TxStatus, { TxStatusType } from '../models/chain/tx-status'

export default class RpcService {
private retryTime: number
Expand All @@ -29,6 +30,23 @@ export default class RpcService {
return BlockHeader.fromSDK(result)
}

public async getTransactionIncludeRejected(
hash: string
): Promise<TransactionWithStatus | undefined | { transaction: null; txStatus: TxStatus }> {
const result = await this.rpc.getTransaction(hash)
if (result?.transaction) {
return TransactionWithStatus.fromSDK(result)
}
if (result.txStatus.status === TxStatusType.Rejected) {
logger.warn(`Transaction[${hash}] was rejected`)
return {
transaction: null,
txStatus: TxStatus.fromSDK(result.txStatus),
}
}
return undefined
}

/**
* TODO: rejected tx should be handled
* {
Expand All @@ -41,7 +59,7 @@ export default class RpcService {
if (result?.transaction) {
return TransactionWithStatus.fromSDK(result)
}
if ((result.txStatus as any) === 'rejected') {
if (result.txStatus.status === TxStatusType.Rejected) {
logger.warn(`Transaction[${hash}] was rejected`)
}
return undefined
Expand Down
56 changes: 30 additions & 26 deletions packages/neuron-wallet/src/services/tx/failed-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,41 @@ import { TransactionStatus } from '../../models/chain/transaction'
import AmendTransactionEntity from '../../database/chain/entities/amend-transaction'

export class FailedTransaction {
public static pendings = async (): Promise<TransactionEntity[]> => {
const pendingTransactions = await getConnection()
public static pendingOrFaileds = async (): Promise<TransactionEntity[]> => {
const transactions = await getConnection()
.getRepository(TransactionEntity)
.createQueryBuilder('tx')
.where({
status: TransactionStatus.Pending,
status: In([TransactionStatus.Pending, TransactionStatus.Failed]),
})
.getMany()

return pendingTransactions
return transactions
}

public static deleteFailedTxs = async (hashes: string[]) => {
await getConnection().manager.transaction(async transactionalEntityManager => {
await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(TransactionEntity)
.where({ hash: In(hashes) })
.execute()

await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(OutputEntity)
.where({ outPointTxHash: In(hashes) })
.execute()

await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(InputEntity)
.where({ outPointTxHash: In(hashes) })
.execute()
})
}

public static processAmendFailedTxs = async () => {
Expand Down Expand Up @@ -56,28 +81,7 @@ export class FailedTransaction {
}
})

await getConnection().manager.transaction(async transactionalEntityManager => {
await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(TransactionEntity)
.where({ hash: In(removeTxs) })
.execute()

await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(OutputEntity)
.where({ outPointTxHash: In(removeTxs) })
.execute()

await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(InputEntity)
.where({ outPointTxHash: In(removeTxs) })
.execute()
})
FailedTransaction.deleteFailedTxs(removeTxs)
}

// update tx status to TransactionStatus.Failed
Expand Down

1 comment on commit ad28a09

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Packaging for test is done in 9732847118

Please sign in to comment.