Skip to content

Commit

Permalink
fix: Fix light client sync tx type. (#2965)
Browse files Browse the repository at this point in the history
* fix: Fix light client sync tx type.
  • Loading branch information
yanguoyu authored Dec 5, 2023
1 parent cfbb392 commit e162cf3
Show file tree
Hide file tree
Showing 17 changed files with 829 additions and 820 deletions.
153 changes: 146 additions & 7 deletions packages/neuron-wallet/src/block-sync-renderer/sync/connector.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { SyncAddressType } from '../../database/chain/entities/sync-progress'
import { Subject } from 'rxjs'
import { queue, QueueObject } from 'async'
import { Indexer as CkbIndexer, CellCollector } from '@ckb-lumos/ckb-indexer'
import { QueryOptions } from '@ckb-lumos/base'
import AddressMeta from '../../database/address/meta'
import { Address } from '../../models/address'
import { SyncAddressType } from '../../database/chain/entities/sync-progress'
import IndexerCacheService from './indexer-cache-service'
import logger from '../../utils/logger'
import IndexerTxHashCache from '../../database/chain/entities/indexer-tx-hash-cache'

export interface BlockTips {
cacheTipNumber: number
Expand Down Expand Up @@ -41,15 +49,146 @@ export interface AppendScript {
scriptType: CKBRPC.ScriptType
}

export abstract class Connector<TransactionsSubjectParam = unknown> {
abstract blockTipsSubject: Subject<BlockTips>
abstract transactionsSubject: Subject<{ txHashes: CKBComponents.Hash[]; params: TransactionsSubjectParam }>
export abstract class Connector {
public readonly blockTipsSubject: Subject<BlockTips> = new Subject<BlockTips>()
public readonly transactionsSubject = new Subject<{ txHashes: CKBComponents.Hash[]; params: string }>()
protected indexer: CkbIndexer
protected processNextBlockNumberQueue: QueueObject<void>
protected processingBlockNumber?: string
protected addressesByWalletId: Map<string, AddressMeta[]> = new Map()
protected pollingIndexer: boolean = false
private indexerQueryQueue: QueueObject<LumosCellQuery> | undefined

abstract connect(): Promise<void>
abstract notifyCurrentBlockNumberProcessed(param: TransactionsSubjectParam): void
abstract stop(): void
abstract getLiveCellsByScript(query: LumosCellQuery): Promise<unknown>
abstract processTxsInNextBlockNumber(): Promise<void>
protected abstract upsertTxHashes(): Promise<unknown>
public abstract notifyCurrentBlockNumberProcessed(blockNumber: string): Promise<void>
async appendScript(_scripts: AppendScript[]) {
// do nothing
}

constructor({ addresses, nodeUrl, indexerUrl }: { addresses: Address[]; nodeUrl: string; indexerUrl: string }) {
this.indexer = new CkbIndexer(nodeUrl, indexerUrl)
this.addressesByWalletId = addresses
.map(address => AddressMeta.fromObject(address))
.reduce((addressesByWalletId, addressMeta) => {
if (!addressesByWalletId.has(addressMeta.walletId)) {
addressesByWalletId.set(addressMeta.walletId, [])
}

const addressMetas = addressesByWalletId.get(addressMeta.walletId)
addressMetas!.push(addressMeta)

return addressesByWalletId
}, new Map<string, AddressMeta[]>())

this.processNextBlockNumberQueue = queue(async () => this.processTxsInNextBlockNumber(), 1)
this.processNextBlockNumberQueue.error((err: any) => {
logger.error(`Connector: \tError in processing next block number queue: ${err}`)
})

this.indexerQueryQueue = queue(async (query: any) => {
return await this.collectLiveCellsByScript(query)
})
}

public stop(): void {
this.pollingIndexer = false
}

protected async processNextBlockNumber() {
// the processNextBlockNumberQueue is a queue to ensure that ONLY one
// block processing task runs at a time to avoid the data conflict while syncing
this.processNextBlockNumberQueue?.push()
await this.processNextBlockNumberQueue?.drain()
}

protected async getTxHashesWithNextUnprocessedBlockNumber(): Promise<[string | undefined, string[]]> {
const txHashCachesByNextBlockNumberAndAddress = await Promise.all(
[...this.addressesByWalletId.keys()].map(async walletId =>
IndexerCacheService.nextUnprocessedTxsGroupedByBlockNumber(walletId)
)
)
const groupedTxHashCaches = txHashCachesByNextBlockNumberAndAddress.flat().reduce((grouped, txHashCache) => {
if (!grouped.get(txHashCache.blockNumber.toString())) {
grouped.set(txHashCache.blockNumber.toString(), [])
}
grouped.get(txHashCache.blockNumber.toString())!.push(txHashCache)

return grouped
}, new Map<string, Array<IndexerTxHashCache>>())

const nextUnprocessedBlockNumber = [...groupedTxHashCaches.keys()].sort((a, b) => parseInt(a) - parseInt(b)).shift()

if (!nextUnprocessedBlockNumber) {
return [undefined, []]
}

const txHashCachesInNextUnprocessedBlockNumber = groupedTxHashCaches.get(nextUnprocessedBlockNumber)

return [nextUnprocessedBlockNumber, txHashCachesInNextUnprocessedBlockNumber!.map(({ txHash }) => txHash)]
}

protected async notifyAndSyncNext(indexerTipNumber: number) {
const nextUnprocessedBlockNumber = await IndexerCacheService.nextUnprocessedBlock([
...this.addressesByWalletId.keys(),
])
if (nextUnprocessedBlockNumber) {
this.blockTipsSubject.next({
cacheTipNumber: parseInt(nextUnprocessedBlockNumber),
indexerTipNumber,
})
if (!this.processingBlockNumber) {
await this.processNextBlockNumber()
}
return true
}
this.blockTipsSubject.next({
cacheTipNumber: indexerTipNumber,
indexerTipNumber,
})
return false
}

public async getLiveCellsByScript(query: LumosCellQuery) {
return new Promise((resolve, reject) => {
this.indexerQueryQueue!.push(query, (err: any, result: unknown) => {
if (err) {
return reject(err)
}
resolve(result)
})
})
}

private async collectLiveCellsByScript(query: LumosCellQuery) {
const { lock, type, data } = query
if (!lock && !type) {
throw new Error('at least one parameter is required')
}

const queries: QueryOptions = {
...(lock ? { lock } : {}),
...(type ? { type } : {}),
data: data || 'any',
}

const collector = new CellCollector(this.indexer, queries)

const result = []
for await (const cell of collector.collect()) {
//somehow the lumos indexer returns an invalid hash type "lock" for hash type "data"
//for now we have to fix it here
const cellOutput = cell.cellOutput
// FIXME
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error
if (cellOutput.type?.hashType === 'lock') {
console.error('Unexpected hash type "lock" found with the query', JSON.stringify(queries))
cellOutput.type.hashType = 'data'
}
result.push(cell)
}
return result
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getConnection } from 'typeorm'
import { In, getConnection } from 'typeorm'
import { queue } from 'async'
import AddressMeta from '../../database/address/meta'
import IndexerTxHashCache from '../../database/chain/entities/indexer-tx-hash-cache'
Expand Down Expand Up @@ -27,19 +27,17 @@ export default class IndexerCacheService {
this.indexer = indexer
}

private async getTxHashes(): Promise<IndexerTxHashCache[]> {
private static async getTxHashes(walletIds: string[]): Promise<IndexerTxHashCache[]> {
return getConnection()
.getRepository(IndexerTxHashCache)
.createQueryBuilder()
.where({
walletId: this.walletId,
walletId: In(walletIds),
})
.getMany()
}

public static async nextUnprocessedBlock(
walletIds: string[]
): Promise<{ blockNumber: string; blockHash: string } | undefined> {
public static async nextUnprocessedBlock(walletIds: string[]): Promise<string | undefined> {
const result = await getConnection()
.getRepository(IndexerTxHashCache)
.createQueryBuilder()
Expand All @@ -51,10 +49,7 @@ export default class IndexerCacheService {
return
}

return {
blockNumber: result.blockNumber.toString(),
blockHash: result.blockHash,
}
return result.blockNumber.toString()
}

public static async updateCacheProcessed(txHash: string) {
Expand Down Expand Up @@ -183,7 +178,7 @@ export default class IndexerCacheService {
await this.saveCacheBlockNumber(tipBlockNumber)
return []
}
const txMetasCaches = await this.getTxHashes()
const txMetasCaches = await IndexerCacheService.getTxHashes([this.walletId])
const cachedTxHashes = txMetasCaches.map(meta => meta.txHash.toString())

const cachedTxHashesSet = new Set(cachedTxHashes)
Expand Down Expand Up @@ -218,7 +213,7 @@ export default class IndexerCacheService {

const indexerCaches: IndexerTxHashCache[] = []
for (const txWithStatus of txsWithStatus) {
const { transaction, txStatus } = txWithStatus
const { transaction } = txWithStatus
const mappings = mappingsByTxHash.get(transaction.hash!)
if (!mappings) {
continue
Expand All @@ -229,8 +224,6 @@ export default class IndexerCacheService {
IndexerTxHashCache.fromObject({
txHash: transaction.hash!,
blockNumber: parseInt(transaction.blockNumber!),
blockHash: txStatus.blockHash!,
blockTimestamp: transaction.timestamp!,
lockHash,
address,
walletId: this.walletId,
Expand All @@ -245,6 +238,44 @@ export default class IndexerCacheService {
return newTxHashes
}

public static async upsertIndexerCache(
txs: {
txHash: string
txIndex: string
blockNumber: string
lockHash: string
address: string
walletId: string
}[]
): Promise<string[]> {
if (!txs.length) {
return []
}
const walletIds = txs.map(v => v.walletId)
const txMetasCaches = await IndexerCacheService.getTxHashes(walletIds)
const cachedTxHashes = txMetasCaches.map(meta => meta.txHash.toString())

const cachedTxHashesSet = new Set(cachedTxHashes)

const newTxHashes = txs.filter(({ txHash }) => !cachedTxHashesSet.has(txHash))

if (!newTxHashes.length) {
return []
}
const indexerCaches: IndexerTxHashCache[] = newTxHashes.map(v =>
IndexerTxHashCache.fromObject({
txHash: v.txHash,
blockNumber: parseInt(v.blockNumber!),
lockHash: v.lockHash,
address: v.address,
walletId: v.walletId,
})
)
indexerCaches.sort((a, b) => a.blockNumber - b.blockNumber)
await getConnection().manager.save(indexerCaches, { chunk: 100 })
return newTxHashes.map(v => v.txHash)
}

public async updateProcessedTxHashes(blockNumber: string) {
await getConnection()
.createQueryBuilder()
Expand All @@ -259,13 +290,13 @@ export default class IndexerCacheService {
.execute()
}

public async nextUnprocessedTxsGroupedByBlockNumber(): Promise<IndexerTxHashCache[]> {
public static async nextUnprocessedTxsGroupedByBlockNumber(walletId: string): Promise<IndexerTxHashCache[]> {
const cache = await getConnection()
.getRepository(IndexerTxHashCache)
.createQueryBuilder()
.where({
isProcessed: false,
walletId: this.walletId,
walletId,
})
.orderBy('blockNumber', 'ASC')
.getOne()
Expand All @@ -281,7 +312,7 @@ export default class IndexerCacheService {
.where({
blockNumber,
isProcessed: false,
walletId: this.walletId,
walletId,
})
.getMany()
}
Expand Down
Loading

2 comments on commit e162cf3

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Packaging for test is done in 7097794866

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Packaging for test is done in 7097828944

Please sign in to comment.