Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Periodic validation of pending transactions #3199

Merged
merged 4 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export default class IndexerCacheService {
const txsWithStatus: TransactionWithStatus[] = []
const fetchBlockDetailsQueue = queue(async (hash: string) => {
const txWithStatus = await this.rpcService.getTransaction(hash)
if (!txWithStatus) {
if (!txWithStatus?.transaction) {
return
}
const blockHeader = await this.rpcService.getHeader(txWithStatus!.txStatus.blockHash!)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import TransactionWithStatus from '../models/chain/transaction-with-status'
import logger from '../utils/logger'
import { getConnection } from '../database/chain/connection'
import { interval } from 'rxjs'
import TxStatus from '../models/chain/tx-status'

type TransactionDetail = {
hash: string
Expand All @@ -18,7 +19,8 @@ type TransactionDetail = {
const getTransactionStatus = async (hash: string) => {
const network = NetworksService.getInstance().getCurrent()
const rpcService = new RpcService(network.remote, network.type)
const txWithStatus: TransactionWithStatus | undefined = await rpcService.getTransaction(hash)
const txWithStatus: TransactionWithStatus | undefined | { transaction: null; txStatus: TxStatus } =
await rpcService.getTransaction(hash)
if (!txWithStatus) {
return {
tx: txWithStatus,
Expand All @@ -33,6 +35,13 @@ const getTransactionStatus = async (hash: string) => {
blockHash: txWithStatus.txStatus.blockHash,
}
}
if (txWithStatus.txStatus.isRejected()) {
return {
status: TransactionStatus.Failed,
yanguoyu marked this conversation as resolved.
Show resolved Hide resolved
isRejected: true,
blockHash: null,
}
}
return {
tx: txWithStatus.transaction,
status: TransactionStatus.Pending,
Expand All @@ -41,23 +50,24 @@ const getTransactionStatus = async (hash: string) => {
}

const trackingStatus = async () => {
const pendingTransactions = await FailedTransaction.pendings()
const pendingOrFailedTransactions = await FailedTransaction.pendingOrFaileds()
await FailedTransaction.processAmendFailedTxs()

if (!pendingTransactions.length) {
if (!pendingOrFailedTransactions.length) {
return
}

const pendingHashes = pendingTransactions.map(tx => tx.hash)
const pendingOrFailedHashes = pendingOrFailedTransactions.map(tx => tx.hash)
const txs = await Promise.all(
pendingHashes.map(async hash => {
pendingOrFailedHashes.map(async hash => {
try {
const txWithStatus = await getTransactionStatus(hash)
return {
hash,
tx: txWithStatus.tx,
status: txWithStatus.status,
blockHash: txWithStatus.blockHash,
isRejected: !!txWithStatus.isRejected,
}
} catch (error) {
// ignore error, get failed skip current update
Expand All @@ -66,16 +76,26 @@ const trackingStatus = async () => {
)

const failedTxs = txs.filter(
(tx): tx is TransactionDetail & { status: TransactionStatus.Failed } => tx?.status === TransactionStatus.Failed
(tx): tx is (TransactionDetail | null) & { status: TransactionStatus.Failed; isRejected: boolean } =>
tx?.status === TransactionStatus.Failed
)
const successTxs = txs.filter(
(tx): tx is TransactionDetail & { status: TransactionStatus.Success } => tx?.status === TransactionStatus.Success
(tx): tx is (TransactionDetail | null) & { status: TransactionStatus.Success; isRejected: boolean } =>
yanguoyu marked this conversation as resolved.
Show resolved Hide resolved
tx?.status === TransactionStatus.Success
)
const rejectedTxs = txs.filter(
(tx): tx is (TransactionDetail | null) & { status: TransactionStatus.Failed; isRejected: boolean } =>
!!tx?.isRejected
)

if (failedTxs.length) {
await FailedTransaction.updateFailedTxs(failedTxs.map(tx => tx.hash))
}

if (rejectedTxs.length) {
await FailedTransaction.deleteFailedTxs(rejectedTxs.map(tx => tx.hash))
yanguoyu marked this conversation as resolved.
Show resolved Hide resolved
}

if (successTxs.length > 0) {
const network = NetworksService.getInstance().getCurrent()
const rpcService = new RpcService(network.remote, network.type)
Expand Down
5 changes: 5 additions & 0 deletions packages/neuron-wallet/src/models/chain/tx-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export enum TxStatusType {
Pending = 'pending',
Proposed = 'proposed',
Committed = 'committed',
Rejected = 'rejected',
}

export default class TxStatus {
Expand All @@ -28,6 +29,10 @@ export default class TxStatus {
return this.status === TxStatusType.Committed
}

public isRejected(): boolean {
return this.status === TxStatusType.Rejected
}

public toSDK() {
return {
blockHash: this.blockHash,
Expand Down
18 changes: 9 additions & 9 deletions packages/neuron-wallet/src/services/rpc-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import TransactionWithStatus from '../models/chain/transaction-with-status'
import logger from '../utils/logger'
import { generateRPC } from '../utils/ckb-rpc'
import { NetworkType } from '../models/network'
import TxStatus, { TxStatusType } from '../models/chain/tx-status'

export default class RpcService {
private retryTime: number
Expand All @@ -29,20 +30,19 @@ export default class RpcService {
return BlockHeader.fromSDK(result)
}

/**
* TODO: rejected tx should be handled
* {
* transaction: null,
* txStatus: { blockHash: null, status: 'rejected' }
* }
*/
public async getTransaction(hash: string): Promise<TransactionWithStatus | undefined> {
public async getTransaction(
hash: string
): Promise<TransactionWithStatus | undefined | { transaction: null; txStatus: TxStatus }> {
const result = await this.rpc.getTransaction(hash)
if (result?.transaction) {
return TransactionWithStatus.fromSDK(result)
}
if ((result.txStatus as any) === 'rejected') {
if (result.txStatus.status === TxStatusType.Rejected) {
logger.warn(`Transaction[${hash}] was rejected`)
return {
transaction: null,
txStatus: TxStatus.fromSDK(result.txStatus),
}
}
return undefined
}
Expand Down
56 changes: 30 additions & 26 deletions packages/neuron-wallet/src/services/tx/failed-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,41 @@ import { TransactionStatus } from '../../models/chain/transaction'
import AmendTransactionEntity from '../../database/chain/entities/amend-transaction'

export class FailedTransaction {
public static pendings = async (): Promise<TransactionEntity[]> => {
const pendingTransactions = await getConnection()
public static pendingOrFaileds = async (): Promise<TransactionEntity[]> => {
const transactions = await getConnection()
.getRepository(TransactionEntity)
.createQueryBuilder('tx')
.where({
status: TransactionStatus.Pending,
status: In([TransactionStatus.Pending, TransactionStatus.Failed]),
})
.getMany()

return pendingTransactions
return transactions
}

public static deleteFailedTxs = async (hashes: string[]) => {
await getConnection().manager.transaction(async transactionalEntityManager => {
await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(TransactionEntity)
.where({ hash: In(hashes) })
.execute()

await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(OutputEntity)
.where({ outPointTxHash: In(hashes) })
.execute()

await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(InputEntity)
.where({ outPointTxHash: In(hashes) })
.execute()
})
}

public static processAmendFailedTxs = async () => {
Expand Down Expand Up @@ -56,28 +81,7 @@ export class FailedTransaction {
}
})

await getConnection().manager.transaction(async transactionalEntityManager => {
await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(TransactionEntity)
.where({ hash: In(removeTxs) })
.execute()

await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(OutputEntity)
.where({ outPointTxHash: In(removeTxs) })
.execute()

await transactionalEntityManager
.createQueryBuilder()
.delete()
.from(InputEntity)
.where({ outPointTxHash: In(removeTxs) })
.execute()
})
await FailedTransaction.deleteFailedTxs(removeTxs)
}

// update tx status to TransactionStatus.Failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class TransactionGenerator {
if (!nftCell) return

const nftTx = await this.getRpcService().getTransaction(outPoint.txHash)
const nftOriginalOutputData = nftTx?.transaction.outputsData[Number(outPoint.index)]
const nftOriginalOutputData = nftTx?.transaction?.outputsData[Number(outPoint.index)]
if (!nftOriginalOutputData) return

nftCell.data = nftOriginalOutputData
Expand Down